Skip to main content

rivet/state/
cursor.rs

1use crate::error::Result;
2use crate::types::CursorState;
3
4use super::{StateConn, StateStore, pg_sql};
5
6/// Incremental cursor store — reads and writes `export_state`.
7///
8/// The cursor records the last extracted value so incremental runs can pick up
9/// where the previous run left off.  Invariant I3 (Write Before Cursor) governs
10/// the ordering of cursor updates relative to destination writes.
11impl StateStore {
12    pub fn get(&self, export_name: &str) -> Result<CursorState> {
13        match &self.conn {
14            StateConn::Sqlite(c) => {
15                let mut stmt = c.prepare(
16                    "SELECT last_cursor_value, last_run_at FROM export_state WHERE export_name = ?1",
17                )?;
18                let result = stmt.query_row([export_name], |row| {
19                    Ok(CursorState {
20                        export_name: export_name.to_string(),
21                        last_cursor_value: row.get(0)?,
22                        last_run_at: row.get(1)?,
23                    })
24                });
25                match result {
26                    Ok(state) => Ok(state),
27                    Err(rusqlite::Error::QueryReturnedNoRows) => Ok(CursorState {
28                        export_name: export_name.to_string(),
29                        last_cursor_value: None,
30                        last_run_at: None,
31                    }),
32                    Err(e) => Err(e.into()),
33                }
34            }
35            StateConn::Postgres(client) => {
36                let mut c = client.borrow_mut();
37                match c.query_opt(
38                    "SELECT last_cursor_value, last_run_at FROM export_state WHERE export_name = $1",
39                    &[&export_name],
40                )? {
41                    Some(row) => Ok(CursorState {
42                        export_name: export_name.to_string(),
43                        last_cursor_value: row.get(0),
44                        last_run_at: row.get(1),
45                    }),
46                    None => Ok(CursorState {
47                        export_name: export_name.to_string(),
48                        last_cursor_value: None,
49                        last_run_at: None,
50                    }),
51                }
52            }
53        }
54    }
55
56    pub fn update(&self, export_name: &str, cursor_value: &str) -> Result<()> {
57        let now = chrono::Utc::now().to_rfc3339();
58        let sql = "INSERT INTO export_state (export_name, last_cursor_value, last_run_at)
59             VALUES (?1, ?2, ?3)
60             ON CONFLICT(export_name) DO UPDATE SET
61                last_cursor_value = excluded.last_cursor_value,
62                last_run_at = excluded.last_run_at";
63        match &self.conn {
64            StateConn::Sqlite(c) => {
65                c.execute(sql, rusqlite::params![export_name, cursor_value, now])?;
66            }
67            StateConn::Postgres(client) => {
68                let mut c = client.borrow_mut();
69                c.execute(&pg_sql(sql), &[&export_name, &cursor_value, &now])?;
70            }
71        }
72        Ok(())
73    }
74
75    pub fn reset(&self, export_name: &str) -> Result<()> {
76        let sql = "DELETE FROM export_state WHERE export_name = ?1";
77        match &self.conn {
78            StateConn::Sqlite(c) => {
79                c.execute(sql, [export_name])?;
80            }
81            StateConn::Postgres(client) => {
82                let mut c = client.borrow_mut();
83                c.execute(&pg_sql(sql), &[&export_name])?;
84            }
85        }
86        Ok(())
87    }
88
89    pub fn list_all(&self) -> Result<Vec<CursorState>> {
90        let sql = "SELECT export_name, last_cursor_value, last_run_at FROM export_state ORDER BY export_name";
91        match &self.conn {
92            StateConn::Sqlite(c) => {
93                let mut stmt = c.prepare(sql)?;
94                let rows = stmt.query_map([], |row| {
95                    Ok(CursorState {
96                        export_name: row.get(0)?,
97                        last_cursor_value: row.get(1)?,
98                        last_run_at: row.get(2)?,
99                    })
100                })?;
101                rows.collect::<std::result::Result<Vec<_>, _>>()
102                    .map_err(Into::into)
103            }
104            StateConn::Postgres(client) => {
105                let mut c = client.borrow_mut();
106                let rows = c.query(sql, &[])?;
107                Ok(rows
108                    .iter()
109                    .map(|row| CursorState {
110                        export_name: row.get(0),
111                        last_cursor_value: row.get(1),
112                        last_run_at: row.get(2),
113                    })
114                    .collect())
115            }
116        }
117    }
118}
119
#[cfg(test)]
mod tests {
    use super::*;

    fn store() -> StateStore {
        StateStore::open_in_memory().expect("in-memory store")
    }

    /// An export with no stored row reads back as "never ran": its own name,
    /// no cursor, no last run timestamp.
    #[test]
    fn get_unknown_returns_empty_state() {
        let s = store();
        let state = s.get("nonexistent").unwrap();
        assert_eq!(state.export_name, "nonexistent");
        assert!(state.last_cursor_value.is_none());
        assert!(state.last_run_at.is_none());
    }

