tokio_sqlite/
sqlite.rs

1use std::marker::PhantomData;
2use std::path::Path;
3
4use tokio::sync::oneshot;
5
6use super::connection::{ConnectionClient, ConnectionTask};
7use super::query::QueryClient;
8use super::transaction::TransactionClient;
9
10pub type Error = rusqlite::Error;
11
12pub type Value = rusqlite::types::Value;
13
14#[derive(Clone, Debug)]
15pub struct Row {
16    pub(super) values: Vec<Value>,
17}
18
19impl Row {
20    pub fn values(&self) -> &[Value] {
21        &self.values
22    }
23
24    pub fn into_values(self) -> Vec<Value> {
25        self.values
26    }
27}
28
29/// A result of executing the statement without the resulting query rows.
30#[derive(Clone, Debug, Default)]
31pub struct Status {
32    pub(super) rows_affected: usize,
33    pub(super) last_insert_id: Option<i64>,
34}
35
36impl Status {
37    pub fn rows_affected(&self) -> usize {
38        self.rows_affected
39    }
40
41    pub fn last_insert_id(&self) -> Option<i64> {
42        self.last_insert_id
43    }
44}
45
46/// An asynchronous stream of resulting query rows.
47pub struct Rows<'a> {
48    tx: QueryClient,
49    _phantom: PhantomData<&'a ()>,
50}
51
52impl<'a> Rows<'a> {
53    pub fn columns(&self) -> &[String] {
54        self.tx.columns()
55    }
56
57    pub async fn next(&mut self) -> Option<Result<Row, Error>> {
58        self.tx.next().await
59    }
60}
61
62impl<'a> Drop for Rows<'a> {
63    fn drop(&mut self) {}
64}
65
66/// An asynchronous SQLite database transaction.
67pub struct Transaction<'a> {
68    tx: TransactionClient,
69    _phantom: PhantomData<&'a ()>,
70}
71
72impl<'a> Transaction<'a> {
73    /// Consumes the transaction, committing all changes made within it.
74    pub async fn commit(mut self) -> Result<(), Error> {
75        self.tx.commit().await
76    }
77
78    /// Rolls the transaction back, discarding all changes made within it.
79    pub async fn rollback(mut self) -> Result<(), Error> {
80        self.tx.rollback().await
81    }
82
83    /// Executes a statement that does not return the resulting rows.
84    ///
85    /// Returns an error if the query returns resulting rows.
86    pub async fn execute<S, A>(&mut self, statement: S, arguments: A) -> Result<Status, Error>
87    where
88        S: Into<String>,
89        A: Into<Vec<Value>>,
90    {
91        self.tx.execute(statement.into(), arguments.into()).await
92    }
93
94    /// Executes a statement that returns the resulting query rows.
95    pub async fn query<S, A>(&mut self, statement: S, arguments: A) -> Result<Rows<'_>, Error>
96    where
97        S: Into<String>,
98        A: Into<Vec<Value>>,
99    {
100        let tx = self.tx.query(statement.into(), arguments.into()).await?;
101        Ok(Rows {
102            tx,
103            _phantom: PhantomData,
104        })
105    }
106
107    /// Executes a statement that returns zero or one resulting query row.
108    ///
109    /// Returns an error if the query returns more than one row.
110    pub async fn query_row<S, A>(
111        &mut self,
112        statement: S,
113        arguments: A,
114    ) -> Result<Option<Row>, Error>
115    where
116        S: Into<String>,
117        A: Into<Vec<Value>>,
118    {
119        let mut rows = self.query(statement, arguments).await?;
120        let row = match rows.next().await {
121            Some(v) => v?,
122            None => return Ok(None),
123        };
124        if let Some(_) = rows.next().await {
125            return Err(Error::QueryReturnedNoRows);
126        }
127        Ok(Some(row))
128    }
129}
130
131impl<'a> Drop for Transaction<'a> {
132    fn drop(&mut self) {}
133}
134
135/// An asynchronous SQLite client.
136pub struct Connection {
137    tx: Option<ConnectionClient>,
138    #[cfg(feature = "rt-multi-thread")]
139    handle: Option<tokio::task::JoinHandle<()>>,
140    #[cfg(not(feature = "rt-multi-thread"))]
141    handle: Option<std::thread::JoinHandle<()>>,
142}
143
144impl Connection {
145    /// Opens a new connection to a SQLite database.
146    pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
147        let task = ConnectionTask::new(path.as_ref().to_owned());
148        let (tx, rx) = oneshot::channel();
149        #[cfg(feature = "rt-multi-thread")]
150        let handle = tokio::task::spawn_blocking(|| task.blocking_run(tx));
151        #[cfg(not(feature = "rt-multi-thread"))]
152        let handle = std::thread::spawn(|| task.blocking_run(tx));
153        Ok(Connection {
154            tx: Some(rx.await.unwrap()?),
155            handle: Some(handle),
156        })
157    }
158
159    pub async fn close(mut self) {
160        drop(self.tx.take());
161        if let Some(handle) = self.handle.take() {
162            #[cfg(feature = "rt-multi-thread")]
163            handle.await.unwrap();
164            #[cfg(not(feature = "rt-multi-thread"))]
165            handle.join().unwrap();
166        };
167    }
168
169    /// Begins new transaction.
170    pub async fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
171        let tx = self.tx.as_mut().unwrap().transaction().await?;
172        Ok(Transaction {
173            tx,
174            _phantom: PhantomData,
175        })
176    }
177
178    /// Executes a statement that does not return the resulting rows.
179    ///
180    /// Returns an error if the query returns resulting rows.
181    pub async fn execute<S, A>(&mut self, statement: S, arguments: A) -> Result<Status, Error>
182    where
183        S: Into<String>,
184        A: Into<Vec<Value>>,
185    {
186        self.tx
187            .as_mut()
188            .unwrap()
189            .execute(statement.into(), arguments.into())
190            .await
191    }
192
193    /// Executes a statement that returns the resulting query rows.
194    pub async fn query<S, A>(&mut self, statement: S, arguments: A) -> Result<Rows<'_>, Error>
195    where
196        S: Into<String>,
197        A: Into<Vec<Value>>,
198    {
199        let tx = self
200            .tx
201            .as_mut()
202            .unwrap()
203            .query(statement.into(), arguments.into())
204            .await?;
205        Ok(Rows {
206            tx,
207            _phantom: PhantomData,
208        })
209    }
210
211    /// Executes a statement that returns zero or one resulting query row.
212    ///
213    /// Returns an error if the query returns more than one row.
214    pub async fn query_row<S, A>(
215        &mut self,
216        statement: S,
217        arguments: A,
218    ) -> Result<Option<Row>, Error>
219    where
220        S: Into<String>,
221        A: Into<Vec<Value>>,
222    {
223        let mut rows = self.query(statement, arguments).await?;
224        let row = match rows.next().await {
225            Some(v) => v?,
226            None => return Ok(None),
227        };
228        if let Some(_) = rows.next().await {
229            return Err(Error::QueryReturnedNoRows);
230        }
231        Ok(Some(row))
232    }
233}
234
235impl Drop for Connection {
236    fn drop(&mut self) {
237        drop(self.tx.take());
238    }
239}