volt_ws_protocol/
rpc.rs

1use crate::{
2    binary_message::BinaryMessage,
3    constant::{PAYLOAD_TYPE_KEY_EXCHANGE, PAYLOAD_TYPE_PAYLOAD},
4    crypto_utils::{
5        self, aes_decrypt, aes_encrypt, derive_shared_key, verify_signature, IdentityToken,
6    },
7    log,
8};
9use base64ct::{Base64, Base64UrlUnpadded, Encoding};
10use serde_json::{json, Value};
11use std::{collections::VecDeque, vec};
12
13pub struct Rpc {
14    method_id: u64,
15    method_name: String,
16    metadata: IdentityToken,
17    token: String,
18    response_count: u32,
19    client_end_pending: bool,
20    client_ended: bool,
21    server_ended: bool,
22    target_did: Vec<String>,
23    target_public_key: String,
24    service_id: String,
25    call_started: bool,
26    key_exchange_pending: bool,
27    encryption_key: Option<[u8; 32]>,
28    pending_payloads: VecDeque<String>,
29}
30
31impl Rpc {
32    pub fn new(
33        id: &u64,
34        method_name: &str,
35        target_did: Vec<String>,
36        metadata: IdentityToken,
37        target_public_key: String,
38        service_id: String,
39    ) -> Rpc {
40        Rpc {
41            method_id: id.clone(),
42            method_name: method_name.to_string(),
43            token: metadata.token.clone(),
44            metadata: metadata,
45            response_count: 0,
46            client_end_pending: false,
47            client_ended: false,
48            server_ended: false,
49            target_did: target_did,
50            target_public_key: target_public_key,
51            service_id: service_id,
52            call_started: false,
53            key_exchange_pending: false,
54            encryption_key: None,
55            pending_payloads: VecDeque::new(),
56        }
57    }
58
59    pub fn encode_payload(
60        &self,
61        payload_type: &u8,
62        payload_json: &str,
63        meta_json: Option<&str>,
64    ) -> Vec<u8> {
65        // Create a random IV.
66        let iv_generate = crypto_utils::random_bytes(16);
67        let iv: [u8; 16] = iv_generate.try_into().unwrap();
68
69        // Encrypt the call payload.
70        let encrypted_payload = aes_encrypt(&payload_json, &self.encryption_key.unwrap(), &iv);
71
72        // Create a binary message with the encrypted payload.
73        let bin_message = BinaryMessage::new_payload(
74            payload_type,
75            self.method_id,
76            meta_json.unwrap_or("").as_bytes().to_vec(),
77            iv.to_vec(),
78            encrypted_payload,
79        );
80
81        bin_message.to_bytes()
82    }
83
84    pub fn send(&mut self, payload: String) -> Vec<u8> {
85        if self.server_ended {
86            // The server has closed their side of the stream.
87            log("server has ended the stream, not sending payload");
88
89            // This means we can't send any more data.
90            self.client_ended = true;
91
92            // Don't send any payload.
93            return vec![];
94        }
95
96        if self.call_started {
97            // The call has already started, so we're either sending more data or ending the call.
98            if !payload.is_empty() {
99                log("sending payload");
100
101                // Send the payload to the server
102                let payload_json = json!({
103                    "method_payload": {
104                      "json_payload": payload
105                    }
106                })
107                .to_string();
108
109                self.encode_payload(&PAYLOAD_TYPE_PAYLOAD, &payload_json, None)
110            } else {
111                log("ending call");
112
113                // Tell the server we've finished sending data (the server may continue to send us data).
114                self.client_ended = true;
115
116                let payload_json = json!({
117                    "method_end": {
118                      "ended": true
119                  },
120                })
121                .to_string();
122
123                let meta_json = json!({
124                  "client_end": true
125                })
126                .to_string();
127
128                self.encode_payload(&PAYLOAD_TYPE_PAYLOAD, &payload_json, Some(&meta_json))
129            }
130        } else if let Some(_key) = &self.encryption_key {
131            // The call hasn't started yet but we have an encryption key, indicating that
132            // the key exchange has completed. We can now start the call.
133            log("encryption key is present");
134
135            self.call_started = true;
136
137            let mut method_invoke = json!({
138                "method_name": self.method_name,
139                "json_request": payload
140            });
141
142            if !self.service_id.is_empty() {
143                method_invoke["service_id"] = Value::String(self.service_id.clone());
144            }
145
146            let payload_json = json!({
147                "method_invoke": method_invoke
148            })
149            .to_string();
150
151            self.encode_payload(&PAYLOAD_TYPE_PAYLOAD, &payload_json, None)
152        } else {
153            // The call hasn't started yet and we don't have an encryption key, indicating that
154            // we need to initiate a key exchange.
155            log("encryption key is not present");
156
157            // Key exchange is about to begin or pending, so queue this payload.
158            self.pending_payloads.push_back(payload);
159
160            // Only send the key exchange payload if it's not already pending.
161            if !self.key_exchange_pending {
162                log("sending key exchange payload");
163
164                self.key_exchange_pending = true;
165
166                let mut key_exchange_json = json!({
167                  "i": self.method_id,
168                  "d": self.target_did,
169                });
170
171                if !self.service_id.is_empty() {
172                    key_exchange_json["s"] = Value::String(self.service_id.clone());
173                }
174
175                log(&format!("key exchange json: {}", key_exchange_json));
176
177                // Extract the signature and payload from the token.
178                let token_parts: Vec<&str> = self.token.split('.').collect();
179                let token_payload = Base64UrlUnpadded::decode_vec(token_parts[1]);
180                let token_signature = Base64UrlUnpadded::decode_vec(token_parts[2]);
181
182                let bin_message = BinaryMessage::new_key_exchange(
183                    self.method_id,
184                    key_exchange_json.to_string().as_bytes().to_vec(),
185                    token_signature.unwrap(),
186                    token_payload.unwrap(),
187                );
188
189                bin_message.to_bytes()
190            } else {
191                // We're already waiting for a key exchange to complete, so don't send the payload yet.
192                vec![]
193            }
194        }
195    }
196
197    pub fn receive(&mut self, bin_message: BinaryMessage) -> Result<String, String> {
198        self.response_count += 1;
199
200        if &bin_message.payload_type == PAYLOAD_TYPE_KEY_EXCHANGE {
201            log("received key exchange message");
202
203            // The signed message is a concatenation of the public key and nonce.
204            let mut verify_message = bin_message.public_key.clone();
205            verify_message.extend_from_slice(&bin_message.nonce);
206
207            // Verify the signature using the target service public key (or the Volt
208            let verified = verify_signature(
209                &self.target_public_key,
210                &verify_message,
211                &bin_message.public_key_signature,
212            );
213
214            if !verified {
215                log("signature verification failed");
216                return Err("signature verification failed".to_string());
217            }
218
219            // Add PEM headers and base64 encode the public key.
220            let public_key_pem = format!(
221                "-----BEGIN PUBLIC KEY-----\n{}\n-----END PUBLIC KEY-----",
222                Base64::encode_string(&bin_message.public_key)
223            );
224
225            let derived_key = derive_shared_key(
226                &self.metadata.encryption_key.as_ref().unwrap().0,
227                &public_key_pem,
228            );
229
230            match derived_key {
231                Ok(k) => {
232                    self.encryption_key = Some(k);
233
234                    Ok(json!({
235                      "method_id": self.method_id,
236                      "key_exchanged": true
237                    })
238                    .to_string())
239                }
240                Err(e) => {
241                    log(&format!("error deriving key: {}", e));
242                    self.encryption_key = None;
243
244                    Err(e)
245                }
246            }
247        } else if &bin_message.payload_type == PAYLOAD_TYPE_PAYLOAD {
248            log("received payload message");
249
250            let decrypted_json: String;
251
252            if bin_message.iv.len() > 0 {
253                let iv: [u8; 16] = bin_message.iv[..16].try_into().unwrap();
254                let mut encrypted_payload = bin_message.payload.clone();
255
256                decrypted_json = aes_decrypt(
257                    encrypted_payload.as_mut_slice(),
258                    &self.encryption_key.unwrap(),
259                    &iv,
260                );
261            } else {
262                decrypted_json = "{}".to_string();
263            }
264
265            let decrypted_payload: Value = serde_json::from_str(&decrypted_json).unwrap();
266
267            let meta_json = match bin_message.meta.len() {
268                0 => "{}".to_string(),
269                _ => String::from_utf8_lossy(&bin_message.meta).to_string(),
270            };
271
272            let mut response: Value = serde_json::from_str(&meta_json).unwrap();
273            response["method_id"] = self.method_id.into();
274            response["payload"] = decrypted_payload;
275
276            let response_json = response.to_string();
277
278            Ok(response_json)
279        } else {
280            log("received non-key exchange message");
281            Err("unsupported message type".to_string())
282        }
283    }
284
285    pub fn end(&mut self) -> Vec<u8> {
286        match self.client_end_pending {
287            true => {
288                // The client has already ended the stream, so do nothing.
289                vec![]
290            }
291            false => {
292                // The client hasn't ended the stream yet, so do so now.
293                log(&format!("client side ended: {}", self.method_id));
294                self.client_end_pending = true;
295                self.send(String::new())
296            }
297        }
298    }
299
300    /**
301     * Get the next pending payload to send - there may be none.
302     * This is used to send payloads that were queued while waiting for the key exchange to complete.
303     * @returns The next pending payload to send, or an empty array if there are no pending payloads.
304     */
305    pub fn pending_payload(&mut self) -> Vec<u8> {
306        match self.pending_payloads.pop_front() {
307            Some(p) => self.send(p),
308            None => vec![],
309        }
310    }
311
312    pub fn drop(self) {
313        // Instance is dropped.
314    }
315}
316
317impl Drop for Rpc {
318    fn drop(&mut self) {
319        log(&format!("dropping RPC: {}", self.method_id));
320    }
321}