rusqlite_pool/
tokio.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
//! [`AsyncConnectionPool`] and [`AsyncConnectionHandle`] implemented using tokio semaphores.

use rusqlite::Connection;
use std::sync::Arc;
use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore, TryAcquireError};

/// A thin wrapper around a sync [`ConnectionPool`][crate::ConnectionPool]
/// providing orderly async access to DB connections via a [`Semaphore`].
#[derive(Clone)]
pub struct AsyncConnectionPool {
    pool: crate::ConnectionPool,
    semaphore: Arc<Semaphore>,
}

/// A thin wrapper around a [`ConnectionHandle`][crate::ConnectionHandle] that
/// manages the associated permit that was provided by the pool's semaphore.
///
/// Upon `drop`, the connection handle is dropped first returning the connection
/// to the pool's queue, then the permit is dropped indicating avaialability to
/// the pool's semaphore.
pub struct AsyncConnectionHandle {
    // NOTE(MAINTAINERS):
    // These fields are intentionally ordered in drop order!
    // The connection must be dropped prior to the permit!
    conn: crate::ConnectionHandle,
    /// Hold the permit, so that it may be dropped immediately after the
    /// `ConnectionHandle` is dropped.
    _permit: OwnedSemaphorePermit,
}

impl AsyncConnectionPool {
    /// Create a new connection pool.
    ///
    /// All connections are created using `new_conn_fn` and queued for use prior
    /// to returning.
    ///
    /// If any of the connections fail to open, all previously successful
    /// connections (if any) are dropped and the error is returned.
    pub fn new<F>(conn_limit: usize, new_conn_fn: F) -> Result<Self, rusqlite::Error>
    where
        F: Fn() -> Result<Connection, rusqlite::Error>,
    {
        let pool = crate::ConnectionPool::new(conn_limit, new_conn_fn)?;
        let semaphore = Arc::new(Semaphore::new(conn_limit));
        Ok(Self { pool, semaphore })
    }

    /// The total number of simultaneous connections managed by the pool,
    /// specified by the user upon construction.
    pub fn capacity(&self) -> usize {
        self.pool.capacity()
    }

    /// Returns `true` if the inner idle queue is full, i.e. all `Connection`s
    /// are available for use.
    pub fn all_connections_ready(&self) -> bool {
        self.pool.all_connections_ready()
    }

    /// Acquire a connection from the idle queue.
    ///
    /// Awaits a permit from the inner semaphore before acquiring the connection.
    pub async fn acquire(&self) -> Result<AsyncConnectionHandle, AcquireError> {
        let _permit = Semaphore::acquire_owned(self.semaphore.clone()).await?;
        let conn = self.pool.pop().expect("permit guarantees availability");
        Ok(AsyncConnectionHandle { conn, _permit })
    }

    /// Attempt to acquire a connection from the idle queue if one is available.
    ///
    /// Returns a `TryAcquireError` if there are no permits available, or if the
    /// semaphore has been closed.
    pub fn try_acquire(&self) -> Result<AsyncConnectionHandle, TryAcquireError> {
        let _permit = Semaphore::try_acquire_owned(self.semaphore.clone())?;
        let conn = self.pool.pop().expect("permit guarantees availability");
        Ok(AsyncConnectionHandle { conn, _permit })
    }

    /// Close the inner `semaphore` and all connections in the queue.
    ///
    /// All following calls to `acquire` or `try_acquire` will return
    /// immediately with `Err`.
    ///
    /// Returns the [`Connection::close`][rusqlite::Connection::close] result
    /// for each connection in the queue.
    ///
    /// Ensure all [`AsyncConnectionHandle`]s are dropped before calling `close`
    /// to properly handle all connection results. Otherwise, connections not in
    /// the queue will be closed upon the last `AsyncConnectionHandle` dropping.
    pub fn close(&self) -> Vec<Result<(), (rusqlite::Connection, rusqlite::Error)>> {
        self.semaphore.close();
        self.pool.close()
    }

    /// Returns whether or not the pool has been closed.
    pub fn is_closed(&self) -> bool {
        self.semaphore.is_closed()
    }
}

impl AsRef<rusqlite::Connection> for AsyncConnectionHandle {
    fn as_ref(&self) -> &rusqlite::Connection {
        self
    }
}

impl AsRef<crate::ConnectionHandle> for AsyncConnectionHandle {
    fn as_ref(&self) -> &crate::ConnectionHandle {
        self
    }
}

impl core::ops::Deref for AsyncConnectionHandle {
    type Target = crate::ConnectionHandle;
    fn deref(&self) -> &Self::Target {
        &self.conn
    }
}

impl core::ops::DerefMut for AsyncConnectionHandle {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.conn
    }
}

impl core::borrow::Borrow<Connection> for AsyncConnectionHandle {
    fn borrow(&self) -> &Connection {
        self
    }
}

impl core::borrow::BorrowMut<Connection> for AsyncConnectionHandle {
    fn borrow_mut(&mut self) -> &mut Connection {
        &mut *self
    }
}