Skip to main content

rivet/state/
mod.rs

1use rusqlite::Connection;
2
3use crate::error::Result;
4
5mod checkpoint;
6mod cursor;
7mod file_log;
8mod journal_store;
9mod metrics;
10mod progression;
11mod run_aggregate;
12mod schema;
13mod shape;
14
15// Re-export domain types so callers use `rivet::state::*` unchanged.
16// Items below may not be explicitly named by all internal callers (often used
17// as inferred return types), but are part of the public integration-test API.
18#[allow(unused_imports)]
19pub use checkpoint::ChunkTaskInfo;
20#[allow(unused_imports)]
21pub use file_log::FileRecord;
22#[allow(unused_imports)]
23pub use metrics::ExportMetric;
24#[allow(unused_imports)]
25pub use progression::{Boundary, ExportProgression};
26#[allow(unused_imports)]
27pub use run_aggregate::{RunAggregate, RunAggregateEntry};
28#[allow(unused_imports)]
29pub use schema::{SchemaChange, SchemaColumn, arrow_schema_to_columns, schema_fingerprint};
30#[allow(unused_imports)]
31pub use shape::ShapeWarning;
32
33const STATE_DB_NAME: &str = ".rivet_state.db";
34
35/// Current schema version — always the last entry in `MIGRATIONS`.
36const SCHEMA_VERSION: i64 = MIGRATIONS[MIGRATIONS.len() - 1].0;
37
/// Ordered SQLite migrations.  Each entry is `(version, sql)`.
///
/// `migrate` walks this array in order and applies every entry whose version
/// is greater than the one recorded in `schema_version`, wrapping the entry's
/// SQL and the matching `schema_version` insert in a single `BEGIN … COMMIT`
/// batch.  `SCHEMA_VERSION` is read from the last entry, so new migrations
/// belong at the end of the array.
const MIGRATIONS: &[(i64, &str)] = &[
    // v1: core tables
    (
        1,
        "CREATE TABLE IF NOT EXISTS export_state (
            export_name TEXT PRIMARY KEY,
            last_cursor_value TEXT,
            last_run_at TEXT
        );
        CREATE TABLE IF NOT EXISTS export_metrics (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            export_name TEXT NOT NULL,
            run_at TEXT NOT NULL,
            duration_ms INTEGER NOT NULL,
            total_rows INTEGER NOT NULL,
            peak_rss_mb INTEGER,
            status TEXT NOT NULL,
            error_message TEXT,
            tuning_profile TEXT,
            format TEXT,
            mode TEXT,
            files_produced INTEGER DEFAULT 0,
            bytes_written INTEGER DEFAULT 0,
            retries INTEGER DEFAULT 0,
            validated INTEGER,
            schema_changed INTEGER,
            run_id TEXT
        );
        CREATE TABLE IF NOT EXISTS export_schema (
            export_name TEXT PRIMARY KEY,
            columns_json TEXT NOT NULL,
            updated_at TEXT NOT NULL
        );
        CREATE TABLE IF NOT EXISTS file_manifest (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_id TEXT NOT NULL,
            export_name TEXT NOT NULL,
            file_name TEXT NOT NULL,
            row_count INTEGER NOT NULL,
            bytes INTEGER NOT NULL,
            format TEXT NOT NULL,
            compression TEXT,
            created_at TEXT NOT NULL
        );",
    ),
    // v2: chunk checkpoint tables
    (
        2,
        "CREATE TABLE IF NOT EXISTS chunk_run (
            run_id TEXT PRIMARY KEY,
            export_name TEXT NOT NULL,
            plan_hash TEXT NOT NULL,
            status TEXT NOT NULL,
            max_chunk_attempts INTEGER NOT NULL DEFAULT 3,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_chunk_run_export_status
            ON chunk_run(export_name, status);
        CREATE TABLE IF NOT EXISTS chunk_task (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_id TEXT NOT NULL,
            chunk_index INTEGER NOT NULL,
            start_key TEXT NOT NULL,
            end_key TEXT NOT NULL,
            status TEXT NOT NULL,
            attempts INTEGER NOT NULL DEFAULT 0,
            last_error TEXT,
            rows_written INTEGER,
            file_name TEXT,
            updated_at TEXT NOT NULL,
            UNIQUE(run_id, chunk_index)
        );
        CREATE INDEX IF NOT EXISTS idx_chunk_task_run_status ON chunk_task(run_id, status);",
    ),
    // v3: index on file_manifest for faster per-export lookups
    (
        3,
        "CREATE INDEX IF NOT EXISTS idx_file_manifest_export ON file_manifest(export_name, id DESC);",
    ),
    // v4: committed / verified boundary tracking (ADR-0008, Epic G)
    (
        4,
        "CREATE TABLE IF NOT EXISTS export_progression (
            export_name TEXT PRIMARY KEY,
            last_committed_strategy TEXT,
            last_committed_cursor TEXT,
            last_committed_chunk_index INTEGER,
            last_committed_run_id TEXT,
            last_committed_at TEXT,
            last_verified_strategy TEXT,
            last_verified_cursor TEXT,
            last_verified_chunk_index INTEGER,
            last_verified_run_id TEXT,
            last_verified_at TEXT
        );",
    ),
    // v5: aggregate run summary
    (
        5,
        "CREATE TABLE IF NOT EXISTS run_aggregate (
            run_aggregate_id TEXT PRIMARY KEY,
            started_at TEXT NOT NULL,
            finished_at TEXT NOT NULL,
            duration_ms INTEGER NOT NULL,
            config_path TEXT,
            parallel_mode TEXT NOT NULL,
            total_exports INTEGER NOT NULL,
            success_count INTEGER NOT NULL,
            failed_count INTEGER NOT NULL,
            skipped_count INTEGER NOT NULL,
            total_rows INTEGER NOT NULL,
            total_files INTEGER NOT NULL,
            total_bytes INTEGER NOT NULL,
            details_json TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_run_aggregate_finished
            ON run_aggregate(finished_at DESC);",
    ),
    // v6: per-column data shape stats
    (
        6,
        "CREATE TABLE IF NOT EXISTS export_shape (
            export_name TEXT NOT NULL,
            column_name TEXT NOT NULL,
            max_byte_len INTEGER NOT NULL,
            updated_at TEXT NOT NULL,
            PRIMARY KEY (export_name, column_name)
        );",
    ),
    // v7: structured run journal
    (
        7,
        "CREATE TABLE IF NOT EXISTS run_journal (
            run_id TEXT PRIMARY KEY,
            export_name TEXT NOT NULL,
            finished_at TEXT NOT NULL,
            journal_json TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_run_journal_export
            ON run_journal(export_name, finished_at DESC);",
    ),
    // v8: rename file_manifest → file_log.  The 0.7.0 cloud-output contract
    // reclaims the "manifest" name for the public JSON artifact; the internal
    // SQLite log of written files becomes `file_log` to remove the overload.
    (
        8,
        "ALTER TABLE file_manifest RENAME TO file_log;
        DROP INDEX IF EXISTS idx_file_manifest_export;
        CREATE INDEX IF NOT EXISTS idx_file_log_export ON file_log(export_name, id DESC);",
    ),
];
191
/// PostgreSQL-compatible DDL.  Column types differ from SQLite (BIGSERIAL,
/// BIGINT, BOOLEAN); placeholder style is `$N` (handled by callers via
/// `pg_sql()`).
///
/// Same `(version, sql)` layout as `MIGRATIONS`.  `migrate_pg` applies every
/// entry newer than the version stored in `rivet_schema_version`, each inside
/// its own `BEGIN … COMMIT` batch, and then checks the result against
/// `SCHEMA_VERSION`.
const PG_MIGRATIONS: &[(i64, &str)] = &[
    // v1: core tables
    (
        1,
        "CREATE TABLE IF NOT EXISTS export_state (
            export_name TEXT PRIMARY KEY,
            last_cursor_value TEXT,
            last_run_at TEXT
        );
        CREATE TABLE IF NOT EXISTS export_metrics (
            id BIGSERIAL PRIMARY KEY,
            export_name TEXT NOT NULL,
            run_at TEXT NOT NULL,
            duration_ms BIGINT NOT NULL,
            total_rows BIGINT NOT NULL,
            peak_rss_mb BIGINT,
            status TEXT NOT NULL,
            error_message TEXT,
            tuning_profile TEXT,
            format TEXT,
            mode TEXT,
            files_produced BIGINT DEFAULT 0,
            bytes_written BIGINT DEFAULT 0,
            retries BIGINT DEFAULT 0,
            validated BOOLEAN,
            schema_changed BOOLEAN,
            run_id TEXT
        );
        CREATE TABLE IF NOT EXISTS export_schema (
            export_name TEXT PRIMARY KEY,
            columns_json TEXT NOT NULL,
            updated_at TEXT NOT NULL
        );
        CREATE TABLE IF NOT EXISTS file_manifest (
            id BIGSERIAL PRIMARY KEY,
            run_id TEXT NOT NULL,
            export_name TEXT NOT NULL,
            file_name TEXT NOT NULL,
            row_count BIGINT NOT NULL,
            bytes BIGINT NOT NULL,
            format TEXT NOT NULL,
            compression TEXT,
            created_at TEXT NOT NULL
        );",
    ),
    // v2: chunk checkpoint tables
    (
        2,
        "CREATE TABLE IF NOT EXISTS chunk_run (
            run_id TEXT PRIMARY KEY,
            export_name TEXT NOT NULL,
            plan_hash TEXT NOT NULL,
            status TEXT NOT NULL,
            max_chunk_attempts BIGINT NOT NULL DEFAULT 3,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_chunk_run_export_status
            ON chunk_run(export_name, status);
        CREATE TABLE IF NOT EXISTS chunk_task (
            id BIGSERIAL PRIMARY KEY,
            run_id TEXT NOT NULL,
            chunk_index BIGINT NOT NULL,
            start_key TEXT NOT NULL,
            end_key TEXT NOT NULL,
            status TEXT NOT NULL,
            attempts BIGINT NOT NULL DEFAULT 0,
            last_error TEXT,
            rows_written BIGINT,
            file_name TEXT,
            updated_at TEXT NOT NULL,
            UNIQUE(run_id, chunk_index)
        );
        CREATE INDEX IF NOT EXISTS idx_chunk_task_run_status ON chunk_task(run_id, status);",
    ),
    // v3: index on file_manifest for per-export lookups
    (
        3,
        "CREATE INDEX IF NOT EXISTS idx_file_manifest_export ON file_manifest(export_name, id DESC);",
    ),
    // v4: committed / verified boundary tracking
    (
        4,
        "CREATE TABLE IF NOT EXISTS export_progression (
            export_name TEXT PRIMARY KEY,
            last_committed_strategy TEXT,
            last_committed_cursor TEXT,
            last_committed_chunk_index BIGINT,
            last_committed_run_id TEXT,
            last_committed_at TEXT,
            last_verified_strategy TEXT,
            last_verified_cursor TEXT,
            last_verified_chunk_index BIGINT,
            last_verified_run_id TEXT,
            last_verified_at TEXT
        );",
    ),
    // v5: aggregate run summary
    (
        5,
        "CREATE TABLE IF NOT EXISTS run_aggregate (
            run_aggregate_id TEXT PRIMARY KEY,
            started_at TEXT NOT NULL,
            finished_at TEXT NOT NULL,
            duration_ms BIGINT NOT NULL,
            config_path TEXT,
            parallel_mode TEXT NOT NULL,
            total_exports BIGINT NOT NULL,
            success_count BIGINT NOT NULL,
            failed_count BIGINT NOT NULL,
            skipped_count BIGINT NOT NULL,
            total_rows BIGINT NOT NULL,
            total_files BIGINT NOT NULL,
            total_bytes BIGINT NOT NULL,
            details_json TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_run_aggregate_finished
            ON run_aggregate(finished_at DESC);",
    ),
    // v6: per-column data shape stats
    (
        6,
        "CREATE TABLE IF NOT EXISTS export_shape (
            export_name TEXT NOT NULL,
            column_name TEXT NOT NULL,
            max_byte_len BIGINT NOT NULL,
            updated_at TEXT NOT NULL,
            PRIMARY KEY (export_name, column_name)
        );",
    ),
    // v7: structured run journal
    (
        7,
        "CREATE TABLE IF NOT EXISTS run_journal (
            run_id TEXT PRIMARY KEY,
            export_name TEXT NOT NULL,
            finished_at TEXT NOT NULL,
            journal_json TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_run_journal_export
            ON run_journal(export_name, finished_at DESC);",
    ),
    // v8: rename file_manifest → file_log.  Mirrors the SQLite v8 migration;
    // see the SQLite array for rationale.
    (
        8,
        "ALTER TABLE file_manifest RENAME TO file_log;
        DROP INDEX IF EXISTS idx_file_manifest_export;
        CREATE INDEX IF NOT EXISTS idx_file_log_export ON file_log(export_name, id DESC);",
    ),
];
338
339// ─── SQL helpers ──────────────────────────────────────────────────────────────
340
341/// Convert SQLite `?N` placeholders to PostgreSQL `$N` style.
342/// `"WHERE x = ?1 AND y = ?2"` → `"WHERE x = $1 AND y = $2"`.
343pub(super) fn pg_sql(sql: &str) -> String {
344    let bytes = sql.as_bytes();
345    let mut out = String::with_capacity(sql.len());
346    let mut i = 0;
347    while i < bytes.len() {
348        if bytes[i] == b'?' && i + 1 < bytes.len() && bytes[i + 1].is_ascii_digit() {
349            out.push('$');
350        } else {
351            out.push(bytes[i] as char);
352        }
353        i += 1;
354    }
355    out
356}
357
358// ─── Backend connection ────────────────────────────────────────────────────────
359
/// Internal storage for the active database connection.
///
/// One variant per supported backend; a `StateStore` owns exactly one value
/// of this type in its `conn` field.
pub(super) enum StateConn {
    /// SQLite backend: an open `rusqlite` connection.
    Sqlite(rusqlite::Connection),
    /// postgres::Client requires `&mut self` for queries; RefCell provides
    /// interior mutability so `StateStore` methods can keep `&self` signatures.
    /// StateStore is not Sync (neither backend is), so RefCell is safe here.
    /// Boxed to keep the enum variant sizes balanced (postgres::Client is ~320 B).
    Postgres(Box<std::cell::RefCell<postgres::Client>>),
}
369
/// Serialisable reference that identifies a state database without holding a
/// live connection.  Passed to parallel chunk workers so they can open their
/// own connection for atomic `claim_next_chunk_task` operations.
#[derive(Clone)]
pub enum StateRef {
    /// Filesystem path of the SQLite database file (`":memory:"` for the
    /// in-memory store built by `StateStore::open_in_memory`).
    Sqlite(std::path::PathBuf),
    /// The PostgreSQL connection URL exactly as it was passed to
    /// `StateStore::open_postgres` — it is stored unredacted, so it may
    /// contain the password and should not be logged verbatim.
    Postgres(String),
}
378
379// ─── SQLite migration ─────────────────────────────────────────────────────────
380
381fn ensure_schema_version_table(conn: &Connection) {
382    let _ = conn.execute_batch(
383        "CREATE TABLE IF NOT EXISTS schema_version (
384            version INTEGER NOT NULL
385        );",
386    );
387}
388
389fn get_current_version(conn: &Connection) -> i64 {
390    conn.query_row(
391        "SELECT COALESCE(MAX(version), 0) FROM schema_version",
392        [],
393        |row| row.get(0),
394    )
395    .unwrap_or(0)
396}
397
398fn migrate(conn: &Connection) -> Result<()> {
399    ensure_schema_version_table(conn);
400
401    let current = get_current_version(conn);
402
403    if current == 0 {
404        let has_export_state: bool = conn
405            .query_row(
406                "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='export_state'",
407                [],
408                |row| row.get(0),
409            )
410            .unwrap_or(false);
411
412        if has_export_state {
413            let metrics_cols = [
414                "files_produced INTEGER DEFAULT 0",
415                "bytes_written INTEGER DEFAULT 0",
416                "retries INTEGER DEFAULT 0",
417                "validated INTEGER",
418                "schema_changed INTEGER",
419                "run_id TEXT",
420            ];
421            for col_def in &metrics_cols {
422                let sql = format!("ALTER TABLE export_metrics ADD COLUMN {}", col_def);
423                let _ = conn.execute(&sql, []);
424            }
425        }
426    }
427
428    for &(ver, sql) in MIGRATIONS {
429        if ver > current {
430            log::debug!("state: applying migration v{}", ver);
431            let atomic_sql = format!(
432                "BEGIN;\n{}\nINSERT INTO schema_version (version) VALUES ({});\nCOMMIT;",
433                sql, ver
434            );
435            conn.execute_batch(&atomic_sql)
436                .map_err(|e| anyhow::anyhow!("state: migration v{} failed: {}", ver, e))?;
437        }
438    }
439
440    let _ = conn.execute(
441        "DELETE FROM schema_version WHERE version < (SELECT MAX(version) FROM schema_version)",
442        [],
443    );
444
445    let final_version = get_current_version(conn);
446    if final_version != SCHEMA_VERSION {
447        anyhow::bail!(
448            "state: migration incomplete — expected schema v{} but reached v{}",
449            SCHEMA_VERSION,
450            final_version
451        );
452    }
453
454    Ok(())
455}
456
457// ─── PostgreSQL migration ─────────────────────────────────────────────────────
458
459fn migrate_pg(client: &mut postgres::Client) -> Result<()> {
460    client
461        .batch_execute("CREATE TABLE IF NOT EXISTS rivet_schema_version (version BIGINT NOT NULL);")
462        .map_err(|e| anyhow::anyhow!("state(pg): create version table: {:#}", e))?;
463
464    let current: i64 = client
465        .query_one(
466            "SELECT COALESCE(MAX(version), 0) FROM rivet_schema_version",
467            &[],
468        )
469        .map_err(|e| anyhow::anyhow!("state(pg): read schema version: {:#}", e))?
470        .get(0);
471
472    for &(ver, sql) in PG_MIGRATIONS {
473        if ver > current {
474            log::debug!("state(pg): applying migration v{}", ver);
475            let batch = format!(
476                "BEGIN; {} INSERT INTO rivet_schema_version (version) VALUES ({}); COMMIT;",
477                sql, ver
478            );
479            client
480                .batch_execute(&batch)
481                .map_err(|e| anyhow::anyhow!("state(pg): migration v{} failed: {:#}", ver, e))?;
482        }
483    }
484
485    // Remove superseded version rows so MAX() stays unambiguous (mirrors SQLite behaviour).
486    let _ = client.batch_execute(
487        "DELETE FROM rivet_schema_version \
488         WHERE version < (SELECT MAX(version) FROM rivet_schema_version);",
489    );
490
491    // Verify the DB actually reached the expected version.
492    let final_version: i64 = client
493        .query_one(
494            "SELECT COALESCE(MAX(version), 0) FROM rivet_schema_version",
495            &[],
496        )
497        .map_err(|e| anyhow::anyhow!("state(pg): read final schema version: {:#}", e))?
498        .get(0);
499    if final_version != SCHEMA_VERSION {
500        anyhow::bail!(
501            "state(pg): migration incomplete — expected schema v{} but reached v{}",
502            SCHEMA_VERSION,
503            final_version
504        );
505    }
506
507    Ok(())
508}
509
/// Redact the password from a PostgreSQL URL for safe use in log/error messages.
/// `postgresql://user:SECRET@host/db` → `postgresql://user:***@host/db`
///
/// The user-info section runs from just after `://` to the *last* `@`, so
/// passwords containing `@` are handled.  Inside it, the *first* `:` separates
/// user from password, so a password containing `:` is redacted in full
/// rather than leaking its leading part.
///
/// The input is returned unchanged when it has no `://`, no `@` after the
/// scheme, or no `:` in the user-info section (i.e. no password to hide).
fn redact_pg_url(url: &str) -> String {
    let Some(scheme_end) = url.find("://") else {
        return url.to_string();
    };
    let userinfo_start = scheme_end + 3;

    let Some(at_pos) = url.rfind('@') else {
        return url.to_string();
    };
    // An `@` that sits before the end of `://` is not a credentials separator;
    // slicing with it would also produce an inverted (panicking) range.
    if at_pos < userinfo_start {
        return url.to_string();
    }

    let userinfo = &url[userinfo_start..at_pos];
    match userinfo.find(':') {
        Some(colon) => format!(
            "{}://{}:***@{}",
            &url[..scheme_end],
            &userinfo[..colon],
            &url[at_pos + 1..]
        ),
        None => url.to_string(),
    }
}
530
531// ─── SQLite connection helper ─────────────────────────────────────────────────
532
533pub(crate) const SQLITE_BUSY_TIMEOUT_MS: i64 = 10_000;
534
535pub(crate) fn open_connection(db_path: &std::path::Path) -> Result<Connection> {
536    let conn = Connection::open(db_path)?;
537    if let Err(e) = conn.execute_batch("PRAGMA journal_mode=WAL;") {
538        log::warn!(
539            "state: WAL journal mode unavailable ({}); \
540             running in default mode — concurrent writes may be slower",
541            e
542        );
543    }
544    if let Err(e) = conn.execute_batch(&format!(
545        "PRAGMA busy_timeout = {};",
546        SQLITE_BUSY_TIMEOUT_MS
547    )) {
548        log::warn!(
549            "state: failed to set busy_timeout ({}); \
550             concurrent writers may surface SQLITE_BUSY immediately",
551            e
552        );
553    }
554    Ok(conn)
555}
556
557// ─── StateStore ───────────────────────────────────────────────────────────────
558
/// Entry point for all persistent state.  Supports two backends:
///
/// - **SQLite** (default) — a single `.rivet_state.db` file next to the
///   config.  Good for local / single-node / dev deployments.
/// - **PostgreSQL** — a shared database addressed by `RIVET_STATE_URL`.
///   Required for stateless container / Kubernetes deployments where the
///   rivet pod is ephemeral or replicated.
///
/// Set the `RIVET_STATE_URL` environment variable to a PostgreSQL URL to
/// activate the Postgres backend:
///
/// ```text
/// RIVET_STATE_URL=postgresql://user:pass@host:5432/rivet_state
/// ```
///
/// When the variable is absent or does not start with `postgres`, SQLite is
/// used and the variable is ignored.
///
/// Every constructor runs the schema migration for its backend before
/// returning, so a successfully opened store is always at the current
/// schema version.
pub struct StateStore {
    /// Live connection to whichever backend was selected when the store was opened.
    pub(super) conn: StateConn,
    /// Serialisable reference for reconnection (parallel chunk workers).
    pub(super) state_ref: StateRef,
}
581
582impl StateStore {
583    /// Open the appropriate backend.
584    ///
585    /// Checks `RIVET_STATE_URL`; falls back to SQLite next to `config_path`.
586    pub fn open(config_path: &str) -> Result<Self> {
587        if let Ok(url) = std::env::var("RIVET_STATE_URL")
588            && url.starts_with("postgres")
589        {
590            return Self::open_postgres(&url);
591        }
592        Self::open_sqlite(config_path)
593    }
594
595    fn open_sqlite(config_path: &str) -> Result<Self> {
596        let config_dir = std::path::Path::new(config_path)
597            .parent()
598            .unwrap_or(std::path::Path::new("."));
599        let db_path = config_dir.join(STATE_DB_NAME);
600        let conn = open_connection(&db_path)?;
601        migrate(&conn)?;
602        Ok(Self {
603            conn: StateConn::Sqlite(conn),
604            state_ref: StateRef::Sqlite(db_path),
605        })
606    }
607
608    fn open_postgres(url: &str) -> Result<Self> {
609        let is_local =
610            url.contains("localhost") || url.contains("127.0.0.1") || url.contains("::1");
611        if !is_local {
612            log::warn!(
613                "state(pg): connecting to a remote host without TLS; \
614                 set RIVET_STATE_URL to a sslmode=require URL for production use"
615            );
616        }
617        let mut client = postgres::Client::connect(url, postgres::NoTls).map_err(|e| {
618            anyhow::anyhow!("state(pg): connect to '{}': {:#}", redact_pg_url(url), e)
619        })?;
620        migrate_pg(&mut client)?;
621        Ok(Self {
622            conn: StateConn::Postgres(Box::new(std::cell::RefCell::new(client))),
623            state_ref: StateRef::Postgres(url.to_string()),
624        })
625    }
626
627    /// Path to `.rivet_state.db` for SQLite deployments.  Returns the config
628    /// directory path for Postgres (not meaningful for connection, only used
629    /// by legacy callers — prefer `state_ref()` for new code).
630    pub fn state_db_path(config_path: &str) -> std::path::PathBuf {
631        let config_dir = std::path::Path::new(config_path)
632            .parent()
633            .unwrap_or(std::path::Path::new("."));
634        config_dir.join(STATE_DB_NAME)
635    }
636
637    /// Serialisable connection reference for parallel chunk workers.
638    pub fn state_ref(&self) -> &StateRef {
639        &self.state_ref
640    }
641
642    /// In-memory SQLite store for unit tests.
643    #[allow(dead_code)]
644    pub fn open_in_memory() -> Result<Self> {
645        let conn = Connection::open_in_memory()?;
646        migrate(&conn)?;
647        Ok(Self {
648            conn: StateConn::Sqlite(conn),
649            state_ref: StateRef::Sqlite(std::path::PathBuf::from(":memory:")),
650        })
651    }
652
653    /// Open a SQLite store at an explicit file path (tests that need
654    /// cross-connection access via `claim_next_chunk_task_at_path`).
655    #[allow(dead_code)]
656    pub fn open_at_path(db_path: &std::path::Path) -> Result<Self> {
657        let conn = open_connection(db_path)?;
658        migrate(&conn)?;
659        Ok(Self {
660            conn: StateConn::Sqlite(conn),
661            state_ref: StateRef::Sqlite(db_path.to_path_buf()),
662        })
663    }
664}
665
666// ─── Migration tests ──────────────────────────────────────────────────────────
667
#[cfg(test)]
mod tests {
    use super::*;

