1use futures::Stream;
6use std::collections::HashMap;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10use tokio::sync::{mpsc, Mutex, RwLock};
11use tokio_stream::wrappers::ReceiverStream;
12use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity};
13use tonic::Streaming;
14
15use crate::config::{resolve_volt_config, InitialiseOptions, VoltClientConfig, VoltConfig};
16use crate::constants::{PolicyDecision, VoltAccessType};
17use crate::credential::VoltCredential;
18use crate::error::{Result, VoltError};
19use crate::proto::*;
20use crate::relay::{RelayContext, RelayState};
21use crate::utils::get_local_ip;
22use crate::volt_connection::{EventReceiver, VoltConnection};
23
24pub enum SyncDocumentStream {
34 Direct(Streaming<volt::SyncDocumentResponse>),
36 Relay(ReceiverStream<std::result::Result<volt::SyncDocumentResponse, tonic::Status>>),
38}
39
40impl Stream for SyncDocumentStream {
41 type Item = std::result::Result<volt::SyncDocumentResponse, tonic::Status>;
42
43 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
44 match self.get_mut() {
45 SyncDocumentStream::Direct(stream) => {
46 Pin::new(stream).poll_next(cx)
49 }
50 SyncDocumentStream::Relay(stream) => Pin::new(stream).poll_next(cx),
51 }
52 }
53}
54
55pub struct VoltClient {
57 config: Option<VoltClientConfig>,
59 credential: Option<VoltCredential>,
61 volt_config: Option<VoltConfig>,
63 connection: Option<VoltConnection>,
65 cached_channel: Arc<Mutex<Option<Channel>>>,
67 cached_services: Arc<RwLock<HashMap<String, Channel>>>,
69 connected: Arc<RwLock<bool>>,
71 session: Arc<RwLock<Option<String>>>,
73 relay_context: Arc<Mutex<Option<RelayContext>>>,
75 relay_invoke_tx: Arc<Mutex<Option<mpsc::Sender<volt::InvokeRequest>>>>,
77 relay_invoke_rx: Arc<Mutex<Option<Streaming<volt::InvokeResponse>>>>,
79}
80
81impl VoltClient {
82 pub fn new() -> Result<Self> {
84 Ok(Self {
85 config: None,
86 credential: None,
87 volt_config: None,
88 connection: None,
89 cached_channel: Arc::new(Mutex::new(None)),
90 cached_services: Arc::new(RwLock::new(HashMap::new())),
91 connected: Arc::new(RwLock::new(false)),
92 session: Arc::new(RwLock::new(None)),
93 relay_context: Arc::new(Mutex::new(None)),
94 relay_invoke_tx: Arc::new(Mutex::new(None)),
95 relay_invoke_rx: Arc::new(Mutex::new(None)),
96 })
97 }
98
99 pub fn config(&self) -> Option<&VoltClientConfig> {
101 self.config.as_ref()
102 }
103
104 pub fn credential(&self) -> Option<&VoltCredential> {
106 self.credential.as_ref()
107 }
108
109 pub fn credential_mut(&mut self) -> Option<&mut VoltCredential> {
111 self.credential.as_mut()
112 }
113
114 pub fn volt_config(&self) -> Option<&VoltConfig> {
116 self.volt_config.as_ref()
117 }
118
119 pub fn is_relayed(&self) -> bool {
121 self.volt_config
122 .as_ref()
123 .map(|c| c.relay.is_some() && !c.id.is_empty())
124 .unwrap_or(false)
125 }
126
127 pub async fn is_connected(&self) -> bool {
129 *self.connected.read().await
130 }
131
132 pub async fn close(&mut self) {
134 *self.cached_channel.lock().await = None;
136
137 self.cached_services.write().await.clear();
139 }
140
141 pub async fn initialise(
147 &mut self,
148 config: ConfigSource,
149 options: InitialiseOptions,
150 ) -> Result<VoltClientConfig> {
151 tracing::debug!("Initializing Volt client");
152
153 let (mut config, config_path) = match config {
155 ConfigSource::File(path) => {
156 let path_str = path.to_string_lossy().to_string();
157 tracing::debug!("Loading config from file: {}", path_str);
158
159 if path.exists() {
160 let config = VoltClientConfig::from_file(&path).await?;
161 (config, Some(path_str))
162 } else {
163 (VoltClientConfig::default(), Some(path_str))
165 }
166 }
167 ConfigSource::Config(config) => (config, None),
168 ConfigSource::Json(json) => (VoltClientConfig::from_value(json)?, None),
169 };
170
171 if let Some(extra) = options.extra_config {
173 if let Ok(extra_config) = serde_json::from_value::<VoltClientConfig>(extra) {
174 if config.client_name.is_empty() {
176 config.client_name = extra_config.client_name;
177 }
178 }
179 }
180
181 if config.client_name.is_empty() {
183 return Err(VoltError::missing_config("client_name"));
184 }
185
186 let registry_list = options.did_registry_list.as_deref();
188 config.volt = resolve_volt_config(config.volt, registry_list).await?;
189 self.volt_config = Some(config.volt.clone());
190
191 tracing::debug!("Volt config: {:?}", self.volt_config);
192
193 if config.volt.id.is_empty() {
194 return Err(VoltError::missing_config("volt.id"));
195 }
196
197 let mut credential = VoltCredential::new(config.clone(), config_path)?;
199 credential.initialise().await?;
200
201 if !self.is_relayed() {
203 if config.volt.address.is_none() {
204 return Err(VoltError::missing_config("volt.address"));
205 }
206 if config.volt.ca_pem.is_none() {
207 return Err(VoltError::missing_config("volt.ca_pem"));
208 }
209 }
210
211 self.config = Some(config.clone());
213
214 let is_bound = if credential.certificate().is_some() {
216 true
217 } else {
218 self.authenticate_internal(
219 &mut credential,
220 options.no_did,
221 options.own_did,
222 options.exchange_token.as_deref(),
223 options.wait_for_auth,
224 )
225 .await?
226 };
227
228 if !is_bound {
229 return Err(VoltError::auth("Volt authentication failed"));
230 }
231
232 credential.save_cache().await?;
234
235 self.credential = Some(credential);
236
237 Ok(config)
238 }
239
240 pub async fn initialise_and_connect(
242 &mut self,
243 config: ConfigSource,
244 options: InitialiseOptions,
245 hello_payload: Option<serde_json::Value>,
246 ) -> Result<EventReceiver> {
247 self.initialise(config, options).await?;
248 self.connect(hello_payload).await
249 }
250
251 pub async fn connect(
253 &mut self,
254 hello_payload: Option<serde_json::Value>,
255 ) -> Result<EventReceiver> {
256 if self.connection.is_some() {
257 tracing::debug!("Already have a connection");
258 return Err(VoltError::AlreadyConnected);
259 }
260
261 let config = self.config.as_ref().ok_or(VoltError::NotInitialized)?;
262
263 let mut connection = VoltConnection::new(config);
264 let event_rx = connection.connect(hello_payload).await?;
265
266 self.connection = Some(connection);
267
268 Ok(event_rx)
269 }
270
271 pub async fn disconnect(&mut self) {
273 if let Some(ref mut connection) = self.connection {
274 connection.disconnect().await;
275 }
276 self.connection = None;
277 *self.connected.write().await = false;
278 *self.session.write().await = None;
279 self.close().await;
280 }
281
282 async fn authenticate_internal(
284 &self,
285 credential: &mut VoltCredential,
286 no_did: bool,
287 own_did: bool,
288 exchange_token: Option<&str>,
289 wait_for_permit: bool,
290 ) -> Result<bool> {
291 let config = self.config.as_ref().ok_or(VoltError::NotInitialized)?;
292 let volt_config = self.volt_config.as_ref().ok_or(VoltError::NotInitialized)?;
293
294 let cache = credential.cache_mut();
296 if cache.ca.is_none() {
297 cache.ca = volt_config.ca_pem.clone();
298 }
299
300 if cache.ca.is_none() {
301 return Err(VoltError::CertificateError(
302 "No CA certificate found".into(),
303 ));
304 }
305
306 credential.create_key().await?;
308 let public_key_pem = credential.public_key_pem()?;
309
310 if credential
312 .cache()
313 .and_then(|c| c.bind_ip.as_ref())
314 .is_none()
315 {
316 let ip = get_local_ip().unwrap_or_else(|_| "127.0.0.1".to_string());
317 credential.cache_mut().bind_ip = Some(ip);
318 }
319
320 credential.save_cache().await?;
321
322 let mut auth_request = AuthenticateRequest {
324 client_name: config.client_name.clone(),
325 host: credential.cache().and_then(|c| c.bind_ip.clone()),
326 ..Default::default()
327 };
328
329 if let Some(vcs) = credential.cache().map(|c| &c.vc).filter(|v| !v.is_empty()) {
331 let mut presentation = VerifiablePresentation::default();
332 let mut presentation_data = String::new();
333
334 for vc in vcs {
335 let vc_str = serde_json::to_string_pretty(vc)?;
336 presentation.credential.push(vc_str.clone());
337 presentation_data.push_str(&vc_str);
338 }
339
340 let key = credential.get_key()?;
341 presentation.signature = key.sign_base64(presentation_data.as_bytes());
342 auth_request.verifiable_presentation.push(presentation);
343 }
344
345 if let Some(token) = exchange_token {
347 auth_request.exchange_token = Some(token.to_string());
348 }
349
350 if let Some(challenge) = credential.cache().and_then(|c| c.challenge_code.clone()) {
352 auth_request.challenge = Some(challenge);
353 }
354
355 if no_did {
357 auth_request.public_key = Some(public_key_pem);
359 } else if credential.identity_did().is_none() {
360 if !own_did {
361 auth_request.did_public_key = Some(public_key_pem);
363 } else {
364 let did = format!("did:volt:{}", uuid::Uuid::new_v4());
366 credential.cache_mut().identity_did = Some(did.clone());
367
368 let did_document = serde_json::json!({
370 "@context": [
371 "https://www.w3.org/ns/did/v1",
372 "https://tdxvolt.com/ns/did/v1"
373 ],
374 "authentication": [format!("{}#key-1", did)],
375 "controller": did,
376 "id": did,
377 "verificationMethod": [{
378 "id": format!("{}#key-1", did),
379 "publicKeyPem": public_key_pem,
380 "type": "Ed25519Signature2018"
381 }]
382 });
383
384 let did_doc_str = serde_json::to_string_pretty(&did_document)?;
385 let key = credential.get_key()?;
386 let signature = key.sign_base64(did_doc_str.as_bytes());
387
388 auth_request.did_document = Some(did_doc_str);
389 auth_request.did_document_signature = Some(signature);
390 }
391 } else {
392 auth_request.did = credential.identity_did().map(|s| s.to_string());
394 }
395
396 let poll_interval = std::time::Duration::from_millis(10000);
398 let mut decision = PolicyDecision::Unknown;
399
400 loop {
401 if decision != PolicyDecision::Unknown {
402 tokio::time::sleep(poll_interval).await;
403 }
404
405 let response = self.issue_authenticate(&auth_request).await?;
406 decision = PolicyDecision::from_str(&response.decision);
407
408 tracing::debug!("Authenticate decision: {:?}", decision);
409
410 match decision {
411 PolicyDecision::Permit | PolicyDecision::Prompt | PolicyDecision::Pending => {
412 let cache = credential.cache_mut();
414
415 if let Some(cert) = response.cert {
416 cache.cert = Some(cert);
417 }
418 if let Some(ca) = response.chain {
419 cache.ca = Some(ca);
420 }
421 if let Some(session_id) = response.session_id {
422 cache.session_id = Some(session_id);
423 }
424 if cache.identity_did.is_none() {
425 cache.identity_did = response.identity_did;
426 }
427
428 for cred_str in response.credential {
430 if let Ok(vc) = serde_json::from_str(&cred_str) {
431 let exists = cache.vc.iter().any(|existing| {
433 existing.get("id")
434 == serde_json::from_str::<serde_json::Value>(&cred_str)
435 .ok()
436 .and_then(|v| v.get("id").cloned())
437 .as_ref()
438 });
439 if !exists {
440 cache.vc.push(vc);
441 }
442 }
443 }
444
445 credential.save_cache().await?;
446 }
447 PolicyDecision::Deny => {
448 tracing::warn!("Access request denied");
449 break;
450 }
451 _ => {
452 tracing::debug!("Unknown decision: {}", response.decision);
453 }
454 }
455
456 if decision == PolicyDecision::Permit {
458 break;
459 }
460 if decision == PolicyDecision::Pending {
461 continue;
462 }
463 if decision == PolicyDecision::Prompt && wait_for_permit {
464 continue;
465 }
466 break;
467 }
468
469 Ok(decision == PolicyDecision::Permit)
470 }
471
472 async fn issue_authenticate(
474 &self,
475 request: &AuthenticateRequest,
476 ) -> Result<AuthenticateResponse> {
477 if self.is_relayed() {
478 tracing::warn!("Relay authentication not yet implemented; returning simulated permit");
479 return Ok(AuthenticateResponse {
480 decision: "POLICY_DECISION_PERMIT".to_string(),
481 session_id: Some(uuid::Uuid::new_v4().to_string()),
482 ..Default::default()
483 });
484 }
485
486 tracing::debug!("Authenticating directly with Volt");
487
488 let request_value =
489 serde_json::to_value(request).map_err(|e| VoltError::serialization(e.to_string()))?;
490
491 let grpc_request: volt::AuthenticateRequest = serde_json::from_value(request_value)
492 .map_err(|e| VoltError::serialization(e.to_string()))?;
493
494 let mut client = self.get_volt_client().await?;
495 let response = client
496 .authenticate(tonic::Request::new(grpc_request))
497 .await
498 .map_err(VoltError::GrpcError)?;
499
500 let response_json = serde_json::to_value(response.into_inner())
501 .map_err(|e| VoltError::serialization(e.to_string()))?;
502
503 let auth_response: AuthenticateResponse = serde_json::from_value(response_json)
504 .map_err(|e| VoltError::serialization(e.to_string()))?;
505
506 tracing::debug!(
507 "Authentication response decision: {}",
508 auth_response.decision
509 );
510
511 Ok(auth_response)
512 }
513
514 pub async fn request_access_blocking(
518 &self,
519 target_resource_id: &str,
520 access_type: VoltAccessType,
521 ) -> Result<bool> {
522 let request = AccessRequest {
523 resource_id: target_resource_id.to_string(),
524 access: access_type.as_str().to_string(),
525 };
526
527 let poll_interval = std::time::Duration::from_millis(10000);
528 let mut decision = PolicyDecision::Unknown;
529
530 loop {
531 if decision != PolicyDecision::Unknown {
532 tokio::time::sleep(poll_interval).await;
533 }
534
535 let response = self.request_access(request.clone()).await?;
536 decision = PolicyDecision::from_str(&response.decision);
537
538 tracing::debug!("Access decision: {:?}", decision);
539
540 if decision != PolicyDecision::Prompt && decision != PolicyDecision::Pending {
541 break;
542 }
543 }
544
545 Ok(decision == PolicyDecision::Permit)
546 }
547
548 pub async fn can_access_resource(&self, request: serde_json::Value) -> Result<ApiResponse> {
550 self.unary_call("CanAccessResource", request).await
551 }
552
553 pub async fn delete_resource(&self, request: serde_json::Value) -> Result<ApiResponse> {
555 self.unary_call("DeleteResource", request).await
556 }
557
558 pub async fn discover_services(&self, request: serde_json::Value) -> Result<ApiResponse> {
560 self.unary_call("DiscoverServices", request).await
561 }
562
563 pub async fn get_resource(&self, request: serde_json::Value) -> Result<ApiResponse> {
565 self.unary_call("GetResource", request).await
566 }
567
568 pub async fn get_resources(&self, request: serde_json::Value) -> Result<ApiResponse> {
570 self.unary_call("GetResources", request).await
571 }
572
573 pub async fn get_resource_ancestors(&self, request: serde_json::Value) -> Result<ApiResponse> {
575 self.unary_call("GetResourceAncestors", request).await
576 }
577
578 pub async fn get_resource_descendants(
580 &self,
581 request: serde_json::Value,
582 ) -> Result<ApiResponse> {
583 self.unary_call("GetResourceDescendants", request).await
584 }
585
586 pub async fn request_access(&self, request: AccessRequest) -> Result<AccessResponse> {
588 let response = self
589 .unary_call("RequestAccess", serde_json::to_value(&request)?)
590 .await?;
591 serde_json::from_value(response.payload).map_err(VoltError::from)
592 }
593
594 pub async fn save_resource(&self, request: serde_json::Value) -> Result<ApiResponse> {
596 self.unary_call("SaveResource", request).await
597 }
598
599 pub async fn authenticate(&self, request: serde_json::Value) -> Result<ApiResponse> {
601 self.unary_call("Authenticate", request).await
602 }
603
604 pub async fn bind(&self, request: serde_json::Value) -> Result<ApiResponse> {
606 self.unary_call("Bind", request).await
607 }
608
609 pub async fn get_bindings(&self, request: serde_json::Value) -> Result<ApiResponse> {
611 self.unary_call("GetBindings", request).await
612 }
613
614 pub async fn get_identities(&self, request: serde_json::Value) -> Result<ApiResponse> {
616 self.unary_call("GetIdentities", request).await
617 }
618
619 pub async fn get_identity(&self, request: serde_json::Value) -> Result<ApiResponse> {
621 self.unary_call("GetIdentity", request).await
622 }
623
624 pub async fn get_policy(&self, request: serde_json::Value) -> Result<ApiResponse> {
626 self.unary_call("GetPolicy", request).await
627 }
628
629 pub async fn get_access(&self, request: serde_json::Value) -> Result<ApiResponse> {
631 self.unary_call("GetAccess", request).await
632 }
633
634 pub async fn get_settings(&self, request: serde_json::Value) -> Result<ApiResponse> {
636 self.unary_call("GetSettings", request).await
637 }
638
639 pub async fn save_access(&self, request: serde_json::Value) -> Result<ApiResponse> {
641 self.unary_call("SaveAccess", request).await
642 }
643
644 pub async fn save_identity(&self, request: serde_json::Value) -> Result<ApiResponse> {
646 self.unary_call("SaveIdentity", request).await
647 }
648
649 pub async fn save_settings(&self, request: serde_json::Value) -> Result<ApiResponse> {
651 self.unary_call("SaveSettings", request).await
652 }
653
654 pub async fn shutdown(&self, request: serde_json::Value) -> Result<ApiResponse> {
656 self.unary_call("Shutdown", request).await
657 }
658
659 pub async fn get_file_descendants(&self, request: serde_json::Value) -> Result<ApiResponse> {
663 self.unary_call("GetFileDescendants", request).await
664 }
665
666 pub async fn get_file_content(&self, request: serde_json::Value) -> Result<ApiResponse> {
668 self.unary_call("GetFileContent", request).await
669 }
670
671 pub async fn set_file_content(&self, request: serde_json::Value) -> Result<ApiResponse> {
673 self.unary_call("SetFileContent", request).await
674 }
675
676 pub async fn create_database(&self, request: serde_json::Value) -> Result<ApiResponse> {
680 self.unary_call("CreateDatabase", request).await
681 }
682
683 pub async fn bulk_update(&self, request: serde_json::Value) -> Result<ApiResponse> {
685 self.unary_call("BulkUpdate", request).await
686 }
687
688 fn get_relay_auth_metadata(&self) -> Option<tonic::metadata::MetadataMap> {
693 tracing::info!("get_relay_auth_metadata: is_relayed={}", self.is_relayed());
694
695 if !self.is_relayed() {
696 tracing::info!("Not relayed, skipping auth metadata");
697 return None;
698 }
699
700 let credential = self.credential.as_ref();
701 if credential.is_none() {
702 tracing::info!("No credential available");
703 return None;
704 }
705 let credential = credential.unwrap();
706
707 let volt_id = self
708 .volt_config
709 .as_ref()
710 .map(|c| c.id.as_str())
711 .unwrap_or("");
712 tracing::info!("Creating auth metadata for volt_id: {}", volt_id);
713
714 match credential.get_identity_metadata(Some(volt_id), None, true, 60) {
715 Ok(identity) => {
716 tracing::info!("Created relay authentication metadata successfully");
717 Some(identity.metadata)
718 }
719 Err(e) => {
720 tracing::warn!("Failed to create relay auth metadata: {}", e);
721 None
722 }
723 }
724 }
725
726 fn add_auth_to_request<T>(&self, request: &mut tonic::Request<T>) {
728 if let Some(metadata) = self.get_relay_auth_metadata() {
729 let req_metadata = request.metadata_mut();
730 for key_and_value in metadata.iter() {
731 match key_and_value {
732 tonic::metadata::KeyAndValueRef::Ascii(key, value) => {
733 tracing::info!("Adding metadata: {}={:?}", key, value);
734 req_metadata.insert(key, value.clone());
735 }
736 tonic::metadata::KeyAndValueRef::Binary(key, value) => {
737 tracing::info!("Adding binary metadata: {}", key);
738 req_metadata.insert_bin(key, value.clone());
739 }
740 }
741 }
742 }
743 }
744
745 async fn get_channel(&self) -> Result<Channel> {
747 if !self.is_relayed() {
751 let cached = self.cached_channel.lock().await;
752 if let Some(channel) = cached.as_ref() {
753 return Ok(channel.clone());
754 }
755 }
756
757 let volt_config = self.volt_config.as_ref().ok_or(VoltError::NotInitialized)?;
759 let credential = self.credential.as_ref().ok_or(VoltError::NotInitialized)?;
760
761 let is_relayed = self.is_relayed();
763 let (address, ca_pem) = if is_relayed {
764 let relay = volt_config
765 .relay
766 .as_ref()
767 .ok_or_else(|| VoltError::missing_config("volt.relay"))?;
768 tracing::info!("Using relay connection to: {}", relay.address);
769 (relay.address.clone(), relay.ca_pem.clone())
770 } else {
771 let addr = volt_config
772 .address
773 .as_ref()
774 .ok_or_else(|| VoltError::missing_config("volt.address"))?;
775 tracing::info!("Using direct connection to: {}", addr);
776 (addr.clone(), volt_config.ca_pem.clone())
777 };
778
779 let uri = if address.starts_with("http://") || address.starts_with("https://") {
782 address.clone()
783 } else if ca_pem.is_some() {
784 format!("https://{}", address)
785 } else {
786 format!("http://{}", address)
787 };
788
789 tracing::info!("Connecting to URI: {}", uri);
790 tracing::info!(
791 "has CA cert: {}, is_relayed: {}",
792 ca_pem.is_some(),
793 is_relayed
794 );
795
796 let mut endpoint = Channel::from_shared(uri.clone())
798 .map_err(|e| VoltError::ConnectionError(format!("Invalid URI: {}", e)))?;
799
800 endpoint = endpoint
804 .tcp_keepalive(Some(std::time::Duration::from_secs(60)))
806 .tcp_nodelay(true)
808 .http2_adaptive_window(true)
810 .initial_connection_window_size(1024 * 1024) .initial_stream_window_size(1024 * 1024) .connect_timeout(std::time::Duration::from_secs(30));
815 if let Some(ca_pem) = &ca_pem {
821 tracing::info!("Configuring TLS with CA certificate (len={})", ca_pem.len());
822 let ca = Certificate::from_pem(ca_pem);
823 let mut tls_config = ClientTlsConfig::new().ca_certificate(ca);
824
825 if is_relayed {
827 tls_config = tls_config.domain_name("coreid.com");
828 }
829
830 if !is_relayed {
833 if let (Some(cert_pem), Some(key_pem)) = (
834 credential.cache().and_then(|c| c.cert.as_ref()),
835 credential.cache().and_then(|c| c.key.as_ref()),
836 ) {
837 let identity = Identity::from_pem(cert_pem.as_bytes(), key_pem.as_bytes());
838 tls_config = tls_config.identity(identity);
839 }
840 }
841
842 endpoint = endpoint
843 .tls_config(tls_config)
844 .map_err(|e| VoltError::ConnectionError(format!("TLS config error: {}", e)))?;
845 }
846
847 let channel = if is_relayed {
850 tracing::info!("Eagerly connecting to relay...");
852 endpoint.connect().await.map_err(|e| {
853 VoltError::ConnectionError(format!("Failed to connect to relay {}: {}", uri, e))
854 })?
855 } else {
856 endpoint.connect().await.map_err(|e| {
858 VoltError::ConnectionError(format!("Failed to connect to {}: {}", uri, e))
859 })?
860 };
861
862 {
864 let mut cached = self.cached_channel.lock().await;
865 *cached = Some(channel.clone());
866 }
867
868 Ok(channel)
869 }
870
871 async fn get_volt_client(&self) -> Result<volt::volt_api_client::VoltApiClient<Channel>> {
873 let channel = self.get_channel().await?;
874 let client = if self.is_relayed() {
876 volt::volt_api_client::VoltApiClient::new(channel)
877 } else {
878 volt::volt_api_client::VoltApiClient::new(channel)
880 .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
881 .accept_compressed(tonic::codec::CompressionEncoding::Zstd)
882 .accept_compressed(tonic::codec::CompressionEncoding::Deflate)
883 };
884 Ok(client)
885 }
886
887 async fn get_sync_client(&self) -> Result<volt::sync_api_client::SyncApiClient<Channel>> {
889 let channel = self.get_channel().await?;
890 let client = volt::sync_api_client::SyncApiClient::new(channel)
892 .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
893 .accept_compressed(tonic::codec::CompressionEncoding::Zstd)
894 .accept_compressed(tonic::codec::CompressionEncoding::Deflate);
895 Ok(client)
896 }
897
898 async fn establish_relay_connection(&self) -> Result<()> {
903 if !self.is_relayed() {
904 tracing::debug!("Not a relayed connection, skipping relay setup");
905 return Ok(());
906 }
907
908 {
910 let ctx_guard = self.relay_context.lock().await;
911 if let Some(ctx) = ctx_guard.as_ref() {
912 if ctx.state() == &RelayState::Connected {
913 tracing::debug!("Relay already connected");
914 return Ok(());
915 }
916 }
917 }
918
919 tracing::info!("Establishing relay connection...");
920
921 let volt_config = self.volt_config.as_ref().ok_or(VoltError::NotInitialized)?;
923 let credential = self.credential.as_ref().ok_or(VoltError::NotInitialized)?;
924 let target_did = &volt_config.id;
925
926 let mut relay_ctx = RelayContext::new(target_did.clone());
928
929 if let Some(public_key_pem) = &volt_config.public_key {
931 if let Ok(pk_bytes) =
932 crate::crypto::from_base64(&crate::crypto::strip_pem_headers(public_key_pem))
933 {
934 relay_ctx.set_target_public_key(pk_bytes);
935 }
936 }
937
938 let relay_public_key_b64 = relay_ctx.public_key_base64();
940 let identity = credential.get_identity_metadata(
941 Some(target_did),
942 Some(&relay_public_key_b64),
943 true,
944 60,
945 )?;
946 let token = identity.token.clone();
947
948 let key_exchange_request = relay_ctx.create_key_exchange_request(&token);
950 tracing::debug!(
951 "Created key exchange request with invoke_id: {}",
952 key_exchange_request.invoke_id
953 );
954
955 let mut volt_client = self.get_volt_client().await?;
957
958 let (invoke_tx, invoke_rx) = mpsc::channel::<volt::InvokeRequest>(100);
960
961 invoke_tx
963 .send(key_exchange_request)
964 .await
965 .map_err(|_| VoltError::connection("Failed to send key exchange request"))?;
966
967 let request_stream = ReceiverStream::new(invoke_rx);
969 let mut request = tonic::Request::new(request_stream);
970
971 let req_metadata = request.metadata_mut();
973 for key_and_value in identity.metadata.iter() {
974 match key_and_value {
975 tonic::metadata::KeyAndValueRef::Ascii(key, value) => {
976 req_metadata.insert(key, value.clone());
977 }
978 tonic::metadata::KeyAndValueRef::Binary(key, value) => {
979 req_metadata.insert_bin(key, value.clone());
980 }
981 }
982 }
983
984 tracing::debug!("establish_relay: Starting Invoke stream...");
986 let response = volt_client.invoke(request).await.map_err(|e| {
987 tracing::error!(
988 "establish_relay: Invoke stream error: code={:?}, msg={}",
989 e.code(),
990 e.message()
991 );
992 VoltError::connection(format!("Failed to start Invoke stream: {}", e))
993 })?;
994
995 let mut invoke_response_stream = response.into_inner();
996
997 tracing::debug!("Waiting for key exchange response...");
999 let key_exchange_response = invoke_response_stream
1000 .message()
1001 .await
1002 .map_err(|e| {
1003 VoltError::connection(format!("Failed to receive key exchange response: {}", e))
1004 })?
1005 .ok_or_else(|| VoltError::connection("Invoke stream closed before key exchange"))?;
1006
1007 tracing::debug!(
1008 "Received key exchange response: invoke_id={}",
1009 key_exchange_response.invoke_id
1010 );
1011
1012 if let Some(key_exchange) = &key_exchange_response.key_exchange {
1014 relay_ctx.process_key_exchange_response(key_exchange)?;
1015 tracing::info!("Relay key exchange completed successfully");
1016 } else if let Some(volt::invoke_response::ResponsePayload::Status(status)) =
1017 &key_exchange_response.response_payload
1018 {
1019 return Err(VoltError::server(status.code, &status.message));
1020 } else {
1021 return Err(VoltError::protocol(
1022 "Expected key exchange response, got something else",
1023 ));
1024 }
1025
1026 *self.relay_context.lock().await = Some(relay_ctx);
1028 *self.relay_invoke_tx.lock().await = Some(invoke_tx);
1029 *self.relay_invoke_rx.lock().await = Some(invoke_response_stream);
1030
1031 tracing::info!("Relay connection established");
1032 Ok(())
1033 }
1034
1035 pub async fn is_relay_connected(&self) -> bool {
1037 let ctx_guard = self.relay_context.lock().await;
1038 ctx_guard
1039 .as_ref()
1040 .map(|ctx| ctx.state() == &RelayState::Connected)
1041 .unwrap_or(false)
1042 }
1043
1044 pub async fn sync_document(
1079 &self,
1080 initial_request: volt::SyncDocumentRequest,
1081 ) -> Result<(mpsc::Sender<volt::SyncDocumentRequest>, SyncDocumentStream)> {
1082 if self.is_relayed() {
1084 return self.sync_document_via_relay(initial_request).await;
1085 }
1086
1087 let mut client = self.get_sync_client().await?;
1089
1090 let (tx, rx) = mpsc::channel::<volt::SyncDocumentRequest>(100);
1092
1093 tx.send(initial_request)
1095 .await
1096 .map_err(|_| VoltError::ConnectionError("Failed to send initial request".into()))?;
1097
1098 let request_stream = ReceiverStream::new(rx);
1100
1101 let mut request = tonic::Request::new(request_stream);
1103 self.add_auth_to_request(&mut request);
1104
1105 let response = client
1107 .sync_document(request)
1108 .await
1109 .map_err(VoltError::GrpcError)?;
1110
1111 let response_stream = response.into_inner();
1112
1113 Ok((tx, SyncDocumentStream::Direct(response_stream)))
1114 }
1115
1116 async fn sync_document_via_relay(
1120 &self,
1121 initial_request: volt::SyncDocumentRequest,
1122 ) -> Result<(mpsc::Sender<volt::SyncDocumentRequest>, SyncDocumentStream)> {
1123 tracing::info!("Using relay protocol for sync_document");
1124
1125 self.establish_relay_connection().await?;
1127
1128 let relay_ctx = self.relay_context.lock().await;
1130 let ctx = relay_ctx
1131 .as_ref()
1132 .ok_or_else(|| VoltError::protocol("Relay context not initialized"))?;
1133
1134 if ctx.state() != &RelayState::Connected {
1135 return Err(VoltError::protocol("Relay not in Connected state"));
1136 }
1137
1138 let service_id = "tdx.volt_api.volt.v1.SyncAPI".to_string();
1140
1141 let invoke_id = ctx.next_invoke_id();
1143
1144 drop(relay_ctx);
1145
1146 let (user_tx, mut user_rx) = mpsc::channel::<volt::SyncDocumentRequest>(100);
1148 let (response_tx, response_rx) =
1149 mpsc::channel::<std::result::Result<volt::SyncDocumentResponse, tonic::Status>>(100);
1150
1151 let response_stream = tokio_stream::wrappers::ReceiverStream::new(response_rx);
1154
1155 let invoke_tx_guard = self.relay_invoke_tx.lock().await;
1157 let invoke_tx = invoke_tx_guard
1158 .as_ref()
1159 .ok_or_else(|| VoltError::protocol("Invoke stream not initialized"))?
1160 .clone();
1161 drop(invoke_tx_guard);
1162
1163 let relay_context = self.relay_context.clone();
1165 let relay_invoke_rx = self.relay_invoke_rx.clone();
1166
1167 tokio::spawn(async move {
1169 use prost::Message;
1170
1171 {
1173 let ctx_guard = relay_context.lock().await;
1174 let ctx = match ctx_guard.as_ref() {
1175 Some(c) => c,
1176 None => {
1177 tracing::error!("Relay context disappeared");
1178 return;
1179 }
1180 };
1181
1182 let mut request_bytes = Vec::new();
1184 if let Err(e) = initial_request.encode(&mut request_bytes) {
1185 tracing::error!("Failed to encode initial request: {}", e);
1186 return;
1187 }
1188
1189 let remote_response = ctx.create_method_invoke(
1191 invoke_id,
1192 &service_id,
1193 "SyncDocument",
1194 volt::MethodType::Bidi,
1195 request_bytes,
1196 );
1197
1198 match ctx.create_encrypted_request(invoke_id, &remote_response) {
1200 Ok(invoke_req) => {
1201 if let Err(e) = invoke_tx.send(invoke_req).await {
1202 tracing::error!("Failed to send initial invoke request: {}", e);
1203 return;
1204 }
1205 }
1206 Err(e) => {
1207 tracing::error!("Failed to create encrypted request: {}", e);
1208 return;
1209 }
1210 }
1211 }
1212
1213 let invoke_tx_clone = invoke_tx.clone();
1215 let relay_context_clone = relay_context.clone();
1216 let forward_task = tokio::spawn(async move {
1217 while let Some(sync_req) = user_rx.recv().await {
1218 let ctx_guard = relay_context_clone.lock().await;
1219 let ctx = match ctx_guard.as_ref() {
1220 Some(c) => c,
1221 None => break,
1222 };
1223
1224 let mut request_bytes = Vec::new();
1226 if let Err(e) = sync_req.encode(&mut request_bytes) {
1227 tracing::error!("Failed to encode sync request: {}", e);
1228 continue;
1229 }
1230
1231 let remote_response = ctx.create_method_payload(invoke_id, request_bytes);
1233
1234 match ctx.create_encrypted_request(invoke_id, &remote_response) {
1236 Ok(invoke_req) => {
1237 drop(ctx_guard);
1238 if let Err(e) = invoke_tx_clone.send(invoke_req).await {
1239 tracing::error!("Failed to send invoke request: {}", e);
1240 break;
1241 }
1242 }
1243 Err(e) => {
1244 tracing::error!("Failed to create encrypted request: {}", e);
1245 }
1246 }
1247 }
1248 });
1249
1250 loop {
1252 let mut rx_guard = relay_invoke_rx.lock().await;
1253 let rx = match rx_guard.as_mut() {
1254 Some(r) => r,
1255 None => break,
1256 };
1257
1258 let response = match rx.message().await {
1259 Ok(Some(resp)) => resp,
1260 Ok(None) => {
1261 tracing::debug!("Invoke stream ended");
1262 break;
1263 }
1264 Err(e) => {
1265 tracing::error!("Error receiving invoke response: {}", e);
1266 let _ = response_tx.send(Err(e)).await;
1267 break;
1268 }
1269 };
1270 drop(rx_guard);
1271
1272 let mut ctx_guard = relay_context.lock().await;
1274 let ctx = match ctx_guard.as_mut() {
1275 Some(c) => c,
1276 None => break,
1277 };
1278
1279 let remote_request = match ctx.parse_response(&response) {
1280 Ok(Some(req)) => req,
1281 Ok(None) => continue, Err(e) => {
1283 tracing::error!("Failed to parse response: {}", e);
1284 continue;
1285 }
1286 };
1287 drop(ctx_guard);
1288
1289 match remote_request.payload {
1291 Some(volt::remote_request::Payload::MethodPayload(mp)) => {
1292 let payload = match mp.method_payload {
1293 Some(volt::method_payload::MethodPayload::Payload(p)) => p,
1294 Some(volt::method_payload::MethodPayload::JsonPayload(j)) => {
1295 j.into_bytes()
1296 }
1297 None => continue,
1298 };
1299
1300 match volt::SyncDocumentResponse::decode(payload.as_slice()) {
1301 Ok(sync_resp) => {
1302 if let Err(e) = response_tx.send(Ok(sync_resp)).await {
1303 tracing::error!("Failed to forward response: {}", e);
1304 break;
1305 }
1306 }
1307 Err(e) => {
1308 tracing::error!("Failed to decode SyncDocumentResponse: {}", e);
1309 }
1310 }
1311 }
1312 Some(volt::remote_request::Payload::MethodEnd(end)) => {
1313 tracing::debug!("Received MethodEnd for invoke_id={}", end.id);
1314 if end.error_code != 0 {
1315 let _ = response_tx
1316 .send(Err(tonic::Status::new(
1317 tonic::Code::from_i32(end.error_code),
1318 &end.error,
1319 )))
1320 .await;
1321 }
1322 break;
1323 }
1324 _ => {}
1325 }
1326 }
1327
1328 forward_task.abort();
1329 });
1330
1331 let relay_stream = SyncDocumentStream::Relay(response_stream);
1333
1334 Ok((user_tx, relay_stream))
1335 }
1336
1337 async fn unary_call_via_relay(
1343 &self,
1344 method: &str,
1345 request_json: serde_json::Value,
1346 ) -> Result<ApiResponse> {
1347 use crate::proto::volt::{method_payload, remote_request, MethodType};
1348 use crate::relay::RelayContext;
1349
1350 tracing::info!("Making relay call for method: {}", method);
1351
1352 let request_json = normalize_payload(request_json);
1353
1354 let volt_config = self.volt_config.as_ref().ok_or(VoltError::NotInitialized)?;
1356 let credential = self.credential.as_ref().ok_or(VoltError::NotInitialized)?;
1357 let target_did = &volt_config.id;
1358
1359 let mut relay_ctx = RelayContext::new(target_did.clone());
1361
1362 if let Some(public_key_pem) = &volt_config.public_key {
1364 if let Ok(pk_bytes) =
1365 crate::crypto::from_base64(&crate::crypto::strip_pem_headers(public_key_pem))
1366 {
1367 relay_ctx.set_target_public_key(pk_bytes);
1368 }
1369 }
1370
1371 let relay_public_key_b64 = relay_ctx.public_key_base64();
1373 let identity = credential.get_identity_metadata(
1374 Some(target_did),
1375 Some(&relay_public_key_b64),
1376 true,
1377 60,
1378 )?;
1379 let token = identity.token.clone();
1380
1381 let key_exchange_request = relay_ctx.create_key_exchange_request(&token);
1383 let invoke_id = key_exchange_request.invoke_id;
1384 tracing::info!(
1385 "Created key exchange request: invoke_id={}, target_did={:?}, token_len={}, iv_len={}",
1386 invoke_id,
1387 key_exchange_request.target_did,
1388 key_exchange_request.token.len(),
1389 key_exchange_request.iv.len()
1390 );
1391
1392 let mut volt_client = self.get_volt_client().await?;
1394 tracing::info!("Got VoltApiClient successfully");
1395
1396 let (invoke_tx, invoke_rx) = mpsc::channel::<volt::InvokeRequest>(10);
1398 tracing::info!("Created mpsc channel");
1399
1400 invoke_tx
1402 .send(key_exchange_request)
1403 .await
1404 .map_err(|_| VoltError::connection("Failed to send key exchange request"))?;
1405 tracing::info!("Sent key exchange request to channel");
1406
1407 let request_stream = ReceiverStream::new(invoke_rx);
1409 let mut request = tonic::Request::new(request_stream);
1410 tracing::info!("Created request stream");
1411
1412 {
1414 let req_metadata = request.metadata_mut();
1415 for key_and_value in identity.metadata.iter() {
1416 match key_and_value {
1417 tonic::metadata::KeyAndValueRef::Ascii(key, value) => {
1418 tracing::info!("Adding metadata: {}={:?}", key, value);
1419 req_metadata.insert(key, value.clone());
1420 }
1421 tonic::metadata::KeyAndValueRef::Binary(key, value) => {
1422 tracing::info!("Adding binary metadata: {}", key);
1423 req_metadata.insert_bin(key, value.clone());
1424 }
1425 }
1426 }
1427 }
1428 tracing::info!("Added auth metadata to request");
1429
1430 tracing::info!("Request metadata contents:");
1432 for key_and_value in request.metadata().iter() {
1433 match key_and_value {
1434 tonic::metadata::KeyAndValueRef::Ascii(key, value) => {
1435 tracing::info!(" {}={:?}", key, value);
1436 }
1437 tonic::metadata::KeyAndValueRef::Binary(key, _value) => {
1438 tracing::info!(" {} (binary)", key);
1439 }
1440 }
1441 }
1442
1443 tracing::info!("unary_call_via_relay: Calling invoke()...");
1448 let response = volt_client.invoke(request).await.map_err(|e| {
1449 tracing::error!("unary_call_via_relay: invoke() failed: {:?}", e);
1450 VoltError::connection(format!("Failed to start Invoke stream: {}", e))
1451 })?;
1452 tracing::info!("unary_call_via_relay: invoke() succeeded!");
1453
1454 let mut invoke_response_stream = response.into_inner();
1455
1456 tracing::debug!("Waiting for key exchange response...");
1458 let key_exchange_response = invoke_response_stream
1459 .message()
1460 .await
1461 .map_err(|e| {
1462 VoltError::connection(format!("Failed to receive key exchange response: {}", e))
1463 })?
1464 .ok_or_else(|| VoltError::connection("Invoke stream closed before key exchange"))?;
1465
1466 tracing::debug!(
1467 "Received key exchange response: invoke_id={}",
1468 key_exchange_response.invoke_id
1469 );
1470
1471 if let Some(key_exchange) = &key_exchange_response.key_exchange {
1473 relay_ctx.process_key_exchange_response(key_exchange)?;
1474 tracing::info!("Relay key exchange completed successfully");
1475 } else if let Some(volt::invoke_response::ResponsePayload::Status(status)) =
1476 &key_exchange_response.response_payload
1477 {
1478 return Err(VoltError::server(status.code, &status.message));
1479 } else {
1480 return Err(VoltError::protocol(
1481 "Expected key exchange response, got something else",
1482 ));
1483 }
1484
1485 let service_id = "tdx.volt_api.volt.v1.VoltAPI";
1487
1488 let request_bytes = self.serialize_request(method, &request_json)?;
1491
1492 let remote_response = relay_ctx.create_method_invoke(
1494 invoke_id,
1495 service_id,
1496 method,
1497 MethodType::Unary,
1498 request_bytes,
1499 );
1500
1501 let invoke_request = relay_ctx.create_encrypted_request(invoke_id, &remote_response)?;
1503 invoke_tx
1504 .send(invoke_request)
1505 .await
1506 .map_err(|e| VoltError::connection(format!("Failed to send method invoke: {}", e)))?;
1507
1508 let response = invoke_response_stream
1510 .message()
1511 .await
1512 .map_err(|e| VoltError::grpc(e.code(), e.message()))?
1513 .ok_or_else(|| VoltError::connection("Stream closed before response"))?;
1514
1515 let remote_request = relay_ctx
1517 .parse_response(&response)?
1518 .ok_or_else(|| VoltError::protocol("Expected payload response"))?;
1519
1520 match remote_request.payload {
1522 Some(remote_request::Payload::MethodPayload(mp)) => {
1523 let payload = match mp.method_payload {
1524 Some(method_payload::MethodPayload::Payload(p)) => p,
1525 Some(method_payload::MethodPayload::JsonPayload(j)) => j.into_bytes(),
1526 None => return Err(VoltError::protocol("Empty method payload")),
1527 };
1528
1529 let response_json = self.deserialize_response(method, &payload)?;
1531
1532 let status = response_json
1534 .get("status")
1535 .and_then(|s| serde_json::from_value(s.clone()).ok());
1536
1537 Ok(ApiResponse {
1538 status,
1539 payload: response_json,
1540 })
1541 }
1542 Some(remote_request::Payload::MethodEnd(end)) => {
1543 if end.error_code != 0 || !end.error.is_empty() {
1544 Err(VoltError::server(end.error_code, &end.error))
1545 } else {
1546 Err(VoltError::protocol("Received MethodEnd without response"))
1547 }
1548 }
1549 _ => Err(VoltError::protocol("Expected MethodPayload response")),
1550 }
1551 }
1552
1553 fn serialize_request(&self, method: &str, request: &serde_json::Value) -> Result<Vec<u8>> {
1555 use prost::Message;
1556
1557 match method {
1558 "GetResource" => {
1559 let req: volt::GetResourceRequest = serde_json::from_value(request.clone())?;
1560 let mut buf = Vec::new();
1561 req.encode(&mut buf)
1562 .map_err(|e| VoltError::serialization(e.to_string()))?;
1563 Ok(buf)
1564 }
1565 "GetResources" => {
1566 let req: volt::GetResourcesRequest = serde_json::from_value(request.clone())?;
1567 let mut buf = Vec::new();
1568 req.encode(&mut buf)
1569 .map_err(|e| VoltError::serialization(e.to_string()))?;
1570 Ok(buf)
1571 }
1572 "CanAccessResource" => {
1573 let req: volt::CanAccessResourceRequest = serde_json::from_value(request.clone())?;
1574 let mut buf = Vec::new();
1575 req.encode(&mut buf)
1576 .map_err(|e| VoltError::serialization(e.to_string()))?;
1577 Ok(buf)
1578 }
1579 "DeleteResource" => {
1580 let req: volt::DeleteResourceRequest = serde_json::from_value(request.clone())?;
1581 let mut buf = Vec::new();
1582 req.encode(&mut buf)
1583 .map_err(|e| VoltError::serialization(e.to_string()))?;
1584 Ok(buf)
1585 }
1586 "SaveResource" => {
1587 let req: volt::SaveResourceRequest = serde_json::from_value(request.clone())?;
1588 let mut buf = Vec::new();
1589 req.encode(&mut buf)
1590 .map_err(|e| VoltError::serialization(e.to_string()))?;
1591 Ok(buf)
1592 }
1593 "RequestAccess" => {
1594 let req: volt::RequestAccessRequest = serde_json::from_value(request.clone())?;
1595 let mut buf = Vec::new();
1596 req.encode(&mut buf)
1597 .map_err(|e| VoltError::serialization(e.to_string()))?;
1598 Ok(buf)
1599 }
1600 "Authenticate" => {
1601 let req: volt::AuthenticateRequest = serde_json::from_value(request.clone())?;
1602 let mut buf = Vec::new();
1603 req.encode(&mut buf)
1604 .map_err(|e| VoltError::serialization(e.to_string()))?;
1605 Ok(buf)
1606 }
1607 "DiscoverServices" => {
1608 let req: volt::DiscoverServicesRequest = serde_json::from_value(request.clone())?;
1609 let mut buf = Vec::new();
1610 req.encode(&mut buf)
1611 .map_err(|e| VoltError::serialization(e.to_string()))?;
1612 Ok(buf)
1613 }
1614 "GetResourceAncestors" => {
1615 let req: volt::GetResourceAncestorsRequest =
1616 serde_json::from_value(request.clone())?;
1617 let mut buf = Vec::new();
1618 req.encode(&mut buf)
1619 .map_err(|e| VoltError::serialization(e.to_string()))?;
1620 Ok(buf)
1621 }
1622 "GetResourceDescendants" => {
1623 let req: volt::GetResourceDescendantsRequest =
1624 serde_json::from_value(request.clone())?;
1625 let mut buf = Vec::new();
1626 req.encode(&mut buf)
1627 .map_err(|e| VoltError::serialization(e.to_string()))?;
1628 Ok(buf)
1629 }
1630 "SaveAccess" => {
1631 let req: volt::SaveAccessRequest = serde_json::from_value(request.clone())?;
1632 let mut buf = Vec::new();
1633 req.encode(&mut buf)
1634 .map_err(|e| VoltError::serialization(e.to_string()))?;
1635 Ok(buf)
1636 }
1637 "GetIdentities" => {
1638 let req: volt::GetIdentitiesRequest = serde_json::from_value(request.clone())?;
1639 let mut buf = Vec::new();
1640 req.encode(&mut buf)
1641 .map_err(|e| VoltError::serialization(e.to_string()))?;
1642 Ok(buf)
1643 }
1644 "GetIdentity" => {
1645 let req: volt::GetIdentityRequest = serde_json::from_value(request.clone())?;
1646 let mut buf = Vec::new();
1647 req.encode(&mut buf)
1648 .map_err(|e| VoltError::serialization(e.to_string()))?;
1649 Ok(buf)
1650 }
1651 "GetPolicy" => {
1652 let req: volt::GetPolicyRequest = serde_json::from_value(request.clone())?;
1653 let mut buf = Vec::new();
1654 req.encode(&mut buf)
1655 .map_err(|e| VoltError::serialization(e.to_string()))?;
1656 Ok(buf)
1657 }
1658 "GetAccess" => {
1659 let req: volt::GetAccessRequest = serde_json::from_value(request.clone())?;
1660 let mut buf = Vec::new();
1661 req.encode(&mut buf)
1662 .map_err(|e| VoltError::serialization(e.to_string()))?;
1663 Ok(buf)
1664 }
1665 "SaveIdentity" => {
1666 let req: volt::SaveIdentityRequest = serde_json::from_value(request.clone())?;
1667 let mut buf = Vec::new();
1668 req.encode(&mut buf)
1669 .map_err(|e| VoltError::serialization(e.to_string()))?;
1670 Ok(buf)
1671 }
1672 "Shutdown" => {
1673 let req: volt::ShutdownRequest = serde_json::from_value(request.clone())?;
1674 let mut buf = Vec::new();
1675 req.encode(&mut buf)
1676 .map_err(|e| VoltError::serialization(e.to_string()))?;
1677 Ok(buf)
1678 }
1679 "GetFileDescendants" => {
1680 let req: volt::GetFileDescendantsRequest = serde_json::from_value(request.clone())?;
1681 let mut buf = Vec::new();
1682 req.encode(&mut buf)
1683 .map_err(|e| VoltError::serialization(e.to_string()))?;
1684 Ok(buf)
1685 }
1686 "GetFileContent" => {
1687 let req: volt::GetFileContentRequest = serde_json::from_value(request.clone())?;
1688 let mut buf = Vec::new();
1689 req.encode(&mut buf)
1690 .map_err(|e| VoltError::serialization(e.to_string()))?;
1691 Ok(buf)
1692 }
1693 "SetFileContent" => {
1694 let req: volt::SetFileContentRequest = serde_json::from_value(request.clone())?;
1695 let mut buf = Vec::new();
1696 req.encode(&mut buf)
1697 .map_err(|e| VoltError::serialization(e.to_string()))?;
1698 Ok(buf)
1699 }
1700 _ => Err(VoltError::MethodNotFound(format!(
1701 "Unknown method: {}",
1702 method
1703 ))),
1704 }
1705 }
1706
1707 fn deserialize_response(&self, method: &str, data: &[u8]) -> Result<serde_json::Value> {
1709 use prost::Message;
1710
1711 match method {
1712 "GetResource" => {
1713 let resp = volt::GetResourceResponse::decode(data)
1714 .map_err(|e| VoltError::serialization(e.to_string()))?;
1715 Ok(serde_json::to_value(&resp)?)
1716 }
1717 "GetResources" => {
1718 let resp = volt::GetResourcesResponse::decode(data)
1719 .map_err(|e| VoltError::serialization(e.to_string()))?;
1720 Ok(serde_json::to_value(&resp)?)
1721 }
1722 "CanAccessResource" => {
1723 let resp = volt::CanAccessResourceResponse::decode(data)
1724 .map_err(|e| VoltError::serialization(e.to_string()))?;
1725 Ok(serde_json::to_value(&resp)?)
1726 }
1727 "DeleteResource" => {
1728 let resp = volt::DeleteResourceResponse::decode(data)
1729 .map_err(|e| VoltError::serialization(e.to_string()))?;
1730 Ok(serde_json::to_value(&resp)?)
1731 }
1732 "SaveResource" => {
1733 let resp = volt::SaveResourceResponse::decode(data)
1734 .map_err(|e| VoltError::serialization(e.to_string()))?;
1735 Ok(serde_json::to_value(&resp)?)
1736 }
1737 "RequestAccess" => {
1738 let resp = volt::RequestAccessResponse::decode(data)
1739 .map_err(|e| VoltError::serialization(e.to_string()))?;
1740 Ok(serde_json::to_value(&resp)?)
1741 }
1742 "Authenticate" => {
1743 let resp = volt::AuthenticateResponse::decode(data)
1744 .map_err(|e| VoltError::serialization(e.to_string()))?;
1745 Ok(serde_json::to_value(&resp)?)
1746 }
1747 "DiscoverServices" => {
1748 let resp = volt::DiscoverServicesResponse::decode(data)
1749 .map_err(|e| VoltError::serialization(e.to_string()))?;
1750 Ok(serde_json::to_value(&resp)?)
1751 }
1752 "GetResourceAncestors" => {
1753 let resp = volt::GetResourceAncestorsResponse::decode(data)
1754 .map_err(|e| VoltError::serialization(e.to_string()))?;
1755 Ok(serde_json::to_value(&resp)?)
1756 }
1757 "GetResourceDescendants" => {
1758 let resp = volt::GetResourceDescendantsResponse::decode(data)
1759 .map_err(|e| VoltError::serialization(e.to_string()))?;
1760 Ok(serde_json::to_value(&resp)?)
1761 }
1762 "SaveAccess" => {
1763 let resp = volt::SaveAccessResponse::decode(data)
1764 .map_err(|e| VoltError::serialization(e.to_string()))?;
1765 Ok(serde_json::to_value(&resp)?)
1766 }
1767 "GetIdentities" => {
1768 let resp = volt::GetIdentitiesResponse::decode(data)
1769 .map_err(|e| VoltError::serialization(e.to_string()))?;
1770 Ok(serde_json::to_value(&resp)?)
1771 }
1772 "GetIdentity" => {
1773 let resp = volt::GetIdentityResponse::decode(data)
1774 .map_err(|e| VoltError::serialization(e.to_string()))?;
1775 Ok(serde_json::to_value(&resp)?)
1776 }
1777 "GetPolicy" => {
1778 let resp = volt::GetPolicyResponse::decode(data)
1779 .map_err(|e| VoltError::serialization(e.to_string()))?;
1780 Ok(serde_json::to_value(&resp)?)
1781 }
1782 "GetAccess" => {
1783 let resp = volt::GetAccessResponse::decode(data)
1784 .map_err(|e| VoltError::serialization(e.to_string()))?;
1785 Ok(serde_json::to_value(&resp)?)
1786 }
1787 "SaveIdentity" => {
1788 let resp = volt::SaveIdentityResponse::decode(data)
1789 .map_err(|e| VoltError::serialization(e.to_string()))?;
1790 Ok(serde_json::to_value(&resp)?)
1791 }
1792 "Shutdown" => {
1793 let resp = volt::ShutdownResponse::decode(data)
1794 .map_err(|e| VoltError::serialization(e.to_string()))?;
1795 Ok(serde_json::to_value(&resp)?)
1796 }
1797 "GetFileDescendants" => {
1798 let resp = volt::GetFileDescendantsResponse::decode(data)
1799 .map_err(|e| VoltError::serialization(e.to_string()))?;
1800 Ok(serde_json::to_value(&resp)?)
1801 }
1802 "GetFileContent" => {
1803 let resp = volt::GetFileContentResponse::decode(data)
1804 .map_err(|e| VoltError::serialization(e.to_string()))?;
1805 Ok(serde_json::to_value(&resp)?)
1806 }
1807 "SetFileContent" => {
1808 let resp = volt::SetFileContentResponse::decode(data)
1809 .map_err(|e| VoltError::serialization(e.to_string()))?;
1810 Ok(serde_json::to_value(&resp)?)
1811 }
1812 _ => Err(VoltError::MethodNotFound(format!(
1813 "Unknown method: {}",
1814 method
1815 ))),
1816 }
1817 }
1818
1819 async fn unary_call(&self, method: &str, request: serde_json::Value) -> Result<ApiResponse> {
1821 tracing::debug!("Calling {} with {:?}", method, request);
1822
1823 if self.is_relayed() {
1825 return self.unary_call_via_relay(method, request).await;
1826 }
1827
1828 let mut client = self.get_volt_client().await?;
1830
1831 macro_rules! auth_request {
1833 ($req:expr) => {{
1834 let mut request = tonic::Request::new($req);
1835 self.add_auth_to_request(&mut request);
1836 request
1837 }};
1838 }
1839
1840 let response_json = match method {
1842 "GetResource" => {
1843 let req: volt::GetResourceRequest = serde_json::from_value(request)?;
1844 let response = client
1845 .get_resource(auth_request!(req))
1846 .await
1847 .map_err(VoltError::GrpcError)?;
1848 let resp = response.into_inner();
1849 serde_json::to_value(&resp)?
1850 }
1851 "GetResources" => {
1852 let req: volt::GetResourcesRequest = serde_json::from_value(request)?;
1853 let response = client
1854 .get_resources(auth_request!(req))
1855 .await
1856 .map_err(VoltError::GrpcError)?;
1857 let resp = response.into_inner();
1858 serde_json::to_value(&resp)?
1859 }
1860 "CanAccessResource" => {
1861 let req: volt::CanAccessResourceRequest = serde_json::from_value(request)?;
1862 let response = client
1863 .can_access_resource(auth_request!(req))
1864 .await
1865 .map_err(VoltError::GrpcError)?;
1866 let resp = response.into_inner();
1867 serde_json::to_value(&resp)?
1868 }
1869 "DeleteResource" => {
1870 let req: volt::DeleteResourceRequest = serde_json::from_value(request)?;
1871 let response = client
1872 .delete_resource(auth_request!(req))
1873 .await
1874 .map_err(VoltError::GrpcError)?;
1875 let resp = response.into_inner();
1876 serde_json::to_value(&resp)?
1877 }
1878 "SaveResource" => {
1879 let req: volt::SaveResourceRequest = serde_json::from_value(request)?;
1880 let response = client
1881 .save_resource(auth_request!(req))
1882 .await
1883 .map_err(VoltError::GrpcError)?;
1884 let resp = response.into_inner();
1885 serde_json::to_value(&resp)?
1886 }
1887 "RequestAccess" => {
1888 let req: volt::RequestAccessRequest = serde_json::from_value(request)?;
1889 let response = client
1890 .request_access(auth_request!(req))
1891 .await
1892 .map_err(VoltError::GrpcError)?;
1893 let resp = response.into_inner();
1894 serde_json::to_value(&resp)?
1895 }
1896 "Authenticate" => {
1897 let req: volt::AuthenticateRequest = serde_json::from_value(request)?;
1898 let response = client
1899 .authenticate(auth_request!(req))
1900 .await
1901 .map_err(VoltError::GrpcError)?;
1902 let resp = response.into_inner();
1903 serde_json::to_value(&resp)?
1904 }
1905 "DiscoverServices" => {
1906 let req: volt::DiscoverServicesRequest = serde_json::from_value(request)?;
1907 let response = client
1908 .discover_services(auth_request!(req))
1909 .await
1910 .map_err(VoltError::GrpcError)?;
1911 let resp = response.into_inner();
1912 serde_json::to_value(&resp)?
1913 }
1914 "GetResourceAncestors" => {
1915 let req: volt::GetResourceAncestorsRequest = serde_json::from_value(request)?;
1916 let response = client
1917 .get_resource_ancestors(auth_request!(req))
1918 .await
1919 .map_err(VoltError::GrpcError)?;
1920 let resp = response.into_inner();
1921 serde_json::to_value(&resp)?
1922 }
1923 "GetResourceDescendants" => {
1924 let req: volt::GetResourceDescendantsRequest = serde_json::from_value(request)?;
1925 let response = client
1926 .get_resource_descendants(auth_request!(req))
1927 .await
1928 .map_err(VoltError::GrpcError)?;
1929 let resp = response.into_inner();
1930 serde_json::to_value(&resp)?
1931 }
1932 "SaveAccess" => {
1933 let req: volt::SaveAccessRequest = serde_json::from_value(request)?;
1934 let response = client
1935 .save_access(auth_request!(req))
1936 .await
1937 .map_err(VoltError::GrpcError)?;
1938 let resp = response.into_inner();
1939 serde_json::to_value(&resp)?
1940 }
1941 "GetIdentities" => {
1942 let req: volt::GetIdentitiesRequest = serde_json::from_value(request)?;
1943 let response = client
1944 .get_identities(auth_request!(req))
1945 .await
1946 .map_err(VoltError::GrpcError)?;
1947 let resp = response.into_inner();
1948 serde_json::to_value(&resp)?
1949 }
1950 "GetIdentity" => {
1951 let req: volt::GetIdentityRequest = serde_json::from_value(request)?;
1952 let response = client
1953 .get_identity(auth_request!(req))
1954 .await
1955 .map_err(VoltError::GrpcError)?;
1956 let resp = response.into_inner();
1957 serde_json::to_value(&resp)?
1958 }
1959 "GetPolicy" => {
1960 let req: volt::GetPolicyRequest = serde_json::from_value(request)?;
1961 let response = client
1962 .get_policy(auth_request!(req))
1963 .await
1964 .map_err(VoltError::GrpcError)?;
1965 let resp = response.into_inner();
1966 serde_json::to_value(&resp)?
1967 }
1968 "GetAccess" => {
1969 let req: volt::GetAccessRequest = serde_json::from_value(request)?;
1970 let response = client
1971 .get_access(auth_request!(req))
1972 .await
1973 .map_err(VoltError::GrpcError)?;
1974 let resp = response.into_inner();
1975 serde_json::to_value(&resp)?
1976 }
1977 "SaveIdentity" => {
1978 let req: volt::SaveIdentityRequest = serde_json::from_value(request)?;
1979 let response = client
1980 .save_identity(auth_request!(req))
1981 .await
1982 .map_err(VoltError::GrpcError)?;
1983 let resp = response.into_inner();
1984 serde_json::to_value(&resp)?
1985 }
1986 "Shutdown" => {
1987 let req: volt::ShutdownRequest = serde_json::from_value(request)?;
1988 let response = client
1989 .shutdown(auth_request!(req))
1990 .await
1991 .map_err(VoltError::GrpcError)?;
1992 let resp = response.into_inner();
1993 serde_json::to_value(&resp)?
1994 }
1995 _ => {
1996 return Err(VoltError::MethodNotFound(format!(
1997 "Unknown method: {}",
1998 method
1999 )));
2000 }
2001 };
2002
2003 let status = response_json
2005 .get("status")
2006 .and_then(|s| serde_json::from_value(s.clone()).ok());
2007
2008 Ok(ApiResponse {
2009 status,
2010 payload: response_json,
2011 })
2012 }
2013}
2014
2015impl Default for VoltClient {
2016 fn default() -> Self {
2017 Self::new().expect("Failed to create default VoltClient")
2018 }
2019}
2020
2021pub enum ConfigSource {
2023 File(std::path::PathBuf),
2025 Config(VoltClientConfig),
2027 Json(serde_json::Value),
2029}
2030
2031impl From<&str> for ConfigSource {
2032 fn from(s: &str) -> Self {
2033 ConfigSource::File(std::path::PathBuf::from(s))
2034 }
2035}
2036
2037impl From<std::path::PathBuf> for ConfigSource {
2038 fn from(p: std::path::PathBuf) -> Self {
2039 ConfigSource::File(p)
2040 }
2041}
2042
2043impl From<VoltClientConfig> for ConfigSource {
2044 fn from(c: VoltClientConfig) -> Self {
2045 ConfigSource::Config(c)
2046 }
2047}
2048
2049impl From<serde_json::Value> for ConfigSource {
2050 fn from(v: serde_json::Value) -> Self {
2051 ConfigSource::Json(v)
2052 }
2053}
2054
2055#[cfg(test)]
2056mod tests {
2057 use super::*;
2058
2059 #[tokio::test]
2060 async fn test_client_creation() {
2061 let client = VoltClient::new().unwrap();
2062 assert!(client.config().is_none());
2063 assert!(!client.is_connected().await);
2064 }
2065
2066 #[test]
2067 fn test_config_source() {
2068 let _source: ConfigSource = "path/to/config.json".into();
2069 let _source: ConfigSource = std::path::PathBuf::from("config.json").into();
2070 let _source: ConfigSource = serde_json::json!({"client_name": "test"}).into();
2071 }
2072}
2073
2074fn normalize_payload(value: serde_json::Value) -> serde_json::Value {
2076 match value {
2077 serde_json::Value::Object(map) => {
2078 let mut new_map = serde_json::Map::new();
2079 for (key, val) in map {
2080 if key == "subject" {
2081 if let serde_json::Value::Object(mut subject_map) = val {
2082 if let Some(serde_json::Value::String(s)) =
2083 subject_map.get("credentialLookup")
2084 {
2085 if s == "'[EVERYONE]'" {
2086 subject_map.clear();
2088 }
2089 } else if let Some(serde_json::Value::String(s)) =
2090 subject_map.get("identityDid")
2091 {
2092 if s.is_empty() {
2093 subject_map.insert(
2094 "identityDid".to_string(),
2095 serde_json::Value::String(
2096 "did:volt:00000000-0000-0000-0000-000000000000".to_string(),
2097 ),
2098 );
2099 }
2100 }
2101 if !subject_map.is_empty() {
2102 new_map.insert(key, serde_json::Value::Object(subject_map));
2103 }
2104 } else {
2105 new_map.insert(key, normalize_payload(val));
2106 }
2107 } else {
2108 new_map.insert(key, normalize_payload(val));
2109 }
2110 }
2111 serde_json::Value::Object(new_map)
2112 }
2113 serde_json::Value::Array(arr) => {
2114 serde_json::Value::Array(arr.into_iter().map(normalize_payload).collect())
2115 }
2116 other => other,
2117 }
2118}