nebula_client/v3/graph/
query.rs1use core::time::Duration;
2
3use async_trait::async_trait;
4use nebula_fbthrift_graph_v3::{
5 dependencies::common::types::ErrorCode, errors::graph_service::ExecuteError,
6 types::ExecutionResponse,
7};
8
9use serde::de::DeserializeOwned;
10use serde_nebula_fbthrift_graph::v3::de::{
11 data::DataDeserializeError, deserialize_execution_response,
12};
13
14#[async_trait]
15pub trait GraphQuery {
16 #[allow(clippy::ptr_arg)]
17 async fn query_as<D: DeserializeOwned>(
18 &mut self,
19 stmt: &Vec<u8>,
20 ) -> Result<GraphQueryOutput<D>, GraphQueryError>;
21
22 async fn query(&mut self, stmt: &Vec<u8>) -> Result<GraphQueryOutput<()>, GraphQueryError> {
23 self.query_as(stmt).await
24 }
25
26 async fn show_hosts(&mut self) -> Result<GraphQueryOutput<Host>, GraphQueryError> {
27 self.query_as(STMT_SHOW_HOSTS.to_vec().as_ref()).await
28 }
29 async fn show_spaces(&mut self) -> Result<GraphQueryOutput<Space>, GraphQueryError> {
30 self.query_as(STMT_SHOW_SPACES.to_vec().as_ref()).await
31 }
32}
33
34#[derive(Debug)]
35pub struct GraphQueryOutput<D>
36where
37 D: DeserializeOwned,
38{
39 pub latency: Duration,
40 pub space_name: Option<Vec<u8>>,
41 pub data_set: Vec<D>,
42}
43
44impl<D> GraphQueryOutput<D>
45where
46 D: DeserializeOwned,
47{
48 pub fn new(res: ExecutionResponse) -> Result<Self, GraphQueryError> {
49 let latency = Duration::from_micros(res.latency_in_us as u64);
50 let space_name = res.space_name.clone();
51 let data_set = deserialize_execution_response::<D>(&res)
52 .map_err(GraphQueryError::DataDeserializeError)?;
53
54 Ok(Self {
55 latency,
56 space_name,
57 data_set,
58 })
59 }
60}
61
62#[derive(Debug)]
66pub enum GraphQueryError {
67 ExecuteError(ExecuteError),
68 ResponseError(ErrorCode, Option<Vec<u8>>),
69 DataDeserializeError(DataDeserializeError),
70}
71
72impl core::fmt::Display for GraphQueryError {
73 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
74 match self {
75 Self::ExecuteError(err) => write!(f, "ExecuteError {err}"),
76 Self::ResponseError(err_code, err_msg) => {
77 write!(f, "ResponseError err_code:{err_code} err_msg:{err_msg:?}",)
78 }
79 Self::DataDeserializeError(err) => write!(f, "DataDeserializeError {err}"),
80 }
81 }
82}
83
84impl std::error::Error for GraphQueryError {
85 fn description(&self) -> &str {
86 match self {
87 Self::ExecuteError(_) => "ExecuteError",
88 Self::ResponseError(_, _) => "ResponseError",
89 Self::DataDeserializeError(_) => "DataDeserializeError",
90 }
91 }
92}
93
94use serde::Deserialize;
98
99const STMT_SHOW_HOSTS: &[u8] = b"SHOW HOSTS;";
100#[derive(Deserialize, Debug)]
101pub struct Host {
102 #[serde(rename(deserialize = "Host"))]
103 pub host: String,
104 #[serde(rename(deserialize = "Port"))]
105 pub port: u16,
106 #[serde(rename(deserialize = "HTTP port"))]
107 pub http_port: u16,
108 #[serde(rename(deserialize = "Status"))]
109 pub status: String,
110 #[serde(rename(deserialize = "Leader count"))]
111 pub leader_count: u64,
112 #[serde(rename(deserialize = "Leader distribution"))]
113 pub leader_distribution: String,
114 #[serde(rename(deserialize = "Partition distribution"))]
115 pub partition_distribution: String,
116 #[serde(rename(deserialize = "Version"))]
117 pub version: String,
118}
119
120const STMT_SHOW_SPACES: &[u8] = b"SHOW SPACES;";
121#[derive(Deserialize, Debug)]
122pub struct Space {
123 #[serde(rename(deserialize = "Name"))]
124 pub name: String,
125}
126
127#[cfg(test)]
128mod tests {
129 use super::*;
130
131 use std::io::{Error as IoError, ErrorKind as IoErrorKind};
132
133 #[test]
134 fn impl_std_fmt_display() {
135 let err = GraphQueryError::ResponseError(ErrorCode::E_DISCONNECTED, None);
136 println!("{err}");
137 }
138
139 #[test]
140 fn impl_std_error_error() {
141 let err = IoError::new(
142 IoErrorKind::Other,
143 GraphQueryError::ResponseError(ErrorCode::E_DISCONNECTED, None),
144 );
145 println!("{err}");
146 }
147}