Skip to main content

protosocket_rpc/server/
socket_server.rs

1use protosocket::Connection;
2use protosocket::Encoder;
3use protosocket::SocketListener;
4use protosocket::SocketResult;
5use std::future::Future;
6use std::io::Error;
7use std::pin::Pin;
8use std::task::Context;
9use std::task::Poll;
10
11use crate::server::Spawn;
12
13use super::rpc_submitter::RpcSubmitter;
14use super::server_traits::SocketService;
15
16/// A `SocketRpcServer` is a server future. It listens on a socket and spawns new connections,
17/// with a ConnectionService to handle each connection.
18///
19/// Protosockets use monomorphic messages: You can only have 1 kind of message per service.
20/// The expected way to work with this is to use prost and protocol buffers to encode messages.
21///
22/// The socket server hosts your SocketService.
23/// Your SocketService creates a ConnectionService for each new connection.
24/// Your ConnectionService manages one connection. It is Dropped when the connection is closed.
25pub struct SocketRpcServer<TSocketService, TSpawnConnection>
26where
27    TSocketService: SocketService,
28{
29    socket_server: TSocketService,
30    spawner: TSpawnConnection,
31    listener: TSocketService::SocketListener,
32    max_buffer_length: usize,
33    buffer_allocation_increment: usize,
34    max_queued_outbound_messages: usize,
35}
36
37impl<TSocketService>
38    SocketRpcServer<
39        TSocketService,
40        super::TokioSpawn<
41            Connection<
42                <TSocketService::SocketListener as SocketListener>::Stream,
43                TSocketService::Codec,
44                RpcSubmitter<TSocketService::ConnectionService>,
45            >,
46        >,
47    >
48where
49    TSocketService: SocketService,
50    TSocketService::Codec: Send,
51    <TSocketService::Codec as Encoder>::Serialized: Send,
52    <TSocketService::SocketListener as SocketListener>::Stream: Send,
53    TSocketService::ConnectionService: Send,
54{
55    /// Construct a new `SocketRpcServer` with a listener.
56    ///
57    /// This assumes a Tokio runtime, and is only available when your `SocketService` is
58    /// transitively `Send`.
59    pub fn new(
60        listener: TSocketService::SocketListener,
61        socket_server: TSocketService,
62        max_buffer_length: usize,
63        buffer_allocation_increment: usize,
64        max_queued_outbound_messages: usize,
65    ) -> crate::Result<Self> {
66        Self::new_with_spawner(
67            listener,
68            socket_server,
69            max_buffer_length,
70            buffer_allocation_increment,
71            max_queued_outbound_messages,
72            super::TokioSpawn::default(),
73        )
74    }
75}
76
77impl<TSocketService, TSpawnConnection> SocketRpcServer<TSocketService, TSpawnConnection>
78where
79    TSocketService: SocketService,
80    TSpawnConnection: Spawn<
81        Connection<
82            <TSocketService::SocketListener as SocketListener>::Stream,
83            TSocketService::Codec,
84            RpcSubmitter<TSocketService::ConnectionService>,
85        >,
86    >,
87{
88    /// Construct a new `SocketRpcServer` with a listener and a spawner.
89    #[allow(clippy::too_many_arguments)]
90    pub fn new_with_spawner(
91        listener: TSocketService::SocketListener,
92        socket_server: TSocketService,
93        max_buffer_length: usize,
94        buffer_allocation_increment: usize,
95        max_queued_outbound_messages: usize,
96        spawner: TSpawnConnection,
97    ) -> crate::Result<Self> {
98        Ok(Self {
99            socket_server,
100            spawner,
101            listener,
102            max_buffer_length,
103            buffer_allocation_increment,
104            max_queued_outbound_messages,
105        })
106    }
107
108    /// Set the maximum buffer length for connections created by this server after the setting is applied.
109    pub fn set_max_buffer_length(&mut self, max_buffer_length: usize) {
110        self.max_buffer_length = max_buffer_length;
111    }
112
113    /// Set the maximum queued outbound messages for connections created by this server after the setting is applied.
114    pub fn set_max_queued_outbound_messages(&mut self, max_queued_outbound_messages: usize) {
115        self.max_queued_outbound_messages = max_queued_outbound_messages;
116    }
117}
118
119impl<TSocketService, TSpawnConnection> Unpin for SocketRpcServer<TSocketService, TSpawnConnection> where
120    TSocketService: SocketService
121{
122}
123impl<TSocketService, TSpawnConnection> Future for SocketRpcServer<TSocketService, TSpawnConnection>
124where
125    TSocketService: SocketService,
126    TSpawnConnection: Spawn<
127        Connection<
128            <TSocketService::SocketListener as SocketListener>::Stream,
129            TSocketService::Codec,
130            RpcSubmitter<TSocketService::ConnectionService>,
131        >,
132    >,
133{
134    type Output = Result<(), Error>;
135
136    fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
137        loop {
138            break match self.listener.poll_accept(context) {
139                Poll::Ready(result) => match result {
140                    SocketResult::Stream(stream) => {
141                        let connection_service = self.socket_server.new_stream_service(&stream);
142                        let (outbound_messages, outbound_messages_receiver) = spillway::channel();
143                        let submitter = RpcSubmitter::new(connection_service, outbound_messages);
144                        #[allow(clippy::type_complexity)]
145                        let connection: Connection<
146                            <TSocketService::SocketListener as SocketListener>::Stream,
147                            TSocketService::Codec,
148                            RpcSubmitter<TSocketService::ConnectionService>,
149                        > = Connection::new(
150                            stream,
151                            self.socket_server.codec(),
152                            self.max_buffer_length,
153                            self.buffer_allocation_increment,
154                            self.max_queued_outbound_messages,
155                            outbound_messages_receiver,
156                            submitter,
157                        );
158                        self.spawner.spawn(connection);
159                        continue;
160                    }
161                    SocketResult::Disconnect => Poll::Ready(Ok(())),
162                },
163                Poll::Pending => {
164                    // hooray, listener is pending.
165                    Poll::Pending
166                }
167            };
168        }
169    }
170}