tentacle 0.7.5

Minimal implementation for a multiplexed p2p network framework.
use std::{borrow::Cow, sync::mpsc::channel, thread};
use tentacle::{
    ProtocolId, async_trait,
    builder::{MetaBuilder, ServiceBuilder},
    context::{ProtocolContext, ServiceContext},
    error::DialerErrorKind,
    multiaddr::Multiaddr,
    multiaddr::Protocol as MultiProtocol,
    secio::SecioKeyPair,
    service::{
        ProtocolHandle, ProtocolMeta, Service, ServiceControl, ServiceError, ServiceEvent,
        TargetProtocol,
    },
    traits::{ServiceHandle, ServiceProtocol},
};

pub fn create<F>(key_pair: SecioKeyPair, meta: ProtocolMeta, shandle: F) -> Service<F, SecioKeyPair>
where
    F: ServiceHandle + Unpin + 'static,
{
    ServiceBuilder::default()
        .insert_protocol(meta)
        .forever(true)
        .handshake_type(key_pair.into())
        .build(shandle)
}

struct PHandle;

#[async_trait]
impl ServiceProtocol for PHandle {
    async fn init(&mut self, _control: &mut ProtocolContext) {}
}

#[derive(Clone)]
struct EmptySHandle {
    sender: crossbeam_channel::Sender<usize>,
    error_count: usize,
}

#[async_trait]
impl ServiceHandle for EmptySHandle {
    async fn handle_error(&mut self, _env: &mut ServiceContext, error: ServiceError) {
        self.error_count += 1;

        match error {
            ServiceError::DialerError { error, .. } => match error {
                DialerErrorKind::PeerIdNotMatch => {}
                err => panic!(
                    "test fail, expected DialerErrorKind::PeerIdNotMatch, got {:?}",
                    err
                ),
            },
            _ => {
                panic!("test fail {:?}", error);
            }
        }

        if self.error_count > 8 {
            let _res = self.sender.try_send(self.error_count);
        }
    }

    async fn handle_event(&mut self, _control: &mut ServiceContext, event: ServiceEvent) {
        if let ServiceEvent::SessionOpen { .. } = event {
            let _res = self.sender.try_send(self.error_count);
        }
    }
}

fn create_shandle() -> (EmptySHandle, crossbeam_channel::Receiver<usize>) {
    let (sender, receiver) = crossbeam_channel::bounded(2);
    (
        EmptySHandle {
            sender,
            error_count: 0,
        },
        receiver,
    )
}

fn create_meta(id: ProtocolId) -> ProtocolMeta {
    MetaBuilder::new()
        .id(id)
        .service_handle(move || {
            if id == 0.into() {
                ProtocolHandle::None
            } else {
                let handle = Box::new(PHandle);
                ProtocolHandle::Callback(handle)
            }
        })
        .build()
}

fn test_peer_id(fail: bool) {
    let meta = create_meta(1.into());
    let key = SecioKeyPair::secp256k1_generated();
    let (addr_sender, addr_receiver) = channel::<Multiaddr>();
    let mut service = create(key.clone(), meta, ());

    thread::spawn(move || {
        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(async move {
            let listen_addr = service
                .listen("/ip4/127.0.0.1/tcp/0".parse().unwrap())
                .await
                .unwrap();

            addr_sender.send(listen_addr).unwrap();

            service.run().await
        });
    });

    let mut listen_addr = addr_receiver.recv().unwrap();

    let (shandle, error_receiver) = create_shandle();
    let meta = create_meta(1.into());
    let mut service = create(SecioKeyPair::secp256k1_generated(), meta, shandle);
    let control: ServiceControl = service.control().clone().into();
    thread::spawn(move || {
        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(async move { service.run().await });
    });

    if fail {
        (1..11).for_each(|_| {
            let mut addr = listen_addr.clone();
            addr.push(MultiProtocol::P2P(Cow::Owned(
                SecioKeyPair::secp256k1_generated().peer_id().into_bytes(),
            )));
            control.dial(addr, TargetProtocol::All).unwrap();
        });
        assert_eq!(error_receiver.recv(), Ok(9));
    } else {
        listen_addr.push(MultiProtocol::P2P(Cow::Owned(key.peer_id().into_bytes())));
        control.dial(listen_addr, TargetProtocol::All).unwrap();
        assert_eq!(error_receiver.recv(), Ok(0));
    }
}

#[test]
fn test_fail() {
    test_peer_id(true)
}

#[test]
fn test_succeed() {
    test_peer_id(false)
}