1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
use crate::Peer;
use crate::PeerHandle;
use crate::util;
pub struct Server<Listener>
where
Listener: ServerListener,
{
listener: Listener,
config: Listener::Config,
}
pub trait ServerListener: util::Listener + Unpin {
#[doc(hidden)]
type Body: crate::Body;
#[doc(hidden)]
type Config: Clone + Send + Sync + 'static;
#[doc(hidden)]
type Transport: Send + 'static;
#[doc(hidden)]
fn spawn(connection: Self::Connection, config: Self::Config) -> PeerHandle<Self::Body>;
}
impl<Listener> ServerListener for Listener
where
Listener: util::Listener + Unpin,
Listener::Connection: util::IntoTransport,
{
type Body = <Listener::Connection as util::IntoTransport>::Body;
type Config = <Listener::Connection as util::IntoTransport>::Config;
type Transport = <Listener::Connection as util::IntoTransport>::Transport;
fn spawn(connection: Self::Connection, config: Self::Config) -> PeerHandle<Self::Body> {
use util::IntoTransport;
Peer::spawn(connection.into_transport(config))
}
}
impl<Listener: ServerListener> Server<Listener> {
pub fn new(listener: Listener, config: Listener::Config) -> Self {
Self { listener, config }
}
pub async fn bind<'a, Address: 'a>(address: Address, config: Listener::Config) -> std::io::Result<Self>
where
Listener: util::Bind<'a, Address>,
{
Ok(Self::new(Listener::bind(address).await?, config))
}
pub async fn run<F, R>(&mut self, task: F) -> std::io::Result<()>
where
F: FnMut(PeerHandle<Listener::Body>) -> R,
R: std::future::Future<Output = ()> + Send + 'static,
{
let mut task = task;
loop {
let peer = self.accept().await?;
let join_handle = tokio::spawn((task)(peer));
drop(join_handle);
}
}
pub async fn accept(&mut self) -> std::io::Result<PeerHandle<Listener::Body>> {
let (connection, _addr) = self.listener.accept().await?;
Ok(Listener::spawn(connection, self.config.clone()))
}
}