volt_client_rs/
volt_client.rs

1use crate::websocket_rpc::{Event, EventType, WebsocketRpc};
2use futures::{stream::StreamExt, SinkExt};
3use serde_json;
4use std::collections::HashMap;
5use std::sync::{Arc, Mutex as StdMutex};
6use tokio::sync::{
7    mpsc::{self, Receiver, Sender},
8    Mutex,
9};
10use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
11use volt_ws_protocol;
12
13/// The VoltClient struct is the main entry point for interacting with the Volt API.
14/// It provides methods for making unary, server streaming, and client streaming RPC calls.
15/// The client maintains a connection to the Volt server and handles the WebSocket communication,
16/// it manages the RPC lifecycle, including sending requests and receiving responses.
17/// The client also provides a set of convenience methods for making calls to the Volt API.
18pub struct VoltClient {
19    send_channel: mpsc::Sender<Vec<u8>>,
20    protocol_manager: Arc<Mutex<volt_ws_protocol::rpc_manager::RpcManager>>,
21    active_rpc: Arc<Mutex<HashMap<u64, Arc<Mutex<WebsocketRpc>>>>>,
22}
23
24impl VoltClient {
25    /// Creates a new API client instance and connects to the WebSocket server.
26    /// The client is configured with the provided JSON configuration.
27    /// # Arguments
28    /// * `config_json` - A JSON string containing the configuration settings.
29    /// # Returns
30    /// A Result containing the API client instance or an error message.
31    pub async fn create(config_json: &str) -> Result<VoltClient, String> {
32        // Create the protocol manager.
33        let protocol_manager =
34            Arc::new(Mutex::new(volt_ws_protocol::rpc_manager::RpcManager::new()));
35
36        let url: String;
37        {
38            let mut protocol_manager = protocol_manager.lock().await;
39
40            // Initialise the protocol manager and get the WebSocket server URL.
41            url = match protocol_manager.set_configuration(config_json) {
42                Ok(url) => url,
43                Err(e) => return Err(e),
44            };
45
46            println!("Connecting to WebSocket server at {}", url);
47        }
48
49        // Connect to the WebSocket server.
50        let ws_stream = match connect_async(url).await {
51            Ok((ws_stream, _)) => ws_stream,
52            Err(e) => return Err(format!("Failed to connect to the WebSocket server: {}", e)),
53        };
54
55        // Split the WebSocket stream into a write and read stream.
56        let (write, read) = ws_stream.split();
57
58        // Create a channel for sending requests
59        let (send_channel, mut receive_channel) = mpsc::channel(100);
60
61        // Spawn a task to handle sending requests
62        tokio::spawn(async move {
63            let mut write = write;
64            while let Some(req) = receive_channel.recv().await {
65                if write.send(Message::Binary(req)).await.is_err() {
66                    break;
67                }
68            }
69        });
70
71        // Create a map to store active RPCs.
72        let active_rpc = Arc::new(Mutex::new(HashMap::<u64, Arc<Mutex<WebsocketRpc>>>::new()));
73
74        // Clone the active RPC map and the send channel for use in the read task.
75        let active_rpc_clone = Arc::clone(&active_rpc);
76        let send_channel_clone = Arc::<mpsc::Sender<Vec<u8>>>::new(send_channel.clone());
77        let protocol_manager_clone = Arc::clone(&protocol_manager);
78
79        // Spawn a task to handle reading responses
80        tokio::spawn(async move {
81            // Capture the read stream in the closure.
82            let mut read = read;
83
84            // Loop to read messages from the WebSocket server.
85            while let Some(msg) = read.next().await {
86                match msg {
87                    Ok(payload) => {
88                        // Decode the payload using the wasm protocol manager.
89                        let protocol = protocol_manager_clone.lock().await;
90                        let decoded = match protocol.decode_payload(payload.into_data()) {
91                            Ok(decoded) => decoded,
92                            Err(e) => {
93                                println!("Failed to decode payload: {:?}", e);
94                                continue;
95                            }
96                        };
97
98                        // Parse the decoded payload as JSON.
99                        let response: serde_json::Value = match serde_json::from_str(&decoded) {
100                            Ok(response) => response,
101                            Err(e) => {
102                                println!("Failed to parse JSON: {:?}", e);
103                                continue;
104                            }
105                        };
106
107                        // Extract the target rpc method_id from the response.
108                        let method_id = match response["method_id"].as_u64() {
109                            Some(method_id) => method_id,
110                            None => {
111                                println!("Received response with no method_id");
112                                continue;
113                            }
114                        };
115
116                        // Check if the response contains a 'key_exchanged' field.
117                        if !response["key_exchanged"].is_null() {
118                            // Handle key exchange.
119                            let mut pending_payload = match protocol.pending_payload(&method_id) {
120                                Ok(pending_payload) => pending_payload,
121                                Err(e) => {
122                                    println!(
123                                        "Failure fetching pending payload for method_id: {} {}",
124                                        method_id, e
125                                    );
126                                    continue;
127                                }
128                            };
129
130                            while pending_payload.len() > 0 {
131                                // Send the pending request.
132                                let send_result = send_channel_clone.send(pending_payload).await;
133
134                                match send_result {
135                                    Ok(_) => println!("Sent pending payload"),
136                                    Err(e) => {
137                                        println!("Failed to send pending payload: {:?}", e);
138                                        break;
139                                    }
140                                }
141
142                                // Fetch the next pending payload.
143                                pending_payload = match protocol.pending_payload(&method_id) {
144                                    Ok(pending_payload) => pending_payload,
145                                    Err(e) => {
146                                        println!(
147                                            "Failure fetching pending payload for method_id: {} {}",
148                                            method_id, e
149                                        );
150                                        break;
151                                    }
152                                };
153                            }
154                        } else {
155                            // Not a key exchange response, handle as a normal response.
156                            let mut active_rpc = active_rpc_clone.lock().await;
157
158                            if !active_rpc.contains_key(&method_id) {
159                                println!("Received response for unknown method_id: {}", method_id);
160                            } else {
161                                // Let the RPC handle the response.
162                                let mut rpc = match active_rpc.get_mut(&method_id) {
163                                    Some(rpc) => rpc.lock().await,
164                                    None => {
165                                        println!("Failed to get RPC for method_id: {}", method_id);
166                                        continue;
167                                    }
168                                };
169
170                                rpc.handle_response(&response);
171                            }
172                        }
173                    }
174                    _ => {
175                        println!("Failure in received message");
176                    }
177                }
178            }
179        });
180
181        // Create the API client instance.
182        let client = VoltClient {
183            send_channel,
184            protocol_manager,
185            active_rpc,
186        };
187
188        Ok(client)
189    }
190
191    async fn call_internal(
192        &mut self,
193        method: &str,
194        service: &str,
195    ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
196        if method.is_empty() {
197            return Err("Method cannot be empty".to_string());
198        }
199
200        let rpc_id: u64;
201
202        {
203            let mut rpc_manager = self.protocol_manager.lock().await;
204            rpc_id = match rpc_manager.create_rpc(method, service) {
205                Ok(rpc_id) => rpc_id,
206                Err(e) => return Err(e),
207            };
208        }
209
210        let rpc = WebsocketRpc::new(
211            rpc_id,
212            Arc::clone(&self.protocol_manager),
213            self.send_channel.clone(),
214        );
215
216        // Register the end event to remove the RPC from the active RPC map.
217        rpc.on(EventType::End, {
218            let rpc_id = rpc_id;
219            let active_rpc = Arc::clone(&self.active_rpc);
220            move |_response| {
221                let active_rpc = Arc::clone(&active_rpc);
222                tokio::spawn(async move {
223                    println!("api_client: removing rpc: {}", rpc_id);
224                    let mut active_rpc = active_rpc.lock().await;
225                    active_rpc.remove(&rpc_id);
226                });
227            }
228        });
229
230        // Cache the RPC in the active RPC map.
231        let rpc_cached = Arc::new(Mutex::new(rpc));
232        self.active_rpc
233            .lock()
234            .await
235            .insert(rpc_id, rpc_cached.clone());
236
237        Ok(rpc_cached)
238    }
239
240    /// Generic method for unary RPC calls.
241    /// Sends a request and awaits the response.
242    /// # Arguments
243    /// * `method` - The method to call.
244    /// * `request` - The request payload.
245    /// * `service` - The service to call.
246    /// # Returns
247    /// A Result containing the response or an error message.
248    pub async fn unary_rpc(
249        &mut self,
250        method: &str,
251        request: &serde_json::Value,
252        service: &str,
253    ) -> Result<serde_json::Value, String> {
254        if request.is_null() {
255            return Err("Request cannot be null".to_string());
256        }
257
258        // Start the call.
259        let rpc = match self.call_internal(method, service).await {
260            Ok(rpc) => rpc,
261            Err(e) => return Err(e),
262        };
263
264        // Create a channel to asynchronously receive the response.
265        let (tx, mut rx): (Sender<()>, Receiver<()>) = mpsc::channel(1);
266
267        // Clone the channel for use in the event handlers.
268        let tx = Arc::new(tx);
269
270        // Create a mutex to store the unary response or error.
271        let unary_response = Arc::new(StdMutex::new(serde_json::Value::Null));
272        let unary_error = Arc::new(StdMutex::new(String::new()));
273
274        {
275            // Register to receive rpc events
276            let rpc = rpc.lock().await;
277
278            rpc.on(EventType::Data, {
279                let unary_response = Arc::clone(&unary_response);
280                move |response| {
281                    println!("api_client: received response: {:?}", response);
282                    if let Event::Data(response) = response {
283                        match unary_response.lock() {
284                            Ok(mut unary_response) => {
285                                // Copy the response into the unary_response so we can emit it from the `end` event.
286                                *unary_response = response;
287                            }
288                            Err(e) => {
289                                println!("failed to lock unary_response: {:?}", e);
290                                return;
291                            }
292                        };
293                    }
294                }
295            });
296
297            rpc.on(EventType::Error, {
298                let unary_error = Arc::clone(&unary_error);
299                move |response| {
300                    println!("api_client: received error");
301                    if let Event::Error(response) = response {
302                        match unary_error.lock() {
303                            Ok(mut unary_error) => {
304                                // Copy the response into the unary_response so we can emit it from the `end` event.
305                                *unary_error = response;
306                            }
307                            Err(e) => {
308                                println!("failed to lock unary_response: {:?}", e);
309                                return;
310                            }
311                        };
312                    }
313                }
314            });
315
316            rpc.on(EventType::End, {
317                let tx = Arc::clone(&tx);
318                move |_response| {
319                    println!("api_client: received end");
320                    let tx = Arc::clone(&tx);
321                    tokio::spawn(async move {
322                        match tx.send(()).await {
323                            Ok(_) => println!("unary response sent successfully"),
324                            Err(e) => println!("failed to send unary response: {:?}", e),
325                        }
326                    });
327                }
328            });
329
330            // Send the payload.
331            match rpc.send(request).await {
332                Ok(_) => println!("sent request"),
333                Err(e) => {
334                    return Err(e);
335                }
336            }
337
338            let _ = rpc.end().await;
339        }
340
341        // Await the response.
342        rx.recv().await;
343
344        // Determine if we received an error or a response.
345        let result = match unary_error.lock() {
346            Ok(unary_error) => {
347                if unary_error.is_empty() {
348                    // There is no error so return the response.
349                    match unary_response.lock() {
350                        Ok(unary_response) => Ok(unary_response.clone()),
351                        Err(e) => Err(format!("failed to lock unary_response: {:?}", e)),
352                    }
353                } else {
354                    // Propagate the error.
355                    Err(unary_error.clone())
356                }
357            }
358            Err(e) => Err(format!("failed to lock unary_error: {:?}", e)),
359        };
360
361        result
362    }
363
364    /// Generic method for server streaming RPC calls.
365    /// Sends a request and returns an RPC instance.
366    /// # Arguments
367    /// * `method` - The method to call.
368    /// * `request` - The request payload.
369    /// * `service` - The service to call.
370    /// # Returns
371    /// A Result containing the RPC event emitter or an error message.
372    pub async fn server_streaming_call(
373        &mut self,
374        method: &str,
375        request: &serde_json::Value,
376        service: &str,
377    ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
378        if request.is_null() {
379            return Err("Request cannot be null".to_string());
380        }
381
382        let rpc = match self.call_internal(method, service).await {
383            Ok(rpc) => rpc,
384            Err(e) => return Err(e),
385        };
386
387        {
388            let rpc = rpc.lock().await;
389
390            // Send the payload.
391            match rpc.send(request).await {
392                Ok(_) => println!("sent request"),
393                Err(e) => {
394                    return Err(e);
395                }
396            }
397
398            let _ = rpc.end().await;
399        }
400
401        Ok(rpc)
402    }
403
404    /// Generic method for client streaming RPC calls.
405    /// Sends a request and returns an RPC instance.
406    /// # Arguments
407    /// * `method` - The method to call.
408    /// * `request` - The request payload.
409    /// * `service` - The service to call.
410    /// # Returns
411    /// A Result containing the RPC event emitter or an error message.
412    pub async fn streaming_call(
413        &mut self,
414        method: &str,
415        request: &serde_json::Value,
416        service: &str,
417    ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
418        let rpc = match self.call_internal(method, service).await {
419            Ok(rpc) => rpc,
420            Err(e) => return Err(e),
421        };
422
423        if !request.is_null() {
424            let rpc = rpc.lock().await;
425
426            // Send the payload.
427            match rpc.send(request).await {
428                Ok(_) => println!("sent request"),
429                Err(e) => {
430                    println!("failed to send request: {:?}", e);
431                    return Err(e);
432                }
433            }
434        } else {
435            println!("no initial request");
436        }
437
438        Ok(rpc)
439    }
440
441    ///
442    /// RESOURCE API
443    ///
444
445    /// See https://docs.tdxvolt.com/en/api/volt_api#CanAccessResource
446    pub async fn can_access_resource(
447        &mut self,
448        request: &serde_json::Value,
449    ) -> Result<serde_json::Value, String> {
450        self.unary_rpc(
451            "/tdx.volt_api.volt.v1.VoltAPI/CanAccessResource",
452            request,
453            "",
454        )
455        .await
456    }
457
458    /// See https://docs.tdxvolt.com/en/api/volt_api#Connect
459    pub async fn connect(
460        &mut self,
461        request: &serde_json::Value,
462    ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
463        self.streaming_call("/tdx.volt_api.volt.v1.VoltAPI/Connect", request, "")
464            .await
465    }
466
467    /// See https://docs.tdxvolt.com/en/api/volt_api#DeleteResource
468    pub async fn delete_resource(
469        &mut self,
470        request: &serde_json::Value,
471    ) -> Result<serde_json::Value, String> {
472        self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/DeleteResource", request, "")
473            .await
474    }
475
476    /// See https://docs.tdxvolt.com/en/api/volt_api#DiscoverServices
477    pub async fn discover_services(
478        &mut self,
479        request: &serde_json::Value,
480    ) -> Result<serde_json::Value, String> {
481        self.unary_rpc(
482            "/tdx.volt_api.volt.v1.VoltAPI/DiscoverServices",
483            request,
484            "",
485        )
486        .await
487    }
488
489    /// See https://docs.tdxvolt.com/en/api/volt_api#GetResource
490    pub async fn get_resource(
491        &mut self,
492        request: &serde_json::Value,
493    ) -> Result<serde_json::Value, String> {
494        self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/GetResource", request, "")
495            .await
496    }
497
498    /// See https://docs.tdxvolt.com/en/api/volt_api#GetResources
499    pub async fn get_resources(
500        &mut self,
501        request: &serde_json::Value,
502    ) -> Result<serde_json::Value, String> {
503        self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/GetResources", request, "")
504            .await
505    }
506
507    /// See https://docs.tdxvolt.com/en/api/volt_api#GetResourceAncestors
508    pub async fn get_resource_ancestors(
509        &mut self,
510        request: &serde_json::Value,
511    ) -> Result<serde_json::Value, String> {
512        self.unary_rpc(
513            "/tdx.volt_api.volt.v1.VoltAPI/GetResourceAncestors",
514            request,
515            "",
516        )
517        .await
518    }
519
520    /// See https://docs.tdxvolt.com/en/api/volt_api#GetResourceDescendants
521    pub async fn get_resource_descendants(
522        &mut self,
523        request: &serde_json::Value,
524    ) -> Result<serde_json::Value, String> {
525        self.unary_rpc(
526            "/tdx.volt_api.volt.v1.VoltAPI/GetResourceDescendants",
527            request,
528            "",
529        )
530        .await
531    }
532
533    /// See https://docs.tdxvolt.com/en/api/volt_api#RequestAccess
534    pub async fn request_access(
535        &mut self,
536        request: &serde_json::Value,
537    ) -> Result<serde_json::Value, String> {
538        self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/RequestAccess", request, "")
539            .await
540    }
541
542    /// See https://docs.tdxvolt.com/en/api/volt_api#SaveResource
543    pub async fn save_resource(
544        &mut self,
545        request: &serde_json::Value,
546    ) -> Result<serde_json::Value, String> {
547        self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/SaveResource", request, "")
548            .await
549    }
550
551    /// See https://docs.tdxvolt.com/en/api/volt_api#SaveResourceAttribute
552    pub async fn save_resource_attribute(
553        &mut self,
554        request: &serde_json::Value,
555    ) -> Result<serde_json::Value, String> {
556        self.unary_rpc(
557            "/tdx.volt_api.volt.v1.VoltAPI/SaveResourceAttribute",
558            request,
559            "",
560        )
561        .await
562    }
563
564    /// See https://docs.tdxvolt.com/en/api/volt_api#SetServiceStatus
565    pub async fn set_service_status(
566        &mut self,
567        request: &serde_json::Value,
568    ) -> Result<serde_json::Value, String> {
569        self.unary_rpc(
570            "/tdx.volt_api.volt.v1.VoltAPI/SetServiceStatus",
571            request,
572            "",
573        )
574        .await
575    }
576
577    ///
578    /// FILE API
579    ///
580
581    /// See https://docs.tdxvolt.com/en/api/file_api#DownloadFile
582    pub async fn download_file(
583        &mut self,
584        request: &serde_json::Value,
585    ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
586        self.server_streaming_call("/tdx.volt_api.volt.v1.FileAPI/DownloadFile", request, "")
587            .await
588    }
589
590    /// See https://docs.tdxvolt.com/en/api/file_api#GetFile
591    pub async fn get_file(
592        &mut self,
593        request: &serde_json::Value,
594    ) -> Result<serde_json::Value, String> {
595        self.unary_rpc("/tdx.volt_api.volt.v1.FileAPI/GetFile", request, "")
596            .await
597    }
598
599    /// See https://docs.tdxvolt.com/en/api/file_api#GetFileContent
600    pub async fn get_file_content(
601        &mut self,
602        request: &serde_json::Value,
603    ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
604        self.server_streaming_call("/tdx.volt_api.volt.v1.FileAPI/GetFileContent", request, "")
605            .await
606    }
607
608    /// See https://docs.tdxvolt.com/en/api/file_api#GetFileDescendants
609    pub async fn get_file_descendants(
610        &mut self,
611        request: &serde_json::Value,
612    ) -> Result<serde_json::Value, String> {
613        self.unary_rpc(
614            "/tdx.volt_api.volt.v1.FileAPI/GetFileDescendants",
615            request,
616            "",
617        )
618        .await
619    }
620
621    /// See https://docs.tdxvolt.com/en/api/file_api#SetFileContent
622    pub async fn set_file_content(
623        &mut self,
624        request: &serde_json::Value,
625    ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
626        self.streaming_call("/tdx.volt_api.volt.v1.FileAPI/SetFileContent", request, "")
627            .await
628    }
629
630    /// See https://docs.tdxvolt.com/en/api/file_api#UploadFile
631    pub async fn upload_file(
632        &mut self,
633        request: &serde_json::Value,
634    ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
635        self.streaming_call("/tdx.volt_api.volt.v1.FileAPI/UploadFile", request, "")
636            .await
637    }
638
639    ///
640    /// VOLT MANAGEMENT API
641    ///
642
643    /// See https://docs.tdxvolt.com/en/api/volt_api#Authenticate
644    pub async fn authenticate(
645        &mut self,
646        request: &serde_json::Value,
647    ) -> Result<serde_json::Value, String> {
648        self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/Authenticate", request, "")
649            .await
650    }
651
652    /// See https://docs.tdxvolt.com/en/api/volt_api#DeleteAccess
653    pub async fn delete_access(
654        &mut self,
655        request: &serde_json::Value,
656    ) -> Result<serde_json::Value, String> {
657        self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/DeleteAccess", request, "")
658            .await
659    }
660
661    /// See https://docs.tdxvolt.com/en/api/volt_api#DeleteVolt
662    pub async fn delete_volt(
663        &mut self,
664        request: &serde_json::Value,
665    ) -> Result<serde_json::Value, String> {
666        self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/DeleteVolt", request, "")
667            .await
668    }
669
670    /// See https://docs.tdxvolt.com/en/api/volt_api#GetAccess
671    pub async fn get_access(
672        &mut self,
673        request: &serde_json::Value,
674    ) -> Result<serde_json::Value, String> {
675        self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/GetAccess", request, "")
676            .await
677    }
678
679    /// See https://docs.tdxvolt.com/en/api/volt_api#GetIdentities
680    pub async fn get_identities(
681        &mut self,
682        request: &serde_json::Value,
683    ) -> Result<serde_json::Value, String> {
684        self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/GetIdentities", request, "")
685            .await
686    }
687
688    /// See https://docs.tdxvolt.com/en/api/volt_api#GetIdentity
689    pub async fn get_identity(
690        &mut self,
691        request: &serde_json::Value,
692    ) -> Result<serde_json::Value, String> {
693        self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/GetIdentity", request, "")
694            .await
695    }
696
697    /// See https://docs.tdxvolt.com/en/api/volt_api#GetOneTimeToken
698    pub async fn get_one_time_token(
699        &mut self,
700        request: &serde_json::Value,
701    ) -> Result<serde_json::Value, String> {
702        self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/GetOneTimeToken", request, "")
703            .await
704    }
705
706    /// See https://docs.tdxvolt.com/en/api/volt_api#GetPolicy
707    pub async fn get_policy(
708        &mut self,
709        request: &serde_json::Value,
710    ) -> Result<serde_json::Value, String> {
711        self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/GetPolicy", request, "")
712            .await
713    }
714
715    /// See https://docs.tdxvolt.com/en/api/volt_api#GetSettings
716    pub async fn get_settings(
717        &mut self,
718        request: &serde_json::Value,
719    ) -> Result<serde_json::Value, String> {
720        self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/GetSettings", request, "")
721            .await
722    }
723
724    /// See https://docs.tdxvolt.com/en/api/volt_api#Invoke
725    pub async fn invoke(
726        &mut self,
727        request: &serde_json::Value,
728    ) -> Result<serde_json::Value, String> {
729        self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/Invoke", request, "")
730            .await
731    }
732
733    /// See https://docs.tdxvolt.com/en/api/volt_api#SaveAccess
734    pub async fn save_access(
735        &mut self,
736        request: &serde_json::Value,
737    ) -> Result<serde_json::Value, String> {
738        self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/SaveAccess", request, "")
739            .await
740    }
741
742    /// See https://docs.tdxvolt.com/en/api/volt_api#SaveIdentity
743    pub async fn save_identity(
744        &mut self,
745        request: &serde_json::Value,
746    ) -> Result<serde_json::Value, String> {
747        self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/SaveIdentity", request, "")
748            .await
749    }
750
751    /// See https://docs.tdxvolt.com/en/api/volt_api#SaveSettings
752    pub async fn save_settings(
753        &mut self,
754        request: &serde_json::Value,
755    ) -> Result<serde_json::Value, String> {
756        self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/SaveSettings", request, "")
757            .await
758    }
759
760    /// See https://docs.tdxvolt.com/en/api/volt_api#SetAccessRequestDecision
761    pub async fn set_access_request_decision(
762        &mut self,
763        request: &serde_json::Value,
764    ) -> Result<serde_json::Value, String> {
765        self.unary_rpc(
766            "/tdx.volt_api.volt.v1.VoltAPI/SetAccessRequestDecision",
767            request,
768            "",
769        )
770        .await
771    }
772
773    /// See https://docs.tdxvolt.com/en/api/volt_api#SignVerify
774    pub async fn sign_verify(
775        &mut self,
776        request: &serde_json::Value,
777    ) -> Result<serde_json::Value, String> {
778        self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/SignVerify", request, "")
779            .await
780    }
781
782    ///
783    /// WIRE API
784    ///
785
786    /// See https://docs.tdxvolt.com/en/api/wire_api#PublishWire
787    pub async fn publish_wire(
788        &mut self,
789        request: &serde_json::Value,
790    ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
791        self.streaming_call("/tdx.volt_api.volt.v1.WireAPI/PublishWire", request, "")
792            .await
793    }
794
795    /// See https://docs.tdxvolt.com/en/api/wire_api#SubscribeWire
796    pub async fn subscribe_wire(
797        &mut self,
798        request: &serde_json::Value,
799    ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
800        self.streaming_call("/tdx.volt_api.volt.v1.WireAPI/SubscribeWire", request, "")
801            .await
802    }
803
804    ///
805    /// DATABASE API
806    ///
807
808    /// See https://docs.tdxvolt.com/en/api/sqlite_database_api#BulkUpdate
809    pub async fn bulk_update(
810        &mut self,
811        request: &serde_json::Value,
812    ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
813        self.streaming_call(
814            "/tdx.volt_api.data.v1.SqliteDatabaseAPI/BulkUpdate",
815            request,
816            "",
817        )
818        .await
819    }
820
821    /// https://docs.tdxvolt.com/en/api/sqlite_server_api#CreateDatabase
822    pub async fn create_database(
823        &mut self,
824        request: &serde_json::Value,
825    ) -> Result<serde_json::Value, String> {
826        self.unary_rpc(
827            "/tdx.volt_api.data.v1.SqliteServerAPI/CreateDatabase",
828            request,
829            "",
830        )
831        .await
832    }
833
834    /// See https://docs.tdxvolt.com/en/api/sqlite_database_api#Execute
835    pub async fn sql_execute(
836        &mut self,
837        request: &serde_json::Value,
838        service: &str,
839    ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
840        self.streaming_call(
841            "/tdx.volt_api.data.v1.SqliteDatabaseAPI/Execute",
842            request,
843            service,
844        )
845        .await
846    }
847
848    /// See https://docs.tdxvolt.com/en/api/sqlite_database_api#Execute
849    pub async fn sql_execute_json(
850        &mut self,
851        request: &serde_json::Value,
852        service: &str,
853    ) -> Result<serde_json::Value, String> {
854        let rpc = match self.sql_execute(request, service).await {
855            Ok(call) => call,
856            Err(e) => return Err(e),
857        };
858
859        // Create a channel to asynchronously receive the response.
860        let (tx, mut rx): (Sender<()>, Receiver<()>) = mpsc::channel(1);
861
862        // Clone the channel for use in the event handlers.
863        let tx = Arc::new(tx);
864
865        // Create a mutex to store the unary response or error.
866        let header = Arc::new(StdMutex::new(Vec::<serde_json::Value>::new()));
867        let rows = Arc::new(StdMutex::new(Vec::<serde_json::Value>::new()));
868        let json_error = Arc::new(StdMutex::new(String::new()));
869
870        {
871            // Register to receive rpc events
872            let rpc = rpc.lock().await;
873
874            rpc.on(EventType::Data, {
875                let header = Arc::clone(&header);
876                let rows = Arc::clone(&rows);
877                move |response| {
878                    println!("sql_execute_json: received response: {:?}", response);
879                    if let Event::Data(response) = response {
880                        match response["header"].as_object() {
881                            Some(header_obj) => {
882                                match header.lock() {
883                                    Ok(mut header) => {
884                                        // Copy the response into the unary_response so we can emit it from the `end` event.
885                                        *header = header_obj["column"].as_array().unwrap().clone();
886                                    }
887                                    Err(e) => {
888                                        println!("failed to lock unary_response: {:?}", e);
889                                        return;
890                                    }
891                                };
892                            }
893                            None => {
894                                let row = response["row"].as_object().unwrap()["column"]
895                                    .as_array()
896                                    .unwrap();
897
898                                let columns = header.lock().unwrap();
899
900                                let mut row_map = serde_json::Map::new();
901                                for (i, column) in columns.iter().enumerate() {
902                                    let name = column["name"].as_str().unwrap();
903
904                                    let cell = row[i].as_object().unwrap();
905
906                                    if cell.contains_key("null") {
907                                        row_map.insert(name.to_string(), serde_json::Value::Null);
908                                    } else if cell.contains_key("text") {
909                                        row_map.insert(name.to_string(), cell["text"].clone());
910                                    } else if cell.contains_key("integer") {
911                                        row_map.insert(
912                                            name.to_string(),
913                                            cell["integer"]
914                                                .as_str()
915                                                .unwrap()
916                                                .parse::<i64>()
917                                                .unwrap()
918                                                .into(),
919                                        );
920                                    } else if cell.contains_key("real") {
921                                        // Store as real.
922                                        row_map.insert(
923                                            name.to_string(),
924                                            cell["real"]
925                                                .as_str()
926                                                .unwrap()
927                                                .parse::<f64>()
928                                                .unwrap()
929                                                .into(),
930                                        );
931                                    } else if cell.contains_key("blob") {
932                                        row_map.insert(name.to_string(), cell["blob"].clone());
933                                    }
934                                }
935
936                                match rows.lock() {
937                                    Ok(mut rows) => {
938                                        // Copy the response into the unary_response so we can emit it from the `end` event.
939                                        (*rows).push(row_map.into());
940                                    }
941                                    Err(e) => {
942                                        println!("failed to lock unary_response: {:?}", e);
943                                        return;
944                                    }
945                                };
946                            }
947                        }
948                    }
949                }
950            });
951
952            rpc.on(EventType::Error, {
953                let json_error = Arc::clone(&json_error);
954                move |response| {
955                    println!("api_client: received error");
956                    if let Event::Error(response) = response {
957                        match json_error.lock() {
958                            Ok(mut json_error) => {
959                                // Copy the response into the unary_response so we can emit it from the `end` event.
960                                *json_error = response;
961                            }
962                            Err(e) => {
963                                println!("failed to lock unary_response: {:?}", e);
964                                return;
965                            }
966                        };
967                    }
968                }
969            });
970
971            rpc.on(EventType::End, {
972                let tx = Arc::clone(&tx);
973                move |_response| {
974                    println!("api_client: received end");
975                    let tx = Arc::clone(&tx);
976                    tokio::spawn(async move {
977                        match tx.send(()).await {
978                            Ok(_) => println!("unary response sent successfully"),
979                            Err(e) => println!("failed to send unary response: {:?}", e),
980                        }
981                    });
982                }
983            });
984
985            // Send the payload.
986            match rpc.send(request).await {
987                Ok(_) => println!("sent request"),
988                Err(e) => {
989                    return Err(e);
990                }
991            }
992        }
993
994        // Await the response.
995        rx.recv().await;
996
997        // Determine if we received an error or a response.
998        let result = match json_error.lock() {
999            Ok(json_error) => {
1000                if json_error.is_empty() {
1001                    // There is no error so return the response.
1002                    match rows.lock() {
1003                        Ok(rows) => Ok(rows.clone().into()),
1004                        Err(e) => Err(format!("failed to lock unary_response: {:?}", e)),
1005                    }
1006                } else {
1007                    // Propagate the error.
1008                    Err(json_error.clone())
1009                }
1010            }
1011            Err(e) => Err(format!("failed to lock unary_error: {:?}", e)),
1012        };
1013
1014        result
1015    }
1016
1017    /// See https://docs.tdxvolt.com/en/api/sqlite_database_api#ImportCSV
1018    pub async fn import_csv(
1019        &mut self,
1020        request: &serde_json::Value,
1021    ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
1022        self.streaming_call(
1023            "/tdx.volt_api.data.v1.SqliteDatabaseAPI/ImportCSV",
1024            request,
1025            "",
1026        )
1027        .await
1028    }
1029}