toasty-driver-sqlite 0.5.0

SQLite driver for Toasty
Documentation
#![warn(missing_docs)]

//! Toasty driver for [SQLite](https://www.sqlite.org/) using
//! [`rusqlite`](https://docs.rs/rusqlite).
//!
//! Supports both file-backed and in-memory databases.
//!
//! # Examples
//!
//! ```
//! use toasty_driver_sqlite::Sqlite;
//!
//! // In-memory database
//! let driver = Sqlite::in_memory();
//!
//! // File-backed database
//! let driver = Sqlite::open("path/to/db.sqlite3");
//! ```

mod value;
pub(crate) use value::Value;

use async_trait::async_trait;
use rusqlite::Connection as RusqliteConnection;
use std::{
    borrow::Cow,
    path::{Path, PathBuf},
    sync::Arc,
};
use toasty_core::{
    Result, Schema,
    driver::{
        Capability, Driver, ExecResponse,
        operation::{IsolationLevel, Operation, Transaction},
    },
    schema::db::{self, Migration, SchemaDiff, Table},
    stmt,
};
use toasty_sql::{self as sql};
use url::Url;

/// A SQLite [`Driver`] that opens connections to a file or in-memory database.
///
/// # Examples
///
/// ```
/// use toasty_driver_sqlite::Sqlite;
///
/// let driver = Sqlite::in_memory();
/// ```
#[derive(Debug)]
pub enum Sqlite {
    /// A database stored at a filesystem path.
    File(PathBuf),
    /// An ephemeral in-memory database.
    InMemory,
}

impl Sqlite {
    /// Create a new SQLite driver with an arbitrary connection URL
    pub fn new(url: impl Into<String>) -> Result<Self> {
        let url_str = url.into();
        let url = Url::parse(&url_str).map_err(toasty_core::Error::driver_operation_failed)?;

        if url.scheme() != "sqlite" {
            return Err(toasty_core::Error::invalid_connection_url(format!(
                "connection URL does not have a `sqlite` scheme; url={}",
                url_str
            )));
        }

        if url.path() == ":memory:" {
            Ok(Self::InMemory)
        } else {
            Ok(Self::File(PathBuf::from(url.path())))
        }
    }

    /// Create an in-memory SQLite database
    pub fn in_memory() -> Self {
        Self::InMemory
    }

    /// Open a SQLite database at the specified file path
    pub fn open<P: AsRef<Path>>(path: P) -> Self {
        Self::File(path.as_ref().to_path_buf())
    }
}

#[async_trait]
impl Driver for Sqlite {
    fn url(&self) -> Cow<'_, str> {
        match self {
            Sqlite::InMemory => Cow::Borrowed("sqlite::memory:"),
            Sqlite::File(path) => Cow::Owned(format!("sqlite:{}", path.display())),
        }
    }

    fn capability(&self) -> &'static Capability {
        &Capability::SQLITE
    }

    async fn connect(&self) -> toasty_core::Result<Box<dyn toasty_core::Connection>> {
        let connection = match self {
            Sqlite::File(path) => Connection::open(path)?,
            Sqlite::InMemory => Connection::in_memory(),
        };
        Ok(Box::new(connection))
    }

    fn max_connections(&self) -> Option<usize> {
        matches!(self, Self::InMemory).then_some(1)
    }

    fn generate_migration(&self, schema_diff: &SchemaDiff<'_>) -> Migration {
        let statements = sql::MigrationStatement::from_diff(schema_diff, &Capability::SQLITE);

        let sql_strings: Vec<String> = statements
            .iter()
            .map(|stmt| sql::Serializer::sqlite(stmt.schema()).serialize(stmt.statement()))
            .collect();

        Migration::new_sql_with_breakpoints(&sql_strings)
    }

    async fn reset_db(&self) -> toasty_core::Result<()> {
        match self {
            Sqlite::File(path) => {
                // Delete the file and recreate it
                if path.exists() {
                    std::fs::remove_file(path)
                        .map_err(toasty_core::Error::driver_operation_failed)?;
                }
            }
            Sqlite::InMemory => {
                // Nothing to do — each connect() creates a fresh in-memory database
            }
        }

        Ok(())
    }
}

