nebula_client/v1/graph/
client.rs1use std::io::{Error as IoError, ErrorKind as IoErrorKind};
2
3use async_trait::async_trait;
4use bytes::Bytes;
5use fbthrift::{ApplicationException, ApplicationExceptionErrorCode, BinaryProtocol, Transport};
6use nebula_fbthrift_graph_v1::{
7 client::{GraphService, GraphServiceImpl},
8 errors::graph_service::{AuthenticateError, ExecuteError, SignoutError},
9 types::{ErrorCode, ExecutionResponse},
10};
11use serde::de::DeserializeOwned;
12
13use super::query::{GraphQuery, GraphQueryError, GraphQueryOutput};
14
15struct GraphConnection<T>
19where
20 T: Transport,
21 Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
22 ::fbthrift::ProtocolEncoded<BinaryProtocol>:
23 ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
24{
25 service: GraphServiceImpl<BinaryProtocol, T>,
26}
27
28impl<T> GraphConnection<T>
29where
30 T: Transport,
31 Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
32 ::fbthrift::ProtocolEncoded<BinaryProtocol>:
33 ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
34{
35 fn new(transport: T) -> Self {
36 Self {
37 service: GraphServiceImpl::<BinaryProtocol, _>::new(transport),
38 }
39 }
40}
41
42pub struct GraphClient<T>
46where
47 T: Transport,
48 Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
49 ::fbthrift::ProtocolEncoded<BinaryProtocol>:
50 ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
51{
52 connection: GraphConnection<T>,
53}
54
55impl<T> GraphClient<T>
56where
57 T: Transport,
58 Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
59 ::fbthrift::ProtocolEncoded<BinaryProtocol>:
60 ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
61{
62 pub fn new(transport: T) -> Self {
63 Self {
64 connection: GraphConnection::new(transport),
65 }
66 }
67
68 pub async fn authenticate(
69 self,
70 username: &str,
71 password: &str,
72 ) -> Result<GraphSession<T>, AuthenticateError> {
73 let res = self
74 .connection
75 .service
76 .authenticate(username, password)
77 .await?;
78
79 if res.error_code != ErrorCode::SUCCEEDED {
80 return Err(ApplicationException::new(
81 ApplicationExceptionErrorCode::Unknown,
82 res.error_msg.unwrap_or_else(|| "Unknown".to_owned()),
83 )
84 .into());
85 }
86
87 let session_id = res.session_id.ok_or_else(|| {
88 ApplicationException::new(
89 ApplicationExceptionErrorCode::InternalError,
90 "Missing session_id".to_owned(),
91 )
92 })?;
93
94 Ok(GraphSession::new(self.connection, session_id))
95 }
96}
97
98pub struct GraphSession<T>
102where
103 T: Transport,
104 Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
105 ::fbthrift::ProtocolEncoded<BinaryProtocol>:
106 ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
107{
108 connection: GraphConnection<T>,
109 session_id: i64,
110 close_required: bool,
111}
112
113impl<T> GraphSession<T>
114where
115 T: Transport,
116 Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
117 ::fbthrift::ProtocolEncoded<BinaryProtocol>:
118 ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
119{
120 fn new(connection: GraphConnection<T>, session_id: i64) -> Self {
121 Self {
122 connection,
123 session_id,
124 close_required: false,
125 }
126 }
127
128 pub async fn signout(self) -> Result<(), SignoutError> {
129 self.connection.service.signout(self.session_id).await
130 }
131
132 pub async fn execute(&mut self, stmt: &str) -> Result<ExecutionResponse, ExecuteError> {
133 let res = match self.connection.service.execute(self.session_id, stmt).await {
134 Ok(res) => res,
135 Err(ExecuteError::ThriftError(err)) => {
136 if let Some(io_err) = err.downcast_ref::<IoError>() {
137 if io_err.kind() == IoErrorKind::BrokenPipe {
139 self.close_required = true;
140 }
141 }
142
143 return Err(ExecuteError::ThriftError(err));
144 }
145 Err(err) => return Err(err),
146 };
147
148 match res.error_code {
153 ErrorCode::E_SESSION_INVALID | ErrorCode::E_SESSION_TIMEOUT => {
154 self.close_required = true;
155 }
156 _ => {}
157 }
158
159 Ok(res)
160 }
161
162 pub fn is_close_required(&self) -> bool {
163 self.close_required
164 }
165}
166
167#[async_trait]
171impl<T> GraphQuery for GraphSession<T>
172where
173 T: Transport + Send + Sync,
174 Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
175 ::fbthrift::ProtocolEncoded<BinaryProtocol>:
176 ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
177{
178 async fn query_as<D: DeserializeOwned>(
179 &mut self,
180 stmt: &str,
181 ) -> Result<GraphQueryOutput<D>, GraphQueryError> {
182 let res = self
183 .execute(stmt)
184 .await
185 .map_err(GraphQueryError::ExecuteError)?;
186
187 if res.error_code != ErrorCode::SUCCEEDED {
188 return Err(GraphQueryError::ResponseError(
189 res.error_code,
190 res.error_msg,
191 ));
192 }
193
194 GraphQueryOutput::new(res)
195 }
196}