Skip to main content

connection_pool/
tcp.rs

1use crate::{CleanupConfig, ConnectionManager, ConnectionPool, ManagedConnection};
2use std::{sync::Arc, time::Duration};
3use tokio::net::{TcpStream, ToSocketAddrs};
4
5/// Example implementation for TcpStream
6#[derive(Clone)]
7pub struct TcpConnectionManager<A: ToSocketAddrs + Send + Sync + Clone + 'static> {
8    pub address: A,
9}
10
11impl<A> ConnectionManager for TcpConnectionManager<A>
12where
13    A: ToSocketAddrs + Send + Sync + Clone + 'static,
14{
15    type Connection = TcpStream;
16    type Error = std::io::Error;
17    type CreateFut = std::pin::Pin<Box<dyn Future<Output = Result<TcpStream, Self::Error>> + Send>>;
18    type ValidFut<'a> = std::pin::Pin<Box<dyn Future<Output = bool> + Send + 'a>>;
19
20    fn create_connection(&self) -> Self::CreateFut {
21        let addr = self.address.clone();
22        Box::pin(async move { TcpStream::connect(addr).await })
23    }
24
25    fn is_valid<'a>(&'a self, stream: &'a mut Self::Connection) -> Self::ValidFut<'a> {
26        Box::pin(async move {
27            if stream.peer_addr().is_err() {
28                return false;
29            }
30            // try to read a byte without consuming
31            let mut buf = [0u8; 1];
32            match stream.try_read(&mut buf) {
33                Ok(0) => return false,                                         // EOF
34                Ok(_) => {}                                                    // existing readable data
35                Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {} // no data but connection is fine
36                Err(_) => return false,
37            }
38            true
39        })
40    }
41}
42
43/// Convenience type aliases for TCP connections
44pub type TcpConnectionPool<A = std::net::SocketAddr> = ConnectionPool<TcpConnectionManager<A>>;
45
46/// Managed TCP stream
47pub type TcpManagedConnection<A = std::net::SocketAddr> = ManagedConnection<TcpConnectionManager<A>>;
48
49impl<A> TcpConnectionPool<A>
50where
51    A: ToSocketAddrs + Send + Sync + Clone + 'static,
52{
53    /// Create a new TCP connection pool
54    pub fn new_tcp(
55        max_size: Option<usize>,
56        max_idle_time: Option<Duration>,
57        connection_timeout: Option<Duration>,
58        cleanup_config: Option<CleanupConfig>,
59        address: A,
60    ) -> Arc<Self> {
61        log::info!("Creating TCP connection pool");
62        let manager = TcpConnectionManager { address };
63        Self::new(max_size, max_idle_time, connection_timeout, cleanup_config, manager)
64    }
65}