protosocket_server/
connection_server.rs1use protosocket::Codec;
2use protosocket::Connection;
3use protosocket::Decoder;
4use protosocket::Encoder;
5use protosocket::MessageReactor;
6use protosocket::SocketListener;
7use protosocket::SocketResult;
8use protosocket::TcpSocketListener;
9use std::future::Future;
10use std::io::Error;
11use std::net::SocketAddr;
12use std::pin::Pin;
13use std::task::Context;
14use std::task::Poll;
15
16pub trait ServerConnector: Unpin {
18 type Codec: Codec
20 + Decoder<Message = <Self::Reactor as MessageReactor>::Inbound>
21 + Encoder<Message = <Self::Reactor as MessageReactor>::Outbound>;
22 type Reactor: MessageReactor;
24 type SocketListener: SocketListener;
26
27 fn codec(&self) -> Self::Codec;
29
30 fn new_reactor(
33 &self,
34 optional_outbound: spillway::Sender<<Self::Reactor as MessageReactor>::LogicalOutbound>,
35 _connection: &<Self::SocketListener as SocketListener>::Stream,
36 ) -> Self::Reactor;
37
38 fn spawn_connection(
40 &self,
41 connection: Connection<
42 <Self::SocketListener as SocketListener>::Stream,
43 Self::Codec,
44 Self::Reactor,
45 >,
46 );
47}
48
49pub struct ProtosocketServer<Connector: ServerConnector> {
68 connector: Connector,
69 listener: Connector::SocketListener,
70 max_buffer_length: usize,
71 buffer_allocation_increment: usize,
72 max_queued_outbound_messages: usize,
73}
74
75pub struct ProtosocketSocketConfig {
77 nodelay: bool,
78 reuse: bool,
79 keepalive_duration: Option<std::time::Duration>,
80 listen_backlog: u32,
81}
82
83impl ProtosocketSocketConfig {
84 pub fn nodelay(mut self, nodelay: bool) -> Self {
86 self.nodelay = nodelay;
87 self
88 }
89 pub fn reuse(mut self, reuse: bool) -> Self {
91 self.reuse = reuse;
92 self
93 }
94 pub fn keepalive_duration(mut self, keepalive_duration: std::time::Duration) -> Self {
96 self.keepalive_duration = Some(keepalive_duration);
97 self
98 }
99 pub fn listen_backlog(mut self, backlog: u32) -> Self {
101 self.listen_backlog = backlog;
102 self
103 }
104}
105
106impl Default for ProtosocketSocketConfig {
107 fn default() -> Self {
108 Self {
109 nodelay: true,
110 reuse: true,
111 keepalive_duration: None,
112 listen_backlog: 65536,
113 }
114 }
115}
116
117pub struct ProtosocketServerConfig {
118 max_buffer_length: usize,
119 max_queued_outbound_messages: usize,
120 buffer_allocation_increment: usize,
121 socket_config: ProtosocketSocketConfig,
122}
123
124impl ProtosocketServerConfig {
125 pub fn max_buffer_length(mut self, max_buffer_length: usize) -> Self {
127 self.max_buffer_length = max_buffer_length;
128 self
129 }
130 pub fn max_queued_outbound_messages(mut self, max_queued_outbound_messages: usize) -> Self {
132 self.max_queued_outbound_messages = max_queued_outbound_messages;
133 self
134 }
135 pub fn buffer_allocation_increment(mut self, buffer_allocation_increment: usize) -> Self {
137 self.buffer_allocation_increment = buffer_allocation_increment;
138 self
139 }
140 pub fn socket_config(mut self, config: ProtosocketSocketConfig) -> Self {
142 self.socket_config = config;
143 self
144 }
145
146 pub fn bind_tcp<Connector: ServerConnector<SocketListener = TcpSocketListener>>(
149 self,
150 address: SocketAddr,
151 connector: Connector,
152 ) -> crate::Result<ProtosocketServer<Connector>> {
153 Ok(ProtosocketServer::new(
154 TcpSocketListener::listen(
155 address,
156 self.socket_config.listen_backlog,
157 self.socket_config.keepalive_duration,
158 )?,
159 connector,
160 self,
161 ))
162 }
163
164 pub fn build_server<
167 Connector: ServerConnector<SocketListener = Listener>,
168 Listener: SocketListener,
169 >(
170 self,
171 listener: Listener,
172 connector: Connector,
173 ) -> crate::Result<ProtosocketServer<Connector>> {
174 Ok(ProtosocketServer::new(listener, connector, self))
175 }
176}
177
178impl Default for ProtosocketServerConfig {
179 fn default() -> Self {
180 Self {
181 max_buffer_length: 16 * (2 << 20),
182 max_queued_outbound_messages: 128,
183 buffer_allocation_increment: 1 << 20,
184 socket_config: Default::default(),
185 }
186 }
187}
188
189impl<Connector: ServerConnector> ProtosocketServer<Connector> {
190 fn new(
192 listener: Connector::SocketListener,
193 connector: Connector,
194 config: ProtosocketServerConfig,
195 ) -> Self {
196 Self {
197 connector,
198 listener,
199 max_buffer_length: config.max_buffer_length,
200 max_queued_outbound_messages: config.max_queued_outbound_messages,
201 buffer_allocation_increment: config.buffer_allocation_increment,
202 }
203 }
204}
205
206impl<Connector: ServerConnector> Unpin for ProtosocketServer<Connector> {}
207impl<Connector: ServerConnector> Future for ProtosocketServer<Connector> {
208 type Output = Result<(), Error>;
209
210 fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
211 loop {
212 break match self.listener.poll_accept(context) {
213 Poll::Ready(result) => match result {
214 SocketResult::Stream(stream) => {
215 let (outbound_submission_queue, outbound_messages) = spillway::channel();
216 let reactor = self
220 .connector
221 .new_reactor(outbound_submission_queue.clone(), &stream);
222 let connection = Connection::new(
223 stream,
224 self.connector.codec(),
225 self.max_buffer_length,
226 self.buffer_allocation_increment,
227 self.max_queued_outbound_messages,
228 outbound_messages,
229 reactor,
230 );
231 self.connector.spawn_connection(connection);
232 continue;
233 }
234 SocketResult::Disconnect => Poll::Ready(Ok(())),
235 },
236 Poll::Pending => Poll::Pending,
237 };
238 }
239 }
240}