Skip to main content

atrg_db/
lib.rs

1//! Database layer for at-rust-go: SQLite and/or PostgreSQL connection pooling
2//! and migrations.
3//!
4//! This crate provides a thin wrapper around `sqlx` exposing a [`DbPool`] enum
5//! that wraps either a SQLite or a PostgreSQL connection pool. The variants
6//! are gated behind cargo features:
7//!
8//! - `sqlite` *(default)* — pulls in the SQLite driver and enables
9//!   `DbPool::Sqlite`.
10//! - `postgres` *(optional)* — pulls in the PostgreSQL driver and enables
11//!   `DbPool::Postgres`.
12//!
13//! At runtime, [`connect`] inspects the URL scheme (`sqlite://`, `sqlite::memory:`
14//! → SQLite; `postgres://`, `postgresql://` → PostgreSQL) and returns the
15//! appropriate variant. If the matching driver was not compiled in, an error
16//! is returned.
17//!
18//! Internal migrations live under `migrations/sqlite/` and `migrations/postgres/`
19//! and are embedded at compile time; only the migrations matching the active
20//! pool variant are run.
21
22#![deny(unsafe_code)]
23#![warn(missing_docs)]
24
25#[cfg(not(any(feature = "sqlite", feature = "postgres")))]
26compile_error!("atrg-db requires at least one of the `sqlite` or `postgres` cargo features");
27
28#[cfg(feature = "sqlite")]
29use std::str::FromStr;
30
31#[cfg(feature = "sqlite")]
32use sqlx::SqlitePool;
33
34#[cfg(feature = "postgres")]
35use sqlx::PgPool;
36
/// Handle to atrg's database: a `sqlx` connection pool for one of the
/// compiled-in backends (SQLite or PostgreSQL).
///
/// This is the database handle carried through atrg's
/// [`AppState`](../atrg_core/struct.AppState.html). Cloning it is cheap
/// because the wrapped sqlx pools are themselves `Arc`-based.
///
/// Each variant only exists when its cargo feature is enabled, so code that
/// matches on `DbPool` must gate its arms with the same `#[cfg(feature = ...)]`
/// attributes.
#[derive(Clone)]
pub enum DbPool {
    /// Pool of SQLite connections. Present when the `sqlite` cargo feature
    /// (on by default) is enabled.
    #[cfg(feature = "sqlite")]
    Sqlite(SqlitePool),
    /// Pool of PostgreSQL connections. Present when the `postgres` cargo
    /// feature is enabled.
    #[cfg(feature = "postgres")]
    Postgres(PgPool),
}
53
54impl DbPool {
55    /// Borrow the inner SQLite pool, if any.
56    #[cfg(feature = "sqlite")]
57    pub fn as_sqlite(&self) -> Option<&SqlitePool> {
58        match self {
59            DbPool::Sqlite(pool) => Some(pool),
60            #[cfg(feature = "postgres")]
61            DbPool::Postgres(_) => None,
62        }
63    }
64
65    /// Borrow the inner PostgreSQL pool, if any.
66    #[cfg(feature = "postgres")]
67    pub fn as_postgres(&self) -> Option<&PgPool> {
68        match self {
69            DbPool::Postgres(pool) => Some(pool),
70            #[cfg(feature = "sqlite")]
71            DbPool::Sqlite(_) => None,
72        }
73    }
74
75    /// Returns a static string identifying the backend kind: `"sqlite"` or
76    /// `"postgres"`. Useful for diagnostics and tests.
77    pub fn backend(&self) -> &'static str {
78        match self {
79            #[cfg(feature = "sqlite")]
80            DbPool::Sqlite(_) => "sqlite",
81            #[cfg(feature = "postgres")]
82            DbPool::Postgres(_) => "postgres",
83        }
84    }
85
86    /// Close the pool, waiting for in-flight queries to complete.
87    pub async fn close(&self) {
88        match self {
89            #[cfg(feature = "sqlite")]
90            DbPool::Sqlite(p) => p.close().await,
91            #[cfg(feature = "postgres")]
92            DbPool::Postgres(p) => p.close().await,
93        }
94    }
95
96    /// Whether the underlying pool has been closed.
97    pub fn is_closed(&self) -> bool {
98        match self {
99            #[cfg(feature = "sqlite")]
100            DbPool::Sqlite(p) => p.is_closed(),
101            #[cfg(feature = "postgres")]
102            DbPool::Postgres(p) => p.is_closed(),
103        }
104    }
105
106    /// Run a trivial `SELECT 1` round-trip against the pool; used by the
107    /// `/readyz` health endpoint.
108    pub async fn ping(&self) -> anyhow::Result<()> {
109        match self {
110            #[cfg(feature = "sqlite")]
111            DbPool::Sqlite(p) => {
112                sqlx::query("SELECT 1").execute(p).await?;
113            }
114            #[cfg(feature = "postgres")]
115            DbPool::Postgres(p) => {
116                sqlx::query("SELECT 1").execute(p).await?;
117            }
118        }
119        Ok(())
120    }
121}
122
123impl std::fmt::Debug for DbPool {
124    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
125        f.debug_tuple("DbPool").field(&self.backend()).finish()
126    }
127}
128
/// Wraps an existing SQLite pool in [`DbPool::Sqlite`].
#[cfg(feature = "sqlite")]
impl From<SqlitePool> for DbPool {
    fn from(sqlite_pool: SqlitePool) -> Self {
        Self::Sqlite(sqlite_pool)
    }
}
135
/// Wraps an existing PostgreSQL pool in [`DbPool::Postgres`].
#[cfg(feature = "postgres")]
impl From<PgPool> for DbPool {
    fn from(pg_pool: PgPool) -> Self {
        Self::Postgres(pg_pool)
    }
}
142
143/// Backwards-compatible alias used throughout earlier versions of atrg.
144///
145/// New code should prefer [`DbPool`] directly.
146pub type DbConn = DbPool;
147
148/// Connect to a database, choosing the backend from the URL scheme.
149///
150/// - `sqlite://path` or `sqlite::memory:` → returns `DbPool::Sqlite`
151///   (requires the `sqlite` feature).
152/// - `postgres://...` or `postgresql://...` → returns `DbPool::Postgres`
153///   (requires the `postgres` feature).
154///
155/// SQLite pools are configured with `create_if_missing(true)`, WAL journal
156/// mode, and foreign keys enabled. PostgreSQL pools are configured with up
157/// to 8 connections.
158///
159/// # Examples
160///
161/// ```no_run
162/// # async fn example() -> anyhow::Result<()> {
163/// // SQLite (requires the `sqlite` feature)
164/// let pool = atrg_db::connect("sqlite://atrg.db").await?;
165/// // PostgreSQL (requires the `postgres` feature)
166/// // let pool = atrg_db::connect("postgres://user:pass@host/db").await?;
167/// # Ok(())
168/// # }
169/// ```
170pub async fn connect(url: &str) -> anyhow::Result<DbPool> {
171    let scheme = url.split(':').next().unwrap_or("").to_ascii_lowercase();
172    match scheme.as_str() {
173        "sqlite" => {
174            #[cfg(feature = "sqlite")]
175            {
176                let opts = sqlx::sqlite::SqliteConnectOptions::from_str(url)?
177                    .create_if_missing(true)
178                    .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
179                    .foreign_keys(true);
180
181                let pool = sqlx::sqlite::SqlitePoolOptions::new()
182                    .max_connections(8)
183                    .connect_with(opts)
184                    .await?;
185
186                tracing::info!("connected to SQLite database: {}", url);
187                Ok(DbPool::Sqlite(pool))
188            }
189            #[cfg(not(feature = "sqlite"))]
190            {
191                anyhow::bail!(
192                    "atrg-db was built without the `sqlite` feature; cannot open {}",
193                    url
194                )
195            }
196        }
197        "postgres" | "postgresql" => {
198            #[cfg(feature = "postgres")]
199            {
200                let pool = sqlx::postgres::PgPoolOptions::new()
201                    .max_connections(8)
202                    .connect(url)
203                    .await?;
204
205                tracing::info!("connected to PostgreSQL database");
206                Ok(DbPool::Postgres(pool))
207            }
208            #[cfg(not(feature = "postgres"))]
209            {
210                anyhow::bail!(
211                    "atrg-db was built without the `postgres` feature; \
212                     enable it (e.g. `cargo build --features atrg-db/postgres`) \
213                     to use {}",
214                    url
215                )
216            }
217        }
218        other => anyhow::bail!(
219            "unsupported database URL scheme `{}`; expected `sqlite://`, `postgres://`, or `postgresql://`",
220            other
221        ),
222    }
223}
224
225/// Run atrg's internal migrations against the active backend.
226///
227/// The migrations live under `migrations/<backend>/` in this crate and are
228/// embedded at compile time. They are idempotent and safe to run on every
229/// startup.
230///
231/// Uses the `_atrg_migrations` tracking table so that framework migrations
232/// never conflict with application migrations or other binaries sharing
233/// the same database.
234pub async fn run_internal_migrations(pool: &DbPool) -> anyhow::Result<()> {
235    match pool {
236        #[cfg(feature = "sqlite")]
237        DbPool::Sqlite(_) => {
238            let migrator = sqlx::migrate!("./migrations/sqlite");
239            let n = migrator.migrations.len();
240            run_migrator_with_table(pool, &migrator, "_atrg_migrations").await?;
241            tracing::info!(
242                count = n,
243                backend = "sqlite",
244                table = "_atrg_migrations",
245                "applied atrg internal migrations"
246            );
247        }
248        #[cfg(feature = "postgres")]
249        DbPool::Postgres(_) => {
250            let migrator = sqlx::migrate!("./migrations/postgres");
251            let n = migrator.migrations.len();
252            run_migrator_with_table(pool, &migrator, "_atrg_migrations").await?;
253            tracing::info!(
254                count = n,
255                backend = "postgres",
256                table = "_atrg_migrations",
257                "applied atrg internal migrations"
258            );
259        }
260    }
261    Ok(())
262}
263
264/// Run user-supplied migrations from `dir` against the active backend.
265///
266/// If the directory does not exist or contains no `.sql` files, this function
267/// returns `Ok(())` silently.
268///
269/// # Deprecation Notice
270///
271/// This function uses the default `_sqlx_migrations` tracking table which
272/// can conflict with other migration sets in the same database. Prefer
273/// [`run_isolated_migrations`] which uses a custom tracking table to
274/// prevent conflicts between framework migrations, app migrations, and
275/// multiple binaries sharing the same database.
276#[deprecated(
277    since = "0.2.0",
278    note = "Use `run_isolated_migrations` with a custom tracking table to avoid migration conflicts"
279)]
280pub async fn run_user_migrations(pool: &DbPool, dir: &std::path::Path) -> anyhow::Result<()> {
281    if !dir.exists() {
282        tracing::debug!(
283            path = %dir.display(),
284            "user migrations directory does not exist, skipping"
285        );
286        return Ok(());
287    }
288
289    let has_sql_files = std::fs::read_dir(dir)?
290        .filter_map(|entry| entry.ok())
291        .any(|entry| entry.path().extension().is_some_and(|ext| ext == "sql"));
292
293    if !has_sql_files {
294        tracing::debug!(
295            path = %dir.display(),
296            "user migrations directory contains no .sql files, skipping"
297        );
298        return Ok(());
299    }
300
301    let migrator = sqlx::migrate::Migrator::new(dir).await?;
302    let n = migrator.migrations.len();
303
304    match pool {
305        #[cfg(feature = "sqlite")]
306        DbPool::Sqlite(p) => migrator.run(p).await?,
307        #[cfg(feature = "postgres")]
308        DbPool::Postgres(p) => migrator.run(p).await?,
309    }
310
311    tracing::info!(
312        count = n,
313        path = %dir.display(),
314        backend = pool.backend(),
315        "applied user migrations (if pending)"
316    );
317
318    Ok(())
319}
320
321/// Run migrations from a directory using an isolated tracking table.
322///
323/// This prevents conflicts between framework migrations and app migrations,
324/// or between multiple app binaries sharing the same database. Each caller
325/// specifies its own `tracking_table` name (e.g. `"_myapp_migrations"`,
326/// `"_aggregator_migrations"`) and only migrations tracked in that table
327/// affect the outcome.
328///
329/// The function is idempotent: re-running with the same migrations and
330/// tracking table is a no-op for already-applied migrations.
331///
332/// # Arguments
333///
334/// * `pool` — the database connection pool (SQLite or PostgreSQL).
335/// * `dir` — path to the directory containing numbered `.sql` migration files.
336/// * `tracking_table` — the name of the table used to record which migrations
337///   have been applied. Must be a valid SQL identifier (letters, digits,
338///   underscores). The table is created automatically if it does not exist.
339///
340/// # Errors
341///
342/// Returns an error if:
343/// - The directory does not exist or cannot be read.
344/// - A migration file contains invalid SQL.
345/// - The database rejects a migration statement.
346/// - The `tracking_table` name contains invalid characters.
347///
348/// # Examples
349///
350/// ```no_run
351/// # async fn example(pool: &atrg_db::DbPool) -> anyhow::Result<()> {
352/// use std::path::Path;
353///
354/// // App migrations — isolated from atrg internals
355/// atrg_db::run_isolated_migrations(
356///     pool,
357///     Path::new("./migrations"),
358///     "_myapp_migrations",
359/// ).await?;
360///
361/// // A second binary's migrations in the same DB
362/// atrg_db::run_isolated_migrations(
363///     pool,
364///     Path::new("./aggregator_migrations"),
365///     "_aggregator_migrations",
366/// ).await?;
367/// # Ok(())
368/// # }
369/// ```
370pub async fn run_isolated_migrations(
371    pool: &DbPool,
372    dir: &std::path::Path,
373    tracking_table: &str,
374) -> anyhow::Result<()> {
375    // Validate the tracking table name to prevent SQL injection.
376    if tracking_table.is_empty()
377        || !tracking_table
378            .chars()
379            .all(|c| c.is_ascii_alphanumeric() || c == '_')
380    {
381        anyhow::bail!(
382            "invalid tracking table name `{}`; must contain only ASCII alphanumerics and underscores",
383            tracking_table
384        );
385    }
386
387    if !dir.exists() {
388        anyhow::bail!("migrations directory does not exist: {}", dir.display());
389    }
390
391    let migrator = sqlx::migrate::Migrator::new(dir).await?;
392    run_migrator_with_table(pool, &migrator, tracking_table).await?;
393
394    tracing::info!(
395        count = migrator.migrations.len(),
396        path = %dir.display(),
397        table = tracking_table,
398        backend = pool.backend(),
399        "applied isolated migrations (if pending)"
400    );
401
402    Ok(())
403}
404
405/// Validate a tracking table name. Returns an error if the name is empty or
406/// contains characters outside `[a-zA-Z0-9_]`.
407fn validate_table_name(name: &str) -> anyhow::Result<()> {
408    if name.is_empty() || !name.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
409        anyhow::bail!(
410            "invalid tracking table name `{}`; must contain only ASCII alphanumerics and underscores",
411            name
412        );
413    }
414    Ok(())
415}
416
417/// Internal: run a `Migrator` using a custom tracking table instead of the
418/// default `_sqlx_migrations`. This is the core engine powering both
419/// [`run_internal_migrations`] and [`run_isolated_migrations`].
420async fn run_migrator_with_table(
421    pool: &DbPool,
422    migrator: &sqlx::migrate::Migrator,
423    tracking_table: &str,
424) -> anyhow::Result<()> {
425    validate_table_name(tracking_table)?;
426
427    match pool {
428        #[cfg(feature = "sqlite")]
429        DbPool::Sqlite(p) => run_migrator_sqlite(p, migrator, tracking_table).await,
430        #[cfg(feature = "postgres")]
431        DbPool::Postgres(p) => run_migrator_postgres(p, migrator, tracking_table).await,
432    }
433}
434
/// SQLite backend of the custom-tracking-table migration runner.
///
/// Creates `tracking_table` if needed, reads the versions already recorded
/// in it, then walks `migrator.migrations` in the order the migrator holds
/// them. Every *up* migration whose version is not yet recorded is executed
/// and then recorded (version, description, checksum).
///
/// `tracking_table` is interpolated into SQL text; callers must validate it
/// first (see `validate_table_name`).
///
/// # Errors
///
/// Returns an error if a tracking-table statement fails or if a migration's
/// SQL is rejected; migrations after the failing one are not attempted.
#[cfg(feature = "sqlite")]
async fn run_migrator_sqlite(
    pool: &SqlitePool,
    migrator: &sqlx::migrate::Migrator,
    tracking_table: &str,
) -> anyhow::Result<()> {
    // Create the tracking table if it does not exist.
    let create_sql = format!(
        "CREATE TABLE IF NOT EXISTS \"{}\" (
            version BIGINT PRIMARY KEY,
            description TEXT NOT NULL,
            checksum BLOB NOT NULL,
            applied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
        )",
        tracking_table
    );
    sqlx::query(&create_sql).execute(pool).await?;

    // Fetch already-applied versions.
    let applied_sql = format!("SELECT version FROM \"{}\"", tracking_table);
    let applied_rows: Vec<i64> = sqlx::query_scalar(&applied_sql).fetch_all(pool).await?;
    let applied: std::collections::HashSet<i64> = applied_rows.into_iter().collect();

    // The INSERT text does not depend on the migration, so build it once.
    let insert_sql = format!(
        "INSERT INTO \"{}\" (version, description, checksum) VALUES (?, ?, ?)",
        tracking_table
    );

    // Apply each pending migration in order.
    for migration in migrator.migrations.iter() {
        // Reversible migration sets also load their `.down.sql` halves, which
        // share a version with the matching up migration. Running those here
        // would undo the schema change just applied and then collide on the
        // tracking table's primary key, so skip them as `Migrator::run` does.
        if migration.migration_type.is_down_migration() {
            continue;
        }

        let version = migration.version;
        if applied.contains(&version) {
            continue;
        }

        tracing::debug!(
            version = version,
            description = %migration.description,
            table = tracking_table,
            "applying migration (sqlite)"
        );

        // NOTE(review): the migration SQL and its tracking row are written as
        // two separate pool operations, not one transaction. A failure
        // between them leaves the migration applied but unrecorded.

        // Execute the migration SQL.
        sqlx::raw_sql(migration.sql.as_ref())
            .execute(pool)
            .await
            .map_err(|e| {
                anyhow::anyhow!(
                    "failed to apply migration {}: {} — {}",
                    version,
                    migration.description,
                    e
                )
            })?;

        // Record it in the tracking table.
        sqlx::query(&insert_sql)
            .bind(version)
            .bind(migration.description.as_ref())
            .bind(migration.checksum.as_ref())
            .execute(pool)
            .await?;
    }

    Ok(())
}
500
/// PostgreSQL backend of the custom-tracking-table migration runner.
///
/// Creates `tracking_table` if needed, reads the versions already recorded
/// in it, then walks `migrator.migrations` in the order the migrator holds
/// them. Every *up* migration whose version is not yet recorded is executed
/// and then recorded (version, description, checksum).
///
/// `tracking_table` is interpolated into SQL text; callers must validate it
/// first (see `validate_table_name`).
///
/// # Errors
///
/// Returns an error if a tracking-table statement fails or if a migration's
/// SQL is rejected; migrations after the failing one are not attempted.
#[cfg(feature = "postgres")]
async fn run_migrator_postgres(
    pool: &PgPool,
    migrator: &sqlx::migrate::Migrator,
    tracking_table: &str,
) -> anyhow::Result<()> {
    // Create the tracking table if it does not exist.
    let create_sql = format!(
        "CREATE TABLE IF NOT EXISTS \"{}\" (
            version BIGINT PRIMARY KEY,
            description TEXT NOT NULL,
            checksum BYTEA NOT NULL,
            applied_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
        )",
        tracking_table
    );
    sqlx::query(&create_sql).execute(pool).await?;

    // Fetch already-applied versions.
    let applied_sql = format!("SELECT version FROM \"{}\"", tracking_table);
    let applied_rows: Vec<i64> = sqlx::query_scalar(&applied_sql).fetch_all(pool).await?;
    let applied: std::collections::HashSet<i64> = applied_rows.into_iter().collect();

    // The INSERT text does not depend on the migration, so build it once.
    let insert_sql = format!(
        "INSERT INTO \"{}\" (version, description, checksum) VALUES ($1, $2, $3)",
        tracking_table
    );

    // Apply each pending migration in order.
    for migration in migrator.migrations.iter() {
        // Reversible migration sets also load their `.down.sql` halves, which
        // share a version with the matching up migration. Running those here
        // would undo the schema change just applied and then collide on the
        // tracking table's primary key, so skip them as `Migrator::run` does.
        if migration.migration_type.is_down_migration() {
            continue;
        }

        let version = migration.version;
        if applied.contains(&version) {
            continue;
        }

        tracing::debug!(
            version = version,
            description = %migration.description,
            table = tracking_table,
            "applying migration (postgres)"
        );

        // NOTE(review): the migration SQL and its tracking row are written as
        // two separate pool operations, not one transaction, and no lock is
        // taken. A failure between them leaves the migration applied but
        // unrecorded.

        // Execute the migration SQL.
        sqlx::raw_sql(migration.sql.as_ref())
            .execute(pool)
            .await
            .map_err(|e| {
                anyhow::anyhow!(
                    "failed to apply migration {}: {} — {}",
                    version,
                    migration.description,
                    e
                )
            })?;

        // Record it in the tracking table.
        sqlx::query(&insert_sql)
            .bind(version)
            .bind(migration.description.as_ref())
            .bind(migration.checksum.as_ref())
            .execute(pool)
            .await?;
    }

    Ok(())
}
566
#[cfg(all(test, feature = "sqlite"))]
mod tests {
    use super::*;
    use std::path::{Path, PathBuf};

