Skip to main content

rivet_cli/
state.rs

1use rusqlite::{Connection, TransactionBehavior};
2
3use crate::error::Result;
4use crate::types::CursorState;
5
/// File name of the SQLite state database, created in the config file's directory.
const STATE_DB_NAME: &str = ".rivet_state.db";

/// Version of the newest entry in [`MIGRATIONS`].
///
/// Only compiled for tests. Keep it equal to the last version listed in
/// `MIGRATIONS`: whenever a migration entry is appended, bump this constant too.
#[cfg(test)]
const SCHEMA_VERSION: i64 = 3;

/// v1: core tables — cursor state, run metrics, tracked schemas, and the file manifest.
const MIGRATION_V1_CORE_TABLES: &str =
    "CREATE TABLE IF NOT EXISTS export_state (
            export_name TEXT PRIMARY KEY,
            last_cursor_value TEXT,
            last_run_at TEXT
        );
        CREATE TABLE IF NOT EXISTS export_metrics (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            export_name TEXT NOT NULL,
            run_at TEXT NOT NULL,
            duration_ms INTEGER NOT NULL,
            total_rows INTEGER NOT NULL,
            peak_rss_mb INTEGER,
            status TEXT NOT NULL,
            error_message TEXT,
            tuning_profile TEXT,
            format TEXT,
            mode TEXT,
            files_produced INTEGER DEFAULT 0,
            bytes_written INTEGER DEFAULT 0,
            retries INTEGER DEFAULT 0,
            validated INTEGER,
            schema_changed INTEGER,
            run_id TEXT
        );
        CREATE TABLE IF NOT EXISTS export_schema (
            export_name TEXT PRIMARY KEY,
            columns_json TEXT NOT NULL,
            updated_at TEXT NOT NULL
        );
        CREATE TABLE IF NOT EXISTS file_manifest (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_id TEXT NOT NULL,
            export_name TEXT NOT NULL,
            file_name TEXT NOT NULL,
            row_count INTEGER NOT NULL,
            bytes INTEGER NOT NULL,
            format TEXT NOT NULL,
            compression TEXT,
            created_at TEXT NOT NULL
        );";

/// v2: checkpoint tables for chunked exports (one `chunk_run` per run, one
/// `chunk_task` per planned key range).
const MIGRATION_V2_CHUNK_CHECKPOINTS: &str =
    "CREATE TABLE IF NOT EXISTS chunk_run (
            run_id TEXT PRIMARY KEY,
            export_name TEXT NOT NULL,
            plan_hash TEXT NOT NULL,
            status TEXT NOT NULL,
            max_chunk_attempts INTEGER NOT NULL DEFAULT 3,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_chunk_run_export_status
            ON chunk_run(export_name, status);
        CREATE TABLE IF NOT EXISTS chunk_task (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_id TEXT NOT NULL,
            chunk_index INTEGER NOT NULL,
            start_key TEXT NOT NULL,
            end_key TEXT NOT NULL,
            status TEXT NOT NULL,
            attempts INTEGER NOT NULL DEFAULT 0,
            last_error TEXT,
            rows_written INTEGER,
            file_name TEXT,
            updated_at TEXT NOT NULL,
            UNIQUE(run_id, chunk_index)
        );
        CREATE INDEX IF NOT EXISTS idx_chunk_task_run_status ON chunk_task(run_id, status);";

/// v3: index on `file_manifest` for faster per-export lookups.
const MIGRATION_V3_FILE_MANIFEST_INDEX: &str =
    "CREATE INDEX IF NOT EXISTS idx_file_manifest_export ON file_manifest(export_name, id DESC);";

