nebula_client/v2/graph/
query.rs

1use core::time::Duration;
2
3use async_trait::async_trait;
4use nebula_fbthrift_graph_v2::{
5    dependencies::common::types::ErrorCode, errors::graph_service::ExecuteError,
6    types::ExecutionResponse,
7};
8
9use serde::de::DeserializeOwned;
10use serde_nebula_fbthrift_graph::v2::de::{
11    data::DataDeserializeError, deserialize_execution_response,
12};
13
14#[async_trait]
15pub trait GraphQuery {
16    #[allow(clippy::ptr_arg)]
17    async fn query_as<D: DeserializeOwned>(
18        &mut self,
19        stmt: &Vec<u8>,
20    ) -> Result<GraphQueryOutput<D>, GraphQueryError>;
21
22    async fn query(&mut self, stmt: &Vec<u8>) -> Result<GraphQueryOutput<()>, GraphQueryError> {
23        self.query_as(stmt).await
24    }
25
26    async fn show_hosts(&mut self) -> Result<GraphQueryOutput<Host>, GraphQueryError> {
27        self.query_as(STMT_SHOW_HOSTS.to_vec().as_ref()).await
28    }
29    async fn show_spaces(&mut self) -> Result<GraphQueryOutput<Space>, GraphQueryError> {
30        self.query_as(STMT_SHOW_SPACES.to_vec().as_ref()).await
31    }
32}
33
34#[derive(Debug)]
35pub struct GraphQueryOutput<D>
36where
37    D: DeserializeOwned,
38{
39    pub latency: Duration,
40    pub space_name: Option<Vec<u8>>,
41    pub data_set: Vec<D>,
42}
43
44impl<D> GraphQueryOutput<D>
45where
46    D: DeserializeOwned,
47{
48    pub fn new(res: ExecutionResponse) -> Result<Self, GraphQueryError> {
49        let latency = Duration::from_micros(res.latency_in_us as u64);
50        let space_name = res.space_name.clone();
51        let data_set = deserialize_execution_response::<D>(&res)
52            .map_err(GraphQueryError::DataDeserializeError)?;
53
54        Ok(Self {
55            latency,
56            space_name,
57            data_set,
58        })
59    }
60}
61
62//
63//
64//
65#[derive(Debug)]
66pub enum GraphQueryError {
67    ExecuteError(ExecuteError),
68    ResponseError(ErrorCode, Option<Vec<u8>>),
69    DataDeserializeError(DataDeserializeError),
70}
71
72impl core::fmt::Display for GraphQueryError {
73    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
74        match self {
75            Self::ExecuteError(err) => write!(f, "ExecuteError {err}"),
76            Self::ResponseError(err_code, err_msg) => {
77                write!(f, "ResponseError err_code:{err_code} err_msg:{err_msg:?}",)
78            }
79            Self::DataDeserializeError(err) => write!(f, "DataDeserializeError {err}"),
80        }
81    }
82}
83
84impl std::error::Error for GraphQueryError {
85    fn description(&self) -> &str {
86        match self {
87            Self::ExecuteError(_) => "ExecuteError",
88            Self::ResponseError(_, _) => "ResponseError",
89            Self::DataDeserializeError(_) => "DataDeserializeError",
90        }
91    }
92}
93
94//
95//
96//
97use serde::Deserialize;
98
99const STMT_SHOW_HOSTS: &[u8] = b"SHOW HOSTS;";
100#[derive(Deserialize, Debug)]
101pub struct Host {
102    #[serde(rename(deserialize = "Host"))]
103    pub host: String,
104    #[serde(rename(deserialize = "Port"))]
105    pub port: u16,
106    #[serde(rename(deserialize = "Status"))]
107    pub status: String,
108    #[serde(rename(deserialize = "Leader count"))]
109    pub leader_count: u64,
110    #[serde(rename(deserialize = "Leader distribution"))]
111    pub leader_distribution: String,
112    #[serde(rename(deserialize = "Partition distribution"))]
113    pub partition_distribution: String,
114}
115
116const STMT_SHOW_SPACES: &[u8] = b"SHOW SPACES;";
117#[derive(Deserialize, Debug)]
118pub struct Space {
119    #[serde(rename(deserialize = "Name"))]
120    pub name: String,
121}
122
#[cfg(test)]
mod tests {
    use super::*;

    use std::io::{Error as IoError, ErrorKind as IoErrorKind};

    /// `Display` renders the variant name, the error code and the optional
    /// message in the documented layout.
    #[test]
    fn impl_std_fmt_display() {
        let err = GraphQueryError::ResponseError(ErrorCode::E_DISCONNECTED, None);
        let text = err.to_string();

        // The exact rendering of `ErrorCode` belongs to the thrift crate, so
        // only the parts produced by this module are pinned.
        assert!(
            text.starts_with("ResponseError err_code:"),
            "unexpected output: {text}"
        );
        assert!(text.ends_with(" err_msg:None"), "unexpected output: {text}");
    }

    /// `GraphQueryError` can be boxed as the payload of an `io::Error`, which
    /// requires `std::error::Error + Send + Sync`, and the wrapper displays
    /// the same text as the wrapped error.
    #[test]
    fn impl_std_error_error() {
        let expected =
            GraphQueryError::ResponseError(ErrorCode::E_DISCONNECTED, None).to_string();

        let err = IoError::new(
            IoErrorKind::Other,
            GraphQueryError::ResponseError(ErrorCode::E_DISCONNECTED, None),
        );

        assert!(err.get_ref().is_some());
        assert_eq!(err.to_string(), expected);
    }
}