datacake_rpc/
handler.rs

1use std::collections::BTreeMap;
2use std::marker::PhantomData;
3use std::net::SocketAddr;
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use rkyv::ser::serializers::AllocSerializer;
8use rkyv::{AlignedVec, Archive, Serialize};
9
10use crate::net::Status;
11use crate::request::{Request, RequestContents};
12use crate::{Body, SCRATCH_SPACE};
13
14/// A specific handler key.
15///
16/// This is in the format of (service_name, handler_path).
17pub type HandlerKey = u64;
18
19/// A registry system used for linking a service's message handlers
20/// with the RPC system at runtime.
21///
22/// Since the RPC system cannot determine what message payload matches
23/// with which handler at compile time, it must dynamically link them
24/// at runtime.
25///
26/// Not registering a handler will cause the handler to not be triggered
27/// even if a valid message comes through.
28///
29///
30/// ```rust
31/// use rkyv::{Archive, Deserialize, Serialize};
32/// use datacake_rpc::{Handler, Request, RpcService, ServiceRegistry, Status, RpcClient, Channel};
33/// use std::net::SocketAddr;
34///
35/// #[repr(C)]
36/// #[derive(Serialize, Deserialize, Archive, Debug)]
37/// #[archive(check_bytes)]
38/// #[archive_attr(derive(Debug))]
39/// pub struct MyMessage {
40///     name: String,
41///     age: u32,
42/// }
43///
44/// #[repr(C)]
45/// #[derive(Serialize, Deserialize, Archive, Debug)]
46/// #[archive(check_bytes)]
47/// #[archive_attr(derive(Debug))]
48/// pub struct MyOtherMessage {
49///     age: u32,
50/// }
51///
52/// pub struct EchoService;
53///
54/// impl RpcService for EchoService {
55///     fn register_handlers(registry: &mut ServiceRegistry<Self>) {
56///         // Since we've registered the `MyMessage` handler, the RPC system
57///         // will dispatch the messages to out handler.
58///         //
59///         // But because we *haven't* registered our `MyOtherMessage` handler,
60///         // even though our service implements the handler, no messages will
61///         // be dispatched.
62///         registry.add_handler::<MyMessage>();
63///
64///     }
65/// }
66///
67/// #[datacake_rpc::async_trait]
68/// impl Handler<MyMessage> for EchoService {
69///     type Reply = MyMessage;
70///
71///     async fn on_message(&self, msg: Request<MyMessage>) -> Result<Self::Reply, Status> {
72///         Ok(msg.to_owned().unwrap())
73///     }
74/// }
75///
76/// #[datacake_rpc::async_trait]
77/// impl Handler<MyOtherMessage> for EchoService {
78///     type Reply = MyOtherMessage;
79///
80///     async fn on_message(&self, msg: Request<MyOtherMessage>) -> Result<Self::Reply, Status> {
81///         Ok(msg.to_owned().unwrap())
82///     }
83/// }
84/// ```
85pub struct ServiceRegistry<Svc> {
86    handlers: BTreeMap<HandlerKey, Arc<dyn OpaqueMessageHandler>>,
87    service: Arc<Svc>,
88}
89
90impl<Svc> ServiceRegistry<Svc>
91where
92    Svc: RpcService + Send + Sync + 'static,
93{
94    pub(crate) fn new(service: Svc) -> Self {
95        Self {
96            handlers: BTreeMap::new(),
97            service: Arc::new(service),
98        }
99    }
100
101    /// Consumes the registry into the produced handlers.
102    pub(crate) fn into_handlers(
103        self,
104    ) -> BTreeMap<HandlerKey, Arc<dyn OpaqueMessageHandler>> {
105        self.handlers
106    }
107
108    /// Adds a new handler to the registry.
109    ///
110    /// This is done in the form of specifying what message types are handled
111    /// by the service via the generic.
112    pub fn add_handler<Msg>(&mut self)
113    where
114        Msg: RequestContents + Sync + Send + 'static,
115        Svc: Handler<Msg>,
116    {
117        let phantom = PhantomHandler {
118            handler: self.service.clone(),
119            _msg: PhantomData::<Msg>::default(),
120        };
121
122        let uri = crate::to_uri_path(Svc::service_name(), <Svc as Handler<Msg>>::path());
123        self.handlers.insert(crate::hash(&uri), Arc::new(phantom));
124    }
125}
126
127/// A standard RPC server that handles messages.
128///
129/// ```rust
130/// use datacake_rpc::{RpcService, ServiceRegistry};
131///
132/// pub struct MyService;
133///
134/// impl RpcService for MyService {
135///     // This is an optional method which can be used
136///     // to avoid naming conflicts between two services.
137///     // By default this uses the type name of the service.
138///     fn service_name() -> &'static str {
139///         "my-lovely-service"
140///     }
141///
142///     fn register_handlers(registry: &mut ServiceRegistry<Self>) {
143///         // Register each one of our handlers here.
144///     }
145/// }
146/// ```
147pub trait RpcService: Sized {
148    /// An optional name of the service.
149    ///
150    /// This can be used to prevent overlaps or clashes
151    /// in handlers as two services may handle the same
152    /// message but behave differently, to distinguish between
153    /// these services, the message paths also use the service name
154    /// to create a unique key.
155    fn service_name() -> &'static str {
156        std::any::type_name::<Self>()
157    }
158
159    /// Register all message handlers for this server with the registry.
160    ///
161    /// See [ServiceRegistry] for more information.
162    fn register_handlers(registry: &mut ServiceRegistry<Self>);
163}
164
165#[async_trait]
166/// A generic RPC message handler.
167///
168/// ```rust
169/// use rkyv::{Archive, Deserialize, Serialize};
170/// use datacake_rpc::{Handler, Request, RpcService, ServiceRegistry, Status, RpcClient, Channel};
171/// use std::net::SocketAddr;
172///
173/// #[repr(C)]
174/// #[derive(Serialize, Deserialize, Archive, Debug)]
175/// #[archive(check_bytes)]
176/// #[archive_attr(derive(Debug))]
177/// pub struct MyMessage {
178///     name: String,
179///     age: u32,
180/// }
181///
182/// pub struct EchoService;
183///
184/// impl RpcService for EchoService {
185///     fn register_handlers(registry: &mut ServiceRegistry<Self>) {
186///         registry.add_handler::<MyMessage>();
187///     }
188/// }
189///
190/// // Our message must implement `Archive` and have it's archived value
191/// // implement check bytes, this is used to provide the zero-copy functionality.
192/// #[datacake_rpc::async_trait]
193/// impl Handler<MyMessage> for EchoService {
194///     // Our reply can be any type that implements `Archive` and `Serialize` as part
195///     // of the rkyv package. Here we're just echoing the message back.
196///     type Reply = MyMessage;
197///
198///     // We get passed a `Request` which is a thin wrapper around the `DataView` type.
199///     // This means we are simply being given a zero-copy view of the message rather
200///     // than a owned value. If you need a owned version which is not tied ot the
201///     // request buffer, you can use the `to_owned` method which will attempt to
202///     // deserialize the inner message/view.
203///     async fn on_message(&self, msg: Request<MyMessage>) -> Result<Self::Reply, Status> {
204///         Ok(msg.to_owned().unwrap())
205///     }
206/// }
207/// ```
208pub trait Handler<Msg>: RpcService
209where
210    Msg: RequestContents,
211{
212    /// Our reply can be any type that implements [Archive] and [Serialize] as part
213    /// of the [rkyv] package. Here we're just echoing the message back.
214    type Reply: TryIntoBody;
215
216    /// The path of the message, this is similar to the service name which can
217    /// be used to avoid conflicts, by default this uses the name of the message type.
218    fn path() -> &'static str {
219        std::any::type_name::<Msg>()
220    }
221
222    /// Process a message.
223    /// We get passed a [Request] which is a thin wrapper around the inner content of
224    /// the specified type as defined by [RequestContents::Content]
225    ///
226    /// This means we are simply being given a zero-copy view of the message rather
227    /// than a owned value. If you need a owned version which is not tied ot the
228    /// request buffer, you can use the `to_owned` method which will attempt to
229    /// deserialize the inner message/view.
230    async fn on_message(&self, msg: Request<Msg>) -> Result<Self::Reply, Status>;
231}
232
233#[async_trait]
234pub(crate) trait OpaqueMessageHandler: Send + Sync {
235    async fn try_handle(
236        &self,
237        remote_addr: SocketAddr,
238        data: Body,
239    ) -> Result<Body, AlignedVec>;
240}
241
242struct PhantomHandler<H, Msg>
243where
244    H: Send + Sync + 'static,
245    Msg: Send + 'static,
246{
247    handler: Arc<H>,
248    _msg: PhantomData<Msg>,
249}
250
251#[async_trait]
252impl<H, Msg> OpaqueMessageHandler for PhantomHandler<H, Msg>
253where
254    Msg: RequestContents + Send + Sync + 'static,
255    H: Handler<Msg> + Send + Sync + 'static,
256{
257    async fn try_handle(
258        &self,
259        remote_addr: SocketAddr,
260        data: Body,
261    ) -> Result<Body, AlignedVec> {
262        let view = match Msg::from_body(data).await {
263            Ok(view) => view,
264            Err(status) => {
265                let error = rkyv::to_bytes::<_, SCRATCH_SPACE>(&status)
266                    .unwrap_or_else(|_| AlignedVec::new());
267                return Err(error);
268            },
269        };
270
271        let msg = Request::new(remote_addr, view);
272
273        self.handler
274            .on_message(msg)
275            .await
276            .and_then(|reply| reply.try_into_body())
277            .map_err(|status| {
278                rkyv::to_bytes::<_, SCRATCH_SPACE>(&status)
279                    .unwrap_or_else(|_| AlignedVec::new())
280            })
281    }
282}
283
284/// The serializer trait converting replies into hyper bodies.
285///
286/// This is a light abstraction to allow users to be able to
287/// stream data across the RPC system which may not fit in memory.
288///
289/// Any types which implement [TryAsBody] will implement this type.
290pub trait TryIntoBody {
291    /// Try convert the reply into a body or return an error
292    /// status.
293    fn try_into_body(self) -> Result<Body, Status>;
294}
295
296/// The serializer trait for converting replies into hyper bodies
297/// using a reference to self.
298///
299/// This will work for most implementations but if you want to stream
300/// hyper bodies for example, you cannot implement this trait.
301pub trait TryAsBody {
302    /// Try convert the reply into a body or return an error
303    /// status.
304    fn try_as_body(&self) -> Result<Body, Status>;
305}
306
307impl<T> TryAsBody for T
308where
309    T: Archive + Serialize<AllocSerializer<SCRATCH_SPACE>>,
310{
311    fn try_as_body(&self) -> Result<Body, Status> {
312        rkyv::to_bytes::<_, SCRATCH_SPACE>(self)
313            .map(|v| Body::from(v.to_vec()))
314            .map_err(|e| Status::internal(e.to_string()))
315    }
316}
317
318impl<T> TryIntoBody for T
319where
320    T: TryAsBody,
321{
322    fn try_into_body(self) -> Result<Body, Status> {
323        <Self as TryAsBody>::try_as_body(&self)
324    }
325}
326
327impl TryIntoBody for Body {
328    fn try_into_body(self) -> Result<Body, Status> {
329        Ok(self)
330    }
331}