tokio_sqlite/
sqlite.rs

1use std::marker::PhantomData;
2use std::path::Path;
3
4use tokio::sync::oneshot;
5
6use super::connection::{ConnectionClient, ConnectionTask};
7use super::query::QueryClient;
8use super::transaction::TransactionClient;
9
/// Error type used by every fallible operation in this file; an alias for
/// [`rusqlite::Error`].
pub type Error = rusqlite::Error;
11
/// Value type used both for statement arguments and for the cells of a
/// [`Row`]; an alias for [`rusqlite::types::Value`].
pub type Value = rusqlite::types::Value;
13
14#[derive(Clone, Debug)]
15pub struct Row {
16    pub(super) values: Vec<Value>,
17}
18
19impl Row {
20    pub fn values(&self) -> &[Value] {
21        &self.values
22    }
23
24    pub fn into_values(self) -> Vec<Value> {
25        self.values
26    }
27}
28
29/// A result of executing the statement without the resulting query rows.
30#[derive(Default, Clone)]
31pub struct Status {
32    pub(super) rows_affected: usize,
33    pub(super) last_insert_id: Option<i64>,
34}
35
36impl Status {
37    pub fn rows_affected(&self) -> usize {
38        self.rows_affected
39    }
40
41    pub fn last_insert_id(&self) -> Option<i64> {
42        self.last_insert_id
43    }
44}
45
46/// An asynchronous stream of resulting query rows.
47pub struct Rows<'a> {
48    tx: QueryClient,
49    _phantom: PhantomData<&'a ()>,
50}
51
52impl<'a> Rows<'a> {
53    pub fn columns(&self) -> &[String] {
54        self.tx.columns()
55    }
56
57    pub async fn next(&mut self) -> Option<Result<Row, Error>> {
58        self.tx.next().await
59    }
60}
61
62impl<'a> Drop for Rows<'a> {
63    fn drop(&mut self) {}
64}
65
66/// An asynchronous SQLite database transaction.
67pub struct Transaction<'a> {
68    tx: TransactionClient,
69    _phantom: PhantomData<&'a ()>,
70}
71
72impl<'a> Transaction<'a> {
73    /// Consumes the transaction, committing all changes made within it.
74    pub async fn commit(mut self) -> Result<(), Error> {
75        self.tx.commit().await
76    }
77
78    /// Rolls the transaction back, discarding all changes made within it.
79    pub async fn rollback(mut self) -> Result<(), Error> {
80        self.tx.rollback().await
81    }
82
83    /// Executes a statement that does not return the resulting rows.
84    ///
85    /// Returns an error if the query returns resulting rows.
86    pub async fn execute<S, A>(&mut self, statement: S, arguments: A) -> Result<Status, Error>
87    where
88        S: Into<String>,
89        A: Into<Vec<Value>>,
90    {
91        self.tx.execute(statement.into(), arguments.into()).await
92    }
93
94    /// Executes a statement that returns the resulting query rows.
95    pub async fn query<S, A>(&mut self, statement: S, arguments: A) -> Result<Rows<'_>, Error>
96    where
97        S: Into<String>,
98        A: Into<Vec<Value>>,
99    {
100        let tx = self.tx.query(statement.into(), arguments.into()).await?;
101        Ok(Rows {
102            tx,
103            _phantom: PhantomData,
104        })
105    }
106
107    /// Executes a statement that returns zero or one resulting query row.
108    ///
109    /// Returns an error if the query returns more than one row.
110    pub async fn query_row<S, A>(
111        &mut self,
112        statement: S,
113        arguments: A,
114    ) -> Result<Option<Row>, Error>
115    where
116        S: Into<String>,
117        A: Into<Vec<Value>>,
118    {
119        let mut rows = self.query(statement, arguments).await?;
120        let row = match rows.next().await {
121            Some(v) => v?,
122            None => return Ok(None),
123        };
124        if let Some(_) = rows.next().await {
125            return Err(Error::QueryReturnedNoRows);
126        }
127        Ok(Some(row))
128    }
129}
130
131impl<'a> Drop for Transaction<'a> {
132    fn drop(&mut self) {}
133}
134
135/// An asynchronous SQLite client.
136pub struct Connection {
137    tx: Option<ConnectionClient>,
138    handle: Option<tokio::task::JoinHandle<()>>,
139}
140
141impl Connection {
142    /// Opens a new connection to a SQLite database.
143    pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
144        let task = ConnectionTask::new(path.as_ref().to_owned());
145        let (tx, rx) = oneshot::channel();
146        let handle = tokio::task::spawn_blocking(|| task.blocking_run(tx));
147        Ok(Connection {
148            tx: Some(rx.await.unwrap()?),
149            handle: Some(handle),
150        })
151    }
152
153    /// Begins new transaction.
154    pub async fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
155        let tx = self.tx.as_mut().unwrap().transaction().await?;
156        Ok(Transaction {
157            tx,
158            _phantom: PhantomData,
159        })
160    }
161
162    /// Executes a statement that does not return the resulting rows.
163    ///
164    /// Returns an error if the query returns resulting rows.
165    pub async fn execute<S, A>(&mut self, statement: S, arguments: A) -> Result<Status, Error>
166    where
167        S: Into<String>,
168        A: Into<Vec<Value>>,
169    {
170        self.tx
171            .as_mut()
172            .unwrap()
173            .execute(statement.into(), arguments.into())
174            .await
175    }
176
177    /// Executes a statement that returns the resulting query rows.
178    pub async fn query<S, A>(&mut self, statement: S, arguments: A) -> Result<Rows<'_>, Error>
179    where
180        S: Into<String>,
181        A: Into<Vec<Value>>,
182    {
183        let tx = self
184            .tx
185            .as_mut()
186            .unwrap()
187            .query(statement.into(), arguments.into())
188            .await?;
189        Ok(Rows {
190            tx,
191            _phantom: PhantomData,
192        })
193    }
194
195    /// Executes a statement that returns zero or one resulting query row.
196    ///
197    /// Returns an error if the query returns more than one row.
198    pub async fn query_row<S, A>(
199        &mut self,
200        statement: S,
201        arguments: A,
202    ) -> Result<Option<Row>, Error>
203    where
204        S: Into<String>,
205        A: Into<Vec<Value>>,
206    {
207        let mut rows = self.query(statement, arguments).await?;
208        let row = match rows.next().await {
209            Some(v) => v?,
210            None => return Ok(None),
211        };
212        if let Some(_) = rows.next().await {
213            return Err(Error::QueryReturnedNoRows);
214        }
215        Ok(Some(row))
216    }
217}
218
219impl Drop for Connection {
220    fn drop(&mut self) {
221        drop(self.tx.take());
222        if let Some(handle) = self.handle.take() {
223            tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(handle))
224                .unwrap();
225        };
226    }
227}