1use crate::{CleanupConfig, ConnectionManager, ConnectionPool, ManagedConnection};
2use std::{sync::Arc, time::Duration};
3use tokio::net::{TcpStream, ToSocketAddrs};
4
5pub struct TcpConnectionManager<A: ToSocketAddrs + Send + Sync + Clone + 'static> {
7 pub address: A,
8}
9
10impl<A> ConnectionManager for TcpConnectionManager<A>
11where
12 A: ToSocketAddrs + Send + Sync + Clone + 'static,
13{
14 type Connection = TcpStream;
15 type Error = std::io::Error;
16 type CreateFut = std::pin::Pin<Box<dyn Future<Output = Result<TcpStream, Self::Error>> + Send>>;
17 type ValidFut<'a> = std::pin::Pin<Box<dyn Future<Output = bool> + Send + 'a>>;
18
19 fn create_connection(&self) -> Self::CreateFut {
20 let addr = self.address.clone();
21 Box::pin(async move { TcpStream::connect(addr).await })
22 }
23
24 fn is_valid<'a>(&'a self, stream: &'a mut Self::Connection) -> Self::ValidFut<'a> {
25 Box::pin(async move {
26 let interest = tokio::io::Interest::READABLE | tokio::io::Interest::WRITABLE;
27 if let Ok(r) = stream.ready(interest).await {
28 return r.is_readable() && r.is_writable();
29 }
30 false
31 })
32 }
33}
34
35pub type TcpConnectionPool<A = std::net::SocketAddr> = ConnectionPool<TcpConnectionManager<A>>;
37
38pub type TcpManagedConnection<A = std::net::SocketAddr> = ManagedConnection<TcpConnectionManager<A>>;
40
41impl<A> TcpConnectionPool<A>
42where
43 A: ToSocketAddrs + Send + Sync + Clone + 'static,
44{
45 pub fn new_tcp(
47 max_size: Option<usize>,
48 max_idle_time: Option<Duration>,
49 connection_timeout: Option<Duration>,
50 cleanup_config: Option<CleanupConfig>,
51 address: A,
52 ) -> Arc<Self> {
53 log::info!("Creating TCP connection pool");
54 let manager = TcpConnectionManager { address };
55 Self::new(max_size, max_idle_time, connection_timeout, cleanup_config, manager)
56 }
57}