rusqlite_pool/lib.rs
1//! A minimal connection pool for rusqlite.
2
3use crossbeam::queue::ArrayQueue;
4use rusqlite::Connection;
5use std::sync::Arc;
6
7#[cfg(feature = "tokio")]
8pub mod tokio;
9
10/// A pool of [`rusqlite::Connection`]s.
11///
12/// Internally, the pool is represented with a fixed-capacity, thread-safe
13/// queue.
14#[derive(Clone)]
15pub struct ConnectionPool {
16 queue: Arc<ArrayQueue<Connection>>,
17}
18
19/// A temporary handle to a [`rusqlite::Connection`] provided by a
20/// [`ConnectionPool`].
21///
22/// Upon `drop`, the inner `Connection` is placed back in the pool's inner idle
23/// queue for future use.
24///
25/// As a result, in async or multi-threaded environments, care should be taken
26/// to avoid holding onto a `ConnectionHandle` any longer than necessary to
27/// avoid blocking access to connections elsewhere.
28pub struct ConnectionHandle {
29 conn: Option<Connection>,
30 queue: Arc<ArrayQueue<Connection>>,
31}
32
33const EXPECT_QUEUE_LEN: &str = "cannot exceed fixed queue size";
34const EXPECT_CONN_SOME: &str = "connection cannot be `None`";
35
36impl ConnectionPool {
37 /// Create a new connection pool.
38 ///
39 /// This opens `capacity` number of connections using `new_conn_fn` and adds
40 /// them to the inner queue.
41 ///
42 /// If any of the connections fail to open, all previously successful
43 /// connections (if any) are dropped and the error is returned.
44 pub fn new<F>(capacity: usize, new_conn_fn: F) -> Result<Self, rusqlite::Error>
45 where
46 F: Fn() -> Result<Connection, rusqlite::Error>,
47 {
48 let queue = Arc::new(ArrayQueue::new(capacity));
49 for _ in 0..capacity {
50 let conn = new_conn_fn()?;
51 queue.push(conn).expect(EXPECT_QUEUE_LEN);
52 }
53 Ok(Self { queue })
54 }
55
56 /// Pop a connection from the queue if one is available.
57 ///
58 /// If `None` is returned, all connections are currently in use.
59 ///
60 /// The inner connection is returned to the pool upon dropping the handle.
61 pub fn pop(&self) -> Option<ConnectionHandle> {
62 self.queue.pop().map(|conn| ConnectionHandle {
63 conn: Some(conn),
64 queue: self.queue.clone(),
65 })
66 }
67
68 /// The total number of simultaneous connections managed by the pool,
69 /// specified by the user upon construction.
70 pub fn capacity(&self) -> usize {
71 self.queue.capacity()
72 }
73
74 /// Returns `true` if the inner idle queue is full, i.e. all `Connection`s
75 /// are available for use.
76 pub fn all_connections_ready(&self) -> bool {
77 self.queue.is_full()
78 }
79
80 /// Manually close the pool and all connections in the inner queue.
81 ///
82 /// Returns the `Connection::close` result for each connection in the queue.
83 ///
84 /// If it is necessary that results are returned for all connections, care
85 /// must be taken to ensure all [`ConnectionHandle`]s are dropped and that
86 /// [`all_connections_ready`][Self::all_connections_ready] returns `true`
87 /// before calling this method. Otherwise, connections not in the queue will
88 /// be closed upon the last `ConnectionHandle` dropping.
89 ///
90 /// All connections closed during this call will be unavailable in future
91 /// calls to [`pop`][ConnectionPool::pop].
92 pub fn close(&self) -> Vec<Result<(), (Connection, rusqlite::Error)>> {
93 let mut res = vec![];
94 while let Some(conn) = self.queue.pop() {
95 res.push(conn.close());
96 }
97 res
98 }
99}
100
101impl AsRef<Connection> for ConnectionHandle {
102 fn as_ref(&self) -> &Connection {
103 self
104 }
105}
106
107impl core::ops::Deref for ConnectionHandle {
108 type Target = Connection;
109 fn deref(&self) -> &Self::Target {
110 self.conn.as_ref().expect(EXPECT_CONN_SOME)
111 }
112}
113
114impl core::ops::DerefMut for ConnectionHandle {
115 fn deref_mut(&mut self) -> &mut Self::Target {
116 self.conn.as_mut().expect(EXPECT_CONN_SOME)
117 }
118}
119
120impl core::borrow::Borrow<Connection> for ConnectionHandle {
121 fn borrow(&self) -> &Connection {
122 self
123 }
124}
125
126impl core::borrow::BorrowMut<Connection> for ConnectionHandle {
127 fn borrow_mut(&mut self) -> &mut Connection {
128 &mut *self
129 }
130}
131
132impl Drop for ConnectionHandle {
133 fn drop(&mut self) {
134 // Return the connection to the pool's queue.
135 let conn = self.conn.take().expect(EXPECT_CONN_SOME);
136 self.queue.push(conn).expect(EXPECT_QUEUE_LEN);
137 }
138}