protosocket_server/
connection_server.rs

1use protosocket::Codec;
2use protosocket::Connection;
3use protosocket::Decoder;
4use protosocket::Encoder;
5use protosocket::MessageReactor;
6use protosocket::SocketListener;
7use protosocket::SocketResult;
8use protosocket::TcpSocketListener;
9use std::future::Future;
10use std::io::Error;
11use std::net::SocketAddr;
12use std::pin::Pin;
13use std::task::Context;
14use std::task::Poll;
15
16/// The ServerConnector listens to a socket and spawns a Reactor for each new connection.
17pub trait ServerConnector: Unpin {
18    /// Message encoding
19    type Codec: Codec
20        + Decoder<Message = <Self::Reactor as MessageReactor>::Inbound>
21        + Encoder<Message = <Self::Reactor as MessageReactor>::Outbound>;
22    /// Per-connection message handler
23    type Reactor: MessageReactor;
24    /// The listener type for this service. E.g., `TcpSocketListener`
25    type SocketListener: SocketListener;
26
27    /// Create a new message codec for a connection
28    fn codec(&self) -> Self::Codec;
29
30    /// Create a per-connection message Reactor.
31    /// You can look at the connection in here if you need some data, like a SocketAddr
32    fn new_reactor(
33        &self,
34        optional_outbound: spillway::Sender<<Self::Reactor as MessageReactor>::LogicalOutbound>,
35        _connection: &<Self::SocketListener as SocketListener>::Stream,
36    ) -> Self::Reactor;
37
38    /// Spawn a connection - probably you just want tokio::spawn, but you might have other needs.
39    fn spawn_connection(
40        &self,
41        connection: Connection<
42            <Self::SocketListener as SocketListener>::Stream,
43            Self::Codec,
44            Self::Reactor,
45        >,
46    );
47}
48
49/// A `protosocket::Connection` is an IO driver. It directly uses tokio's io wrapper of mio to poll
50/// the OS's io primitives, manages read and write buffers, and vends messages to & from connections.
51/// Connections send messages to the ConnectionServer through an mpsc channel, and they receive
52/// inbound messages via a reactor callback.
53///
54/// Protosockets are monomorphic messages: You can only have 1 kind of message per service.
55/// The expected way to work with this is to use prost and protocol buffers to encode messages.
56/// Of course you can do whatever you want, as the telnet example shows.
57///
58/// Protosocket messages are not opinionated about request & reply. If you are, you will need
59/// to implement such a thing. This allows you freely choose whether you want to send
60/// fire-&-forget messages sometimes; however it requires you to write your protocol's rules.
61/// You get an inbound iterable of <MessageIn> batches and an outbound stream of <MessageOut> per
62/// connection - you decide what those mean for you!
63///
64/// A ProtosocketServer is a future: You spawn it and it runs forever.
65///
66/// Construct a new ProtosocketServer by creating a ProtosocketServerConfig and calling the {{bind_tcp}} method.
67pub struct ProtosocketServer<Connector: ServerConnector> {
68    connector: Connector,
69    listener: Connector::SocketListener,
70    max_buffer_length: usize,
71    buffer_allocation_increment: usize,
72    max_queued_outbound_messages: usize,
73}
74
/// Socket configuration options for a ProtosocketServer.
///
/// This is a consuming builder: start from [`Default::default`] and chain the setters,
/// each of which takes the config by value and returns the updated config.
///
/// NOTE(review): within this file only `listen_backlog` and `keepalive_duration` are read
/// (by `ProtosocketServerConfig::bind_tcp`); `nodelay` and `reuse` are stored but not
/// consumed here - confirm they are applied somewhere else.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProtosocketSocketConfig {
    /// Whether nodelay should be set on the socket.
    nodelay: bool,
    /// Whether reuseaddr and reuseport should be set on the socket.
    reuse: bool,
    /// Keepalive window for the socket; `None` leaves it unset in this config.
    keepalive_duration: Option<std::time::Duration>,
    /// Backlog to pass when invoking `listen`.
    listen_backlog: u32,
}

impl ProtosocketSocketConfig {
    /// Whether nodelay should be set on the socket.
    pub fn nodelay(mut self, nodelay: bool) -> Self {
        self.nodelay = nodelay;
        self
    }

    /// Whether reuseaddr and reuseport should be set on the socket.
    pub fn reuse(mut self, reuse: bool) -> Self {
        self.reuse = reuse;
        self
    }

    /// The keepalive window to be set on the socket.
    ///
    /// There is no setter that returns the value to `None`; start from a fresh default
    /// config if keepalive must be left unset.
    pub fn keepalive_duration(mut self, keepalive_duration: std::time::Duration) -> Self {
        self.keepalive_duration = Some(keepalive_duration);
        self
    }

    /// The backlog to be set on the socket when invoking `listen`.
    pub fn listen_backlog(mut self, backlog: u32) -> Self {
        self.listen_backlog = backlog;
        self
    }
}

