1#![forbid(unsafe_code)]
6
7use jerrycan_core::{App, Error, Extension, Result};
8
9pub use sea_query;
13pub use sea_query_binder;
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub enum Backend {
19 Sqlite,
20 Postgres,
21}
22
23#[derive(Clone)]
26pub struct Db {
27 pool: sqlx::AnyPool,
28 backend: Backend,
29}
30
31impl Db {
32 pub async fn connect(url: &str) -> Result<Self> {
34 sqlx::any::install_default_drivers(); let backend = if url.starts_with("postgres") {
36 Backend::Postgres
37 } else if url.starts_with("sqlite") {
38 Backend::Sqlite
39 } else {
40 return Err(Error::internal(format!(
41 "unsupported database url scheme: `{url}` (sqlite:// or postgres:// in v0)"
42 )));
43 };
44 let max = match backend {
47 Backend::Sqlite => 1,
48 Backend::Postgres => 5,
49 };
50 let pool = sqlx::any::AnyPoolOptions::new()
51 .max_connections(max)
52 .connect(url)
53 .await
54 .map_err(db_error)?;
55 Ok(Self { pool, backend })
56 }
57
58 pub async fn from_env() -> Result<Self> {
60 let url = std::env::var("JERRYCAN_DATABASE_URL")
61 .unwrap_or_else(|_| "sqlite::memory:".to_string());
62 Self::connect(&url).await
63 }
64
65 pub fn pool(&self) -> &sqlx::AnyPool {
66 &self.pool
67 }
68
69 pub fn backend(&self) -> Backend {
70 self.backend
71 }
72
73 pub fn sql(&self, query: &str) -> String {
75 translate_placeholders(query, self.backend)
76 }
77
78 pub fn query_builder(&self) -> &'static dyn sea_query::QueryBuilder {
82 match self.backend {
83 Backend::Sqlite => &sea_query::SqliteQueryBuilder,
84 Backend::Postgres => &sea_query::PostgresQueryBuilder,
85 }
86 }
87}
88
89#[derive(Debug, Clone, Copy)]
92pub struct Migration {
93 pub name: &'static str,
94 pub sqlite: &'static str,
95 pub postgres: &'static str,
96}
97
98#[derive(Debug, Clone)]
101pub struct OwnedMigration {
102 pub name: String,
103 pub sqlite: String,
104 pub postgres: String,
105}
106
107impl Db {
108 pub async fn migrate(&self, migrations: &[Migration]) -> Result<Vec<String>> {
112 self.migrate_iter(migrations.iter().map(|m| (m.name, m.sqlite, m.postgres)))
113 .await
114 }
115
116 pub async fn migrate_owned(&self, migrations: &[OwnedMigration]) -> Result<Vec<String>> {
118 self.migrate_iter(
119 migrations
120 .iter()
121 .map(|m| (m.name.as_str(), m.sqlite.as_str(), m.postgres.as_str())),
122 )
123 .await
124 }
125
126 async fn migrate_iter<'a>(
129 &self,
130 items: impl Iterator<Item = (&'a str, &'a str, &'a str)>,
131 ) -> Result<Vec<String>> {
132 sqlx::query(
133 "CREATE TABLE IF NOT EXISTS _jerrycan_migrations (name TEXT PRIMARY KEY, applied_at TEXT NOT NULL)",
134 )
135 .execute(&self.pool)
136 .await
137 .map_err(db_error)?;
138
139 let mut applied = Vec::new();
140 for (name, sqlite, postgres) in items {
141 let seen =
142 sqlx::query(&self.sql("SELECT name FROM _jerrycan_migrations WHERE name = ?"))
143 .bind(name)
144 .fetch_optional(&self.pool)
145 .await
146 .map_err(db_error)?;
147 if seen.is_some() {
148 continue;
149 }
150 let statement = match self.backend {
151 Backend::Sqlite => sqlite,
152 Backend::Postgres => postgres,
153 };
154 sqlx::query(statement)
155 .execute(&self.pool)
156 .await
157 .map_err(|e| {
158 eprintln!("jerrycan-db: migration `{name}` failed");
159 db_error(e)
160 })?;
161 sqlx::query(
162 &self.sql("INSERT INTO _jerrycan_migrations (name, applied_at) VALUES (?, ?)"),
163 )
164 .bind(name)
165 .bind(chrono_free_timestamp())
166 .execute(&self.pool)
167 .await
168 .map_err(db_error)?;
169 applied.push(name.to_string());
170 }
171 Ok(applied)
172 }
173}
174
175fn chrono_free_timestamp() -> String {
177 let secs = std::time::SystemTime::now()
178 .duration_since(std::time::UNIX_EPOCH)
179 .map(|d| d.as_secs())
180 .unwrap_or(0);
181 format!("unix:{secs}")
182}
183
184pub fn translate_placeholders(query: &str, backend: Backend) -> String {
187 match backend {
188 Backend::Sqlite => query.to_string(),
189 Backend::Postgres => {
190 let mut out = String::with_capacity(query.len() + 8);
191 let mut n = 0;
192 for ch in query.chars() {
193 if ch == '?' {
194 n += 1;
195 out.push('$');
196 out.push_str(&n.to_string());
197 } else {
198 out.push(ch);
199 }
200 }
201 out
202 }
203 }
204}
205
206pub fn db_error(e: sqlx::Error) -> Error {
211 eprintln!("jerrycan-db: {e}");
212 if let sqlx::Error::Database(ref db) = e
213 && db.is_unique_violation()
214 {
215 return Error::conflict("conflict: a row with this key already exists");
216 }
217 Error::new(
218 jerrycan_core::http::StatusCode::INTERNAL_SERVER_ERROR,
219 "JC0510",
220 "database error",
221 )
222}
223
224impl Extension for Db {
225 fn register(self, app: App) -> App {
226 app.provide(self)
227 }
228}
229
230pub use sqlx;
233
234#[cfg(test)]
235mod tests {
236 use super::*;
237 use sqlx::Row;
238
239 #[tokio::test]
240 async fn sqlite_memory_is_one_database_across_queries() {
241 let db = Db::connect("sqlite::memory:").await.unwrap();
244 assert_eq!(db.backend(), Backend::Sqlite);
245 sqlx::query("CREATE TABLE t (x BIGINT)")
246 .execute(db.pool())
247 .await
248 .unwrap();
249 sqlx::query("INSERT INTO t (x) VALUES (?)")
250 .bind(7i64)
251 .execute(db.pool())
252 .await
253 .unwrap();
254 let row = sqlx::query("SELECT x FROM t")
255 .fetch_one(db.pool())
256 .await
257 .unwrap();
258 let x: i64 = row.get("x");
259 assert_eq!(x, 7, "second pooled query must see the first one's table");
260 }
261
262 #[test]
263 fn placeholder_translation_is_backend_aware() {
264 assert_eq!(
265 translate_placeholders("INSERT INTO t (a, b) VALUES (?, ?)", Backend::Postgres),
266 "INSERT INTO t (a, b) VALUES ($1, $2)"
267 );
268 assert_eq!(
269 translate_placeholders("INSERT INTO t (a, b) VALUES (?, ?)", Backend::Sqlite),
270 "INSERT INTO t (a, b) VALUES (?, ?)"
271 );
272 }
273
274 #[tokio::test]
275 async fn from_env_defaults_to_sqlite_memory() {
276 let db = Db::from_env().await.unwrap();
278 assert_eq!(db.backend(), Backend::Sqlite);
279 }
280
281 #[test]
282 fn db_errors_are_jc0510_and_leak_nothing() {
283 let e = db_error(sqlx::Error::RowNotFound);
284 assert_eq!(e.code(), "JC0510");
285 assert_eq!(e.message(), "database error");
286 }
287
288 #[tokio::test]
292 async fn sea_query_builds_and_executes_via_the_any_pool() {
293 use sea_query::{Alias, Expr, Query};
294 use sea_query_binder::SqlxBinder;
295 use sqlx::Row;
296
297 let db = Db::connect("sqlite::memory:").await.unwrap();
298 sqlx::query("CREATE TABLE sq (id INTEGER PRIMARY KEY, title TEXT NOT NULL)")
299 .execute(db.pool())
300 .await
301 .unwrap();
302
303 let (sql, values) = Query::insert()
304 .into_table(Alias::new("sq"))
305 .columns([Alias::new("id"), Alias::new("title")])
306 .values_panic([7.into(), "hello".into()])
307 .returning(Query::returning().columns([Alias::new("id")]))
308 .build_any_sqlx(db.query_builder());
309 let row = sqlx::query_with(&sql, values)
310 .fetch_one(db.pool())
311 .await
312 .unwrap();
313 assert_eq!(row.get::<i64, _>("id"), 7, "RETURNING id round-trips");
314
315 let (sql, values) = Query::select()
316 .columns([Alias::new("id"), Alias::new("title")])
317 .from(Alias::new("sq"))
318 .and_where(Expr::col(Alias::new("id")).eq(7))
319 .build_any_sqlx(db.query_builder());
320 let row = sqlx::query_with(&sql, values)
321 .fetch_one(db.pool())
322 .await
323 .unwrap();
324 assert_eq!(row.get::<String, _>("title"), "hello");
325 }
326
327 #[tokio::test]
330 async fn unique_violations_map_to_409_conflict() {
331 let db = Db::connect("sqlite::memory:").await.unwrap();
332 sqlx::query("CREATE TABLE u (id INTEGER PRIMARY KEY, t TEXT)")
333 .execute(db.pool())
334 .await
335 .unwrap();
336 sqlx::query("INSERT INTO u (id, t) VALUES (1, 'a')")
337 .execute(db.pool())
338 .await
339 .unwrap();
340 let dup = sqlx::query("INSERT INTO u (id, t) VALUES (1, 'b')")
341 .execute(db.pool())
342 .await
343 .expect_err("duplicate pk must fail");
344 let e = db_error(dup);
345 assert_eq!(e.code(), "JC0409");
346 assert_eq!(e.status().as_u16(), 409);
347 assert!(!e.message().contains("sqlite"), "{}", e.message());
349 }
350
351 fn demo_migrations() -> Vec<Migration> {
352 vec![
353 Migration {
354 name: "0001_create_todos",
355 sqlite: "CREATE TABLE todos (id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL)",
356 postgres: "CREATE TABLE todos (id BIGSERIAL PRIMARY KEY, title TEXT NOT NULL)",
357 },
358 Migration {
359 name: "0002_add_done",
360 sqlite: "ALTER TABLE todos ADD COLUMN done BOOLEAN NOT NULL DEFAULT 0",
361 postgres: "ALTER TABLE todos ADD COLUMN done BOOLEAN NOT NULL DEFAULT FALSE",
362 },
363 ]
364 }
365
366 #[tokio::test]
367 async fn migrations_apply_in_order_and_only_once() {
368 let db = Db::connect("sqlite::memory:").await.unwrap();
369 let applied = db.migrate(&demo_migrations()).await.unwrap();
370 assert_eq!(applied, vec!["0001_create_todos", "0002_add_done"]);
371
372 let applied = db.migrate(&demo_migrations()).await.unwrap();
374 assert!(applied.is_empty());
375
376 sqlx::query("INSERT INTO todos (title, done) VALUES (?, ?)")
378 .bind("x")
379 .bind(true)
380 .execute(db.pool())
381 .await
382 .unwrap();
383 }
384
385 #[tokio::test]
386 async fn owned_migrations_apply_in_order_and_only_once() {
387 let db = Db::connect("sqlite::memory:").await.unwrap();
388 let owned = vec![
389 OwnedMigration {
390 name: "0001_create_todos".into(),
391 sqlite:
392 "CREATE TABLE todos (id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT NOT NULL)"
393 .into(),
394 postgres: "CREATE TABLE todos (id BIGSERIAL PRIMARY KEY, title TEXT NOT NULL)"
395 .into(),
396 },
397 OwnedMigration {
398 name: "0002_add_done".into(),
399 sqlite: "ALTER TABLE todos ADD COLUMN done BOOLEAN NOT NULL DEFAULT 0".into(),
400 postgres: "ALTER TABLE todos ADD COLUMN done BOOLEAN NOT NULL DEFAULT FALSE".into(),
401 },
402 ];
403 let applied = db.migrate_owned(&owned).await.unwrap();
404 assert_eq!(applied, vec!["0001_create_todos", "0002_add_done"]);
405 let applied = db.migrate_owned(&owned).await.unwrap();
407 assert!(applied.is_empty());
408 }
409
410 #[tokio::test]
411 async fn a_failing_migration_surfaces_jc0510_and_is_not_recorded() {
412 let db = Db::connect("sqlite::memory:").await.unwrap();
413 let bad = vec![Migration {
414 name: "0001_broken",
415 sqlite: "CREATE GARBAGE",
416 postgres: "CREATE GARBAGE",
417 }];
418 let err = db.migrate(&bad).await.unwrap_err();
419 assert_eq!(err.code(), "JC0510");
420
421 let good = vec![Migration {
423 name: "0001_broken",
424 sqlite: "CREATE TABLE ok (x BIGINT)",
425 postgres: "CREATE TABLE ok (x BIGINT)",
426 }];
427 let applied = db.migrate(&good).await.unwrap();
428 assert_eq!(applied, vec!["0001_broken"]);
429 }
430}