Skip to main content

dakera_client/
client.rs

1//! Dakera client implementation
2
3use reqwest::{Client, StatusCode};
4use std::time::Duration;
5use tracing::{debug, instrument};
6
7use serde::Deserialize;
8
9use crate::error::{ClientError, Result, ServerErrorCode};
10use crate::types::*;
11
12/// Default timeout for requests
///
/// NOTE(review): the `_SECS` suffix suggests the unit is seconds; the constant
/// is not referenced in the visible part of this file — presumably it is
/// consumed by `DakeraClientBuilder`; confirm against the builder.
13const DEFAULT_TIMEOUT_SECS: u64 = 30;
14
15/// Dakera client for interacting with the vector database
///
/// Holds the HTTP client used to send requests and the server base URL that
/// every endpoint path is appended to. The type derives `Clone` and `Debug`.
16#[derive(Debug, Clone)]
17pub struct DakeraClient {
18    /// HTTP client used to send every request
19    pub(crate) client: Client,
20    /// Base URL of the Dakera server; endpoint paths such as `/health` or
    /// `/v1/namespaces` are appended to it verbatim
21    pub(crate) base_url: String,
22}
23
24impl DakeraClient {
25    /// Create a new client with the given base URL
26    ///
27    /// # Example
28    ///
29    /// ```rust,no_run
30    /// use dakera_client::DakeraClient;
31    ///
32    /// let client = DakeraClient::new("http://localhost:3000").unwrap();
33    /// ```
34    pub fn new(base_url: impl Into<String>) -> Result<Self> {
35        DakeraClientBuilder::new(base_url).build()
36    }
37
38    /// Create a new client builder for more configuration options
39    pub fn builder(base_url: impl Into<String>) -> DakeraClientBuilder {
40        DakeraClientBuilder::new(base_url)
41    }
42
43    // ========================================================================
44    // Health & Status
45    // ========================================================================
46
47    /// Check server health
48    #[instrument(skip(self))]
49    pub async fn health(&self) -> Result<HealthResponse> {
50        let url = format!("{}/health", self.base_url);
51        let response = self.client.get(&url).send().await?;
52
53        if response.status().is_success() {
54            Ok(response.json().await?)
55        } else {
56            // Health endpoint might return simple OK
57            Ok(HealthResponse {
58                healthy: true,
59                version: None,
60                uptime_seconds: None,
61            })
62        }
63    }
64
65    /// Check if server is ready
66    #[instrument(skip(self))]
67    pub async fn ready(&self) -> Result<ReadinessResponse> {
68        let url = format!("{}/health/ready", self.base_url);
69        let response = self.client.get(&url).send().await?;
70
71        if response.status().is_success() {
72            Ok(response.json().await?)
73        } else {
74            Ok(ReadinessResponse {
75                ready: false,
76                components: None,
77            })
78        }
79    }
80
81    /// Check if server is live
82    #[instrument(skip(self))]
83    pub async fn live(&self) -> Result<bool> {
84        let url = format!("{}/health/live", self.base_url);
85        let response = self.client.get(&url).send().await?;
86        Ok(response.status().is_success())
87    }
88
89    // ========================================================================
90    // Namespace Operations
91    // ========================================================================
92
93    /// List all namespaces
94    #[instrument(skip(self))]
95    pub async fn list_namespaces(&self) -> Result<Vec<String>> {
96        let url = format!("{}/v1/namespaces", self.base_url);
97        let response = self.client.get(&url).send().await?;
98        self.handle_response::<ListNamespacesResponse>(response)
99            .await
100            .map(|r| r.namespaces)
101    }
102
103    /// Get namespace information
104    #[instrument(skip(self))]
105    pub async fn get_namespace(&self, namespace: &str) -> Result<NamespaceInfo> {
106        let url = format!("{}/v1/namespaces/{}", self.base_url, namespace);
107        let response = self.client.get(&url).send().await?;
108        self.handle_response(response).await
109    }
110
111    /// Create a new namespace
112    #[instrument(skip(self, request))]
113    pub async fn create_namespace(
114        &self,
115        namespace: &str,
116        request: CreateNamespaceRequest,
117    ) -> Result<NamespaceInfo> {
118        let url = format!("{}/v1/namespaces/{}", self.base_url, namespace);
119        let response = self.client.post(&url).json(&request).send().await?;
120        self.handle_response(response).await
121    }
122
123    /// Create or update a namespace configuration (upsert semantics — v0.6.0).
124    ///
125    /// Creates the namespace if it does not exist, or updates its distance-metric
126    /// configuration if it already exists.  Dimension changes are rejected to
127    /// prevent silent data corruption.  Requires `Scope::Write`.
128    #[instrument(skip(self, request), fields(namespace = %namespace))]
129    pub async fn configure_namespace(
130        &self,
131        namespace: &str,
132        request: ConfigureNamespaceRequest,
133    ) -> Result<ConfigureNamespaceResponse> {
134        let url = format!("{}/v1/namespaces/{}", self.base_url, namespace);
135        let response = self.client.put(&url).json(&request).send().await?;
136        self.handle_response(response).await
137    }
138
139    // ========================================================================
140    // Vector Operations
141    // ========================================================================
142
143    /// Upsert vectors into a namespace
144    #[instrument(skip(self, request), fields(vector_count = request.vectors.len()))]
145    pub async fn upsert(&self, namespace: &str, request: UpsertRequest) -> Result<UpsertResponse> {
146        let url = format!("{}/v1/namespaces/{}/vectors", self.base_url, namespace);
147        debug!(
148            "Upserting {} vectors to {}",
149            request.vectors.len(),
150            namespace
151        );
152
153        let response = self.client.post(&url).json(&request).send().await?;
154        self.handle_response(response).await
155    }
156
157    /// Upsert a single vector (convenience method)
158    #[instrument(skip(self, vector))]
159    pub async fn upsert_one(&self, namespace: &str, vector: Vector) -> Result<UpsertResponse> {
160        self.upsert(namespace, UpsertRequest::single(vector)).await
161    }
162
163    /// Upsert vectors in column format (Turbopuffer-inspired)
164    ///
165    /// This format is more efficient for bulk upserts as it avoids repeating
166    /// field names for each vector. All arrays must have equal length.
167    ///
168    /// # Example
169    ///
170    /// ```rust,no_run
171    /// use dakera_client::{DakeraClient, ColumnUpsertRequest};
172    ///
173    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
174    /// let client = DakeraClient::new("http://localhost:3000")?;
175    ///
176    /// let request = ColumnUpsertRequest::new(
177    ///     vec!["id1".to_string(), "id2".to_string(), "id3".to_string()],
178    ///     vec![
179    ///         vec![0.1, 0.2, 0.3],
180    ///         vec![0.4, 0.5, 0.6],
181    ///         vec![0.7, 0.8, 0.9],
182    ///     ],
183    /// )
184    /// .with_attribute("category", vec![
185    ///     serde_json::json!("A"),
186    ///     serde_json::json!("B"),
187    ///     serde_json::json!("A"),
188    /// ]);
189    ///
190    /// let response = client.upsert_columns("my-namespace", request).await?;
191    /// println!("Upserted {} vectors", response.upserted_count);
192    /// # Ok(())
193    /// # }
194    /// ```
195    #[instrument(skip(self, request), fields(namespace = %namespace, count = request.ids.len()))]
196    pub async fn upsert_columns(
197        &self,
198        namespace: &str,
199        request: ColumnUpsertRequest,
200    ) -> Result<UpsertResponse> {
201        let url = format!(
202            "{}/v1/namespaces/{}/upsert-columns",
203            self.base_url, namespace
204        );
205        debug!(
206            "Upserting {} vectors in column format to {}",
207            request.ids.len(),
208            namespace
209        );
210
211        let response = self.client.post(&url).json(&request).send().await?;
212        self.handle_response(response).await
213    }
214
215    /// Query for similar vectors
216    #[instrument(skip(self, request), fields(top_k = request.top_k))]
217    pub async fn query(&self, namespace: &str, request: QueryRequest) -> Result<QueryResponse> {
218        let url = format!("{}/v1/namespaces/{}/query", self.base_url, namespace);
219        debug!(
220            "Querying namespace {} for top {} results",
221            namespace, request.top_k
222        );
223
224        let response = self.client.post(&url).json(&request).send().await?;
225        self.handle_response(response).await
226    }
227
228    /// Simple query with just a vector and top_k (convenience method)
229    #[instrument(skip(self, vector))]
230    pub async fn query_simple(
231        &self,
232        namespace: &str,
233        vector: Vec<f32>,
234        top_k: u32,
235    ) -> Result<QueryResponse> {
236        self.query(namespace, QueryRequest::new(vector, top_k))
237            .await
238    }
239
240    /// Execute multiple queries in a single request
241    ///
242    /// This allows executing multiple vector similarity queries in parallel,
243    /// which is more efficient than making separate requests.
244    ///
245    /// # Example
246    ///
247    /// ```rust,no_run
248    /// use dakera_client::{DakeraClient, BatchQueryRequest, BatchQueryItem};
249    ///
250    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
251    /// let client = DakeraClient::new("http://localhost:3000")?;
252    ///
253    /// let request = BatchQueryRequest::new(vec![
254    ///     BatchQueryItem::new(vec![0.1, 0.2, 0.3], 5).with_id("query1"),
255    ///     BatchQueryItem::new(vec![0.4, 0.5, 0.6], 10).with_id("query2"),
256    /// ]);
257    ///
258    /// let response = client.batch_query("my-namespace", request).await?;
259    /// println!("Executed {} queries in {}ms", response.query_count, response.total_latency_ms);
260    /// # Ok(())
261    /// # }
262    /// ```
263    #[instrument(skip(self, request), fields(namespace = %namespace, query_count = request.queries.len()))]
264    pub async fn batch_query(
265        &self,
266        namespace: &str,
267        request: BatchQueryRequest,
268    ) -> Result<BatchQueryResponse> {
269        let url = format!("{}/v1/namespaces/{}/batch-query", self.base_url, namespace);
270        debug!(
271            "Batch querying namespace {} with {} queries",
272            namespace,
273            request.queries.len()
274        );
275
276        let response = self.client.post(&url).json(&request).send().await?;
277        self.handle_response(response).await
278    }
279
280    /// Delete vectors by ID
281    #[instrument(skip(self, request), fields(id_count = request.ids.len()))]
282    pub async fn delete(&self, namespace: &str, request: DeleteRequest) -> Result<DeleteResponse> {
283        let url = format!(
284            "{}/v1/namespaces/{}/vectors/delete",
285            self.base_url, namespace
286        );
287        debug!("Deleting {} vectors from {}", request.ids.len(), namespace);
288
289        let response = self.client.post(&url).json(&request).send().await?;
290        self.handle_response(response).await
291    }
292
293    /// Delete a single vector by ID (convenience method)
294    #[instrument(skip(self))]
295    pub async fn delete_one(&self, namespace: &str, id: &str) -> Result<DeleteResponse> {
296        self.delete(namespace, DeleteRequest::single(id)).await
297    }
298
299    // ========================================================================
300    // Full-Text Search Operations
301    // ========================================================================
302
303    /// Index documents for full-text search
304    #[instrument(skip(self, request), fields(doc_count = request.documents.len()))]
305    pub async fn index_documents(
306        &self,
307        namespace: &str,
308        request: IndexDocumentsRequest,
309    ) -> Result<IndexDocumentsResponse> {
310        let url = format!(
311            "{}/v1/namespaces/{}/fulltext/index",
312            self.base_url, namespace
313        );
314        debug!(
315            "Indexing {} documents in {}",
316            request.documents.len(),
317            namespace
318        );
319
320        let response = self.client.post(&url).json(&request).send().await?;
321        self.handle_response(response).await
322    }
323
324    /// Index a single document (convenience method)
325    #[instrument(skip(self, document))]
326    pub async fn index_document(
327        &self,
328        namespace: &str,
329        document: Document,
330    ) -> Result<IndexDocumentsResponse> {
331        self.index_documents(
332            namespace,
333            IndexDocumentsRequest {
334                documents: vec![document],
335            },
336        )
337        .await
338    }
339
340    /// Perform full-text search
341    #[instrument(skip(self, request))]
342    pub async fn fulltext_search(
343        &self,
344        namespace: &str,
345        request: FullTextSearchRequest,
346    ) -> Result<FullTextSearchResponse> {
347        let url = format!(
348            "{}/v1/namespaces/{}/fulltext/search",
349            self.base_url, namespace
350        );
351        debug!("Full-text search in {} for: {}", namespace, request.query);
352
353        let response = self.client.post(&url).json(&request).send().await?;
354        self.handle_response(response).await
355    }
356
357    /// Simple full-text search (convenience method)
358    #[instrument(skip(self))]
359    pub async fn search_text(
360        &self,
361        namespace: &str,
362        query: &str,
363        top_k: u32,
364    ) -> Result<FullTextSearchResponse> {
365        self.fulltext_search(namespace, FullTextSearchRequest::new(query, top_k))
366            .await
367    }
368
369    /// Get full-text index statistics
370    #[instrument(skip(self))]
371    pub async fn fulltext_stats(&self, namespace: &str) -> Result<FullTextStats> {
372        let url = format!(
373            "{}/v1/namespaces/{}/fulltext/stats",
374            self.base_url, namespace
375        );
376        let response = self.client.get(&url).send().await?;
377        self.handle_response(response).await
378    }
379
380    /// Delete documents from full-text index
381    #[instrument(skip(self, request))]
382    pub async fn fulltext_delete(
383        &self,
384        namespace: &str,
385        request: DeleteRequest,
386    ) -> Result<DeleteResponse> {
387        let url = format!(
388            "{}/v1/namespaces/{}/fulltext/delete",
389            self.base_url, namespace
390        );
391        let response = self.client.post(&url).json(&request).send().await?;
392        self.handle_response(response).await
393    }
394
395    // ========================================================================
396    // Hybrid Search Operations
397    // ========================================================================
398
399    /// Perform hybrid search (vector + full-text)
400    #[instrument(skip(self, request), fields(top_k = request.top_k))]
401    pub async fn hybrid_search(
402        &self,
403        namespace: &str,
404        request: HybridSearchRequest,
405    ) -> Result<HybridSearchResponse> {
406        let url = format!("{}/v1/namespaces/{}/hybrid", self.base_url, namespace);
407        debug!(
408            "Hybrid search in {} with vector_weight={}",
409            namespace, request.vector_weight
410        );
411
412        let response = self.client.post(&url).json(&request).send().await?;
413        self.handle_response(response).await
414    }
415
416    // ========================================================================
417    // Multi-Vector Search Operations
418    // ========================================================================
419
420    /// Multi-vector search with positive/negative vectors and MMR
421    ///
422    /// This performs semantic search using multiple positive vectors (to search towards)
423    /// and optional negative vectors (to search away from). Supports MMR (Maximal Marginal
424    /// Relevance) for result diversity.
425    ///
426    /// # Example
427    ///
428    /// ```rust,no_run
429    /// use dakera_client::{DakeraClient, MultiVectorSearchRequest};
430    ///
431    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
432    /// let client = DakeraClient::new("http://localhost:3000")?;
433    ///
434    /// // Search towards multiple concepts, away from others
435    /// let request = MultiVectorSearchRequest::new(vec![
436    ///     vec![0.1, 0.2, 0.3],  // positive vector 1
437    ///     vec![0.4, 0.5, 0.6],  // positive vector 2
438    /// ])
439    /// .with_negative_vectors(vec![
440    ///     vec![0.7, 0.8, 0.9],  // negative vector
441    /// ])
442    /// .with_top_k(10)
443    /// .with_mmr(0.7);  // Enable MMR with lambda=0.7
444    ///
445    /// let response = client.multi_vector_search("my-namespace", request).await?;
446    /// for result in response.results {
447    ///     println!("ID: {}, Score: {}", result.id, result.score);
448    /// }
449    /// # Ok(())
450    /// # }
451    /// ```
452    #[instrument(skip(self, request), fields(namespace = %namespace))]
453    pub async fn multi_vector_search(
454        &self,
455        namespace: &str,
456        request: MultiVectorSearchRequest,
457    ) -> Result<MultiVectorSearchResponse> {
458        let url = format!("{}/v1/namespaces/{}/multi-vector", self.base_url, namespace);
459        debug!(
460            "Multi-vector search in {} with {} positive vectors",
461            namespace,
462            request.positive_vectors.len()
463        );
464
465        let response = self.client.post(&url).json(&request).send().await?;
466        self.handle_response(response).await
467    }
468
469    // ========================================================================
470    // Aggregation Operations
471    // ========================================================================
472
473    /// Aggregate vectors with grouping (Turbopuffer-inspired)
474    ///
475    /// This performs aggregation queries on vector metadata, supporting
476    /// count, sum, avg, min, and max operations with optional grouping.
477    ///
478    /// # Example
479    ///
480    /// ```rust,no_run
481    /// use dakera_client::{DakeraClient, AggregationRequest};
482    ///
483    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
484    /// let client = DakeraClient::new("http://localhost:3000")?;
485    ///
486    /// // Count all vectors and sum scores, grouped by category
487    /// let request = AggregationRequest::new()
488    ///     .with_count("total_count")
489    ///     .with_sum("total_score", "score")
490    ///     .with_avg("avg_score", "score")
491    ///     .with_group_by("category");
492    ///
493    /// let response = client.aggregate("my-namespace", request).await?;
494    /// if let Some(groups) = response.aggregation_groups {
495    ///     for group in groups {
496    ///         println!("Group: {:?}", group.group_key);
497    ///     }
498    /// }
499    /// # Ok(())
500    /// # }
501    /// ```
502    #[instrument(skip(self, request), fields(namespace = %namespace))]
503    pub async fn aggregate(
504        &self,
505        namespace: &str,
506        request: AggregationRequest,
507    ) -> Result<AggregationResponse> {
508        let url = format!("{}/v1/namespaces/{}/aggregate", self.base_url, namespace);
509        debug!(
510            "Aggregating in namespace {} with {} aggregations",
511            namespace,
512            request.aggregate_by.len()
513        );
514
515        let response = self.client.post(&url).json(&request).send().await?;
516        self.handle_response(response).await
517    }
518
519    // ========================================================================
520    // Unified Query Operations
521    // ========================================================================
522
523    /// Unified query with flexible ranking options (Turbopuffer-inspired)
524    ///
525    /// This provides a unified API for vector search (ANN/kNN), full-text search (BM25),
526    /// and attribute ordering. Supports combining multiple ranking functions with
527    /// Sum, Max, and Product operators.
528    ///
529    /// # Example
530    ///
531    /// ```rust,no_run
532    /// use dakera_client::{DakeraClient, UnifiedQueryRequest, SortDirection};
533    ///
534    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
535    /// let client = DakeraClient::new("http://localhost:3000")?;
536    ///
537    /// // Vector ANN search
538    /// let request = UnifiedQueryRequest::vector_search(vec![0.1, 0.2, 0.3], 10);
539    /// let response = client.unified_query("my-namespace", request).await?;
540    ///
541    /// // Full-text BM25 search
542    /// let request = UnifiedQueryRequest::fulltext_search("content", "hello world", 10);
543    /// let response = client.unified_query("my-namespace", request).await?;
544    ///
545    /// // Attribute ordering with filter
546    /// let request = UnifiedQueryRequest::attribute_order("timestamp", SortDirection::Desc, 10)
547    ///     .with_filter(serde_json::json!({"category": {"$eq": "science"}}));
548    /// let response = client.unified_query("my-namespace", request).await?;
549    ///
550    /// for result in response.results {
551    ///     println!("ID: {}, Score: {:?}", result.id, result.dist);
552    /// }
553    /// # Ok(())
554    /// # }
555    /// ```
556    #[instrument(skip(self, request), fields(namespace = %namespace))]
557    pub async fn unified_query(
558        &self,
559        namespace: &str,
560        request: UnifiedQueryRequest,
561    ) -> Result<UnifiedQueryResponse> {
562        let url = format!(
563            "{}/v1/namespaces/{}/unified-query",
564            self.base_url, namespace
565        );
566        debug!(
567            "Unified query in namespace {} with top_k={}",
568            namespace, request.top_k
569        );
570
571        let response = self.client.post(&url).json(&request).send().await?;
572        self.handle_response(response).await
573    }
574
575    /// Simple vector search using the unified query API (convenience method)
576    ///
577    /// This is a shortcut for `unified_query` with a vector ANN search.
578    #[instrument(skip(self, vector))]
579    pub async fn unified_vector_search(
580        &self,
581        namespace: &str,
582        vector: Vec<f32>,
583        top_k: usize,
584    ) -> Result<UnifiedQueryResponse> {
585        self.unified_query(namespace, UnifiedQueryRequest::vector_search(vector, top_k))
586            .await
587    }
588
589    /// Simple full-text search using the unified query API (convenience method)
590    ///
591    /// This is a shortcut for `unified_query` with a BM25 full-text search.
592    #[instrument(skip(self))]
593    pub async fn unified_text_search(
594        &self,
595        namespace: &str,
596        field: &str,
597        query: &str,
598        top_k: usize,
599    ) -> Result<UnifiedQueryResponse> {
600        self.unified_query(
601            namespace,
602            UnifiedQueryRequest::fulltext_search(field, query, top_k),
603        )
604        .await
605    }
606
607    // ========================================================================
608    // Query Explain Operations
609    // ========================================================================
610
611    /// Explain query execution plan (similar to SQL EXPLAIN)
612    ///
613    /// This provides detailed information about how a query will be executed,
614    /// including index selection, execution stages, cost estimates, and
615    /// performance recommendations.
616    ///
617    /// # Example
618    ///
619    /// ```rust,no_run
620    /// use dakera_client::{DakeraClient, QueryExplainRequest};
621    ///
622    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
623    /// let client = DakeraClient::new("http://localhost:3000")?;
624    ///
625    /// // Explain a vector search query
626    /// let request = QueryExplainRequest::vector_search(vec![0.1, 0.2, 0.3], 10)
627    ///     .with_verbose();
628    /// let plan = client.explain_query("my-namespace", request).await?;
629    ///
630    /// println!("Query plan: {}", plan.summary);
631    /// println!("Estimated time: {}ms", plan.cost_estimate.estimated_time_ms);
632    ///
633    /// for stage in &plan.stages {
634    ///     println!("Stage {}: {} - {}", stage.order, stage.name, stage.description);
635    /// }
636    ///
637    /// for rec in &plan.recommendations {
638    ///     println!("Recommendation ({}): {}", rec.priority, rec.description);
639    /// }
640    /// # Ok(())
641    /// # }
642    /// ```
643    #[instrument(skip(self, request), fields(namespace = %namespace))]
644    pub async fn explain_query(
645        &self,
646        namespace: &str,
647        request: QueryExplainRequest,
648    ) -> Result<QueryExplainResponse> {
649        let url = format!("{}/v1/namespaces/{}/explain", self.base_url, namespace);
650        debug!(
651            "Explaining query in namespace {} (query_type={:?}, top_k={})",
652            namespace, request.query_type, request.top_k
653        );
654
655        let response = self.client.post(&url).json(&request).send().await?;
656        self.handle_response(response).await
657    }
658
659    // ========================================================================
660    // Cache Warming Operations
661    // ========================================================================
662
663    /// Warm cache for vectors in a namespace
664    ///
665    /// This pre-loads vectors into cache tiers for faster subsequent access.
666    /// Supports priority levels and can run in the background.
667    ///
668    /// # Example
669    ///
670    /// ```rust,no_run
671    /// use dakera_client::{DakeraClient, WarmCacheRequest, WarmingPriority};
672    ///
673    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
674    /// let client = DakeraClient::new("http://localhost:3000")?;
675    ///
676    /// // Warm entire namespace with high priority
677    /// let response = client.warm_cache(
678    ///     WarmCacheRequest::new("my-namespace")
679    ///         .with_priority(WarmingPriority::High)
680    /// ).await?;
681    ///
682    /// println!("Warmed {} entries", response.entries_warmed);
683    /// # Ok(())
684    /// # }
685    /// ```
686    #[instrument(skip(self, request), fields(namespace = %request.namespace, priority = ?request.priority))]
687    pub async fn warm_cache(&self, request: WarmCacheRequest) -> Result<WarmCacheResponse> {
688        let url = format!(
689            "{}/v1/namespaces/{}/cache/warm",
690            self.base_url, request.namespace
691        );
692        debug!(
693            "Warming cache for namespace {} with priority {:?}",
694            request.namespace, request.priority
695        );
696
697        let response = self.client.post(&url).json(&request).send().await?;
698        self.handle_response(response).await
699    }
700
701    /// Warm specific vectors by ID (convenience method)
702    #[instrument(skip(self, vector_ids))]
703    pub async fn warm_vectors(
704        &self,
705        namespace: &str,
706        vector_ids: Vec<String>,
707    ) -> Result<WarmCacheResponse> {
708        self.warm_cache(WarmCacheRequest::new(namespace).with_vector_ids(vector_ids))
709            .await
710    }
711
712    // ========================================================================
713    // Export Operations
714    // ========================================================================
715
716    /// Export vectors from a namespace with pagination
717    ///
718    /// This exports all vectors from a namespace, supporting pagination for
719    /// large datasets. Use the `next_cursor` from the response to fetch
720    /// subsequent pages.
721    ///
722    /// # Example
723    ///
724    /// ```rust,no_run
725    /// use dakera_client::{DakeraClient, ExportRequest};
726    ///
727    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
728    /// let client = DakeraClient::new("http://localhost:3000")?;
729    ///
730    /// // Export first page of vectors
731    /// let mut request = ExportRequest::new().with_top_k(1000);
732    /// let response = client.export("my-namespace", request).await?;
733    ///
734    /// println!("Exported {} vectors", response.returned_count);
735    ///
736    /// // Fetch next page if available
737    /// if let Some(cursor) = response.next_cursor {
738    ///     let next_request = ExportRequest::new().with_cursor(cursor);
739    ///     let next_response = client.export("my-namespace", next_request).await?;
740    /// }
741    /// # Ok(())
742    /// # }
743    /// ```
744    #[instrument(skip(self, request), fields(namespace = %namespace))]
745    pub async fn export(&self, namespace: &str, request: ExportRequest) -> Result<ExportResponse> {
746        let url = format!("{}/v1/namespaces/{}/export", self.base_url, namespace);
747        debug!(
748            "Exporting vectors from namespace {} (top_k={}, cursor={:?})",
749            namespace, request.top_k, request.cursor
750        );
751
752        let response = self.client.post(&url).json(&request).send().await?;
753        self.handle_response(response).await
754    }
755
756    /// Export all vectors from a namespace (convenience method)
757    ///
758    /// This is a simple wrapper that exports with default settings.
759    #[instrument(skip(self))]
760    pub async fn export_all(&self, namespace: &str) -> Result<ExportResponse> {
761        self.export(namespace, ExportRequest::new()).await
762    }
763
764    // ========================================================================
765    // Operations
766    // ========================================================================
767
768    /// Get system diagnostics
769    #[instrument(skip(self))]
770    pub async fn diagnostics(&self) -> Result<SystemDiagnostics> {
771        let url = format!("{}/ops/diagnostics", self.base_url);
772        let response = self.client.get(&url).send().await?;
773        self.handle_response(response).await
774    }
775
776    /// List background jobs
777    #[instrument(skip(self))]
778    pub async fn list_jobs(&self) -> Result<Vec<JobInfo>> {
779        let url = format!("{}/ops/jobs", self.base_url);
780        let response = self.client.get(&url).send().await?;
781        self.handle_response(response).await
782    }
783
784    /// Get a specific job status
785    #[instrument(skip(self))]
786    pub async fn get_job(&self, job_id: &str) -> Result<Option<JobInfo>> {
787        let url = format!("{}/ops/jobs/{}", self.base_url, job_id);
788        let response = self.client.get(&url).send().await?;
789
790        if response.status() == StatusCode::NOT_FOUND {
791            return Ok(None);
792        }
793
794        self.handle_response(response).await.map(Some)
795    }
796
797    /// Trigger index compaction
798    #[instrument(skip(self, request))]
799    pub async fn compact(&self, request: CompactionRequest) -> Result<CompactionResponse> {
800        let url = format!("{}/ops/compact", self.base_url);
801        let response = self.client.post(&url).json(&request).send().await?;
802        self.handle_response(response).await
803    }
804
805    /// Request graceful shutdown
806    #[instrument(skip(self))]
807    pub async fn shutdown(&self) -> Result<()> {
808        let url = format!("{}/ops/shutdown", self.base_url);
809        let response = self.client.post(&url).send().await?;
810
811        if response.status().is_success() {
812            Ok(())
813        } else {
814            let status = response.status().as_u16();
815            let text = response.text().await.unwrap_or_default();
816            Err(ClientError::Server {
817                status,
818                message: text,
819                code: None,
820            })
821        }
822    }
823
824    // ========================================================================
825    // Fetch by ID
826    // ========================================================================
827
828    /// Fetch vectors by their IDs
829    #[instrument(skip(self, request), fields(id_count = request.ids.len()))]
830    pub async fn fetch(&self, namespace: &str, request: FetchRequest) -> Result<FetchResponse> {
831        let url = format!("{}/v1/namespaces/{}/fetch", self.base_url, namespace);
832        debug!("Fetching {} vectors from {}", request.ids.len(), namespace);
833        let response = self.client.post(&url).json(&request).send().await?;
834        self.handle_response(response).await
835    }
836
837    /// Fetch vectors by IDs (convenience method)
838    #[instrument(skip(self))]
839    pub async fn fetch_by_ids(&self, namespace: &str, ids: &[&str]) -> Result<Vec<Vector>> {
840        let request = FetchRequest::new(ids.iter().map(|s| s.to_string()).collect());
841        self.fetch(namespace, request).await.map(|r| r.vectors)
842    }
843
844    // ========================================================================
845    // Text Auto-Embedding Operations
846    // ========================================================================
847
848    /// Upsert text documents with automatic server-side embedding generation
849    #[instrument(skip(self, request), fields(doc_count = request.documents.len()))]
850    pub async fn upsert_text(
851        &self,
852        namespace: &str,
853        request: UpsertTextRequest,
854    ) -> Result<TextUpsertResponse> {
855        let url = format!("{}/v1/namespaces/{}/upsert-text", self.base_url, namespace);
856        debug!(
857            "Upserting {} text documents to {}",
858            request.documents.len(),
859            namespace
860        );
861        let response = self.client.post(&url).json(&request).send().await?;
862        self.handle_response(response).await
863    }
864
865    /// Query using natural language text with automatic server-side embedding
866    #[instrument(skip(self, request), fields(top_k = request.top_k))]
867    pub async fn query_text(
868        &self,
869        namespace: &str,
870        request: QueryTextRequest,
871    ) -> Result<TextQueryResponse> {
872        let url = format!("{}/v1/namespaces/{}/query-text", self.base_url, namespace);
873        debug!("Text query in {} for: {}", namespace, request.text);
874        let response = self.client.post(&url).json(&request).send().await?;
875        self.handle_response(response).await
876    }
877
878    /// Query text (convenience method)
879    #[instrument(skip(self))]
880    pub async fn query_text_simple(
881        &self,
882        namespace: &str,
883        text: &str,
884        top_k: u32,
885    ) -> Result<TextQueryResponse> {
886        self.query_text(namespace, QueryTextRequest::new(text, top_k))
887            .await
888    }
889
890    /// Execute multiple text queries with automatic embedding in a single request
891    #[instrument(skip(self, request), fields(query_count = request.queries.len()))]
892    pub async fn batch_query_text(
893        &self,
894        namespace: &str,
895        request: BatchQueryTextRequest,
896    ) -> Result<BatchQueryTextResponse> {
897        let url = format!(
898            "{}/v1/namespaces/{}/batch-query-text",
899            self.base_url, namespace
900        );
901        debug!(
902            "Batch text query in {} with {} queries",
903            namespace,
904            request.queries.len()
905        );
906        let response = self.client.post(&url).json(&request).send().await?;
907        self.handle_response(response).await
908    }
909
910    // ========================================================================
911    // Private Helpers
912    // ========================================================================
913
914    /// Handle response and deserialize JSON
915    pub(crate) async fn handle_response<T: serde::de::DeserializeOwned>(
916        &self,
917        response: reqwest::Response,
918    ) -> Result<T> {
919        let status = response.status();
920
921        if status.is_success() {
922            Ok(response.json().await?)
923        } else {
924            let status_code = status.as_u16();
925            let text = response.text().await.unwrap_or_default();
926
927            #[derive(Deserialize)]
928            struct ErrorBody {
929                error: Option<String>,
930                code: Option<ServerErrorCode>,
931            }
932
933            let (message, code) = if let Ok(body) = serde_json::from_str::<ErrorBody>(&text) {
934                (body.error.unwrap_or_else(|| text.clone()), body.code)
935            } else {
936                (text, None)
937            };
938
939            match status_code {
940                401 => Err(ClientError::Server {
941                    status: 401,
942                    message,
943                    code,
944                }),
945                403 => Err(ClientError::Authorization {
946                    status: 403,
947                    message,
948                    code,
949                }),
950                404 => match &code {
951                    Some(ServerErrorCode::NamespaceNotFound) => {
952                        Err(ClientError::NamespaceNotFound(message))
953                    }
954                    Some(ServerErrorCode::VectorNotFound) => {
955                        Err(ClientError::VectorNotFound(message))
956                    }
957                    _ => Err(ClientError::Server {
958                        status: 404,
959                        message,
960                        code,
961                    }),
962                },
963                _ => Err(ClientError::Server {
964                    status: status_code,
965                    message,
966                    code,
967                }),
968            }
969        }
970    }
971}
972
973/// Builder for DakeraClient
974#[derive(Debug)]
975pub struct DakeraClientBuilder {
976    base_url: String,
977    timeout: Duration,
978    user_agent: Option<String>,
979}
980
981impl DakeraClientBuilder {
982    /// Create a new builder
983    pub fn new(base_url: impl Into<String>) -> Self {
984        Self {
985            base_url: base_url.into(),
986            timeout: Duration::from_secs(DEFAULT_TIMEOUT_SECS),
987            user_agent: None,
988        }
989    }
990
991    /// Set the request timeout
992    pub fn timeout(mut self, timeout: Duration) -> Self {
993        self.timeout = timeout;
994        self
995    }
996
997    /// Set the request timeout in seconds
998    pub fn timeout_secs(mut self, secs: u64) -> Self {
999        self.timeout = Duration::from_secs(secs);
1000        self
1001    }
1002
1003    /// Set a custom user agent
1004    pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
1005        self.user_agent = Some(user_agent.into());
1006        self
1007    }
1008
1009    /// Build the client
1010    pub fn build(self) -> Result<DakeraClient> {
1011        // Normalize base URL (remove trailing slash)
1012        let base_url = self.base_url.trim_end_matches('/').to_string();
1013
1014        // Validate URL
1015        if !base_url.starts_with("http://") && !base_url.starts_with("https://") {
1016            return Err(ClientError::InvalidUrl(
1017                "URL must start with http:// or https://".to_string(),
1018            ));
1019        }
1020
1021        let user_agent = self
1022            .user_agent
1023            .unwrap_or_else(|| format!("dakera-client/{}", env!("CARGO_PKG_VERSION")));
1024
1025        let client = Client::builder()
1026            .timeout(self.timeout)
1027            .user_agent(user_agent)
1028            .build()
1029            .map_err(|e| ClientError::Config(e.to_string()))?;
1030
1031        Ok(DakeraClient { client, base_url })
1032    }
1033}
1034
1035// ============================================================================
1036// SSE Streaming (CE-1)
1037// ============================================================================
1038
1039impl DakeraClient {
1040    /// Subscribe to namespace-scoped SSE events.
1041    ///
1042    /// Opens a long-lived connection to `GET /v1/namespaces/{namespace}/events`
1043    /// and returns a [`tokio::sync::mpsc::Receiver`] that yields
1044    /// [`DakeraEvent`] results as they arrive.  The background task exits when
1045    /// the server closes the stream or the receiver is dropped.
1046    ///
1047    /// Requires a Read-scoped API key.
1048    ///
1049    /// # Example
1050    ///
1051    /// ```rust,no_run
1052    /// use dakera_client::DakeraClient;
1053    ///
1054    /// #[tokio::main]
1055    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
1056    ///     let client = DakeraClient::new("http://localhost:3000")?;
1057    ///     let mut rx = client.stream_namespace_events("my-ns").await?;
1058    ///     while let Some(result) = rx.recv().await {
1059    ///         println!("{:?}", result?);
1060    ///     }
1061    ///     Ok(())
1062    /// }
1063    /// ```
1064    pub async fn stream_namespace_events(
1065        &self,
1066        namespace: &str,
1067    ) -> Result<tokio::sync::mpsc::Receiver<Result<crate::events::DakeraEvent>>> {
1068        let url = format!(
1069            "{}/v1/namespaces/{}/events",
1070            self.base_url,
1071            urlencoding::encode(namespace)
1072        );
1073        self.stream_sse(url).await
1074    }
1075
1076    /// Subscribe to the global SSE event stream (all namespaces).
1077    ///
1078    /// Opens a long-lived connection to `GET /ops/events` and returns a
1079    /// [`tokio::sync::mpsc::Receiver`] that yields [`DakeraEvent`] results.
1080    ///
1081    /// Requires an Admin-scoped API key.
1082    pub async fn stream_global_events(
1083        &self,
1084    ) -> Result<tokio::sync::mpsc::Receiver<Result<crate::events::DakeraEvent>>> {
1085        let url = format!("{}/ops/events", self.base_url);
1086        self.stream_sse(url).await
1087    }
1088
1089    /// Subscribe to the memory lifecycle SSE event stream (DASH-B).
1090    ///
1091    /// Opens a long-lived connection to `GET /v1/events/stream` and returns a
1092    /// [`tokio::sync::mpsc::Receiver`] that yields [`MemoryEvent`] results as
1093    /// they arrive.  The background task exits when the server closes the stream
1094    /// or the receiver is dropped.
1095    ///
1096    /// Requires a Read-scoped API key.
1097    pub async fn stream_memory_events(
1098        &self,
1099    ) -> Result<tokio::sync::mpsc::Receiver<Result<crate::events::MemoryEvent>>> {
1100        let url = format!("{}/v1/events/stream", self.base_url);
1101        self.stream_sse(url).await
1102    }
1103
1104    /// Low-level generic SSE streaming helper.
1105    async fn stream_sse<T>(&self, url: String) -> Result<tokio::sync::mpsc::Receiver<Result<T>>>
1106    where
1107        T: serde::de::DeserializeOwned + Send + 'static,
1108    {
1109        use futures_util::StreamExt;
1110
1111        let response = self
1112            .client
1113            .get(&url)
1114            .header("Accept", "text/event-stream")
1115            .header("Cache-Control", "no-cache")
1116            .send()
1117            .await?;
1118
1119        if !response.status().is_success() {
1120            let status = response.status().as_u16();
1121            let body = response.text().await.unwrap_or_default();
1122            return Err(ClientError::Server {
1123                status,
1124                message: body,
1125                code: None,
1126            });
1127        }
1128
1129        let (tx, rx) = tokio::sync::mpsc::channel(64);
1130
1131        tokio::spawn(async move {
1132            let mut byte_stream = response.bytes_stream();
1133            let mut remaining = String::new();
1134            let mut data_lines: Vec<String> = Vec::new();
1135
1136            while let Some(chunk) = byte_stream.next().await {
1137                match chunk {
1138                    Ok(bytes) => {
1139                        remaining.push_str(&String::from_utf8_lossy(&bytes));
1140                        while let Some(pos) = remaining.find('\n') {
1141                            let raw = &remaining[..pos];
1142                            let line = raw.trim_end_matches('\r').to_string();
1143                            remaining = remaining[pos + 1..].to_string();
1144
1145                            if line.starts_with(':') {
1146                                // SSE comment / heartbeat — skip
1147                            } else if let Some(data) = line.strip_prefix("data:") {
1148                                data_lines.push(data.trim_start().to_string());
1149                            } else if line.is_empty() {
1150                                if !data_lines.is_empty() {
1151                                    let payload = data_lines.join("\n");
1152                                    data_lines.clear();
1153                                    let result = serde_json::from_str::<T>(&payload)
1154                                        .map_err(ClientError::Json);
1155                                    if tx.send(result).await.is_err() {
1156                                        return; // receiver dropped
1157                                    }
1158                                }
1159                            } else {
1160                                // Unrecognised field (e.g. "event:") — ignore
1161                            }
1162                        }
1163                    }
1164                    Err(e) => {
1165                        let _ = tx.send(Err(ClientError::Http(e))).await;
1166                        return;
1167                    }
1168                }
1169            }
1170        });
1171
1172        Ok(rx)
1173    }
1174}
1175
#[cfg(test)]
mod tests {
    use super::*;

