workflow_rpc/server/interface/
mod.rs1pub mod method;
7pub mod notification;
8
9use crate::imports::*;
10pub use method::*;
11pub use notification::*;
12
13pub struct Interface<ServerContext, ConnectionContext, Ops>
17where
18 ServerContext: Clone + Send + Sync + 'static,
19 ConnectionContext: Clone + Send + Sync + 'static,
20 Ops: OpsT,
21{
22 server_ctx: ServerContext,
23 methods: AHashMap<Ops, Box<dyn MethodTrait<ServerContext, ConnectionContext>>>,
24 notifications: AHashMap<Ops, Box<dyn NotificationTrait<ServerContext, ConnectionContext>>>,
25}
26
27impl<ServerContext, ConnectionContext, Ops> Interface<ServerContext, ConnectionContext, Ops>
28where
29 ServerContext: Clone + Send + Sync + 'static,
30 ConnectionContext: Clone + Send + Sync + 'static,
31 Ops: OpsT,
32{
33 pub fn new(server_ctx: ServerContext) -> Interface<ServerContext, ConnectionContext, Ops> {
38 Interface {
39 server_ctx,
40 methods: AHashMap::new(),
41 notifications: AHashMap::new(),
42 }
43 }
44
45 pub fn method<Req, Resp>(
63 &mut self,
64 op: Ops,
65 method: Method<ServerContext, ConnectionContext, Req, Resp>,
66 )
67 where
69 Ops: Debug + Clone,
70 Req: MsgT,
71 Resp: MsgT,
72 {
73 let method: Box<dyn MethodTrait<ServerContext, ConnectionContext>> = Box::new(method);
74 if self.methods.insert(op.clone(), method).is_some() {
75 panic!("RPC method {op:?} is declared multiple times")
76 }
77 }
78
79 pub fn notification<Msg>(
97 &mut self,
98 op: Ops,
99 method: Notification<ServerContext, ConnectionContext, Msg>,
100 ) where
101 Ops: Debug + Clone,
102 Msg: MsgT,
103 {
104 let method: Box<dyn NotificationTrait<ServerContext, ConnectionContext>> = Box::new(method);
105 if self.notifications.insert(op.clone(), method).is_some() {
106 panic!("RPC notification {op:?} is declared multiple times")
107 }
108 }
109
110 pub(crate) async fn call_method_with_borsh(
111 &self,
112 op: &Ops,
113 connection_ctx: ConnectionContext,
114 payload: &[u8],
115 ) -> ServerResult<Vec<u8>> {
116 if let Some(method) = self.methods.get(op) {
117 method
118 .call_with_borsh(self.server_ctx.clone(), connection_ctx, payload)
119 .await
120 } else {
121 Err(ServerError::NotFound)
122 }
123 }
124
125 pub(crate) async fn call_method_with_serde_json(
126 &self,
127 op: &Ops,
128 connection_ctx: ConnectionContext,
129 payload: Value,
130 ) -> ServerResult<Value> {
131 if let Some(method) = self.methods.get(op) {
132 method
133 .call_with_serde_json(self.server_ctx.clone(), connection_ctx, payload)
134 .await
135 } else {
136 Err(ServerError::NotFound)
137 }
138 }
139
140 pub(crate) async fn call_notification_with_borsh(
141 &self,
142 op: &Ops,
143 connection_ctx: ConnectionContext,
144 payload: &[u8],
145 ) -> ServerResult<()> {
146 if let Some(notification) = self.notifications.get(op) {
147 notification
148 .call_with_borsh(self.server_ctx.clone(), connection_ctx, payload)
149 .await
150 } else {
151 Err(ServerError::NotFound)
152 }
153 }
154
155 pub(crate) async fn call_notification_with_serde_json(
156 &self,
157 op: &Ops,
158 connection_ctx: ConnectionContext,
159 payload: Value,
160 ) -> ServerResult<()> {
161 if let Some(notification) = self.notifications.get(op) {
162 notification
163 .call_with_serde_json(self.server_ctx.clone(), connection_ctx, payload)
164 .await
165 } else {
166 Err(ServerError::NotFound)
167 }
168 }
169}