1use crate::websocket_rpc::{Event, EventType, WebsocketRpc};
2use futures::{stream::StreamExt, SinkExt};
3use serde_json;
4use std::collections::HashMap;
5use std::sync::{Arc, Mutex as StdMutex};
6use tokio::sync::{
7 mpsc::{self, Receiver, Sender},
8 Mutex,
9};
10use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
11use volt_ws_protocol;
12
13pub struct VoltClient {
19 send_channel: mpsc::Sender<Vec<u8>>,
20 protocol_manager: Arc<Mutex<volt_ws_protocol::rpc_manager::RpcManager>>,
21 active_rpc: Arc<Mutex<HashMap<u64, Arc<Mutex<WebsocketRpc>>>>>,
22}
23
24impl VoltClient {
25 pub async fn create(config_json: &str) -> Result<VoltClient, String> {
32 let protocol_manager =
34 Arc::new(Mutex::new(volt_ws_protocol::rpc_manager::RpcManager::new()));
35
36 let url: String;
37 {
38 let mut protocol_manager = protocol_manager.lock().await;
39
40 url = match protocol_manager.set_configuration(config_json) {
42 Ok(url) => url,
43 Err(e) => return Err(e),
44 };
45
46 println!("Connecting to WebSocket server at {}", url);
47 }
48
49 let ws_stream = match connect_async(url).await {
51 Ok((ws_stream, _)) => ws_stream,
52 Err(e) => return Err(format!("Failed to connect to the WebSocket server: {}", e)),
53 };
54
55 let (write, read) = ws_stream.split();
57
58 let (send_channel, mut receive_channel) = mpsc::channel(100);
60
61 tokio::spawn(async move {
63 let mut write = write;
64 while let Some(req) = receive_channel.recv().await {
65 if write.send(Message::Binary(req)).await.is_err() {
66 break;
67 }
68 }
69 });
70
71 let active_rpc = Arc::new(Mutex::new(HashMap::<u64, Arc<Mutex<WebsocketRpc>>>::new()));
73
74 let active_rpc_clone = Arc::clone(&active_rpc);
76 let send_channel_clone = Arc::<mpsc::Sender<Vec<u8>>>::new(send_channel.clone());
77 let protocol_manager_clone = Arc::clone(&protocol_manager);
78
79 tokio::spawn(async move {
81 let mut read = read;
83
84 while let Some(msg) = read.next().await {
86 match msg {
87 Ok(payload) => {
88 let protocol = protocol_manager_clone.lock().await;
90 let decoded = match protocol.decode_payload(payload.into_data()) {
91 Ok(decoded) => decoded,
92 Err(e) => {
93 println!("Failed to decode payload: {:?}", e);
94 continue;
95 }
96 };
97
98 let response: serde_json::Value = match serde_json::from_str(&decoded) {
100 Ok(response) => response,
101 Err(e) => {
102 println!("Failed to parse JSON: {:?}", e);
103 continue;
104 }
105 };
106
107 let method_id = match response["method_id"].as_u64() {
109 Some(method_id) => method_id,
110 None => {
111 println!("Received response with no method_id");
112 continue;
113 }
114 };
115
116 if !response["key_exchanged"].is_null() {
118 let mut pending_payload = match protocol.pending_payload(&method_id) {
120 Ok(pending_payload) => pending_payload,
121 Err(e) => {
122 println!(
123 "Failure fetching pending payload for method_id: {} {}",
124 method_id, e
125 );
126 continue;
127 }
128 };
129
130 while pending_payload.len() > 0 {
131 let send_result = send_channel_clone.send(pending_payload).await;
133
134 match send_result {
135 Ok(_) => println!("Sent pending payload"),
136 Err(e) => {
137 println!("Failed to send pending payload: {:?}", e);
138 break;
139 }
140 }
141
142 pending_payload = match protocol.pending_payload(&method_id) {
144 Ok(pending_payload) => pending_payload,
145 Err(e) => {
146 println!(
147 "Failure fetching pending payload for method_id: {} {}",
148 method_id, e
149 );
150 break;
151 }
152 };
153 }
154 } else {
155 let mut active_rpc = active_rpc_clone.lock().await;
157
158 if !active_rpc.contains_key(&method_id) {
159 println!("Received response for unknown method_id: {}", method_id);
160 } else {
161 let mut rpc = match active_rpc.get_mut(&method_id) {
163 Some(rpc) => rpc.lock().await,
164 None => {
165 println!("Failed to get RPC for method_id: {}", method_id);
166 continue;
167 }
168 };
169
170 rpc.handle_response(&response);
171 }
172 }
173 }
174 _ => {
175 println!("Failure in received message");
176 }
177 }
178 }
179 });
180
181 let client = VoltClient {
183 send_channel,
184 protocol_manager,
185 active_rpc,
186 };
187
188 Ok(client)
189 }
190
191 async fn call_internal(
192 &mut self,
193 method: &str,
194 service: &str,
195 ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
196 if method.is_empty() {
197 return Err("Method cannot be empty".to_string());
198 }
199
200 let rpc_id: u64;
201
202 {
203 let mut rpc_manager = self.protocol_manager.lock().await;
204 rpc_id = match rpc_manager.create_rpc(method, service) {
205 Ok(rpc_id) => rpc_id,
206 Err(e) => return Err(e),
207 };
208 }
209
210 let rpc = WebsocketRpc::new(
211 rpc_id,
212 Arc::clone(&self.protocol_manager),
213 self.send_channel.clone(),
214 );
215
216 rpc.on(EventType::End, {
218 let rpc_id = rpc_id;
219 let active_rpc = Arc::clone(&self.active_rpc);
220 move |_response| {
221 let active_rpc = Arc::clone(&active_rpc);
222 tokio::spawn(async move {
223 println!("api_client: removing rpc: {}", rpc_id);
224 let mut active_rpc = active_rpc.lock().await;
225 active_rpc.remove(&rpc_id);
226 });
227 }
228 });
229
230 let rpc_cached = Arc::new(Mutex::new(rpc));
232 self.active_rpc
233 .lock()
234 .await
235 .insert(rpc_id, rpc_cached.clone());
236
237 Ok(rpc_cached)
238 }
239
240 pub async fn unary_rpc(
249 &mut self,
250 method: &str,
251 request: &serde_json::Value,
252 service: &str,
253 ) -> Result<serde_json::Value, String> {
254 if request.is_null() {
255 return Err("Request cannot be null".to_string());
256 }
257
258 let rpc = match self.call_internal(method, service).await {
260 Ok(rpc) => rpc,
261 Err(e) => return Err(e),
262 };
263
264 let (tx, mut rx): (Sender<()>, Receiver<()>) = mpsc::channel(1);
266
267 let tx = Arc::new(tx);
269
270 let unary_response = Arc::new(StdMutex::new(serde_json::Value::Null));
272 let unary_error = Arc::new(StdMutex::new(String::new()));
273
274 {
275 let rpc = rpc.lock().await;
277
278 rpc.on(EventType::Data, {
279 let unary_response = Arc::clone(&unary_response);
280 move |response| {
281 println!("api_client: received response: {:?}", response);
282 if let Event::Data(response) = response {
283 match unary_response.lock() {
284 Ok(mut unary_response) => {
285 *unary_response = response;
287 }
288 Err(e) => {
289 println!("failed to lock unary_response: {:?}", e);
290 return;
291 }
292 };
293 }
294 }
295 });
296
297 rpc.on(EventType::Error, {
298 let unary_error = Arc::clone(&unary_error);
299 move |response| {
300 println!("api_client: received error");
301 if let Event::Error(response) = response {
302 match unary_error.lock() {
303 Ok(mut unary_error) => {
304 *unary_error = response;
306 }
307 Err(e) => {
308 println!("failed to lock unary_response: {:?}", e);
309 return;
310 }
311 };
312 }
313 }
314 });
315
316 rpc.on(EventType::End, {
317 let tx = Arc::clone(&tx);
318 move |_response| {
319 println!("api_client: received end");
320 let tx = Arc::clone(&tx);
321 tokio::spawn(async move {
322 match tx.send(()).await {
323 Ok(_) => println!("unary response sent successfully"),
324 Err(e) => println!("failed to send unary response: {:?}", e),
325 }
326 });
327 }
328 });
329
330 match rpc.send(request).await {
332 Ok(_) => println!("sent request"),
333 Err(e) => {
334 return Err(e);
335 }
336 }
337
338 let _ = rpc.end().await;
339 }
340
341 rx.recv().await;
343
344 let result = match unary_error.lock() {
346 Ok(unary_error) => {
347 if unary_error.is_empty() {
348 match unary_response.lock() {
350 Ok(unary_response) => Ok(unary_response.clone()),
351 Err(e) => Err(format!("failed to lock unary_response: {:?}", e)),
352 }
353 } else {
354 Err(unary_error.clone())
356 }
357 }
358 Err(e) => Err(format!("failed to lock unary_error: {:?}", e)),
359 };
360
361 result
362 }
363
364 pub async fn server_streaming_call(
373 &mut self,
374 method: &str,
375 request: &serde_json::Value,
376 service: &str,
377 ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
378 if request.is_null() {
379 return Err("Request cannot be null".to_string());
380 }
381
382 let rpc = match self.call_internal(method, service).await {
383 Ok(rpc) => rpc,
384 Err(e) => return Err(e),
385 };
386
387 {
388 let rpc = rpc.lock().await;
389
390 match rpc.send(request).await {
392 Ok(_) => println!("sent request"),
393 Err(e) => {
394 return Err(e);
395 }
396 }
397
398 let _ = rpc.end().await;
399 }
400
401 Ok(rpc)
402 }
403
404 pub async fn streaming_call(
413 &mut self,
414 method: &str,
415 request: &serde_json::Value,
416 service: &str,
417 ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
418 let rpc = match self.call_internal(method, service).await {
419 Ok(rpc) => rpc,
420 Err(e) => return Err(e),
421 };
422
423 if !request.is_null() {
424 let rpc = rpc.lock().await;
425
426 match rpc.send(request).await {
428 Ok(_) => println!("sent request"),
429 Err(e) => {
430 println!("failed to send request: {:?}", e);
431 return Err(e);
432 }
433 }
434 } else {
435 println!("no initial request");
436 }
437
438 Ok(rpc)
439 }
440
441 pub async fn can_access_resource(
447 &mut self,
448 request: &serde_json::Value,
449 ) -> Result<serde_json::Value, String> {
450 self.unary_rpc(
451 "/tdx.volt_api.volt.v1.VoltAPI/CanAccessResource",
452 request,
453 "",
454 )
455 .await
456 }
457
458 pub async fn connect(
460 &mut self,
461 request: &serde_json::Value,
462 ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
463 self.streaming_call("/tdx.volt_api.volt.v1.VoltAPI/Connect", request, "")
464 .await
465 }
466
467 pub async fn delete_resource(
469 &mut self,
470 request: &serde_json::Value,
471 ) -> Result<serde_json::Value, String> {
472 self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/DeleteResource", request, "")
473 .await
474 }
475
476 pub async fn discover_services(
478 &mut self,
479 request: &serde_json::Value,
480 ) -> Result<serde_json::Value, String> {
481 self.unary_rpc(
482 "/tdx.volt_api.volt.v1.VoltAPI/DiscoverServices",
483 request,
484 "",
485 )
486 .await
487 }
488
489 pub async fn get_resource(
491 &mut self,
492 request: &serde_json::Value,
493 ) -> Result<serde_json::Value, String> {
494 self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/GetResource", request, "")
495 .await
496 }
497
498 pub async fn get_resources(
500 &mut self,
501 request: &serde_json::Value,
502 ) -> Result<serde_json::Value, String> {
503 self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/GetResources", request, "")
504 .await
505 }
506
507 pub async fn get_resource_ancestors(
509 &mut self,
510 request: &serde_json::Value,
511 ) -> Result<serde_json::Value, String> {
512 self.unary_rpc(
513 "/tdx.volt_api.volt.v1.VoltAPI/GetResourceAncestors",
514 request,
515 "",
516 )
517 .await
518 }
519
520 pub async fn get_resource_descendants(
522 &mut self,
523 request: &serde_json::Value,
524 ) -> Result<serde_json::Value, String> {
525 self.unary_rpc(
526 "/tdx.volt_api.volt.v1.VoltAPI/GetResourceDescendants",
527 request,
528 "",
529 )
530 .await
531 }
532
533 pub async fn request_access(
535 &mut self,
536 request: &serde_json::Value,
537 ) -> Result<serde_json::Value, String> {
538 self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/RequestAccess", request, "")
539 .await
540 }
541
542 pub async fn save_resource(
544 &mut self,
545 request: &serde_json::Value,
546 ) -> Result<serde_json::Value, String> {
547 self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/SaveResource", request, "")
548 .await
549 }
550
551 pub async fn save_resource_attribute(
553 &mut self,
554 request: &serde_json::Value,
555 ) -> Result<serde_json::Value, String> {
556 self.unary_rpc(
557 "/tdx.volt_api.volt.v1.VoltAPI/SaveResourceAttribute",
558 request,
559 "",
560 )
561 .await
562 }
563
564 pub async fn set_service_status(
566 &mut self,
567 request: &serde_json::Value,
568 ) -> Result<serde_json::Value, String> {
569 self.unary_rpc(
570 "/tdx.volt_api.volt.v1.VoltAPI/SetServiceStatus",
571 request,
572 "",
573 )
574 .await
575 }
576
577 pub async fn download_file(
583 &mut self,
584 request: &serde_json::Value,
585 ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
586 self.server_streaming_call("/tdx.volt_api.volt.v1.FileAPI/DownloadFile", request, "")
587 .await
588 }
589
590 pub async fn get_file(
592 &mut self,
593 request: &serde_json::Value,
594 ) -> Result<serde_json::Value, String> {
595 self.unary_rpc("/tdx.volt_api.volt.v1.FileAPI/GetFile", request, "")
596 .await
597 }
598
599 pub async fn get_file_content(
601 &mut self,
602 request: &serde_json::Value,
603 ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
604 self.server_streaming_call("/tdx.volt_api.volt.v1.FileAPI/GetFileContent", request, "")
605 .await
606 }
607
608 pub async fn get_file_descendants(
610 &mut self,
611 request: &serde_json::Value,
612 ) -> Result<serde_json::Value, String> {
613 self.unary_rpc(
614 "/tdx.volt_api.volt.v1.FileAPI/GetFileDescendants",
615 request,
616 "",
617 )
618 .await
619 }
620
621 pub async fn set_file_content(
623 &mut self,
624 request: &serde_json::Value,
625 ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
626 self.streaming_call("/tdx.volt_api.volt.v1.FileAPI/SetFileContent", request, "")
627 .await
628 }
629
630 pub async fn upload_file(
632 &mut self,
633 request: &serde_json::Value,
634 ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
635 self.streaming_call("/tdx.volt_api.volt.v1.FileAPI/UploadFile", request, "")
636 .await
637 }
638
639 pub async fn authenticate(
645 &mut self,
646 request: &serde_json::Value,
647 ) -> Result<serde_json::Value, String> {
648 self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/Authenticate", request, "")
649 .await
650 }
651
652 pub async fn delete_access(
654 &mut self,
655 request: &serde_json::Value,
656 ) -> Result<serde_json::Value, String> {
657 self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/DeleteAccess", request, "")
658 .await
659 }
660
661 pub async fn delete_volt(
663 &mut self,
664 request: &serde_json::Value,
665 ) -> Result<serde_json::Value, String> {
666 self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/DeleteVolt", request, "")
667 .await
668 }
669
670 pub async fn get_access(
672 &mut self,
673 request: &serde_json::Value,
674 ) -> Result<serde_json::Value, String> {
675 self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/GetAccess", request, "")
676 .await
677 }
678
679 pub async fn get_identities(
681 &mut self,
682 request: &serde_json::Value,
683 ) -> Result<serde_json::Value, String> {
684 self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/GetIdentities", request, "")
685 .await
686 }
687
688 pub async fn get_identity(
690 &mut self,
691 request: &serde_json::Value,
692 ) -> Result<serde_json::Value, String> {
693 self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/GetIdentity", request, "")
694 .await
695 }
696
697 pub async fn get_one_time_token(
699 &mut self,
700 request: &serde_json::Value,
701 ) -> Result<serde_json::Value, String> {
702 self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/GetOneTimeToken", request, "")
703 .await
704 }
705
706 pub async fn get_policy(
708 &mut self,
709 request: &serde_json::Value,
710 ) -> Result<serde_json::Value, String> {
711 self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/GetPolicy", request, "")
712 .await
713 }
714
715 pub async fn get_settings(
717 &mut self,
718 request: &serde_json::Value,
719 ) -> Result<serde_json::Value, String> {
720 self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/GetSettings", request, "")
721 .await
722 }
723
724 pub async fn invoke(
726 &mut self,
727 request: &serde_json::Value,
728 ) -> Result<serde_json::Value, String> {
729 self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/Invoke", request, "")
730 .await
731 }
732
733 pub async fn save_access(
735 &mut self,
736 request: &serde_json::Value,
737 ) -> Result<serde_json::Value, String> {
738 self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/SaveAccess", request, "")
739 .await
740 }
741
742 pub async fn save_identity(
744 &mut self,
745 request: &serde_json::Value,
746 ) -> Result<serde_json::Value, String> {
747 self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/SaveIdentity", request, "")
748 .await
749 }
750
751 pub async fn save_settings(
753 &mut self,
754 request: &serde_json::Value,
755 ) -> Result<serde_json::Value, String> {
756 self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/SaveSettings", request, "")
757 .await
758 }
759
760 pub async fn set_access_request_decision(
762 &mut self,
763 request: &serde_json::Value,
764 ) -> Result<serde_json::Value, String> {
765 self.unary_rpc(
766 "/tdx.volt_api.volt.v1.VoltAPI/SetAccessRequestDecision",
767 request,
768 "",
769 )
770 .await
771 }
772
773 pub async fn sign_verify(
775 &mut self,
776 request: &serde_json::Value,
777 ) -> Result<serde_json::Value, String> {
778 self.unary_rpc("/tdx.volt_api.volt.v1.VoltAPI/SignVerify", request, "")
779 .await
780 }
781
782 pub async fn publish_wire(
788 &mut self,
789 request: &serde_json::Value,
790 ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
791 self.streaming_call("/tdx.volt_api.volt.v1.WireAPI/PublishWire", request, "")
792 .await
793 }
794
795 pub async fn subscribe_wire(
797 &mut self,
798 request: &serde_json::Value,
799 ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
800 self.streaming_call("/tdx.volt_api.volt.v1.WireAPI/SubscribeWire", request, "")
801 .await
802 }
803
804 pub async fn bulk_update(
810 &mut self,
811 request: &serde_json::Value,
812 ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
813 self.streaming_call(
814 "/tdx.volt_api.data.v1.SqliteDatabaseAPI/BulkUpdate",
815 request,
816 "",
817 )
818 .await
819 }
820
821 pub async fn create_database(
823 &mut self,
824 request: &serde_json::Value,
825 ) -> Result<serde_json::Value, String> {
826 self.unary_rpc(
827 "/tdx.volt_api.data.v1.SqliteServerAPI/CreateDatabase",
828 request,
829 "",
830 )
831 .await
832 }
833
834 pub async fn sql_execute(
836 &mut self,
837 request: &serde_json::Value,
838 service: &str,
839 ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
840 self.streaming_call(
841 "/tdx.volt_api.data.v1.SqliteDatabaseAPI/Execute",
842 request,
843 service,
844 )
845 .await
846 }
847
848 pub async fn sql_execute_json(
850 &mut self,
851 request: &serde_json::Value,
852 service: &str,
853 ) -> Result<serde_json::Value, String> {
854 let rpc = match self.sql_execute(request, service).await {
855 Ok(call) => call,
856 Err(e) => return Err(e),
857 };
858
859 let (tx, mut rx): (Sender<()>, Receiver<()>) = mpsc::channel(1);
861
862 let tx = Arc::new(tx);
864
865 let header = Arc::new(StdMutex::new(Vec::<serde_json::Value>::new()));
867 let rows = Arc::new(StdMutex::new(Vec::<serde_json::Value>::new()));
868 let json_error = Arc::new(StdMutex::new(String::new()));
869
870 {
871 let rpc = rpc.lock().await;
873
874 rpc.on(EventType::Data, {
875 let header = Arc::clone(&header);
876 let rows = Arc::clone(&rows);
877 move |response| {
878 println!("sql_execute_json: received response: {:?}", response);
879 if let Event::Data(response) = response {
880 match response["header"].as_object() {
881 Some(header_obj) => {
882 match header.lock() {
883 Ok(mut header) => {
884 *header = header_obj["column"].as_array().unwrap().clone();
886 }
887 Err(e) => {
888 println!("failed to lock unary_response: {:?}", e);
889 return;
890 }
891 };
892 }
893 None => {
894 let row = response["row"].as_object().unwrap()["column"]
895 .as_array()
896 .unwrap();
897
898 let columns = header.lock().unwrap();
899
900 let mut row_map = serde_json::Map::new();
901 for (i, column) in columns.iter().enumerate() {
902 let name = column["name"].as_str().unwrap();
903
904 let cell = row[i].as_object().unwrap();
905
906 if cell.contains_key("null") {
907 row_map.insert(name.to_string(), serde_json::Value::Null);
908 } else if cell.contains_key("text") {
909 row_map.insert(name.to_string(), cell["text"].clone());
910 } else if cell.contains_key("integer") {
911 row_map.insert(
912 name.to_string(),
913 cell["integer"]
914 .as_str()
915 .unwrap()
916 .parse::<i64>()
917 .unwrap()
918 .into(),
919 );
920 } else if cell.contains_key("real") {
921 row_map.insert(
923 name.to_string(),
924 cell["real"]
925 .as_str()
926 .unwrap()
927 .parse::<f64>()
928 .unwrap()
929 .into(),
930 );
931 } else if cell.contains_key("blob") {
932 row_map.insert(name.to_string(), cell["blob"].clone());
933 }
934 }
935
936 match rows.lock() {
937 Ok(mut rows) => {
938 (*rows).push(row_map.into());
940 }
941 Err(e) => {
942 println!("failed to lock unary_response: {:?}", e);
943 return;
944 }
945 };
946 }
947 }
948 }
949 }
950 });
951
952 rpc.on(EventType::Error, {
953 let json_error = Arc::clone(&json_error);
954 move |response| {
955 println!("api_client: received error");
956 if let Event::Error(response) = response {
957 match json_error.lock() {
958 Ok(mut json_error) => {
959 *json_error = response;
961 }
962 Err(e) => {
963 println!("failed to lock unary_response: {:?}", e);
964 return;
965 }
966 };
967 }
968 }
969 });
970
971 rpc.on(EventType::End, {
972 let tx = Arc::clone(&tx);
973 move |_response| {
974 println!("api_client: received end");
975 let tx = Arc::clone(&tx);
976 tokio::spawn(async move {
977 match tx.send(()).await {
978 Ok(_) => println!("unary response sent successfully"),
979 Err(e) => println!("failed to send unary response: {:?}", e),
980 }
981 });
982 }
983 });
984
985 match rpc.send(request).await {
987 Ok(_) => println!("sent request"),
988 Err(e) => {
989 return Err(e);
990 }
991 }
992 }
993
994 rx.recv().await;
996
997 let result = match json_error.lock() {
999 Ok(json_error) => {
1000 if json_error.is_empty() {
1001 match rows.lock() {
1003 Ok(rows) => Ok(rows.clone().into()),
1004 Err(e) => Err(format!("failed to lock unary_response: {:?}", e)),
1005 }
1006 } else {
1007 Err(json_error.clone())
1009 }
1010 }
1011 Err(e) => Err(format!("failed to lock unary_error: {:?}", e)),
1012 };
1013
1014 result
1015 }
1016
1017 pub async fn import_csv(
1019 &mut self,
1020 request: &serde_json::Value,
1021 ) -> Result<Arc<Mutex<WebsocketRpc>>, String> {
1022 self.streaming_call(
1023 "/tdx.volt_api.data.v1.SqliteDatabaseAPI/ImportCSV",
1024 request,
1025 "",
1026 )
1027 .await
1028 }
1029}