1use super::format_bytes;
7use crate::config::Config;
8use crate::error::Result;
9use crate::journal::RunEvent;
10use crate::state::StateStore;
11
12pub fn show_state(config_path: &str) -> Result<()> {
13 let state = StateStore::open(config_path)?;
14 let states = state.list_all()?;
15 if states.is_empty() {
16 println!("No export state recorded yet.");
17 return Ok(());
18 }
19 println!("{:<30} {:<40} LAST RUN", "EXPORT", "LAST CURSOR");
20 println!("{}", "-".repeat(90));
21 for s in &states {
22 println!(
23 "{:<30} {:<40} {}",
24 s.export_name,
25 s.last_cursor_value.as_deref().unwrap_or("-"),
26 s.last_run_at.as_deref().unwrap_or("-"),
27 );
28 }
29 Ok(())
30}
31
32pub fn show_progression(config_path: &str, export_name: Option<&str>) -> Result<()> {
34 let state = StateStore::open(config_path)?;
35 let entries = match export_name {
36 Some(name) => vec![state.get_progression(name)?],
37 None => state.list_progression()?,
38 };
39 let has_any = entries
40 .iter()
41 .any(|p| p.committed.is_some() || p.verified.is_some());
42 if !has_any {
43 println!("No progression boundaries recorded yet.");
44 return Ok(());
45 }
46
47 println!(
48 "{:<30} {:<12} {:<30} {:<25} {:<12} {:<30}",
49 "EXPORT", "COMM MODE", "COMMITTED", "COMMITTED AT", "VERI MODE", "VERIFIED"
50 );
51 println!("{}", "-".repeat(145));
52 for p in &entries {
53 let (c_mode, c_val, c_at) = match &p.committed {
54 Some(b) => (
55 b.strategy.as_str().to_string(),
56 boundary_value(b),
57 b.at.format("%Y-%m-%d %H:%M:%S UTC").to_string(),
58 ),
59 None => ("-".into(), "-".into(), "-".into()),
60 };
61 let (v_mode, v_val) = match &p.verified {
62 Some(b) => (b.strategy.as_str().to_string(), boundary_value(b)),
63 None => ("-".into(), "-".into()),
64 };
65 println!(
66 "{:<30} {:<12} {:<30} {:<25} {:<12} {:<30}",
67 p.export_name, c_mode, c_val, c_at, v_mode, v_val
68 );
69 }
70 Ok(())
71}
72
73fn boundary_value(b: &crate::state::Boundary) -> String {
74 if let Some(c) = &b.cursor {
75 c.clone()
76 } else if let Some(idx) = b.chunk_index {
77 format!("chunk #{idx}")
78 } else {
79 "-".into()
80 }
81}
82
83pub fn reset_state(config_path: &str, export_name: &str) -> Result<()> {
84 let config = crate::config::Config::load(config_path)?;
91 if !config.exports.iter().any(|e| e.name == export_name) {
92 let known: Vec<String> = config.exports.iter().map(|e| e.name.clone()).collect();
93 anyhow::bail!(
94 "export '{}' not found in config '{}'.\n Known exports: {}\n Hint: check the spelling, or run `rivet state show -c {}` to see what is currently tracked.",
95 export_name,
96 config_path,
97 if known.is_empty() {
98 "(none defined)".to_string()
99 } else {
100 known.join(", ")
101 },
102 config_path,
103 );
104 }
105 let state = StateStore::open(config_path)?;
106 state.reset(export_name)?;
107 println!("State reset for export '{}'", export_name);
108 Ok(())
109}
110
111pub fn show_files(config_path: &str, export_name: Option<&str>, limit: usize) -> Result<()> {
112 let state = StateStore::open(config_path)?;
113 let files = state.get_files(export_name, limit)?;
114 if files.is_empty() {
115 println!("No files recorded yet.");
116 return Ok(());
117 }
118 println!(
119 "{:<35} {:<40} {:>8} {:>10} CREATED",
120 "RUN ID", "FILE", "ROWS", "BYTES"
121 );
122 println!("{}", "-".repeat(110));
123 for f in &files {
124 println!(
125 "{:<35} {:<40} {:>8} {:>10} {}",
126 f.run_id,
127 f.file_name,
128 f.row_count,
129 format_bytes(f.bytes as u64),
130 f.created_at,
131 );
132 }
133 Ok(())
134}
135
136pub fn show_metrics(config_path: &str, export_name: Option<&str>, limit: usize) -> Result<()> {
137 let state = StateStore::open(config_path)?;
138 let metrics = state.get_metrics(export_name, limit)?;
139 if metrics.is_empty() {
140 println!("No metrics recorded yet.");
141 return Ok(());
142 }
143 println!(
144 "{:<20} {:<10} {:>10} {:>10} {:>8} {:>6} {:>10} RUN ID",
145 "EXPORT", "STATUS", "ROWS", "DURATION", "RSS", "FILES", "BYTES"
146 );
147 println!("{}", "-".repeat(110));
148 for m in &metrics {
149 let duration = if m.duration_ms >= 1000 {
150 format!("{:.1}s", m.duration_ms as f64 / 1000.0)
151 } else {
152 format!("{}ms", m.duration_ms)
153 };
154 let rss = m
155 .peak_rss_mb
156 .map(|r| format!("{}MB", r))
157 .unwrap_or_else(|| "-".into());
158 let bytes = if m.bytes_written > 0 {
159 format_bytes(m.bytes_written as u64)
160 } else {
161 "-".into()
162 };
163 let run_id = m.run_id.as_deref().unwrap_or(&m.run_at);
164 println!(
165 "{:<20} {:<10} {:>10} {:>10} {:>8} {:>6} {:>10} {}",
166 m.export_name, m.status, m.total_rows, duration, rss, m.files_produced, bytes, run_id
167 );
168 if let Some(err) = &m.error_message {
169 println!(" Error: {}", err);
170 }
171 let mut flags = Vec::new();
172 if m.retries > 0 {
173 flags.push(format!("retries={}", m.retries));
174 }
175 if let Some(v) = m.validated {
176 flags.push(format!("validated={}", if v { "pass" } else { "FAIL" }));
177 }
178 if let Some(sc) = m.schema_changed {
179 flags.push(format!("schema={}", if sc { "CHANGED" } else { "ok" }));
180 }
181 if !flags.is_empty() {
182 println!(" {}", flags.join(" "));
183 }
184 }
185 Ok(())
186}
187
188pub fn reset_chunk_checkpoint(config_path: &str, export_name: &str) -> Result<()> {
189 let state = StateStore::open(config_path)?;
190 let n = state.reset_chunk_checkpoint(export_name)?;
191 println!(
192 "Removed {} chunk run record(s) for export '{}'.",
193 n, export_name
194 );
195 Ok(())
196}
197
198pub fn reset_chunk_checkpoints_stuck(config_path: &str) -> Result<()> {
203 let cfg = Config::load(config_path)?;
204 let allowed: std::collections::HashSet<&str> =
205 cfg.exports.iter().map(|e| e.name.as_str()).collect();
206 let state = StateStore::open(config_path)?;
207 let stuck = state.list_export_names_with_in_progress_chunk_runs()?;
208 if stuck.is_empty() {
209 println!("No exports have an in-progress chunk checkpoint run.");
210 println!(
211 "(Nothing with chunk_run.status = 'in_progress' in {}.)",
212 StateStore::state_db_path(config_path).display()
213 );
214 return Ok(());
215 }
216
217 let mut skipped_not_in_config = Vec::new();
218 let mut targets = Vec::new();
219 for name in stuck {
220 if allowed.contains(name.as_str()) {
221 targets.push(name);
222 } else {
223 skipped_not_in_config.push(name);
224 }
225 }
226
227 for name in &skipped_not_in_config {
228 println!(
229 "Skipping '{}' — chunk checkpoint still in_progress but this export is not in the config.",
230 name
231 );
232 }
233
234 if targets.is_empty() {
235 println!(
236 "No matching exports to reset (none of the in-progress runs belong to exports in this config)."
237 );
238 return Ok(());
239 }
240
241 println!(
242 "Resetting chunk checkpoints for {} export(s) with in_progress runs: {}",
243 targets.len(),
244 targets.join(", ")
245 );
246
247 for name in targets {
248 let n = state.reset_chunk_checkpoint(&name)?;
249 println!("Removed {} chunk run record(s) for export '{}'.", n, name);
250 }
251 Ok(())
252}
253
254pub fn show_chunk_checkpoint(config_path: &str, export_name: &str) -> Result<()> {
255 let state = StateStore::open(config_path)?;
256 println!(
257 "database: {}",
258 StateStore::state_db_path(config_path).display()
259 );
260 let Some((run_id, plan_hash, status, updated_at)) = state.get_latest_chunk_run(export_name)?
261 else {
262 println!("No chunk checkpoint data for export '{}'.", export_name);
263 return Ok(());
264 };
265 println!("export: {}", export_name);
266 println!("run_id: {}", run_id);
267 println!("plan_hash: {}", plan_hash);
268 println!("status: {}", status);
269 println!("updated_at: {}", updated_at);
270 println!();
271 println!(
272 "{:<6} {:<12} {:<18} {:<18} {:>4} {:>8} FILE",
273 "IDX", "STATUS", "START", "END", "ATT", "ROWS"
274 );
275 println!("{}", "-".repeat(90));
276 for t in state.list_chunk_tasks_for_run(&run_id)? {
277 let file = t.file_name.as_deref().unwrap_or("-");
278 let rows = t
279 .rows_written
280 .map(|r| r.to_string())
281 .unwrap_or_else(|| "-".into());
282 println!(
283 "{:<6} {:<12} {:<18} {:<18} {:>4} {:>8} {}",
284 t.chunk_index, t.status, t.start_key, t.end_key, t.attempts, rows, file
285 );
286 if let Some(e) = &t.last_error {
287 println!(" error: {}", e);
288 }
289 }
290 Ok(())
291}
292
293pub fn show_journal(
298 config_path: &str,
299 export_name: &str,
300 limit: usize,
301 run_id: Option<&str>,
302) -> Result<()> {
303 let state = StateStore::open(config_path)?;
304
305 let journals = if let Some(rid) = run_id {
306 match state.load_journal(rid)? {
307 Some(j) => vec![j],
308 None => {
309 println!("No journal found for run_id '{rid}'.");
310 return Ok(());
311 }
312 }
313 } else {
314 state.recent_journals(export_name, limit)?
315 };
316
317 if journals.is_empty() {
318 println!("No journal entries for export '{export_name}' yet.");
319 println!("Journals are recorded after each `rivet run`.");
320 return Ok(());
321 }
322
323 for journal in &journals {
324 let outcome = journal.final_outcome().and_then(|e| {
326 if let RunEvent::RunCompleted {
327 status,
328 duration_ms,
329 ..
330 } = &e.event
331 {
332 Some((status.as_str(), *duration_ms))
333 } else {
334 None
335 }
336 });
337 let (status_str, duration_str) = match outcome {
338 Some((s, ms)) if ms >= 1000 => (s, format!("{:.1}s", ms as f64 / 1000.0)),
339 Some((s, ms)) => (s, format!("{ms}ms")),
340 None => ("(incomplete)", String::new()),
341 };
342 let icon = match status_str {
343 "success" => "✓",
344 "failed" => "✗",
345 _ => "•",
346 };
347 println!(
348 "\n{icon} {export} {status} {dur}",
349 export = journal.export_name,
350 status = status_str,
351 dur = duration_str,
352 );
353 println!(" run_id: {}", journal.run_id);
354
355 let files = journal.files();
357 if !files.is_empty() {
358 let total_rows: i64 = files
359 .iter()
360 .filter_map(|e| {
361 if let RunEvent::FileWritten { rows, .. } = &e.event {
362 Some(*rows)
363 } else {
364 None
365 }
366 })
367 .sum();
368 let total_bytes: u64 = files
369 .iter()
370 .filter_map(|e| {
371 if let RunEvent::FileWritten { bytes, .. } = &e.event {
372 Some(*bytes)
373 } else {
374 None
375 }
376 })
377 .sum();
378 println!(
379 " files: {} rows: {} size: {}",
380 files.len(),
381 total_rows,
382 format_bytes(total_bytes),
383 );
384 }
385
386 let retries = journal.retries();
387 if !retries.is_empty() {
388 println!(" retries: {}", retries.len());
389 }
390
391 for e in journal.quality_issues() {
392 if let RunEvent::QualityIssue { severity, message } = &e.event {
393 println!(" quality [{severity}]: {message}");
394 }
395 }
396
397 for e in journal.schema_changes() {
398 if let RunEvent::SchemaChanged {
399 added,
400 removed,
401 type_changed,
402 } = &e.event
403 {
404 if !added.is_empty() {
405 println!(" schema: +{}", added.join(", +"));
406 }
407 if !removed.is_empty() {
408 println!(" schema: -{}", removed.join(", -"));
409 }
410 for (col, old, new) in type_changed {
411 println!(" schema: {col} {old}→{new}");
412 }
413 }
414 }
415
416 if let Some(e) = journal.final_outcome()
417 && let RunEvent::RunCompleted {
418 error_message: Some(err),
419 ..
420 } = &e.event
421 {
422 let first_line = err.lines().next().unwrap_or(err);
423 println!(" error: {first_line}");
424 }
425 }
426 println!();
427 Ok(())
428}
429
#[cfg(test)]
mod tests {
    //! Smoke tests for the state/journal CLI commands.
    //!
    //! Most tests only assert that a command returns `Ok` against a state
    //! database in a fresh temp directory; the reset tests additionally check
    //! which rows were affected.

