Skip to main content

rivet/state/
checkpoint.rs

1use rusqlite::TransactionBehavior;
2
3use crate::error::Result;
4
5use super::{StateConn, StateRef, StateStore, open_connection, pg_sql};
6
/// Snapshot of a single `chunk_task` row, intended for display and debugging.
///
/// Values are returned exactly as stored; `StateStore::list_chunk_tasks_for_run`
/// builds one of these per row, ordered by `chunk_index`.
#[derive(Debug, Clone)]
pub struct ChunkTaskInfo {
    /// Zero-based position of the chunk within its run (the enumeration index
    /// assigned when the tasks were inserted).
    pub chunk_index: i64,
    /// Start of the chunk's key range, stored as text.
    pub start_key: String,
    /// End of the chunk's key range, stored as text.
    pub end_key: String,
    /// Task state as stored; this file writes `pending`, `running`,
    /// `completed` and `failed`.
    pub status: String,
    /// Number of times the task has been claimed (incremented on every claim,
    /// reset to 0 by a re-export reset).
    pub attempts: i64,
    /// Message recorded by the most recent failure or re-export reset, if any.
    pub last_error: Option<String>,
    /// Row count recorded when the task was completed, if any.
    pub rows_written: Option<i64>,
    /// Output file name recorded when the task was completed, if any.
    pub file_name: Option<String>,
}
19
20/// Chunk checkpoint store — reads and writes `chunk_run` and `chunk_task`.
21///
22/// Governs Invariant I5 (Chunk Task Acyclicity): transitions are strictly
23/// forward (`pending → running → completed | failed`).  Completed tasks are
24/// never re-claimed.  Failed tasks return to `running` only while
25/// `attempts < max_chunk_attempts`.
26///
27/// Some methods accept an explicit `state_ref` and open a fresh connection.
28/// This is required for parallel workers that cannot share one `Connection`.
29impl StateStore {
30    /// Distinct `export_name` values that currently have at least one
31    /// `chunk_run` row with `status = 'in_progress'` (interrupted run — resume or reset).
32    pub fn list_export_names_with_in_progress_chunk_runs(&self) -> Result<Vec<String>> {
33        let sql = "SELECT DISTINCT export_name FROM chunk_run WHERE status = 'in_progress' ORDER BY export_name ASC";
34        match &self.conn {
35            StateConn::Sqlite(c) => {
36                let mut stmt = c.prepare(sql)?;
37                let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
38                rows.collect::<std::result::Result<Vec<_>, _>>()
39                    .map_err(Into::into)
40            }
41            StateConn::Postgres(client) => {
42                let mut c = client.borrow_mut();
43                let rows = c.query(&pg_sql(sql), &[])?;
44                Ok(rows.iter().map(|row| row.get(0)).collect())
45            }
46        }
47    }
48
49    /// Latest `in_progress` chunk run for this export, if any.
50    pub fn find_in_progress_chunk_run(
51        &self,
52        export_name: &str,
53    ) -> Result<Option<(String, String)>> {
54        let sql = "SELECT run_id, plan_hash FROM chunk_run
55             WHERE export_name = ?1 AND status = 'in_progress'
56             ORDER BY created_at DESC LIMIT 1";
57        match &self.conn {
58            StateConn::Sqlite(c) => {
59                let mut stmt = c.prepare(sql)?;
60                let mut rows =
61                    stmt.query_map([export_name], |row| Ok((row.get(0)?, row.get(1)?)))?;
62                Ok(rows.next().transpose()?)
63            }
64            StateConn::Postgres(client) => {
65                // Single borrow for the duration of this call; safe because all Postgres
66                // operations in StateStore are sequential (no re-entrant borrows).
67                let mut c = client.borrow_mut();
68                let rows = c.query(&pg_sql(sql), &[&export_name])?;
69                Ok(rows.first().map(|row| (row.get(0), row.get(1))))
70            }
71        }
72    }
73
74    pub fn create_chunk_run(
75        &self,
76        run_id: &str,
77        export_name: &str,
78        plan_hash: &str,
79        max_chunk_attempts: u32,
80    ) -> Result<()> {
81        let now = chrono::Utc::now().to_rfc3339();
82        let sql = "INSERT INTO chunk_run (run_id, export_name, plan_hash, status, max_chunk_attempts, created_at, updated_at)
83             VALUES (?1, ?2, ?3, 'in_progress', ?4, ?5, ?5)";
84        match &self.conn {
85            StateConn::Sqlite(c) => {
86                c.execute(
87                    sql,
88                    rusqlite::params![
89                        run_id,
90                        export_name,
91                        plan_hash,
92                        max_chunk_attempts as i64,
93                        now
94                    ],
95                )?;
96            }
97            StateConn::Postgres(client) => {
98                let mut c = client.borrow_mut();
99                c.execute(
100                    &pg_sql(sql),
101                    &[
102                        &run_id,
103                        &export_name,
104                        &plan_hash,
105                        &(max_chunk_attempts as i64),
106                        &now,
107                    ],
108                )?;
109            }
110        }
111        Ok(())
112    }
113
114    pub fn insert_chunk_tasks(&self, run_id: &str, ranges: &[(i64, i64)]) -> Result<()> {
115        if ranges.is_empty() {
116            return Ok(());
117        }
118        let now = chrono::Utc::now().to_rfc3339();
119        match &self.conn {
120            StateConn::Sqlite(c) => {
121                let tx = c.unchecked_transaction()?;
122                {
123                    let mut stmt = tx.prepare(
124                        "INSERT INTO chunk_task (run_id, chunk_index, start_key, end_key, status, attempts, updated_at)
125                         VALUES (?1, ?2, ?3, ?4, 'pending', 0, ?5)",
126                    )?;
127                    for (i, (start, end)) in ranges.iter().enumerate() {
128                        stmt.execute(rusqlite::params![
129                            run_id,
130                            i as i64,
131                            start.to_string(),
132                            end.to_string(),
133                            now,
134                        ])?;
135                    }
136                }
137                tx.commit()?;
138            }
139            StateConn::Postgres(client) => {
140                let mut c = client.borrow_mut();
141                let mut tx = c
142                    .transaction()
143                    .map_err(|e| anyhow::anyhow!("state(pg): begin transaction: {:#}", e))?;
144                for (i, (start, end)) in ranges.iter().enumerate() {
145                    tx.execute(
146                        "INSERT INTO chunk_task (run_id, chunk_index, start_key, end_key, status, attempts, updated_at)
147                         VALUES ($1, $2, $3, $4, 'pending', 0, $5)",
148                        &[
149                            &run_id,
150                            &(i as i64),
151                            &start.to_string(),
152                            &end.to_string(),
153                            &now,
154                        ],
155                    )
156                    .map_err(|e| anyhow::anyhow!("state(pg): insert chunk_task: {:#}", e))?;
157                }
158                tx.commit()
159                    .map_err(|e| anyhow::anyhow!("state(pg): commit: {:#}", e))?;
160            }
161        }
162        Ok(())
163    }
164
165    /// Mark tasks left `running` after a crash as `pending` so they can be retried.
166    pub fn reset_stale_running_chunk_tasks(&self, run_id: &str) -> Result<usize> {
167        let now = chrono::Utc::now().to_rfc3339();
168        let sql = "UPDATE chunk_task SET status = 'pending', updated_at = ?1
169             WHERE run_id = ?2 AND status = 'running'";
170        match &self.conn {
171            StateConn::Sqlite(c) => {
172                let n = c.execute(sql, rusqlite::params![now, run_id])?;
173                Ok(n)
174            }
175            StateConn::Postgres(client) => {
176                let mut c = client.borrow_mut();
177                let n = c.execute(&pg_sql(sql), &[&now, &run_id])?;
178                Ok(n as usize)
179            }
180        }
181    }
182
183    /// Atomically claim the next pending or retryable failed chunk.
184    pub fn claim_next_chunk_task(&self, run_id: &str) -> Result<Option<(i64, String, String)>> {
185        Self::claim_next_chunk_task_at_ref(&self.state_ref, run_id)
186    }
187
188    fn claim_next_chunk_in_sqlite_tx(
189        tx: &rusqlite::Transaction<'_>,
190        now: &str,
191        run_id: &str,
192    ) -> Result<Option<(i64, String, String)>> {
193        let mut stmt = tx.prepare(
194            "UPDATE chunk_task
195             SET status = 'running', attempts = attempts + 1, updated_at = ?1
196             WHERE rowid = (
197               SELECT ct.rowid FROM chunk_task ct
198               INNER JOIN chunk_run cr ON cr.run_id = ct.run_id
199               WHERE ct.run_id = ?2
200                 AND cr.status = 'in_progress'
201                 AND (
202                   ct.status = 'pending'
203                   OR (ct.status = 'failed' AND ct.attempts < cr.max_chunk_attempts)
204                 )
205               ORDER BY ct.chunk_index ASC
206               LIMIT 1
207             )
208             RETURNING chunk_index, start_key, end_key",
209        )?;
210        let mut rows = stmt.query(rusqlite::params![now, run_id])?;
211        let out = match rows.next()? {
212            Some(row) => Some((row.get(0)?, row.get(1)?, row.get(2)?)),
213            None => None,
214        };
215        Ok(out)
216    }
217
218    /// Claim next chunk using a fresh connection identified by `state_ref`.
219    /// Used by parallel workers that cannot share a single connection.
220    pub fn claim_next_chunk_task_at_ref(
221        state_ref: &StateRef,
222        run_id: &str,
223    ) -> Result<Option<(i64, String, String)>> {
224        match state_ref {
225            StateRef::Sqlite(db_path) => {
226                let mut conn = open_connection(db_path)?;
227                let now = chrono::Utc::now().to_rfc3339();
228                let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
229                let res = Self::claim_next_chunk_in_sqlite_tx(&tx, &now, run_id)?;
230                tx.commit()?;
231                Ok(res)
232            }
233            StateRef::Postgres(url) => {
234                let mut client = super::connect_pg(url)?;
235                let now = chrono::Utc::now().to_rfc3339();
236                // FOR UPDATE SKIP LOCKED ensures concurrent workers each get a distinct task.
237                let rows = client
238                    .query(
239                        "UPDATE chunk_task
240                         SET status = 'running', attempts = attempts + 1, updated_at = $1
241                         WHERE id = (
242                             SELECT ct.id FROM chunk_task ct
243                             INNER JOIN chunk_run cr ON cr.run_id = ct.run_id
244                             WHERE ct.run_id = $2
245                               AND cr.status = 'in_progress'
246                               AND (
247                                 ct.status = 'pending'
248                                 OR (ct.status = 'failed' AND ct.attempts < cr.max_chunk_attempts)
249                               )
250                             ORDER BY ct.chunk_index ASC
251                             LIMIT 1
252                             FOR UPDATE SKIP LOCKED
253                         )
254                         RETURNING chunk_index, start_key, end_key",
255                        &[&now, &run_id],
256                    )
257                    .map_err(|e| anyhow::anyhow!("state(pg): claim chunk: {:#}", e))?;
258                Ok(rows.first().map(|row| (row.get(0), row.get(1), row.get(2))))
259            }
260        }
261    }
262
263    pub fn complete_chunk_task(
264        &self,
265        run_id: &str,
266        chunk_index: i64,
267        rows_written: i64,
268        file_name: Option<&str>,
269    ) -> Result<()> {
270        let now = chrono::Utc::now().to_rfc3339();
271        let sql = "UPDATE chunk_task
272             SET status = 'completed', rows_written = ?1, file_name = ?2, last_error = NULL, updated_at = ?3
273             WHERE run_id = ?4 AND chunk_index = ?5";
274        match &self.conn {
275            StateConn::Sqlite(c) => {
276                c.execute(
277                    sql,
278                    rusqlite::params![rows_written, file_name, now, run_id, chunk_index],
279                )?;
280            }
281            StateConn::Postgres(client) => {
282                let mut c = client.borrow_mut();
283                c.execute(
284                    &pg_sql(sql),
285                    &[&rows_written, &file_name, &now, &run_id, &chunk_index],
286                )?;
287            }
288        }
289        Ok(())
290    }
291
292    pub fn fail_chunk_task(&self, run_id: &str, chunk_index: i64, err: &str) -> Result<()> {
293        let now = chrono::Utc::now().to_rfc3339();
294        let sql = "UPDATE chunk_task SET status = 'failed', last_error = ?1, updated_at = ?2
295             WHERE run_id = ?3 AND chunk_index = ?4";
296        match &self.conn {
297            StateConn::Sqlite(c) => {
298                c.execute(sql, rusqlite::params![err, now, run_id, chunk_index])?;
299            }
300            StateConn::Postgres(client) => {
301                let mut c = client.borrow_mut();
302                c.execute(&pg_sql(sql), &[&err, &now, &run_id, &chunk_index])?;
303            }
304        }
305        Ok(())
306    }
307
308    pub fn fail_chunk_task_at_ref(
309        state_ref: &StateRef,
310        run_id: &str,
311        chunk_index: i64,
312        err: &str,
313    ) -> Result<()> {
314        let now = chrono::Utc::now().to_rfc3339();
315        let sql = "UPDATE chunk_task SET status = 'failed', last_error = ?1, updated_at = ?2
316             WHERE run_id = ?3 AND chunk_index = ?4";
317        match state_ref {
318            StateRef::Sqlite(db_path) => {
319                let conn = open_connection(db_path)?;
320                conn.execute(sql, rusqlite::params![err, now, run_id, chunk_index])?;
321            }
322            StateRef::Postgres(url) => {
323                let mut client = super::connect_pg(url)?;
324                client.execute(&pg_sql(sql), &[&err, &now, &run_id, &chunk_index])?;
325            }
326        }
327        Ok(())
328    }
329
330    pub fn complete_chunk_task_at_ref(
331        state_ref: &StateRef,
332        run_id: &str,
333        chunk_index: i64,
334        rows_written: i64,
335        file_name: Option<&str>,
336    ) -> Result<()> {
337        let now = chrono::Utc::now().to_rfc3339();
338        let sql = "UPDATE chunk_task
339             SET status = 'completed', rows_written = ?1, file_name = ?2, last_error = NULL, updated_at = ?3
340             WHERE run_id = ?4 AND chunk_index = ?5";
341        match state_ref {
342            StateRef::Sqlite(db_path) => {
343                let conn = open_connection(db_path)?;
344                conn.execute(
345                    sql,
346                    rusqlite::params![rows_written, file_name, now, run_id, chunk_index],
347                )?;
348            }
349            StateRef::Postgres(url) => {
350                let mut client = super::connect_pg(url)?;
351                client.execute(
352                    &pg_sql(sql),
353                    &[&rows_written, &file_name, &now, &run_id, &chunk_index],
354                )?;
355            }
356        }
357        Ok(())
358    }
359
360    pub fn count_chunk_tasks_total(&self, run_id: &str) -> Result<usize> {
361        let sql = "SELECT COUNT(*) FROM chunk_task WHERE run_id = ?1";
362        match &self.conn {
363            StateConn::Sqlite(c) => {
364                let n: i64 = c.query_row(sql, [run_id], |row| row.get(0))?;
365                Ok(n as usize)
366            }
367            StateConn::Postgres(client) => {
368                let mut c = client.borrow_mut();
369                let row = c.query_one(&pg_sql(sql), &[&run_id])?;
370                let n: i64 = row.get(0);
371                Ok(n as usize)
372            }
373        }
374    }
375
376    pub fn count_chunk_tasks_not_completed(&self, run_id: &str) -> Result<i64> {
377        let sql = "SELECT COUNT(*) FROM chunk_task WHERE run_id = ?1 AND status != 'completed'";
378        match &self.conn {
379            StateConn::Sqlite(c) => {
380                let n: i64 = c.query_row(sql, [run_id], |row| row.get(0))?;
381                Ok(n)
382            }
383            StateConn::Postgres(client) => {
384                let mut c = client.borrow_mut();
385                let row = c.query_one(&pg_sql(sql), &[&run_id])?;
386                Ok(row.get(0))
387            }
388        }
389    }
390
391    pub fn finalize_chunk_run_completed(&self, run_id: &str) -> Result<()> {
392        let now = chrono::Utc::now().to_rfc3339();
393        let sql = "UPDATE chunk_run SET status = 'completed', updated_at = ?1 WHERE run_id = ?2";
394        match &self.conn {
395            StateConn::Sqlite(c) => {
396                c.execute(sql, rusqlite::params![now, run_id])?;
397            }
398            StateConn::Postgres(client) => {
399                let mut c = client.borrow_mut();
400                c.execute(&pg_sql(sql), &[&now, &run_id])?;
401            }
402        }
403        Ok(())
404    }
405
406    /// Reset a single completed chunk task back to `pending` so the next
407    /// `--resume` run re-exports it.
408    ///
409    /// Used by ADR-0012 M8 reconciliation: when the destination's manifest
410    /// says a part was committed but the actual object is missing or has
411    /// drifted (size mismatch), the chunk_task that produced it must run
412    /// again.  Without this reset, `claim_next_chunk_task` would skip the
413    /// completed row and the destination would stay broken across resumes.
414    ///
415    /// Distinct from `reset_chunk_checkpoint(export_name)` (which wipes
416    /// every run+task for an export — the operator-facing "abandon resume"
417    /// nuke) and from `reset_stale_running_chunk_tasks` (which only
418    /// rescues tasks left in 'running' after a crash).  This one is
419    /// surgical: a single (run_id, chunk_index) goes from completed
420    /// back to pending, attempts reset to 0, file_name cleared, and
421    /// last_error annotated with the M8 reason so the journal/audit
422    /// trail records why.
423    ///
424    /// Returns the number of rows updated (0 or 1).  Idempotent —
425    /// calling it on a non-completed task is a no-op.
426    pub fn reset_chunk_task_for_re_export(
427        &self,
428        run_id: &str,
429        chunk_index: i64,
430        reason: &str,
431    ) -> Result<usize> {
432        let now = chrono::Utc::now().to_rfc3339();
433        let sql = "UPDATE chunk_task
434             SET status = 'pending',
435                 attempts = 0,
436                 file_name = NULL,
437                 rows_written = NULL,
438                 last_error = ?1,
439                 updated_at = ?2
440             WHERE run_id = ?3
441               AND chunk_index = ?4
442               AND status = 'completed'";
443        match &self.conn {
444            StateConn::Sqlite(c) => {
445                let n = c.execute(sql, rusqlite::params![reason, now, run_id, chunk_index])?;
446                Ok(n)
447            }
448            StateConn::Postgres(client) => {
449                let mut c = client.borrow_mut();
450                let n = c.execute(&pg_sql(sql), &[&reason, &now, &run_id, &chunk_index])?;
451                Ok(n as usize)
452            }
453        }
454    }
455
456    /// Remove all chunk runs and tasks for an export (abandon resume).
457    pub fn reset_chunk_checkpoint(&self, export_name: &str) -> Result<usize> {
458        match &self.conn {
459            StateConn::Sqlite(c) => {
460                let run_ids: Vec<String> = {
461                    let mut stmt =
462                        c.prepare("SELECT run_id FROM chunk_run WHERE export_name = ?1")?;
463                    let rows = stmt.query_map([export_name], |row| row.get(0))?;
464                    rows.collect::<std::result::Result<Vec<_>, _>>()?
465                };
466                for rid in &run_ids {
467                    let _ = c.execute("DELETE FROM chunk_task WHERE run_id = ?1", [rid]);
468                }
469                let deleted = c.execute(
470                    "DELETE FROM chunk_run WHERE export_name = ?1",
471                    [export_name],
472                )?;
473                Ok(deleted)
474            }
475            StateConn::Postgres(client) => {
476                let mut c = client.borrow_mut();
477                let rows = c.query(
478                    "SELECT run_id FROM chunk_run WHERE export_name = $1",
479                    &[&export_name],
480                )?;
481                let run_ids: Vec<String> = rows.iter().map(|r| r.get(0)).collect();
482                for rid in &run_ids {
483                    let _ = c.execute("DELETE FROM chunk_task WHERE run_id = $1", &[rid]);
484                }
485                let deleted = c.execute(
486                    "DELETE FROM chunk_run WHERE export_name = $1",
487                    &[&export_name],
488                )?;
489                Ok(deleted as usize)
490            }
491        }
492    }
493
494    /// Latest chunk_run row for an export (any status), for `rivet state chunks`.
495    pub fn get_latest_chunk_run(
496        &self,
497        export_name: &str,
498    ) -> Result<Option<(String, String, String, String)>> {
499        let sql = "SELECT run_id, plan_hash, status, updated_at FROM chunk_run
500             WHERE export_name = ?1 ORDER BY updated_at DESC LIMIT 1";
501        match &self.conn {
502            StateConn::Sqlite(c) => {
503                let mut stmt = c.prepare(sql)?;
504                let mut rows = stmt.query_map([export_name], |row| {
505                    Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
506                })?;
507                Ok(rows.next().transpose()?)
508            }
509            StateConn::Postgres(client) => {
510                let mut c = client.borrow_mut();
511                let rows = c.query(&pg_sql(sql), &[&export_name])?;
512                Ok(rows
513                    .first()
514                    .map(|row| (row.get(0), row.get(1), row.get(2), row.get(3))))
515            }
516        }
517    }
518
519    pub fn list_chunk_tasks_for_run(&self, run_id: &str) -> Result<Vec<ChunkTaskInfo>> {
520        let sql = "SELECT chunk_index, start_key, end_key, status, attempts, last_error, rows_written, file_name
521             FROM chunk_task WHERE run_id = ?1 ORDER BY chunk_index ASC";
522        match &self.conn {
523            StateConn::Sqlite(c) => {
524                let mut stmt = c.prepare(sql)?;
525                let rows = stmt.query_map([run_id], |row| {
526                    Ok(ChunkTaskInfo {
527                        chunk_index: row.get(0)?,
528                        start_key: row.get(1)?,
529                        end_key: row.get(2)?,
530                        status: row.get(3)?,
531                        attempts: row.get(4)?,
532                        last_error: row.get(5)?,
533                        rows_written: row.get(6)?,
534                        file_name: row.get(7)?,
535                    })
536                })?;
537                rows.collect::<std::result::Result<Vec<_>, _>>()
538                    .map_err(Into::into)
539            }
540            StateConn::Postgres(client) => {
541                let mut c = client.borrow_mut();
542                let rows = c.query(&pg_sql(sql), &[&run_id])?;
543                Ok(rows
544                    .iter()
545                    .map(|row| ChunkTaskInfo {
546                        chunk_index: row.get(0),
547                        start_key: row.get(1),
548                        end_key: row.get(2),
549                        status: row.get(3),
550                        attempts: row.get(4),
551                        last_error: row.get(5),
552                        rows_written: row.get(6),
553                        file_name: row.get(7),
554                    })
555                    .collect())
556            }
557        }
558    }
559}
560
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `StateStore` via `StateStore::open` on a throwaway config file
    /// inside a fresh temp dir.  The `TempDir` guard is returned alongside the
    /// store so the directory outlives it.
    fn store_on_disk() -> (tempfile::TempDir, StateStore) {
        let tmp = tempfile::tempdir().expect("tempdir");
        let config_path = tmp.path().join("rivet.yaml");
        std::fs::write(&config_path, "# test").expect("write cfg");
        let store = StateStore::open(config_path.to_str().unwrap()).expect("open store");
        (tmp, store)
    }

    /// Happy path: tasks are claimed in index order, completed, and the run
    /// can be finalized once nothing is left uncompleted.
    #[test]
    fn chunk_claim_complete_and_finalize() {
        let (_guard, store) = store_on_disk();
        store
            .create_chunk_run("run_a", "orders", "deadbeef", 2)
            .unwrap();
        store
            .insert_chunk_tasks("run_a", &[(1, 5), (6, 10)])
            .unwrap();

        let (first_idx, first_start, first_end) = store
            .claim_next_chunk_task("run_a")
            .unwrap()
            .expect("claim 0");
        assert_eq!(first_idx, 0);
        assert_eq!(first_start, "1");
        assert_eq!(first_end, "5");

        store
            .complete_chunk_task("run_a", 0, 3, Some("part0.csv"))
            .unwrap();

        let (second_idx, _, _) = store
            .claim_next_chunk_task("run_a")
            .unwrap()
            .expect("claim 1");
        assert_eq!(second_idx, 1);
        store
            .complete_chunk_task("run_a", 1, 2, Some("part1.csv"))
            .unwrap();

        let remaining = store.count_chunk_tasks_not_completed("run_a").unwrap();
        assert_eq!(remaining, 0);
        store.finalize_chunk_run_completed("run_a").unwrap();
    }

    /// A failed task is re-claimable until its attempts reach the run's limit
    /// (2 here); after that the claim returns `None` and the task still counts
    /// as not completed.
    #[test]
    fn chunk_fail_then_retry_until_max() {
        let (_guard, store) = store_on_disk();
        store.create_chunk_run("run_b", "orders", "ab", 2).unwrap();
        store.insert_chunk_tasks("run_b", &[(1, 2)]).unwrap();

        let (first_attempt_idx, _, _) = store.claim_next_chunk_task("run_b").unwrap().unwrap();
        assert_eq!(first_attempt_idx, 0);
        store.fail_chunk_task("run_b", 0, "boom").unwrap();

        let (second_attempt_idx, _, _) = store.claim_next_chunk_task("run_b").unwrap().unwrap();
        assert_eq!(second_attempt_idx, 0);
        store.fail_chunk_task("run_b", 0, "again").unwrap();

        let third_attempt = store.claim_next_chunk_task("run_b").unwrap();
        assert!(third_attempt.is_none());
        assert_eq!(store.count_chunk_tasks_not_completed("run_b").unwrap(), 1);
    }

    /// Resetting an export's checkpoint removes its run, so no in-progress run
    /// is found afterwards.
    #[test]
    fn reset_chunk_checkpoint_clears_runs() {
        let (_guard, store) = store_on_disk();
        store.create_chunk_run("r1", "e", "h", 1).unwrap();
        store.insert_chunk_tasks("r1", &[(0, 1)]).unwrap();

        let removed_runs = store.reset_chunk_checkpoint("e").unwrap();
        assert_eq!(removed_runs, 1);
        assert!(store.find_in_progress_chunk_run("e").unwrap().is_none());
    }

    /// Only exports with an in-progress run are listed, in ascending name
    /// order; a finalized run's export is excluded.
    #[test]
    fn list_in_progress_exports_orders_and_deduplicates() {
        let (_guard, store) = store_on_disk();

        let initially = store
            .list_export_names_with_in_progress_chunk_runs()
            .unwrap();
        assert!(initially.is_empty());

        store.create_chunk_run("r_old", "zebra", "h", 1).unwrap();
        store.finalize_chunk_run_completed("r_old").unwrap();
        store.create_chunk_run("r_a", "alpha", "h1", 1).unwrap();
        store.create_chunk_run("r_b", "beta", "h2", 1).unwrap();

        let listed = store
            .list_export_names_with_in_progress_chunk_runs()
            .unwrap();
        assert_eq!(listed, vec!["alpha".to_string(), "beta".to_string()]);
    }
}