    // ── Shared fixtures ───────────────────────────────────────────────────

    /// Per-test scratch directory under the system temp dir.
    ///
    /// The directory is deleted when the guard is dropped, which also happens
    /// while unwinding from a failed assertion, so failing tests do not leave
    /// files behind.
    struct ScratchDir(PathBuf);

    impl ScratchDir {
        /// Create a fresh, empty directory named after `label` and the
        /// current process id.
        fn new(label: &str) -> Self {
            let path =
                std::env::temp_dir().join(format!("atrg_test_{}_{}", label, std::process::id()));
            // Clear leftovers from an earlier run that happened to get the
            // same PID; stale `.sql` files would change what gets migrated.
            let _ = std::fs::remove_dir_all(&path);
            std::fs::create_dir_all(&path).expect("create scratch dir");
            Self(path)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best-effort cleanup; a failure here must not mask the test result.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Open a fresh in-memory SQLite pool.
    async fn memory_pool() -> DbPool {
        connect("sqlite::memory:").await.expect("connect")
    }

    /// Helper: write a numbered SQL migration file into `dir`, creating the
    /// directory if needed.
    fn write_migration(dir: &Path, filename: &str, sql: &str) {
        std::fs::create_dir_all(dir).expect("create migration dir");
        std::fs::write(dir.join(filename), sql).expect("write migration file");
    }

    /// Whether a table called `table` exists in the SQLite database.
    async fn table_exists(pool: &DbPool, table: &str) -> bool {
        let sqlite = pool.as_sqlite().expect("sqlite pool");
        let found: Option<(String,)> =
            sqlx::query_as("SELECT name FROM sqlite_master WHERE type='table' AND name = ?")
                .bind(table)
                .fetch_optional(sqlite)
                .await
                .expect("query sqlite_master");
        found.is_some()
    }

    /// Number of rows in `table`. Panics if the table does not exist.
    async fn row_count(pool: &DbPool, table: &str) -> i64 {
        let sqlite = pool.as_sqlite().expect("sqlite pool");
        let sql = format!("SELECT COUNT(*) FROM \"{table}\"");
        let (count,): (i64,) = sqlx::query_as(&sql)
            .fetch_one(sqlite)
            .await
            .expect("count rows");
        count
    }

    // ── Connection tests ──────────────────────────────────────────────────

    #[tokio::test]
    async fn test_connect_memory() {
        let pool = memory_pool().await;
        assert_eq!(pool.backend(), "sqlite");
        pool.ping().await.expect("ping");
    }

    #[tokio::test]
    async fn unsupported_scheme_errors() {
        let err = connect("mysql://localhost/db").await.unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("unsupported database URL scheme"),
            "got: {msg}"
        );
    }

