Skip to main content

vecgraph_client/
remote.rs

1use crate::RemoteGraphStoreConfig;
2use async_compat::CompatExt;
3use async_trait::async_trait;
4use tonic::transport::{Channel, Endpoint};
5use vecgraph_core::{
6    Edge, EdgeId, EdgeWithVector, GraphStore, Node, NodeId, NodeWithVector, SearchQuery,
7    SearchResult, VecGraphError, VecGraphResult,
8};
9use vecgraph_proto::{
10    DeleteEdgeRequest, DeleteNameMappingRequest, DeleteNodeRequest, GetEdgeRequest,
11    GetEdgeVectorRequest, GetEdgesForNodeRequest, GetEdgesTargetingNodeRequest,
12    GetNameMappingRequest, GetNodeRequest, GetNodeVectorRequest, InsertEdgeRequest,
13    InsertEdgeWithVectorRequest, InsertNodeRequest, InsertNodeWithVectorRequest, SearchRequest,
14    SetNameMappingRequest, graph_store_service_client::GraphStoreServiceClient,
15};
16
/// Graph store backed by a remote service reached through a tonic [`Channel`].
///
/// All operations are issued through the wrapped generated client. The type
/// derives `Clone`, so a clone carries its own copy of that client handle.
#[derive(Clone)]
pub struct RemoteGraphStore {
    /// Generated service client used for every RPC issued by this store.
    client: GraphStoreServiceClient<Channel>,
}
21
22impl RemoteGraphStore {
23    pub async fn connect(config: RemoteGraphStoreConfig) -> VecGraphResult<Self> {
24        let mut endpoint: Endpoint = Endpoint::from_shared(config.endpoint)
25            .map_err(|e| VecGraphError::Other(format!("invalid endpoint: {}", e)))?;
26
27        if let Some(timeout) = config.connect_timeout {
28            endpoint = endpoint.connect_timeout(timeout);
29        }
30
31        if let Some(timeout) = config.request_timeout {
32            endpoint = endpoint.timeout(timeout);
33        }
34
35        let channel = endpoint
36            .connect()
37            .compat()
38            .await
39            .map_err(|e| VecGraphError::Other(format!("connection failed: {}", e)))?;
40
41        Ok(Self {
42            client: GraphStoreServiceClient::new(channel),
43        })
44    }
45
46    pub async fn connect_lazy(config: RemoteGraphStoreConfig) -> VecGraphResult<Self> {
47        async {
48            let mut endpoint: Endpoint = Endpoint::from_shared(config.endpoint)
49                .map_err(|e| VecGraphError::Other(format!("invalid endpoint: {}", e)))?;
50
51            if let Some(timeout) = config.connect_timeout {
52                endpoint = endpoint.connect_timeout(timeout);
53            }
54
55            if let Some(timeout) = config.request_timeout {
56                endpoint = endpoint.timeout(timeout);
57            }
58
59            let channel = endpoint.connect_lazy();
60
61            Ok(Self {
62                client: GraphStoreServiceClient::new(channel),
63            })
64        }
65        .compat()
66        .await
67    }
68}
69
70#[async_trait]
71impl GraphStore for RemoteGraphStore {
72    async fn insert_node(&self, node: &Node) -> VecGraphResult<()> {
73        let request = InsertNodeRequest {
74            node: Some(
75                node.clone()
76                    .try_into()
77                    .map_err(|e| VecGraphError::Other(format!("invalid node data: {}", e)))?,
78            ),
79        };
80        self.client
81            .clone()
82            .insert_node(request)
83            .compat()
84            .await
85            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
86        Ok(())
87    }
88
89    async fn insert_node_with_vector(&self, node: &NodeWithVector) -> VecGraphResult<()> {
90        let request = InsertNodeWithVectorRequest {
91            node: Some(
92                node.clone()
93                    .try_into()
94                    .map_err(|e| VecGraphError::Other(format!("invalid node data: {}", e)))?,
95            ),
96        };
97        self.client
98            .clone()
99            .insert_node_with_vector(request)
100            .compat()
101            .await
102            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
103        Ok(())
104    }
105
106    async fn get_node(&self, node: &NodeId) -> VecGraphResult<Option<Node>> {
107        let request = GetNodeRequest {
108            id: node.to_string(),
109        };
110        let response = self
111            .client
112            .clone()
113            .get_node(request)
114            .compat()
115            .await
116            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
117
118        if let Some(node) = response.into_inner().node {
119            let node: Node = node
120                .try_into()
121                .map_err(|e| VecGraphError::Other(format!("invalid node data: {}", e)))?;
122            Ok(Some(node))
123        } else {
124            Ok(None)
125        }
126    }
127
128    async fn get_node_vector(&self, node: &NodeId) -> VecGraphResult<Option<Vec<f32>>> {
129        let request = GetNodeVectorRequest {
130            id: node.to_string(),
131        };
132        let response = self
133            .client
134            .clone()
135            .get_node_vector(request)
136            .compat()
137            .await
138            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
139
140        if let Some(vector) = response.into_inner().vector {
141            Ok(Some(vector.values))
142        } else {
143            Ok(None)
144        }
145    }
146
147    async fn delete_node(&self, node: &NodeId) -> VecGraphResult<()> {
148        let request = DeleteNodeRequest {
149            id: node.to_string(),
150        };
151        self.client
152            .clone()
153            .delete_node(request)
154            .compat()
155            .await
156            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
157        Ok(())
158    }
159
160    async fn insert_edge(&self, edge: &Edge) -> VecGraphResult<()> {
161        let request = InsertEdgeRequest {
162            edge: Some(
163                edge.clone()
164                    .try_into()
165                    .map_err(|e| VecGraphError::Other(format!("invalid edge data: {}", e)))?,
166            ),
167        };
168        self.client
169            .clone()
170            .insert_edge(request)
171            .compat()
172            .await
173            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
174        Ok(())
175    }
176
177    async fn insert_edge_with_vector(&self, edge: &EdgeWithVector) -> VecGraphResult<()> {
178        let request = InsertEdgeWithVectorRequest {
179            edge: Some(
180                edge.clone()
181                    .try_into()
182                    .map_err(|e| VecGraphError::Other(format!("invalid edge data: {}", e)))?,
183            ),
184        };
185        self.client
186            .clone()
187            .insert_edge_with_vector(request)
188            .compat()
189            .await
190            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
191        Ok(())
192    }
193
194    async fn get_edge(&self, edge: &EdgeId) -> VecGraphResult<Option<Edge>> {
195        let request = GetEdgeRequest {
196            id: edge.to_string(),
197        };
198        let response = self
199            .client
200            .clone()
201            .get_edge(request)
202            .compat()
203            .await
204            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
205
206        if let Some(edge) = response.into_inner().edge {
207            let edge: Edge = edge
208                .try_into()
209                .map_err(|e| VecGraphError::Other(format!("invalid edge data: {}", e)))?;
210            Ok(Some(edge))
211        } else {
212            Ok(None)
213        }
214    }
215
216    async fn get_edges_for_node(&self, node_id: &NodeId) -> VecGraphResult<Vec<Edge>> {
217        let request = GetEdgesForNodeRequest {
218            node_id: node_id.to_string(),
219        };
220        let response = self
221            .client
222            .clone()
223            .get_edges_for_node(request)
224            .compat()
225            .await
226            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
227
228        let edges = response
229            .into_inner()
230            .edges
231            .into_iter()
232            .filter_map(|edge| {
233                edge.try_into()
234                    .map_err(|e| {
235                        VecGraphError::Other(format!("invalid edge data in response: {}", e))
236                    })
237                    .ok()
238            })
239            .collect();
240
241        Ok(edges)
242    }
243
244    async fn get_edges_targeting_node(&self, node_id: &NodeId) -> VecGraphResult<Vec<Edge>> {
245        let request = GetEdgesTargetingNodeRequest {
246            node_id: node_id.to_string(),
247        };
248        let response = self
249            .client
250            .clone()
251            .get_edges_targeting_node(request)
252            .compat()
253            .await
254            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
255
256        let edges = response
257            .into_inner()
258            .edges
259            .into_iter()
260            .filter_map(|edge| {
261                edge.try_into()
262                    .map_err(|e| {
263                        VecGraphError::Other(format!("invalid edge data in response: {}", e))
264                    })
265                    .ok()
266            })
267            .collect();
268
269        Ok(edges)
270    }
271
272    async fn delete_edge(&self, edge: &EdgeId) -> VecGraphResult<()> {
273        let request = DeleteEdgeRequest {
274            id: edge.to_string(),
275        };
276        self.client
277            .clone()
278            .delete_edge(request)
279            .compat()
280            .await
281            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
282        Ok(())
283    }
284
285    async fn get_edge_vector(&self, edge: &EdgeId) -> VecGraphResult<Option<Vec<f32>>> {
286        let request = GetEdgeVectorRequest {
287            id: edge.to_string(),
288        };
289        let response = self
290            .client
291            .clone()
292            .get_edge_vector(request)
293            .compat()
294            .await
295            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
296
297        if let Some(vector) = response.into_inner().vector {
298            Ok(Some(vector.values))
299        } else {
300            Ok(None)
301        }
302    }
303
304    async fn set_name_mapping(
305        &self,
306        kind: &str,
307        name: &str,
308        node_id: &NodeId,
309    ) -> VecGraphResult<()> {
310        let request = SetNameMappingRequest {
311            kind: kind.to_string(),
312            name: name.to_string(),
313            node_id: node_id.to_string(),
314        };
315        self.client
316            .clone()
317            .set_name_mapping(request)
318            .compat()
319            .await
320            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
321        Ok(())
322    }
323
324    async fn get_name_mapping(&self, kind: &str, name: &str) -> VecGraphResult<Option<NodeId>> {
325        let request = GetNameMappingRequest {
326            kind: kind.to_string(),
327            name: name.to_string(),
328        };
329        let response = self
330            .client
331            .clone()
332            .get_name_mapping(request)
333            .compat()
334            .await
335            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
336
337        if let Some(node_id) = response.into_inner().node_id {
338            Ok(Some(NodeId(node_id)))
339        } else {
340            Ok(None)
341        }
342    }
343
344    async fn delete_name_mapping(&self, kind: &str, name: &str) -> VecGraphResult<()> {
345        let request = DeleteNameMappingRequest {
346            kind: kind.to_string(),
347            name: name.to_string(),
348        };
349        self.client
350            .clone()
351            .delete_name_mapping(request)
352            .compat()
353            .await
354            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
355        Ok(())
356    }
357
358    async fn search(&self, query: &SearchQuery) -> VecGraphResult<Vec<SearchResult>> {
359        let request = SearchRequest {
360            query: Some(
361                query
362                    .clone()
363                    .try_into()
364                    .map_err(|e| VecGraphError::Other(format!("invalid search query: {}", e)))?,
365            ),
366        };
367        let response = self
368            .client
369            .clone()
370            .search(request)
371            .compat()
372            .await
373            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
374
375        let results = response
376            .into_inner()
377            .results
378            .into_iter()
379            .filter_map(|result| {
380                result
381                    .try_into()
382                    .map_err(|e| {
383                        VecGraphError::Other(format!(
384                            "invalid search result data in response: {}",
385                            e
386                        ))
387                    })
388                    .ok()
389            })
390            .collect();
391
392        Ok(results)
393    }
394}