mobc_nebula/impl_tokio/v3/
graph.rs1use std::io::{Error as IoError, ErrorKind as IoErrorKind};
2
3use fbthrift_transport::{
4 fbthrift_transport_response_handler::ResponseHandler, AsyncTransport,
5 AsyncTransportConfiguration,
6};
7use mobc::async_trait;
8use nebula_client::{
9 v3::{GraphClient, GraphSession},
10 VersionV3,
11};
12
13use super::super::{TokioSleep, TokioTcpStream};
14use crate::graph::{GraphClientConfiguration, GraphConnectionManager};
15
16pub fn new_graph_connection_manager<H>(
18 client_configuration: GraphClientConfiguration,
19 transport_configuration: AsyncTransportConfiguration<H>,
20) -> GraphConnectionManager<TokioTcpStream, TokioSleep, H, VersionV3>
21where
22 H: ResponseHandler + Send + Sync + 'static + Unpin,
23{
24 GraphConnectionManager::new(client_configuration, transport_configuration)
25}
26
27impl<H> GraphConnectionManager<TokioTcpStream, TokioSleep, H, VersionV3>
29where
30 H: ResponseHandler + Send + Sync + 'static + Unpin,
31{
32 async fn get_async_connection(
33 &self,
34 ) -> Result<GraphSession<AsyncTransport<TokioTcpStream, TokioSleep, H>>, IoError> {
35 let transport = AsyncTransport::with_tokio_tcp_connect(
36 self.client_configuration.tcp_connect_addr(),
37 self.transport_configuration.clone(),
38 )
39 .await?;
40 let client = GraphClient::new(transport);
41
42 let mut session = client
43 .authenticate(
44 &self.client_configuration.username.as_bytes().to_vec(),
45 &self.client_configuration.password.as_bytes().to_vec(),
46 )
47 .await
48 .map_err(|err| IoError::new(IoErrorKind::Other, err))?;
49
50 if let Some(ref space) = self.client_configuration.space {
51 session
52 .execute(&format!("USE {space}").as_bytes().to_vec())
53 .await
54 .map_err(|err| IoError::new(IoErrorKind::Other, err))?;
55 }
56
57 Ok(session)
58 }
59}
60
61#[async_trait]
62impl<H> mobc::Manager for GraphConnectionManager<TokioTcpStream, TokioSleep, H, VersionV3>
63where
64 H: ResponseHandler + Send + Sync + 'static + Unpin,
65{
66 type Connection = GraphSession<AsyncTransport<TokioTcpStream, TokioSleep, H>>;
67 type Error = IoError;
68
69 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
70 self.get_async_connection().await
71 }
72
73 async fn check(&self, mut conn: Self::Connection) -> Result<Self::Connection, Self::Error> {
74 conn.execute(&"SHOW CHARSET".as_bytes().to_vec())
75 .await
76 .map_err(|err| IoError::new(IoErrorKind::Other, err))?;
77
78 Ok(conn)
79 }
80
81 fn validate(&self, conn: &mut Self::Connection) -> bool {
82 !conn.is_close_required()
83 }
84}