/// Ordered schema migrations as `(version, sql)` pairs.
///
/// `migrate` applies, in slice order, every entry whose version is greater than the
/// highest version recorded in `schema_version`. Every statement uses
/// `IF NOT EXISTS`, so re-running an entry against a database that already has it
/// is harmless. Append new entries at the end with the next version number.
const MIGRATIONS: &[(i64, &str)] = &[
    (1, MIGRATION_V1_CORE_TABLES),
    (2, MIGRATION_V2_CHUNK_CHECKPOINTS),
    (3, MIGRATION_V3_FILE_MANIFEST_INDEX),
];
94
95fn ensure_schema_version_table(conn: &Connection) {
96    let _ = conn.execute_batch(
97        "CREATE TABLE IF NOT EXISTS schema_version (
98            version INTEGER NOT NULL
99        );",
100    );
101}
102
103fn get_current_version(conn: &Connection) -> i64 {
104    conn.query_row(
105        "SELECT COALESCE(MAX(version), 0) FROM schema_version",
106        [],
107        |row| row.get(0),
108    )
109    .unwrap_or(0)
110}
111
112fn migrate(conn: &Connection) -> Result<()> {
113    ensure_schema_version_table(conn);
114
115    let current = get_current_version(conn);
116
117    // Detect pre-versioning databases: if export_state exists but version == 0,
118    // the DB was created by older code before versioned migrations.
119    if current == 0 {
120        let has_export_state: bool = conn
121            .query_row(
122                "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='export_state'",
123                [],
124                |row| row.get(0),
125            )
126            .unwrap_or(false);
127
128        if has_export_state {
129            let metrics_cols = [
130                "files_produced INTEGER DEFAULT 0",
131                "bytes_written INTEGER DEFAULT 0",
132                "retries INTEGER DEFAULT 0",
133                "validated INTEGER",
134                "schema_changed INTEGER",
135                "run_id TEXT",
136            ];
137            for col_def in &metrics_cols {
138                let sql = format!("ALTER TABLE export_metrics ADD COLUMN {}", col_def);
139                let _ = conn.execute(&sql, []);
140            }
141        }
142    }
143
144    for &(ver, sql) in MIGRATIONS {
145        if ver > current {
146            log::debug!("state: applying migration v{}", ver);
147            conn.execute_batch(sql)
148                .map_err(|e| anyhow::anyhow!("state: migration v{} failed: {}", ver, e))?;
149            conn.execute("INSERT INTO schema_version (version) VALUES (?1)", [ver])
150                .map_err(|e| anyhow::anyhow!("state: recording migration v{}: {}", ver, e))?;
151        }
152    }
153    Ok(())
154}
155
/// One row from `chunk_task` for display / debugging.
///
/// Returned by `StateStore::list_chunk_tasks_for_run`, ordered by `chunk_index`.
#[derive(Debug, Clone)]
pub struct ChunkTaskInfo {
    /// Position of the chunk within its run's plan (0-based when created by
    /// `insert_chunk_tasks`).
    pub chunk_index: i64,
    /// Start of the chunk's key range, stored as text. `insert_chunk_tasks` writes
    /// the decimal form of an `i64`.
    pub start_key: String,
    /// End of the chunk's key range, stored as text. Whether the bound is inclusive
    /// is decided by the caller that builds the ranges.
    pub end_key: String,
    /// Task state as stored. This module writes `pending`, `running`, `completed`
    /// and `failed`.
    pub status: String,
    /// How many times the task has been claimed; each claim increments it.
    pub attempts: i64,
    /// Error from the most recent failure; cleared when the task completes.
    pub last_error: Option<String>,
    /// Rows written by the chunk, recorded on completion.
    pub rows_written: Option<i64>,
    /// Output file recorded on completion, if any.
    pub file_name: Option<String>,
}
168
/// SQLite-backed persistence for export cursors, run metrics, file manifests,
/// tracked schemas, and chunk checkpoints.
pub struct StateStore {
    /// Primary connection used by the `&self` methods.
    conn: Connection,
    /// Path the database was opened from (`":memory:"` for `open_in_memory`);
    /// exposed through `database_path`.
    db_path: std::path::PathBuf,
}
173
/// One row of `export_metrics`, as returned by `StateStore::get_metrics`.
#[derive(Debug)]
// Silences dead-code warnings for fields that are stored and loaded but not
// necessarily read anywhere in the crate.
#[allow(dead_code)]
pub struct ExportMetric {
    pub export_name: String,
    pub run_id: Option<String>,
    /// RFC 3339 UTC timestamp taken when the metric was recorded.
    pub run_at: String,
    pub duration_ms: i64,
    pub total_rows: i64,
    pub peak_rss_mb: Option<i64>,
    pub status: String,
    pub error_message: Option<String>,
    pub tuning_profile: Option<String>,
    pub format: Option<String>,
    pub mode: Option<String>,
    /// Read as 0 when the stored value is NULL.
    pub files_produced: i64,
    /// Read as 0 when the stored value is NULL.
    pub bytes_written: i64,
    /// Read as 0 when the stored value is NULL.
    pub retries: i64,
    pub validated: Option<bool>,
    pub schema_changed: Option<bool>,
}
194
/// One row of `file_manifest`, as returned by `StateStore::get_files`.
#[derive(Debug)]
// Silences dead-code warnings for fields that are stored and loaded but not
// necessarily read anywhere in the crate.
#[allow(dead_code)]
pub struct FileRecord {
    pub run_id: String,
    pub export_name: String,
    pub file_name: String,
    pub row_count: i64,
    /// File size as reported by the caller of `record_file`.
    pub bytes: i64,
    pub format: String,
    pub compression: Option<String>,
    /// RFC 3339 UTC timestamp taken when the file was recorded.
    pub created_at: String,
}
207
/// A column name and type, persisted as JSON in `export_schema.columns_json`.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SchemaColumn {
    pub name: String,
    /// Serialized under the JSON key `"type"`.
    #[serde(rename = "type")]
    pub data_type: String,
}
214
/// Differences between a previously stored column list and the current one.
#[derive(Debug)]
pub struct SchemaChange {
    /// Columns present now but absent from the stored schema.
    pub added: Vec<String>,
    /// Columns in the stored schema that are no longer present.
    pub removed: Vec<String>,
    /// Columns whose type differs, as `(name, old_type, new_type)`.
    pub type_changed: Vec<(String, String, String)>,
}