    #[cfg(not(feature = "postgres"))]
    #[tokio::test]
    async fn postgres_url_without_feature_errors() {
        let err = connect("postgres://user:pass@localhost/db")
            .await
            .unwrap_err();
        let msg = format!("{err}");
        assert!(
            msg.contains("postgres") && msg.contains("feature"),
            "got: {msg}"
        );
    }

    // ── Internal and legacy user migration tests ──────────────────────────

    #[tokio::test]
    async fn test_internal_migrations() {
        let pool = memory_pool().await;
        run_internal_migrations(&pool)
            .await
            .expect("run internal migrations");

        assert!(
            table_exists(&pool, "atrg_sessions").await,
            "atrg_sessions exists"
        );
    }

    #[tokio::test]
    async fn test_migrations_idempotent() {
        let pool = memory_pool().await;
        run_internal_migrations(&pool).await.expect("first run");
        run_internal_migrations(&pool).await.expect("second run");
    }

    #[tokio::test]
    #[allow(deprecated)] // exercising the deprecated entry point on purpose
    async fn test_user_migrations_empty_dir() {
        let pool = memory_pool().await;
        let scratch = ScratchDir::new("empty");

        run_user_migrations(&pool, scratch.path())
            .await
            .expect("empty dir succeeds silently");
    }

