rusqlite_pool/
lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
//! A minimal connection pool for rusqlite.

use crossbeam::queue::ArrayQueue;
use rusqlite::Connection;
use std::sync::Arc;

#[cfg(feature = "tokio")]
pub mod tokio;

/// A pool of [`rusqlite::Connection`]s.
///
/// Internally, the pool is represented with a fixed-capacity, thread-safe
/// queue.
#[derive(Clone)]
pub struct ConnectionPool {
    queue: Arc<ArrayQueue<Connection>>,
}

/// A temporary handle to a [`rusqlite::Connection`] provided by a
/// [`ConnectionPool`].
///
/// Upon `drop`, the inner `Connection` is placed back in the pool's inner idle
/// queue for future use.
///
/// As a result, in async or multi-threaded environments, care should be taken
/// to avoid holding onto a `ConnectionHandle` any longer than necessary to
/// avoid blocking access to connections elsewhere.
pub struct ConnectionHandle {
    conn: Option<Connection>,
    queue: Arc<ArrayQueue<Connection>>,
}

const EXPECT_QUEUE_LEN: &str = "cannot exceed fixed queue size";
const EXPECT_CONN_SOME: &str = "connection cannot be `None`";

impl ConnectionPool {
    /// Create a new connection pool.
    ///
    /// This opens `capacity` number of connections using `new_conn_fn` and adds
    /// them to the inner queue.
    ///
    /// If any of the connections fail to open, all previously successful
    /// connections (if any) are dropped and the error is returned.
    pub fn new<F>(capacity: usize, new_conn_fn: F) -> Result<Self, rusqlite::Error>
    where
        F: Fn() -> Result<Connection, rusqlite::Error>,
    {
        let queue = Arc::new(ArrayQueue::new(capacity));
        for _ in 0..capacity {
            let conn = new_conn_fn()?;
            queue.push(conn).expect(EXPECT_QUEUE_LEN);
        }
        Ok(Self { queue })
    }

    /// Pop a connection from the queue if one is available.
    ///
    /// If `None` is returned, all connections are currently in use.
    ///
    /// The inner connection is returned to the pool upon dropping the handle.
    pub fn pop(&self) -> Option<ConnectionHandle> {
        self.queue.pop().map(|conn| ConnectionHandle {
            conn: Some(conn),
            queue: self.queue.clone(),
        })
    }

    /// The total number of simultaneous connections managed by the pool,
    /// specified by the user upon construction.
    pub fn capacity(&self) -> usize {
        self.queue.capacity()
    }

    /// Returns `true` if the inner idle queue is full, i.e. all `Connection`s
    /// are available for use.
    pub fn all_connections_ready(&self) -> bool {
        self.queue.is_full()
    }

    /// Manually close the pool and all connections in the inner queue.
    ///
    /// Returns the `Connection::close` result for each connection in the queue.
    ///
    /// If it is necessary that results are returned for all connections, care
    /// must be taken to ensure all [`ConnectionHandle`]s are dropped and that
    /// [`all_connections_ready`][Self::all_connections_ready] returns `true`
    /// before calling this method. Otherwise, connections not in the queue will
    /// be closed upon the last `ConnectionHandle` dropping.
    ///
    /// All connections closed during this call will be unavailable in future
    /// calls to [`pop`][ConnectionPool::pop].
    pub fn close(&self) -> Vec<Result<(), (Connection, rusqlite::Error)>> {
        let mut res = vec![];
        while let Some(conn) = self.queue.pop() {
            res.push(conn.close());
        }
        res
    }
}

impl AsRef<Connection> for ConnectionHandle {
    fn as_ref(&self) -> &Connection {
        self
    }
}

impl core::ops::Deref for ConnectionHandle {
    type Target = Connection;
    fn deref(&self) -> &Self::Target {
        self.conn.as_ref().expect(EXPECT_CONN_SOME)
    }
}

impl core::ops::DerefMut for ConnectionHandle {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.conn.as_mut().expect(EXPECT_CONN_SOME)
    }
}

impl core::borrow::Borrow<Connection> for ConnectionHandle {
    fn borrow(&self) -> &Connection {
        self
    }
}

impl core::borrow::BorrowMut<Connection> for ConnectionHandle {
    fn borrow_mut(&mut self) -> &mut Connection {
        &mut *self
    }
}

impl Drop for ConnectionHandle {
    fn drop(&mut self) {
        // Return the connection to the pool's queue.
        let conn = self.conn.take().expect(EXPECT_CONN_SOME);
        self.queue.push(conn).expect(EXPECT_QUEUE_LEN);
    }
}