Skip to main content

protosocket_rpc/server/
socket_server.rs

1use protosocket::Connection;
2use protosocket::Encoder;
3use protosocket::SocketListener;
4use protosocket::SocketResult;
5use std::future::Future;
6use std::io::Error;
7use std::pin::Pin;
8use std::task::Context;
9use std::task::Poll;
10
11use crate::server::Spawn;
12
13use super::rpc_submitter::RpcSubmitter;
14use super::server_traits::SocketService;
15
16/// A `SocketRpcServer` is a server future. It listens on a socket and spawns new connections,
17/// with a ConnectionService to handle each connection.
18///
19/// Protosockets use monomorphic messages: You can only have 1 kind of message per service.
20/// The expected way to work with this is to use prost, messagepack, or some other format that
21/// has a notion of a `oneof`, and list your message types in that.
22///
23/// The socket server hosts your SocketService.
24/// Your SocketService creates a ConnectionService for each new connection.
25/// Your ConnectionService manages one connection. It is Dropped when the connection is closed.
26pub struct SocketRpcServer<TSocketService, TSpawnConnection>
27where
28    TSocketService: SocketService,
29{
30    socket_server: TSocketService,
31    spawner: TSpawnConnection,
32    listener: TSocketService::SocketListener,
33    max_buffer_length: usize,
34    buffer_allocation_increment: usize,
35    max_queued_outbound_messages: usize,
36}
37
38impl<TSocketService>
39    SocketRpcServer<
40        TSocketService,
41        super::TokioSpawn<
42            Connection<
43                <TSocketService::SocketListener as SocketListener>::Stream,
44                TSocketService::Codec,
45                RpcSubmitter<TSocketService::ConnectionService>,
46            >,
47        >,
48    >
49where
50    TSocketService: SocketService,
51    TSocketService::Codec: Send,
52    <TSocketService::Codec as Encoder>::Serialized: Send,
53    <TSocketService::SocketListener as SocketListener>::Stream: Send,
54    TSocketService::ConnectionService: Send,
55{
56    /// Construct a new `SocketRpcServer` with a listener.
57    ///
58    /// This assumes a Tokio runtime, and is only available when your `SocketService` is
59    /// transitively `Send`.
60    pub fn new(
61        listener: TSocketService::SocketListener,
62        socket_server: TSocketService,
63        max_buffer_length: usize,
64        buffer_allocation_increment: usize,
65        max_queued_outbound_messages: usize,
66    ) -> crate::Result<Self> {
67        Self::new_with_spawner(
68            listener,
69            socket_server,
70            max_buffer_length,
71            buffer_allocation_increment,
72            max_queued_outbound_messages,
73            super::TokioSpawn::default(),
74        )
75    }
76}
77
78impl<TSocketService, TSpawnConnection> SocketRpcServer<TSocketService, TSpawnConnection>
79where
80    TSocketService: SocketService,
81    TSpawnConnection: Spawn<
82        Connection<
83            <TSocketService::SocketListener as SocketListener>::Stream,
84            TSocketService::Codec,
85            RpcSubmitter<TSocketService::ConnectionService>,
86        >,
87    >,
88{
89    /// Construct a new `SocketRpcServer` with a listener and a spawner.
90    #[allow(clippy::too_many_arguments)]
91    pub fn new_with_spawner(
92        listener: TSocketService::SocketListener,
93        socket_server: TSocketService,
94        max_buffer_length: usize,
95        buffer_allocation_increment: usize,
96        max_queued_outbound_messages: usize,
97        spawner: TSpawnConnection,
98    ) -> crate::Result<Self> {
99        Ok(Self {
100            socket_server,
101            spawner,
102            listener,
103            max_buffer_length,
104            buffer_allocation_increment,
105            max_queued_outbound_messages,
106        })
107    }
108
109    /// Set the maximum buffer length for connections created by this server after the setting is applied.
110    pub fn set_max_buffer_length(&mut self, max_buffer_length: usize) {
111        self.max_buffer_length = max_buffer_length;
112    }
113
114    /// Set the maximum queued outbound messages for connections created by this server after the setting is applied.
115    pub fn set_max_queued_outbound_messages(&mut self, max_queued_outbound_messages: usize) {
116        self.max_queued_outbound_messages = max_queued_outbound_messages;
117    }
118}
119
120impl<TSocketService, TSpawnConnection> Unpin for SocketRpcServer<TSocketService, TSpawnConnection> where
121    TSocketService: SocketService
122{
123}
124impl<TSocketService, TSpawnConnection> Future for SocketRpcServer<TSocketService, TSpawnConnection>
125where
126    TSocketService: SocketService,
127    TSpawnConnection: Spawn<
128        Connection<
129            <TSocketService::SocketListener as SocketListener>::Stream,
130            TSocketService::Codec,
131            RpcSubmitter<TSocketService::ConnectionService>,
132        >,
133    >,
134{
135    type Output = Result<(), Error>;
136
137    fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
138        loop {
139            break match self.listener.poll_accept(context) {
140                Poll::Ready(result) => match result {
141                    SocketResult::Stream(stream) => {
142                        let connection_service = self.socket_server.new_stream_service(&stream);
143                        let (outbound_messages, outbound_messages_receiver) = spillway::channel();
144                        let submitter = RpcSubmitter::new(connection_service, outbound_messages);
145                        #[allow(clippy::type_complexity)]
146                        let connection: Connection<
147                            <TSocketService::SocketListener as SocketListener>::Stream,
148                            TSocketService::Codec,
149                            RpcSubmitter<TSocketService::ConnectionService>,
150                        > = Connection::new(
151                            stream,
152                            self.socket_server.codec(),
153                            self.max_buffer_length,
154                            self.buffer_allocation_increment,
155                            self.max_queued_outbound_messages,
156                            outbound_messages_receiver,
157                            submitter,
158                        );
159                        self.spawner.spawn(connection);
160                        continue;
161                    }
162                    SocketResult::Disconnect => Poll::Ready(Ok(())),
163                },
164                Poll::Pending => {
165                    // hooray, listener is pending.
166                    Poll::Pending
167                }
168            };
169        }
170    }
171}