workflow_rpc/server/protocol/
serde_json.rs1use super::Encoding;
7use crate::imports::*;
8use crate::messages::serde_json::*;
9pub use crate::server::result::Result;
10use crate::server::Interface;
11use crate::server::ProtocolHandler;
12use workflow_websocket::server::{
13 Error as WebSocketError, Message, Result as WebSocketResult, WebSocketSink,
14};
15
16pub struct JsonProtocol<ServerContext, ConnectionContext, Ops, Id>
18where
19 ServerContext: Clone + Send + Sync + 'static,
20 ConnectionContext: Clone + Send + Sync + 'static,
21 Ops: OpsT,
22 Id: IdT,
23{
24 id: PhantomData<Id>,
25 ops: PhantomData<Ops>,
26 interface: Arc<Interface<ServerContext, ConnectionContext, Ops>>,
27}
28
29#[async_trait]
30impl<ServerContext, ConnectionContext, Ops, Id>
31 ProtocolHandler<ServerContext, ConnectionContext, Ops>
32 for JsonProtocol<ServerContext, ConnectionContext, Ops, Id>
33where
34 ServerContext: Clone + Send + Sync + 'static,
35 ConnectionContext: Clone + Send + Sync + 'static,
36 Ops: OpsT,
37 Id: IdT,
38{
39 fn new(interface: Arc<Interface<ServerContext, ConnectionContext, Ops>>) -> Self
40 where
41 Self: Sized,
42 {
43 JsonProtocol {
44 id: PhantomData,
45 ops: PhantomData,
46 interface,
47 }
48 }
49
50 fn encoding(&self) -> Encoding {
51 Encoding::SerdeJson
52 }
53
54 async fn handle_message(
55 &self,
56 connection_ctx: ConnectionContext,
57 msg: Message,
58 sink: &WebSocketSink,
59 ) -> WebSocketResult<()> {
60 let text = &msg.into_text()?;
61 let req: JsonClientMessage<Ops, Id> =
62 serde_json::from_str(text).map_err(|_| WebSocketError::MalformedMessage)?;
63
64 if req.id.is_some() {
65 let result = self
66 .interface
67 .call_method_with_serde_json(&req.method, connection_ctx, req.params)
68 .await;
69
70 match result {
71 Ok(payload) => {
72 if let Ok(msg) = serde_json::to_string(&JSONServerMessage::new(
73 req.id,
74 Some(req.method),
75 Some(payload),
76 None,
77 )) {
78 if let Err(e) = sink.send(msg.into()) {
79 log_trace!("Sink error: {:?}", e);
80 }
81 }
82 }
83 Err(err) => {
84 if err == ServerError::Close {
85 return Err(WebSocketError::ServerClose);
86 } else {
87 let server_err = JsonServerError::from(err);
88 if let Ok(msg) = serde_json::to_string(&JSONServerMessage::new(
89 req.id,
90 Some(req.method),
91 None,
92 Some(server_err),
93 )) {
94 if let Err(e) = sink.send(msg.into()) {
95 log_trace!("Sink error: {:?}", e);
96 }
97 }
98 }
99 }
100 }
101 } else {
102 self.interface
103 .call_notification_with_serde_json(&req.method, connection_ctx, req.params)
104 .await
105 .unwrap_or_else(|err| {
106 log_trace!("error handling client-side notification {}", err)
107 });
108 }
109 Ok(())
110 }
111
112 fn serialize_notification_message<Msg>(&self, op: Ops, msg: Msg) -> Result<tungstenite::Message>
113 where
114 Msg: Serialize + Send + Sync + 'static,
115 {
116 create_serialized_notification_message(op, msg)
117 }
118}
119
120pub fn create_serialized_notification_message<Ops, Msg>(op: Ops, msg: Msg) -> Result<Message>
121where
122 Ops: OpsT,
123 Msg: Serialize + Send + Sync + 'static,
124{
125 let payload = serde_json::to_value(msg)?;
126 let json = serde_json::to_string(&JSONServerMessage::<Ops, ()>::new(
127 None,
128 Some(op),
129 Some(payload),
130 None,
131 ))?;
132 Ok(Message::Text(json))
133}