Skip to main content

dakera_client/
client.rs

1//! Dakera client implementation
2
3use reqwest::{Client, StatusCode};
4use std::time::Duration;
5use tracing::{debug, instrument};
6
7use crate::error::{ClientError, Result};
8use crate::types::*;
9
/// Default timeout for requests, expressed in seconds.
const DEFAULT_TIMEOUT_SECS: u64 = 30;
12
/// Dakera client for interacting with the vector database.
///
/// Bundles a `reqwest` HTTP client with the server's base URL; request
/// methods build their endpoint by appending a path to `base_url`.
#[derive(Debug, Clone)]
pub struct DakeraClient {
    /// Underlying `reqwest` HTTP client used to send requests.
    pub(crate) client: Client,
    /// Base URL of the Dakera server; endpoint paths are appended to it.
    pub(crate) base_url: String,
}
21
22impl DakeraClient {
23    /// Create a new client with the given base URL
24    ///
25    /// # Example
26    ///
27    /// ```rust,no_run
28    /// use dakera_client::DakeraClient;
29    ///
30    /// let client = DakeraClient::new("http://localhost:3000").unwrap();
31    /// ```
32    pub fn new(base_url: impl Into<String>) -> Result<Self> {
33        DakeraClientBuilder::new(base_url).build()
34    }
35
36    /// Create a new client builder for more configuration options
37    pub fn builder(base_url: impl Into<String>) -> DakeraClientBuilder {
38        DakeraClientBuilder::new(base_url)
39    }
40
41    // ========================================================================
42    // Health & Status
43    // ========================================================================
44
45    /// Check server health
46    #[instrument(skip(self))]
47    pub async fn health(&self) -> Result<HealthResponse> {
48        let url = format!("{}/health", self.base_url);
49        let response = self.client.get(&url).send().await?;
50
51        if response.status().is_success() {
52            Ok(response.json().await?)
53        } else {
54            // Health endpoint might return simple OK
55            Ok(HealthResponse {
56                healthy: true,
57                version: None,
58                uptime_seconds: None,
59            })
60        }
61    }
62
63    /// Check if server is ready
64    #[instrument(skip(self))]
65    pub async fn ready(&self) -> Result<ReadinessResponse> {
66        let url = format!("{}/health/ready", self.base_url);
67        let response = self.client.get(&url).send().await?;
68
69        if response.status().is_success() {
70            Ok(response.json().await?)
71        } else {
72            Ok(ReadinessResponse {
73                ready: false,
74                components: None,
75            })
76        }
77    }
78
79    /// Check if server is live
80    #[instrument(skip(self))]
81    pub async fn live(&self) -> Result<bool> {
82        let url = format!("{}/health/live", self.base_url);
83        let response = self.client.get(&url).send().await?;
84        Ok(response.status().is_success())
85    }
86
87    // ========================================================================
88    // Namespace Operations
89    // ========================================================================
90
91    /// List all namespaces
92    #[instrument(skip(self))]
93    pub async fn list_namespaces(&self) -> Result<Vec<String>> {
94        let url = format!("{}/v1/namespaces", self.base_url);
95        let response = self.client.get(&url).send().await?;
96        self.handle_response::<ListNamespacesResponse>(response)
97            .await
98            .map(|r| r.namespaces)
99    }
100
101    /// Get namespace information
102    #[instrument(skip(self))]
103    pub async fn get_namespace(&self, namespace: &str) -> Result<NamespaceInfo> {
104        let url = format!("{}/v1/namespaces/{}", self.base_url, namespace);
105        let response = self.client.get(&url).send().await?;
106        self.handle_response(response).await
107    }
108
109    /// Create a new namespace
110    #[instrument(skip(self, request))]
111    pub async fn create_namespace(
112        &self,
113        namespace: &str,
114        request: CreateNamespaceRequest,
115    ) -> Result<NamespaceInfo> {
116        let url = format!("{}/v1/namespaces/{}", self.base_url, namespace);
117        let response = self.client.post(&url).json(&request).send().await?;
118        self.handle_response(response).await
119    }
120
121    /// Create or update a namespace configuration (upsert semantics — v0.6.0).
122    ///
123    /// Creates the namespace if it does not exist, or updates its distance-metric
124    /// configuration if it already exists.  Dimension changes are rejected to
125    /// prevent silent data corruption.  Requires `Scope::Write`.
126    #[instrument(skip(self, request), fields(namespace = %namespace))]
127    pub async fn configure_namespace(
128        &self,
129        namespace: &str,
130        request: ConfigureNamespaceRequest,
131    ) -> Result<ConfigureNamespaceResponse> {
132        let url = format!("{}/v1/namespaces/{}", self.base_url, namespace);
133        let response = self.client.put(&url).json(&request).send().await?;
134        self.handle_response(response).await
135    }
136
137    // ========================================================================
138    // Vector Operations
139    // ========================================================================
140
141    /// Upsert vectors into a namespace
142    #[instrument(skip(self, request), fields(vector_count = request.vectors.len()))]
143    pub async fn upsert(&self, namespace: &str, request: UpsertRequest) -> Result<UpsertResponse> {
144        let url = format!("{}/v1/namespaces/{}/vectors", self.base_url, namespace);
145        debug!(
146            "Upserting {} vectors to {}",
147            request.vectors.len(),
148            namespace
149        );
150
151        let response = self.client.post(&url).json(&request).send().await?;
152        self.handle_response(response).await
153    }
154
155    /// Upsert a single vector (convenience method)
156    #[instrument(skip(self, vector))]
157    pub async fn upsert_one(&self, namespace: &str, vector: Vector) -> Result<UpsertResponse> {
158        self.upsert(namespace, UpsertRequest::single(vector)).await
159    }
160
161    /// Upsert vectors in column format (Turbopuffer-inspired)
162    ///
163    /// This format is more efficient for bulk upserts as it avoids repeating
164    /// field names for each vector. All arrays must have equal length.
165    ///
166    /// # Example
167    ///
168    /// ```rust,no_run
169    /// use dakera_client::{DakeraClient, ColumnUpsertRequest};
170    ///
171    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
172    /// let client = DakeraClient::new("http://localhost:3000")?;
173    ///
174    /// let request = ColumnUpsertRequest::new(
175    ///     vec!["id1".to_string(), "id2".to_string(), "id3".to_string()],
176    ///     vec![
177    ///         vec![0.1, 0.2, 0.3],
178    ///         vec![0.4, 0.5, 0.6],
179    ///         vec![0.7, 0.8, 0.9],
180    ///     ],
181    /// )
182    /// .with_attribute("category", vec![
183    ///     serde_json::json!("A"),
184    ///     serde_json::json!("B"),
185    ///     serde_json::json!("A"),
186    /// ]);
187    ///
188    /// let response = client.upsert_columns("my-namespace", request).await?;
189    /// println!("Upserted {} vectors", response.upserted_count);
190    /// # Ok(())
191    /// # }
192    /// ```
193    #[instrument(skip(self, request), fields(namespace = %namespace, count = request.ids.len()))]
194    pub async fn upsert_columns(
195        &self,
196        namespace: &str,
197        request: ColumnUpsertRequest,
198    ) -> Result<UpsertResponse> {
199        let url = format!(
200            "{}/v1/namespaces/{}/upsert-columns",
201            self.base_url, namespace
202        );
203        debug!(
204            "Upserting {} vectors in column format to {}",
205            request.ids.len(),
206            namespace
207        );
208
209        let response = self.client.post(&url).json(&request).send().await?;
210        self.handle_response(response).await
211    }
212
213    /// Query for similar vectors
214    #[instrument(skip(self, request), fields(top_k = request.top_k))]
215    pub async fn query(&self, namespace: &str, request: QueryRequest) -> Result<QueryResponse> {
216        let url = format!("{}/v1/namespaces/{}/query", self.base_url, namespace);
217        debug!(
218            "Querying namespace {} for top {} results",
219            namespace, request.top_k
220        );
221
222        let response = self.client.post(&url).json(&request).send().await?;
223        self.handle_response(response).await
224    }
225
226    /// Simple query with just a vector and top_k (convenience method)
227    #[instrument(skip(self, vector))]
228    pub async fn query_simple(
229        &self,
230        namespace: &str,
231        vector: Vec<f32>,
232        top_k: u32,
233    ) -> Result<QueryResponse> {
234        self.query(namespace, QueryRequest::new(vector, top_k))
235            .await
236    }
237
238    /// Execute multiple queries in a single request
239    ///
240    /// This allows executing multiple vector similarity queries in parallel,
241    /// which is more efficient than making separate requests.
242    ///
243    /// # Example
244    ///
245    /// ```rust,no_run
246    /// use dakera_client::{DakeraClient, BatchQueryRequest, BatchQueryItem};
247    ///
248    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
249    /// let client = DakeraClient::new("http://localhost:3000")?;
250    ///
251    /// let request = BatchQueryRequest::new(vec![
252    ///     BatchQueryItem::new(vec![0.1, 0.2, 0.3], 5).with_id("query1"),
253    ///     BatchQueryItem::new(vec![0.4, 0.5, 0.6], 10).with_id("query2"),
254    /// ]);
255    ///
256    /// let response = client.batch_query("my-namespace", request).await?;
257    /// println!("Executed {} queries in {}ms", response.query_count, response.total_latency_ms);
258    /// # Ok(())
259    /// # }
260    /// ```
261    #[instrument(skip(self, request), fields(namespace = %namespace, query_count = request.queries.len()))]
262    pub async fn batch_query(
263        &self,
264        namespace: &str,
265        request: BatchQueryRequest,
266    ) -> Result<BatchQueryResponse> {
267        let url = format!("{}/v1/namespaces/{}/batch-query", self.base_url, namespace);
268        debug!(
269            "Batch querying namespace {} with {} queries",
270            namespace,
271            request.queries.len()
272        );
273
274        let response = self.client.post(&url).json(&request).send().await?;
275        self.handle_response(response).await
276    }
277
278    /// Delete vectors by ID
279    #[instrument(skip(self, request), fields(id_count = request.ids.len()))]
280    pub async fn delete(&self, namespace: &str, request: DeleteRequest) -> Result<DeleteResponse> {
281        let url = format!(
282            "{}/v1/namespaces/{}/vectors/delete",
283            self.base_url, namespace
284        );
285        debug!("Deleting {} vectors from {}", request.ids.len(), namespace);
286
287        let response = self.client.post(&url).json(&request).send().await?;
288        self.handle_response(response).await
289    }
290
291    /// Delete a single vector by ID (convenience method)
292    #[instrument(skip(self))]
293    pub async fn delete_one(&self, namespace: &str, id: &str) -> Result<DeleteResponse> {
294        self.delete(namespace, DeleteRequest::single(id)).await
295    }
296
297    // ========================================================================
298    // Full-Text Search Operations
299    // ========================================================================
300
301    /// Index documents for full-text search
302    #[instrument(skip(self, request), fields(doc_count = request.documents.len()))]
303    pub async fn index_documents(
304        &self,
305        namespace: &str,
306        request: IndexDocumentsRequest,
307    ) -> Result<IndexDocumentsResponse> {
308        let url = format!(
309            "{}/v1/namespaces/{}/fulltext/index",
310            self.base_url, namespace
311        );
312        debug!(
313            "Indexing {} documents in {}",
314            request.documents.len(),
315            namespace
316        );
317
318        let response = self.client.post(&url).json(&request).send().await?;
319        self.handle_response(response).await
320    }
321
322    /// Index a single document (convenience method)
323    #[instrument(skip(self, document))]
324    pub async fn index_document(
325        &self,
326        namespace: &str,
327        document: Document,
328    ) -> Result<IndexDocumentsResponse> {
329        self.index_documents(
330            namespace,
331            IndexDocumentsRequest {
332                documents: vec![document],
333            },
334        )
335        .await
336    }
337
338    /// Perform full-text search
339    #[instrument(skip(self, request))]
340    pub async fn fulltext_search(
341        &self,
342        namespace: &str,
343        request: FullTextSearchRequest,
344    ) -> Result<FullTextSearchResponse> {
345        let url = format!(
346            "{}/v1/namespaces/{}/fulltext/search",
347            self.base_url, namespace
348        );
349        debug!("Full-text search in {} for: {}", namespace, request.query);
350
351        let response = self.client.post(&url).json(&request).send().await?;
352        self.handle_response(response).await
353    }
354
355    /// Simple full-text search (convenience method)
356    #[instrument(skip(self))]
357    pub async fn search_text(
358        &self,
359        namespace: &str,
360        query: &str,
361        top_k: u32,
362    ) -> Result<FullTextSearchResponse> {
363        self.fulltext_search(namespace, FullTextSearchRequest::new(query, top_k))
364            .await
365    }
366
367    /// Get full-text index statistics
368    #[instrument(skip(self))]
369    pub async fn fulltext_stats(&self, namespace: &str) -> Result<FullTextStats> {
370        let url = format!(
371            "{}/v1/namespaces/{}/fulltext/stats",
372            self.base_url, namespace
373        );
374        let response = self.client.get(&url).send().await?;
375        self.handle_response(response).await
376    }
377
378    /// Delete documents from full-text index
379    #[instrument(skip(self, request))]
380    pub async fn fulltext_delete(
381        &self,
382        namespace: &str,
383        request: DeleteRequest,
384    ) -> Result<DeleteResponse> {
385        let url = format!(
386            "{}/v1/namespaces/{}/fulltext/delete",
387            self.base_url, namespace
388        );
389        let response = self.client.post(&url).json(&request).send().await?;
390        self.handle_response(response).await
391    }
392
393    // ========================================================================
394    // Hybrid Search Operations
395    // ========================================================================
396
397    /// Perform hybrid search (vector + full-text)
398    #[instrument(skip(self, request), fields(top_k = request.top_k))]
399    pub async fn hybrid_search(
400        &self,
401        namespace: &str,
402        request: HybridSearchRequest,
403    ) -> Result<HybridSearchResponse> {
404        let url = format!("{}/v1/namespaces/{}/hybrid", self.base_url, namespace);
405        debug!(
406            "Hybrid search in {} with vector_weight={}",
407            namespace, request.vector_weight
408        );
409
410        let response = self.client.post(&url).json(&request).send().await?;
411        self.handle_response(response).await
412    }
413
414    // ========================================================================
415    // Multi-Vector Search Operations
416    // ========================================================================
417
418    /// Multi-vector search with positive/negative vectors and MMR
419    ///
420    /// This performs semantic search using multiple positive vectors (to search towards)
421    /// and optional negative vectors (to search away from). Supports MMR (Maximal Marginal
422    /// Relevance) for result diversity.
423    ///
424    /// # Example
425    ///
426    /// ```rust,no_run
427    /// use dakera_client::{DakeraClient, MultiVectorSearchRequest};
428    ///
429    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
430    /// let client = DakeraClient::new("http://localhost:3000")?;
431    ///
432    /// // Search towards multiple concepts, away from others
433    /// let request = MultiVectorSearchRequest::new(vec![
434    ///     vec![0.1, 0.2, 0.3],  // positive vector 1
435    ///     vec![0.4, 0.5, 0.6],  // positive vector 2
436    /// ])
437    /// .with_negative_vectors(vec![
438    ///     vec![0.7, 0.8, 0.9],  // negative vector
439    /// ])
440    /// .with_top_k(10)
441    /// .with_mmr(0.7);  // Enable MMR with lambda=0.7
442    ///
443    /// let response = client.multi_vector_search("my-namespace", request).await?;
444    /// for result in response.results {
445    ///     println!("ID: {}, Score: {}", result.id, result.score);
446    /// }
447    /// # Ok(())
448    /// # }
449    /// ```
450    #[instrument(skip(self, request), fields(namespace = %namespace))]
451    pub async fn multi_vector_search(
452        &self,
453        namespace: &str,
454        request: MultiVectorSearchRequest,
455    ) -> Result<MultiVectorSearchResponse> {
456        let url = format!("{}/v1/namespaces/{}/multi-vector", self.base_url, namespace);
457        debug!(
458            "Multi-vector search in {} with {} positive vectors",
459            namespace,
460            request.positive_vectors.len()
461        );
462
463        let response = self.client.post(&url).json(&request).send().await?;
464        self.handle_response(response).await
465    }
466
467    // ========================================================================
468    // Aggregation Operations
469    // ========================================================================
470
471    /// Aggregate vectors with grouping (Turbopuffer-inspired)
472    ///
473    /// This performs aggregation queries on vector metadata, supporting
474    /// count, sum, avg, min, and max operations with optional grouping.
475    ///
476    /// # Example
477    ///
478    /// ```rust,no_run
479    /// use dakera_client::{DakeraClient, AggregationRequest};
480    ///
481    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
482    /// let client = DakeraClient::new("http://localhost:3000")?;
483    ///
484    /// // Count all vectors and sum scores, grouped by category
485    /// let request = AggregationRequest::new()
486    ///     .with_count("total_count")
487    ///     .with_sum("total_score", "score")
488    ///     .with_avg("avg_score", "score")
489    ///     .with_group_by("category");
490    ///
491    /// let response = client.aggregate("my-namespace", request).await?;
492    /// if let Some(groups) = response.aggregation_groups {
493    ///     for group in groups {
494    ///         println!("Group: {:?}", group.group_key);
495    ///     }
496    /// }
497    /// # Ok(())
498    /// # }
499    /// ```
500    #[instrument(skip(self, request), fields(namespace = %namespace))]
501    pub async fn aggregate(
502        &self,
503        namespace: &str,
504        request: AggregationRequest,
505    ) -> Result<AggregationResponse> {
506        let url = format!("{}/v1/namespaces/{}/aggregate", self.base_url, namespace);
507        debug!(
508            "Aggregating in namespace {} with {} aggregations",
509            namespace,
510            request.aggregate_by.len()
511        );
512
513        let response = self.client.post(&url).json(&request).send().await?;
514        self.handle_response(response).await
515    }
516
517    // ========================================================================
518    // Unified Query Operations
519    // ========================================================================
520
521    /// Unified query with flexible ranking options (Turbopuffer-inspired)
522    ///
523    /// This provides a unified API for vector search (ANN/kNN), full-text search (BM25),
524    /// and attribute ordering. Supports combining multiple ranking functions with
525    /// Sum, Max, and Product operators.
526    ///
527    /// # Example
528    ///
529    /// ```rust,no_run
530    /// use dakera_client::{DakeraClient, UnifiedQueryRequest, SortDirection};
531    ///
532    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
533    /// let client = DakeraClient::new("http://localhost:3000")?;
534    ///
535    /// // Vector ANN search
536    /// let request = UnifiedQueryRequest::vector_search(vec![0.1, 0.2, 0.3], 10);
537    /// let response = client.unified_query("my-namespace", request).await?;
538    ///
539    /// // Full-text BM25 search
540    /// let request = UnifiedQueryRequest::fulltext_search("content", "hello world", 10);
541    /// let response = client.unified_query("my-namespace", request).await?;
542    ///
543    /// // Attribute ordering with filter
544    /// let request = UnifiedQueryRequest::attribute_order("timestamp", SortDirection::Desc, 10)
545    ///     .with_filter(serde_json::json!({"category": {"$eq": "science"}}));
546    /// let response = client.unified_query("my-namespace", request).await?;
547    ///
548    /// for result in response.results {
549    ///     println!("ID: {}, Score: {:?}", result.id, result.dist);
550    /// }
551    /// # Ok(())
552    /// # }
553    /// ```
554    #[instrument(skip(self, request), fields(namespace = %namespace))]
555    pub async fn unified_query(
556        &self,
557        namespace: &str,
558        request: UnifiedQueryRequest,
559    ) -> Result<UnifiedQueryResponse> {
560        let url = format!(
561            "{}/v1/namespaces/{}/unified-query",
562            self.base_url, namespace
563        );
564        debug!(
565            "Unified query in namespace {} with top_k={}",
566            namespace, request.top_k
567        );
568
569        let response = self.client.post(&url).json(&request).send().await?;
570        self.handle_response(response).await
571    }
572
573    /// Simple vector search using the unified query API (convenience method)
574    ///
575    /// This is a shortcut for `unified_query` with a vector ANN search.
576    #[instrument(skip(self, vector))]
577    pub async fn unified_vector_search(
578        &self,
579        namespace: &str,
580        vector: Vec<f32>,
581        top_k: usize,
582    ) -> Result<UnifiedQueryResponse> {
583        self.unified_query(namespace, UnifiedQueryRequest::vector_search(vector, top_k))
584            .await
585    }
586
587    /// Simple full-text search using the unified query API (convenience method)
588    ///
589    /// This is a shortcut for `unified_query` with a BM25 full-text search.
590    #[instrument(skip(self))]
591    pub async fn unified_text_search(
592        &self,
593        namespace: &str,
594        field: &str,
595        query: &str,
596        top_k: usize,
597    ) -> Result<UnifiedQueryResponse> {
598        self.unified_query(
599            namespace,
600            UnifiedQueryRequest::fulltext_search(field, query, top_k),
601        )
602        .await
603    }
604
605    // ========================================================================
606    // Query Explain Operations
607    // ========================================================================
608
609    /// Explain query execution plan (similar to SQL EXPLAIN)
610    ///
611    /// This provides detailed information about how a query will be executed,
612    /// including index selection, execution stages, cost estimates, and
613    /// performance recommendations.
614    ///
615    /// # Example
616    ///
617    /// ```rust,no_run
618    /// use dakera_client::{DakeraClient, QueryExplainRequest};
619    ///
620    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
621    /// let client = DakeraClient::new("http://localhost:3000")?;
622    ///
623    /// // Explain a vector search query
624    /// let request = QueryExplainRequest::vector_search(vec![0.1, 0.2, 0.3], 10)
625    ///     .with_verbose();
626    /// let plan = client.explain_query("my-namespace", request).await?;
627    ///
628    /// println!("Query plan: {}", plan.summary);
629    /// println!("Estimated time: {}ms", plan.cost_estimate.estimated_time_ms);
630    ///
631    /// for stage in &plan.stages {
632    ///     println!("Stage {}: {} - {}", stage.order, stage.name, stage.description);
633    /// }
634    ///
635    /// for rec in &plan.recommendations {
636    ///     println!("Recommendation ({}): {}", rec.priority, rec.description);
637    /// }
638    /// # Ok(())
639    /// # }
640    /// ```
641    #[instrument(skip(self, request), fields(namespace = %namespace))]
642    pub async fn explain_query(
643        &self,
644        namespace: &str,
645        request: QueryExplainRequest,
646    ) -> Result<QueryExplainResponse> {
647        let url = format!("{}/v1/namespaces/{}/explain", self.base_url, namespace);
648        debug!(
649            "Explaining query in namespace {} (query_type={:?}, top_k={})",
650            namespace, request.query_type, request.top_k
651        );
652
653        let response = self.client.post(&url).json(&request).send().await?;
654        self.handle_response(response).await
655    }
656
657    // ========================================================================
658    // Cache Warming Operations
659    // ========================================================================
660
661    /// Warm cache for vectors in a namespace
662    ///
663    /// This pre-loads vectors into cache tiers for faster subsequent access.
664    /// Supports priority levels and can run in the background.
665    ///
666    /// # Example
667    ///
668    /// ```rust,no_run
669    /// use dakera_client::{DakeraClient, WarmCacheRequest, WarmingPriority};
670    ///
671    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
672    /// let client = DakeraClient::new("http://localhost:3000")?;
673    ///
674    /// // Warm entire namespace with high priority
675    /// let response = client.warm_cache(
676    ///     WarmCacheRequest::new("my-namespace")
677    ///         .with_priority(WarmingPriority::High)
678    /// ).await?;
679    ///
680    /// println!("Warmed {} entries", response.entries_warmed);
681    /// # Ok(())
682    /// # }
683    /// ```
684    #[instrument(skip(self, request), fields(namespace = %request.namespace, priority = ?request.priority))]
685    pub async fn warm_cache(&self, request: WarmCacheRequest) -> Result<WarmCacheResponse> {
686        let url = format!(
687            "{}/v1/namespaces/{}/cache/warm",
688            self.base_url, request.namespace
689        );
690        debug!(
691            "Warming cache for namespace {} with priority {:?}",
692            request.namespace, request.priority
693        );
694
695        let response = self.client.post(&url).json(&request).send().await?;
696        self.handle_response(response).await
697    }
698
699    /// Warm specific vectors by ID (convenience method)
700    #[instrument(skip(self, vector_ids))]
701    pub async fn warm_vectors(
702        &self,
703        namespace: &str,
704        vector_ids: Vec<String>,
705    ) -> Result<WarmCacheResponse> {
706        self.warm_cache(WarmCacheRequest::new(namespace).with_vector_ids(vector_ids))
707            .await
708    }
709
710    // ========================================================================
711    // Export Operations
712    // ========================================================================
713
714    /// Export vectors from a namespace with pagination
715    ///
716    /// This exports all vectors from a namespace, supporting pagination for
717    /// large datasets. Use the `next_cursor` from the response to fetch
718    /// subsequent pages.
719    ///
720    /// # Example
721    ///
722    /// ```rust,no_run
723    /// use dakera_client::{DakeraClient, ExportRequest};
724    ///
725    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
726    /// let client = DakeraClient::new("http://localhost:3000")?;
727    ///
728    /// // Export first page of vectors
729    /// let mut request = ExportRequest::new().with_top_k(1000);
730    /// let response = client.export("my-namespace", request).await?;
731    ///
732    /// println!("Exported {} vectors", response.returned_count);
733    ///
734    /// // Fetch next page if available
735    /// if let Some(cursor) = response.next_cursor {
736    ///     let next_request = ExportRequest::new().with_cursor(cursor);
737    ///     let next_response = client.export("my-namespace", next_request).await?;
738    /// }
739    /// # Ok(())
740    /// # }
741    /// ```
742    #[instrument(skip(self, request), fields(namespace = %namespace))]
743    pub async fn export(&self, namespace: &str, request: ExportRequest) -> Result<ExportResponse> {
744        let url = format!("{}/v1/namespaces/{}/export", self.base_url, namespace);
745        debug!(
746            "Exporting vectors from namespace {} (top_k={}, cursor={:?})",
747            namespace, request.top_k, request.cursor
748        );
749
750        let response = self.client.post(&url).json(&request).send().await?;
751        self.handle_response(response).await
752    }
753
754    /// Export all vectors from a namespace (convenience method)
755    ///
756    /// This is a simple wrapper that exports with default settings.
757    #[instrument(skip(self))]
758    pub async fn export_all(&self, namespace: &str) -> Result<ExportResponse> {
759        self.export(namespace, ExportRequest::new()).await
760    }
761
762    // ========================================================================
763    // Operations
764    // ========================================================================
765
766    /// Get system diagnostics
767    #[instrument(skip(self))]
768    pub async fn diagnostics(&self) -> Result<SystemDiagnostics> {
769        let url = format!("{}/ops/diagnostics", self.base_url);
770        let response = self.client.get(&url).send().await?;
771        self.handle_response(response).await
772    }
773
774    /// List background jobs
775    #[instrument(skip(self))]
776    pub async fn list_jobs(&self) -> Result<Vec<JobInfo>> {
777        let url = format!("{}/ops/jobs", self.base_url);
778        let response = self.client.get(&url).send().await?;
779        self.handle_response(response).await
780    }
781
782    /// Get a specific job status
783    #[instrument(skip(self))]
784    pub async fn get_job(&self, job_id: &str) -> Result<Option<JobInfo>> {
785        let url = format!("{}/ops/jobs/{}", self.base_url, job_id);
786        let response = self.client.get(&url).send().await?;
787
788        if response.status() == StatusCode::NOT_FOUND {
789            return Ok(None);
790        }
791
792        self.handle_response(response).await.map(Some)
793    }
794
795    /// Trigger index compaction
796    #[instrument(skip(self, request))]
797    pub async fn compact(&self, request: CompactionRequest) -> Result<CompactionResponse> {
798        let url = format!("{}/ops/compact", self.base_url);
799        let response = self.client.post(&url).json(&request).send().await?;
800        self.handle_response(response).await
801    }
802
803    /// Request graceful shutdown
804    #[instrument(skip(self))]
805    pub async fn shutdown(&self) -> Result<()> {
806        let url = format!("{}/ops/shutdown", self.base_url);
807        let response = self.client.post(&url).send().await?;
808
809        if response.status().is_success() {
810            Ok(())
811        } else {
812            let status = response.status().as_u16();
813            let text = response.text().await.unwrap_or_default();
814            Err(ClientError::Server {
815                status,
816                message: text,
817            })
818        }
819    }
820
821    // ========================================================================
822    // Fetch by ID
823    // ========================================================================
824
825    /// Fetch vectors by their IDs
826    #[instrument(skip(self, request), fields(id_count = request.ids.len()))]
827    pub async fn fetch(&self, namespace: &str, request: FetchRequest) -> Result<FetchResponse> {
828        let url = format!("{}/v1/namespaces/{}/fetch", self.base_url, namespace);
829        debug!("Fetching {} vectors from {}", request.ids.len(), namespace);
830        let response = self.client.post(&url).json(&request).send().await?;
831        self.handle_response(response).await
832    }
833
834    /// Fetch vectors by IDs (convenience method)
835    #[instrument(skip(self))]
836    pub async fn fetch_by_ids(&self, namespace: &str, ids: &[&str]) -> Result<Vec<Vector>> {
837        let request = FetchRequest::new(ids.iter().map(|s| s.to_string()).collect());
838        self.fetch(namespace, request).await.map(|r| r.vectors)
839    }
840
841    // ========================================================================
842    // Text Auto-Embedding Operations
843    // ========================================================================
844
845    /// Upsert text documents with automatic server-side embedding generation
846    #[instrument(skip(self, request), fields(doc_count = request.documents.len()))]
847    pub async fn upsert_text(
848        &self,
849        namespace: &str,
850        request: UpsertTextRequest,
851    ) -> Result<TextUpsertResponse> {
852        let url = format!("{}/v1/namespaces/{}/upsert-text", self.base_url, namespace);
853        debug!(
854            "Upserting {} text documents to {}",
855            request.documents.len(),
856            namespace
857        );
858        let response = self.client.post(&url).json(&request).send().await?;
859        self.handle_response(response).await
860    }
861
862    /// Query using natural language text with automatic server-side embedding
863    #[instrument(skip(self, request), fields(top_k = request.top_k))]
864    pub async fn query_text(
865        &self,
866        namespace: &str,
867        request: QueryTextRequest,
868    ) -> Result<TextQueryResponse> {
869        let url = format!("{}/v1/namespaces/{}/query-text", self.base_url, namespace);
870        debug!("Text query in {} for: {}", namespace, request.text);
871        let response = self.client.post(&url).json(&request).send().await?;
872        self.handle_response(response).await
873    }
874
875    /// Query text (convenience method)
876    #[instrument(skip(self))]
877    pub async fn query_text_simple(
878        &self,
879        namespace: &str,
880        text: &str,
881        top_k: u32,
882    ) -> Result<TextQueryResponse> {
883        self.query_text(namespace, QueryTextRequest::new(text, top_k))
884            .await
885    }
886
887    /// Execute multiple text queries with automatic embedding in a single request
888    #[instrument(skip(self, request), fields(query_count = request.queries.len()))]
889    pub async fn batch_query_text(
890        &self,
891        namespace: &str,
892        request: BatchQueryTextRequest,
893    ) -> Result<BatchQueryTextResponse> {
894        let url = format!(
895            "{}/v1/namespaces/{}/batch-query-text",
896            self.base_url, namespace
897        );
898        debug!(
899            "Batch text query in {} with {} queries",
900            namespace,
901            request.queries.len()
902        );
903        let response = self.client.post(&url).json(&request).send().await?;
904        self.handle_response(response).await
905    }
906
907    // ========================================================================
908    // Private Helpers
909    // ========================================================================
910
911    /// Handle response and deserialize JSON
912    pub(crate) async fn handle_response<T: serde::de::DeserializeOwned>(
913        &self,
914        response: reqwest::Response,
915    ) -> Result<T> {
916        let status = response.status();
917
918        if status.is_success() {
919            Ok(response.json().await?)
920        } else {
921            let status_code = status.as_u16();
922            let text = response.text().await.unwrap_or_default();
923
924            // Try to parse error message from JSON
925            let message = if let Ok(json) = serde_json::from_str::<serde_json::Value>(&text) {
926                json.get("error")
927                    .and_then(|e| e.as_str())
928                    .unwrap_or(&text)
929                    .to_string()
930            } else {
931                text
932            };
933
934            Err(ClientError::Server {
935                status: status_code,
936                message,
937            })
938        }
939    }
940}
941
942/// Builder for DakeraClient
943#[derive(Debug)]
944pub struct DakeraClientBuilder {
945    base_url: String,
946    timeout: Duration,
947    user_agent: Option<String>,
948}
949
950impl DakeraClientBuilder {
951    /// Create a new builder
952    pub fn new(base_url: impl Into<String>) -> Self {
953        Self {
954            base_url: base_url.into(),
955            timeout: Duration::from_secs(DEFAULT_TIMEOUT_SECS),
956            user_agent: None,
957        }
958    }
959
960    /// Set the request timeout
961    pub fn timeout(mut self, timeout: Duration) -> Self {
962        self.timeout = timeout;
963        self
964    }
965
966    /// Set the request timeout in seconds
967    pub fn timeout_secs(mut self, secs: u64) -> Self {
968        self.timeout = Duration::from_secs(secs);
969        self
970    }
971
972    /// Set a custom user agent
973    pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
974        self.user_agent = Some(user_agent.into());
975        self
976    }
977
978    /// Build the client
979    pub fn build(self) -> Result<DakeraClient> {
980        // Normalize base URL (remove trailing slash)
981        let base_url = self.base_url.trim_end_matches('/').to_string();
982
983        // Validate URL
984        if !base_url.starts_with("http://") && !base_url.starts_with("https://") {
985            return Err(ClientError::InvalidUrl(
986                "URL must start with http:// or https://".to_string(),
987            ));
988        }
989
990        let user_agent = self
991            .user_agent
992            .unwrap_or_else(|| format!("dakera-client/{}", env!("CARGO_PKG_VERSION")));
993
994        let client = Client::builder()
995            .timeout(self.timeout)
996            .user_agent(user_agent)
997            .build()
998            .map_err(|e| ClientError::Config(e.to_string()))?;
999
1000        Ok(DakeraClient { client, base_url })
1001    }
1002}
1003
1004// ============================================================================
1005// SSE Streaming (CE-1)
1006// ============================================================================
1007
1008impl DakeraClient {
1009    /// Subscribe to namespace-scoped SSE events.
1010    ///
1011    /// Opens a long-lived connection to `GET /v1/namespaces/{namespace}/events`
1012    /// and returns a [`tokio::sync::mpsc::Receiver`] that yields
1013    /// [`DakeraEvent`] results as they arrive.  The background task exits when
1014    /// the server closes the stream or the receiver is dropped.
1015    ///
1016    /// Requires a Read-scoped API key.
1017    ///
1018    /// # Example
1019    ///
1020    /// ```rust,no_run
1021    /// use dakera_client::DakeraClient;
1022    ///
1023    /// #[tokio::main]
1024    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
1025    ///     let client = DakeraClient::new("http://localhost:3000")?;
1026    ///     let mut rx = client.stream_namespace_events("my-ns").await?;
1027    ///     while let Some(result) = rx.recv().await {
1028    ///         println!("{:?}", result?);
1029    ///     }
1030    ///     Ok(())
1031    /// }
1032    /// ```
1033    pub async fn stream_namespace_events(
1034        &self,
1035        namespace: &str,
1036    ) -> Result<tokio::sync::mpsc::Receiver<Result<crate::events::DakeraEvent>>> {
1037        let url = format!(
1038            "{}/v1/namespaces/{}/events",
1039            self.base_url,
1040            urlencoding::encode(namespace)
1041        );
1042        self.stream_sse(url).await
1043    }
1044
1045    /// Subscribe to the global SSE event stream (all namespaces).
1046    ///
1047    /// Opens a long-lived connection to `GET /ops/events` and returns a
1048    /// [`tokio::sync::mpsc::Receiver`] that yields [`DakeraEvent`] results.
1049    ///
1050    /// Requires an Admin-scoped API key.
1051    pub async fn stream_global_events(
1052        &self,
1053    ) -> Result<tokio::sync::mpsc::Receiver<Result<crate::events::DakeraEvent>>> {
1054        let url = format!("{}/ops/events", self.base_url);
1055        self.stream_sse(url).await
1056    }
1057
1058    /// Subscribe to the memory lifecycle SSE event stream (DASH-B).
1059    ///
1060    /// Opens a long-lived connection to `GET /v1/events/stream` and returns a
1061    /// [`tokio::sync::mpsc::Receiver`] that yields [`MemoryEvent`] results as
1062    /// they arrive.  The background task exits when the server closes the stream
1063    /// or the receiver is dropped.
1064    ///
1065    /// Requires a Read-scoped API key.
1066    pub async fn stream_memory_events(
1067        &self,
1068    ) -> Result<tokio::sync::mpsc::Receiver<Result<crate::events::MemoryEvent>>> {
1069        let url = format!("{}/v1/events/stream", self.base_url);
1070        self.stream_sse(url).await
1071    }
1072
1073    /// Low-level generic SSE streaming helper.
1074    async fn stream_sse<T>(&self, url: String) -> Result<tokio::sync::mpsc::Receiver<Result<T>>>
1075    where
1076        T: serde::de::DeserializeOwned + Send + 'static,
1077    {
1078        use futures_util::StreamExt;
1079
1080        let response = self
1081            .client
1082            .get(&url)
1083            .header("Accept", "text/event-stream")
1084            .header("Cache-Control", "no-cache")
1085            .send()
1086            .await?;
1087
1088        if !response.status().is_success() {
1089            let status = response.status().as_u16();
1090            let body = response.text().await.unwrap_or_default();
1091            return Err(ClientError::Server {
1092                status,
1093                message: body,
1094            });
1095        }
1096
1097        let (tx, rx) = tokio::sync::mpsc::channel(64);
1098
1099        tokio::spawn(async move {
1100            let mut byte_stream = response.bytes_stream();
1101            let mut remaining = String::new();
1102            let mut data_lines: Vec<String> = Vec::new();
1103
1104            while let Some(chunk) = byte_stream.next().await {
1105                match chunk {
1106                    Ok(bytes) => {
1107                        remaining.push_str(&String::from_utf8_lossy(&bytes));
1108                        while let Some(pos) = remaining.find('\n') {
1109                            let raw = &remaining[..pos];
1110                            let line = raw.trim_end_matches('\r').to_string();
1111                            remaining = remaining[pos + 1..].to_string();
1112
1113                            if line.starts_with(':') {
1114                                // SSE comment / heartbeat — skip
1115                            } else if let Some(data) = line.strip_prefix("data:") {
1116                                data_lines.push(data.trim_start().to_string());
1117                            } else if line.is_empty() {
1118                                if !data_lines.is_empty() {
1119                                    let payload = data_lines.join("\n");
1120                                    data_lines.clear();
1121                                    let result = serde_json::from_str::<T>(&payload)
1122                                        .map_err(ClientError::Json);
1123                                    if tx.send(result).await.is_err() {
1124                                        return; // receiver dropped
1125                                    }
1126                                }
1127                            } else {
1128                                // Unrecognised field (e.g. "event:") — ignore
1129                            }
1130                        }
1131                    }
1132                    Err(e) => {
1133                        let _ = tx.send(Err(ClientError::Http(e))).await;
1134                        return;
1135                    }
1136                }
1137            }
1138        });
1139
1140        Ok(rx)
1141    }
1142}
1143
#[cfg(test)]
mod tests {
    use super::*;

