1use super::format_bytes;
7use crate::config::Config;
8use crate::error::Result;
9use crate::journal::RunEvent;
10use crate::state::StateStore;
11
12pub fn show_state(config_path: &str) -> Result<()> {
13 let state = StateStore::open(config_path)?;
14 let states = state.list_all()?;
15 if states.is_empty() {
16 let any_run = state
25 .get_metrics(None, 1)
26 .map(|m| !m.is_empty())
27 .unwrap_or(false);
28 if any_run {
29 println!(
30 "No incremental cursor recorded yet.\n \
31 This command shows incremental-mode cursors only.\n \
32 For chunked / full runs, see:\n \
33 • rivet metrics — per-run history (status, rows, duration)\n \
34 • rivet state files — every produced file with row count + size"
35 );
36 } else {
37 println!(
38 "No exports have been run yet.\n \
39 Run `rivet run --config {}` first, then try `rivet state` again.",
40 config_path
41 );
42 }
43 return Ok(());
44 }
45 println!("{:<30} {:<40} LAST RUN", "EXPORT", "LAST CURSOR");
46 println!("{}", "-".repeat(90));
47 for s in &states {
48 println!(
49 "{:<30} {:<40} {}",
50 s.export_name,
51 s.last_cursor_value.as_deref().unwrap_or("-"),
52 s.last_run_at.as_deref().unwrap_or("-"),
53 );
54 }
55 Ok(())
56}
57
58pub fn show_progression(config_path: &str, export_name: Option<&str>) -> Result<()> {
60 let state = StateStore::open(config_path)?;
61 let entries = match export_name {
62 Some(name) => vec![state.get_progression(name)?],
63 None => state.list_progression()?,
64 };
65 let has_any = entries
66 .iter()
67 .any(|p| p.committed.is_some() || p.verified.is_some());
68 if !has_any {
69 println!("No progression boundaries recorded yet.");
70 return Ok(());
71 }
72
73 println!(
74 "{:<30} {:<12} {:<30} {:<25} {:<12} {:<30}",
75 "EXPORT", "COMM MODE", "COMMITTED", "COMMITTED AT", "VERI MODE", "VERIFIED"
76 );
77 println!("{}", "-".repeat(145));
78 for p in &entries {
79 let (c_mode, c_val, c_at) = match &p.committed {
80 Some(b) => (
81 b.strategy.as_str().to_string(),
82 boundary_value(b),
83 b.at.format("%Y-%m-%d %H:%M:%S UTC").to_string(),
84 ),
85 None => ("-".into(), "-".into(), "-".into()),
86 };
87 let (v_mode, v_val) = match &p.verified {
88 Some(b) => (b.strategy.as_str().to_string(), boundary_value(b)),
89 None => ("-".into(), "-".into()),
90 };
91 println!(
92 "{:<30} {:<12} {:<30} {:<25} {:<12} {:<30}",
93 p.export_name, c_mode, c_val, c_at, v_mode, v_val
94 );
95 }
96 Ok(())
97}
98
99fn boundary_value(b: &crate::state::Boundary) -> String {
100 if let Some(c) = &b.cursor {
101 c.clone()
102 } else if let Some(idx) = b.chunk_index {
103 format!("chunk #{idx}")
104 } else {
105 "-".into()
106 }
107}
108
109pub fn reset_state(config_path: &str, export_name: &str) -> Result<()> {
110 let config = crate::config::Config::load(config_path)?;
117 if !config.exports.iter().any(|e| e.name == export_name) {
118 let known: Vec<String> = config.exports.iter().map(|e| e.name.clone()).collect();
119 anyhow::bail!(
120 "export '{}' not found in config '{}'.\n Known exports: {}\n Hint: check the spelling, or run `rivet state show -c {}` to see what is currently tracked.",
121 export_name,
122 config_path,
123 if known.is_empty() {
124 "(none defined)".to_string()
125 } else {
126 known.join(", ")
127 },
128 config_path,
129 );
130 }
131 let state = StateStore::open(config_path)?;
132 state.reset(export_name)?;
133 println!("State reset for export '{}'", export_name);
134 Ok(())
135}
136
137pub fn show_files(config_path: &str, export_name: Option<&str>, limit: usize) -> Result<()> {
138 let state = StateStore::open(config_path)?;
139 let files = state.get_files(export_name, limit)?;
140 if files.is_empty() {
141 println!("No files recorded yet.");
142 return Ok(());
143 }
144 println!(
145 "{:<35} {:<40} {:>8} {:>10} CREATED",
146 "RUN ID", "FILE", "ROWS", "BYTES"
147 );
148 println!("{}", "-".repeat(110));
149 for f in &files {
150 println!(
151 "{:<35} {:<40} {:>8} {:>10} {}",
152 f.run_id,
153 f.file_name,
154 f.row_count,
155 format_bytes(f.bytes as u64),
156 f.created_at,
157 );
158 }
159 Ok(())
160}
161
162pub fn show_metrics(config_path: &str, export_name: Option<&str>, limit: usize) -> Result<()> {
163 let state = StateStore::open(config_path)?;
164 let metrics = state.get_metrics(export_name, limit)?;
165 if metrics.is_empty() {
166 println!("No metrics recorded yet.");
167 return Ok(());
168 }
169 println!(
170 "{:<20} {:<10} {:>10} {:>10} {:>8} {:>6} {:>10} RUN ID",
171 "EXPORT", "STATUS", "ROWS", "DURATION", "RSS", "FILES", "BYTES"
172 );
173 println!("{}", "-".repeat(110));
174 for m in &metrics {
175 let duration = if m.duration_ms >= 1000 {
176 format!("{:.1}s", m.duration_ms as f64 / 1000.0)
177 } else {
178 format!("{}ms", m.duration_ms)
179 };
180 let rss = m
181 .peak_rss_mb
182 .map(|r| format!("{}MB", r))
183 .unwrap_or_else(|| "-".into());
184 let bytes = if m.bytes_written > 0 {
185 format_bytes(m.bytes_written as u64)
186 } else {
187 "-".into()
188 };
189 let run_id = m.run_id.as_deref().unwrap_or(&m.run_at);
190 println!(
191 "{:<20} {:<10} {:>10} {:>10} {:>8} {:>6} {:>10} {}",
192 m.export_name, m.status, m.total_rows, duration, rss, m.files_produced, bytes, run_id
193 );
194 if let Some(err) = &m.error_message {
195 println!(" Error: {}", err);
196 }
197 let mut flags = Vec::new();
198 if m.retries > 0 {
199 flags.push(format!("retries={}", m.retries));
200 }
201 if let Some(v) = m.validated {
202 flags.push(format!("validated={}", if v { "pass" } else { "FAIL" }));
203 }
204 if let Some(sc) = m.schema_changed {
205 flags.push(format!("schema={}", if sc { "CHANGED" } else { "ok" }));
206 }
207 if !flags.is_empty() {
208 println!(" {}", flags.join(" "));
209 }
210 }
211 Ok(())
212}
213
214pub fn reset_chunk_checkpoint(config_path: &str, export_name: &str) -> Result<()> {
215 let state = StateStore::open(config_path)?;
216 let n = state.reset_chunk_checkpoint(export_name)?;
217 println!(
218 "Removed {} chunk run record(s) for export '{}'.",
219 n, export_name
220 );
221 Ok(())
222}
223
224pub fn reset_chunk_checkpoints_stuck(config_path: &str) -> Result<()> {
229 let cfg = Config::load(config_path)?;
230 let allowed: std::collections::HashSet<&str> =
231 cfg.exports.iter().map(|e| e.name.as_str()).collect();
232 let state = StateStore::open(config_path)?;
233 let stuck = state.list_export_names_with_in_progress_chunk_runs()?;
234 if stuck.is_empty() {
235 println!("No exports have an in-progress chunk checkpoint run.");
236 println!(
237 "(Nothing with chunk_run.status = 'in_progress' in {}.)",
238 StateStore::state_db_path(config_path).display()
239 );
240 return Ok(());
241 }
242
243 let mut skipped_not_in_config = Vec::new();
244 let mut targets = Vec::new();
245 for name in stuck {
246 if allowed.contains(name.as_str()) {
247 targets.push(name);
248 } else {
249 skipped_not_in_config.push(name);
250 }
251 }
252
253 for name in &skipped_not_in_config {
254 println!(
255 "Skipping '{}' — chunk checkpoint still in_progress but this export is not in the config.",
256 name
257 );
258 }
259
260 if targets.is_empty() {
261 println!(
262 "No matching exports to reset (none of the in-progress runs belong to exports in this config)."
263 );
264 return Ok(());
265 }
266
267 println!(
268 "Resetting chunk checkpoints for {} export(s) with in_progress runs: {}",
269 targets.len(),
270 targets.join(", ")
271 );
272
273 for name in targets {
274 let n = state.reset_chunk_checkpoint(&name)?;
275 println!("Removed {} chunk run record(s) for export '{}'.", n, name);
276 }
277 Ok(())
278}
279
280pub fn show_chunk_checkpoint(config_path: &str, export_name: &str) -> Result<()> {
281 let state = StateStore::open(config_path)?;
282 println!(
283 "database: {}",
284 StateStore::state_db_path(config_path).display()
285 );
286 let Some((run_id, plan_hash, status, updated_at)) = state.get_latest_chunk_run(export_name)?
287 else {
288 println!("No chunk checkpoint data for export '{}'.", export_name);
289 return Ok(());
290 };
291 println!("export: {}", export_name);
292 println!("run_id: {}", run_id);
293 println!("plan_hash: {}", plan_hash);
294 println!("status: {}", status);
295 println!("updated_at: {}", updated_at);
296 println!();
297 println!(
298 "{:<6} {:<12} {:<18} {:<18} {:>4} {:>8} FILE",
299 "IDX", "STATUS", "START", "END", "ATT", "ROWS"
300 );
301 println!("{}", "-".repeat(90));
302 for t in state.list_chunk_tasks_for_run(&run_id)? {
303 let file = t.file_name.as_deref().unwrap_or("-");
304 let rows = t
305 .rows_written
306 .map(|r| r.to_string())
307 .unwrap_or_else(|| "-".into());
308 println!(
309 "{:<6} {:<12} {:<18} {:<18} {:>4} {:>8} {}",
310 t.chunk_index, t.status, t.start_key, t.end_key, t.attempts, rows, file
311 );
312 if let Some(e) = &t.last_error {
313 println!(" error: {}", e);
314 }
315 }
316 Ok(())
317}
318
319pub fn show_journal(
324 config_path: &str,
325 export_name: &str,
326 limit: usize,
327 run_id: Option<&str>,
328) -> Result<()> {
329 let state = StateStore::open(config_path)?;
330
331 let journals = if let Some(rid) = run_id {
332 match state.load_journal(rid)? {
333 Some(j) => vec![j],
334 None => {
335 println!("No journal found for run_id '{rid}'.");
336 return Ok(());
337 }
338 }
339 } else {
340 state.recent_journals(export_name, limit)?
341 };
342
343 if journals.is_empty() {
344 println!("No journal entries for export '{export_name}' yet.");
345 println!("Journals are recorded after each `rivet run`.");
346 return Ok(());
347 }
348
349 for journal in &journals {
350 let outcome = journal.final_outcome().and_then(|e| {
352 if let RunEvent::RunCompleted {
353 status,
354 duration_ms,
355 ..
356 } = &e.event
357 {
358 Some((status.as_str(), *duration_ms))
359 } else {
360 None
361 }
362 });
363 let (status_str, duration_str) = match outcome {
364 Some((s, ms)) if ms >= 1000 => (s, format!("{:.1}s", ms as f64 / 1000.0)),
365 Some((s, ms)) => (s, format!("{ms}ms")),
366 None => ("(incomplete)", String::new()),
367 };
368 let icon = match status_str {
369 "success" => "✓",
370 "failed" => "✗",
371 _ => "•",
372 };
373 println!(
374 "\n{icon} {export} {status} {dur}",
375 export = journal.export_name,
376 status = status_str,
377 dur = duration_str,
378 );
379 println!(" run_id: {}", journal.run_id);
380
381 let files = journal.files();
383 if !files.is_empty() {
384 let total_rows: i64 = files
385 .iter()
386 .filter_map(|e| {
387 if let RunEvent::FileWritten { rows, .. } = &e.event {
388 Some(*rows)
389 } else {
390 None
391 }
392 })
393 .sum();
394 let total_bytes: u64 = files
395 .iter()
396 .filter_map(|e| {
397 if let RunEvent::FileWritten { bytes, .. } = &e.event {
398 Some(*bytes)
399 } else {
400 None
401 }
402 })
403 .sum();
404 println!(
405 " files: {} rows: {} size: {}",
406 files.len(),
407 total_rows,
408 format_bytes(total_bytes),
409 );
410 }
411
412 let retries = journal.retries();
413 if !retries.is_empty() {
414 println!(" retries: {}", retries.len());
415 }
416
417 for e in journal.quality_issues() {
418 if let RunEvent::QualityIssue { severity, message } = &e.event {
419 println!(" quality [{severity}]: {message}");
420 }
421 }
422
423 for e in journal.schema_changes() {
424 if let RunEvent::SchemaChanged {
425 added,
426 removed,
427 type_changed,
428 } = &e.event
429 {
430 if !added.is_empty() {
431 println!(" schema: +{}", added.join(", +"));
432 }
433 if !removed.is_empty() {
434 println!(" schema: -{}", removed.join(", -"));
435 }
436 for (col, old, new) in type_changed {
437 println!(" schema: {col} {old}→{new}");
438 }
439 }
440 }
441
442 if let Some(e) = journal.final_outcome()
443 && let RunEvent::RunCompleted {
444 error_message: Some(err),
445 ..
446 } = &e.event
447 {
448 let first_line = err.lines().next().unwrap_or(err);
449 println!(" error: {first_line}");
450 }
451 }
452 println!();
453 Ok(())
454}
455
// Smoke tests for the state subcommands. Most tests only assert that a
// command returns `Ok` against a temporary state database; the printed
// output is not inspected.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::journal::{RunEvent, RunJournal};

    // Creates a temp directory and returns it together with the path of a
    // `rivet.yaml` inside it. The config file itself is NOT created here.
    // The `TempDir` must be kept alive by the caller for the path to stay valid.
    fn setup_dir() -> (tempfile::TempDir, String) {
        let dir = tempfile::TempDir::new().unwrap();
        let config_path = dir.path().join("rivet.yaml").to_str().unwrap().to_string();
        (dir, config_path)
    }

    // Writes a config declaring two exports: `transactions` and `orders`.
    fn write_two_export_config(config_path: &str) {
        std::fs::write(
            config_path,
            br#"source:
  type: postgres
  url: postgresql://localhost/testdb
exports:
  - name: transactions
    query: "SELECT 1"
    mode: full
    format: parquet
    destination:
      type: local
      path: ./out
  - name: orders
    query: "SELECT 1"
    mode: full
    format: parquet
    destination:
      type: local
      path: ./out
"#,
        )
        .unwrap();
    }

    // Writes a config declaring a single export: `transactions`.
    fn write_single_export_config(config_path: &str) {
        std::fs::write(
            config_path,
            br#"source:
  type: postgres
  url: postgresql://localhost/testdb
exports:
  - name: transactions
    query: "SELECT 1"
    mode: full
    format: parquet
    destination:
      type: local
      path: ./out
"#,
        )
        .unwrap();
    }

    // Opens a state store at `.rivet_state.db` inside the temp directory.
    // NOTE(review): the tests rely on this being the same database that
    // `StateStore::open(config_path)` uses for a config in that directory.
    fn open_state(dir: &tempfile::TempDir) -> StateStore {
        let db_path = dir.path().join(".rivet_state.db");
        StateStore::open_at_path(&db_path).unwrap()
    }

    // Builds a journal with one written file and a successful completion.
    fn make_journal(run_id: &str, export: &str) -> RunJournal {
        let mut j = RunJournal::new(run_id, export);
        j.record(RunEvent::FileWritten {
            file_name: "part0.parquet".into(),
            rows: 1_000,
            bytes: 65_536,
            part_index: 0,
        });
        j.record(RunEvent::RunCompleted {
            status: "success".into(),
            error_message: None,
            duration_ms: 1_500,
        });
        j
    }

    // Builds a boundary with the given cursor / chunk index; the other fields
    // are fixed filler values.
    fn make_boundary(cursor: Option<&str>, chunk_index: Option<i64>) -> crate::state::Boundary {
        crate::state::Boundary {
            strategy: "incremental".into(),
            run_id: None,
            cursor: cursor.map(|s| s.to_string()),
            chunk_index,
            at: chrono::Utc::now(),
        }
    }

    // ── boundary_value ──────────────────────────────────────────────────

    #[test]
    fn boundary_value_cursor_takes_precedence_over_chunk_index() {
        let b = make_boundary(Some("2025-01-15"), Some(42));
        assert_eq!(boundary_value(&b), "2025-01-15");
    }

    #[test]
    fn boundary_value_chunk_index_used_when_no_cursor() {
        let b = make_boundary(None, Some(7));
        assert_eq!(boundary_value(&b), "chunk #7");
    }

    #[test]
    fn boundary_value_dash_when_neither_set() {
        let b = make_boundary(None, None);
        assert_eq!(boundary_value(&b), "-");
    }

    // ── show_state ──────────────────────────────────────────────────────

    #[test]
    fn show_state_empty_db_returns_ok() {
        let (dir, config_path) = setup_dir();
        // Open and immediately drop the store before running the command.
        let _ = open_state(&dir);
        assert!(show_state(&config_path).is_ok());
    }

    #[test]
    fn show_state_with_cursor_record_returns_ok() {
        let (dir, config_path) = setup_dir();
        let state = open_state(&dir);
        state.update("orders", "2025-01-15").unwrap();
        drop(state);
        assert!(show_state(&config_path).is_ok());
    }

    // ── show_files ──────────────────────────────────────────────────────

    #[test]
    fn show_files_empty_returns_ok() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        assert!(show_files(&config_path, None, 10).is_ok());
    }

    #[test]
    fn show_files_with_record_returns_ok() {
        let (dir, config_path) = setup_dir();
        let state = open_state(&dir);
        state
            .record_file(
                "r1",
                "orders",
                "orders_001.parquet",
                50_000,
                4096,
                "parquet",
                Some("zstd"),
            )
            .unwrap();
        drop(state);
        assert!(show_files(&config_path, Some("orders"), 10).is_ok());
    }

    // ── show_metrics ────────────────────────────────────────────────────

    #[test]
    fn show_metrics_empty_returns_ok() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        assert!(show_metrics(&config_path, None, 10).is_ok());
    }

    // Records one successful and one failed metric so that both variants of
    // the row rendering are exercised (as the test name states).
    #[test]
    fn show_metrics_exercises_flag_and_duration_paths() {
        let (dir, config_path) = setup_dir();
        let state = open_state(&dir);
        state
            .record_metric(
                "orders",
                "r1",
                1_500,
                50_000,
                Some(42),
                "success",
                None,
                Some("balanced"),
                Some("parquet"),
                Some("full"),
                1,
                4096,
                3,
                Some(true),
                Some(true),
            )
            .unwrap();
        state
            .record_metric(
                "orders",
                "r2",
                800,
                0,
                None,
                "failed",
                Some("timeout"),
                None,
                None,
                None,
                0,
                0,
                0,
                Some(false),
                None,
            )
            .unwrap();
        drop(state);
        assert!(show_metrics(&config_path, Some("orders"), 10).is_ok());
    }

    // ── show_journal ────────────────────────────────────────────────────

    #[test]
    fn show_journal_empty_returns_ok() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        assert!(show_journal(&config_path, "orders", 5, None).is_ok());
    }

    #[test]
    fn show_journal_with_entry_returns_ok() {
        let (dir, config_path) = setup_dir();
        let state = open_state(&dir);
        state
            .store_journal(&make_journal("run_001", "orders"))
            .unwrap();
        drop(state);
        assert!(show_journal(&config_path, "orders", 5, None).is_ok());
    }

    #[test]
    fn show_journal_by_run_id_not_found_returns_ok() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        assert!(show_journal(&config_path, "orders", 5, Some("no_such_run")).is_ok());
    }

    #[test]
    fn show_journal_by_run_id_found_returns_ok() {
        let (dir, config_path) = setup_dir();
        let state = open_state(&dir);
        state
            .store_journal(&make_journal("run_xyz", "orders"))
            .unwrap();
        drop(state);
        assert!(show_journal(&config_path, "orders", 5, Some("run_xyz")).is_ok());
    }

    // ── reset_state ─────────────────────────────────────────────────────

    #[test]
    fn reset_state_returns_ok() {
        let (dir, config_path) = setup_dir();
        write_two_export_config(&config_path);
        let state = open_state(&dir);
        state.update("orders", "100").unwrap();
        drop(state);
        assert!(reset_state(&config_path, "orders").is_ok());
    }

    // An export name missing from the config must fail with a message that
    // names it, lists the declared exports, and suggests a follow-up command.
    #[test]
    fn reset_state_unknown_export_bails_with_hint() {
        let (_dir, config_path) = setup_dir();
        write_two_export_config(&config_path);
        let err = reset_state(&config_path, "ghost").unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("export 'ghost' not found"),
            "must name the missing export: {msg}"
        );
        assert!(
            msg.contains("orders") && msg.contains("transactions"),
            "must list the declared exports so the user can spot the typo: {msg}"
        );
        assert!(
            msg.contains("rivet state show"),
            "must point at a follow-up command: {msg}"
        );
    }

    // ── chunk checkpoint resets ─────────────────────────────────────────

    #[test]
    fn reset_chunk_checkpoint_on_empty_db_returns_ok() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        assert!(reset_chunk_checkpoint(&config_path, "orders").is_ok());
    }

    #[test]
    fn reset_chunk_checkpoints_stuck_no_rows_returns_ok() {
        let (dir, config_path) = setup_dir();
        write_two_export_config(&config_path);
        let _ = open_state(&dir);
        assert!(reset_chunk_checkpoints_stuck(&config_path).is_ok());
    }

    // `transactions` is in the config and must be cleared; `ghost` is not in
    // the config and must be left in progress.
    #[test]
    fn reset_chunk_checkpoints_stuck_clears_matching_exports_only() {
        let (dir, config_path) = setup_dir();
        write_two_export_config(&config_path);
        let state = open_state(&dir);
        state
            .create_chunk_run("r_tx", "transactions", "plan", 3)
            .unwrap();
        state.create_chunk_run("r_g", "ghost", "plan", 3).unwrap();
        drop(state);

        reset_chunk_checkpoints_stuck(&config_path).unwrap();

        let state = StateStore::open(&config_path).unwrap();
        assert!(
            state
                .find_in_progress_chunk_run("transactions")
                .unwrap()
                .is_none()
        );
        assert!(state.find_in_progress_chunk_run("ghost").unwrap().is_some());
        assert_eq!(
            state.reset_chunk_checkpoint("ghost").unwrap(),
            1,
            "cleanup ghost row"
        );
    }

    // Only an export unknown to the config is in progress, so nothing may be
    // reset by the command.
    #[test]
    fn reset_chunk_checkpoints_stuck_skips_when_only_unknown_exports_stuck() {
        let (dir, config_path) = setup_dir();
        write_single_export_config(&config_path);
        let state = open_state(&dir);
        state.create_chunk_run("r_g", "ghost", "plan", 3).unwrap();
        drop(state);

        reset_chunk_checkpoints_stuck(&config_path).unwrap();

        let state = StateStore::open(&config_path).unwrap();
        assert!(state.find_in_progress_chunk_run("ghost").unwrap().is_some());
        assert_eq!(state.reset_chunk_checkpoint("ghost").unwrap(), 1);
    }

    // ── show_progression ────────────────────────────────────────────────

    #[test]
    fn show_progression_empty_returns_ok() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        assert!(show_progression(&config_path, None).is_ok());
    }

    #[test]
    fn show_progression_with_incremental_boundary_returns_ok() {
        let (dir, config_path) = setup_dir();
        let state = open_state(&dir);
        state
            .record_committed_incremental("orders", "2025-06-01", "run_001")
            .unwrap();
        drop(state);
        assert!(show_progression(&config_path, Some("orders")).is_ok());
    }

    // ── show_chunk_checkpoint ───────────────────────────────────────────

    #[test]
    fn show_chunk_checkpoint_no_data_returns_ok() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        assert!(show_chunk_checkpoint(&config_path, "orders").is_ok());
    }
}