/// An open connection to a SQLite database.
#[derive(Debug)]
pub struct Connection {
    connection: RusqliteConnection,
}

impl Connection {
    /// Open an in-memory SQLite connection.
    pub fn in_memory() -> Self {
        let connection = RusqliteConnection::open_in_memory().unwrap();

        Self { connection }
    }

    /// Open a SQLite connection to a file at `path`.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let connection =
            RusqliteConnection::open(path).map_err(toasty_core::Error::driver_operation_failed)?;
        let sqlite = Self { connection };
        Ok(sqlite)
    }
}

#[async_trait]
impl toasty_core::driver::Connection for Connection {
    async fn exec(&mut self, schema: &Arc<Schema>, op: Operation) -> Result<ExecResponse> {
        tracing::trace!(driver = "sqlite", op = %op.name(), "driver exec");

        let (sql, typed_params, ret_tys) = match op {
            Operation::QuerySql(op) => {
                assert!(
                    op.last_insert_id_hack.is_none(),
                    "last_insert_id_hack is MySQL-specific and should not be set for SQLite"
                );
                (sql::Statement::from(op.stmt), op.params, op.ret)
            }
            // Operation::Insert(op) => op.stmt.into(),
            Operation::Transaction(mut op) => {
                if let Transaction::Start { isolation, .. } = &mut op {
                    if !matches!(isolation, Some(IsolationLevel::Serializable) | None) {
                        return Err(toasty_core::Error::unsupported_feature(
                            "SQLite only supports Serializable isolation",
                        ));
                    }
                    *isolation = None;
                }
                let sql = sql::Serializer::sqlite(&schema.db).serialize_transaction(&op);
                self.connection
                    .execute(&sql, [])
                    .map_err(toasty_core::Error::driver_operation_failed)?;
                return Ok(ExecResponse::count(0));
            }
            _ => todo!("op={:#?}", op),
        };

        let sql_str = sql::Serializer::sqlite(&schema.db).serialize(&sql);

        tracing::debug!(db.system = "sqlite", db.statement = %sql_str, params = typed_params.len(), "executing SQL");

        let mut stmt = self.connection.prepare_cached(&sql_str).unwrap();

        let width = match &sql {
            sql::Statement::Query(stmt) => match &stmt.body {
                stmt::ExprSet::Select(stmt) => {
                    Some(stmt.returning.as_expr_unwrap().as_record_unwrap().len())
                }
                _ => todo!(),
            },
            sql::Statement::Insert(stmt) => stmt
                .returning
                .as_ref()
                .map(|returning| returning.as_expr_unwrap().as_record_unwrap().len()),
            sql::Statement::Delete(stmt) => stmt
                .returning
                .as_ref()
                .map(|returning| returning.as_expr_unwrap().as_record_unwrap().len()),
            sql::Statement::Update(stmt) => {
                assert!(stmt.condition.is_none(), "stmt={stmt:#?}");
                stmt.returning
                    .as_ref()
                    .map(|returning| returning.as_expr_unwrap().as_record_unwrap().len())
            }
            _ => None,
        };

        let params = typed_params
            .into_iter()
            .map(|tv| Value::from(tv.value))
            .collect::<Vec<_>>();

        if width.is_none() {
            let count = stmt
                .execute(rusqlite::params_from_iter(params.iter()))
                .map_err(toasty_core::Error::driver_operation_failed)?;

            return Ok(ExecResponse::count(count as _));
        }

        let mut rows = stmt
            .query(rusqlite::params_from_iter(params.iter()))
            .unwrap();

        let mut ret = vec![];

        let ret_tys = &ret_tys.as_ref().unwrap();

        loop {
            match rows.next() {
                Ok(Some(row)) => {
                    let mut items = vec![];

                    let width = width.unwrap();

                    for index in 0..width {
                        items.push(Value::from_sql(row, index, &ret_tys[index]).into_inner());
                    }

                    ret.push(stmt::ValueRecord::from_vec(items).into());
                }
                Ok(None) => break,
                Err(err) => {
                    return Err(toasty_core::Error::driver_operation_failed(err));
                }
            }
        }

        Ok(ExecResponse::value_stream(stmt::ValueStream::from_vec(ret)))
    }

