ocapn-netlayer 0.1.4

OCapN transport layer interfaces and types
Documentation
use std::net::ToSocketAddrs;
use std::net::{Ipv4Addr, SocketAddr};

use anyhow::{Error, Result};
use async_trait::async_trait;
use tokio::net::{TcpListener, TcpStream};

use crate::connector::Connector;

pub struct Config {
    listen_addr: SocketAddr,
}

const DEFAULT_TCP_PORT: u16 = 9045;

impl Default for Config {
    fn default() -> Self {
        Config {
            listen_addr: SocketAddr::new(std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0),
        }
    }
}

pub(crate) struct TcpConnector {
    listener: TcpListener,
}

impl TryFrom<Config> for TcpConnector {
    type Error = Error;

    fn try_from(config: Config) -> Result<Self, Self::Error> {
        let listener = std::net::TcpListener::bind(config.listen_addr)?;
        Ok(TcpConnector {
            listener: TcpListener::from_std(listener)?,
        })
    }
}

#[async_trait]
impl Connector for TcpConnector {
    async fn new_outgoing_connection(
        &mut self,
        locator: &crate::locator::Peer,
    ) -> Result<crate::connection::Connection> {
        if !locator.transport.eq("tcp") {
            return Err(Error::msg("not a tcp address"));
        }
        let port: u16 = match locator.hints.get("port") {
            Some(port_as_str) => port_as_str.parse()?,
            None => DEFAULT_TCP_PORT,
        };
        let addr = (locator.designator.as_str(), port)
            .to_socket_addrs()?
            .next()
            .ok_or_else(|| Error::msg("could not resolve designator host"))?;
        let stream = TcpStream::connect(addr).await?;
        Ok(stream.into())
    }

    async fn accept_incoming_connection(
        &mut self,
    ) -> Result<Option<crate::connection::Connection>> {
        let (stream, _) = self.listener.accept().await?;
        Ok(Some(stream.into()))
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::HashMap, time::Duration};

    use ocapn_syrup::Value;
    use tokio::{join, select, spawn, time::interval};

    use crate::{connector::Connector, locator::Peer, Error, Result};

    use super::{Config, TcpConnector};

    struct PingPongAgent {
        connector: TcpConnector,
    }

    impl PingPongAgent {
        fn new() -> Self {
            let c = Config::default();
            PingPongAgent {
                connector: c.try_into().expect("create connector"),
            }
        }

        fn whoami(&self) -> Peer {
            let local_addr = self.connector.listener.local_addr().expect("local address");
            Peer {
                designator: local_addr.ip().to_string(),
                transport: "tcp".to_owned(),
                hints: HashMap::from([("port".to_owned(), local_addr.port().to_string())]),
            }
        }

        async fn run_client(&mut self, peer: Peer) -> Result<()> {
            let mut outbound = self
                .connector
                .new_outgoing_connection(&peer)
                .await
                .expect("connect to peer");
            let mut pongs_rcvd = 0u32;
            for _ in 0..10 {
                outbound.send(Value::String("ping".to_owned())).await?;
            }
            let mut timeout = interval(Duration::from_secs(5));
            timeout.tick().await;
            loop {
                select! {
                    res = outbound.recv() => {
                        let value = res?;
                        assert_eq!(value, Value::string("pong"));
                        pongs_rcvd+=1;
                        if pongs_rcvd >= 10 {
                            break;
                        }
                    }
                    _ = timeout.tick() => {
                        println!("client giving up");
                        return Err(Error::msg("timeout waiting for pings and pongs"));
                    }
                }
            }
            if pongs_rcvd >= 10 {
                outbound.close().await?;
                Ok(())
            } else {
                Err(Error::msg("missing some pings and/or pongs hmm"))
            }
        }

        async fn run_server(&mut self) -> Result<()> {
            let mut timeout = interval(Duration::from_secs(5));
            timeout.tick().await;
            loop {
                select! {
                    res = self.connector.accept_incoming_connection() => {
                        let mut conn = res?.ok_or(Error::msg("connector shut down"))?;
                        for _ in 0..10 {
                            let ping = conn.recv().await?;
                            assert_eq!(ping, Value::string("ping"));
                            conn.send(Value::string("pong")).await?;
                        }
                        return Ok(());
                    }
                    _ = timeout.tick() => {
                        println!("server giving up");
                        return Err(Error::msg("timeout waiting for pings and pongs"));
                    }
                }
            }
        }
    }

    #[tokio::test]
    async fn client_server_oneway() {
        let mut alice = PingPongAgent::new();
        let alice_locator = alice.whoami();
        let mut bob = PingPongAgent::new();
        let alice_task = spawn(async move { alice.run_server().await });
        let bob_task = spawn(async move { bob.run_client(alice_locator).await });
        let (alice_result, bob_result) = join!(alice_task, bob_task);
        alice_result.expect("alice run").expect("alice run ok");
        bob_result.expect("bob run").expect("bob run ok");
    }
}