1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
use super::Peer;
use crate::IntoTransport;
use crate::PeerHandle;

/// A server that turns every accepted connection into a spawned peer.
///
/// The server owns the listener it accepts connections from,
/// together with the configuration that is handed to each new connection.
pub struct Server<Listener: ServerListener> {
	/// The listener that new connections are accepted from.
	listener: Listener,

	/// Configuration that is cloned for every accepted connection.
	config: Listener::Config,
}

/// Helper trait that bundles the requirements on the `Listener` type of a [`Server`].
///
/// A blanket implementation exists for this trait,
/// so you cannot implement it for your own types.
///
/// Using it as a trait bound for generic arguments is fine,
/// but none of the items inside the trait should be relied upon.
pub trait ServerListener: crate::util::Listener + Unpin {
	/// Configuration value that is passed to [`Self::spawn`] for each connection.
	#[doc(hidden)]
	type Config: Clone + Send + Sync + 'static;

	/// Transport type associated with the listener.
	#[doc(hidden)]
	type Transport: Send + 'static;

	/// Body type of the peer handles returned by [`Self::spawn`].
	#[doc(hidden)]
	type Body: crate::Body;

	/// Spawn a peer for an accepted connection and return a handle to it.
	#[doc(hidden)]
	fn spawn(connection: Self::Connection, config: Self::Config) -> PeerHandle<Self::Body>;
}

// Blanket implementation: any unpin-able listener whose connections
// can be converted into a transport qualifies as a `ServerListener`.
impl<L> ServerListener for L
where
	L: crate::util::Listener + Unpin,
	L::Connection: IntoTransport,
{
	// All associated types are forwarded from the connection's `IntoTransport` implementation.
	type Config = <L::Connection as IntoTransport>::Config;
	type Transport = <L::Connection as IntoTransport>::Transport;
	type Body = <L::Connection as IntoTransport>::Body;

	fn spawn(connection: Self::Connection, config: Self::Config) -> PeerHandle<Self::Body> {
		// Convert the raw connection into a transport first, then spawn a peer on top of it.
		let transport = connection.into_transport(config);
		Peer::spawn(transport)
	}
}

impl<Listener: ServerListener> Server<Listener> {
	/// Create a server on a listening socket.
	///
	/// The passed in config is used to create transports and peers for all accepted connections.
	pub fn new(listener: Listener, config: Listener::Config) -> Self {
		Self { listener, config }
	}

	/// Run the server.
	///
	/// The server will accept connections in a loop and spawn a user task for each new peer.
	pub async fn run<F, R>(&mut self, task: F) -> std::io::Result<()>
	where
		F: FnMut(PeerHandle<Listener::Body>) -> R,
		R: std::future::Future<Output = ()> + Send + 'static,
	{
		let mut task = task;
		loop {
			let peer = self.accept().await?;
			let join_handle = tokio::spawn((task)(peer));
			// TODO: keep join handles around so we can await them later.
			// If we do, we should also clean them from time to time though.
			drop(join_handle);
		}
	}

	/// Accept a connection and spawn a peer for it.
	///
	/// A [`Peer`] is spawned for the new connection,
	/// and a [`PeerHandle`] is returned to allow interaction with the peer.
	pub async fn accept(&mut self) -> std::io::Result<PeerHandle<Listener::Body>> {
		let (connection, _addr) = self.listener.accept().await?;
		Ok(Listener::spawn(connection, self.config.clone()))
	}
}