tokio_rusqlite/lib.rs
1//! Asynchronous handle for rusqlite library.
2//!
3//! # Guide
4//!
5//! This library provides [`Connection`] struct. [`Connection`] struct is a handle
6//! to call functions in background thread and can be cloned cheaply.
7//! [`Connection::call`] method calls provided function in the background thread
8//! and returns its result asynchronously.
9//!
10//! # Design
11//!
12//! A thread is spawned for each opened connection handle. When `call` method
13//! is called: provided function is boxed, sent to the thread through mpsc
14//! channel and executed. Return value is then sent by oneshot channel from
15//! the thread and then returned from function.
16//!
17//! # Example
18//!
19//! ```rust,no_run
20//! use tokio_rusqlite::{params, Connection, Result};
21//!
22//! #[derive(Debug)]
23//! struct Person {
24//! id: i32,
25//! name: String,
26//! data: Option<Vec<u8>>,
27//! }
28//!
29//! #[tokio::main]
30//! async fn main() -> Result<()> {
31//! let conn = Connection::open_in_memory().await?;
32//!
33//! let people = conn
34//! .call(|conn| {
35//! conn.execute(
36//! "CREATE TABLE person (
37//! id INTEGER PRIMARY KEY,
38//! name TEXT NOT NULL,
39//! data BLOB
40//! )",
41//! [],
42//! )?;
43//!
44//! let steven = Person {
45//! id: 1,
46//! name: "Steven".to_string(),
47//! data: None,
48//! };
49//!
50//! conn.execute(
51//! "INSERT INTO person (name, data) VALUES (?1, ?2)",
52//! params![steven.name, steven.data],
53//! )?;
54//!
55//! let mut stmt = conn.prepare("SELECT id, name, data FROM person")?;
56//! let people = stmt
57//! .query_map([], |row| {
58//! Ok(Person {
59//! id: row.get(0)?,
60//! name: row.get(1)?,
61//! data: row.get(2)?,
62//! })
63//! })?
64//! .collect::<std::result::Result<Vec<Person>, rusqlite::Error>>()?;
65//!
66//! Ok(people)
67//! })
68//! .await?;
69//!
70//! for person in people {
71//! println!("Found person {:?}", person);
72//! }
73//!
74//! Ok(())
75//! }
76//! ```
77
78#![forbid(unsafe_code)]
79#![warn(
80 clippy::await_holding_lock,
81 clippy::cargo_common_metadata,
82 clippy::dbg_macro,
83 clippy::empty_enum,
84 clippy::enum_glob_use,
85 clippy::inefficient_to_string,
86 clippy::mem_forget,
87 clippy::mutex_integer,
88 clippy::needless_continue,
89 clippy::todo,
90 clippy::unimplemented,
91 clippy::wildcard_imports,
92 future_incompatible,
93 missing_docs,
94 missing_debug_implementations,
95 unreachable_pub
96)]
97
98#[cfg(test)]
99mod tests;
100
101use crossbeam_channel::{Receiver, Sender};
102use std::{
103 fmt::{self, Debug, Display},
104 path::Path,
105 thread,
106};
107use tokio::sync::oneshot::{self};
108
109pub use rusqlite::{self, *};
110
111const BUG_TEXT: &str = "bug in tokio-rusqlite, please report";
112
113#[derive(Debug)]
114/// Represents the errors specific for this library.
115#[non_exhaustive]
116pub enum Error<E = rusqlite::Error> {
117 /// The connection to the SQLite has been closed and cannot be queried any more.
118 ConnectionClosed,
119
120 /// An error occured while closing the SQLite connection.
121 /// This `Error` variant contains the [`Connection`], which can be used to retry the close operation
122 /// and the underlying [`rusqlite::Error`] that made it impossile to close the database.
123 Close((Connection, rusqlite::Error)),
124
125 /// An application-specific error occured.
126 Error(E),
127}
128
129impl<E: Display> Display for Error<E> {
130 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
131 match self {
132 Error::ConnectionClosed => write!(f, "ConnectionClosed"),
133 Error::Close((_, e)) => write!(f, "Close((Connection, \"{e}\"))"),
134 Error::Error(e) => write!(f, "Error(\"{e}\")"),
135 }
136 }
137}
138
139impl<E: std::error::Error + 'static> std::error::Error for Error<E> {
140 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
141 match self {
142 Error::ConnectionClosed => None,
143 Error::Close((_, e)) => Some(e),
144 Error::Error(e) => Some(e),
145 }
146 }
147}
148
149impl From<rusqlite::Error> for Error {
150 fn from(value: rusqlite::Error) -> Self {
151 Error::Error(value)
152 }
153}
154
155/// The result returned on method calls in this crate.
156pub type Result<T> = std::result::Result<T, Error>;
157
158type CallFn = Box<dyn FnOnce(&mut rusqlite::Connection) + Send + 'static>;
159
160enum Message {
161 Execute(CallFn),
162 Close(oneshot::Sender<std::result::Result<(), rusqlite::Error>>),
163}
164
165/// A handle to call functions in background thread.
166#[derive(Clone)]
167pub struct Connection {
168 sender: Sender<Message>,
169}
170
171impl Connection {
172 /// Open a new connection to a SQLite database.
173 ///
174 /// `Connection::open(path)` is equivalent to
175 /// `Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_WRITE |
176 /// OpenFlags::SQLITE_OPEN_CREATE)`.
177 ///
178 /// # Failure
179 ///
180 /// Will return `Err` if `path` cannot be converted to a C-compatible
181 /// string or if the underlying SQLite open call fails.
182 pub async fn open<P: AsRef<Path>>(path: P) -> std::result::Result<Self, rusqlite::Error> {
183 let path = path.as_ref().to_owned();
184 start(move || rusqlite::Connection::open(path)).await
185 }
186
187 /// Open a new connection to an in-memory SQLite database.
188 ///
189 /// # Failure
190 ///
191 /// Will return `Err` if the underlying SQLite open call fails.
192 pub async fn open_in_memory() -> std::result::Result<Self, rusqlite::Error> {
193 start(rusqlite::Connection::open_in_memory).await
194 }
195
196 /// Open a new connection to a SQLite database.
197 ///
198 /// [Database Connection](http://www.sqlite.org/c3ref/open.html) for a
199 /// description of valid flag combinations.
200 ///
201 /// # Failure
202 ///
203 /// Will return `Err` if `path` cannot be converted to a C-compatible
204 /// string or if the underlying SQLite open call fails.
205 pub async fn open_with_flags<P: AsRef<Path>>(
206 path: P,
207 flags: OpenFlags,
208 ) -> std::result::Result<Self, rusqlite::Error> {
209 let path = path.as_ref().to_owned();
210 start(move || rusqlite::Connection::open_with_flags(path, flags)).await
211 }
212
213 /// Open a new connection to a SQLite database using the specific flags
214 /// and vfs name.
215 ///
216 /// [Database Connection](http://www.sqlite.org/c3ref/open.html) for a
217 /// description of valid flag combinations.
218 ///
219 /// # Failure
220 ///
221 /// Will return `Err` if either `path` or `vfs` cannot be converted to a
222 /// C-compatible string or if the underlying SQLite open call fails.
223 pub async fn open_with_flags_and_vfs<P: AsRef<Path>>(
224 path: P,
225 flags: OpenFlags,
226 vfs: &str,
227 ) -> std::result::Result<Self, rusqlite::Error> {
228 let path = path.as_ref().to_owned();
229 let vfs = vfs.to_owned();
230 start(move || rusqlite::Connection::open_with_flags_and_vfs(path, flags, &*vfs)).await
231 }
232
233 /// Open a new connection to an in-memory SQLite database.
234 ///
235 /// [Database Connection](http://www.sqlite.org/c3ref/open.html) for a
236 /// description of valid flag combinations.
237 ///
238 /// # Failure
239 ///
240 /// Will return `Err` if the underlying SQLite open call fails.
241 pub async fn open_in_memory_with_flags(
242 flags: OpenFlags,
243 ) -> std::result::Result<Self, rusqlite::Error> {
244 start(move || rusqlite::Connection::open_in_memory_with_flags(flags)).await
245 }
246
247 /// Open a new connection to an in-memory SQLite database using the
248 /// specific flags and vfs name.
249 ///
250 /// [Database Connection](http://www.sqlite.org/c3ref/open.html) for a
251 /// description of valid flag combinations.
252 ///
253 /// # Failure
254 ///
255 /// Will return `Err` if `vfs` cannot be converted to a C-compatible
256 /// string or if the underlying SQLite open call fails.
257 pub async fn open_in_memory_with_flags_and_vfs(
258 flags: OpenFlags,
259 vfs: &str,
260 ) -> std::result::Result<Self, rusqlite::Error> {
261 let vfs = vfs.to_owned();
262 start(move || rusqlite::Connection::open_in_memory_with_flags_and_vfs(flags, &*vfs)).await
263 }
264
265 /// Call a function in background thread and get the result
266 /// asynchronously.
267 ///
268 /// # Failure
269 ///
270 /// Will return `Err` if the database connection has been closed.
271 /// Will return `Error::Error` wrapping the inner error if `function` failed.
272 pub async fn call<F, R, E>(&self, function: F) -> std::result::Result<R, Error<E>>
273 where
274 F: FnOnce(&mut rusqlite::Connection) -> std::result::Result<R, E> + 'static + Send,
275 R: Send + 'static,
276 E: Send + 'static,
277 {
278 self.call_raw(function)
279 .await
280 .map_err(|_| Error::ConnectionClosed)
281 .and_then(|result| result.map_err(Error::Error))
282 }
283
284 /// Call a function in background thread and get the result
285 /// asynchronously.
286 ///
287 /// # Failure
288 ///
289 /// Will return `Err` if the database connection has been closed.
290 pub async fn call_raw<F, R>(&self, function: F) -> Result<R>
291 where
292 F: FnOnce(&mut rusqlite::Connection) -> R + 'static + Send,
293 R: Send + 'static,
294 {
295 let (sender, receiver) = oneshot::channel::<R>();
296
297 self.sender
298 .send(Message::Execute(Box::new(move |conn| {
299 let value = function(conn);
300 let _ = sender.send(value);
301 })))
302 .map_err(|_| Error::ConnectionClosed)?;
303
304 receiver.await.map_err(|_| Error::ConnectionClosed)
305 }
306
307 /// Call a function in background thread and get the result
308 /// asynchronously.
309 ///
310 /// This method can cause a `panic` if the underlying database connection is closed.
311 /// it is a more user-friendly alternative to the [`Connection::call`] method.
312 /// It should be safe if the connection is never explicitly closed (using the [`Connection::close`] call).
313 ///
314 /// Calling this on a closed connection will cause a `panic`.
315 pub async fn call_unwrap<F, R>(&self, function: F) -> R
316 where
317 F: FnOnce(&mut rusqlite::Connection) -> R + Send + 'static,
318 R: Send + 'static,
319 {
320 let (sender, receiver) = oneshot::channel::<R>();
321
322 self.sender
323 .send(Message::Execute(Box::new(move |conn| {
324 let value = function(conn);
325 let _ = sender.send(value);
326 })))
327 .expect("database connection should be open");
328
329 receiver.await.expect(BUG_TEXT)
330 }
331
332 /// Close the database connection.
333 ///
334 /// This is functionally equivalent to the `Drop` implementation for
335 /// `Connection`. It consumes the `Connection`, but on error returns it
336 /// to the caller for retry purposes.
337 ///
338 /// If successful, any following `close` operations performed
339 /// on `Connection` copies will succeed immediately.
340 ///
341 /// On the other hand, any calls to [`Connection::call`] will return a [`Error::ConnectionClosed`],
342 /// and any calls to [`Connection::call_unwrap`] will cause a `panic`.
343 ///
344 /// # Failure
345 ///
346 /// Will return `Err` if the underlying SQLite close call fails.
347 pub async fn close(self) -> Result<()> {
348 let (sender, receiver) = oneshot::channel::<std::result::Result<(), rusqlite::Error>>();
349
350 if let Err(crossbeam_channel::SendError(_)) = self.sender.send(Message::Close(sender)) {
351 // If the channel is closed on the other side, it means the connection closed successfully
352 // This is a safeguard against calling close on a `Copy` of the connection
353 return Ok(());
354 }
355
356 let result = receiver.await;
357
358 if result.is_err() {
359 // If we get a RecvError at this point, it also means the channel closed in the meantime
360 // we can assume the connection is closed
361 return Ok(());
362 }
363
364 result.unwrap().map_err(|e| Error::Close((self, e)))
365 }
366}
367
368impl Debug for Connection {
369 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
370 f.debug_struct("Connection").finish()
371 }
372}
373
374impl From<rusqlite::Connection> for Connection {
375 fn from(conn: rusqlite::Connection) -> Self {
376 let (sender, receiver) = crossbeam_channel::unbounded::<Message>();
377 thread::spawn(move || event_loop(conn, receiver));
378
379 Self { sender }
380 }
381}
382
383async fn start<F>(open: F) -> rusqlite::Result<Connection>
384where
385 F: FnOnce() -> rusqlite::Result<rusqlite::Connection> + Send + 'static,
386{
387 let (sender, receiver) = crossbeam_channel::unbounded::<Message>();
388 let (result_sender, result_receiver) = oneshot::channel();
389
390 thread::spawn(move || {
391 let conn = match open() {
392 Ok(c) => c,
393 Err(e) => {
394 let _ = result_sender.send(Err(e));
395 return;
396 }
397 };
398
399 if let Err(_e) = result_sender.send(Ok(())) {
400 return;
401 }
402
403 event_loop(conn, receiver);
404 });
405
406 result_receiver
407 .await
408 .expect(BUG_TEXT)
409 .map(|_| Connection { sender })
410}
411
412fn event_loop(mut conn: rusqlite::Connection, receiver: Receiver<Message>) {
413 while let Ok(message) = receiver.recv() {
414 match message {
415 Message::Execute(f) => f(&mut conn),
416 Message::Close(s) => {
417 let result = conn.close();
418
419 match result {
420 Ok(v) => {
421 s.send(Ok(v)).expect(BUG_TEXT);
422 break;
423 }
424 Err((c, e)) => {
425 conn = c;
426 s.send(Err(e)).expect(BUG_TEXT);
427 }
428 }
429 }
430 }
431 }
432}