    /// A plain `http://` URL is accepted by the default constructor.
    #[test]
    fn test_client_builder() {
        assert!(DakeraClient::new("http://localhost:3000").is_ok());
    }

    /// Optional builder settings do not prevent construction.
    #[test]
    fn test_client_builder_with_options() {
        let builder = DakeraClient::builder("http://localhost:3000")
            .timeout_secs(60)
            .user_agent("test-client/1.0");
        assert!(builder.build().is_ok());
    }

    /// A URL without an `http://` or `https://` scheme is rejected.
    #[test]
    fn test_client_builder_invalid_url() {
        assert!(DakeraClient::new("invalid-url").is_err());
    }

    /// Trailing slashes are stripped from the stored base URL.
    #[test]
    fn test_client_builder_trailing_slash() {
        let built = DakeraClient::new("http://localhost:3000/").unwrap();
        assert!(!built.base_url.ends_with('/'));
    }

    /// `Vector::new` stores the ID and values and leaves metadata unset.
    #[test]
    fn test_vector_creation() {
        let vector = Vector::new("test", vec![0.1, 0.2, 0.3]);
        assert_eq!(vector.id, "test");
        assert_eq!(vector.values.len(), 3);
        assert!(vector.metadata.is_none());
    }

    /// Query request builder methods are reflected in the request fields.
    #[test]
    fn test_query_request_builder() {
        let filter = serde_json::json!({"category": "test"});
        let request = QueryRequest::new(vec![0.1, 0.2], 10)
            .with_filter(filter)
            .include_metadata(false);

        assert_eq!(request.top_k, 10);
        assert!(request.filter.is_some());
        assert!(!request.include_metadata);
    }

