1use std::marker::PhantomData;
2use std::path::Path;
3
4use tokio::sync::oneshot;
5
6use super::connection::{ConnectionClient, ConnectionTask};
7use super::query::QueryClient;
8use super::transaction::TransactionClient;
9
/// Error type returned by the fallible operations in this module; an alias
/// for [`rusqlite::Error`].
pub type Error = rusqlite::Error;
11
/// Dynamically typed value used for statement arguments and row contents; an
/// alias for [`rusqlite::types::Value`].
pub type Value = rusqlite::types::Value;
13
14#[derive(Clone, Debug)]
15pub struct Row {
16 pub(super) values: Vec<Value>,
17}
18
19impl Row {
20 pub fn values(&self) -> &[Value] {
21 &self.values
22 }
23
24 pub fn into_values(self) -> Vec<Value> {
25 self.values
26 }
27}
28
29#[derive(Clone, Debug, Default)]
31pub struct Status {
32 pub(super) rows_affected: usize,
33 pub(super) last_insert_id: Option<i64>,
34}
35
36impl Status {
37 pub fn rows_affected(&self) -> usize {
38 self.rows_affected
39 }
40
41 pub fn last_insert_id(&self) -> Option<i64> {
42 self.last_insert_id
43 }
44}
45
46pub struct Rows<'a> {
48 tx: QueryClient,
49 _phantom: PhantomData<&'a ()>,
50}
51
52impl<'a> Rows<'a> {
53 pub fn columns(&self) -> &[String] {
54 self.tx.columns()
55 }
56
57 pub async fn next(&mut self) -> Option<Result<Row, Error>> {
58 self.tx.next().await
59 }
60}
61
62impl<'a> Drop for Rows<'a> {
63 fn drop(&mut self) {}
64}
65
/// Handle to a transaction obtained from `Connection::transaction`.
///
/// The `'a` lifetime stores no data; `Connection::transaction` returns
/// `Transaction<'_>` tied to its `&mut self` borrow.
pub struct Transaction<'a> {
    // Client through which all commands of this transaction are issued.
    tx: TransactionClient,
    // Zero-sized marker carrying the `'a` borrow.
    _phantom: PhantomData<&'a ()>,
}
71
72impl<'a> Transaction<'a> {
73 pub async fn commit(mut self) -> Result<(), Error> {
75 self.tx.commit().await
76 }
77
78 pub async fn rollback(mut self) -> Result<(), Error> {
80 self.tx.rollback().await
81 }
82
83 pub async fn execute<S, A>(&mut self, statement: S, arguments: A) -> Result<Status, Error>
87 where
88 S: Into<String>,
89 A: Into<Vec<Value>>,
90 {
91 self.tx.execute(statement.into(), arguments.into()).await
92 }
93
94 pub async fn query<S, A>(&mut self, statement: S, arguments: A) -> Result<Rows<'_>, Error>
96 where
97 S: Into<String>,
98 A: Into<Vec<Value>>,
99 {
100 let tx = self.tx.query(statement.into(), arguments.into()).await?;
101 Ok(Rows {
102 tx,
103 _phantom: PhantomData,
104 })
105 }
106
107 pub async fn query_row<S, A>(
111 &mut self,
112 statement: S,
113 arguments: A,
114 ) -> Result<Option<Row>, Error>
115 where
116 S: Into<String>,
117 A: Into<Vec<Value>>,
118 {
119 let mut rows = self.query(statement, arguments).await?;
120 let row = match rows.next().await {
121 Some(v) => v?,
122 None => return Ok(None),
123 };
124 if let Some(_) = rows.next().await {
125 return Err(Error::QueryReturnedNoRows);
126 }
127 Ok(Some(row))
128 }
129}
130
// Intentionally empty `Drop` implementation: nothing is committed or rolled
// back here.
// NOTE(review): presumably this impl exists so the `'a` borrow stays alive
// until the transaction is actually dropped, and any cleanup is left to
// `TransactionClient` — confirm.
impl<'a> Drop for Transaction<'a> {
    fn drop(&mut self) {}
}
134
/// Asynchronous handle to a connection whose blocking work runs in a
/// background task (feature `rt-multi-thread`) or a dedicated OS thread.
pub struct Connection {
    // Client used to issue commands. Wrapped in `Option` so that `close` and
    // `Drop` can take it out and drop it.
    tx: Option<ConnectionClient>,
    // Join handle of the background worker; its type depends on the
    // `rt-multi-thread` feature. `close` takes it out and waits on it.
    #[cfg(feature = "rt-multi-thread")]
    handle: Option<tokio::task::JoinHandle<()>>,
    #[cfg(not(feature = "rt-multi-thread"))]
    handle: Option<std::thread::JoinHandle<()>>,
}
143
144impl Connection {
145 pub async fn open<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
147 let task = ConnectionTask::new(path.as_ref().to_owned());
148 let (tx, rx) = oneshot::channel();
149 #[cfg(feature = "rt-multi-thread")]
150 let handle = tokio::task::spawn_blocking(|| task.blocking_run(tx));
151 #[cfg(not(feature = "rt-multi-thread"))]
152 let handle = std::thread::spawn(|| task.blocking_run(tx));
153 Ok(Connection {
154 tx: Some(rx.await.unwrap()?),
155 handle: Some(handle),
156 })
157 }
158
159 pub async fn close(mut self) {
160 drop(self.tx.take());
161 if let Some(handle) = self.handle.take() {
162 #[cfg(feature = "rt-multi-thread")]
163 handle.await.unwrap();
164 #[cfg(not(feature = "rt-multi-thread"))]
165 handle.join().unwrap();
166 };
167 }
168
169 pub async fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
171 let tx = self.tx.as_mut().unwrap().transaction().await?;
172 Ok(Transaction {
173 tx,
174 _phantom: PhantomData,
175 })
176 }
177
178 pub async fn execute<S, A>(&mut self, statement: S, arguments: A) -> Result<Status, Error>
182 where
183 S: Into<String>,
184 A: Into<Vec<Value>>,
185 {
186 self.tx
187 .as_mut()
188 .unwrap()
189 .execute(statement.into(), arguments.into())
190 .await
191 }
192
193 pub async fn query<S, A>(&mut self, statement: S, arguments: A) -> Result<Rows<'_>, Error>
195 where
196 S: Into<String>,
197 A: Into<Vec<Value>>,
198 {
199 let tx = self
200 .tx
201 .as_mut()
202 .unwrap()
203 .query(statement.into(), arguments.into())
204 .await?;
205 Ok(Rows {
206 tx,
207 _phantom: PhantomData,
208 })
209 }
210
211 pub async fn query_row<S, A>(
215 &mut self,
216 statement: S,
217 arguments: A,
218 ) -> Result<Option<Row>, Error>
219 where
220 S: Into<String>,
221 A: Into<Vec<Value>>,
222 {
223 let mut rows = self.query(statement, arguments).await?;
224 let row = match rows.next().await {
225 Some(v) => v?,
226 None => return Ok(None),
227 };
228 if let Some(_) = rows.next().await {
229 return Err(Error::QueryReturnedNoRows);
230 }
231 Ok(Some(row))
232 }
233}
234
235impl Drop for Connection {
236 fn drop(&mut self) {
237 drop(self.tx.take());
238 }
239}