1use super::format_bytes;
7use crate::config::Config;
8use crate::error::Result;
9use crate::journal::RunEvent;
10use crate::state::StateStore;
11
12fn require_config(config_path: &str) -> Result<Config> {
24 Config::load(config_path)
25}
26
27fn require_known_export(config: &Config, config_path: &str, export_name: &str) -> Result<()> {
34 if config.exports.iter().any(|e| e.name == export_name) {
35 return Ok(());
36 }
37 let known: Vec<&str> = config.exports.iter().map(|e| e.name.as_str()).collect();
38 anyhow::bail!(
39 "export '{}' is not defined in '{}'.\n Known exports: {}",
40 export_name,
41 config_path,
42 if known.is_empty() {
43 "(none defined)".to_string()
44 } else {
45 known.join(", ")
46 },
47 );
48}
49
50pub fn show_state(config_path: &str, json: bool) -> Result<()> {
51 require_config(config_path)?;
52 let state = StateStore::open(config_path)?;
53 let states = state.list_all()?;
54 if json {
55 println!("{}", serde_json::to_string_pretty(&states)?);
58 return Ok(());
59 }
60 if states.is_empty() {
61 let any_run = state
70 .get_metrics(None, 1)
71 .map(|m| !m.is_empty())
72 .unwrap_or(false);
73 if any_run {
74 println!(
75 "No incremental cursor recorded yet.\n \
76 This command shows incremental-mode cursors only.\n \
77 For chunked / full runs, see:\n \
78 • rivet metrics — per-run history (status, rows, duration)\n \
79 • rivet state files — every produced file with row count + size"
80 );
81 } else {
82 println!(
83 "No exports have been run yet.\n \
84 Run `rivet run --config {}` first, then try `rivet state show` again.",
85 config_path
86 );
87 }
88 return Ok(());
89 }
90 println!("{:<30} {:<40} LAST RUN", "EXPORT", "LAST CURSOR");
91 println!("{}", "-".repeat(90));
92 for s in &states {
93 println!(
94 "{:<30} {:<40} {}",
95 s.export_name,
96 s.last_cursor_value.as_deref().unwrap_or("-"),
97 s.last_run_at.as_deref().unwrap_or("-"),
98 );
99 }
100 Ok(())
101}
102
103pub fn show_progression(config_path: &str, export_name: Option<&str>) -> Result<()> {
105 require_config(config_path)?;
106 let state = StateStore::open(config_path)?;
107 let entries = match export_name {
108 Some(name) => vec![state.get_progression(name)?],
109 None => state.list_progression()?,
110 };
111 let has_any = entries
112 .iter()
113 .any(|p| p.committed.is_some() || p.verified.is_some());
114 if !has_any {
115 println!("No progression boundaries recorded yet.");
116 return Ok(());
117 }
118
119 println!(
120 "{:<30} {:<12} {:<30} {:<25} {:<12} {:<30}",
121 "EXPORT", "COMM MODE", "COMMITTED", "COMMITTED AT", "VERI MODE", "VERIFIED"
122 );
123 println!("{}", "-".repeat(145));
124 for p in &entries {
125 let (c_mode, c_val, c_at) = match &p.committed {
126 Some(b) => (
127 b.strategy.as_str().to_string(),
128 boundary_value(b),
129 b.at.format("%Y-%m-%d %H:%M:%S UTC").to_string(),
130 ),
131 None => ("-".into(), "-".into(), "-".into()),
132 };
133 let (v_mode, v_val) = match &p.verified {
134 Some(b) => (b.strategy.as_str().to_string(), boundary_value(b)),
135 None => ("-".into(), "-".into()),
136 };
137 println!(
138 "{:<30} {:<12} {:<30} {:<25} {:<12} {:<30}",
139 p.export_name, c_mode, c_val, c_at, v_mode, v_val
140 );
141 }
142 Ok(())
143}
144
145fn boundary_value(b: &crate::state::Boundary) -> String {
146 if let Some(c) = &b.cursor {
147 c.clone()
148 } else if let Some(idx) = b.chunk_index {
149 format!("chunk #{idx}")
150 } else {
151 "-".into()
152 }
153}
154
155pub fn reset_state(config_path: &str, export_name: &str) -> Result<()> {
156 let config = crate::config::Config::load(config_path)?;
163 if !config.exports.iter().any(|e| e.name == export_name) {
164 let known: Vec<String> = config.exports.iter().map(|e| e.name.clone()).collect();
165 anyhow::bail!(
166 "export '{}' not found in config '{}'.\n Known exports: {}\n Hint: check the spelling, or run `rivet state show -c {}` to see what is currently tracked.",
167 export_name,
168 config_path,
169 if known.is_empty() {
170 "(none defined)".to_string()
171 } else {
172 known.join(", ")
173 },
174 config_path,
175 );
176 }
177 let state = StateStore::open(config_path)?;
178 state.reset(export_name)?;
179 println!("State reset for export '{}'", export_name);
180 Ok(())
181}
182
183pub fn show_files(
184 config_path: &str,
185 export_name: Option<&str>,
186 limit: usize,
187 json: bool,
188) -> Result<()> {
189 require_config(config_path)?;
190 let state = StateStore::open(config_path)?;
191 let files = state.get_files(export_name, limit)?;
192 if json {
193 println!("{}", serde_json::to_string_pretty(&files)?);
196 return Ok(());
197 }
198 if files.is_empty() {
199 println!("No files recorded yet.");
200 return Ok(());
201 }
202 println!(
206 "{:<40} {:<40} {:>8} {:>10} CREATED",
207 "RUN ID", "FILE", "ROWS", "BYTES"
208 );
209 println!("{}", "-".repeat(115));
210 for f in &files {
211 println!(
212 "{:<40} {:<40} {:>8} {:>10} {}",
213 f.run_id,
214 f.file_name,
215 f.row_count,
216 format_bytes(f.bytes as u64),
217 f.created_at,
218 );
219 }
220 Ok(())
221}
222
223pub fn show_metrics(
224 config_path: &str,
225 export_name: Option<&str>,
226 limit: usize,
227 json: bool,
228) -> Result<()> {
229 let config = require_config(config_path)?;
230 if let Some(name) = export_name {
235 require_known_export(&config, config_path, name)?;
236 }
237 let state = StateStore::open(config_path)?;
238 let metrics = state.get_metrics(export_name, limit)?;
239 if json {
240 let rows: Vec<super::aggregate::MetricRowJson> = metrics
244 .iter()
245 .map(super::aggregate::MetricRowJson::from)
246 .collect();
247 println!("{}", serde_json::to_string_pretty(&rows)?);
248 return Ok(());
249 }
250 if metrics.is_empty() {
251 println!("No metrics recorded yet.");
252 return Ok(());
253 }
254 println!(
255 "{:<20} {:<10} {:>10} {:>10} {:>8} {:>6} {:>10} RUN ID",
256 "EXPORT", "STATUS", "ROWS", "DURATION", "RSS", "FILES", "BYTES"
257 );
258 println!("{}", "-".repeat(110));
259 for m in &metrics {
260 let duration = if m.duration_ms >= 1000 {
261 format!("{:.1}s", m.duration_ms as f64 / 1000.0)
262 } else {
263 format!("{}ms", m.duration_ms)
264 };
265 let rss = m
266 .peak_rss_mb
267 .map(|r| format!("{}MB", r))
268 .unwrap_or_else(|| "-".into());
269 let bytes = if m.bytes_written > 0 {
270 format_bytes(m.bytes_written as u64)
271 } else {
272 "-".into()
273 };
274 let run_id = m.run_id.as_deref().unwrap_or(&m.run_at);
275 println!(
276 "{:<20} {:<10} {:>10} {:>10} {:>8} {:>6} {:>10} {}",
277 m.export_name, m.status, m.total_rows, duration, rss, m.files_produced, bytes, run_id
278 );
279 if let Some(err) = &m.error_message {
280 println!(" Error: {}", err);
281 }
282 let mut flags = Vec::new();
283 if m.retries > 0 {
284 flags.push(format!("retries={}", m.retries));
285 }
286 if let Some(v) = m.validated {
287 flags.push(format!("validated={}", if v { "pass" } else { "FAIL" }));
288 }
289 if let Some(sc) = m.schema_changed {
290 flags.push(format!("schema={}", if sc { "CHANGED" } else { "ok" }));
291 }
292 if !flags.is_empty() {
293 println!(" {}", flags.join(" "));
294 }
295 }
296 Ok(())
297}
298
299pub fn reset_chunk_checkpoint(config_path: &str, export_name: &str) -> Result<()> {
300 let config = require_config(config_path)?;
305 if !config.exports.iter().any(|e| e.name == export_name) {
306 let known: Vec<String> = config.exports.iter().map(|e| e.name.clone()).collect();
307 anyhow::bail!(
308 "export '{}' not found in config '{}'.\n Known exports: {}\n Hint: check the spelling, or run `rivet state chunks -c {} -e <name>` to inspect a checkpoint.",
309 export_name,
310 config_path,
311 if known.is_empty() {
312 "(none defined)".to_string()
313 } else {
314 known.join(", ")
315 },
316 config_path,
317 );
318 }
319 let state = StateStore::open(config_path)?;
320 let n = state.reset_chunk_checkpoint(export_name)?;
321 state.delete_progression(export_name)?;
325 println!(
326 "Removed {} chunk run record(s) for export '{}'.",
327 n, export_name
328 );
329 Ok(())
330}
331
332pub fn reset_chunk_checkpoints_stuck(config_path: &str) -> Result<()> {
337 let cfg = Config::load(config_path)?;
338 let allowed: std::collections::HashSet<&str> =
339 cfg.exports.iter().map(|e| e.name.as_str()).collect();
340 let state = StateStore::open(config_path)?;
341 let stuck = state.list_export_names_with_in_progress_chunk_runs()?;
342 if stuck.is_empty() {
343 println!("No exports have an in-progress chunk checkpoint run.");
344 println!(
345 "(Nothing with chunk_run.status = 'in_progress' in {}.)",
346 StateStore::state_db_path(config_path).display()
347 );
348 return Ok(());
349 }
350
351 let mut skipped_not_in_config = Vec::new();
352 let mut targets = Vec::new();
353 for name in stuck {
354 if allowed.contains(name.as_str()) {
355 targets.push(name);
356 } else {
357 skipped_not_in_config.push(name);
358 }
359 }
360
361 for name in &skipped_not_in_config {
362 println!(
363 "Skipping '{}' — chunk checkpoint still in_progress but this export is not in the config.",
364 name
365 );
366 }
367
368 if targets.is_empty() {
369 println!(
370 "No matching exports to reset (none of the in-progress runs belong to exports in this config)."
371 );
372 return Ok(());
373 }
374
375 println!(
376 "Resetting chunk checkpoints for {} export(s) with in_progress runs: {}",
377 targets.len(),
378 targets.join(", ")
379 );
380
381 for name in targets {
382 let n = state.reset_chunk_checkpoint(&name)?;
383 println!("Removed {} chunk run record(s) for export '{}'.", n, name);
384 }
385 Ok(())
386}
387
388pub fn show_chunk_checkpoint(config_path: &str, export_name: &str, json: bool) -> Result<()> {
389 require_config(config_path)?;
390 let state = StateStore::open(config_path)?;
391 if json {
392 let report = match state.get_latest_chunk_run(export_name)? {
395 None => serde_json::Value::Null,
396 Some((run_id, plan_hash, status, updated_at)) => {
397 let tasks: Vec<serde_json::Value> = state
398 .list_chunk_tasks_for_run(&run_id)?
399 .iter()
400 .map(|t| {
401 serde_json::json!({
402 "chunk_index": t.chunk_index,
403 "status": t.status,
404 "start_key": t.start_key,
405 "end_key": t.end_key,
406 "attempts": t.attempts,
407 "rows_written": t.rows_written,
408 "file_name": t.file_name,
409 "last_error": t.last_error,
410 })
411 })
412 .collect();
413 serde_json::json!({
414 "export": export_name,
415 "run_id": run_id,
416 "plan_hash": plan_hash,
417 "status": status,
418 "updated_at": updated_at,
419 "tasks": tasks,
420 })
421 }
422 };
423 println!("{}", serde_json::to_string_pretty(&report)?);
424 return Ok(());
425 }
426 println!(
427 "database: {}",
428 StateStore::state_db_path(config_path).display()
429 );
430 let Some((run_id, plan_hash, status, updated_at)) = state.get_latest_chunk_run(export_name)?
431 else {
432 println!("No chunk checkpoint data for export '{}'.", export_name);
433 return Ok(());
434 };
435 println!("export: {}", export_name);
436 println!("run_id: {}", run_id);
437 println!("plan_hash: {}", plan_hash);
438 println!("status: {}", status);
439 println!("updated_at: {}", updated_at);
440 println!();
441 println!(
442 "{:<6} {:<12} {:<18} {:<18} {:>4} {:>8} FILE",
443 "IDX", "STATUS", "START", "END", "ATT", "ROWS"
444 );
445 println!("{}", "-".repeat(90));
446 for t in state.list_chunk_tasks_for_run(&run_id)? {
447 let file = t.file_name.as_deref().unwrap_or("-");
448 let rows = t
449 .rows_written
450 .map(|r| r.to_string())
451 .unwrap_or_else(|| "-".into());
452 println!(
453 "{:<6} {:<12} {:<18} {:<18} {:>4} {:>8} {}",
454 t.chunk_index, t.status, t.start_key, t.end_key, t.attempts, rows, file
455 );
456 if let Some(e) = &t.last_error {
457 println!(" error: {}", e);
458 }
459 }
460 Ok(())
461}
462
463pub fn show_journal(
468 config_path: &str,
469 export_name: &str,
470 limit: usize,
471 run_id: Option<&str>,
472) -> Result<()> {
473 let config = require_config(config_path)?;
474 if run_id.is_none() {
480 require_known_export(&config, config_path, export_name)?;
481 }
482 let state = StateStore::open(config_path)?;
483
484 let journals = if let Some(rid) = run_id {
485 match state.load_journal(rid)? {
486 Some(j) => vec![j],
487 None => {
488 println!("No journal found for run_id '{rid}'.");
489 return Ok(());
490 }
491 }
492 } else {
493 state.recent_journals(export_name, limit)?
494 };
495
496 if journals.is_empty() {
497 println!("No journal entries for export '{export_name}' yet.");
498 println!("Journals are recorded after each `rivet run`.");
499 return Ok(());
500 }
501
502 for journal in &journals {
503 let outcome = journal.final_outcome().and_then(|e| {
505 if let RunEvent::RunCompleted {
506 status,
507 duration_ms,
508 ..
509 } = &e.event
510 {
511 Some((status.as_str(), *duration_ms))
512 } else {
513 None
514 }
515 });
516 let (status_str, duration_str) = match outcome {
517 Some((s, ms)) if ms >= 1000 => (s, format!("{:.1}s", ms as f64 / 1000.0)),
518 Some((s, ms)) => (s, format!("{ms}ms")),
519 None => ("(incomplete)", String::new()),
520 };
521 let icon = match status_str {
522 "success" => "✓",
523 "failed" => "✗",
524 _ => "•",
525 };
526 println!(
527 "\n{icon} {export} {status} {dur}",
528 export = journal.export_name,
529 status = status_str,
530 dur = duration_str,
531 );
532 println!(" run_id: {}", journal.run_id);
533
534 let files = journal.files();
536 if !files.is_empty() {
537 let total_rows: i64 = files
538 .iter()
539 .filter_map(|e| {
540 if let RunEvent::FileWritten { rows, .. } = &e.event {
541 Some(*rows)
542 } else {
543 None
544 }
545 })
546 .sum();
547 let total_bytes: u64 = files
548 .iter()
549 .filter_map(|e| {
550 if let RunEvent::FileWritten { bytes, .. } = &e.event {
551 Some(*bytes)
552 } else {
553 None
554 }
555 })
556 .sum();
557 println!(
558 " files: {} rows: {} size: {}",
559 files.len(),
560 total_rows,
561 format_bytes(total_bytes),
562 );
563 for e in &files {
568 if let RunEvent::FileWritten {
569 file_name,
570 rows,
571 bytes,
572 ..
573 } = &e.event
574 {
575 println!(
576 " - {} ({} rows, {})",
577 file_name,
578 rows,
579 format_bytes(*bytes),
580 );
581 }
582 }
583 }
584
585 let retries = journal.retries();
586 if !retries.is_empty() {
587 println!(" retries: {}", retries.len());
588 }
589
590 for e in journal.quality_issues() {
591 if let RunEvent::QualityIssue { severity, message } = &e.event {
592 println!(" quality [{severity}]: {message}");
593 }
594 }
595
596 for e in journal.schema_changes() {
597 if let RunEvent::SchemaChanged {
598 added,
599 removed,
600 type_changed,
601 } = &e.event
602 {
603 if !added.is_empty() {
604 println!(" schema: +{}", added.join(", +"));
605 }
606 if !removed.is_empty() {
607 println!(" schema: -{}", removed.join(", -"));
608 }
609 for (col, old, new) in type_changed {
610 println!(" schema: {col} {old}→{new}");
611 }
612 }
613 }
614
615 if let Some(e) = journal.final_outcome()
616 && let RunEvent::RunCompleted {
617 error_message: Some(err),
618 ..
619 } = &e.event
620 {
621 let first_line = err.lines().next().unwrap_or(err);
622 println!(" error: {first_line}");
623 }
624 }
625 println!();
626 Ok(())
627}
628
#[cfg(test)]
mod tests {
    use super::*;
    use crate::journal::{RunEvent, RunJournal};