    use super::*;
    use crate::journal::{RunEvent, RunJournal};

    /// Creates a temp directory and returns it together with the path of a
    /// `rivet.yaml` inside it. The config file itself is NOT written here.
    /// The `TempDir` must be kept alive by the caller for the test's duration.
    fn setup_dir() -> (tempfile::TempDir, String) {
        let dir = tempfile::TempDir::new().unwrap();
        let config_path = dir.path().join("rivet.yaml").to_str().unwrap().to_string();
        (dir, config_path)
    }

    /// Writes a config declaring two exports: `transactions` and `orders`.
    ///
    /// The YAML nesting is significant: each export's keys sit under its list
    /// item, and `type`/`path` sit under `destination`.
    fn write_two_export_config(config_path: &str) {
        std::fs::write(
            config_path,
            br#"source:
  type: postgres
  url: postgresql://localhost/testdb
exports:
  - name: transactions
    query: "SELECT 1"
    mode: full
    format: parquet
    destination:
      type: local
      path: ./out
  - name: orders
    query: "SELECT 1"
    mode: full
    format: parquet
    destination:
      type: local
      path: ./out
"#,
        )
        .unwrap();
    }

    /// Writes a config declaring a single export: `transactions`.
    fn write_single_export_config(config_path: &str) {
        std::fs::write(
            config_path,
            br#"source:
  type: postgres
  url: postgresql://localhost/testdb
exports:
  - name: transactions
    query: "SELECT 1"
    mode: full
    format: parquet
    destination:
      type: local
      path: ./out
"#,
        )
        .unwrap();
    }

