protosocket_rpc/server/
socket_server.rs1use protosocket::Connection;
2use protosocket::Encoder;
3use protosocket::SocketListener;
4use protosocket::SocketResult;
5use std::future::Future;
6use std::io::Error;
7use std::pin::Pin;
8use std::task::Context;
9use std::task::Poll;
10
11use crate::server::Spawn;
12
13use super::rpc_submitter::RpcSubmitter;
14use super::server_traits::SocketService;
15
16pub struct SocketRpcServer<TSocketService, TSpawnConnection>
26where
27 TSocketService: SocketService,
28{
29 socket_server: TSocketService,
30 spawner: TSpawnConnection,
31 listener: TSocketService::SocketListener,
32 max_buffer_length: usize,
33 buffer_allocation_increment: usize,
34 max_queued_outbound_messages: usize,
35}
36
37impl<TSocketService>
38 SocketRpcServer<
39 TSocketService,
40 super::TokioSpawn<
41 Connection<
42 <TSocketService::SocketListener as SocketListener>::Stream,
43 TSocketService::Codec,
44 RpcSubmitter<TSocketService::ConnectionService>,
45 >,
46 >,
47 >
48where
49 TSocketService: SocketService,
50 TSocketService::Codec: Send,
51 <TSocketService::Codec as Encoder>::Serialized: Send,
52 <TSocketService::SocketListener as SocketListener>::Stream: Send,
53 TSocketService::ConnectionService: Send,
54{
55 pub fn new(
60 listener: TSocketService::SocketListener,
61 socket_server: TSocketService,
62 max_buffer_length: usize,
63 buffer_allocation_increment: usize,
64 max_queued_outbound_messages: usize,
65 ) -> crate::Result<Self> {
66 Self::new_with_spawner(
67 listener,
68 socket_server,
69 max_buffer_length,
70 buffer_allocation_increment,
71 max_queued_outbound_messages,
72 super::TokioSpawn::default(),
73 )
74 }
75}
76
77impl<TSocketService, TSpawnConnection> SocketRpcServer<TSocketService, TSpawnConnection>
78where
79 TSocketService: SocketService,
80 TSpawnConnection: Spawn<
81 Connection<
82 <TSocketService::SocketListener as SocketListener>::Stream,
83 TSocketService::Codec,
84 RpcSubmitter<TSocketService::ConnectionService>,
85 >,
86 >,
87{
88 #[allow(clippy::too_many_arguments)]
90 pub fn new_with_spawner(
91 listener: TSocketService::SocketListener,
92 socket_server: TSocketService,
93 max_buffer_length: usize,
94 buffer_allocation_increment: usize,
95 max_queued_outbound_messages: usize,
96 spawner: TSpawnConnection,
97 ) -> crate::Result<Self> {
98 Ok(Self {
99 socket_server,
100 spawner,
101 listener,
102 max_buffer_length,
103 buffer_allocation_increment,
104 max_queued_outbound_messages,
105 })
106 }
107
108 pub fn set_max_buffer_length(&mut self, max_buffer_length: usize) {
110 self.max_buffer_length = max_buffer_length;
111 }
112
113 pub fn set_max_queued_outbound_messages(&mut self, max_queued_outbound_messages: usize) {
115 self.max_queued_outbound_messages = max_queued_outbound_messages;
116 }
117}
118
119impl<TSocketService, TSpawnConnection> Unpin for SocketRpcServer<TSocketService, TSpawnConnection> where
120 TSocketService: SocketService
121{
122}
123impl<TSocketService, TSpawnConnection> Future for SocketRpcServer<TSocketService, TSpawnConnection>
124where
125 TSocketService: SocketService,
126 TSpawnConnection: Spawn<
127 Connection<
128 <TSocketService::SocketListener as SocketListener>::Stream,
129 TSocketService::Codec,
130 RpcSubmitter<TSocketService::ConnectionService>,
131 >,
132 >,
133{
134 type Output = Result<(), Error>;
135
136 fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
137 loop {
138 break match self.listener.poll_accept(context) {
139 Poll::Ready(result) => match result {
140 SocketResult::Stream(stream) => {
141 let connection_service = self.socket_server.new_stream_service(&stream);
142 let (outbound_messages, outbound_messages_receiver) = spillway::channel();
143 let submitter = RpcSubmitter::new(connection_service, outbound_messages);
144 #[allow(clippy::type_complexity)]
145 let connection: Connection<
146 <TSocketService::SocketListener as SocketListener>::Stream,
147 TSocketService::Codec,
148 RpcSubmitter<TSocketService::ConnectionService>,
149 > = Connection::new(
150 stream,
151 self.socket_server.codec(),
152 self.max_buffer_length,
153 self.buffer_allocation_increment,
154 self.max_queued_outbound_messages,
155 outbound_messages_receiver,
156 submitter,
157 );
158 self.spawner.spawn(connection);
159 continue;
160 }
161 SocketResult::Disconnect => Poll::Ready(Ok(())),
162 },
163 Poll::Pending => {
164 Poll::Pending
166 }
167 };
168 }
169 }
170}