workflow_rpc/
messages.rs

1//!
2//! RPC message serialization module (header serialization and deserialization for `Borsh` and `JSON` data structures)
3//!
4
5pub mod serde_json {
6    //! RPC message serialization for JSON encoding
7    use serde::{Deserialize, Serialize};
8    use serde_json::{self, Value};
9
10    #[derive(Debug, Serialize, Deserialize)]
11    pub struct JsonClientMessage<Ops, Id> {
12        // pub jsonrpc: String,
13        pub id: Option<Id>,
14        pub method: Ops,
15        pub params: Value,
16    }
17
18    impl<Ops, Id> JsonClientMessage<Ops, Id> {
19        pub fn new(id: Option<Id>, method: Ops, payload: Value) -> Self {
20            JsonClientMessage {
21                // jsonrpc: "2.0".to_owned(),
22                id,
23                method,
24                params: payload,
25            }
26        }
27    }
28
29    #[derive(Debug, Serialize, Deserialize)]
30    pub struct JSONServerMessage<Ops, Id> {
31        // pub jsonrpc: String,
32        #[serde(skip_serializing_if = "Option::is_none")]
33        pub id: Option<Id>,
34        #[serde(skip_serializing_if = "Option::is_none")]
35        pub method: Option<Ops>,
36        #[serde(skip_serializing_if = "Option::is_none")]
37        pub params: Option<Value>,
38        // #[serde(skip_serializing_if = "Option::is_none")]
39        // pub result: Option<Value>,
40        #[serde(skip_serializing_if = "Option::is_none")]
41        pub error: Option<JsonServerError>,
42    }
43
44    impl<Ops, Id> JSONServerMessage<Ops, Id> {
45        pub fn new(
46            id: Option<Id>,
47            method: Option<Ops>,
48            params: Option<Value>,
49            // result: Option<Value>,
50            error: Option<JsonServerError>,
51        ) -> Self {
52            JSONServerMessage {
53                // jsonrpc: "2.0".to_owned(),
54                method,
55                params,
56                // result,
57                error,
58                id,
59            }
60        }
61    }
62
63    #[derive(Debug, Serialize, Deserialize)]
64    pub struct JsonServerError {
65        code: u64,
66        message: String,
67        data: Option<Value>,
68    }
69
70    impl std::fmt::Display for JsonServerError {
71        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
72            write!(
73                f,
74                "code:{}  message:`{}` data:{:?}",
75                self.code, self.message, self.data
76            )
77        }
78    }
79
80    impl From<crate::error::ServerError> for JsonServerError {
81        fn from(err: crate::error::ServerError) -> Self {
82            JsonServerError {
83                code: 0, //err.code,
84                message: err.to_string(),
85                data: None, //err.data,
86            }
87        }
88    }
89}
90
91pub mod borsh {
92    //! RPC message serialization for Borsh encoding
93
94    use crate::error::Error;
95    use borsh::{BorshDeserialize, BorshSerialize};
96    use workflow_websocket::client::message::Message as WebSocketMessage;
97    // use borsh::de::*;
98
99    pub fn to_ws_msg<Ops, Id>(header: BorshReqHeader<Ops, Id>, payload: &[u8]) -> WebSocketMessage
100    where
101        Id: BorshSerialize + BorshDeserialize,
102        Ops: BorshSerialize + BorshDeserialize,
103    {
104        let header = borsh::to_vec(&header).expect("to_ws_msg header serialize error");
105        let header_len = header.len();
106        let len = payload.len() + header_len;
107        let mut buffer = Vec::with_capacity(len);
108        #[allow(clippy::uninit_vec)]
109        unsafe {
110            buffer.set_len(len);
111        }
112        buffer[0..header_len].copy_from_slice(&header);
113        buffer[header_len..].copy_from_slice(payload);
114        buffer.into()
115    }
116
117    #[derive(Debug, BorshSerialize, BorshDeserialize)]
118    pub struct BorshReqHeader<Ops, Id>
119    where
120        Id: BorshSerialize + BorshDeserialize,
121        Ops: BorshSerialize + BorshDeserialize,
122    {
123        pub id: Option<Id>, //u64,
124        pub op: Ops,
125    }
126
127    impl<Ops, Id> BorshReqHeader<Ops, Id>
128    where
129        Id: BorshSerialize + BorshDeserialize,
130        Ops: BorshSerialize + BorshDeserialize,
131    {
132        pub fn new(id: Option<Id>, op: Ops) -> Self {
133            BorshReqHeader { id, op }
134        }
135    }
136
137    #[derive(Debug, BorshSerialize, BorshDeserialize)]
138    pub struct BorshServerMessageHeader<Ops, Id> {
139        pub id: Option<Id>, //u64,
140        pub kind: ServerMessageKind,
141        pub op: Option<Ops>,
142    }
143
144    impl<Ops, Id> BorshServerMessageHeader<Ops, Id>
145    // where
146    //     Id: Default,
147    {
148        pub fn new(id: Option<Id>, kind: ServerMessageKind, op: Option<Ops>) -> Self {
149            Self { id, kind, op }
150        }
151    }
152
153    #[derive(Debug, Clone, Copy, BorshSerialize, BorshDeserialize)]
154    #[borsh(use_discriminant = true)]
155    pub enum ServerMessageKind {
156        Success = 0,
157        Error = 1,
158        Notification = 0xff,
159    }
160
161    impl From<ServerMessageKind> for u32 {
162        fn from(kind: ServerMessageKind) -> u32 {
163            kind as u32
164        }
165    }
166
167    #[derive(Debug)]
168    pub enum RespError<T>
169    where
170        T: BorshDeserialize,
171    {
172        NoData,
173        Data(T),
174        Rpc(Error),
175    }
176
177    #[derive(Debug)]
178    pub struct BorshClientMessage<'data, Ops, Id>
179    where
180        Id: BorshSerialize + BorshDeserialize + 'data,
181        Ops: BorshSerialize + BorshDeserialize + 'data,
182    {
183        pub header: BorshReqHeader<Ops, Id>,
184        pub payload: &'data [u8],
185    }
186
187    impl<'data, Ops, Id> TryFrom<&'data Vec<u8>> for BorshClientMessage<'data, Ops, Id>
188    where
189        Id: BorshSerialize + BorshDeserialize + 'data,
190        Ops: BorshSerialize + BorshDeserialize + 'data,
191    {
192        type Error = Error;
193
194        fn try_from(src: &'data Vec<u8>) -> Result<Self, Self::Error> {
195            let v: BorshClientMessage<Ops, Id> = src[..].try_into()?;
196            Ok(v)
197        }
198    }
199
200    impl<'data, Ops, Id> TryFrom<&'data [u8]> for BorshClientMessage<'data, Ops, Id>
201    where
202        Id: BorshSerialize + BorshDeserialize + 'data,
203        Ops: BorshSerialize + BorshDeserialize + 'data,
204    {
205        type Error = Error;
206
207        fn try_from(src: &'data [u8]) -> Result<Self, Self::Error> {
208            let mut payload = src;
209            let header = BorshReqHeader::<Ops, Id>::deserialize(&mut payload)?;
210            let message = BorshClientMessage { header, payload };
211            Ok(message)
212        }
213    }
214
215    #[derive(Debug)]
216    pub struct BorshServerMessage<'data, Ops, Id>
217    where
218        Id: BorshSerialize + BorshDeserialize + 'data,
219        Ops: BorshSerialize + BorshDeserialize + 'data,
220    {
221        pub header: BorshServerMessageHeader<Ops, Id>,
222        pub payload: &'data [u8],
223    }
224
225    impl<'data, Ops, Id> BorshServerMessage<'data, Ops, Id>
226    where
227        Id: BorshSerialize + BorshDeserialize + 'data,
228        Ops: BorshSerialize + BorshDeserialize + 'data,
229    {
230        pub fn new(
231            header: BorshServerMessageHeader<Ops, Id>,
232            payload: &'data [u8],
233        ) -> BorshServerMessage<'data, Ops, Id> {
234            BorshServerMessage { header, payload }
235        }
236
237        pub fn try_to_vec(&self) -> Result<Vec<u8>, Error> {
238            let header = borsh::to_vec(&self.header)?;
239            let header_len = header.len();
240
241            let len = header_len + self.payload.len();
242            let mut buffer = Vec::with_capacity(len);
243            #[allow(clippy::uninit_vec)]
244            unsafe {
245                buffer.set_len(len);
246            }
247
248            buffer[0..header_len].copy_from_slice(&header);
249            if !self.payload.is_empty() {
250                buffer[header_len..].copy_from_slice(self.payload);
251            }
252            Ok(buffer)
253        }
254    }
255
256    impl<'data, Ops, Id> TryFrom<&'data [u8]> for BorshServerMessage<'data, Ops, Id>
257    where
258        Id: BorshSerialize + BorshDeserialize + 'data,
259        Ops: BorshSerialize + BorshDeserialize + 'data,
260    {
261        type Error = Error;
262
263        fn try_from(src: &'data [u8]) -> Result<Self, Self::Error> {
264            let mut payload = src;
265            let header = <BorshServerMessageHeader<Ops, Id>>::deserialize(&mut payload)?;
266            let message = BorshServerMessage { header, payload };
267            Ok(message)
268        }
269    }
270}