Skip to main content

volt_client_grpc/
volt_client.rs

1//! Volt gRPC client.
2//!
3//! The main client interface for interacting with a Volt server.
4
5use futures::Stream;
6use std::collections::HashMap;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10use tokio::sync::{mpsc, Mutex, RwLock};
11use tokio_stream::wrappers::ReceiverStream;
12use tonic::transport::{Certificate, Channel, ClientTlsConfig, Identity};
13use tonic::Streaming;
14
15use crate::config::{resolve_volt_config, InitialiseOptions, VoltClientConfig, VoltConfig};
16use crate::constants::{PolicyDecision, VoltAccessType};
17use crate::credential::VoltCredential;
18use crate::error::{Result, VoltError};
19use crate::proto::*;
20use crate::relay::{RelayContext, RelayState};
21use crate::utils::get_local_ip;
22use crate::volt_connection::{EventReceiver, VoltConnection};
23
24// ==============================================================================
25// SyncDocumentStream - A wrapper type that abstracts over different stream sources
26// ==============================================================================
27
28/// A stream of SyncDocumentResponse items that can come from either:
29/// - A direct gRPC streaming connection
30/// - A relay-wrapped channel connection
31///
32/// This type implements `Stream` so it can be used with `StreamExt` methods.
33pub enum SyncDocumentStream {
34    /// Direct gRPC streaming connection
35    Direct(Streaming<volt::SyncDocumentResponse>),
36    /// Relay-wrapped channel connection
37    Relay(ReceiverStream<std::result::Result<volt::SyncDocumentResponse, tonic::Status>>),
38}
39
40impl Stream for SyncDocumentStream {
41    type Item = std::result::Result<volt::SyncDocumentResponse, tonic::Status>;
42
43    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
44        match self.get_mut() {
45            SyncDocumentStream::Direct(stream) => {
46                // SAFETY: We're pinning the inner stream which is safe because
47                // we're immediately calling poll_next on it
48                Pin::new(stream).poll_next(cx)
49            }
50            SyncDocumentStream::Relay(stream) => Pin::new(stream).poll_next(cx),
51        }
52    }
53}
54
55/// The main Volt client
56pub struct VoltClient {
57    /// Client configuration
58    config: Option<VoltClientConfig>,
59    /// Credential manager
60    credential: Option<VoltCredential>,
61    /// Volt-specific configuration
62    volt_config: Option<VoltConfig>,
63    /// Active connection
64    connection: Option<VoltConnection>,
65    /// Cached gRPC channel
66    cached_channel: Arc<Mutex<Option<Channel>>>,
67    /// Cached service clients
68    cached_services: Arc<RwLock<HashMap<String, Channel>>>,
69    /// Connected state
70    connected: Arc<RwLock<bool>>,
71    /// Session ID
72    session: Arc<RwLock<Option<String>>>,
73    /// Relay context (for relayed connections)
74    relay_context: Arc<Mutex<Option<RelayContext>>>,
75    /// Relay invoke stream sender
76    relay_invoke_tx: Arc<Mutex<Option<mpsc::Sender<volt::InvokeRequest>>>>,
77    /// Relay invoke stream receiver
78    relay_invoke_rx: Arc<Mutex<Option<Streaming<volt::InvokeResponse>>>>,
79}
80
81impl VoltClient {
82    /// Create a new Volt client
83    pub fn new() -> Result<Self> {
84        Ok(Self {
85            config: None,
86            credential: None,
87            volt_config: None,
88            connection: None,
89            cached_channel: Arc::new(Mutex::new(None)),
90            cached_services: Arc::new(RwLock::new(HashMap::new())),
91            connected: Arc::new(RwLock::new(false)),
92            session: Arc::new(RwLock::new(None)),
93            relay_context: Arc::new(Mutex::new(None)),
94            relay_invoke_tx: Arc::new(Mutex::new(None)),
95            relay_invoke_rx: Arc::new(Mutex::new(None)),
96        })
97    }
98
99    /// Get the client configuration
100    pub fn config(&self) -> Option<&VoltClientConfig> {
101        self.config.as_ref()
102    }
103
104    /// Get the credential manager
105    pub fn credential(&self) -> Option<&VoltCredential> {
106        self.credential.as_ref()
107    }
108
109    /// Get mutable credential manager
110    pub fn credential_mut(&mut self) -> Option<&mut VoltCredential> {
111        self.credential.as_mut()
112    }
113
114    /// Get the Volt configuration
115    pub fn volt_config(&self) -> Option<&VoltConfig> {
116        self.volt_config.as_ref()
117    }
118
119    /// Check if connected via relay
120    pub fn is_relayed(&self) -> bool {
121        self.volt_config
122            .as_ref()
123            .map(|c| c.relay.is_some() && !c.id.is_empty())
124            .unwrap_or(false)
125    }
126
127    /// Check if connected
128    pub async fn is_connected(&self) -> bool {
129        *self.connected.read().await
130    }
131
132    /// Close the client and release resources
133    pub async fn close(&mut self) {
134        // Close cached channel
135        *self.cached_channel.lock().await = None;
136
137        // Close cached services
138        self.cached_services.write().await.clear();
139    }
140
141    /// Initialize the client with configuration
142    ///
143    /// # Arguments
144    /// * `config` - Either a path to a config file or a configuration object
145    /// * `options` - Initialization options
146    pub async fn initialise(
147        &mut self,
148        config: ConfigSource,
149        options: InitialiseOptions,
150    ) -> Result<VoltClientConfig> {
151        tracing::debug!("Initializing Volt client");
152
153        // Load configuration
154        let (mut config, config_path) = match config {
155            ConfigSource::File(path) => {
156                let path_str = path.to_string_lossy().to_string();
157                tracing::debug!("Loading config from file: {}", path_str);
158
159                if path.exists() {
160                    let config = VoltClientConfig::from_file(&path).await?;
161                    (config, Some(path_str))
162                } else {
163                    // File doesn't exist yet, use defaults
164                    (VoltClientConfig::default(), Some(path_str))
165                }
166            }
167            ConfigSource::Config(config) => (config, None),
168            ConfigSource::Json(json) => (VoltClientConfig::from_value(json)?, None),
169        };
170
171        // Merge extra config if provided
172        if let Some(extra) = options.extra_config {
173            if let Ok(extra_config) = serde_json::from_value::<VoltClientConfig>(extra) {
174                // Merge fields (extra config is lower priority)
175                if config.client_name.is_empty() {
176                    config.client_name = extra_config.client_name;
177                }
178            }
179        }
180
181        // Validate configuration
182        if config.client_name.is_empty() {
183            return Err(VoltError::missing_config("client_name"));
184        }
185
186        // Resolve Volt configuration (DID resolution if needed)
187        let registry_list = options.did_registry_list.as_deref();
188        config.volt = resolve_volt_config(config.volt, registry_list).await?;
189        self.volt_config = Some(config.volt.clone());
190
191        tracing::debug!("Volt config: {:?}", self.volt_config);
192
193        if config.volt.id.is_empty() {
194            return Err(VoltError::missing_config("volt.id"));
195        }
196
197        // Create credential manager
198        let mut credential = VoltCredential::new(config.clone(), config_path)?;
199        credential.initialise().await?;
200
201        // If not relayed, we need address and CA
202        if !self.is_relayed() {
203            if config.volt.address.is_none() {
204                return Err(VoltError::missing_config("volt.address"));
205            }
206            if config.volt.ca_pem.is_none() {
207                return Err(VoltError::missing_config("volt.ca_pem"));
208            }
209        }
210
211        // Set config before authentication (authenticate_internal needs it)
212        self.config = Some(config.clone());
213
214        // Authenticate if we don't have a certificate
215        let is_bound = if credential.certificate().is_some() {
216            true
217        } else {
218            self.authenticate_internal(
219                &mut credential,
220                options.no_did,
221                options.own_did,
222                options.exchange_token.as_deref(),
223                options.wait_for_auth,
224            )
225            .await?
226        };
227
228        if !is_bound {
229            return Err(VoltError::auth("Volt authentication failed"));
230        }
231
232        // Save the cache
233        credential.save_cache().await?;
234
235        self.credential = Some(credential);
236
237        Ok(config)
238    }
239
240    /// Initialize and connect in one call
241    pub async fn initialise_and_connect(
242        &mut self,
243        config: ConfigSource,
244        options: InitialiseOptions,
245        hello_payload: Option<serde_json::Value>,
246    ) -> Result<EventReceiver> {
247        self.initialise(config, options).await?;
248        self.connect(hello_payload).await
249    }
250
251    /// Connect to the Volt server
252    pub async fn connect(
253        &mut self,
254        hello_payload: Option<serde_json::Value>,
255    ) -> Result<EventReceiver> {
256        if self.connection.is_some() {
257            tracing::debug!("Already have a connection");
258            return Err(VoltError::AlreadyConnected);
259        }
260
261        let config = self.config.as_ref().ok_or(VoltError::NotInitialized)?;
262
263        let mut connection = VoltConnection::new(config);
264        let event_rx = connection.connect(hello_payload).await?;
265
266        self.connection = Some(connection);
267
268        Ok(event_rx)
269    }
270
271    /// Disconnect from the Volt server
272    pub async fn disconnect(&mut self) {
273        if let Some(ref mut connection) = self.connection {
274            connection.disconnect().await;
275        }
276        self.connection = None;
277        *self.connected.write().await = false;
278        *self.session.write().await = None;
279        self.close().await;
280    }
281
282    /// Internal authentication logic
283    async fn authenticate_internal(
284        &self,
285        credential: &mut VoltCredential,
286        no_did: bool,
287        own_did: bool,
288        exchange_token: Option<&str>,
289        wait_for_permit: bool,
290    ) -> Result<bool> {
291        let config = self.config.as_ref().ok_or(VoltError::NotInitialized)?;
292        let volt_config = self.volt_config.as_ref().ok_or(VoltError::NotInitialized)?;
293
294        // Ensure we have CA certificate
295        let cache = credential.cache_mut();
296        if cache.ca.is_none() {
297            cache.ca = volt_config.ca_pem.clone();
298        }
299
300        if cache.ca.is_none() {
301            return Err(VoltError::CertificateError(
302                "No CA certificate found".into(),
303            ));
304        }
305
306        // Get or create key
307        credential.create_key().await?;
308        let public_key_pem = credential.public_key_pem()?;
309
310        // Default bind IP
311        if credential
312            .cache()
313            .and_then(|c| c.bind_ip.as_ref())
314            .is_none()
315        {
316            let ip = get_local_ip().unwrap_or_else(|_| "127.0.0.1".to_string());
317            credential.cache_mut().bind_ip = Some(ip);
318        }
319
320        credential.save_cache().await?;
321
322        // Build authenticate request
323        let mut auth_request = AuthenticateRequest {
324            client_name: config.client_name.clone(),
325            host: credential.cache().and_then(|c| c.bind_ip.clone()),
326            ..Default::default()
327        };
328
329        // Add verifiable credentials if present
330        if let Some(vcs) = credential.cache().map(|c| &c.vc).filter(|v| !v.is_empty()) {
331            let mut presentation = VerifiablePresentation::default();
332            let mut presentation_data = String::new();
333
334            for vc in vcs {
335                let vc_str = serde_json::to_string_pretty(vc)?;
336                presentation.credential.push(vc_str.clone());
337                presentation_data.push_str(&vc_str);
338            }
339
340            let key = credential.get_key()?;
341            presentation.signature = key.sign_base64(presentation_data.as_bytes());
342            auth_request.verifiable_presentation.push(presentation);
343        }
344
345        // Add exchange token if provided
346        if let Some(token) = exchange_token {
347            auth_request.exchange_token = Some(token.to_string());
348        }
349
350        // Add challenge code if present
351        if let Some(challenge) = credential.cache().and_then(|c| c.challenge_code.clone()) {
352            auth_request.challenge = Some(challenge);
353        }
354
355        // Determine auth type
356        if no_did {
357            // Session-only authentication
358            auth_request.public_key = Some(public_key_pem);
359        } else if credential.identity_did().is_none() {
360            if !own_did {
361                // Let Volt create DID for us
362                auth_request.did_public_key = Some(public_key_pem);
363            } else {
364                // Create our own DID
365                let did = format!("did:volt:{}", uuid::Uuid::new_v4());
366                credential.cache_mut().identity_did = Some(did.clone());
367
368                // Create DID document
369                let did_document = serde_json::json!({
370                    "@context": [
371                        "https://www.w3.org/ns/did/v1",
372                        "https://tdxvolt.com/ns/did/v1"
373                    ],
374                    "authentication": [format!("{}#key-1", did)],
375                    "controller": did,
376                    "id": did,
377                    "verificationMethod": [{
378                        "id": format!("{}#key-1", did),
379                        "publicKeyPem": public_key_pem,
380                        "type": "Ed25519Signature2018"
381                    }]
382                });
383
384                let did_doc_str = serde_json::to_string_pretty(&did_document)?;
385                let key = credential.get_key()?;
386                let signature = key.sign_base64(did_doc_str.as_bytes());
387
388                auth_request.did_document = Some(did_doc_str);
389                auth_request.did_document_signature = Some(signature);
390            }
391        } else {
392            // Use existing DID
393            auth_request.did = credential.identity_did().map(|s| s.to_string());
394        }
395
396        // Poll for authentication decision
397        let poll_interval = std::time::Duration::from_millis(10000);
398        let mut decision = PolicyDecision::Unknown;
399
400        loop {
401            if decision != PolicyDecision::Unknown {
402                tokio::time::sleep(poll_interval).await;
403            }
404
405            let response = self.issue_authenticate(&auth_request).await?;
406            decision = PolicyDecision::from_str(&response.decision);
407
408            tracing::debug!("Authenticate decision: {:?}", decision);
409
410            match decision {
411                PolicyDecision::Permit | PolicyDecision::Prompt | PolicyDecision::Pending => {
412                    // Update cache with response data
413                    let cache = credential.cache_mut();
414
415                    if let Some(cert) = response.cert {
416                        cache.cert = Some(cert);
417                    }
418                    if let Some(ca) = response.chain {
419                        cache.ca = Some(ca);
420                    }
421                    if let Some(session_id) = response.session_id {
422                        cache.session_id = Some(session_id);
423                    }
424                    if cache.identity_did.is_none() {
425                        cache.identity_did = response.identity_did;
426                    }
427
428                    // Add credentials to cache
429                    for cred_str in response.credential {
430                        if let Ok(vc) = serde_json::from_str(&cred_str) {
431                            // Check if VC already exists
432                            let exists = cache.vc.iter().any(|existing| {
433                                existing.get("id")
434                                    == serde_json::from_str::<serde_json::Value>(&cred_str)
435                                        .ok()
436                                        .and_then(|v| v.get("id").cloned())
437                                        .as_ref()
438                            });
439                            if !exists {
440                                cache.vc.push(vc);
441                            }
442                        }
443                    }
444
445                    credential.save_cache().await?;
446                }
447                PolicyDecision::Deny => {
448                    tracing::warn!("Access request denied");
449                    break;
450                }
451                _ => {
452                    tracing::debug!("Unknown decision: {}", response.decision);
453                }
454            }
455
456            // Check if we should continue polling
457            if decision == PolicyDecision::Permit {
458                break;
459            }
460            if decision == PolicyDecision::Pending {
461                continue;
462            }
463            if decision == PolicyDecision::Prompt && wait_for_permit {
464                continue;
465            }
466            break;
467        }
468
469        Ok(decision == PolicyDecision::Permit)
470    }
471
472    /// Issue an authenticate request
473    async fn issue_authenticate(
474        &self,
475        request: &AuthenticateRequest,
476    ) -> Result<AuthenticateResponse> {
477        if self.is_relayed() {
478            tracing::warn!("Relay authentication not yet implemented; returning simulated permit");
479            return Ok(AuthenticateResponse {
480                decision: "POLICY_DECISION_PERMIT".to_string(),
481                session_id: Some(uuid::Uuid::new_v4().to_string()),
482                ..Default::default()
483            });
484        }
485
486        tracing::debug!("Authenticating directly with Volt");
487
488        let request_value =
489            serde_json::to_value(request).map_err(|e| VoltError::serialization(e.to_string()))?;
490
491        let grpc_request: volt::AuthenticateRequest = serde_json::from_value(request_value)
492            .map_err(|e| VoltError::serialization(e.to_string()))?;
493
494        let mut client = self.get_volt_client().await?;
495        let response = client
496            .authenticate(tonic::Request::new(grpc_request))
497            .await
498            .map_err(VoltError::GrpcError)?;
499
500        let response_json = serde_json::to_value(&response.into_inner())
501            .map_err(|e| VoltError::serialization(e.to_string()))?;
502
503        let auth_response: AuthenticateResponse = serde_json::from_value(response_json)
504            .map_err(|e| VoltError::serialization(e.to_string()))?;
505
506        tracing::debug!(
507            "Authentication response decision: {}",
508            auth_response.decision
509        );
510
511        Ok(auth_response)
512    }
513
514    // ========== API Methods ==========
515
516    /// Request resource access (blocking until decision)
517    pub async fn request_access_blocking(
518        &self,
519        target_resource_id: &str,
520        access_type: VoltAccessType,
521    ) -> Result<bool> {
522        let request = AccessRequest {
523            resource_id: target_resource_id.to_string(),
524            access: access_type.as_str().to_string(),
525        };
526
527        let poll_interval = std::time::Duration::from_millis(10000);
528        let mut decision = PolicyDecision::Unknown;
529
530        loop {
531            if decision != PolicyDecision::Unknown {
532                tokio::time::sleep(poll_interval).await;
533            }
534
535            let response = self.request_access(request.clone()).await?;
536            decision = PolicyDecision::from_str(&response.decision);
537
538            tracing::debug!("Access decision: {:?}", decision);
539
540            if decision != PolicyDecision::Prompt && decision != PolicyDecision::Pending {
541                break;
542            }
543        }
544
545        Ok(decision == PolicyDecision::Permit)
546    }
547
548    /// Check if a resource can be accessed
549    pub async fn can_access_resource(&self, request: serde_json::Value) -> Result<ApiResponse> {
550        self.unary_call("CanAccessResource", request).await
551    }
552
553    /// Delete a resource
554    pub async fn delete_resource(&self, request: serde_json::Value) -> Result<ApiResponse> {
555        self.unary_call("DeleteResource", request).await
556    }
557
558    /// Discover services
559    pub async fn discover_services(&self, request: serde_json::Value) -> Result<ApiResponse> {
560        self.unary_call("DiscoverServices", request).await
561    }
562
563    /// Get a resource
564    pub async fn get_resource(&self, request: serde_json::Value) -> Result<ApiResponse> {
565        self.unary_call("GetResource", request).await
566    }
567
568    /// Get multiple resources
569    pub async fn get_resources(&self, request: serde_json::Value) -> Result<ApiResponse> {
570        self.unary_call("GetResources", request).await
571    }
572
573    /// Get resource ancestors
574    pub async fn get_resource_ancestors(&self, request: serde_json::Value) -> Result<ApiResponse> {
575        self.unary_call("GetResourceAncestors", request).await
576    }
577
578    /// Get resource descendants
579    pub async fn get_resource_descendants(
580        &self,
581        request: serde_json::Value,
582    ) -> Result<ApiResponse> {
583        self.unary_call("GetResourceDescendants", request).await
584    }
585
586    /// Request access to a resource
587    pub async fn request_access(&self, request: AccessRequest) -> Result<AccessResponse> {
588        let response = self
589            .unary_call("RequestAccess", serde_json::to_value(&request)?)
590            .await?;
591        serde_json::from_value(response.payload).map_err(VoltError::from)
592    }
593
594    /// Save a resource
595    pub async fn save_resource(&self, request: serde_json::Value) -> Result<ApiResponse> {
596        self.unary_call("SaveResource", request).await
597    }
598
599    /// Authenticate
600    pub async fn authenticate(&self, request: serde_json::Value) -> Result<ApiResponse> {
601        self.unary_call("Authenticate", request).await
602    }
603
604    /// Bind to the Volt
605    pub async fn bind(&self, request: serde_json::Value) -> Result<ApiResponse> {
606        self.unary_call("Bind", request).await
607    }
608
609    /// Get bindings
610    pub async fn get_bindings(&self, request: serde_json::Value) -> Result<ApiResponse> {
611        self.unary_call("GetBindings", request).await
612    }
613
614    /// Get identities
615    pub async fn get_identities(&self, request: serde_json::Value) -> Result<ApiResponse> {
616        self.unary_call("GetIdentities", request).await
617    }
618
619    /// Get a specific identity
620    pub async fn get_identity(&self, request: serde_json::Value) -> Result<ApiResponse> {
621        self.unary_call("GetIdentity", request).await
622    }
623
624    /// Get policy
625    pub async fn get_policy(&self, request: serde_json::Value) -> Result<ApiResponse> {
626        self.unary_call("GetPolicy", request).await
627    }
628
629    /// Get access permissions for a resource
630    pub async fn get_access(&self, request: serde_json::Value) -> Result<ApiResponse> {
631        self.unary_call("GetAccess", request).await
632    }
633
634    /// Get settings
635    pub async fn get_settings(&self, request: serde_json::Value) -> Result<ApiResponse> {
636        self.unary_call("GetSettings", request).await
637    }
638
639    /// Save access
640    pub async fn save_access(&self, request: serde_json::Value) -> Result<ApiResponse> {
641        self.unary_call("SaveAccess", request).await
642    }
643
644    /// Save identity
645    pub async fn save_identity(&self, request: serde_json::Value) -> Result<ApiResponse> {
646        self.unary_call("SaveIdentity", request).await
647    }
648
649    /// Save settings
650    pub async fn save_settings(&self, request: serde_json::Value) -> Result<ApiResponse> {
651        self.unary_call("SaveSettings", request).await
652    }
653
654    /// Shutdown the Volt
655    pub async fn shutdown(&self, request: serde_json::Value) -> Result<ApiResponse> {
656        self.unary_call("Shutdown", request).await
657    }
658
659    // ========== File API ==========
660
661    /// Get file descendants
662    pub async fn get_file_descendants(&self, request: serde_json::Value) -> Result<ApiResponse> {
663        self.unary_call("GetFileDescendants", request).await
664    }
665
666    /// Get file content
667    pub async fn get_file_content(&self, request: serde_json::Value) -> Result<ApiResponse> {
668        self.unary_call("GetFileContent", request).await
669    }
670
671    /// Set file content
672    pub async fn set_file_content(&self, request: serde_json::Value) -> Result<ApiResponse> {
673        self.unary_call("SetFileContent", request).await
674    }
675
676    // ========== Database API ==========
677
678    /// Create a database
679    pub async fn create_database(&self, request: serde_json::Value) -> Result<ApiResponse> {
680        self.unary_call("CreateDatabase", request).await
681    }
682
683    /// Bulk update
684    pub async fn bulk_update(&self, request: serde_json::Value) -> Result<ApiResponse> {
685        self.unary_call("BulkUpdate", request).await
686    }
687
688    // ========== Internal Methods ==========
689
690    /// Get authentication metadata for relay connections
691    /// Returns None if not using relay or if credential is not available
692    fn get_relay_auth_metadata(&self) -> Option<tonic::metadata::MetadataMap> {
693        tracing::info!("get_relay_auth_metadata: is_relayed={}", self.is_relayed());
694
695        if !self.is_relayed() {
696            tracing::info!("Not relayed, skipping auth metadata");
697            return None;
698        }
699
700        let credential = self.credential.as_ref();
701        if credential.is_none() {
702            tracing::info!("No credential available");
703            return None;
704        }
705        let credential = credential.unwrap();
706
707        let volt_id = self
708            .volt_config
709            .as_ref()
710            .map(|c| c.id.as_str())
711            .unwrap_or("");
712        tracing::info!("Creating auth metadata for volt_id: {}", volt_id);
713
714        match credential.get_identity_metadata(Some(volt_id), None, true, 60) {
715            Ok(identity) => {
716                tracing::info!("Created relay authentication metadata successfully");
717                Some(identity.metadata)
718            }
719            Err(e) => {
720                tracing::warn!("Failed to create relay auth metadata: {}", e);
721                None
722            }
723        }
724    }
725
726    /// Add authentication metadata to a tonic request
727    fn add_auth_to_request<T>(&self, request: &mut tonic::Request<T>) {
728        if let Some(metadata) = self.get_relay_auth_metadata() {
729            let req_metadata = request.metadata_mut();
730            for key_and_value in metadata.iter() {
731                match key_and_value {
732                    tonic::metadata::KeyAndValueRef::Ascii(key, value) => {
733                        tracing::info!("Adding metadata: {}={:?}", key, value);
734                        req_metadata.insert(key, value.clone());
735                    }
736                    tonic::metadata::KeyAndValueRef::Binary(key, value) => {
737                        tracing::info!("Adding binary metadata: {}", key);
738                        req_metadata.insert_bin(key, value.clone());
739                    }
740                }
741            }
742        }
743    }
744
745    /// Get or create a gRPC channel to the Volt server
746    async fn get_channel(&self) -> Result<Channel> {
747        // Check if we have a cached channel - but NOT for relayed connections
748        // For relay, each call should establish its own connection to avoid
749        // issues with stream multiplexing
750        if !self.is_relayed() {
751            let cached = self.cached_channel.lock().await;
752            if let Some(channel) = cached.as_ref() {
753                return Ok(channel.clone());
754            }
755        }
756
757        // Get config
758        let volt_config = self.volt_config.as_ref().ok_or(VoltError::NotInitialized)?;
759        let credential = self.credential.as_ref().ok_or(VoltError::NotInitialized)?;
760
761        // Determine address and CA certificate based on whether we're using relay
762        let is_relayed = self.is_relayed();
763        let (address, ca_pem) = if is_relayed {
764            let relay = volt_config
765                .relay
766                .as_ref()
767                .ok_or_else(|| VoltError::missing_config("volt.relay"))?;
768            tracing::info!("Using relay connection to: {}", relay.address);
769            (relay.address.clone(), relay.ca_pem.clone())
770        } else {
771            let addr = volt_config
772                .address
773                .as_ref()
774                .ok_or_else(|| VoltError::missing_config("volt.address"))?;
775            tracing::info!("Using direct connection to: {}", addr);
776            (addr.clone(), volt_config.ca_pem.clone())
777        };
778
779        // Build URI - support both http and https
780        // Use https when we have a CA certificate
781        let uri = if address.starts_with("http://") || address.starts_with("https://") {
782            address.clone()
783        } else if ca_pem.is_some() {
784            format!("https://{}", address)
785        } else {
786            format!("http://{}", address)
787        };
788
789        tracing::info!("Connecting to URI: {}", uri);
790        tracing::info!(
791            "has CA cert: {}, is_relayed: {}",
792            ca_pem.is_some(),
793            is_relayed
794        );
795
796        // Create channel builder
797        let mut endpoint = Channel::from_shared(uri.clone())
798            .map_err(|e| VoltError::ConnectionError(format!("Invalid URI: {}", e)))?;
799
800        // Configure HTTP/2 settings
801        // Use TCP-level keepalive instead of aggressive HTTP/2 pings
802        // This is more compatible with servers that have strict ping limits
803        endpoint = endpoint
804            // Use TCP keepalive for connection liveness detection
805            .tcp_keepalive(Some(std::time::Duration::from_secs(60)))
806            // Don't set TCP_NODELAY to allow Nagle's algorithm to batch small writes
807            .tcp_nodelay(true)
808            // Use adaptive flow control window sizing
809            .http2_adaptive_window(true)
810            // Increase initial window sizes for better throughput
811            .initial_connection_window_size(1024 * 1024) // 1MB
812            .initial_stream_window_size(1024 * 1024) // 1MB
813            // Set connection timeout
814            .connect_timeout(std::time::Duration::from_secs(30));
815        // NOTE: Don't set .timeout() - this would apply to ALL requests
816        // and cause long-running streams to timeout. Let individual
817        // requests handle their own timeouts if needed.
818
819        // Configure TLS if we have a CA certificate
820        if let Some(ca_pem) = &ca_pem {
821            tracing::info!("Configuring TLS with CA certificate (len={})", ca_pem.len());
822            let ca = Certificate::from_pem(ca_pem);
823            let mut tls_config = ClientTlsConfig::new().ca_certificate(ca);
824
825            // Only override the domain for relay connections (relay certs are issued to coreid.com)
826            if is_relayed {
827                tls_config = tls_config.domain_name("coreid.com");
828            }
829
830            // Add client certificate if we have one (mTLS) - but NOT for relay connections
831            // Relay connections don't use client certificates, only the relay CA
832            if !is_relayed {
833                if let (Some(cert_pem), Some(key_pem)) = (
834                    credential.cache().and_then(|c| c.cert.as_ref()),
835                    credential.cache().and_then(|c| c.key.as_ref()),
836                ) {
837                    let identity = Identity::from_pem(cert_pem.as_bytes(), key_pem.as_bytes());
838                    tls_config = tls_config.identity(identity);
839                }
840            }
841
842            endpoint = endpoint
843                .tls_config(tls_config)
844                .map_err(|e| VoltError::ConnectionError(format!("TLS config error: {}", e)))?;
845        }
846
847        // For relay connections, use lazy connection so we don't establish
848        // the TCP connection until we actually need it
849        let channel = if is_relayed {
850            // Try eager connection for relay to see if that helps with streaming
851            tracing::info!("Eagerly connecting to relay...");
852            endpoint.connect().await.map_err(|e| {
853                VoltError::ConnectionError(format!("Failed to connect to relay {}: {}", uri, e))
854            })?
855        } else {
856            // Connect eagerly for direct connections
857            endpoint.connect().await.map_err(|e| {
858                VoltError::ConnectionError(format!("Failed to connect to {}: {}", uri, e))
859            })?
860        };
861
862        // Cache the channel
863        {
864            let mut cached = self.cached_channel.lock().await;
865            *cached = Some(channel.clone());
866        }
867
868        Ok(channel)
869    }
870
871    /// Get a VoltApiClient
872    async fn get_volt_client(&self) -> Result<volt::volt_api_client::VoltApiClient<Channel>> {
873        let channel = self.get_channel().await?;
874        // For relay connections, don't request compression as it may not be supported
875        let client = if self.is_relayed() {
876            volt::volt_api_client::VoltApiClient::new(channel)
877        } else {
878            // Accept all supported compression encodings including deflate
879            volt::volt_api_client::VoltApiClient::new(channel)
880                .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
881                .accept_compressed(tonic::codec::CompressionEncoding::Zstd)
882                .accept_compressed(tonic::codec::CompressionEncoding::Deflate)
883        };
884        Ok(client)
885    }
886
887    /// Get the Sync API client
888    async fn get_sync_client(&self) -> Result<volt::sync_api_client::SyncApiClient<Channel>> {
889        let channel = self.get_channel().await?;
890        // Accept all supported compression encodings including deflate
891        let client = volt::sync_api_client::SyncApiClient::new(channel)
892            .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
893            .accept_compressed(tonic::codec::CompressionEncoding::Zstd)
894            .accept_compressed(tonic::codec::CompressionEncoding::Deflate);
895        Ok(client)
896    }
897
898    /// Establish a relay connection with key exchange
899    ///
900    /// This opens the `Invoke` bidirectional stream and performs the key exchange
901    /// protocol to establish an encrypted channel to the target Volt.
902    async fn establish_relay_connection(&self) -> Result<()> {
903        if !self.is_relayed() {
904            tracing::debug!("Not a relayed connection, skipping relay setup");
905            return Ok(());
906        }
907
908        // Check if already connected
909        {
910            let ctx_guard = self.relay_context.lock().await;
911            if let Some(ctx) = ctx_guard.as_ref() {
912                if ctx.state() == &RelayState::Connected {
913                    tracing::debug!("Relay already connected");
914                    return Ok(());
915                }
916            }
917        }
918
919        tracing::info!("Establishing relay connection...");
920
921        // Get configuration
922        let volt_config = self.volt_config.as_ref().ok_or(VoltError::NotInitialized)?;
923        let credential = self.credential.as_ref().ok_or(VoltError::NotInitialized)?;
924        let target_did = &volt_config.id;
925
926        // Create relay context
927        let mut relay_ctx = RelayContext::new(target_did.clone());
928
929        // Set the target's public key for signature verification (if available)
930        if let Some(public_key_pem) = &volt_config.public_key {
931            if let Ok(pk_bytes) =
932                crate::crypto::from_base64(&crate::crypto::strip_pem_headers(public_key_pem))
933            {
934                relay_ctx.set_target_public_key(pk_bytes);
935            }
936        }
937
938        // Get JWT token for authentication, including our relay public key so the server can derive the shared secret
939        let relay_public_key_b64 = relay_ctx.public_key_base64();
940        let identity = credential.get_identity_metadata(
941            Some(target_did),
942            Some(&relay_public_key_b64),
943            true,
944            60,
945        )?;
946        let token = identity.token.clone();
947
948        // Create the key exchange request
949        let key_exchange_request = relay_ctx.create_key_exchange_request(&token);
950        tracing::debug!(
951            "Created key exchange request with invoke_id: {}",
952            key_exchange_request.invoke_id
953        );
954
955        // Get VoltApiClient to call Invoke
956        let mut volt_client = self.get_volt_client().await?;
957
958        // Create channel for sending invoke requests
959        let (invoke_tx, invoke_rx) = mpsc::channel::<volt::InvokeRequest>(100);
960
961        // Send the key exchange request
962        invoke_tx
963            .send(key_exchange_request)
964            .await
965            .map_err(|_| VoltError::connection("Failed to send key exchange request"))?;
966
967        // Create the invoke stream
968        let request_stream = ReceiverStream::new(invoke_rx);
969        let mut request = tonic::Request::new(request_stream);
970
971        // Attach the auth metadata generated alongside the relay key material
972        let req_metadata = request.metadata_mut();
973        for key_and_value in identity.metadata.iter() {
974            match key_and_value {
975                tonic::metadata::KeyAndValueRef::Ascii(key, value) => {
976                    req_metadata.insert(key, value.clone());
977                }
978                tonic::metadata::KeyAndValueRef::Binary(key, value) => {
979                    req_metadata.insert_bin(key, value.clone());
980                }
981            }
982        }
983
984        // Start the Invoke bidirectional stream
985        tracing::debug!("establish_relay: Starting Invoke stream...");
986        let response = volt_client.invoke(request).await.map_err(|e| {
987            tracing::error!(
988                "establish_relay: Invoke stream error: code={:?}, msg={}",
989                e.code(),
990                e.message()
991            );
992            VoltError::connection(format!("Failed to start Invoke stream: {}", e))
993        })?;
994
995        let mut invoke_response_stream = response.into_inner();
996
997        // Wait for the key exchange response
998        tracing::debug!("Waiting for key exchange response...");
999        let key_exchange_response = invoke_response_stream
1000            .message()
1001            .await
1002            .map_err(|e| {
1003                VoltError::connection(format!("Failed to receive key exchange response: {}", e))
1004            })?
1005            .ok_or_else(|| VoltError::connection("Invoke stream closed before key exchange"))?;
1006
1007        tracing::debug!(
1008            "Received key exchange response: invoke_id={}",
1009            key_exchange_response.invoke_id
1010        );
1011
1012        // Process the key exchange
1013        if let Some(key_exchange) = &key_exchange_response.key_exchange {
1014            relay_ctx.process_key_exchange_response(key_exchange)?;
1015            tracing::info!("Relay key exchange completed successfully");
1016        } else if let Some(volt::invoke_response::ResponsePayload::Status(status)) =
1017            &key_exchange_response.response_payload
1018        {
1019            return Err(VoltError::server(status.code, &status.message));
1020        } else {
1021            return Err(VoltError::protocol(
1022                "Expected key exchange response, got something else",
1023            ));
1024        }
1025
1026        // Store the relay state
1027        *self.relay_context.lock().await = Some(relay_ctx);
1028        *self.relay_invoke_tx.lock().await = Some(invoke_tx);
1029        *self.relay_invoke_rx.lock().await = Some(invoke_response_stream);
1030
1031        tracing::info!("Relay connection established");
1032        Ok(())
1033    }
1034
1035    /// Check if relay connection is established
1036    pub async fn is_relay_connected(&self) -> bool {
1037        let ctx_guard = self.relay_context.lock().await;
1038        ctx_guard
1039            .as_ref()
1040            .map(|ctx| ctx.state() == &RelayState::Connected)
1041            .unwrap_or(false)
1042    }
1043
1044    /// Create a bidirectional streaming connection for document synchronization
1045    ///
1046    /// This method creates a bidirectional stream for syncing CRDT documents using Y.js/yrs.
1047    /// The stream allows sending updates to the server and receiving updates from other clients.
1048    ///
1049    /// # Returns
1050    /// A tuple of:
1051    /// - `mpsc::Sender<volt::SyncDocumentRequest>` - Use this to send requests to the server
1052    /// - `Streaming<volt::SyncDocumentResponse>` - Use this to receive responses from the server
1053    ///
1054    /// # Example
1055    /// ```ignore
1056    /// let (tx, mut rx) = client.sync_document().await?;
1057    ///
1058    /// // Send initial sync request
1059    /// tx.send(volt::SyncDocumentRequest {
1060    ///     sync_start: Some(volt::SyncDocumentStart {
1061    ///         database_id: "my-db".to_string(),
1062    ///         document_id: "my-doc".to_string(),
1063    ///         state_vector: vec![],
1064    ///         read_only: false,
1065    ///         read_only_fallback: false,
1066    ///     }),
1067    ///     ..Default::default()
1068    /// }).await?;
1069    ///
1070    /// // Receive responses
1071    /// while let Some(response) = rx.next().await {
1072    ///     match response {
1073    ///         Ok(resp) => println!("Received: {:?}", resp),
1074    ///         Err(e) => eprintln!("Error: {:?}", e),
1075    ///     }
1076    /// }
1077    /// ```
1078    pub async fn sync_document(
1079        &self,
1080        initial_request: volt::SyncDocumentRequest,
1081    ) -> Result<(mpsc::Sender<volt::SyncDocumentRequest>, SyncDocumentStream)> {
1082        // For relayed connections, use the relay protocol
1083        if self.is_relayed() {
1084            return self.sync_document_via_relay(initial_request).await;
1085        }
1086
1087        // Direct connection path
1088        let mut client = self.get_sync_client().await?;
1089
1090        // Create a channel for sending requests
1091        let (tx, rx) = mpsc::channel::<volt::SyncDocumentRequest>(100);
1092
1093        // Send the initial request before creating the stream
1094        tx.send(initial_request)
1095            .await
1096            .map_err(|_| VoltError::ConnectionError("Failed to send initial request".into()))?;
1097
1098        // Convert the receiver into a stream
1099        let request_stream = ReceiverStream::new(rx);
1100
1101        // Build the request, adding authentication metadata if using relay
1102        let mut request = tonic::Request::new(request_stream);
1103        self.add_auth_to_request(&mut request);
1104
1105        // Make the bidirectional streaming call
1106        let response = client
1107            .sync_document(request)
1108            .await
1109            .map_err(VoltError::GrpcError)?;
1110
1111        let response_stream = response.into_inner();
1112
1113        Ok((tx, SyncDocumentStream::Direct(response_stream)))
1114    }
1115
1116    /// Sync document through the relay protocol
1117    ///
1118    /// This wraps the SyncDocument stream in the Invoke stream with encryption.
1119    async fn sync_document_via_relay(
1120        &self,
1121        initial_request: volt::SyncDocumentRequest,
1122    ) -> Result<(mpsc::Sender<volt::SyncDocumentRequest>, SyncDocumentStream)> {
1123        tracing::info!("Using relay protocol for sync_document");
1124
1125        // Establish relay connection if not already done
1126        self.establish_relay_connection().await?;
1127
1128        // Get the relay context
1129        let relay_ctx = self.relay_context.lock().await;
1130        let ctx = relay_ctx
1131            .as_ref()
1132            .ok_or_else(|| VoltError::protocol("Relay context not initialized"))?;
1133
1134        if ctx.state() != &RelayState::Connected {
1135            return Err(VoltError::protocol("Relay not in Connected state"));
1136        }
1137
1138        // Get service ID from config (defaults to "tdx.volt_api.volt.v1.SyncAPI")
1139        let service_id = "tdx.volt_api.volt.v1.SyncAPI".to_string();
1140
1141        // Get the invoke_id for this call
1142        let invoke_id = ctx.next_invoke_id();
1143
1144        drop(relay_ctx);
1145
1146        // Create channels for the user-facing interface
1147        let (user_tx, mut user_rx) = mpsc::channel::<volt::SyncDocumentRequest>(100);
1148        let (response_tx, response_rx) =
1149            mpsc::channel::<std::result::Result<volt::SyncDocumentResponse, tonic::Status>>(100);
1150
1151        // Create a fake streaming response using a channel
1152        // Note: This is a bit of a hack - we're creating our own stream
1153        let response_stream = tokio_stream::wrappers::ReceiverStream::new(response_rx);
1154
1155        // Get the invoke stream sender
1156        let invoke_tx_guard = self.relay_invoke_tx.lock().await;
1157        let invoke_tx = invoke_tx_guard
1158            .as_ref()
1159            .ok_or_else(|| VoltError::protocol("Invoke stream not initialized"))?
1160            .clone();
1161        drop(invoke_tx_guard);
1162
1163        // Clone what we need for the background task
1164        let relay_context = self.relay_context.clone();
1165        let relay_invoke_rx = self.relay_invoke_rx.clone();
1166
1167        // Spawn a task to handle the relay wrapping
1168        tokio::spawn(async move {
1169            use prost::Message;
1170
1171            // Send the initial request
1172            {
1173                let ctx_guard = relay_context.lock().await;
1174                let ctx = match ctx_guard.as_ref() {
1175                    Some(c) => c,
1176                    None => {
1177                        tracing::error!("Relay context disappeared");
1178                        return;
1179                    }
1180                };
1181
1182                // Serialize the initial request
1183                let mut request_bytes = Vec::new();
1184                if let Err(e) = initial_request.encode(&mut request_bytes) {
1185                    tracing::error!("Failed to encode initial request: {}", e);
1186                    return;
1187                }
1188
1189                // Create MethodInvoke to start the SyncDocument stream
1190                let remote_response = ctx.create_method_invoke(
1191                    invoke_id,
1192                    &service_id,
1193                    "SyncDocument",
1194                    volt::MethodType::Bidi,
1195                    request_bytes,
1196                );
1197
1198                // Encrypt and send
1199                match ctx.create_encrypted_request(invoke_id, &remote_response) {
1200                    Ok(invoke_req) => {
1201                        if let Err(e) = invoke_tx.send(invoke_req).await {
1202                            tracing::error!("Failed to send initial invoke request: {}", e);
1203                            return;
1204                        }
1205                    }
1206                    Err(e) => {
1207                        tracing::error!("Failed to create encrypted request: {}", e);
1208                        return;
1209                    }
1210                }
1211            }
1212
1213            // Spawn a task to forward user requests to the invoke stream
1214            let invoke_tx_clone = invoke_tx.clone();
1215            let relay_context_clone = relay_context.clone();
1216            let forward_task = tokio::spawn(async move {
1217                while let Some(sync_req) = user_rx.recv().await {
1218                    let ctx_guard = relay_context_clone.lock().await;
1219                    let ctx = match ctx_guard.as_ref() {
1220                        Some(c) => c,
1221                        None => break,
1222                    };
1223
1224                    // Serialize the request
1225                    let mut request_bytes = Vec::new();
1226                    if let Err(e) = sync_req.encode(&mut request_bytes) {
1227                        tracing::error!("Failed to encode sync request: {}", e);
1228                        continue;
1229                    }
1230
1231                    // Wrap in MethodPayload
1232                    let remote_response = ctx.create_method_payload(invoke_id, request_bytes);
1233
1234                    // Encrypt and send
1235                    match ctx.create_encrypted_request(invoke_id, &remote_response) {
1236                        Ok(invoke_req) => {
1237                            drop(ctx_guard);
1238                            if let Err(e) = invoke_tx_clone.send(invoke_req).await {
1239                                tracing::error!("Failed to send invoke request: {}", e);
1240                                break;
1241                            }
1242                        }
1243                        Err(e) => {
1244                            tracing::error!("Failed to create encrypted request: {}", e);
1245                        }
1246                    }
1247                }
1248            });
1249
1250            // Process responses from the invoke stream
1251            loop {
1252                let mut rx_guard = relay_invoke_rx.lock().await;
1253                let rx = match rx_guard.as_mut() {
1254                    Some(r) => r,
1255                    None => break,
1256                };
1257
1258                let response = match rx.message().await {
1259                    Ok(Some(resp)) => resp,
1260                    Ok(None) => {
1261                        tracing::debug!("Invoke stream ended");
1262                        break;
1263                    }
1264                    Err(e) => {
1265                        tracing::error!("Error receiving invoke response: {}", e);
1266                        let _ = response_tx.send(Err(e)).await;
1267                        break;
1268                    }
1269                };
1270                drop(rx_guard);
1271
1272                // Parse and decrypt the response
1273                let mut ctx_guard = relay_context.lock().await;
1274                let ctx = match ctx_guard.as_mut() {
1275                    Some(c) => c,
1276                    None => break,
1277                };
1278
1279                let remote_request = match ctx.parse_response(&response) {
1280                    Ok(Some(req)) => req,
1281                    Ok(None) => continue, // Key exchange or other non-payload response
1282                    Err(e) => {
1283                        tracing::error!("Failed to parse response: {}", e);
1284                        continue;
1285                    }
1286                };
1287                drop(ctx_guard);
1288
1289                // Extract the SyncDocumentResponse from the RemoteRequest
1290                match remote_request.payload {
1291                    Some(volt::remote_request::Payload::MethodPayload(mp)) => {
1292                        let payload = match mp.method_payload {
1293                            Some(volt::method_payload::MethodPayload::Payload(p)) => p,
1294                            Some(volt::method_payload::MethodPayload::JsonPayload(j)) => {
1295                                j.into_bytes()
1296                            }
1297                            None => continue,
1298                        };
1299
1300                        match volt::SyncDocumentResponse::decode(payload.as_slice()) {
1301                            Ok(sync_resp) => {
1302                                if let Err(e) = response_tx.send(Ok(sync_resp)).await {
1303                                    tracing::error!("Failed to forward response: {}", e);
1304                                    break;
1305                                }
1306                            }
1307                            Err(e) => {
1308                                tracing::error!("Failed to decode SyncDocumentResponse: {}", e);
1309                            }
1310                        }
1311                    }
1312                    Some(volt::remote_request::Payload::MethodEnd(end)) => {
1313                        tracing::debug!("Received MethodEnd for invoke_id={}", end.id);
1314                        if end.error_code != 0 {
1315                            let _ = response_tx
1316                                .send(Err(tonic::Status::new(
1317                                    tonic::Code::from_i32(end.error_code),
1318                                    &end.error,
1319                                )))
1320                                .await;
1321                        }
1322                        break;
1323                    }
1324                    _ => {}
1325                }
1326            }
1327
1328            forward_task.abort();
1329        });
1330
1331        // Wrap the channel receiver in our SyncDocumentStream wrapper
1332        let relay_stream = SyncDocumentStream::Relay(response_stream);
1333
1334        Ok((user_tx, relay_stream))
1335    }
1336
1337    /// Make a unary call through the relay protocol
1338    ///
1339    /// This opens a new Invoke stream for each call, performs key exchange,
1340    /// and sends the encrypted request. This matches the JS implementation
1341    /// where each GRPCCall gets its own stream.
1342    async fn unary_call_via_relay(
1343        &self,
1344        method: &str,
1345        request_json: serde_json::Value,
1346    ) -> Result<ApiResponse> {
1347        use crate::proto::volt::{method_payload, remote_request, MethodType};
1348        use crate::relay::RelayContext;
1349
1350        tracing::info!("Making relay call for method: {}", method);
1351
1352        let request_json = normalize_payload(request_json);
1353
1354        // Get configuration
1355        let volt_config = self.volt_config.as_ref().ok_or(VoltError::NotInitialized)?;
1356        let credential = self.credential.as_ref().ok_or(VoltError::NotInitialized)?;
1357        let target_did = &volt_config.id;
1358
1359        // Create relay context for this call
1360        let mut relay_ctx = RelayContext::new(target_did.clone());
1361
1362        // Set the target's public key for signature verification (if available)
1363        if let Some(public_key_pem) = &volt_config.public_key {
1364            if let Ok(pk_bytes) =
1365                crate::crypto::from_base64(&crate::crypto::strip_pem_headers(public_key_pem))
1366            {
1367                relay_ctx.set_target_public_key(pk_bytes);
1368            }
1369        }
1370
1371        // Get JWT token for authentication, injecting our relay public key so the server can derive the shared secret
1372        let relay_public_key_b64 = relay_ctx.public_key_base64();
1373        let identity = credential.get_identity_metadata(
1374            Some(target_did),
1375            Some(&relay_public_key_b64),
1376            true,
1377            60,
1378        )?;
1379        let token = identity.token.clone();
1380
1381        // Create the key exchange request
1382        let key_exchange_request = relay_ctx.create_key_exchange_request(&token);
1383        let invoke_id = key_exchange_request.invoke_id;
1384        tracing::info!(
1385            "Created key exchange request: invoke_id={}, target_did={:?}, token_len={}, iv_len={}",
1386            invoke_id,
1387            key_exchange_request.target_did,
1388            key_exchange_request.token.len(),
1389            key_exchange_request.iv.len()
1390        );
1391
1392        // Get VoltApiClient to call Invoke
1393        let mut volt_client = self.get_volt_client().await?;
1394        tracing::info!("Got VoltApiClient successfully");
1395
1396        // Create channel for sending invoke requests
1397        let (invoke_tx, invoke_rx) = mpsc::channel::<volt::InvokeRequest>(10);
1398        tracing::info!("Created mpsc channel");
1399
1400        // Send the key exchange request
1401        invoke_tx
1402            .send(key_exchange_request)
1403            .await
1404            .map_err(|_| VoltError::connection("Failed to send key exchange request"))?;
1405        tracing::info!("Sent key exchange request to channel");
1406
1407        // Create the invoke stream
1408        let request_stream = ReceiverStream::new(invoke_rx);
1409        let mut request = tonic::Request::new(request_stream);
1410        tracing::info!("Created request stream");
1411
1412        // Attach the auth metadata created alongside the relay key so Invoke sees the same JWT the relay expects
1413        {
1414            let req_metadata = request.metadata_mut();
1415            for key_and_value in identity.metadata.iter() {
1416                match key_and_value {
1417                    tonic::metadata::KeyAndValueRef::Ascii(key, value) => {
1418                        tracing::info!("Adding metadata: {}={:?}", key, value);
1419                        req_metadata.insert(key, value.clone());
1420                    }
1421                    tonic::metadata::KeyAndValueRef::Binary(key, value) => {
1422                        tracing::info!("Adding binary metadata: {}", key);
1423                        req_metadata.insert_bin(key, value.clone());
1424                    }
1425                }
1426            }
1427        }
1428        tracing::info!("Added auth metadata to request");
1429
1430        // Debug: print all metadata in the request
1431        tracing::info!("Request metadata contents:");
1432        for key_and_value in request.metadata().iter() {
1433            match key_and_value {
1434                tonic::metadata::KeyAndValueRef::Ascii(key, value) => {
1435                    tracing::info!("  {}={:?}", key, value);
1436                }
1437                tonic::metadata::KeyAndValueRef::Binary(key, _value) => {
1438                    tracing::info!("  {} (binary)", key);
1439                }
1440            }
1441        }
1442
1443        // Start the Invoke bidirectional stream
1444        // NOTE: This currently fails with ConnectionReset when using tonic against the relay server.
1445        // Unary gRPC calls work fine, but bidirectional streaming fails specifically.
1446        // The JavaScript client using @grpc/grpc-js works, suggesting a tonic-specific issue.
1447        tracing::info!("unary_call_via_relay: Calling invoke()...");
1448        let response = volt_client.invoke(request).await.map_err(|e| {
1449            tracing::error!("unary_call_via_relay: invoke() failed: {:?}", e);
1450            VoltError::connection(format!("Failed to start Invoke stream: {}", e))
1451        })?;
1452        tracing::info!("unary_call_via_relay: invoke() succeeded!");
1453
1454        let mut invoke_response_stream = response.into_inner();
1455
1456        // Wait for the key exchange response
1457        tracing::debug!("Waiting for key exchange response...");
1458        let key_exchange_response = invoke_response_stream
1459            .message()
1460            .await
1461            .map_err(|e| {
1462                VoltError::connection(format!("Failed to receive key exchange response: {}", e))
1463            })?
1464            .ok_or_else(|| VoltError::connection("Invoke stream closed before key exchange"))?;
1465
1466        tracing::debug!(
1467            "Received key exchange response: invoke_id={}",
1468            key_exchange_response.invoke_id
1469        );
1470
1471        // Process the key exchange
1472        if let Some(key_exchange) = &key_exchange_response.key_exchange {
1473            relay_ctx.process_key_exchange_response(key_exchange)?;
1474            tracing::info!("Relay key exchange completed successfully");
1475        } else if let Some(volt::invoke_response::ResponsePayload::Status(status)) =
1476            &key_exchange_response.response_payload
1477        {
1478            return Err(VoltError::server(status.code, &status.message));
1479        } else {
1480            return Err(VoltError::protocol(
1481                "Expected key exchange response, got something else",
1482            ));
1483        }
1484
1485        // Now we have encryption established, serialize and send the actual request
1486        let service_id = "tdx.volt_api.volt.v1.VoltAPI";
1487
1488        // Serialize the request based on method
1489        // We need to serialize to protobuf bytes
1490        let request_bytes = self.serialize_request(method, &request_json)?;
1491
1492        // Create the method invoke
1493        let remote_response = relay_ctx.create_method_invoke(
1494            invoke_id,
1495            service_id,
1496            method,
1497            MethodType::Unary,
1498            request_bytes,
1499        );
1500
1501        // Encrypt and send
1502        let invoke_request = relay_ctx.create_encrypted_request(invoke_id, &remote_response)?;
1503        invoke_tx
1504            .send(invoke_request)
1505            .await
1506            .map_err(|e| VoltError::connection(format!("Failed to send method invoke: {}", e)))?;
1507
1508        // Receive the method response
1509        let response = invoke_response_stream
1510            .message()
1511            .await
1512            .map_err(|e| VoltError::grpc(e.code(), e.message()))?
1513            .ok_or_else(|| VoltError::connection("Stream closed before response"))?;
1514
1515        // Parse and decrypt the response
1516        let remote_request = relay_ctx
1517            .parse_response(&response)?
1518            .ok_or_else(|| VoltError::protocol("Expected payload response"))?;
1519
1520        // Extract the method payload and deserialize
1521        match remote_request.payload {
1522            Some(remote_request::Payload::MethodPayload(mp)) => {
1523                let payload = match mp.method_payload {
1524                    Some(method_payload::MethodPayload::Payload(p)) => p,
1525                    Some(method_payload::MethodPayload::JsonPayload(j)) => j.into_bytes(),
1526                    None => return Err(VoltError::protocol("Empty method payload")),
1527                };
1528
1529                // Deserialize the response
1530                let response_json = self.deserialize_response(method, &payload)?;
1531
1532                // Extract status if present
1533                let status = response_json
1534                    .get("status")
1535                    .and_then(|s| serde_json::from_value(s.clone()).ok());
1536
1537                Ok(ApiResponse {
1538                    status,
1539                    payload: response_json,
1540                })
1541            }
1542            Some(remote_request::Payload::MethodEnd(end)) => {
1543                if end.error_code != 0 || !end.error.is_empty() {
1544                    Err(VoltError::server(end.error_code, &end.error))
1545                } else {
1546                    Err(VoltError::protocol("Received MethodEnd without response"))
1547                }
1548            }
1549            _ => Err(VoltError::protocol("Expected MethodPayload response")),
1550        }
1551    }
1552
1553    /// Serialize a request to protobuf bytes
1554    fn serialize_request(&self, method: &str, request: &serde_json::Value) -> Result<Vec<u8>> {
1555        use prost::Message;
1556
1557        match method {
1558            "GetResource" => {
1559                let req: volt::GetResourceRequest = serde_json::from_value(request.clone())?;
1560                let mut buf = Vec::new();
1561                req.encode(&mut buf)
1562                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1563                Ok(buf)
1564            }
1565            "GetResources" => {
1566                let req: volt::GetResourcesRequest = serde_json::from_value(request.clone())?;
1567                let mut buf = Vec::new();
1568                req.encode(&mut buf)
1569                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1570                Ok(buf)
1571            }
1572            "CanAccessResource" => {
1573                let req: volt::CanAccessResourceRequest = serde_json::from_value(request.clone())?;
1574                let mut buf = Vec::new();
1575                req.encode(&mut buf)
1576                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1577                Ok(buf)
1578            }
1579            "DeleteResource" => {
1580                let req: volt::DeleteResourceRequest = serde_json::from_value(request.clone())?;
1581                let mut buf = Vec::new();
1582                req.encode(&mut buf)
1583                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1584                Ok(buf)
1585            }
1586            "SaveResource" => {
1587                let req: volt::SaveResourceRequest = serde_json::from_value(request.clone())?;
1588                let mut buf = Vec::new();
1589                req.encode(&mut buf)
1590                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1591                Ok(buf)
1592            }
1593            "RequestAccess" => {
1594                let req: volt::RequestAccessRequest = serde_json::from_value(request.clone())?;
1595                let mut buf = Vec::new();
1596                req.encode(&mut buf)
1597                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1598                Ok(buf)
1599            }
1600            "Authenticate" => {
1601                let req: volt::AuthenticateRequest = serde_json::from_value(request.clone())?;
1602                let mut buf = Vec::new();
1603                req.encode(&mut buf)
1604                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1605                Ok(buf)
1606            }
1607            "DiscoverServices" => {
1608                let req: volt::DiscoverServicesRequest = serde_json::from_value(request.clone())?;
1609                let mut buf = Vec::new();
1610                req.encode(&mut buf)
1611                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1612                Ok(buf)
1613            }
1614            "GetResourceAncestors" => {
1615                let req: volt::GetResourceAncestorsRequest =
1616                    serde_json::from_value(request.clone())?;
1617                let mut buf = Vec::new();
1618                req.encode(&mut buf)
1619                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1620                Ok(buf)
1621            }
1622            "GetResourceDescendants" => {
1623                let req: volt::GetResourceDescendantsRequest =
1624                    serde_json::from_value(request.clone())?;
1625                let mut buf = Vec::new();
1626                req.encode(&mut buf)
1627                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1628                Ok(buf)
1629            }
1630            "SaveAccess" => {
1631                let req: volt::SaveAccessRequest = serde_json::from_value(request.clone())?;
1632                let mut buf = Vec::new();
1633                req.encode(&mut buf)
1634                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1635                Ok(buf)
1636            }
1637            "GetIdentities" => {
1638                let req: volt::GetIdentitiesRequest = serde_json::from_value(request.clone())?;
1639                let mut buf = Vec::new();
1640                req.encode(&mut buf)
1641                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1642                Ok(buf)
1643            }
1644            "GetIdentity" => {
1645                let req: volt::GetIdentityRequest = serde_json::from_value(request.clone())?;
1646                let mut buf = Vec::new();
1647                req.encode(&mut buf)
1648                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1649                Ok(buf)
1650            }
1651            "GetPolicy" => {
1652                let req: volt::GetPolicyRequest = serde_json::from_value(request.clone())?;
1653                let mut buf = Vec::new();
1654                req.encode(&mut buf)
1655                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1656                Ok(buf)
1657            }
1658            "GetAccess" => {
1659                let req: volt::GetAccessRequest = serde_json::from_value(request.clone())?;
1660                let mut buf = Vec::new();
1661                req.encode(&mut buf)
1662                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1663                Ok(buf)
1664            }
1665            "SaveIdentity" => {
1666                let req: volt::SaveIdentityRequest = serde_json::from_value(request.clone())?;
1667                let mut buf = Vec::new();
1668                req.encode(&mut buf)
1669                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1670                Ok(buf)
1671            }
1672            "Shutdown" => {
1673                let req: volt::ShutdownRequest = serde_json::from_value(request.clone())?;
1674                let mut buf = Vec::new();
1675                req.encode(&mut buf)
1676                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1677                Ok(buf)
1678            }
1679            "GetFileDescendants" => {
1680                let req: volt::GetFileDescendantsRequest = serde_json::from_value(request.clone())?;
1681                let mut buf = Vec::new();
1682                req.encode(&mut buf)
1683                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1684                Ok(buf)
1685            }
1686            "GetFileContent" => {
1687                let req: volt::GetFileContentRequest = serde_json::from_value(request.clone())?;
1688                let mut buf = Vec::new();
1689                req.encode(&mut buf)
1690                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1691                Ok(buf)
1692            }
1693            "SetFileContent" => {
1694                let req: volt::SetFileContentRequest = serde_json::from_value(request.clone())?;
1695                let mut buf = Vec::new();
1696                req.encode(&mut buf)
1697                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1698                Ok(buf)
1699            }
1700            _ => Err(VoltError::MethodNotFound(format!(
1701                "Unknown method: {}",
1702                method
1703            ))),
1704        }
1705    }
1706
1707    /// Deserialize a response from protobuf bytes
1708    fn deserialize_response(&self, method: &str, data: &[u8]) -> Result<serde_json::Value> {
1709        use prost::Message;
1710
1711        match method {
1712            "GetResource" => {
1713                let resp = volt::GetResourceResponse::decode(data)
1714                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1715                Ok(serde_json::to_value(&resp)?)
1716            }
1717            "GetResources" => {
1718                let resp = volt::GetResourcesResponse::decode(data)
1719                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1720                Ok(serde_json::to_value(&resp)?)
1721            }
1722            "CanAccessResource" => {
1723                let resp = volt::CanAccessResourceResponse::decode(data)
1724                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1725                Ok(serde_json::to_value(&resp)?)
1726            }
1727            "DeleteResource" => {
1728                let resp = volt::DeleteResourceResponse::decode(data)
1729                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1730                Ok(serde_json::to_value(&resp)?)
1731            }
1732            "SaveResource" => {
1733                let resp = volt::SaveResourceResponse::decode(data)
1734                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1735                Ok(serde_json::to_value(&resp)?)
1736            }
1737            "RequestAccess" => {
1738                let resp = volt::RequestAccessResponse::decode(data)
1739                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1740                Ok(serde_json::to_value(&resp)?)
1741            }
1742            "Authenticate" => {
1743                let resp = volt::AuthenticateResponse::decode(data)
1744                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1745                Ok(serde_json::to_value(&resp)?)
1746            }
1747            "DiscoverServices" => {
1748                let resp = volt::DiscoverServicesResponse::decode(data)
1749                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1750                Ok(serde_json::to_value(&resp)?)
1751            }
1752            "GetResourceAncestors" => {
1753                let resp = volt::GetResourceAncestorsResponse::decode(data)
1754                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1755                Ok(serde_json::to_value(&resp)?)
1756            }
1757            "GetResourceDescendants" => {
1758                let resp = volt::GetResourceDescendantsResponse::decode(data)
1759                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1760                Ok(serde_json::to_value(&resp)?)
1761            }
1762            "SaveAccess" => {
1763                let resp = volt::SaveAccessResponse::decode(data)
1764                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1765                Ok(serde_json::to_value(&resp)?)
1766            }
1767            "GetIdentities" => {
1768                let resp = volt::GetIdentitiesResponse::decode(data)
1769                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1770                Ok(serde_json::to_value(&resp)?)
1771            }
1772            "GetIdentity" => {
1773                let resp = volt::GetIdentityResponse::decode(data)
1774                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1775                Ok(serde_json::to_value(&resp)?)
1776            }
1777            "GetPolicy" => {
1778                let resp = volt::GetPolicyResponse::decode(data)
1779                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1780                Ok(serde_json::to_value(&resp)?)
1781            }
1782            "GetAccess" => {
1783                let resp = volt::GetAccessResponse::decode(data)
1784                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1785                Ok(serde_json::to_value(&resp)?)
1786            }
1787            "SaveIdentity" => {
1788                let resp = volt::SaveIdentityResponse::decode(data)
1789                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1790                Ok(serde_json::to_value(&resp)?)
1791            }
1792            "Shutdown" => {
1793                let resp = volt::ShutdownResponse::decode(data)
1794                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1795                Ok(serde_json::to_value(&resp)?)
1796            }
1797            "GetFileDescendants" => {
1798                let resp = volt::GetFileDescendantsResponse::decode(data)
1799                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1800                Ok(serde_json::to_value(&resp)?)
1801            }
1802            "GetFileContent" => {
1803                let resp = volt::GetFileContentResponse::decode(data)
1804                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1805                Ok(serde_json::to_value(&resp)?)
1806            }
1807            "SetFileContent" => {
1808                let resp = volt::SetFileContentResponse::decode(data)
1809                    .map_err(|e| VoltError::serialization(e.to_string()))?;
1810                Ok(serde_json::to_value(&resp)?)
1811            }
1812            _ => Err(VoltError::MethodNotFound(format!(
1813                "Unknown method: {}",
1814                method
1815            ))),
1816        }
1817    }
1818
1819    /// Make a unary gRPC call
1820    async fn unary_call(&self, method: &str, request: serde_json::Value) -> Result<ApiResponse> {
1821        tracing::debug!("Calling {} with {:?}", method, request);
1822
1823        // For relayed connections, use the relay protocol
1824        if self.is_relayed() {
1825            return self.unary_call_via_relay(method, request).await;
1826        }
1827
1828        // Get the gRPC client
1829        let mut client = self.get_volt_client().await?;
1830
1831        // Helper to create authenticated request
1832        macro_rules! auth_request {
1833            ($req:expr) => {{
1834                let mut request = tonic::Request::new($req);
1835                self.add_auth_to_request(&mut request);
1836                request
1837            }};
1838        }
1839
1840        // Dispatch to the appropriate method based on name
1841        let response_json = match method {
1842            "GetResource" => {
1843                let req: volt::GetResourceRequest = serde_json::from_value(request)?;
1844                let response = client
1845                    .get_resource(auth_request!(req))
1846                    .await
1847                    .map_err(VoltError::GrpcError)?;
1848                let resp = response.into_inner();
1849                serde_json::to_value(&resp)?
1850            }
1851            "GetResources" => {
1852                let req: volt::GetResourcesRequest = serde_json::from_value(request)?;
1853                let response = client
1854                    .get_resources(auth_request!(req))
1855                    .await
1856                    .map_err(VoltError::GrpcError)?;
1857                let resp = response.into_inner();
1858                serde_json::to_value(&resp)?
1859            }
1860            "CanAccessResource" => {
1861                let req: volt::CanAccessResourceRequest = serde_json::from_value(request)?;
1862                let response = client
1863                    .can_access_resource(auth_request!(req))
1864                    .await
1865                    .map_err(VoltError::GrpcError)?;
1866                let resp = response.into_inner();
1867                serde_json::to_value(&resp)?
1868            }
1869            "DeleteResource" => {
1870                let req: volt::DeleteResourceRequest = serde_json::from_value(request)?;
1871                let response = client
1872                    .delete_resource(auth_request!(req))
1873                    .await
1874                    .map_err(VoltError::GrpcError)?;
1875                let resp = response.into_inner();
1876                serde_json::to_value(&resp)?
1877            }
1878            "SaveResource" => {
1879                let req: volt::SaveResourceRequest = serde_json::from_value(request)?;
1880                let response = client
1881                    .save_resource(auth_request!(req))
1882                    .await
1883                    .map_err(VoltError::GrpcError)?;
1884                let resp = response.into_inner();
1885                serde_json::to_value(&resp)?
1886            }
1887            "RequestAccess" => {
1888                let req: volt::RequestAccessRequest = serde_json::from_value(request)?;
1889                let response = client
1890                    .request_access(auth_request!(req))
1891                    .await
1892                    .map_err(VoltError::GrpcError)?;
1893                let resp = response.into_inner();
1894                serde_json::to_value(&resp)?
1895            }
1896            "Authenticate" => {
1897                let req: volt::AuthenticateRequest = serde_json::from_value(request)?;
1898                let response = client
1899                    .authenticate(auth_request!(req))
1900                    .await
1901                    .map_err(VoltError::GrpcError)?;
1902                let resp = response.into_inner();
1903                serde_json::to_value(&resp)?
1904            }
1905            "DiscoverServices" => {
1906                let req: volt::DiscoverServicesRequest = serde_json::from_value(request)?;
1907                let response = client
1908                    .discover_services(auth_request!(req))
1909                    .await
1910                    .map_err(VoltError::GrpcError)?;
1911                let resp = response.into_inner();
1912                serde_json::to_value(&resp)?
1913            }
1914            "GetResourceAncestors" => {
1915                let req: volt::GetResourceAncestorsRequest = serde_json::from_value(request)?;
1916                let response = client
1917                    .get_resource_ancestors(auth_request!(req))
1918                    .await
1919                    .map_err(VoltError::GrpcError)?;
1920                let resp = response.into_inner();
1921                serde_json::to_value(&resp)?
1922            }
1923            "GetResourceDescendants" => {
1924                let req: volt::GetResourceDescendantsRequest = serde_json::from_value(request)?;
1925                let response = client
1926                    .get_resource_descendants(auth_request!(req))
1927                    .await
1928                    .map_err(VoltError::GrpcError)?;
1929                let resp = response.into_inner();
1930                serde_json::to_value(&resp)?
1931            }
1932            "SaveAccess" => {
1933                let req: volt::SaveAccessRequest = serde_json::from_value(request)?;
1934                let response = client
1935                    .save_access(auth_request!(req))
1936                    .await
1937                    .map_err(VoltError::GrpcError)?;
1938                let resp = response.into_inner();
1939                serde_json::to_value(&resp)?
1940            }
1941            "GetIdentities" => {
1942                let req: volt::GetIdentitiesRequest = serde_json::from_value(request)?;
1943                let response = client
1944                    .get_identities(auth_request!(req))
1945                    .await
1946                    .map_err(VoltError::GrpcError)?;
1947                let resp = response.into_inner();
1948                serde_json::to_value(&resp)?
1949            }
1950            "GetIdentity" => {
1951                let req: volt::GetIdentityRequest = serde_json::from_value(request)?;
1952                let response = client
1953                    .get_identity(auth_request!(req))
1954                    .await
1955                    .map_err(VoltError::GrpcError)?;
1956                let resp = response.into_inner();
1957                serde_json::to_value(&resp)?
1958            }
1959            "GetPolicy" => {
1960                let req: volt::GetPolicyRequest = serde_json::from_value(request)?;
1961                let response = client
1962                    .get_policy(auth_request!(req))
1963                    .await
1964                    .map_err(VoltError::GrpcError)?;
1965                let resp = response.into_inner();
1966                serde_json::to_value(&resp)?
1967            }
1968            "GetAccess" => {
1969                let req: volt::GetAccessRequest = serde_json::from_value(request)?;
1970                let response = client
1971                    .get_access(auth_request!(req))
1972                    .await
1973                    .map_err(VoltError::GrpcError)?;
1974                let resp = response.into_inner();
1975                serde_json::to_value(&resp)?
1976            }
1977            "SaveIdentity" => {
1978                let req: volt::SaveIdentityRequest = serde_json::from_value(request)?;
1979                let response = client
1980                    .save_identity(auth_request!(req))
1981                    .await
1982                    .map_err(VoltError::GrpcError)?;
1983                let resp = response.into_inner();
1984                serde_json::to_value(&resp)?
1985            }
1986            "Shutdown" => {
1987                let req: volt::ShutdownRequest = serde_json::from_value(request)?;
1988                let response = client
1989                    .shutdown(auth_request!(req))
1990                    .await
1991                    .map_err(VoltError::GrpcError)?;
1992                let resp = response.into_inner();
1993                serde_json::to_value(&resp)?
1994            }
1995            _ => {
1996                return Err(VoltError::MethodNotFound(format!(
1997                    "Unknown method: {}",
1998                    method
1999                )));
2000            }
2001        };
2002
2003        // Extract status if present
2004        let status = response_json
2005            .get("status")
2006            .and_then(|s| serde_json::from_value(s.clone()).ok());
2007
2008        Ok(ApiResponse {
2009            status,
2010            payload: response_json,
2011        })
2012    }
2013}
2014
2015impl Default for VoltClient {
2016    fn default() -> Self {
2017        Self::new().expect("Failed to create default VoltClient")
2018    }
2019}
2020
/// Source for client configuration.
///
/// Values can be produced with `.into()` from a `&str` or a
/// `std::path::PathBuf` (both become [`ConfigSource::File`]), from a
/// `VoltClientConfig` ([`ConfigSource::Config`]), or from a
/// `serde_json::Value` ([`ConfigSource::Json`]).
pub enum ConfigSource {
    /// Load the configuration from the file at this path.
    File(std::path::PathBuf),
    /// Use an existing, already-constructed configuration.
    Config(VoltClientConfig),
    /// Parse the configuration from this JSON value.
    Json(serde_json::Value),
}
2030
2031impl From<&str> for ConfigSource {
2032    fn from(s: &str) -> Self {
2033        ConfigSource::File(std::path::PathBuf::from(s))
2034    }
2035}
2036
2037impl From<std::path::PathBuf> for ConfigSource {
2038    fn from(p: std::path::PathBuf) -> Self {
2039        ConfigSource::File(p)
2040    }
2041}
2042
2043impl From<VoltClientConfig> for ConfigSource {
2044    fn from(c: VoltClientConfig) -> Self {
2045        ConfigSource::Config(c)
2046    }
2047}
2048
2049impl From<serde_json::Value> for ConfigSource {
2050    fn from(v: serde_json::Value) -> Self {
2051        ConfigSource::Json(v)
2052    }
2053}
2054
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed client reports no configuration and no connection.
    #[tokio::test]
    async fn test_client_creation() {
        let client = VoltClient::new().unwrap();
        assert!(client.config().is_none());
        assert!(!client.is_connected().await);
    }

    /// Each `From` conversion must land in the matching `ConfigSource` variant
    /// and carry the original value, not merely compile.
    #[test]
    fn test_config_source() {
        let source: ConfigSource = "path/to/config.json".into();
        assert!(matches!(
            &source,
            ConfigSource::File(path)
                if path.as_path() == std::path::Path::new("path/to/config.json")
        ));

        let source: ConfigSource = std::path::PathBuf::from("config.json").into();
        assert!(matches!(
            &source,
            ConfigSource::File(path)
                if path.as_path() == std::path::Path::new("config.json")
        ));

        let json = serde_json::json!({"client_name": "test"});
        let source: ConfigSource = json.clone().into();
        assert!(matches!(&source, ConfigSource::Json(value) if *value == json));
    }
}
2073
2074/// Normalize payload to handle empty identityDid in subject
2075fn normalize_payload(value: serde_json::Value) -> serde_json::Value {
2076    match value {
2077        serde_json::Value::Object(map) => {
2078            let mut new_map = serde_json::Map::new();
2079            for (key, val) in map {
2080                if key == "subject" {
2081                    if let serde_json::Value::Object(mut subject_map) = val {
2082                        if let Some(serde_json::Value::String(s)) =
2083                            subject_map.get("credentialLookup")
2084                        {
2085                            if s == "'[EVERYONE]'" {
2086                                // For everyone, remove the subject entirely
2087                                subject_map.clear();
2088                            }
2089                        } else if let Some(serde_json::Value::String(s)) =
2090                            subject_map.get("identityDid")
2091                        {
2092                            if s.is_empty() {
2093                                subject_map.insert(
2094                                    "identityDid".to_string(),
2095                                    serde_json::Value::String(
2096                                        "did:volt:00000000-0000-0000-0000-000000000000".to_string(),
2097                                    ),
2098                                );
2099                            }
2100                        }
2101                        if !subject_map.is_empty() {
2102                            new_map.insert(key, serde_json::Value::Object(subject_map));
2103                        }
2104                    } else {
2105                        new_map.insert(key, normalize_payload(val));
2106                    }
2107                } else {
2108                    new_map.insert(key, normalize_payload(val));
2109                }
2110            }
2111            serde_json::Value::Object(new_map)
2112        }
2113        serde_json::Value::Array(arr) => {
2114            serde_json::Value::Array(arr.into_iter().map(normalize_payload).collect())
2115        }
2116        other => other,
2117    }
2118}