Skip to main content

dakera_client/
client.rs

1//! Dakera client implementation
2
3use reqwest::{Client, StatusCode};
4use std::time::Duration;
5use tracing::{debug, instrument};
6
7use crate::error::{ClientError, Result};
8use crate::types::*;
9
/// Default timeout for requests, in seconds.
// NOTE(review): not referenced in this part of the file; presumably consumed by
// `DakeraClientBuilder` when no explicit timeout is configured — confirm.
const DEFAULT_TIMEOUT_SECS: u64 = 30;
12
13/// Dakeraclient for interacting with the vector database
14#[derive(Debug, Clone)]
15pub struct DakeraClient {
16    /// HTTP client
17    pub(crate) client: Client,
18    /// Base URL of the Dakera server
19    pub(crate) base_url: String,
20}
21
22impl DakeraClient {
23    /// Create a new client with the given base URL
24    ///
25    /// # Example
26    ///
27    /// ```rust,no_run
28    /// use dakera_client::DakeraClient;
29    ///
30    /// let client = DakeraClient::new("http://localhost:3000").unwrap();
31    /// ```
32    pub fn new(base_url: impl Into<String>) -> Result<Self> {
33        DakeraClientBuilder::new(base_url).build()
34    }
35
36    /// Create a new client builder for more configuration options
37    pub fn builder(base_url: impl Into<String>) -> DakeraClientBuilder {
38        DakeraClientBuilder::new(base_url)
39    }
40
41    // ========================================================================
42    // Health & Status
43    // ========================================================================
44
45    /// Check server health
46    #[instrument(skip(self))]
47    pub async fn health(&self) -> Result<HealthResponse> {
48        let url = format!("{}/health", self.base_url);
49        let response = self.client.get(&url).send().await?;
50
51        if response.status().is_success() {
52            Ok(response.json().await?)
53        } else {
54            // Health endpoint might return simple OK
55            Ok(HealthResponse {
56                healthy: true,
57                version: None,
58                uptime_seconds: None,
59            })
60        }
61    }
62
63    /// Check if server is ready
64    #[instrument(skip(self))]
65    pub async fn ready(&self) -> Result<ReadinessResponse> {
66        let url = format!("{}/health/ready", self.base_url);
67        let response = self.client.get(&url).send().await?;
68
69        if response.status().is_success() {
70            Ok(response.json().await?)
71        } else {
72            Ok(ReadinessResponse {
73                ready: false,
74                components: None,
75            })
76        }
77    }
78
79    /// Check if server is live
80    #[instrument(skip(self))]
81    pub async fn live(&self) -> Result<bool> {
82        let url = format!("{}/health/live", self.base_url);
83        let response = self.client.get(&url).send().await?;
84        Ok(response.status().is_success())
85    }
86
87    // ========================================================================
88    // Namespace Operations
89    // ========================================================================
90
91    /// List all namespaces
92    #[instrument(skip(self))]
93    pub async fn list_namespaces(&self) -> Result<Vec<String>> {
94        let url = format!("{}/v1/namespaces", self.base_url);
95        let response = self.client.get(&url).send().await?;
96        self.handle_response::<ListNamespacesResponse>(response)
97            .await
98            .map(|r| r.namespaces)
99    }
100
101    /// Get namespace information
102    #[instrument(skip(self))]
103    pub async fn get_namespace(&self, namespace: &str) -> Result<NamespaceInfo> {
104        let url = format!("{}/v1/namespaces/{}", self.base_url, namespace);
105        let response = self.client.get(&url).send().await?;
106        self.handle_response(response).await
107    }
108
109    /// Create a new namespace
110    #[instrument(skip(self, request))]
111    pub async fn create_namespace(
112        &self,
113        namespace: &str,
114        request: CreateNamespaceRequest,
115    ) -> Result<NamespaceInfo> {
116        let url = format!("{}/v1/namespaces/{}", self.base_url, namespace);
117        let response = self.client.post(&url).json(&request).send().await?;
118        self.handle_response(response).await
119    }
120
121    // ========================================================================
122    // Vector Operations
123    // ========================================================================
124
125    /// Upsert vectors into a namespace
126    #[instrument(skip(self, request), fields(vector_count = request.vectors.len()))]
127    pub async fn upsert(&self, namespace: &str, request: UpsertRequest) -> Result<UpsertResponse> {
128        let url = format!("{}/v1/namespaces/{}/vectors", self.base_url, namespace);
129        debug!(
130            "Upserting {} vectors to {}",
131            request.vectors.len(),
132            namespace
133        );
134
135        let response = self.client.post(&url).json(&request).send().await?;
136        self.handle_response(response).await
137    }
138
139    /// Upsert a single vector (convenience method)
140    #[instrument(skip(self, vector))]
141    pub async fn upsert_one(&self, namespace: &str, vector: Vector) -> Result<UpsertResponse> {
142        self.upsert(namespace, UpsertRequest::single(vector)).await
143    }
144
145    /// Upsert vectors in column format (Turbopuffer-inspired)
146    ///
147    /// This format is more efficient for bulk upserts as it avoids repeating
148    /// field names for each vector. All arrays must have equal length.
149    ///
150    /// # Example
151    ///
152    /// ```rust,no_run
153    /// use dakera_client::{DakeraClient, ColumnUpsertRequest};
154    ///
155    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
156    /// let client = DakeraClient::new("http://localhost:3000")?;
157    ///
158    /// let request = ColumnUpsertRequest::new(
159    ///     vec!["id1".to_string(), "id2".to_string(), "id3".to_string()],
160    ///     vec![
161    ///         vec![0.1, 0.2, 0.3],
162    ///         vec![0.4, 0.5, 0.6],
163    ///         vec![0.7, 0.8, 0.9],
164    ///     ],
165    /// )
166    /// .with_attribute("category", vec![
167    ///     serde_json::json!("A"),
168    ///     serde_json::json!("B"),
169    ///     serde_json::json!("A"),
170    /// ]);
171    ///
172    /// let response = client.upsert_columns("my-namespace", request).await?;
173    /// println!("Upserted {} vectors", response.upserted_count);
174    /// # Ok(())
175    /// # }
176    /// ```
177    #[instrument(skip(self, request), fields(namespace = %namespace, count = request.ids.len()))]
178    pub async fn upsert_columns(
179        &self,
180        namespace: &str,
181        request: ColumnUpsertRequest,
182    ) -> Result<UpsertResponse> {
183        let url = format!(
184            "{}/v1/namespaces/{}/upsert-columns",
185            self.base_url, namespace
186        );
187        debug!(
188            "Upserting {} vectors in column format to {}",
189            request.ids.len(),
190            namespace
191        );
192
193        let response = self.client.post(&url).json(&request).send().await?;
194        self.handle_response(response).await
195    }
196
197    /// Query for similar vectors
198    #[instrument(skip(self, request), fields(top_k = request.top_k))]
199    pub async fn query(&self, namespace: &str, request: QueryRequest) -> Result<QueryResponse> {
200        let url = format!("{}/v1/namespaces/{}/query", self.base_url, namespace);
201        debug!(
202            "Querying namespace {} for top {} results",
203            namespace, request.top_k
204        );
205
206        let response = self.client.post(&url).json(&request).send().await?;
207        self.handle_response(response).await
208    }
209
210    /// Simple query with just a vector and top_k (convenience method)
211    #[instrument(skip(self, vector))]
212    pub async fn query_simple(
213        &self,
214        namespace: &str,
215        vector: Vec<f32>,
216        top_k: u32,
217    ) -> Result<QueryResponse> {
218        self.query(namespace, QueryRequest::new(vector, top_k))
219            .await
220    }
221
222    /// Execute multiple queries in a single request
223    ///
224    /// This allows executing multiple vector similarity queries in parallel,
225    /// which is more efficient than making separate requests.
226    ///
227    /// # Example
228    ///
229    /// ```rust,no_run
230    /// use dakera_client::{DakeraClient, BatchQueryRequest, BatchQueryItem};
231    ///
232    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
233    /// let client = DakeraClient::new("http://localhost:3000")?;
234    ///
235    /// let request = BatchQueryRequest::new(vec![
236    ///     BatchQueryItem::new(vec![0.1, 0.2, 0.3], 5).with_id("query1"),
237    ///     BatchQueryItem::new(vec![0.4, 0.5, 0.6], 10).with_id("query2"),
238    /// ]);
239    ///
240    /// let response = client.batch_query("my-namespace", request).await?;
241    /// println!("Executed {} queries in {}ms", response.query_count, response.total_latency_ms);
242    /// # Ok(())
243    /// # }
244    /// ```
245    #[instrument(skip(self, request), fields(namespace = %namespace, query_count = request.queries.len()))]
246    pub async fn batch_query(
247        &self,
248        namespace: &str,
249        request: BatchQueryRequest,
250    ) -> Result<BatchQueryResponse> {
251        let url = format!("{}/v1/namespaces/{}/batch-query", self.base_url, namespace);
252        debug!(
253            "Batch querying namespace {} with {} queries",
254            namespace,
255            request.queries.len()
256        );
257
258        let response = self.client.post(&url).json(&request).send().await?;
259        self.handle_response(response).await
260    }
261
262    /// Delete vectors by ID
263    #[instrument(skip(self, request), fields(id_count = request.ids.len()))]
264    pub async fn delete(&self, namespace: &str, request: DeleteRequest) -> Result<DeleteResponse> {
265        let url = format!(
266            "{}/v1/namespaces/{}/vectors/delete",
267            self.base_url, namespace
268        );
269        debug!("Deleting {} vectors from {}", request.ids.len(), namespace);
270
271        let response = self.client.post(&url).json(&request).send().await?;
272        self.handle_response(response).await
273    }
274
275    /// Delete a single vector by ID (convenience method)
276    #[instrument(skip(self))]
277    pub async fn delete_one(&self, namespace: &str, id: &str) -> Result<DeleteResponse> {
278        self.delete(namespace, DeleteRequest::single(id)).await
279    }
280
281    // ========================================================================
282    // Full-Text Search Operations
283    // ========================================================================
284
285    /// Index documents for full-text search
286    #[instrument(skip(self, request), fields(doc_count = request.documents.len()))]
287    pub async fn index_documents(
288        &self,
289        namespace: &str,
290        request: IndexDocumentsRequest,
291    ) -> Result<IndexDocumentsResponse> {
292        let url = format!(
293            "{}/v1/namespaces/{}/fulltext/index",
294            self.base_url, namespace
295        );
296        debug!(
297            "Indexing {} documents in {}",
298            request.documents.len(),
299            namespace
300        );
301
302        let response = self.client.post(&url).json(&request).send().await?;
303        self.handle_response(response).await
304    }
305
306    /// Index a single document (convenience method)
307    #[instrument(skip(self, document))]
308    pub async fn index_document(
309        &self,
310        namespace: &str,
311        document: Document,
312    ) -> Result<IndexDocumentsResponse> {
313        self.index_documents(
314            namespace,
315            IndexDocumentsRequest {
316                documents: vec![document],
317            },
318        )
319        .await
320    }
321
322    /// Perform full-text search
323    #[instrument(skip(self, request))]
324    pub async fn fulltext_search(
325        &self,
326        namespace: &str,
327        request: FullTextSearchRequest,
328    ) -> Result<FullTextSearchResponse> {
329        let url = format!(
330            "{}/v1/namespaces/{}/fulltext/search",
331            self.base_url, namespace
332        );
333        debug!("Full-text search in {} for: {}", namespace, request.query);
334
335        let response = self.client.post(&url).json(&request).send().await?;
336        self.handle_response(response).await
337    }
338
339    /// Simple full-text search (convenience method)
340    #[instrument(skip(self))]
341    pub async fn search_text(
342        &self,
343        namespace: &str,
344        query: &str,
345        top_k: u32,
346    ) -> Result<FullTextSearchResponse> {
347        self.fulltext_search(namespace, FullTextSearchRequest::new(query, top_k))
348            .await
349    }
350
351    /// Get full-text index statistics
352    #[instrument(skip(self))]
353    pub async fn fulltext_stats(&self, namespace: &str) -> Result<FullTextStats> {
354        let url = format!(
355            "{}/v1/namespaces/{}/fulltext/stats",
356            self.base_url, namespace
357        );
358        let response = self.client.get(&url).send().await?;
359        self.handle_response(response).await
360    }
361
362    /// Delete documents from full-text index
363    #[instrument(skip(self, request))]
364    pub async fn fulltext_delete(
365        &self,
366        namespace: &str,
367        request: DeleteRequest,
368    ) -> Result<DeleteResponse> {
369        let url = format!(
370            "{}/v1/namespaces/{}/fulltext/delete",
371            self.base_url, namespace
372        );
373        let response = self.client.post(&url).json(&request).send().await?;
374        self.handle_response(response).await
375    }
376
377    // ========================================================================
378    // Hybrid Search Operations
379    // ========================================================================
380
381    /// Perform hybrid search (vector + full-text)
382    #[instrument(skip(self, request), fields(top_k = request.top_k))]
383    pub async fn hybrid_search(
384        &self,
385        namespace: &str,
386        request: HybridSearchRequest,
387    ) -> Result<HybridSearchResponse> {
388        let url = format!("{}/v1/namespaces/{}/hybrid", self.base_url, namespace);
389        debug!(
390            "Hybrid search in {} with vector_weight={}",
391            namespace, request.vector_weight
392        );
393
394        let response = self.client.post(&url).json(&request).send().await?;
395        self.handle_response(response).await
396    }
397
398    // ========================================================================
399    // Multi-Vector Search Operations
400    // ========================================================================
401
402    /// Multi-vector search with positive/negative vectors and MMR
403    ///
404    /// This performs semantic search using multiple positive vectors (to search towards)
405    /// and optional negative vectors (to search away from). Supports MMR (Maximal Marginal
406    /// Relevance) for result diversity.
407    ///
408    /// # Example
409    ///
410    /// ```rust,no_run
411    /// use dakera_client::{DakeraClient, MultiVectorSearchRequest};
412    ///
413    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
414    /// let client = DakeraClient::new("http://localhost:3000")?;
415    ///
416    /// // Search towards multiple concepts, away from others
417    /// let request = MultiVectorSearchRequest::new(vec![
418    ///     vec![0.1, 0.2, 0.3],  // positive vector 1
419    ///     vec![0.4, 0.5, 0.6],  // positive vector 2
420    /// ])
421    /// .with_negative_vectors(vec![
422    ///     vec![0.7, 0.8, 0.9],  // negative vector
423    /// ])
424    /// .with_top_k(10)
425    /// .with_mmr(0.7);  // Enable MMR with lambda=0.7
426    ///
427    /// let response = client.multi_vector_search("my-namespace", request).await?;
428    /// for result in response.results {
429    ///     println!("ID: {}, Score: {}", result.id, result.score);
430    /// }
431    /// # Ok(())
432    /// # }
433    /// ```
434    #[instrument(skip(self, request), fields(namespace = %namespace))]
435    pub async fn multi_vector_search(
436        &self,
437        namespace: &str,
438        request: MultiVectorSearchRequest,
439    ) -> Result<MultiVectorSearchResponse> {
440        let url = format!("{}/v1/namespaces/{}/multi-vector", self.base_url, namespace);
441        debug!(
442            "Multi-vector search in {} with {} positive vectors",
443            namespace,
444            request.positive_vectors.len()
445        );
446
447        let response = self.client.post(&url).json(&request).send().await?;
448        self.handle_response(response).await
449    }
450
451    // ========================================================================
452    // Aggregation Operations
453    // ========================================================================
454
455    /// Aggregate vectors with grouping (Turbopuffer-inspired)
456    ///
457    /// This performs aggregation queries on vector metadata, supporting
458    /// count, sum, avg, min, and max operations with optional grouping.
459    ///
460    /// # Example
461    ///
462    /// ```rust,no_run
463    /// use dakera_client::{DakeraClient, AggregationRequest};
464    ///
465    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
466    /// let client = DakeraClient::new("http://localhost:3000")?;
467    ///
468    /// // Count all vectors and sum scores, grouped by category
469    /// let request = AggregationRequest::new()
470    ///     .with_count("total_count")
471    ///     .with_sum("total_score", "score")
472    ///     .with_avg("avg_score", "score")
473    ///     .with_group_by("category");
474    ///
475    /// let response = client.aggregate("my-namespace", request).await?;
476    /// if let Some(groups) = response.aggregation_groups {
477    ///     for group in groups {
478    ///         println!("Group: {:?}", group.group_key);
479    ///     }
480    /// }
481    /// # Ok(())
482    /// # }
483    /// ```
484    #[instrument(skip(self, request), fields(namespace = %namespace))]
485    pub async fn aggregate(
486        &self,
487        namespace: &str,
488        request: AggregationRequest,
489    ) -> Result<AggregationResponse> {
490        let url = format!("{}/v1/namespaces/{}/aggregate", self.base_url, namespace);
491        debug!(
492            "Aggregating in namespace {} with {} aggregations",
493            namespace,
494            request.aggregate_by.len()
495        );
496
497        let response = self.client.post(&url).json(&request).send().await?;
498        self.handle_response(response).await
499    }
500
501    // ========================================================================
502    // Unified Query Operations
503    // ========================================================================
504
505    /// Unified query with flexible ranking options (Turbopuffer-inspired)
506    ///
507    /// This provides a unified API for vector search (ANN/kNN), full-text search (BM25),
508    /// and attribute ordering. Supports combining multiple ranking functions with
509    /// Sum, Max, and Product operators.
510    ///
511    /// # Example
512    ///
513    /// ```rust,no_run
514    /// use dakera_client::{DakeraClient, UnifiedQueryRequest, SortDirection};
515    ///
516    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
517    /// let client = DakeraClient::new("http://localhost:3000")?;
518    ///
519    /// // Vector ANN search
520    /// let request = UnifiedQueryRequest::vector_search(vec![0.1, 0.2, 0.3], 10);
521    /// let response = client.unified_query("my-namespace", request).await?;
522    ///
523    /// // Full-text BM25 search
524    /// let request = UnifiedQueryRequest::fulltext_search("content", "hello world", 10);
525    /// let response = client.unified_query("my-namespace", request).await?;
526    ///
527    /// // Attribute ordering with filter
528    /// let request = UnifiedQueryRequest::attribute_order("timestamp", SortDirection::Desc, 10)
529    ///     .with_filter(serde_json::json!({"category": {"$eq": "science"}}));
530    /// let response = client.unified_query("my-namespace", request).await?;
531    ///
532    /// for result in response.results {
533    ///     println!("ID: {}, Score: {:?}", result.id, result.dist);
534    /// }
535    /// # Ok(())
536    /// # }
537    /// ```
538    #[instrument(skip(self, request), fields(namespace = %namespace))]
539    pub async fn unified_query(
540        &self,
541        namespace: &str,
542        request: UnifiedQueryRequest,
543    ) -> Result<UnifiedQueryResponse> {
544        let url = format!(
545            "{}/v1/namespaces/{}/unified-query",
546            self.base_url, namespace
547        );
548        debug!(
549            "Unified query in namespace {} with top_k={}",
550            namespace, request.top_k
551        );
552
553        let response = self.client.post(&url).json(&request).send().await?;
554        self.handle_response(response).await
555    }
556
557    /// Simple vector search using the unified query API (convenience method)
558    ///
559    /// This is a shortcut for `unified_query` with a vector ANN search.
560    #[instrument(skip(self, vector))]
561    pub async fn unified_vector_search(
562        &self,
563        namespace: &str,
564        vector: Vec<f32>,
565        top_k: usize,
566    ) -> Result<UnifiedQueryResponse> {
567        self.unified_query(namespace, UnifiedQueryRequest::vector_search(vector, top_k))
568            .await
569    }
570
571    /// Simple full-text search using the unified query API (convenience method)
572    ///
573    /// This is a shortcut for `unified_query` with a BM25 full-text search.
574    #[instrument(skip(self))]
575    pub async fn unified_text_search(
576        &self,
577        namespace: &str,
578        field: &str,
579        query: &str,
580        top_k: usize,
581    ) -> Result<UnifiedQueryResponse> {
582        self.unified_query(
583            namespace,
584            UnifiedQueryRequest::fulltext_search(field, query, top_k),
585        )
586        .await
587    }
588
589    // ========================================================================
590    // Query Explain Operations
591    // ========================================================================
592
593    /// Explain query execution plan (similar to SQL EXPLAIN)
594    ///
595    /// This provides detailed information about how a query will be executed,
596    /// including index selection, execution stages, cost estimates, and
597    /// performance recommendations.
598    ///
599    /// # Example
600    ///
601    /// ```rust,no_run
602    /// use dakera_client::{DakeraClient, QueryExplainRequest};
603    ///
604    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
605    /// let client = DakeraClient::new("http://localhost:3000")?;
606    ///
607    /// // Explain a vector search query
608    /// let request = QueryExplainRequest::vector_search(vec![0.1, 0.2, 0.3], 10)
609    ///     .with_verbose();
610    /// let plan = client.explain_query("my-namespace", request).await?;
611    ///
612    /// println!("Query plan: {}", plan.summary);
613    /// println!("Estimated time: {}ms", plan.cost_estimate.estimated_time_ms);
614    ///
615    /// for stage in &plan.stages {
616    ///     println!("Stage {}: {} - {}", stage.order, stage.name, stage.description);
617    /// }
618    ///
619    /// for rec in &plan.recommendations {
620    ///     println!("Recommendation ({}): {}", rec.priority, rec.description);
621    /// }
622    /// # Ok(())
623    /// # }
624    /// ```
625    #[instrument(skip(self, request), fields(namespace = %namespace))]
626    pub async fn explain_query(
627        &self,
628        namespace: &str,
629        request: QueryExplainRequest,
630    ) -> Result<QueryExplainResponse> {
631        let url = format!("{}/v1/namespaces/{}/explain", self.base_url, namespace);
632        debug!(
633            "Explaining query in namespace {} (query_type={:?}, top_k={})",
634            namespace, request.query_type, request.top_k
635        );
636
637        let response = self.client.post(&url).json(&request).send().await?;
638        self.handle_response(response).await
639    }
640
641    // ========================================================================
642    // Cache Warming Operations
643    // ========================================================================
644
645    /// Warm cache for vectors in a namespace
646    ///
647    /// This pre-loads vectors into cache tiers for faster subsequent access.
648    /// Supports priority levels and can run in the background.
649    ///
650    /// # Example
651    ///
652    /// ```rust,no_run
653    /// use dakera_client::{DakeraClient, WarmCacheRequest, WarmingPriority};
654    ///
655    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
656    /// let client = DakeraClient::new("http://localhost:3000")?;
657    ///
658    /// // Warm entire namespace with high priority
659    /// let response = client.warm_cache(
660    ///     WarmCacheRequest::new("my-namespace")
661    ///         .with_priority(WarmingPriority::High)
662    /// ).await?;
663    ///
664    /// println!("Warmed {} entries", response.entries_warmed);
665    /// # Ok(())
666    /// # }
667    /// ```
668    #[instrument(skip(self, request), fields(namespace = %request.namespace, priority = ?request.priority))]
669    pub async fn warm_cache(&self, request: WarmCacheRequest) -> Result<WarmCacheResponse> {
670        let url = format!(
671            "{}/v1/namespaces/{}/cache/warm",
672            self.base_url, request.namespace
673        );
674        debug!(
675            "Warming cache for namespace {} with priority {:?}",
676            request.namespace, request.priority
677        );
678
679        let response = self.client.post(&url).json(&request).send().await?;
680        self.handle_response(response).await
681    }
682
683    /// Warm specific vectors by ID (convenience method)
684    #[instrument(skip(self, vector_ids))]
685    pub async fn warm_vectors(
686        &self,
687        namespace: &str,
688        vector_ids: Vec<String>,
689    ) -> Result<WarmCacheResponse> {
690        self.warm_cache(WarmCacheRequest::new(namespace).with_vector_ids(vector_ids))
691            .await
692    }
693
694    // ========================================================================
695    // Export Operations
696    // ========================================================================
697
698    /// Export vectors from a namespace with pagination
699    ///
700    /// This exports all vectors from a namespace, supporting pagination for
701    /// large datasets. Use the `next_cursor` from the response to fetch
702    /// subsequent pages.
703    ///
704    /// # Example
705    ///
706    /// ```rust,no_run
707    /// use dakera_client::{DakeraClient, ExportRequest};
708    ///
709    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
710    /// let client = DakeraClient::new("http://localhost:3000")?;
711    ///
712    /// // Export first page of vectors
713    /// let mut request = ExportRequest::new().with_top_k(1000);
714    /// let response = client.export("my-namespace", request).await?;
715    ///
716    /// println!("Exported {} vectors", response.returned_count);
717    ///
718    /// // Fetch next page if available
719    /// if let Some(cursor) = response.next_cursor {
720    ///     let next_request = ExportRequest::new().with_cursor(cursor);
721    ///     let next_response = client.export("my-namespace", next_request).await?;
722    /// }
723    /// # Ok(())
724    /// # }
725    /// ```
726    #[instrument(skip(self, request), fields(namespace = %namespace))]
727    pub async fn export(&self, namespace: &str, request: ExportRequest) -> Result<ExportResponse> {
728        let url = format!("{}/v1/namespaces/{}/export", self.base_url, namespace);
729        debug!(
730            "Exporting vectors from namespace {} (top_k={}, cursor={:?})",
731            namespace, request.top_k, request.cursor
732        );
733
734        let response = self.client.post(&url).json(&request).send().await?;
735        self.handle_response(response).await
736    }
737
738    /// Export all vectors from a namespace (convenience method)
739    ///
740    /// This is a simple wrapper that exports with default settings.
741    #[instrument(skip(self))]
742    pub async fn export_all(&self, namespace: &str) -> Result<ExportResponse> {
743        self.export(namespace, ExportRequest::new()).await
744    }
745
746    // ========================================================================
747    // Operations
748    // ========================================================================
749
750    /// Get system diagnostics
751    #[instrument(skip(self))]
752    pub async fn diagnostics(&self) -> Result<SystemDiagnostics> {
753        let url = format!("{}/ops/diagnostics", self.base_url);
754        let response = self.client.get(&url).send().await?;
755        self.handle_response(response).await
756    }
757
758    /// List background jobs
759    #[instrument(skip(self))]
760    pub async fn list_jobs(&self) -> Result<Vec<JobInfo>> {
761        let url = format!("{}/ops/jobs", self.base_url);
762        let response = self.client.get(&url).send().await?;
763        self.handle_response(response).await
764    }
765
766    /// Get a specific job status
767    #[instrument(skip(self))]
768    pub async fn get_job(&self, job_id: &str) -> Result<Option<JobInfo>> {
769        let url = format!("{}/ops/jobs/{}", self.base_url, job_id);
770        let response = self.client.get(&url).send().await?;
771
772        if response.status() == StatusCode::NOT_FOUND {
773            return Ok(None);
774        }
775
776        self.handle_response(response).await.map(Some)
777    }
778
779    /// Trigger index compaction
780    #[instrument(skip(self, request))]
781    pub async fn compact(&self, request: CompactionRequest) -> Result<CompactionResponse> {
782        let url = format!("{}/ops/compact", self.base_url);
783        let response = self.client.post(&url).json(&request).send().await?;
784        self.handle_response(response).await
785    }
786
787    /// Request graceful shutdown
788    #[instrument(skip(self))]
789    pub async fn shutdown(&self) -> Result<()> {
790        let url = format!("{}/ops/shutdown", self.base_url);
791        let response = self.client.post(&url).send().await?;
792
793        if response.status().is_success() {
794            Ok(())
795        } else {
796            let status = response.status().as_u16();
797            let text = response.text().await.unwrap_or_default();
798            Err(ClientError::Server {
799                status,
800                message: text,
801            })
802        }
803    }
804
805    // ========================================================================
806    // Fetch by ID
807    // ========================================================================
808
809    /// Fetch vectors by their IDs
810    #[instrument(skip(self, request), fields(id_count = request.ids.len()))]
811    pub async fn fetch(&self, namespace: &str, request: FetchRequest) -> Result<FetchResponse> {
812        let url = format!("{}/v1/namespaces/{}/fetch", self.base_url, namespace);
813        debug!("Fetching {} vectors from {}", request.ids.len(), namespace);
814        let response = self.client.post(&url).json(&request).send().await?;
815        self.handle_response(response).await
816    }
817
818    /// Fetch vectors by IDs (convenience method)
819    #[instrument(skip(self))]
820    pub async fn fetch_by_ids(&self, namespace: &str, ids: &[&str]) -> Result<Vec<Vector>> {
821        let request = FetchRequest::new(ids.iter().map(|s| s.to_string()).collect());
822        self.fetch(namespace, request).await.map(|r| r.vectors)
823    }
824
825    // ========================================================================
826    // Text Auto-Embedding Operations
827    // ========================================================================
828
829    /// Upsert text documents with automatic server-side embedding generation
830    #[instrument(skip(self, request), fields(doc_count = request.documents.len()))]
831    pub async fn upsert_text(
832        &self,
833        namespace: &str,
834        request: UpsertTextRequest,
835    ) -> Result<TextUpsertResponse> {
836        let url = format!("{}/v1/namespaces/{}/upsert-text", self.base_url, namespace);
837        debug!(
838            "Upserting {} text documents to {}",
839            request.documents.len(),
840            namespace
841        );
842        let response = self.client.post(&url).json(&request).send().await?;
843        self.handle_response(response).await
844    }
845
846    /// Query using natural language text with automatic server-side embedding
847    #[instrument(skip(self, request), fields(top_k = request.top_k))]
848    pub async fn query_text(
849        &self,
850        namespace: &str,
851        request: QueryTextRequest,
852    ) -> Result<TextQueryResponse> {
853        let url = format!("{}/v1/namespaces/{}/query-text", self.base_url, namespace);
854        debug!("Text query in {} for: {}", namespace, request.text);
855        let response = self.client.post(&url).json(&request).send().await?;
856        self.handle_response(response).await
857    }
858
859    /// Query text (convenience method)
860    #[instrument(skip(self))]
861    pub async fn query_text_simple(
862        &self,
863        namespace: &str,
864        text: &str,
865        top_k: u32,
866    ) -> Result<TextQueryResponse> {
867        self.query_text(namespace, QueryTextRequest::new(text, top_k))
868            .await
869    }
870
871    /// Execute multiple text queries with automatic embedding in a single request
872    #[instrument(skip(self, request), fields(query_count = request.queries.len()))]
873    pub async fn batch_query_text(
874        &self,
875        namespace: &str,
876        request: BatchQueryTextRequest,
877    ) -> Result<BatchQueryTextResponse> {
878        let url = format!(
879            "{}/v1/namespaces/{}/batch-query-text",
880            self.base_url, namespace
881        );
882        debug!(
883            "Batch text query in {} with {} queries",
884            namespace,
885            request.queries.len()
886        );
887        let response = self.client.post(&url).json(&request).send().await?;
888        self.handle_response(response).await
889    }
890
891    // ========================================================================
892    // Private Helpers
893    // ========================================================================
894
895    /// Handle response and deserialize JSON
896    pub(crate) async fn handle_response<T: serde::de::DeserializeOwned>(
897        &self,
898        response: reqwest::Response,
899    ) -> Result<T> {
900        let status = response.status();
901
902        if status.is_success() {
903            Ok(response.json().await?)
904        } else {
905            let status_code = status.as_u16();
906            let text = response.text().await.unwrap_or_default();
907
908            // Try to parse error message from JSON
909            let message = if let Ok(json) = serde_json::from_str::<serde_json::Value>(&text) {
910                json.get("error")
911                    .and_then(|e| e.as_str())
912                    .unwrap_or(&text)
913                    .to_string()
914            } else {
915                text
916            };
917
918            Err(ClientError::Server {
919                status: status_code,
920                message,
921            })
922        }
923    }
924}
925
926/// Builder for DakeraClient
927#[derive(Debug)]
928pub struct DakeraClientBuilder {
929    base_url: String,
930    timeout: Duration,
931    user_agent: Option<String>,
932}
933
934impl DakeraClientBuilder {
935    /// Create a new builder
936    pub fn new(base_url: impl Into<String>) -> Self {
937        Self {
938            base_url: base_url.into(),
939            timeout: Duration::from_secs(DEFAULT_TIMEOUT_SECS),
940            user_agent: None,
941        }
942    }
943
944    /// Set the request timeout
945    pub fn timeout(mut self, timeout: Duration) -> Self {
946        self.timeout = timeout;
947        self
948    }
949
950    /// Set the request timeout in seconds
951    pub fn timeout_secs(mut self, secs: u64) -> Self {
952        self.timeout = Duration::from_secs(secs);
953        self
954    }
955
956    /// Set a custom user agent
957    pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
958        self.user_agent = Some(user_agent.into());
959        self
960    }
961
962    /// Build the client
963    pub fn build(self) -> Result<DakeraClient> {
964        // Normalize base URL (remove trailing slash)
965        let base_url = self.base_url.trim_end_matches('/').to_string();
966
967        // Validate URL
968        if !base_url.starts_with("http://") && !base_url.starts_with("https://") {
969            return Err(ClientError::InvalidUrl(
970                "URL must start with http:// or https://".to_string(),
971            ));
972        }
973
974        let user_agent = self
975            .user_agent
976            .unwrap_or_else(|| format!("dakera-client/{}", env!("CARGO_PKG_VERSION")));
977
978        let client = Client::builder()
979            .timeout(self.timeout)
980            .user_agent(user_agent)
981            .build()
982            .map_err(|e| ClientError::Config(e.to_string()))?;
983
984        Ok(DakeraClient { client, base_url })
985    }
986}
987
988// ============================================================================
989// SSE Streaming (CE-1)
990// ============================================================================
991
992impl DakeraClient {
993    /// Subscribe to namespace-scoped SSE events.
994    ///
995    /// Opens a long-lived connection to `GET /v1/namespaces/{namespace}/events`
996    /// and returns a [`tokio::sync::mpsc::Receiver`] that yields
997    /// [`DakeraEvent`] results as they arrive.  The background task exits when
998    /// the server closes the stream or the receiver is dropped.
999    ///
1000    /// Requires a Read-scoped API key.
1001    ///
1002    /// # Example
1003    ///
1004    /// ```rust,no_run
1005    /// use dakera_client::DakeraClient;
1006    ///
1007    /// #[tokio::main]
1008    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
1009    ///     let client = DakeraClient::new("http://localhost:3000")?;
1010    ///     let mut rx = client.stream_namespace_events("my-ns").await?;
1011    ///     while let Some(result) = rx.recv().await {
1012    ///         println!("{:?}", result?);
1013    ///     }
1014    ///     Ok(())
1015    /// }
1016    /// ```
1017    pub async fn stream_namespace_events(
1018        &self,
1019        namespace: &str,
1020    ) -> Result<tokio::sync::mpsc::Receiver<Result<crate::events::DakeraEvent>>> {
1021        let url = format!(
1022            "{}/v1/namespaces/{}/events",
1023            self.base_url,
1024            urlencoding::encode(namespace)
1025        );
1026        self.stream_sse(url).await
1027    }
1028
1029    /// Subscribe to the global SSE event stream (all namespaces).
1030    ///
1031    /// Opens a long-lived connection to `GET /ops/events` and returns a
1032    /// [`tokio::sync::mpsc::Receiver`] that yields [`DakeraEvent`] results.
1033    ///
1034    /// Requires an Admin-scoped API key.
1035    pub async fn stream_global_events(
1036        &self,
1037    ) -> Result<tokio::sync::mpsc::Receiver<Result<crate::events::DakeraEvent>>> {
1038        let url = format!("{}/ops/events", self.base_url);
1039        self.stream_sse(url).await
1040    }
1041
1042    /// Subscribe to the memory lifecycle SSE event stream (DASH-B).
1043    ///
1044    /// Opens a long-lived connection to `GET /v1/events/stream` and returns a
1045    /// [`tokio::sync::mpsc::Receiver`] that yields [`MemoryEvent`] results as
1046    /// they arrive.  The background task exits when the server closes the stream
1047    /// or the receiver is dropped.
1048    ///
1049    /// Requires a Read-scoped API key.
1050    pub async fn stream_memory_events(
1051        &self,
1052    ) -> Result<tokio::sync::mpsc::Receiver<Result<crate::events::MemoryEvent>>> {
1053        let url = format!("{}/v1/events/stream", self.base_url);
1054        self.stream_sse(url).await
1055    }
1056
1057    /// Low-level generic SSE streaming helper.
1058    async fn stream_sse<T>(&self, url: String) -> Result<tokio::sync::mpsc::Receiver<Result<T>>>
1059    where
1060        T: serde::de::DeserializeOwned + Send + 'static,
1061    {
1062        use futures_util::StreamExt;
1063
1064        let response = self
1065            .client
1066            .get(&url)
1067            .header("Accept", "text/event-stream")
1068            .header("Cache-Control", "no-cache")
1069            .send()
1070            .await?;
1071
1072        if !response.status().is_success() {
1073            let status = response.status().as_u16();
1074            let body = response.text().await.unwrap_or_default();
1075            return Err(ClientError::Server {
1076                status,
1077                message: body,
1078            });
1079        }
1080
1081        let (tx, rx) = tokio::sync::mpsc::channel(64);
1082
1083        tokio::spawn(async move {
1084            let mut byte_stream = response.bytes_stream();
1085            let mut remaining = String::new();
1086            let mut data_lines: Vec<String> = Vec::new();
1087
1088            while let Some(chunk) = byte_stream.next().await {
1089                match chunk {
1090                    Ok(bytes) => {
1091                        remaining.push_str(&String::from_utf8_lossy(&bytes));
1092                        while let Some(pos) = remaining.find('\n') {
1093                            let raw = &remaining[..pos];
1094                            let line = raw.trim_end_matches('\r').to_string();
1095                            remaining = remaining[pos + 1..].to_string();
1096
1097                            if line.starts_with(':') {
1098                                // SSE comment / heartbeat — skip
1099                            } else if let Some(data) = line.strip_prefix("data:") {
1100                                data_lines.push(data.trim_start().to_string());
1101                            } else if line.is_empty() {
1102                                if !data_lines.is_empty() {
1103                                    let payload = data_lines.join("\n");
1104                                    data_lines.clear();
1105                                    let result = serde_json::from_str::<T>(&payload)
1106                                        .map_err(ClientError::Json);
1107                                    if tx.send(result).await.is_err() {
1108                                        return; // receiver dropped
1109                                    }
1110                                }
1111                            } else {
1112                                // Unrecognised field (e.g. "event:") — ignore
1113                            }
1114                        }
1115                    }
1116                    Err(e) => {
1117                        let _ = tx.send(Err(ClientError::Http(e))).await;
1118                        return;
1119                    }
1120                }
1121            }
1122        });
1123
1124        Ok(rx)
1125    }
1126}
1127
#[cfg(test)]
mod tests {
    use super::*;

