tokio_rusqlite/lib.rs
1//! Asynchronous handle for rusqlite library.
2//!
3//! # Guide
4//!
//! This library provides the [`Connection`] struct, a handle for calling
//! functions in a background thread that can be cloned cheaply. The
//! [`Connection::call`] method runs the provided function in the background
//! thread and returns its result asynchronously.
9//!
10//! # Design
11//!
//! A thread is spawned for each opened connection handle. When the `call`
//! method is called, the provided function is boxed, sent to the thread through
//! a crossbeam channel and executed. The return value is then sent back from
//! the thread through a oneshot channel and returned from `call`.
16//!
17//! # Example
18//!
19//! ```rust,no_run
20//! use tokio_rusqlite::{params, Connection, Result};
21//!
22//! #[derive(Debug)]
23//! struct Person {
24//!     id: i32,
25//!     name: String,
26//!     data: Option<Vec<u8>>,
27//! }
28//!
29//! #[tokio::main]
30//! async fn main() -> Result<()> {
31//!     let conn = Connection::open_in_memory().await?;
32//!
33//!     let people = conn
34//!         .call(|conn| {
35//!             conn.execute(
36//!                 "CREATE TABLE person (
37//!                     id    INTEGER PRIMARY KEY,
38//!                     name  TEXT NOT NULL,
39//!                     data  BLOB
40//!                 )",
41//!                 [],
42//!             )?;
43//!
44//!             let steven = Person {
45//!                 id: 1,
46//!                 name: "Steven".to_string(),
47//!                 data: None,
48//!             };
49//!
50//!             conn.execute(
51//!                 "INSERT INTO person (name, data) VALUES (?1, ?2)",
52//!                 params![steven.name, steven.data],
53//!             )?;
54//!
55//!             let mut stmt = conn.prepare("SELECT id, name, data FROM person")?;
56//!             let people = stmt
57//!                 .query_map([], |row| {
58//!                     Ok(Person {
59//!                         id: row.get(0)?,
60//!                         name: row.get(1)?,
61//!                         data: row.get(2)?,
62//!                     })
63//!                 })?
64//!                 .collect::<std::result::Result<Vec<Person>, rusqlite::Error>>()?;
65//!
66//!             Ok(people)
67//!         })
68//!         .await?;
69//!
70//!     for person in people {
71//!         println!("Found person {:?}", person);
72//!     }
73//!
74//!     Ok(())
75//! }
76//! ```
77
78#![forbid(unsafe_code)]
79#![warn(
80    clippy::await_holding_lock,
81    clippy::cargo_common_metadata,
82    clippy::dbg_macro,
83    clippy::empty_enum,
84    clippy::enum_glob_use,
85    clippy::inefficient_to_string,
86    clippy::mem_forget,
87    clippy::mutex_integer,
88    clippy::needless_continue,
89    clippy::todo,
90    clippy::unimplemented,
91    clippy::wildcard_imports,
92    future_incompatible,
93    missing_docs,
94    missing_debug_implementations,
95    unreachable_pub
96)]
97
98#[cfg(test)]
99mod tests;
100
101use crossbeam_channel::{Receiver, Sender};
102use std::{
103    fmt::{self, Debug, Display},
104    path::Path,
105    thread,
106};
107use tokio::sync::oneshot::{self};
108
109pub use rusqlite::*;
110
/// Panic message passed to `expect` on internal channel operations that are
/// not supposed to fail.
const BUG_TEXT: &str = "bug in tokio-rusqlite, please report";
112
#[derive(Debug)]
/// Represents the errors specific for this library.
#[non_exhaustive]
pub enum Error {
    /// The connection to the SQLite database has been closed and cannot be queried any more.
    ConnectionClosed,

    /// An error occurred while closing the SQLite connection.
    /// This `Error` variant contains the [`Connection`], which can be used to retry the close operation
    /// and the underlying [`rusqlite::Error`] that made it impossible to close the database.
    Close((Connection, rusqlite::Error)),

    /// A `Rusqlite` error occurred.
    Rusqlite(rusqlite::Error),