    /// An in-range vector weight is stored as given, alongside the text.
    #[test]
    fn test_hybrid_search_request() {
        let request =
            HybridSearchRequest::new(vec![0.1], "test query", 5).with_vector_weight(0.7);

        assert_eq!(request.vector_weight, 0.7);
        assert_eq!(request.text, "test query");
    }

    /// A vector weight above 1.0 is clamped to 1.0.
    #[test]
    fn test_hybrid_search_weight_clamping() {
        let request = HybridSearchRequest::new(vec![0.1], "test", 5).with_vector_weight(1.5);

        assert_eq!(request.vector_weight, 1.0);
    }

    /// `with_ttl` sets the TTL and leaves metadata unset.
    #[test]
    fn test_text_document_builder() {
        let document = TextDocument::new("doc1", "Hello world").with_ttl(3600);

        assert_eq!(document.id, "doc1");
        assert_eq!(document.text, "Hello world");
        assert_eq!(document.ttl_seconds, Some(3600));
        assert!(document.metadata.is_none());
    }

    /// The upsert request keeps every document and the chosen model.
    #[test]
    fn test_upsert_text_request_builder() {
        let request = UpsertTextRequest::new(vec![
            TextDocument::new("doc1", "Hello"),
            TextDocument::new("doc2", "World"),
        ])
        .with_model(EmbeddingModel::BgeSmall);

        assert_eq!(request.documents.len(), 2);
        assert_eq!(request.model, Some(EmbeddingModel::BgeSmall));
    }