    // --- client construction ------------------------------------------------

    #[test]
    fn test_client_builder() {
        assert!(DakeraClient::new("http://localhost:3000").is_ok());
    }

    #[test]
    fn test_client_builder_with_options() {
        let built = DakeraClient::builder("http://localhost:3000")
            .timeout_secs(60)
            .user_agent("test-client/1.0")
            .build();
        assert!(built.is_ok());
    }

    #[test]
    fn test_client_builder_invalid_url() {
        assert!(DakeraClient::new("invalid-url").is_err());
    }

    #[test]
    fn test_client_builder_trailing_slash() {
        let base = DakeraClient::new("http://localhost:3000/")
            .map(|client| client.base_url)
            .unwrap();
        assert!(!base.ends_with('/'));
    }

    // --- vector request types -------------------------------------------------

    #[test]
    fn test_vector_creation() {
        let vector = Vector::new("test", vec![0.1, 0.2, 0.3]);

        assert_eq!(vector.id, "test");
        assert_eq!(vector.values.len(), 3);
        assert!(vector.metadata.is_none());
    }

    #[test]
    fn test_query_request_builder() {
        let request = QueryRequest::new(vec![0.1, 0.2], 10)
            .with_filter(serde_json::json!({"category": "test"}))
            .include_metadata(false);

        assert_eq!(request.top_k, 10);
        assert!(request.filter.is_some());
        assert!(!request.include_metadata);
    }

