workflow_rpc/server/protocol/
borsh.rs

//!
//! Module containing [`BorshProtocol`], which is responsible for the
//! server-side dispatch of RPC methods and notifications when using
//! the `Borsh` protocol.
//!
6
7use super::Encoding;
8use crate::imports::*;
9use crate::messages::borsh::*;
10pub use crate::server::result::Result;
11use crate::server::Interface;
12use crate::server::ProtocolHandler;
13use workflow_websocket::server::{
14    Error as WebSocketError, Message, Result as WebSocketResult, WebSocketSink,
15};
16
17/// Server-side message serializer and dispatcher when using `Borsh` protocol.
18pub struct BorshProtocol<ServerContext, ConnectionContext, Ops, Id>
19where
20    ServerContext: Clone + Send + Sync + 'static,
21    ConnectionContext: Clone + Send + Sync + 'static,
22    Ops: OpsT,
23    Id: IdT,
24{
25    id: PhantomData<Id>,
26    ops: PhantomData<Ops>,
27    interface: Arc<Interface<ServerContext, ConnectionContext, Ops>>,
28}
29
30#[async_trait]
31impl<ServerContext, ConnectionContext, Ops, Id>
32    ProtocolHandler<ServerContext, ConnectionContext, Ops>
33    for BorshProtocol<ServerContext, ConnectionContext, Ops, Id>
34where
35    ServerContext: Clone + Send + Sync + 'static,
36    ConnectionContext: Clone + Send + Sync + 'static,
37    Ops: OpsT,
38    Id: IdT,
39{
40    fn new(interface: Arc<Interface<ServerContext, ConnectionContext, Ops>>) -> Self
41    where
42        Self: Sized,
43    {
44        BorshProtocol {
45            id: PhantomData,
46            ops: PhantomData,
47            interface,
48        }
49    }
50
51    fn encoding(&self) -> Encoding {
52        Encoding::Borsh
53    }
54
55    async fn handle_message(
56        &self,
57        connection_ctx: ConnectionContext,
58        msg: Message,
59        sink: &WebSocketSink,
60    ) -> WebSocketResult<()> {
61        let data = &msg.into_data();
62        let req: BorshClientMessage<Ops, Id> = data
63            .try_into()
64            .map_err(|_| WebSocketError::MalformedMessage)?;
65
66        if req.header.id.is_some() {
67            let result = self
68                .interface
69                .call_method_with_borsh(&req.header.op, connection_ctx, req.payload)
70                .await;
71
72            match result {
73                Ok(data) => {
74                    if let Ok(msg) = BorshServerMessage::<Ops, Id>::new(
75                        BorshServerMessageHeader::new(
76                            req.header.id,
77                            ServerMessageKind::Success,
78                            Some(req.header.op),
79                        ),
80                        &data,
81                    )
82                    .try_to_vec()
83                    {
84                        if let Err(e) = sink.send(msg.into()) {
85                            log_trace!("Sink error: {:?}", e);
86                        }
87                    }
88                }
89                Err(err) => {
90                    // log_trace!("RPC server error: {:?} req: {:#?}", err, req);
91                    if err == ServerError::Close {
92                        return Err(WebSocketError::ServerClose);
93                    } else if let Ok(err_vec) = borsh::to_vec(&err) {
94                        if let Ok(msg) = BorshServerMessage::new(
95                            BorshServerMessageHeader::<Ops, Id>::new(
96                                req.header.id,
97                                ServerMessageKind::Error,
98                                None,
99                            ),
100                            &err_vec,
101                        )
102                        .try_to_vec()
103                        {
104                            if let Err(e) = sink.send(msg.into()) {
105                                log_trace!("Sink error: {:?}", e);
106                            }
107                        }
108                    }
109                }
110            }
111        } else {
112            self.interface
113                .call_notification_with_borsh(&req.header.op, connection_ctx, req.payload)
114                .await
115                .unwrap_or_else(|err| {
116                    log_trace!("error handling client-side notification {}", err)
117                });
118        }
119
120        Ok(())
121    }
122
123    fn serialize_notification_message<Msg>(&self, op: Ops, msg: Msg) -> Result<tungstenite::Message>
124    where
125        Msg: BorshSerialize + Send + Sync + 'static,
126    {
127        create_serialized_notification_message(op, msg)
128    }
129}
130
131pub fn create_serialized_notification_message<Ops, Msg>(op: Ops, msg: Msg) -> Result<Message>
132where
133    Ops: OpsT,
134    Msg: BorshSerialize + Send + Sync + 'static,
135{
136    let payload = borsh::to_vec(&msg)?;
137    let data = BorshServerMessage::new(
138        BorshServerMessageHeader::<Ops, ()>::new(None, ServerMessageKind::Notification, Some(op)),
139        &payload,
140    )
141    .try_to_vec()?;
142    Ok(Message::Binary(data))
143}