use itertools::{
EitherOrBoth::{Both, Left, Right},
Itertools,
};
use rusqlite::{Connection, OpenFlags};
use serde::Deserialize;
use std::cell::RefCell;
use std::path::{Path, PathBuf};
use std::sync::Mutex;
#[doc(hidden)]
pub use once_cell::sync::Lazy;
#[doc(hidden)]
pub use rusqlite::{
self, named_params, params, types::FromSql, types::FromSqlResult, types::ToSql,
types::ToSqlOutput, types::Value, types::ValueRef,
};
#[doc(hidden)]
pub use serde::Serialize;
#[doc(hidden)]
pub use serde_json;
pub use turbosql_impl::{execute, select, Turbosql};
/// Alias for `Vec<u8>`, an owned byte buffer.
pub type Blob = Vec<u8>;
/// Persistence operations for a row type: single and batched insert/update.
///
/// NOTE(review): implementations are presumably produced by the `Turbosql`
/// derive macro re-exported from `turbosql_impl` — confirm.
pub trait Turbosql {
/// Inserts this value. On success returns an `i64`
/// (presumably the rowid of the inserted row — confirm against the implementation).
fn insert(&self) -> Result<i64, Error>;
/// Inserts every element of `rows`; each element only needs to be borrowable as `Self`.
fn insert_batch<T: AsRef<Self>>(rows: &[T]) -> Result<(), Error>;
/// Updates the stored row for this value. On success returns a `usize`
/// (presumably the number of rows changed — confirm against the implementation).
fn update(&self) -> Result<usize, Error>;
/// Updates the stored rows for every element of `rows`; each element only needs to be borrowable as `Self`.
fn update_batch<T: AsRef<Self>>(rows: &[T]) -> Result<(), Error>;
}
/// Error type returned by this crate's fallible operations.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// An error raised by `rusqlite`; displayed transparently and convertible with `From`/`?`.
#[error(transparent)]
Rusqlite(#[from] rusqlite::Error),
/// An error raised by `serde_json`; displayed transparently and convertible with `From`/`?`.
#[error(transparent)]
SerdeJson(#[from] serde_json::Error),
/// Any other failure, described by a static message (displayed with a `Turbosql Error: ` prefix).
#[error("Turbosql Error: {0}")]
OtherError(&'static str),
}
/// Deserialized contents of the `migrations.toml` file that `run_migrations` embeds at compile time.
///
/// Both keys are optional; a missing key deserializes to `None`.
// `dead_code` is allowed because, in the code visible here, the
// `output_generated_schema_for_your_information_do_not_edit` field is deserialized but never read.
#[allow(dead_code)]
#[derive(Clone, Debug, Deserialize, Default)]
struct MigrationsToml {
/// Ordered list of migration SQL statements. `run_migrations` ignores entries starting with `--`.
migrations_append_only: Option<Vec<String>>,
/// Generated schema text carried in the file; not consulted by the code in this view.
output_generated_schema_for_your_information_do_not_edit: Option<String>,
}
/// Database location plus a flag recording whether the database has been opened yet.
#[derive(Clone, Debug, Default)]
struct DbPath {
/// Path of the database; `None` until `set_db_path` stores one or `open_db` fills in a default.
path: Option<PathBuf>,
/// Set to `true` by `open_db` once migrations have run; `set_db_path` refuses to change the path afterwards.
opened: bool,
}
/// Process-wide, mutex-guarded `DbPath`, lazily initialized to its default (no path, not opened).
static __DB_PATH: Lazy<Mutex<DbPath>> = Lazy::new(Default::default);
/// Returns the current wall-clock time in milliseconds relative to the Unix epoch.
///
/// The value is normally positive. If the system clock is set to a moment
/// before the epoch, the (negative) offset is returned instead of panicking,
/// so a misconfigured clock cannot crash the caller.
pub fn now_ms() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        // `duration_since` fails only when "now" is earlier than the epoch; the
        // error carries how far before, which is reported as a negative offset.
        Err(before_epoch) => -(before_epoch.duration().as_millis() as i64),
    }
}
fn run_migrations(conn: &mut Connection) {
cfg_if::cfg_if! {
if #[cfg(doc)] {
let toml_decoded: MigrationsToml = MigrationsToml::default();
} else if #[cfg(feature = "test")] {
let toml_decoded: MigrationsToml = toml::from_str(include_str!("../../test.migrations.toml")).unwrap();
} else {
let toml_decoded: MigrationsToml = toml::from_str(include_str!(concat!(env!("OUT_DIR"), "/migrations.toml"))).expect("Unable to decode embedded migrations.toml");
}
};
let target_migrations = toml_decoded.migrations_append_only.unwrap_or_default();
let target_migrations: Vec<_> =
target_migrations.into_iter().filter(|m| !m.starts_with("--")).collect();
conn.execute("BEGIN EXCLUSIVE TRANSACTION", params![]).unwrap();
let _ = conn.execute("ALTER TABLE turbosql_migrations RENAME TO _turbosql_migrations", params![]);
let result = conn.query_row(
"SELECT sql FROM sqlite_master WHERE name = ?",
params!["_turbosql_migrations"],
|row| {
let sql: String = row.get(0).unwrap();
Ok(sql)
},
);
match result {
Err(rusqlite::Error::QueryReturnedNoRows) => {
conn
.execute_batch(if cfg!(feature = "sqlite-compat-no-strict-tables") {
r#"CREATE TABLE _turbosql_migrations (rowid INTEGER PRIMARY KEY, migration TEXT NOT NULL)"#
} else {
r#"CREATE TABLE _turbosql_migrations (rowid INTEGER PRIMARY KEY, migration TEXT NOT NULL) STRICT"#
})
.expect("CREATE TABLE _turbosql_migrations");
}
Err(err) => {
panic!("Could not query sqlite_master table: {}", err);
}
Ok(_) => (),
}
let applied_migrations = conn
.prepare("SELECT migration FROM _turbosql_migrations ORDER BY rowid")
.unwrap()
.query_map(params![], |row| Ok(row.get(0).unwrap()))
.unwrap()
.map(|x: Result<String, _>| x.unwrap())
.filter(|m| !m.starts_with("--"))
.collect::<Vec<String>>();
applied_migrations.iter().zip_longest(&target_migrations).for_each(|item| match item {
Both(a, b) => {
if a != b {
panic!("Mismatch in Turbosql migrations! {:?} != {:?}", a, b)
}
}
Left(_) => panic!("More migrations are applied than target"),
Right(migration) => {
if !migration.starts_with("--") {
conn.execute(migration, params![]).unwrap();
}
conn
.execute("INSERT INTO _turbosql_migrations(migration) VALUES(?)", params![migration])
.unwrap();
}
});
conn.execute("COMMIT", params![]).unwrap();
}
/// The three values of the result row of `PRAGMA wal_checkpoint`, as read by `checkpoint()`.
#[derive(Debug)]
pub struct CheckpointResult {
/// Column 0 of the pragma result.
/// NOTE(review): per the SQLite docs this is 1 when the checkpoint was blocked from completing, else 0 — confirm.
pub busy: i64,
/// Column 1 of the pragma result.
/// NOTE(review): per the SQLite docs, the number of modified pages written to the WAL file — confirm.
pub log: i64,
/// Column 2 of the pragma result.
/// NOTE(review): per the SQLite docs, the number of WAL pages moved back into the database file — confirm.
pub checkpointed: i64,
}
pub fn checkpoint() -> Result<CheckpointResult, Error> {
let start = std::time::Instant::now();
let db_path = __DB_PATH.lock().unwrap();
let conn = Connection::open_with_flags(
db_path.path.as_ref().unwrap(),
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX,
)?;
let result = conn.query_row("PRAGMA wal_checkpoint(PASSIVE)", params![], |row| {
Ok(CheckpointResult { busy: row.get(0)?, log: row.get(1)?, checkpointed: row.get(2)? })
})?;
log::info!("db checkpointed in {:?} {:#?}", start.elapsed(), result);
Ok(result)
}
/// Opens a new SQLite connection to the configured database, running migrations on first use.
///
/// When no path has been configured, a default is chosen and stored: under the
/// "test" feature the literal path `:memory:`, otherwise
/// `<data_dir>/<exe_stem>.sqlite`, where `data_dir` comes from
/// `directories_next::ProjectDirs::from("org", exe_stem, exe_stem)` and is
/// created if needed. The connection is then configured with a fixed set of
/// PRAGMAs, and `run_migrations` is invoked only if the shared `opened` flag is
/// not yet set. The global path lock is held for the whole call.
///
/// # Panics
///
/// Panics if the lock is poisoned, the default path cannot be determined or
/// created, the connection cannot be opened, or the PRAGMAs fail.
fn open_db() -> Connection {
    let mut state = __DB_PATH.lock().unwrap();

    // No explicit path was configured: derive a default and remember it.
    if state.path.is_none() {
        #[cfg(not(feature = "test"))]
        let default_path = {
            let stem = std::env::current_exe().unwrap().file_stem().unwrap().to_owned();
            let stem_lossy = stem.to_string_lossy();
            let data_dir = directories_next::ProjectDirs::from("org", &stem_lossy, &stem_lossy)
                .unwrap()
                .data_dir()
                .to_owned();
            std::fs::create_dir_all(&data_dir).unwrap();
            data_dir.join(stem).with_extension("sqlite")
        };

        #[cfg(feature = "test")]
        let default_path = Path::new(":memory:").to_owned();

        state.path = Some(default_path);
    }

    let location = state.path.as_ref().unwrap();
    log::debug!("opening db at {:?}", location);

    let flags = OpenFlags::SQLITE_OPEN_READ_WRITE
        | OpenFlags::SQLITE_OPEN_CREATE
        | OpenFlags::SQLITE_OPEN_NO_MUTEX;
    let mut conn =
        Connection::open_with_flags(location, flags).expect("rusqlite::Connection::open_with_flags");

    // Per-connection settings; the SQL text is kept flush-left so the string
    // passed to SQLite is unchanged.
    conn
        .execute_batch(
            r#"
PRAGMA busy_timeout=3000;
PRAGMA auto_vacuum=INCREMENTAL;
PRAGMA journal_mode=WAL;
PRAGMA wal_autocheckpoint=8000;
PRAGMA synchronous=NORMAL;
"#,
        )
        .expect("Execute PRAGMAs");

    // Migrations run once per process: the first caller to get here flips the flag.
    if !state.opened {
        run_migrations(&mut conn);
        state.opened = true;
    }

    conn
}
thread_local! {
// One SQLite connection per thread, created by `open_db()` when the thread first
// touches this key. Wrapped in `RefCell` so it can be borrowed mutably.
// NOTE(review): public-but-hidden presumably so macro-generated code in user crates can reach it — confirm.
#[doc(hidden)]
pub static __TURBOSQL_DB: RefCell<Connection> = RefCell::new(open_db());
}
pub fn set_db_path(path: &Path) -> Result<(), Error> {
let mut db_path = __DB_PATH.lock().unwrap();
if db_path.opened {
return Err(Error::OtherError("Trying to set path when DB is already opened"));
}
db_path.path = Some(path.to_owned());
Ok(())
}