Skip to main content

xitca_postgres/pool/
connect.rs

1use core::future::Future;
2
3use xitca_io::io::AsyncIo;
4
5use crate::{
6    BoxedFuture, Postgres,
7    client::Client,
8    config::Config,
9    driver::{Driver, generic::GenericDriver},
10    error::Error,
11    iter::AsyncLendingIterator,
12};
13
14/// trait for how to produce a new [`Client`] and push it into connection pool
15pub trait Connect: Send + Sync {
16    fn connect(&self, cfg: Config) -> impl Future<Output = Result<Client, Error>> + Send;
17}
18
19pub(super) trait ConnectorDyn: Send + Sync {
20    fn connect_dyn(&self, cfg: Config) -> BoxedFuture<'_, Result<Client, Error>>;
21}
22
23impl<C> ConnectorDyn for C
24where
25    C: Connect + Send + Sync,
26{
27    fn connect_dyn(&self, cfg: Config) -> BoxedFuture<'_, Result<Client, Error>> {
28        Box::pin(self.connect(cfg))
29    }
30}
31
32pub(super) struct DefaultConnector;
33
34impl Connect for DefaultConnector {
35    async fn connect(&self, cfg: Config) -> Result<Client, Error> {
36        let (client, driver) = Postgres::new(cfg).connect().await?;
37        match driver {
38            Driver::Tcp(drv) => {
39                #[cfg(feature = "io-uring")]
40                {
41                    drive_uring(drv)
42                }
43
44                #[cfg(not(feature = "io-uring"))]
45                {
46                    drive(drv)
47                }
48            }
49            Driver::Dynamic(drv) => drive(drv),
50            #[cfg(feature = "tls")]
51            Driver::Tls(drv) => drive(drv),
52            #[cfg(unix)]
53            Driver::Unix(drv) => drive(drv),
54            #[cfg(all(unix, feature = "tls"))]
55            Driver::UnixTls(drv) => drive(drv),
56            #[cfg(feature = "quic")]
57            Driver::Quic(drv) => drive(drv),
58        };
59        Ok(client)
60    }
61}
62
63fn drive(mut drv: GenericDriver<impl AsyncIo + Send + 'static>) {
64    tokio::task::spawn(async move {
65        while drv.try_next().await?.is_some() {
66            // TODO: add notify listen callback to Pool
67        }
68        Ok::<_, Error>(())
69    });
70}
71
72#[cfg(feature = "io-uring")]
73fn drive_uring(drv: GenericDriver<xitca_io::net::TcpStream>) {
74    use core::{async_iter::AsyncIterator, future::poll_fn, pin::pin};
75
76    tokio::task::spawn_local(async move {
77        let mut iter = pin!(crate::driver::io_uring::UringDriver::from_tcp(drv).into_iter());
78        while let Some(res) = poll_fn(|cx| iter.as_mut().poll_next(cx)).await {
79            let _ = res?;
80        }
81        Ok::<_, Error>(())
82    });
83}