nebula_client/v1/graph/
query.rs1use core::time::Duration;
2
3use async_trait::async_trait;
4use nebula_fbthrift_graph_v1::{
5 errors::graph_service::ExecuteError,
6 types::{ErrorCode, ExecutionResponse},
7};
8
9use serde::de::DeserializeOwned;
10use serde_nebula_fbthrift_graph::v1::de::{
11 data::DataDeserializeError, deserialize_execution_response,
12};
13
14#[async_trait]
15pub trait GraphQuery {
16 async fn query_as<D: DeserializeOwned>(
17 &mut self,
18 stmt: &str,
19 ) -> Result<GraphQueryOutput<D>, GraphQueryError>;
20
21 async fn query(&mut self, stmt: &str) -> Result<GraphQueryOutput<()>, GraphQueryError> {
22 self.query_as(stmt).await
23 }
24
25 async fn show_hosts(&mut self) -> Result<GraphQueryOutput<Host>, GraphQueryError> {
26 self.query_as(STMT_SHOW_HOSTS).await
27 }
28 async fn show_spaces(&mut self) -> Result<GraphQueryOutput<Space>, GraphQueryError> {
29 self.query_as(STMT_SHOW_SPACES).await
30 }
31}
32
33#[derive(Debug)]
34pub struct GraphQueryOutput<D>
35where
36 D: DeserializeOwned,
37{
38 pub latency: Duration,
39 pub space_name: Option<String>,
40 pub data_set: Vec<D>,
41}
42
43impl<D> GraphQueryOutput<D>
44where
45 D: DeserializeOwned,
46{
47 pub fn new(res: ExecutionResponse) -> Result<Self, GraphQueryError> {
48 let latency = Duration::from_micros(res.latency_in_us as u64);
49 let space_name = res.space_name.clone();
50 let data_set = deserialize_execution_response::<D>(&res)
51 .map_err(GraphQueryError::DataDeserializeError)?;
52
53 Ok(Self {
54 latency,
55 space_name,
56 data_set,
57 })
58 }
59}
60
61#[derive(Debug)]
65pub enum GraphQueryError {
66 ExecuteError(ExecuteError),
67 ResponseError(ErrorCode, Option<String>),
68 DataDeserializeError(DataDeserializeError),
69}
70
71impl core::fmt::Display for GraphQueryError {
72 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
73 match self {
74 Self::ExecuteError(err) => write!(f, "ExecuteError {err}"),
75 Self::ResponseError(err_code, err_msg) => {
76 write!(f, "ResponseError err_code:{err_code} err_msg:{err_msg:?}",)
77 }
78 Self::DataDeserializeError(err) => write!(f, "DataDeserializeError {err}"),
79 }
80 }
81}
82
83impl std::error::Error for GraphQueryError {
84 fn description(&self) -> &str {
85 match self {
86 Self::ExecuteError(_) => "ExecuteError",
87 Self::ResponseError(_, _) => "ResponseError",
88 Self::DataDeserializeError(_) => "DataDeserializeError",
89 }
90 }
91}
92
93use serde::Deserialize;
97
98const STMT_SHOW_HOSTS: &str = "SHOW HOSTS;";
99#[derive(Deserialize, Debug)]
100pub struct Host {
101 #[serde(rename(deserialize = "Ip"))]
102 pub ip: String,
103 #[serde(rename(deserialize = "Port"))]
104 pub port: String,
105 #[serde(rename(deserialize = "Status"))]
106 pub status: String,
107 #[serde(rename(deserialize = "Leader count"))]
108 pub leader_count: u64,
109 #[serde(rename(deserialize = "Leader distribution"))]
110 pub leader_distribution: String,
111 #[serde(rename(deserialize = "Partition distribution"))]
112 pub partition_distribution: String,
113}
114
115const STMT_SHOW_SPACES: &str = "SHOW SPACES;";
116#[derive(Deserialize, Debug)]
117pub struct Space {
118 #[serde(rename(deserialize = "Name"))]
119 pub name: String,
120}
121
122#[cfg(test)]
123mod tests {
124 use super::*;
125
126 use std::io::{Error as IoError, ErrorKind as IoErrorKind};
127
128 #[test]
129 fn impl_std_fmt_display() {
130 let err = GraphQueryError::ResponseError(ErrorCode::E_DISCONNECTED, None);
131 println!("{err}");
132 }
133
134 #[test]
135 fn impl_std_error_error() {
136 let err = IoError::new(
137 IoErrorKind::Other,
138 GraphQueryError::ResponseError(ErrorCode::E_DISCONNECTED, None),
139 );
140 println!("{err}");
141 }
142}