workflow_rpc/server/protocol/
serde_json.rs

1//!
2//! Module containing [`JsonProtocol`] responsible for server-side
3//! dispatch of RPC methods and notifications when using `JSON`
4//! protocol.
5//!
6use super::Encoding;
7use crate::imports::*;
8use crate::messages::serde_json::*;
9pub use crate::server::result::Result;
10use crate::server::Interface;
11use crate::server::ProtocolHandler;
12use workflow_websocket::server::{
13    Error as WebSocketError, Message, Result as WebSocketResult, WebSocketSink,
14};
15
16/// Server-side message serializer and dispatcher when using `JSON` protocol.
17pub struct JsonProtocol<ServerContext, ConnectionContext, Ops, Id>
18where
19    ServerContext: Clone + Send + Sync + 'static,
20    ConnectionContext: Clone + Send + Sync + 'static,
21    Ops: OpsT,
22    Id: IdT,
23{
24    id: PhantomData<Id>,
25    ops: PhantomData<Ops>,
26    interface: Arc<Interface<ServerContext, ConnectionContext, Ops>>,
27}
28
29#[async_trait]
30impl<ServerContext, ConnectionContext, Ops, Id>
31    ProtocolHandler<ServerContext, ConnectionContext, Ops>
32    for JsonProtocol<ServerContext, ConnectionContext, Ops, Id>
33where
34    ServerContext: Clone + Send + Sync + 'static,
35    ConnectionContext: Clone + Send + Sync + 'static,
36    Ops: OpsT,
37    Id: IdT,
38{
39    fn new(interface: Arc<Interface<ServerContext, ConnectionContext, Ops>>) -> Self
40    where
41        Self: Sized,
42    {
43        JsonProtocol {
44            id: PhantomData,
45            ops: PhantomData,
46            interface,
47        }
48    }
49
50    fn encoding(&self) -> Encoding {
51        Encoding::SerdeJson
52    }
53
54    async fn handle_message(
55        &self,
56        connection_ctx: ConnectionContext,
57        msg: Message,
58        sink: &WebSocketSink,
59    ) -> WebSocketResult<()> {
60        let text = &msg.into_text()?;
61        let req: JsonClientMessage<Ops, Id> =
62            serde_json::from_str(text).map_err(|_| WebSocketError::MalformedMessage)?;
63
64        if req.id.is_some() {
65            let result = self
66                .interface
67                .call_method_with_serde_json(&req.method, connection_ctx, req.params)
68                .await;
69
70            match result {
71                Ok(payload) => {
72                    if let Ok(msg) = serde_json::to_string(&JSONServerMessage::new(
73                        req.id,
74                        Some(req.method),
75                        Some(payload),
76                        None,
77                    )) {
78                        if let Err(e) = sink.send(msg.into()) {
79                            log_trace!("Sink error: {:?}", e);
80                        }
81                    }
82                }
83                Err(err) => {
84                    if err == ServerError::Close {
85                        return Err(WebSocketError::ServerClose);
86                    } else {
87                        let server_err = JsonServerError::from(err);
88                        if let Ok(msg) = serde_json::to_string(&JSONServerMessage::new(
89                            req.id,
90                            Some(req.method),
91                            None,
92                            Some(server_err),
93                        )) {
94                            if let Err(e) = sink.send(msg.into()) {
95                                log_trace!("Sink error: {:?}", e);
96                            }
97                        }
98                    }
99                }
100            }
101        } else {
102            self.interface
103                .call_notification_with_serde_json(&req.method, connection_ctx, req.params)
104                .await
105                .unwrap_or_else(|err| {
106                    log_trace!("error handling client-side notification {}", err)
107                });
108        }
109        Ok(())
110    }
111
112    fn serialize_notification_message<Msg>(&self, op: Ops, msg: Msg) -> Result<tungstenite::Message>
113    where
114        Msg: Serialize + Send + Sync + 'static,
115    {
116        create_serialized_notification_message(op, msg)
117    }
118}
119
120pub fn create_serialized_notification_message<Ops, Msg>(op: Ops, msg: Msg) -> Result<Message>
121where
122    Ops: OpsT,
123    Msg: Serialize + Send + Sync + 'static,
124{
125    let payload = serde_json::to_value(msg)?;
126    let json = serde_json::to_string(&JSONServerMessage::<Ops, ()>::new(
127        None,
128        Some(op),
129        Some(payload),
130        None,
131    ))?;
132    Ok(Message::Text(json))
133}