Skip to main content

rivet/state/
progression.rs

1//! Committed / verified export progression (Epic G — ADR-0008).
2
3use chrono::{DateTime, Utc};
4
5use super::{StateConn, StateStore, pg_sql};
6use crate::error::Result;
7
8/// One export's progression record.
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct ExportProgression {
11    pub export_name: String,
12    pub committed: Option<Boundary>,
13    pub verified: Option<Boundary>,
14}
15
16/// A single boundary snapshot (committed or verified).
17#[derive(Debug, Clone, PartialEq, Eq)]
18pub struct Boundary {
19    pub strategy: String,
20    pub run_id: Option<String>,
21    pub cursor: Option<String>,
22    pub chunk_index: Option<i64>,
23    pub at: DateTime<Utc>,
24}
25
26impl StateStore {
27    /// Record a successful incremental commit: `cursor` is the max value written
28    /// to destination in this run.
29    ///
30    /// Monotonic guard: the boundary only moves forward; a non-advancing commit
31    /// leaves the row untouched. The comparison happens in Rust (see
32    /// `cursor_advances`) because the column is TEXT and SQL `<` would order
33    /// numeric cursors lexicographically ("1000" < "999"), freezing the
34    /// boundary. Read-then-write is not atomic, but — like the single-statement
35    /// guard it replaces — this is regression protection, not a lock; rivet
36    /// never runs two commits for the same export concurrently.
37    pub fn record_committed_incremental(
38        &self,
39        export_name: &str,
40        cursor: &str,
41        run_id: &str,
42    ) -> Result<()> {
43        if let Some(stored) = self.committed_cursor(export_name)?
44            && !cursor_advances(&stored, cursor)
45        {
46            return Ok(());
47        }
48        let now = Utc::now().to_rfc3339();
49        let sql = "INSERT INTO export_progression (
50                export_name,
51                last_committed_strategy, last_committed_cursor, last_committed_chunk_index,
52                last_committed_run_id, last_committed_at
53             ) VALUES (?1, 'incremental', ?2, NULL, ?3, ?4)
54             ON CONFLICT(export_name) DO UPDATE SET
55                last_committed_strategy = 'incremental',
56                last_committed_cursor = excluded.last_committed_cursor,
57                last_committed_chunk_index = NULL,
58                last_committed_run_id = excluded.last_committed_run_id,
59                last_committed_at = excluded.last_committed_at";
60        match &self.conn {
61            StateConn::Sqlite(c) => {
62                c.execute(sql, rusqlite::params![export_name, cursor, run_id, now])?;
63            }
64            StateConn::Postgres(client) => {
65                let mut c = client.borrow_mut();
66                c.execute(&pg_sql(sql), &[&export_name, &cursor, &run_id, &now])?;
67            }
68        }
69        Ok(())
70    }
71
72    /// Stored committed cursor for `export_name` — `None` when the export has
73    /// no progression row or its committed boundary is chunked (cursor NULL).
74    fn committed_cursor(&self, export_name: &str) -> Result<Option<String>> {
75        let sql = "SELECT last_committed_cursor FROM export_progression WHERE export_name = ?1";
76        match &self.conn {
77            StateConn::Sqlite(c) => {
78                match c.query_row(sql, [export_name], |r| r.get::<_, Option<String>>(0)) {
79                    Ok(v) => Ok(v),
80                    Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
81                    Err(e) => Err(e.into()),
82                }
83            }
84            StateConn::Postgres(client) => {
85                let mut c = client.borrow_mut();
86                match c.query_opt(&pg_sql(sql), &[&export_name])? {
87                    Some(row) => Ok(row.get::<_, Option<String>>(0)),
88                    None => Ok(None),
89                }
90            }
91        }
92    }
93
94    /// Record a successful chunked-run commit: the highest completed `chunk_index` for this run.
95    pub fn record_committed_chunked(
96        &self,
97        export_name: &str,
98        highest_chunk_index: i64,
99        run_id: &str,
100    ) -> Result<()> {
101        let now = Utc::now().to_rfc3339();
102        let sql = "INSERT INTO export_progression (
103                export_name,
104                last_committed_strategy, last_committed_cursor, last_committed_chunk_index,
105                last_committed_run_id, last_committed_at
106             ) VALUES (?1, 'chunked', NULL, ?2, ?3, ?4)
107             ON CONFLICT(export_name) DO UPDATE SET
108                last_committed_strategy = 'chunked',
109                last_committed_cursor = NULL,
110                last_committed_chunk_index = excluded.last_committed_chunk_index,
111                last_committed_run_id = excluded.last_committed_run_id,
112                last_committed_at = excluded.last_committed_at";
113        match &self.conn {
114            StateConn::Sqlite(c) => {
115                c.execute(
116                    sql,
117                    rusqlite::params![export_name, highest_chunk_index, run_id, now],
118                )?;
119            }
120            StateConn::Postgres(client) => {
121                let mut c = client.borrow_mut();
122                c.execute(
123                    &pg_sql(sql),
124                    &[&export_name, &highest_chunk_index, &run_id, &now],
125                )?;
126            }
127        }
128        Ok(())
129    }
130
131    /// Record a successful reconcile: all partitions in `run_id` matched.
132    pub fn record_verified_chunked(
133        &self,
134        export_name: &str,
135        highest_chunk_index: i64,
136        run_id: &str,
137    ) -> Result<()> {
138        let now = Utc::now().to_rfc3339();
139        let sql = "INSERT INTO export_progression (
140                export_name,
141                last_verified_strategy, last_verified_cursor, last_verified_chunk_index,
142                last_verified_run_id, last_verified_at
143             ) VALUES (?1, 'chunked', NULL, ?2, ?3, ?4)
144             ON CONFLICT(export_name) DO UPDATE SET
145                last_verified_strategy = 'chunked',
146                last_verified_cursor = NULL,
147                last_verified_chunk_index = excluded.last_verified_chunk_index,
148                last_verified_run_id = excluded.last_verified_run_id,
149                last_verified_at = excluded.last_verified_at";
150        match &self.conn {
151            StateConn::Sqlite(c) => {
152                c.execute(
153                    sql,
154                    rusqlite::params![export_name, highest_chunk_index, run_id, now],
155                )?;
156            }
157            StateConn::Postgres(client) => {
158                let mut c = client.borrow_mut();
159                c.execute(
160                    &pg_sql(sql),
161                    &[&export_name, &highest_chunk_index, &run_id, &now],
162                )?;
163            }
164        }
165        Ok(())
166    }
167
168    pub fn get_progression(&self, export_name: &str) -> Result<ExportProgression> {
169        let sql = "SELECT
170                last_committed_strategy, last_committed_cursor, last_committed_chunk_index,
171                last_committed_run_id, last_committed_at,
172                last_verified_strategy, last_verified_cursor, last_verified_chunk_index,
173                last_verified_run_id, last_verified_at
174             FROM export_progression WHERE export_name = ?1";
175        match &self.conn {
176            StateConn::Sqlite(c) => {
177                let mut stmt = c.prepare(sql)?;
178                let row = stmt.query_row([export_name], |r| {
179                    Ok((
180                        r.get::<_, Option<String>>(0)?,
181                        r.get::<_, Option<String>>(1)?,
182                        r.get::<_, Option<i64>>(2)?,
183                        r.get::<_, Option<String>>(3)?,
184                        r.get::<_, Option<String>>(4)?,
185                        r.get::<_, Option<String>>(5)?,
186                        r.get::<_, Option<String>>(6)?,
187                        r.get::<_, Option<i64>>(7)?,
188                        r.get::<_, Option<String>>(8)?,
189                        r.get::<_, Option<String>>(9)?,
190                    ))
191                });
192                let (c_str, c_cur, c_idx, c_run, c_at, v_str, v_cur, v_idx, v_run, v_at) = match row
193                {
194                    Ok(t) => t,
195                    Err(rusqlite::Error::QueryReturnedNoRows) => {
196                        return Ok(ExportProgression {
197                            export_name: export_name.to_string(),
198                            committed: None,
199                            verified: None,
200                        });
201                    }
202                    Err(e) => return Err(e.into()),
203                };
204                Ok(ExportProgression {
205                    export_name: export_name.to_string(),
206                    committed: boundary_from_row(c_str, c_cur, c_idx, c_run, c_at),
207                    verified: boundary_from_row(v_str, v_cur, v_idx, v_run, v_at),
208                })
209            }
210            StateConn::Postgres(client) => {
211                let mut c = client.borrow_mut();
212                match c.query_opt(&pg_sql(sql), &[&export_name])? {
213                    None => Ok(ExportProgression {
214                        export_name: export_name.to_string(),
215                        committed: None,
216                        verified: None,
217                    }),
218                    Some(row) => {
219                        let c_str: Option<String> = row.get(0);
220                        let c_cur: Option<String> = row.get(1);
221                        let c_idx: Option<i64> = row.get(2);
222                        let c_run: Option<String> = row.get(3);
223                        let c_at: Option<String> = row.get(4);
224                        let v_str: Option<String> = row.get(5);
225                        let v_cur: Option<String> = row.get(6);
226                        let v_idx: Option<i64> = row.get(7);
227                        let v_run: Option<String> = row.get(8);
228                        let v_at: Option<String> = row.get(9);
229                        Ok(ExportProgression {
230                            export_name: export_name.to_string(),
231                            committed: boundary_from_row(c_str, c_cur, c_idx, c_run, c_at),
232                            verified: boundary_from_row(v_str, v_cur, v_idx, v_run, v_at),
233                        })
234                    }
235                }
236            }
237        }
238    }
239
240    /// Delete the progression row for an export (committed + verified boundary).
241    ///
242    /// Called from `StateStore::reset` / `reset_chunk_checkpoint` so a reset
243    /// returns the export to a "never ran" state across *all* state tables. A
244    /// surviving `export_progression` row would make `rivet state progression`
245    /// report a stale committed boundary after `state show` is already empty —
246    /// a silent inconsistency that masks the reset.
247    ///
248    /// Returns the number of rows deleted (0 or 1).
249    pub fn delete_progression(&self, export_name: &str) -> Result<usize> {
250        let sql = "DELETE FROM export_progression WHERE export_name = ?1";
251        match &self.conn {
252            StateConn::Sqlite(c) => Ok(c.execute(sql, [export_name])?),
253            StateConn::Postgres(client) => {
254                let mut c = client.borrow_mut();
255                Ok(c.execute(&pg_sql(sql), &[&export_name])? as usize)
256            }
257        }
258    }
259
260    pub fn list_progression(&self) -> Result<Vec<ExportProgression>> {
261        match &self.conn {
262            StateConn::Sqlite(c) => {
263                let mut stmt =
264                    c.prepare("SELECT export_name FROM export_progression ORDER BY export_name")?;
265                let names: Vec<String> = stmt
266                    .query_map([], |r| r.get::<_, String>(0))?
267                    .collect::<std::result::Result<_, _>>()?;
268                drop(stmt);
269                let mut out = Vec::with_capacity(names.len());
270                for n in names {
271                    out.push(self.get_progression(&n)?);
272                }
273                Ok(out)
274            }
275            StateConn::Postgres(client) => {
276                // Single query to avoid nested borrow_mut() calls.
277                let mut c = client.borrow_mut();
278                let rows = c.query(
279                    "SELECT export_name,
280                            last_committed_strategy, last_committed_cursor, last_committed_chunk_index,
281                            last_committed_run_id, last_committed_at,
282                            last_verified_strategy, last_verified_cursor, last_verified_chunk_index,
283                            last_verified_run_id, last_verified_at
284                     FROM export_progression ORDER BY export_name",
285                    &[],
286                )?;
287                Ok(rows
288                    .iter()
289                    .map(|row| {
290                        let export_name: String = row.get(0);
291                        let c_str: Option<String> = row.get(1);
292                        let c_cur: Option<String> = row.get(2);
293                        let c_idx: Option<i64> = row.get(3);
294                        let c_run: Option<String> = row.get(4);
295                        let c_at: Option<String> = row.get(5);
296                        let v_str: Option<String> = row.get(6);
297                        let v_cur: Option<String> = row.get(7);
298                        let v_idx: Option<i64> = row.get(8);
299                        let v_run: Option<String> = row.get(9);
300                        let v_at: Option<String> = row.get(10);
301                        ExportProgression {
302                            export_name,
303                            committed: boundary_from_row(c_str, c_cur, c_idx, c_run, c_at),
304                            verified: boundary_from_row(v_str, v_cur, v_idx, v_run, v_at),
305                        }
306                    })
307                    .collect())
308            }
309        }
310    }
311}
312
/// Decide whether `new` lies strictly beyond `stored` in cursor order.
///
/// Cursors live in a TEXT column but are frequently numeric (integer keys,
/// Float64 columns stringified by the sink), so three orderings are tried:
///
/// 1. both sides parse as `i128` — exact integer comparison, immune to the
///    loss of precision f64 suffers above 2^53;
/// 2. both sides parse as `f64` and are ordered — floating-point comparison;
/// 3. otherwise (mixed kinds, non-numeric text, or NaN on either side) —
///    plain byte-wise string order, which is right for RFC3339 timestamps,
///    `YYYY-MM-DD` dates, and UUIDv7 keys.
///
/// Equal cursors never advance.
fn cursor_advances(stored: &str, new: &str) -> bool {
    if let (Ok(old_int), Ok(new_int)) = (stored.parse::<i128>(), new.parse::<i128>()) {
        return new_int > old_int;
    }
    let as_float = |text: &str| text.parse::<f64>().ok();
    as_float(stored)
        .zip(as_float(new))
        // `partial_cmp` is `None` when either side is NaN; that drops us
        // into the string fallback below.
        .and_then(|(old_float, new_float)| new_float.partial_cmp(&old_float))
        .map_or_else(|| new > stored, |ordering| ordering.is_gt())
}
332
333fn boundary_from_row(
334    strategy: Option<String>,
335    cursor: Option<String>,
336    chunk_index: Option<i64>,
337    run_id: Option<String>,
338    at: Option<String>,
339) -> Option<Boundary> {
340    let strategy = strategy?;
341    let at = at
342        .as_deref()
343        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
344        .map(|dt| dt.with_timezone(&Utc))?;
345    Some(Boundary {
346        strategy,
347        run_id,
348        cursor,
349        chunk_index,
350        at,
351    })
352}
353
#[cfg(test)]
mod tests {
    use super::*;

