Skip to main content

hyperspace_sdk/
lib.rs

1pub use hyperspace_proto::hyperspace::database_client::DatabaseClient;
2pub use hyperspace_proto::hyperspace::{
3    BatchInsertRequest, BatchSearchRequest, CollectionSummary, DurabilityLevel, EventMessage,
4    EventSubscriptionRequest, EventType, FindSemanticClustersRequest, FindSemanticClustersResponse,
5    GetConceptParentsRequest, GetConceptParentsResponse, GetNeighborsRequest, GetNeighborsResponse,
6    GetNodeRequest, GraphNode, InsertRequest, InsertTextRequest, SearchRequest, SearchResponse,
7    SearchResult, SearchResult as ResultItem, SearchTextRequest, TraverseRequest, TraverseResponse,
8    VectorData, VectorizeRequest, VectorizeResponse,
9};
10use tonic::codegen::InterceptedService;
11use tonic::service::Interceptor;
12use tonic::transport::Channel;
13use tonic::{Request, Status};
14
15pub mod fuzzy;
16pub mod gromov;
17pub mod math;
18
19#[cfg(feature = "embedders")]
20mod embedder;
21#[cfg(feature = "embedders")]
22pub use embedder::*;
23
/// gRPC interceptor state holding the optional credentials attached to outgoing requests.
#[derive(Clone)]
pub struct AuthInterceptor {
    /// When set, sent as the `x-api-key` metadata entry.
    api_key: Option<String>,
    /// When set, sent as the `x-hyperspace-user-id` metadata entry.
    user_id: Option<String>,
}
29
30impl Interceptor for AuthInterceptor {
31    fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
32        if let Some(key) = &self.api_key {
33            let token = key
34                .parse()
35                .map_err(|_| Status::invalid_argument("Invalid API Key format"))?;
36            request.metadata_mut().insert("x-api-key", token);
37        }
38        if let Some(uid) = &self.user_id {
39            let token = uid
40                .parse()
41                .map_err(|_| Status::invalid_argument("Invalid User ID format"))?;
42            request.metadata_mut().insert("x-hyperspace-user-id", token);
43        }
44        Ok(request)
45    }
46}
47
/// SDK client wrapping the generated `DatabaseClient` over an intercepted channel.
pub struct Client {
    /// Generated gRPC client; requests pass through an `AuthInterceptor`.
    inner: DatabaseClient<InterceptedService<Channel, AuthInterceptor>>,
    /// Optional embedder; the field only exists when the `embedders` feature is enabled.
    #[cfg(feature = "embedders")]
    embedder: Option<Box<dyn Embedder>>,
}
53
54impl Client {
55    #[inline]
56    fn vec_f32_to_f64(vector: &[f32]) -> Vec<f64> {
57        vector.iter().map(|&x| f64::from(x)).collect()
58    }
59
60    /// Connects to the `HyperspaceDB` server.
61    ///
62    /// # Errors
63    /// Returns error if connection fails.
64    pub async fn connect(
65        dst: String,
66        api_key: Option<String>,
67        user_id: Option<String>,
68    ) -> Result<Self, Box<dyn std::error::Error>> {
69        let channel = Channel::from_shared(dst)?
70            .tcp_keepalive(Some(std::time::Duration::from_secs(30)))
71            .tcp_nodelay(true)
72            .keep_alive_while_idle(true)
73            .connect_timeout(std::time::Duration::from_secs(10))
74            .connect()
75            .await?;
76
77        let interceptor = AuthInterceptor { api_key, user_id };
78        let client = DatabaseClient::with_interceptor(channel, interceptor)
79            .max_decoding_message_size(64 * 1024 * 1024) // 64MB
80            .max_encoding_message_size(64 * 1024 * 1024); // 64MB
81
82        Ok(Self {
83            inner: client,
84            #[cfg(feature = "embedders")]
85            embedder: None,
86        })
87    }
88
89    #[cfg(feature = "embedders")]
90    pub fn set_embedder(&mut self, embedder: Box<dyn Embedder>) {
91        self.embedder = Some(embedder);
92    }
93
94    /// Creates a new collection.
95    ///
96    /// # Errors
97    /// Returns error if the collection already exists or if network fails.
98    pub async fn create_collection(
99        &mut self,
100        name: String,
101        dimension: u32,
102        metric: String,
103    ) -> Result<String, tonic::Status> {
104        let req = hyperspace_proto::hyperspace::CreateCollectionRequest {
105            name,
106            dimension,
107            metric,
108        };
109        let resp = self.inner.create_collection(req).await?;
110        Ok(resp.into_inner().status)
111    }
112
113    /// Deletes a collection.
114    ///
115    /// # Errors
116    /// Returns error if the collection does not exist cancellation.
117    pub async fn delete_collection(&mut self, name: String) -> Result<String, tonic::Status> {
118        let req = hyperspace_proto::hyperspace::DeleteCollectionRequest { name };
119        let resp = self.inner.delete_collection(req).await?;
120        Ok(resp.into_inner().status)
121    }
122
123    /// Lists all collections with their metadata (dimension, metric, count).
124    ///
125    /// # Errors
126    /// Returns error on network failure.
127    pub async fn list_collections(&mut self) -> Result<Vec<CollectionSummary>, tonic::Status> {
128        let req = hyperspace_proto::hyperspace::Empty {};
129        let resp = self.inner.list_collections(req).await?;
130        Ok(resp.into_inner().collections)
131    }
132
133
134    /// Gets statistics for a collection.
135    ///
136    /// # Errors
137    /// Returns error if the collection does not exist or network fails.
138    pub async fn get_collection_stats(
139        &mut self,
140        name: String,
141    ) -> Result<hyperspace_proto::hyperspace::CollectionStatsResponse, tonic::Status> {
142        let req = hyperspace_proto::hyperspace::CollectionStatsRequest { name };
143        let resp = self.inner.get_collection_stats(req).await?;
144        Ok(resp.into_inner())
145    }
146
147    /// Rebuilds the index for a collection. This is a resource-intensive operation.
148    ///
149    /// # Errors
150    /// Returns error if the collection does not exist or operation fails.
151    pub async fn rebuild_index(&mut self, name: String) -> Result<String, tonic::Status> {
152        let req = hyperspace_proto::hyperspace::RebuildIndexRequest {
153            name,
154            filter_query: None,
155        };
156        let resp = self.inner.rebuild_index(req).await?;
157        Ok(resp.into_inner().status)
158    }
159
160    /// Rebuilds index with optional metadata-based pruning filter.
161    ///
162    /// # Errors
163    /// Returns error if operation fails.
164    pub async fn rebuild_index_with_filter(
165        &mut self,
166        name: String,
167        key: String,
168        op: String,
169        value: f64,
170    ) -> Result<String, tonic::Status> {
171        let req = hyperspace_proto::hyperspace::RebuildIndexRequest {
172            name,
173            filter_query: Some(hyperspace_proto::hyperspace::VacuumFilterQuery { key, op, value }),
174        };
175        let resp = self.inner.rebuild_index(req).await?;
176        Ok(resp.into_inner().status)
177    }
178
179    /// Triggers memory cleanup (Vacuum).
180    ///
181    /// # Errors
182    /// Returns error if the operation fails.
183    pub async fn trigger_vacuum(&mut self) -> Result<String, tonic::Status> {
184        let req = hyperspace_proto::hyperspace::Empty {};
185        let resp = self.inner.trigger_vacuum(req).await?;
186        Ok(resp.into_inner().status)
187    }
188
189    /// Triggers background memory reconsolidation (AI Sleep Mode / Flow Matching SGD).
190    ///
191    /// # Errors
192    /// Returns error if the operation fails.
193    pub async fn trigger_reconsolidation(
194        &mut self,
195        collection: String,
196        target_vector: Vec<f64>,
197        learning_rate: f64,
198    ) -> Result<String, tonic::Status> {
199        let req = hyperspace_proto::hyperspace::ReconsolidationRequest {
200            collection,
201            target_vector,
202            learning_rate,
203        };
204        let resp = self.inner.trigger_reconsolidation(req).await?;
205        Ok(resp.into_inner().status)
206    }
207
208    /// Inserts a vector into the collection.
209    ///
210    /// # Errors
211    /// Returns error if insertion fails.
212    pub async fn insert(
213        &mut self,
214        id: u32,
215        vector: Vec<f64>,
216        metadata: std::collections::HashMap<String, String>,
217        collection: Option<String>,
218    ) -> Result<bool, tonic::Status> {
219        let req = InsertRequest {
220            id,
221            vector,
222            metadata,
223            typed_metadata: std::collections::HashMap::new(),
224            collection: collection.unwrap_or_default(),
225            origin_node_id: String::new(),
226            logical_clock: 0,
227            durability: 0,
228        };
229        let resp = self.inner.insert(req).await?;
230        Ok(resp.into_inner().success)
231    }
232
233    /// Inserts a vector from f32 input (client-side conversion to protocol f64).
234    ///
235    /// # Errors
236    /// Returns error if insertion fails.
237    pub async fn insert_f32(
238        &mut self,
239        id: u32,
240        vector: &[f32],
241        metadata: std::collections::HashMap<String, String>,
242        collection: Option<String>,
243    ) -> Result<bool, tonic::Status> {
244        self.insert(id, Self::vec_f32_to_f64(vector), metadata, collection)
245            .await
246    }
247
248    /// Inserts text that will be vectorized on the server side.
249    ///
250    /// # Errors
251    /// Returns error if insertion/vectorization fails.
252    pub async fn insert_text(
253        &mut self,
254        id: u32,
255        text: String,
256        metadata: std::collections::HashMap<String, String>,
257        collection: Option<String>,
258    ) -> Result<bool, tonic::Status> {
259        let req = InsertTextRequest {
260            id,
261            text,
262            metadata,
263            collection: collection.unwrap_or_default(),
264            durability: 0,
265        };
266        let resp = self.inner.insert_text(req).await?;
267        Ok(resp.into_inner().success)
268    }
269
270    /// Vectoize text using the server-side embedding engine.
271    ///
272    /// # Errors
273    /// Returns error if vectorization fails.
274    pub async fn vectorize(
275        &mut self,
276        text: String,
277        metric: String,
278    ) -> Result<Vec<f64>, tonic::Status> {
279        let req = VectorizeRequest { text, metric };
280        let resp = self.inner.vectorize(req).await?;
281        Ok(resp.into_inner().vector)
282    }
283
284    /// Batch inserts multiple vectors.
285    ///
286    /// # Errors
287    /// Returns error if insertion fails.
288    pub async fn batch_insert(
289        &mut self,
290        items: Vec<(u32, Vec<f64>, std::collections::HashMap<String, String>)>,
291        collection: Option<String>,
292        durability: DurabilityLevel,
293    ) -> Result<bool, tonic::Status> {
294        let vectors = items
295            .into_iter()
296            .map(|(id, vector, metadata)| VectorData {
297                id,
298                vector,
299                metadata,
300                typed_metadata: std::collections::HashMap::new(),
301            })
302            .collect();
303        let req = BatchInsertRequest {
304            collection: collection.unwrap_or_default(),
305            vectors,
306            origin_node_id: String::new(),
307            logical_clock: 0,
308            durability: durability as i32,
309        };
310        let resp = self.inner.batch_insert(req).await?;
311        Ok(resp.into_inner().success)
312    }
313
314    /// Batch inserts multiple vectors from f32 input.
315    ///
316    /// # Errors
317    /// Returns error if insertion fails.
318    pub async fn batch_insert_f32(
319        &mut self,
320        items: Vec<(u32, Vec<f32>, std::collections::HashMap<String, String>)>,
321        collection: Option<String>,
322        durability: DurabilityLevel,
323    ) -> Result<bool, tonic::Status> {
324        let items_f64 = items
325            .into_iter()
326            .map(|(id, v, m)| (id, Self::vec_f32_to_f64(&v), m))
327            .collect();
328        self.batch_insert(items_f64, collection, durability).await
329    }
330
331    /// Searches for nearest neighbors.
332    ///
333    /// # Errors
334    /// Returns error if search fails.
335    pub async fn search(
336        &mut self,
337        vector: Vec<f64>,
338        top_k: u32,
339        collection: Option<String>,
340    ) -> Result<Vec<SearchResult>, tonic::Status> {
341        let req = SearchRequest {
342            vector,
343            top_k,
344            filter: std::collections::HashMap::default(),
345            filters: vec![],
346            hybrid_query: None,
347            hybrid_alpha: None,
348            use_wasserstein: false,
349            collection: collection.unwrap_or_default(),
350        };
351        let resp = self.inner.search(req).await?;
352        Ok(resp.into_inner().results)
353    }
354
355    /// Searches using f32 query vector (converted to protocol f64 once).
356    ///
357    /// # Errors
358    /// Returns error if search fails.
359    pub async fn search_f32(
360        &mut self,
361        vector: &[f32],
362        top_k: u32,
363        collection: Option<String>,
364    ) -> Result<Vec<SearchResult>, tonic::Status> {
365        self.search(Self::vec_f32_to_f64(vector), top_k, collection)
366            .await
367    }
368
369    /// Searches using text that will be vectorized on the server side.
370    ///
371    /// # Errors
372    /// Returns error if search fails.
373    pub async fn search_text(
374        &mut self,
375        text: String,
376        top_k: u32,
377        collection: Option<String>,
378    ) -> Result<Vec<SearchResult>, tonic::Status> {
379        let req = SearchTextRequest {
380            text,
381            top_k,
382            collection: collection.unwrap_or_default(),
383            filter: std::collections::HashMap::new(),
384            filters: vec![],
385        };
386        let resp = self.inner.search_text(req).await?;
387        Ok(resp.into_inner().results)
388    }
389
390    /// Performs search utilizing the Wasserstein distance (Cross-Feature Matching Metric).
391    ///
392    /// # Errors
393    /// Returns error if search fails.
394    pub async fn search_wasserstein(
395        &mut self,
396        vector: Vec<f64>,
397        top_k: u32,
398        collection: Option<String>,
399    ) -> Result<Vec<SearchResult>, tonic::Status> {
400        let req = SearchRequest {
401            vector,
402            top_k,
403            filter: std::collections::HashMap::default(),
404            filters: vec![],
405            hybrid_query: None,
406            hybrid_alpha: None,
407            use_wasserstein: true,
408            collection: collection.unwrap_or_default(),
409        };
410        let resp = self.inner.search(req).await?;
411        Ok(resp.into_inner().results)
412    }
413
414    /// Batch search for multiple vectors in a single RPC.
415    ///
416    /// # Errors
417    /// Returns error if the batch search fails.
418    pub async fn search_batch(
419        &mut self,
420        vectors: Vec<Vec<f64>>,
421        top_k: u32,
422        collection: Option<String>,
423    ) -> Result<Vec<Vec<SearchResult>>, tonic::Status> {
424        let collection_name = collection.unwrap_or_default();
425        let searches = vectors
426            .into_iter()
427            .map(|vector| SearchRequest {
428                vector,
429                top_k,
430                filter: std::collections::HashMap::default(),
431                filters: vec![],
432                hybrid_query: None,
433                hybrid_alpha: None,
434                use_wasserstein: false,
435                collection: collection_name.clone(),
436            })
437            .collect();
438
439        let req = BatchSearchRequest { searches };
440        let resp = self.inner.search_batch(req).await?;
441        Ok(resp
442            .into_inner()
443            .responses
444            .into_iter()
445            .map(|SearchResponse { results }| results)
446            .collect())
447    }
448
449    /// Batch search from f32 vectors (converted to protocol f64 once).
450    ///
451    /// # Errors
452    /// Returns error if the batch search fails.
453    pub async fn search_batch_f32(
454        &mut self,
455        vectors: &[Vec<f32>],
456        top_k: u32,
457        collection: Option<String>,
458    ) -> Result<Vec<Vec<SearchResult>>, tonic::Status> {
459        let vectors_f64 = vectors
460            .iter()
461            .map(|v| Self::vec_f32_to_f64(v))
462            .collect::<Vec<_>>();
463        self.search_batch(vectors_f64, top_k, collection).await
464    }
465
466    /// Multi-Geometry Benchmark Endpoint (10.3)
467    /// Performs identical searches across multiple collections in parallel (via Batch Search).
468    /// Typically used by the Frontend to compare L2, Cosine, Poincare and Lorentz results directly.
469    ///
470    /// # Errors
471    /// Returns error if the batch search RPC fails.
472    pub async fn search_multi_collection(
473        &mut self,
474        vector: Vec<f64>,
475        collections: Vec<String>,
476        top_k: u32,
477    ) -> Result<std::collections::HashMap<String, Vec<SearchResult>>, tonic::Status> {
478        let searches = collections
479            .iter()
480            .map(|col_name| SearchRequest {
481                vector: vector.clone(),
482                top_k,
483                filter: std::collections::HashMap::default(),
484                filters: vec![],
485                hybrid_query: None,
486                hybrid_alpha: None,
487                use_wasserstein: false,
488                collection: col_name.clone(),
489            })
490            .collect();
491
492        let req = BatchSearchRequest { searches };
493        let resp = self.inner.search_batch(req).await?;
494
495        let mut result_map = std::collections::HashMap::new();
496        for (col_name, response) in collections.into_iter().zip(resp.into_inner().responses) {
497            result_map.insert(col_name, response.results);
498        }
499
500        Ok(result_map)
501    }
502
503    /// Advanced search with filters and hybrid query.
504    ///
505    /// # Errors
506    /// Returns error if search fails.
507    pub async fn search_advanced(
508        &mut self,
509        vector: Vec<f64>,
510        top_k: u32,
511        filters: Vec<hyperspace_proto::hyperspace::Filter>,
512        hybrid: Option<(String, f32)>,
513        collection: Option<String>,
514    ) -> Result<Vec<SearchResult>, tonic::Status> {
515        let (hybrid_query, hybrid_alpha) = match hybrid {
516            Some((q, a)) => (Some(q), Some(a)),
517            None => (None, None),
518        };
519
520        let req = SearchRequest {
521            vector,
522            top_k,
523            filter: std::collections::HashMap::default(),
524            filters,
525            hybrid_query,
526            hybrid_alpha,
527            use_wasserstein: false,
528            collection: collection.unwrap_or_default(),
529        };
530        let resp = self.inner.search(req).await?;
531        Ok(resp.into_inner().results)
532    }
533
534    /// Deletes a vector by ID.
535    ///
536    /// # Errors
537    /// Returns error if deletion fails.
538    pub async fn delete(
539        &mut self,
540        id: u32,
541        collection: Option<String>,
542    ) -> Result<bool, tonic::Status> {
543        let req = hyperspace_proto::hyperspace::DeleteRequest {
544            id,
545            collection: collection.unwrap_or_default(),
546        };
547        let resp = self.inner.delete(req).await?;
548        Ok(resp.into_inner().success)
549    }
550
551    /// Returns a graph node with adjacency on a specific layer.
552    ///
553    /// # Errors
554    /// Returns error if request fails.
555    pub async fn get_node(
556        &mut self,
557        id: u32,
558        layer: u32,
559        collection: Option<String>,
560    ) -> Result<GraphNode, tonic::Status> {
561        let req = GetNodeRequest {
562            collection: collection.unwrap_or_default(),
563            id,
564            layer,
565        };
566        let resp = self.inner.get_node(req).await?;
567        Ok(resp.into_inner())
568    }
569
570    /// Returns neighbors for a node with pagination.
571    ///
572    /// # Errors
573    /// Returns error if request fails.
574    pub async fn get_neighbors(
575        &mut self,
576        id: u32,
577        layer: u32,
578        limit: u32,
579        offset: u32,
580        collection: Option<String>,
581    ) -> Result<GetNeighborsResponse, tonic::Status> {
582        let req = GetNeighborsRequest {
583            collection: collection.unwrap_or_default(),
584            id,
585            layer,
586            limit,
587            offset,
588        };
589        let resp = self.inner.get_neighbors(req).await?;
590        Ok(resp.into_inner())
591    }
592
593    /// Returns neighbors with aligned edge weights (distance to source).
594    ///
595    /// # Errors
596    /// Returns error if request fails.
597    pub async fn get_neighbors_with_weights(
598        &mut self,
599        id: u32,
600        layer: u32,
601        limit: u32,
602        offset: u32,
603        collection: Option<String>,
604    ) -> Result<Vec<(GraphNode, f64)>, tonic::Status> {
605        let resp = self
606            .get_neighbors(id, layer, limit, offset, collection)
607            .await?;
608        let mut out = Vec::with_capacity(resp.neighbors.len());
609        for (idx, node) in resp.neighbors.into_iter().enumerate() {
610            let w = resp.edge_weights.get(idx).copied().unwrap_or_default();
611            out.push((node, w));
612        }
613        Ok(out)
614    }
615
616    /// Traverses graph from a start node with depth and node guards.
617    ///
618    /// # Errors
619    /// Returns error if request fails.
620    pub async fn traverse(
621        &mut self,
622        req: TraverseRequest,
623    ) -> Result<TraverseResponse, tonic::Status> {
624        let resp = self.inner.traverse(req).await?;
625        Ok(resp.into_inner())
626    }
627
628    /// Finds connected components as semantic clusters.
629    ///
630    /// # Errors
631    /// Returns error if request fails.
632    pub async fn find_semantic_clusters(
633        &mut self,
634        req: FindSemanticClustersRequest,
635    ) -> Result<FindSemanticClustersResponse, tonic::Status> {
636        let resp = self.inner.find_semantic_clusters(req).await?;
637        Ok(resp.into_inner())
638    }
639
640    /// Returns parent-like neighbors for concept-style traversals.
641    ///
642    /// # Errors
643    /// Returns error if request fails.
644    pub async fn get_concept_parents(
645        &mut self,
646        id: u32,
647        layer: u32,
648        limit: u32,
649        collection: Option<String>,
650    ) -> Result<GetConceptParentsResponse, tonic::Status> {
651        let req = GetConceptParentsRequest {
652            collection: collection.unwrap_or_default(),
653            id,
654            layer,
655            limit,
656        };
657        let resp = self.inner.get_concept_parents(req).await?;
658        Ok(resp.into_inner())
659    }
660
661    /// Subscribes to CDC event stream (`VectorInserted`/`VectorDeleted`).
662    ///
663    /// # Errors
664    /// Returns error if stream initialization fails.
665    pub async fn subscribe_to_events(
666        &mut self,
667        types: Vec<EventType>,
668        collection: Option<String>,
669    ) -> Result<tonic::Streaming<EventMessage>, tonic::Status> {
670        let req = EventSubscriptionRequest {
671            types: types.into_iter().map(|t| t as i32).collect(),
672            collection,
673        };
674        let resp = self.inner.subscribe_to_events(req).await?;
675        Ok(resp.into_inner())
676    }
677
678    /// Configures collection parameters.
679    ///
680    /// # Errors
681    /// Returns error if configuration fails.
682    pub async fn configure(
683        &mut self,
684        ef_search: Option<u32>,
685        ef_construction: Option<u32>,
686        collection: Option<String>,
687    ) -> Result<String, tonic::Status> {
688        let req = hyperspace_proto::hyperspace::ConfigUpdate {
689            ef_search,
690            ef_construction,
691            collection: collection.unwrap_or_default(),
692        };
693        let resp = self.inner.configure(req).await?;
694        Ok(resp.into_inner().status)
695    }
696
697    /// Gets collection digest (hash and count).
698    ///
699    /// # Errors
700    /// Returns error if retrieval fails.
701    pub async fn get_digest(
702        &mut self,
703        collection: Option<String>,
704    ) -> Result<hyperspace_proto::hyperspace::DigestResponse, tonic::Status> {
705        let req = hyperspace_proto::hyperspace::DigestRequest {
706            collection: collection.unwrap_or_default(),
707        };
708        let resp = self.inner.get_digest(req).await?;
709        Ok(resp.into_inner())
710    }
711
712    /// Performs Delta Sync Handshake with the server.
713    ///
714    /// # Errors
715    /// Returns error on network failure.
716    pub async fn sync_handshake(
717        &mut self,
718        collection: String,
719        client_buckets: Vec<u64>,
720        client_logical_clock: u64,
721        client_count: u64,
722    ) -> Result<hyperspace_proto::hyperspace::SyncHandshakeResponse, tonic::Status> {
723        if client_buckets.len() != 256 {
724            return Err(tonic::Status::invalid_argument(
725                "client_buckets must contain exactly 256 elements",
726            ));
727        }
728        let req = hyperspace_proto::hyperspace::SyncHandshakeRequest {
729            collection,
730            client_buckets,
731            client_logical_clock,
732            client_count,
733        };
734        let resp = self.inner.sync_handshake(req).await?;
735        Ok(resp.into_inner())
736    }
737
738    /// Pulls differing subset of vectors from server bucket indices.
739    ///
740    /// # Errors
741    /// Returns error if stream initialization fails.
742    pub async fn sync_pull(
743        &mut self,
744        collection: String,
745        bucket_indices: Vec<u32>,
746    ) -> Result<tonic::Streaming<hyperspace_proto::hyperspace::SyncVectorData>, tonic::Status> {
747        let req = hyperspace_proto::hyperspace::SyncPullRequest {
748            collection,
749            bucket_indices,
750        };
751        let resp = self.inner.sync_pull(req).await?;
752        Ok(resp.into_inner())
753    }
754
755    /// Pushes offline delta back to the server.
756    ///
757    /// # Errors
758    /// Returns error if push stream initialization fails.
759    pub async fn sync_push(
760        &mut self,
761        stream: impl tonic::IntoStreamingRequest<Message = hyperspace_proto::hyperspace::SyncVectorData>,
762    ) -> Result<hyperspace_proto::hyperspace::SyncPushResponse, tonic::Status> {
763        let resp = self.inner.sync_push(stream).await?;
764        Ok(resp.into_inner())
765    }
766
767    /// Evaluates a fuzzy logic search query locally by issuing batch search RPCs
768    /// and scoring candidates with TNorms/TConorms.
769    ///
770    /// # Errors
771    /// Returns error if the batch search fails.
772    pub async fn search_fuzzy(
773        &mut self,
774        query: &fuzzy::FuzzyQuery,
775        top_k: u32,
776        collection: Option<String>,
777    ) -> Result<Vec<(u32, f32)>, tonic::Status> {
778        let mut vectors = Vec::new();
779        query.extract_vectors(&mut vectors);
780
781        if vectors.is_empty() {
782            return Ok(Vec::new());
783        }
784
785        // Search with an oversample factor to try to collect all relevant overlapping candidates
786        let batch_results = self.search_batch(vectors, top_k * 5, collection).await?;
787
788        // Extract distances into a structured map: NodeId -> HashMap<QueryIdx, Distance>
789        let mut node_dists: std::collections::HashMap<u32, std::collections::HashMap<usize, f32>> =
790            std::collections::HashMap::new();
791        #[allow(clippy::cast_possible_truncation)]
792        for (q_idx, results) in batch_results.iter().enumerate() {
793            for res in results {
794                node_dists
795                    .entry(res.id)
796                    .or_default()
797                    .insert(q_idx, res.distance as f32);
798            }
799        }
800
801        let mut scored_nodes = Vec::with_capacity(node_dists.len());
802
803        for (id, dists) in node_dists {
804            let mut eval_idx: usize = 0;
805            // Unretrieved elements are assumed infinitely far (producing minimum membership)
806            let score = query.evaluate_indexed(&mut eval_idx, &|idx| {
807                *dists.get(&idx).unwrap_or(&(1000.0)) // very large distance fallback
808            });
809            scored_nodes.push((id, score));
810        }
811
812        scored_nodes.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
813        scored_nodes.truncate(usize::try_from(top_k).unwrap_or(usize::MAX));
814        Ok(scored_nodes)
815    }
816}