protosocket_server/
connection_server.rs1use protosocket::Codec;
2use protosocket::Connection;
3use protosocket::Decoder;
4use protosocket::Encoder;
5use protosocket::MessageReactor;
6use protosocket::SocketListener;
7use protosocket::SocketResult;
8use protosocket::TcpSocketListener;
9use std::future::Future;
10use std::io::Error;
11use std::net::SocketAddr;
12use std::pin::Pin;
13use std::task::Context;
14use std::task::Poll;
15
16pub trait ServerConnector: Unpin {
18 type Codec: Codec
20 + Decoder<Message = <Self::Reactor as MessageReactor>::Inbound>
21 + Encoder<Message = <Self::Reactor as MessageReactor>::Outbound>;
22 type Reactor: MessageReactor;
24 type SocketListener: SocketListener;
26
27 fn codec(&self) -> Self::Codec;
29
30 fn new_reactor(
33 &self,
34 optional_outbound: spillway::Sender<<Self::Reactor as MessageReactor>::LogicalOutbound>,
35 _connection: &<Self::SocketListener as SocketListener>::Stream,
36 ) -> Self::Reactor;
37
38 fn spawn_connection(
40 &self,
41 connection: Connection<
42 <Self::SocketListener as SocketListener>::Stream,
43 Self::Codec,
44 Self::Reactor,
45 >,
46 );
47}
48
49pub struct ProtosocketServer<Connector: ServerConnector> {
68 connector: Connector,
69 listener: Connector::SocketListener,
70 max_buffer_length: usize,
71 buffer_allocation_increment: usize,
72 max_queued_outbound_messages: usize,
73}
74
/// Socket-level options for a protosocket server listener.
///
/// Build one with `Default::default()` and adjust it through the by-value setter methods.
/// `Debug` and `Clone` are derived so the configuration can be logged and reused.
#[derive(Debug, Clone)]
pub struct ProtosocketSocketConfig {
    /// Presumably toggles `TCP_NODELAY` on the socket.
    /// NOTE(review): stored but not read by any code visible in this file - confirm it is applied.
    nodelay: bool,
    /// Presumably toggles address/port reuse on the listening socket.
    /// NOTE(review): stored but not read by any code visible in this file - confirm it is applied.
    reuse: bool,
    /// Optional keepalive duration; `bind_tcp` passes it to `TcpSocketListener::listen`.
    keepalive_duration: Option<std::time::Duration>,
    /// Listen backlog; `bind_tcp` passes it to `TcpSocketListener::listen`.
    listen_backlog: u32,
}
82
83impl ProtosocketSocketConfig {
84 pub fn nodelay(mut self, nodelay: bool) -> Self {
86 self.nodelay = nodelay;
87 self
88 }
89 pub fn reuse(mut self, reuse: bool) -> Self {
91 self.reuse = reuse;
92 self
93 }
94 pub fn keepalive_duration(mut self, keepalive_duration: std::time::Duration) -> Self {
96 self.keepalive_duration = Some(keepalive_duration);
97 self
98 }
99 pub fn listen_backlog(mut self, backlog: u32) -> Self {
101 self.listen_backlog = backlog;
102 self
103 }
104}
105
106impl Default for ProtosocketSocketConfig {
107 fn default() -> Self {
108 Self {
109 nodelay: true,
110 reuse: true,
111 keepalive_duration: None,
112 listen_backlog: 65536,
113 }
114 }
115}
116
117pub struct ProtosocketServerConfig {
119 max_buffer_length: usize,
120 max_queued_outbound_messages: usize,
121 buffer_allocation_increment: usize,
122 socket_config: ProtosocketSocketConfig,
123}
124
125impl ProtosocketServerConfig {
126 pub fn max_buffer_length(mut self, max_buffer_length: usize) -> Self {
128 self.max_buffer_length = max_buffer_length;
129 self
130 }
131 pub fn max_queued_outbound_messages(mut self, max_queued_outbound_messages: usize) -> Self {
133 self.max_queued_outbound_messages = max_queued_outbound_messages;
134 self
135 }
136 pub fn buffer_allocation_increment(mut self, buffer_allocation_increment: usize) -> Self {
138 self.buffer_allocation_increment = buffer_allocation_increment;
139 self
140 }
141 pub fn socket_config(mut self, config: ProtosocketSocketConfig) -> Self {
143 self.socket_config = config;
144 self
145 }
146
147 pub fn bind_tcp<Connector: ServerConnector<SocketListener = TcpSocketListener>>(
150 self,
151 address: SocketAddr,
152 connector: Connector,
153 ) -> crate::Result<ProtosocketServer<Connector>> {
154 Ok(ProtosocketServer::new(
155 TcpSocketListener::listen(
156 address,
157 self.socket_config.listen_backlog,
158 self.socket_config.keepalive_duration,
159 )?,
160 connector,
161 self,
162 ))
163 }
164
165 pub fn build_server<
168 Connector: ServerConnector<SocketListener = Listener>,
169 Listener: SocketListener,
170 >(
171 self,
172 listener: Listener,
173 connector: Connector,
174 ) -> crate::Result<ProtosocketServer<Connector>> {
175 Ok(ProtosocketServer::new(listener, connector, self))
176 }
177}
178
179impl Default for ProtosocketServerConfig {
180 fn default() -> Self {
181 Self {
182 max_buffer_length: 16 * (2 << 20),
183 max_queued_outbound_messages: 128,
184 buffer_allocation_increment: 1 << 20,
185 socket_config: Default::default(),
186 }
187 }
188}
189
190impl<Connector: ServerConnector> ProtosocketServer<Connector> {
191 fn new(
193 listener: Connector::SocketListener,
194 connector: Connector,
195 config: ProtosocketServerConfig,
196 ) -> Self {
197 Self {
198 connector,
199 listener,
200 max_buffer_length: config.max_buffer_length,
201 max_queued_outbound_messages: config.max_queued_outbound_messages,
202 buffer_allocation_increment: config.buffer_allocation_increment,
203 }
204 }
205}
206
207impl<Connector: ServerConnector> Unpin for ProtosocketServer<Connector> {}
208impl<Connector: ServerConnector> Future for ProtosocketServer<Connector> {
209 type Output = Result<(), Error>;
210
211 fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
212 loop {
213 break match self.listener.poll_accept(context) {
214 Poll::Ready(result) => match result {
215 SocketResult::Stream(stream) => {
216 let (outbound_submission_queue, outbound_messages) = spillway::channel();
217 let reactor = self
221 .connector
222 .new_reactor(outbound_submission_queue.clone(), &stream);
223 let connection = Connection::new(
224 stream,
225 self.connector.codec(),
226 self.max_buffer_length,
227 self.buffer_allocation_increment,
228 self.max_queued_outbound_messages,
229 outbound_messages,
230 reactor,
231 );
232 self.connector.spawn_connection(connection);
233 continue;
234 }
235 SocketResult::Disconnect => Poll::Ready(Ok(())),
236 },
237 Poll::Pending => Poll::Pending,
238 };
239 }
240 }
241}