    /// Creates a temp dir containing a two-export `rivet.yaml`.
    ///
    /// Returns the dir guard (keep it alive for the whole test) and the
    /// config path as a string.
    fn setup_dir() -> (tempfile::TempDir, String) {
        let dir = tempfile::TempDir::new().unwrap();
        let config_path = dir.path().join("rivet.yaml").to_str().unwrap().to_string();
        write_two_export_config(&config_path);
        (dir, config_path)
    }

    /// Writes a config declaring the `transactions` and `orders` exports.
    fn write_two_export_config(config_path: &str) {
        std::fs::write(
            config_path,
            br#"source:
  type: postgres
  url: postgresql://localhost/testdb
exports:
  - name: transactions
    query: "SELECT 1"
    mode: full
    format: parquet
    destination:
      type: local
      path: ./out
  - name: orders
    query: "SELECT 1"
    mode: full
    format: parquet
    destination:
      type: local
      path: ./out
"#,
        )
        .unwrap();
    }

    /// Writes a config declaring only the `transactions` export.
    fn write_single_export_config(config_path: &str) {
        std::fs::write(
            config_path,
            br#"source:
  type: postgres
  url: postgresql://localhost/testdb
exports:
  - name: transactions
    query: "SELECT 1"
    mode: full
    format: parquet
    destination:
      type: local
      path: ./out
"#,
        )
        .unwrap();
    }

