#![warn(missing_docs)]
mod value;
pub(crate) use value::Value;
use async_trait::async_trait;
use rusqlite::Connection as RusqliteConnection;
use std::{
borrow::Cow,
path::{Path, PathBuf},
sync::Arc,
};
use toasty_core::{
Result, Schema,
driver::{
Capability, Driver, ExecResponse,
operation::{IsolationLevel, Operation, Transaction},
},
schema::db::{self, Migration, SchemaDiff, Table},
stmt,
};
use toasty_sql::{self as sql};
use url::Url;
#[derive(Debug)]
pub enum Sqlite {
File(PathBuf),
InMemory,
}
impl Sqlite {
pub fn new(url: impl Into<String>) -> Result<Self> {
let url_str = url.into();
let url = Url::parse(&url_str).map_err(toasty_core::Error::driver_operation_failed)?;
if url.scheme() != "sqlite" {
return Err(toasty_core::Error::invalid_connection_url(format!(
"connection URL does not have a `sqlite` scheme; url={}",
url_str
)));
}
if url.path() == ":memory:" {
Ok(Self::InMemory)
} else {
Ok(Self::File(PathBuf::from(url.path())))
}
}
pub fn in_memory() -> Self {
Self::InMemory
}
pub fn open<P: AsRef<Path>>(path: P) -> Self {
Self::File(path.as_ref().to_path_buf())
}
}
#[async_trait]
impl Driver for Sqlite {
fn url(&self) -> Cow<'_, str> {
match self {
Sqlite::InMemory => Cow::Borrowed("sqlite::memory:"),
Sqlite::File(path) => Cow::Owned(format!("sqlite:{}", path.display())),
}
}
fn capability(&self) -> &'static Capability {
&Capability::SQLITE
}
async fn connect(&self) -> toasty_core::Result<Box<dyn toasty_core::Connection>> {
let connection = match self {
Sqlite::File(path) => Connection::open(path)?,
Sqlite::InMemory => Connection::in_memory(),
};
Ok(Box::new(connection))
}
fn max_connections(&self) -> Option<usize> {
matches!(self, Self::InMemory).then_some(1)
}
fn generate_migration(&self, schema_diff: &SchemaDiff<'_>) -> Migration {
let statements = sql::MigrationStatement::from_diff(schema_diff, &Capability::SQLITE);
let sql_strings: Vec<String> = statements
.iter()
.map(|stmt| sql::Serializer::sqlite(stmt.schema()).serialize(stmt.statement()))
.collect();
Migration::new_sql_with_breakpoints(&sql_strings)
}
async fn reset_db(&self) -> toasty_core::Result<()> {
match self {
Sqlite::File(path) => {
if path.exists() {
std::fs::remove_file(path)
.map_err(toasty_core::Error::driver_operation_failed)?;
}
}
Sqlite::InMemory => {
}
}
Ok(())
}
}
#[derive(Debug)]
pub struct Connection {
connection: RusqliteConnection,
}
impl Connection {
pub fn in_memory() -> Self {
let connection = RusqliteConnection::open_in_memory().unwrap();
Self { connection }
}
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
let connection =
RusqliteConnection::open(path).map_err(toasty_core::Error::driver_operation_failed)?;
let sqlite = Self { connection };
Ok(sqlite)
}
}
#[async_trait]
impl toasty_core::driver::Connection for Connection {
async fn exec(&mut self, schema: &Arc<Schema>, op: Operation) -> Result<ExecResponse> {
tracing::trace!(driver = "sqlite", op = %op.name(), "driver exec");
let (sql, typed_params, ret_tys) = match op {
Operation::QuerySql(op) => {
assert!(
op.last_insert_id_hack.is_none(),
"last_insert_id_hack is MySQL-specific and should not be set for SQLite"
);
(sql::Statement::from(op.stmt), op.params, op.ret)
}
Operation::Transaction(mut op) => {
if let Transaction::Start { isolation, .. } = &mut op {
if !matches!(isolation, Some(IsolationLevel::Serializable) | None) {
return Err(toasty_core::Error::unsupported_feature(
"SQLite only supports Serializable isolation",
));
}
*isolation = None;
}
let sql = sql::Serializer::sqlite(&schema.db).serialize_transaction(&op);
self.connection
.execute(&sql, [])
.map_err(toasty_core::Error::driver_operation_failed)?;
return Ok(ExecResponse::count(0));
}
_ => todo!("op={:#?}", op),
};
let sql_str = sql::Serializer::sqlite(&schema.db).serialize(&sql);
tracing::debug!(db.system = "sqlite", db.statement = %sql_str, params = typed_params.len(), "executing SQL");
let mut stmt = self.connection.prepare_cached(&sql_str).unwrap();
let width = match &sql {
sql::Statement::Query(stmt) => match &stmt.body {
stmt::ExprSet::Select(stmt) => {
Some(stmt.returning.as_expr_unwrap().as_record_unwrap().len())
}
_ => todo!(),
},
sql::Statement::Insert(stmt) => stmt
.returning
.as_ref()
.map(|returning| returning.as_expr_unwrap().as_record_unwrap().len()),
sql::Statement::Delete(stmt) => stmt
.returning
.as_ref()
.map(|returning| returning.as_expr_unwrap().as_record_unwrap().len()),
sql::Statement::Update(stmt) => {
assert!(stmt.condition.is_none(), "stmt={stmt:#?}");
stmt.returning
.as_ref()
.map(|returning| returning.as_expr_unwrap().as_record_unwrap().len())
}
_ => None,
};
let params = typed_params
.into_iter()
.map(|tv| Value::from(tv.value))
.collect::<Vec<_>>();
if width.is_none() {
let count = stmt
.execute(rusqlite::params_from_iter(params.iter()))
.map_err(toasty_core::Error::driver_operation_failed)?;
return Ok(ExecResponse::count(count as _));
}
let mut rows = stmt
.query(rusqlite::params_from_iter(params.iter()))
.unwrap();
let mut ret = vec![];
let ret_tys = &ret_tys.as_ref().unwrap();
loop {
match rows.next() {
Ok(Some(row)) => {
let mut items = vec![];
let width = width.unwrap();
for index in 0..width {
items.push(Value::from_sql(row, index, &ret_tys[index]).into_inner());
}
ret.push(stmt::ValueRecord::from_vec(items).into());
}
Ok(None) => break,
Err(err) => {
return Err(toasty_core::Error::driver_operation_failed(err));
}
}
}
Ok(ExecResponse::value_stream(stmt::ValueStream::from_vec(ret)))
}
async fn push_schema(&mut self, schema: &Schema) -> Result<()> {
for table in &schema.db.tables {
tracing::debug!(table = %table.name, "creating table");
self.create_table(&schema.db, table)?;
}
Ok(())
}
async fn applied_migrations(
&mut self,
) -> Result<Vec<toasty_core::schema::db::AppliedMigration>> {
self.connection
.execute(
"CREATE TABLE IF NOT EXISTS __toasty_migrations (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
applied_at TEXT NOT NULL
)",
[],
)
.map_err(toasty_core::Error::driver_operation_failed)?;
let mut stmt = self
.connection
.prepare("SELECT id FROM __toasty_migrations ORDER BY applied_at")
.map_err(toasty_core::Error::driver_operation_failed)?;
let rows = stmt
.query_map([], |row| {
let id: i64 = row.get(0)?;
Ok(toasty_core::schema::db::AppliedMigration::new(id as u64))
})
.map_err(toasty_core::Error::driver_operation_failed)?;
rows.collect::<rusqlite::Result<Vec<_>>>()
.map_err(toasty_core::Error::driver_operation_failed)
}
async fn apply_migration(
&mut self,
id: u64,
name: &str,
migration: &toasty_core::schema::db::Migration,
) -> Result<()> {
tracing::info!(id = id, name = %name, "applying migration");
self.connection
.execute(
"CREATE TABLE IF NOT EXISTS __toasty_migrations (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
applied_at TEXT NOT NULL
)",
[],
)
.map_err(toasty_core::Error::driver_operation_failed)?;
self.connection
.execute("BEGIN", [])
.map_err(toasty_core::Error::driver_operation_failed)?;
for statement in migration.statements() {
if let Err(e) = self
.connection
.execute(statement, [])
.map_err(toasty_core::Error::driver_operation_failed)
{
self.connection
.execute("ROLLBACK", [])
.map_err(toasty_core::Error::driver_operation_failed)?;
return Err(e);
}
}
if let Err(e) = self.connection.execute(
"INSERT INTO __toasty_migrations (id, name, applied_at) VALUES (?1, ?2, datetime('now'))",
rusqlite::params![id as i64, name],
).map_err(toasty_core::Error::driver_operation_failed) {
self.connection.execute("ROLLBACK", []).map_err(toasty_core::Error::driver_operation_failed)?;
return Err(e);
}
self.connection
.execute("COMMIT", [])
.map_err(toasty_core::Error::driver_operation_failed)?;
Ok(())
}
}
impl Connection {
fn create_table(&mut self, schema: &db::Schema, table: &Table) -> Result<()> {
let serializer = sql::Serializer::sqlite(schema);
let stmt = serializer.serialize(&sql::Statement::create_table(table, &Capability::SQLITE));
self.connection
.execute(&stmt, [])
.map_err(toasty_core::Error::driver_operation_failed)?;
for index in &table.indices {
if index.primary_key {
continue;
}
let stmt = serializer.serialize(&sql::Statement::create_index(index));
self.connection
.execute(&stmt, [])
.map_err(toasty_core::Error::driver_operation_failed)?;
}
Ok(())
}
}