vecgraph-client 0.1.1

Client library for vecgraph, providing gRPC client implementation and related utilities.
Documentation
use crate::RemoteGraphStoreConfig;
use async_compat::CompatExt;
use async_trait::async_trait;
use tonic::transport::{Channel, Endpoint};
use vecgraph_core::{
    Edge, EdgeId, EdgeWithVector, GraphStore, Node, NodeId, NodeWithVector, SearchQuery,
    SearchResult, VecGraphError, VecGraphResult,
};
use vecgraph_proto::{
    DeleteEdgeRequest, DeleteNameMappingRequest, DeleteNodeRequest, GetEdgeRequest,
    GetEdgeVectorRequest, GetEdgesForNodeRequest, GetEdgesTargetingNodeRequest,
    GetNameMappingRequest, GetNodeRequest, GetNodeVectorRequest, InsertEdgeRequest,
    InsertEdgeWithVectorRequest, InsertNodeRequest, InsertNodeWithVectorRequest, SearchRequest,
    SetNameMappingRequest, graph_store_service_client::GraphStoreServiceClient,
};

#[derive(Clone)]
pub struct RemoteGraphStore {
    client: GraphStoreServiceClient<Channel>,
}

impl RemoteGraphStore {
    pub async fn connect(config: RemoteGraphStoreConfig) -> VecGraphResult<Self> {
        let mut endpoint: Endpoint = Endpoint::from_shared(config.endpoint)
            .map_err(|e| VecGraphError::Other(format!("invalid endpoint: {}", e)))?;

        if let Some(timeout) = config.connect_timeout {
            endpoint = endpoint.connect_timeout(timeout);
        }

        if let Some(timeout) = config.request_timeout {
            endpoint = endpoint.timeout(timeout);
        }

        let channel = endpoint
            .connect()
            .compat()
            .await
            .map_err(|e| VecGraphError::Other(format!("connection failed: {}", e)))?;

        Ok(Self {
            client: GraphStoreServiceClient::new(channel),
        })
    }

    pub async fn connect_lazy(config: RemoteGraphStoreConfig) -> VecGraphResult<Self> {
        async {
            let mut endpoint: Endpoint = Endpoint::from_shared(config.endpoint)
                .map_err(|e| VecGraphError::Other(format!("invalid endpoint: {}", e)))?;

            if let Some(timeout) = config.connect_timeout {
                endpoint = endpoint.connect_timeout(timeout);
            }

            if let Some(timeout) = config.request_timeout {
                endpoint = endpoint.timeout(timeout);
            }

            let channel = endpoint.connect_lazy();

            Ok(Self {
                client: GraphStoreServiceClient::new(channel),
            })
        }
        .compat()
        .await
    }
}

#[async_trait]
impl GraphStore for RemoteGraphStore {
    async fn insert_node(&self, node: &Node) -> VecGraphResult<()> {
        let request = InsertNodeRequest {
            node: Some(
                node.clone()
                    .try_into()
                    .map_err(|e| VecGraphError::Other(format!("invalid node data: {}", e)))?,
            ),
        };
        self.client
            .clone()
            .insert_node(request)
            .compat()
            .await
            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
        Ok(())
    }

    async fn insert_node_with_vector(&self, node: &NodeWithVector) -> VecGraphResult<()> {
        let request = InsertNodeWithVectorRequest {
            node: Some(
                node.clone()
                    .try_into()
                    .map_err(|e| VecGraphError::Other(format!("invalid node data: {}", e)))?,
            ),
        };
        self.client
            .clone()
            .insert_node_with_vector(request)
            .compat()
            .await
            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
        Ok(())
    }

    async fn get_node(&self, node: &NodeId) -> VecGraphResult<Option<Node>> {
        let request = GetNodeRequest {
            id: node.to_string(),
        };
        let response = self
            .client
            .clone()
            .get_node(request)
            .compat()
            .await
            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;

        if let Some(node) = response.into_inner().node {
            let node: Node = node
                .try_into()
                .map_err(|e| VecGraphError::Other(format!("invalid node data: {}", e)))?;
            Ok(Some(node))
        } else {
            Ok(None)
        }
    }

    async fn get_node_vector(&self, node: &NodeId) -> VecGraphResult<Option<Vec<f32>>> {
        let request = GetNodeVectorRequest {
            id: node.to_string(),
        };
        let response = self
            .client
            .clone()
            .get_node_vector(request)
            .compat()
            .await
            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;

        if let Some(vector) = response.into_inner().vector {
            Ok(Some(vector.values))
        } else {
            Ok(None)
        }
    }

    async fn delete_node(&self, node: &NodeId) -> VecGraphResult<()> {
        let request = DeleteNodeRequest {
            id: node.to_string(),
        };
        self.client
            .clone()
            .delete_node(request)
            .compat()
            .await
            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
        Ok(())
    }

    async fn insert_edge(&self, edge: &Edge) -> VecGraphResult<()> {
        let request = InsertEdgeRequest {
            edge: Some(
                edge.clone()
                    .try_into()
                    .map_err(|e| VecGraphError::Other(format!("invalid edge data: {}", e)))?,
            ),
        };
        self.client
            .clone()
            .insert_edge(request)
            .compat()
            .await
            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
        Ok(())
    }