    fn store() -> StateStore {
        StateStore::open_in_memory().expect("in-memory store")
    }

    #[test]
    fn progression_unknown_export_returns_empty() {
        let s = store();
        let p = s.get_progression("orders").unwrap();
        assert!(p.committed.is_none());
        assert!(p.verified.is_none());
    }

    #[test]
    fn committed_incremental_records_cursor_and_run() {
        let s = store();
        s.record_committed_incremental("orders", "2024-06-01", "run-1")
            .unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.strategy, "incremental");
        assert_eq!(b.cursor.as_deref(), Some("2024-06-01"));
        assert_eq!(b.chunk_index, None);
        assert_eq!(b.run_id.as_deref(), Some("run-1"));
    }

    #[test]
    fn committed_cursor_does_not_regress_lexicographically() {
        let s = store();
        s.record_committed_incremental("orders", "2024-06-10", "run-10")
            .unwrap();
        s.record_committed_incremental("orders", "2024-01-01", "run-01")
            .unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.cursor.as_deref(), Some("2024-06-10"));
    }

    #[test]
    fn committed_chunked_records_chunk_index() {
        let s = store();
        s.record_committed_chunked("orders", 41, "run-A").unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.strategy, "chunked");
        assert_eq!(b.chunk_index, Some(41));
        assert_eq!(b.cursor, None);
    }

    #[test]
    fn verified_and_committed_are_independent() {
        let s = store();
        s.record_committed_chunked("orders", 10, "run-A").unwrap();
        s.record_verified_chunked("orders", 5, "run-A").unwrap();
        let p = s.get_progression("orders").unwrap();
        assert_eq!(p.committed.as_ref().unwrap().chunk_index, Some(10));
        assert_eq!(p.verified.as_ref().unwrap().chunk_index, Some(5));
    }

    #[test]
    fn switching_strategy_updates_committed_row() {
        let s = store();
        s.record_committed_incremental("orders", "2024-01-01", "inc-1")
            .unwrap();
        s.record_committed_chunked("orders", 7, "chunk-1").unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.strategy, "chunked");
        assert_eq!(b.chunk_index, Some(7));
        assert_eq!(b.cursor, None);
    }

    // Regression (ROAST-RED progression-numeric-cursor): the committed-boundary
    // guard used to compare cursors with SQL string '<' on a TEXT column, so
    // numeric cursors regressed ("1000" < "999" lexicographically) and the
    // boundary froze at the shorter value. The guard now compares in Rust via
    // `cursor_advances`; this test pins that fix.
    #[test]
    fn roast_committed_numeric_cursor_advances_past_lexicographic_boundary() {
        let s = store();
        s.record_committed_incremental("orders", "999", "run-999")
            .unwrap();
        s.record_committed_incremental("orders", "1000", "run-1000")
            .unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(
            b.cursor.as_deref(),
            Some("1000"),
            "numeric cursor must advance from 999 to 1000, but the lexicographic \
             TEXT comparison froze the committed boundary at {:?}",
            b.cursor
        );
    }

    #[test]
    fn committed_numeric_cursor_does_not_regress() {
        let s = store();
        s.record_committed_incremental("orders", "1000", "run-1000")
            .unwrap();
        s.record_committed_incremental("orders", "999", "run-999")
            .unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.cursor.as_deref(), Some("1000"));
        assert_eq!(
            b.run_id.as_deref(),
            Some("run-1000"),
            "non-advancing commit must leave the boundary row untouched"
        );
    }

    #[test]
    fn committed_float_cursor_advances_across_integer_boundary() {
        // "10" < "9.9" lexicographically; the sink stringifies Float64 cursors
        // as "10" (no trailing .0), so once the i128 parse fails on "9.9" the
        // guard must compare as f64.
        let s = store();
        s.record_committed_incremental("scores", "9.9", "run-1")
            .unwrap();
        s.record_committed_incremental("scores", "10", "run-2")
            .unwrap();
        let b = s.get_progression("scores").unwrap().committed.unwrap();
        assert_eq!(b.cursor.as_deref(), Some("10"));
        s.record_committed_incremental("scores", "9.95", "run-3")
            .unwrap();
        let b = s.get_progression("scores").unwrap().committed.unwrap();
        assert_eq!(b.cursor.as_deref(), Some("10"), "9.95 must not regress 10");
    }

    #[test]
    fn committed_equal_cursor_is_a_no_op() {
        let s = store();
        s.record_committed_incremental("orders", "100", "run-1")
            .unwrap();
        s.record_committed_incremental("orders", "100", "run-2")
            .unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.cursor.as_deref(), Some("100"));
        assert_eq!(
            b.run_id.as_deref(),
            Some("run-1"),
            "an equal cursor does not advance; the row must stay untouched"
        );
    }

    #[test]
    fn committed_rfc3339_cursor_advances_and_does_not_regress() {
        let s = store();
        s.record_committed_incremental("orders", "2024-06-01T00:00:00Z", "run-1")
            .unwrap();
        s.record_committed_incremental("orders", "2024-06-02T00:00:00Z", "run-2")
            .unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.cursor.as_deref(), Some("2024-06-02T00:00:00Z"));
        s.record_committed_incremental("orders", "2024-05-31T00:00:00Z", "run-3")
            .unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.cursor.as_deref(), Some("2024-06-02T00:00:00Z"));
    }

    #[test]
    fn committed_mixed_kind_cursor_falls_back_to_string_order() {
        // Old stored cursor is non-numeric, new one is numeric: there is no
        // shared numeric domain, so the guard keeps plain string order —
        // "123" < "abc" byte-wise, so the boundary holds.
        let s = store();
        s.record_committed_incremental("orders", "abc", "run-1")
            .unwrap();
        s.record_committed_incremental("orders", "123", "run-2")
            .unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.cursor.as_deref(), Some("abc"));
    }

    #[test]
    fn committed_large_integer_cursor_compares_exactly() {
        // 2^53 and 2^53 + 1 collapse to the same f64; the i128 path must
        // compare them exactly so the boundary still advances by one.
        let s = store();
        s.record_committed_incremental("orders", "9007199254740992", "run-1")
            .unwrap();
        s.record_committed_incremental("orders", "9007199254740993", "run-2")
            .unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.cursor.as_deref(), Some("9007199254740993"));
    }

    #[test]
    fn switching_chunked_to_incremental_writes_cursor() {
        // The progression row exists but its committed cursor is NULL
        // (chunked); an incremental commit must write unconditionally.
        let s = store();
        s.record_committed_chunked("orders", 7, "chunk-1").unwrap();
        s.record_committed_incremental("orders", "100", "inc-1")
            .unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.strategy, "incremental");
        assert_eq!(b.cursor.as_deref(), Some("100"));
        assert_eq!(b.chunk_index, None);
    }

    #[test]
    fn cursor_advances_orders_numbers_strings_and_nan() {
        assert!(cursor_advances("999", "1000"));
        assert!(!cursor_advances("1000", "999"));
        assert!(!cursor_advances("100", "100"));
        assert!(cursor_advances("9.9", "10"));
        assert!(cursor_advances("-5", "-4"));
        assert!(cursor_advances("2024-01-01", "2024-06-10"));
        // NaN has no f64 order; fall back to string order instead of
        // freezing the boundary forever.
        assert!(cursor_advances("NaN", "inf"));
        assert!(!cursor_advances("inf", "NaN"));
    }

    #[test]
    fn delete_progression_removes_only_the_named_export() {
        let s = store();
        s.record_committed_incremental("orders", "100", "run-o")
            .unwrap();
        s.record_committed_incremental("users", "9", "run-u")
            .unwrap();

        assert_eq!(
            s.delete_progression("orders").unwrap(),
            1,
            "deleting an existing progression row reports one row removed"
        );
        assert!(s.get_progression("orders").unwrap().committed.is_none());
        assert!(
            s.get_progression("users").unwrap().committed.is_some(),
            "delete must be scoped to the named export"
        );
        assert_eq!(
            s.delete_progression("orders").unwrap(),
            0,
            "deleting an absent progression row is a no-op (zero rows)"
        );
    }

    #[test]
    fn list_progression_sorted_by_name() {
        let s = store();
        s.record_committed_incremental("gamma", "3", "r").unwrap();
        s.record_committed_incremental("alpha", "1", "r").unwrap();
        s.record_committed_incremental("beta", "2", "r").unwrap();
        let all = s.list_progression().unwrap();
        let names: Vec<_> = all.iter().map(|p| p.export_name.as_str()).collect();
        assert_eq!(names, vec!["alpha", "beta", "gamma"]);
    }

    #[test]
    fn list_progression_decodes_committed_and_verified_like_get_progression() {
        // The listing must decode every column (including the verified side)
        // exactly as the per-export lookup does.
        let s = store();
        s.record_committed_chunked("orders", 10, "run-A").unwrap();
        s.record_verified_chunked("orders", 5, "run-A").unwrap();
        s.record_committed_incremental("users", "42", "run-u")
            .unwrap();
        let all = s.list_progression().unwrap();
        assert_eq!(
            all,
            vec![
                s.get_progression("orders").unwrap(),
                s.get_progression("users").unwrap(),
            ]
        );
        assert_eq!(all[0].committed.as_ref().unwrap().chunk_index, Some(10));
        assert_eq!(all[0].verified.as_ref().unwrap().chunk_index, Some(5));
        assert_eq!(
            all[1].committed.as_ref().unwrap().cursor.as_deref(),
            Some("42")
        );
    }

    #[test]
    fn list_progression_empty_store_is_empty() {
        assert!(store().list_progression().unwrap().is_empty());
    }

    #[test]
    fn list_progression_omits_deleted_exports() {
        let s = store();
        s.record_committed_incremental("orders", "1", "r").unwrap();
        s.record_committed_incremental("users", "2", "r").unwrap();
        s.delete_progression("orders").unwrap();
        let names: Vec<String> = s
            .list_progression()
            .unwrap()
            .into_iter()
            .map(|p| p.export_name)
            .collect();
        assert_eq!(names, vec!["users".to_string()]);
    }
}