impl Default for ProtosocketSocketConfig {
    /// Defaults: nodelay on, reuse on, no keepalive window, listen backlog of 65536.
    fn default() -> Self {
        Self {
            nodelay: true,
            reuse: true,
            keepalive_duration: None,
            listen_backlog: 65536,
        }
    }
}
116
117pub struct ProtosocketServerConfig {
118    max_buffer_length: usize,
119    max_queued_outbound_messages: usize,
120    buffer_allocation_increment: usize,
121    socket_config: ProtosocketSocketConfig,
122}
123
124impl ProtosocketServerConfig {
125    /// The maximum buffer length per connection on this server.
126    pub fn max_buffer_length(mut self, max_buffer_length: usize) -> Self {
127        self.max_buffer_length = max_buffer_length;
128        self
129    }
130    /// The maximum number of queued outbound messages per connection on this server.
131    pub fn max_queued_outbound_messages(mut self, max_queued_outbound_messages: usize) -> Self {
132        self.max_queued_outbound_messages = max_queued_outbound_messages;
133        self
134    }
135    /// The step size for allocating additional memory for connection buffers on this server.
136    pub fn buffer_allocation_increment(mut self, buffer_allocation_increment: usize) -> Self {
137        self.buffer_allocation_increment = buffer_allocation_increment;
138        self
139    }
140    /// The tcp socket configuration options for this server.
141    pub fn socket_config(mut self, config: ProtosocketSocketConfig) -> Self {
142        self.socket_config = config;
143        self
144    }
145
146    /// Binds a tcp listener to the given address and returns a ProtosocketServer with this configuration.
147    /// After binding, you must await the returned server future to process requests.
148    pub fn bind_tcp<Connector: ServerConnector<SocketListener = TcpSocketListener>>(
149        self,
150        address: SocketAddr,
151        connector: Connector,
152    ) -> crate::Result<ProtosocketServer<Connector>> {
153        Ok(ProtosocketServer::new(
154            TcpSocketListener::listen(
155                address,
156                self.socket_config.listen_backlog,
157                self.socket_config.keepalive_duration,
158            )?,
159            connector,
160            self,
161        ))
162    }
163
164    /// Build a socket server with a configured listener and a socket Connector.
165    /// After building, you must await the returned server future to process requests.
166    pub fn build_server<
167        Connector: ServerConnector<SocketListener = Listener>,
168        Listener: SocketListener,
169    >(
170        self,
171        listener: Listener,
172        connector: Connector,
173    ) -> crate::Result<ProtosocketServer<Connector>> {
174        Ok(ProtosocketServer::new(listener, connector, self))
175    }
176}
177
178impl Default for ProtosocketServerConfig {
179    fn default() -> Self {
180        Self {
181            max_buffer_length: 16 * (2 << 20),
182            max_queued_outbound_messages: 128,
183            buffer_allocation_increment: 1 << 20,
184            socket_config: Default::default(),
185        }
186    }
187}
188
189impl<Connector: ServerConnector> ProtosocketServer<Connector> {
190    /// Construct a new `ProtosocketServer`.
191    fn new(
192        listener: Connector::SocketListener,
193        connector: Connector,
194        config: ProtosocketServerConfig,
195    ) -> Self {
196        Self {
197            connector,
198            listener,
199            max_buffer_length: config.max_buffer_length,
200            max_queued_outbound_messages: config.max_queued_outbound_messages,
201            buffer_allocation_increment: config.buffer_allocation_increment,
202        }
203    }
204}
205
206impl<Connector: ServerConnector> Unpin for ProtosocketServer<Connector> {}
207impl<Connector: ServerConnector> Future for ProtosocketServer<Connector> {
208    type Output = Result<(), Error>;
209
210    fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
211        loop {
212            break match self.listener.poll_accept(context) {
213                Poll::Ready(result) => match result {
214                    SocketResult::Stream(stream) => {
215                        let (outbound_submission_queue, outbound_messages) = spillway::channel();
216                        // I want to let people make their stream,reactor tuple in an async context.
217                        // I want it to not require Send, so that io_uring is possible
218                        // That unfortunately means that Stream might have to be internally async
219                        let reactor = self
220                            .connector
221                            .new_reactor(outbound_submission_queue.clone(), &stream);
222                        let connection = Connection::new(
223                            stream,
224                            self.connector.codec(),
225                            self.max_buffer_length,
226                            self.buffer_allocation_increment,
227                            self.max_queued_outbound_messages,
228                            outbound_messages,
229                            reactor,
230                        );
231                        self.connector.spawn_connection(connection);
232                        continue;
233                    }
234                    SocketResult::Disconnect => Poll::Ready(Ok(())),
235                },
236                Poll::Pending => Poll::Pending,
237            };
238        }
239    }
240}