protosocket_rpc/server/
socket_server.rs1use protosocket::Connection;
2use protosocket::Encoder;
3use protosocket::SocketListener;
4use protosocket::SocketResult;
5use std::future::Future;
6use std::io::Error;
7use std::pin::Pin;
8use std::task::Context;
9use std::task::Poll;
10
11use crate::server::Spawn;
12
13use super::rpc_submitter::RpcSubmitter;
14use super::server_traits::SocketService;
15
16pub struct SocketRpcServer<TSocketService, TSpawnConnection>
27where
28 TSocketService: SocketService,
29{
30 socket_server: TSocketService,
31 spawner: TSpawnConnection,
32 listener: TSocketService::SocketListener,
33 max_buffer_length: usize,
34 buffer_allocation_increment: usize,
35 max_queued_outbound_messages: usize,
36}
37
38impl<TSocketService>
39 SocketRpcServer<
40 TSocketService,
41 super::TokioSpawn<
42 Connection<
43 <TSocketService::SocketListener as SocketListener>::Stream,
44 TSocketService::Codec,
45 RpcSubmitter<TSocketService::ConnectionService>,
46 >,
47 >,
48 >
49where
50 TSocketService: SocketService,
51 TSocketService::Codec: Send,
52 <TSocketService::Codec as Encoder>::Serialized: Send,
53 <TSocketService::SocketListener as SocketListener>::Stream: Send,
54 TSocketService::ConnectionService: Send,
55{
56 pub fn new(
61 listener: TSocketService::SocketListener,
62 socket_server: TSocketService,
63 max_buffer_length: usize,
64 buffer_allocation_increment: usize,
65 max_queued_outbound_messages: usize,
66 ) -> crate::Result<Self> {
67 Self::new_with_spawner(
68 listener,
69 socket_server,
70 max_buffer_length,
71 buffer_allocation_increment,
72 max_queued_outbound_messages,
73 super::TokioSpawn::default(),
74 )
75 }
76}
77
78impl<TSocketService, TSpawnConnection> SocketRpcServer<TSocketService, TSpawnConnection>
79where
80 TSocketService: SocketService,
81 TSpawnConnection: Spawn<
82 Connection<
83 <TSocketService::SocketListener as SocketListener>::Stream,
84 TSocketService::Codec,
85 RpcSubmitter<TSocketService::ConnectionService>,
86 >,
87 >,
88{
89 #[allow(clippy::too_many_arguments)]
91 pub fn new_with_spawner(
92 listener: TSocketService::SocketListener,
93 socket_server: TSocketService,
94 max_buffer_length: usize,
95 buffer_allocation_increment: usize,
96 max_queued_outbound_messages: usize,
97 spawner: TSpawnConnection,
98 ) -> crate::Result<Self> {
99 Ok(Self {
100 socket_server,
101 spawner,
102 listener,
103 max_buffer_length,
104 buffer_allocation_increment,
105 max_queued_outbound_messages,
106 })
107 }
108
109 pub fn set_max_buffer_length(&mut self, max_buffer_length: usize) {
111 self.max_buffer_length = max_buffer_length;
112 }
113
114 pub fn set_max_queued_outbound_messages(&mut self, max_queued_outbound_messages: usize) {
116 self.max_queued_outbound_messages = max_queued_outbound_messages;
117 }
118}
119
120impl<TSocketService, TSpawnConnection> Unpin for SocketRpcServer<TSocketService, TSpawnConnection> where
121 TSocketService: SocketService
122{
123}
124impl<TSocketService, TSpawnConnection> Future for SocketRpcServer<TSocketService, TSpawnConnection>
125where
126 TSocketService: SocketService,
127 TSpawnConnection: Spawn<
128 Connection<
129 <TSocketService::SocketListener as SocketListener>::Stream,
130 TSocketService::Codec,
131 RpcSubmitter<TSocketService::ConnectionService>,
132 >,
133 >,
134{
135 type Output = Result<(), Error>;
136
137 fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
138 loop {
139 break match self.listener.poll_accept(context) {
140 Poll::Ready(result) => match result {
141 SocketResult::Stream(stream) => {
142 let connection_service = self.socket_server.new_stream_service(&stream);
143 let (outbound_messages, outbound_messages_receiver) = spillway::channel();
144 let submitter = RpcSubmitter::new(connection_service, outbound_messages);
145 #[allow(clippy::type_complexity)]
146 let connection: Connection<
147 <TSocketService::SocketListener as SocketListener>::Stream,
148 TSocketService::Codec,
149 RpcSubmitter<TSocketService::ConnectionService>,
150 > = Connection::new(
151 stream,
152 self.socket_server.codec(),
153 self.max_buffer_length,
154 self.buffer_allocation_increment,
155 self.max_queued_outbound_messages,
156 outbound_messages_receiver,
157 submitter,
158 );
159 self.spawner.spawn(connection);
160 continue;
161 }
162 SocketResult::Disconnect => Poll::Ready(Ok(())),
163 },
164 Poll::Pending => {
165 Poll::Pending
167 }
168 };
169 }
170 }
171}