#![forbid(unsafe_code)]
use jerrycan_core::{App, Error, Extension, Result};
pub use sea_query;
pub use sea_query_binder;
/// The database engines a [`Db`] handle can be connected to.
///
/// `Db::connect` picks the variant from the connection URL prefix, and the
/// variant then selects placeholder syntax, the `sea_query` builder and which
/// migration statement is executed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Backend {
/// Chosen for URLs starting with `sqlite`.
Sqlite,
/// Chosen for URLs starting with `postgres`.
Postgres,
}
/// A database handle: an `sqlx` `Any` pool together with the backend it talks to.
///
/// Built with `Db::connect` or `Db::from_env`; both fields are private and
/// exposed through the `pool()` and `backend()` accessors.
#[derive(Clone)]
pub struct Db {
// Driver-agnostic connection pool used for every query issued by this crate.
pool: sqlx::AnyPool,
// Backend detected from the connection URL at connect time.
backend: Backend,
}
impl Db {
    /// Opens a connection pool for `url` and remembers which backend it targets.
    ///
    /// The backend is detected from the URL prefix: `postgres…` selects
    /// [`Backend::Postgres`], `sqlite…` selects [`Backend::Sqlite`]. The pool is
    /// capped at one connection for SQLite and five for Postgres.
    ///
    /// # Errors
    ///
    /// Returns an internal error when the URL has neither prefix, and the
    /// [`db_error`] mapping of the `sqlx` failure when the pool cannot connect.
    pub async fn connect(url: &str) -> Result<Self> {
        sqlx::any::install_default_drivers();

        let backend = if url.starts_with("postgres") {
            Backend::Postgres
        } else if url.starts_with("sqlite") {
            Backend::Sqlite
        } else {
            // Report only the scheme: the remainder of a connection URL
            // routinely carries `user:password@host`, which must not end up in
            // error messages or logs.
            let scheme = url.split_once(':').map_or(url, |(scheme, _)| scheme);
            return Err(Error::internal(format!(
                "unsupported database url scheme: `{scheme}` (sqlite:// or postgres:// in v0)"
            )));
        };

        // SQLite is limited to a single connection so that every pooled query
        // sees the same database (an in-memory database is private to the
        // connection that created it).
        let max = match backend {
            Backend::Sqlite => 1,
            Backend::Postgres => 5,
        };

        let mut options = sqlx::any::AnyPoolOptions::new().max_connections(max);

        // An in-memory SQLite database disappears when its connection closes.
        // The pool's default idle timeout and maximum lifetime would recycle the
        // only connection and silently drop all data, so disable both here.
        let in_memory = backend == Backend::Sqlite
            && (url.contains(":memory:") || url.contains("mode=memory"));
        if in_memory {
            options = options
                .idle_timeout(None::<std::time::Duration>)
                .max_lifetime(None::<std::time::Duration>);
        }

        let pool = options.connect(url).await.map_err(db_error)?;
        Ok(Self { pool, backend })
    }

    /// Connects using the `JERRYCAN_DATABASE_URL` environment variable.
    ///
    /// Falls back to `sqlite::memory:` when the variable cannot be read (unset
    /// or not valid Unicode).
    ///
    /// # Errors
    ///
    /// Same as [`Db::connect`].
    pub async fn from_env() -> Result<Self> {
        let url = std::env::var("JERRYCAN_DATABASE_URL")
            .unwrap_or_else(|_| "sqlite::memory:".to_string());
        Self::connect(&url).await
    }

    /// Borrows the underlying `sqlx` pool for running queries directly.
    pub fn pool(&self) -> &sqlx::AnyPool {
        &self.pool
    }

    /// Returns the backend detected when this handle was connected.
    pub fn backend(&self) -> Backend {
        self.backend
    }

    /// Rewrites a query written with `?` placeholders for this handle's
    /// backend by delegating to [`translate_placeholders`].
    pub fn sql(&self, query: &str) -> String {
        translate_placeholders(query, self.backend)
    }