    /// Opens (creating if needed) the state DB at `<dir>/.rivet_state.db`.
    /// NOTE(review): the tests rely on `StateStore::open(config_path)`
    /// resolving to this same file for a config in `dir` — confirm against
    /// `StateStore::state_db_path`.
    fn open_state(dir: &tempfile::TempDir) -> StateStore {
        let db_path = dir.path().join(".rivet_state.db");
        StateStore::open_at_path(&db_path).unwrap()
    }

    /// Builds a journal with one written file and a successful completion
    /// whose duration (1500 ms) exercises the seconds formatting branch.
    fn make_journal(run_id: &str, export: &str) -> RunJournal {
        let mut j = RunJournal::new(run_id, export);
        j.record(RunEvent::FileWritten {
            file_name: "part0.parquet".into(),
            rows: 1_000,
            bytes: 65_536,
            part_index: 0,
        });
        j.record(RunEvent::RunCompleted {
            status: "success".into(),
            error_message: None,
            duration_ms: 1_500,
        });
        j
    }

    /// Builds a boundary with the given cursor / chunk index and fixed
    /// filler values for the remaining fields.
    fn make_boundary(cursor: Option<&str>, chunk_index: Option<i64>) -> crate::state::Boundary {
        crate::state::Boundary {
            strategy: "incremental".into(),
            run_id: None,
            cursor: cursor.map(|s| s.to_string()),
            chunk_index,
            at: chrono::Utc::now(),
        }
    }

