nebula_client/v3/graph/
client.rs1use std::io::{Error as IoError, ErrorKind as IoErrorKind};
2
3use async_trait::async_trait;
4use bytes::Bytes;
5use fbthrift::{ApplicationException, ApplicationExceptionErrorCode, BinaryProtocol, Transport};
6use nebula_fbthrift_graph_v3::{
7 client::{GraphService, GraphServiceImpl},
8 dependencies::common::types::ErrorCode,
9 errors::graph_service::{AuthenticateError, ExecuteError, ExecuteJsonError, SignoutError},
10 types::ExecutionResponse,
11};
12use serde::de::DeserializeOwned;
13
14use super::query::{GraphQuery, GraphQueryError, GraphQueryOutput};
15
16struct GraphConnection<T>
20where
21 T: Transport,
22 Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
23 ::fbthrift::ProtocolEncoded<BinaryProtocol>:
24 ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
25{
26 service: GraphServiceImpl<BinaryProtocol, T>,
27}
28
29impl<T> GraphConnection<T>
30where
31 T: Transport,
32 Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
33 ::fbthrift::ProtocolEncoded<BinaryProtocol>:
34 ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
35{
36 fn new(transport: T) -> Self {
37 Self {
38 service: GraphServiceImpl::<BinaryProtocol, _>::new(transport),
39 }
40 }
41}
42
43pub struct GraphClient<T>
47where
48 T: Transport,
49 Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
50 ::fbthrift::ProtocolEncoded<BinaryProtocol>:
51 ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
52{
53 connection: GraphConnection<T>,
54}
55
56impl<T> GraphClient<T>
57where
58 T: Transport,
59 Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
60 ::fbthrift::ProtocolEncoded<BinaryProtocol>:
61 ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
62{
63 pub fn new(transport: T) -> Self {
64 Self {
65 connection: GraphConnection::new(transport),
66 }
67 }
68
69 #[allow(clippy::ptr_arg)]
70 pub async fn authenticate(
71 self,
72 username: &Vec<u8>,
73 password: &Vec<u8>,
74 ) -> Result<GraphSession<T>, AuthenticateError> {
75 let res = self
76 .connection
77 .service
78 .authenticate(username, password)
79 .await?;
80
81 if res.error_code != ErrorCode::SUCCEEDED {
82 return Err(ApplicationException::new(
83 ApplicationExceptionErrorCode::Unknown,
84 res.error_msg
85 .map(|x| String::from_utf8_lossy(&x).to_string())
86 .unwrap_or_else(|| "Unknown".to_owned()),
87 )
88 .into());
89 }
90 let session_id = res.session_id.ok_or_else(|| {
91 ApplicationException::new(
92 ApplicationExceptionErrorCode::InternalError,
93 "Missing session_id".to_owned(),
94 )
95 })?;
96
97 Ok(GraphSession::new(self.connection, session_id))
98 }
99}
100
101pub struct GraphSession<T>
105where
106 T: Transport,
107 Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
108 ::fbthrift::ProtocolEncoded<BinaryProtocol>:
109 ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
110{
111 connection: GraphConnection<T>,
112 session_id: i64,
113 close_required: bool,
114}
115
116impl<T> GraphSession<T>
117where
118 T: Transport,
119 Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
120 ::fbthrift::ProtocolEncoded<BinaryProtocol>:
121 ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
122{
123 fn new(connection: GraphConnection<T>, session_id: i64) -> Self {
124 Self {
125 connection,
126 session_id,
127 close_required: false,
128 }
129 }
130
131 pub async fn signout(self) -> Result<(), SignoutError> {
132 self.connection.service.signout(self.session_id).await
133 }
134
135 #[allow(clippy::ptr_arg)]
136 pub async fn execute(&mut self, stmt: &Vec<u8>) -> Result<ExecutionResponse, ExecuteError> {
137 let res = match self.connection.service.execute(self.session_id, stmt).await {
138 Ok(res) => res,
139 Err(ExecuteError::ThriftError(err)) => {
140 if let Some(io_err) = err.downcast_ref::<IoError>() {
141 if io_err.kind() == IoErrorKind::BrokenPipe {
143 self.close_required = true;
144 }
145 }
146
147 return Err(ExecuteError::ThriftError(err));
148 }
149 Err(err) => return Err(err),
150 };
151
152 match res.error_code {
153 ErrorCode::E_SESSION_INVALID | ErrorCode::E_SESSION_TIMEOUT => {
154 self.close_required = true;
155 }
156 _ => {}
157 }
158
159 Ok(res)
160 }
161
162 #[allow(clippy::ptr_arg)]
163 pub async fn execute_json(&mut self, stmt: &Vec<u8>) -> Result<Vec<u8>, ExecuteJsonError> {
164 let res = match self
165 .connection
166 .service
167 .executeJson(self.session_id, stmt)
168 .await
169 {
170 Ok(res) => res,
171 Err(ExecuteJsonError::ThriftError(err)) => {
172 if let Some(io_err) = err.downcast_ref::<IoError>() {
173 if io_err.kind() == IoErrorKind::BrokenPipe {
175 self.close_required = true;
176 }
177 }
178
179 return Err(ExecuteJsonError::ThriftError(err));
180 }
181 Err(err) => return Err(err),
182 };
183
184 Ok(res)
185 }
186
187 pub fn is_close_required(&self) -> bool {
188 self.close_required
189 }
190}
191
192#[async_trait]
196impl<T> GraphQuery for GraphSession<T>
197where
198 T: Transport + Send + Sync,
199 Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
200 ::fbthrift::ProtocolEncoded<BinaryProtocol>:
201 ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
202{
203 async fn query_as<D: DeserializeOwned>(
204 &mut self,
205 stmt: &Vec<u8>,
206 ) -> Result<GraphQueryOutput<D>, GraphQueryError> {
207 let res = self
208 .execute(stmt)
209 .await
210 .map_err(GraphQueryError::ExecuteError)?;
211
212 if res.error_code != ErrorCode::SUCCEEDED {
213 return Err(GraphQueryError::ResponseError(
214 res.error_code,
215 res.error_msg,
216 ));
217 }
218
219 GraphQueryOutput::new(res)
220 }
221}