1use crate::error::Result;
2use crate::types::CursorState;
3
4use super::{StateConn, StateStore, pg_sql};
5
6impl StateStore {
12 pub fn get(&self, export_name: &str) -> Result<CursorState> {
13 match &self.conn {
14 StateConn::Sqlite(c) => {
15 let mut stmt = c.prepare(
16 "SELECT last_cursor_value, last_run_at FROM export_state WHERE export_name = ?1",
17 )?;
18 let result = stmt.query_row([export_name], |row| {
19 Ok(CursorState {
20 export_name: export_name.to_string(),
21 last_cursor_value: row.get(0)?,
22 last_run_at: row.get(1)?,
23 })
24 });
25 match result {
26 Ok(state) => Ok(state),
27 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(CursorState {
28 export_name: export_name.to_string(),
29 last_cursor_value: None,
30 last_run_at: None,
31 }),
32 Err(e) => Err(e.into()),
33 }
34 }
35 StateConn::Postgres(client) => {
36 let mut c = client.borrow_mut();
37 match c.query_opt(
38 "SELECT last_cursor_value, last_run_at FROM export_state WHERE export_name = $1",
39 &[&export_name],
40 )? {
41 Some(row) => Ok(CursorState {
42 export_name: export_name.to_string(),
43 last_cursor_value: row.get(0),
44 last_run_at: row.get(1),
45 }),
46 None => Ok(CursorState {
47 export_name: export_name.to_string(),
48 last_cursor_value: None,
49 last_run_at: None,
50 }),
51 }
52 }
53 }
54 }
55
56 pub fn update(&self, export_name: &str, cursor_value: &str) -> Result<()> {
57 let now = chrono::Utc::now().to_rfc3339();
58 let sql = "INSERT INTO export_state (export_name, last_cursor_value, last_run_at)
59 VALUES (?1, ?2, ?3)
60 ON CONFLICT(export_name) DO UPDATE SET
61 last_cursor_value = excluded.last_cursor_value,
62 last_run_at = excluded.last_run_at";
63 match &self.conn {
64 StateConn::Sqlite(c) => {
65 c.execute(sql, rusqlite::params![export_name, cursor_value, now])?;
66 }
67 StateConn::Postgres(client) => {
68 let mut c = client.borrow_mut();
69 c.execute(&pg_sql(sql), &[&export_name, &cursor_value, &now])?;
70 }
71 }
72 Ok(())
73 }
74
75 pub fn reset(&self, export_name: &str) -> Result<()> {
82 let sql = "DELETE FROM export_state WHERE export_name = ?1";
83 match &self.conn {
84 StateConn::Sqlite(c) => {
85 c.execute(sql, [export_name])?;
86 }
87 StateConn::Postgres(client) => {
88 let mut c = client.borrow_mut();
89 c.execute(&pg_sql(sql), &[&export_name])?;
90 }
91 }
92 self.delete_progression(export_name)?;
93 Ok(())
94 }
95
96 pub fn list_all(&self) -> Result<Vec<CursorState>> {
97 let sql = "SELECT export_name, last_cursor_value, last_run_at FROM export_state ORDER BY export_name";
98 match &self.conn {
99 StateConn::Sqlite(c) => {
100 let mut stmt = c.prepare(sql)?;
101 let rows = stmt.query_map([], |row| {
102 Ok(CursorState {
103 export_name: row.get(0)?,
104 last_cursor_value: row.get(1)?,
105 last_run_at: row.get(2)?,
106 })
107 })?;
108 rows.collect::<std::result::Result<Vec<_>, _>>()
109 .map_err(Into::into)
110 }
111 StateConn::Postgres(client) => {
112 let mut c = client.borrow_mut();
113 let rows = c.query(sql, &[])?;
114 Ok(rows
115 .iter()
116 .map(|row| CursorState {
117 export_name: row.get(0),
118 last_cursor_value: row.get(1),
119 last_run_at: row.get(2),
120 })
121 .collect())
122 }
123 }
124 }
125}
126
127#[cfg(test)]
128mod tests {
129 use super::*;
130
131 fn store() -> StateStore {
132 StateStore::open_in_memory().expect("in-memory store")
133 }
134
135 #[test]
136 fn get_unknown_returns_empty_state() {
137 let s = store();
138 let state = s.get("nonexistent").unwrap();
139 assert!(state.last_cursor_value.is_none());
140 }
141
142 #[test]
143 fn update_then_get_returns_stored_cursor() {
144 let s = store();
145 s.update("orders", "2024-06-01").unwrap();
146 assert_eq!(
147 s.get("orders").unwrap().last_cursor_value.as_deref(),
148 Some("2024-06-01")
149 );
150 }
151
152 #[test]
153 fn update_overwrites_previous_cursor() {
154 let s = store();
155 s.update("orders", "100").unwrap();
156 s.update("orders", "200").unwrap();
157 assert_eq!(
158 s.get("orders").unwrap().last_cursor_value.as_deref(),
159 Some("200")
160 );
161 }
162
163 #[test]
164 fn reset_clears_cursor_state() {
165 let s = store();
166 s.update("orders", "100").unwrap();
167 s.reset("orders").unwrap();
168 assert!(s.get("orders").unwrap().last_cursor_value.is_none());
169 }
170
171 #[test]
172 fn list_all_on_empty_store_returns_empty() {
173 assert!(store().list_all().unwrap().is_empty());
174 }
175
176 #[test]
177 fn list_all_returns_entries_sorted_by_name() {
178 let s = store();
179 s.update("gamma", "3").unwrap();
180 s.update("alpha", "1").unwrap();
181 s.update("beta", "2").unwrap();
182 let all = s.list_all().unwrap();
183 assert_eq!(all[0].export_name, "alpha");
184 assert_eq!(all[2].export_name, "gamma");
185 }
186
187 #[test]
198 fn duplicate_cursor_values_are_stored_as_written() {
199 let s = store();
200 s.update("orders", "2024-06-01T00:00:00Z").unwrap();
201 s.update("orders", "2024-06-01T00:00:00Z").unwrap();
202 assert_eq!(
203 s.get("orders").unwrap().last_cursor_value.as_deref(),
204 Some("2024-06-01T00:00:00Z")
205 );
206 }
207
208 #[test]
212 fn high_precision_timestamp_is_preserved_byte_for_byte() {
213 let s = store();
214 let ts = "2024-06-01T12:34:56.123456789+02:00";
215 s.update("events", ts).unwrap();
216 assert_eq!(
217 s.get("events").unwrap().last_cursor_value.as_deref(),
218 Some(ts)
219 );
220 }
221
222 #[test]
225 fn unicode_and_binary_like_cursor_values_round_trip() {
226 let s = store();
227 let values = [
228 "2024-06-01",
229 "018f1c0b-7a34-7b54-8e16-1c5a9b3f1c2d", "ελληνικά 🚀 cursor",
231 "v\n\t with whitespace",
232 "",
233 ];
234 for v in values {
235 s.update("t", v).unwrap();
236 assert_eq!(
237 s.get("t").unwrap().last_cursor_value.as_deref(),
238 Some(v),
239 "cursor value {v:?} must round-trip exactly"
240 );
241 }
242 }
243
244 #[test]
247 fn reset_clears_cursor_state_completely() {
248 let s = store();
249 s.update("orders", "2024-06-01").unwrap();
250 s.reset("orders").unwrap();
251 let after = s.get("orders").unwrap();
252 assert!(after.last_cursor_value.is_none());
253 assert!(
254 after.last_run_at.is_none(),
255 "reset must clear last_run_at as well"
256 );
257 }
258
259 #[test]
263 fn reset_clears_committed_progression() {
264 let s = store();
265 s.update("orders", "100").unwrap();
266 s.record_committed_incremental("orders", "100", "run-1")
267 .unwrap();
268 s.record_committed_incremental("users", "9", "run-u")
270 .unwrap();
271
272 s.reset("orders").unwrap();
273
274 let p = s.get_progression("orders").unwrap();
275 assert!(
276 p.committed.is_none() && p.verified.is_none(),
277 "reset must clear the export's committed/verified boundary"
278 );
279 assert!(
280 s.get_progression("users").unwrap().committed.is_some(),
281 "reset must not touch another export's progression"
282 );
283 }
284}