    /// Opens the state store at `<dir>/.rivet_state.db`.
    fn open_state(dir: &tempfile::TempDir) -> StateStore {
        let db_path = dir.path().join(".rivet_state.db");
        StateStore::open_at_path(&db_path).unwrap()
    }

    /// Builds a journal with one written file and a successful completion.
    fn make_journal(run_id: &str, export: &str) -> RunJournal {
        let mut j = RunJournal::new(run_id, export);
        j.record(RunEvent::FileWritten {
            file_name: "part0.parquet".into(),
            rows: 1_000,
            bytes: 65_536,
            part_index: 0,
        });
        j.record(RunEvent::RunCompleted {
            status: "success".into(),
            error_message: None,
            duration_ms: 1_500,
        });
        j
    }

    /// Builds a boundary with the given cursor / chunk index; the remaining
    /// fields are filler.
    fn make_boundary(cursor: Option<&str>, chunk_index: Option<i64>) -> crate::state::Boundary {
        crate::state::Boundary {
            strategy: "incremental".into(),
            run_id: None,
            cursor: cursor.map(|s| s.to_string()),
            chunk_index,
            at: chrono::Utc::now(),
        }
    }

    #[test]
    fn boundary_value_cursor_takes_precedence_over_chunk_index() {
        let b = make_boundary(Some("2025-01-15"), Some(42));
        assert_eq!(boundary_value(&b), "2025-01-15");
    }

