// protosocket_rpc/server/server_traits.rs
1use std::{future::Future, net::SocketAddr};
2
3use protosocket::{Deserializer, Serializer};
4use tokio::io::{AsyncRead, AsyncWrite};
5
6use crate::Message;
7
8/// SocketService receives connections and produces ConnectionServices.
9///
10/// The SocketService is notified when a new connection is established. It is given the address of the
11/// remote peer and it returns a ConnectionService for that connection. You can think of this as the
12/// "connection factory" for your server. It is the "top" of your service stack.
13pub trait SocketService: 'static {
14 /// The type of deserializer for incoming messages.
15 type RequestDeserializer: Deserializer<Message: Message> + 'static;
16 /// The type of serializer for outgoing messages.
17 type ResponseSerializer: Serializer<Message: Message> + 'static;
18 /// The type of connection service that will be created for each connection.
19 type ConnectionService: ConnectionService<
20 Request = <Self::RequestDeserializer as Deserializer>::Message,
21 Response = <Self::ResponseSerializer as Serializer>::Message,
22 >;
23 /// The type of stream that will be used for the connection.
24 /// Something like a `tokio::net::TcpStream` or `tokio_rustls::TlsStream<tokio::net::TcpStream>`.
25 type Stream: AsyncRead + AsyncWrite + Unpin + Send + 'static;
26
27 /// Create a new deserializer for incoming messages.
28 fn deserializer(&self) -> Self::RequestDeserializer;
29 /// Create a new serializer for outgoing messages.
30 fn serializer(&self) -> Self::ResponseSerializer;
31
32 /// Create a new ConnectionService for a new connection.
33 fn new_connection_service(&self, address: SocketAddr) -> Self::ConnectionService;
34
35 /// Accept and possibly customize the stream for a new connection.
36 /// This is where you can wrap the stream with TLS.
37 fn accept_stream(
38 &self,
39 stream: tokio::net::TcpStream,
40 ) -> impl Future<Output = std::io::Result<Self::Stream>> + Send + 'static;
41}
42
43/// A connection service receives rpcs from clients and sends responses.
44///
45/// Each client connection gets a ConnectionService. You put your per-connection state in your
46/// ConnectionService implementation.
47///
48/// Every interaction with a client is done via an RPC. You are called with the initiating message
49/// from the client, and you return the kind of response future that is used to complete the RPC.
50///
51/// A ConnectionService is executed in the context of an RPC connection server, which is a future.
52/// This means you get `&mut self` when you are called with a new rpc. You can use simple mutable
53/// state per-connection; but if you need to share state between connections or elsewhere in your
54/// application, you will need to use an appropriate state sharing mechanism.
55pub trait ConnectionService: Send + Unpin + 'static {
56 /// The type of request message, These messages initiate rpcs.
57 type Request: Message;
58 /// The type of response message, These messages complete rpcs, or are streamed from them.
59 type Response: Message;
60 /// The type of future that completes a unary rpc.
61 type UnaryFutureType: Future<Output = Self::Response> + Send + Unpin;
62 /// The type of stream that completes a streaming rpc.
63 type StreamType: futures::Stream<Item = Self::Response> + Send + Unpin;
64
65 /// Create a new rpc task completion.
66 ///
67 /// You can provide a concrete Future and it will be polled in the context of the Connection
68 /// itself. This would limit your Connection and all of its outstanding rpc's to 1 cpu at a time.
69 /// That might be good for your use case, or it might be suboptimal.
70 /// You can of course also spawn a task and return a completion future that completes when the
71 /// task completes, e.g., with a tokio::sync::oneshot or mpsc stream. In general, try to do as
72 /// little as possible: Return a future (rather than a task handle) and let the ConnectionServer
73 /// task poll it. This keeps your task count low and your wakes more tightly related to the
74 /// cooperating tasks (e.g., ConnectionServer and Connection) that need to be woken.
75 fn new_rpc(
76 &mut self,
77 initiating_message: Self::Request,
78 ) -> RpcKind<Self::UnaryFutureType, Self::StreamType>;
79}
80
/// Type of rpc to be awaited.
///
/// `Unary` is the payload type for single-response rpcs and `Streaming` is the payload type for
/// multi-response rpcs.
///
/// `Debug` is implemented whenever both payload types implement `Debug`, so values can be logged
/// and used in assertion messages.
#[derive(Debug)]
pub enum RpcKind<Unary, Streaming> {
    /// This is a unary rpc. It will complete with a single response.
    Unary(Unary),
    /// This is a streaming rpc. It will complete with a stream of responses.
    Streaming(Streaming),
    /// This is an unknown rpc. It will be skipped.
    Unknown,
}