1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
use bytes::{Bytes, BytesMut};
use futures::compat::{Future01CompatExt, Sink01CompatExt, Stream01CompatExt};
use futures::sink::Sink;
use futures::stream::{Stream, StreamExt};
use futures::task::SpawnExt;
use std::io::Error;
use std::net::SocketAddr;
use std::pin::Pin;
use tokio::codec::{Framed, LengthDelimitedCodec};
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::Stream as Stream01;

/// Boxed, pinned, sendable sink that accepts `Bytes` items and reports
/// failures as `std::io::Error`.
///
/// `listen` and `connect` pass the write half of a length-delimited framed
/// TCP socket to their handlers through this type.
pub type Sinker = Pin<Box<dyn Sink<Bytes, SinkError = Error> + Send + 'static>>;
/// Boxed, pinned, sendable stream that yields `Result<BytesMut, Error>` items.
///
/// `listen` and `connect` pass the read half of a length-delimited framed
/// TCP socket to their handlers through this type.
pub type Streamer = Pin<Box<dyn Stream<Item = Result<BytesMut, Error>> + Send + 'static>>;

/// Binds a TCP listener on `addr` and serves every accepted connection with
/// a clone of `handler`.
///
/// Each accepted socket is wrapped in a `LengthDelimitedCodec` frame and split
/// into its write half (`Sinker`) and read half (`Streamer`). The handler is
/// then run as a separate task on `spawner`.
///
/// An error returned by a handler is logged at `info` level, and a failure to
/// spawn the task is logged at `error` level; neither stops the accept loop.
///
/// # Errors
///
/// Returns the I/O error if binding the listener fails, or the first error
/// yielded by the stream of incoming connections.
pub async fn listen<Addr, Handler, F>(
    spawner: &mut impl SpawnExt,
    addr: Addr,
    handler: Handler,
) -> Result<(), Error>
where
    Addr: Into<SocketAddr>,
    Handler: Fn(Sinker, Streamer) -> F + Send + Clone + 'static,
    F: std::future::Future<Output = Result<(), Error>> + Send,
{
    let bind_addr: SocketAddr = addr.into();
    let mut connections = TcpListener::bind(&bind_addr)?.incoming().compat();

    loop {
        // Stop when the incoming stream ends; bail out on an accept error.
        let accepted = match connections.next().await {
            Some(result) => result?,
            None => break,
        };

        // Frame the socket, then adapt both halves from futures 0.1 to 0.3.
        let (writer, reader) = Framed::new(accepted, LengthDelimitedCodec::new()).split();
        let (writer, reader) = (writer.sink_compat(), reader.compat());

        // Every connection gets its own copy of the handler.
        let serve = handler.clone();
        let task = async move {
            if let Err(err) = serve(Box::pin(writer), Box::pin(reader)).await {
                log::info!("bad socket: {}", err);
            }
        };

        if let Err(err) = spawner.spawn(task) {
            log::error!("error spawning: {:?}", err);
        }
    }

    Ok(())
}

/// Opens a TCP connection to `addr` and runs `handler` on it to completion.
///
/// The connected socket is wrapped in a `LengthDelimitedCodec` frame and split
/// into its write half (`Sinker`) and read half (`Streamer`), which are handed
/// to `handler`.
///
/// # Errors
///
/// Returns the I/O error if the connection attempt fails; otherwise returns
/// whatever `handler` returns.
pub async fn connect<Addr, Handler, F>(addr: Addr, handler: Handler) -> Result<(), Error>
where
    Addr: Into<SocketAddr>,
    Handler: Fn(Sinker, Streamer) -> F + Send + Clone + 'static,
    F: std::future::Future<Output = Result<(), Error>> + Send,
{
    let remote: SocketAddr = addr.into();
    let connection = TcpStream::connect(&remote).compat().await?;

    // Frame the socket, then adapt both halves from futures 0.1 to 0.3.
    let (writer, reader) = Framed::new(connection, LengthDelimitedCodec::new()).split();
    let (writer, reader) = (writer.sink_compat(), reader.compat());

    handler(Box::pin(writer), Box::pin(reader)).await
}

#[cfg(test)]
mod test {

    use super::*;
    use futures::executor::ThreadPool;
    use futures::future::FutureExt;
    use futures::prelude::{SinkExt, StreamExt};
    use std::io::ErrorKind;
    use std::time::Instant;

    /// Runs 64 echo clients against a single echo server on port 3000.
    ///
    /// NOTE(review): the clients are spawned before `listen` has bound the
    /// port, so a client may attempt to connect before the server is ready.
    /// NOTE(review): `listen` only returns when accepting fails, so this test
    /// appears to run until it is interrupted — confirm this is intended.
    #[test]
    fn test_echo() -> Result<(), Error> {
        // Simple client implementation that sends 128 messages of 128 bytes.
        // After sending each message, the client waits for the response and
        // checks that it is an exact echo of what was sent.
        async fn client(sink: Sinker, stream: Streamer) -> Result<(), Error> {
            let mut sink = sink;
            let mut stream = stream;

            // `vec![0; 128]` has length 128; `Vec::with_capacity(128)` would
            // have length 0 and the messages would be empty.
            let mut data = vec![0u8; 128];
            for _ in 0..128 {
                for byte in data.iter_mut() {
                    *byte = rand::random::<u8>();
                }
                let started = Instant::now();

                sink.send(data.as_slice().into()).await?;
                let reply = match stream.next().await {
                    Some(reply) => reply?,
                    None => {
                        return Err(Error::new(
                            ErrorKind::UnexpectedEof,
                            "connection closed before the echo was received",
                        ));
                    }
                };
                if &reply[..] != data.as_slice() {
                    return Err(Error::new(
                        ErrorKind::InvalidData,
                        "echoed payload does not match the payload sent",
                    ));
                }

                // `Instant` is monotonic and `as_micros` keeps whole seconds.
                println!("received in {} µs", started.elapsed().as_micros());
            }
            Ok(())
        }

        // Simple server implementation that receives messages and immediately
        // echoes each message back as the response.
        async fn server(sink: Sinker, stream: Streamer) -> Result<(), Error> {
            let mut sink = sink;
            let mut stream = stream;

            while let Some(chunk) = stream.next().await {
                let started = Instant::now();

                let chunk = chunk?.freeze();
                sink.send(chunk).await?;

                println!("processed in {} µs", started.elapsed().as_micros());
            }

            Ok(())
        }

        let mut pool = ThreadPool::new().unwrap();
        let spawner = pool.clone();

        pool.run(async move {
            let mut client_spawner = spawner.clone();
            let mut server_spawner = spawner.clone();

            // Spawn clients. They target the loopback address because
            // `0.0.0.0` is not a portable destination address.
            for _ in 0..64 {
                let addr: SocketAddr = "127.0.0.1:3000".parse().unwrap();
                client_spawner
                    .spawn(connect(addr, client).map(|res| {
                        if let Err(err) = res {
                            panic!("error connecting: {:?}", err);
                        }
                    }))
                    .unwrap();
            }

            // Run the server on all interfaces.
            let addr: SocketAddr = "0.0.0.0:3000".parse().unwrap();
            if let Err(err) = listen(&mut server_spawner, addr, server).await {
                panic!("error listening: {:?}", err);
            }
        });

        Ok(())
    }
}