    // ---- boundary_value -------------------------------------------------

    #[test]
    fn boundary_value_cursor_takes_precedence_over_chunk_index() {
        let b = make_boundary(Some("2025-01-15"), Some(42));
        assert_eq!(boundary_value(&b), "2025-01-15");
    }

    #[test]
    fn boundary_value_chunk_index_used_when_no_cursor() {
        let b = make_boundary(None, Some(7));
        assert_eq!(boundary_value(&b), "chunk #7");
    }

    #[test]
    fn boundary_value_dash_when_neither_set() {
        let b = make_boundary(None, None);
        assert_eq!(boundary_value(&b), "-");
    }

    // ---- show_state -----------------------------------------------------

    #[test]
    fn show_state_empty_db_returns_ok() {
        let (dir, config_path) = setup_dir();
        // `let _ =` drops the handle immediately; only the DB file is needed.
        let _ = open_state(&dir);
        assert!(show_state(&config_path).is_ok());
    }

    #[test]
    fn show_state_with_cursor_record_returns_ok() {
        let (dir, config_path) = setup_dir();
        let state = open_state(&dir);
        state.update("orders", "2025-01-15").unwrap();
        // Release the handle before the command opens the store itself.
        drop(state);
        assert!(show_state(&config_path).is_ok());
    }

