1use std::marker::PhantomData;
2use std::path::Path;
3
4use tokio::sync::oneshot;
5
6use super::connection::{ConnectionClient, ConnectionTask};
7use super::query::QueryClient;
8use super::transaction::TransactionClient;
9
10pub type Error = rusqlite::Error;
11
12pub type Value = rusqlite::types::Value;
13
14#[derive(Clone, Debug)]
15pub struct Row {
16 pub(super) values: Vec<Value>,
17}
18
19impl Row {
20 pub fn values(&self) -> &[Value] {
21 &self.values
22 }
23
24 pub fn into_values(self) -> Vec<Value> {
25 self.values
26 }
27}
28
29#[derive(Default, Clone)]
31pub struct Status {
32 pub(super) rows_affected: usize,
33 pub(super) last_insert_id: Option<i64>,
34}
35
36impl Status {
37 pub fn rows_affected(&self) -> usize {
38 self.rows_affected
39 }
40
41 pub fn last_insert_id(&self) -> Option<i64> {
42 self.last_insert_id
43 }
44}
45
46pub struct Rows<'a> {
48 tx: QueryClient,
49 _phantom: PhantomData<&'a ()>,
50}
51
52impl<'a> Rows<'a> {
53 pub fn columns(&self) -> &[String] {
54 self.tx.columns()
55 }
56
57 pub async fn next(&mut self) -> Option<Result<Row, Error>> {
58 self.tx.next().await
59 }
60}
61
62impl<'a> Drop for Rows<'a> {
63 fn drop(&mut self) {}
64}
65
66pub struct Transaction<'a> {
68 tx: TransactionClient,
69 _phantom: PhantomData<&'a ()>,
70}
71
72impl<'a> Transaction<'a> {
73 pub async fn commit(mut self) -> Result<(), Error> {
75 self.tx.commit().await
76 }
77
78 pub async fn rollback(mut self) -> Result<(), Error> {
80 self.tx.rollback().await
81 }
82
83 pub async fn execute<S, A>(&mut self, statement: S, arguments: A) -> Result<Status, Error>
87 where
88 S: Into<String>,
89 A: Into<Vec<Value>>,
90 {
91 self.tx.execute(statement.into(), arguments.into()).await
92 }
93
94 pub async fn query<S, A>(&mut self, statement: S, arguments: A) -> Result<Rows<'_>, Error>
96 where
97 S: Into<String>,
98 A: Into<Vec<Value>>,
99 {
100 let tx = self.tx.query(statement.into(), arguments.into()).await?;
101 Ok(Rows {
102 tx,
103 _phantom: PhantomData,
104 })
105 }
106
107 pub async fn query_row<S, A>(
111 &mut self,
112 statement: S,
113 arguments: A,
114 ) -> Result<Option<Row>, Error>
115 where
116 S: Into<String>,
117 A: Into<Vec<Value>>,
118 {
119 let mut rows = self.query(statement, arguments).await?;
120 let row = match rows.next().await {
121 Some(v) => v?,
122 None => return Ok(None),
123 };
124 if let Some(_) = rows.next().await {
125 return Err(Error::QueryReturnedNoRows);
126 }
127 Ok(Some(row))
128 }
129}
130
131impl<'a> Drop for Transaction<'a> {
132 fn drop(&mut self) {}
133}
134
135pub struct Connection {
137 tx: Option<ConnectionClient>,
138 handle: Option<tokio::task::JoinHandle<()>>,
139}
140
141impl Connection {
142 pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
144 let task = ConnectionTask::new(path.as_ref().to_owned());
145 let (tx, rx) = oneshot::channel();
146 let handle = tokio::task::spawn_blocking(|| task.blocking_run(tx));
147 Ok(Connection {
148 tx: Some(rx.await.unwrap()?),
149 handle: Some(handle),
150 })
151 }
152
153 pub async fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
155 let tx = self.tx.as_mut().unwrap().transaction().await?;
156 Ok(Transaction {
157 tx,
158 _phantom: PhantomData,
159 })
160 }
161
162 pub async fn execute<S, A>(&mut self, statement: S, arguments: A) -> Result<Status, Error>
166 where
167 S: Into<String>,
168 A: Into<Vec<Value>>,
169 {
170 self.tx
171 .as_mut()
172 .unwrap()
173 .execute(statement.into(), arguments.into())
174 .await
175 }
176
177 pub async fn query<S, A>(&mut self, statement: S, arguments: A) -> Result<Rows<'_>, Error>
179 where
180 S: Into<String>,
181 A: Into<Vec<Value>>,
182 {
183 let tx = self
184 .tx
185 .as_mut()
186 .unwrap()
187 .query(statement.into(), arguments.into())
188 .await?;
189 Ok(Rows {
190 tx,
191 _phantom: PhantomData,
192 })
193 }
194
195 pub async fn query_row<S, A>(
199 &mut self,
200 statement: S,
201 arguments: A,
202 ) -> Result<Option<Row>, Error>
203 where
204 S: Into<String>,
205 A: Into<Vec<Value>>,
206 {
207 let mut rows = self.query(statement, arguments).await?;
208 let row = match rows.next().await {
209 Some(v) => v?,
210 None => return Ok(None),
211 };
212 if let Some(_) = rows.next().await {
213 return Err(Error::QueryReturnedNoRows);
214 }
215 Ok(Some(row))
216 }
217}
218
219impl Drop for Connection {
220 fn drop(&mut self) {
221 drop(self.tx.take());
222 if let Some(handle) = self.handle.take() {
223 tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(handle))
224 .unwrap();
225 };
226 }
227}