1use chrono::{DateTime, Utc};
4
5use super::{StateConn, StateStore, pg_sql};
6use crate::error::Result;
7
/// Progress recorded for one export: its last committed boundary and its last
/// verified boundary, each tracked independently.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExportProgression {
    /// Name of the export this record describes (the lookup key in
    /// `export_progression`).
    pub export_name: String,
    /// Last committed boundary, or `None` when no committed strategy/timestamp
    /// is stored for this export.
    pub committed: Option<Boundary>,
    /// Last verified boundary, or `None` when no verified strategy/timestamp
    /// is stored for this export.
    pub verified: Option<Boundary>,
}
15
/// A single recorded position (committed or verified) for an export.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Boundary {
    /// Strategy label stored with the boundary; the writers in this file
    /// store `"incremental"` or `"chunked"`.
    pub strategy: String,
    /// Identifier of the run that recorded this boundary, if one is stored.
    pub run_id: Option<String>,
    /// Cursor value; the writers in this file set it for incremental
    /// boundaries and store `NULL` for chunked ones.
    pub cursor: Option<String>,
    /// Chunk index; the writers in this file set it for chunked boundaries
    /// and store `NULL` for incremental ones.
    pub chunk_index: Option<i64>,
    /// Time the boundary was recorded, parsed from the stored RFC 3339 text
    /// and normalized to UTC.
    pub at: DateTime<Utc>,
}
25
26impl StateStore {
27 pub fn record_committed_incremental(
38 &self,
39 export_name: &str,
40 cursor: &str,
41 run_id: &str,
42 ) -> Result<()> {
43 if let Some(stored) = self.committed_cursor(export_name)?
44 && !cursor_advances(&stored, cursor)
45 {
46 return Ok(());
47 }
48 let now = Utc::now().to_rfc3339();
49 let sql = "INSERT INTO export_progression (
50 export_name,
51 last_committed_strategy, last_committed_cursor, last_committed_chunk_index,
52 last_committed_run_id, last_committed_at
53 ) VALUES (?1, 'incremental', ?2, NULL, ?3, ?4)
54 ON CONFLICT(export_name) DO UPDATE SET
55 last_committed_strategy = 'incremental',
56 last_committed_cursor = excluded.last_committed_cursor,
57 last_committed_chunk_index = NULL,
58 last_committed_run_id = excluded.last_committed_run_id,
59 last_committed_at = excluded.last_committed_at";
60 match &self.conn {
61 StateConn::Sqlite(c) => {
62 c.execute(sql, rusqlite::params![export_name, cursor, run_id, now])?;
63 }
64 StateConn::Postgres(client) => {
65 let mut c = client.borrow_mut();
66 c.execute(&pg_sql(sql), &[&export_name, &cursor, &run_id, &now])?;
67 }
68 }
69 Ok(())
70 }
71
72 fn committed_cursor(&self, export_name: &str) -> Result<Option<String>> {
75 let sql = "SELECT last_committed_cursor FROM export_progression WHERE export_name = ?1";
76 match &self.conn {
77 StateConn::Sqlite(c) => {
78 match c.query_row(sql, [export_name], |r| r.get::<_, Option<String>>(0)) {
79 Ok(v) => Ok(v),
80 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
81 Err(e) => Err(e.into()),
82 }
83 }
84 StateConn::Postgres(client) => {
85 let mut c = client.borrow_mut();
86 match c.query_opt(&pg_sql(sql), &[&export_name])? {
87 Some(row) => Ok(row.get::<_, Option<String>>(0)),
88 None => Ok(None),
89 }
90 }
91 }
92 }
93
94 pub fn record_committed_chunked(
96 &self,
97 export_name: &str,
98 highest_chunk_index: i64,
99 run_id: &str,
100 ) -> Result<()> {
101 let now = Utc::now().to_rfc3339();
102 let sql = "INSERT INTO export_progression (
103 export_name,
104 last_committed_strategy, last_committed_cursor, last_committed_chunk_index,
105 last_committed_run_id, last_committed_at
106 ) VALUES (?1, 'chunked', NULL, ?2, ?3, ?4)
107 ON CONFLICT(export_name) DO UPDATE SET
108 last_committed_strategy = 'chunked',
109 last_committed_cursor = NULL,
110 last_committed_chunk_index = excluded.last_committed_chunk_index,
111 last_committed_run_id = excluded.last_committed_run_id,
112 last_committed_at = excluded.last_committed_at";
113 match &self.conn {
114 StateConn::Sqlite(c) => {
115 c.execute(
116 sql,
117 rusqlite::params![export_name, highest_chunk_index, run_id, now],
118 )?;
119 }
120 StateConn::Postgres(client) => {
121 let mut c = client.borrow_mut();
122 c.execute(
123 &pg_sql(sql),
124 &[&export_name, &highest_chunk_index, &run_id, &now],
125 )?;
126 }
127 }
128 Ok(())
129 }
130
131 pub fn record_verified_chunked(
133 &self,
134 export_name: &str,
135 highest_chunk_index: i64,
136 run_id: &str,
137 ) -> Result<()> {
138 let now = Utc::now().to_rfc3339();
139 let sql = "INSERT INTO export_progression (
140 export_name,
141 last_verified_strategy, last_verified_cursor, last_verified_chunk_index,
142 last_verified_run_id, last_verified_at
143 ) VALUES (?1, 'chunked', NULL, ?2, ?3, ?4)
144 ON CONFLICT(export_name) DO UPDATE SET
145 last_verified_strategy = 'chunked',
146 last_verified_cursor = NULL,
147 last_verified_chunk_index = excluded.last_verified_chunk_index,
148 last_verified_run_id = excluded.last_verified_run_id,
149 last_verified_at = excluded.last_verified_at";
150 match &self.conn {
151 StateConn::Sqlite(c) => {
152 c.execute(
153 sql,
154 rusqlite::params![export_name, highest_chunk_index, run_id, now],
155 )?;
156 }
157 StateConn::Postgres(client) => {
158 let mut c = client.borrow_mut();
159 c.execute(
160 &pg_sql(sql),
161 &[&export_name, &highest_chunk_index, &run_id, &now],
162 )?;
163 }
164 }
165 Ok(())
166 }
167
168 pub fn get_progression(&self, export_name: &str) -> Result<ExportProgression> {
169 let sql = "SELECT
170 last_committed_strategy, last_committed_cursor, last_committed_chunk_index,
171 last_committed_run_id, last_committed_at,
172 last_verified_strategy, last_verified_cursor, last_verified_chunk_index,
173 last_verified_run_id, last_verified_at
174 FROM export_progression WHERE export_name = ?1";
175 match &self.conn {
176 StateConn::Sqlite(c) => {
177 let mut stmt = c.prepare(sql)?;
178 let row = stmt.query_row([export_name], |r| {
179 Ok((
180 r.get::<_, Option<String>>(0)?,
181 r.get::<_, Option<String>>(1)?,
182 r.get::<_, Option<i64>>(2)?,
183 r.get::<_, Option<String>>(3)?,
184 r.get::<_, Option<String>>(4)?,
185 r.get::<_, Option<String>>(5)?,
186 r.get::<_, Option<String>>(6)?,
187 r.get::<_, Option<i64>>(7)?,
188 r.get::<_, Option<String>>(8)?,
189 r.get::<_, Option<String>>(9)?,
190 ))
191 });
192 let (c_str, c_cur, c_idx, c_run, c_at, v_str, v_cur, v_idx, v_run, v_at) = match row
193 {
194 Ok(t) => t,
195 Err(rusqlite::Error::QueryReturnedNoRows) => {
196 return Ok(ExportProgression {
197 export_name: export_name.to_string(),
198 committed: None,
199 verified: None,
200 });
201 }
202 Err(e) => return Err(e.into()),
203 };
204 Ok(ExportProgression {
205 export_name: export_name.to_string(),
206 committed: boundary_from_row(c_str, c_cur, c_idx, c_run, c_at),
207 verified: boundary_from_row(v_str, v_cur, v_idx, v_run, v_at),
208 })
209 }
210 StateConn::Postgres(client) => {
211 let mut c = client.borrow_mut();
212 match c.query_opt(&pg_sql(sql), &[&export_name])? {
213 None => Ok(ExportProgression {
214 export_name: export_name.to_string(),
215 committed: None,
216 verified: None,
217 }),
218 Some(row) => {
219 let c_str: Option<String> = row.get(0);
220 let c_cur: Option<String> = row.get(1);
221 let c_idx: Option<i64> = row.get(2);
222 let c_run: Option<String> = row.get(3);
223 let c_at: Option<String> = row.get(4);
224 let v_str: Option<String> = row.get(5);
225 let v_cur: Option<String> = row.get(6);
226 let v_idx: Option<i64> = row.get(7);
227 let v_run: Option<String> = row.get(8);
228 let v_at: Option<String> = row.get(9);
229 Ok(ExportProgression {
230 export_name: export_name.to_string(),
231 committed: boundary_from_row(c_str, c_cur, c_idx, c_run, c_at),
232 verified: boundary_from_row(v_str, v_cur, v_idx, v_run, v_at),
233 })
234 }
235 }
236 }
237 }
238 }
239
240 pub fn delete_progression(&self, export_name: &str) -> Result<usize> {
250 let sql = "DELETE FROM export_progression WHERE export_name = ?1";
251 match &self.conn {
252 StateConn::Sqlite(c) => Ok(c.execute(sql, [export_name])?),
253 StateConn::Postgres(client) => {
254 let mut c = client.borrow_mut();
255 Ok(c.execute(&pg_sql(sql), &[&export_name])? as usize)
256 }
257 }
258 }
259
260 pub fn list_progression(&self) -> Result<Vec<ExportProgression>> {
261 match &self.conn {
262 StateConn::Sqlite(c) => {
263 let mut stmt =
264 c.prepare("SELECT export_name FROM export_progression ORDER BY export_name")?;
265 let names: Vec<String> = stmt
266 .query_map([], |r| r.get::<_, String>(0))?
267 .collect::<std::result::Result<_, _>>()?;
268 drop(stmt);
269 let mut out = Vec::with_capacity(names.len());
270 for n in names {
271 out.push(self.get_progression(&n)?);
272 }
273 Ok(out)
274 }
275 StateConn::Postgres(client) => {
276 let mut c = client.borrow_mut();
278 let rows = c.query(
279 "SELECT export_name,
280 last_committed_strategy, last_committed_cursor, last_committed_chunk_index,
281 last_committed_run_id, last_committed_at,
282 last_verified_strategy, last_verified_cursor, last_verified_chunk_index,
283 last_verified_run_id, last_verified_at
284 FROM export_progression ORDER BY export_name",
285 &[],
286 )?;
287 Ok(rows
288 .iter()
289 .map(|row| {
290 let export_name: String = row.get(0);
291 let c_str: Option<String> = row.get(1);
292 let c_cur: Option<String> = row.get(2);
293 let c_idx: Option<i64> = row.get(3);
294 let c_run: Option<String> = row.get(4);
295 let c_at: Option<String> = row.get(5);
296 let v_str: Option<String> = row.get(6);
297 let v_cur: Option<String> = row.get(7);
298 let v_idx: Option<i64> = row.get(8);
299 let v_run: Option<String> = row.get(9);
300 let v_at: Option<String> = row.get(10);
301 ExportProgression {
302 export_name,
303 committed: boundary_from_row(c_str, c_cur, c_idx, c_run, c_at),
304 verified: boundary_from_row(v_str, v_cur, v_idx, v_run, v_at),
305 }
306 })
307 .collect())
308 }
309 }
310 }
311}
312
/// Decides whether `new` lies strictly after `stored`.
///
/// Comparison is attempted in three tiers:
/// 1. both values parse as `i128` — exact integer comparison;
/// 2. both values parse as `f64` and are ordered — floating-point comparison;
/// 3. otherwise (including when either float is NaN) — plain string order.
///
/// Equal values never count as an advance.
fn cursor_advances(stored: &str, new: &str) -> bool {
    // Tier 1: exact integers, so large values are not rounded through f64.
    if let (Ok(prev), Ok(next)) = (stored.parse::<i128>(), new.parse::<i128>()) {
        return prev < next;
    }

    // Tier 2: floats; `partial_cmp` yields `None` when NaN is involved.
    let float_order = match (stored.parse::<f64>(), new.parse::<f64>()) {
        (Ok(prev), Ok(next)) => next.partial_cmp(&prev),
        _ => None,
    };

    match float_order {
        Some(order) => order == std::cmp::Ordering::Greater,
        // Tier 3: lexicographic fallback for everything else.
        None => stored < new,
    }
}
332
333fn boundary_from_row(
334 strategy: Option<String>,
335 cursor: Option<String>,
336 chunk_index: Option<i64>,
337 run_id: Option<String>,
338 at: Option<String>,
339) -> Option<Boundary> {
340 let strategy = strategy?;
341 let at = at
342 .as_deref()
343 .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
344 .map(|dt| dt.with_timezone(&Utc))?;
345 Some(Boundary {
346 strategy,
347 run_id,
348 cursor,
349 chunk_index,
350 at,
351 })
352}
353
// Unit tests for progression recording, cursor ordering, deletion and listing,
// run against an in-memory store.
#[cfg(test)]
mod tests {
    use super::*;

