mobc_nebula/impl_tokio/v3/
graph.rs

1use std::io::{Error as IoError, ErrorKind as IoErrorKind};
2
3use fbthrift_transport::{
4    fbthrift_transport_response_handler::ResponseHandler, AsyncTransport,
5    AsyncTransportConfiguration,
6};
7use mobc::async_trait;
8use nebula_client::{
9    v3::{GraphClient, GraphSession},
10    VersionV3,
11};
12
13use super::super::{TokioSleep, TokioTcpStream};
14use crate::graph::{GraphClientConfiguration, GraphConnectionManager};
15
16//
17pub fn new_graph_connection_manager<H>(
18    client_configuration: GraphClientConfiguration,
19    transport_configuration: AsyncTransportConfiguration<H>,
20) -> GraphConnectionManager<TokioTcpStream, TokioSleep, H, VersionV3>
21where
22    H: ResponseHandler + Send + Sync + 'static + Unpin,
23{
24    GraphConnectionManager::new(client_configuration, transport_configuration)
25}
26
27//
28impl<H> GraphConnectionManager<TokioTcpStream, TokioSleep, H, VersionV3>
29where
30    H: ResponseHandler + Send + Sync + 'static + Unpin,
31{
32    async fn get_async_connection(
33        &self,
34    ) -> Result<GraphSession<AsyncTransport<TokioTcpStream, TokioSleep, H>>, IoError> {
35        let transport = AsyncTransport::with_tokio_tcp_connect(
36            self.client_configuration.tcp_connect_addr(),
37            self.transport_configuration.clone(),
38        )
39        .await?;
40        let client = GraphClient::new(transport);
41
42        let mut session = client
43            .authenticate(
44                &self.client_configuration.username.as_bytes().to_vec(),
45                &self.client_configuration.password.as_bytes().to_vec(),
46            )
47            .await
48            .map_err(|err| IoError::new(IoErrorKind::Other, err))?;
49
50        if let Some(ref space) = self.client_configuration.space {
51            session
52                .execute(&format!("USE {space}").as_bytes().to_vec())
53                .await
54                .map_err(|err| IoError::new(IoErrorKind::Other, err))?;
55        }
56
57        Ok(session)
58    }
59}
60
61#[async_trait]
62impl<H> mobc::Manager for GraphConnectionManager<TokioTcpStream, TokioSleep, H, VersionV3>
63where
64    H: ResponseHandler + Send + Sync + 'static + Unpin,
65{
66    type Connection = GraphSession<AsyncTransport<TokioTcpStream, TokioSleep, H>>;
67    type Error = IoError;
68
69    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
70        self.get_async_connection().await
71    }
72
73    async fn check(&self, mut conn: Self::Connection) -> Result<Self::Connection, Self::Error> {
74        conn.execute(&"SHOW CHARSET".as_bytes().to_vec())
75            .await
76            .map_err(|err| IoError::new(IoErrorKind::Other, err))?;
77
78        Ok(conn)
79    }
80
81    fn validate(&self, conn: &mut Self::Connection) -> bool {
82        !conn.is_close_required()
83    }
84}