nebula_client/v3/graph/
client.rs

1use std::io::{Error as IoError, ErrorKind as IoErrorKind};
2
3use async_trait::async_trait;
4use bytes::Bytes;
5use fbthrift::{ApplicationException, ApplicationExceptionErrorCode, BinaryProtocol, Transport};
6use nebula_fbthrift_graph_v3::{
7    client::{GraphService, GraphServiceImpl},
8    dependencies::common::types::ErrorCode,
9    errors::graph_service::{AuthenticateError, ExecuteError, ExecuteJsonError, SignoutError},
10    types::ExecutionResponse,
11};
12use serde::de::DeserializeOwned;
13
14use super::query::{GraphQuery, GraphQueryError, GraphQueryOutput};
15
16//
17//
18//
19struct GraphConnection<T>
20where
21    T: Transport,
22    Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
23    ::fbthrift::ProtocolEncoded<BinaryProtocol>:
24        ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
25{
26    service: GraphServiceImpl<BinaryProtocol, T>,
27}
28
29impl<T> GraphConnection<T>
30where
31    T: Transport,
32    Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
33    ::fbthrift::ProtocolEncoded<BinaryProtocol>:
34        ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
35{
36    fn new(transport: T) -> Self {
37        Self {
38            service: GraphServiceImpl::<BinaryProtocol, _>::new(transport),
39        }
40    }
41}
42
43//
44//
45//
46pub struct GraphClient<T>
47where
48    T: Transport,
49    Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
50    ::fbthrift::ProtocolEncoded<BinaryProtocol>:
51        ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
52{
53    connection: GraphConnection<T>,
54}
55
56impl<T> GraphClient<T>
57where
58    T: Transport,
59    Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
60    ::fbthrift::ProtocolEncoded<BinaryProtocol>:
61        ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
62{
63    pub fn new(transport: T) -> Self {
64        Self {
65            connection: GraphConnection::new(transport),
66        }
67    }
68
69    #[allow(clippy::ptr_arg)]
70    pub async fn authenticate(
71        self,
72        username: &Vec<u8>,
73        password: &Vec<u8>,
74    ) -> Result<GraphSession<T>, AuthenticateError> {
75        let res = self
76            .connection
77            .service
78            .authenticate(username, password)
79            .await?;
80
81        if res.error_code != ErrorCode::SUCCEEDED {
82            return Err(ApplicationException::new(
83                ApplicationExceptionErrorCode::Unknown,
84                res.error_msg
85                    .map(|x| String::from_utf8_lossy(&x).to_string())
86                    .unwrap_or_else(|| "Unknown".to_owned()),
87            )
88            .into());
89        }
90        let session_id = res.session_id.ok_or_else(|| {
91            ApplicationException::new(
92                ApplicationExceptionErrorCode::InternalError,
93                "Missing session_id".to_owned(),
94            )
95        })?;
96
97        Ok(GraphSession::new(self.connection, session_id))
98    }
99}
100
101//
102//
103//
104pub struct GraphSession<T>
105where
106    T: Transport,
107    Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
108    ::fbthrift::ProtocolEncoded<BinaryProtocol>:
109        ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
110{
111    connection: GraphConnection<T>,
112    session_id: i64,
113    close_required: bool,
114}
115
116impl<T> GraphSession<T>
117where
118    T: Transport,
119    Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
120    ::fbthrift::ProtocolEncoded<BinaryProtocol>:
121        ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
122{
123    fn new(connection: GraphConnection<T>, session_id: i64) -> Self {
124        Self {
125            connection,
126            session_id,
127            close_required: false,
128        }
129    }
130
131    pub async fn signout(self) -> Result<(), SignoutError> {
132        self.connection.service.signout(self.session_id).await
133    }
134
135    #[allow(clippy::ptr_arg)]
136    pub async fn execute(&mut self, stmt: &Vec<u8>) -> Result<ExecutionResponse, ExecuteError> {
137        let res = match self.connection.service.execute(self.session_id, stmt).await {
138            Ok(res) => res,
139            Err(ExecuteError::ThriftError(err)) => {
140                if let Some(io_err) = err.downcast_ref::<IoError>() {
141                    // "ExecuteError Broken pipe (os error 32)"
142                    if io_err.kind() == IoErrorKind::BrokenPipe {
143                        self.close_required = true;
144                    }
145                }
146
147                return Err(ExecuteError::ThriftError(err));
148            }
149            Err(err) => return Err(err),
150        };
151
152        match res.error_code {
153            ErrorCode::E_SESSION_INVALID | ErrorCode::E_SESSION_TIMEOUT => {
154                self.close_required = true;
155            }
156            _ => {}
157        }
158
159        Ok(res)
160    }
161
162    #[allow(clippy::ptr_arg)]
163    pub async fn execute_json(&mut self, stmt: &Vec<u8>) -> Result<Vec<u8>, ExecuteJsonError> {
164        let res = match self
165            .connection
166            .service
167            .executeJson(self.session_id, stmt)
168            .await
169        {
170            Ok(res) => res,
171            Err(ExecuteJsonError::ThriftError(err)) => {
172                if let Some(io_err) = err.downcast_ref::<IoError>() {
173                    // "ExecuteJsonError Broken pipe (os error 32)"
174                    if io_err.kind() == IoErrorKind::BrokenPipe {
175                        self.close_required = true;
176                    }
177                }
178
179                return Err(ExecuteJsonError::ThriftError(err));
180            }
181            Err(err) => return Err(err),
182        };
183
184        Ok(res)
185    }
186
187    pub fn is_close_required(&self) -> bool {
188        self.close_required
189    }
190}
191
192//
193//
194//
195#[async_trait]
196impl<T> GraphQuery for GraphSession<T>
197where
198    T: Transport + Send + Sync,
199    Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
200    ::fbthrift::ProtocolEncoded<BinaryProtocol>:
201        ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
202{
203    async fn query_as<D: DeserializeOwned>(
204        &mut self,
205        stmt: &Vec<u8>,
206    ) -> Result<GraphQueryOutput<D>, GraphQueryError> {
207        let res = self
208            .execute(stmt)
209            .await
210            .map_err(GraphQueryError::ExecuteError)?;
211
212        if res.error_code != ErrorCode::SUCCEEDED {
213            return Err(GraphQueryError::ResponseError(
214                res.error_code,
215                res.error_msg,
216            ));
217        }
218
219        GraphQueryOutput::new(res)
220    }
221}