    /// A plain `http://` base URL is accepted by the default constructor.
    #[test]
    fn test_client_builder() {
        assert!(DakeraClient::new("http://localhost:3000").is_ok());
    }

    /// Optional builder settings do not prevent construction.
    #[test]
    fn test_client_builder_with_options() {
        let built = DakeraClient::builder("http://localhost:3000")
            .timeout_secs(60)
            .user_agent("test-client/1.0")
            .build();
        assert!(built.is_ok());
    }

    /// A base URL without an http(s) scheme is rejected.
    #[test]
    fn test_client_builder_invalid_url() {
        assert!(DakeraClient::new("invalid-url").is_err());
    }

    /// The stored base URL never keeps a trailing slash.
    #[test]
    fn test_client_builder_trailing_slash() {
        let built = DakeraClient::new("http://localhost:3000/").unwrap();
        assert!(!built.base_url.ends_with('/'));
    }

    /// `Vector::new` stores id and values and leaves metadata unset.
    #[test]
    fn test_vector_creation() {
        let vector = Vector::new("test", vec![0.1, 0.2, 0.3]);
        assert_eq!(vector.id, "test");
        assert_eq!(vector.values.len(), 3);
        assert!(vector.metadata.is_none());
    }

    /// Chained setters on `QueryRequest` are reflected in its fields.
    #[test]
    fn test_query_request_builder() {
        let filter = serde_json::json!({"category": "test"});
        let request = QueryRequest::new(vec![0.1, 0.2], 10)
            .with_filter(filter)
            .include_metadata(false);

        assert_eq!(request.top_k, 10);
        assert!(request.filter.is_some());
        assert!(!request.include_metadata);
    }

