rusqlite_pool/tokio.rs
1//! [`AsyncConnectionPool`] and [`AsyncConnectionHandle`] implemented using tokio semaphores.
2
3use rusqlite::Connection;
4use std::sync::Arc;
5use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore, TryAcquireError};
6
7/// A thin wrapper around a sync [`ConnectionPool`][crate::ConnectionPool]
8/// providing orderly async access to DB connections via a [`Semaphore`].
9#[derive(Clone)]
10pub struct AsyncConnectionPool {
11 pool: crate::ConnectionPool,
12 semaphore: Arc<Semaphore>,
13}
14
15/// A thin wrapper around a [`ConnectionHandle`][crate::ConnectionHandle] that
16/// manages the associated permit that was provided by the pool's semaphore.
17///
18/// Upon `drop`, the connection handle is dropped first returning the connection
19/// to the pool's queue, then the permit is dropped indicating avaialability to
20/// the pool's semaphore.
21pub struct AsyncConnectionHandle {
22 // NOTE(MAINTAINERS):
23 // These fields are intentionally ordered in drop order!
24 // The connection must be dropped prior to the permit!
25 conn: crate::ConnectionHandle,
26 /// Hold the permit, so that it may be dropped immediately after the
27 /// `ConnectionHandle` is dropped.
28 _permit: OwnedSemaphorePermit,
29}
30
31impl AsyncConnectionPool {
32 /// Create a new connection pool.
33 ///
34 /// All connections are created using `new_conn_fn` and queued for use prior
35 /// to returning.
36 ///
37 /// If any of the connections fail to open, all previously successful
38 /// connections (if any) are dropped and the error is returned.
39 pub fn new<F>(conn_limit: usize, new_conn_fn: F) -> Result<Self, rusqlite::Error>
40 where
41 F: Fn() -> Result<Connection, rusqlite::Error>,
42 {
43 let pool = crate::ConnectionPool::new(conn_limit, new_conn_fn)?;
44 let semaphore = Arc::new(Semaphore::new(conn_limit));
45 Ok(Self { pool, semaphore })
46 }
47
48 /// The total number of simultaneous connections managed by the pool,
49 /// specified by the user upon construction.
50 pub fn capacity(&self) -> usize {
51 self.pool.capacity()
52 }
53
54 /// Returns `true` if the inner idle queue is full, i.e. all `Connection`s
55 /// are available for use.
56 pub fn all_connections_ready(&self) -> bool {
57 self.pool.all_connections_ready()
58 }
59
60 /// Acquire a connection from the idle queue.
61 ///
62 /// Awaits a permit from the inner semaphore before acquiring the connection.
63 pub async fn acquire(&self) -> Result<AsyncConnectionHandle, AcquireError> {
64 let _permit = Semaphore::acquire_owned(self.semaphore.clone()).await?;
65 let conn = self.pool.pop().expect("permit guarantees availability");
66 Ok(AsyncConnectionHandle { conn, _permit })
67 }
68
69 /// Attempt to acquire a connection from the idle queue if one is available.
70 ///
71 /// Returns a `TryAcquireError` if there are no permits available, or if the
72 /// semaphore has been closed.
73 pub fn try_acquire(&self) -> Result<AsyncConnectionHandle, TryAcquireError> {
74 let _permit = Semaphore::try_acquire_owned(self.semaphore.clone())?;
75 let conn = self.pool.pop().expect("permit guarantees availability");
76 Ok(AsyncConnectionHandle { conn, _permit })
77 }
78
79 /// Close the inner `semaphore` and all connections in the queue.
80 ///
81 /// All following calls to `acquire` or `try_acquire` will return
82 /// immediately with `Err`.
83 ///
84 /// Returns the [`Connection::close`][rusqlite::Connection::close] result
85 /// for each connection in the queue.
86 ///
87 /// Ensure all [`AsyncConnectionHandle`]s are dropped before calling `close`
88 /// to properly handle all connection results. Otherwise, connections not in
89 /// the queue will be closed upon the last `AsyncConnectionHandle` dropping.
90 pub fn close(&self) -> Vec<Result<(), (rusqlite::Connection, rusqlite::Error)>> {
91 self.semaphore.close();
92 self.pool.close()
93 }
94
95 /// Returns whether or not the pool has been closed.
96 pub fn is_closed(&self) -> bool {
97 self.semaphore.is_closed()
98 }
99}
100
101impl AsMut<rusqlite::Connection> for AsyncConnectionHandle {
102 fn as_mut(&mut self) -> &mut rusqlite::Connection {
103 self
104 }
105}
106
107impl AsRef<rusqlite::Connection> for AsyncConnectionHandle {
108 fn as_ref(&self) -> &rusqlite::Connection {
109 self
110 }
111}
112
113impl AsRef<crate::ConnectionHandle> for AsyncConnectionHandle {
114 fn as_ref(&self) -> &crate::ConnectionHandle {
115 self
116 }
117}
118
119impl core::ops::Deref for AsyncConnectionHandle {
120 type Target = crate::ConnectionHandle;
121 fn deref(&self) -> &Self::Target {
122 &self.conn
123 }
124}
125
126impl core::ops::DerefMut for AsyncConnectionHandle {
127 fn deref_mut(&mut self) -> &mut Self::Target {
128 &mut self.conn
129 }
130}
131
132impl core::borrow::Borrow<Connection> for AsyncConnectionHandle {
133 fn borrow(&self) -> &Connection {
134 self
135 }
136}
137
138impl core::borrow::BorrowMut<Connection> for AsyncConnectionHandle {
139 fn borrow_mut(&mut self) -> &mut Connection {
140 &mut *self
141 }
142}