Skip to main content

sqlw_backend/
turso.rs

1//! Turso/libSQL executor and row types.
2
3use std::future::Future;
4use std::sync::Arc;
5
6use sqlw::{FromRow, Query, RowCell, RowError, RowLike, Value};
7
8use crate::error::DBError;
9
10/// A borrowed row from a Turso/libSQL query result.
11pub struct TursoRowRef<'a> {
12    row: &'a turso::Row,
13    column_names: &'a [String],
14}
15
16impl<'a> TursoRowRef<'a> {
17    /// Creates a new row reference from a Turso row and its column names.
18    pub fn new(row: &'a turso::Row, column_names: &'a [String]) -> Self {
19        Self { row, column_names }
20    }
21}
22
23impl<'a> RowLike for TursoRowRef<'a> {
24    fn cell<'b>(&'b self, name: &str) -> Result<RowCell<'b>, RowError> {
25        let index = self
26            .column_names
27            .iter()
28            .position(|col| col == name)
29            .ok_or_else(|| RowError::ColumnNotFound { name: name.into() })?;
30
31        let turso_value = self
32            .row
33            .get_value(index)
34            .map_err(|e| RowError::Any(e.to_string()))?;
35
36        let value = match turso_value {
37            turso::Value::Null => Value::Null,
38            turso::Value::Integer(i) => Value::Int(i),
39            turso::Value::Real(f) => Value::Float(f),
40            turso::Value::Text(s) => Value::Text(s),
41            turso::Value::Blob(b) => Value::Blob(b),
42        };
43
44        Ok(RowCell::Owned(value))
45    }
46}
47
48/// An async Turso/libSQL executor.
49///
50/// Provides async database operations via the [`QueryExecutor`](sqlw::QueryExecutor) trait.
51#[derive(Debug)]
52pub struct TursoExecutor {
53    connection: Arc<::turso::Connection>,
54}
55
56/// A transaction handle for [`TursoExecutor`].
57///
58/// Created with [`TursoExecutor::transaction`]. Queries executed via this
59/// handle are scoped to the same transaction until committed or rolled back.
60#[derive(Debug)]
61pub struct TursoTransaction<'conn> {
62    tx: ::turso::transaction::Transaction<'conn>,
63}
64
65impl TursoExecutor {
66    /// Creates a new Turso executor using the given connector function.
67    pub async fn new<F, Fut>(connector: F) -> Result<Self, DBError>
68    where
69        F: FnOnce() -> Fut,
70        Fut: Future<Output = Result<::turso::Connection, ::turso::Error>> + Send + 'static,
71    {
72        let connection = connector()
73            .await
74            .map_err(|e| DBError::Connection(e.into()))?;
75        Ok(TursoExecutor {
76            connection: Arc::new(connection),
77        })
78    }
79
80    /// Returns a reference to the underlying Turso connection.
81    pub fn conn(&self) -> &::turso::Connection {
82        &self.connection
83    }
84
85    /// Starts a new transaction using Turso's default behavior.
86    ///
87    /// The returned handle supports `query_void`, `query_one`, `query_list`,
88    /// `commit`, and `rollback`.
89    pub fn transaction(&self) -> impl Future<Output = Result<TursoTransaction<'_>, DBError>> {
90        async move {
91            let tx = self
92                .connection
93                .unchecked_transaction()
94                .await
95                .map_err(|e| DBError::Transaction(e.into()))?;
96            Ok(TursoTransaction { tx })
97        }
98    }
99}
100
101impl sqlw::QueryExecutor for TursoExecutor {
102    type Error = DBError;
103
104    fn query_void(&self, query: Query) -> impl Future<Output = Result<(), DBError>> {
105        let connection = Arc::clone(&self.connection);
106        async move { execute_query_void(connection.as_ref(), query).await }
107    }
108
109    fn query_one<T: FromRow + Send + 'static>(
110        &self,
111        query: Query,
112    ) -> impl Future<Output = Result<Option<T>, DBError>> {
113        let connection = Arc::clone(&self.connection);
114        async move { execute_query_one(connection.as_ref(), query).await }
115    }
116
117    fn query_list<T: FromRow + Send + 'static>(
118        &self,
119        query: Query,
120    ) -> impl Future<Output = Result<Vec<T>, DBError>> {
121        let connection = Arc::clone(&self.connection);
122        async move { execute_query_list(connection.as_ref(), query).await }
123    }
124
125    fn batch(&self, scripts: &[fn() -> Query]) -> impl Future<Output = Result<(), DBError>> {
126        async move {
127            let tx = self.transaction().await?;
128            for script in scripts {
129                if let Err(err) = tx.query_void(script()).await {
130                    let _ = tx.rollback().await;
131                    return Err(err);
132                }
133            }
134            tx.commit().await
135        }
136    }
137}
138
139impl TursoTransaction<'_> {
140    /// Returns the underlying Turso transaction handle.
141    pub fn raw(&self) -> &::turso::transaction::Transaction<'_> {
142        &self.tx
143    }
144
145    /// Executes a query inside this transaction, discarding results.
146    pub async fn query_void(&self, query: Query) -> Result<(), DBError> {
147        execute_query_void(&self.tx, query).await
148    }
149
150    /// Executes a query inside this transaction, returning at most one row.
151    pub async fn query_one<T: FromRow + Send + 'static>(
152        &self,
153        query: Query,
154    ) -> Result<Option<T>, DBError> {
155        execute_query_one(&self.tx, query).await
156    }
157
158    /// Executes a query inside this transaction, returning all rows.
159    pub async fn query_list<T: FromRow + Send + 'static>(
160        &self,
161        query: Query,
162    ) -> Result<Vec<T>, DBError> {
163        execute_query_list(&self.tx, query).await
164    }
165
166    /// Commits the transaction.
167    pub async fn commit(self) -> Result<(), DBError> {
168        self.tx
169            .commit()
170            .await
171            .map_err(|e| DBError::Transaction(e.into()))
172    }
173
174    /// Rolls back the transaction.
175    pub async fn rollback(self) -> Result<(), DBError> {
176        self.tx
177            .rollback()
178            .await
179            .map_err(|e| DBError::Transaction(e.into()))
180    }
181}
182
183async fn execute_query_void(connection: &::turso::Connection, query: Query) -> Result<(), DBError> {
184    let (sql, args) = query.split();
185    let args: Vec<::turso::Value> = args.into_iter().map(sqlw_to_turso_value).collect();
186
187    connection
188        .execute(&sql, args)
189        .await
190        .map(|_| ())
191        .map_err(|e| DBError::Execution(e.into()))
192}
193
194async fn execute_query_one<T: FromRow + Send + 'static>(
195    connection: &::turso::Connection,
196    query: Query,
197) -> Result<Option<T>, DBError> {
198    let (sql, args) = query.split();
199    let args: Vec<::turso::Value> = args.into_iter().map(sqlw_to_turso_value).collect();
200
201    let mut stmt = connection
202        .prepare(&sql)
203        .await
204        .map_err(|e| DBError::Execution(e.into()))?;
205
206    let columns: Vec<String> = stmt
207        .columns()
208        .iter()
209        .map(|col| col.name().to_string())
210        .collect();
211
212    let mut rows = stmt
213        .query(args)
214        .await
215        .map_err(|e| DBError::Execution(e.into()))?;
216
217    if let Some(row) = rows
218        .next()
219        .await
220        .map_err(|e| DBError::Execution(e.into()))?
221    {
222        let row_ref = TursoRowRef::new(&row, &columns);
223        Ok(Some(T::from_row(&row_ref)?))
224    } else {
225        Ok(None)
226    }
227}
228
229async fn execute_query_list<T: FromRow + Send + 'static>(
230    connection: &::turso::Connection,
231    query: Query,
232) -> Result<Vec<T>, DBError> {
233    let (sql, args) = query.split();
234    let args: Vec<::turso::Value> = args.into_iter().map(sqlw_to_turso_value).collect();
235
236    let mut stmt = connection
237        .prepare(&sql)
238        .await
239        .map_err(|e| DBError::Execution(e.into()))?;
240
241    let columns: Vec<String> = stmt
242        .columns()
243        .iter()
244        .map(|col| col.name().to_string())
245        .collect();
246
247    let mut rows = stmt
248        .query(args)
249        .await
250        .map_err(|e| DBError::Execution(e.into()))?;
251
252    let mut results = Vec::new();
253
254    while let Some(row) = rows
255        .next()
256        .await
257        .map_err(|e| DBError::Execution(e.into()))?
258    {
259        let row_ref = TursoRowRef::new(&row, &columns);
260        results.push(T::from_row(&row_ref)?);
261    }
262
263    Ok(results)
264}
265
266fn sqlw_to_turso_value(value: Value) -> ::turso::Value {
267    match value {
268        Value::Text(s) => ::turso::Value::Text(s),
269        Value::Int(i) => ::turso::Value::Integer(i),
270        Value::Float(f) => ::turso::Value::Real(f),
271        Value::Bool(b) => ::turso::Value::Integer(i64::from(b)),
272        Value::Blob(b) => ::turso::Value::Blob(b),
273        Value::Null => ::turso::Value::Null,
274    }
275}