1use rusqlite::{Connection, TransactionBehavior};
2
3use crate::error::Result;
4use crate::types::CursorState;
5
/// File name of the SQLite state database; it is placed in the same directory as the
/// config file (see `StateStore::state_db_path`).
const STATE_DB_NAME: &str = ".rivet_state.db";
7
/// Latest schema version, i.e. the version of the final entry in `MIGRATIONS`.
///
/// Derived rather than hard-coded so it cannot drift when a migration is appended.
/// Only the tests use it, to check that a migrated database is fully up to date.
#[cfg(test)]
const SCHEMA_VERSION: i64 = MIGRATIONS[MIGRATIONS.len() - 1].0;
11
/// Ordered schema migrations as `(version, SQL batch)` pairs.
///
/// `migrate` runs every batch whose version is greater than the highest version
/// recorded in `schema_version`, in list order, and then records that version.
/// Already-recorded versions are skipped, so editing an existing entry has no effect on
/// databases that already applied it. Append a new entry instead.
const MIGRATIONS: &[(i64, &str)] = &[
    // v1: baseline tables. These hold per-export cursor state, run metrics, the
    // last-seen column schema, and the manifest of produced files.
    (
        1,
        "CREATE TABLE IF NOT EXISTS export_state (
            export_name TEXT PRIMARY KEY,
            last_cursor_value TEXT,
            last_run_at TEXT
        );
        CREATE TABLE IF NOT EXISTS export_metrics (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            export_name TEXT NOT NULL,
            run_at TEXT NOT NULL,
            duration_ms INTEGER NOT NULL,
            total_rows INTEGER NOT NULL,
            peak_rss_mb INTEGER,
            status TEXT NOT NULL,
            error_message TEXT,
            tuning_profile TEXT,
            format TEXT,
            mode TEXT,
            files_produced INTEGER DEFAULT 0,
            bytes_written INTEGER DEFAULT 0,
            retries INTEGER DEFAULT 0,
            validated INTEGER,
            schema_changed INTEGER,
            run_id TEXT
        );
        CREATE TABLE IF NOT EXISTS export_schema (
            export_name TEXT PRIMARY KEY,
            columns_json TEXT NOT NULL,
            updated_at TEXT NOT NULL
        );
        CREATE TABLE IF NOT EXISTS file_manifest (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_id TEXT NOT NULL,
            export_name TEXT NOT NULL,
            file_name TEXT NOT NULL,
            row_count INTEGER NOT NULL,
            bytes INTEGER NOT NULL,
            format TEXT NOT NULL,
            compression TEXT,
            created_at TEXT NOT NULL
        );",
    ),
    // v2: checkpoint tables for chunked exports. There is one `chunk_run` row per run
    // and one `chunk_task` row per chunk, unique per (run_id, chunk_index).
    (
        2,
        "CREATE TABLE IF NOT EXISTS chunk_run (
            run_id TEXT PRIMARY KEY,
            export_name TEXT NOT NULL,
            plan_hash TEXT NOT NULL,
            status TEXT NOT NULL,
            max_chunk_attempts INTEGER NOT NULL DEFAULT 3,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_chunk_run_export_status
            ON chunk_run(export_name, status);
        CREATE TABLE IF NOT EXISTS chunk_task (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_id TEXT NOT NULL,
            chunk_index INTEGER NOT NULL,
            start_key TEXT NOT NULL,
            end_key TEXT NOT NULL,
            status TEXT NOT NULL,
            attempts INTEGER NOT NULL DEFAULT 0,
            last_error TEXT,
            rows_written INTEGER,
            file_name TEXT,
            updated_at TEXT NOT NULL,
            UNIQUE(run_id, chunk_index)
        );
        CREATE INDEX IF NOT EXISTS idx_chunk_task_run_status ON chunk_task(run_id, status);",
    ),
    // v3: index for per-export manifest queries that list the newest ids first.
    (
        3,
        "CREATE INDEX IF NOT EXISTS idx_file_manifest_export ON file_manifest(export_name, id DESC);",
    ),
];
94
95fn ensure_schema_version_table(conn: &Connection) {
96 let _ = conn.execute_batch(
97 "CREATE TABLE IF NOT EXISTS schema_version (
98 version INTEGER NOT NULL
99 );",
100 );
101}
102
103fn get_current_version(conn: &Connection) -> i64 {
104 conn.query_row(
105 "SELECT COALESCE(MAX(version), 0) FROM schema_version",
106 [],
107 |row| row.get(0),
108 )
109 .unwrap_or(0)
110}
111
112fn migrate(conn: &Connection) -> Result<()> {
113 ensure_schema_version_table(conn);
114
115 let current = get_current_version(conn);
116
117 if current == 0 {
120 let has_export_state: bool = conn
121 .query_row(
122 "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='export_state'",
123 [],
124 |row| row.get(0),
125 )
126 .unwrap_or(false);
127
128 if has_export_state {
129 let metrics_cols = [
130 "files_produced INTEGER DEFAULT 0",
131 "bytes_written INTEGER DEFAULT 0",
132 "retries INTEGER DEFAULT 0",
133 "validated INTEGER",
134 "schema_changed INTEGER",
135 "run_id TEXT",
136 ];
137 for col_def in &metrics_cols {
138 let sql = format!("ALTER TABLE export_metrics ADD COLUMN {}", col_def);
139 let _ = conn.execute(&sql, []);
140 }
141 }
142 }
143
144 for &(ver, sql) in MIGRATIONS {
145 if ver > current {
146 log::debug!("state: applying migration v{}", ver);
147 conn.execute_batch(sql)
148 .map_err(|e| anyhow::anyhow!("state: migration v{} failed: {}", ver, e))?;
149 conn.execute("INSERT INTO schema_version (version) VALUES (?1)", [ver])
150 .map_err(|e| anyhow::anyhow!("state: recording migration v{}: {}", ver, e))?;
151 }
152 }
153 Ok(())
154}
155
/// One `chunk_task` row as reported by `StateStore::list_chunk_tasks_for_run`.
#[derive(Clone, Debug)]
pub struct ChunkTaskInfo {
    /// 0-based position of the chunk within its run.
    pub chunk_index: i64,
    /// Lower bound of the chunk's key range, stored as text.
    pub start_key: String,
    /// Upper bound of the chunk's key range, stored as text.
    pub end_key: String,
    /// Task state: one of `pending`, `running`, `completed` or `failed`.
    pub status: String,
    /// How many times the task has been claimed.
    pub attempts: i64,
    /// Message from the most recent failure; cleared when the task completes.
    pub last_error: Option<String>,
    /// Rows written by the completed chunk, if completed.
    pub rows_written: Option<i64>,
    /// Output file recorded at completion, if any.
    pub file_name: Option<String>,
}
168
169pub struct StateStore {
170 conn: Connection,
171 db_path: std::path::PathBuf,
172}
173
/// One `export_metrics` row as returned by `StateStore::get_metrics`.
#[derive(Debug)]
#[allow(dead_code)] // not every field is read by callers
pub struct ExportMetric {
    /// Export the run belongs to.
    pub export_name: String,
    /// Run identifier; `None` when the stored column is NULL.
    pub run_id: Option<String>,
    /// RFC 3339 timestamp taken when the metric was recorded.
    pub run_at: String,
    /// Run duration in milliseconds.
    pub duration_ms: i64,
    /// Total rows exported.
    pub total_rows: i64,
    /// Peak resident memory in MB, when measured.
    pub peak_rss_mb: Option<i64>,
    /// Run outcome as a free-form status string.
    pub status: String,
    /// Error text for failed runs.
    pub error_message: Option<String>,
    /// Tuning profile used by the run, if recorded.
    pub tuning_profile: Option<String>,
    /// Output format, if recorded.
    pub format: Option<String>,
    /// Export mode, if recorded.
    pub mode: Option<String>,
    /// Files produced (0 when the stored value is NULL).
    pub files_produced: i64,
    /// Bytes written (0 when the stored value is NULL).
    pub bytes_written: i64,
    /// Retries performed (0 when the stored value is NULL).
    pub retries: i64,
    /// Whether output validation passed, if it ran.
    pub validated: Option<bool>,
    /// Whether a schema change was detected, if checked.
    pub schema_changed: Option<bool>,
}
194
/// One `file_manifest` row as returned by `StateStore::get_files`.
#[derive(Debug)]
#[allow(dead_code)] // not every field is read by callers
pub struct FileRecord {
    /// Run that produced the file.
    pub run_id: String,
    /// Export the file belongs to.
    pub export_name: String,
    /// Name of the produced file.
    pub file_name: String,
    /// Rows contained in the file.
    pub row_count: i64,
    /// File size in bytes.
    pub bytes: i64,
    /// Output format of the file.
    pub format: String,
    /// Compression codec, if any.
    pub compression: Option<String>,
    /// RFC 3339 timestamp taken when the file was recorded.
    pub created_at: String,
}
207
208#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
209pub struct SchemaColumn {
210 pub name: String,
211 #[serde(rename = "type")]
212 pub data_type: String,
213}
214
/// Difference between an export's stored schema and the schema seen now, as built by
/// `StateStore::detect_schema_change`.
#[derive(Debug)]
pub struct SchemaChange {
    /// Columns that are new, each rendered as `"name (type)"`.
    pub added: Vec<String>,
    /// Names of stored columns that are no longer present.
    pub removed: Vec<String>,
    /// Columns whose type changed, as `(name, old_type, new_type)`.
    pub type_changed: Vec<(String, String, String)>,
}

