// protosocket_rpc/server/server_traits.rs

use std::{future::Future, net::SocketAddr};

use protosocket::{Deserializer, Serializer};

use crate::Message;

7/// SocketService receives connections and produces ConnectionServices.
8///
9/// The SocketService is notified when a new connection is established. It is given the address of the
10/// remote peer and it returns a ConnectionService for that connection. You can think of this as the
11/// "connection factory" for your server. It is the "top" of your service stack.
12pub trait SocketService: 'static {
13    /// The type of deserializer for incoming messages.
14    type RequestDeserializer: Deserializer<Message: Message> + 'static;
15    /// The type of serializer for outgoing messages.
16    type ResponseSerializer: Serializer<Message: Message> + 'static;
17    /// The type of connection service that will be created for each connection.
18    type ConnectionService: ConnectionService<
19        Request = <Self::RequestDeserializer as Deserializer>::Message,
20        Response = <Self::ResponseSerializer as Serializer>::Message,
21    >;
22
23    /// Create a new deserializer for incoming messages.
24    fn deserializer(&self) -> Self::RequestDeserializer;
25    /// Create a new serializer for outgoing messages.
26    fn serializer(&self) -> Self::ResponseSerializer;
27
28    /// Create a new ConnectionService for a new connection.
29    fn new_connection_service(&self, address: SocketAddr) -> Self::ConnectionService;
30}
31
32/// A connection service receives rpcs from clients and sends responses.
33///
34/// Each client connection gets a ConnectionService. You put your per-connection state in your
35/// ConnectionService implementation.
36///
37/// Every interaction with a client is done via an RPC. You are called with the initiating message
38/// from the client, and you return the kind of response future that is used to complete the RPC.
39///
40/// A ConnectionService is executed in the context of an RPC connection server, which is a future.
41/// This means you get `&mut self` when you are called with a new rpc. You can use simple mutable
42/// state per-connection; but if you need to share state between connections or elsewhere in your
43/// application, you will need to use an appropriate state sharing mechanism.
44pub trait ConnectionService: Send + Unpin + 'static {
45    /// The type of request message, These messages initiate rpcs.
46    type Request: Message;
47    /// The type of response message, These messages complete rpcs, or are streamed from them.
48    type Response: Message;
49    /// The type of future that completes a unary rpc.
50    type UnaryFutureType: Future<Output = Self::Response> + Send + Unpin;
51    /// The type of stream that completes a streaming rpc.
52    type StreamType: futures::Stream<Item = Self::Response> + Send + Unpin;
53
54    /// Create a new rpc task completion.
55    ///
56    /// You can provide a concrete Future and it will be polled in the context of the Connection
57    /// itself. This would limit your Connection and all of its outstanding rpc's to 1 cpu at a time.
58    /// That might be good for your use case, or it might be suboptimal.
59    /// You can of course also spawn a task and return a completion future that completes when the
60    /// task completes, e.g., with a tokio::sync::oneshot or mpsc stream. In general, try to do as
61    /// little as possible: Return a future (rather than a task handle) and let the ConnectionServer
62    /// task poll it. This keeps your task count low and your wakes more tightly related to the
63    /// cooperating tasks (e.g., ConnectionServer and Connection) that need to be woken.
64    fn new_rpc(
65        &mut self,
66        initiating_message: Self::Request,
67    ) -> RpcKind<Self::UnaryFutureType, Self::StreamType>;
68}
69
/// The kind of rpc to be awaited.
///
/// `Debug` is derived so values can be logged or shown in assertion messages whenever both
/// the `Unary` and `Streaming` payload types are themselves `Debug`.
#[derive(Debug)]
pub enum RpcKind<Unary, Streaming> {
    /// This is a unary rpc. It will complete with a single response.
    Unary(Unary),
    /// This is a streaming rpc. It will complete with a stream of responses.
    Streaming(Streaming),
    /// This is an unknown rpc. It will be skipped.
    Unknown,
}