    #[tokio::test]
    #[allow(deprecated)] // exercising the deprecated entry point on purpose
    async fn test_user_migrations_nonexistent_dir() {
        let pool = memory_pool().await;
        let nonexistent =
            std::env::temp_dir().join("atrg_test_nonexistent_dir_that_does_not_exist");
        run_user_migrations(&pool, &nonexistent)
            .await
            .expect("nonexistent dir succeeds silently");
    }

    #[tokio::test]
    async fn test_internal_migrations_use_atrg_tracking_table() {
        // Verify that run_internal_migrations uses `_atrg_migrations`
        // (not `_sqlx_migrations`).
        let pool = memory_pool().await;

        run_internal_migrations(&pool)
            .await
            .expect("internal migrations");

        assert!(
            table_exists(&pool, "_atrg_migrations").await,
            "_atrg_migrations exists"
        );
        assert!(
            !table_exists(&pool, "_sqlx_migrations").await,
            "_sqlx_migrations should NOT exist; internal migrations must use _atrg_migrations"
        );
    }

    // ── Isolated migration tests ──────────────────────────────────────────

    #[tokio::test]
    async fn test_isolated_migrations_two_sets_coexist() {
        // Two different migration sets with different tracking tables
        // can coexist in the same database without conflict.
        let pool = memory_pool().await;

        let base = ScratchDir::new("isolated_coexist");
        let dir_a = base.path().join("migrations_a");
        let dir_b = base.path().join("migrations_b");

        // Migration set A: creates a "posts" table
        write_migration(
            &dir_a,
            "20230101000000_create_posts.sql",
            "CREATE TABLE posts (id INTEGER PRIMARY KEY, body TEXT NOT NULL);",
        );

        // Migration set B: creates a "follows" table
        write_migration(
            &dir_b,
            "20230101000000_create_follows.sql",
            "CREATE TABLE follows (id INTEGER PRIMARY KEY, follower TEXT NOT NULL, followee TEXT NOT NULL);",
        );

        // Run both sets with different tracking tables.
        run_isolated_migrations(&pool, &dir_a, "_app_ring_migrations")
            .await
            .expect("ring migrations");
        run_isolated_migrations(&pool, &dir_b, "_app_aggregator_migrations")
            .await
            .expect("aggregator migrations");

        // Both tables should exist.
        assert!(table_exists(&pool, "posts").await, "posts table exists");
        assert!(table_exists(&pool, "follows").await, "follows table exists");

        // Both tracking tables should exist and contain exactly one entry each.
        assert_eq!(row_count(&pool, "_app_ring_migrations").await, 1);
        assert_eq!(row_count(&pool, "_app_aggregator_migrations").await, 1);

        // No `_sqlx_migrations` table should exist — we bypassed sqlx's default.
        assert!(
            !table_exists(&pool, "_sqlx_migrations").await,
            "_sqlx_migrations should NOT exist when using isolated migrations"
        );
    }

