bb8_nebula/impl_tokio/v1/
graph.rs

1use std::io::{Error as IoError, ErrorKind as IoErrorKind};
2
3use async_trait::async_trait;
4use fbthrift_transport::{
5    fbthrift_transport_response_handler::ResponseHandler, AsyncTransport,
6    AsyncTransportConfiguration,
7};
8use nebula_client::{
9    v1::{GraphClient, GraphSession},
10    VersionV1,
11};
12
13use super::super::{TokioSleep, TokioTcpStream};
14use crate::graph::{GraphClientConfiguration, GraphConnectionManager};
15
16//
17pub fn new_graph_connection_manager<H>(
18    client_configuration: GraphClientConfiguration,
19    transport_configuration: AsyncTransportConfiguration<H>,
20) -> GraphConnectionManager<TokioTcpStream, TokioSleep, H, VersionV1>
21where
22    H: ResponseHandler + Send + Sync + 'static + Unpin,
23{
24    GraphConnectionManager::new(client_configuration, transport_configuration)
25}
26
27//
28impl<H> GraphConnectionManager<TokioTcpStream, TokioSleep, H, VersionV1>
29where
30    H: ResponseHandler + Send + Sync + 'static + Unpin,
31{
32    async fn get_async_connection(
33        &self,
34    ) -> Result<GraphSession<AsyncTransport<TokioTcpStream, TokioSleep, H>>, IoError> {
35        let transport = AsyncTransport::with_tokio_tcp_connect(
36            self.client_configuration.tcp_connect_addr(),
37            self.transport_configuration.clone(),
38        )
39        .await?;
40
41        let client = GraphClient::new(transport);
42
43        let mut session = client
44            .authenticate(
45                &self.client_configuration.username,
46                &self.client_configuration.password,
47            )
48            .await
49            .map_err(|err| IoError::new(IoErrorKind::Other, err))?;
50
51        if let Some(ref space) = self.client_configuration.space {
52            session
53                .execute(&format!("USE {space}"))
54                .await
55                .map_err(|err| IoError::new(IoErrorKind::Other, err))?;
56        }
57
58        Ok(session)
59    }
60}
61
62#[async_trait]
63impl<H> bb8::ManageConnection for GraphConnectionManager<TokioTcpStream, TokioSleep, H, VersionV1>
64where
65    H: ResponseHandler + Send + Sync + 'static + Unpin,
66{
67    type Connection = GraphSession<AsyncTransport<TokioTcpStream, TokioSleep, H>>;
68    type Error = IoError;
69
70    async fn connect(&self) -> Result<Self::Connection, Self::Error> {
71        self.get_async_connection().await
72    }
73
74    async fn is_valid(&self, _conn: &mut Self::Connection) -> Result<(), Self::Error> {
75        Ok(())
76    }
77
78    fn has_broken(&self, conn: &mut Self::Connection) -> bool {
79        conn.is_close_required()
80    }
81}