rusqlite_pool/
lib.rs

1//! A minimal connection pool for rusqlite.
2
3use crossbeam::queue::ArrayQueue;
4use rusqlite::Connection;
5use std::sync::Arc;
6
7#[cfg(feature = "tokio")]
8pub mod tokio;
9
10/// A pool of [`rusqlite::Connection`]s.
11///
12/// Internally, the pool is represented with a fixed-capacity, thread-safe
13/// queue.
14#[derive(Clone)]
15pub struct ConnectionPool {
16    queue: Arc<ArrayQueue<Connection>>,
17}
18
19/// A temporary handle to a [`rusqlite::Connection`] provided by a
20/// [`ConnectionPool`].
21///
22/// Upon `drop`, the inner `Connection` is placed back in the pool's inner idle
23/// queue for future use.
24///
25/// As a result, in async or multi-threaded environments, care should be taken
26/// to avoid holding onto a `ConnectionHandle` any longer than necessary to
27/// avoid blocking access to connections elsewhere.
28pub struct ConnectionHandle {
29    conn: Option<Connection>,
30    queue: Arc<ArrayQueue<Connection>>,
31}
32
33const EXPECT_QUEUE_LEN: &str = "cannot exceed fixed queue size";
34const EXPECT_CONN_SOME: &str = "connection cannot be `None`";
35
36impl ConnectionPool {
37    /// Create a new connection pool.
38    ///
39    /// This opens `capacity` number of connections using `new_conn_fn` and adds
40    /// them to the inner queue.
41    ///
42    /// If any of the connections fail to open, all previously successful
43    /// connections (if any) are dropped and the error is returned.
44    pub fn new<F>(capacity: usize, new_conn_fn: F) -> Result<Self, rusqlite::Error>
45    where
46        F: Fn() -> Result<Connection, rusqlite::Error>,
47    {
48        let queue = Arc::new(ArrayQueue::new(capacity));
49        for _ in 0..capacity {
50            let conn = new_conn_fn()?;
51            queue.push(conn).expect(EXPECT_QUEUE_LEN);
52        }
53        Ok(Self { queue })
54    }
55
56    /// Pop a connection from the queue if one is available.
57    ///
58    /// If `None` is returned, all connections are currently in use.
59    ///
60    /// The inner connection is returned to the pool upon dropping the handle.
61    pub fn pop(&self) -> Option<ConnectionHandle> {
62        self.queue.pop().map(|conn| ConnectionHandle {
63            conn: Some(conn),
64            queue: self.queue.clone(),
65        })
66    }
67
68    /// The total number of simultaneous connections managed by the pool,
69    /// specified by the user upon construction.
70    pub fn capacity(&self) -> usize {
71        self.queue.capacity()
72    }
73
74    /// Returns `true` if the inner idle queue is full, i.e. all `Connection`s
75    /// are available for use.
76    pub fn all_connections_ready(&self) -> bool {
77        self.queue.is_full()
78    }
79
80    /// Manually close the pool and all connections in the inner queue.
81    ///
82    /// Returns the `Connection::close` result for each connection in the queue.
83    ///
84    /// If it is necessary that results are returned for all connections, care
85    /// must be taken to ensure all [`ConnectionHandle`]s are dropped and that
86    /// [`all_connections_ready`][Self::all_connections_ready] returns `true`
87    /// before calling this method. Otherwise, connections not in the queue will
88    /// be closed upon the last `ConnectionHandle` dropping.
89    ///
90    /// All connections closed during this call will be unavailable in future
91    /// calls to [`pop`][ConnectionPool::pop].
92    pub fn close(&self) -> Vec<Result<(), (Connection, rusqlite::Error)>> {
93        let mut res = vec![];
94        while let Some(conn) = self.queue.pop() {
95            res.push(conn.close());
96        }
97        res
98    }
99}
100
101impl AsRef<Connection> for ConnectionHandle {
102    fn as_ref(&self) -> &Connection {
103        self
104    }
105}
106
107impl core::ops::Deref for ConnectionHandle {
108    type Target = Connection;
109    fn deref(&self) -> &Self::Target {
110        self.conn.as_ref().expect(EXPECT_CONN_SOME)
111    }
112}
113
114impl core::ops::DerefMut for ConnectionHandle {
115    fn deref_mut(&mut self) -> &mut Self::Target {
116        self.conn.as_mut().expect(EXPECT_CONN_SOME)
117    }
118}
119
120impl core::borrow::Borrow<Connection> for ConnectionHandle {
121    fn borrow(&self) -> &Connection {
122        self
123    }
124}
125
126impl core::borrow::BorrowMut<Connection> for ConnectionHandle {
127    fn borrow_mut(&mut self) -> &mut Connection {
128        &mut *self
129    }
130}
131
132impl Drop for ConnectionHandle {
133    fn drop(&mut self) {
134        // Return the connection to the pool's queue.
135        let conn = self.conn.take().expect(EXPECT_CONN_SOME);
136        self.queue.push(conn).expect(EXPECT_QUEUE_LEN);
137    }
138}