    /// Text query builder methods are reflected in the request fields.
    #[test]
    fn test_query_text_request_builder() {
        let filter = serde_json::json!({"category": "docs"});
        let request = QueryTextRequest::new("semantic search query", 5)
            .with_filter(filter)
            .include_vectors(true)
            .with_model(EmbeddingModel::E5Small);

        assert_eq!(request.text, "semantic search query");
        assert_eq!(request.top_k, 5);
        assert!(request.filter.is_some());
        assert!(request.include_vectors);
        assert_eq!(request.model, Some(EmbeddingModel::E5Small));
    }

    /// A fresh fetch request includes both values and metadata.
    #[test]
    fn test_fetch_request_builder() {
        let ids = vec!["id1".to_string(), "id2".to_string()];
        let request = FetchRequest::new(ids);

        assert_eq!(request.ids.len(), 2);
        assert!(request.include_values);
        assert!(request.include_metadata);
    }

    /// Namespace creation options are stored on the request.
    #[test]
    fn test_create_namespace_request_builder() {
        let request = CreateNamespaceRequest::new()
            .with_dimensions(384)
            .with_index_type("hnsw");

        assert_eq!(request.dimensions, Some(384));
        assert_eq!(request.index_type.as_deref(), Some("hnsw"));
    }

    /// A fresh batch text query excludes vectors and has no model set.
    #[test]
    fn test_batch_query_text_request() {
        let queries = vec!["query one".to_string(), "query two".to_string()];
        let request = BatchQueryTextRequest::new(queries, 10);

        assert_eq!(request.queries.len(), 2);
        assert_eq!(request.top_k, 10);
        assert!(!request.include_vectors);
        assert!(request.model.is_none());
    }
}