    #[tokio::test]
    async fn test_isolated_migrations_idempotent() {
        // Re-running the same migrations with the same tracking table is a no-op.
        let pool = memory_pool().await;

        let scratch = ScratchDir::new("isolated_idempotent");
        write_migration(
            scratch.path(),
            "20230601000000_create_items.sql",
            "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
        );

        // First run.
        run_isolated_migrations(&pool, scratch.path(), "_test_idempotent")
            .await
            .expect("first run");

        // Second run — should not fail (CREATE TABLE would fail if re-executed).
        run_isolated_migrations(&pool, scratch.path(), "_test_idempotent")
            .await
            .expect("second run (idempotent)");

        // Only one tracking row.
        assert_eq!(row_count(&pool, "_test_idempotent").await, 1);
    }

    #[tokio::test]
    async fn test_isolated_migrations_multiple_files_ordered() {
        // Multiple migration files are applied in version order.
        let pool = memory_pool().await;

        let scratch = ScratchDir::new("isolated_ordered");
        write_migration(
            scratch.path(),
            "20230101000000_create_alpha.sql",
            "CREATE TABLE alpha (id INTEGER PRIMARY KEY);",
        );
        write_migration(
            scratch.path(),
            "20230102000000_create_beta.sql",
            "CREATE TABLE beta (id INTEGER PRIMARY KEY, alpha_id INTEGER REFERENCES alpha(id));",
        );

        run_isolated_migrations(&pool, scratch.path(), "_test_ordered")
            .await
            .expect("ordered migrations");

        // Both tables created.
        assert!(table_exists(&pool, "alpha").await, "alpha table exists");
        assert!(table_exists(&pool, "beta").await, "beta table exists");

        // Tracking table has 2 rows.
        assert_eq!(row_count(&pool, "_test_ordered").await, 2);
    }

