1use crate::error::{NexusError, Result};
12use crate::models::*;
13use crate::transport::endpoint::Endpoint;
14use crate::transport::http::{HttpCredentials, HttpTransport, nexus_to_json};
15use crate::transport::rpc::{RpcCredentials, RpcTransport};
16use crate::transport::{Transport, TransportMode, TransportRequest};
17use base64::Engine;
18use nexus_protocol::rpc::types::NexusValue;
19use reqwest::{Client, ClientBuilder, Response};
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::Duration;
23use url::Url;
24
25#[derive(Clone)]
27pub struct NexusClient {
28 transport: Arc<dyn Transport>,
31 client: Client,
36 base_url: Url,
39 api_key: Option<String>,
40 username: Option<String>,
41 password: Option<String>,
42 #[allow(dead_code)]
43 max_retries: u32,
44}
45
46impl std::fmt::Debug for NexusClient {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 f.debug_struct("NexusClient")
49 .field("transport", &self.transport.describe())
50 .field("base_url", &self.base_url)
51 .finish()
52 }
53}
54
55impl NexusClient {
56 pub(crate) fn get_client(&self) -> &Client {
57 &self.client
58 }
59
60 pub(crate) fn get_base_url(&self) -> &Url {
61 &self.base_url
62 }
63
64 pub fn endpoint_description(&self) -> String {
68 self.transport.describe()
69 }
70
71 pub fn is_rpc(&self) -> bool {
74 self.transport.is_rpc()
75 }
76
77 pub fn new(base_url: &str) -> Result<Self> {
96 Self::with_config(ClientConfig {
97 base_url: base_url.to_string(),
98 ..Default::default()
99 })
100 }
101
102 pub fn with_api_key(base_url: &str, api_key: &str) -> Result<Self> {
104 Self::with_config(ClientConfig {
105 base_url: base_url.to_string(),
106 api_key: Some(api_key.to_string()),
107 ..Default::default()
108 })
109 }
110
111 pub fn with_credentials(base_url: &str, username: &str, password: &str) -> Result<Self> {
113 Self::with_config(ClientConfig {
114 base_url: base_url.to_string(),
115 username: Some(username.to_string()),
116 password: Some(password.to_string()),
117 ..Default::default()
118 })
119 }
120
121 pub fn with_config(config: ClientConfig) -> Result<Self> {
131 let endpoint = Endpoint::parse(&config.base_url)?;
132 let url_force = endpoint.scheme;
133
134 let env_mode = std::env::var("NEXUS_SDK_TRANSPORT")
136 .ok()
137 .and_then(|s| TransportMode::parse(&s));
138 let effective_mode = match url_force {
139 crate::transport::endpoint::Scheme::Rpc => Some(TransportMode::NexusRpc),
140 crate::transport::endpoint::Scheme::Http => Some(TransportMode::Http),
141 crate::transport::endpoint::Scheme::Https => Some(TransportMode::Https),
142 crate::transport::endpoint::Scheme::Resp3 => Some(TransportMode::Resp3),
143 };
144 let mode = effective_mode
145 .or(env_mode)
146 .or(config.transport)
147 .unwrap_or(TransportMode::NexusRpc);
148
149 let timeout = Duration::from_secs(config.timeout_secs);
153 let http_client_builder = ClientBuilder::new()
154 .timeout(timeout)
155 .user_agent(format!("nexus-sdk/{}", env!("CARGO_PKG_VERSION")));
156 let http_client = http_client_builder.build()?;
157
158 let url_for_legacy = match mode {
162 TransportMode::NexusRpc | TransportMode::Resp3 => endpoint.as_http_url(),
163 TransportMode::Http | TransportMode::Https => config.base_url.clone(),
164 };
165 let base_url = Url::parse(&url_for_legacy)
166 .map_err(|e| NexusError::Configuration(format!("Invalid base URL: {}", e)))?;
167
168 let transport: Arc<dyn Transport> = match mode {
170 TransportMode::NexusRpc => Arc::new(RpcTransport::new(
171 endpoint.clone(),
172 RpcCredentials {
173 api_key: config.api_key.clone(),
174 username: config.username.clone(),
175 password: config.password.clone(),
176 },
177 )),
178 TransportMode::Http | TransportMode::Https => Arc::new(HttpTransport::new(
179 endpoint.clone(),
180 HttpCredentials {
181 api_key: config.api_key.clone(),
182 username: config.username.clone(),
183 password: config.password.clone(),
184 },
185 config.timeout_secs,
186 )?),
187 TransportMode::Resp3 => {
188 return Err(NexusError::Configuration(
189 "RESP3 transport is not yet implemented in the Rust SDK \
190 (see phase2_sdk-rpc-transport-default §2.3). \
191 Use 'nexus://' for binary RPC or 'http://' for HTTP/JSON."
192 .to_string(),
193 ));
194 }
195 };
196
197 Ok(Self {
198 transport,
199 client: http_client,
200 base_url,
201 api_key: config.api_key,
202 username: config.username,
203 password: config.password,
204 max_retries: config.max_retries,
205 })
206 }
207
208 pub async fn execute_cypher(
215 &self,
216 query: &str,
217 parameters: Option<HashMap<String, Value>>,
218 ) -> Result<QueryResult> {
219 let mut args = vec![NexusValue::Str(query.to_string())];
220 if let Some(params) = parameters {
221 let mut pairs = Vec::with_capacity(params.len());
222 for (k, v) in params {
223 pairs.push((NexusValue::Str(k), value_to_nexus(v)));
224 }
225 args.push(NexusValue::Map(pairs));
226 }
227 let resp = self
228 .transport
229 .execute(TransportRequest {
230 command: "CYPHER".to_string(),
231 args,
232 })
233 .await?;
234 cypher_envelope_to_query_result(resp.value)
235 }
236
237 pub async fn get_stats(&self) -> Result<DatabaseStats> {
239 let resp = self
240 .transport
241 .execute(TransportRequest {
242 command: "STATS".to_string(),
243 args: vec![],
244 })
245 .await?;
246 let json = nexus_to_json(&resp.value);
254 let obj = json.as_object().ok_or_else(|| {
255 NexusError::Network(format!("STATS reply must be a map, got {:?}", resp.value))
256 })?;
257
258 if obj.contains_key("catalog") {
260 return Ok(serde_json::from_value(json)?);
261 }
262
263 let u = |k: &str| obj.get(k).and_then(|v| v.as_u64()).unwrap_or(0) as usize;
267 let catalog = CatalogStats {
268 node_count: u("nodes"),
269 rel_count: u("relationships"),
270 label_count: u("labels"),
271 rel_type_count: u("rel_types"),
272 };
273 Ok(DatabaseStats {
274 catalog,
275 label_index: LabelIndexStats::default(),
276 knn_index: KnnIndexStats::default(),
277 })
278 }
279
280 pub async fn health_check(&self) -> Result<bool> {
283 match self
284 .transport
285 .execute(TransportRequest {
286 command: if self.is_rpc() { "PING" } else { "HEALTH" }.to_string(),
287 args: vec![],
288 })
289 .await
290 {
291 Ok(_) => Ok(true),
292 Err(_) => Ok(false),
293 }
294 }
295
296 pub(crate) fn add_auth_headers(
300 &self,
301 mut builder: reqwest::RequestBuilder,
302 ) -> Result<reqwest::RequestBuilder> {
303 if let Some(api_key) = &self.api_key {
304 builder = builder.header("X-API-Key", api_key);
305 } else if let (Some(username), Some(password)) = (&self.username, &self.password) {
306 let auth = base64::engine::general_purpose::STANDARD
307 .encode(format!("{}:{}", username, password));
308 builder = builder.header("Authorization", format!("Basic {}", auth));
309 }
310 Ok(builder)
311 }
312
313 pub(crate) async fn execute_with_retry(
316 &self,
317 builder: reqwest::RequestBuilder,
318 ) -> Result<Response> {
319 let max_retries = self.max_retries;
320 let mut last_error = None;
321
322 for attempt in 0..=max_retries {
323 match builder.try_clone() {
324 Some(cloned_builder) => match cloned_builder.send().await {
325 Ok(response) => {
326 let status = response.status();
327 if status.is_server_error() && attempt < max_retries {
328 let delay_ms = 100u64 * (1u64 << attempt.min(5));
329 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
330 continue;
331 }
332 return Ok(response);
333 }
334 Err(e) => {
335 let is_retryable = e.is_timeout() || e.is_connect() || e.is_request();
336 last_error = Some(e);
337 if is_retryable && attempt < max_retries {
338 let delay_ms = 100u64 * (1u64 << attempt.min(5));
339 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
340 continue;
341 }
342 break;
343 }
344 },
345 None => {
346 return builder.send().await.map_err(NexusError::Http);
347 }
348 }
349 }
350
351 match last_error {
352 Some(e) => Err(NexusError::Http(e)),
353 None => Err(NexusError::Network(
354 "Request failed after retries".to_string(),
355 )),
356 }
357 }
358
359 pub async fn list_databases(&self) -> Result<ListDatabasesResponse> {
368 let url = self.get_base_url().join("/databases")?;
369 let mut request_builder = self.get_client().get(url);
370 request_builder = self.add_auth_headers(request_builder)?;
371
372 let response = self.execute_with_retry(request_builder).await?;
373 let result: ListDatabasesResponse = response.json().await?;
374 Ok(result)
375 }
376
377 pub async fn create_database(&self, name: &str) -> Result<CreateDatabaseResponse> {
379 let url = self.get_base_url().join("/databases")?;
380 let request = CreateDatabaseRequest {
381 name: name.to_string(),
382 };
383 let mut request_builder = self.get_client().post(url).json(&request);
384 request_builder = self.add_auth_headers(request_builder)?;
385
386 let response = self.execute_with_retry(request_builder).await?;
387 let result: CreateDatabaseResponse = response.json().await?;
388 Ok(result)
389 }
390
391 pub async fn get_database(&self, name: &str) -> Result<DatabaseInfo> {
393 let url = self.get_base_url().join(&format!("/databases/{}", name))?;
394 let mut request_builder = self.get_client().get(url);
395 request_builder = self.add_auth_headers(request_builder)?;
396
397 let response = self.execute_with_retry(request_builder).await?;
398 let result: DatabaseInfo = response.json().await?;
399 Ok(result)
400 }
401
402 pub async fn drop_database(&self, name: &str) -> Result<DropDatabaseResponse> {
404 let url = self.get_base_url().join(&format!("/databases/{}", name))?;
405 let mut request_builder = self.get_client().delete(url);
406 request_builder = self.add_auth_headers(request_builder)?;
407
408 let response = self.execute_with_retry(request_builder).await?;
409 let result: DropDatabaseResponse = response.json().await?;
410 Ok(result)
411 }
412
413 pub async fn get_current_database(&self) -> Result<String> {
415 let url = self.get_base_url().join("/session/database")?;
416 let mut request_builder = self.get_client().get(url);
417 request_builder = self.add_auth_headers(request_builder)?;
418
419 let response = self.execute_with_retry(request_builder).await?;
420 let result: SessionDatabaseResponse = response.json().await?;
421 Ok(result.database)
422 }
423
424 pub async fn switch_database(&self, name: &str) -> Result<SwitchDatabaseResponse> {
426 let url = self.get_base_url().join("/session/database")?;
427 let request = SwitchDatabaseRequest {
428 name: name.to_string(),
429 };
430 let mut request_builder = self.get_client().put(url).json(&request);
431 request_builder = self.add_auth_headers(request_builder)?;
432
433 let response = self.execute_with_retry(request_builder).await?;
434 let result: SwitchDatabaseResponse = response.json().await?;
435 Ok(result)
436 }
437}
438
439fn value_to_nexus(v: Value) -> NexusValue {
445 let json = serde_json::to_value(&v).unwrap_or(serde_json::Value::Null);
446 crate::transport::http::json_to_nexus(json)
447}
448
449fn cypher_envelope_to_query_result(value: NexusValue) -> Result<QueryResult> {
452 let json = nexus_to_json(&value);
453 let obj = json.as_object().ok_or_else(|| {
454 NexusError::Network(format!("CYPHER reply must be a map, got {:?}", value))
455 })?;
456
457 let columns = obj
458 .get("columns")
459 .and_then(|v| v.as_array())
460 .map(|arr| {
461 arr.iter()
462 .filter_map(|v| v.as_str().map(String::from))
463 .collect()
464 })
465 .unwrap_or_default();
466 let rows = obj
467 .get("rows")
468 .and_then(|v| v.as_array())
469 .cloned()
470 .unwrap_or_default();
471 let execution_time_ms = obj.get("execution_time_ms").and_then(|v| v.as_u64());
472 let error = obj.get("error").and_then(|v| v.as_str()).map(String::from);
473
474 if let Some(msg) = error.clone() {
475 return Err(NexusError::Api {
476 message: msg,
477 status: 0,
478 });
479 }
480
481 Ok(QueryResult {
482 columns,
483 rows,
484 execution_time_ms,
485 error,
486 })
487}