    #[test]
    fn test_hybrid_search_request() {
        let request =
            HybridSearchRequest::new(vec![0.1], "test query", 5).with_vector_weight(0.7);

        assert_eq!(request.vector_weight, 0.7);
        assert_eq!(request.text, "test query");
    }

    #[test]
    fn test_hybrid_search_weight_clamping() {
        // A weight above 1.0 is expected to be clamped to exactly 1.0.
        let request = HybridSearchRequest::new(vec![0.1], "test", 5).with_vector_weight(1.5);

        assert_eq!(request.vector_weight, 1.0);
    }

    // --- text request types ---------------------------------------------------

    #[test]
    fn test_text_document_builder() {
        let document = TextDocument::new("doc1", "Hello world").with_ttl(3600);

        assert_eq!(document.id, "doc1");
        assert_eq!(document.text, "Hello world");
        assert_eq!(document.ttl_seconds, Some(3600));
        assert!(document.metadata.is_none());
    }

    #[test]
    fn test_upsert_text_request_builder() {
        let documents = vec![
            TextDocument::new("doc1", "Hello"),
            TextDocument::new("doc2", "World"),
        ];
        let request = UpsertTextRequest::new(documents).with_model(EmbeddingModel::BgeSmall);

        assert_eq!(request.documents.len(), 2);
        assert_eq!(request.model, Some(EmbeddingModel::BgeSmall));
    }

