1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
use bytes::{Bytes, BytesMut};
use futures::compat::{Future01CompatExt, Sink01CompatExt, Stream01CompatExt};
use futures::sink::Sink;
use futures::stream::{Stream, StreamExt};
use futures::task::SpawnExt;
use std::io::Error;
use std::net::SocketAddr;
use std::pin::Pin;
use tokio::codec::{Framed, LengthDelimitedCodec};
use tokio::net::{TcpListener, TcpStream};
use tokio::prelude::Stream as Stream01;
/// Write half of a framed connection: a pinned, boxed, `Send + 'static` sink
/// that accepts `Bytes` items and reports failures as `std::io::Error`.
pub type Sinker = Pin<Box<dyn Sink<Bytes, SinkError = Error> + Send + 'static>>;
/// Read half of a framed connection: a pinned, boxed, `Send + 'static` stream
/// that yields `Result<BytesMut, std::io::Error>` items.
pub type Streamer = Pin<Box<dyn Stream<Item = Result<BytesMut, Error>> + Send + 'static>>;
/// Accepts TCP connections on `addr` and runs `handler` for each of them.
///
/// Every accepted socket is wrapped in a `LengthDelimitedCodec` frame and
/// split into its write and read halves. A clone of `handler` is then spawned
/// on `spawner` with those halves boxed as a [`Sinker`] and a [`Streamer`].
///
/// An error returned by a handler is logged at `info` level, and a failure to
/// spawn the task is logged at `error` level; neither stops the accept loop.
///
/// # Errors
///
/// Returns the error from binding the listener, or the first error yielded
/// while accepting a connection. Returns `Ok(())` once the stream of incoming
/// connections ends.
pub async fn listen<Addr, Handler, F>(
    spawner: &mut impl SpawnExt,
    addr: Addr,
    handler: Handler,
) -> Result<(), Error>
where
    Addr: Into<SocketAddr>,
    Handler: Fn(Sinker, Streamer) -> F + Send + Clone + 'static,
    F: std::future::Future<Output = Result<(), Error>> + Send,
{
    let mut connections = TcpListener::bind(&addr.into())?.incoming().compat();
    loop {
        // Stop when the listener yields no more connections; bail out on the
        // first accept error.
        let accepted = match connections.next().await {
            Some(outcome) => outcome?,
            None => return Ok(()),
        };
        let (writer, reader) = Framed::new(accepted, LengthDelimitedCodec::new()).split();
        let writer = writer.sink_compat();
        let reader = reader.compat();
        // Each connection gets its own copy of the handler.
        let on_connection = handler.clone();
        let task = async move {
            if let Err(err) = on_connection(Box::pin(writer), Box::pin(reader)).await {
                log::info!("bad socket: {}", err);
            }
        };
        if let Err(err) = spawner.spawn(task) {
            log::error!("error spawning: {:?}", err);
        }
    }
}
/// Opens a TCP connection to `addr` and drives `handler` over it.
///
/// The socket is wrapped in a `LengthDelimitedCodec` frame and split; the
/// write half is handed to `handler` as a [`Sinker`] and the read half as a
/// [`Streamer`].
///
/// # Errors
///
/// Returns the connection error if the socket cannot be established,
/// otherwise whatever `handler` resolves to.
pub async fn connect<Addr, Handler, F>(addr: Addr, handler: Handler) -> Result<(), Error>
where
    Addr: Into<SocketAddr>,
    Handler: Fn(Sinker, Streamer) -> F + Send + Clone + 'static,
    F: std::future::Future<Output = Result<(), Error>> + Send,
{
    let connection = TcpStream::connect(&addr.into()).compat().await?;
    let (writer, reader) = Framed::new(connection, LengthDelimitedCodec::new()).split();
    handler(Box::pin(writer.sink_compat()), Box::pin(reader.compat())).await
}
#[cfg(test)]
mod test {
    use super::*;
    use futures::executor::ThreadPool;
    use futures::future::FutureExt;
    use futures::prelude::{SinkExt, StreamExt};
    use std::time::Instant;

    /// Round-trips random payloads between 64 clients and an echo server,
    /// printing the per-message latency on both sides.
    #[test]
    fn test_echo() -> Result<(), Error> {
        /// Sends 128 random 128-byte payloads and waits for each echo,
        /// checking that the echoed frame matches what was sent.
        async fn client(sink: Sinker, stream: Streamer) -> Result<(), Error> {
            let mut sink = sink;
            let mut stream = stream;
            // `Vec::with_capacity` only reserves space and leaves the length
            // at zero, which made the fill loop a no-op and every frame empty.
            // Allocate a buffer that really holds 128 bytes instead.
            let mut data = vec![0u8; 128];
            for _ in 0..128 {
                for byte in data.iter_mut() {
                    *byte = rand::random::<u8>();
                }
                // `Instant` is monotonic, so measuring an interval cannot fail.
                let started = Instant::now();
                sink.send(data.as_slice().into()).await?;
                let echoed = stream.next().await;
                println!("received in {} µs", started.elapsed().as_micros());
                // Surface read errors and verify the echo instead of
                // discarding the reply.
                if let Some(frame) = echoed {
                    let frame = frame?;
                    assert_eq!(&frame[..], &data[..], "echoed frame differs from sent payload");
                }
            }
            Ok(())
        }

        /// Echoes every received frame back to the peer until the stream ends.
        async fn server(sink: Sinker, stream: Streamer) -> Result<(), Error> {
            let mut sink = sink;
            let mut stream = stream;
            while let Some(chunk) = stream.next().await {
                let started = Instant::now();
                let chunk = chunk?.freeze();
                sink.send(chunk).await?;
                println!("processed in {} µs", started.elapsed().as_micros());
            }
            Ok(())
        }

        let mut pool = ThreadPool::new().unwrap();
        let spawner = pool.clone();
        pool.run(async move {
            let mut client_spawner = spawner.clone();
            let mut server_spawner = spawner.clone();
            // NOTE(review): the clients are spawned before `listen` binds the
            // port, so a connect may be attempted before the listener exists
            // -- confirm whether this ordering is relied upon.
            for _ in 0..64 {
                let addr: SocketAddr = "0.0.0.0:3000".parse().unwrap();
                client_spawner
                    .spawn(connect(addr, client).map(|res| {
                        if let Err(err) = res {
                            panic!("error connecting: {:?}", err);
                        }
                    }))
                    .unwrap();
            }
            let addr: SocketAddr = "0.0.0.0:3000".parse().unwrap();
            // NOTE(review): `listen` only returns when the incoming stream
            // ends or an accept fails, so this future may never complete on
            // its own -- TODO confirm how the test is expected to finish.
            if let Err(err) = listen(&mut server_spawner, addr, server).await {
                panic!("error listening: {:?}", err);
            }
        });
        Ok(())
    }
}