rdbc_rs/future/
database.rs

1//! Default [`super::ConnectionPool`] implementation.
2
3use std::sync::{Arc, Mutex};
4
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7
8use crate::{driver, BoxedDriver};
9
10use super::{driver::AsyncDriver, ConnectionPool, Preparable, Statement, Transaction};
11
12use anyhow::Result;
13
14/// Default [`super::ConnectionPool`] implementation.
15#[derive(Clone)]
16#[allow(dead_code)]
17pub struct Database {
18    driver: Arc<Mutex<BoxedDriver>>,
19    url: String,
20    conns: Arc<Mutex<Vec<(DateTime<Utc>, Box<dyn driver::Connection>)>>>,
21    max_idle_conns: usize,
22    max_lifetime: chrono::Duration,
23    _driver_name: String,
24}
25
26impl ConnectionPool for Database {
27    /// Implement [`super::ConnectionPool::new`]
28    fn new<S>(driver_name: S, driver: Arc<Mutex<BoxedDriver>>, url: S) -> anyhow::Result<Self>
29    where
30        S: Into<String> + AsRef<str>,
31    {
32        let url: String = url.into();
33        let driver_name = driver_name.into();
34
35        Ok(Self {
36            driver,
37            url,
38            conns: Default::default(),
39            max_idle_conns: 100,
40            max_lifetime: chrono::Duration::hours(1),
41            _driver_name: driver_name,
42        })
43    }
44    /// Implement [`super::ConnectionPool::get_conn`]
45    fn get_conn(&self) -> anyhow::Result<Box<dyn driver::Connection>> {
46        let mut conns = self.conns.lock().unwrap();
47
48        if !conns.is_empty() {
49            let (_, conn) = conns.remove(0);
50            return Ok(conn);
51        }
52
53        let conn = self.driver.lock().unwrap().open(&self.url)?;
54
55        Ok(conn)
56    }
57
58    /// Implement [`super::ConnectionPool::release_conn`]
59    fn release_conn(&self, conn: Box<dyn driver::Connection>) {
60        let mut conns = self.conns.lock().unwrap();
61
62        if conns.len() == self.max_idle_conns {
63            conns.remove(0);
64        }
65
66        conns.push((Utc::now(), conn));
67    }
68}
69
70impl Database {
71    /// Start new transaction
72    pub async fn begin(&self) -> Result<Transaction<Database>> {
73        let mut conn = self.get_conn()?;
74
75        let async_driver = AsyncDriver::new();
76
77        conn.begin(async_driver.callback());
78
79        let tx = async_driver.await?;
80
81        Ok(Transaction::new(
82            self._driver_name.clone(),
83            self.url.clone(),
84            tx,
85            Some(conn),
86            self.clone(),
87        ))
88    }
89}
90
91#[async_trait]
92impl Preparable for Database {
93    type DB = Database;
94    async fn prepare<S>(&mut self, query: S) -> Result<Statement<Self::DB>>
95    where
96        S: Into<String> + Send,
97    {
98        let mut conn = self.get_conn()?;
99
100        let async_driver = AsyncDriver::new();
101
102        conn.prepare(query.into(), async_driver.callback());
103
104        let stmt = async_driver.await?;
105
106        Ok(Statement::new(Some(self.clone()), Some(conn), stmt))
107    }
108
109    fn driver_name(&self) -> &str {
110        &self._driver_name
111    }
112
113    fn conn_str(&self) -> &str {
114        &self.url
115    }
116}