impl SchemaChange {
    /// Returns `true` when no column was added, removed, or retyped.
    pub fn is_empty(&self) -> bool {
        let nothing_added = self.added.is_empty();
        let nothing_removed = self.removed.is_empty();
        let nothing_retyped = self.type_changed.is_empty();
        nothing_added && nothing_removed && nothing_retyped
    }
}
227
228impl StateStore {
229 pub fn open(config_path: &str) -> Result<Self> {
230 let config_dir = std::path::Path::new(config_path)
231 .parent()
232 .unwrap_or(std::path::Path::new("."));
233 let db_path = config_dir.join(STATE_DB_NAME);
234 let db_path_buf = db_path.to_path_buf();
235 let conn = Connection::open(db_path)?;
236 let _ = conn.execute_batch("PRAGMA journal_mode=WAL;");
237 migrate(&conn)?;
238 Ok(Self {
239 conn,
240 db_path: db_path_buf,
241 })
242 }
243
244 pub fn state_db_path(config_path: &str) -> std::path::PathBuf {
246 let config_dir = std::path::Path::new(config_path)
247 .parent()
248 .unwrap_or(std::path::Path::new("."));
249 config_dir.join(STATE_DB_NAME)
250 }
251
252 pub(crate) fn database_path(&self) -> &std::path::Path {
253 self.db_path.as_path()
254 }
255
256 pub fn find_in_progress_chunk_run(
260 &self,
261 export_name: &str,
262 ) -> Result<Option<(String, String)>> {
263 let mut stmt = self.conn.prepare(
264 "SELECT run_id, plan_hash FROM chunk_run
265 WHERE export_name = ?1 AND status = 'in_progress'
266 ORDER BY created_at DESC LIMIT 1",
267 )?;
268 let mut rows = stmt.query_map([export_name], |row| Ok((row.get(0)?, row.get(1)?)))?;
269 Ok(rows.next().transpose()?)
270 }
271
272 pub fn create_chunk_run(
273 &self,
274 run_id: &str,
275 export_name: &str,
276 plan_hash: &str,
277 max_chunk_attempts: u32,
278 ) -> Result<()> {
279 let now = chrono::Utc::now().to_rfc3339();
280 self.conn.execute(
281 "INSERT INTO chunk_run (run_id, export_name, plan_hash, status, max_chunk_attempts, created_at, updated_at)
282 VALUES (?1, ?2, ?3, 'in_progress', ?4, ?5, ?5)",
283 rusqlite::params![run_id, export_name, plan_hash, max_chunk_attempts as i64, now],
284 )?;
285 Ok(())
286 }
287
288 pub fn insert_chunk_tasks(&self, run_id: &str, ranges: &[(i64, i64)]) -> Result<()> {
289 let now = chrono::Utc::now().to_rfc3339();
290 let mut stmt = self.conn.prepare(
291 "INSERT INTO chunk_task (run_id, chunk_index, start_key, end_key, status, attempts, updated_at)
292 VALUES (?1, ?2, ?3, ?4, 'pending', 0, ?5)",
293 )?;
294 for (i, (start, end)) in ranges.iter().enumerate() {
295 stmt.execute(rusqlite::params![
296 run_id,
297 i as i64,
298 start.to_string(),
299 end.to_string(),
300 now,
301 ])?;
302 }
303 Ok(())
304 }
305
306 pub fn reset_stale_running_chunk_tasks(&self, run_id: &str) -> Result<usize> {
308 let now = chrono::Utc::now().to_rfc3339();
309 let n = self.conn.execute(
310 "UPDATE chunk_task SET status = 'pending', updated_at = ?1
311 WHERE run_id = ?2 AND status = 'running'",
312 rusqlite::params![now, run_id],
313 )?;
314 Ok(n)
315 }
316
317 pub fn claim_next_chunk_task(&self, run_id: &str) -> Result<Option<(i64, String, String)>> {
319 Self::claim_next_chunk_task_at_path(self.db_path.as_path(), run_id)
320 }
321
322 fn claim_next_chunk_in_tx(
323 tx: &rusqlite::Transaction<'_>,
324 now: &str,
325 run_id: &str,
326 ) -> Result<Option<(i64, String, String)>> {
327 let mut stmt = tx.prepare(
328 "UPDATE chunk_task
329 SET status = 'running', attempts = attempts + 1, updated_at = ?1
330 WHERE rowid = (
331 SELECT ct.rowid FROM chunk_task ct
332 INNER JOIN chunk_run cr ON cr.run_id = ct.run_id
333 WHERE ct.run_id = ?2
334 AND cr.status = 'in_progress'
335 AND (
336 ct.status = 'pending'
337 OR (ct.status = 'failed' AND ct.attempts < cr.max_chunk_attempts)
338 )
339 ORDER BY ct.chunk_index ASC
340 LIMIT 1
341 )
342 RETURNING chunk_index, start_key, end_key",
343 )?;
344 let mut rows = stmt.query(rusqlite::params![now, run_id])?;
345 let out = match rows.next()? {
346 Some(row) => Some((row.get(0)?, row.get(1)?, row.get(2)?)),
347 None => None,
348 };
349 Ok(out)
350 }
351
352 pub fn claim_next_chunk_task_at_path(
354 db_path: &std::path::Path,
355 run_id: &str,
356 ) -> Result<Option<(i64, String, String)>> {
357 let mut conn = Connection::open(db_path)?;
358 let now = chrono::Utc::now().to_rfc3339();
359 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
360 let res = Self::claim_next_chunk_in_tx(&tx, &now, run_id)?;
361 tx.commit()?;
362 Ok(res)
363 }
364
365 pub fn complete_chunk_task(
366 &self,
367 run_id: &str,
368 chunk_index: i64,
369 rows_written: i64,
370 file_name: Option<&str>,
371 ) -> Result<()> {
372 let now = chrono::Utc::now().to_rfc3339();
373 self.conn.execute(
374 "UPDATE chunk_task
375 SET status = 'completed', rows_written = ?1, file_name = ?2, last_error = NULL, updated_at = ?3
376 WHERE run_id = ?4 AND chunk_index = ?5",
377 rusqlite::params![rows_written, file_name, now, run_id, chunk_index],
378 )?;
379 Ok(())
380 }
381
382 pub fn fail_chunk_task(&self, run_id: &str, chunk_index: i64, err: &str) -> Result<()> {
383 let now = chrono::Utc::now().to_rfc3339();
384 self.conn.execute(
385 "UPDATE chunk_task SET status = 'failed', last_error = ?1, updated_at = ?2
386 WHERE run_id = ?3 AND chunk_index = ?4",
387 rusqlite::params![err, now, run_id, chunk_index],
388 )?;
389 Ok(())
390 }
391
392 pub fn fail_chunk_task_at_path(
393 db_path: &std::path::Path,
394 run_id: &str,
395 chunk_index: i64,
396 err: &str,
397 ) -> Result<()> {
398 let conn = Connection::open(db_path)?;
399 let now = chrono::Utc::now().to_rfc3339();
400 conn.execute(
401 "UPDATE chunk_task SET status = 'failed', last_error = ?1, updated_at = ?2
402 WHERE run_id = ?3 AND chunk_index = ?4",
403 rusqlite::params![err, now, run_id, chunk_index],
404 )?;
405 Ok(())
406 }
407
408 pub fn complete_chunk_task_at_path(
409 db_path: &std::path::Path,
410 run_id: &str,
411 chunk_index: i64,
412 rows_written: i64,
413 file_name: Option<&str>,
414 ) -> Result<()> {
415 let conn = Connection::open(db_path)?;
416 let now = chrono::Utc::now().to_rfc3339();
417 conn.execute(
418 "UPDATE chunk_task
419 SET status = 'completed', rows_written = ?1, file_name = ?2, last_error = NULL, updated_at = ?3
420 WHERE run_id = ?4 AND chunk_index = ?5",
421 rusqlite::params![rows_written, file_name, now, run_id, chunk_index],
422 )?;
423 Ok(())
424 }
425
426 pub fn count_chunk_tasks_not_completed(&self, run_id: &str) -> Result<i64> {
427 let n: i64 = self.conn.query_row(
428 "SELECT COUNT(*) FROM chunk_task WHERE run_id = ?1 AND status != 'completed'",
429 [run_id],
430 |row| row.get(0),
431 )?;
432 Ok(n)
433 }
434
435 pub fn finalize_chunk_run_completed(&self, run_id: &str) -> Result<()> {
436 let now = chrono::Utc::now().to_rfc3339();
437 self.conn.execute(
438 "UPDATE chunk_run SET status = 'completed', updated_at = ?1 WHERE run_id = ?2",
439 rusqlite::params![now, run_id],
440 )?;
441 Ok(())
442 }
443
444 pub fn reset_chunk_checkpoint(&self, export_name: &str) -> Result<usize> {
446 let run_ids: Vec<String> = {
447 let mut stmt = self
448 .conn
449 .prepare("SELECT run_id FROM chunk_run WHERE export_name = ?1")?;
450 let rows = stmt.query_map([export_name], |row| row.get(0))?;
451 rows.collect::<std::result::Result<Vec<_>, _>>()?
452 };
453 for rid in &run_ids {
454 let _ = self
455 .conn
456 .execute("DELETE FROM chunk_task WHERE run_id = ?1", [rid]);
457 }
458 let deleted = self.conn.execute(
459 "DELETE FROM chunk_run WHERE export_name = ?1",
460 [export_name],
461 )?;
462 Ok(deleted)
463 }
464
465 pub fn get_latest_chunk_run(
467 &self,
468 export_name: &str,
469 ) -> Result<Option<(String, String, String, String)>> {
470 let mut stmt = self.conn.prepare(
471 "SELECT run_id, plan_hash, status, updated_at FROM chunk_run
472 WHERE export_name = ?1 ORDER BY updated_at DESC LIMIT 1",
473 )?;
474 let mut rows = stmt.query_map([export_name], |row| {
475 Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
476 })?;
477 Ok(rows.next().transpose()?)
478 }
479
480 pub fn list_chunk_tasks_for_run(&self, run_id: &str) -> Result<Vec<ChunkTaskInfo>> {
481 let mut stmt = self.conn.prepare(
482 "SELECT chunk_index, start_key, end_key, status, attempts, last_error, rows_written, file_name
483 FROM chunk_task WHERE run_id = ?1 ORDER BY chunk_index ASC",
484 )?;
485 let rows = stmt.query_map([run_id], |row| {
486 Ok(ChunkTaskInfo {
487 chunk_index: row.get(0)?,
488 start_key: row.get(1)?,
489 end_key: row.get(2)?,
490 status: row.get(3)?,
491 attempts: row.get(4)?,
492 last_error: row.get(5)?,
493 rows_written: row.get(6)?,
494 file_name: row.get(7)?,
495 })
496 })?;
497 rows.collect::<std::result::Result<Vec<_>, _>>()
498 .map_err(Into::into)
499 }
500
501 pub fn get(&self, export_name: &str) -> Result<CursorState> {
504 let mut stmt = self.conn.prepare(
505 "SELECT last_cursor_value, last_run_at FROM export_state WHERE export_name = ?1",
506 )?;
507 let result = stmt.query_row([export_name], |row| {
508 Ok(CursorState {
509 export_name: export_name.to_string(),
510 last_cursor_value: row.get(0)?,
511 last_run_at: row.get(1)?,
512 })
513 });
514 match result {
515 Ok(state) => Ok(state),
516 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(CursorState {
517 export_name: export_name.to_string(),
518 last_cursor_value: None,
519 last_run_at: None,
520 }),
521 Err(e) => Err(e.into()),
522 }
523 }
524
525 pub fn update(&self, export_name: &str, cursor_value: &str) -> Result<()> {
526 let now = chrono::Utc::now().to_rfc3339();
527 self.conn.execute(
528 "INSERT INTO export_state (export_name, last_cursor_value, last_run_at)
529 VALUES (?1, ?2, ?3)
530 ON CONFLICT(export_name) DO UPDATE SET
531 last_cursor_value = excluded.last_cursor_value,
532 last_run_at = excluded.last_run_at",
533 rusqlite::params![export_name, cursor_value, now],
534 )?;
535 Ok(())
536 }
537
538 pub fn reset(&self, export_name: &str) -> Result<()> {
539 self.conn.execute(
540 "DELETE FROM export_state WHERE export_name = ?1",
541 [export_name],
542 )?;
543 Ok(())
544 }
545
546 pub fn list_all(&self) -> Result<Vec<CursorState>> {
547 let mut stmt = self.conn.prepare(
548 "SELECT export_name, last_cursor_value, last_run_at FROM export_state ORDER BY export_name",
549 )?;
550 let rows = stmt.query_map([], |row| {
551 Ok(CursorState {
552 export_name: row.get(0)?,
553 last_cursor_value: row.get(1)?,
554 last_run_at: row.get(2)?,
555 })
556 })?;
557 rows.collect::<std::result::Result<Vec<_>, _>>()
558 .map_err(Into::into)
559 }
560
561 #[allow(clippy::too_many_arguments)]
564 pub fn record_metric(
565 &self,
566 export_name: &str,
567 run_id: &str,
568 duration_ms: i64,
569 total_rows: i64,
570 peak_rss_mb: Option<i64>,
571 status: &str,
572 error_message: Option<&str>,
573 tuning_profile: Option<&str>,
574 format: Option<&str>,
575 mode: Option<&str>,
576 files_produced: i64,
577 bytes_written: i64,
578 retries: i64,
579 validated: Option<bool>,
580 schema_changed: Option<bool>,
581 ) -> Result<()> {
582 let now = chrono::Utc::now().to_rfc3339();
583 self.conn.execute(
584 "INSERT INTO export_metrics (export_name, run_id, run_at, duration_ms, total_rows, peak_rss_mb,
585 status, error_message, tuning_profile, format, mode,
586 files_produced, bytes_written, retries, validated, schema_changed)
587 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
588 rusqlite::params![
589 export_name, run_id, now, duration_ms, total_rows, peak_rss_mb,
590 status, error_message, tuning_profile, format, mode,
591 files_produced, bytes_written, retries, validated, schema_changed
592 ],
593 )?;
594 Ok(())
595 }
596
597 pub fn get_metrics(
598 &self,
599 export_name: Option<&str>,
600 limit: usize,
601 ) -> Result<Vec<ExportMetric>> {
602 let cols = "export_name, run_id, run_at, duration_ms, total_rows, peak_rss_mb,
603 status, error_message, tuning_profile, format, mode,
604 files_produced, bytes_written, retries, validated, schema_changed";
605 let (sql, params): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = if let Some(name) =
606 export_name
607 {
608 (
609 format!(
610 "SELECT {} FROM export_metrics WHERE export_name = ?1 ORDER BY id DESC LIMIT {}",
611 cols, limit
612 ),
613 vec![Box::new(name.to_string())],
614 )
615 } else {
616 (
617 format!(
618 "SELECT {} FROM export_metrics ORDER BY id DESC LIMIT {}",
619 cols, limit
620 ),
621 vec![],
622 )
623 };
624
625 let mut stmt = self.conn.prepare(&sql)?;
626 let params_refs: Vec<&dyn rusqlite::types::ToSql> =
627 params.iter().map(|p| p.as_ref()).collect();
628 let rows = stmt.query_map(params_refs.as_slice(), |row| {
629 Ok(ExportMetric {
630 export_name: row.get(0)?,
631 run_id: row.get(1)?,
632 run_at: row.get(2)?,
633 duration_ms: row.get(3)?,
634 total_rows: row.get(4)?,
635 peak_rss_mb: row.get(5)?,
636 status: row.get(6)?,
637 error_message: row.get(7)?,
638 tuning_profile: row.get(8)?,
639 format: row.get(9)?,
640 mode: row.get(10)?,
641 files_produced: row.get::<_, Option<i64>>(11)?.unwrap_or(0),
642 bytes_written: row.get::<_, Option<i64>>(12)?.unwrap_or(0),
643 retries: row.get::<_, Option<i64>>(13)?.unwrap_or(0),
644 validated: row.get(14)?,
645 schema_changed: row.get(15)?,
646 })
647 })?;
648 rows.collect::<std::result::Result<Vec<_>, _>>()
649 .map_err(Into::into)
650 }
651
652 #[allow(clippy::too_many_arguments)]
655 pub fn record_file(
656 &self,
657 run_id: &str,
658 export_name: &str,
659 file_name: &str,
660 row_count: i64,
661 bytes: i64,
662 format: &str,
663 compression: Option<&str>,
664 ) -> Result<()> {
665 let now = chrono::Utc::now().to_rfc3339();
666 self.conn.execute(
667 "INSERT INTO file_manifest (run_id, export_name, file_name, row_count, bytes, format, compression, created_at)
668 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
669 rusqlite::params![run_id, export_name, file_name, row_count, bytes, format, compression, now],
670 )?;
671 Ok(())
672 }
673
674 pub fn get_files(&self, export_name: Option<&str>, limit: usize) -> Result<Vec<FileRecord>> {
675 let (sql, params): (String, Vec<Box<dyn rusqlite::types::ToSql>>) = if let Some(name) =
676 export_name
677 {
678 (
679 format!(
680 "SELECT run_id, export_name, file_name, row_count, bytes, format, compression, created_at
681 FROM file_manifest WHERE export_name = ?1 ORDER BY id DESC LIMIT {}",
682 limit
683 ),
684 vec![Box::new(name.to_string())],
685 )
686 } else {
687 (
688 format!(
689 "SELECT run_id, export_name, file_name, row_count, bytes, format, compression, created_at
690 FROM file_manifest ORDER BY id DESC LIMIT {}",
691 limit
692 ),
693 vec![],
694 )
695 };
696
697 let mut stmt = self.conn.prepare(&sql)?;
698 let params_refs: Vec<&dyn rusqlite::types::ToSql> =
699 params.iter().map(|p| p.as_ref()).collect();
700 let rows = stmt.query_map(params_refs.as_slice(), |row| {
701 Ok(FileRecord {
702 run_id: row.get(0)?,
703 export_name: row.get(1)?,
704 file_name: row.get(2)?,
705 row_count: row.get(3)?,
706 bytes: row.get(4)?,
707 format: row.get(5)?,
708 compression: row.get(6)?,
709 created_at: row.get(7)?,
710 })
711 })?;
712 rows.collect::<std::result::Result<Vec<_>, _>>()
713 .map_err(Into::into)
714 }
715
716 pub fn get_stored_schema(&self, export_name: &str) -> Result<Option<Vec<SchemaColumn>>> {
719 let mut stmt = self
720 .conn
721 .prepare("SELECT columns_json FROM export_schema WHERE export_name = ?1")?;
722 let result = stmt.query_row([export_name], |row| {
723 let json_str: String = row.get(0)?;
724 Ok(json_str)
725 });
726 match result {
727 Ok(json_str) => {
728 let cols: Vec<SchemaColumn> = serde_json::from_str(&json_str)?;
729 Ok(Some(cols))
730 }
731 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
732 Err(e) => Err(e.into()),
733 }
734 }
735
736 pub fn store_schema(&self, export_name: &str, columns: &[SchemaColumn]) -> Result<()> {
737 let json = serde_json::to_string(columns)?;
738 let now = chrono::Utc::now().to_rfc3339();
739 self.conn.execute(
740 "INSERT INTO export_schema (export_name, columns_json, updated_at)
741 VALUES (?1, ?2, ?3)
742 ON CONFLICT(export_name) DO UPDATE SET
743 columns_json = excluded.columns_json,
744 updated_at = excluded.updated_at",
745 rusqlite::params![export_name, json, now],
746 )?;
747 Ok(())
748 }
749
750 pub fn detect_schema_change(
751 &self,
752 export_name: &str,
753 current: &[SchemaColumn],
754 ) -> Result<Option<SchemaChange>> {
755 let stored = match self.get_stored_schema(export_name)? {
756 Some(s) => s,
757 None => {
758 self.store_schema(export_name, current)?;
759 return Ok(None);
760 }
761 };
762
763 let stored_map: std::collections::HashMap<&str, &str> = stored
764 .iter()
765 .map(|c| (c.name.as_str(), c.data_type.as_str()))
766 .collect();
767 let current_map: std::collections::HashMap<&str, &str> = current
768 .iter()
769 .map(|c| (c.name.as_str(), c.data_type.as_str()))
770 .collect();
771
772 let added: Vec<String> = current
773 .iter()
774 .filter(|c| !stored_map.contains_key(c.name.as_str()))
775 .map(|c| format!("{} ({})", c.name, c.data_type))
776 .collect();
777
778 let removed: Vec<String> = stored
779 .iter()
780 .filter(|c| !current_map.contains_key(c.name.as_str()))
781 .map(|c| c.name.clone())
782 .collect();
783
784 let type_changed: Vec<(String, String, String)> = current
785 .iter()
786 .filter_map(|c| {
787 stored_map.get(c.name.as_str()).and_then(|old_type| {
788 if *old_type != c.data_type.as_str() {
789 Some((c.name.clone(), old_type.to_string(), c.data_type.clone()))
790 } else {
791 None
792 }
793 })
794 })
795 .collect();
796
797 let change = SchemaChange {
798 added,
799 removed,
800 type_changed,
801 };
802
803 if !change.is_empty() {
804 self.store_schema(export_name, current)?;
805 Ok(Some(change))
806 } else {
807 Ok(None)
808 }
809 }
810
811 #[allow(dead_code)] pub fn open_in_memory() -> Result<Self> {
813 let conn = Connection::open_in_memory()?;
814 migrate(&conn)?;
815 Ok(Self {
816 conn,
817 db_path: std::path::PathBuf::from(":memory:"),
818 })
819 }
820}
821
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh, fully migrated in-memory store.
    fn store() -> StateStore {
        StateStore::open_in_memory().expect("in-memory store")
    }

    /// On-disk store in a temp dir. `StateStore::open` places the database next to the
    /// config file written there. Keep the returned `TempDir` alive for the store's
    /// lifetime.
    fn store_on_disk() -> (tempfile::TempDir, StateStore) {
        let dir = tempfile::tempdir().expect("tempdir");
        let cfg = dir.path().join("rivet.yaml");
        std::fs::write(&cfg, "# test").expect("write cfg");
        let s = StateStore::open(cfg.to_str().unwrap()).expect("open store");
        (dir, s)
    }

    /// Shorthand for building a schema column.
    fn col(name: &str, data_type: &str) -> SchemaColumn {
        SchemaColumn {
            name: name.into(),
            data_type: data_type.into(),
        }
    }

    // ---- cursor state ----

    #[test]
    fn get_unknown_returns_empty_state() {
        let s = store();
        let state = s.get("nonexistent").unwrap();
        assert_eq!(state.export_name, "nonexistent");
        assert!(state.last_cursor_value.is_none());
        assert!(state.last_run_at.is_none());
    }

    #[test]
    fn update_then_get_returns_stored_cursor() {
        let s = store();
        s.update("orders", "2024-06-01").unwrap();
        let state = s.get("orders").unwrap();
        assert_eq!(state.last_cursor_value.as_deref(), Some("2024-06-01"));
        assert!(state.last_run_at.is_some(), "update must stamp last_run_at");
    }

    #[test]
    fn update_overwrites_previous_cursor() {
        let s = store();
        s.update("orders", "100").unwrap();
        s.update("orders", "200").unwrap();
        assert_eq!(
            s.get("orders").unwrap().last_cursor_value.as_deref(),
            Some("200")
        );
    }

    #[test]
    fn reset_clears_cursor_state() {
        let s = store();
        s.update("orders", "100").unwrap();
        s.reset("orders").unwrap();
        assert!(s.get("orders").unwrap().last_cursor_value.is_none());
    }

    #[test]
    fn list_all_on_empty_store_returns_empty() {
        assert!(store().list_all().unwrap().is_empty());
    }

    #[test]
    fn list_all_returns_entries_sorted_by_name() {
        let s = store();
        s.update("gamma", "3").unwrap();
        s.update("alpha", "1").unwrap();
        s.update("beta", "2").unwrap();
        let all = s.list_all().unwrap();
        let names: Vec<&str> = all.iter().map(|c| c.export_name.as_str()).collect();
        assert_eq!(names, ["alpha", "beta", "gamma"]);
        assert_eq!(all[0].last_cursor_value.as_deref(), Some("1"));
    }

    // ---- metrics ----

    #[test]
    fn record_and_query_metrics() {
        let s = store();
        s.record_metric(
            "orders",
            "run_001",
            1200,
            50000,
            Some(142),
            "success",
            None,
            Some("safe"),
            Some("parquet"),
            Some("full"),
            1,
            4096,
            0,
            Some(true),
            Some(false),
        )
        .unwrap();
        s.record_metric(
            "orders",
            "run_002",
            300,
            0,
            Some(30),
            "failed",
            Some("timeout"),
            Some("safe"),
            Some("parquet"),
            Some("full"),
            0,
            0,
            2,
            None,
            None,
        )
        .unwrap();

        let metrics = s.get_metrics(Some("orders"), 10).unwrap();
        assert_eq!(metrics.len(), 2);
        assert_eq!(metrics[0].status, "failed");
        assert_eq!(metrics[0].run_id.as_deref(), Some("run_002"));
        assert_eq!(metrics[0].retries, 2);
        assert_eq!(metrics[0].error_message.as_deref(), Some("timeout"));
        assert_eq!(metrics[1].total_rows, 50000);
        assert_eq!(metrics[1].run_id.as_deref(), Some("run_001"));
        assert_eq!(metrics[1].files_produced, 1);
        assert_eq!(metrics[1].bytes_written, 4096);
        assert_eq!(metrics[1].validated, Some(true));
        assert_eq!(metrics[1].schema_changed, Some(false));
    }

    #[test]
    fn query_metrics_all_exports() {
        let s = store();
        s.record_metric(
            "orders", "r1", 100, 1000, None, "success", None, None, None, None, 1, 500, 0, None,
            None,
        )
        .unwrap();
        s.record_metric(
            "users", "r2", 200, 2000, None, "success", None, None, None, None, 1, 800, 0, None,
            None,
        )
        .unwrap();

        let metrics = s.get_metrics(None, 10).unwrap();
        assert_eq!(metrics.len(), 2);
        assert_eq!(metrics[0].export_name, "users", "newest row comes first");
    }

    #[test]
    fn metrics_limit_works() {
        let s = store();
        for i in 0..10 {
            s.record_metric(
                "t",
                &format!("r{}", i),
                i * 100,
                i,
                None,
                "success",
                None,
                None,
                None,
                None,
                0,
                0,
                0,
                None,
                None,
            )
            .unwrap();
        }
        let metrics = s.get_metrics(Some("t"), 3).unwrap();
        assert_eq!(metrics.len(), 3);
        assert_eq!(metrics[0].run_id.as_deref(), Some("r9"));
    }

    // ---- file manifest ----

    #[test]
    fn record_and_query_files() {
        let s = store();
        s.record_file(
            "run_001",
            "orders",
            "orders_20260329.parquet",
            50000,
            4096,
            "parquet",
            Some("zstd"),
        )
        .unwrap();
        s.record_file(
            "run_001",
            "orders",
            "orders_20260329_chunk1.parquet",
            25000,
            2048,
            "parquet",
            Some("zstd"),
        )
        .unwrap();
        s.record_file(
            "run_002",
            "users",
            "users_20260329.csv",
            1000,
            500,
            "csv",
            None,
        )
        .unwrap();

        let files = s.get_files(Some("orders"), 10).unwrap();
        assert_eq!(files.len(), 2);
        assert_eq!(files[0].run_id, "run_001");
        assert_eq!(files[0].row_count, 25000);
        assert_eq!(files[0].compression.as_deref(), Some("zstd"));

        let all = s.get_files(None, 10).unwrap();
        assert_eq!(all.len(), 3);
        assert_eq!(all[0].file_name, "users_20260329.csv");
    }

    #[test]
    fn files_limit_works() {
        let s = store();
        for i in 0..10 {
            s.record_file(
                &format!("r{}", i),
                "t",
                &format!("f{}.parquet", i),
                i,
                i * 100,
                "parquet",
                None,
            )
            .unwrap();
        }
        let files = s.get_files(Some("t"), 3).unwrap();
        assert_eq!(files.len(), 3);
        assert_eq!(files[0].file_name, "f9.parquet");
    }

    // ---- schema tracking ----

    #[test]
    fn first_schema_stored_no_change() {
        let s = store();
        let cols = vec![col("id", "Int64"), col("name", "Utf8")];
        let change = s.detect_schema_change("orders", &cols).unwrap();
        assert!(change.is_none(), "first run should detect no change");
        assert_eq!(s.get_stored_schema("orders").unwrap(), Some(cols));
    }

    #[test]
    fn same_schema_no_change() {
        let s = store();
        let cols = vec![col("id", "Int64")];
        s.detect_schema_change("t", &cols).unwrap();
        let change = s.detect_schema_change("t", &cols).unwrap();
        assert!(change.is_none());
    }

    #[test]
    fn added_column_detected() {
        let s = store();
        s.detect_schema_change("t", &[col("id", "Int64")]).unwrap();

        let v2 = vec![col("id", "Int64"), col("email", "Utf8")];
        let change = s.detect_schema_change("t", &v2).unwrap().unwrap();
        assert_eq!(change.added.len(), 1);
        assert!(change.added[0].contains("email"));
        assert!(change.removed.is_empty());
        assert_eq!(s.get_stored_schema("t").unwrap(), Some(v2));
    }

    #[test]
    fn removed_column_detected() {
        let s = store();
        s.detect_schema_change("t", &[col("id", "Int64"), col("old_field", "Utf8")])
            .unwrap();

        let change = s
            .detect_schema_change("t", &[col("id", "Int64")])
            .unwrap()
            .unwrap();
        assert_eq!(change.removed, vec!["old_field"]);
        assert!(change.added.is_empty());
    }

    #[test]
    fn type_change_detected() {
        let s = store();
        s.detect_schema_change("t", &[col("price", "Float64")]).unwrap();

        let change = s
            .detect_schema_change("t", &[col("price", "Utf8")])
            .unwrap()
            .unwrap();
        assert_eq!(change.type_changed.len(), 1);
        assert_eq!(
            change.type_changed[0],
            ("price".into(), "Float64".into(), "Utf8".into())
        );
    }

    // ---- chunk checkpoints ----

    #[test]
    fn chunk_claim_complete_and_finalize() {
        let (_dir, s) = store_on_disk();
        s.create_chunk_run("run_a", "orders", "deadbeef", 2).unwrap();
        s.insert_chunk_tasks("run_a", &[(1, 5), (6, 10)]).unwrap();

        let t0 = s.claim_next_chunk_task("run_a").unwrap().expect("claim 0");
        assert_eq!(t0, (0, "1".to_string(), "5".to_string()));
        s.complete_chunk_task("run_a", 0, 3, Some("part0.csv")).unwrap();

        let t1 = s.claim_next_chunk_task("run_a").unwrap().expect("claim 1");
        assert_eq!(t1.0, 1);
        s.complete_chunk_task("run_a", 1, 2, Some("part1.csv")).unwrap();

        assert!(s.claim_next_chunk_task("run_a").unwrap().is_none());
        assert_eq!(s.count_chunk_tasks_not_completed("run_a").unwrap(), 0);

        let tasks = s.list_chunk_tasks_for_run("run_a").unwrap();
        assert_eq!(tasks.len(), 2);
        assert!(tasks.iter().all(|t| t.status == "completed" && t.attempts == 1));
        assert_eq!(tasks[0].rows_written, Some(3));
        assert_eq!(tasks[0].file_name.as_deref(), Some("part0.csv"));

        s.finalize_chunk_run_completed("run_a").unwrap();
        assert!(s.find_in_progress_chunk_run("orders").unwrap().is_none());
        let latest = s.get_latest_chunk_run("orders").unwrap().expect("run exists");
        assert_eq!(latest.0, "run_a");
        assert_eq!(latest.2, "completed");
    }

    #[test]
    fn chunk_fail_then_retry_until_max() {
        let (_dir, s) = store_on_disk();
        s.create_chunk_run("run_b", "orders", "ab", 2).unwrap();
        s.insert_chunk_tasks("run_b", &[(1, 2)]).unwrap();

        let t = s.claim_next_chunk_task("run_b").unwrap().unwrap();
        assert_eq!(t.0, 0);
        s.fail_chunk_task("run_b", 0, "boom").unwrap();

        let t2 = s.claim_next_chunk_task("run_b").unwrap().unwrap();
        assert_eq!(t2.0, 0);
        s.fail_chunk_task("run_b", 0, "again").unwrap();

        assert!(s.claim_next_chunk_task("run_b").unwrap().is_none());
        assert_eq!(s.count_chunk_tasks_not_completed("run_b").unwrap(), 1);

        let tasks = s.list_chunk_tasks_for_run("run_b").unwrap();
        assert_eq!(tasks[0].status, "failed");
        assert_eq!(tasks[0].attempts, 2);
        assert_eq!(tasks[0].last_error.as_deref(), Some("again"));
    }

    #[test]
    fn stale_running_chunk_is_requeued() {
        let (_dir, s) = store_on_disk();
        s.create_chunk_run("run_c", "orders", "cafe", 3).unwrap();
        s.insert_chunk_tasks("run_c", &[(1, 2)]).unwrap();

        let first = s.claim_next_chunk_task("run_c").unwrap().expect("first claim");
        assert_eq!(first.0, 0);
        assert!(
            s.claim_next_chunk_task("run_c").unwrap().is_none(),
            "a running task is not claimable"
        );

        assert_eq!(s.reset_stale_running_chunk_tasks("run_c").unwrap(), 1);
        let again = s.claim_next_chunk_task("run_c").unwrap().expect("re-claim");
        assert_eq!(again.0, 0);

        let tasks = s.list_chunk_tasks_for_run("run_c").unwrap();
        assert_eq!(tasks[0].status, "running");
        assert_eq!(tasks[0].attempts, 2);
    }

    #[test]
    fn reset_chunk_checkpoint_clears_runs() {
        let (_dir, s) = store_on_disk();
        s.create_chunk_run("r1", "e", "h", 1).unwrap();
        s.insert_chunk_tasks("r1", &[(0, 1)]).unwrap();
        s.create_chunk_run("r2", "other", "h", 1).unwrap();

        assert_eq!(s.reset_chunk_checkpoint("e").unwrap(), 1);
        assert!(s.find_in_progress_chunk_run("e").unwrap().is_none());
        assert!(s.get_latest_chunk_run("e").unwrap().is_none());
        assert!(s.list_chunk_tasks_for_run("r1").unwrap().is_empty());

        let other = s.find_in_progress_chunk_run("other").unwrap();
        assert_eq!(other.map(|(run_id, _)| run_id).as_deref(), Some("r2"));
    }

    // ---- migrations ----

    #[test]
    fn fresh_db_reaches_latest_version() {
        let s = store();
        let ver = get_current_version(&s.conn);
        assert_eq!(ver, SCHEMA_VERSION);
    }

    #[test]
    fn migration_is_idempotent() {
        let s = store();
        migrate(&s.conn).unwrap();
        migrate(&s.conn).unwrap();
        let ver = get_current_version(&s.conn);
        assert_eq!(ver, SCHEMA_VERSION);
    }

    #[test]
    fn legacy_db_gets_upgraded() {
        let conn = Connection::open_in_memory().unwrap();
        conn.execute_batch(
            "CREATE TABLE export_state (
                export_name TEXT PRIMARY KEY,
                last_cursor_value TEXT,
                last_run_at TEXT
            );
            CREATE TABLE export_metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                export_name TEXT NOT NULL,
                run_at TEXT NOT NULL,
                duration_ms INTEGER NOT NULL,
                total_rows INTEGER NOT NULL,
                status TEXT NOT NULL
            );",
        )
        .unwrap();

        migrate(&conn).unwrap();

        let ver = get_current_version(&conn);
        assert_eq!(ver, SCHEMA_VERSION);

        let has_chunk_run: bool = conn
            .query_row(
                "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='chunk_run'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert!(has_chunk_run);
    }
}
1261}