sqlw-backend 0.1.0

Database executor implementations for sqlw
Documentation
//! Turso/libSQL executor and row types.

use std::future::Future;
use std::sync::Arc;

use sqlw::{FromRow, Query, RowCell, RowError, RowLike, Value};

use crate::error::DBError;

/// A borrowed row from a Turso/libSQL query result.
pub struct TursoRowRef<'a> {
    row: &'a turso::Row,
    column_names: &'a [String],
}

impl<'a> TursoRowRef<'a> {
    /// Creates a new row reference from a Turso row and its column names.
    pub fn new(row: &'a turso::Row, column_names: &'a [String]) -> Self {
        Self { row, column_names }
    }
}

impl<'a> RowLike for TursoRowRef<'a> {
    fn cell<'b>(&'b self, name: &str) -> Result<RowCell<'b>, RowError> {
        let index = self
            .column_names
            .iter()
            .position(|col| col == name)
            .ok_or_else(|| RowError::ColumnNotFound { name: name.into() })?;

        let turso_value = self
            .row
            .get_value(index)
            .map_err(|e| RowError::Any(e.to_string()))?;

        let value = match turso_value {
            turso::Value::Null => Value::Null,
            turso::Value::Integer(i) => Value::Int(i),
            turso::Value::Real(f) => Value::Float(f),
            turso::Value::Text(s) => Value::Text(s),
            turso::Value::Blob(b) => Value::Blob(b),
        };

        Ok(RowCell::Owned(value))
    }
}

/// An async Turso/libSQL executor.
///
/// Provides async database operations via the [`QueryExecutor`](sqlw::QueryExecutor) trait.
#[derive(Debug)]
pub struct TursoExecutor {
    connection: Arc<::turso::Connection>,
}

/// A transaction handle for [`TursoExecutor`].
///
/// Created with [`TursoExecutor::transaction`]. Queries executed via this
/// handle are scoped to the same transaction until committed or rolled back.
#[derive(Debug)]
pub struct TursoTransaction<'conn> {
    tx: ::turso::transaction::Transaction<'conn>,
}

impl TursoExecutor {
    /// Creates a new Turso executor using the given connector function.
    pub async fn new<F, Fut>(connector: F) -> Result<Self, DBError>
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = Result<::turso::Connection, ::turso::Error>> + Send + 'static,
    {
        let connection = connector()
            .await
            .map_err(|e| DBError::Connection(e.into()))?;
        Ok(TursoExecutor {
            connection: Arc::new(connection),
        })
    }

    /// Returns a reference to the underlying Turso connection.
    pub fn conn(&self) -> &::turso::Connection {
        &self.connection
    }

    /// Starts a new transaction using Turso's default behavior.
    ///
    /// The returned handle supports `query_void`, `query_one`, `query_list`,
    /// `commit`, and `rollback`.
    pub fn transaction(&self) -> impl Future<Output = Result<TursoTransaction<'_>, DBError>> {
        async move {
            let tx = self
                .connection
                .unchecked_transaction()
                .await
                .map_err(|e| DBError::Transaction(e.into()))?;
            Ok(TursoTransaction { tx })
        }
    }
}

impl sqlw::QueryExecutor for TursoExecutor {
    type Error = DBError;

    fn query_void(&self, query: Query) -> impl Future<Output = Result<(), DBError>> {
        let connection = Arc::clone(&self.connection);
        async move { execute_query_void(connection.as_ref(), query).await }
    }

    fn query_one<T: FromRow + Send + 'static>(
        &self,
        query: Query,
    ) -> impl Future<Output = Result<Option<T>, DBError>> {
        let connection = Arc::clone(&self.connection);
        async move { execute_query_one(connection.as_ref(), query).await }
    }

    fn query_list<T: FromRow + Send + 'static>(
        &self,
        query: Query,
    ) -> impl Future<Output = Result<Vec<T>, DBError>> {
        let connection = Arc::clone(&self.connection);
        async move { execute_query_list(connection.as_ref(), query).await }
    }

    fn batch(&self, scripts: &[fn() -> Query]) -> impl Future<Output = Result<(), DBError>> {
        async move {
            let tx = self.transaction().await?;
            for script in scripts {
                if let Err(err) = tx.query_void(script()).await {
                    let _ = tx.rollback().await;
                    return Err(err);
                }
            }
            tx.commit().await
        }
    }
}