    #[test]
    fn boundary_value_chunk_index_used_when_no_cursor() {
        let b = make_boundary(None, Some(7));
        assert_eq!(boundary_value(&b), "chunk #7");
    }

    #[test]
    fn boundary_value_dash_when_neither_set() {
        let b = make_boundary(None, None);
        assert_eq!(boundary_value(&b), "-");
    }

    /// Pins the `show_files` column widths: a 40-char run id must neither be
    /// truncated nor shift the FILE column.
    #[test]
    fn state_files_run_id_column_fits_a_40_char_run_id() {
        let run_id = "transactions_historyy_20250115T143022999";
        assert_eq!(run_id.len(), 40, "fixture must be a realistic 40-char id");

        let header = format!(
            "{:<40} {:<40} {:>8} {:>10} CREATED",
            "RUN ID", "FILE", "ROWS", "BYTES"
        );
        let row = format!(
            "{:<40} {:<40} {:>8} {:>10} {}",
            run_id, "orders_001.parquet", 50_000, "4.0KB", "2025-01-15",
        );

        let header_file_at = header.find("FILE").unwrap();
        let row_file_at = row.find("orders_001.parquet").unwrap();
        assert_eq!(
            header_file_at, row_file_at,
            "a 40-char RUN ID must not push the FILE column out of alignment\nheader: {header}\nrow: {row}"
        );
        assert!(
            row.starts_with(run_id),
            "run_id must not be truncated: {row}"
        );
    }