    /// An in-range vector weight is stored as given.
    #[test]
    fn test_hybrid_search_request() {
        let request =
            HybridSearchRequest::new(vec![0.1], "test query", 5).with_vector_weight(0.7);

        assert_eq!(request.vector_weight, 0.7);
        assert_eq!(request.text, "test query");
    }

    /// A vector weight above 1.0 is clamped to 1.0.
    #[test]
    fn test_hybrid_search_weight_clamping() {
        let request = HybridSearchRequest::new(vec![0.1], "test", 5).with_vector_weight(1.5);

        assert_eq!(request.vector_weight, 1.0);
    }

    /// `TextDocument` keeps id, text and TTL; metadata defaults to `None`.
    #[test]
    fn test_text_document_builder() {
        let document = TextDocument::new("doc1", "Hello world").with_ttl(3600);

        assert_eq!(document.id, "doc1");
        assert_eq!(document.text, "Hello world");
        assert_eq!(document.ttl_seconds, Some(3600));
        assert!(document.metadata.is_none());
    }

    /// `UpsertTextRequest` keeps all documents and the chosen model.
    #[test]
    fn test_upsert_text_request_builder() {
        let request = UpsertTextRequest::new(vec![
            TextDocument::new("doc1", "Hello"),
            TextDocument::new("doc2", "World"),
        ])
        .with_model(EmbeddingModel::BgeSmall);

        assert_eq!(request.documents.len(), 2);
        assert_eq!(request.model, Some(EmbeddingModel::BgeSmall));
    }

