nebula_client/v3/graph/
query.rs

1use core::time::Duration;
2
3use async_trait::async_trait;
4use nebula_fbthrift_graph_v3::{
5    dependencies::common::types::ErrorCode, errors::graph_service::ExecuteError,
6    types::ExecutionResponse,
7};
8
9use serde::de::DeserializeOwned;
10use serde_nebula_fbthrift_graph::v3::de::{
11    data::DataDeserializeError, deserialize_execution_response,
12};
13
14#[async_trait]
15pub trait GraphQuery {
16    #[allow(clippy::ptr_arg)]
17    async fn query_as<D: DeserializeOwned>(
18        &mut self,
19        stmt: &Vec<u8>,
20    ) -> Result<GraphQueryOutput<D>, GraphQueryError>;
21
22    async fn query(&mut self, stmt: &Vec<u8>) -> Result<GraphQueryOutput<()>, GraphQueryError> {
23        self.query_as(stmt).await
24    }
25
26    async fn show_hosts(&mut self) -> Result<GraphQueryOutput<Host>, GraphQueryError> {
27        self.query_as(STMT_SHOW_HOSTS.to_vec().as_ref()).await
28    }
29    async fn show_spaces(&mut self) -> Result<GraphQueryOutput<Space>, GraphQueryError> {
30        self.query_as(STMT_SHOW_SPACES.to_vec().as_ref()).await
31    }
32}
33
34#[derive(Debug)]
35pub struct GraphQueryOutput<D>
36where
37    D: DeserializeOwned,
38{
39    pub latency: Duration,
40    pub space_name: Option<Vec<u8>>,
41    pub data_set: Vec<D>,
42}
43
44impl<D> GraphQueryOutput<D>
45where
46    D: DeserializeOwned,
47{
48    pub fn new(res: ExecutionResponse) -> Result<Self, GraphQueryError> {
49        let latency = Duration::from_micros(res.latency_in_us as u64);
50        let space_name = res.space_name.clone();
51        let data_set = deserialize_execution_response::<D>(&res)
52            .map_err(GraphQueryError::DataDeserializeError)?;
53
54        Ok(Self {
55            latency,
56            space_name,
57            data_set,
58        })
59    }
60}
61
62//
63//
64//
65#[derive(Debug)]
66pub enum GraphQueryError {
67    ExecuteError(ExecuteError),
68    ResponseError(ErrorCode, Option<Vec<u8>>),
69    DataDeserializeError(DataDeserializeError),
70}
71
72impl core::fmt::Display for GraphQueryError {
73    fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
74        match self {
75            Self::ExecuteError(err) => write!(f, "ExecuteError {err}"),
76            Self::ResponseError(err_code, err_msg) => {
77                write!(f, "ResponseError err_code:{err_code} err_msg:{err_msg:?}",)
78            }
79            Self::DataDeserializeError(err) => write!(f, "DataDeserializeError {err}"),
80        }
81    }
82}
83
84impl std::error::Error for GraphQueryError {
85    fn description(&self) -> &str {
86        match self {
87            Self::ExecuteError(_) => "ExecuteError",
88            Self::ResponseError(_, _) => "ResponseError",
89            Self::DataDeserializeError(_) => "DataDeserializeError",
90        }
91    }
92}
93
94//
95//
96//
97use serde::Deserialize;
98
99const STMT_SHOW_HOSTS: &[u8] = b"SHOW HOSTS;";
100#[derive(Deserialize, Debug)]
101pub struct Host {
102    #[serde(rename(deserialize = "Host"))]
103    pub host: String,
104    #[serde(rename(deserialize = "Port"))]
105    pub port: u16,
106    #[serde(rename(deserialize = "HTTP port"))]
107    pub http_port: u16,
108    #[serde(rename(deserialize = "Status"))]
109    pub status: String,
110    #[serde(rename(deserialize = "Leader count"))]
111    pub leader_count: u64,
112    #[serde(rename(deserialize = "Leader distribution"))]
113    pub leader_distribution: String,
114    #[serde(rename(deserialize = "Partition distribution"))]
115    pub partition_distribution: String,
116    #[serde(rename(deserialize = "Version"))]
117    pub version: String,
118}
119
120const STMT_SHOW_SPACES: &[u8] = b"SHOW SPACES;";
121#[derive(Deserialize, Debug)]
122pub struct Space {
123    #[serde(rename(deserialize = "Name"))]
124    pub name: String,
125}
126
#[cfg(test)]
mod tests {
    use super::*;

    use std::io::{Error as IoError, ErrorKind as IoErrorKind};

    /// `Display` output starts with the variant label and renders a missing
    /// message as `None`. The error-code text itself is produced by
    /// `ErrorCode` and is deliberately not pinned here.
    #[test]
    fn impl_std_fmt_display() {
        let err = GraphQueryError::ResponseError(ErrorCode::E_DISCONNECTED, None);
        let rendered = err.to_string();

        assert!(
            rendered.starts_with("ResponseError err_code:"),
            "unexpected output: {rendered}"
        );
        assert!(
            rendered.ends_with(" err_msg:None"),
            "unexpected output: {rendered}"
        );
    }

    /// The error can be boxed into an `io::Error` (which requires
    /// `std::error::Error + Send + Sync`), and its message shows through.
    #[test]
    fn impl_std_error_error() {
        let err = IoError::new(
            IoErrorKind::Other,
            GraphQueryError::ResponseError(ErrorCode::E_DISCONNECTED, None),
        );

        assert_eq!(err.kind(), IoErrorKind::Other);
        assert!(err.get_ref().is_some());
        assert!(
            err.to_string().starts_with("ResponseError err_code:"),
            "unexpected output: {err}"
        );
    }
}