workflow_rpc/server/interface/
mod.rs

1//!
2//! Module containing an [`Interface`] struct that carries
3//! mappings of RPC method and notification handlers.
4//!
5
6pub mod method;
7pub mod notification;
8
9use crate::imports::*;
10pub use method::*;
11pub use notification::*;
12
13/// [`Interface`] struct carries a mapping of RPC methods
14/// and notifications, used by protocols to dispatch calls
15/// to their respective handlers.
16pub struct Interface<ServerContext, ConnectionContext, Ops>
17where
18    ServerContext: Clone + Send + Sync + 'static,
19    ConnectionContext: Clone + Send + Sync + 'static,
20    Ops: OpsT,
21{
22    server_ctx: ServerContext,
23    methods: AHashMap<Ops, Box<dyn MethodTrait<ServerContext, ConnectionContext>>>,
24    notifications: AHashMap<Ops, Box<dyn NotificationTrait<ServerContext, ConnectionContext>>>,
25}
26
27impl<ServerContext, ConnectionContext, Ops> Interface<ServerContext, ConnectionContext, Ops>
28where
29    ServerContext: Clone + Send + Sync + 'static,
30    ConnectionContext: Clone + Send + Sync + 'static,
31    Ops: OpsT,
32{
33    /// Create an interface that will contain user-defined
34    /// RPC message and notification handlers. This method
35    /// accepts `server_ctx` argument that will be subsequently
36    /// passed to each RPC method or notification invocation.
37    pub fn new(server_ctx: ServerContext) -> Interface<ServerContext, ConnectionContext, Ops> {
38        Interface {
39            server_ctx,
40            methods: AHashMap::new(),
41            notifications: AHashMap::new(),
42        }
43    }
44
45    ///
46    /// Declare an RPC method handler. You can use a [`method!()`](macro@crate::server::method)
47    /// macro to declare the method as follows:
48    ///
49    ///
50    /// ```ignore
51    /// interface.method(MyOps::Method, method!(
52    ///   | connection_ctx: ConnectionCtx,
53    ///     server_ctx: ServerContext,
54    ///     req: MyReq |
55    /// async move {
56    ///     // ...
57    ///     Ok(MyResp { })
58    /// }))
59    /// ```
60    ///
61    ///
62    pub fn method<Req, Resp>(
63        &mut self,
64        op: Ops,
65        method: Method<ServerContext, ConnectionContext, Req, Resp>,
66    )
67    // -> Self
68    where
69        Ops: Debug + Clone,
70        Req: MsgT,
71        Resp: MsgT,
72    {
73        let method: Box<dyn MethodTrait<ServerContext, ConnectionContext>> = Box::new(method);
74        if self.methods.insert(op.clone(), method).is_some() {
75            panic!("RPC method {op:?} is declared multiple times")
76        }
77    }
78
79    ///
80    /// Declare an RPC notification handler. You can use a [`notification!()`](macro@crate::server::notification)
81    /// macro to declare the notification as follows:
82    ///
83    ///
84    /// ```ignore
85    /// interface.notification(MyOps::Notify, notification!(
86    ///   | connection_ctx: ConnectionCtx,
87    ///     server_ctx: ServerContext,
88    ///     msg: MyMsg |
89    /// async move {
90    ///     // ...
91    ///     Ok(())
92    /// }))
93    /// ```
94    ///
95    ///
96    pub fn notification<Msg>(
97        &mut self,
98        op: Ops,
99        method: Notification<ServerContext, ConnectionContext, Msg>,
100    ) where
101        Ops: Debug + Clone,
102        Msg: MsgT,
103    {
104        let method: Box<dyn NotificationTrait<ServerContext, ConnectionContext>> = Box::new(method);
105        if self.notifications.insert(op.clone(), method).is_some() {
106            panic!("RPC notification {op:?} is declared multiple times")
107        }
108    }
109
110    pub(crate) async fn call_method_with_borsh(
111        &self,
112        op: &Ops,
113        connection_ctx: ConnectionContext,
114        payload: &[u8],
115    ) -> ServerResult<Vec<u8>> {
116        if let Some(method) = self.methods.get(op) {
117            method
118                .call_with_borsh(self.server_ctx.clone(), connection_ctx, payload)
119                .await
120        } else {
121            Err(ServerError::NotFound)
122        }
123    }
124
125    pub(crate) async fn call_method_with_serde_json(
126        &self,
127        op: &Ops,
128        connection_ctx: ConnectionContext,
129        payload: Value,
130    ) -> ServerResult<Value> {
131        if let Some(method) = self.methods.get(op) {
132            method
133                .call_with_serde_json(self.server_ctx.clone(), connection_ctx, payload)
134                .await
135        } else {
136            Err(ServerError::NotFound)
137        }
138    }
139
140    pub(crate) async fn call_notification_with_borsh(
141        &self,
142        op: &Ops,
143        connection_ctx: ConnectionContext,
144        payload: &[u8],
145    ) -> ServerResult<()> {
146        if let Some(notification) = self.notifications.get(op) {
147            notification
148                .call_with_borsh(self.server_ctx.clone(), connection_ctx, payload)
149                .await
150        } else {
151            Err(ServerError::NotFound)
152        }
153    }
154
155    pub(crate) async fn call_notification_with_serde_json(
156        &self,
157        op: &Ops,
158        connection_ctx: ConnectionContext,
159        payload: Value,
160    ) -> ServerResult<()> {
161        if let Some(notification) = self.notifications.get(op) {
162            notification
163                .call_with_serde_json(self.server_ctx.clone(), connection_ctx, payload)
164                .await
165        } else {
166            Err(ServerError::NotFound)
167        }
168    }
169}