Skip to main content

nexus_sdk/
client.rs

//! Nexus client implementation.
//!
//! Every public method on `NexusClient` routes through a
//! [`crate::transport::Transport`] picked at construction time
//! (RPC for `nexus://` URLs, HTTP for `http://` / `https://`,
//! overridable via `ClientConfig.transport` or the
//! `NEXUS_SDK_TRANSPORT` env var). The method signatures here match
//! what the SDK shipped before `phase2_sdk-rpc-transport-default` so
//! user code compiles unchanged.
10
11use crate::error::{NexusError, Result};
12use crate::models::*;
13use crate::transport::endpoint::Endpoint;
14use crate::transport::http::{HttpCredentials, HttpTransport, nexus_to_json};
15use crate::transport::rpc::{RpcCredentials, RpcTransport};
16use crate::transport::{Transport, TransportMode, TransportRequest};
17use base64::Engine;
18use nexus_protocol::rpc::types::NexusValue;
19use reqwest::{Client, ClientBuilder, Response};
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::Duration;
23use url::Url;
24
25/// Nexus client — transport-agnostic handle to a running server.
26#[derive(Clone)]
27pub struct NexusClient {
28    /// Active transport. `Arc` so `Clone` is cheap and independent
29    /// of whether the underlying connection is shared state.
30    transport: Arc<dyn Transport>,
31    /// Raw HTTP client — retained for a handful of legacy manager
32    /// methods (multi-database HTTP helpers) that have not yet been
33    /// ported to the Transport trait. New code SHOULD use
34    /// `self.transport.execute(...)` instead.
35    client: Client,
36    /// HTTP base URL derived from the endpoint. Used only by the
37    /// legacy paths above.
38    base_url: Url,
39    api_key: Option<String>,
40    username: Option<String>,
41    password: Option<String>,
42    #[allow(dead_code)]
43    max_retries: u32,
44}
45
46impl std::fmt::Debug for NexusClient {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        f.debug_struct("NexusClient")
49            .field("transport", &self.transport.describe())
50            .field("base_url", &self.base_url)
51            .finish()
52    }
53}
54
55impl NexusClient {
56    pub(crate) fn get_client(&self) -> &Client {
57        &self.client
58    }
59
60    pub(crate) fn get_base_url(&self) -> &Url {
61        &self.base_url
62    }
63
64    /// Describe the active transport (`"nexus://host:15475 (RPC)"`).
65    /// Useful for tracing and `--verbose`-style diagnostics in
66    /// applications wrapping the SDK.
67    pub fn endpoint_description(&self) -> String {
68        self.transport.describe()
69    }
70
71    /// True when the active transport uses the native binary RPC
72    /// wire format.
73    pub fn is_rpc(&self) -> bool {
74        self.transport.is_rpc()
75    }
76
77    /// Create a client with default configuration against `base_url`.
78    ///
79    /// `base_url` may use `nexus://`, `http://`, `https://`, or a bare
80    /// `host:port` (defaults to RPC). Loading a `nexus://` URL picks
81    /// the native binary RPC transport; any other scheme uses HTTP.
82    ///
83    /// # Example
84    ///
85    /// ```no_run
86    /// use nexus_sdk::NexusClient;
87    ///
88    /// // Binary RPC (fastest path)
89    /// let rpc = NexusClient::new("nexus://localhost:15475")?;
90    ///
91    /// // Legacy HTTP
92    /// let http = NexusClient::new("http://localhost:15474")?;
93    /// # Ok::<(), nexus_sdk::NexusError>(())
94    /// ```
95    pub fn new(base_url: &str) -> Result<Self> {
96        Self::with_config(ClientConfig {
97            base_url: base_url.to_string(),
98            ..Default::default()
99        })
100    }
101
102    /// Create a client with API key authentication.
103    pub fn with_api_key(base_url: &str, api_key: &str) -> Result<Self> {
104        Self::with_config(ClientConfig {
105            base_url: base_url.to_string(),
106            api_key: Some(api_key.to_string()),
107            ..Default::default()
108        })
109    }
110
111    /// Create a client with username/password authentication.
112    pub fn with_credentials(base_url: &str, username: &str, password: &str) -> Result<Self> {
113        Self::with_config(ClientConfig {
114            base_url: base_url.to_string(),
115            username: Some(username.to_string()),
116            password: Some(password.to_string()),
117            ..Default::default()
118        })
119    }
120
121    /// Create a client from a custom [`ClientConfig`].
122    ///
123    /// Transport precedence (strongest → weakest):
124    ///
125    /// 1. URL scheme in `config.base_url` (`nexus://` forces RPC,
126    ///    `http[s]://` forces HTTP).
127    /// 2. `NEXUS_SDK_TRANSPORT` env var.
128    /// 3. `config.transport` field.
129    /// 4. Default: `TransportMode::NexusRpc`.
130    pub fn with_config(config: ClientConfig) -> Result<Self> {
131        let endpoint = Endpoint::parse(&config.base_url)?;
132        let url_force = endpoint.scheme;
133
134        // Env var overrides the config field but not the URL scheme.
135        let env_mode = std::env::var("NEXUS_SDK_TRANSPORT")
136            .ok()
137            .and_then(|s| TransportMode::parse(&s));
138        let effective_mode = match url_force {
139            crate::transport::endpoint::Scheme::Rpc => Some(TransportMode::NexusRpc),
140            crate::transport::endpoint::Scheme::Http => Some(TransportMode::Http),
141            crate::transport::endpoint::Scheme::Https => Some(TransportMode::Https),
142            crate::transport::endpoint::Scheme::Resp3 => Some(TransportMode::Resp3),
143        };
144        let mode = effective_mode
145            .or(env_mode)
146            .or(config.transport)
147            .unwrap_or(TransportMode::NexusRpc);
148
149        // Build the legacy HTTP client regardless of active mode —
150        // a handful of manager methods still hit REST directly
151        // until their RPC verbs land.
152        let timeout = Duration::from_secs(config.timeout_secs);
153        let http_client_builder = ClientBuilder::new()
154            .timeout(timeout)
155            .user_agent(format!("nexus-sdk/{}", env!("CARGO_PKG_VERSION")));
156        let http_client = http_client_builder.build()?;
157
158        // Parse the base_url into a `url::Url` for the legacy path.
159        // `nexus://` isn't a scheme `url::Url` knows as HTTP-like, so
160        // synthesise an HTTP-equivalent URL for that storage.
161        let url_for_legacy = match mode {
162            TransportMode::NexusRpc | TransportMode::Resp3 => endpoint.as_http_url(),
163            TransportMode::Http | TransportMode::Https => config.base_url.clone(),
164        };
165        let base_url = Url::parse(&url_for_legacy)
166            .map_err(|e| NexusError::Configuration(format!("Invalid base URL: {}", e)))?;
167
168        // Build the transport per the resolved mode.
169        let transport: Arc<dyn Transport> = match mode {
170            TransportMode::NexusRpc => Arc::new(RpcTransport::new(
171                endpoint.clone(),
172                RpcCredentials {
173                    api_key: config.api_key.clone(),
174                    username: config.username.clone(),
175                    password: config.password.clone(),
176                },
177            )),
178            TransportMode::Http | TransportMode::Https => Arc::new(HttpTransport::new(
179                endpoint.clone(),
180                HttpCredentials {
181                    api_key: config.api_key.clone(),
182                    username: config.username.clone(),
183                    password: config.password.clone(),
184                },
185                config.timeout_secs,
186            )?),
187            TransportMode::Resp3 => {
188                return Err(NexusError::Configuration(
189                    "RESP3 transport is not yet implemented in the Rust SDK \
190                     (see phase2_sdk-rpc-transport-default §2.3). \
191                     Use 'nexus://' for binary RPC or 'http://' for HTTP/JSON."
192                        .to_string(),
193                ));
194            }
195        };
196
197        Ok(Self {
198            transport,
199            client: http_client,
200            base_url,
201            api_key: config.api_key,
202            username: config.username,
203            password: config.password,
204            max_retries: config.max_retries,
205        })
206    }
207
208    // ══ Transport-routed methods ═════════════════════════════════════════════
209    // Every method below goes through `self.transport.execute(...)`. The RPC
210    // path hits `nexus-server/src/protocol/rpc/dispatch/*`; the HTTP path hits
211    // the sibling REST handler. See `docs/specs/sdk-transport.md`.
212
213    /// Execute a Cypher query.
214    pub async fn execute_cypher(
215        &self,
216        query: &str,
217        parameters: Option<HashMap<String, Value>>,
218    ) -> Result<QueryResult> {
219        let mut args = vec![NexusValue::Str(query.to_string())];
220        if let Some(params) = parameters {
221            let mut pairs = Vec::with_capacity(params.len());
222            for (k, v) in params {
223                pairs.push((NexusValue::Str(k), value_to_nexus(v)));
224            }
225            args.push(NexusValue::Map(pairs));
226        }
227        let resp = self
228            .transport
229            .execute(TransportRequest {
230                command: "CYPHER".to_string(),
231                args,
232            })
233            .await?;
234        cypher_envelope_to_query_result(resp.value)
235    }
236
237    /// Get database statistics.
238    pub async fn get_stats(&self) -> Result<DatabaseStats> {
239        let resp = self
240            .transport
241            .execute(TransportRequest {
242                command: "STATS".to_string(),
243                args: vec![],
244            })
245            .await?;
246        // The RPC STATS envelope is flat (`{nodes, relationships,
247        // labels, rel_types, page_cache_hits/misses, wal_entries,
248        // active_transactions}`); the REST shape is nested
249        // (`{catalog: {node_count, ...}, label_index: {...}}`). The
250        // SDK's public `DatabaseStats` type is the nested shape —
251        // synthesise it from whichever form arrived so callers see a
252        // consistent struct regardless of transport.
253        let json = nexus_to_json(&resp.value);
254        let obj = json.as_object().ok_or_else(|| {
255            NexusError::Network(format!("STATS reply must be a map, got {:?}", resp.value))
256        })?;
257
258        // REST path: serde straight through.
259        if obj.contains_key("catalog") {
260            return Ok(serde_json::from_value(json)?);
261        }
262
263        // RPC flat path: hand-populate `CatalogStats` from the RPC
264        // field names. Legacy fields we no longer expose via RPC
265        // (`label_index`, `knn_index`) fall back to empty defaults.
266        let u = |k: &str| obj.get(k).and_then(|v| v.as_u64()).unwrap_or(0) as usize;
267        let catalog = CatalogStats {
268            node_count: u("nodes"),
269            rel_count: u("relationships"),
270            label_count: u("labels"),
271            rel_type_count: u("rel_types"),
272        };
273        Ok(DatabaseStats {
274            catalog,
275            label_index: LabelIndexStats::default(),
276            knn_index: KnnIndexStats::default(),
277        })
278    }
279
280    /// Health check — returns `true` if the server responds
281    /// successfully on the active transport.
282    pub async fn health_check(&self) -> Result<bool> {
283        match self
284            .transport
285            .execute(TransportRequest {
286                command: if self.is_rpc() { "PING" } else { "HEALTH" }.to_string(),
287                args: vec![],
288            })
289            .await
290        {
291            Ok(_) => Ok(true),
292            Err(_) => Ok(false),
293        }
294    }
295
296    /// Add authentication headers to an HTTP-fallback request.
297    /// Retained for compatibility with the multi-database manager
298    /// methods below; new code does NOT need this.
299    pub(crate) fn add_auth_headers(
300        &self,
301        mut builder: reqwest::RequestBuilder,
302    ) -> Result<reqwest::RequestBuilder> {
303        if let Some(api_key) = &self.api_key {
304            builder = builder.header("X-API-Key", api_key);
305        } else if let (Some(username), Some(password)) = (&self.username, &self.password) {
306            let auth = base64::engine::general_purpose::STANDARD
307                .encode(format!("{}:{}", username, password));
308            builder = builder.header("Authorization", format!("Basic {}", auth));
309        }
310        Ok(builder)
311    }
312
313    /// Execute an HTTP-fallback request with retries. Retained for
314    /// the manager methods that still use REST directly.
315    pub(crate) async fn execute_with_retry(
316        &self,
317        builder: reqwest::RequestBuilder,
318    ) -> Result<Response> {
319        let max_retries = self.max_retries;
320        let mut last_error = None;
321
322        for attempt in 0..=max_retries {
323            match builder.try_clone() {
324                Some(cloned_builder) => match cloned_builder.send().await {
325                    Ok(response) => {
326                        let status = response.status();
327                        if status.is_server_error() && attempt < max_retries {
328                            let delay_ms = 100u64 * (1u64 << attempt.min(5));
329                            tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
330                            continue;
331                        }
332                        return Ok(response);
333                    }
334                    Err(e) => {
335                        let is_retryable = e.is_timeout() || e.is_connect() || e.is_request();
336                        last_error = Some(e);
337                        if is_retryable && attempt < max_retries {
338                            let delay_ms = 100u64 * (1u64 << attempt.min(5));
339                            tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
340                            continue;
341                        }
342                        break;
343                    }
344                },
345                None => {
346                    return builder.send().await.map_err(NexusError::Http);
347                }
348            }
349        }
350
351        match last_error {
352            Some(e) => Err(NexusError::Http(e)),
353            None => Err(NexusError::Network(
354                "Request failed after retries".to_string(),
355            )),
356        }
357    }
358
359    // =========================================================================
360    // Legacy database-management methods — still REST-only.
361    // These are kept as-is because the server-side /databases/* routes
362    // were not part of phase 2's RPC surface; adding them is tracked in
363    // a follow-up.
364    // =========================================================================
365
366    /// List all databases (REST).
367    pub async fn list_databases(&self) -> Result<ListDatabasesResponse> {
368        let url = self.get_base_url().join("/databases")?;
369        let mut request_builder = self.get_client().get(url);
370        request_builder = self.add_auth_headers(request_builder)?;
371
372        let response = self.execute_with_retry(request_builder).await?;
373        let result: ListDatabasesResponse = response.json().await?;
374        Ok(result)
375    }
376
377    /// Create a new database (REST).
378    pub async fn create_database(&self, name: &str) -> Result<CreateDatabaseResponse> {
379        let url = self.get_base_url().join("/databases")?;
380        let request = CreateDatabaseRequest {
381            name: name.to_string(),
382        };
383        let mut request_builder = self.get_client().post(url).json(&request);
384        request_builder = self.add_auth_headers(request_builder)?;
385
386        let response = self.execute_with_retry(request_builder).await?;
387        let result: CreateDatabaseResponse = response.json().await?;
388        Ok(result)
389    }
390
391    /// Get database information (REST).
392    pub async fn get_database(&self, name: &str) -> Result<DatabaseInfo> {
393        let url = self.get_base_url().join(&format!("/databases/{}", name))?;
394        let mut request_builder = self.get_client().get(url);
395        request_builder = self.add_auth_headers(request_builder)?;
396
397        let response = self.execute_with_retry(request_builder).await?;
398        let result: DatabaseInfo = response.json().await?;
399        Ok(result)
400    }
401
402    /// Drop a database (REST).
403    pub async fn drop_database(&self, name: &str) -> Result<DropDatabaseResponse> {
404        let url = self.get_base_url().join(&format!("/databases/{}", name))?;
405        let mut request_builder = self.get_client().delete(url);
406        request_builder = self.add_auth_headers(request_builder)?;
407
408        let response = self.execute_with_retry(request_builder).await?;
409        let result: DropDatabaseResponse = response.json().await?;
410        Ok(result)
411    }
412
413    /// Get the current session database (REST).
414    pub async fn get_current_database(&self) -> Result<String> {
415        let url = self.get_base_url().join("/session/database")?;
416        let mut request_builder = self.get_client().get(url);
417        request_builder = self.add_auth_headers(request_builder)?;
418
419        let response = self.execute_with_retry(request_builder).await?;
420        let result: SessionDatabaseResponse = response.json().await?;
421        Ok(result.database)
422    }
423
424    /// Switch to a different database (REST).
425    pub async fn switch_database(&self, name: &str) -> Result<SwitchDatabaseResponse> {
426        let url = self.get_base_url().join("/session/database")?;
427        let request = SwitchDatabaseRequest {
428            name: name.to_string(),
429        };
430        let mut request_builder = self.get_client().put(url).json(&request);
431        request_builder = self.add_auth_headers(request_builder)?;
432
433        let response = self.execute_with_retry(request_builder).await?;
434        let result: SwitchDatabaseResponse = response.json().await?;
435        Ok(result)
436    }
437}
438
439// ── Helpers ────────────────────────────────────────────────────────────────
440
441/// Map the SDK's `Value` type onto `NexusValue`. `Value` already
442/// carries the same shape as `serde_json::Value`, so we serialise
443/// through that and reuse the HTTP transport's JSON→Nexus helper.
444fn value_to_nexus(v: Value) -> NexusValue {
445    let json = serde_json::to_value(&v).unwrap_or(serde_json::Value::Null);
446    crate::transport::http::json_to_nexus(json)
447}
448
449/// Decode the CYPHER reply envelope (`{columns, rows, execution_time_ms, error}`)
450/// into `QueryResult`.
451fn cypher_envelope_to_query_result(value: NexusValue) -> Result<QueryResult> {
452    let json = nexus_to_json(&value);
453    let obj = json.as_object().ok_or_else(|| {
454        NexusError::Network(format!("CYPHER reply must be a map, got {:?}", value))
455    })?;
456
457    let columns = obj
458        .get("columns")
459        .and_then(|v| v.as_array())
460        .map(|arr| {
461            arr.iter()
462                .filter_map(|v| v.as_str().map(String::from))
463                .collect()
464        })
465        .unwrap_or_default();
466    let rows = obj
467        .get("rows")
468        .and_then(|v| v.as_array())
469        .cloned()
470        .unwrap_or_default();
471    let execution_time_ms = obj.get("execution_time_ms").and_then(|v| v.as_u64());
472    let error = obj.get("error").and_then(|v| v.as_str()).map(String::from);
473
474    if let Some(msg) = error.clone() {
475        return Err(NexusError::Api {
476            message: msg,
477            status: 0,
478        });
479    }
480
481    Ok(QueryResult {
482        columns,
483        rows,
484        execution_time_ms,
485        error,
486    })
487}