    async fn push_schema(&mut self, schema: &Schema) -> Result<()> {
        for table in &schema.db.tables {
            tracing::debug!(table = %table.name, "creating table");
            self.create_table(&schema.db, table)?;
        }

        Ok(())
    }

    async fn applied_migrations(
        &mut self,
    ) -> Result<Vec<toasty_core::schema::db::AppliedMigration>> {
        // Ensure the migrations table exists
        self.connection
            .execute(
                "CREATE TABLE IF NOT EXISTS __toasty_migrations (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                applied_at TEXT NOT NULL
            )",
                [],
            )
            .map_err(toasty_core::Error::driver_operation_failed)?;

        // Query all applied migrations
        let mut stmt = self
            .connection
            .prepare("SELECT id FROM __toasty_migrations ORDER BY applied_at")
            .map_err(toasty_core::Error::driver_operation_failed)?;

        let rows = stmt
            .query_map([], |row| {
                let id: i64 = row.get(0)?;
                Ok(toasty_core::schema::db::AppliedMigration::new(id as u64))
            })
            .map_err(toasty_core::Error::driver_operation_failed)?;

        rows.collect::<rusqlite::Result<Vec<_>>>()
            .map_err(toasty_core::Error::driver_operation_failed)
    }

    async fn apply_migration(
        &mut self,
        id: u64,
        name: &str,
        migration: &toasty_core::schema::db::Migration,
    ) -> Result<()> {
        tracing::info!(id = id, name = %name, "applying migration");
        // Ensure the migrations table exists
        self.connection
            .execute(
                "CREATE TABLE IF NOT EXISTS __toasty_migrations (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                applied_at TEXT NOT NULL
            )",
                [],
            )
            .map_err(toasty_core::Error::driver_operation_failed)?;

        // Start transaction
        self.connection
            .execute("BEGIN", [])
            .map_err(toasty_core::Error::driver_operation_failed)?;

        // Execute each migration statement
        for statement in migration.statements() {
            if let Err(e) = self
                .connection
                .execute(statement, [])
                .map_err(toasty_core::Error::driver_operation_failed)
            {
                self.connection
                    .execute("ROLLBACK", [])
                    .map_err(toasty_core::Error::driver_operation_failed)?;
                return Err(e);
            }
        }

        // Record the migration
        if let Err(e) = self.connection.execute(
            "INSERT INTO __toasty_migrations (id, name, applied_at) VALUES (?1, ?2, datetime('now'))",
            rusqlite::params![id as i64, name],
        ).map_err(toasty_core::Error::driver_operation_failed) {
            self.connection.execute("ROLLBACK", []).map_err(toasty_core::Error::driver_operation_failed)?;
            return Err(e);
        }

        // Commit transaction
        self.connection
            .execute("COMMIT", [])
            .map_err(toasty_core::Error::driver_operation_failed)?;
        Ok(())
    }
}

impl Connection {
    fn create_table(&mut self, schema: &db::Schema, table: &Table) -> Result<()> {
        let serializer = sql::Serializer::sqlite(schema);

        let stmt = serializer.serialize(&sql::Statement::create_table(table, &Capability::SQLITE));

        self.connection
            .execute(&stmt, [])
            .map_err(toasty_core::Error::driver_operation_failed)?;

        // Create any indices
        for index in &table.indices {
            // The PK has already been created by the table statement
            if index.primary_key {
                continue;
            }

            let stmt = serializer.serialize(&sql::Statement::create_index(index));

            self.connection
                .execute(&stmt, [])
                .map_err(toasty_core::Error::driver_operation_failed)?;
        }
        Ok(())
    }
}