1use rusqlite::TransactionBehavior;
2
3use crate::error::Result;
4
5use super::{StateConn, StateRef, StateStore, open_connection, pg_sql};
6
/// Snapshot of one row of the `chunk_task` table, as returned by
/// `StateStore::list_chunk_tasks_for_run`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkTaskInfo {
    /// Position of the chunk within its run (0-based, assigned at insert time).
    pub chunk_index: i64,
    /// Lower key of the chunk's range, stored as a decimal string.
    pub start_key: String,
    /// Upper key of the chunk's range, stored as a decimal string.
    pub end_key: String,
    /// Task state as stored in the database
    /// (`pending`, `running`, `failed` or `completed` in this module).
    pub status: String,
    /// How many times the task has been claimed.
    pub attempts: i64,
    /// Error text from the last failure, or the re-export reason; `None` after completion.
    pub last_error: Option<String>,
    /// Rows written by the completed chunk, if recorded.
    pub rows_written: Option<i64>,
    /// Output file produced by the completed chunk, if recorded.
    pub file_name: Option<String>,
}
19
20impl StateStore {
30 pub fn list_export_names_with_in_progress_chunk_runs(&self) -> Result<Vec<String>> {
33 let sql = "SELECT DISTINCT export_name FROM chunk_run WHERE status = 'in_progress' ORDER BY export_name ASC";
34 match &self.conn {
35 StateConn::Sqlite(c) => {
36 let mut stmt = c.prepare(sql)?;
37 let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
38 rows.collect::<std::result::Result<Vec<_>, _>>()
39 .map_err(Into::into)
40 }
41 StateConn::Postgres(client) => {
42 let mut c = client.borrow_mut();
43 let rows = c.query(&pg_sql(sql), &[])?;
44 Ok(rows.iter().map(|row| row.get(0)).collect())
45 }
46 }
47 }
48
49 pub fn find_in_progress_chunk_run(
51 &self,
52 export_name: &str,
53 ) -> Result<Option<(String, String)>> {
54 let sql = "SELECT run_id, plan_hash FROM chunk_run
55 WHERE export_name = ?1 AND status = 'in_progress'
56 ORDER BY created_at DESC LIMIT 1";
57 match &self.conn {
58 StateConn::Sqlite(c) => {
59 let mut stmt = c.prepare(sql)?;
60 let mut rows =
61 stmt.query_map([export_name], |row| Ok((row.get(0)?, row.get(1)?)))?;
62 Ok(rows.next().transpose()?)
63 }
64 StateConn::Postgres(client) => {
65 let mut c = client.borrow_mut();
68 let rows = c.query(&pg_sql(sql), &[&export_name])?;
69 Ok(rows.first().map(|row| (row.get(0), row.get(1))))
70 }
71 }
72 }
73
74 pub fn create_chunk_run(
75 &self,
76 run_id: &str,
77 export_name: &str,
78 plan_hash: &str,
79 max_chunk_attempts: u32,
80 ) -> Result<()> {
81 let now = chrono::Utc::now().to_rfc3339();
82 let sql = "INSERT INTO chunk_run (run_id, export_name, plan_hash, status, max_chunk_attempts, created_at, updated_at)
83 VALUES (?1, ?2, ?3, 'in_progress', ?4, ?5, ?5)";
84 match &self.conn {
85 StateConn::Sqlite(c) => {
86 c.execute(
87 sql,
88 rusqlite::params![
89 run_id,
90 export_name,
91 plan_hash,
92 max_chunk_attempts as i64,
93 now
94 ],
95 )?;
96 }
97 StateConn::Postgres(client) => {
98 let mut c = client.borrow_mut();
99 c.execute(
100 &pg_sql(sql),
101 &[
102 &run_id,
103 &export_name,
104 &plan_hash,
105 &(max_chunk_attempts as i64),
106 &now,
107 ],
108 )?;
109 }
110 }
111 Ok(())
112 }
113
114 pub fn insert_chunk_tasks(&self, run_id: &str, ranges: &[(i64, i64)]) -> Result<()> {
115 if ranges.is_empty() {
116 return Ok(());
117 }
118 let now = chrono::Utc::now().to_rfc3339();
119 match &self.conn {
120 StateConn::Sqlite(c) => {
121 let tx = c.unchecked_transaction()?;
122 {
123 let mut stmt = tx.prepare(
124 "INSERT INTO chunk_task (run_id, chunk_index, start_key, end_key, status, attempts, updated_at)
125 VALUES (?1, ?2, ?3, ?4, 'pending', 0, ?5)",
126 )?;
127 for (i, (start, end)) in ranges.iter().enumerate() {
128 stmt.execute(rusqlite::params![
129 run_id,
130 i as i64,
131 start.to_string(),
132 end.to_string(),
133 now,
134 ])?;
135 }
136 }
137 tx.commit()?;
138 }
139 StateConn::Postgres(client) => {
140 let mut c = client.borrow_mut();
141 let mut tx = c
142 .transaction()
143 .map_err(|e| anyhow::anyhow!("state(pg): begin transaction: {:#}", e))?;
144 for (i, (start, end)) in ranges.iter().enumerate() {
145 tx.execute(
146 "INSERT INTO chunk_task (run_id, chunk_index, start_key, end_key, status, attempts, updated_at)
147 VALUES ($1, $2, $3, $4, 'pending', 0, $5)",
148 &[
149 &run_id,
150 &(i as i64),
151 &start.to_string(),
152 &end.to_string(),
153 &now,
154 ],
155 )
156 .map_err(|e| anyhow::anyhow!("state(pg): insert chunk_task: {:#}", e))?;
157 }
158 tx.commit()
159 .map_err(|e| anyhow::anyhow!("state(pg): commit: {:#}", e))?;
160 }
161 }
162 Ok(())
163 }
164
165 pub fn reset_stale_running_chunk_tasks(&self, run_id: &str) -> Result<usize> {
167 let now = chrono::Utc::now().to_rfc3339();
168 let sql = "UPDATE chunk_task SET status = 'pending', updated_at = ?1
169 WHERE run_id = ?2 AND status = 'running'";
170 match &self.conn {
171 StateConn::Sqlite(c) => {
172 let n = c.execute(sql, rusqlite::params![now, run_id])?;
173 Ok(n)
174 }
175 StateConn::Postgres(client) => {
176 let mut c = client.borrow_mut();
177 let n = c.execute(&pg_sql(sql), &[&now, &run_id])?;
178 Ok(n as usize)
179 }
180 }
181 }
182
183 pub fn claim_next_chunk_task(&self, run_id: &str) -> Result<Option<(i64, String, String)>> {
185 Self::claim_next_chunk_task_at_ref(&self.state_ref, run_id)
186 }
187
188 fn claim_next_chunk_in_sqlite_tx(
189 tx: &rusqlite::Transaction<'_>,
190 now: &str,
191 run_id: &str,
192 ) -> Result<Option<(i64, String, String)>> {
193 let mut stmt = tx.prepare(
194 "UPDATE chunk_task
195 SET status = 'running', attempts = attempts + 1, updated_at = ?1
196 WHERE rowid = (
197 SELECT ct.rowid FROM chunk_task ct
198 INNER JOIN chunk_run cr ON cr.run_id = ct.run_id
199 WHERE ct.run_id = ?2
200 AND cr.status = 'in_progress'
201 AND (
202 ct.status = 'pending'
203 OR (ct.status = 'failed' AND ct.attempts < cr.max_chunk_attempts)
204 )
205 ORDER BY ct.chunk_index ASC
206 LIMIT 1
207 )
208 RETURNING chunk_index, start_key, end_key",
209 )?;
210 let mut rows = stmt.query(rusqlite::params![now, run_id])?;
211 let out = match rows.next()? {
212 Some(row) => Some((row.get(0)?, row.get(1)?, row.get(2)?)),
213 None => None,
214 };
215 Ok(out)
216 }
217
218 pub fn claim_next_chunk_task_at_ref(
221 state_ref: &StateRef,
222 run_id: &str,
223 ) -> Result<Option<(i64, String, String)>> {
224 match state_ref {
225 StateRef::Sqlite(db_path) => {
226 let mut conn = open_connection(db_path)?;
227 let now = chrono::Utc::now().to_rfc3339();
228 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
229 let res = Self::claim_next_chunk_in_sqlite_tx(&tx, &now, run_id)?;
230 tx.commit()?;
231 Ok(res)
232 }
233 StateRef::Postgres(url) => {
234 let mut client = postgres::Client::connect(url, postgres::NoTls)
235 .map_err(|e| anyhow::anyhow!("state(pg): connect for claim: {:#}", e))?;
236 let now = chrono::Utc::now().to_rfc3339();
237 let rows = client
239 .query(
240 "UPDATE chunk_task
241 SET status = 'running', attempts = attempts + 1, updated_at = $1
242 WHERE id = (
243 SELECT ct.id FROM chunk_task ct
244 INNER JOIN chunk_run cr ON cr.run_id = ct.run_id
245 WHERE ct.run_id = $2
246 AND cr.status = 'in_progress'
247 AND (
248 ct.status = 'pending'
249 OR (ct.status = 'failed' AND ct.attempts < cr.max_chunk_attempts)
250 )
251 ORDER BY ct.chunk_index ASC
252 LIMIT 1
253 FOR UPDATE SKIP LOCKED
254 )
255 RETURNING chunk_index, start_key, end_key",
256 &[&now, &run_id],
257 )
258 .map_err(|e| anyhow::anyhow!("state(pg): claim chunk: {:#}", e))?;
259 Ok(rows.first().map(|row| (row.get(0), row.get(1), row.get(2))))
260 }
261 }
262 }
263
264 pub fn complete_chunk_task(
265 &self,
266 run_id: &str,
267 chunk_index: i64,
268 rows_written: i64,
269 file_name: Option<&str>,
270 ) -> Result<()> {
271 let now = chrono::Utc::now().to_rfc3339();
272 let sql = "UPDATE chunk_task
273 SET status = 'completed', rows_written = ?1, file_name = ?2, last_error = NULL, updated_at = ?3
274 WHERE run_id = ?4 AND chunk_index = ?5";
275 match &self.conn {
276 StateConn::Sqlite(c) => {
277 c.execute(
278 sql,
279 rusqlite::params![rows_written, file_name, now, run_id, chunk_index],
280 )?;
281 }
282 StateConn::Postgres(client) => {
283 let mut c = client.borrow_mut();
284 c.execute(
285 &pg_sql(sql),
286 &[&rows_written, &file_name, &now, &run_id, &chunk_index],
287 )?;
288 }
289 }
290 Ok(())
291 }
292
293 pub fn fail_chunk_task(&self, run_id: &str, chunk_index: i64, err: &str) -> Result<()> {
294 let now = chrono::Utc::now().to_rfc3339();
295 let sql = "UPDATE chunk_task SET status = 'failed', last_error = ?1, updated_at = ?2
296 WHERE run_id = ?3 AND chunk_index = ?4";
297 match &self.conn {
298 StateConn::Sqlite(c) => {
299 c.execute(sql, rusqlite::params![err, now, run_id, chunk_index])?;
300 }
301 StateConn::Postgres(client) => {
302 let mut c = client.borrow_mut();
303 c.execute(&pg_sql(sql), &[&err, &now, &run_id, &chunk_index])?;
304 }
305 }
306 Ok(())
307 }
308
309 pub fn fail_chunk_task_at_ref(
310 state_ref: &StateRef,
311 run_id: &str,
312 chunk_index: i64,
313 err: &str,
314 ) -> Result<()> {
315 let now = chrono::Utc::now().to_rfc3339();
316 let sql = "UPDATE chunk_task SET status = 'failed', last_error = ?1, updated_at = ?2
317 WHERE run_id = ?3 AND chunk_index = ?4";
318 match state_ref {
319 StateRef::Sqlite(db_path) => {
320 let conn = open_connection(db_path)?;
321 conn.execute(sql, rusqlite::params![err, now, run_id, chunk_index])?;
322 }
323 StateRef::Postgres(url) => {
324 let mut client = postgres::Client::connect(url, postgres::NoTls)
325 .map_err(|e| anyhow::anyhow!("state(pg): connect for fail: {:#}", e))?;
326 client.execute(&pg_sql(sql), &[&err, &now, &run_id, &chunk_index])?;
327 }
328 }
329 Ok(())
330 }
331
332 pub fn complete_chunk_task_at_ref(
333 state_ref: &StateRef,
334 run_id: &str,
335 chunk_index: i64,
336 rows_written: i64,
337 file_name: Option<&str>,
338 ) -> Result<()> {
339 let now = chrono::Utc::now().to_rfc3339();
340 let sql = "UPDATE chunk_task
341 SET status = 'completed', rows_written = ?1, file_name = ?2, last_error = NULL, updated_at = ?3
342 WHERE run_id = ?4 AND chunk_index = ?5";
343 match state_ref {
344 StateRef::Sqlite(db_path) => {
345 let conn = open_connection(db_path)?;
346 conn.execute(
347 sql,
348 rusqlite::params![rows_written, file_name, now, run_id, chunk_index],
349 )?;
350 }
351 StateRef::Postgres(url) => {
352 let mut client = postgres::Client::connect(url, postgres::NoTls)
353 .map_err(|e| anyhow::anyhow!("state(pg): connect for complete: {:#}", e))?;
354 client.execute(
355 &pg_sql(sql),
356 &[&rows_written, &file_name, &now, &run_id, &chunk_index],
357 )?;
358 }
359 }
360 Ok(())
361 }
362
363 pub fn count_chunk_tasks_total(&self, run_id: &str) -> Result<usize> {
364 let sql = "SELECT COUNT(*) FROM chunk_task WHERE run_id = ?1";
365 match &self.conn {
366 StateConn::Sqlite(c) => {
367 let n: i64 = c.query_row(sql, [run_id], |row| row.get(0))?;
368 Ok(n as usize)
369 }
370 StateConn::Postgres(client) => {
371 let mut c = client.borrow_mut();
372 let row = c.query_one(&pg_sql(sql), &[&run_id])?;
373 let n: i64 = row.get(0);
374 Ok(n as usize)
375 }
376 }
377 }
378
379 pub fn count_chunk_tasks_not_completed(&self, run_id: &str) -> Result<i64> {
380 let sql = "SELECT COUNT(*) FROM chunk_task WHERE run_id = ?1 AND status != 'completed'";
381 match &self.conn {
382 StateConn::Sqlite(c) => {
383 let n: i64 = c.query_row(sql, [run_id], |row| row.get(0))?;
384 Ok(n)
385 }
386 StateConn::Postgres(client) => {
387 let mut c = client.borrow_mut();
388 let row = c.query_one(&pg_sql(sql), &[&run_id])?;
389 Ok(row.get(0))
390 }
391 }
392 }
393
394 pub fn finalize_chunk_run_completed(&self, run_id: &str) -> Result<()> {
395 let now = chrono::Utc::now().to_rfc3339();
396 let sql = "UPDATE chunk_run SET status = 'completed', updated_at = ?1 WHERE run_id = ?2";
397 match &self.conn {
398 StateConn::Sqlite(c) => {
399 c.execute(sql, rusqlite::params![now, run_id])?;
400 }
401 StateConn::Postgres(client) => {
402 let mut c = client.borrow_mut();
403 c.execute(&pg_sql(sql), &[&now, &run_id])?;
404 }
405 }
406 Ok(())
407 }
408
409 pub fn reset_chunk_task_for_re_export(
430 &self,
431 run_id: &str,
432 chunk_index: i64,
433 reason: &str,
434 ) -> Result<usize> {
435 let now = chrono::Utc::now().to_rfc3339();
436 let sql = "UPDATE chunk_task
437 SET status = 'pending',
438 attempts = 0,
439 file_name = NULL,
440 rows_written = NULL,
441 last_error = ?1,
442 updated_at = ?2
443 WHERE run_id = ?3
444 AND chunk_index = ?4
445 AND status = 'completed'";
446 match &self.conn {
447 StateConn::Sqlite(c) => {
448 let n = c.execute(sql, rusqlite::params![reason, now, run_id, chunk_index])?;
449 Ok(n)
450 }
451 StateConn::Postgres(client) => {
452 let mut c = client.borrow_mut();
453 let n = c.execute(&pg_sql(sql), &[&reason, &now, &run_id, &chunk_index])?;
454 Ok(n as usize)
455 }
456 }
457 }
458
459 pub fn reset_chunk_checkpoint(&self, export_name: &str) -> Result<usize> {
461 match &self.conn {
462 StateConn::Sqlite(c) => {
463 let run_ids: Vec<String> = {
464 let mut stmt =
465 c.prepare("SELECT run_id FROM chunk_run WHERE export_name = ?1")?;
466 let rows = stmt.query_map([export_name], |row| row.get(0))?;
467 rows.collect::<std::result::Result<Vec<_>, _>>()?
468 };
469 for rid in &run_ids {
470 let _ = c.execute("DELETE FROM chunk_task WHERE run_id = ?1", [rid]);
471 }
472 let deleted = c.execute(
473 "DELETE FROM chunk_run WHERE export_name = ?1",
474 [export_name],
475 )?;
476 Ok(deleted)
477 }
478 StateConn::Postgres(client) => {
479 let mut c = client.borrow_mut();
480 let rows = c.query(
481 "SELECT run_id FROM chunk_run WHERE export_name = $1",
482 &[&export_name],
483 )?;
484 let run_ids: Vec<String> = rows.iter().map(|r| r.get(0)).collect();
485 for rid in &run_ids {
486 let _ = c.execute("DELETE FROM chunk_task WHERE run_id = $1", &[rid]);
487 }
488 let deleted = c.execute(
489 "DELETE FROM chunk_run WHERE export_name = $1",
490 &[&export_name],
491 )?;
492 Ok(deleted as usize)
493 }
494 }
495 }
496
497 pub fn get_latest_chunk_run(
499 &self,
500 export_name: &str,
501 ) -> Result<Option<(String, String, String, String)>> {
502 let sql = "SELECT run_id, plan_hash, status, updated_at FROM chunk_run
503 WHERE export_name = ?1 ORDER BY updated_at DESC LIMIT 1";
504 match &self.conn {
505 StateConn::Sqlite(c) => {
506 let mut stmt = c.prepare(sql)?;
507 let mut rows = stmt.query_map([export_name], |row| {
508 Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
509 })?;
510 Ok(rows.next().transpose()?)
511 }
512 StateConn::Postgres(client) => {
513 let mut c = client.borrow_mut();
514 let rows = c.query(&pg_sql(sql), &[&export_name])?;
515 Ok(rows
516 .first()
517 .map(|row| (row.get(0), row.get(1), row.get(2), row.get(3))))
518 }
519 }
520 }
521
522 pub fn list_chunk_tasks_for_run(&self, run_id: &str) -> Result<Vec<ChunkTaskInfo>> {
523 let sql = "SELECT chunk_index, start_key, end_key, status, attempts, last_error, rows_written, file_name
524 FROM chunk_task WHERE run_id = ?1 ORDER BY chunk_index ASC";
525 match &self.conn {
526 StateConn::Sqlite(c) => {
527 let mut stmt = c.prepare(sql)?;
528 let rows = stmt.query_map([run_id], |row| {
529 Ok(ChunkTaskInfo {
530 chunk_index: row.get(0)?,
531 start_key: row.get(1)?,
532 end_key: row.get(2)?,
533 status: row.get(3)?,
534 attempts: row.get(4)?,
535 last_error: row.get(5)?,
536 rows_written: row.get(6)?,
537 file_name: row.get(7)?,
538 })
539 })?;
540 rows.collect::<std::result::Result<Vec<_>, _>>()
541 .map_err(Into::into)
542 }
543 StateConn::Postgres(client) => {
544 let mut c = client.borrow_mut();
545 let rows = c.query(&pg_sql(sql), &[&run_id])?;
546 Ok(rows
547 .iter()
548 .map(|row| ChunkTaskInfo {
549 chunk_index: row.get(0),
550 start_key: row.get(1),
551 end_key: row.get(2),
552 status: row.get(3),
553 attempts: row.get(4),
554 last_error: row.get(5),
555 rows_written: row.get(6),
556 file_name: row.get(7),
557 })
558 .collect())
559 }
560 }
561 }
562}
563
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a temp dir holding a placeholder `rivet.yaml` and opens a
    /// `StateStore` for it. Keep the returned `TempDir` alive for the whole
    /// test; dropping it deletes the directory.
    fn store_on_disk() -> (tempfile::TempDir, StateStore) {
        let dir = tempfile::tempdir().expect("tempdir");
        let cfg = dir.path().join("rivet.yaml");
        std::fs::write(&cfg, "# test").expect("write cfg");
        let s = StateStore::open(cfg.to_str().unwrap()).expect("open store");
        (dir, s)
    }

    #[test]
    fn chunk_claim_complete_and_finalize() {
        let (_dir, s) = store_on_disk();
        s.create_chunk_run("run_a", "orders", "deadbeef", 2).unwrap();
        s.insert_chunk_tasks("run_a", &[(1, 5), (6, 10)]).unwrap();

        let t0 = s.claim_next_chunk_task("run_a").unwrap().expect("claim 0");
        assert_eq!(t0.0, 0);
        assert_eq!(t0.1, "1");
        assert_eq!(t0.2, "5");

        s.complete_chunk_task("run_a", 0, 3, Some("part0.csv")).unwrap();

        let t1 = s.claim_next_chunk_task("run_a").unwrap().expect("claim 1");
        assert_eq!(t1.0, 1);
        s.complete_chunk_task("run_a", 1, 2, Some("part1.csv")).unwrap();

        assert_eq!(s.count_chunk_tasks_not_completed("run_a").unwrap(), 0);
        s.finalize_chunk_run_completed("run_a").unwrap();
    }

    #[test]
    fn chunk_fail_then_retry_until_max() {
        let (_dir, s) = store_on_disk();
        s.create_chunk_run("run_b", "orders", "ab", 2).unwrap();
        s.insert_chunk_tasks("run_b", &[(1, 2)]).unwrap();

        let t = s.claim_next_chunk_task("run_b").unwrap().unwrap();
        assert_eq!(t.0, 0);
        s.fail_chunk_task("run_b", 0, "boom").unwrap();

        let t2 = s.claim_next_chunk_task("run_b").unwrap().unwrap();
        assert_eq!(t2.0, 0);
        s.fail_chunk_task("run_b", 0, "again").unwrap();

        // Two attempts used out of a maximum of two: no longer claimable.
        assert!(s.claim_next_chunk_task("run_b").unwrap().is_none());
        assert_eq!(s.count_chunk_tasks_not_completed("run_b").unwrap(), 1);
    }

    #[test]
    fn reset_chunk_checkpoint_clears_runs() {
        let (_dir, s) = store_on_disk();
        s.create_chunk_run("r1", "e", "h", 1).unwrap();
        s.insert_chunk_tasks("r1", &[(0, 1)]).unwrap();
        assert_eq!(s.reset_chunk_checkpoint("e").unwrap(), 1);
        assert!(s.find_in_progress_chunk_run("e").unwrap().is_none());
        // The run's tasks must be removed together with the run.
        assert_eq!(s.count_chunk_tasks_total("r1").unwrap(), 0);
    }

    #[test]
    fn stale_running_tasks_are_reset_to_pending() {
        let (_dir, s) = store_on_disk();
        s.create_chunk_run("run_c", "orders", "h", 3).unwrap();
        s.insert_chunk_tasks("run_c", &[(1, 2)]).unwrap();

        let t = s.claim_next_chunk_task("run_c").unwrap().expect("claim");
        assert_eq!(t.0, 0);
        // A `running` task is not claimable again.
        assert!(s.claim_next_chunk_task("run_c").unwrap().is_none());

        assert_eq!(s.reset_stale_running_chunk_tasks("run_c").unwrap(), 1);
        let again = s.claim_next_chunk_task("run_c").unwrap().expect("reclaim");
        assert_eq!(again.0, 0);
    }

    #[test]
    fn re_export_resets_only_completed_tasks() {
        let (_dir, s) = store_on_disk();
        s.create_chunk_run("run_d", "orders", "h", 2).unwrap();
        s.insert_chunk_tasks("run_d", &[(1, 5), (6, 10)]).unwrap();

        let t0 = s.claim_next_chunk_task("run_d").unwrap().expect("claim 0");
        assert_eq!(t0.0, 0);
        s.complete_chunk_task("run_d", 0, 5, Some("part0.csv")).unwrap();

        // Chunk 1 was never completed, so it is not eligible for re-export.
        assert_eq!(s.reset_chunk_task_for_re_export("run_d", 1, "why").unwrap(), 0);
        assert_eq!(
            s.reset_chunk_task_for_re_export("run_d", 0, "file missing").unwrap(),
            1
        );

        let tasks = s.list_chunk_tasks_for_run("run_d").unwrap();
        assert_eq!(tasks.len(), 2);
        let first = &tasks[0];
        assert_eq!(first.chunk_index, 0);
        assert_eq!(first.status, "pending");
        assert_eq!(first.attempts, 0);
        assert_eq!(first.file_name, None);
        assert_eq!(first.rows_written, None);
        assert_eq!(first.last_error.as_deref(), Some("file missing"));
        assert_eq!(tasks[1].chunk_index, 1);
        assert_eq!(tasks[1].status, "pending");
    }

    #[test]
    fn list_in_progress_exports_orders_and_deduplicates() {
        let (_dir, s) = store_on_disk();
        assert!(
            s.list_export_names_with_in_progress_chunk_runs()
                .unwrap()
                .is_empty()
        );
        s.create_chunk_run("r_old", "zebra", "h", 1).unwrap();
        s.finalize_chunk_run_completed("r_old").unwrap();
        s.create_chunk_run("r_a", "alpha", "h1", 1).unwrap();
        s.create_chunk_run("r_b", "beta", "h2", 1).unwrap();
        assert_eq!(
            s.list_export_names_with_in_progress_chunk_runs().unwrap(),
            vec!["alpha".to_string(), "beta".to_string()]
        );
    }
}
644}