    /// An application-specific error occurred.
    Other(Box<dyn std::error::Error + Send + Sync + 'static>),
}
131
132impl Display for Error {
133    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134        match self {
135            Error::ConnectionClosed => write!(f, "ConnectionClosed"),
136            Error::Close((_, e)) => write!(f, "Close((Connection, \"{e}\"))"),
137            Error::Rusqlite(e) => write!(f, "Rusqlite(\"{e}\")"),
138            Error::Other(ref e) => write!(f, "Other(\"{e}\")"),
139        }
140    }
141}
142
143impl std::error::Error for Error {
144    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
145        match self {
146            Error::ConnectionClosed => None,
147            Error::Close((_, e)) => Some(e),
148            Error::Rusqlite(e) => Some(e),
149            Error::Other(ref e) => Some(&**e),
150        }
151    }
152}
153
154impl From<rusqlite::Error> for Error {
155    fn from(value: rusqlite::Error) -> Self {
156        Error::Rusqlite(value)
157    }
158}
159
/// The result returned on method calls in this crate, with [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;
162
/// Boxed one-shot closure that is run against a `rusqlite::Connection`.
type CallFn = Box<dyn FnOnce(&mut rusqlite::Connection) + Send + 'static>;
164
/// Messages sent from a `Connection` handle to its background thread.
enum Message {
    /// Run the boxed closure on the database connection.
    Execute(CallFn),
    /// Close the database connection and report the outcome through the oneshot sender.
    Close(oneshot::Sender<std::result::Result<(), rusqlite::Error>>),
}
169
/// A handle to call functions in background thread.
///
/// Cloning the handle clones the channel sender, so every clone submits its
/// messages to the same receiver.
#[derive(Clone)]
pub struct Connection {
    /// Sending half of the channel on which `Message`s are submitted.
    sender: Sender<Message>,
}
175
176impl Connection {
177    /// Open a new connection to a SQLite database.
178    ///
179    /// `Connection::open(path)` is equivalent to
180    /// `Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_WRITE |
181    /// OpenFlags::SQLITE_OPEN_CREATE)`.
182    ///
183    /// # Failure
184    ///
185    /// Will return `Err` if `path` cannot be converted to a C-compatible
186    /// string or if the underlying SQLite open call fails.
187    pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
188        let path = path.as_ref().to_owned();
189        start(move || rusqlite::Connection::open(path))
190            .await
191            .map_err(Error::Rusqlite)
192    }
193
194    /// Open a new connection to an in-memory SQLite database.
195    ///
196    /// # Failure
197    ///
198    /// Will return `Err` if the underlying SQLite open call fails.
199    pub async fn open_in_memory() -> Result<Self> {
200        start(rusqlite::Connection::open_in_memory)
201            .await
202            .map_err(Error::Rusqlite)
203    }
204
205    /// Open a new connection to a SQLite database.
206    ///
207    /// [Database Connection](http://www.sqlite.org/c3ref/open.html) for a
208    /// description of valid flag combinations.
209    ///
210    /// # Failure
211    ///
212    /// Will return `Err` if `path` cannot be converted to a C-compatible
213    /// string or if the underlying SQLite open call fails.
214    pub async fn open_with_flags<P: AsRef<Path>>(path: P, flags: OpenFlags) -> Result<Self> {
215        let path = path.as_ref().to_owned();
216        start(move || rusqlite::Connection::open_with_flags(path, flags))
217            .await
218            .map_err(Error::Rusqlite)
219    }
220
221    /// Open a new connection to a SQLite database using the specific flags
222    /// and vfs name.
223    ///
224    /// [Database Connection](http://www.sqlite.org/c3ref/open.html) for a
225    /// description of valid flag combinations.
226    ///
227    /// # Failure
228    ///
229    /// Will return `Err` if either `path` or `vfs` cannot be converted to a
230    /// C-compatible string or if the underlying SQLite open call fails.
231    pub async fn open_with_flags_and_vfs<P: AsRef<Path>>(
232        path: P,
233        flags: OpenFlags,
234        vfs: &str,
235    ) -> Result<Self> {
236        let path = path.as_ref().to_owned();
237        let vfs = vfs.to_owned();
238        start(move || rusqlite::Connection::open_with_flags_and_vfs(path, flags, &vfs))
239            .await
240            .map_err(Error::Rusqlite)
241    }
242
243    /// Open a new connection to an in-memory SQLite database.
244    ///
245    /// [Database Connection](http://www.sqlite.org/c3ref/open.html) for a
246    /// description of valid flag combinations.
247    ///
248    /// # Failure
249    ///
250    /// Will return `Err` if the underlying SQLite open call fails.
251    pub async fn open_in_memory_with_flags(flags: OpenFlags) -> Result<Self> {
252        start(move || rusqlite::Connection::open_in_memory_with_flags(flags))
253            .await
254            .map_err(Error::Rusqlite)
255    }
256
257    /// Open a new connection to an in-memory SQLite database using the
258    /// specific flags and vfs name.
259    ///
260    /// [Database Connection](http://www.sqlite.org/c3ref/open.html) for a
261    /// description of valid flag combinations.
262    ///
263    /// # Failure
264    ///
265    /// Will return `Err` if `vfs` cannot be converted to a C-compatible
266    /// string or if the underlying SQLite open call fails.
267    pub async fn open_in_memory_with_flags_and_vfs(flags: OpenFlags, vfs: &str) -> Result<Self> {
268        let vfs = vfs.to_owned();
269        start(move || rusqlite::Connection::open_in_memory_with_flags_and_vfs(flags, &vfs))
270            .await
271            .map_err(Error::Rusqlite)
272    }
273
274    /// Call a function in background thread and get the result
275    /// asynchronously.
276    ///
277    /// # Failure
278    ///
279    /// Will return `Err` if the database connection has been closed.
280    pub async fn call<F, R>(&self, function: F) -> Result<R>
281    where
282        F: FnOnce(&mut rusqlite::Connection) -> Result<R> + 'static + Send,
283        R: Send + 'static,
284    {
285        let (sender, receiver) = oneshot::channel::<Result<R>>();
286
287        self.sender
288            .send(Message::Execute(Box::new(move |conn| {
289                let value = function(conn);
290                let _ = sender.send(value);
291            })))
292            .map_err(|_| Error::ConnectionClosed)?;
293
294        receiver.await.map_err(|_| Error::ConnectionClosed)?
295    }
296
297    /// Call a function in background thread and get the result
298    /// asynchronously.
299    ///
300    /// This method can cause a `panic` if the underlying database connection is closed.
301    /// it is a more user-friendly alternative to the [`Connection::call`] method.
302    /// It should be safe if the connection is never explicitly closed (using the [`Connection::close`] call).
303    ///
304    /// Calling this on a closed connection will cause a `panic`.
305    pub async fn call_unwrap<F, R>(&self, function: F) -> R
306    where
307        F: FnOnce(&mut rusqlite::Connection) -> R + Send + 'static,
308        R: Send + 'static,
309    {
310        let (sender, receiver) = oneshot::channel::<R>();
311
312        self.sender
313            .send(Message::Execute(Box::new(move |conn| {
314                let value = function(conn);
315                let _ = sender.send(value);
316            })))
317            .expect("database connection should be open");
318
319        receiver.await.expect(BUG_TEXT)
320    }
321
322    /// Close the database connection.
323    ///
324    /// This is functionally equivalent to the `Drop` implementation for
325    /// `Connection`. It consumes the `Connection`, but on error returns it
326    /// to the caller for retry purposes.
327    ///
328    /// If successful, any following `close` operations performed
329    /// on `Connection` copies will succeed immediately.
330    ///
331    /// On the other hand, any calls to [`Connection::call`] will return a [`Error::ConnectionClosed`],
332    /// and any calls to [`Connection::call_unwrap`] will cause a `panic`.
333    ///
334    /// # Failure
335    ///
336    /// Will return `Err` if the underlying SQLite close call fails.
337    pub async fn close(self) -> Result<()> {
338        let (sender, receiver) = oneshot::channel::<std::result::Result<(), rusqlite::Error>>();
339
340        if let Err(crossbeam_channel::SendError(_)) = self.sender.send(Message::Close(sender)) {
341            // If the channel is closed on the other side, it means the connection closed successfully
342            // This is a safeguard against calling close on a `Copy` of the connection
343            return Ok(());
344        }
345
346        let result = receiver.await;
347
348        if result.is_err() {
349            // If we get a RecvError at this point, it also means the channel closed in the meantime
350            // we can assume the connection is closed
351            return Ok(());
352        }
353
354        result.unwrap().map_err(|e| Error::Close((self, e)))
355    }
356}
357
358impl Debug for Connection {
359    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
360        f.debug_struct("Connection").finish()
361    }
362}
363
364impl From<rusqlite::Connection> for Connection {
365    fn from(conn: rusqlite::Connection) -> Self {
366        let (sender, receiver) = crossbeam_channel::unbounded::<Message>();
367        thread::spawn(move || event_loop(conn, receiver));
368
369        Self { sender }
370    }
371}
372
373async fn start<F>(open: F) -> rusqlite::Result<Connection>
374where
375    F: FnOnce() -> rusqlite::Result<rusqlite::Connection> + Send + 'static,
376{
377    let (sender, receiver) = crossbeam_channel::unbounded::<Message>();
378    let (result_sender, result_receiver) = oneshot::channel();
379
380    thread::spawn(move || {
381        let conn = match open() {
382            Ok(c) => c,
383            Err(e) => {
384                let _ = result_sender.send(Err(e));
385                return;
386            }
387        };
388
389        if let Err(_e) = result_sender.send(Ok(())) {
390            return;
391        }
392
393        event_loop(conn, receiver);
394    });
395
396    result_receiver
397        .await
398        .expect(BUG_TEXT)
399        .map(|_| Connection { sender })
400}
401
402fn event_loop(mut conn: rusqlite::Connection, receiver: Receiver<Message>) {
403    while let Ok(message) = receiver.recv() {
404        match message {
405            Message::Execute(f) => f(&mut conn),
406            Message::Close(s) => {
407                let result = conn.close();
408
409                match result {
410                    Ok(v) => {
411                        s.send(Ok(v)).expect(BUG_TEXT);
412                        break;
413                    }
414                    Err((c, e)) => {
415                        conn = c;
416                        s.send(Err(e)).expect(BUG_TEXT);
417                    }
418                }
419            }
420        }
421    }
422}