bb8_nebula/impl_tokio/v1/
graph.rs1use std::io::{Error as IoError, ErrorKind as IoErrorKind};
2
3use async_trait::async_trait;
4use fbthrift_transport::{
5 fbthrift_transport_response_handler::ResponseHandler, AsyncTransport,
6 AsyncTransportConfiguration,
7};
8use nebula_client::{
9 v1::{GraphClient, GraphSession},
10 VersionV1,
11};
12
13use super::super::{TokioSleep, TokioTcpStream};
14use crate::graph::{GraphClientConfiguration, GraphConnectionManager};
15
16pub fn new_graph_connection_manager<H>(
18 client_configuration: GraphClientConfiguration,
19 transport_configuration: AsyncTransportConfiguration<H>,
20) -> GraphConnectionManager<TokioTcpStream, TokioSleep, H, VersionV1>
21where
22 H: ResponseHandler + Send + Sync + 'static + Unpin,
23{
24 GraphConnectionManager::new(client_configuration, transport_configuration)
25}
26
27impl<H> GraphConnectionManager<TokioTcpStream, TokioSleep, H, VersionV1>
29where
30 H: ResponseHandler + Send + Sync + 'static + Unpin,
31{
32 async fn get_async_connection(
33 &self,
34 ) -> Result<GraphSession<AsyncTransport<TokioTcpStream, TokioSleep, H>>, IoError> {
35 let transport = AsyncTransport::with_tokio_tcp_connect(
36 self.client_configuration.tcp_connect_addr(),
37 self.transport_configuration.clone(),
38 )
39 .await?;
40
41 let client = GraphClient::new(transport);
42
43 let mut session = client
44 .authenticate(
45 &self.client_configuration.username,
46 &self.client_configuration.password,
47 )
48 .await
49 .map_err(|err| IoError::new(IoErrorKind::Other, err))?;
50
51 if let Some(ref space) = self.client_configuration.space {
52 session
53 .execute(&format!("USE {space}"))
54 .await
55 .map_err(|err| IoError::new(IoErrorKind::Other, err))?;
56 }
57
58 Ok(session)
59 }
60}
61
62#[async_trait]
63impl<H> bb8::ManageConnection for GraphConnectionManager<TokioTcpStream, TokioSleep, H, VersionV1>
64where
65 H: ResponseHandler + Send + Sync + 'static + Unpin,
66{
67 type Connection = GraphSession<AsyncTransport<TokioTcpStream, TokioSleep, H>>;
68 type Error = IoError;
69
70 async fn connect(&self) -> Result<Self::Connection, Self::Error> {
71 self.get_async_connection().await
72 }
73
74 async fn is_valid(&self, _conn: &mut Self::Connection) -> Result<(), Self::Error> {
75 Ok(())
76 }
77
78 fn has_broken(&self, conn: &mut Self::Connection) -> bool {
79 conn.is_close_required()
80 }
81}