    /// Every chained option on `QueryTextRequest` is recorded.
    #[test]
    fn test_query_text_request_builder() {
        let filter = serde_json::json!({"category": "docs"});
        let request = QueryTextRequest::new("semantic search query", 5)
            .with_filter(filter)
            .include_vectors(true)
            .with_model(EmbeddingModel::E5Small);

        assert_eq!(request.text, "semantic search query");
        assert_eq!(request.top_k, 5);
        assert!(request.filter.is_some());
        assert!(request.include_vectors);
        assert_eq!(request.model, Some(EmbeddingModel::E5Small));
    }

    /// `FetchRequest::new` includes values and metadata by default.
    #[test]
    fn test_fetch_request_builder() {
        let ids = vec!["id1".to_string(), "id2".to_string()];
        let request = FetchRequest::new(ids);

        assert_eq!(request.ids.len(), 2);
        assert!(request.include_values);
        assert!(request.include_metadata);
    }

    /// Dimension and index type set on the builder are stored.
    #[test]
    fn test_create_namespace_request_builder() {
        let request = CreateNamespaceRequest::new()
            .with_dimensions(384)
            .with_index_type("hnsw");

        assert_eq!(request.dimensions, Some(384));
        assert_eq!(request.index_type.as_deref(), Some("hnsw"));
    }

    /// `BatchQueryTextRequest::new` leaves vectors off and model unset.
    #[test]
    fn test_batch_query_text_request() {
        let queries = vec!["query one".to_string(), "query two".to_string()];
        let request = BatchQueryTextRequest::new(queries, 10);

        assert_eq!(request.queries.len(), 2);
        assert_eq!(request.top_k, 10);
        assert!(!request.include_vectors);
        assert!(request.model.is_none());
    }
}