1use rusqlite::Connection;
2
3use crate::error::Result;
4
5mod checkpoint;
6mod cursor;
7mod file_log;
8mod journal_store;
9mod metrics;
10mod progression;
11mod run_aggregate;
12mod schema;
13mod shape;
14
15#[allow(unused_imports)]
19pub use checkpoint::ChunkTaskInfo;
20#[allow(unused_imports)]
21pub use file_log::FileRecord;
22#[allow(unused_imports)]
23pub use metrics::ExportMetric;
24#[allow(unused_imports)]
25pub use progression::{Boundary, ExportProgression};
26#[allow(unused_imports)]
27pub use run_aggregate::{RunAggregate, RunAggregateEntry};
28#[allow(unused_imports)]
29pub use schema::{SchemaChange, SchemaColumn, arrow_schema_to_columns, schema_fingerprint};
30#[allow(unused_imports)]
31pub use shape::ShapeWarning;
32
33const STATE_DB_NAME: &str = ".rivet_state.db";
34
35const SCHEMA_VERSION: i64 = MIGRATIONS[MIGRATIONS.len() - 1].0;
37
/// Ordered SQLite migrations as `(version, SQL batch)` pairs.
///
/// `migrate` applies every entry whose version is greater than the version
/// stored in `schema_version`, wrapping each batch and its version insert in
/// one `BEGIN`/`COMMIT`. `SCHEMA_VERSION` is taken from the last entry, so new
/// migrations must be appended with a strictly higher version.
const MIGRATIONS: &[(i64, &str)] = &[
    // v1: base tables — export_state, export_metrics, export_schema, file_manifest.
    (
        1,
        "CREATE TABLE IF NOT EXISTS export_state (
            export_name TEXT PRIMARY KEY,
            last_cursor_value TEXT,
            last_run_at TEXT
        );
        CREATE TABLE IF NOT EXISTS export_metrics (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            export_name TEXT NOT NULL,
            run_at TEXT NOT NULL,
            duration_ms INTEGER NOT NULL,
            total_rows INTEGER NOT NULL,
            peak_rss_mb INTEGER,
            status TEXT NOT NULL,
            error_message TEXT,
            tuning_profile TEXT,
            format TEXT,
            mode TEXT,
            files_produced INTEGER DEFAULT 0,
            bytes_written INTEGER DEFAULT 0,
            retries INTEGER DEFAULT 0,
            validated INTEGER,
            schema_changed INTEGER,
            run_id TEXT
        );
        CREATE TABLE IF NOT EXISTS export_schema (
            export_name TEXT PRIMARY KEY,
            columns_json TEXT NOT NULL,
            updated_at TEXT NOT NULL
        );
        CREATE TABLE IF NOT EXISTS file_manifest (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_id TEXT NOT NULL,
            export_name TEXT NOT NULL,
            file_name TEXT NOT NULL,
            row_count INTEGER NOT NULL,
            bytes INTEGER NOT NULL,
            format TEXT NOT NULL,
            compression TEXT,
            created_at TEXT NOT NULL
        );",
    ),
    // v2: chunk_run / chunk_task tables and their status indexes.
    (
        2,
        "CREATE TABLE IF NOT EXISTS chunk_run (
            run_id TEXT PRIMARY KEY,
            export_name TEXT NOT NULL,
            plan_hash TEXT NOT NULL,
            status TEXT NOT NULL,
            max_chunk_attempts INTEGER NOT NULL DEFAULT 3,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_chunk_run_export_status
            ON chunk_run(export_name, status);
        CREATE TABLE IF NOT EXISTS chunk_task (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_id TEXT NOT NULL,
            chunk_index INTEGER NOT NULL,
            start_key TEXT NOT NULL,
            end_key TEXT NOT NULL,
            status TEXT NOT NULL,
            attempts INTEGER NOT NULL DEFAULT 0,
            last_error TEXT,
            rows_written INTEGER,
            file_name TEXT,
            updated_at TEXT NOT NULL,
            UNIQUE(run_id, chunk_index)
        );
        CREATE INDEX IF NOT EXISTS idx_chunk_task_run_status ON chunk_task(run_id, status);",
    ),
    // v3: index on file_manifest by export name, newest id first.
    (
        3,
        "CREATE INDEX IF NOT EXISTS idx_file_manifest_export ON file_manifest(export_name, id DESC);",
    ),
    // v4: export_progression — last committed / last verified markers per export.
    (
        4,
        "CREATE TABLE IF NOT EXISTS export_progression (
            export_name TEXT PRIMARY KEY,
            last_committed_strategy TEXT,
            last_committed_cursor TEXT,
            last_committed_chunk_index INTEGER,
            last_committed_run_id TEXT,
            last_committed_at TEXT,
            last_verified_strategy TEXT,
            last_verified_cursor TEXT,
            last_verified_chunk_index INTEGER,
            last_verified_run_id TEXT,
            last_verified_at TEXT
        );",
    ),
    // v5: run_aggregate table plus an index on finished_at (descending).
    (
        5,
        "CREATE TABLE IF NOT EXISTS run_aggregate (
            run_aggregate_id TEXT PRIMARY KEY,
            started_at TEXT NOT NULL,
            finished_at TEXT NOT NULL,
            duration_ms INTEGER NOT NULL,
            config_path TEXT,
            parallel_mode TEXT NOT NULL,
            total_exports INTEGER NOT NULL,
            success_count INTEGER NOT NULL,
            failed_count INTEGER NOT NULL,
            skipped_count INTEGER NOT NULL,
            total_rows INTEGER NOT NULL,
            total_files INTEGER NOT NULL,
            total_bytes INTEGER NOT NULL,
            details_json TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_run_aggregate_finished
            ON run_aggregate(finished_at DESC);",
    ),
    // v6: export_shape — max_byte_len per (export_name, column_name).
    (
        6,
        "CREATE TABLE IF NOT EXISTS export_shape (
            export_name TEXT NOT NULL,
            column_name TEXT NOT NULL,
            max_byte_len INTEGER NOT NULL,
            updated_at TEXT NOT NULL,
            PRIMARY KEY (export_name, column_name)
        );",
    ),
    // v7: run_journal table plus an index by export and finished_at.
    (
        7,
        "CREATE TABLE IF NOT EXISTS run_journal (
            run_id TEXT PRIMARY KEY,
            export_name TEXT NOT NULL,
            finished_at TEXT NOT NULL,
            journal_json TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_run_journal_export
            ON run_journal(export_name, finished_at DESC);",
    ),
    // v8: rename file_manifest to file_log and replace its index under the new name.
    // The RENAME is not repeatable; it relies on the version gate in `migrate`.
    (
        8,
        "ALTER TABLE file_manifest RENAME TO file_log;
        DROP INDEX IF EXISTS idx_file_manifest_export;
        CREATE INDEX IF NOT EXISTS idx_file_log_export ON file_log(export_name, id DESC);",
    ),
];
191
/// Ordered Postgres migrations as `(version, SQL batch)` pairs, applied by
/// `migrate_pg`.
///
/// Versions mirror `MIGRATIONS` one for one; the DDL differs only in column
/// types (`BIGSERIAL` / `BIGINT` / `BOOLEAN` where the SQLite list uses
/// `INTEGER`). `migrate_pg` compares its final version against
/// `SCHEMA_VERSION`, so both lists must end on the same version.
const PG_MIGRATIONS: &[(i64, &str)] = &[
    // v1: base tables — export_state, export_metrics, export_schema, file_manifest.
    (
        1,
        "CREATE TABLE IF NOT EXISTS export_state (
            export_name TEXT PRIMARY KEY,
            last_cursor_value TEXT,
            last_run_at TEXT
        );
        CREATE TABLE IF NOT EXISTS export_metrics (
            id BIGSERIAL PRIMARY KEY,
            export_name TEXT NOT NULL,
            run_at TEXT NOT NULL,
            duration_ms BIGINT NOT NULL,
            total_rows BIGINT NOT NULL,
            peak_rss_mb BIGINT,
            status TEXT NOT NULL,
            error_message TEXT,
            tuning_profile TEXT,
            format TEXT,
            mode TEXT,
            files_produced BIGINT DEFAULT 0,
            bytes_written BIGINT DEFAULT 0,
            retries BIGINT DEFAULT 0,
            validated BOOLEAN,
            schema_changed BOOLEAN,
            run_id TEXT
        );
        CREATE TABLE IF NOT EXISTS export_schema (
            export_name TEXT PRIMARY KEY,
            columns_json TEXT NOT NULL,
            updated_at TEXT NOT NULL
        );
        CREATE TABLE IF NOT EXISTS file_manifest (
            id BIGSERIAL PRIMARY KEY,
            run_id TEXT NOT NULL,
            export_name TEXT NOT NULL,
            file_name TEXT NOT NULL,
            row_count BIGINT NOT NULL,
            bytes BIGINT NOT NULL,
            format TEXT NOT NULL,
            compression TEXT,
            created_at TEXT NOT NULL
        );",
    ),
    // v2: chunk_run / chunk_task tables and their status indexes.
    (
        2,
        "CREATE TABLE IF NOT EXISTS chunk_run (
            run_id TEXT PRIMARY KEY,
            export_name TEXT NOT NULL,
            plan_hash TEXT NOT NULL,
            status TEXT NOT NULL,
            max_chunk_attempts BIGINT NOT NULL DEFAULT 3,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_chunk_run_export_status
            ON chunk_run(export_name, status);
        CREATE TABLE IF NOT EXISTS chunk_task (
            id BIGSERIAL PRIMARY KEY,
            run_id TEXT NOT NULL,
            chunk_index BIGINT NOT NULL,
            start_key TEXT NOT NULL,
            end_key TEXT NOT NULL,
            status TEXT NOT NULL,
            attempts BIGINT NOT NULL DEFAULT 0,
            last_error TEXT,
            rows_written BIGINT,
            file_name TEXT,
            updated_at TEXT NOT NULL,
            UNIQUE(run_id, chunk_index)
        );
        CREATE INDEX IF NOT EXISTS idx_chunk_task_run_status ON chunk_task(run_id, status);",
    ),
    // v3: index on file_manifest by export name, newest id first.
    (
        3,
        "CREATE INDEX IF NOT EXISTS idx_file_manifest_export ON file_manifest(export_name, id DESC);",
    ),
    // v4: export_progression — last committed / last verified markers per export.
    (
        4,
        "CREATE TABLE IF NOT EXISTS export_progression (
            export_name TEXT PRIMARY KEY,
            last_committed_strategy TEXT,
            last_committed_cursor TEXT,
            last_committed_chunk_index BIGINT,
            last_committed_run_id TEXT,
            last_committed_at TEXT,
            last_verified_strategy TEXT,
            last_verified_cursor TEXT,
            last_verified_chunk_index BIGINT,
            last_verified_run_id TEXT,
            last_verified_at TEXT
        );",
    ),
    // v5: run_aggregate table plus an index on finished_at (descending).
    (
        5,
        "CREATE TABLE IF NOT EXISTS run_aggregate (
            run_aggregate_id TEXT PRIMARY KEY,
            started_at TEXT NOT NULL,
            finished_at TEXT NOT NULL,
            duration_ms BIGINT NOT NULL,
            config_path TEXT,
            parallel_mode TEXT NOT NULL,
            total_exports BIGINT NOT NULL,
            success_count BIGINT NOT NULL,
            failed_count BIGINT NOT NULL,
            skipped_count BIGINT NOT NULL,
            total_rows BIGINT NOT NULL,
            total_files BIGINT NOT NULL,
            total_bytes BIGINT NOT NULL,
            details_json TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_run_aggregate_finished
            ON run_aggregate(finished_at DESC);",
    ),
    // v6: export_shape — max_byte_len per (export_name, column_name).
    (
        6,
        "CREATE TABLE IF NOT EXISTS export_shape (
            export_name TEXT NOT NULL,
            column_name TEXT NOT NULL,
            max_byte_len BIGINT NOT NULL,
            updated_at TEXT NOT NULL,
            PRIMARY KEY (export_name, column_name)
        );",
    ),
    // v7: run_journal table plus an index by export and finished_at.
    (
        7,
        "CREATE TABLE IF NOT EXISTS run_journal (
            run_id TEXT PRIMARY KEY,
            export_name TEXT NOT NULL,
            finished_at TEXT NOT NULL,
            journal_json TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_run_journal_export
            ON run_journal(export_name, finished_at DESC);",
    ),
    // v8: rename file_manifest to file_log and replace its index under the new name.
    // The RENAME is not repeatable; it relies on the version gate in `migrate_pg`.
    (
        8,
        "ALTER TABLE file_manifest RENAME TO file_log;
        DROP INDEX IF EXISTS idx_file_manifest_export;
        CREATE INDEX IF NOT EXISTS idx_file_log_export ON file_log(export_name, id DESC);",
    ),
];
338
339pub(super) fn pg_sql(sql: &str) -> String {
344 let bytes = sql.as_bytes();
345 let mut out = String::with_capacity(sql.len());
346 let mut i = 0;
347 while i < bytes.len() {
348 if bytes[i] == b'?' && i + 1 < bytes.len() && bytes[i + 1].is_ascii_digit() {
349 out.push('$');
350 } else {
351 out.push(bytes[i] as char);
352 }
353 i += 1;
354 }
355 out
356}
357
/// Live connection to the state database, one variant per supported backend.
pub(super) enum StateConn {
    /// An open SQLite connection.
    Sqlite(rusqlite::Connection),
    /// A Postgres client, boxed and wrapped in a `RefCell`.
    // NOTE(review): presumably the `RefCell` lets `&self` store methods obtain
    // the mutable client the `postgres` API needs — confirm against the callers.
    Postgres(Box<std::cell::RefCell<postgres::Client>>),
}
369
/// Cloneable description of where a state database lives, as opposed to the
/// live connection held in `StateConn`.
#[derive(Clone)]
pub enum StateRef {
    /// Path of the SQLite database file (`:memory:` for the in-memory store).
    Sqlite(std::path::PathBuf),
    /// Postgres connection URL exactly as it was passed to `open_postgres`.
    /// It is stored unredacted, so it may contain a password.
    Postgres(String),
}
378
379fn ensure_schema_version_table(conn: &Connection) {
382 let _ = conn.execute_batch(
383 "CREATE TABLE IF NOT EXISTS schema_version (
384 version INTEGER NOT NULL
385 );",
386 );
387}
388
389fn get_current_version(conn: &Connection) -> i64 {
390 conn.query_row(
391 "SELECT COALESCE(MAX(version), 0) FROM schema_version",
392 [],
393 |row| row.get(0),
394 )
395 .unwrap_or(0)
396}
397
398fn migrate(conn: &Connection) -> Result<()> {
399 ensure_schema_version_table(conn);
400
401 let current = get_current_version(conn);
402
403 if current == 0 {
404 let has_export_state: bool = conn
405 .query_row(
406 "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='export_state'",
407 [],
408 |row| row.get(0),
409 )
410 .unwrap_or(false);
411
412 if has_export_state {
413 let metrics_cols = [
414 "files_produced INTEGER DEFAULT 0",
415 "bytes_written INTEGER DEFAULT 0",
416 "retries INTEGER DEFAULT 0",
417 "validated INTEGER",
418 "schema_changed INTEGER",
419 "run_id TEXT",
420 ];
421 for col_def in &metrics_cols {
422 let sql = format!("ALTER TABLE export_metrics ADD COLUMN {}", col_def);
423 let _ = conn.execute(&sql, []);
424 }
425 }
426 }
427
428 for &(ver, sql) in MIGRATIONS {
429 if ver > current {
430 log::debug!("state: applying migration v{}", ver);
431 let atomic_sql = format!(
432 "BEGIN;\n{}\nINSERT INTO schema_version (version) VALUES ({});\nCOMMIT;",
433 sql, ver
434 );
435 conn.execute_batch(&atomic_sql)
436 .map_err(|e| anyhow::anyhow!("state: migration v{} failed: {}", ver, e))?;
437 }
438 }
439
440 let _ = conn.execute(
441 "DELETE FROM schema_version WHERE version < (SELECT MAX(version) FROM schema_version)",
442 [],
443 );
444
445 let final_version = get_current_version(conn);
446 if final_version != SCHEMA_VERSION {
447 anyhow::bail!(
448 "state: migration incomplete — expected schema v{} but reached v{}",
449 SCHEMA_VERSION,
450 final_version
451 );
452 }
453
454 Ok(())
455}
456
457fn migrate_pg(client: &mut postgres::Client) -> Result<()> {
460 client
461 .batch_execute("CREATE TABLE IF NOT EXISTS rivet_schema_version (version BIGINT NOT NULL);")
462 .map_err(|e| anyhow::anyhow!("state(pg): create version table: {:#}", e))?;
463
464 let current: i64 = client
465 .query_one(
466 "SELECT COALESCE(MAX(version), 0) FROM rivet_schema_version",
467 &[],
468 )
469 .map_err(|e| anyhow::anyhow!("state(pg): read schema version: {:#}", e))?
470 .get(0);
471
472 for &(ver, sql) in PG_MIGRATIONS {
473 if ver > current {
474 log::debug!("state(pg): applying migration v{}", ver);
475 let batch = format!(
476 "BEGIN; {} INSERT INTO rivet_schema_version (version) VALUES ({}); COMMIT;",
477 sql, ver
478 );
479 client
480 .batch_execute(&batch)
481 .map_err(|e| anyhow::anyhow!("state(pg): migration v{} failed: {:#}", ver, e))?;
482 }
483 }
484
485 let _ = client.batch_execute(
487 "DELETE FROM rivet_schema_version \
488 WHERE version < (SELECT MAX(version) FROM rivet_schema_version);",
489 );
490
491 let final_version: i64 = client
493 .query_one(
494 "SELECT COALESCE(MAX(version), 0) FROM rivet_schema_version",
495 &[],
496 )
497 .map_err(|e| anyhow::anyhow!("state(pg): read final schema version: {:#}", e))?
498 .get(0);
499 if final_version != SCHEMA_VERSION {
500 anyhow::bail!(
501 "state(pg): migration incomplete — expected schema v{} but reached v{}",
502 SCHEMA_VERSION,
503 final_version
504 );
505 }
506
507 Ok(())
508}
509
/// Returns `url` with the password of a `scheme://user:password@host…` URL
/// replaced by `***`, for use in log and error messages.
///
/// The user name ends at the FIRST `:` after the scheme. Everything between
/// that colon and the LAST `@` is treated as the password, so passwords
/// containing `:` or `@` are fully hidden. Input without `://`, without `@`
/// after the scheme, or without a `:` before that `@` is returned unchanged.
fn redact_pg_url(url: &str) -> String {
    let redacted = url.split_once("://").and_then(|(scheme, rest)| {
        // Only look for '@' after the scheme, so a stray '@' before "://"
        // cannot produce an inverted slice range.
        let at_pos = rest.rfind('@')?;
        let (user, _password) = rest[..at_pos].split_once(':')?;
        Some(format!("{}://{}:***@{}", scheme, user, &rest[at_pos + 1..]))
    });
    redacted.unwrap_or_else(|| url.to_string())
}
530
531pub(crate) const SQLITE_BUSY_TIMEOUT_MS: i64 = 10_000;
534
535pub(crate) fn open_connection(db_path: &std::path::Path) -> Result<Connection> {
536 let conn = Connection::open(db_path)?;
537 if let Err(e) = conn.execute_batch("PRAGMA journal_mode=WAL;") {
538 log::warn!(
539 "state: WAL journal mode unavailable ({}); \
540 running in default mode — concurrent writes may be slower",
541 e
542 );
543 }
544 if let Err(e) = conn.execute_batch(&format!(
545 "PRAGMA busy_timeout = {};",
546 SQLITE_BUSY_TIMEOUT_MS
547 )) {
548 log::warn!(
549 "state: failed to set busy_timeout ({}); \
550 concurrent writers may surface SQLITE_BUSY immediately",
551 e
552 );
553 }
554 Ok(conn)
555}
556
/// Handle to the state database.
///
/// Every constructor in this file runs the schema migrations before returning
/// the store: `open`, `open_in_memory` and `open_at_path`.
pub struct StateStore {
    /// Live backend connection (SQLite or Postgres).
    pub(super) conn: StateConn,
    /// Where the database lives: the SQLite path or the Postgres URL.
    pub(super) state_ref: StateRef,
}
581
582impl StateStore {
583 pub fn open(config_path: &str) -> Result<Self> {
587 if let Ok(url) = std::env::var("RIVET_STATE_URL")
588 && url.starts_with("postgres")
589 {
590 return Self::open_postgres(&url);
591 }
592 Self::open_sqlite(config_path)
593 }
594
595 fn open_sqlite(config_path: &str) -> Result<Self> {
596 let config_dir = std::path::Path::new(config_path)
597 .parent()
598 .unwrap_or(std::path::Path::new("."));
599 let db_path = config_dir.join(STATE_DB_NAME);
600 let conn = open_connection(&db_path)?;
601 migrate(&conn)?;
602 Ok(Self {
603 conn: StateConn::Sqlite(conn),
604 state_ref: StateRef::Sqlite(db_path),
605 })
606 }
607
608 fn open_postgres(url: &str) -> Result<Self> {
609 let is_local =
610 url.contains("localhost") || url.contains("127.0.0.1") || url.contains("::1");
611 if !is_local {
612 log::warn!(
613 "state(pg): connecting to a remote host without TLS; \
614 set RIVET_STATE_URL to a sslmode=require URL for production use"
615 );
616 }
617 let mut client = postgres::Client::connect(url, postgres::NoTls).map_err(|e| {
618 anyhow::anyhow!("state(pg): connect to '{}': {:#}", redact_pg_url(url), e)
619 })?;
620 migrate_pg(&mut client)?;
621 Ok(Self {
622 conn: StateConn::Postgres(Box::new(std::cell::RefCell::new(client))),
623 state_ref: StateRef::Postgres(url.to_string()),
624 })
625 }
626
627 pub fn state_db_path(config_path: &str) -> std::path::PathBuf {
631 let config_dir = std::path::Path::new(config_path)
632 .parent()
633 .unwrap_or(std::path::Path::new("."));
634 config_dir.join(STATE_DB_NAME)
635 }
636
637 pub fn state_ref(&self) -> &StateRef {
639 &self.state_ref
640 }
641
642 #[allow(dead_code)]
644 pub fn open_in_memory() -> Result<Self> {
645 let conn = Connection::open_in_memory()?;
646 migrate(&conn)?;
647 Ok(Self {
648 conn: StateConn::Sqlite(conn),
649 state_ref: StateRef::Sqlite(std::path::PathBuf::from(":memory:")),
650 })
651 }
652
653 #[allow(dead_code)]
656 pub fn open_at_path(db_path: &std::path::Path) -> Result<Self> {
657 let conn = open_connection(db_path)?;
658 migrate(&conn)?;
659 Ok(Self {
660 conn: StateConn::Sqlite(conn),
661 state_ref: StateRef::Sqlite(db_path.to_path_buf()),
662 })
663 }
664}
665
#[cfg(test)]
mod tests {
    use super::*;

