tentacle 0.7.5

Minimal implementation for a multiplexed p2p network framework.
#![cfg(feature = "unstable")]

//! Implements the `simple.rs` example using `ProtocolSpawn`.
use bytes::Bytes;
use env_logger;
use futures::StreamExt;
use log::info;
use std::{str, sync::Arc, time::Duration};
use tentacle::{
    ProtocolId, SubstreamReadPart, async_trait,
    builder::{MetaBuilder, ServiceBuilder},
    context::{ServiceContext, SessionContext},
    secio::SecioKeyPair,
    service::{
        ProtocolMeta, Service, ServiceAsyncControl, ServiceError, ServiceEvent, TargetProtocol,
        TargetSession,
    },
    traits::{ProtocolSpawn, ServiceHandle},
};

struct ProtocolStream;

impl ProtocolSpawn for ProtocolStream {
    fn spawn(
        &self,
        context: Arc<SessionContext>,
        control: &ServiceAsyncControl,
        mut read_part: SubstreamReadPart,
    ) {
        let control = control.clone();
        tokio::spawn(async move {
            info!(
                "{}, {:?}, {}, opened",
                context.id,
                context.ty,
                read_part.protocol_id()
            );
            if read_part.protocol_id() == 1.into() {
                let c = control.clone();
                let pid = read_part.protocol_id();
                let mut interval =
                    tokio::time::interval_at(tokio::time::Instant::now(), Duration::from_secs(5));
                tokio::spawn(async move {
                    loop {
                        interval.tick().await;
                        let _ = c
                            .filter_broadcast(
                                TargetSession::All,
                                pid,
                                Bytes::from("I am a interval message"),
                            )
                            .await;
                    }
                });
            }
            loop {
                if let Some(Ok(data)) = read_part.next().await {
                    info!(
                        "received from [{}]: proto [{}] data {:?}",
                        context.id,
                        read_part.protocol_id(),
                        str::from_utf8(data.as_ref()).unwrap(),
                    );
                    if context.ty.is_outbound() {
                        let pid = read_part.protocol_id();
                        let _ = control.send_message_to(context.id, pid, data).await;
                    }
                } else {
                    break;
                }
            }
            info!(
                "{}, {:?}, {}, closed",
                context.id,
                context.ty,
                read_part.protocol_id()
            );
        });
    }
}

/// Builds the protocol metadata for `id`, handled by a `ProtocolStream` spawner.
fn create_meta(id: ProtocolId) -> ProtocolMeta {
    let builder = MetaBuilder::new().id(id);
    builder.protocol_spawn(ProtocolStream).build()
}

/// Service-level handler that logs errors and events.
struct SHandle;

#[async_trait]
impl ServiceHandle for SHandle {
    /// Logs every service error.
    async fn handle_error(&mut self, _context: &mut ServiceContext, error: ServiceError) {
        info!("service error: {:?}", error);
    }

    /// Logs every service event; when a session opens, schedules a broadcast on
    /// protocol 0 that fires after a 3 second delay.
    async fn handle_event(&mut self, context: &mut ServiceContext, event: ServiceEvent) {
        info!("service event: {:?}", event);
        if let ServiceEvent::SessionOpen { .. } = event {
            let delay_sender = context.control().clone();

            let _ = context
                .future_task(async move {
                    tokio::time::sleep_until(tokio::time::Instant::now() + Duration::from_secs(3))
                        .await;
                    // `filter_broadcast` returns a future; without `.await` it is dropped
                    // unpolled and the message is never sent.
                    let _ = delay_sender
                        .filter_broadcast(
                            TargetSession::All,
                            0.into(),
                            Bytes::from("I am a delayed message"),
                        )
                        .await;
                })
                .await;
        }
    }
}

/// Entry point: runs the server when the first CLI argument is `server`,
/// otherwise runs the client.
fn main() {
    env_logger::init();

    let mode = std::env::args().nth(1);
    match mode.as_deref() {
        Some("server") => {
            info!("Starting server ......");
            server();
        }
        _ => {
            info!("Starting client ......");
            client();
        }
    }
}

/// Builds the server-side service: protocols 0 and 1, secured with a freshly
/// generated secp256k1 key pair.
fn create_server() -> Service<SHandle, SecioKeyPair> {
    let with_protocols = (0usize..2).fold(ServiceBuilder::default(), |builder, id| {
        builder.insert_protocol(create_meta(id.into()))
    });
    let handshake = SecioKeyPair::secp256k1_generated().into();
    with_protocols.handshake_type(handshake).build(SHandle)
}

/// Builds the client-side service with protocols 0, 1 and 2.
///
/// Expected outcome when dialing the server:
/// - Proto 0 opens successfully
/// - Proto 1 opens successfully
/// - Proto 2 fails to open
///
/// because the server only supports protocols 0 and 1.
fn create_client() -> Service<SHandle, SecioKeyPair> {
    let with_protocols = (0usize..3).fold(ServiceBuilder::default(), |builder, id| {
        builder.insert_protocol(create_meta(id.into()))
    });
    let handshake = SecioKeyPair::secp256k1_generated().into();
    with_protocols.handshake_type(handshake).build(SHandle)
}

/// Runs the server on a fresh tokio runtime: listens on TCP port 1337 (and on
/// the websocket port 1338 when the `ws` feature is enabled), then drives the
/// service until it finishes.
fn server() {
    let runtime = tokio::runtime::Runtime::new().unwrap();

    let serve = async {
        let mut service = create_server();

        let tcp_addr = "/ip4/127.0.0.1/tcp/1337".parse().unwrap();
        service.listen(tcp_addr).await.unwrap();

        #[cfg(feature = "ws")]
        service
            .listen("/ip4/127.0.0.1/tcp/1338/ws".parse().unwrap())
            .await
            .unwrap();

        service.run().await
    };

    runtime.block_on(serve);
}

/// Runs the client on a fresh tokio runtime: dials the local server on TCP
/// port 1337 requesting all protocols, then drives the service until it finishes.
fn client() {
    let runtime = tokio::runtime::Runtime::new().unwrap();

    let connect = async {
        let mut service = create_client();

        let server_addr = "/ip4/127.0.0.1/tcp/1337".parse().unwrap();
        service
            .dial(server_addr, TargetProtocol::All)
            .await
            .unwrap();

        service.run().await
    };

    runtime.block_on(connect);
}