protosocket_server/
connection_server.rs1use std::future::Future;
2use std::io::Error;
3use std::net::SocketAddr;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::Context;
7use std::task::Poll;
8
9use protosocket::Connection;
10use protosocket::ConnectionBindings;
11use protosocket::Serializer;
12use tokio::sync::mpsc;
13
14pub trait ServerConnector: Unpin {
15 type Bindings: ConnectionBindings;
16
17 fn serializer(&self) -> <Self::Bindings as ConnectionBindings>::Serializer;
18 fn deserializer(&self) -> <Self::Bindings as ConnectionBindings>::Deserializer;
19
20 fn new_reactor(
21 &self,
22 optional_outbound: mpsc::Sender<
23 <<Self::Bindings as ConnectionBindings>::Serializer as Serializer>::Message,
24 >,
25 address: SocketAddr,
26 ) -> <Self::Bindings as ConnectionBindings>::Reactor;
27
28 fn connect(
29 &self,
30 stream: tokio::net::TcpStream,
31 ) -> <Self::Bindings as ConnectionBindings>::Stream;
32}
33
34pub struct ProtosocketServer<Connector: ServerConnector> {
51 connector: Connector,
52 listener: tokio::net::TcpListener,
53 max_buffer_length: usize,
54 buffer_allocation_increment: usize,
55 max_queued_outbound_messages: usize,
56 runtime: tokio::runtime::Handle,
57}
58
59impl<Connector: ServerConnector> ProtosocketServer<Connector> {
60 pub async fn new(
64 address: std::net::SocketAddr,
65 runtime: tokio::runtime::Handle,
66 connector: Connector,
67 ) -> crate::Result<Self> {
68 let listener = tokio::net::TcpListener::bind(address)
69 .await
70 .map_err(Arc::new)?;
71 Ok(Self {
72 connector,
73 listener,
74 max_buffer_length: 16 * (2 << 20),
75 max_queued_outbound_messages: 128,
76 buffer_allocation_increment: 1 << 20,
77 runtime,
78 })
79 }
80
81 pub fn set_max_buffer_length(&mut self, max_buffer_length: usize) {
83 self.max_buffer_length = max_buffer_length;
84 }
85
86 pub fn set_max_queued_outbound_messages(&mut self, max_queued_outbound_messages: usize) {
88 self.max_queued_outbound_messages = max_queued_outbound_messages;
89 }
90
91 pub fn set_buffer_allocation_increment(&mut self, buffer_allocation_increment: usize) {
93 self.buffer_allocation_increment = buffer_allocation_increment;
94 }
95}
96
97impl<Connector: ServerConnector> Future for ProtosocketServer<Connector> {
98 type Output = Result<(), Error>;
99
100 fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
101 loop {
102 break match self.listener.poll_accept(context) {
103 Poll::Ready(result) => match result {
104 Ok((stream, address)) => {
105 stream.set_nodelay(true)?;
106 let (outbound_submission_queue, outbound_messages) =
107 mpsc::channel(self.max_queued_outbound_messages);
108 let reactor = self
109 .connector
110 .new_reactor(outbound_submission_queue.clone(), address);
111 let stream = self.connector.connect(stream);
112 let connection: Connection<Connector::Bindings> = Connection::new(
113 stream,
114 address,
115 self.connector.deserializer(),
116 self.connector.serializer(),
117 self.max_buffer_length,
118 self.buffer_allocation_increment,
119 self.max_queued_outbound_messages,
120 outbound_messages,
121 reactor,
122 );
123 self.runtime.spawn(connection);
124 continue;
125 }
126 Err(e) => {
127 log::error!("failed to accept connection: {e:?}");
128 continue;
129 }
130 },
131 Poll::Pending => Poll::Pending,
132 };
133 }
134 }
135}