1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
//! A minimal connection pool for rusqlite.

use crossbeam::queue::ArrayQueue;
use rusqlite::Connection;
use std::sync::Arc;

#[cfg(feature = "tokio")]
pub mod tokio;

/// A pool of [`rusqlite::Connection`]s.
///
/// Internally, the pool is represented with a fixed-capacity, thread-safe
/// queue.
pub struct ConnectionPool {
    queue: Arc<ArrayQueue<Connection>>,
}

/// A temporary handle to a [`rusqlite::Connection`] provided by a
/// [`ConnectionPool`].
///
/// Upon [`ConnectionHandle::access`] or `drop`, the inner `Connection` is
/// placed back in the pool's inner idle queue for future use.
///
/// As a result, in async or multi-threaded environments, care should be taken
/// to avoid holding onto a `ConnectionHandle` any longer than necessary to
/// avoid blocking access to connections elsewhere.
pub struct ConnectionHandle {
    conn: Option<Connection>,
    queue: Arc<ArrayQueue<Connection>>,
}

const EXPECT_QUEUE_LEN: &str = "cannot exceed fixed queue size";
const EXPECT_CONN_SOME: &str = "connection cannot be `None`";

impl ConnectionPool {
    /// Create a new connection pool.
    ///
    /// This opens `capacity` number of connections using `new_conn_fn` and adds
    /// them to the inner queue.
    ///
    /// If any of the connections fail to open, all previously successful
    /// connections (if any) are dropped and the error is returned.
    pub fn new<F>(capacity: usize, new_conn_fn: F) -> Result<Self, rusqlite::Error>
    where
        F: Fn() -> Result<Connection, rusqlite::Error>,
    {
        let queue = Arc::new(ArrayQueue::new(capacity));
        for _ in 0..capacity {
            let conn = new_conn_fn()?;
            queue.push(conn).expect(EXPECT_QUEUE_LEN);
        }
        Ok(Self { queue })
    }

    /// Pop a connection from the queue if one is available.
    ///
    /// If `None` is returned, all connections are currently in use.
    pub fn pop(&self) -> Option<ConnectionHandle> {
        self.queue.pop().map(|conn| ConnectionHandle {
            conn: Some(conn),
            queue: self.queue.clone(),
        })
    }

    /// The total number of simultaneous connections managed by the pool,
    /// specified by the user upon construction.
    pub fn capacity(&self) -> usize {
        self.queue.capacity()
    }

    /// Returns `true` if the inner idle queue is full, i.e. all `Connection`s
    /// are available for use.
    pub fn all_connections_ready(&self) -> bool {
        self.queue.is_full()
    }

    /// Manually close the pool and all connections in the inner queue.
    ///
    /// Returns the `Connection::close` result for each connection in the idle
    /// queue.
    ///
    /// If it is necessary that results are returned for all connections, care
    /// must be taken to ensure all [`ConnectionHandle`]s are dropped and that
    /// [`all_connections_ready`][Self::all_connections_ready] returns `true`
    /// before calling this method. Otherwise, connections not in the queue will
    /// be closed upon the last `ConnectionHandle` dropping.
    pub fn close(self) -> Vec<Result<(), (Connection, rusqlite::Error)>> {
        let mut res = vec![];
        while let Some(conn) = self.queue.pop() {
            res.push(conn.close());
        }
        res
    }
}

impl ConnectionHandle {
    /// Consume the `ConnectionHandle` and provide access to the inner
    /// [`rusqlite::Connection`] via the given function.
    ///
    /// After the given function is called, the connection is immediately placed
    /// back on the `Pool`s idle queue and the handle is dropped.
    pub fn access<O>(mut self, f: impl FnOnce(&mut Connection) -> O) -> O {
        let mut conn = self.conn.take().expect(EXPECT_CONN_SOME);
        let output = f(&mut conn);
        self.queue.push(conn).expect(EXPECT_QUEUE_LEN);
        output
    }
}

impl Drop for ConnectionHandle {
    fn drop(&mut self) {
        // Only `Some` in the case that `ConnectionHandle::access` was not called.
        if let Some(conn) = self.conn.take() {
            self.queue.push(conn).expect(EXPECT_QUEUE_LEN);
        }
    }
}