dinoco_engine 0.0.7

Database adapters, query execution, and migration engine components for Dinoco.
Documentation
use async_trait::async_trait;

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;

use deadpool_sqlite::{Config, Pool, Runtime};
use rusqlite::types::{ToSqlOutput, Value};

mod dialect;
mod handler;
mod migration;
mod row;

use crate::{
    ConstraintError, DinocoAdapter, DinocoClientConfig, DinocoError, DinocoQueryLog, DinocoQueryLogger, DinocoResult,
    DinocoRow, DinocoTransactionAdapter, DinocoValue, ExecutionResult,
};

pub use dialect::SqliteDialect;

static SQLITE_DIALECT: SqliteDialect = SqliteDialect;
tokio::task_local! {
    static SQLITE_TX_CONNECTION: Arc<tokio::sync::Mutex<deadpool_sqlite::Object>>;
}

#[derive(Clone)]
pub struct SqliteAdapter {
    pub url: String,
    pub pool: Arc<Pool>,
    pub query_logger: DinocoQueryLogger,
}

#[async_trait]
impl DinocoAdapter for SqliteAdapter {
    type Dialect = SqliteDialect;

    fn dialect(&self) -> &Self::Dialect {
        &SQLITE_DIALECT
    }

    async fn connect(url: String, config: DinocoClientConfig) -> DinocoResult<Self> {
        if !url.starts_with("file:") {
            return Err(DinocoError::ConnectionError(
                "Invalid sqlite connection URL. Expected format: file:path".to_string(),
            ));
        }

        let cfg = Config::new(&url);
        let pool = cfg.create_pool(Runtime::Tokio1).map_err(DinocoError::from)?;

        Ok(Self { url, pool: Arc::new(pool), query_logger: config.query_logger })
    }

    async fn execute_result(&self, query: &str, params: &[DinocoValue]) -> DinocoResult<ExecutionResult> {
        if let Ok(tx_conn) = SQLITE_TX_CONNECTION.try_with(Clone::clone) {
            let conn = tx_conn.lock().await;

            return execute_result_with_connection(&conn, query, params, &self.query_logger).await;
        }

        let conn = self.pool.get().await.map_err(DinocoError::from)?;

        execute_result_with_connection(&conn, query, params, &self.query_logger).await
    }

    async fn execute_script(&self, sql_content: &str) -> DinocoResult<()> {
        for statement in sql_content.split(';') {
            let clean_statement = statement.trim();

            if clean_statement.is_empty() {
                continue;
            }

            self.execute(clean_statement, &[]).await?;
        }

        Ok(())
    }

    async fn query_as<T: DinocoRow + Send + 'static>(
        &self,
        query: &str,
        params: &[DinocoValue],
    ) -> DinocoResult<Vec<T>> {
        if let Ok(tx_conn) = SQLITE_TX_CONNECTION.try_with(Clone::clone) {
            let conn = tx_conn.lock().await;

            return query_as_with_connection::<T>(&conn, query, params, &self.query_logger).await;
        }

        let conn = self.pool.get().await.map_err(DinocoError::from)?;

        query_as_with_connection::<T>(&conn, query, params, &self.query_logger).await
    }
}

impl DinocoTransactionAdapter for SqliteAdapter {
    fn with_transaction<'a, T, F>(&'a self, operation: F) -> Pin<Box<dyn Future<Output = DinocoResult<T>> + Send + 'a>>
    where
        T: Send + 'a,
        F: FnOnce() -> Pin<Box<dyn Future<Output = DinocoResult<T>> + Send + 'a>> + Send + 'a,
    {
        Box::pin(async move {
            if SQLITE_TX_CONNECTION.try_with(|_| ()).is_ok() {
                return operation().await;
            }

            let connection = self.pool.get().await.map_err(DinocoError::from)?;
            let tx_connection = Arc::new(tokio::sync::Mutex::new(connection));

            {
                let conn = tx_connection.lock().await;
                conn.interact(|conn| conn.execute("BEGIN", []))
                    .await
                    .map_err(DinocoError::from)?
                    .map_err(DinocoError::from)?;
            }

            let result = SQLITE_TX_CONNECTION.scope(tx_connection.clone(), async move { operation().await }).await;

            match result {
                Ok(output) => {
                    let conn = tx_connection.lock().await;
                    conn.interact(|conn| conn.execute("COMMIT", []))
                        .await
                        .map_err(DinocoError::from)?
                        .map_err(DinocoError::from)?;

                    Ok(output)
                }
                Err(error) => {
                    let conn = tx_connection.lock().await;
                    let _ = conn.interact(|conn| conn.execute("ROLLBACK", [])).await;

                    Err(error)
                }
            }
        })
    }
}

async fn execute_result_with_connection(
    conn: &deadpool_sqlite::Object,
    query: &str,
    params: &[DinocoValue],
    query_logger: &DinocoQueryLogger,
) -> DinocoResult<ExecutionResult> {
    let query_owned = query.to_string();
    let params_owned = params.to_vec();
    let logged_query = query.to_string();
    let logged_params = params.to_vec();
    let started_at = Instant::now();

    let affected_rows = conn
        .interact(move |conn| {
            let params_refs: Vec<&dyn rusqlite::ToSql> =
                params_owned.iter().map(|p| p as &dyn rusqlite::ToSql).collect();

            conn.execute(&query_owned, params_refs.as_slice())
                .map(|affected_rows| (affected_rows, conn.last_insert_rowid()))
        })
        .await
        .map_err(DinocoError::from)?
        .map_err(DinocoError::from)?;

    query_logger.log(DinocoQueryLog {
        adapter: "sqlite",
        duration: started_at.elapsed(),
        params: logged_params,
        query: logged_query,
    });

    Ok(ExecutionResult { affected_rows: affected_rows.0 as u64, last_insert_id: Some(affected_rows.1) })
}

