1use rusqlite::TransactionBehavior;
2
3use crate::error::Result;
4
5use super::{StateConn, StateRef, StateStore, open_connection, pg_sql};
6
/// One row of the `chunk_task` table, as returned by
/// `StateStore::list_chunk_tasks_for_run`.
///
/// `PartialEq`/`Eq` are derived so callers and tests can compare listed tasks
/// directly instead of field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkTaskInfo {
    /// Zero-based position of the chunk within its run (assigned in insertion order).
    pub chunk_index: i64,
    /// Lower key of the chunk's range, stored as text.
    pub start_key: String,
    /// Upper key of the chunk's range, stored as text.
    pub end_key: String,
    /// Task state as written by the store: `pending`, `running`, `completed` or `failed`.
    pub status: String,
    /// Number of times the task has been claimed.
    pub attempts: i64,
    /// Most recent failure / re-export reason; cleared when the task completes.
    pub last_error: Option<String>,
    /// Rows exported by the chunk; only set once it has completed.
    pub rows_written: Option<i64>,
    /// Output file produced by the chunk, if any.
    pub file_name: Option<String>,
}
19
20impl StateStore {
30 pub fn list_export_names_with_in_progress_chunk_runs(&self) -> Result<Vec<String>> {
33 let sql = "SELECT DISTINCT export_name FROM chunk_run WHERE status = 'in_progress' ORDER BY export_name ASC";
34 match &self.conn {
35 StateConn::Sqlite(c) => {
36 let mut stmt = c.prepare(sql)?;
37 let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
38 rows.collect::<std::result::Result<Vec<_>, _>>()
39 .map_err(Into::into)
40 }
41 StateConn::Postgres(client) => {
42 let mut c = client.borrow_mut();
43 let rows = c.query(&pg_sql(sql), &[])?;
44 Ok(rows.iter().map(|row| row.get(0)).collect())
45 }
46 }
47 }
48
49 pub fn find_in_progress_chunk_run(
51 &self,
52 export_name: &str,
53 ) -> Result<Option<(String, String)>> {
54 let sql = "SELECT run_id, plan_hash FROM chunk_run
55 WHERE export_name = ?1 AND status = 'in_progress'
56 ORDER BY created_at DESC LIMIT 1";
57 match &self.conn {
58 StateConn::Sqlite(c) => {
59 let mut stmt = c.prepare(sql)?;
60 let mut rows =
61 stmt.query_map([export_name], |row| Ok((row.get(0)?, row.get(1)?)))?;
62 Ok(rows.next().transpose()?)
63 }
64 StateConn::Postgres(client) => {
65 let mut c = client.borrow_mut();
68 let rows = c.query(&pg_sql(sql), &[&export_name])?;
69 Ok(rows.first().map(|row| (row.get(0), row.get(1))))
70 }
71 }
72 }
73
74 pub fn create_chunk_run(
75 &self,
76 run_id: &str,
77 export_name: &str,
78 plan_hash: &str,
79 max_chunk_attempts: u32,
80 ) -> Result<()> {
81 let now = chrono::Utc::now().to_rfc3339();
82 let sql = "INSERT INTO chunk_run (run_id, export_name, plan_hash, status, max_chunk_attempts, created_at, updated_at)
83 VALUES (?1, ?2, ?3, 'in_progress', ?4, ?5, ?5)";
84 match &self.conn {
85 StateConn::Sqlite(c) => {
86 c.execute(
87 sql,
88 rusqlite::params![
89 run_id,
90 export_name,
91 plan_hash,
92 max_chunk_attempts as i64,
93 now
94 ],
95 )?;
96 }
97 StateConn::Postgres(client) => {
98 let mut c = client.borrow_mut();
99 c.execute(
100 &pg_sql(sql),
101 &[
102 &run_id,
103 &export_name,
104 &plan_hash,
105 &(max_chunk_attempts as i64),
106 &now,
107 ],
108 )?;
109 }
110 }
111 Ok(())
112 }
113
114 pub fn insert_chunk_tasks(&self, run_id: &str, ranges: &[(i64, i64)]) -> Result<()> {
115 if ranges.is_empty() {
116 return Ok(());
117 }
118 let now = chrono::Utc::now().to_rfc3339();
119 match &self.conn {
120 StateConn::Sqlite(c) => {
121 let tx = c.unchecked_transaction()?;
122 {
123 let mut stmt = tx.prepare(
124 "INSERT INTO chunk_task (run_id, chunk_index, start_key, end_key, status, attempts, updated_at)
125 VALUES (?1, ?2, ?3, ?4, 'pending', 0, ?5)",
126 )?;
127 for (i, (start, end)) in ranges.iter().enumerate() {
128 stmt.execute(rusqlite::params![
129 run_id,
130 i as i64,
131 start.to_string(),
132 end.to_string(),
133 now,
134 ])?;
135 }
136 }
137 tx.commit()?;
138 }
139 StateConn::Postgres(client) => {
140 let mut c = client.borrow_mut();
141 let mut tx = c
142 .transaction()
143 .map_err(|e| anyhow::anyhow!("state(pg): begin transaction: {:#}", e))?;
144 for (i, (start, end)) in ranges.iter().enumerate() {
145 tx.execute(
146 "INSERT INTO chunk_task (run_id, chunk_index, start_key, end_key, status, attempts, updated_at)
147 VALUES ($1, $2, $3, $4, 'pending', 0, $5)",
148 &[
149 &run_id,
150 &(i as i64),
151 &start.to_string(),
152 &end.to_string(),
153 &now,
154 ],
155 )
156 .map_err(|e| anyhow::anyhow!("state(pg): insert chunk_task: {:#}", e))?;
157 }
158 tx.commit()
159 .map_err(|e| anyhow::anyhow!("state(pg): commit: {:#}", e))?;
160 }
161 }
162 Ok(())
163 }
164
165 pub fn reset_stale_running_chunk_tasks(&self, run_id: &str) -> Result<usize> {
167 let now = chrono::Utc::now().to_rfc3339();
168 let sql = "UPDATE chunk_task SET status = 'pending', updated_at = ?1
169 WHERE run_id = ?2 AND status = 'running'";
170 match &self.conn {
171 StateConn::Sqlite(c) => {
172 let n = c.execute(sql, rusqlite::params![now, run_id])?;
173 Ok(n)
174 }
175 StateConn::Postgres(client) => {
176 let mut c = client.borrow_mut();
177 let n = c.execute(&pg_sql(sql), &[&now, &run_id])?;
178 Ok(n as usize)
179 }
180 }
181 }
182
183 pub fn claim_next_chunk_task(&self, run_id: &str) -> Result<Option<(i64, String, String)>> {
185 Self::claim_next_chunk_task_at_ref(&self.state_ref, run_id)
186 }
187
188 fn claim_next_chunk_in_sqlite_tx(
189 tx: &rusqlite::Transaction<'_>,
190 now: &str,
191 run_id: &str,
192 ) -> Result<Option<(i64, String, String)>> {
193 let mut stmt = tx.prepare(
194 "UPDATE chunk_task
195 SET status = 'running', attempts = attempts + 1, updated_at = ?1
196 WHERE rowid = (
197 SELECT ct.rowid FROM chunk_task ct
198 INNER JOIN chunk_run cr ON cr.run_id = ct.run_id
199 WHERE ct.run_id = ?2
200 AND cr.status = 'in_progress'
201 AND (
202 ct.status = 'pending'
203 OR (ct.status = 'failed' AND ct.attempts < cr.max_chunk_attempts)
204 )
205 ORDER BY ct.chunk_index ASC
206 LIMIT 1
207 )
208 RETURNING chunk_index, start_key, end_key",
209 )?;
210 let mut rows = stmt.query(rusqlite::params![now, run_id])?;
211 let out = match rows.next()? {
212 Some(row) => Some((row.get(0)?, row.get(1)?, row.get(2)?)),
213 None => None,
214 };
215 Ok(out)
216 }
217
218 pub fn claim_next_chunk_task_at_ref(
221 state_ref: &StateRef,
222 run_id: &str,
223 ) -> Result<Option<(i64, String, String)>> {
224 match state_ref {
225 StateRef::Sqlite(db_path) => {
226 let mut conn = open_connection(db_path)?;
227 let now = chrono::Utc::now().to_rfc3339();
228 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
229 let res = Self::claim_next_chunk_in_sqlite_tx(&tx, &now, run_id)?;
230 tx.commit()?;
231 Ok(res)
232 }
233 StateRef::Postgres(url) => {
234 let mut client = super::connect_pg(url)?;
235 let now = chrono::Utc::now().to_rfc3339();
236 let rows = client
238 .query(
239 "UPDATE chunk_task
240 SET status = 'running', attempts = attempts + 1, updated_at = $1
241 WHERE id = (
242 SELECT ct.id FROM chunk_task ct
243 INNER JOIN chunk_run cr ON cr.run_id = ct.run_id
244 WHERE ct.run_id = $2
245 AND cr.status = 'in_progress'
246 AND (
247 ct.status = 'pending'
248 OR (ct.status = 'failed' AND ct.attempts < cr.max_chunk_attempts)
249 )
250 ORDER BY ct.chunk_index ASC
251 LIMIT 1
252 FOR UPDATE SKIP LOCKED
253 )
254 RETURNING chunk_index, start_key, end_key",
255 &[&now, &run_id],
256 )
257 .map_err(|e| anyhow::anyhow!("state(pg): claim chunk: {:#}", e))?;
258 Ok(rows.first().map(|row| (row.get(0), row.get(1), row.get(2))))
259 }
260 }
261 }
262
263 pub fn complete_chunk_task(
264 &self,
265 run_id: &str,
266 chunk_index: i64,
267 rows_written: i64,
268 file_name: Option<&str>,
269 ) -> Result<()> {
270 let now = chrono::Utc::now().to_rfc3339();
271 let sql = "UPDATE chunk_task
272 SET status = 'completed', rows_written = ?1, file_name = ?2, last_error = NULL, updated_at = ?3
273 WHERE run_id = ?4 AND chunk_index = ?5";
274 match &self.conn {
275 StateConn::Sqlite(c) => {
276 c.execute(
277 sql,
278 rusqlite::params![rows_written, file_name, now, run_id, chunk_index],
279 )?;
280 }
281 StateConn::Postgres(client) => {
282 let mut c = client.borrow_mut();
283 c.execute(
284 &pg_sql(sql),
285 &[&rows_written, &file_name, &now, &run_id, &chunk_index],
286 )?;
287 }
288 }
289 Ok(())
290 }
291
292 pub fn fail_chunk_task(&self, run_id: &str, chunk_index: i64, err: &str) -> Result<()> {
293 let now = chrono::Utc::now().to_rfc3339();
294 let sql = "UPDATE chunk_task SET status = 'failed', last_error = ?1, updated_at = ?2
295 WHERE run_id = ?3 AND chunk_index = ?4";
296 match &self.conn {
297 StateConn::Sqlite(c) => {
298 c.execute(sql, rusqlite::params![err, now, run_id, chunk_index])?;
299 }
300 StateConn::Postgres(client) => {
301 let mut c = client.borrow_mut();
302 c.execute(&pg_sql(sql), &[&err, &now, &run_id, &chunk_index])?;
303 }
304 }
305 Ok(())
306 }
307
308 pub fn fail_chunk_task_at_ref(
309 state_ref: &StateRef,
310 run_id: &str,
311 chunk_index: i64,
312 err: &str,
313 ) -> Result<()> {
314 let now = chrono::Utc::now().to_rfc3339();
315 let sql = "UPDATE chunk_task SET status = 'failed', last_error = ?1, updated_at = ?2
316 WHERE run_id = ?3 AND chunk_index = ?4";
317 match state_ref {
318 StateRef::Sqlite(db_path) => {
319 let conn = open_connection(db_path)?;
320 conn.execute(sql, rusqlite::params![err, now, run_id, chunk_index])?;
321 }
322 StateRef::Postgres(url) => {
323 let mut client = super::connect_pg(url)?;
324 client.execute(&pg_sql(sql), &[&err, &now, &run_id, &chunk_index])?;
325 }
326 }
327 Ok(())
328 }
329
330 pub fn complete_chunk_task_at_ref(
331 state_ref: &StateRef,
332 run_id: &str,
333 chunk_index: i64,
334 rows_written: i64,
335 file_name: Option<&str>,
336 ) -> Result<()> {
337 let now = chrono::Utc::now().to_rfc3339();
338 let sql = "UPDATE chunk_task
339 SET status = 'completed', rows_written = ?1, file_name = ?2, last_error = NULL, updated_at = ?3
340 WHERE run_id = ?4 AND chunk_index = ?5";
341 match state_ref {
342 StateRef::Sqlite(db_path) => {
343 let conn = open_connection(db_path)?;
344 conn.execute(
345 sql,
346 rusqlite::params![rows_written, file_name, now, run_id, chunk_index],
347 )?;
348 }
349 StateRef::Postgres(url) => {
350 let mut client = super::connect_pg(url)?;
351 client.execute(
352 &pg_sql(sql),
353 &[&rows_written, &file_name, &now, &run_id, &chunk_index],
354 )?;
355 }
356 }
357 Ok(())
358 }
359
360 pub fn count_chunk_tasks_total(&self, run_id: &str) -> Result<usize> {
361 let sql = "SELECT COUNT(*) FROM chunk_task WHERE run_id = ?1";
362 match &self.conn {
363 StateConn::Sqlite(c) => {
364 let n: i64 = c.query_row(sql, [run_id], |row| row.get(0))?;
365 Ok(n as usize)
366 }
367 StateConn::Postgres(client) => {
368 let mut c = client.borrow_mut();
369 let row = c.query_one(&pg_sql(sql), &[&run_id])?;
370 let n: i64 = row.get(0);
371 Ok(n as usize)
372 }
373 }
374 }
375
376 pub fn count_chunk_tasks_not_completed(&self, run_id: &str) -> Result<i64> {
377 let sql = "SELECT COUNT(*) FROM chunk_task WHERE run_id = ?1 AND status != 'completed'";
378 match &self.conn {
379 StateConn::Sqlite(c) => {
380 let n: i64 = c.query_row(sql, [run_id], |row| row.get(0))?;
381 Ok(n)
382 }
383 StateConn::Postgres(client) => {
384 let mut c = client.borrow_mut();
385 let row = c.query_one(&pg_sql(sql), &[&run_id])?;
386 Ok(row.get(0))
387 }
388 }
389 }
390
391 pub fn finalize_chunk_run_completed(&self, run_id: &str) -> Result<()> {
392 let now = chrono::Utc::now().to_rfc3339();
393 let sql = "UPDATE chunk_run SET status = 'completed', updated_at = ?1 WHERE run_id = ?2";
394 match &self.conn {
395 StateConn::Sqlite(c) => {
396 c.execute(sql, rusqlite::params![now, run_id])?;
397 }
398 StateConn::Postgres(client) => {
399 let mut c = client.borrow_mut();
400 c.execute(&pg_sql(sql), &[&now, &run_id])?;
401 }
402 }
403 Ok(())
404 }
405
406 pub fn reset_chunk_task_for_re_export(
427 &self,
428 run_id: &str,
429 chunk_index: i64,
430 reason: &str,
431 ) -> Result<usize> {
432 let now = chrono::Utc::now().to_rfc3339();
433 let sql = "UPDATE chunk_task
434 SET status = 'pending',
435 attempts = 0,
436 file_name = NULL,
437 rows_written = NULL,
438 last_error = ?1,
439 updated_at = ?2
440 WHERE run_id = ?3
441 AND chunk_index = ?4
442 AND status = 'completed'";
443 match &self.conn {
444 StateConn::Sqlite(c) => {
445 let n = c.execute(sql, rusqlite::params![reason, now, run_id, chunk_index])?;
446 Ok(n)
447 }
448 StateConn::Postgres(client) => {
449 let mut c = client.borrow_mut();
450 let n = c.execute(&pg_sql(sql), &[&reason, &now, &run_id, &chunk_index])?;
451 Ok(n as usize)
452 }
453 }
454 }
455
456 pub fn reset_chunk_checkpoint(&self, export_name: &str) -> Result<usize> {
458 match &self.conn {
459 StateConn::Sqlite(c) => {
460 let run_ids: Vec<String> = {
461 let mut stmt =
462 c.prepare("SELECT run_id FROM chunk_run WHERE export_name = ?1")?;
463 let rows = stmt.query_map([export_name], |row| row.get(0))?;
464 rows.collect::<std::result::Result<Vec<_>, _>>()?
465 };
466 for rid in &run_ids {
467 let _ = c.execute("DELETE FROM chunk_task WHERE run_id = ?1", [rid]);
468 }
469 let deleted = c.execute(
470 "DELETE FROM chunk_run WHERE export_name = ?1",
471 [export_name],
472 )?;
473 Ok(deleted)
474 }
475 StateConn::Postgres(client) => {
476 let mut c = client.borrow_mut();
477 let rows = c.query(
478 "SELECT run_id FROM chunk_run WHERE export_name = $1",
479 &[&export_name],
480 )?;
481 let run_ids: Vec<String> = rows.iter().map(|r| r.get(0)).collect();
482 for rid in &run_ids {
483 let _ = c.execute("DELETE FROM chunk_task WHERE run_id = $1", &[rid]);
484 }
485 let deleted = c.execute(
486 "DELETE FROM chunk_run WHERE export_name = $1",
487 &[&export_name],
488 )?;
489 Ok(deleted as usize)
490 }
491 }
492 }
493
494 pub fn get_latest_chunk_run(
496 &self,
497 export_name: &str,
498 ) -> Result<Option<(String, String, String, String)>> {
499 let sql = "SELECT run_id, plan_hash, status, updated_at FROM chunk_run
500 WHERE export_name = ?1 ORDER BY updated_at DESC LIMIT 1";
501 match &self.conn {
502 StateConn::Sqlite(c) => {
503 let mut stmt = c.prepare(sql)?;
504 let mut rows = stmt.query_map([export_name], |row| {
505 Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
506 })?;
507 Ok(rows.next().transpose()?)
508 }
509 StateConn::Postgres(client) => {
510 let mut c = client.borrow_mut();
511 let rows = c.query(&pg_sql(sql), &[&export_name])?;
512 Ok(rows
513 .first()
514 .map(|row| (row.get(0), row.get(1), row.get(2), row.get(3))))
515 }
516 }
517 }
518
519 pub fn list_chunk_tasks_for_run(&self, run_id: &str) -> Result<Vec<ChunkTaskInfo>> {
520 let sql = "SELECT chunk_index, start_key, end_key, status, attempts, last_error, rows_written, file_name
521 FROM chunk_task WHERE run_id = ?1 ORDER BY chunk_index ASC";
522 match &self.conn {
523 StateConn::Sqlite(c) => {
524 let mut stmt = c.prepare(sql)?;
525 let rows = stmt.query_map([run_id], |row| {
526 Ok(ChunkTaskInfo {
527 chunk_index: row.get(0)?,
528 start_key: row.get(1)?,
529 end_key: row.get(2)?,
530 status: row.get(3)?,
531 attempts: row.get(4)?,
532 last_error: row.get(5)?,
533 rows_written: row.get(6)?,
534 file_name: row.get(7)?,
535 })
536 })?;
537 rows.collect::<std::result::Result<Vec<_>, _>>()
538 .map_err(Into::into)
539 }
540 StateConn::Postgres(client) => {
541 let mut c = client.borrow_mut();
542 let rows = c.query(&pg_sql(sql), &[&run_id])?;
543 Ok(rows
544 .iter()
545 .map(|row| ChunkTaskInfo {
546 chunk_index: row.get(0),
547 start_key: row.get(1),
548 end_key: row.get(2),
549 status: row.get(3),
550 attempts: row.get(4),
551 last_error: row.get(5),
552 rows_written: row.get(6),
553 file_name: row.get(7),
554 })
555 .collect())
556 }
557 }
558 }
559}
560
#[cfg(test)]
mod tests {
    use super::*;

