tokio_rusqlite/
lib.rs

1//! Asynchronous handle for rusqlite library.
2//!
3//! # Guide
4//!
//! This library provides the [`Connection`] struct. A [`Connection`] is a handle
//! for calling functions on a background thread and can be cloned cheaply.
//! The [`Connection::call`] method runs the provided function on the background
//! thread and returns its result asynchronously.
9//!
10//! # Design
11//!
//! A thread is spawned for each opened connection handle. When the `call` method
//! is called, the provided function is boxed, sent to the thread through an mpsc
//! channel and executed. The return value is then sent back over a oneshot
//! channel from the thread and returned from the method.
16//!
17//! # Example
18//!
19//! ```rust,no_run
20//! use tokio_rusqlite::{params, Connection, Result};
21//!
22//! #[derive(Debug)]
23//! struct Person {
24//!     id: i32,
25//!     name: String,
26//!     data: Option<Vec<u8>>,
27//! }
28//!
29//! #[tokio::main]
30//! async fn main() -> Result<()> {
31//!     let conn = Connection::open_in_memory().await?;
32//!
33//!     let people = conn
34//!         .call(|conn| {
35//!             conn.execute(
36//!                 "CREATE TABLE person (
37//!                     id    INTEGER PRIMARY KEY,
38//!                     name  TEXT NOT NULL,
39//!                     data  BLOB
40//!                 )",
41//!                 [],
42//!             )?;
43//!
44//!             let steven = Person {
45//!                 id: 1,
46//!                 name: "Steven".to_string(),
47//!                 data: None,
48//!             };
49//!
50//!             conn.execute(
51//!                 "INSERT INTO person (name, data) VALUES (?1, ?2)",
52//!                 params![steven.name, steven.data],
53//!             )?;
54//!
55//!             let mut stmt = conn.prepare("SELECT id, name, data FROM person")?;
56//!             let people = stmt
57//!                 .query_map([], |row| {
58//!                     Ok(Person {
59//!                         id: row.get(0)?,
60//!                         name: row.get(1)?,
61//!                         data: row.get(2)?,
62//!                     })
63//!                 })?
64//!                 .collect::<std::result::Result<Vec<Person>, rusqlite::Error>>()?;
65//!
66//!             Ok(people)
67//!         })
68//!         .await?;
69//!
70//!     for person in people {
71//!         println!("Found person {:?}", person);
72//!     }
73//!
74//!     Ok(())
75//! }
76//! ```
77
78#![forbid(unsafe_code)]
79#![warn(
80    clippy::await_holding_lock,
81    clippy::cargo_common_metadata,
82    clippy::dbg_macro,
83    clippy::empty_enum,
84    clippy::enum_glob_use,
85    clippy::inefficient_to_string,
86    clippy::mem_forget,
87    clippy::mutex_integer,
88    clippy::needless_continue,
89    clippy::todo,
90    clippy::unimplemented,
91    clippy::wildcard_imports,
92    future_incompatible,
93    missing_docs,
94    missing_debug_implementations,
95    unreachable_pub
96)]
97
98#[cfg(test)]
99mod tests;
100
101use crossbeam_channel::{Receiver, Sender};
102use std::{
103    fmt::{self, Debug, Display},
104    path::Path,
105    thread,
106};
107use tokio::sync::oneshot::{self};
108
109pub use rusqlite::{self, *};
110
// Message passed to `expect` on internal channel operations that this crate
// does not expect to fail; seeing it in a panic points at a library defect.
const BUG_TEXT: &str = "bug in tokio-rusqlite, please report";
112
#[derive(Debug)]
/// Represents the errors specific for this library.
///
/// The type parameter `E` is the application-specific error carried by
/// [`Error::Error`]; it defaults to [`rusqlite::Error`].
#[non_exhaustive]
pub enum Error<E = rusqlite::Error> {
    /// The connection to the SQLite has been closed and cannot be queried any more.
    ConnectionClosed,