async fn query_as_with_connection<T: DinocoRow + Send + 'static>(
    conn: &deadpool_sqlite::Object,
    query: &str,
    params: &[DinocoValue],
    query_logger: &DinocoQueryLogger,
) -> DinocoResult<Vec<T>> {
    let query_owned = query.to_string();
    let params_owned = params.to_vec();
    let logged_query = query.to_string();
    let logged_params = params.to_vec();
    let started_at = Instant::now();

    let results = conn
        .interact(move |conn| -> DinocoResult<Vec<T>> {
            let mut stmt = conn.prepare(&query_owned).map_err(DinocoError::from)?;
            let params_refs: Vec<&dyn rusqlite::ToSql> =
                params_owned.iter().map(|p| p as &dyn rusqlite::ToSql).collect();

            let mut rows = stmt.query(params_refs.as_slice()).map_err(DinocoError::from)?;
            let mut results = Vec::new();

            while let Some(row) = rows.next().map_err(DinocoError::from)? {
                results.push(T::from_row(row)?);
            }

            Ok(results)
        })
        .await
        .map_err(DinocoError::from)??;

    query_logger.log(DinocoQueryLog {
        adapter: "sqlite",
        duration: started_at.elapsed(),
        params: logged_params,
        query: logged_query,
    });

    Ok(results)
}

impl rusqlite::ToSql for DinocoValue {
    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
        match self {
            DinocoValue::Null => Ok(ToSqlOutput::Owned(Value::Null)),
            DinocoValue::Integer(i) => Ok(ToSqlOutput::Owned(Value::Integer(*i))),
            DinocoValue::Float(f) => Ok(ToSqlOutput::Owned(Value::Real(*f))),
            DinocoValue::Boolean(b) => Ok(ToSqlOutput::Owned(Value::Integer(if *b { 1 } else { 0 }))),
            DinocoValue::String(s) => Ok(ToSqlOutput::Owned(Value::Text(s.clone()))),
            DinocoValue::Enum(_, s) => Ok(ToSqlOutput::Owned(Value::Text(s.clone()))),
            DinocoValue::Json(v) => Ok(ToSqlOutput::Owned(Value::Text(v.to_string()))),
            DinocoValue::Bytes(v) => Ok(ToSqlOutput::Owned(Value::Blob(v.clone()))),
            DinocoValue::DateTime(dt) => Ok(ToSqlOutput::Owned(Value::Text(dt.to_string()))),
            DinocoValue::Date(date) => Ok(ToSqlOutput::Owned(Value::Text(date.to_string()))),
        }
    }
}

impl From<deadpool_sqlite::CreatePoolError> for DinocoError {
    fn from(e: deadpool_sqlite::CreatePoolError) -> Self {
        Self::ConnectionError(format!("Failed to get connection from pool: {}", e))
    }
}

impl From<deadpool_sqlite::PoolError> for DinocoError {
    fn from(e: deadpool_sqlite::PoolError) -> Self {
        Self::ConnectionError(format!("Failed to get connection from pool: {}", e))
    }
}

impl From<deadpool_sqlite::BuildError> for DinocoError {
    fn from(e: deadpool_sqlite::BuildError) -> Self {
        Self::ConnectionError(format!("Failed to build connection pool: {}", e))
    }
}

impl From<deadpool_sqlite::InteractError> for DinocoError {
    fn from(e: deadpool_sqlite::InteractError) -> Self {
        Self::ParseError(e.to_string())
    }
}

impl From<rusqlite::Error> for DinocoError {
    fn from(e: rusqlite::Error) -> Self {
        if let Some(error) = map_sqlite_constraint_error(&e) {
            return Self::Constraint(error);
        }

        Self::Sqlite(e)
    }
}

fn map_sqlite_constraint_error(error: &rusqlite::Error) -> Option<ConstraintError> {
    let rusqlite::Error::SqliteFailure(_, message) = error else {
        return None;
    };
    let message = message.clone()?;
    let normalized = message.to_ascii_lowercase();

    if normalized.starts_with("unique constraint failed:") {
        let targets = parse_sqlite_constraint_targets(&message, "UNIQUE constraint failed:");
        let table = targets.first().and_then(|target| target.split('.').next()).map(str::to_string);
        let columns =
            targets.into_iter().map(|target| target.split('.').nth(1).unwrap_or(target.as_str()).to_string()).collect();

        return Some(ConstraintError::unique(table, columns, None, message));
    }

    if normalized.starts_with("not null constraint failed:") {
        let targets = parse_sqlite_constraint_targets(&message, "NOT NULL constraint failed:");
        let table = targets.first().and_then(|target| target.split('.').next()).map(str::to_string);
        let columns =
            targets.into_iter().map(|target| target.split('.').nth(1).unwrap_or(target.as_str()).to_string()).collect();

        return Some(ConstraintError::not_null(table, columns, None, message));
    }

    if normalized.starts_with("foreign key constraint failed") {
        return Some(ConstraintError::foreign_key(None, Vec::new(), None, message));
    }

    if normalized.starts_with("check constraint failed:") {
        let constraint =
            message.split_once(':').map(|(_, rest)| rest.trim().to_string()).filter(|item| !item.is_empty());

        return Some(ConstraintError::check(None, Vec::new(), constraint, message));
    }

    None
}

fn parse_sqlite_constraint_targets(message: &str, prefix: &str) -> Vec<String> {
    message
        .strip_prefix(prefix)
        .unwrap_or(message)
        .split(',')
        .map(str::trim)
        .filter(|item| !item.is_empty())
        .map(str::to_string)
        .collect()
}