1pub use hyperspace_proto::hyperspace::database_client::DatabaseClient;
2pub use hyperspace_proto::hyperspace::{
3 BatchInsertRequest, BatchSearchRequest, CollectionSummary, DurabilityLevel, EventMessage,
4 EventSubscriptionRequest, EventType, FindSemanticClustersRequest, FindSemanticClustersResponse,
5 GetConceptParentsRequest, GetConceptParentsResponse, GetNeighborsRequest, GetNeighborsResponse,
6 GetNodeRequest, GraphNode, InsertRequest, InsertTextRequest, SearchRequest, SearchResponse,
7 SearchResult, SearchResult as ResultItem, SearchTextRequest, TraverseRequest, TraverseResponse,
8 VectorData, VectorizeRequest, VectorizeResponse,
9};
10use tonic::codegen::InterceptedService;
11use tonic::service::Interceptor;
12use tonic::transport::Channel;
13use tonic::{Request, Status};
14
15pub mod fuzzy;
16pub mod gromov;
17pub mod math;
18
19#[cfg(feature = "embedders")]
20mod embedder;
21#[cfg(feature = "embedders")]
22pub use embedder::*;
23
/// Request interceptor holding the optional credentials that are attached to
/// outgoing calls as metadata entries.
#[derive(Clone)]
pub struct AuthInterceptor {
    /// Sent as the `x-api-key` metadata entry when present.
    api_key: Option<String>,
    /// Sent as the `x-hyperspace-user-id` metadata entry when present.
    user_id: Option<String>,
}
29
30impl Interceptor for AuthInterceptor {
31 fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
32 if let Some(key) = &self.api_key {
33 let token = key
34 .parse()
35 .map_err(|_| Status::invalid_argument("Invalid API Key format"))?;
36 request.metadata_mut().insert("x-api-key", token);
37 }
38 if let Some(uid) = &self.user_id {
39 let token = uid
40 .parse()
41 .map_err(|_| Status::invalid_argument("Invalid User ID format"))?;
42 request.metadata_mut().insert("x-hyperspace-user-id", token);
43 }
44 Ok(request)
45 }
46}
47
/// Client handle wrapping the generated `DatabaseClient` over a channel whose
/// requests pass through `AuthInterceptor`.
pub struct Client {
    /// Generated gRPC client used for every call made by this handle.
    inner: DatabaseClient<InterceptedService<Channel, AuthInterceptor>>,
    /// Optional embedder; the field only exists with the `embedders` feature.
    #[cfg(feature = "embedders")]
    embedder: Option<Box<dyn Embedder>>,
}
53
54impl Client {
55 #[inline]
56 fn vec_f32_to_f64(vector: &[f32]) -> Vec<f64> {
57 vector.iter().map(|&x| f64::from(x)).collect()
58 }
59
60 pub async fn connect(
65 dst: String,
66 api_key: Option<String>,
67 user_id: Option<String>,
68 ) -> Result<Self, Box<dyn std::error::Error>> {
69 let channel = Channel::from_shared(dst)?
70 .tcp_keepalive(Some(std::time::Duration::from_secs(30)))
71 .tcp_nodelay(true)
72 .keep_alive_while_idle(true)
73 .connect_timeout(std::time::Duration::from_secs(10))
74 .connect()
75 .await?;
76
77 let interceptor = AuthInterceptor { api_key, user_id };
78 let client = DatabaseClient::with_interceptor(channel, interceptor)
79 .max_decoding_message_size(64 * 1024 * 1024) .max_encoding_message_size(64 * 1024 * 1024); Ok(Self {
83 inner: client,
84 #[cfg(feature = "embedders")]
85 embedder: None,
86 })
87 }
88
89 #[cfg(feature = "embedders")]
90 pub fn set_embedder(&mut self, embedder: Box<dyn Embedder>) {
91 self.embedder = Some(embedder);
92 }
93
94 pub async fn create_collection(
99 &mut self,
100 name: String,
101 dimension: u32,
102 metric: String,
103 ) -> Result<String, tonic::Status> {
104 let req = hyperspace_proto::hyperspace::CreateCollectionRequest {
105 name,
106 dimension,
107 metric,
108 };
109 let resp = self.inner.create_collection(req).await?;
110 Ok(resp.into_inner().status)
111 }
112
113 pub async fn delete_collection(&mut self, name: String) -> Result<String, tonic::Status> {
118 let req = hyperspace_proto::hyperspace::DeleteCollectionRequest { name };
119 let resp = self.inner.delete_collection(req).await?;
120 Ok(resp.into_inner().status)
121 }
122
123 pub async fn list_collections(&mut self) -> Result<Vec<CollectionSummary>, tonic::Status> {
128 let req = hyperspace_proto::hyperspace::Empty {};
129 let resp = self.inner.list_collections(req).await?;
130 Ok(resp.into_inner().collections)
131 }
132
133
134 pub async fn get_collection_stats(
139 &mut self,
140 name: String,
141 ) -> Result<hyperspace_proto::hyperspace::CollectionStatsResponse, tonic::Status> {
142 let req = hyperspace_proto::hyperspace::CollectionStatsRequest { name };
143 let resp = self.inner.get_collection_stats(req).await?;
144 Ok(resp.into_inner())
145 }
146
147 pub async fn rebuild_index(&mut self, name: String) -> Result<String, tonic::Status> {
152 let req = hyperspace_proto::hyperspace::RebuildIndexRequest {
153 name,
154 filter_query: None,
155 };
156 let resp = self.inner.rebuild_index(req).await?;
157 Ok(resp.into_inner().status)
158 }
159
160 pub async fn rebuild_index_with_filter(
165 &mut self,
166 name: String,
167 key: String,
168 op: String,
169 value: f64,
170 ) -> Result<String, tonic::Status> {
171 let req = hyperspace_proto::hyperspace::RebuildIndexRequest {
172 name,
173 filter_query: Some(hyperspace_proto::hyperspace::VacuumFilterQuery { key, op, value }),
174 };
175 let resp = self.inner.rebuild_index(req).await?;
176 Ok(resp.into_inner().status)
177 }
178
179 pub async fn trigger_vacuum(&mut self) -> Result<String, tonic::Status> {
184 let req = hyperspace_proto::hyperspace::Empty {};
185 let resp = self.inner.trigger_vacuum(req).await?;
186 Ok(resp.into_inner().status)
187 }
188
189 pub async fn trigger_reconsolidation(
194 &mut self,
195 collection: String,
196 target_vector: Vec<f64>,
197 learning_rate: f64,
198 ) -> Result<String, tonic::Status> {
199 let req = hyperspace_proto::hyperspace::ReconsolidationRequest {
200 collection,
201 target_vector,
202 learning_rate,
203 };
204 let resp = self.inner.trigger_reconsolidation(req).await?;
205 Ok(resp.into_inner().status)
206 }
207
208 pub async fn insert(
213 &mut self,
214 id: u32,
215 vector: Vec<f64>,
216 metadata: std::collections::HashMap<String, String>,
217 collection: Option<String>,
218 ) -> Result<bool, tonic::Status> {
219 let req = InsertRequest {
220 id,
221 vector,
222 metadata,
223 typed_metadata: std::collections::HashMap::new(),
224 collection: collection.unwrap_or_default(),
225 origin_node_id: String::new(),
226 logical_clock: 0,
227 durability: 0,
228 };
229 let resp = self.inner.insert(req).await?;
230 Ok(resp.into_inner().success)
231 }
232
233 pub async fn insert_f32(
238 &mut self,
239 id: u32,
240 vector: &[f32],
241 metadata: std::collections::HashMap<String, String>,
242 collection: Option<String>,
243 ) -> Result<bool, tonic::Status> {
244 self.insert(id, Self::vec_f32_to_f64(vector), metadata, collection)
245 .await
246 }
247
248 pub async fn insert_text(
253 &mut self,
254 id: u32,
255 text: String,
256 metadata: std::collections::HashMap<String, String>,
257 collection: Option<String>,
258 ) -> Result<bool, tonic::Status> {
259 let req = InsertTextRequest {
260 id,
261 text,
262 metadata,
263 collection: collection.unwrap_or_default(),
264 durability: 0,
265 };
266 let resp = self.inner.insert_text(req).await?;
267 Ok(resp.into_inner().success)
268 }
269
270 pub async fn vectorize(
275 &mut self,
276 text: String,
277 metric: String,
278 ) -> Result<Vec<f64>, tonic::Status> {
279 let req = VectorizeRequest { text, metric };
280 let resp = self.inner.vectorize(req).await?;
281 Ok(resp.into_inner().vector)
282 }
283
284 pub async fn batch_insert(
289 &mut self,
290 items: Vec<(u32, Vec<f64>, std::collections::HashMap<String, String>)>,
291 collection: Option<String>,
292 durability: DurabilityLevel,
293 ) -> Result<bool, tonic::Status> {
294 let vectors = items
295 .into_iter()
296 .map(|(id, vector, metadata)| VectorData {
297 id,
298 vector,
299 metadata,
300 typed_metadata: std::collections::HashMap::new(),
301 })
302 .collect();
303 let req = BatchInsertRequest {
304 collection: collection.unwrap_or_default(),
305 vectors,
306 origin_node_id: String::new(),
307 logical_clock: 0,
308 durability: durability as i32,
309 };
310 let resp = self.inner.batch_insert(req).await?;
311 Ok(resp.into_inner().success)
312 }
313
314 pub async fn batch_insert_f32(
319 &mut self,
320 items: Vec<(u32, Vec<f32>, std::collections::HashMap<String, String>)>,
321 collection: Option<String>,
322 durability: DurabilityLevel,
323 ) -> Result<bool, tonic::Status> {
324 let items_f64 = items
325 .into_iter()
326 .map(|(id, v, m)| (id, Self::vec_f32_to_f64(&v), m))
327 .collect();
328 self.batch_insert(items_f64, collection, durability).await
329 }
330
331 pub async fn search(
336 &mut self,
337 vector: Vec<f64>,
338 top_k: u32,
339 collection: Option<String>,
340 ) -> Result<Vec<SearchResult>, tonic::Status> {
341 let req = SearchRequest {
342 vector,
343 top_k,
344 filter: std::collections::HashMap::default(),
345 filters: vec![],
346 hybrid_query: None,
347 hybrid_alpha: None,
348 use_wasserstein: false,
349 collection: collection.unwrap_or_default(),
350 };
351 let resp = self.inner.search(req).await?;
352 Ok(resp.into_inner().results)
353 }
354
355 pub async fn search_f32(
360 &mut self,
361 vector: &[f32],
362 top_k: u32,
363 collection: Option<String>,
364 ) -> Result<Vec<SearchResult>, tonic::Status> {
365 self.search(Self::vec_f32_to_f64(vector), top_k, collection)
366 .await
367 }
368
369 pub async fn search_text(
374 &mut self,
375 text: String,
376 top_k: u32,
377 collection: Option<String>,
378 ) -> Result<Vec<SearchResult>, tonic::Status> {
379 let req = SearchTextRequest {
380 text,
381 top_k,
382 collection: collection.unwrap_or_default(),
383 filter: std::collections::HashMap::new(),
384 filters: vec![],
385 };
386 let resp = self.inner.search_text(req).await?;
387 Ok(resp.into_inner().results)
388 }
389
390 pub async fn search_wasserstein(
395 &mut self,
396 vector: Vec<f64>,
397 top_k: u32,
398 collection: Option<String>,
399 ) -> Result<Vec<SearchResult>, tonic::Status> {
400 let req = SearchRequest {
401 vector,
402 top_k,
403 filter: std::collections::HashMap::default(),
404 filters: vec![],
405 hybrid_query: None,
406 hybrid_alpha: None,
407 use_wasserstein: true,
408 collection: collection.unwrap_or_default(),
409 };
410 let resp = self.inner.search(req).await?;
411 Ok(resp.into_inner().results)
412 }
413
414 pub async fn search_batch(
419 &mut self,
420 vectors: Vec<Vec<f64>>,
421 top_k: u32,
422 collection: Option<String>,
423 ) -> Result<Vec<Vec<SearchResult>>, tonic::Status> {
424 let collection_name = collection.unwrap_or_default();
425 let searches = vectors
426 .into_iter()
427 .map(|vector| SearchRequest {
428 vector,
429 top_k,
430 filter: std::collections::HashMap::default(),
431 filters: vec![],
432 hybrid_query: None,
433 hybrid_alpha: None,
434 use_wasserstein: false,
435 collection: collection_name.clone(),
436 })
437 .collect();
438
439 let req = BatchSearchRequest { searches };
440 let resp = self.inner.search_batch(req).await?;
441 Ok(resp
442 .into_inner()
443 .responses
444 .into_iter()
445 .map(|SearchResponse { results }| results)
446 .collect())
447 }
448
449 pub async fn search_batch_f32(
454 &mut self,
455 vectors: &[Vec<f32>],
456 top_k: u32,
457 collection: Option<String>,
458 ) -> Result<Vec<Vec<SearchResult>>, tonic::Status> {
459 let vectors_f64 = vectors
460 .iter()
461 .map(|v| Self::vec_f32_to_f64(v))
462 .collect::<Vec<_>>();
463 self.search_batch(vectors_f64, top_k, collection).await
464 }
465
466 pub async fn search_multi_collection(
473 &mut self,
474 vector: Vec<f64>,
475 collections: Vec<String>,
476 top_k: u32,
477 ) -> Result<std::collections::HashMap<String, Vec<SearchResult>>, tonic::Status> {
478 let searches = collections
479 .iter()
480 .map(|col_name| SearchRequest {
481 vector: vector.clone(),
482 top_k,
483 filter: std::collections::HashMap::default(),
484 filters: vec![],
485 hybrid_query: None,
486 hybrid_alpha: None,
487 use_wasserstein: false,
488 collection: col_name.clone(),
489 })
490 .collect();
491
492 let req = BatchSearchRequest { searches };
493 let resp = self.inner.search_batch(req).await?;
494
495 let mut result_map = std::collections::HashMap::new();
496 for (col_name, response) in collections.into_iter().zip(resp.into_inner().responses) {
497 result_map.insert(col_name, response.results);
498 }
499
500 Ok(result_map)
501 }
502
503 pub async fn search_advanced(
508 &mut self,
509 vector: Vec<f64>,
510 top_k: u32,
511 filters: Vec<hyperspace_proto::hyperspace::Filter>,
512 hybrid: Option<(String, f32)>,
513 collection: Option<String>,
514 ) -> Result<Vec<SearchResult>, tonic::Status> {
515 let (hybrid_query, hybrid_alpha) = match hybrid {
516 Some((q, a)) => (Some(q), Some(a)),
517 None => (None, None),
518 };
519
520 let req = SearchRequest {
521 vector,
522 top_k,
523 filter: std::collections::HashMap::default(),
524 filters,
525 hybrid_query,
526 hybrid_alpha,
527 use_wasserstein: false,
528 collection: collection.unwrap_or_default(),
529 };
530 let resp = self.inner.search(req).await?;
531 Ok(resp.into_inner().results)
532 }
533
534 pub async fn delete(
539 &mut self,
540 id: u32,
541 collection: Option<String>,
542 ) -> Result<bool, tonic::Status> {
543 let req = hyperspace_proto::hyperspace::DeleteRequest {
544 id,
545 collection: collection.unwrap_or_default(),
546 };
547 let resp = self.inner.delete(req).await?;
548 Ok(resp.into_inner().success)
549 }
550
551 pub async fn get_node(
556 &mut self,
557 id: u32,
558 layer: u32,
559 collection: Option<String>,
560 ) -> Result<GraphNode, tonic::Status> {
561 let req = GetNodeRequest {
562 collection: collection.unwrap_or_default(),
563 id,
564 layer,
565 };
566 let resp = self.inner.get_node(req).await?;
567 Ok(resp.into_inner())
568 }
569
570 pub async fn get_neighbors(
575 &mut self,
576 id: u32,
577 layer: u32,
578 limit: u32,
579 offset: u32,
580 collection: Option<String>,
581 ) -> Result<GetNeighborsResponse, tonic::Status> {
582 let req = GetNeighborsRequest {
583 collection: collection.unwrap_or_default(),
584 id,
585 layer,
586 limit,
587 offset,
588 };
589 let resp = self.inner.get_neighbors(req).await?;
590 Ok(resp.into_inner())
591 }
592
593 pub async fn get_neighbors_with_weights(
598 &mut self,
599 id: u32,
600 layer: u32,
601 limit: u32,
602 offset: u32,
603 collection: Option<String>,
604 ) -> Result<Vec<(GraphNode, f64)>, tonic::Status> {
605 let resp = self
606 .get_neighbors(id, layer, limit, offset, collection)
607 .await?;
608 let mut out = Vec::with_capacity(resp.neighbors.len());
609 for (idx, node) in resp.neighbors.into_iter().enumerate() {
610 let w = resp.edge_weights.get(idx).copied().unwrap_or_default();
611 out.push((node, w));
612 }
613 Ok(out)
614 }
615
616 pub async fn traverse(
621 &mut self,
622 req: TraverseRequest,
623 ) -> Result<TraverseResponse, tonic::Status> {
624 let resp = self.inner.traverse(req).await?;
625 Ok(resp.into_inner())
626 }
627
628 pub async fn find_semantic_clusters(
633 &mut self,
634 req: FindSemanticClustersRequest,
635 ) -> Result<FindSemanticClustersResponse, tonic::Status> {
636 let resp = self.inner.find_semantic_clusters(req).await?;
637 Ok(resp.into_inner())
638 }
639
640 pub async fn get_concept_parents(
645 &mut self,
646 id: u32,
647 layer: u32,
648 limit: u32,
649 collection: Option<String>,
650 ) -> Result<GetConceptParentsResponse, tonic::Status> {
651 let req = GetConceptParentsRequest {
652 collection: collection.unwrap_or_default(),
653 id,
654 layer,
655 limit,
656 };
657 let resp = self.inner.get_concept_parents(req).await?;
658 Ok(resp.into_inner())
659 }
660
661 pub async fn subscribe_to_events(
666 &mut self,
667 types: Vec<EventType>,
668 collection: Option<String>,
669 ) -> Result<tonic::Streaming<EventMessage>, tonic::Status> {
670 let req = EventSubscriptionRequest {
671 types: types.into_iter().map(|t| t as i32).collect(),
672 collection,
673 };
674 let resp = self.inner.subscribe_to_events(req).await?;
675 Ok(resp.into_inner())
676 }
677
678 pub async fn configure(
683 &mut self,
684 ef_search: Option<u32>,
685 ef_construction: Option<u32>,
686 collection: Option<String>,
687 ) -> Result<String, tonic::Status> {
688 let req = hyperspace_proto::hyperspace::ConfigUpdate {
689 ef_search,
690 ef_construction,
691 collection: collection.unwrap_or_default(),
692 };
693 let resp = self.inner.configure(req).await?;
694 Ok(resp.into_inner().status)
695 }
696
697 pub async fn get_digest(
702 &mut self,
703 collection: Option<String>,
704 ) -> Result<hyperspace_proto::hyperspace::DigestResponse, tonic::Status> {
705 let req = hyperspace_proto::hyperspace::DigestRequest {
706 collection: collection.unwrap_or_default(),
707 };
708 let resp = self.inner.get_digest(req).await?;
709 Ok(resp.into_inner())
710 }
711
712 pub async fn sync_handshake(
717 &mut self,
718 collection: String,
719 client_buckets: Vec<u64>,
720 client_logical_clock: u64,
721 client_count: u64,
722 ) -> Result<hyperspace_proto::hyperspace::SyncHandshakeResponse, tonic::Status> {
723 if client_buckets.len() != 256 {
724 return Err(tonic::Status::invalid_argument(
725 "client_buckets must contain exactly 256 elements",
726 ));
727 }
728 let req = hyperspace_proto::hyperspace::SyncHandshakeRequest {
729 collection,
730 client_buckets,
731 client_logical_clock,
732 client_count,
733 };
734 let resp = self.inner.sync_handshake(req).await?;
735 Ok(resp.into_inner())
736 }
737
738 pub async fn sync_pull(
743 &mut self,
744 collection: String,
745 bucket_indices: Vec<u32>,
746 ) -> Result<tonic::Streaming<hyperspace_proto::hyperspace::SyncVectorData>, tonic::Status> {
747 let req = hyperspace_proto::hyperspace::SyncPullRequest {
748 collection,
749 bucket_indices,
750 };
751 let resp = self.inner.sync_pull(req).await?;
752 Ok(resp.into_inner())
753 }
754
755 pub async fn sync_push(
760 &mut self,
761 stream: impl tonic::IntoStreamingRequest<Message = hyperspace_proto::hyperspace::SyncVectorData>,
762 ) -> Result<hyperspace_proto::hyperspace::SyncPushResponse, tonic::Status> {
763 let resp = self.inner.sync_push(stream).await?;
764 Ok(resp.into_inner())
765 }
766
767 pub async fn search_fuzzy(
773 &mut self,
774 query: &fuzzy::FuzzyQuery,
775 top_k: u32,
776 collection: Option<String>,
777 ) -> Result<Vec<(u32, f32)>, tonic::Status> {
778 let mut vectors = Vec::new();
779 query.extract_vectors(&mut vectors);
780
781 if vectors.is_empty() {
782 return Ok(Vec::new());
783 }
784
785 let batch_results = self.search_batch(vectors, top_k * 5, collection).await?;
787
788 let mut node_dists: std::collections::HashMap<u32, std::collections::HashMap<usize, f32>> =
790 std::collections::HashMap::new();
791 #[allow(clippy::cast_possible_truncation)]
792 for (q_idx, results) in batch_results.iter().enumerate() {
793 for res in results {
794 node_dists
795 .entry(res.id)
796 .or_default()
797 .insert(q_idx, res.distance as f32);
798 }
799 }
800
801 let mut scored_nodes = Vec::with_capacity(node_dists.len());
802
803 for (id, dists) in node_dists {
804 let mut eval_idx: usize = 0;
805 let score = query.evaluate_indexed(&mut eval_idx, &|idx| {
807 *dists.get(&idx).unwrap_or(&(1000.0)) });
809 scored_nodes.push((id, score));
810 }
811
812 scored_nodes.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
813 scored_nodes.truncate(usize::try_from(top_k).unwrap_or(usize::MAX));
814 Ok(scored_nodes)
815 }
816}