    /// An error occurred while closing the SQLite connection.
    /// This `Error` variant contains the [`Connection`], which can be used to retry the close operation
    /// and the underlying [`rusqlite::Error`] that made it impossible to close the database.
    Close((Connection, rusqlite::Error)),

    /// An application-specific error occurred.
    Error(E),
}
128
129impl<E: Display> Display for Error<E> {
130    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
131        match self {
132            Error::ConnectionClosed => write!(f, "ConnectionClosed"),
133            Error::Close((_, e)) => write!(f, "Close((Connection, \"{e}\"))"),
134            Error::Error(e) => write!(f, "Error(\"{e}\")"),
135        }
136    }
137}
138
139impl<E: std::error::Error + 'static> std::error::Error for Error<E> {
140    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
141        match self {
142            Error::ConnectionClosed => None,
143            Error::Close((_, e)) => Some(e),
144            Error::Error(e) => Some(e),
145        }
146    }
147}
148
149impl From<rusqlite::Error> for Error {
150    fn from(value: rusqlite::Error) -> Self {
151        Error::Error(value)
152    }
153}
154
/// The result returned on method calls in this crate.
///
/// Shorthand for [`std::result::Result`] whose error type is this crate's
/// [`Error`] with its default parameter ([`rusqlite::Error`]).
pub type Result<T> = std::result::Result<T, Error>;

// A boxed, sendable, run-once job that is handed a mutable reference to the
// `rusqlite::Connection` when the event loop executes it. It returns nothing:
// the jobs built in this file deliver their result through a captured
// oneshot sender instead.
type CallFn = Box<dyn FnOnce(&mut rusqlite::Connection) + Send + 'static>;