    /// Returns the `sea_query` builder matching this handle's backend.
    pub fn query_builder(&self) -> &'static dyn sea_query::QueryBuilder {
        match self.backend {
            Backend::Sqlite => &sea_query::SqliteQueryBuilder,
            Backend::Postgres => &sea_query::PostgresQueryBuilder,
        }
    }
}
/// One schema migration with a statement per supported backend, using
/// `'static` strings so migrations can be declared as constants.
///
/// `Db::migrate` executes the statement matching the connected backend and
/// records `name` in the `_jerrycan_migrations` table so it is applied once.
#[derive(Debug, Clone, Copy)]
pub struct Migration {
/// Identifier stored in `_jerrycan_migrations`; used to detect already-applied migrations.
pub name: &'static str,
/// Statement executed when the backend is `Backend::Sqlite`.
pub sqlite: &'static str,
/// Statement executed when the backend is `Backend::Postgres`.
pub postgres: &'static str,
}
/// Owned-`String` counterpart of `Migration`, for migrations whose text is
/// built or loaded at runtime. Applied through `Db::migrate_owned`.
#[derive(Debug, Clone)]
pub struct OwnedMigration {
/// Identifier stored in `_jerrycan_migrations`; used to detect already-applied migrations.
pub name: String,
/// Statement executed when the backend is `Backend::Sqlite`.
pub sqlite: String,
/// Statement executed when the backend is `Backend::Postgres`.
pub postgres: String,
}
impl Db {
    /// Applies every migration in `migrations` that has not been recorded yet,
    /// in slice order, and returns the names of the ones applied by this call.
    ///
    /// # Errors
    ///
    /// Stops at the first failing statement and returns its [`db_error`]
    /// mapping; migrations applied before the failure stay recorded.
    pub async fn migrate(&self, migrations: &[Migration]) -> Result<Vec<String>> {
        let borrowed = migrations
            .iter()
            .map(|migration| (migration.name, migration.sqlite, migration.postgres));
        self.migrate_iter(borrowed).await
    }

    /// Same as [`Db::migrate`], for migrations that own their strings.
    ///
    /// # Errors
    ///
    /// Same as [`Db::migrate`].
    pub async fn migrate_owned(&self, migrations: &[OwnedMigration]) -> Result<Vec<String>> {
        let borrowed = migrations.iter().map(|migration| {
            (
                migration.name.as_str(),
                migration.sqlite.as_str(),
                migration.postgres.as_str(),
            )
        });
        self.migrate_iter(borrowed).await
    }

