Skip to main content

rivet/state/
checkpoint.rs

1use rusqlite::TransactionBehavior;
2
3use crate::error::Result;
4
5use super::{StateConn, StateRef, StateStore, open_connection, pg_sql};
6
/// One row from `chunk_task` for display / debugging.
///
/// Derives `PartialEq`/`Eq` so rows can be compared directly (for example
/// with `assert_eq!` in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkTaskInfo {
    /// Zero-based position of the chunk within its run.
    pub chunk_index: i64,
    /// Start of the chunk's key range, stored as text.
    pub start_key: String,
    /// End of the chunk's key range, stored as text.
    pub end_key: String,
    /// Task status: `pending`, `running`, `completed` or `failed`.
    pub status: String,
    /// Number of times the task has been claimed.
    pub attempts: i64,
    /// Message recorded by the most recent failure or re-export reset, if any.
    pub last_error: Option<String>,
    /// Rows written by the completed chunk, if recorded.
    pub rows_written: Option<i64>,
    /// Output file name recorded on completion, if any.
    pub file_name: Option<String>,
}
19
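/// Chunk checkpoint store — reads and writes `chunk_run` and `chunk_task`.
///
/// Governs Invariant I5 (Chunk Task Acyclicity): during normal execution,
/// transitions are strictly forward (`pending → running → completed | failed`).
/// Completed tasks are never re-claimed by `claim_next_chunk_task`.  Failed
/// tasks return to `running` only while `attempts < max_chunk_attempts`.
///
/// Two explicit recovery paths move a task back to `pending`:
/// `reset_stale_running_chunk_tasks` (tasks left `running` after a crash) and
/// `reset_chunk_task_for_re_export` (a `completed` task that must be re-exported).
///
/// Some methods accept an explicit `state_ref` and open a fresh connection.
/// This is required for parallel workers that cannot share one `Connection`.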
29impl StateStore {
30    /// Distinct `export_name` values that currently have at least one
31    /// `chunk_run` row with `status = 'in_progress'` (interrupted run — resume or reset).
32    pub fn list_export_names_with_in_progress_chunk_runs(&self) -> Result<Vec<String>> {
33        let sql = "SELECT DISTINCT export_name FROM chunk_run WHERE status = 'in_progress' ORDER BY export_name ASC";
34        match &self.conn {
35            StateConn::Sqlite(c) => {
36                let mut stmt = c.prepare(sql)?;
37                let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
38                rows.collect::<std::result::Result<Vec<_>, _>>()
39                    .map_err(Into::into)
40            }
41            StateConn::Postgres(client) => {
42                let mut c = client.borrow_mut();
43                let rows = c.query(&pg_sql(sql), &[])?;
44                Ok(rows.iter().map(|row| row.get(0)).collect())
45            }
46        }
47    }
48
49    /// Latest `in_progress` chunk run for this export, if any.
50    pub fn find_in_progress_chunk_run(
51        &self,
52        export_name: &str,
53    ) -> Result<Option<(String, String)>> {
54        let sql = "SELECT run_id, plan_hash FROM chunk_run
55             WHERE export_name = ?1 AND status = 'in_progress'
56             ORDER BY created_at DESC LIMIT 1";
57        match &self.conn {
58            StateConn::Sqlite(c) => {
59                let mut stmt = c.prepare(sql)?;
60                let mut rows =
61                    stmt.query_map([export_name], |row| Ok((row.get(0)?, row.get(1)?)))?;
62                Ok(rows.next().transpose()?)
63            }
64            StateConn::Postgres(client) => {
65                // Single borrow for the duration of this call; safe because all Postgres
66                // operations in StateStore are sequential (no re-entrant borrows).
67                let mut c = client.borrow_mut();
68                let rows = c.query(&pg_sql(sql), &[&export_name])?;
69                Ok(rows.first().map(|row| (row.get(0), row.get(1))))
70            }
71        }
72    }
73
74    pub fn create_chunk_run(
75        &self,
76        run_id: &str,
77        export_name: &str,
78        plan_hash: &str,
79        max_chunk_attempts: u32,
80    ) -> Result<()> {
81        let now = chrono::Utc::now().to_rfc3339();
82        let sql = "INSERT INTO chunk_run (run_id, export_name, plan_hash, status, max_chunk_attempts, created_at, updated_at)
83             VALUES (?1, ?2, ?3, 'in_progress', ?4, ?5, ?5)";
84        match &self.conn {
85            StateConn::Sqlite(c) => {
86                c.execute(
87                    sql,
88                    rusqlite::params![
89                        run_id,
90                        export_name,
91                        plan_hash,
92                        max_chunk_attempts as i64,
93                        now
94                    ],
95                )?;
96            }
97            StateConn::Postgres(client) => {
98                let mut c = client.borrow_mut();
99                c.execute(
100                    &pg_sql(sql),
101                    &[
102                        &run_id,
103                        &export_name,
104                        &plan_hash,
105                        &(max_chunk_attempts as i64),
106                        &now,
107                    ],
108                )?;
109            }
110        }
111        Ok(())
112    }
113
114    pub fn insert_chunk_tasks(&self, run_id: &str, ranges: &[(i64, i64)]) -> Result<()> {
115        if ranges.is_empty() {
116            return Ok(());
117        }
118        let now = chrono::Utc::now().to_rfc3339();
119        match &self.conn {
120            StateConn::Sqlite(c) => {
121                let tx = c.unchecked_transaction()?;
122                {
123                    let mut stmt = tx.prepare(
124                        "INSERT INTO chunk_task (run_id, chunk_index, start_key, end_key, status, attempts, updated_at)
125                         VALUES (?1, ?2, ?3, ?4, 'pending', 0, ?5)",
126                    )?;
127                    for (i, (start, end)) in ranges.iter().enumerate() {
128                        stmt.execute(rusqlite::params![
129                            run_id,
130                            i as i64,
131                            start.to_string(),
132                            end.to_string(),
133                            now,
134                        ])?;
135                    }
136                }
137                tx.commit()?;
138            }
139            StateConn::Postgres(client) => {
140                let mut c = client.borrow_mut();
141                let mut tx = c
142                    .transaction()
143                    .map_err(|e| anyhow::anyhow!("state(pg): begin transaction: {:#}", e))?;
144                for (i, (start, end)) in ranges.iter().enumerate() {
145                    tx.execute(
146                        "INSERT INTO chunk_task (run_id, chunk_index, start_key, end_key, status, attempts, updated_at)
147                         VALUES ($1, $2, $3, $4, 'pending', 0, $5)",
148                        &[
149                            &run_id,
150                            &(i as i64),
151                            &start.to_string(),
152                            &end.to_string(),
153                            &now,
154                        ],
155                    )
156                    .map_err(|e| anyhow::anyhow!("state(pg): insert chunk_task: {:#}", e))?;
157                }
158                tx.commit()
159                    .map_err(|e| anyhow::anyhow!("state(pg): commit: {:#}", e))?;
160            }
161        }
162        Ok(())
163    }
164
165    /// Mark tasks left `running` after a crash as `pending` so they can be retried.
166    pub fn reset_stale_running_chunk_tasks(&self, run_id: &str) -> Result<usize> {
167        let now = chrono::Utc::now().to_rfc3339();
168        let sql = "UPDATE chunk_task SET status = 'pending', updated_at = ?1
169             WHERE run_id = ?2 AND status = 'running'";
170        match &self.conn {
171            StateConn::Sqlite(c) => {
172                let n = c.execute(sql, rusqlite::params![now, run_id])?;
173                Ok(n)
174            }
175            StateConn::Postgres(client) => {
176                let mut c = client.borrow_mut();
177                let n = c.execute(&pg_sql(sql), &[&now, &run_id])?;
178                Ok(n as usize)
179            }
180        }
181    }
182
183    /// Atomically claim the next pending or retryable failed chunk.
184    pub fn claim_next_chunk_task(&self, run_id: &str) -> Result<Option<(i64, String, String)>> {
185        Self::claim_next_chunk_task_at_ref(&self.state_ref, run_id)
186    }
187
188    fn claim_next_chunk_in_sqlite_tx(
189        tx: &rusqlite::Transaction<'_>,
190        now: &str,
191        run_id: &str,
192    ) -> Result<Option<(i64, String, String)>> {
193        let mut stmt = tx.prepare(
194            "UPDATE chunk_task
195             SET status = 'running', attempts = attempts + 1, updated_at = ?1
196             WHERE rowid = (
197               SELECT ct.rowid FROM chunk_task ct
198               INNER JOIN chunk_run cr ON cr.run_id = ct.run_id
199               WHERE ct.run_id = ?2
200                 AND cr.status = 'in_progress'
201                 AND (
202                   ct.status = 'pending'
203                   OR (ct.status = 'failed' AND ct.attempts < cr.max_chunk_attempts)
204                 )
205               ORDER BY ct.chunk_index ASC
206               LIMIT 1
207             )
208             RETURNING chunk_index, start_key, end_key",
209        )?;
210        let mut rows = stmt.query(rusqlite::params![now, run_id])?;
211        let out = match rows.next()? {
212            Some(row) => Some((row.get(0)?, row.get(1)?, row.get(2)?)),
213            None => None,
214        };
215        Ok(out)
216    }
217
218    /// Claim next chunk using a fresh connection identified by `state_ref`.
219    /// Used by parallel workers that cannot share a single connection.
220    pub fn claim_next_chunk_task_at_ref(
221        state_ref: &StateRef,
222        run_id: &str,
223    ) -> Result<Option<(i64, String, String)>> {
224        match state_ref {
225            StateRef::Sqlite(db_path) => {
226                let mut conn = open_connection(db_path)?;
227                let now = chrono::Utc::now().to_rfc3339();
228                let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
229                let res = Self::claim_next_chunk_in_sqlite_tx(&tx, &now, run_id)?;
230                tx.commit()?;
231                Ok(res)
232            }
233            StateRef::Postgres(url) => {
234                let mut client = postgres::Client::connect(url, postgres::NoTls)
235                    .map_err(|e| anyhow::anyhow!("state(pg): connect for claim: {:#}", e))?;
236                let now = chrono::Utc::now().to_rfc3339();
237                // FOR UPDATE SKIP LOCKED ensures concurrent workers each get a distinct task.
238                let rows = client
239                    .query(
240                        "UPDATE chunk_task
241                         SET status = 'running', attempts = attempts + 1, updated_at = $1
242                         WHERE id = (
243                             SELECT ct.id FROM chunk_task ct
244                             INNER JOIN chunk_run cr ON cr.run_id = ct.run_id
245                             WHERE ct.run_id = $2
246                               AND cr.status = 'in_progress'
247                               AND (
248                                 ct.status = 'pending'
249                                 OR (ct.status = 'failed' AND ct.attempts < cr.max_chunk_attempts)
250                               )
251                             ORDER BY ct.chunk_index ASC
252                             LIMIT 1
253                             FOR UPDATE SKIP LOCKED
254                         )
255                         RETURNING chunk_index, start_key, end_key",
256                        &[&now, &run_id],
257                    )
258                    .map_err(|e| anyhow::anyhow!("state(pg): claim chunk: {:#}", e))?;
259                Ok(rows.first().map(|row| (row.get(0), row.get(1), row.get(2))))
260            }
261        }
262    }
263
264    pub fn complete_chunk_task(
265        &self,
266        run_id: &str,
267        chunk_index: i64,
268        rows_written: i64,
269        file_name: Option<&str>,
270    ) -> Result<()> {
271        let now = chrono::Utc::now().to_rfc3339();
272        let sql = "UPDATE chunk_task
273             SET status = 'completed', rows_written = ?1, file_name = ?2, last_error = NULL, updated_at = ?3
274             WHERE run_id = ?4 AND chunk_index = ?5";
275        match &self.conn {
276            StateConn::Sqlite(c) => {
277                c.execute(
278                    sql,
279                    rusqlite::params![rows_written, file_name, now, run_id, chunk_index],
280                )?;
281            }
282            StateConn::Postgres(client) => {
283                let mut c = client.borrow_mut();
284                c.execute(
285                    &pg_sql(sql),
286                    &[&rows_written, &file_name, &now, &run_id, &chunk_index],
287                )?;
288            }
289        }
290        Ok(())
291    }
292
293    pub fn fail_chunk_task(&self, run_id: &str, chunk_index: i64, err: &str) -> Result<()> {
294        let now = chrono::Utc::now().to_rfc3339();
295        let sql = "UPDATE chunk_task SET status = 'failed', last_error = ?1, updated_at = ?2
296             WHERE run_id = ?3 AND chunk_index = ?4";
297        match &self.conn {
298            StateConn::Sqlite(c) => {
299                c.execute(sql, rusqlite::params![err, now, run_id, chunk_index])?;
300            }
301            StateConn::Postgres(client) => {
302                let mut c = client.borrow_mut();
303                c.execute(&pg_sql(sql), &[&err, &now, &run_id, &chunk_index])?;
304            }
305        }
306        Ok(())
307    }
308
309    pub fn fail_chunk_task_at_ref(
310        state_ref: &StateRef,
311        run_id: &str,
312        chunk_index: i64,
313        err: &str,
314    ) -> Result<()> {
315        let now = chrono::Utc::now().to_rfc3339();
316        let sql = "UPDATE chunk_task SET status = 'failed', last_error = ?1, updated_at = ?2
317             WHERE run_id = ?3 AND chunk_index = ?4";
318        match state_ref {
319            StateRef::Sqlite(db_path) => {
320                let conn = open_connection(db_path)?;
321                conn.execute(sql, rusqlite::params![err, now, run_id, chunk_index])?;
322            }
323            StateRef::Postgres(url) => {
324                let mut client = postgres::Client::connect(url, postgres::NoTls)
325                    .map_err(|e| anyhow::anyhow!("state(pg): connect for fail: {:#}", e))?;
326                client.execute(&pg_sql(sql), &[&err, &now, &run_id, &chunk_index])?;
327            }
328        }
329        Ok(())
330    }
331
332    pub fn complete_chunk_task_at_ref(
333        state_ref: &StateRef,
334        run_id: &str,
335        chunk_index: i64,
336        rows_written: i64,
337        file_name: Option<&str>,
338    ) -> Result<()> {
339        let now = chrono::Utc::now().to_rfc3339();
340        let sql = "UPDATE chunk_task
341             SET status = 'completed', rows_written = ?1, file_name = ?2, last_error = NULL, updated_at = ?3
342             WHERE run_id = ?4 AND chunk_index = ?5";
343        match state_ref {
344            StateRef::Sqlite(db_path) => {
345                let conn = open_connection(db_path)?;
346                conn.execute(
347                    sql,
348                    rusqlite::params![rows_written, file_name, now, run_id, chunk_index],
349                )?;
350            }
351            StateRef::Postgres(url) => {
352                let mut client = postgres::Client::connect(url, postgres::NoTls)
353                    .map_err(|e| anyhow::anyhow!("state(pg): connect for complete: {:#}", e))?;
354                client.execute(
355                    &pg_sql(sql),
356                    &[&rows_written, &file_name, &now, &run_id, &chunk_index],
357                )?;
358            }
359        }
360        Ok(())
361    }
362
363    pub fn count_chunk_tasks_total(&self, run_id: &str) -> Result<usize> {
364        let sql = "SELECT COUNT(*) FROM chunk_task WHERE run_id = ?1";
365        match &self.conn {
366            StateConn::Sqlite(c) => {
367                let n: i64 = c.query_row(sql, [run_id], |row| row.get(0))?;
368                Ok(n as usize)
369            }
370            StateConn::Postgres(client) => {
371                let mut c = client.borrow_mut();
372                let row = c.query_one(&pg_sql(sql), &[&run_id])?;
373                let n: i64 = row.get(0);
374                Ok(n as usize)
375            }
376        }
377    }
378
379    pub fn count_chunk_tasks_not_completed(&self, run_id: &str) -> Result<i64> {
380        let sql = "SELECT COUNT(*) FROM chunk_task WHERE run_id = ?1 AND status != 'completed'";
381        match &self.conn {
382            StateConn::Sqlite(c) => {
383                let n: i64 = c.query_row(sql, [run_id], |row| row.get(0))?;
384                Ok(n)
385            }
386            StateConn::Postgres(client) => {
387                let mut c = client.borrow_mut();
388                let row = c.query_one(&pg_sql(sql), &[&run_id])?;
389                Ok(row.get(0))
390            }
391        }
392    }
393
394    pub fn finalize_chunk_run_completed(&self, run_id: &str) -> Result<()> {
395        let now = chrono::Utc::now().to_rfc3339();
396        let sql = "UPDATE chunk_run SET status = 'completed', updated_at = ?1 WHERE run_id = ?2";
397        match &self.conn {
398            StateConn::Sqlite(c) => {
399                c.execute(sql, rusqlite::params![now, run_id])?;
400            }
401            StateConn::Postgres(client) => {
402                let mut c = client.borrow_mut();
403                c.execute(&pg_sql(sql), &[&now, &run_id])?;
404            }
405        }
406        Ok(())
407    }
408
409    /// Reset a single completed chunk task back to `pending` so the next
410    /// `--resume` run re-exports it.
411    ///
412    /// Used by ADR-0012 M8 reconciliation: when the destination's manifest
413    /// says a part was committed but the actual object is missing or has
414    /// drifted (size mismatch), the chunk_task that produced it must run
415    /// again.  Without this reset, `claim_next_chunk_task` would skip the
416    /// completed row and the destination would stay broken across resumes.
417    ///
418    /// Distinct from `reset_chunk_checkpoint(export_name)` (which wipes
419    /// every run+task for an export — the operator-facing "abandon resume"
420    /// nuke) and from `reset_stale_running_chunk_tasks` (which only
421    /// rescues tasks left in 'running' after a crash).  This one is
422    /// surgical: a single (run_id, chunk_index) goes from completed
423    /// back to pending, attempts reset to 0, file_name cleared, and
424    /// last_error annotated with the M8 reason so the journal/audit
425    /// trail records why.
426    ///
427    /// Returns the number of rows updated (0 or 1).  Idempotent —
428    /// calling it on a non-completed task is a no-op.
429    pub fn reset_chunk_task_for_re_export(
430        &self,
431        run_id: &str,
432        chunk_index: i64,
433        reason: &str,
434    ) -> Result<usize> {
435        let now = chrono::Utc::now().to_rfc3339();
436        let sql = "UPDATE chunk_task
437             SET status = 'pending',
438                 attempts = 0,
439                 file_name = NULL,
440                 rows_written = NULL,
441                 last_error = ?1,
442                 updated_at = ?2
443             WHERE run_id = ?3
444               AND chunk_index = ?4
445               AND status = 'completed'";
446        match &self.conn {
447            StateConn::Sqlite(c) => {
448                let n = c.execute(sql, rusqlite::params![reason, now, run_id, chunk_index])?;
449                Ok(n)
450            }
451            StateConn::Postgres(client) => {
452                let mut c = client.borrow_mut();
453                let n = c.execute(&pg_sql(sql), &[&reason, &now, &run_id, &chunk_index])?;
454                Ok(n as usize)
455            }
456        }
457    }
458
459    /// Remove all chunk runs and tasks for an export (abandon resume).
460    pub fn reset_chunk_checkpoint(&self, export_name: &str) -> Result<usize> {
461        match &self.conn {
462            StateConn::Sqlite(c) => {
463                let run_ids: Vec<String> = {
464                    let mut stmt =
465                        c.prepare("SELECT run_id FROM chunk_run WHERE export_name = ?1")?;
466                    let rows = stmt.query_map([export_name], |row| row.get(0))?;
467                    rows.collect::<std::result::Result<Vec<_>, _>>()?
468                };
469                for rid in &run_ids {
470                    let _ = c.execute("DELETE FROM chunk_task WHERE run_id = ?1", [rid]);
471                }
472                let deleted = c.execute(
473                    "DELETE FROM chunk_run WHERE export_name = ?1",
474                    [export_name],
475                )?;
476                Ok(deleted)
477            }
478            StateConn::Postgres(client) => {
479                let mut c = client.borrow_mut();
480                let rows = c.query(
481                    "SELECT run_id FROM chunk_run WHERE export_name = $1",
482                    &[&export_name],
483                )?;
484                let run_ids: Vec<String> = rows.iter().map(|r| r.get(0)).collect();
485                for rid in &run_ids {
486                    let _ = c.execute("DELETE FROM chunk_task WHERE run_id = $1", &[rid]);
487                }
488                let deleted = c.execute(
489                    "DELETE FROM chunk_run WHERE export_name = $1",
490                    &[&export_name],
491                )?;
492                Ok(deleted as usize)
493            }
494        }
495    }
496
497    /// Latest chunk_run row for an export (any status), for `rivet state chunks`.
498    pub fn get_latest_chunk_run(
499        &self,
500        export_name: &str,
501    ) -> Result<Option<(String, String, String, String)>> {
502        let sql = "SELECT run_id, plan_hash, status, updated_at FROM chunk_run
503             WHERE export_name = ?1 ORDER BY updated_at DESC LIMIT 1";
504        match &self.conn {
505            StateConn::Sqlite(c) => {
506                let mut stmt = c.prepare(sql)?;
507                let mut rows = stmt.query_map([export_name], |row| {
508                    Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
509                })?;
510                Ok(rows.next().transpose()?)
511            }
512            StateConn::Postgres(client) => {
513                let mut c = client.borrow_mut();
514                let rows = c.query(&pg_sql(sql), &[&export_name])?;
515                Ok(rows
516                    .first()
517                    .map(|row| (row.get(0), row.get(1), row.get(2), row.get(3))))
518            }
519        }
520    }
521
522    pub fn list_chunk_tasks_for_run(&self, run_id: &str) -> Result<Vec<ChunkTaskInfo>> {
523        let sql = "SELECT chunk_index, start_key, end_key, status, attempts, last_error, rows_written, file_name
524             FROM chunk_task WHERE run_id = ?1 ORDER BY chunk_index ASC";
525        match &self.conn {
526            StateConn::Sqlite(c) => {
527                let mut stmt = c.prepare(sql)?;
528                let rows = stmt.query_map([run_id], |row| {
529                    Ok(ChunkTaskInfo {
530                        chunk_index: row.get(0)?,
531                        start_key: row.get(1)?,
532                        end_key: row.get(2)?,
533                        status: row.get(3)?,
534                        attempts: row.get(4)?,
535                        last_error: row.get(5)?,
536                        rows_written: row.get(6)?,
537                        file_name: row.get(7)?,
538                    })
539                })?;
540                rows.collect::<std::result::Result<Vec<_>, _>>()
541                    .map_err(Into::into)
542            }
543            StateConn::Postgres(client) => {
544                let mut c = client.borrow_mut();
545                let rows = c.query(&pg_sql(sql), &[&run_id])?;
546                Ok(rows
547                    .iter()
548                    .map(|row| ChunkTaskInfo {
549                        chunk_index: row.get(0),
550                        start_key: row.get(1),
551                        end_key: row.get(2),
552                        status: row.get(3),
553                        attempts: row.get(4),
554                        last_error: row.get(5),
555                        rows_written: row.get(6),
556                        file_name: row.get(7),
557                    })
558                    .collect())
559            }
560        }
561    }
562}
563
564#[cfg(test)]
565mod tests {
566    use super::*;
567
568    fn store_on_disk() -> (tempfile::TempDir, StateStore) {
569        let dir = tempfile::tempdir().expect("tempdir");
570        let cfg = dir.path().join("rivet.yaml");
571        std::fs::write(&cfg, "# test").expect("write cfg");
572        let s = StateStore::open(cfg.to_str().unwrap()).expect("open store");
573        (dir, s)
574    }
575
576    #[test]
577    fn chunk_claim_complete_and_finalize() {
578        let (_dir, s) = store_on_disk();
579        s.create_chunk_run("run_a", "orders", "deadbeef", 2)
580            .unwrap();
581        s.insert_chunk_tasks("run_a", &[(1, 5), (6, 10)]).unwrap();
582
583        let t0 = s.claim_next_chunk_task("run_a").unwrap().expect("claim 0");
584        assert_eq!(t0.0, 0);
585        assert_eq!(t0.1, "1");
586        assert_eq!(t0.2, "5");
587
588        s.complete_chunk_task("run_a", 0, 3, Some("part0.csv"))
589            .unwrap();
590
591        let t1 = s.claim_next_chunk_task("run_a").unwrap().expect("claim 1");
592        assert_eq!(t1.0, 1);
593        s.complete_chunk_task("run_a", 1, 2, Some("part1.csv"))
594            .unwrap();
595
596        assert_eq!(s.count_chunk_tasks_not_completed("run_a").unwrap(), 0);
597        s.finalize_chunk_run_completed("run_a").unwrap();
598    }
599
600    #[test]
601    fn chunk_fail_then_retry_until_max() {
602        let (_dir, s) = store_on_disk();
603        s.create_chunk_run("run_b", "orders", "ab", 2).unwrap();
604        s.insert_chunk_tasks("run_b", &[(1, 2)]).unwrap();
605
606        let t = s.claim_next_chunk_task("run_b").unwrap().unwrap();
607        assert_eq!(t.0, 0);
608        s.fail_chunk_task("run_b", 0, "boom").unwrap();
609
610        let t2 = s.claim_next_chunk_task("run_b").unwrap().unwrap();
611        assert_eq!(t2.0, 0);
612        s.fail_chunk_task("run_b", 0, "again").unwrap();
613
614        assert!(s.claim_next_chunk_task("run_b").unwrap().is_none());
615        assert_eq!(s.count_chunk_tasks_not_completed("run_b").unwrap(), 1);
616    }
617
618    #[test]
619    fn reset_chunk_checkpoint_clears_runs() {
620        let (_dir, s) = store_on_disk();
621        s.create_chunk_run("r1", "e", "h", 1).unwrap();
622        s.insert_chunk_tasks("r1", &[(0, 1)]).unwrap();
623        assert_eq!(s.reset_chunk_checkpoint("e").unwrap(), 1);
624        assert!(s.find_in_progress_chunk_run("e").unwrap().is_none());
625    }
626
627    #[test]
628    fn list_in_progress_exports_orders_and_deduplicates() {
629        let (_dir, s) = store_on_disk();
630        assert!(
631            s.list_export_names_with_in_progress_chunk_runs()
632                .unwrap()
633                .is_empty()
634        );
635        s.create_chunk_run("r_old", "zebra", "h", 1).unwrap();
636        s.finalize_chunk_run_completed("r_old").unwrap();
637        s.create_chunk_run("r_a", "alpha", "h1", 1).unwrap();
638        s.create_chunk_run("r_b", "beta", "h2", 1).unwrap();
639        assert_eq!(
640            s.list_export_names_with_in_progress_chunk_runs().unwrap(),
641            vec!["alpha".to_string(), "beta".to_string()]
642        );
643    }
644}