impl SchemaChange {
    /// `true` when no column was added, removed, or retyped.
    pub fn is_empty(&self) -> bool {
        let any_change = !self.added.is_empty()
            || !self.removed.is_empty()
            || !self.type_changed.is_empty();
        !any_change
    }
}
227
228impl StateStore {
229    pub fn open(config_path: &str) -> Result<Self> {
230        let config_dir = std::path::Path::new(config_path)
231            .parent()
232            .unwrap_or(std::path::Path::new("."));
233        let db_path = config_dir.join(STATE_DB_NAME);
234        let db_path_buf = db_path.to_path_buf();
235        let conn = Connection::open(db_path)?;
236        let _ = conn.execute_batch("PRAGMA journal_mode=WAL;");
237        migrate(&conn)?;
238        Ok(Self {
239            conn,
240            db_path: db_path_buf,
241        })
242    }
243
244    /// Path to `.rivet_state.db` next to the config file (same rules as `open`).
245    pub fn state_db_path(config_path: &str) -> std::path::PathBuf {
246        let config_dir = std::path::Path::new(config_path)
247            .parent()
248            .unwrap_or(std::path::Path::new("."));
249        config_dir.join(STATE_DB_NAME)
250    }
251
252    pub(crate) fn database_path(&self) -> &std::path::Path {
253        self.db_path.as_path()
254    }
255
256    // ─── Chunk checkpoint (chunked exports) ───────────────────
257
258    /// Latest `in_progress` chunk run for this export, if any.
259    pub fn find_in_progress_chunk_run(
260        &self,
261        export_name: &str,
262    ) -> Result<Option<(String, String)>> {
263        let mut stmt = self.conn.prepare(
264            "SELECT run_id, plan_hash FROM chunk_run
265             WHERE export_name = ?1 AND status = 'in_progress'
266             ORDER BY created_at DESC LIMIT 1",
267        )?;
268        let mut rows = stmt.query_map([export_name], |row| Ok((row.get(0)?, row.get(1)?)))?;
269        Ok(rows.next().transpose()?)
270    }
271
272    pub fn create_chunk_run(
273        &self,
274        run_id: &str,
275        export_name: &str,
276        plan_hash: &str,
277        max_chunk_attempts: u32,
278    ) -> Result<()> {
279        let now = chrono::Utc::now().to_rfc3339();
280        self.conn.execute(
281            "INSERT INTO chunk_run (run_id, export_name, plan_hash, status, max_chunk_attempts, created_at, updated_at)
282             VALUES (?1, ?2, ?3, 'in_progress', ?4, ?5, ?5)",
283            rusqlite::params![run_id, export_name, plan_hash, max_chunk_attempts as i64, now],
284        )?;
285        Ok(())
286    }
287
288    pub fn insert_chunk_tasks(&self, run_id: &str, ranges: &[(i64, i64)]) -> Result<()> {
289        let now = chrono::Utc::now().to_rfc3339();
290        let mut stmt = self.conn.prepare(
291            "INSERT INTO chunk_task (run_id, chunk_index, start_key, end_key, status, attempts, updated_at)
292             VALUES (?1, ?2, ?3, ?4, 'pending', 0, ?5)",
293        )?;
294        for (i, (start, end)) in ranges.iter().enumerate() {
295            stmt.execute(rusqlite::params![
296                run_id,
297                i as i64,
298                start.to_string(),
299                end.to_string(),
300                now,
301            ])?;
302        }
303        Ok(())
304    }
305
306    /// Mark tasks left `running` after a crash as `pending` / `failed` retryable again.
307    pub fn reset_stale_running_chunk_tasks(&self, run_id: &str) -> Result<usize> {
308        let now = chrono::Utc::now().to_rfc3339();
309        let n = self.conn.execute(
310            "UPDATE chunk_task SET status = 'pending', updated_at = ?1
311             WHERE run_id = ?2 AND status = 'running'",
312            rusqlite::params![now, run_id],
313        )?;
314        Ok(n)
315    }
316
317    /// Atomically claim the next pending or retryable failed chunk (single-threaded export).
318    pub fn claim_next_chunk_task(&self, run_id: &str) -> Result<Option<(i64, String, String)>> {
319        Self::claim_next_chunk_task_at_path(self.db_path.as_path(), run_id)
320    }
321
322    fn claim_next_chunk_in_tx(
323        tx: &rusqlite::Transaction<'_>,
324        now: &str,
325        run_id: &str,
326    ) -> Result<Option<(i64, String, String)>> {
327        let mut stmt = tx.prepare(
328            "UPDATE chunk_task
329             SET status = 'running', attempts = attempts + 1, updated_at = ?1
330             WHERE rowid = (
331               SELECT ct.rowid FROM chunk_task ct
332               INNER JOIN chunk_run cr ON cr.run_id = ct.run_id
333               WHERE ct.run_id = ?2
334                 AND cr.status = 'in_progress'
335                 AND (
336                   ct.status = 'pending'
337                   OR (ct.status = 'failed' AND ct.attempts < cr.max_chunk_attempts)
338                 )
339               ORDER BY ct.chunk_index ASC
340               LIMIT 1
341             )
342             RETURNING chunk_index, start_key, end_key",
343        )?;
344        let mut rows = stmt.query(rusqlite::params![now, run_id])?;
345        let out = match rows.next()? {
346            Some(row) => Some((row.get(0)?, row.get(1)?, row.get(2)?)),
347            None => None,
348        };
349        Ok(out)
350    }
351
352    /// Same as `claim_next_chunk_task` using a fresh connection (parallel workers).
353    pub fn claim_next_chunk_task_at_path(
354        db_path: &std::path::Path,
355        run_id: &str,
356    ) -> Result<Option<(i64, String, String)>> {
357        let mut conn = Connection::open(db_path)?;
358        let now = chrono::Utc::now().to_rfc3339();
359        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
360        let res = Self::claim_next_chunk_in_tx(&tx, &now, run_id)?;
361        tx.commit()?;
362        Ok(res)
363    }
364
365    pub fn complete_chunk_task(
366        &self,
367        run_id: &str,
368        chunk_index: i64,
369        rows_written: i64,
370        file_name: Option<&str>,
371    ) -> Result<()> {
372        let now = chrono::Utc::now().to_rfc3339();
373        self.conn.execute(
374            "UPDATE chunk_task
375             SET status = 'completed', rows_written = ?1, file_name = ?2, last_error = NULL, updated_at = ?3
376             WHERE run_id = ?4 AND chunk_index = ?5",
377            rusqlite::params![rows_written, file_name, now, run_id, chunk_index],
378        )?;
379        Ok(())
380    }
381
382    pub fn fail_chunk_task(&self, run_id: &str, chunk_index: i64, err: &str) -> Result<()> {
383        let now = chrono::Utc::now().to_rfc3339();
384        self.conn.execute(
385            "UPDATE chunk_task SET status = 'failed', last_error = ?1, updated_at = ?2
386             WHERE run_id = ?3 AND chunk_index = ?4",
387            rusqlite::params![err, now, run_id, chunk_index],
388        )?;
389        Ok(())
390    }
391
392    pub fn fail_chunk_task_at_path(
393        db_path: &std::path::Path,
394        run_id: &str,
395        chunk_index: i64,
396        err: &str,
397    ) -> Result<()> {
398        let conn = Connection::open(db_path)?;
399        let now = chrono::Utc::now().to_rfc3339();
400        conn.execute(
401            "UPDATE chunk_task SET status = 'failed', last_error = ?1, updated_at = ?2
402             WHERE run_id = ?3 AND chunk_index = ?4",
403            rusqlite::params![err, now, run_id, chunk_index],
404        )?;
405        Ok(())
406    }
407
408    pub fn complete_chunk_task_at_path(
409        db_path: &std::path::Path,
410        run_id: &str,
411        chunk_index: i64,
412        rows_written: i64,
413        file_name: Option<&str>,
414    ) -> Result<()> {
415        let conn = Connection::open(db_path)?;
416        let now = chrono::Utc::now().to_rfc3339();
417        conn.execute(
418            "UPDATE chunk_task
419             SET status = 'completed', rows_written = ?1, file_name = ?2, last_error = NULL, updated_at = ?3
420             WHERE run_id = ?4 AND chunk_index = ?5",
421            rusqlite::params![rows_written, file_name, now, run_id, chunk_index],
422        )?;
423        Ok(())
424    }
425
426    pub fn count_chunk_tasks_not_completed(&self, run_id: &str) -> Result<i64> {
427        let n: i64 = self.conn.query_row(
428            "SELECT COUNT(*) FROM chunk_task WHERE run_id = ?1 AND status != 'completed'",
429            [run_id],
430            |row| row.get(0),
431        )?;
432        Ok(n)
433    }
434
435    pub fn finalize_chunk_run_completed(&self, run_id: &str) -> Result<()> {
436        let now = chrono::Utc::now().to_rfc3339();
437        self.conn.execute(
438            "UPDATE chunk_run SET status = 'completed', updated_at = ?1 WHERE run_id = ?2",
439            rusqlite::params![now, run_id],
440        )?;
441        Ok(())
442    }
443
444    /// Remove all chunk runs and tasks for an export (abandon resume).
445    pub fn reset_chunk_checkpoint(&self, export_name: &str) -> Result<usize> {
446        let run_ids: Vec<String> = {
447            let mut stmt = self
448                .conn
449                .prepare("SELECT run_id FROM chunk_run WHERE export_name = ?1")?;
450            let rows = stmt.query_map([export_name], |row| row.get(0))?;
451            rows.collect::<std::result::Result<Vec<_>, _>>()?
452        };
453        for rid in &run_ids {
454            let _ = self
455                .conn
456                .execute("DELETE FROM chunk_task WHERE run_id = ?1", [rid]);
457        }
458        let deleted = self.conn.execute(
459            "DELETE FROM chunk_run WHERE export_name = ?1",
460            [export_name],
461        )?;
462        Ok(deleted)
463    }
464
465    /// Latest chunk_run row for an export (any status), for `rivet state chunks`.
466    pub fn get_latest_chunk_run(
467        &self,
468        export_name: &str,
469    ) -> Result<Option<(String, String, String, String)>> {
470        let mut stmt = self.conn.prepare(
471            "SELECT run_id, plan_hash, status, updated_at FROM chunk_run
472             WHERE export_name = ?1 ORDER BY updated_at DESC LIMIT 1",
473        )?;
474        let mut rows = stmt.query_map([export_name], |row| {
475            Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
476        })?;
477        Ok(rows.next().transpose()?)
478    }
479
480    pub fn list_chunk_tasks_for_run(&self, run_id: &str) -> Result<Vec<ChunkTaskInfo>> {
481        let mut stmt = self.conn.prepare(
482            "SELECT chunk_index, start_key, end_key, status, attempts, last_error, rows_written, file_name
483             FROM chunk_task WHERE run_id = ?1 ORDER BY chunk_index ASC",
484        )?;
485        let rows = stmt.query_map([run_id], |row| {
486            Ok(ChunkTaskInfo {
487                chunk_index: row.get(0)?,
488                start_key: row.get(1)?,
489                end_key: row.get(2)?,
490                status: row.get(3)?,
491                attempts: row.get(4)?,
492                last_error: row.get(5)?,
493                rows_written: row.get(6)?,
494                file_name: row.get(7)?,
495            })
496        })?;
497        rows.collect::<std::result::Result<Vec<_>, _>>()
498            .map_err(Into::into)
499    }
500
501    // ─── Cursor State ────────────────────────────────────────
502
503    pub fn get(&self, export_name: &str) -> Result<CursorState> {
504        let mut stmt = self.conn.prepare(
505            "SELECT last_cursor_value, last_run_at FROM export_state WHERE export_name = ?1",
506        )?;
507        let result = stmt.query_row([export_name], |row| {
508            Ok(CursorState {
509                export_name: export_name.to_string(),
510                last_cursor_value: row.get(0)?,
511                last_run_at: row.get(1)?,
512            })
513        });
514        match result {
515            Ok(state) => Ok(state),
516            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(CursorState {
517                export_name: export_name.to_string(),
518                last_cursor_value: None,
519                last_run_at: None,
520            }),
521            Err(e) => Err(e.into()),
522        }
523    }
524
525    pub fn update(&self, export_name: &str, cursor_value: &str) -> Result<()> {
526        let now = chrono::Utc::now().to_rfc3339();
527        self.conn.execute(
528            "INSERT INTO export_state (export_name, last_cursor_value, last_run_at)
529             VALUES (?1, ?2, ?3)
530             ON CONFLICT(export_name) DO UPDATE SET
531                last_cursor_value = excluded.last_cursor_value,
532                last_run_at = excluded.last_run_at",
533            rusqlite::params![export_name, cursor_value, now],
534        )?;
535        Ok(())
536    }
537
538    pub fn reset(&self, export_name: &str) -> Result<()> {
539        self.conn.execute(
540            "DELETE FROM export_state WHERE export_name = ?1",
541            [export_name],
542        )?;
543        Ok(())
544    }
545
546    pub fn list_all(&self) -> Result<Vec<CursorState>> {
547        let mut stmt = self.conn.prepare(
548            "SELECT export_name, last_cursor_value, last_run_at FROM export_state ORDER BY export_name",
549        )?;
550        let rows = stmt.query_map([], |row| {
551            Ok(CursorState {
552                export_name: row.get(0)?,
553                last_cursor_value: row.get(1)?,
554                last_run_at: row.get(2)?,
555            })
556        })?;
557        rows.collect::<std::result::Result<Vec<_>, _>>()
558            .map_err(Into::into)
559    }
560
561    // ─── Metrics ─────────────────────────────────────────────
562
563    #[allow(clippy::too_many_arguments)]
564    pub fn record_metric(
565        &self,
566        export_name: &str,
567        run_id: &str,
568        duration_ms: i64,
569        total_rows: i64,
570        peak_rss_mb: Option<i64>,
571        status: &str,
572        error_message: Option<&str>,
573        tuning_profile: Option<&str>,
574        format: Option<&str>,
575        mode: Option<&str>,
576        files_produced: i64,
577        bytes_written: i64,
578        retries: i64,
579        validated: Option<bool>,
580        schema_changed: Option<bool>,
581    ) -> Result<()> {
582        let now = chrono::Utc::now().to_rfc3339();
583        self.conn.execute(
584            "INSERT INTO export_metrics (export_name, run_id, run_at, duration_ms, total_rows, peak_rss_mb,
585             status, error_message, tuning_profile, format, mode,
586             files_produced, bytes_written, retries, validated, schema_changed)
587             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
588            rusqlite::params![
589                export_name, run_id, now, duration_ms, total_rows, peak_rss_mb,
590                status, error_message, tuning_profile, format, mode,
591                files_produced, bytes_written, retries, validated, schema_changed
592            ],
593        )?;
594        Ok(())
595    }
596
597    pub fn get_metrics(
598        &self,
599        export_name: Option<&str>,
600        limit: usize,
601    ) -> Result<Vec<ExportMetric>> {
602        let cols = "export_name, run_id, run_at, duration_ms, total_rows, peak_rss_mb,
603                    status, error_message, tuning_profile, format, mode,
604                    files_produced, bytes_written, retries, validated, schema_changed";
605        let (sql, params): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = if let Some(name) =
606            export_name
607        {
608            (
609                format!(
610                    "SELECT {} FROM export_metrics WHERE export_name = ?1 ORDER BY id DESC LIMIT {}",
611                    cols, limit
612                ),
613                vec![Box::new(name.to_string())],
614            )
615        } else {
616            (
617                format!(
618                    "SELECT {} FROM export_metrics ORDER BY id DESC LIMIT {}",
619                    cols, limit
620                ),
621                vec![],
622            )
623        };
624
625        let mut stmt = self.conn.prepare(&sql)?;
626        let params_refs: Vec<&dyn rusqlite::types::ToSql> =
627            params.iter().map(|p| p.as_ref()).collect();
628        let rows = stmt.query_map(params_refs.as_slice(), |row| {
629            Ok(ExportMetric {
630                export_name: row.get(0)?,
631                run_id: row.get(1)?,
632                run_at: row.get(2)?,
633                duration_ms: row.get(3)?,
634                total_rows: row.get(4)?,
635                peak_rss_mb: row.get(5)?,
636                status: row.get(6)?,
637                error_message: row.get(7)?,
638                tuning_profile: row.get(8)?,
639                format: row.get(9)?,
640                mode: row.get(10)?,
641                files_produced: row.get::<_, Option<i64>>(11)?.unwrap_or(0),
642                bytes_written: row.get::<_, Option<i64>>(12)?.unwrap_or(0),
643                retries: row.get::<_, Option<i64>>(13)?.unwrap_or(0),
644                validated: row.get(14)?,
645                schema_changed: row.get(15)?,
646            })
647        })?;
648        rows.collect::<std::result::Result<Vec<_>, _>>()
649            .map_err(Into::into)
650    }
651
652    // ─── File Manifest ─────────────────────────────────────────
653
654    #[allow(clippy::too_many_arguments)]
655    pub fn record_file(
656        &self,
657        run_id: &str,
658        export_name: &str,
659        file_name: &str,
660        row_count: i64,
661        bytes: i64,
662        format: &str,
663        compression: Option<&str>,
664    ) -> Result<()> {
665        let now = chrono::Utc::now().to_rfc3339();
666        self.conn.execute(
667            "INSERT INTO file_manifest (run_id, export_name, file_name, row_count, bytes, format, compression, created_at)
668             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
669            rusqlite::params![run_id, export_name, file_name, row_count, bytes, format, compression, now],
670        )?;
671        Ok(())
672    }
673
674    pub fn get_files(&self, export_name: Option<&str>, limit: usize) -> Result<Vec<FileRecord>> {
675        let (sql, params): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = if let Some(name) =
676            export_name
677        {
678            (
679                format!(
680                    "SELECT run_id, export_name, file_name, row_count, bytes, format, compression, created_at
681                     FROM file_manifest WHERE export_name = ?1 ORDER BY id DESC LIMIT {}",
682                    limit
683                ),
684                vec![Box::new(name.to_string())],
685            )
686        } else {
687            (
688                format!(
689                    "SELECT run_id, export_name, file_name, row_count, bytes, format, compression, created_at
690                     FROM file_manifest ORDER BY id DESC LIMIT {}",
691                    limit
692                ),
693                vec![],
694            )
695        };
696
697        let mut stmt = self.conn.prepare(&sql)?;
698        let params_refs: Vec<&dyn rusqlite::types::ToSql> =
699            params.iter().map(|p| p.as_ref()).collect();
700        let rows = stmt.query_map(params_refs.as_slice(), |row| {
701            Ok(FileRecord {
702                run_id: row.get(0)?,
703                export_name: row.get(1)?,
704                file_name: row.get(2)?,
705                row_count: row.get(3)?,
706                bytes: row.get(4)?,
707                format: row.get(5)?,
708                compression: row.get(6)?,
709                created_at: row.get(7)?,
710            })
711        })?;
712        rows.collect::<std::result::Result<Vec<_>, _>>()
713            .map_err(Into::into)
714    }
715
716    // ─── Schema Tracking ─────────────────────────────────────
717
718    pub fn get_stored_schema(&self, export_name: &str) -> Result<Option<Vec<SchemaColumn>>> {
719        let mut stmt = self
720            .conn
721            .prepare("SELECT columns_json FROM export_schema WHERE export_name = ?1")?;
722        let result = stmt.query_row([export_name], |row| {
723            let json_str: String = row.get(0)?;
724            Ok(json_str)
725        });
726        match result {
727            Ok(json_str) => {
728                let cols: Vec<SchemaColumn> = serde_json::from_str(&json_str)?;
729                Ok(Some(cols))
730            }
731            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
732            Err(e) => Err(e.into()),
733        }
734    }
735
736    pub fn store_schema(&self, export_name: &str, columns: &[SchemaColumn]) -> Result<()> {
737        let json = serde_json::to_string(columns)?;
738        let now = chrono::Utc::now().to_rfc3339();
739        self.conn.execute(
740            "INSERT INTO export_schema (export_name, columns_json, updated_at)
741             VALUES (?1, ?2, ?3)
742             ON CONFLICT(export_name) DO UPDATE SET
743                columns_json = excluded.columns_json,
744                updated_at = excluded.updated_at",
745            rusqlite::params![export_name, json, now],
746        )?;
747        Ok(())
748    }
749
750    pub fn detect_schema_change(
751        &self,
752        export_name: &str,
753        current: &[SchemaColumn],
754    ) -> Result<Option<SchemaChange>> {
755        let stored = match self.get_stored_schema(export_name)? {
756            Some(s) => s,
757            None => {
758                self.store_schema(export_name, current)?;
759                return Ok(None);
760            }
761        };
762
763        let stored_map: std::collections::HashMap<&str, &str> = stored
764            .iter()
765            .map(|c| (c.name.as_str(), c.data_type.as_str()))
766            .collect();
767        let current_map: std::collections::HashMap<&str, &str> = current
768            .iter()
769            .map(|c| (c.name.as_str(), c.data_type.as_str()))
770            .collect();
771
772        let added: Vec<String> = current
773            .iter()
774            .filter(|c| !stored_map.contains_key(c.name.as_str()))
775            .map(|c| format!("{} ({})", c.name, c.data_type))
776            .collect();
777
778        let removed: Vec<String> = stored
779            .iter()
780            .filter(|c| !current_map.contains_key(c.name.as_str()))
781            .map(|c| c.name.clone())
782            .collect();
783
784        let type_changed: Vec<(String, String, String)> = current
785            .iter()
786            .filter_map(|c| {
787                stored_map.get(c.name.as_str()).and_then(|old_type| {
788                    if *old_type != c.data_type.as_str() {
789                        Some((c.name.clone(), old_type.to_string(), c.data_type.clone()))
790                    } else {
791                        None
792                    }
793                })
794            })
795            .collect();
796
797        let change = SchemaChange {
798            added,
799            removed,
800            type_changed,
801        };
802
803        if !change.is_empty() {
804            self.store_schema(export_name, current)?;
805            Ok(Some(change))
806        } else {
807            Ok(None)
808        }
809    }
810
811    #[allow(dead_code)] // used by integration tests (tests/*.rs)
812    pub fn open_in_memory() -> Result<Self> {
813        let conn = Connection::open_in_memory()?;
814        migrate(&conn)?;
815        Ok(Self {
816            conn,
817            db_path: std::path::PathBuf::from(":memory:"),
818        })
819    }
820}
821
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh, fully-migrated in-memory store for tests that need a single connection.
    fn store() -> StateStore {
        StateStore::open_in_memory().expect("in-memory store")
    }

    // ─── Cursor tests ────────────────────────────────────────

    #[test]
    fn get_unknown_returns_empty_state() {
        let s = store();
        let state = s.get("nonexistent").unwrap();
        assert!(state.last_cursor_value.is_none());
    }

    #[test]
    fn update_then_get_returns_stored_cursor() {
        let s = store();
        s.update("orders", "2024-06-01").unwrap();
        assert_eq!(
            s.get("orders").unwrap().last_cursor_value.as_deref(),
            Some("2024-06-01")
        );
    }

    #[test]
    fn update_overwrites_previous_cursor() {
        let s = store();
        s.update("orders", "100").unwrap();
        s.update("orders", "200").unwrap();
        assert_eq!(
            s.get("orders").unwrap().last_cursor_value.as_deref(),
            Some("200")
        );
    }

    #[test]
    fn reset_clears_cursor_state() {
        let s = store();
        s.update("orders", "100").unwrap();
        s.reset("orders").unwrap();
        assert!(s.get("orders").unwrap().last_cursor_value.is_none());
    }

    #[test]
    fn list_all_on_empty_store_returns_empty() {
        assert!(store().list_all().unwrap().is_empty());
    }

    #[test]
    fn list_all_returns_entries_sorted_by_name() {
        let s = store();
        // Insert out of order so the assertions prove sorting, not insertion order.
        s.update("gamma", "3").unwrap();
        s.update("alpha", "1").unwrap();
        s.update("beta", "2").unwrap();
        let all = s.list_all().unwrap();
        // Check the length first so a missing/extra row fails with a clear
        // assertion message instead of an index-out-of-bounds panic.
        assert_eq!(all.len(), 3);
        assert_eq!(all[0].export_name, "alpha");
        assert_eq!(all[1].export_name, "beta");
        assert_eq!(all[2].export_name, "gamma");
    }

    // ─── Metrics tests ───────────────────────────────────────

    #[test]
    fn record_and_query_metrics() {
        let s = store();
        s.record_metric(
            "orders",
            "run_001",
            1200,
            50000,
            Some(142),
            "success",
            None,
            Some("safe"),
            Some("parquet"),
            Some("full"),
            1,
            4096,
            0,
            Some(true),
            Some(false),
        )
        .unwrap();
        s.record_metric(
            "orders",
            "run_002",
            300,
            0,
            Some(30),
            "failed",
            Some("timeout"),
            Some("safe"),
            Some("parquet"),
            Some("full"),
            0,
            0,
            2,
            None,
            None,
        )
        .unwrap();

        // Results come back newest first: run_002 (recorded last) is at index 0.
        let metrics = s.get_metrics(Some("orders"), 10).unwrap();
        assert_eq!(metrics.len(), 2);
        assert_eq!(metrics[0].status, "failed");
        assert_eq!(metrics[0].run_id.as_deref(), Some("run_002"));
        assert_eq!(metrics[0].retries, 2);
        assert_eq!(metrics[1].total_rows, 50000);
        assert_eq!(metrics[1].run_id.as_deref(), Some("run_001"));
        assert_eq!(metrics[1].files_produced, 1);
        assert_eq!(metrics[1].bytes_written, 4096);
        assert_eq!(metrics[1].validated, Some(true));
        assert_eq!(metrics[1].schema_changed, Some(false));
    }

    #[test]
    fn query_metrics_all_exports() {
        let s = store();
        s.record_metric(
            "orders", "r1", 100, 1000, None, "success", None, None, None, None, 1, 500, 0, None,
            None,
        )
        .unwrap();
        s.record_metric(
            "users", "r2", 200, 2000, None, "success", None, None, None, None, 1, 800, 0, None,
            None,
        )
        .unwrap();

        let metrics = s.get_metrics(None, 10).unwrap();
        assert_eq!(metrics.len(), 2);
    }

    #[test]
    fn metrics_limit_works() {
        let s = store();
        for i in 0..10 {
            s.record_metric(
                "t",
                &format!("r{}", i),
                i * 100,
                i,
                None,
                "success",
                None,
                None,
                None,
                None,
                0,
                0,
                0,
                None,
                None,
            )
            .unwrap();
        }
        let metrics = s.get_metrics(Some("t"), 3).unwrap();
        assert_eq!(metrics.len(), 3);
    }

    // ─── File manifest tests ─────────────────────────────────

    #[test]
    fn record_and_query_files() {
        let s = store();
        s.record_file(
            "run_001",
            "orders",
            "orders_20260329.parquet",
            50000,
            4096,
            "parquet",
            Some("zstd"),
        )
        .unwrap();
        s.record_file(
            "run_001",
            "orders",
            "orders_20260329_chunk1.parquet",
            25000,
            2048,
            "parquet",
            Some("zstd"),
        )
        .unwrap();
        s.record_file(
            "run_002",
            "users",
            "users_20260329.csv",
            1000,
            500,
            "csv",
            None,
        )
        .unwrap();

        // Newest first: the chunk1 file (recorded second) is at index 0.
        let files = s.get_files(Some("orders"), 10).unwrap();
        assert_eq!(files.len(), 2);
        assert_eq!(files[0].run_id, "run_001");
        assert_eq!(files[0].row_count, 25000);

        let all = s.get_files(None, 10).unwrap();
        assert_eq!(all.len(), 3);
    }

    #[test]
    fn files_limit_works() {
        let s = store();
        for i in 0..10 {
            s.record_file(
                &format!("r{}", i),
                "t",
                &format!("f{}.parquet", i),
                i,
                i * 100,
                "parquet",
                None,
            )
            .unwrap();
        }
        let files = s.get_files(Some("t"), 3).unwrap();
        assert_eq!(files.len(), 3);
    }

    // ─── Schema tracking tests ───────────────────────────────

    #[test]
    fn first_schema_stored_no_change() {
        let s = store();
        let cols = vec![
            SchemaColumn {
                name: "id".into(),
                data_type: "Int64".into(),
            },
            SchemaColumn {
                name: "name".into(),
                data_type: "Utf8".into(),
            },
        ];
        let change = s.detect_schema_change("orders", &cols).unwrap();
        assert!(change.is_none(), "first run should detect no change");
        assert!(s.get_stored_schema("orders").unwrap().is_some());
    }

    #[test]
    fn same_schema_no_change() {
        let s = store();
        let cols = vec![SchemaColumn {
            name: "id".into(),
            data_type: "Int64".into(),
        }];
        s.detect_schema_change("t", &cols).unwrap();
        let change = s.detect_schema_change("t", &cols).unwrap();
        assert!(change.is_none());
    }

    #[test]
    fn added_column_detected() {
        let s = store();
        let v1 = vec![SchemaColumn {
            name: "id".into(),
            data_type: "Int64".into(),
        }];
        s.detect_schema_change("t", &v1).unwrap();

        let v2 = vec![
            SchemaColumn {
                name: "id".into(),
                data_type: "Int64".into(),
            },
            SchemaColumn {
                name: "email".into(),
                data_type: "Utf8".into(),
            },
        ];
        let change = s.detect_schema_change("t", &v2).unwrap().unwrap();
        assert_eq!(change.added.len(), 1);
        assert!(change.added[0].contains("email"));
    }

    #[test]
    fn removed_column_detected() {
        let s = store();
        let v1 = vec![
            SchemaColumn {
                name: "id".into(),
                data_type: "Int64".into(),
            },
            SchemaColumn {
                name: "old_field".into(),
                data_type: "Utf8".into(),
            },
        ];
        s.detect_schema_change("t", &v1).unwrap();

        let v2 = vec![SchemaColumn {
            name: "id".into(),
            data_type: "Int64".into(),
        }];
        let change = s.detect_schema_change("t", &v2).unwrap().unwrap();
        assert_eq!(change.removed, vec!["old_field"]);
    }

    #[test]
    fn type_change_detected() {
        let s = store();
        let v1 = vec![SchemaColumn {
            name: "price".into(),
            data_type: "Float64".into(),
        }];
        s.detect_schema_change("t", &v1).unwrap();

        let v2 = vec![SchemaColumn {
            name: "price".into(),
            data_type: "Utf8".into(),
        }];
        let change = s.detect_schema_change("t", &v2).unwrap().unwrap();
        assert_eq!(change.type_changed.len(), 1);
        assert_eq!(
            change.type_changed[0],
            ("price".into(), "Float64".into(), "Utf8".into())
        );
    }

    // ─── Chunk checkpoint (needs on-disk DB: separate connections share one file) ─

    /// Opens a store next to a throwaway config file in a temp dir.
    ///
    /// The `TempDir` is returned so the directory outlives the store; dropping
    /// it deletes the files.
    fn store_on_disk() -> (tempfile::TempDir, StateStore) {
        let dir = tempfile::tempdir().expect("tempdir");
        let cfg = dir.path().join("rivet.yaml");
        std::fs::write(&cfg, "# test").expect("write cfg");
        let s = StateStore::open(cfg.to_str().unwrap()).expect("open store");
        (dir, s)
    }

    #[test]
    fn chunk_claim_complete_and_finalize() {
        let (_dir, s) = store_on_disk();
        s.create_chunk_run("run_a", "orders", "deadbeef", 2).unwrap();
        s.insert_chunk_tasks("run_a", &[(1, 5), (6, 10)]).unwrap();

        let t0 = s.claim_next_chunk_task("run_a").unwrap().expect("claim 0");
        assert_eq!(t0.0, 0);
        assert_eq!(t0.1, "1");
        assert_eq!(t0.2, "5");

        s.complete_chunk_task("run_a", 0, 3, Some("part0.csv")).unwrap();

        let t1 = s.claim_next_chunk_task("run_a").unwrap().expect("claim 1");
        assert_eq!(t1.0, 1);
        s.complete_chunk_task("run_a", 1, 2, Some("part1.csv")).unwrap();

        assert_eq!(s.count_chunk_tasks_not_completed("run_a").unwrap(), 0);
        s.finalize_chunk_run_completed("run_a").unwrap();
    }

    #[test]
    fn chunk_fail_then_retry_until_max() {
        let (_dir, s) = store_on_disk();
        // Max attempts = 2: the task may be claimed twice, then never again.
        s.create_chunk_run("run_b", "orders", "ab", 2).unwrap();
        s.insert_chunk_tasks("run_b", &[(1, 2)]).unwrap();

        let t = s.claim_next_chunk_task("run_b").unwrap().unwrap();
        assert_eq!(t.0, 0);
        s.fail_chunk_task("run_b", 0, "boom").unwrap();

        let t2 = s.claim_next_chunk_task("run_b").unwrap().unwrap();
        assert_eq!(t2.0, 0);
        s.fail_chunk_task("run_b", 0, "again").unwrap();

        assert!(s.claim_next_chunk_task("run_b").unwrap().is_none());
        assert_eq!(s.count_chunk_tasks_not_completed("run_b").unwrap(), 1);
    }

    #[test]
    fn reset_chunk_checkpoint_clears_runs() {
        let (_dir, s) = store_on_disk();
        s.create_chunk_run("r1", "e", "h", 1).unwrap();
        s.insert_chunk_tasks("r1", &[(0, 1)]).unwrap();
        assert_eq!(s.reset_chunk_checkpoint("e").unwrap(), 1);
        assert!(s.find_in_progress_chunk_run("e").unwrap().is_none());
    }

    // ─── Schema versioning tests ────────────────────────────

    #[test]
    fn migrations_are_ordered_and_end_at_schema_version() {
        // `SCHEMA_VERSION` is maintained by hand. Guard against bumping it
        // without adding the matching `MIGRATIONS` entry (or vice versa), and
        // against out-of-order entries, since migrations are applied in order.
        let versions: Vec<i64> = MIGRATIONS.iter().map(|&(v, _)| v).collect();
        assert!(
            versions.windows(2).all(|w| w[0] < w[1]),
            "MIGRATIONS must be strictly increasing by version: {:?}",
            versions
        );
        assert_eq!(versions.last().copied(), Some(SCHEMA_VERSION));
    }

    #[test]
    fn fresh_db_reaches_latest_version() {
        let s = store();
        let ver = get_current_version(&s.conn);
        assert_eq!(ver, SCHEMA_VERSION);
    }

    #[test]
    fn migration_is_idempotent() {
        let s = store();
        migrate(&s.conn).unwrap();
        migrate(&s.conn).unwrap();
        let ver = get_current_version(&s.conn);
        assert_eq!(ver, SCHEMA_VERSION);
    }

    #[test]
    fn legacy_db_gets_upgraded() {
        // Simulate a pre-versioning database that only has the original tables
        // (and a narrower export_metrics) with no version bookkeeping.
        let conn = Connection::open_in_memory().unwrap();
        conn.execute_batch(
            "CREATE TABLE export_state (
                export_name TEXT PRIMARY KEY,
                last_cursor_value TEXT,
                last_run_at TEXT
            );
            CREATE TABLE export_metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                export_name TEXT NOT NULL,
                run_at TEXT NOT NULL,
                duration_ms INTEGER NOT NULL,
                total_rows INTEGER NOT NULL,
                status TEXT NOT NULL
            );",
        )
        .unwrap();

        migrate(&conn).unwrap();

        let ver = get_current_version(&conn);
        assert_eq!(ver, SCHEMA_VERSION);

        let has_chunk_run: bool = conn
            .query_row(
                "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='chunk_run'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert!(has_chunk_run);
    }
}