    /// Pins the wording of the "never run" hint printed by `show_state`.
    #[test]
    fn show_state_never_run_hint_points_at_state_show() {
        let config_path = "rivet.yaml";
        let hint = format!(
            "No exports have been run yet.\n \
             Run `rivet run --config {}` first, then try `rivet state show` again.",
            config_path
        );
        assert!(
            hint.contains("try `rivet state show` again"),
            "hint must name the runnable leaf command, not the bare group: {hint}"
        );
        assert!(
            !hint.contains("try `rivet state` again"),
            "must not point at the bare (subcommand-requiring) group: {hint}"
        );
    }

    #[test]
    fn show_metrics_unknown_export_bails_with_known_names() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        let err = show_metrics(&config_path, Some("ghost"), 10, false).unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("export 'ghost' is not defined"),
            "must name the unknown export, not say 'No metrics recorded yet': {msg}"
        );
        assert!(
            msg.contains("orders") && msg.contains("transactions"),
            "must list the declared exports so the user can spot the typo: {msg}"
        );
    }

    #[test]
    fn show_metrics_known_but_unrun_export_returns_ok() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        assert!(show_metrics(&config_path, Some("orders"), 10, false).is_ok());
    }

    #[test]
    fn show_journal_unknown_export_bails_with_known_names() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        let err = show_journal(&config_path, "ghost", 5, None).unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("export 'ghost' is not defined"),
            "must name the unknown export, not say 'No journal entries … yet': {msg}"
        );
        assert!(
            msg.contains("orders") && msg.contains("transactions"),
            "must list the declared exports: {msg}"
        );
    }

    /// A lookup by run id does not validate the export name.
    #[test]
    fn show_journal_by_run_id_skips_export_name_check() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        assert!(show_journal(&config_path, "ghost", 5, Some("no_such_run")).is_ok());
    }

    #[test]
    fn show_state_empty_db_returns_ok() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        assert!(show_state(&config_path, false).is_ok());
    }

    #[test]
    fn show_state_with_cursor_record_returns_ok() {
        let (dir, config_path) = setup_dir();
        let state = open_state(&dir);
        state.update("orders", "2025-01-15").unwrap();
        drop(state);
        assert!(show_state(&config_path, false).is_ok());
    }

    #[test]
    fn show_files_empty_returns_ok() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        assert!(show_files(&config_path, None, 10, false).is_ok());
    }

    #[test]
    fn show_files_with_record_returns_ok() {
        let (dir, config_path) = setup_dir();
        let state = open_state(&dir);
        state
            .record_file(
                "r1",
                "orders",
                "orders_001.parquet",
                50_000,
                4096,
                "parquet",
                Some("zstd"),
            )
            .unwrap();
        drop(state);
        assert!(show_files(&config_path, Some("orders"), 10, false).is_ok());
    }

    #[test]
    fn show_metrics_empty_returns_ok() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        assert!(show_metrics(&config_path, None, 10, false).is_ok());
    }

    /// Records one successful and one failed run so that both duration
    /// formats and the error / flag lines are exercised.
    #[test]
    fn show_metrics_exercises_flag_and_duration_paths() {
        let (dir, config_path) = setup_dir();
        let state = open_state(&dir);
        state
            .record_metric(
                "orders",
                "r1",
                1_500,
                50_000,
                Some(42),
                "success",
                None,
                Some("balanced"),
                Some("parquet"),
                Some("full"),
                1,
                4096,
                3,
                Some(true),
                Some(true),
            )
            .unwrap();
        state
            .record_metric(
                "orders",
                "r2",
                800,
                0,
                None,
                "failed",
                Some("timeout"),
                None,
                None,
                None,
                0,
                0,
                0,
                Some(false),
                None,
            )
            .unwrap();
        drop(state);
        assert!(show_metrics(&config_path, Some("orders"), 10, false).is_ok());
    }

    #[test]
    fn show_journal_empty_returns_ok() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        assert!(show_journal(&config_path, "orders", 5, None).is_ok());
    }

    #[test]
    fn show_journal_with_entry_returns_ok() {
        let (dir, config_path) = setup_dir();
        let state = open_state(&dir);
        state
            .store_journal(&make_journal("run_001", "orders"))
            .unwrap();
        drop(state);
        assert!(show_journal(&config_path, "orders", 5, None).is_ok());
    }

    #[test]
    fn show_journal_by_run_id_not_found_returns_ok() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        assert!(show_journal(&config_path, "orders", 5, Some("no_such_run")).is_ok());
    }

    #[test]
    fn show_journal_by_run_id_found_returns_ok() {
        let (dir, config_path) = setup_dir();
        let state = open_state(&dir);
        state
            .store_journal(&make_journal("run_xyz", "orders"))
            .unwrap();
        drop(state);
        assert!(show_journal(&config_path, "orders", 5, Some("run_xyz")).is_ok());
    }

    #[test]
    fn reset_state_returns_ok() {
        // `setup_dir` already wrote the two-export config.
        let (dir, config_path) = setup_dir();
        let state = open_state(&dir);
        state.update("orders", "100").unwrap();
        drop(state);
        assert!(reset_state(&config_path, "orders").is_ok());
    }

    #[test]
    fn reset_state_unknown_export_bails_with_hint() {
        let (_dir, config_path) = setup_dir();
        let err = reset_state(&config_path, "ghost").unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("export 'ghost' not found"),
            "must name the missing export: {msg}"
        );
        assert!(
            msg.contains("orders") && msg.contains("transactions"),
            "must list the declared exports so the user can spot the typo: {msg}"
        );
        assert!(
            msg.contains("rivet state show"),
            "must point at a follow-up command: {msg}"
        );
    }

    #[test]
    fn reset_chunk_checkpoint_on_empty_db_returns_ok() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        assert!(reset_chunk_checkpoint(&config_path, "orders").is_ok());
    }

    #[test]
    fn reset_chunk_checkpoint_unknown_export_bails_with_hint() {
        let (_dir, config_path) = setup_dir();
        let err = reset_chunk_checkpoint(&config_path, "ghost").unwrap_err();
        let msg = format!("{err:#}");
        assert!(
            msg.contains("export 'ghost' not found"),
            "must name the missing export: {msg}"
        );
        assert!(
            msg.contains("orders") && msg.contains("transactions"),
            "must list the declared exports so the user can spot the typo: {msg}"
        );
    }

    /// Every command must fail on a missing config, name the path in the
    /// error, and leave no state database behind.
    #[test]
    fn inspect_commands_bail_on_nonexistent_config() {
        let dir = tempfile::TempDir::new().unwrap();
        let missing = dir
            .path()
            .join("does_not_exist.yaml")
            .to_str()
            .unwrap()
            .to_string();

        for res in [
            show_state(&missing, false),
            show_files(&missing, None, 10, false),
            show_metrics(&missing, None, 10, false),
            show_progression(&missing, None),
            show_journal(&missing, "orders", 5, None),
            show_chunk_checkpoint(&missing, "orders", false),
            reset_state(&missing, "orders"),
            reset_chunk_checkpoint(&missing, "orders"),
        ] {
            let err = res.expect_err("nonexistent config must error, not exit Ok");
            assert!(
                format!("{err:#}").contains("does_not_exist.yaml"),
                "error must name the missing config path: {err:#}"
            );
        }

        assert!(
            !dir.path().join(".rivet_state.db").exists(),
            "a bad-config inspect must not leak a fresh .rivet_state.db"
        );
    }

    #[test]
    fn reset_chunk_checkpoints_stuck_no_rows_returns_ok() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        assert!(reset_chunk_checkpoints_stuck(&config_path).is_ok());
    }

    /// Only the in-progress run of a declared export is cleared; the run of
    /// the undeclared `ghost` export survives.
    #[test]
    fn reset_chunk_checkpoints_stuck_clears_matching_exports_only() {
        let (dir, config_path) = setup_dir();
        let state = open_state(&dir);
        state
            .create_chunk_run("r_tx", "transactions", "plan", 3)
            .unwrap();
        state.create_chunk_run("r_g", "ghost", "plan", 3).unwrap();
        drop(state);

        reset_chunk_checkpoints_stuck(&config_path).unwrap();

        let state = StateStore::open(&config_path).unwrap();
        assert!(
            state
                .find_in_progress_chunk_run("transactions")
                .unwrap()
                .is_none()
        );
        assert!(state.find_in_progress_chunk_run("ghost").unwrap().is_some());
        assert_eq!(
            state.reset_chunk_checkpoint("ghost").unwrap(),
            1,
            "cleanup ghost row"
        );
    }

    #[test]
    fn reset_chunk_checkpoints_stuck_skips_when_only_unknown_exports_stuck() {
        let (dir, config_path) = setup_dir();
        // Replace the default fixture: only `transactions` is declared here.
        write_single_export_config(&config_path);
        let state = open_state(&dir);
        state.create_chunk_run("r_g", "ghost", "plan", 3).unwrap();
        drop(state);

        reset_chunk_checkpoints_stuck(&config_path).unwrap();

        let state = StateStore::open(&config_path).unwrap();
        assert!(state.find_in_progress_chunk_run("ghost").unwrap().is_some());
        assert_eq!(state.reset_chunk_checkpoint("ghost").unwrap(), 1);
    }

    #[test]
    fn show_progression_empty_returns_ok() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        assert!(show_progression(&config_path, None).is_ok());
    }

    #[test]
    fn show_progression_with_incremental_boundary_returns_ok() {
        let (dir, config_path) = setup_dir();
        let state = open_state(&dir);
        state
            .record_committed_incremental("orders", "2025-06-01", "run_001")
            .unwrap();
        drop(state);
        assert!(show_progression(&config_path, Some("orders")).is_ok());
    }

    #[test]
    fn show_chunk_checkpoint_no_data_returns_ok() {
        let (dir, config_path) = setup_dir();
        let _ = open_state(&dir);
        assert!(show_chunk_checkpoint(&config_path, "orders", false).is_ok());
    }
}