    /// Borrow the SQLite connection behind a store.  Every store built in
    /// these tests is SQLite-backed.
    fn sqlite_conn(store: &StateStore) -> &Connection {
        match &store.conn {
            StateConn::Sqlite(c) => c,
            StateConn::Postgres(_) => unreachable!("tests only open SQLite stores"),
        }
    }

    /// True when `sqlite_master` lists an object of the given type and name.
    fn has_object(conn: &Connection, kind: &str, name: &str) -> bool {
        conn.query_row(
            "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type = ?1 AND name = ?2",
            rusqlite::params![kind, name],
            |row| row.get(0),
        )
        .unwrap()
    }

    #[test]
    fn fresh_db_reaches_latest_version() {
        let s = StateStore::open_in_memory().unwrap();
        assert_eq!(get_current_version(sqlite_conn(&s)), SCHEMA_VERSION);
    }

    #[test]
    fn migration_is_idempotent() {
        let s = StateStore::open_in_memory().unwrap();
        let conn = sqlite_conn(&s);
        migrate(conn).unwrap();
        migrate(conn).unwrap();
        assert_eq!(get_current_version(conn), SCHEMA_VERSION);
    }

    #[test]
    fn legacy_db_gets_upgraded() {
        // An unversioned database: core tables exist, but there is no
        // `schema_version` table and `export_metrics` lacks the newer columns.
        let conn = Connection::open_in_memory().unwrap();
        conn.execute_batch(
            "CREATE TABLE export_state (
                export_name TEXT PRIMARY KEY,
                last_cursor_value TEXT,
                last_run_at TEXT
            );
            CREATE TABLE export_metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                export_name TEXT NOT NULL,
                run_at TEXT NOT NULL,
                duration_ms INTEGER NOT NULL,
                total_rows INTEGER NOT NULL,
                status TEXT NOT NULL
            );",
        )
        .unwrap();

        migrate(&conn).unwrap();
        assert_eq!(get_current_version(&conn), SCHEMA_VERSION);
        assert!(has_object(&conn, "table", "chunk_run"));
    }

    #[test]
    fn v8_renames_file_manifest_to_file_log() {
        let s = StateStore::open_in_memory().unwrap();
        let conn = sqlite_conn(&s);
        assert!(
            has_object(conn, "table", "file_log"),
            "v8 must produce a `file_log` table"
        );
        assert!(
            !has_object(conn, "table", "file_manifest"),
            "v8 must remove the old `file_manifest` table"
        );
        assert!(
            has_object(conn, "index", "idx_file_log_export"),
            "v8 must create the renamed index"
        );
    }

    #[test]
    fn v8_upgrades_existing_v7_db_with_data() {
        // Build a database that stopped at v7: apply v1..=v7 by hand, stamp
        // the version, and leave a row in the old `file_manifest` table.
        let conn = Connection::open_in_memory().unwrap();
        ensure_schema_version_table(&conn);
        for &(ver, sql) in MIGRATIONS {
            if ver <= 7 {
                conn.execute_batch(sql).unwrap();
            }
        }
        conn.execute("INSERT INTO schema_version (version) VALUES (7)", [])
            .unwrap();
        conn.execute(
            "INSERT INTO file_manifest (run_id, export_name, file_name, row_count, bytes, format, created_at)
             VALUES ('r1', 'orders', 'f.parquet', 100, 4096, 'parquet', '2026-05-21T00:00:00Z')",
            [],
        )
        .unwrap();
        assert_eq!(get_current_version(&conn), 7);

        // The real v7 → v8 step: the rename must carry the existing row along.
        migrate(&conn).unwrap();

        assert_eq!(get_current_version(&conn), SCHEMA_VERSION);
        assert!(
            !has_object(&conn, "table", "file_manifest"),
            "v8 must remove the old `file_manifest` table"
        );
        let (count, file_name): (i64, String) = conn
            .query_row("SELECT COUNT(*), MAX(file_name) FROM file_log", [], |r| {
                Ok((r.get(0)?, r.get(1)?))
            })
            .unwrap();
        assert_eq!(count, 1);
        assert_eq!(file_name, "f.parquet");
    }

    #[test]
    fn run_aggregate_table_exists_after_migration() {
        let s = StateStore::open_in_memory().unwrap();
        assert!(
            has_object(sqlite_conn(&s), "table", "run_aggregate"),
            "v5 migration must create the run_aggregate table"
        );
    }

    #[test]
    fn pg_sql_converts_placeholders() {
        assert_eq!(
            pg_sql("SELECT ?1, ?2 FROM t WHERE x = ?3"),
            "SELECT $1, $2 FROM t WHERE x = $3"
        );
        assert_eq!(
            pg_sql("INSERT INTO t VALUES (?1, ?2)"),
            "INSERT INTO t VALUES ($1, $2)"
        );
        assert_eq!(pg_sql("no placeholders"), "no placeholders");
        // ?N with two digits
        assert_eq!(pg_sql("?10 AND ?11"), "$10 AND $11");
    }

    #[test]
    fn redact_pg_url_removes_password() {
        assert_eq!(
            redact_pg_url("postgresql://rivet:secret123@localhost:5433/rivet_state"),
            "postgresql://rivet:***@localhost:5433/rivet_state"
        );
        assert_eq!(
            redact_pg_url("postgres://admin:p@ssw0rd@db.prod.example.com/state"),
            "postgres://admin:***@db.prod.example.com/state"
        );
    }

    #[test]
    fn redact_pg_url_no_password_unchanged() {
        // URL without a password should come back as-is.
        let url = "postgresql://rivet@localhost/state";
        assert_eq!(redact_pg_url(url), url);
    }
}