    // ---- show_files -----------------------------------------------------

    #[test]
    fn show_files_empty_returns_ok() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        assert!(show_files(&config_path, None, 10).is_ok());
    }

    #[test]
    fn show_files_with_record_returns_ok() {
        let (dir, config_path) = setup_dir();
        let state = open_state(&dir);
        state
            .record_file(
                "r1",
                "orders",
                "orders_001.parquet",
                50_000,
                4096,
                "parquet",
                Some("zstd"),
            )
            .unwrap();
        drop(state);
        assert!(show_files(&config_path, Some("orders"), 10).is_ok());
    }

    // ---- show_metrics ---------------------------------------------------

    #[test]
    fn show_metrics_empty_returns_ok() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        assert!(show_metrics(&config_path, None, 10).is_ok());
    }

    #[test]
    fn show_metrics_exercises_flag_and_duration_paths() {
        let (dir, config_path) = setup_dir();
        let state = open_state(&dir);
        // First row: 1500 (>= 1000, so rendered in seconds) with every
        // optional value populated.
        state
            .record_metric(
                "orders",
                "r1",
                1_500,
                50_000,
                Some(42),
                "success",
                None,
                Some("balanced"),
                Some("parquet"),
                Some("full"),
                1,
                4096,
                3,
                Some(true),
                Some(true),
            )
            .unwrap();
        // Second row: 800 (< 1000, so rendered in ms), failed with an error
        // message and mostly empty optionals.
        state
            .record_metric(
                "orders",
                "r2",
                800,
                0,
                None,
                "failed",
                Some("timeout"),
                None,
                None,
                None,
                0,
                0,
                0,
                Some(false),
                None,
            )
            .unwrap();
        drop(state);
        assert!(show_metrics(&config_path, Some("orders"), 10).is_ok());
    }

    // ---- show_journal ---------------------------------------------------

    #[test]
    fn show_journal_empty_returns_ok() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        assert!(show_journal(&config_path, "orders", 5, None).is_ok());
    }

    #[test]
    fn show_journal_with_entry_returns_ok() {
        let (dir, config_path) = setup_dir();
        let state = open_state(&dir);
        state
            .store_journal(&make_journal("run_001", "orders"))
            .unwrap();
        drop(state);
        assert!(show_journal(&config_path, "orders", 5, None).is_ok());
    }

    #[test]
    fn show_journal_by_run_id_not_found_returns_ok() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        assert!(show_journal(&config_path, "orders", 5, Some("no_such_run")).is_ok());
    }

    #[test]
    fn show_journal_by_run_id_found_returns_ok() {
        let (dir, config_path) = setup_dir();
        let state = open_state(&dir);
        state
            .store_journal(&make_journal("run_xyz", "orders"))
            .unwrap();
        drop(state);
        assert!(show_journal(&config_path, "orders", 5, Some("run_xyz")).is_ok());
    }

    // ---- reset_state ----------------------------------------------------

    #[test]
    fn reset_state_returns_ok() {
        let (dir, config_path) = setup_dir();
        write_two_export_config(&config_path);
        let state = open_state(&dir);
        state.update("orders", "100").unwrap();
        drop(state);
        assert!(reset_state(&config_path, "orders").is_ok());
    }

    #[test]
    fn reset_state_unknown_export_bails_with_hint() {
        let (_dir, config_path) = setup_dir();
        write_two_export_config(&config_path);
        let err = reset_state(&config_path, "ghost").unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("export 'ghost' not found"),
            "must name the missing export: {msg}"
        );
        assert!(
            msg.contains("orders") && msg.contains("transactions"),
            "must list the declared exports so the user can spot the typo: {msg}"
        );
        assert!(
            msg.contains("rivet state show"),
            "must point at a follow-up command: {msg}"
        );
    }

    // ---- chunk checkpoint resets ----------------------------------------

    #[test]
    fn reset_chunk_checkpoint_on_empty_db_returns_ok() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        assert!(reset_chunk_checkpoint(&config_path, "orders").is_ok());
    }

    #[test]
    fn reset_chunk_checkpoints_stuck_no_rows_returns_ok() {
        let (dir, config_path) = setup_dir();
        write_two_export_config(&config_path);
        let _ = open_state(&dir);
        assert!(reset_chunk_checkpoints_stuck(&config_path).is_ok());
    }

    #[test]
    fn reset_chunk_checkpoints_stuck_clears_matching_exports_only() {
        let (dir, config_path) = setup_dir();
        write_two_export_config(&config_path);
        let state = open_state(&dir);
        // One chunk run for a declared export, one for an export that is
        // absent from the config.
        state
            .create_chunk_run("r_tx", "transactions", "plan", 3)
            .unwrap();
        state.create_chunk_run("r_g", "ghost", "plan", 3).unwrap();
        drop(state);

        reset_chunk_checkpoints_stuck(&config_path).unwrap();

        let state = StateStore::open(&config_path).unwrap();
        assert!(
            state
                .find_in_progress_chunk_run("transactions")
                .unwrap()
                .is_none()
        );
        // The undeclared export must have been left alone.
        assert!(state.find_in_progress_chunk_run("ghost").unwrap().is_some());
        assert_eq!(
            state.reset_chunk_checkpoint("ghost").unwrap(),
            1,
            "cleanup ghost row"
        );
    }

    #[test]
    fn reset_chunk_checkpoints_stuck_skips_when_only_unknown_exports_stuck() {
        let (dir, config_path) = setup_dir();
        write_single_export_config(&config_path);
        let state = open_state(&dir);
        state.create_chunk_run("r_g", "ghost", "plan", 3).unwrap();
        drop(state);

        reset_chunk_checkpoints_stuck(&config_path).unwrap();

        let state = StateStore::open(&config_path).unwrap();
        assert!(state.find_in_progress_chunk_run("ghost").unwrap().is_some());
        assert_eq!(state.reset_chunk_checkpoint("ghost").unwrap(), 1);
    }

    // ---- show_progression -----------------------------------------------

    #[test]
    fn show_progression_empty_returns_ok() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        assert!(show_progression(&config_path, None).is_ok());
    }

    #[test]
    fn show_progression_with_incremental_boundary_returns_ok() {
        let (dir, config_path) = setup_dir();
        let state = open_state(&dir);
        state
            .record_committed_incremental("orders", "2025-06-01", "run_001")
            .unwrap();
        drop(state);
        assert!(show_progression(&config_path, Some("orders")).is_ok());
    }

    // ---- show_chunk_checkpoint ------------------------------------------

    #[test]
    fn show_chunk_checkpoint_no_data_returns_ok() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        assert!(show_chunk_checkpoint(&config_path, "orders").is_ok());
    }
}