    /// Opens a SQLite-backed store in a fresh temp dir. The `TempDir` guard is
    /// returned so the directory outlives the store.
    fn store_on_disk() -> (tempfile::TempDir, StateStore) {
        let dir = tempfile::tempdir().expect("tempdir");
        let cfg = dir.path().join("rivet.yaml");
        std::fs::write(&cfg, "# test").expect("write cfg");
        let s = StateStore::open(cfg.to_str().unwrap()).expect("open store");
        (dir, s)
    }

    #[test]
    fn chunk_claim_complete_and_finalize() {
        let (_dir, s) = store_on_disk();
        s.create_chunk_run("run_a", "orders", "deadbeef", 2).unwrap();
        s.insert_chunk_tasks("run_a", &[(1, 5), (6, 10)]).unwrap();
        assert_eq!(s.count_chunk_tasks_total("run_a").unwrap(), 2);

        let t0 = s.claim_next_chunk_task("run_a").unwrap().expect("claim 0");
        assert_eq!(t0, (0, "1".to_string(), "5".to_string()));
        s.complete_chunk_task("run_a", 0, 3, Some("part0.csv")).unwrap();

        let t1 = s.claim_next_chunk_task("run_a").unwrap().expect("claim 1");
        assert_eq!(t1.0, 1);
        s.complete_chunk_task("run_a", 1, 2, Some("part1.csv")).unwrap();

        // Once every task is completed there is nothing left to claim.
        assert!(s.claim_next_chunk_task("run_a").unwrap().is_none());
        assert_eq!(s.count_chunk_tasks_not_completed("run_a").unwrap(), 0);

        // The completion data must actually be persisted.
        let tasks = s.list_chunk_tasks_for_run("run_a").unwrap();
        assert_eq!(tasks.len(), 2);
        assert_eq!(tasks[0].status, "completed");
        assert_eq!(tasks[0].attempts, 1);
        assert_eq!(tasks[0].rows_written, Some(3));
        assert_eq!(tasks[0].file_name.as_deref(), Some("part0.csv"));
        assert_eq!(tasks[0].last_error, None);
        assert_eq!(tasks[1].rows_written, Some(2));
        assert_eq!(tasks[1].file_name.as_deref(), Some("part1.csv"));

        s.finalize_chunk_run_completed("run_a").unwrap();
        let (run_id, plan_hash, status, _updated_at) = s
            .get_latest_chunk_run("orders")
            .unwrap()
            .expect("latest run");
        assert_eq!(run_id, "run_a");
        assert_eq!(plan_hash, "deadbeef");
        assert_eq!(status, "completed");
        assert!(s.find_in_progress_chunk_run("orders").unwrap().is_none());
    }

