1use reqwest::{Client, StatusCode};
4use std::time::Duration;
5use tracing::{debug, instrument};
6
7use serde::Deserialize;
8
9use crate::error::{ClientError, Result, ServerErrorCode};
10use crate::types::*;
11
12const DEFAULT_TIMEOUT_SECS: u64 = 30;
14
15#[derive(Debug, Clone)]
17pub struct DakeraClient {
18 pub(crate) client: Client,
20 pub(crate) base_url: String,
22}
23
24impl DakeraClient {
25 pub fn new(base_url: impl Into<String>) -> Result<Self> {
35 DakeraClientBuilder::new(base_url).build()
36 }
37
38 pub fn builder(base_url: impl Into<String>) -> DakeraClientBuilder {
40 DakeraClientBuilder::new(base_url)
41 }
42
43 #[instrument(skip(self))]
49 pub async fn health(&self) -> Result<HealthResponse> {
50 let url = format!("{}/health", self.base_url);
51 let response = self.client.get(&url).send().await?;
52
53 if response.status().is_success() {
54 Ok(response.json().await?)
55 } else {
56 Ok(HealthResponse {
58 healthy: true,
59 version: None,
60 uptime_seconds: None,
61 })
62 }
63 }
64
65 #[instrument(skip(self))]
67 pub async fn ready(&self) -> Result<ReadinessResponse> {
68 let url = format!("{}/health/ready", self.base_url);
69 let response = self.client.get(&url).send().await?;
70
71 if response.status().is_success() {
72 Ok(response.json().await?)
73 } else {
74 Ok(ReadinessResponse {
75 ready: false,
76 components: None,
77 })
78 }
79 }
80
81 #[instrument(skip(self))]
83 pub async fn live(&self) -> Result<bool> {
84 let url = format!("{}/health/live", self.base_url);
85 let response = self.client.get(&url).send().await?;
86 Ok(response.status().is_success())
87 }
88
89 #[instrument(skip(self))]
95 pub async fn list_namespaces(&self) -> Result<Vec<String>> {
96 let url = format!("{}/v1/namespaces", self.base_url);
97 let response = self.client.get(&url).send().await?;
98 self.handle_response::<ListNamespacesResponse>(response)
99 .await
100 .map(|r| r.namespaces)
101 }
102
103 #[instrument(skip(self))]
105 pub async fn get_namespace(&self, namespace: &str) -> Result<NamespaceInfo> {
106 let url = format!("{}/v1/namespaces/{}", self.base_url, namespace);
107 let response = self.client.get(&url).send().await?;
108 self.handle_response(response).await
109 }
110
111 #[instrument(skip(self, request))]
113 pub async fn create_namespace(
114 &self,
115 namespace: &str,
116 request: CreateNamespaceRequest,
117 ) -> Result<NamespaceInfo> {
118 let url = format!("{}/v1/namespaces/{}", self.base_url, namespace);
119 let response = self.client.post(&url).json(&request).send().await?;
120 self.handle_response(response).await
121 }
122
123 #[instrument(skip(self, request), fields(namespace = %namespace))]
129 pub async fn configure_namespace(
130 &self,
131 namespace: &str,
132 request: ConfigureNamespaceRequest,
133 ) -> Result<ConfigureNamespaceResponse> {
134 let url = format!("{}/v1/namespaces/{}", self.base_url, namespace);
135 let response = self.client.put(&url).json(&request).send().await?;
136 self.handle_response(response).await
137 }
138
139 #[instrument(skip(self, request), fields(vector_count = request.vectors.len()))]
145 pub async fn upsert(&self, namespace: &str, request: UpsertRequest) -> Result<UpsertResponse> {
146 let url = format!("{}/v1/namespaces/{}/vectors", self.base_url, namespace);
147 debug!(
148 "Upserting {} vectors to {}",
149 request.vectors.len(),
150 namespace
151 );
152
153 let response = self.client.post(&url).json(&request).send().await?;
154 self.handle_response(response).await
155 }
156
157 #[instrument(skip(self, vector))]
159 pub async fn upsert_one(&self, namespace: &str, vector: Vector) -> Result<UpsertResponse> {
160 self.upsert(namespace, UpsertRequest::single(vector)).await
161 }
162
163 #[instrument(skip(self, request), fields(namespace = %namespace, count = request.ids.len()))]
196 pub async fn upsert_columns(
197 &self,
198 namespace: &str,
199 request: ColumnUpsertRequest,
200 ) -> Result<UpsertResponse> {
201 let url = format!(
202 "{}/v1/namespaces/{}/upsert-columns",
203 self.base_url, namespace
204 );
205 debug!(
206 "Upserting {} vectors in column format to {}",
207 request.ids.len(),
208 namespace
209 );
210
211 let response = self.client.post(&url).json(&request).send().await?;
212 self.handle_response(response).await
213 }
214
215 #[instrument(skip(self, request), fields(top_k = request.top_k))]
217 pub async fn query(&self, namespace: &str, request: QueryRequest) -> Result<QueryResponse> {
218 let url = format!("{}/v1/namespaces/{}/query", self.base_url, namespace);
219 debug!(
220 "Querying namespace {} for top {} results",
221 namespace, request.top_k
222 );
223
224 let response = self.client.post(&url).json(&request).send().await?;
225 self.handle_response(response).await
226 }
227
228 #[instrument(skip(self, vector))]
230 pub async fn query_simple(
231 &self,
232 namespace: &str,
233 vector: Vec<f32>,
234 top_k: u32,
235 ) -> Result<QueryResponse> {
236 self.query(namespace, QueryRequest::new(vector, top_k))
237 .await
238 }
239
240 #[instrument(skip(self, request), fields(namespace = %namespace, query_count = request.queries.len()))]
264 pub async fn batch_query(
265 &self,
266 namespace: &str,
267 request: BatchQueryRequest,
268 ) -> Result<BatchQueryResponse> {
269 let url = format!("{}/v1/namespaces/{}/batch-query", self.base_url, namespace);
270 debug!(
271 "Batch querying namespace {} with {} queries",
272 namespace,
273 request.queries.len()
274 );
275
276 let response = self.client.post(&url).json(&request).send().await?;
277 self.handle_response(response).await
278 }
279
280 #[instrument(skip(self, request), fields(id_count = request.ids.len()))]
282 pub async fn delete(&self, namespace: &str, request: DeleteRequest) -> Result<DeleteResponse> {
283 let url = format!(
284 "{}/v1/namespaces/{}/vectors/delete",
285 self.base_url, namespace
286 );
287 debug!("Deleting {} vectors from {}", request.ids.len(), namespace);
288
289 let response = self.client.post(&url).json(&request).send().await?;
290 self.handle_response(response).await
291 }
292
293 #[instrument(skip(self))]
295 pub async fn delete_one(&self, namespace: &str, id: &str) -> Result<DeleteResponse> {
296 self.delete(namespace, DeleteRequest::single(id)).await
297 }
298
299 #[instrument(skip(self, request), fields(doc_count = request.documents.len()))]
305 pub async fn index_documents(
306 &self,
307 namespace: &str,
308 request: IndexDocumentsRequest,
309 ) -> Result<IndexDocumentsResponse> {
310 let url = format!(
311 "{}/v1/namespaces/{}/fulltext/index",
312 self.base_url, namespace
313 );
314 debug!(
315 "Indexing {} documents in {}",
316 request.documents.len(),
317 namespace
318 );
319
320 let response = self.client.post(&url).json(&request).send().await?;
321 self.handle_response(response).await
322 }
323
324 #[instrument(skip(self, document))]
326 pub async fn index_document(
327 &self,
328 namespace: &str,
329 document: Document,
330 ) -> Result<IndexDocumentsResponse> {
331 self.index_documents(
332 namespace,
333 IndexDocumentsRequest {
334 documents: vec![document],
335 },
336 )
337 .await
338 }
339
340 #[instrument(skip(self, request))]
342 pub async fn fulltext_search(
343 &self,
344 namespace: &str,
345 request: FullTextSearchRequest,
346 ) -> Result<FullTextSearchResponse> {
347 let url = format!(
348 "{}/v1/namespaces/{}/fulltext/search",
349 self.base_url, namespace
350 );
351 debug!("Full-text search in {} for: {}", namespace, request.query);
352
353 let response = self.client.post(&url).json(&request).send().await?;
354 self.handle_response(response).await
355 }
356
357 #[instrument(skip(self))]
359 pub async fn search_text(
360 &self,
361 namespace: &str,
362 query: &str,
363 top_k: u32,
364 ) -> Result<FullTextSearchResponse> {
365 self.fulltext_search(namespace, FullTextSearchRequest::new(query, top_k))
366 .await
367 }
368
369 #[instrument(skip(self))]
371 pub async fn fulltext_stats(&self, namespace: &str) -> Result<FullTextStats> {
372 let url = format!(
373 "{}/v1/namespaces/{}/fulltext/stats",
374 self.base_url, namespace
375 );
376 let response = self.client.get(&url).send().await?;
377 self.handle_response(response).await
378 }
379
380 #[instrument(skip(self, request))]
382 pub async fn fulltext_delete(
383 &self,
384 namespace: &str,
385 request: DeleteRequest,
386 ) -> Result<DeleteResponse> {
387 let url = format!(
388 "{}/v1/namespaces/{}/fulltext/delete",
389 self.base_url, namespace
390 );
391 let response = self.client.post(&url).json(&request).send().await?;
392 self.handle_response(response).await
393 }
394
395 #[instrument(skip(self, request), fields(top_k = request.top_k))]
401 pub async fn hybrid_search(
402 &self,
403 namespace: &str,
404 request: HybridSearchRequest,
405 ) -> Result<HybridSearchResponse> {
406 let url = format!("{}/v1/namespaces/{}/hybrid", self.base_url, namespace);
407 debug!(
408 "Hybrid search in {} with vector_weight={}",
409 namespace, request.vector_weight
410 );
411
412 let response = self.client.post(&url).json(&request).send().await?;
413 self.handle_response(response).await
414 }
415
416 #[instrument(skip(self, request), fields(namespace = %namespace))]
453 pub async fn multi_vector_search(
454 &self,
455 namespace: &str,
456 request: MultiVectorSearchRequest,
457 ) -> Result<MultiVectorSearchResponse> {
458 let url = format!("{}/v1/namespaces/{}/multi-vector", self.base_url, namespace);
459 debug!(
460 "Multi-vector search in {} with {} positive vectors",
461 namespace,
462 request.positive_vectors.len()
463 );
464
465 let response = self.client.post(&url).json(&request).send().await?;
466 self.handle_response(response).await
467 }
468
469 #[instrument(skip(self, request), fields(namespace = %namespace))]
503 pub async fn aggregate(
504 &self,
505 namespace: &str,
506 request: AggregationRequest,
507 ) -> Result<AggregationResponse> {
508 let url = format!("{}/v1/namespaces/{}/aggregate", self.base_url, namespace);
509 debug!(
510 "Aggregating in namespace {} with {} aggregations",
511 namespace,
512 request.aggregate_by.len()
513 );
514
515 let response = self.client.post(&url).json(&request).send().await?;
516 self.handle_response(response).await
517 }
518
519 #[instrument(skip(self, request), fields(namespace = %namespace))]
557 pub async fn unified_query(
558 &self,
559 namespace: &str,
560 request: UnifiedQueryRequest,
561 ) -> Result<UnifiedQueryResponse> {
562 let url = format!(
563 "{}/v1/namespaces/{}/unified-query",
564 self.base_url, namespace
565 );
566 debug!(
567 "Unified query in namespace {} with top_k={}",
568 namespace, request.top_k
569 );
570
571 let response = self.client.post(&url).json(&request).send().await?;
572 self.handle_response(response).await
573 }
574
575 #[instrument(skip(self, vector))]
579 pub async fn unified_vector_search(
580 &self,
581 namespace: &str,
582 vector: Vec<f32>,
583 top_k: usize,
584 ) -> Result<UnifiedQueryResponse> {
585 self.unified_query(namespace, UnifiedQueryRequest::vector_search(vector, top_k))
586 .await
587 }
588
589 #[instrument(skip(self))]
593 pub async fn unified_text_search(
594 &self,
595 namespace: &str,
596 field: &str,
597 query: &str,
598 top_k: usize,
599 ) -> Result<UnifiedQueryResponse> {
600 self.unified_query(
601 namespace,
602 UnifiedQueryRequest::fulltext_search(field, query, top_k),
603 )
604 .await
605 }
606
607 #[instrument(skip(self, request), fields(namespace = %namespace))]
644 pub async fn explain_query(
645 &self,
646 namespace: &str,
647 request: QueryExplainRequest,
648 ) -> Result<QueryExplainResponse> {
649 let url = format!("{}/v1/namespaces/{}/explain", self.base_url, namespace);
650 debug!(
651 "Explaining query in namespace {} (query_type={:?}, top_k={})",
652 namespace, request.query_type, request.top_k
653 );
654
655 let response = self.client.post(&url).json(&request).send().await?;
656 self.handle_response(response).await
657 }
658
659 #[instrument(skip(self, request), fields(namespace = %request.namespace, priority = ?request.priority))]
687 pub async fn warm_cache(&self, request: WarmCacheRequest) -> Result<WarmCacheResponse> {
688 let url = format!(
689 "{}/v1/namespaces/{}/cache/warm",
690 self.base_url, request.namespace
691 );
692 debug!(
693 "Warming cache for namespace {} with priority {:?}",
694 request.namespace, request.priority
695 );
696
697 let response = self.client.post(&url).json(&request).send().await?;
698 self.handle_response(response).await
699 }
700
701 #[instrument(skip(self, vector_ids))]
703 pub async fn warm_vectors(
704 &self,
705 namespace: &str,
706 vector_ids: Vec<String>,
707 ) -> Result<WarmCacheResponse> {
708 self.warm_cache(WarmCacheRequest::new(namespace).with_vector_ids(vector_ids))
709 .await
710 }
711
712 #[instrument(skip(self, request), fields(namespace = %namespace))]
745 pub async fn export(&self, namespace: &str, request: ExportRequest) -> Result<ExportResponse> {
746 let url = format!("{}/v1/namespaces/{}/export", self.base_url, namespace);
747 debug!(
748 "Exporting vectors from namespace {} (top_k={}, cursor={:?})",
749 namespace, request.top_k, request.cursor
750 );
751
752 let response = self.client.post(&url).json(&request).send().await?;
753 self.handle_response(response).await
754 }
755
756 #[instrument(skip(self))]
760 pub async fn export_all(&self, namespace: &str) -> Result<ExportResponse> {
761 self.export(namespace, ExportRequest::new()).await
762 }
763
764 #[instrument(skip(self))]
770 pub async fn diagnostics(&self) -> Result<SystemDiagnostics> {
771 let url = format!("{}/ops/diagnostics", self.base_url);
772 let response = self.client.get(&url).send().await?;
773 self.handle_response(response).await
774 }
775
776 #[instrument(skip(self))]
778 pub async fn list_jobs(&self) -> Result<Vec<JobInfo>> {
779 let url = format!("{}/ops/jobs", self.base_url);
780 let response = self.client.get(&url).send().await?;
781 self.handle_response(response).await
782 }
783
784 #[instrument(skip(self))]
786 pub async fn get_job(&self, job_id: &str) -> Result<Option<JobInfo>> {
787 let url = format!("{}/ops/jobs/{}", self.base_url, job_id);
788 let response = self.client.get(&url).send().await?;
789
790 if response.status() == StatusCode::NOT_FOUND {
791 return Ok(None);
792 }
793
794 self.handle_response(response).await.map(Some)
795 }
796
797 #[instrument(skip(self, request))]
799 pub async fn compact(&self, request: CompactionRequest) -> Result<CompactionResponse> {
800 let url = format!("{}/ops/compact", self.base_url);
801 let response = self.client.post(&url).json(&request).send().await?;
802 self.handle_response(response).await
803 }
804
805 #[instrument(skip(self))]
807 pub async fn shutdown(&self) -> Result<()> {
808 let url = format!("{}/ops/shutdown", self.base_url);
809 let response = self.client.post(&url).send().await?;
810
811 if response.status().is_success() {
812 Ok(())
813 } else {
814 let status = response.status().as_u16();
815 let text = response.text().await.unwrap_or_default();
816 Err(ClientError::Server {
817 status,
818 message: text,
819 code: None,
820 })
821 }
822 }
823
824 #[instrument(skip(self, request), fields(id_count = request.ids.len()))]
830 pub async fn fetch(&self, namespace: &str, request: FetchRequest) -> Result<FetchResponse> {
831 let url = format!("{}/v1/namespaces/{}/fetch", self.base_url, namespace);
832 debug!("Fetching {} vectors from {}", request.ids.len(), namespace);
833 let response = self.client.post(&url).json(&request).send().await?;
834 self.handle_response(response).await
835 }
836
837 #[instrument(skip(self))]
839 pub async fn fetch_by_ids(&self, namespace: &str, ids: &[&str]) -> Result<Vec<Vector>> {
840 let request = FetchRequest::new(ids.iter().map(|s| s.to_string()).collect());
841 self.fetch(namespace, request).await.map(|r| r.vectors)
842 }
843
844 #[instrument(skip(self, request), fields(doc_count = request.documents.len()))]
850 pub async fn upsert_text(
851 &self,
852 namespace: &str,
853 request: UpsertTextRequest,
854 ) -> Result<TextUpsertResponse> {
855 let url = format!("{}/v1/namespaces/{}/upsert-text", self.base_url, namespace);
856 debug!(
857 "Upserting {} text documents to {}",
858 request.documents.len(),
859 namespace
860 );
861 let response = self.client.post(&url).json(&request).send().await?;
862 self.handle_response(response).await
863 }
864
865 #[instrument(skip(self, request), fields(top_k = request.top_k))]
867 pub async fn query_text(
868 &self,
869 namespace: &str,
870 request: QueryTextRequest,
871 ) -> Result<TextQueryResponse> {
872 let url = format!("{}/v1/namespaces/{}/query-text", self.base_url, namespace);
873 debug!("Text query in {} for: {}", namespace, request.text);
874 let response = self.client.post(&url).json(&request).send().await?;
875 self.handle_response(response).await
876 }
877
878 #[instrument(skip(self))]
880 pub async fn query_text_simple(
881 &self,
882 namespace: &str,
883 text: &str,
884 top_k: u32,
885 ) -> Result<TextQueryResponse> {
886 self.query_text(namespace, QueryTextRequest::new(text, top_k))
887 .await
888 }
889
890 #[instrument(skip(self, request), fields(query_count = request.queries.len()))]
892 pub async fn batch_query_text(
893 &self,
894 namespace: &str,
895 request: BatchQueryTextRequest,
896 ) -> Result<BatchQueryTextResponse> {
897 let url = format!(
898 "{}/v1/namespaces/{}/batch-query-text",
899 self.base_url, namespace
900 );
901 debug!(
902 "Batch text query in {} with {} queries",
903 namespace,
904 request.queries.len()
905 );
906 let response = self.client.post(&url).json(&request).send().await?;
907 self.handle_response(response).await
908 }
909
910 pub(crate) async fn handle_response<T: serde::de::DeserializeOwned>(
916 &self,
917 response: reqwest::Response,
918 ) -> Result<T> {
919 let status = response.status();
920
921 if status.is_success() {
922 Ok(response.json().await?)
923 } else {
924 let status_code = status.as_u16();
925 let text = response.text().await.unwrap_or_default();
926
927 #[derive(Deserialize)]
928 struct ErrorBody {
929 error: Option<String>,
930 code: Option<ServerErrorCode>,
931 }
932
933 let (message, code) = if let Ok(body) = serde_json::from_str::<ErrorBody>(&text) {
934 (body.error.unwrap_or_else(|| text.clone()), body.code)
935 } else {
936 (text, None)
937 };
938
939 match status_code {
940 401 => Err(ClientError::Server {
941 status: 401,
942 message,
943 code,
944 }),
945 403 => Err(ClientError::Authorization {
946 status: 403,
947 message,
948 code,
949 }),
950 404 => match &code {
951 Some(ServerErrorCode::NamespaceNotFound) => {
952 Err(ClientError::NamespaceNotFound(message))
953 }
954 Some(ServerErrorCode::VectorNotFound) => {
955 Err(ClientError::VectorNotFound(message))
956 }
957 _ => Err(ClientError::Server {
958 status: 404,
959 message,
960 code,
961 }),
962 },
963 _ => Err(ClientError::Server {
964 status: status_code,
965 message,
966 code,
967 }),
968 }
969 }
970 }
971}
972
973#[derive(Debug)]
975pub struct DakeraClientBuilder {
976 base_url: String,
977 timeout: Duration,
978 user_agent: Option<String>,
979}
980
981impl DakeraClientBuilder {
982 pub fn new(base_url: impl Into<String>) -> Self {
984 Self {
985 base_url: base_url.into(),
986 timeout: Duration::from_secs(DEFAULT_TIMEOUT_SECS),
987 user_agent: None,
988 }
989 }
990
991 pub fn timeout(mut self, timeout: Duration) -> Self {
993 self.timeout = timeout;
994 self
995 }
996
997 pub fn timeout_secs(mut self, secs: u64) -> Self {
999 self.timeout = Duration::from_secs(secs);
1000 self
1001 }
1002
1003 pub fn user_agent(mut self, user_agent: impl Into<String>) -> Self {
1005 self.user_agent = Some(user_agent.into());
1006 self
1007 }
1008
1009 pub fn build(self) -> Result<DakeraClient> {
1011 let base_url = self.base_url.trim_end_matches('/').to_string();
1013
1014 if !base_url.starts_with("http://") && !base_url.starts_with("https://") {
1016 return Err(ClientError::InvalidUrl(
1017 "URL must start with http:// or https://".to_string(),
1018 ));
1019 }
1020
1021 let user_agent = self
1022 .user_agent
1023 .unwrap_or_else(|| format!("dakera-client/{}", env!("CARGO_PKG_VERSION")));
1024
1025 let client = Client::builder()
1026 .timeout(self.timeout)
1027 .user_agent(user_agent)
1028 .build()
1029 .map_err(|e| ClientError::Config(e.to_string()))?;
1030
1031 Ok(DakeraClient { client, base_url })
1032 }
1033}
1034
1035impl DakeraClient {
1040 pub async fn stream_namespace_events(
1065 &self,
1066 namespace: &str,
1067 ) -> Result<tokio::sync::mpsc::Receiver<Result<crate::events::DakeraEvent>>> {
1068 let url = format!(
1069 "{}/v1/namespaces/{}/events",
1070 self.base_url,
1071 urlencoding::encode(namespace)
1072 );
1073 self.stream_sse(url).await
1074 }
1075
1076 pub async fn stream_global_events(
1083 &self,
1084 ) -> Result<tokio::sync::mpsc::Receiver<Result<crate::events::DakeraEvent>>> {
1085 let url = format!("{}/ops/events", self.base_url);
1086 self.stream_sse(url).await
1087 }
1088
1089 pub async fn stream_memory_events(
1098 &self,
1099 ) -> Result<tokio::sync::mpsc::Receiver<Result<crate::events::MemoryEvent>>> {
1100 let url = format!("{}/v1/events/stream", self.base_url);
1101 self.stream_sse(url).await
1102 }
1103
1104 async fn stream_sse<T>(&self, url: String) -> Result<tokio::sync::mpsc::Receiver<Result<T>>>
1106 where
1107 T: serde::de::DeserializeOwned + Send + 'static,
1108 {
1109 use futures_util::StreamExt;
1110
1111 let response = self
1112 .client
1113 .get(&url)
1114 .header("Accept", "text/event-stream")
1115 .header("Cache-Control", "no-cache")
1116 .send()
1117 .await?;
1118
1119 if !response.status().is_success() {
1120 let status = response.status().as_u16();
1121 let body = response.text().await.unwrap_or_default();
1122 return Err(ClientError::Server {
1123 status,
1124 message: body,
1125 code: None,
1126 });
1127 }
1128
1129 let (tx, rx) = tokio::sync::mpsc::channel(64);
1130
1131 tokio::spawn(async move {
1132 let mut byte_stream = response.bytes_stream();
1133 let mut remaining = String::new();
1134 let mut data_lines: Vec<String> = Vec::new();
1135
1136 while let Some(chunk) = byte_stream.next().await {
1137 match chunk {
1138 Ok(bytes) => {
1139 remaining.push_str(&String::from_utf8_lossy(&bytes));
1140 while let Some(pos) = remaining.find('\n') {
1141 let raw = &remaining[..pos];
1142 let line = raw.trim_end_matches('\r').to_string();
1143 remaining = remaining[pos + 1..].to_string();
1144
1145 if line.starts_with(':') {
1146 } else if let Some(data) = line.strip_prefix("data:") {
1148 data_lines.push(data.trim_start().to_string());
1149 } else if line.is_empty() {
1150 if !data_lines.is_empty() {
1151 let payload = data_lines.join("\n");
1152 data_lines.clear();
1153 let result = serde_json::from_str::<T>(&payload)
1154 .map_err(ClientError::Json);
1155 if tx.send(result).await.is_err() {
1156 return; }
1158 }
1159 } else {
1160 }
1162 }
1163 }
1164 Err(e) => {
1165 let _ = tx.send(Err(ClientError::Http(e))).await;
1166 return;
1167 }
1168 }
1169 }
1170 });
1171
1172 Ok(rx)
1173 }
1174}
1175
1176#[cfg(test)]
1177mod tests {
1178 use super::*;
1179
1180 #[test]
1181 fn test_client_builder() {
1182 let client = DakeraClient::new("http://localhost:3000");
1183 assert!(client.is_ok());
1184 }
1185
1186 #[test]
1187 fn test_client_builder_with_options() {
1188 let client = DakeraClient::builder("http://localhost:3000")
1189 .timeout_secs(60)
1190 .user_agent("test-client/1.0")
1191 .build();
1192 assert!(client.is_ok());
1193 }
1194
1195 #[test]
1196 fn test_client_builder_invalid_url() {
1197 let client = DakeraClient::new("invalid-url");
1198 assert!(client.is_err());
1199 }
1200
1201 #[test]
1202 fn test_client_builder_trailing_slash() {
1203 let client = DakeraClient::new("http://localhost:3000/").unwrap();
1204 assert!(!client.base_url.ends_with('/'));
1205 }
1206
1207 #[test]
1208 fn test_vector_creation() {
1209 let v = Vector::new("test", vec![0.1, 0.2, 0.3]);
1210 assert_eq!(v.id, "test");
1211 assert_eq!(v.values.len(), 3);
1212 assert!(v.metadata.is_none());
1213 }
1214
1215 #[test]
1216 fn test_query_request_builder() {
1217 let req = QueryRequest::new(vec![0.1, 0.2], 10)
1218 .with_filter(serde_json::json!({"category": "test"}))
1219 .include_metadata(false);
1220
1221 assert_eq!(req.top_k, 10);
1222 assert!(req.filter.is_some());
1223 assert!(!req.include_metadata);
1224 }
1225
1226 #[test]
1227 fn test_hybrid_search_request() {
1228 let req = HybridSearchRequest::new(vec![0.1], "test query", 5).with_vector_weight(0.7);
1229
1230 assert_eq!(req.vector_weight, 0.7);
1231 assert_eq!(req.text, "test query");
1232 }
1233
1234 #[test]
1235 fn test_hybrid_search_weight_clamping() {
1236 let req = HybridSearchRequest::new(vec![0.1], "test", 5).with_vector_weight(1.5); assert_eq!(req.vector_weight, 1.0);
1239 }
1240
1241 #[test]
1242 fn test_text_document_builder() {
1243 let doc = TextDocument::new("doc1", "Hello world").with_ttl(3600);
1244
1245 assert_eq!(doc.id, "doc1");
1246 assert_eq!(doc.text, "Hello world");
1247 assert_eq!(doc.ttl_seconds, Some(3600));
1248 assert!(doc.metadata.is_none());
1249 }
1250
1251 #[test]
1252 fn test_upsert_text_request_builder() {
1253 let docs = vec![
1254 TextDocument::new("doc1", "Hello"),
1255 TextDocument::new("doc2", "World"),
1256 ];
1257 let req = UpsertTextRequest::new(docs).with_model(EmbeddingModel::BgeSmall);
1258
1259 assert_eq!(req.documents.len(), 2);
1260 assert_eq!(req.model, Some(EmbeddingModel::BgeSmall));
1261 }
1262
1263 #[test]
1264 fn test_query_text_request_builder() {
1265 let req = QueryTextRequest::new("semantic search query", 5)
1266 .with_filter(serde_json::json!({"category": "docs"}))
1267 .include_vectors(true)
1268 .with_model(EmbeddingModel::E5Small);
1269
1270 assert_eq!(req.text, "semantic search query");
1271 assert_eq!(req.top_k, 5);
1272 assert!(req.filter.is_some());
1273 assert!(req.include_vectors);
1274 assert_eq!(req.model, Some(EmbeddingModel::E5Small));
1275 }
1276
1277 #[test]
1278 fn test_fetch_request_builder() {
1279 let req = FetchRequest::new(vec!["id1".to_string(), "id2".to_string()]);
1280
1281 assert_eq!(req.ids.len(), 2);
1282 assert!(req.include_values);
1283 assert!(req.include_metadata);
1284 }
1285
1286 #[test]
1287 fn test_create_namespace_request_builder() {
1288 let req = CreateNamespaceRequest::new()
1289 .with_dimensions(384)
1290 .with_index_type("hnsw");
1291
1292 assert_eq!(req.dimensions, Some(384));
1293 assert_eq!(req.index_type.as_deref(), Some("hnsw"));
1294 }
1295
1296 #[test]
1297 fn test_batch_query_text_request() {
1298 let req =
1299 BatchQueryTextRequest::new(vec!["query one".to_string(), "query two".to_string()], 10);
1300
1301 assert_eq!(req.queries.len(), 2);
1302 assert_eq!(req.top_k, 10);
1303 assert!(!req.include_vectors);
1304 assert!(req.model.is_none());
1305 }
1306}