1use crate::RemoteGraphStoreConfig;
2use async_compat::CompatExt;
3use async_trait::async_trait;
4use tonic::transport::{Channel, Endpoint};
5use vecgraph_core::{
6 Edge, EdgeId, EdgeWithVector, GraphStore, Node, NodeId, NodeWithVector, SearchQuery,
7 SearchResult, VecGraphError, VecGraphResult,
8};
9use vecgraph_proto::{
10 DeleteEdgeRequest, DeleteNameMappingRequest, DeleteNodeRequest, GetEdgeRequest,
11 GetEdgeVectorRequest, GetEdgesForNodeRequest, GetEdgesTargetingNodeRequest,
12 GetNameMappingRequest, GetNodeRequest, GetNodeVectorRequest, InsertEdgeRequest,
13 InsertEdgeWithVectorRequest, InsertNodeRequest, InsertNodeWithVectorRequest, SearchRequest,
14 SetNameMappingRequest, graph_store_service_client::GraphStoreServiceClient,
15};
16
17#[derive(Clone)]
18pub struct RemoteGraphStore {
19 client: GraphStoreServiceClient<Channel>,
20}
21
22impl RemoteGraphStore {
23 pub async fn connect(config: RemoteGraphStoreConfig) -> VecGraphResult<Self> {
24 let mut endpoint: Endpoint = Endpoint::from_shared(config.endpoint)
25 .map_err(|e| VecGraphError::Other(format!("invalid endpoint: {}", e)))?;
26
27 if let Some(timeout) = config.connect_timeout {
28 endpoint = endpoint.connect_timeout(timeout);
29 }
30
31 if let Some(timeout) = config.request_timeout {
32 endpoint = endpoint.timeout(timeout);
33 }
34
35 let channel = endpoint
36 .connect()
37 .compat()
38 .await
39 .map_err(|e| VecGraphError::Other(format!("connection failed: {}", e)))?;
40
41 Ok(Self {
42 client: GraphStoreServiceClient::new(channel),
43 })
44 }
45
46 pub async fn connect_lazy(config: RemoteGraphStoreConfig) -> VecGraphResult<Self> {
47 async {
48 let mut endpoint: Endpoint = Endpoint::from_shared(config.endpoint)
49 .map_err(|e| VecGraphError::Other(format!("invalid endpoint: {}", e)))?;
50
51 if let Some(timeout) = config.connect_timeout {
52 endpoint = endpoint.connect_timeout(timeout);
53 }
54
55 if let Some(timeout) = config.request_timeout {
56 endpoint = endpoint.timeout(timeout);
57 }
58
59 let channel = endpoint.connect_lazy();
60
61 Ok(Self {
62 client: GraphStoreServiceClient::new(channel),
63 })
64 }
65 .compat()
66 .await
67 }
68}
69
70#[async_trait]
71impl GraphStore for RemoteGraphStore {
72 async fn insert_node(&self, node: &Node) -> VecGraphResult<()> {
73 let request = InsertNodeRequest {
74 node: Some(
75 node.clone()
76 .try_into()
77 .map_err(|e| VecGraphError::Other(format!("invalid node data: {}", e)))?,
78 ),
79 };
80 self.client
81 .clone()
82 .insert_node(request)
83 .compat()
84 .await
85 .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
86 Ok(())
87 }
88
89 async fn insert_node_with_vector(&self, node: &NodeWithVector) -> VecGraphResult<()> {
90 let request = InsertNodeWithVectorRequest {
91 node: Some(
92 node.clone()
93 .try_into()
94 .map_err(|e| VecGraphError::Other(format!("invalid node data: {}", e)))?,
95 ),
96 };
97 self.client
98 .clone()
99 .insert_node_with_vector(request)
100 .compat()
101 .await
102 .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
103 Ok(())
104 }
105
106 async fn get_node(&self, node: &NodeId) -> VecGraphResult<Option<Node>> {
107 let request = GetNodeRequest {
108 id: node.to_string(),
109 };
110 let response = self
111 .client
112 .clone()
113 .get_node(request)
114 .compat()
115 .await
116 .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
117
118 if let Some(node) = response.into_inner().node {
119 let node: Node = node
120 .try_into()
121 .map_err(|e| VecGraphError::Other(format!("invalid node data: {}", e)))?;
122 Ok(Some(node))
123 } else {
124 Ok(None)
125 }
126 }
127
128 async fn get_node_vector(&self, node: &NodeId) -> VecGraphResult<Option<Vec<f32>>> {
129 let request = GetNodeVectorRequest {
130 id: node.to_string(),
131 };
132 let response = self
133 .client
134 .clone()
135 .get_node_vector(request)
136 .compat()
137 .await
138 .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
139
140 if let Some(vector) = response.into_inner().vector {
141 Ok(Some(vector.values))
142 } else {
143 Ok(None)
144 }
145 }
146
147 async fn delete_node(&self, node: &NodeId) -> VecGraphResult<()> {
148 let request = DeleteNodeRequest {
149 id: node.to_string(),
150 };
151 self.client
152 .clone()
153 .delete_node(request)
154 .compat()
155 .await
156 .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
157 Ok(())
158 }
159
160 async fn insert_edge(&self, edge: &Edge) -> VecGraphResult<()> {
161 let request = InsertEdgeRequest {
162 edge: Some(
163 edge.clone()
164 .try_into()
165 .map_err(|e| VecGraphError::Other(format!("invalid edge data: {}", e)))?,
166 ),
167 };
168 self.client
169 .clone()
170 .insert_edge(request)
171 .compat()
172 .await
173 .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
174 Ok(())
175 }
176
177 async fn insert_edge_with_vector(&self, edge: &EdgeWithVector) -> VecGraphResult<()> {
178 let request = InsertEdgeWithVectorRequest {
179 edge: Some(
180 edge.clone()
181 .try_into()
182 .map_err(|e| VecGraphError::Other(format!("invalid edge data: {}", e)))?,
183 ),
184 };
185 self.client
186 .clone()
187 .insert_edge_with_vector(request)
188 .compat()
189 .await
190 .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
191 Ok(())
192 }
193
194 async fn get_edge(&self, edge: &EdgeId) -> VecGraphResult<Option<Edge>> {
195 let request = GetEdgeRequest {
196 id: edge.to_string(),
197 };
198 let response = self
199 .client
200 .clone()
201 .get_edge(request)
202 .compat()
203 .await
204 .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
205
206 if let Some(edge) = response.into_inner().edge {
207 let edge: Edge = edge
208 .try_into()
209 .map_err(|e| VecGraphError::Other(format!("invalid edge data: {}", e)))?;
210 Ok(Some(edge))
211 } else {
212 Ok(None)
213 }
214 }
215
216 async fn get_edges_for_node(&self, node_id: &NodeId) -> VecGraphResult<Vec<Edge>> {
217 let request = GetEdgesForNodeRequest {
218 node_id: node_id.to_string(),
219 };
220 let response = self
221 .client
222 .clone()
223 .get_edges_for_node(request)
224 .compat()
225 .await
226 .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
227
228 let edges = response
229 .into_inner()
230 .edges
231 .into_iter()
232 .filter_map(|edge| {
233 edge.try_into()
234 .map_err(|e| {
235 VecGraphError::Other(format!("invalid edge data in response: {}", e))
236 })
237 .ok()
238 })
239 .collect();
240
241 Ok(edges)
242 }
243
244 async fn get_edges_targeting_node(&self, node_id: &NodeId) -> VecGraphResult<Vec<Edge>> {
245 let request = GetEdgesTargetingNodeRequest {
246 node_id: node_id.to_string(),
247 };
248 let response = self
249 .client
250 .clone()
251 .get_edges_targeting_node(request)
252 .compat()
253 .await
254 .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
255
256 let edges = response
257 .into_inner()
258 .edges
259 .into_iter()
260 .filter_map(|edge| {
261 edge.try_into()
262 .map_err(|e| {
263 VecGraphError::Other(format!("invalid edge data in response: {}", e))
264 })
265 .ok()
266 })
267 .collect();
268
269 Ok(edges)
270 }
271
272 async fn delete_edge(&self, edge: &EdgeId) -> VecGraphResult<()> {
273 let request = DeleteEdgeRequest {
274 id: edge.to_string(),
275 };
276 self.client
277 .clone()
278 .delete_edge(request)
279 .compat()
280 .await
281 .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
282 Ok(())
283 }
284
285 async fn get_edge_vector(&self, edge: &EdgeId) -> VecGraphResult<Option<Vec<f32>>> {
286 let request = GetEdgeVectorRequest {
287 id: edge.to_string(),
288 };
289 let response = self
290 .client
291 .clone()
292 .get_edge_vector(request)
293 .compat()
294 .await
295 .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
296
297 if let Some(vector) = response.into_inner().vector {
298 Ok(Some(vector.values))
299 } else {
300 Ok(None)
301 }
302 }
303
304 async fn set_name_mapping(
305 &self,
306 kind: &str,
307 name: &str,
308 node_id: &NodeId,
309 ) -> VecGraphResult<()> {
310 let request = SetNameMappingRequest {
311 kind: kind.to_string(),
312 name: name.to_string(),
313 node_id: node_id.to_string(),
314 };
315 self.client
316 .clone()
317 .set_name_mapping(request)
318 .compat()
319 .await
320 .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
321 Ok(())
322 }
323
324 async fn get_name_mapping(&self, kind: &str, name: &str) -> VecGraphResult<Option<NodeId>> {
325 let request = GetNameMappingRequest {
326 kind: kind.to_string(),
327 name: name.to_string(),
328 };
329 let response = self
330 .client
331 .clone()
332 .get_name_mapping(request)
333 .compat()
334 .await
335 .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
336
337 if let Some(node_id) = response.into_inner().node_id {
338 Ok(Some(NodeId(node_id)))
339 } else {
340 Ok(None)
341 }
342 }
343
344 async fn delete_name_mapping(&self, kind: &str, name: &str) -> VecGraphResult<()> {
345 let request = DeleteNameMappingRequest {
346 kind: kind.to_string(),
347 name: name.to_string(),
348 };
349 self.client
350 .clone()
351 .delete_name_mapping(request)
352 .compat()
353 .await
354 .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
355 Ok(())
356 }
357
358 async fn search(&self, query: &SearchQuery) -> VecGraphResult<Vec<SearchResult>> {
359 let request = SearchRequest {
360 query: Some(
361 query
362 .clone()
363 .try_into()
364 .map_err(|e| VecGraphError::Other(format!("invalid search query: {}", e)))?,
365 ),
366 };
367 let response = self
368 .client
369 .clone()
370 .search(request)
371 .compat()
372 .await
373 .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
374
375 let results = response
376 .into_inner()
377 .results
378 .into_iter()
379 .filter_map(|result| {
380 result
381 .try_into()
382 .map_err(|e| {
383 VecGraphError::Other(format!(
384 "invalid search result data in response: {}",
385 e
386 ))
387 })
388 .ok()
389 })
390 .collect();
391
392 Ok(results)
393 }
394}