protosocket_server/
connection_server.rs

1use protosocket::Codec;
2use protosocket::Connection;
3use protosocket::Decoder;
4use protosocket::Encoder;
5use protosocket::MessageReactor;
6use protosocket::SocketListener;
7use protosocket::SocketResult;
8use protosocket::TcpSocketListener;
9use std::future::Future;
10use std::io::Error;
11use std::net::SocketAddr;
12use std::pin::Pin;
13use std::task::Context;
14use std::task::Poll;
15
16/// The ServerConnector listens to a socket and spawns a Reactor for each new connection.
17pub trait ServerConnector: Unpin {
18    /// Message encoding
19    type Codec: Codec
20        + Decoder<Message = <Self::Reactor as MessageReactor>::Inbound>
21        + Encoder<Message = <Self::Reactor as MessageReactor>::Outbound>;
22    /// Per-connection message handler
23    type Reactor: MessageReactor;
24    /// The listener type for this service. E.g., `TcpSocketListener`
25    type SocketListener: SocketListener;
26
27    /// Create a new message codec for a connection
28    fn codec(&self) -> Self::Codec;
29
30    /// Create a per-connection message Reactor.
31    /// You can look at the connection in here if you need some data, like a SocketAddr
32    fn new_reactor(
33        &self,
34        optional_outbound: spillway::Sender<<Self::Reactor as MessageReactor>::LogicalOutbound>,
35        _connection: &<Self::SocketListener as SocketListener>::Stream,
36    ) -> Self::Reactor;
37
38    /// Spawn a connection - probably you just want tokio::spawn, but you might have other needs.
39    fn spawn_connection(
40        &self,
41        connection: Connection<
42            <Self::SocketListener as SocketListener>::Stream,
43            Self::Codec,
44            Self::Reactor,
45        >,
46    );
47}
48
49/// A `protosocket::Connection` is an IO driver. It directly uses tokio's io wrapper of mio to poll
50/// the OS's io primitives, manages read and write buffers, and vends messages to & from connections.
51/// Connections send messages to the ConnectionServer through an mpsc channel, and they receive
52/// inbound messages via a reactor callback.
53///
54/// Protosockets are monomorphic messages: You can only have 1 kind of message per service.
55/// The expected way to work with this is to use prost and protocol buffers to encode messages.
56/// Of course you can do whatever you want, as the telnet example shows.
57///
58/// Protosocket messages are not opinionated about request & reply. If you are, you will need
59/// to implement such a thing. This allows you freely choose whether you want to send
60/// fire-&-forget messages sometimes; however it requires you to write your protocol's rules.
61/// You get an inbound iterable of <MessageIn> batches and an outbound stream of <MessageOut> per
62/// connection - you decide what those mean for you!
63///
64/// A ProtosocketServer is a future: You spawn it and it runs forever.
65///
66/// Construct a new ProtosocketServer by creating a ProtosocketServerConfig and calling the {{bind_tcp}} method.
67pub struct ProtosocketServer<Connector: ServerConnector> {
68    connector: Connector,
69    listener: Connector::SocketListener,
70    max_buffer_length: usize,
71    buffer_allocation_increment: usize,
72    max_queued_outbound_messages: usize,
73}
74
/// Socket configuration options for a ProtosocketServer.
///
/// Start from `ProtosocketSocketConfig::default()` and adjust individual options with the
/// chained setter methods. The type is `Debug` and `Clone` so a configuration can be logged
/// and reused for several servers.
#[derive(Debug, Clone)]
pub struct ProtosocketSocketConfig {
    /// Whether nodelay should be set on the socket.
    // NOTE(review): nothing in this file reads this field (`bind_tcp` forwards only
    // `listen_backlog` and `keepalive_duration`) — confirm whether it is applied elsewhere.
    nodelay: bool,
    /// Whether reuseaddr and reuseport should be set on the socket.
    // NOTE(review): like `nodelay`, this value is stored but not read in this file.
    reuse: bool,
    /// The keepalive window to set on the socket; `None` leaves it unspecified.
    keepalive_duration: Option<std::time::Duration>,
    /// The backlog passed to the listener when binding.
    listen_backlog: u32,
}
82
83impl ProtosocketSocketConfig {
84    /// Whether nodelay should be set on the socket.
85    pub fn nodelay(mut self, nodelay: bool) -> Self {
86        self.nodelay = nodelay;
87        self
88    }
89    /// Whether reuseaddr and reuseport should be set on the socket.
90    pub fn reuse(mut self, reuse: bool) -> Self {
91        self.reuse = reuse;
92        self
93    }
94    /// The keepalive window to be set on the socket.
95    pub fn keepalive_duration(mut self, keepalive_duration: std::time::Duration) -> Self {
96        self.keepalive_duration = Some(keepalive_duration);
97        self
98    }
99    /// The backlog to be set on the socket when invoking `listen`.
100    pub fn listen_backlog(mut self, backlog: u32) -> Self {
101        self.listen_backlog = backlog;
102        self
103    }
104}
105
106impl Default for ProtosocketSocketConfig {
107    fn default() -> Self {
108        Self {
109            nodelay: true,
110            reuse: true,
111            keepalive_duration: None,
112            listen_backlog: 65536,
113        }
114    }
115}
116
117/// A configurator for a protosocket server
118pub struct ProtosocketServerConfig {
119    max_buffer_length: usize,
120    max_queued_outbound_messages: usize,
121    buffer_allocation_increment: usize,
122    socket_config: ProtosocketSocketConfig,
123}
124
125impl ProtosocketServerConfig {
126    /// The maximum buffer length per connection on this server.
127    pub fn max_buffer_length(mut self, max_buffer_length: usize) -> Self {
128        self.max_buffer_length = max_buffer_length;
129        self
130    }
131    /// The maximum number of queued outbound messages per connection on this server.
132    pub fn max_queued_outbound_messages(mut self, max_queued_outbound_messages: usize) -> Self {
133        self.max_queued_outbound_messages = max_queued_outbound_messages;
134        self
135    }
136    /// The step size for allocating additional memory for connection buffers on this server.
137    pub fn buffer_allocation_increment(mut self, buffer_allocation_increment: usize) -> Self {
138        self.buffer_allocation_increment = buffer_allocation_increment;
139        self
140    }
141    /// The tcp socket configuration options for this server.
142    pub fn socket_config(mut self, config: ProtosocketSocketConfig) -> Self {
143        self.socket_config = config;
144        self
145    }
146
147    /// Binds a tcp listener to the given address and returns a ProtosocketServer with this configuration.
148    /// After binding, you must await the returned server future to process requests.
149    pub fn bind_tcp<Connector: ServerConnector<SocketListener = TcpSocketListener>>(
150        self,
151        address: SocketAddr,
152        connector: Connector,
153    ) -> crate::Result<ProtosocketServer<Connector>> {
154        Ok(ProtosocketServer::new(
155            TcpSocketListener::listen(
156                address,
157                self.socket_config.listen_backlog,
158                self.socket_config.keepalive_duration,
159            )?,
160            connector,
161            self,
162        ))
163    }
164
165    /// Build a socket server with a configured listener and a socket Connector.
166    /// After building, you must await the returned server future to process requests.
167    pub fn build_server<
168        Connector: ServerConnector<SocketListener = Listener>,
169        Listener: SocketListener,
170    >(
171        self,
172        listener: Listener,
173        connector: Connector,
174    ) -> crate::Result<ProtosocketServer<Connector>> {
175        Ok(ProtosocketServer::new(listener, connector, self))
176    }
177}
178
179impl Default for ProtosocketServerConfig {
180    fn default() -> Self {
181        Self {
182            max_buffer_length: 16 * (2 << 20),
183            max_queued_outbound_messages: 128,
184            buffer_allocation_increment: 1 << 20,
185            socket_config: Default::default(),
186        }
187    }
188}
189
190impl<Connector: ServerConnector> ProtosocketServer<Connector> {
191    /// Construct a new `ProtosocketServer`.
192    fn new(
193        listener: Connector::SocketListener,
194        connector: Connector,
195        config: ProtosocketServerConfig,
196    ) -> Self {
197        Self {
198            connector,
199            listener,
200            max_buffer_length: config.max_buffer_length,
201            max_queued_outbound_messages: config.max_queued_outbound_messages,
202            buffer_allocation_increment: config.buffer_allocation_increment,
203        }
204    }
205}
206
207impl<Connector: ServerConnector> Unpin for ProtosocketServer<Connector> {}
208impl<Connector: ServerConnector> Future for ProtosocketServer<Connector> {
209    type Output = Result<(), Error>;
210
211    fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
212        loop {
213            break match self.listener.poll_accept(context) {
214                Poll::Ready(result) => match result {
215                    SocketResult::Stream(stream) => {
216                        let (outbound_submission_queue, outbound_messages) = spillway::channel();
217                        // I want to let people make their stream,reactor tuple in an async context.
218                        // I want it to not require Send, so that io_uring is possible
219                        // That unfortunately means that Stream might have to be internally async
220                        let reactor = self
221                            .connector
222                            .new_reactor(outbound_submission_queue.clone(), &stream);
223                        let connection = Connection::new(
224                            stream,
225                            self.connector.codec(),
226                            self.max_buffer_length,
227                            self.buffer_allocation_increment,
228                            self.max_queued_outbound_messages,
229                            outbound_messages,
230                            reactor,
231                        );
232                        self.connector.spawn_connection(connection);
233                        continue;
234                    }
235                    SocketResult::Disconnect => Poll::Ready(Ok(())),
236                },
237                Poll::Pending => Poll::Pending,
238            };
239        }
240    }
241}