    #[test]
    fn test_query_text_request_builder() {
        let request = QueryTextRequest::new("semantic search query", 5)
            .with_filter(serde_json::json!({"category": "docs"}))
            .include_vectors(true)
            .with_model(EmbeddingModel::E5Small);

        assert_eq!(request.text, "semantic search query");
        assert_eq!(request.top_k, 5);
        assert!(request.filter.is_some());
        assert!(request.include_vectors);
        assert_eq!(request.model, Some(EmbeddingModel::E5Small));
    }

    // --- misc request types ---------------------------------------------------

    #[test]
    fn test_fetch_request_builder() {
        let request = FetchRequest::new(vec![String::from("id1"), String::from("id2")]);

        assert_eq!(request.ids.len(), 2);
        assert!(request.include_values);
        assert!(request.include_metadata);
    }

    #[test]
    fn test_create_namespace_request_builder() {
        let request = CreateNamespaceRequest::new()
            .with_dimensions(384)
            .with_index_type("hnsw");

        assert_eq!(request.dimensions, Some(384));
        assert_eq!(request.index_type.as_deref(), Some("hnsw"));
    }

    #[test]
    fn test_batch_query_text_request() {
        let queries = vec![String::from("query one"), String::from("query two")];
        let request = BatchQueryTextRequest::new(queries, 10);

        assert_eq!(request.queries.len(), 2);
        assert_eq!(request.top_k, 10);
        assert!(!request.include_vectors);
        assert!(request.model.is_none());
    }
}