    #[test]
    fn update_then_get_returns_stored_cursor() {
        let s = store();
        s.update("orders", "2024-06-01").unwrap();
        let state = s.get("orders").unwrap();
        assert_eq!(state.last_cursor_value.as_deref(), Some("2024-06-01"));
        assert!(state.last_run_at.is_some(), "update must stamp last_run_at");
    }

    #[test]
    fn update_overwrites_previous_cursor() {
        let s = store();
        s.update("orders", "100").unwrap();
        s.update("orders", "200").unwrap();
        assert_eq!(
            s.get("orders").unwrap().last_cursor_value.as_deref(),
            Some("200")
        );
    }

    #[test]
    fn reset_clears_cursor_state() {
        let s = store();
        s.update("orders", "100").unwrap();
        s.reset("orders").unwrap();
        assert!(s.get("orders").unwrap().last_cursor_value.is_none());
    }

    /// Resetting an export that was never recorded deletes nothing and must
    /// not fail.
    #[test]
    fn reset_unknown_export_is_noop() {
        let s = store();
        s.reset("never-seen").unwrap();
        assert!(s.list_all().unwrap().is_empty());
    }

    /// `reset` is scoped to one export; other exports keep their cursors.
    #[test]
    fn reset_only_affects_named_export() {
        let s = store();
        s.update("orders", "1").unwrap();
        s.update("users", "2").unwrap();
        s.reset("orders").unwrap();
        assert!(s.get("orders").unwrap().last_cursor_value.is_none());
        assert_eq!(
            s.get("users").unwrap().last_cursor_value.as_deref(),
            Some("2")
        );
    }

    #[test]
    fn list_all_on_empty_store_returns_empty() {
        assert!(store().list_all().unwrap().is_empty());
    }

    /// Pins the full ordering (and count), not just the endpoints, so a
    /// dropped or misplaced middle entry is caught.
    #[test]
    fn list_all_returns_entries_sorted_by_name() {
        let s = store();
        s.update("gamma", "3").unwrap();
        s.update("alpha", "1").unwrap();
        s.update("beta", "2").unwrap();
        let all = s.list_all().unwrap();
        let names: Vec<&str> = all.iter().map(|st| st.export_name.as_str()).collect();
        assert_eq!(names, ["alpha", "beta", "gamma"]);
    }

    // ─── Cursor round-trip / monotonicity (QA backlog Task 3.1) ─────────────
    //
    // ADR-0001 I3 makes monotonicity a pipeline responsibility, not a storage
    // one.  These tests pin the *value-preservation* contract on the state
    // side — the subset the pipeline relies on when reading the stored cursor
    // back on resume.

    /// Duplicate cursor values across runs are common when the cursor column
    /// is a low-precision timestamp with ties.  The store must return each
    /// written value verbatim.
    #[test]
    fn duplicate_cursor_values_are_stored_as_written() {
        let s = store();
        s.update("orders", "2024-06-01T00:00:00Z").unwrap();
        s.update("orders", "2024-06-01T00:00:00Z").unwrap();
        assert_eq!(
            s.get("orders").unwrap().last_cursor_value.as_deref(),
            Some("2024-06-01T00:00:00Z")
        );
    }

    /// Microsecond/nanosecond precision must not be rounded or truncated on
    /// round-trip — otherwise the pipeline's strict-greater-than boundary
    /// check would re-export rows on the microsecond edge.
    #[test]
    fn high_precision_timestamp_is_preserved_byte_for_byte() {
        let s = store();
        let ts = "2024-06-01T12:34:56.123456789+02:00";
        s.update("events", ts).unwrap();
        assert_eq!(
            s.get("events").unwrap().last_cursor_value.as_deref(),
            Some(ts)
        );
    }

    /// Cursor values can be arbitrary UTF-8: plain dates, UUID v7, non-Latin
    /// text (Greek) with emoji, embedded newlines/tabs, and the empty string.
    #[test]
    fn unicode_and_binary_like_cursor_values_round_trip() {
        let s = store();
        let values = [
            "2024-06-01",
            "018f1c0b-7a34-7b54-8e16-1c5a9b3f1c2d", // UUID v7
            "ελληνικά 🚀 cursor",
            "v\n\t with whitespace",
            "",
        ];
        for v in values {
            s.update("t", v).unwrap();
            assert_eq!(
                s.get("t").unwrap().last_cursor_value.as_deref(),
                Some(v),
                "cursor value {v:?} must round-trip exactly"
            );
        }
    }

    /// Resume-from-zero tooling depends on `reset` producing a state
    /// indistinguishable from "never ran": both cursor and last_run_at gone.
    #[test]
    fn reset_clears_cursor_state_completely() {
        let s = store();
        s.update("orders", "2024-06-01").unwrap();
        s.reset("orders").unwrap();
        let after = s.get("orders").unwrap();
        assert!(after.last_cursor_value.is_none());
        assert!(
            after.last_run_at.is_none(),
            "reset must clear last_run_at as well"
        );
    }
}