    async fn insert_edge_with_vector(&self, edge: &EdgeWithVector) -> VecGraphResult<()> {
        let request = InsertEdgeWithVectorRequest {
            edge: Some(
                edge.clone()
                    .try_into()
                    .map_err(|e| VecGraphError::Other(format!("invalid edge data: {}", e)))?,
            ),
        };
        self.client
            .clone()
            .insert_edge_with_vector(request)
            .compat()
            .await
            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
        Ok(())
    }

    async fn get_edge(&self, edge: &EdgeId) -> VecGraphResult<Option<Edge>> {
        let request = GetEdgeRequest {
            id: edge.to_string(),
        };
        let response = self
            .client
            .clone()
            .get_edge(request)
            .compat()
            .await
            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;

        if let Some(edge) = response.into_inner().edge {
            let edge: Edge = edge
                .try_into()
                .map_err(|e| VecGraphError::Other(format!("invalid edge data: {}", e)))?;
            Ok(Some(edge))
        } else {
            Ok(None)
        }
    }

    async fn get_edges_for_node(&self, node_id: &NodeId) -> VecGraphResult<Vec<Edge>> {
        let request = GetEdgesForNodeRequest {
            node_id: node_id.to_string(),
        };
        let response = self
            .client
            .clone()
            .get_edges_for_node(request)
            .compat()
            .await
            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;

        let edges = response
            .into_inner()
            .edges
            .into_iter()
            .filter_map(|edge| {
                edge.try_into()
                    .map_err(|e| {
                        VecGraphError::Other(format!("invalid edge data in response: {}", e))
                    })
                    .ok()
            })
            .collect();

        Ok(edges)
    }

    async fn get_edges_targeting_node(&self, node_id: &NodeId) -> VecGraphResult<Vec<Edge>> {
        let request = GetEdgesTargetingNodeRequest {
            node_id: node_id.to_string(),
        };
        let response = self
            .client
            .clone()
            .get_edges_targeting_node(request)
            .compat()
            .await
            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;

        let edges = response
            .into_inner()
            .edges
            .into_iter()
            .filter_map(|edge| {
                edge.try_into()
                    .map_err(|e| {
                        VecGraphError::Other(format!("invalid edge data in response: {}", e))
                    })
                    .ok()
            })
            .collect();

        Ok(edges)
    }

    async fn delete_edge(&self, edge: &EdgeId) -> VecGraphResult<()> {
        let request = DeleteEdgeRequest {
            id: edge.to_string(),
        };
        self.client
            .clone()
            .delete_edge(request)
            .compat()
            .await
            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
        Ok(())
    }

    async fn get_edge_vector(&self, edge: &EdgeId) -> VecGraphResult<Option<Vec<f32>>> {
        let request = GetEdgeVectorRequest {
            id: edge.to_string(),
        };
        let response = self
            .client
            .clone()
            .get_edge_vector(request)
            .compat()
            .await
            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;

        if let Some(vector) = response.into_inner().vector {
            Ok(Some(vector.values))
        } else {
            Ok(None)
        }
    }

    async fn set_name_mapping(
        &self,
        kind: &str,
        name: &str,
        node_id: &NodeId,
    ) -> VecGraphResult<()> {
        let request = SetNameMappingRequest {
            kind: kind.to_string(),
            name: name.to_string(),
            node_id: node_id.to_string(),
        };
        self.client
            .clone()
            .set_name_mapping(request)
            .compat()
            .await
            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
        Ok(())
    }

    async fn get_name_mapping(&self, kind: &str, name: &str) -> VecGraphResult<Option<NodeId>> {
        let request = GetNameMappingRequest {
            kind: kind.to_string(),
            name: name.to_string(),
        };
        let response = self
            .client
            .clone()
            .get_name_mapping(request)
            .compat()
            .await
            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;

        if let Some(node_id) = response.into_inner().node_id {
            Ok(Some(NodeId(node_id)))
        } else {
            Ok(None)
        }
    }

    async fn delete_name_mapping(&self, kind: &str, name: &str) -> VecGraphResult<()> {
        let request = DeleteNameMappingRequest {
            kind: kind.to_string(),
            name: name.to_string(),
        };
        self.client
            .clone()
            .delete_name_mapping(request)
            .compat()
            .await
            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;
        Ok(())
    }

    async fn search(&self, query: &SearchQuery) -> VecGraphResult<Vec<SearchResult>> {
        let request = SearchRequest {
            query: Some(
                query
                    .clone()
                    .try_into()
                    .map_err(|e| VecGraphError::Other(format!("invalid search query: {}", e)))?,
            ),
        };
        let response = self
            .client
            .clone()
            .search(request)
            .compat()
            .await
            .map_err(|e| VecGraphError::Other(format!("request failed: {}", e)))?;

        let results = response
            .into_inner()
            .results
            .into_iter()
            .filter_map(|result| {
                result
                    .try_into()
                    .map_err(|e| {
                        VecGraphError::Other(format!(
                            "invalid search result data in response: {}",
                            e
                        ))
                    })
                    .ok()
            })
            .collect();

        Ok(results)
    }
}