1use chrono::{DateTime, Utc};
4
5use super::{StateConn, StateStore, pg_sql};
6use crate::error::Result;
7
8#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct ExportProgression {
11 pub export_name: String,
12 pub committed: Option<Boundary>,
13 pub verified: Option<Boundary>,
14}
15
16#[derive(Debug, Clone, PartialEq, Eq)]
18pub struct Boundary {
19 pub strategy: String,
20 pub run_id: Option<String>,
21 pub cursor: Option<String>,
22 pub chunk_index: Option<i64>,
23 pub at: DateTime<Utc>,
24}
25
26impl StateStore {
27 pub fn record_committed_incremental(
30 &self,
31 export_name: &str,
32 cursor: &str,
33 run_id: &str,
34 ) -> Result<()> {
35 let now = Utc::now().to_rfc3339();
36 let sql = "INSERT INTO export_progression (
37 export_name,
38 last_committed_strategy, last_committed_cursor, last_committed_chunk_index,
39 last_committed_run_id, last_committed_at
40 ) VALUES (?1, 'incremental', ?2, NULL, ?3, ?4)
41 ON CONFLICT(export_name) DO UPDATE SET
42 last_committed_strategy = 'incremental',
43 last_committed_cursor = CASE
44 WHEN export_progression.last_committed_cursor IS NULL
45 OR export_progression.last_committed_cursor < excluded.last_committed_cursor
46 THEN excluded.last_committed_cursor
47 ELSE export_progression.last_committed_cursor END,
48 last_committed_chunk_index = NULL,
49 last_committed_run_id = excluded.last_committed_run_id,
50 last_committed_at = excluded.last_committed_at";
51 match &self.conn {
52 StateConn::Sqlite(c) => {
53 c.execute(sql, rusqlite::params![export_name, cursor, run_id, now])?;
54 }
55 StateConn::Postgres(client) => {
56 let mut c = client.borrow_mut();
57 c.execute(&pg_sql(sql), &[&export_name, &cursor, &run_id, &now])?;
58 }
59 }
60 Ok(())
61 }
62
63 pub fn record_committed_chunked(
65 &self,
66 export_name: &str,
67 highest_chunk_index: i64,
68 run_id: &str,
69 ) -> Result<()> {
70 let now = Utc::now().to_rfc3339();
71 let sql = "INSERT INTO export_progression (
72 export_name,
73 last_committed_strategy, last_committed_cursor, last_committed_chunk_index,
74 last_committed_run_id, last_committed_at
75 ) VALUES (?1, 'chunked', NULL, ?2, ?3, ?4)
76 ON CONFLICT(export_name) DO UPDATE SET
77 last_committed_strategy = 'chunked',
78 last_committed_cursor = NULL,
79 last_committed_chunk_index = excluded.last_committed_chunk_index,
80 last_committed_run_id = excluded.last_committed_run_id,
81 last_committed_at = excluded.last_committed_at";
82 match &self.conn {
83 StateConn::Sqlite(c) => {
84 c.execute(
85 sql,
86 rusqlite::params![export_name, highest_chunk_index, run_id, now],
87 )?;
88 }
89 StateConn::Postgres(client) => {
90 let mut c = client.borrow_mut();
91 c.execute(
92 &pg_sql(sql),
93 &[&export_name, &highest_chunk_index, &run_id, &now],
94 )?;
95 }
96 }
97 Ok(())
98 }
99
100 pub fn record_verified_chunked(
102 &self,
103 export_name: &str,
104 highest_chunk_index: i64,
105 run_id: &str,
106 ) -> Result<()> {
107 let now = Utc::now().to_rfc3339();
108 let sql = "INSERT INTO export_progression (
109 export_name,
110 last_verified_strategy, last_verified_cursor, last_verified_chunk_index,
111 last_verified_run_id, last_verified_at
112 ) VALUES (?1, 'chunked', NULL, ?2, ?3, ?4)
113 ON CONFLICT(export_name) DO UPDATE SET
114 last_verified_strategy = 'chunked',
115 last_verified_cursor = NULL,
116 last_verified_chunk_index = excluded.last_verified_chunk_index,
117 last_verified_run_id = excluded.last_verified_run_id,
118 last_verified_at = excluded.last_verified_at";
119 match &self.conn {
120 StateConn::Sqlite(c) => {
121 c.execute(
122 sql,
123 rusqlite::params![export_name, highest_chunk_index, run_id, now],
124 )?;
125 }
126 StateConn::Postgres(client) => {
127 let mut c = client.borrow_mut();
128 c.execute(
129 &pg_sql(sql),
130 &[&export_name, &highest_chunk_index, &run_id, &now],
131 )?;
132 }
133 }
134 Ok(())
135 }
136
137 pub fn get_progression(&self, export_name: &str) -> Result<ExportProgression> {
138 let sql = "SELECT
139 last_committed_strategy, last_committed_cursor, last_committed_chunk_index,
140 last_committed_run_id, last_committed_at,
141 last_verified_strategy, last_verified_cursor, last_verified_chunk_index,
142 last_verified_run_id, last_verified_at
143 FROM export_progression WHERE export_name = ?1";
144 match &self.conn {
145 StateConn::Sqlite(c) => {
146 let mut stmt = c.prepare(sql)?;
147 let row = stmt.query_row([export_name], |r| {
148 Ok((
149 r.get::<_, Option<String>>(0)?,
150 r.get::<_, Option<String>>(1)?,
151 r.get::<_, Option<i64>>(2)?,
152 r.get::<_, Option<String>>(3)?,
153 r.get::<_, Option<String>>(4)?,
154 r.get::<_, Option<String>>(5)?,
155 r.get::<_, Option<String>>(6)?,
156 r.get::<_, Option<i64>>(7)?,
157 r.get::<_, Option<String>>(8)?,
158 r.get::<_, Option<String>>(9)?,
159 ))
160 });
161 let (c_str, c_cur, c_idx, c_run, c_at, v_str, v_cur, v_idx, v_run, v_at) = match row
162 {
163 Ok(t) => t,
164 Err(rusqlite::Error::QueryReturnedNoRows) => {
165 return Ok(ExportProgression {
166 export_name: export_name.to_string(),
167 committed: None,
168 verified: None,
169 });
170 }
171 Err(e) => return Err(e.into()),
172 };
173 Ok(ExportProgression {
174 export_name: export_name.to_string(),
175 committed: boundary_from_row(c_str, c_cur, c_idx, c_run, c_at),
176 verified: boundary_from_row(v_str, v_cur, v_idx, v_run, v_at),
177 })
178 }
179 StateConn::Postgres(client) => {
180 let mut c = client.borrow_mut();
181 match c.query_opt(&pg_sql(sql), &[&export_name])? {
182 None => Ok(ExportProgression {
183 export_name: export_name.to_string(),
184 committed: None,
185 verified: None,
186 }),
187 Some(row) => {
188 let c_str: Option<String> = row.get(0);
189 let c_cur: Option<String> = row.get(1);
190 let c_idx: Option<i64> = row.get(2);
191 let c_run: Option<String> = row.get(3);
192 let c_at: Option<String> = row.get(4);
193 let v_str: Option<String> = row.get(5);
194 let v_cur: Option<String> = row.get(6);
195 let v_idx: Option<i64> = row.get(7);
196 let v_run: Option<String> = row.get(8);
197 let v_at: Option<String> = row.get(9);
198 Ok(ExportProgression {
199 export_name: export_name.to_string(),
200 committed: boundary_from_row(c_str, c_cur, c_idx, c_run, c_at),
201 verified: boundary_from_row(v_str, v_cur, v_idx, v_run, v_at),
202 })
203 }
204 }
205 }
206 }
207 }
208
209 pub fn list_progression(&self) -> Result<Vec<ExportProgression>> {
210 match &self.conn {
211 StateConn::Sqlite(c) => {
212 let mut stmt =
213 c.prepare("SELECT export_name FROM export_progression ORDER BY export_name")?;
214 let names: Vec<String> = stmt
215 .query_map([], |r| r.get::<_, String>(0))?
216 .collect::<std::result::Result<_, _>>()?;
217 drop(stmt);
218 let mut out = Vec::with_capacity(names.len());
219 for n in names {
220 out.push(self.get_progression(&n)?);
221 }
222 Ok(out)
223 }
224 StateConn::Postgres(client) => {
225 let mut c = client.borrow_mut();
227 let rows = c.query(
228 "SELECT export_name,
229 last_committed_strategy, last_committed_cursor, last_committed_chunk_index,
230 last_committed_run_id, last_committed_at,
231 last_verified_strategy, last_verified_cursor, last_verified_chunk_index,
232 last_verified_run_id, last_verified_at
233 FROM export_progression ORDER BY export_name",
234 &[],
235 )?;
236 Ok(rows
237 .iter()
238 .map(|row| {
239 let export_name: String = row.get(0);
240 let c_str: Option<String> = row.get(1);
241 let c_cur: Option<String> = row.get(2);
242 let c_idx: Option<i64> = row.get(3);
243 let c_run: Option<String> = row.get(4);
244 let c_at: Option<String> = row.get(5);
245 let v_str: Option<String> = row.get(6);
246 let v_cur: Option<String> = row.get(7);
247 let v_idx: Option<i64> = row.get(8);
248 let v_run: Option<String> = row.get(9);
249 let v_at: Option<String> = row.get(10);
250 ExportProgression {
251 export_name,
252 committed: boundary_from_row(c_str, c_cur, c_idx, c_run, c_at),
253 verified: boundary_from_row(v_str, v_cur, v_idx, v_run, v_at),
254 }
255 })
256 .collect())
257 }
258 }
259 }
260}
261
262fn boundary_from_row(
263 strategy: Option<String>,
264 cursor: Option<String>,
265 chunk_index: Option<i64>,
266 run_id: Option<String>,
267 at: Option<String>,
268) -> Option<Boundary> {
269 let strategy = strategy?;
270 let at = at
271 .as_deref()
272 .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
273 .map(|dt| dt.with_timezone(&Utc))?;
274 Some(Boundary {
275 strategy,
276 run_id,
277 cursor,
278 chunk_index,
279 at,
280 })
281}
282
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh in-memory SQLite-backed store for each test.
    fn store() -> StateStore {
        StateStore::open_in_memory().expect("in-memory store")
    }

    #[test]
    fn progression_unknown_export_returns_empty() {
        let s = store();
        let p = s.get_progression("orders").unwrap();
        assert!(p.committed.is_none());
        assert!(p.verified.is_none());
    }

    #[test]
    fn committed_incremental_records_cursor_and_run() {
        let s = store();
        s.record_committed_incremental("orders", "2024-06-01", "run-1")
            .unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.strategy, "incremental");
        assert_eq!(b.cursor.as_deref(), Some("2024-06-01"));
        assert_eq!(b.chunk_index, None);
        assert_eq!(b.run_id.as_deref(), Some("run-1"));
    }

    #[test]
    fn committed_cursor_does_not_regress_lexicographically() {
        let s = store();
        s.record_committed_incremental("orders", "2024-06-10", "run-10")
            .unwrap();
        s.record_committed_incremental("orders", "2024-01-01", "run-01")
            .unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.cursor.as_deref(), Some("2024-06-10"));
    }

    #[test]
    fn committed_chunked_records_chunk_index() {
        let s = store();
        s.record_committed_chunked("orders", 41, "run-A").unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.strategy, "chunked");
        assert_eq!(b.chunk_index, Some(41));
        assert_eq!(b.cursor, None);
    }

    #[test]
    fn verified_and_committed_are_independent() {
        let s = store();
        s.record_committed_chunked("orders", 10, "run-A").unwrap();
        s.record_verified_chunked("orders", 5, "run-A").unwrap();
        let p = s.get_progression("orders").unwrap();
        assert_eq!(p.committed.as_ref().unwrap().chunk_index, Some(10));
        assert_eq!(p.verified.as_ref().unwrap().chunk_index, Some(5));
    }

    #[test]
    fn verified_only_leaves_committed_empty() {
        let s = store();
        s.record_verified_chunked("orders", 3, "run-V").unwrap();
        let p = s.get_progression("orders").unwrap();
        assert!(p.committed.is_none());
        let v = p.verified.unwrap();
        assert_eq!(v.strategy, "chunked");
        assert_eq!(v.chunk_index, Some(3));
        assert_eq!(v.cursor, None);
        assert_eq!(v.run_id.as_deref(), Some("run-V"));
    }

    #[test]
    fn switching_strategy_updates_committed_row() {
        let s = store();
        s.record_committed_incremental("orders", "2024-01-01", "inc-1")
            .unwrap();
        s.record_committed_chunked("orders", 7, "chunk-1").unwrap();
        let b = s.get_progression("orders").unwrap().committed.unwrap();
        assert_eq!(b.strategy, "chunked");
        assert_eq!(b.chunk_index, Some(7));
        assert_eq!(b.cursor, None);
    }

    #[test]
    fn list_progression_empty_store_is_empty() {
        assert!(store().list_progression().unwrap().is_empty());
    }

    #[test]
    fn list_progression_sorted_by_name() {
        let s = store();
        s.record_committed_incremental("gamma", "3", "r").unwrap();
        s.record_committed_incremental("alpha", "1", "r").unwrap();
        s.record_committed_incremental("beta", "2", "r").unwrap();
        let all = s.list_progression().unwrap();
        let names: Vec<_> = all.iter().map(|p| p.export_name.as_str()).collect();
        assert_eq!(names, vec!["alpha", "beta", "gamma"]);
    }

    #[test]
    fn list_progression_matches_get_progression() {
        let s = store();
        s.record_committed_chunked("beta", 3, "run-B").unwrap();
        s.record_verified_chunked("beta", 2, "run-B").unwrap();
        s.record_committed_incremental("alpha", "2024-06-01", "run-A")
            .unwrap();
        let all = s.list_progression().unwrap();
        assert_eq!(all.len(), 2);
        // The bulk listing must decode rows exactly like the single-export lookup.
        assert_eq!(all[0], s.get_progression("alpha").unwrap());
        assert_eq!(all[1], s.get_progression("beta").unwrap());
        assert_eq!(
            all[0].committed.as_ref().unwrap().cursor.as_deref(),
            Some("2024-06-01")
        );
        assert_eq!(all[1].committed.as_ref().unwrap().chunk_index, Some(3));
        assert_eq!(all[1].verified.as_ref().unwrap().chunk_index, Some(2));
    }
}