impl TursoTransaction<'_> {
    /// Returns the underlying Turso transaction handle.
    pub fn raw(&self) -> &::turso::transaction::Transaction<'_> {
        &self.tx
    }

    /// Executes a query inside this transaction, discarding results.
    pub async fn query_void(&self, query: Query) -> Result<(), DBError> {
        execute_query_void(&self.tx, query).await
    }

    /// Executes a query inside this transaction, returning at most one row.
    pub async fn query_one<T: FromRow + Send + 'static>(
        &self,
        query: Query,
    ) -> Result<Option<T>, DBError> {
        execute_query_one(&self.tx, query).await
    }

    /// Executes a query inside this transaction, returning all rows.
    pub async fn query_list<T: FromRow + Send + 'static>(
        &self,
        query: Query,
    ) -> Result<Vec<T>, DBError> {
        execute_query_list(&self.tx, query).await
    }

    /// Commits the transaction.
    pub async fn commit(self) -> Result<(), DBError> {
        self.tx
            .commit()
            .await
            .map_err(|e| DBError::Transaction(e.into()))
    }

    /// Rolls back the transaction.
    pub async fn rollback(self) -> Result<(), DBError> {
        self.tx
            .rollback()
            .await
            .map_err(|e| DBError::Transaction(e.into()))
    }
}

async fn execute_query_void(connection: &::turso::Connection, query: Query) -> Result<(), DBError> {
    let (sql, args) = query.split();
    let args: Vec<::turso::Value> = args.into_iter().map(sqlw_to_turso_value).collect();

    connection
        .execute(&sql, args)
        .await
        .map(|_| ())
        .map_err(|e| DBError::Execution(e.into()))
}

async fn execute_query_one<T: FromRow + Send + 'static>(
    connection: &::turso::Connection,
    query: Query,
) -> Result<Option<T>, DBError> {
    let (sql, args) = query.split();
    let args: Vec<::turso::Value> = args.into_iter().map(sqlw_to_turso_value).collect();

    let mut stmt = connection
        .prepare(&sql)
        .await
        .map_err(|e| DBError::Execution(e.into()))?;

    let columns: Vec<String> = stmt
        .columns()
        .iter()
        .map(|col| col.name().to_string())
        .collect();

    let mut rows = stmt
        .query(args)
        .await
        .map_err(|e| DBError::Execution(e.into()))?;

    if let Some(row) = rows
        .next()
        .await
        .map_err(|e| DBError::Execution(e.into()))?
    {
        let row_ref = TursoRowRef::new(&row, &columns);
        Ok(Some(T::from_row(&row_ref)?))
    } else {
        Ok(None)
    }
}

async fn execute_query_list<T: FromRow + Send + 'static>(
    connection: &::turso::Connection,
    query: Query,
) -> Result<Vec<T>, DBError> {
    let (sql, args) = query.split();
    let args: Vec<::turso::Value> = args.into_iter().map(sqlw_to_turso_value).collect();

    let mut stmt = connection
        .prepare(&sql)
        .await
        .map_err(|e| DBError::Execution(e.into()))?;

    let columns: Vec<String> = stmt
        .columns()
        .iter()
        .map(|col| col.name().to_string())
        .collect();

    let mut rows = stmt
        .query(args)
        .await
        .map_err(|e| DBError::Execution(e.into()))?;

    let mut results = Vec::new();

    while let Some(row) = rows
        .next()
        .await
        .map_err(|e| DBError::Execution(e.into()))?
    {
        let row_ref = TursoRowRef::new(&row, &columns);
        results.push(T::from_row(&row_ref)?);
    }

    Ok(results)
}

fn sqlw_to_turso_value(value: Value) -> ::turso::Value {
    match value {
        Value::Text(s) => ::turso::Value::Text(s),
        Value::Int(i) => ::turso::Value::Integer(i),
        Value::Float(f) => ::turso::Value::Real(f),
        Value::Bool(b) => ::turso::Value::Integer(i64::from(b)),
        Value::Blob(b) => ::turso::Value::Blob(b),
        Value::Null => ::turso::Value::Null,
    }
}