    #[tokio::test]
    async fn test_isolated_migrations_does_not_conflict_with_internal() {
        // Internal (atrg) migrations and isolated app migrations coexist.
        let pool = memory_pool().await;

        // Run atrg's internal migrations first.
        run_internal_migrations(&pool)
            .await
            .expect("internal migrations");

        // Now run app migrations with a separate tracking table.
        let scratch = ScratchDir::new("no_conflict");
        write_migration(
            scratch.path(),
            "20230101000000_create_widgets.sql",
            "CREATE TABLE widgets (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
        );

        run_isolated_migrations(&pool, scratch.path(), "_myapp_migrations")
            .await
            .expect("app migrations");

        // atrg_sessions (from internal) and widgets (from app) both exist.
        assert!(table_exists(&pool, "atrg_sessions").await, "atrg_sessions");
        assert!(table_exists(&pool, "widgets").await, "widgets");

        // Tracking tables are separate.
        assert!(row_count(&pool, "_atrg_migrations").await >= 1);
        assert_eq!(row_count(&pool, "_myapp_migrations").await, 1);
    }

    #[tokio::test]
    async fn test_isolated_migrations_invalid_table_name() {
        let pool = memory_pool().await;

        let scratch = ScratchDir::new("invalid_name");
        write_migration(scratch.path(), "20230101000000_noop.sql", "SELECT 1;");

        // Empty name, an SQL injection attempt, and a name with spaces.
        for bad_name in ["", "foo; DROP TABLE--", "has spaces"] {
            let err = run_isolated_migrations(&pool, scratch.path(), bad_name)
                .await
                .unwrap_err();
            assert!(
                format!("{err}").contains("invalid tracking table name"),
                "name {bad_name:?} got: {err}"
            );
        }
    }

    #[tokio::test]
    async fn test_isolated_migrations_nonexistent_dir_errors() {
        let pool = memory_pool().await;
        let nonexistent = std::env::temp_dir().join("atrg_test_isolated_nonexistent_dir_xyzzy");
        let err = run_isolated_migrations(&pool, &nonexistent, "_test")
            .await
            .unwrap_err();
        assert!(format!("{err}").contains("does not exist"), "got: {err}");
    }
}