1use std::future::Future;
4use std::sync::Arc;
5
6use sqlw::{FromRow, Query, RowCell, RowError, RowLike, Value};
7
8use crate::error::DBError;
9
10pub struct TursoRowRef<'a> {
12 row: &'a turso::Row,
13 column_names: &'a [String],
14}
15
16impl<'a> TursoRowRef<'a> {
17 pub fn new(row: &'a turso::Row, column_names: &'a [String]) -> Self {
19 Self { row, column_names }
20 }
21}
22
23impl<'a> RowLike for TursoRowRef<'a> {
24 fn cell<'b>(&'b self, name: &str) -> Result<RowCell<'b>, RowError> {
25 let index = self
26 .column_names
27 .iter()
28 .position(|col| col == name)
29 .ok_or_else(|| RowError::ColumnNotFound { name: name.into() })?;
30
31 let turso_value = self
32 .row
33 .get_value(index)
34 .map_err(|e| RowError::Any(e.to_string()))?;
35
36 let value = match turso_value {
37 turso::Value::Null => Value::Null,
38 turso::Value::Integer(i) => Value::Int(i),
39 turso::Value::Real(f) => Value::Float(f),
40 turso::Value::Text(s) => Value::Text(s),
41 turso::Value::Blob(b) => Value::Blob(b),
42 };
43
44 Ok(RowCell::Owned(value))
45 }
46}
47
48#[derive(Debug)]
52pub struct TursoExecutor {
53 connection: Arc<::turso::Connection>,
54}
55
56#[derive(Debug)]
61pub struct TursoTransaction<'conn> {
62 tx: ::turso::transaction::Transaction<'conn>,
63}
64
65impl TursoExecutor {
66 pub async fn new<F, Fut>(connector: F) -> Result<Self, DBError>
68 where
69 F: FnOnce() -> Fut,
70 Fut: Future<Output = Result<::turso::Connection, ::turso::Error>> + Send + 'static,
71 {
72 let connection = connector()
73 .await
74 .map_err(|e| DBError::Connection(e.into()))?;
75 Ok(TursoExecutor {
76 connection: Arc::new(connection),
77 })
78 }
79
80 pub fn conn(&self) -> &::turso::Connection {
82 &self.connection
83 }
84
85 pub fn transaction(&self) -> impl Future<Output = Result<TursoTransaction<'_>, DBError>> {
90 async move {
91 let tx = self
92 .connection
93 .unchecked_transaction()
94 .await
95 .map_err(|e| DBError::Transaction(e.into()))?;
96 Ok(TursoTransaction { tx })
97 }
98 }
99}
100
101impl sqlw::QueryExecutor for TursoExecutor {
102 type Error = DBError;
103
104 fn query_void(&self, query: Query) -> impl Future<Output = Result<(), DBError>> {
105 let connection = Arc::clone(&self.connection);
106 async move { execute_query_void(connection.as_ref(), query).await }
107 }
108
109 fn query_one<T: FromRow + Send + 'static>(
110 &self,
111 query: Query,
112 ) -> impl Future<Output = Result<Option<T>, DBError>> {
113 let connection = Arc::clone(&self.connection);
114 async move { execute_query_one(connection.as_ref(), query).await }
115 }
116
117 fn query_list<T: FromRow + Send + 'static>(
118 &self,
119 query: Query,
120 ) -> impl Future<Output = Result<Vec<T>, DBError>> {
121 let connection = Arc::clone(&self.connection);
122 async move { execute_query_list(connection.as_ref(), query).await }
123 }
124
125 fn batch(&self, scripts: &[fn() -> Query]) -> impl Future<Output = Result<(), DBError>> {
126 async move {
127 let tx = self.transaction().await?;
128 for script in scripts {
129 if let Err(err) = tx.query_void(script()).await {
130 let _ = tx.rollback().await;
131 return Err(err);
132 }
133 }
134 tx.commit().await
135 }
136 }
137}
138
139impl TursoTransaction<'_> {
140 pub fn raw(&self) -> &::turso::transaction::Transaction<'_> {
142 &self.tx
143 }
144
145 pub async fn query_void(&self, query: Query) -> Result<(), DBError> {
147 execute_query_void(&self.tx, query).await
148 }
149
150 pub async fn query_one<T: FromRow + Send + 'static>(
152 &self,
153 query: Query,
154 ) -> Result<Option<T>, DBError> {
155 execute_query_one(&self.tx, query).await
156 }
157
158 pub async fn query_list<T: FromRow + Send + 'static>(
160 &self,
161 query: Query,
162 ) -> Result<Vec<T>, DBError> {
163 execute_query_list(&self.tx, query).await
164 }
165
166 pub async fn commit(self) -> Result<(), DBError> {
168 self.tx
169 .commit()
170 .await
171 .map_err(|e| DBError::Transaction(e.into()))
172 }
173
174 pub async fn rollback(self) -> Result<(), DBError> {
176 self.tx
177 .rollback()
178 .await
179 .map_err(|e| DBError::Transaction(e.into()))
180 }
181}
182
183async fn execute_query_void(connection: &::turso::Connection, query: Query) -> Result<(), DBError> {
184 let (sql, args) = query.split();
185 let args: Vec<::turso::Value> = args.into_iter().map(sqlw_to_turso_value).collect();
186
187 connection
188 .execute(&sql, args)
189 .await
190 .map(|_| ())
191 .map_err(|e| DBError::Execution(e.into()))
192}
193
194async fn execute_query_one<T: FromRow + Send + 'static>(
195 connection: &::turso::Connection,
196 query: Query,
197) -> Result<Option<T>, DBError> {
198 let (sql, args) = query.split();
199 let args: Vec<::turso::Value> = args.into_iter().map(sqlw_to_turso_value).collect();
200
201 let mut stmt = connection
202 .prepare(&sql)
203 .await
204 .map_err(|e| DBError::Execution(e.into()))?;
205
206 let columns: Vec<String> = stmt
207 .columns()
208 .iter()
209 .map(|col| col.name().to_string())
210 .collect();
211
212 let mut rows = stmt
213 .query(args)
214 .await
215 .map_err(|e| DBError::Execution(e.into()))?;
216
217 if let Some(row) = rows
218 .next()
219 .await
220 .map_err(|e| DBError::Execution(e.into()))?
221 {
222 let row_ref = TursoRowRef::new(&row, &columns);
223 Ok(Some(T::from_row(&row_ref)?))
224 } else {
225 Ok(None)
226 }
227}
228
229async fn execute_query_list<T: FromRow + Send + 'static>(
230 connection: &::turso::Connection,
231 query: Query,
232) -> Result<Vec<T>, DBError> {
233 let (sql, args) = query.split();
234 let args: Vec<::turso::Value> = args.into_iter().map(sqlw_to_turso_value).collect();
235
236 let mut stmt = connection
237 .prepare(&sql)
238 .await
239 .map_err(|e| DBError::Execution(e.into()))?;
240
241 let columns: Vec<String> = stmt
242 .columns()
243 .iter()
244 .map(|col| col.name().to_string())
245 .collect();
246
247 let mut rows = stmt
248 .query(args)
249 .await
250 .map_err(|e| DBError::Execution(e.into()))?;
251
252 let mut results = Vec::new();
253
254 while let Some(row) = rows
255 .next()
256 .await
257 .map_err(|e| DBError::Execution(e.into()))?
258 {
259 let row_ref = TursoRowRef::new(&row, &columns);
260 results.push(T::from_row(&row_ref)?);
261 }
262
263 Ok(results)
264}
265
266fn sqlw_to_turso_value(value: Value) -> ::turso::Value {
267 match value {
268 Value::Text(s) => ::turso::Value::Text(s),
269 Value::Int(i) => ::turso::Value::Integer(i),
270 Value::Float(f) => ::turso::Value::Real(f),
271 Value::Bool(b) => ::turso::Value::Integer(i64::from(b)),
272 Value::Blob(b) => ::turso::Value::Blob(b),
273 Value::Null => ::turso::Value::Null,
274 }
275}