// monoio_transports/pool/connector.rs
use super::{ConnectionPool, Key, Poolable, Pooled};
use crate::connectors::Connector;

/// PooledConnector is a connector with a connection pool.
/// It is designed for non-multiplex transport, like http1.
///
/// When connecting, the pool is consulted first; only when it yields no
/// connection for the key is the inner transport connector asked to establish
/// one, and that new connection is then linked to the pool.
#[derive(Debug)]
pub struct PooledConnector<C, K, T> {
    /// Inner connector used to establish a connection when the pool has none
    /// for the requested key.
    transport_connector: C,
    /// Pool queried before connecting; newly established connections are
    /// linked to it.
    pool: ConnectionPool<K, T>,
}

impl<C, K, T> PooledConnector<C, K, T> {
    /// Builds a pooled connector from an inner transport connector and an
    /// already constructed connection pool.
    #[inline]
    pub const fn new(transport_connector: C, pool: ConnectionPool<K, T>) -> Self {
        Self {
            transport_connector,
            pool,
        }
    }

    /// Consumes the pooled connector and hands back its two components:
    /// the inner transport connector and the connection pool, in that order.
    #[inline]
    pub fn into_parts(self) -> (C, ConnectionPool<K, T>) {
        let Self {
            transport_connector,
            pool,
        } = self;
        (transport_connector, pool)
    }

    /// Borrows the inner transport connector.
    #[inline]
    pub fn transport_connector(&self) -> &C {
        &self.transport_connector
    }

    /// Borrows the connection pool.
    #[inline]
    pub fn pool(&self) -> &ConnectionPool<K, T> {
        &self.pool
    }
}

impl<C: Default, K: 'static, T: 'static> Default for PooledConnector<C, K, T> {
    /// Creates a pooled connector from a default transport connector and a
    /// default connection pool.
    #[inline]
    fn default() -> Self {
        // The connector is built first, then the pool, matching `new`'s argument order.
        Self::new_with_default_pool(C::default())
    }
}

// Implemented by hand so that only `C: Clone` is required; no `Clone` bound is
// placed on `K` or `T`.
impl<C: Clone, K, T> Clone for PooledConnector<C, K, T> {
    /// Clones the transport connector and the pool into a new pooled connector.
    #[inline]
    fn clone(&self) -> Self {
        let transport_connector = self.transport_connector.clone();
        let pool = self.pool.clone();
        Self::new(transport_connector, pool)
    }
}

impl<C, K: 'static, T: 'static> PooledConnector<C, K, T> {
    #[inline]
    pub fn new_with_default_pool(transport_connector: C) -> Self {
        Self::new(transport_connector, Default::default())
    }
}

impl<C, K: Key, T: Poolable> Connector<K> for PooledConnector<C, K, T>
where
    C: Connector<K, Connection = T>,
{
    type Connection = Pooled<K, T>;
    type Error = C::Error;

    /// Returns a pooled connection for `key`.
    ///
    /// The pool is asked first. If it has no connection for `key`, the inner
    /// transport connector establishes one, which is then linked to the pool
    /// under a copy of the key.
    ///
    /// # Errors
    ///
    /// Returns the inner transport connector's error unchanged when it fails
    /// to establish a new connection.
    #[inline]
    async fn connect(&self, key: K) -> Result<Self::Connection, Self::Error> {
        // Pool hit: hand the existing connection straight back.
        if let Some(pooled) = self.pool.get(&key) {
            return Ok(pooled);
        }

        // `key` is moved into the transport connector below, so keep a copy
        // to register the new connection with the pool.
        let pool_key = key.to_owned();
        self.transport_connector
            .connect(key)
            .await
            .map(|io| self.pool.link(pool_key, io))
    }
}