    #[test]
    fn chunk_fail_then_retry_until_max() {
        let (_dir, s) = store_on_disk();
        s.create_chunk_run("run_b", "orders", "ab", 2).unwrap();
        s.insert_chunk_tasks("run_b", &[(1, 2)]).unwrap();

        let t = s.claim_next_chunk_task("run_b").unwrap().unwrap();
        assert_eq!(t.0, 0);
        s.fail_chunk_task("run_b", 0, "boom").unwrap();

        let t2 = s.claim_next_chunk_task("run_b").unwrap().unwrap();
        assert_eq!(t2.0, 0);
        s.fail_chunk_task("run_b", 0, "again").unwrap();

        // max_chunk_attempts = 2 is exhausted.
        assert!(s.claim_next_chunk_task("run_b").unwrap().is_none());
        assert_eq!(s.count_chunk_tasks_not_completed("run_b").unwrap(), 1);

        let tasks = s.list_chunk_tasks_for_run("run_b").unwrap();
        assert_eq!(tasks.len(), 1);
        assert_eq!(tasks[0].status, "failed");
        assert_eq!(tasks[0].attempts, 2);
        assert_eq!(tasks[0].last_error.as_deref(), Some("again"));
    }

    #[test]
    fn reset_chunk_checkpoint_clears_runs() {
        let (_dir, s) = store_on_disk();
        s.create_chunk_run("r1", "e", "h", 1).unwrap();
        s.insert_chunk_tasks("r1", &[(0, 1)]).unwrap();
        assert_eq!(s.reset_chunk_checkpoint("e").unwrap(), 1);
        assert!(s.find_in_progress_chunk_run("e").unwrap().is_none());
        // The run's tasks must be removed too, not just the run row.
        assert_eq!(s.count_chunk_tasks_total("r1").unwrap(), 0);
        // Resetting an export with no runs is a harmless no-op.
        assert_eq!(s.reset_chunk_checkpoint("e").unwrap(), 0);
    }

