tokio_rusqlite_new/
lib.rs

1#![doc = include_str!("../README.md")]
2#![cfg_attr(docsrs, feature(doc_cfg))]
3#![forbid(unsafe_code)]
4#![warn(missing_docs)]
5
6use std::fmt::Debug;
7
8use crossbeam_channel::{Receiver, Sender};
9use tokio::sync::oneshot;
10
11mod error;
12pub use error::*;
13
14enum Message {
15    Execute(Box<dyn FnOnce(&mut rusqlite::Connection) + Send + 'static>),
16    Close(oneshot::Sender<Result<(), rusqlite::Error>>),
17}
18
19/// A handle to call functions in background thread.
20#[derive(Clone)]
21pub struct Connection {
22    sender: Sender<Message>,
23}
24
25impl Debug for Connection {
26    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27        f.debug_struct("Connection").finish_non_exhaustive()
28    }
29}
30
31fn event_loop(mut conn: rusqlite::Connection, receiver: Receiver<Message>) {
32    while let Ok(message) = receiver.recv() {
33        match message {
34            Message::Execute(f) => f(&mut conn),
35            Message::Close(s) => {
36                let result = conn.close();
37
38                match result {
39                    Ok(v) => {
40                        s.send(Ok(v)).expect(BUG_TEXT);
41                        break;
42                    }
43                    Err((c, e)) => {
44                        conn = c;
45                        s.send(Err(e)).expect(BUG_TEXT);
46                    }
47                }
48            }
49        }
50    }
51}
52
53impl Connection {
54    /// Create a new connection from an existing SQLite connection.
55    pub fn new(conn: rusqlite::Connection) -> Self {
56        let (sender, receiver) = crossbeam_channel::unbounded();
57        std::thread::spawn(move || event_loop(conn, receiver));
58        Self { sender }
59    }
60
61    /// Call a function in background thread and get the result
62    /// asynchronously.
63    ///
64    /// # Failure
65    ///
66    /// Will return `Err` if the database connection has been closed.
67    /// Will return `Error::Error` wrapping the inner error if `function` failed.
68    pub async fn call<F, R, E>(&self, function: F) -> Result<R, Error<E>>
69    where
70        F: FnOnce(&mut rusqlite::Connection) -> Result<R, E> + Send + 'static,
71        R: Send + 'static,
72        E: Send + 'static,
73    {
74        self.call_raw(function)
75            .await
76            .map_err(|_| Error::ConnectionClosed)
77            .and_then(|result| result.map_err(Error::Error))
78    }
79
80    /// Call a function in background thread and get the result asynchronously.
81    ///
82    /// # Failure
83    ///
84    /// Will return `Err` if the database connection has been closed.
85    pub async fn call_raw<F, R>(&self, function: F) -> Result<R>
86    where
87        F: FnOnce(&mut rusqlite::Connection) -> R + Send + 'static,
88        R: Send + 'static,
89    {
90        let (sender, receiver) = oneshot::channel::<R>();
91
92        self.sender
93            .send(Message::Execute(Box::new(move |conn| {
94                let value = function(conn);
95                let _ = sender.send(value);
96            })))
97            .map_err(|_| Error::ConnectionClosed)?;
98
99        receiver.await.map_err(|_| Error::ConnectionClosed)
100    }
101
102    /// Call a function in background thread and get the result
103    /// asynchronously.
104    ///
105    /// This method can cause a `panic` if the underlying database connection is closed.
106    /// It is a more user-friendly alternative to the [`Connection::call`] method.
107    /// It should be safe if the connection is never explicitly closed (using the [`Connection::close`] call).
108    ///
109    /// Calling this on a closed connection will cause a `panic`.
110    pub async fn call_unwrap<F, R>(&self, function: F) -> R
111    where
112        F: FnOnce(&mut rusqlite::Connection) -> R + Send + 'static,
113        R: Send + 'static,
114    {
115        let (sender, receiver) = oneshot::channel::<R>();
116
117        self.sender
118            .send(Message::Execute(Box::new(move |conn| {
119                let value = function(conn);
120                let _ = sender.send(value);
121            })))
122            .expect("database connection should be open");
123
124        receiver.await.expect(BUG_TEXT)
125    }
126
127    /// Close the database connection.
128    ///
129    /// This is functionally equivalent to the `Drop` implementation for
130    /// `Connection`. It consumes the `Connection`, but on error returns it
131    /// to the caller for retry purposes.
132    ///
133    /// If successful, any following `close` operations performed
134    /// on `Connection` copies will succeed immediately.
135    ///
136    /// On the other hand, any calls to [`Connection::call`] will return a [`Error::ConnectionClosed`],
137    /// and any calls to [`Connection::call_unwrap`] will cause a `panic`.
138    ///
139    /// # Failure
140    ///
141    /// Will return `Err` if the underlying SQLite close call fails.
142    pub async fn close(self) -> Result<()> {
143        let (sender, receiver) = oneshot::channel::<Result<(), rusqlite::Error>>();
144
145        if let Err(crossbeam_channel::SendError(_)) = self.sender.send(Message::Close(sender)) {
146            // If the channel is closed on the other side, it means the connection closed successfully
147            // This is a safeguard against calling close on a `Copy` of the connection
148            return Ok(());
149        }
150
151        match receiver.await {
152            Ok(result) => result.map_err(|e| Error::Close((self, e))),
153            Err(_) => {
154                // If we get a RecvError at this point, it also means the channel closed in the meantime
155                // we can assume the connection is closed
156                Ok(())
157            }
158        }
159    }
160}
161
162impl From<rusqlite::Connection> for Connection {
163    fn from(conn: rusqlite::Connection) -> Self {
164        Self::new(conn)
165    }
166}
167
168use std::path::Path;
169use rusqlite::{Name, OpenFlags};
170
171async fn start<F>(open: F) -> Result<Connection>
172where
173    F: FnOnce() -> rusqlite::Result<rusqlite::Connection> + Send + 'static,
174{
175    let (sender, receiver) = crossbeam_channel::unbounded::<Message>();
176    let (opener_sender, opener_receiver) = oneshot::channel();
177    std::thread::spawn(move || match open() {
178        Ok(conn) => {
179            if let Err(_) = opener_sender.send(Ok(())) { return; }
180            event_loop(conn, receiver);
181        },
182        Err(e) => {
183            let _ = opener_sender.send(Err(e));
184        },
185    });
186    opener_receiver
187        .await
188        .expect(BUG_TEXT)
189        .map(|_| Connection { sender })
190        .map_err(Error::Error)
191}
192
193impl Connection {
194    /// Open a new connection to an SQLite database.
195    ///
196    /// `Connection::open(path)` is equivalent to
197    /// `Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_WRITE |
198    /// OpenFlags::SQLITE_OPEN_CREATE)`.
199    ///
200    /// # Failure
201    ///
202    /// Will return `Err` if `path` cannot be converted to a C-compatible
203    /// string or if the underlying SQLite open call fails.
204    pub async fn open<P: AsRef<Path>>(path: P) -> Result<Connection> {
205        let path = path.as_ref().to_owned();
206        start(move || rusqlite::Connection::open(path)).await
207    }
208
209    /// Open a new connection to an in-memory SQLite database.
210    ///
211    /// # Failure
212    ///
213    /// Will return `Err` if the underlying SQLite open call fails.
214    pub async fn open_in_memory() -> Result<Connection> {
215        start(rusqlite::Connection::open_in_memory).await
216    }
217
218    /// Open a new connection to an SQLite database.
219    ///
220    /// [Database Connection](http://www.sqlite.org/c3ref/open.html) for a
221    /// description of valid flag combinations.
222    ///
223    /// # Failure
224    ///
225    /// Will return `Err` if `path` cannot be converted to a C-compatible
226    /// string or if the underlying SQLite open call fails.
227    pub async fn open_with_flags<P: AsRef<Path>>(path: P, flags: OpenFlags) -> Result<Self> {
228        let path = path.as_ref().to_owned();
229        start(move || rusqlite::Connection::open_with_flags(path, flags)).await
230    }
231
232    /// Open a new connection to an SQLite database using the specific flags
233    /// and vfs name.
234    ///
235    /// [Database Connection](http://www.sqlite.org/c3ref/open.html) for a
236    /// description of valid flag combinations.
237    ///
238    /// # Failure
239    ///
240    /// Will return `Err` if either `path` or `vfs` cannot be converted to a
241    /// C-compatible string or if the underlying SQLite open call fails.
242    pub async fn open_with_flags_and_vfs<P: AsRef<Path>>(path: P, flags: OpenFlags, vfs: impl Name) -> Result<Self> {
243        let path = path.as_ref().to_owned();
244        let vfs = vfs.as_cstr()?.to_owned();
245        start(move || rusqlite::Connection::open_with_flags_and_vfs(path, flags, vfs.as_c_str())).await
246    }
247
248    /// Open a new connection to an in-memory SQLite database.
249    ///
250    /// [Database Connection](http://www.sqlite.org/c3ref/open.html) for a
251    /// description of valid flag combinations.
252    ///
253    /// # Failure
254    ///
255    /// Will return `Err` if the underlying SQLite open call fails.
256    pub async fn open_in_memory_with_flags(flags: OpenFlags) -> Result<Self> {
257        start(move || rusqlite::Connection::open_in_memory_with_flags(flags)).await
258    }
259
260    /// Open a new connection to an in-memory SQLite database using the
261    /// specific flags and vfs name.
262    ///
263    /// [Database Connection](http://www.sqlite.org/c3ref/open.html) for a
264    /// description of valid flag combinations.
265    ///
266    /// # Failure
267    ///
268    /// Will return `Err` if `vfs` cannot be converted to a C-compatible
269    /// string or if the underlying SQLite open call fails.
270    pub async fn open_in_memory_with_flags_and_vfs(flags: OpenFlags, vfs: impl Name) -> Result<Self> {
271        let vfs = vfs.as_cstr()?.to_owned();
272        start(move || rusqlite::Connection::open_in_memory_with_flags_and_vfs(flags, vfs.as_c_str())).await
273    }
274}