    /// Shared migration driver over `(name, sqlite_sql, postgres_sql)` triples.
    ///
    /// Ensures the `_jerrycan_migrations` bookkeeping table exists, then for
    /// each triple: skips it when its name is already recorded, otherwise runs
    /// the statement for the current backend and records the name together
    /// with a timestamp.
    async fn migrate_iter<'a>(
        &self,
        items: impl Iterator<Item = (&'a str, &'a str, &'a str)>,
    ) -> Result<Vec<String>> {
        const CREATE_LEDGER: &str = "CREATE TABLE IF NOT EXISTS _jerrycan_migrations (name TEXT PRIMARY KEY, applied_at TEXT NOT NULL)";

        // Placeholder translation depends only on the backend, so do it once.
        let lookup_sql = self.sql("SELECT name FROM _jerrycan_migrations WHERE name = ?");
        let record_sql =
            self.sql("INSERT INTO _jerrycan_migrations (name, applied_at) VALUES (?, ?)");

        sqlx::query(CREATE_LEDGER)
            .execute(&self.pool)
            .await
            .map_err(db_error)?;

        let mut newly_applied = Vec::new();
        for (name, sqlite_sql, postgres_sql) in items {
            let already_recorded = sqlx::query(&lookup_sql)
                .bind(name)
                .fetch_optional(&self.pool)
                .await
                .map_err(db_error)?
                .is_some();
            if already_recorded {
                continue;
            }

            let ddl = match self.backend {
                Backend::Sqlite => sqlite_sql,
                Backend::Postgres => postgres_sql,
            };
            if let Err(cause) = sqlx::query(ddl).execute(&self.pool).await {
                // Name the failing migration before the generic error line is logged.
                eprintln!("jerrycan-db: migration `{name}` failed");
                return Err(db_error(cause));
            }

            // Record only after the statement succeeded, so a failed migration
            // is retried on the next run.
            sqlx::query(&record_sql)
                .bind(name)
                .bind(chrono_free_timestamp())
                .execute(&self.pool)
                .await
                .map_err(db_error)?;
            newly_applied.push(name.to_string());
        }
        Ok(newly_applied)
    }
}
/// Formats the current wall-clock time as `unix:<seconds since the Unix epoch>`
/// using only the standard library.
///
/// Yields `unix:0` if the system clock reports a time before the epoch.
fn chrono_free_timestamp() -> String {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    let secs = match since_epoch {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    };
    format!("unix:{secs}")
}
/// Rewrites `?` bind placeholders into the syntax `backend` expects.
///
/// * [`Backend::Sqlite`] accepts `?` natively, so the query is returned unchanged.
/// * [`Backend::Postgres`] gets numbered parameters: the first `?` becomes
///   `$1`, the second `$2`, and so on.
///
/// A `?` inside a single-quoted string literal or a double-quoted identifier
/// is not a placeholder and is copied through untouched. A doubled quote used
/// as an escape (`'it''s'`) is handled because it closes and immediately
/// reopens the quoted region.
///
/// Not recognised: SQL comments, dollar-quoted strings, backslash escapes in
/// `E'...'` strings and the Postgres `?` JSON operators. A `?` in those
/// positions is still treated as a placeholder.
pub fn translate_placeholders(query: &str, backend: Backend) -> String {
    match backend {
        Backend::Sqlite => query.to_string(),
        Backend::Postgres => {
            let mut out = String::with_capacity(query.len() + 8);
            let mut next_index = 0usize;
            // `Some(q)` while scanning inside a region opened by quote char `q`.
            let mut open_quote: Option<char> = None;
            for ch in query.chars() {
                match (open_quote, ch) {
                    // The matching quote closes the region.
                    (Some(delim), _) if ch == delim => {
                        open_quote = None;
                        out.push(ch);
                    }
                    // A quote outside any region opens one.
                    (None, '\'' | '"') => {
                        open_quote = Some(ch);
                        out.push(ch);
                    }
                    // Only an unquoted `?` is a bind placeholder.
                    (None, '?') => {
                        next_index += 1;
                        out.push('$');
                        out.push_str(&next_index.to_string());
                    }
                    _ => out.push(ch),
                }
            }
            out
        }
    }
}
pub fn db_error(e: sqlx::Error) -> Error {
eprintln!("jerrycan-db: {e}");
if let sqlx::Error::Database(ref db) = e
&& db.is_unique_violation()
{
return Error::conflict("conflict: a row with this key already exists");
}
Error::new(
jerrycan_core::http::StatusCode::INTERNAL_SERVER_ERROR,
"JC0510",
"database error",
)
}
/// Lets a `Db` be installed on an `App` as an extension.
impl Extension for Db {
// Hands this handle to `App::provide` and returns the resulting app.
// NOTE(review): presumably this makes the `Db` available to handlers — confirm against jerrycan_core.
fn register(self, app: App) -> App {
app.provide(self)
}
}
pub use sqlx;
// Tests for the pool wrapper, placeholder translation, error mapping and the
// migration runner. Database tests run against in-memory SQLite.
#[cfg(test)]
mod tests {
use super::*;
use sqlx::Row;
// A table created by one pooled query must be visible to the following ones.
#[tokio::test]
async fn sqlite_memory_is_one_database_across_queries() {
let db = Db::connect("sqlite::memory:").await.unwrap();
assert_eq!(db.backend(), Backend::Sqlite);
sqlx::query("CREATE TABLE t (x BIGINT)")
.execute(db.pool())
.await
.unwrap();
sqlx::query("INSERT INTO t (x) VALUES (?)")
.bind(7i64)
.execute(db.pool())
.await
.unwrap();
let row = sqlx::query("SELECT x FROM t")
.fetch_one(db.pool())
.await
.unwrap();
let x: i64 = row.get("x");
assert_eq!(x, 7, "second pooled query must see the first one's table");
}
// `?` becomes `$n` for Postgres and is left alone for SQLite.
#[test]
fn placeholder_translation_is_backend_aware() {
assert_eq!(
translate_placeholders("INSERT INTO t (a, b) VALUES (?, ?)", Backend::Postgres),
"INSERT INTO t (a, b) VALUES ($1, $2)"
);
assert_eq!(
translate_placeholders("INSERT INTO t (a, b) VALUES (?, ?)", Backend::Sqlite),
"INSERT INTO t (a, b) VALUES (?, ?)"
);
}
// NOTE(review): relies on JERRYCAN_DATABASE_URL not being set to a non-SQLite URL in the test environment.
#[tokio::test]
async fn from_env_defaults_to_sqlite_memory() {
let db = Db::from_env().await.unwrap();
assert_eq!(db.backend(), Backend::Sqlite);
}
// A non-constraint error maps to the generic code and message.
#[test]
fn db_errors_are_jc0510_and_leak_nothing() {
let e = db_error(sqlx::Error::RowNotFound);
assert_eq!(e.code(), "JC0510");
assert_eq!(e.message(), "database error");
}
// Statements built with sea_query run through the same `Any` pool.
#[tokio::test]
async fn sea_query_builds_and_executes_via_the_any_pool() {
use sea_query::{Alias, Expr, Query};
use sea_query_binder::SqlxBinder;
use sqlx::Row;
let db = Db::connect("sqlite::memory:").await.unwrap();
sqlx::query("CREATE TABLE sq (id INTEGER PRIMARY KEY, title TEXT NOT NULL)")
.execute(db.pool())
.await
.unwrap();
// Insert with RETURNING and read the id back.
let (sql, values) = Query::insert()
.into_table(Alias::new("sq"))
.columns([Alias::new("id"), Alias::new("title")])
.values_panic([7.into(), "hello".into()])
.returning(Query::returning().columns([Alias::new("id")]))
.build_any_sqlx(db.query_builder());
let row = sqlx::query_with(&sql, values)
.fetch_one(db.pool())
.await
.unwrap();
assert_eq!(row.get::<i64, _>("id"), 7, "RETURNING id round-trips");
// Select the same row with a bound WHERE value.
let (sql, values) = Query::select()
.columns([Alias::new("id"), Alias::new("title")])
.from(Alias::new("sq"))
.and_where(Expr::col(Alias::new("id")).eq(7))
.build_any_sqlx(db.query_builder());
let row = sqlx::query_with(&sql, values)
.fetch_one(db.pool())
.await
.unwrap();
assert_eq!(row.get::<String, _>("title"), "hello");
}
// A duplicate primary key surfaces as a 409 conflict without driver text.
#[tokio::test]
async fn unique_violations_map_to_409_conflict() {
let db = Db::connect("sqlite::memory:").await.unwrap();
sqlx::query("CREATE TABLE u (id INTEGER PRIMARY KEY, t TEXT)")
.execute(db.pool())
.await
.unwrap();
sqlx::query("INSERT INTO u (id, t) VALUES (1, 'a')")
.execute(db.pool())
.await
.unwrap();
let dup = sqlx::query("INSERT INTO u (id, t) VALUES (1, 'b')")
.execute(db.pool())
.await
.expect_err("duplicate pk must fail");
let e = db_error(dup);
assert_eq!(e.code(), "JC0409");
assert_eq!(e.status().as_u16(), 409);
assert!(!e.message().contains("sqlite"), "{}", e.message());
}
// Two-step fixture: create `todos`, then add a `done` column.
fn demo_migrations() -> Vec<Migration> {
vec![
Migration {
name: "0001_create_todos",
sqlite: "CREATE TABLE todos (id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL)",
postgres: "CREATE TABLE todos (id BIGSERIAL PRIMARY KEY, title TEXT NOT NULL)",
},
Migration {
name: "0002_add_done",
sqlite: "ALTER TABLE todos ADD COLUMN done BOOLEAN NOT NULL DEFAULT 0",
postgres: "ALTER TABLE todos ADD COLUMN done BOOLEAN NOT NULL DEFAULT FALSE",
},
]
}
// First run applies both migrations in order; second run applies none.
#[tokio::test]
async fn migrations_apply_in_order_and_only_once() {
let db = Db::connect("sqlite::memory:").await.unwrap();
let applied = db.migrate(&demo_migrations()).await.unwrap();
assert_eq!(applied, vec!["0001_create_todos", "0002_add_done"]);
let applied = db.migrate(&demo_migrations()).await.unwrap();
assert!(applied.is_empty());
// The migrated schema must accept a row using both columns.
sqlx::query("INSERT INTO todos (title, done) VALUES (?, ?)")
.bind("x")
.bind(true)
.execute(db.pool())
.await
.unwrap();
}
// Same contract as above through the owned-string entry point.
#[tokio::test]
async fn owned_migrations_apply_in_order_and_only_once() {
let db = Db::connect("sqlite::memory:").await.unwrap();
let owned = vec![
OwnedMigration {
name: "0001_create_todos".into(),
sqlite:
"CREATE TABLE todos (id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL)"
.into(),
postgres: "CREATE TABLE todos (id BIGSERIAL PRIMARY KEY, title TEXT NOT NULL)"
.into(),
},
OwnedMigration {
name: "0002_add_done".into(),
sqlite: "ALTER TABLE todos ADD COLUMN done BOOLEAN NOT NULL DEFAULT 0".into(),
postgres: "ALTER TABLE todos ADD COLUMN done BOOLEAN NOT NULL DEFAULT FALSE".into(),
},
];
let applied = db.migrate_owned(&owned).await.unwrap();
assert_eq!(applied, vec!["0001_create_todos", "0002_add_done"]);
let applied = db.migrate_owned(&owned).await.unwrap();
assert!(applied.is_empty());
}
// A failed migration must not be recorded, so a corrected one with the same name still runs.
#[tokio::test]
async fn a_failing_migration_surfaces_jc0510_and_is_not_recorded() {
let db = Db::connect("sqlite::memory:").await.unwrap();
let bad = vec![Migration {
name: "0001_broken",
sqlite: "CREATE GARBAGE",
postgres: "CREATE GARBAGE",
}];
let err = db.migrate(&bad).await.unwrap_err();
assert_eq!(err.code(), "JC0510");
let good = vec![Migration {
name: "0001_broken",
sqlite: "CREATE TABLE ok (x BIGINT)",
postgres: "CREATE TABLE ok (x BIGINT)",
}];
let applied = db.migrate(&good).await.unwrap();
assert_eq!(applied, vec!["0001_broken"]);
}
}