// Requests delivered over the channel to the event loop.
enum Message {
    // Run the boxed job against the connection.
    Execute(CallFn),
    // Close the connection and report the outcome through the oneshot sender.
    Close(oneshot::Sender<std::result::Result<(), rusqlite::Error>>),
}
164
/// A handle to call functions in background thread.
///
/// The handle only holds the sending half of a channel, so cloning it yields
/// another handle that feeds the same channel.
#[derive(Clone)]
pub struct Connection {
    // Sending half of the channel whose receiving half is consumed by
    // `event_loop`.
    sender: Sender<Message>,
}
170
171impl Connection {
172    /// Open a new connection to a SQLite database.
173    ///
174    /// `Connection::open(path)` is equivalent to
175    /// `Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_WRITE |
176    /// OpenFlags::SQLITE_OPEN_CREATE)`.
177    ///
178    /// # Failure
179    ///
180    /// Will return `Err` if `path` cannot be converted to a C-compatible
181    /// string or if the underlying SQLite open call fails.
182    pub async fn open<P: AsRef<Path>>(path: P) -> std::result::Result<Self, rusqlite::Error> {
183        let path = path.as_ref().to_owned();
184        start(move || rusqlite::Connection::open(path)).await
185    }
186
187    /// Open a new connection to an in-memory SQLite database.
188    ///
189    /// # Failure
190    ///
191    /// Will return `Err` if the underlying SQLite open call fails.
192    pub async fn open_in_memory() -> std::result::Result<Self, rusqlite::Error> {
193        start(rusqlite::Connection::open_in_memory).await
194    }
195
196    /// Open a new connection to a SQLite database.
197    ///
198    /// [Database Connection](http://www.sqlite.org/c3ref/open.html) for a
199    /// description of valid flag combinations.
200    ///
201    /// # Failure
202    ///
203    /// Will return `Err` if `path` cannot be converted to a C-compatible
204    /// string or if the underlying SQLite open call fails.
205    pub async fn open_with_flags<P: AsRef<Path>>(
206        path: P,
207        flags: OpenFlags,
208    ) -> std::result::Result<Self, rusqlite::Error> {
209        let path = path.as_ref().to_owned();
210        start(move || rusqlite::Connection::open_with_flags(path, flags)).await
211    }
212
213    /// Open a new connection to a SQLite database using the specific flags
214    /// and vfs name.
215    ///
216    /// [Database Connection](http://www.sqlite.org/c3ref/open.html) for a
217    /// description of valid flag combinations.
218    ///
219    /// # Failure
220    ///
221    /// Will return `Err` if either `path` or `vfs` cannot be converted to a
222    /// C-compatible string or if the underlying SQLite open call fails.
223    pub async fn open_with_flags_and_vfs<P: AsRef<Path>>(
224        path: P,
225        flags: OpenFlags,
226        vfs: &str,
227    ) -> std::result::Result<Self, rusqlite::Error> {
228        let path = path.as_ref().to_owned();
229        let vfs = vfs.to_owned();
230        start(move || rusqlite::Connection::open_with_flags_and_vfs(path, flags, &*vfs)).await
231    }
232
233    /// Open a new connection to an in-memory SQLite database.
234    ///
235    /// [Database Connection](http://www.sqlite.org/c3ref/open.html) for a
236    /// description of valid flag combinations.
237    ///
238    /// # Failure
239    ///
240    /// Will return `Err` if the underlying SQLite open call fails.
241    pub async fn open_in_memory_with_flags(
242        flags: OpenFlags,
243    ) -> std::result::Result<Self, rusqlite::Error> {
244        start(move || rusqlite::Connection::open_in_memory_with_flags(flags)).await
245    }
246
247    /// Open a new connection to an in-memory SQLite database using the
248    /// specific flags and vfs name.
249    ///
250    /// [Database Connection](http://www.sqlite.org/c3ref/open.html) for a
251    /// description of valid flag combinations.
252    ///
253    /// # Failure
254    ///
255    /// Will return `Err` if `vfs` cannot be converted to a C-compatible
256    /// string or if the underlying SQLite open call fails.
257    pub async fn open_in_memory_with_flags_and_vfs(
258        flags: OpenFlags,
259        vfs: &str,
260    ) -> std::result::Result<Self, rusqlite::Error> {
261        let vfs = vfs.to_owned();
262        start(move || rusqlite::Connection::open_in_memory_with_flags_and_vfs(flags, &*vfs)).await
263    }
264
265    /// Call a function in background thread and get the result
266    /// asynchronously.
267    ///
268    /// # Failure
269    ///
270    /// Will return `Err` if the database connection has been closed.
271    /// Will return `Error::Error` wrapping the inner error if `function` failed.
272    pub async fn call<F, R, E>(&self, function: F) -> std::result::Result<R, Error<E>>
273    where
274        F: FnOnce(&mut rusqlite::Connection) -> std::result::Result<R, E> + 'static + Send,
275        R: Send + 'static,
276        E: Send + 'static,
277    {
278        self.call_raw(function)
279            .await
280            .map_err(|_| Error::ConnectionClosed)
281            .and_then(|result| result.map_err(Error::Error))
282    }
283
284    /// Call a function in background thread and get the result
285    /// asynchronously.
286    ///
287    /// # Failure
288    ///
289    /// Will return `Err` if the database connection has been closed.
290    pub async fn call_raw<F, R>(&self, function: F) -> Result<R>
291    where
292        F: FnOnce(&mut rusqlite::Connection) -> R + 'static + Send,
293        R: Send + 'static,
294    {
295        let (sender, receiver) = oneshot::channel::<R>();
296
297        self.sender
298            .send(Message::Execute(Box::new(move |conn| {
299                let value = function(conn);
300                let _ = sender.send(value);
301            })))
302            .map_err(|_| Error::ConnectionClosed)?;
303
304        receiver.await.map_err(|_| Error::ConnectionClosed)
305    }
306
307    /// Call a function in background thread and get the result
308    /// asynchronously.
309    ///
310    /// This method can cause a `panic` if the underlying database connection is closed.
311    /// it is a more user-friendly alternative to the [`Connection::call`] method.
312    /// It should be safe if the connection is never explicitly closed (using the [`Connection::close`] call).
313    ///
314    /// Calling this on a closed connection will cause a `panic`.
315    pub async fn call_unwrap<F, R>(&self, function: F) -> R
316    where
317        F: FnOnce(&mut rusqlite::Connection) -> R + Send + 'static,
318        R: Send + 'static,
319    {
320        let (sender, receiver) = oneshot::channel::<R>();
321
322        self.sender
323            .send(Message::Execute(Box::new(move |conn| {
324                let value = function(conn);
325                let _ = sender.send(value);
326            })))
327            .expect("database connection should be open");
328
329        receiver.await.expect(BUG_TEXT)
330    }
331
332    /// Close the database connection.
333    ///
334    /// This is functionally equivalent to the `Drop` implementation for
335    /// `Connection`. It consumes the `Connection`, but on error returns it
336    /// to the caller for retry purposes.
337    ///
338    /// If successful, any following `close` operations performed
339    /// on `Connection` copies will succeed immediately.
340    ///
341    /// On the other hand, any calls to [`Connection::call`] will return a [`Error::ConnectionClosed`],
342    /// and any calls to [`Connection::call_unwrap`] will cause a `panic`.
343    ///
344    /// # Failure
345    ///
346    /// Will return `Err` if the underlying SQLite close call fails.
347    pub async fn close(self) -> Result<()> {
348        let (sender, receiver) = oneshot::channel::<std::result::Result<(), rusqlite::Error>>();
349
350        if let Err(crossbeam_channel::SendError(_)) = self.sender.send(Message::Close(sender)) {
351            // If the channel is closed on the other side, it means the connection closed successfully
352            // This is a safeguard against calling close on a `Copy` of the connection
353            return Ok(());
354        }
355
356        let result = receiver.await;
357
358        if result.is_err() {
359            // If we get a RecvError at this point, it also means the channel closed in the meantime
360            // we can assume the connection is closed
361            return Ok(());
362        }
363
364        result.unwrap().map_err(|e| Error::Close((self, e)))
365    }
366}
367
368impl Debug for Connection {
369    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
370        f.debug_struct("Connection").finish()
371    }
372}
373
374impl From<rusqlite::Connection> for Connection {
375    fn from(conn: rusqlite::Connection) -> Self {
376        let (sender, receiver) = crossbeam_channel::unbounded::<Message>();
377        thread::spawn(move || event_loop(conn, receiver));
378
379        Self { sender }
380    }
381}
382
383async fn start<F>(open: F) -> rusqlite::Result<Connection>
384where
385    F: FnOnce() -> rusqlite::Result<rusqlite::Connection> + Send + 'static,
386{
387    let (sender, receiver) = crossbeam_channel::unbounded::<Message>();
388    let (result_sender, result_receiver) = oneshot::channel();
389
390    thread::spawn(move || {
391        let conn = match open() {
392            Ok(c) => c,
393            Err(e) => {
394                let _ = result_sender.send(Err(e));
395                return;
396            }
397        };
398
399        if let Err(_e) = result_sender.send(Ok(())) {
400            return;
401        }
402
403        event_loop(conn, receiver);
404    });
405
406    result_receiver
407        .await
408        .expect(BUG_TEXT)
409        .map(|_| Connection { sender })
410}
411
412fn event_loop(mut conn: rusqlite::Connection, receiver: Receiver<Message>) {
413    while let Ok(message) = receiver.recv() {
414        match message {
415            Message::Execute(f) => f(&mut conn),
416            Message::Close(s) => {
417                let result = conn.close();
418
419                match result {
420                    Ok(v) => {
421                        s.send(Ok(v)).expect(BUG_TEXT);
422                        break;
423                    }
424                    Err((c, e)) => {
425                        conn = c;
426                        s.send(Err(e)).expect(BUG_TEXT);
427                    }
428                }
429            }
430        }
431    }
432}