    /// Borrows the SQLite connection out of a store opened by a SQLite
    /// constructor.
    fn sqlite_conn(store: &StateStore) -> &Connection {
        match &store.conn {
            StateConn::Sqlite(c) => c,
            StateConn::Postgres(_) => unreachable!("tests only open SQLite-backed stores"),
        }
    }

    /// Reports whether `sqlite_master` lists an object of type `kind`
    /// (`table` or `index`) named `name`.
    fn has_object(conn: &Connection, kind: &str, name: &str) -> bool {
        conn.query_row(
            "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type = ?1 AND name = ?2",
            rusqlite::params![kind, name],
            |row| row.get(0),
        )
        .unwrap()
    }

    #[test]
    fn fresh_db_reaches_latest_version() {
        let s = StateStore::open_in_memory().unwrap();
        assert_eq!(get_current_version(sqlite_conn(&s)), SCHEMA_VERSION);
    }

    #[test]
    fn migration_is_idempotent() {
        let s = StateStore::open_in_memory().unwrap();
        let conn = sqlite_conn(&s);
        migrate(conn).unwrap();
        migrate(conn).unwrap();
        assert_eq!(get_current_version(conn), SCHEMA_VERSION);
    }

    #[test]
    fn legacy_db_gets_upgraded() {
        // An unversioned database: the two oldest tables, without the later
        // export_metrics columns and without a schema_version table.
        let conn = Connection::open_in_memory().unwrap();
        conn.execute_batch(
            "CREATE TABLE export_state (
                export_name TEXT PRIMARY KEY,
                last_cursor_value TEXT,
                last_run_at TEXT
            );
            CREATE TABLE export_metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                export_name TEXT NOT NULL,
                run_at TEXT NOT NULL,
                duration_ms INTEGER NOT NULL,
                total_rows INTEGER NOT NULL,
                status TEXT NOT NULL
            );",
        )
        .unwrap();

        migrate(&conn).unwrap();
        assert_eq!(get_current_version(&conn), SCHEMA_VERSION);
        assert!(has_object(&conn, "table", "chunk_run"));
    }

    #[test]
    fn v8_renames_file_manifest_to_file_log() {
        let s = StateStore::open_in_memory().unwrap();
        let conn = sqlite_conn(&s);
        assert!(
            has_object(conn, "table", "file_log"),
            "v8 must produce a `file_log` table"
        );
        assert!(
            !has_object(conn, "table", "file_manifest"),
            "v8 must remove the old `file_manifest` table"
        );
        assert!(
            has_object(conn, "index", "idx_file_log_export"),
            "v8 must create the renamed index"
        );
    }

    #[test]
    fn v8_upgrades_existing_v7_db_with_data() {
        let conn = Connection::open_in_memory().unwrap();

        // Build a genuine v7 database: replay every migration below v8 and
        // record v7 as the stored version.
        ensure_schema_version_table(&conn);
        for &(ver, sql) in MIGRATIONS {
            if ver < 8 {
                conn.execute_batch(sql).unwrap();
            }
        }
        conn.execute("INSERT INTO schema_version (version) VALUES (7)", [])
            .unwrap();

        // Data written under the old table name must survive the rename.
        conn.execute(
            "INSERT INTO file_manifest (run_id, export_name, file_name, row_count, bytes, format, created_at)
             VALUES ('r1', 'orders', 'f.parquet', 100, 4096, 'parquet', '2026-05-21T00:00:00Z')",
            [],
        )
        .unwrap();

        migrate(&conn).unwrap();
        assert_eq!(get_current_version(&conn), SCHEMA_VERSION);
        assert!(!has_object(&conn, "table", "file_manifest"));
        assert!(!has_object(&conn, "index", "idx_file_manifest_export"));
        assert!(has_object(&conn, "index", "idx_file_log_export"));

        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM file_log", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 1);
        let file_name: String = conn
            .query_row(
                "SELECT file_name FROM file_log WHERE run_id = 'r1'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert_eq!(file_name, "f.parquet");
    }

    #[test]
    fn run_aggregate_table_exists_after_migration() {
        let s = StateStore::open_in_memory().unwrap();
        assert!(
            has_object(sqlite_conn(&s), "table", "run_aggregate"),
            "v5 migration must create the run_aggregate table"
        );
    }

    #[test]
    fn pg_sql_converts_placeholders() {
        assert_eq!(
            pg_sql("SELECT ?1, ?2 FROM t WHERE x = ?3"),
            "SELECT $1, $2 FROM t WHERE x = $3"
        );
        assert_eq!(
            pg_sql("INSERT INTO t VALUES (?1, ?2)"),
            "INSERT INTO t VALUES ($1, $2)"
        );
        assert_eq!(pg_sql("no placeholders"), "no placeholders");
        assert_eq!(pg_sql("?10 AND ?11"), "$10 AND $11");
    }

    #[test]
    fn redact_pg_url_removes_password() {
        assert_eq!(
            redact_pg_url("postgresql://rivet:secret123@localhost:5433/rivet_state"),
            "postgresql://rivet:***@localhost:5433/rivet_state"
        );
        assert_eq!(
            redact_pg_url("postgres://admin:p@ssw0rd@db.prod.example.com/state"),
            "postgres://admin:***@db.prod.example.com/state"
        );
    }

    #[test]
    fn redact_pg_url_no_password_unchanged() {
        let url = "postgresql://rivet@localhost/state";
        assert_eq!(redact_pg_url(url), url);
    }
}