tokio_rusqlite_new/lib.rs
1#![doc = include_str!("../README.md")]
2#![cfg_attr(docsrs, feature(doc_cfg))]
3#![forbid(unsafe_code)]
4#![warn(missing_docs)]
5
6use std::fmt::Debug;
7
8use crossbeam_channel::{Receiver, Sender};
9use tokio::sync::oneshot;
10
11mod error;
12pub use error::*;
13
14enum Message {
15 Execute(Box<dyn FnOnce(&mut rusqlite::Connection) + Send + 'static>),
16 Close(oneshot::Sender<Result<(), rusqlite::Error>>),
17}
18
19/// A handle to call functions in background thread.
20#[derive(Clone)]
21pub struct Connection {
22 sender: Sender<Message>,
23}
24
25impl Debug for Connection {
26 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27 f.debug_struct("Connection").finish_non_exhaustive()
28 }
29}
30
31fn event_loop(mut conn: rusqlite::Connection, receiver: Receiver<Message>) {
32 while let Ok(message) = receiver.recv() {
33 match message {
34 Message::Execute(f) => f(&mut conn),
35 Message::Close(s) => {
36 let result = conn.close();
37
38 match result {
39 Ok(v) => {
40 s.send(Ok(v)).expect(BUG_TEXT);
41 break;
42 }
43 Err((c, e)) => {
44 conn = c;
45 s.send(Err(e)).expect(BUG_TEXT);
46 }
47 }
48 }
49 }
50 }
51}
52
53impl Connection {
54 /// Create a new connection from an existing SQLite connection.
55 pub fn new(conn: rusqlite::Connection) -> Self {
56 let (sender, receiver) = crossbeam_channel::unbounded();
57 std::thread::spawn(move || event_loop(conn, receiver));
58 Self { sender }
59 }
60
61 /// Call a function in background thread and get the result
62 /// asynchronously.
63 ///
64 /// # Failure
65 ///
66 /// Will return `Err` if the database connection has been closed.
67 /// Will return `Error::Error` wrapping the inner error if `function` failed.
68 pub async fn call<F, R, E>(&self, function: F) -> Result<R, Error<E>>
69 where
70 F: FnOnce(&mut rusqlite::Connection) -> Result<R, E> + Send + 'static,
71 R: Send + 'static,
72 E: Send + 'static,
73 {
74 self.call_raw(function)
75 .await
76 .map_err(|_| Error::ConnectionClosed)
77 .and_then(|result| result.map_err(Error::Error))
78 }
79
80 /// Call a function in background thread and get the result asynchronously.
81 ///
82 /// # Failure
83 ///
84 /// Will return `Err` if the database connection has been closed.
85 pub async fn call_raw<F, R>(&self, function: F) -> Result<R>
86 where
87 F: FnOnce(&mut rusqlite::Connection) -> R + Send + 'static,
88 R: Send + 'static,
89 {
90 let (sender, receiver) = oneshot::channel::<R>();
91
92 self.sender
93 .send(Message::Execute(Box::new(move |conn| {
94 let value = function(conn);
95 let _ = sender.send(value);
96 })))
97 .map_err(|_| Error::ConnectionClosed)?;
98
99 receiver.await.map_err(|_| Error::ConnectionClosed)
100 }
101
102 /// Call a function in background thread and get the result
103 /// asynchronously.
104 ///
105 /// This method can cause a `panic` if the underlying database connection is closed.
106 /// It is a more user-friendly alternative to the [`Connection::call`] method.
107 /// It should be safe if the connection is never explicitly closed (using the [`Connection::close`] call).
108 ///
109 /// Calling this on a closed connection will cause a `panic`.
110 pub async fn call_unwrap<F, R>(&self, function: F) -> R
111 where
112 F: FnOnce(&mut rusqlite::Connection) -> R + Send + 'static,
113 R: Send + 'static,
114 {
115 let (sender, receiver) = oneshot::channel::<R>();
116
117 self.sender
118 .send(Message::Execute(Box::new(move |conn| {
119 let value = function(conn);
120 let _ = sender.send(value);
121 })))
122 .expect("database connection should be open");
123
124 receiver.await.expect(BUG_TEXT)
125 }
126
127 /// Close the database connection.
128 ///
129 /// This is functionally equivalent to the `Drop` implementation for
130 /// `Connection`. It consumes the `Connection`, but on error returns it
131 /// to the caller for retry purposes.
132 ///
133 /// If successful, any following `close` operations performed
134 /// on `Connection` copies will succeed immediately.
135 ///
136 /// On the other hand, any calls to [`Connection::call`] will return a [`Error::ConnectionClosed`],
137 /// and any calls to [`Connection::call_unwrap`] will cause a `panic`.
138 ///
139 /// # Failure
140 ///
141 /// Will return `Err` if the underlying SQLite close call fails.
142 pub async fn close(self) -> Result<()> {
143 let (sender, receiver) = oneshot::channel::<Result<(), rusqlite::Error>>();
144
145 if let Err(crossbeam_channel::SendError(_)) = self.sender.send(Message::Close(sender)) {
146 // If the channel is closed on the other side, it means the connection closed successfully
147 // This is a safeguard against calling close on a `Copy` of the connection
148 return Ok(());
149 }
150
151 match receiver.await {
152 Ok(result) => result.map_err(|e| Error::Close((self, e))),
153 Err(_) => {
154 // If we get a RecvError at this point, it also means the channel closed in the meantime
155 // we can assume the connection is closed
156 Ok(())
157 }
158 }
159 }
160}
161
162impl From<rusqlite::Connection> for Connection {
163 fn from(conn: rusqlite::Connection) -> Self {
164 Self::new(conn)
165 }
166}
167
168use std::path::Path;
169use rusqlite::{Name, OpenFlags};
170
171async fn start<F>(open: F) -> Result<Connection>
172where
173 F: FnOnce() -> rusqlite::Result<rusqlite::Connection> + Send + 'static,
174{
175 let (sender, receiver) = crossbeam_channel::unbounded::<Message>();
176 let (opener_sender, opener_receiver) = oneshot::channel();
177 std::thread::spawn(move || match open() {
178 Ok(conn) => {
179 if let Err(_) = opener_sender.send(Ok(())) { return; }
180 event_loop(conn, receiver);
181 },
182 Err(e) => {
183 let _ = opener_sender.send(Err(e));
184 },
185 });
186 opener_receiver
187 .await
188 .expect(BUG_TEXT)
189 .map(|_| Connection { sender })
190 .map_err(Error::Error)
191}
192
193impl Connection {
194 /// Open a new connection to an SQLite database.
195 ///
196 /// `Connection::open(path)` is equivalent to
197 /// `Connection::open_with_flags(path, OpenFlags::SQLITE_OPEN_READ_WRITE |
198 /// OpenFlags::SQLITE_OPEN_CREATE)`.
199 ///
200 /// # Failure
201 ///
202 /// Will return `Err` if `path` cannot be converted to a C-compatible
203 /// string or if the underlying SQLite open call fails.
204 pub async fn open<P: AsRef<Path>>(path: P) -> Result<Connection> {
205 let path = path.as_ref().to_owned();
206 start(move || rusqlite::Connection::open(path)).await
207 }
208
209 /// Open a new connection to an in-memory SQLite database.
210 ///
211 /// # Failure
212 ///
213 /// Will return `Err` if the underlying SQLite open call fails.
214 pub async fn open_in_memory() -> Result<Connection> {
215 start(rusqlite::Connection::open_in_memory).await
216 }
217
218 /// Open a new connection to an SQLite database.
219 ///
220 /// [Database Connection](http://www.sqlite.org/c3ref/open.html) for a
221 /// description of valid flag combinations.
222 ///
223 /// # Failure
224 ///
225 /// Will return `Err` if `path` cannot be converted to a C-compatible
226 /// string or if the underlying SQLite open call fails.
227 pub async fn open_with_flags<P: AsRef<Path>>(path: P, flags: OpenFlags) -> Result<Self> {
228 let path = path.as_ref().to_owned();
229 start(move || rusqlite::Connection::open_with_flags(path, flags)).await
230 }
231
232 /// Open a new connection to an SQLite database using the specific flags
233 /// and vfs name.
234 ///
235 /// [Database Connection](http://www.sqlite.org/c3ref/open.html) for a
236 /// description of valid flag combinations.
237 ///
238 /// # Failure
239 ///
240 /// Will return `Err` if either `path` or `vfs` cannot be converted to a
241 /// C-compatible string or if the underlying SQLite open call fails.
242 pub async fn open_with_flags_and_vfs<P: AsRef<Path>>(path: P, flags: OpenFlags, vfs: impl Name) -> Result<Self> {
243 let path = path.as_ref().to_owned();
244 let vfs = vfs.as_cstr()?.to_owned();
245 start(move || rusqlite::Connection::open_with_flags_and_vfs(path, flags, vfs.as_c_str())).await
246 }
247
248 /// Open a new connection to an in-memory SQLite database.
249 ///
250 /// [Database Connection](http://www.sqlite.org/c3ref/open.html) for a
251 /// description of valid flag combinations.
252 ///
253 /// # Failure
254 ///
255 /// Will return `Err` if the underlying SQLite open call fails.
256 pub async fn open_in_memory_with_flags(flags: OpenFlags) -> Result<Self> {
257 start(move || rusqlite::Connection::open_in_memory_with_flags(flags)).await
258 }
259
260 /// Open a new connection to an in-memory SQLite database using the
261 /// specific flags and vfs name.
262 ///
263 /// [Database Connection](http://www.sqlite.org/c3ref/open.html) for a
264 /// description of valid flag combinations.
265 ///
266 /// # Failure
267 ///
268 /// Will return `Err` if `vfs` cannot be converted to a C-compatible
269 /// string or if the underlying SQLite open call fails.
270 pub async fn open_in_memory_with_flags_and_vfs(flags: OpenFlags, vfs: impl Name) -> Result<Self> {
271 let vfs = vfs.as_cstr()?.to_owned();
272 start(move || rusqlite::Connection::open_in_memory_with_flags_and_vfs(flags, vfs.as_c_str())).await
273 }
274}