    #[test]
    fn re_export_resets_only_completed_tasks() {
        let (_dir, s) = store_on_disk();
        s.create_chunk_run("r_re", "orders", "h", 1).unwrap();
        s.insert_chunk_tasks("r_re", &[(1, 2)]).unwrap();

        // A task that has not completed is left untouched.
        assert_eq!(
            s.reset_chunk_task_for_re_export("r_re", 0, "missing file")
                .unwrap(),
            0
        );

        s.claim_next_chunk_task("r_re").unwrap().expect("claim");
        s.complete_chunk_task("r_re", 0, 7, Some("part0.csv")).unwrap();
        assert_eq!(
            s.reset_chunk_task_for_re_export("r_re", 0, "missing file")
                .unwrap(),
            1
        );

        let tasks = s.list_chunk_tasks_for_run("r_re").unwrap();
        let task = &tasks[0];
        assert_eq!(task.status, "pending");
        assert_eq!(task.attempts, 0);
        assert_eq!(task.rows_written, None);
        assert_eq!(task.file_name, None);
        assert_eq!(task.last_error.as_deref(), Some("missing file"));

        // Pending again, so it can be claimed even though max_chunk_attempts is 1.
        assert!(s.claim_next_chunk_task("r_re").unwrap().is_some());
    }

    #[test]
    fn list_in_progress_exports_orders_and_deduplicates() {
        let (_dir, s) = store_on_disk();
        assert!(
            s.list_export_names_with_in_progress_chunk_runs()
                .unwrap()
                .is_empty()
        );
        s.create_chunk_run("r_old", "zebra", "h", 1).unwrap();
        s.finalize_chunk_run_completed("r_old").unwrap();
        s.create_chunk_run("r_a", "alpha", "h1", 1).unwrap();
        s.create_chunk_run("r_b", "beta", "h2", 1).unwrap();
        assert_eq!(
            s.list_export_names_with_in_progress_chunk_runs().unwrap(),
            vec!["alpha".to_string(), "beta".to_string()]
        );
    }
}
641}