volt_ws_protocol/
rpc_manager.rs

1use crate::{
2    binary_message::BinaryMessage,
3    crypto_utils::{get_identity_token, signing_key_from_pem, IdentityToken, SigningKeyPair},
4    log,
5    rpc::Rpc,
6    volt_client_configuration::VoltClientConfiguration,
7    volt_configuration::RelayConfiguration,
8};
9use base64ct::{Base64UrlUnpadded, Encoding};
10use ed25519_dalek::Signer;
11use serde_json::Value;
12use std::collections::HashMap;
13use std::sync::{Arc, Mutex};
14
15pub struct RpcManager {
16    pub client_config: VoltClientConfiguration,
17    pub connect_url: String,
18    pub key_pair: SigningKeyPair,
19    pub relaying: bool,
20    pub next_id: u64,
21    pub rpcs: Arc<Mutex<HashMap<u64, Rpc>>>,
22}
23
24impl RpcManager {
25    pub fn new() -> RpcManager {
26        log("creating new VoltClient");
27
28        RpcManager {
29            client_config: VoltClientConfiguration::default(),
30            connect_url: String::new(),
31            key_pair: SigningKeyPair {
32                public_key_pem: None,
33                private_key: None,
34            },
35            relaying: false,
36            next_id: 0,
37            rpcs: Arc::new(Mutex::new(HashMap::new())),
38        }
39    }
40
41    /**
42     * Initialise the client with the provided configuration.
43     * @param config_json The JSON configuration for the client, in stringified form.
44     * @returns The URL to connect to the Volt websocket server.
45     */
46    pub fn set_configuration(&mut self, config_json: &str) -> Result<String, String> {
47        log(&format!("initialising client with config: {}", config_json));
48
49        self.rpcs.lock().unwrap().clear();
50
51        // Parse the client configuration from the provided JSON.
52        self.client_config = match serde_json::from_str(config_json) {
53            Ok(config) => {
54                log("successfully parsed client config");
55                config
56            }
57            Err(e) => {
58                log(&format!("failed to parse client config: {}", e));
59                return Err("Failed to parse client config".to_string());
60            }
61        };
62
63        self.relaying = self.client_config.volt.relay.is_some();
64        log(&format!("relaying connection {}", self.relaying));
65
66        // Make sure a public key is provided in the Volt configuration.
67        // We could extract it from the certificate, but that would involve
68        // adding dependencies, which we want to keep to a minimum at the moment.
69        let volt_config = &self.client_config.volt;
70        match volt_config.public_key.len() {
71            0 => {
72                log("no public key provided");
73                return Err("No public key provided".to_string());
74            }
75            _ => {
76                log(&format!("volt public key {}", volt_config.public_key));
77            }
78        }
79
80        // Parse and load the private key from the PEM-encoded credential string.
81        let credential = &self.client_config.credential;
82        self.key_pair = match signing_key_from_pem(&credential.key) {
83            Ok(key_pair) => {
84                log("successfully parsed private key");
85                key_pair
86            }
87            Err(e) => {
88                log(&format!("failed to parse private key: {}", e));
89                return Err("Failed to parse private key".to_string());
90            }
91        };
92
93        let http_addresss = match &self.client_config.volt.relay {
94            // Relay can be either a string or an object.
95            Some(relay) => match &relay {
96                RelayConfiguration::String(url) => url.clone(),
97                RelayConfiguration::Object(relay) => relay.http_address.clone(),
98            },
99            None => {
100                // There is no relay configuration, so use the P2P address.
101                self.client_config.volt.http_address.clone()
102            }
103        };
104
105        // Determine the websocket protocol to use.
106        self.connect_url = http_addresss.replace("http", "ws");
107        log(&format!(
108            "initialised, connect URL is: {}",
109            self.connect_url
110        ));
111
112        Ok(self.connect_url.clone())
113    }
114
115    /**
116     * Create a new RPC method.
117     * @param method_name The name of the method to create.
118     * @param service_json The JSON string representing the service, or an empty string if this is a built-in service.
119     * @returns The ID of the new RPC method.
120     */
121    #[cfg(feature = "chrono")]
122    pub fn create_rpc(&mut self, method_name: &str, service_json: &str) -> Result<u64, String> {
123        let token_time = chrono::Utc::now().timestamp() as u64;
124        self.create_rpc_with_time(method_name, token_time, service_json)
125    }
126
127    /**
128     * Create a new RPC method.
129     * @param method_name The name of the method to create.
130     * @param token_time The time in seconds since the Unix epoch for the rpc token.
131     * @param service_json The JSON string representing the service, or an empty string if this is a built-in service.
132     * @returns The ID of the new RPC method.
133     */
134    pub fn create_rpc_with_time(
135        &mut self,
136        method_name: &str,
137        token_time: u64,
138        service_json: &str,
139    ) -> Result<u64, String> {
140        let mut service_id = String::new();
141        let mut service_client_id = String::new();
142        let mut service_client_key = String::new();
143        let mut service_relayed = false;
144
145        // Parse the service JSON if it's provided.
146        if service_json.len() > 0 {
147            log(&format!("service JSON: {}", service_json));
148
149            let service: Value = match serde_json::from_str(&service_json) {
150                Ok(service) => service,
151                Err(e) => {
152                    return Err("Failed to parse service JSON ".to_string() + &e.to_string());
153                }
154            };
155
156            // Extract the `service_id` property from the service JSON, which is a string.
157            service_id = match service["id"].as_str() {
158                Some(id) => id.to_string(),
159                _ => {
160                    return Err("Service ID is not a string".to_string());
161                }
162            };
163
164            // Extract the `service_description` property from the service JSON, which is a JSON object.
165            let service_description = match service["service_description"].as_object() {
166                Some(description) => description,
167                None => {
168                    return Err("Service description is not an object".to_string());
169                }
170            };
171
172            // Extract the host type from the service description, which is a string.
173            let host_type = match service_description["host_type"].as_str() {
174                Some(host_type) => host_type,
175                None => {
176                    return Err("Host type is not a string".to_string());
177                }
178            };
179
180            service_relayed = match host_type {
181                "SERVICE_HOST_TYPE_RELAYED" => true,
182                _ => false,
183            };
184
185            // Extract the `client_id` property from the service description, which is a string.
186            service_client_id = match service_description["host_client_id"].as_str() {
187                Some(client_id) => client_id.to_string(),
188                None => {
189                    return Err("Client ID is not a string".to_string());
190                }
191            };
192
193            let has_pk = service_description.contains_key("host_public_key");
194            if has_pk {
195                // Extract the `client_key` property from the service description, which is a string.
196                service_client_key = match service_description["host_public_key"].as_str() {
197                    Some(client_key) => client_key.to_string(),
198                    None => {
199                        return Err("Client key is not a string".to_string());
200                    }
201                };
202            } else {
203                // Default to the Volt public key.
204                service_client_key = self.client_config.volt.public_key.clone();
205            }
206        }
207
208        if service_client_id.is_empty() || service_client_id == self.client_config.volt.id {
209            // If no service is provided or this is a built-in service, we use the Volt public key.
210            service_client_key = self.client_config.volt.public_key.clone();
211            service_relayed = false;
212            service_id = "".to_string();
213            service_client_id = "".to_string();
214        }
215
216        let mut target_did = Vec::<String>::new();
217        if self.relaying {
218            target_did.push(self.client_config.volt.id.clone());
219        }
220
221        if service_relayed
222            && !service_client_id.is_empty()
223            && &service_client_id != &self.client_config.volt.id
224        {
225            target_did.push(service_client_id.clone());
226        }
227
228        log(&format!(
229            "creating call metadata with {} and {}",
230            service_client_id, service_client_key
231        ));
232
233        let metadata = match self.get_call_metadata(&service_client_id, token_time) {
234            Ok(metadata) => metadata,
235            Err(e) => {
236                return Err("Failed to create call metadata: ".to_string() + &e);
237            }
238        };
239
240        log("created call metadata");
241
242        self.next_id += 1;
243
244        let new_rpc = Rpc::new(
245            &self.next_id,
246            method_name,
247            target_did,
248            metadata,
249            service_client_key,
250            service_id,
251        );
252
253        let mut rpcs = self.rpcs.lock().unwrap();
254        rpcs.insert(self.next_id, new_rpc);
255
256        log(&format!("allocated new ID: {}", self.next_id));
257
258        Ok(self.next_id)
259    }
260
261    /**
262     * Remove an RPC method with the given method ID.
263     * @param method_id The ID of the RPC method to remove.
264     */
265    pub fn remove_rpc(&mut self, method_id: &u64) {
266        log(&format!("removing RPC with method ID: {}", method_id));
267        let mut rpcs = self.rpcs.lock().unwrap();
268        match rpcs.remove(method_id) {
269            Some(rpc) => {
270                drop(rpc);
271            }
272            None => {
273                log(&format!("No RPC found for method ID {}", method_id));
274            }
275        }
276    }
277
278    /**
279     * Create an identity token for the given target audience, using the client's private key.
280     * @param target_host_audience The target audience for the identity token.
281     * @param token_time The time in seconds since the Unix epoch for the token.
282     * @returns The identity token.
283     */
284    fn get_call_metadata(
285        &self,
286        target_host_audience: &String,
287        token_time: u64,
288    ) -> Result<IdentityToken, String> {
289        // Default the audience to the Volt id.
290        let mut target_audience = self.client_config.volt.id.clone();
291        if !target_host_audience.is_empty() {
292            target_audience = target_host_audience.clone();
293        }
294
295        let session_id = match &self.client_config.credential.session_id {
296            Some(session_id) => session_id,
297            None => {
298                log("no session ID found");
299                return Err("No session ID found".to_string());
300            }
301        };
302
303        log("creating identity token");
304
305        // Check if we have a session and we're using long-lived tokens.
306        let token_info = get_identity_token(
307            &session_id,
308            &self.key_pair,
309            &target_audience,
310            true,
311            token_time,
312            60,
313        );
314
315        Ok(token_info)
316    }
317
318    /**
319     * Encodes and encrypts a JSON payload for the given method ID.
320     * @param method_id The method ID to encode the payload for.
321     * @param payload_json The JSON payload to encode.
322     * @returns The encoded and encrypted payload as a binary buffer.
323     */
324    pub fn encode_payload(&self, method_id: &u64, payload_json: &str) -> Result<Vec<u8>, String> {
325        log(&format!("preparing payload for method ID: {}", method_id));
326
327        // See if we have an RPC for this method ID.
328        let mut rpcs = self.rpcs.lock().unwrap();
329        let lookup = rpcs.get_mut(method_id);
330
331        match lookup {
332            Some(rpc) => {
333                // We have an RPC for this method ID.
334                log("RPC found for method ID");
335
336                match payload_json.is_empty() {
337                    true => {
338                        // We have an empty payload, so just end the RPC.
339                        Ok(rpc.end())
340                    }
341                    false => {
342                        // We have a payload, so send it.
343                        Ok(rpc.send(payload_json.to_string()))
344                    }
345                }
346            }
347            None => {
348                // We don't have an RPC for this method ID.
349                Err(format!("No RPC found for method ID {}", method_id))
350            }
351        }
352    }
353
354    /**
355     * Decode and decrypts a binary payload.
356     * @param payload The binary payload to decode.
357     * @returns The decoded payload as a JSON string.
358     */
359    pub fn decode_payload(&self, payload: Vec<u8>) -> Result<String, String> {
360        let message = BinaryMessage::from_bytes(&payload);
361
362        let mut rpcs = self.rpcs.lock().unwrap();
363        let rpc = rpcs.get_mut(&message.method_id);
364
365        match rpc {
366            Some(rpc) => rpc.receive(message),
367            None => {
368                log(&format!("no RPC found for method ID {}", message.method_id));
369                Err("No RPC found for method ID".to_string())
370            }
371        }
372    }
373
374    /**
375     * Returns any pending payload for the given method ID.
376     * @param method_id The method ID to check for pending payloads.
377     * @returns The pending payload, or empty array if none.
378     */
379    pub fn pending_payload(&self, method_id: &u64) -> Result<Vec<u8>, String> {
380        let mut rpcs = self.rpcs.lock().unwrap();
381        let rpc = rpcs.get_mut(method_id);
382
383        match rpc {
384            Some(rpc) => Ok(rpc.pending_payload()),
385            None => {
386                log("No RPC found for method ID");
387                Err("No RPC found for method ID".to_string())
388            }
389        }
390    }
391
392    /**
393     * Sign the provided data using the client's private key.
394     * @param data The data to sign.
395     * @returns The signature of the data, encoded as a base64Url string.
396     */
397    pub fn sign_base64(&self, data: &str) -> Result<String, String> {
398        // Use the client private key to sign the data.
399        let key = match &self.key_pair.private_key {
400            Some(key) => key,
401            None => {
402                log("no private key found");
403                return Err("No private key found".to_string());
404            }
405        };
406
407        // Sign the payload using the private key
408        let signature = match key.try_sign(data.as_bytes()) {
409            Ok(signature) => signature,
410            Err(e) => {
411                log(&format!("failed to sign data: {}", e));
412                return Err("Failed to sign data".to_string());
413            }
414        };
415
416        let signature_bytes = signature.to_bytes();
417
418        let signature_base64 = Base64UrlUnpadded::encode_string(&signature_bytes);
419
420        Ok(signature_base64)
421    }
422
423    /**
424     * Get the client's public key in PEM format.
425     * @returns The client's public key in PEM format.
426     */
427    pub fn public_key_pem(&self) -> String {
428        match &self.key_pair.public_key_pem {
429            Some(key) => key.clone(),
430            None => "".to_string(),
431        }
432    }
433
434    pub fn drop(self) {
435        // Instance is dropped.
436    }
437}
438
439impl Drop for RpcManager {
440    fn drop(&mut self) {
441        log(&format!(
442            "dropping RpcManager: {}",
443            self.client_config.volt.id
444        ));
445    }
446}