nebula_client/v1/graph/
client.rs

1use std::io::{Error as IoError, ErrorKind as IoErrorKind};
2
3use async_trait::async_trait;
4use bytes::Bytes;
5use fbthrift::{ApplicationException, ApplicationExceptionErrorCode, BinaryProtocol, Transport};
6use nebula_fbthrift_graph_v1::{
7    client::{GraphService, GraphServiceImpl},
8    errors::graph_service::{AuthenticateError, ExecuteError, SignoutError},
9    types::{ErrorCode, ExecutionResponse},
10};
11use serde::de::DeserializeOwned;
12
13use super::query::{GraphQuery, GraphQueryError, GraphQueryOutput};
14
15//
16//
17//
18struct GraphConnection<T>
19where
20    T: Transport,
21    Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
22    ::fbthrift::ProtocolEncoded<BinaryProtocol>:
23        ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
24{
25    service: GraphServiceImpl<BinaryProtocol, T>,
26}
27
28impl<T> GraphConnection<T>
29where
30    T: Transport,
31    Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
32    ::fbthrift::ProtocolEncoded<BinaryProtocol>:
33        ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
34{
35    fn new(transport: T) -> Self {
36        Self {
37            service: GraphServiceImpl::<BinaryProtocol, _>::new(transport),
38        }
39    }
40}
41
42//
43//
44//
45pub struct GraphClient<T>
46where
47    T: Transport,
48    Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
49    ::fbthrift::ProtocolEncoded<BinaryProtocol>:
50        ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
51{
52    connection: GraphConnection<T>,
53}
54
55impl<T> GraphClient<T>
56where
57    T: Transport,
58    Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
59    ::fbthrift::ProtocolEncoded<BinaryProtocol>:
60        ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
61{
62    pub fn new(transport: T) -> Self {
63        Self {
64            connection: GraphConnection::new(transport),
65        }
66    }
67
68    pub async fn authenticate(
69        self,
70        username: &str,
71        password: &str,
72    ) -> Result<GraphSession<T>, AuthenticateError> {
73        let res = self
74            .connection
75            .service
76            .authenticate(username, password)
77            .await?;
78
79        if res.error_code != ErrorCode::SUCCEEDED {
80            return Err(ApplicationException::new(
81                ApplicationExceptionErrorCode::Unknown,
82                res.error_msg.unwrap_or_else(|| "Unknown".to_owned()),
83            )
84            .into());
85        }
86
87        let session_id = res.session_id.ok_or_else(|| {
88            ApplicationException::new(
89                ApplicationExceptionErrorCode::InternalError,
90                "Missing session_id".to_owned(),
91            )
92        })?;
93
94        Ok(GraphSession::new(self.connection, session_id))
95    }
96}
97
98//
99//
100//
101pub struct GraphSession<T>
102where
103    T: Transport,
104    Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
105    ::fbthrift::ProtocolEncoded<BinaryProtocol>:
106        ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
107{
108    connection: GraphConnection<T>,
109    session_id: i64,
110    close_required: bool,
111}
112
113impl<T> GraphSession<T>
114where
115    T: Transport,
116    Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
117    ::fbthrift::ProtocolEncoded<BinaryProtocol>:
118        ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
119{
120    fn new(connection: GraphConnection<T>, session_id: i64) -> Self {
121        Self {
122            connection,
123            session_id,
124            close_required: false,
125        }
126    }
127
128    pub async fn signout(self) -> Result<(), SignoutError> {
129        self.connection.service.signout(self.session_id).await
130    }
131
132    pub async fn execute(&mut self, stmt: &str) -> Result<ExecutionResponse, ExecuteError> {
133        let res = match self.connection.service.execute(self.session_id, stmt).await {
134            Ok(res) => res,
135            Err(ExecuteError::ThriftError(err)) => {
136                if let Some(io_err) = err.downcast_ref::<IoError>() {
137                    // "ExecuteError Broken pipe (os error 32)"
138                    if io_err.kind() == IoErrorKind::BrokenPipe {
139                        self.close_required = true;
140                    }
141                }
142
143                return Err(ExecuteError::ThriftError(err));
144            }
145            Err(err) => return Err(err),
146        };
147
148        /*
149        ResponseError(ErrorCode::E_EXECUTION_ERROR, Some("RPC failure in MetaClient: N6apache6thrift9transport19TTransportExceptionE: AsyncSocketException: connect failed, type = Socket not open, errno = 111 (Connection refused): Connection refused"))
150        */
151
152        match res.error_code {
153            ErrorCode::E_SESSION_INVALID | ErrorCode::E_SESSION_TIMEOUT => {
154                self.close_required = true;
155            }
156            _ => {}
157        }
158
159        Ok(res)
160    }
161
162    pub fn is_close_required(&self) -> bool {
163        self.close_required
164    }
165}
166
167//
168//
169//
170#[async_trait]
171impl<T> GraphQuery for GraphSession<T>
172where
173    T: Transport + Send + Sync,
174    Bytes: ::fbthrift::Framing<DecBuf = ::fbthrift::FramingDecoded<T>>,
175    ::fbthrift::ProtocolEncoded<BinaryProtocol>:
176        ::fbthrift::BufMutExt<Final = ::fbthrift::FramingEncodedFinal<T>>,
177{
178    async fn query_as<D: DeserializeOwned>(
179        &mut self,
180        stmt: &str,
181    ) -> Result<GraphQueryOutput<D>, GraphQueryError> {
182        let res = self
183            .execute(stmt)
184            .await
185            .map_err(GraphQueryError::ExecuteError)?;
186
187        if res.error_code != ErrorCode::SUCCEEDED {
188            return Err(GraphQueryError::ResponseError(
189                res.error_code,
190                res.error_msg,
191            ));
192        }
193
194        GraphQueryOutput::new(res)
195    }
196}