    // Fresh in-memory store for each test.
    fn store() -> StateStore {
        StateStore::open_in_memory().expect("in-memory store")
    }

    // An export that was never recorded reads back with no boundaries.
    #[test]
    fn progression_unknown_export_returns_empty() {
        let s = store();
        let p = s.get_progression("orders").unwrap();
        assert!(p.committed.is_none());
        assert!(p.verified.is_none());
    }

    // An incremental commit stores strategy, cursor and run id, and no chunk index.
    #[test]
    fn committed_incremental_records_cursor_and_run() {
        let s = store();
        s.record_committed_incremental("orders", "2024-06-01", "run-1")
            .unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.strategy, "incremental");
        assert_eq!(b.cursor.as_deref(), Some("2024-06-01"));
        assert_eq!(b.chunk_index, None);
        assert_eq!(b.run_id.as_deref(), Some("run-1"));
    }

    // A date-like cursor that sorts earlier as a string must not overwrite a later one.
    #[test]
    fn committed_cursor_does_not_regress_lexicographically() {
        let s = store();
        s.record_committed_incremental("orders", "2024-06-10", "run-10")
            .unwrap();
        s.record_committed_incremental("orders", "2024-01-01", "run-01")
            .unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.cursor.as_deref(), Some("2024-06-10"));
    }

    // A chunked commit stores the chunk index and leaves the cursor empty.
    #[test]
    fn committed_chunked_records_chunk_index() {
        let s = store();
        s.record_committed_chunked("orders", 41, "run-A").unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.strategy, "chunked");
        assert_eq!(b.chunk_index, Some(41));
        assert_eq!(b.cursor, None);
    }

    // Writing the verified boundary must not disturb the committed one.
    #[test]
    fn verified_and_committed_are_independent() {
        let s = store();
        s.record_committed_chunked("orders", 10, "run-A").unwrap();
        s.record_verified_chunked("orders", 5, "run-A").unwrap();
        let p = s.get_progression("orders").unwrap();
        assert_eq!(p.committed.as_ref().unwrap().chunk_index, Some(10));
        assert_eq!(p.verified.as_ref().unwrap().chunk_index, Some(5));
    }

    // Switching incremental -> chunked replaces the committed boundary and clears the cursor.
    #[test]
    fn switching_strategy_updates_committed_row() {
        let s = store();
        s.record_committed_incremental("orders", "2024-01-01", "inc-1")
            .unwrap();
        s.record_committed_chunked("orders", 7, "chunk-1").unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.strategy, "chunked");
        assert_eq!(b.chunk_index, Some(7));
        assert_eq!(b.cursor, None);
    }

    // "1000" sorts before "999" as text; integer cursors must be compared
    // numerically so the boundary still advances.
    #[test]
    fn roast_committed_numeric_cursor_advances_past_lexicographic_boundary() {
        let s = store();
        s.record_committed_incremental("orders", "999", "run-999")
            .unwrap();
        s.record_committed_incremental("orders", "1000", "run-1000")
            .unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(
            b.cursor.as_deref(),
            Some("1000"),
            "numeric cursor must advance from 999 to 1000, but the lexicographic \
             TEXT comparison froze the committed boundary at {:?}",
            b.cursor
        );
    }

    // The reverse case: a numerically smaller cursor is ignored, and the run id
    // of the earlier commit is preserved.
    #[test]
    fn committed_numeric_cursor_does_not_regress() {
        let s = store();
        s.record_committed_incremental("orders", "1000", "run-1000")
            .unwrap();
        s.record_committed_incremental("orders", "999", "run-999")
            .unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.cursor.as_deref(), Some("1000"));
        assert_eq!(
            b.run_id.as_deref(),
            Some("run-1000"),
            "non-advancing commit must leave the boundary row untouched"
        );
    }

    // "9.9" does not parse as an integer, so the pair is compared as floats:
    // 10 advances past 9.9, and 9.95 does not advance past 10.
    #[test]
    fn committed_float_cursor_advances_across_integer_boundary() {
        let s = store();
        s.record_committed_incremental("scores", "9.9", "run-1")
            .unwrap();
        s.record_committed_incremental("scores", "10", "run-2")
            .unwrap();
        let b = s.get_progression("scores").unwrap().committed.unwrap();
        assert_eq!(b.cursor.as_deref(), Some("10"));
        s.record_committed_incremental("scores", "9.95", "run-3")
            .unwrap();
        let b = s.get_progression("scores").unwrap().committed.unwrap();
        assert_eq!(b.cursor.as_deref(), Some("10"), "9.95 must not regress 10");
    }

    // Re-committing the same cursor does not rewrite the row (run id stays).
    #[test]
    fn committed_equal_cursor_is_a_no_op() {
        let s = store();
        s.record_committed_incremental("orders", "100", "run-1")
            .unwrap();
        s.record_committed_incremental("orders", "100", "run-2")
            .unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.cursor.as_deref(), Some("100"));
        assert_eq!(
            b.run_id.as_deref(),
            Some("run-1"),
            "an equal cursor does not advance; the row must stay untouched"
        );
    }

    // RFC 3339 timestamps are not numeric, so they are ordered as strings.
    #[test]
    fn committed_rfc3339_cursor_advances_and_does_not_regress() {
        let s = store();
        s.record_committed_incremental("orders", "2024-06-01T00:00:00Z", "run-1")
            .unwrap();
        s.record_committed_incremental("orders", "2024-06-02T00:00:00Z", "run-2")
            .unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.cursor.as_deref(), Some("2024-06-02T00:00:00Z"));
        s.record_committed_incremental("orders", "2024-05-31T00:00:00Z", "run-3")
            .unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.cursor.as_deref(), Some("2024-06-02T00:00:00Z"));
    }

    // When only one side is numeric the pair is compared as strings;
    // "123" sorts before "abc", so it does not advance.
    #[test]
    fn committed_mixed_kind_cursor_falls_back_to_string_order() {
        let s = store();
        s.record_committed_incremental("orders", "abc", "run-1")
            .unwrap();
        s.record_committed_incremental("orders", "123", "run-2")
            .unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.cursor.as_deref(), Some("abc"));
    }

    // 9007199254740992 is 2^53; the next integer is not representable as f64,
    // so this only passes when integers are compared exactly.
    #[test]
    fn committed_large_integer_cursor_compares_exactly() {
        let s = store();
        s.record_committed_incremental("orders", "9007199254740992", "run-1")
            .unwrap();
        s.record_committed_incremental("orders", "9007199254740993", "run-2")
            .unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.cursor.as_deref(), Some("9007199254740993"));
    }

    // After a chunked commit the stored cursor is NULL, so the first
    // incremental commit has nothing to compare against and is written.
    #[test]
    fn switching_chunked_to_incremental_writes_cursor() {
        let s = store();
        s.record_committed_chunked("orders", 7, "chunk-1").unwrap();
        s.record_committed_incremental("orders", "100", "inc-1")
            .unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.strategy, "incremental");
        assert_eq!(b.cursor.as_deref(), Some("100"));
        assert_eq!(b.chunk_index, None);
    }

    // Direct checks of the comparison helper across integers, floats and strings.
    #[test]
    fn cursor_advances_orders_numbers_strings_and_nan() {
        assert!(cursor_advances("999", "1000"));
        assert!(!cursor_advances("1000", "999"));
        assert!(!cursor_advances("100", "100"));
        assert!(cursor_advances("9.9", "10"));
        assert!(cursor_advances("-5", "-4"));
        assert!(cursor_advances("2024-01-01", "2024-06-10"));
        // NaN has no float ordering, so these pairs fall back to string order,
        // where "inf" sorts after "NaN".
        assert!(cursor_advances("NaN", "inf"));
        assert!(!cursor_advances("inf", "NaN"));
    }

    // Deleting one export's progression leaves other exports alone, and
    // deleting it again reports zero rows.
    #[test]
    fn delete_progression_removes_only_the_named_export() {
        let s = store();
        s.record_committed_incremental("orders", "100", "run-o")
            .unwrap();
        s.record_committed_incremental("users", "9", "run-u")
            .unwrap();

        assert_eq!(
            s.delete_progression("orders").unwrap(),
            1,
            "deleting an existing progression row reports one row removed"
        );
        assert!(s.get_progression("orders").unwrap().committed.is_none());
        assert!(
            s.get_progression("users").unwrap().committed.is_some(),
            "delete must be scoped to the named export"
        );
        assert_eq!(
            s.delete_progression("orders").unwrap(),
            0,
            "deleting an absent progression row is a no-op (zero rows)"
        );
    }

    // Listing returns exports ordered by name regardless of insertion order.
    #[test]
    fn list_progression_sorted_by_name() {
        let s = store();
        s.record_committed_incremental("gamma", "3", "r").unwrap();
        s.record_committed_incremental("alpha", "1", "r").unwrap();
        s.record_committed_incremental("beta", "2", "r").unwrap();
        let all = s.list_progression().unwrap();
        let names: Vec<_> = all.iter().map(|p| p.export_name.as_str()).collect();
        assert_eq!(names, vec!["alpha", "beta", "gamma"]);
    }
}