tokio_rusqlite/lib.rs
1//! Asynchronous handle for rusqlite library.
2//!
3//! # Guide
4//!
5//! This library provides [`Connection`] struct. [`Connection`] struct is a handle
6//! to call functions in background thread and can be cloned cheaply.
7//! [`Connection::call`] method calls provided function in the background thread
8//! and returns its result asynchronously.
9//!
10//! # Design
11//!
12//! A thread is spawned for each opened connection handle. When `call` method
13//! is called: provided function is boxed, sent to the thread through mpsc
14//! channel and executed. Return value is then sent by oneshot channel from
15//! the thread and then returned from function.
16//!
17//! # Example
18//!
19//! ```rust,no_run
20//! use tokio_rusqlite::{params, Connection, Result};
21//!
22//! #[derive(Debug)]
23//! struct Person {
24//! id: i32,
25//! name: String,
26//! data: Option<Vec<u8>>,
27//! }
28//!
29//! #[tokio::main]
30//! async fn main() -> Result<()> {
31//! let conn = Connection::open_in_memory().await?;
32//!
33//! let people = conn
34//! .call(|conn| {
35//! conn.execute(
36//! "CREATE TABLE person (
37//! id INTEGER PRIMARY KEY,
38//! name TEXT NOT NULL,
39//! data BLOB
40//! )",
41//! [],
42//! )?;
43//!
44//! let steven = Person {
45//! id: 1,
46//! name: "Steven".to_string(),
47//! data: None,
48//! };
49//!
50//! conn.execute(
51//! "INSERT INTO person (name, data) VALUES (?1, ?2)",
52//! params![steven.name, steven.data],
53//! )?;
54//!
55//! let mut stmt = conn.prepare("SELECT id, name, data FROM person")?;
56//! let people = stmt
57//! .query_map([], |row| {
58//! Ok(Person {
59//! id: row.get(0)?,
60//! name: row.get(1)?,
61//! data: row.get(2)?,
62//! })
63//! })?
64//! .collect::<std::result::Result<Vec<Person>, rusqlite::Error>>()?;
65//!
66//! Ok(people)
67//! })
68//! .await?;
69//!
70//! for person in people {
71//! println!("Found person {:?}", person);
72//! }
73//!
74//! Ok(())
75//! }
76//! ```
77
78#![forbid(unsafe_code)]
79#![warn(
80 clippy::await_holding_lock,
81 clippy::cargo_common_metadata,
82 clippy::dbg_macro,
83 clippy::empty_enum,
84 clippy::enum_glob_use,
85 clippy::inefficient_to_string,
86 clippy::mem_forget,
87 clippy::mutex_integer,
88 clippy::needless_continue,
89 clippy::todo,
90 clippy::unimplemented,
91 clippy::wildcard_imports,
92 future_incompatible,
93 missing_docs,
94 missing_debug_implementations,
95 unreachable_pub
96)]
97
98#[cfg(test)]
99mod tests;
100
101use crossbeam_channel::{Receiver, Sender};
102use std::{
103 fmt::{self, Debug, Display},
104 path::Path,
105 thread,
106};
107use tokio::sync::oneshot::{self};
108
109pub use rusqlite::*;
110
111const BUG_TEXT: &str = "bug in tokio-rusqlite, please report";
112
113#[derive(Debug)]
114/// Represents the errors specific for this library.
115#[non_exhaustive]
116pub enum Error {
117 /// The connection to the SQLite has been closed and cannot be queried any more.
118 ConnectionClosed,
119
120 /// An error occured while closing the SQLite connection.
121 /// This `Error` variant contains the [`Connection`], which can be used to retry the close operation
122 /// and the underlying [`rusqlite::Error`] that made it impossile to close the database.
123 Close((Connection, rusqlite::Error)),
124
125 /// A `Rusqlite` error occured.
126 Rusqlite(rusqlite::Error),
127
128 /// An application-specific error occured.
129 Other(Box<dyn std::error::Error + Send + Sync + 'static>),
130}
131
132impl Display for Error {
133 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134 match self {
135 Error::ConnectionClosed => write!(f, "ConnectionClosed"),
136 Error::Close((_, e)) => write!(f, "Close((Connection, \"{e}\"))"),
137 Error::Rusqlite(e) => write!(f, "Rusqlite(\"{e}\")"),
138 Error::Other(ref e) => write!(f, "Other(\"{e}\")"),
139 }
140 }
141}
142
143impl std::error::Error for Error {
144 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
145 match self {
146 Error::ConnectionClosed => None,
147 Error::Close((_, e)) => Some(e),
148 Error::Rusqlite(e) => Some(e),
149 Error::Other(ref e) => Some(&**e),
150 }
151 }
152}
153
154impl From<rusqlite::Error> for Error {
155 fn from(value: rusqlite::Error) -> Self {
156 Error::Rusqlite(value)
157 }
158}
159
160/// The result returned on method calls in this crate.
161pub type Result<T> = std::result::Result<T, Error>;
162
163type CallFn = Box<dyn FnOnce(&mut rusqlite::Connection) + Send + 'static>;
164
165enum Message {
166 Execute(CallFn),
167 Close(oneshot::Sender<std::result::Result<(), rusqlite::Error>>),
168}
169
170/// A handle to call functions in background thread.
171#[derive(Clone)]
172pub struct Connection {
173 sender: Sender<Message>,
174}
175
176impl Connection {
177 /// Open a new connection to a SQLite database.
178 ///
179 /// `Connection::open(path)` is equivalent to
180 /// `Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_WRITE |
181 /// OpenFlags::SQLITE_OPEN_CREATE)`.
182 ///
183 /// # Failure
184 ///
185 /// Will return `Err` if `path` cannot be converted to a C-compatible
186 /// string or if the underlying SQLite open call fails.
187 pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
188 let path = path.as_ref().to_owned();
189 start(move || rusqlite::Connection::open(path))
190 .await
191 .map_err(Error::Rusqlite)
192 }
193
194 /// Open a new connection to an in-memory SQLite database.
195 ///
196 /// # Failure
197 ///
198 /// Will return `Err` if the underlying SQLite open call fails.
199 pub async fn open_in_memory() -> Result<Self> {
200 start(rusqlite::Connection::open_in_memory)
201 .await
202 .map_err(Error::Rusqlite)
203 }
204
205 /// Open a new connection to a SQLite database.
206 ///
207 /// [Database Connection](http://www.sqlite.org/c3ref/open.html) for a
208 /// description of valid flag combinations.
209 ///
210 /// # Failure
211 ///
212 /// Will return `Err` if `path` cannot be converted to a C-compatible
213 /// string or if the underlying SQLite open call fails.
214 pub async fn open_with_flags<P: AsRef<Path>>(path: P, flags: OpenFlags) -> Result<Self> {
215 let path = path.as_ref().to_owned();
216 start(move || rusqlite::Connection::open_with_flags(path, flags))
217 .await
218 .map_err(Error::Rusqlite)
219 }
220
221 /// Open a new connection to a SQLite database using the specific flags
222 /// and vfs name.
223 ///
224 /// [Database Connection](http://www.sqlite.org/c3ref/open.html) for a
225 /// description of valid flag combinations.
226 ///
227 /// # Failure
228 ///
229 /// Will return `Err` if either `path` or `vfs` cannot be converted to a
230 /// C-compatible string or if the underlying SQLite open call fails.
231 pub async fn open_with_flags_and_vfs<P: AsRef<Path>>(
232 path: P,
233 flags: OpenFlags,
234 vfs: &str,
235 ) -> Result<Self> {
236 let path = path.as_ref().to_owned();
237 let vfs = vfs.to_owned();
238 start(move || rusqlite::Connection::open_with_flags_and_vfs(path, flags, &vfs))
239 .await
240 .map_err(Error::Rusqlite)
241 }
242
243 /// Open a new connection to an in-memory SQLite database.
244 ///
245 /// [Database Connection](http://www.sqlite.org/c3ref/open.html) for a
246 /// description of valid flag combinations.
247 ///
248 /// # Failure
249 ///
250 /// Will return `Err` if the underlying SQLite open call fails.
251 pub async fn open_in_memory_with_flags(flags: OpenFlags) -> Result<Self> {
252 start(move || rusqlite::Connection::open_in_memory_with_flags(flags))
253 .await
254 .map_err(Error::Rusqlite)
255 }
256
257 /// Open a new connection to an in-memory SQLite database using the
258 /// specific flags and vfs name.
259 ///
260 /// [Database Connection](http://www.sqlite.org/c3ref/open.html) for a
261 /// description of valid flag combinations.
262 ///
263 /// # Failure
264 ///
265 /// Will return `Err` if `vfs` cannot be converted to a C-compatible
266 /// string or if the underlying SQLite open call fails.
267 pub async fn open_in_memory_with_flags_and_vfs(flags: OpenFlags, vfs: &str) -> Result<Self> {
268 let vfs = vfs.to_owned();
269 start(move || rusqlite::Connection::open_in_memory_with_flags_and_vfs(flags, &vfs))
270 .await
271 .map_err(Error::Rusqlite)
272 }
273
274 /// Call a function in background thread and get the result
275 /// asynchronously.
276 ///
277 /// # Failure
278 ///
279 /// Will return `Err` if the database connection has been closed.
280 pub async fn call<F, R>(&self, function: F) -> Result<R>
281 where
282 F: FnOnce(&mut rusqlite::Connection) -> Result<R> + 'static + Send,
283 R: Send + 'static,
284 {
285 let (sender, receiver) = oneshot::channel::<Result<R>>();
286
287 self.sender
288 .send(Message::Execute(Box::new(move |conn| {
289 let value = function(conn);
290 let _ = sender.send(value);
291 })))
292 .map_err(|_| Error::ConnectionClosed)?;
293
294 receiver.await.map_err(|_| Error::ConnectionClosed)?
295 }
296
297 /// Call a function in background thread and get the result
298 /// asynchronously.
299 ///
300 /// This method can cause a `panic` if the underlying database connection is closed.
301 /// it is a more user-friendly alternative to the [`Connection::call`] method.
302 /// It should be safe if the connection is never explicitly closed (using the [`Connection::close`] call).
303 ///
304 /// Calling this on a closed connection will cause a `panic`.
305 pub async fn call_unwrap<F, R>(&self, function: F) -> R
306 where
307 F: FnOnce(&mut rusqlite::Connection) -> R + Send + 'static,
308 R: Send + 'static,
309 {
310 let (sender, receiver) = oneshot::channel::<R>();
311
312 self.sender
313 .send(Message::Execute(Box::new(move |conn| {
314 let value = function(conn);
315 let _ = sender.send(value);
316 })))
317 .expect("database connection should be open");
318
319 receiver.await.expect(BUG_TEXT)
320 }
321
322 /// Close the database connection.
323 ///
324 /// This is functionally equivalent to the `Drop` implementation for
325 /// `Connection`. It consumes the `Connection`, but on error returns it
326 /// to the caller for retry purposes.
327 ///
328 /// If successful, any following `close` operations performed
329 /// on `Connection` copies will succeed immediately.
330 ///
331 /// On the other hand, any calls to [`Connection::call`] will return a [`Error::ConnectionClosed`],
332 /// and any calls to [`Connection::call_unwrap`] will cause a `panic`.
333 ///
334 /// # Failure
335 ///
336 /// Will return `Err` if the underlying SQLite close call fails.
337 pub async fn close(self) -> Result<()> {
338 let (sender, receiver) = oneshot::channel::<std::result::Result<(), rusqlite::Error>>();
339
340 if let Err(crossbeam_channel::SendError(_)) = self.sender.send(Message::Close(sender)) {
341 // If the channel is closed on the other side, it means the connection closed successfully
342 // This is a safeguard against calling close on a `Copy` of the connection
343 return Ok(());
344 }
345
346 let result = receiver.await;
347
348 if result.is_err() {
349 // If we get a RecvError at this point, it also means the channel closed in the meantime
350 // we can assume the connection is closed
351 return Ok(());
352 }
353
354 result.unwrap().map_err(|e| Error::Close((self, e)))
355 }
356}
357
358impl Debug for Connection {
359 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
360 f.debug_struct("Connection").finish()
361 }
362}
363
364impl From<rusqlite::Connection> for Connection {
365 fn from(conn: rusqlite::Connection) -> Self {
366 let (sender, receiver) = crossbeam_channel::unbounded::<Message>();
367 thread::spawn(move || event_loop(conn, receiver));
368
369 Self { sender }
370 }
371}
372
373async fn start<F>(open: F) -> rusqlite::Result<Connection>
374where
375 F: FnOnce() -> rusqlite::Result<rusqlite::Connection> + Send + 'static,
376{
377 let (sender, receiver) = crossbeam_channel::unbounded::<Message>();
378 let (result_sender, result_receiver) = oneshot::channel();
379
380 thread::spawn(move || {
381 let conn = match open() {
382 Ok(c) => c,
383 Err(e) => {
384 let _ = result_sender.send(Err(e));
385 return;
386 }
387 };
388
389 if let Err(_e) = result_sender.send(Ok(())) {
390 return;
391 }
392
393 event_loop(conn, receiver);
394 });
395
396 result_receiver
397 .await
398 .expect(BUG_TEXT)
399 .map(|_| Connection { sender })
400}
401
402fn event_loop(mut conn: rusqlite::Connection, receiver: Receiver<Message>) {
403 while let Ok(message) = receiver.recv() {
404 match message {
405 Message::Execute(f) => f(&mut conn),
406 Message::Close(s) => {
407 let result = conn.close();
408
409 match result {
410 Ok(v) => {
411 s.send(Ok(v)).expect(BUG_TEXT);
412 break;
413 }
414 Err((c, e)) => {
415 conn = c;
416 s.send(Err(e)).expect(BUG_TEXT);
417 }
418 }
419 }
420 }
421 }
422}