rusqlite_pool/
tokio.rs

1//! [`AsyncConnectionPool`] and [`AsyncConnectionHandle`] implemented using tokio semaphores.
2
3use rusqlite::Connection;
4use std::sync::Arc;
5use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore, TryAcquireError};
6
7/// A thin wrapper around a sync [`ConnectionPool`][crate::ConnectionPool]
8/// providing orderly async access to DB connections via a [`Semaphore`].
9#[derive(Clone)]
10pub struct AsyncConnectionPool {
11    pool: crate::ConnectionPool,
12    semaphore: Arc<Semaphore>,
13}
14
15/// A thin wrapper around a [`ConnectionHandle`][crate::ConnectionHandle] that
16/// manages the associated permit that was provided by the pool's semaphore.
17///
18/// Upon `drop`, the connection handle is dropped first returning the connection
19/// to the pool's queue, then the permit is dropped indicating avaialability to
20/// the pool's semaphore.
21pub struct AsyncConnectionHandle {
22    // NOTE(MAINTAINERS):
23    // These fields are intentionally ordered in drop order!
24    // The connection must be dropped prior to the permit!
25    conn: crate::ConnectionHandle,
26    /// Hold the permit, so that it may be dropped immediately after the
27    /// `ConnectionHandle` is dropped.
28    _permit: OwnedSemaphorePermit,
29}
30
31impl AsyncConnectionPool {
32    /// Create a new connection pool.
33    ///
34    /// All connections are created using `new_conn_fn` and queued for use prior
35    /// to returning.
36    ///
37    /// If any of the connections fail to open, all previously successful
38    /// connections (if any) are dropped and the error is returned.
39    pub fn new<F>(conn_limit: usize, new_conn_fn: F) -> Result<Self, rusqlite::Error>
40    where
41        F: Fn() -> Result<Connection, rusqlite::Error>,
42    {
43        let pool = crate::ConnectionPool::new(conn_limit, new_conn_fn)?;
44        let semaphore = Arc::new(Semaphore::new(conn_limit));
45        Ok(Self { pool, semaphore })
46    }
47
48    /// The total number of simultaneous connections managed by the pool,
49    /// specified by the user upon construction.
50    pub fn capacity(&self) -> usize {
51        self.pool.capacity()
52    }
53
54    /// Returns `true` if the inner idle queue is full, i.e. all `Connection`s
55    /// are available for use.
56    pub fn all_connections_ready(&self) -> bool {
57        self.pool.all_connections_ready()
58    }
59
60    /// Acquire a connection from the idle queue.
61    ///
62    /// Awaits a permit from the inner semaphore before acquiring the connection.
63    pub async fn acquire(&self) -> Result<AsyncConnectionHandle, AcquireError> {
64        let _permit = Semaphore::acquire_owned(self.semaphore.clone()).await?;
65        let conn = self.pool.pop().expect("permit guarantees availability");
66        Ok(AsyncConnectionHandle { conn, _permit })
67    }
68
69    /// Attempt to acquire a connection from the idle queue if one is available.
70    ///
71    /// Returns a `TryAcquireError` if there are no permits available, or if the
72    /// semaphore has been closed.
73    pub fn try_acquire(&self) -> Result<AsyncConnectionHandle, TryAcquireError> {
74        let _permit = Semaphore::try_acquire_owned(self.semaphore.clone())?;
75        let conn = self.pool.pop().expect("permit guarantees availability");
76        Ok(AsyncConnectionHandle { conn, _permit })
77    }
78
79    /// Close the inner `semaphore` and all connections in the queue.
80    ///
81    /// All following calls to `acquire` or `try_acquire` will return
82    /// immediately with `Err`.
83    ///
84    /// Returns the [`Connection::close`][rusqlite::Connection::close] result
85    /// for each connection in the queue.
86    ///
87    /// Ensure all [`AsyncConnectionHandle`]s are dropped before calling `close`
88    /// to properly handle all connection results. Otherwise, connections not in
89    /// the queue will be closed upon the last `AsyncConnectionHandle` dropping.
90    pub fn close(&self) -> Vec<Result<(), (rusqlite::Connection, rusqlite::Error)>> {
91        self.semaphore.close();
92        self.pool.close()
93    }
94
95    /// Returns whether or not the pool has been closed.
96    pub fn is_closed(&self) -> bool {
97        self.semaphore.is_closed()
98    }
99}
100
101impl AsMut<rusqlite::Connection> for AsyncConnectionHandle {
102    fn as_mut(&mut self) -> &mut rusqlite::Connection {
103        self
104    }
105}
106
107impl AsRef<rusqlite::Connection> for AsyncConnectionHandle {
108    fn as_ref(&self) -> &rusqlite::Connection {
109        self
110    }
111}
112
113impl AsRef<crate::ConnectionHandle> for AsyncConnectionHandle {
114    fn as_ref(&self) -> &crate::ConnectionHandle {
115        self
116    }
117}
118
119impl core::ops::Deref for AsyncConnectionHandle {
120    type Target = crate::ConnectionHandle;
121    fn deref(&self) -> &Self::Target {
122        &self.conn
123    }
124}
125
126impl core::ops::DerefMut for AsyncConnectionHandle {
127    fn deref_mut(&mut self) -> &mut Self::Target {
128        &mut self.conn
129    }
130}
131
132impl core::borrow::Borrow<Connection> for AsyncConnectionHandle {
133    fn borrow(&self) -> &Connection {
134        self
135    }
136}
137
138impl core::borrow::BorrowMut<Connection> for AsyncConnectionHandle {
139    fn borrow_mut(&mut self) -> &mut Connection {
140        &mut *self
141    }
142}