Skip to main content

rivet/state/
progression.rs

1//! Committed / verified export progression (Epic G — ADR-0008).
2
3use chrono::{DateTime, Utc};
4
5use super::{StateConn, StateStore, pg_sql};
6use crate::error::Result;
7
/// One export's progression record, as read back from the `export_progression` table.
///
/// Returned by [`StateStore::get_progression`] and [`StateStore::list_progression`];
/// both boundaries are `None` when nothing has been recorded for the export.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExportProgression {
    /// Name of the export this record belongs to.
    pub export_name: String,
    /// Last committed boundary (written by the `record_committed_*` methods), or
    /// `None` if none was recorded or its stored timestamp could not be parsed.
    pub committed: Option<Boundary>,
    /// Last verified boundary (written by `record_verified_chunked`), or `None`
    /// if none was recorded or its stored timestamp could not be parsed.
    pub verified: Option<Boundary>,
}
15
/// A single boundary snapshot (committed or verified).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Boundary {
    /// Strategy that produced this boundary; the writers in this module store
    /// either `"incremental"` or `"chunked"`.
    pub strategy: String,
    /// Run that last wrote this boundary.
    pub run_id: Option<String>,
    /// Highest committed cursor for `"incremental"` boundaries; the writers
    /// clear it (`None`) for `"chunked"` boundaries.
    pub cursor: Option<String>,
    /// Highest completed chunk index for `"chunked"` boundaries; the writers
    /// clear it (`None`) for `"incremental"` boundaries.
    pub chunk_index: Option<i64>,
    /// When the boundary was recorded (persisted as an RFC 3339 string and
    /// converted to UTC on read).
    pub at: DateTime<Utc>,
}
25
26impl StateStore {
27    /// Record a successful incremental commit: `cursor` is the max value written
28    /// to destination in this run.
29    pub fn record_committed_incremental(
30        &self,
31        export_name: &str,
32        cursor: &str,
33        run_id: &str,
34    ) -> Result<()> {
35        let now = Utc::now().to_rfc3339();
36        let sql = "INSERT INTO export_progression (
37                export_name,
38                last_committed_strategy, last_committed_cursor, last_committed_chunk_index,
39                last_committed_run_id, last_committed_at
40             ) VALUES (?1, 'incremental', ?2, NULL, ?3, ?4)
41             ON CONFLICT(export_name) DO UPDATE SET
42                last_committed_strategy = 'incremental',
43                last_committed_cursor = CASE
44                    WHEN export_progression.last_committed_cursor IS NULL
45                         OR export_progression.last_committed_cursor < excluded.last_committed_cursor
46                    THEN excluded.last_committed_cursor
47                    ELSE export_progression.last_committed_cursor END,
48                last_committed_chunk_index = NULL,
49                last_committed_run_id = excluded.last_committed_run_id,
50                last_committed_at = excluded.last_committed_at";
51        match &self.conn {
52            StateConn::Sqlite(c) => {
53                c.execute(sql, rusqlite::params![export_name, cursor, run_id, now])?;
54            }
55            StateConn::Postgres(client) => {
56                let mut c = client.borrow_mut();
57                c.execute(&pg_sql(sql), &[&export_name, &cursor, &run_id, &now])?;
58            }
59        }
60        Ok(())
61    }
62
63    /// Record a successful chunked-run commit: the highest completed `chunk_index` for this run.
64    pub fn record_committed_chunked(
65        &self,
66        export_name: &str,
67        highest_chunk_index: i64,
68        run_id: &str,
69    ) -> Result<()> {
70        let now = Utc::now().to_rfc3339();
71        let sql = "INSERT INTO export_progression (
72                export_name,
73                last_committed_strategy, last_committed_cursor, last_committed_chunk_index,
74                last_committed_run_id, last_committed_at
75             ) VALUES (?1, 'chunked', NULL, ?2, ?3, ?4)
76             ON CONFLICT(export_name) DO UPDATE SET
77                last_committed_strategy = 'chunked',
78                last_committed_cursor = NULL,
79                last_committed_chunk_index = excluded.last_committed_chunk_index,
80                last_committed_run_id = excluded.last_committed_run_id,
81                last_committed_at = excluded.last_committed_at";
82        match &self.conn {
83            StateConn::Sqlite(c) => {
84                c.execute(
85                    sql,
86                    rusqlite::params![export_name, highest_chunk_index, run_id, now],
87                )?;
88            }
89            StateConn::Postgres(client) => {
90                let mut c = client.borrow_mut();
91                c.execute(
92                    &pg_sql(sql),
93                    &[&export_name, &highest_chunk_index, &run_id, &now],
94                )?;
95            }
96        }
97        Ok(())
98    }
99
100    /// Record a successful reconcile: all partitions in `run_id` matched.
101    pub fn record_verified_chunked(
102        &self,
103        export_name: &str,
104        highest_chunk_index: i64,
105        run_id: &str,
106    ) -> Result<()> {
107        let now = Utc::now().to_rfc3339();
108        let sql = "INSERT INTO export_progression (
109                export_name,
110                last_verified_strategy, last_verified_cursor, last_verified_chunk_index,
111                last_verified_run_id, last_verified_at
112             ) VALUES (?1, 'chunked', NULL, ?2, ?3, ?4)
113             ON CONFLICT(export_name) DO UPDATE SET
114                last_verified_strategy = 'chunked',
115                last_verified_cursor = NULL,
116                last_verified_chunk_index = excluded.last_verified_chunk_index,
117                last_verified_run_id = excluded.last_verified_run_id,
118                last_verified_at = excluded.last_verified_at";
119        match &self.conn {
120            StateConn::Sqlite(c) => {
121                c.execute(
122                    sql,
123                    rusqlite::params![export_name, highest_chunk_index, run_id, now],
124                )?;
125            }
126            StateConn::Postgres(client) => {
127                let mut c = client.borrow_mut();
128                c.execute(
129                    &pg_sql(sql),
130                    &[&export_name, &highest_chunk_index, &run_id, &now],
131                )?;
132            }
133        }
134        Ok(())
135    }
136
137    pub fn get_progression(&self, export_name: &str) -> Result<ExportProgression> {
138        let sql = "SELECT
139                last_committed_strategy, last_committed_cursor, last_committed_chunk_index,
140                last_committed_run_id, last_committed_at,
141                last_verified_strategy, last_verified_cursor, last_verified_chunk_index,
142                last_verified_run_id, last_verified_at
143             FROM export_progression WHERE export_name = ?1";
144        match &self.conn {
145            StateConn::Sqlite(c) => {
146                let mut stmt = c.prepare(sql)?;
147                let row = stmt.query_row([export_name], |r| {
148                    Ok((
149                        r.get::<_, Option<String>>(0)?,
150                        r.get::<_, Option<String>>(1)?,
151                        r.get::<_, Option<i64>>(2)?,
152                        r.get::<_, Option<String>>(3)?,
153                        r.get::<_, Option<String>>(4)?,
154                        r.get::<_, Option<String>>(5)?,
155                        r.get::<_, Option<String>>(6)?,
156                        r.get::<_, Option<i64>>(7)?,
157                        r.get::<_, Option<String>>(8)?,
158                        r.get::<_, Option<String>>(9)?,
159                    ))
160                });
161                let (c_str, c_cur, c_idx, c_run, c_at, v_str, v_cur, v_idx, v_run, v_at) = match row
162                {
163                    Ok(t) => t,
164                    Err(rusqlite::Error::QueryReturnedNoRows) => {
165                        return Ok(ExportProgression {
166                            export_name: export_name.to_string(),
167                            committed: None,
168                            verified: None,
169                        });
170                    }
171                    Err(e) => return Err(e.into()),
172                };
173                Ok(ExportProgression {
174                    export_name: export_name.to_string(),
175                    committed: boundary_from_row(c_str, c_cur, c_idx, c_run, c_at),
176                    verified: boundary_from_row(v_str, v_cur, v_idx, v_run, v_at),
177                })
178            }
179            StateConn::Postgres(client) => {
180                let mut c = client.borrow_mut();
181                match c.query_opt(&pg_sql(sql), &[&export_name])? {
182                    None => Ok(ExportProgression {
183                        export_name: export_name.to_string(),
184                        committed: None,
185                        verified: None,
186                    }),
187                    Some(row) => {
188                        let c_str: Option<String> = row.get(0);
189                        let c_cur: Option<String> = row.get(1);
190                        let c_idx: Option<i64> = row.get(2);
191                        let c_run: Option<String> = row.get(3);
192                        let c_at: Option<String> = row.get(4);
193                        let v_str: Option<String> = row.get(5);
194                        let v_cur: Option<String> = row.get(6);
195                        let v_idx: Option<i64> = row.get(7);
196                        let v_run: Option<String> = row.get(8);
197                        let v_at: Option<String> = row.get(9);
198                        Ok(ExportProgression {
199                            export_name: export_name.to_string(),
200                            committed: boundary_from_row(c_str, c_cur, c_idx, c_run, c_at),
201                            verified: boundary_from_row(v_str, v_cur, v_idx, v_run, v_at),
202                        })
203                    }
204                }
205            }
206        }
207    }
208
209    pub fn list_progression(&self) -> Result<Vec<ExportProgression>> {
210        match &self.conn {
211            StateConn::Sqlite(c) => {
212                let mut stmt =
213                    c.prepare("SELECT export_name FROM export_progression ORDER BY export_name")?;
214                let names: Vec<String> = stmt
215                    .query_map([], |r| r.get::<_, String>(0))?
216                    .collect::<std::result::Result<_, _>>()?;
217                drop(stmt);
218                let mut out = Vec::with_capacity(names.len());
219                for n in names {
220                    out.push(self.get_progression(&n)?);
221                }
222                Ok(out)
223            }
224            StateConn::Postgres(client) => {
225                // Single query to avoid nested borrow_mut() calls.
226                let mut c = client.borrow_mut();
227                let rows = c.query(
228                    "SELECT export_name,
229                            last_committed_strategy, last_committed_cursor, last_committed_chunk_index,
230                            last_committed_run_id, last_committed_at,
231                            last_verified_strategy, last_verified_cursor, last_verified_chunk_index,
232                            last_verified_run_id, last_verified_at
233                     FROM export_progression ORDER BY export_name",
234                    &[],
235                )?;
236                Ok(rows
237                    .iter()
238                    .map(|row| {
239                        let export_name: String = row.get(0);
240                        let c_str: Option<String> = row.get(1);
241                        let c_cur: Option<String> = row.get(2);
242                        let c_idx: Option<i64> = row.get(3);
243                        let c_run: Option<String> = row.get(4);
244                        let c_at: Option<String> = row.get(5);
245                        let v_str: Option<String> = row.get(6);
246                        let v_cur: Option<String> = row.get(7);
247                        let v_idx: Option<i64> = row.get(8);
248                        let v_run: Option<String> = row.get(9);
249                        let v_at: Option<String> = row.get(10);
250                        ExportProgression {
251                            export_name,
252                            committed: boundary_from_row(c_str, c_cur, c_idx, c_run, c_at),
253                            verified: boundary_from_row(v_str, v_cur, v_idx, v_run, v_at),
254                        }
255                    })
256                    .collect())
257            }
258        }
259    }
260}
261
262fn boundary_from_row(
263    strategy: Option<String>,
264    cursor: Option<String>,
265    chunk_index: Option<i64>,
266    run_id: Option<String>,
267    at: Option<String>,
268) -> Option<Boundary> {
269    let strategy = strategy?;
270    let at = at
271        .as_deref()
272        .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
273        .map(|dt| dt.with_timezone(&Utc))?;
274    Some(Boundary {
275        strategy,
276        run_id,
277        cursor,
278        chunk_index,
279        at,
280    })
281}
282
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh in-memory store per test so cases cannot interfere.
    fn store() -> StateStore {
        StateStore::open_in_memory().expect("in-memory store")
    }

    #[test]
    fn progression_unknown_export_returns_empty() {
        let s = store();
        let p = s.get_progression("orders").unwrap();
        assert!(p.committed.is_none());
        assert!(p.verified.is_none());
    }

    #[test]
    fn committed_incremental_records_cursor_and_run() {
        let s = store();
        s.record_committed_incremental("orders", "2024-06-01", "run-1")
            .unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.strategy, "incremental");
        assert_eq!(b.cursor.as_deref(), Some("2024-06-01"));
        assert_eq!(b.chunk_index, None);
        assert_eq!(b.run_id.as_deref(), Some("run-1"));
    }

    #[test]
    fn committed_cursor_does_not_regress_lexicographically() {
        let s = store();
        s.record_committed_incremental("orders", "2024-06-10", "run-10")
            .unwrap();
        s.record_committed_incremental("orders", "2024-01-01", "run-01")
            .unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.cursor.as_deref(), Some("2024-06-10"));
    }

    #[test]
    fn committed_chunked_records_chunk_index() {
        let s = store();
        s.record_committed_chunked("orders", 41, "run-A").unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.strategy, "chunked");
        assert_eq!(b.chunk_index, Some(41));
        assert_eq!(b.cursor, None);
    }

    #[test]
    fn verified_and_committed_are_independent() {
        let s = store();
        s.record_committed_chunked("orders", 10, "run-A").unwrap();
        s.record_verified_chunked("orders", 5, "run-A").unwrap();
        let p = s.get_progression("orders").unwrap();
        assert_eq!(p.committed.as_ref().unwrap().chunk_index, Some(10));
        assert_eq!(p.verified.as_ref().unwrap().chunk_index, Some(5));
    }

    #[test]
    fn verified_only_export_has_no_committed_boundary() {
        // Exercises the INSERT (not the ON CONFLICT) path of record_verified_chunked.
        let s = store();
        s.record_verified_chunked("orders", 4, "run-V").unwrap();
        let p = s.get_progression("orders").unwrap();
        assert!(p.committed.is_none());
        let v = p.verified.unwrap();
        assert_eq!(v.strategy, "chunked");
        assert_eq!(v.chunk_index, Some(4));
        assert_eq!(v.run_id.as_deref(), Some("run-V"));
    }

    #[test]
    fn switching_strategy_updates_committed_row() {
        let s = store();
        s.record_committed_incremental("orders", "2024-01-01", "inc-1")
            .unwrap();
        s.record_committed_chunked("orders", 7, "chunk-1").unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.strategy, "chunked");
        assert_eq!(b.chunk_index, Some(7));
        assert_eq!(b.cursor, None);
    }

    #[test]
    fn list_progression_sorted_by_name() {
        let s = store();
        s.record_committed_incremental("gamma", "3", "r").unwrap();
        s.record_committed_incremental("alpha", "1", "r").unwrap();
        s.record_committed_incremental("beta", "2", "r").unwrap();
        let all = s.list_progression().unwrap();
        let names: Vec<_> = all.iter().map(|p| p.export_name.as_str()).collect();
        assert_eq!(names, vec!["alpha", "beta", "gamma"]);
    }

    #[test]
    fn list_progression_empty_store() {
        assert!(store().list_progression().unwrap().is_empty());
    }

    #[test]
    fn list_progression_returns_full_records() {
        // Listing must agree field-for-field with per-export lookups.
        let s = store();
        s.record_committed_chunked("beta", 3, "run-B").unwrap();
        s.record_verified_chunked("beta", 2, "run-B").unwrap();
        s.record_committed_incremental("alpha", "2024-06-01", "run-A")
            .unwrap();
        let all = s.list_progression().unwrap();
        assert_eq!(all.len(), 2);
        assert_eq!(all[0], s.get_progression("alpha").unwrap());
        assert_eq!(all[1], s.get_progression("beta").unwrap());
        assert_eq!(
            all[0].committed.as_ref().unwrap().cursor.as_deref(),
            Some("2024-06-01")
        );
        assert!(all[0].verified.is_none());
        assert_eq!(all[1].committed.as_ref().unwrap().chunk_index, Some(3));
        assert_eq!(all[1].verified.as_ref().unwrap().chunk_index, Some(2));
    }
}