workflow_rpc/server/protocol/
borsh.rs1use super::Encoding;
8use crate::imports::*;
9use crate::messages::borsh::*;
10pub use crate::server::result::Result;
11use crate::server::Interface;
12use crate::server::ProtocolHandler;
13use workflow_websocket::server::{
14 Error as WebSocketError, Message, Result as WebSocketResult, WebSocketSink,
15};
16
17pub struct BorshProtocol<ServerContext, ConnectionContext, Ops, Id>
19where
20 ServerContext: Clone + Send + Sync + 'static,
21 ConnectionContext: Clone + Send + Sync + 'static,
22 Ops: OpsT,
23 Id: IdT,
24{
25 id: PhantomData<Id>,
26 ops: PhantomData<Ops>,
27 interface: Arc<Interface<ServerContext, ConnectionContext, Ops>>,
28}
29
30#[async_trait]
31impl<ServerContext, ConnectionContext, Ops, Id>
32 ProtocolHandler<ServerContext, ConnectionContext, Ops>
33 for BorshProtocol<ServerContext, ConnectionContext, Ops, Id>
34where
35 ServerContext: Clone + Send + Sync + 'static,
36 ConnectionContext: Clone + Send + Sync + 'static,
37 Ops: OpsT,
38 Id: IdT,
39{
40 fn new(interface: Arc<Interface<ServerContext, ConnectionContext, Ops>>) -> Self
41 where
42 Self: Sized,
43 {
44 BorshProtocol {
45 id: PhantomData,
46 ops: PhantomData,
47 interface,
48 }
49 }
50
51 fn encoding(&self) -> Encoding {
52 Encoding::Borsh
53 }
54
55 async fn handle_message(
56 &self,
57 connection_ctx: ConnectionContext,
58 msg: Message,
59 sink: &WebSocketSink,
60 ) -> WebSocketResult<()> {
61 let data = &msg.into_data();
62 let req: BorshClientMessage<Ops, Id> = data
63 .try_into()
64 .map_err(|_| WebSocketError::MalformedMessage)?;
65
66 if req.header.id.is_some() {
67 let result = self
68 .interface
69 .call_method_with_borsh(&req.header.op, connection_ctx, req.payload)
70 .await;
71
72 match result {
73 Ok(data) => {
74 if let Ok(msg) = BorshServerMessage::<Ops, Id>::new(
75 BorshServerMessageHeader::new(
76 req.header.id,
77 ServerMessageKind::Success,
78 Some(req.header.op),
79 ),
80 &data,
81 )
82 .try_to_vec()
83 {
84 if let Err(e) = sink.send(msg.into()) {
85 log_trace!("Sink error: {:?}", e);
86 }
87 }
88 }
89 Err(err) => {
90 if err == ServerError::Close {
92 return Err(WebSocketError::ServerClose);
93 } else if let Ok(err_vec) = borsh::to_vec(&err) {
94 if let Ok(msg) = BorshServerMessage::new(
95 BorshServerMessageHeader::<Ops, Id>::new(
96 req.header.id,
97 ServerMessageKind::Error,
98 None,
99 ),
100 &err_vec,
101 )
102 .try_to_vec()
103 {
104 if let Err(e) = sink.send(msg.into()) {
105 log_trace!("Sink error: {:?}", e);
106 }
107 }
108 }
109 }
110 }
111 } else {
112 self.interface
113 .call_notification_with_borsh(&req.header.op, connection_ctx, req.payload)
114 .await
115 .unwrap_or_else(|err| {
116 log_trace!("error handling client-side notification {}", err)
117 });
118 }
119
120 Ok(())
121 }
122
123 fn serialize_notification_message<Msg>(&self, op: Ops, msg: Msg) -> Result<tungstenite::Message>
124 where
125 Msg: BorshSerialize + Send + Sync + 'static,
126 {
127 create_serialized_notification_message(op, msg)
128 }
129}
130
131pub fn create_serialized_notification_message<Ops, Msg>(op: Ops, msg: Msg) -> Result<Message>
132where
133 Ops: OpsT,
134 Msg: BorshSerialize + Send + Sync + 'static,
135{
136 let payload = borsh::to_vec(&msg)?;
137 let data = BorshServerMessage::new(
138 BorshServerMessageHeader::<Ops, ()>::new(None, ServerMessageKind::Notification, Some(op)),
139 &payload,
140 )
141 .try_to_vec()?;
142 Ok(Message::Binary(data))
143}