1mod aggregate;
10mod apply_cmd;
11pub(crate) mod chunked;
12mod cli;
13mod finalize;
14pub(crate) mod ipc;
15mod job;
16mod manifest_writer;
17mod parallel_children;
18pub(crate) mod parent_ui;
19mod plan_cmd;
20pub(crate) mod progress;
21mod reconcile_cmd;
22mod repair_cmd;
23pub(crate) mod report;
24mod resume_decisions;
25mod retry;
26mod run;
32mod single;
33mod sink;
34mod summary;
35mod validate;
36mod validate_cmd;
37mod validate_manifest;
38
39pub use apply_cmd::run_apply_command;
46pub use cli::{
47 reset_chunk_checkpoint, reset_chunk_checkpoints_stuck, reset_state, show_chunk_checkpoint,
48 show_files, show_journal, show_metrics, show_progression, show_state,
49};
50pub use plan_cmd::{PlanOutputFormat, run_plan_command};
51pub use reconcile_cmd::{ReconcileOutputFormat, run_reconcile_command};
52pub use repair_cmd::{RepairOutputFormat, RepairReportSource, run_repair_command};
53pub use validate_cmd::{ValidateOutputFormat, ValidateTarget, run_validate_command};
54
55pub use summary::RunSummary;
59
60pub(crate) use job::run_export_job_with_chunk_source;
63#[cfg(test)]
64#[allow(unused_imports)]
65pub(crate) use retry::is_transient;
66
67#[doc(hidden)]
82pub mod for_tests {
83 pub use super::chunked::generate_chunks;
84 pub use super::manifest_writer::{ManifestBuilder, WriteOutcome, write_manifest};
85 pub use super::report::{RunReport, report_dir, write_run_report};
86 pub use super::resume_decisions::{
87 PartDecision, QuarantineReason, ResumeDecision, ResumePlan, UntrackedDecision,
88 build_resume_plan,
89 };
90 pub use super::retry::{RetryClass, classify_error};
91 pub use super::validate::validate_output;
92 pub use super::validate_manifest::{
93 Failure as ManifestVerificationFailure, ManifestVerification, verify_at_destination,
94 };
95 pub use crate::plan::build_time_window_query;
96}
97
98#[doc(hidden)]
106#[allow(unused_imports)]
107pub use for_tests::{
108 ManifestBuilder, ManifestVerification, ManifestVerificationFailure, PartDecision,
109 QuarantineReason, ResumeDecision, ResumePlan, RetryClass, RunReport, UntrackedDecision,
110 WriteOutcome, build_resume_plan, build_time_window_query, classify_error, generate_chunks,
111 report_dir, validate_output, verify_at_destination, write_manifest, write_run_report,
112};
113
114pub use run::{RunOptions, run};
119#[allow(unused_imports)] pub(crate) use run::{multi_export_concurrent, multi_export_mode};
121
/// Renders a byte count as a short human-readable string.
///
/// Units are 1024-based (`KB` = 2^10, `MB` = 2^20, `GB` = 2^30) and shown with
/// one decimal place. Counts below 1024 are printed as an integer with a `B`
/// suffix. The unit is chosen before rounding, so a value just under a threshold
/// can print as `1024.0` of the smaller unit (e.g. `1_048_575` -> `"1024.0 KB"`).
pub(crate) fn format_bytes(b: u64) -> String {
    // Largest unit first so the first threshold that fits wins.
    const UNITS: [(u64, &str); 3] = [(1 << 30, "GB"), (1 << 20, "MB"), (1 << 10, "KB")];

    match UNITS.iter().find(|&&(threshold, _)| b >= threshold) {
        Some(&(threshold, unit)) => format!("{:.1} {}", b as f64 / threshold as f64, unit),
        None => format!("{} B", b),
    }
}
133
/// Splits a chunked-export error message into its cause and a trailing
/// recovery hint.
///
/// The message is read as `"; "`-separated segments. The hint begins at the
/// first segment after the leading one that contains a backticked `rivet `
/// command (for example `` `rivet run --resume` ``). That separator and
/// everything after it are dropped. Earlier `"; "`-separated parts of the cause
/// are kept intact.
///
/// Returns `(cause, true)` when a hint was stripped, otherwise `(msg, false)`.
pub(crate) fn strip_chunked_recovery_hint(msg: &str) -> (&str, bool) {
    const SEPARATOR: &str = "; ";
    const HINT_MARKER: &str = "`rivet ";

    // Byte offsets of each separator, in order. The separator is ASCII, so
    // every slice taken below lands on a char boundary.
    let mut separators = msg.match_indices(SEPARATOR).map(|(at, _)| at).peekable();
    while let Some(sep_at) = separators.next() {
        let segment_start = sep_at + SEPARATOR.len();
        // Only inspect the segment up to the next separator (or end of input),
        // so a hint further right cannot trigger a cut inside the cause.
        let segment_end = separators.peek().copied().unwrap_or(msg.len());
        if msg[segment_start..segment_end].contains(HINT_MARKER) {
            return (&msg[..sep_at], true);
        }
    }
    (msg, false)
}
158
/// Truncates `s` to at most `max_chars` `char`s, ending with `…` when cut.
///
/// The ellipsis counts toward the limit, so a truncated result is exactly
/// `max_chars` chars long. Inputs that already fit are returned unchanged, and
/// `max_chars == 0` yields an empty string. Length is measured in Unicode
/// scalar values, not grapheme clusters or terminal columns.
pub(crate) fn clamp_line(s: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }

    // Walk the char start offsets once. `nth(max_chars - 1)` is the start of
    // the `max_chars`-th char, which is where the ellipsis would go. If yet
    // another char follows it, the input is too long and must be clamped.
    let mut starts = s.char_indices().map(|(offset, _)| offset);
    let ellipsis_at = starts.nth(max_chars - 1);
    match (ellipsis_at, starts.next()) {
        (Some(cut), Some(_)) => {
            let mut clamped = String::with_capacity(cut + '…'.len_utf8());
            clamped.push_str(&s[..cut]);
            clamped.push('…');
            clamped
        }
        _ => s.to_owned(),
    }
}
176
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{SourceConfig, SourceType};
    use crate::plan::{
        CompressionType, DestinationConfig, DestinationType, DiagnosticLevel, ExtractionStrategy,
        FormatType, MetaColumns, ResolvedRunPlan, validate_plan,
    };
    use crate::tuning::SourceTuning;

    /// Typical values in each unit band, including fractional results.
    #[test]
    fn test_format_bytes() {
        assert_eq!(format_bytes(500), "500 B");
        assert_eq!(format_bytes(1024), "1.0 KB");
        assert_eq!(format_bytes(1536), "1.5 KB");
        assert_eq!(format_bytes(1_048_576), "1.0 MB");
        assert_eq!(format_bytes(1_073_741_824), "1.0 GB");
        assert_eq!(format_bytes(2_684_354_560), "2.5 GB");
    }

    /// Use the ` rivet …` form: the hint after the final "; " is stripped.
    #[test]
    fn strip_chunked_recovery_hint_strips_use_form() {
        let m = "export 'users': chunk checkpoint run 'users_x' still in progress; \
                 use `rivet run --config foo.yaml --export users --resume` or \
                 `rivet state reset-chunks --config foo.yaml --export users`";
        let (cause, hinted) = strip_chunked_recovery_hint(m);
        assert!(hinted);
        assert_eq!(
            cause,
            "export 'users': chunk checkpoint run 'users_x' still in progress"
        );
    }

    /// "fix errors and ` rivet …`" form: the hint is stripped the same way.
    #[test]
    fn strip_chunked_recovery_hint_strips_fix_errors_form() {
        let m = "export 'a': chunk checkpoint incomplete (3 tasks not completed); \
                 fix errors and `rivet run --config c.yaml --export a --resume` or \
                 `rivet state reset-chunks --config c.yaml --export a`";
        let (cause, hinted) = strip_chunked_recovery_hint(m);
        assert!(hinted);
        assert_eq!(
            cause,
            "export 'a': chunk checkpoint incomplete (3 tasks not completed)"
        );
    }

    /// A "; " separator without any rivet command leaves the message untouched.
    #[test]
    fn strip_chunked_recovery_hint_passthrough_when_no_hint() {
        let m = "export 'q': source connection refused; retry exhausted";
        let (cause, hinted) = strip_chunked_recovery_hint(m);
        assert!(!hinted);
        assert_eq!(cause, m);
    }

    /// The marker only counts as a hint when it follows a "; " separator, so a
    /// marker with no separator, or only in the leading segment, passes through.
    #[test]
    fn strip_chunked_recovery_hint_requires_separator_before_marker() {
        let m = "see `rivet help` for details";
        assert_eq!(strip_chunked_recovery_hint(m), (m, false));
        let m = "`rivet run` crashed; exit code 1";
        assert_eq!(strip_chunked_recovery_hint(m), (m, false));
        assert_eq!(strip_chunked_recovery_hint(""), ("", false));
    }

    /// Long input is cut to exactly `max_chars` chars, ending with the ellipsis,
    /// and multi-byte chars are counted as single chars.
    #[test]
    fn clamp_line_truncates_with_ellipsis() {
        assert_eq!(clamp_line("short", 80), "short");
        assert_eq!(clamp_line("hello world", 8), "hello w…");
        let s = "αβγδ".repeat(50);
        let out = clamp_line(&s, 10);
        assert_eq!(out.chars().count(), 10);
        assert!(out.ends_with('…'));
    }

    /// Boundary widths: 0 gives an empty string, 1 leaves room only for the
    /// ellipsis, and an input exactly `max_chars` long is returned unchanged.
    #[test]
    fn clamp_line_boundary_widths() {
        assert_eq!(clamp_line("abc", 0), "");
        assert_eq!(clamp_line("", 0), "");
        assert_eq!(clamp_line("", 5), "");
        assert_eq!(clamp_line("abc", 1), "…");
        assert_eq!(clamp_line("abc", 3), "abc");
        assert_eq!(clamp_line("abcd", 3), "ab…");
        assert_eq!(clamp_line("αβγδ", 4), "αβγδ");
    }

    /// Values on each side of every unit threshold. These also pin the
    /// "1024.0 <smaller unit>" rounding just below a threshold.
    #[test]
    fn format_bytes_boundary_values() {
        assert_eq!(format_bytes(0), "0 B");
        assert_eq!(format_bytes(1), "1 B");
        assert_eq!(format_bytes(1023), "1023 B");
        assert_eq!(format_bytes(1024), "1.0 KB");
        assert_eq!(format_bytes(1025), "1.0 KB");
        assert_eq!(format_bytes(1_048_575), "1024.0 KB");
        assert_eq!(format_bytes(1_048_576), "1.0 MB");
        assert_eq!(format_bytes(1_073_741_823), "1024.0 MB");
        assert_eq!(format_bytes(1_073_741_824), "1.0 GB");
    }

    /// The GB band has no upper bound, so the largest representable count must
    /// still format without panicking.
    #[test]
    fn format_bytes_largest_input() {
        assert_eq!(format_bytes(u64::MAX), "17179869184.0 GB");
    }

    /// Smallest valid-looking plan: a Postgres snapshot export to local
    /// Parquet. Individual tests mutate fields from here.
    fn minimal_plan() -> ResolvedRunPlan {
        ResolvedRunPlan {
            export_name: "test_export".into(),
            base_query: "SELECT 1".into(),
            strategy: ExtractionStrategy::Snapshot,
            format: FormatType::Parquet,
            compression: CompressionType::default(),
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".into()),
                ..Default::default()
            },
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source: SourceConfig {
                source_type: SourceType::Postgres,
                url: Some("postgresql://localhost/test".into()),
                url_env: None,
                url_file: None,
                host: None,
                port: None,
                user: None,
                password: None,
                password_env: None,
                database: None,
                environment: None,
                tuning: None,
                tls: None,
            },
            column_overrides: Default::default(),
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    /// A freshly built summary reflects the plan and starts in "running".
    #[test]
    fn test_run_summary_fields() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);
        assert_eq!(summary.export_name, "test_export");
        assert_eq!(summary.status, "running");
        assert_eq!(summary.total_rows, 0);
        assert_eq!(summary.files_produced, 0);
        assert_eq!(summary.tuning_profile, "balanced (default)");
        assert_eq!(summary.batch_size, 10_000);
        assert_eq!(summary.format, "parquet");
        assert_eq!(summary.mode, "full");
        assert!(
            summary.run_id.starts_with("test_export_"),
            "run_id should start with export name, got: {}",
            summary.run_id
        );
    }

    /// `RunSummary::new` seeds the journal with a `PlanResolved` event first.
    #[test]
    fn run_summary_new_records_plan_resolved_as_first_event() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);

        assert!(
            !summary.journal.entries.is_empty(),
            "journal must have at least one entry after RunSummary::new()"
        );
        assert!(
            matches!(
                summary.journal.entries[0].event,
                crate::journal::RunEvent::PlanResolved(_)
            ),
            "first journal event must be PlanResolved, got: {:?}",
            summary.journal.entries[0].event
        );
    }

    /// The journal's plan snapshot mirrors the plan's observable flags.
    #[test]
    fn run_summary_plan_snapshot_matches_plan_fields() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);

        let snap = summary
            .journal
            .plan_snapshot()
            .expect("plan_snapshot() must be Some after RunSummary::new()");

        assert_eq!(snap.export_name, plan.export_name);
        assert_eq!(snap.validate, plan.validate);
        assert_eq!(snap.reconcile, plan.reconcile);
        assert_eq!(snap.resume, plan.resume);
        assert_eq!(snap.batch_size, plan.tuning.batch_size);
    }

    /// Journal and summary share the same run id.
    #[test]
    fn run_summary_journal_run_id_matches_summary_run_id() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);
        assert_eq!(
            summary.journal.run_id, summary.run_id,
            "journal run_id must match summary run_id"
        );
    }

    /// stdout destination combined with a max file size must be rejected.
    #[test]
    fn rejected_plan_produces_rejected_diagnostic_blocking_run_export_job() {
        let mut plan = minimal_plan();
        plan.destination.destination_type = DestinationType::Stdout;
        plan.max_file_size_bytes = Some(10 * 1024 * 1024);

        let diags = validate_plan(&plan);
        let rejected_count = diags
            .iter()
            .filter(|d| d.level == DiagnosticLevel::Rejected)
            .count();

        assert!(
            rejected_count > 0,
            "stdout + max_file_size must produce a Rejected diagnostic so that \
             run_export_job bails before calling run_with_reconnect; got: {:?}",
            diags
                .iter()
                .map(|d| (&d.rule, &d.level))
                .collect::<Vec<_>>()
        );
    }

    /// stdout destination combined with the chunked strategy must be rejected.
    #[test]
    fn rejected_plan_stdout_chunked_blocks_run_export_job() {
        use crate::plan::ChunkedPlan;
        let mut plan = minimal_plan();
        plan.destination.destination_type = DestinationType::Stdout;
        plan.strategy = ExtractionStrategy::Chunked(ChunkedPlan {
            column: "id".into(),
            chunk_size: 1000,
            chunk_count: None,
            parallel: 1,
            dense: false,
            by_days: None,
            max_attempts: 3,
            checkpoint: false,
        });

        let diags = validate_plan(&plan);
        assert!(
            diags.iter().any(|d| d.level == DiagnosticLevel::Rejected),
            "stdout + chunked must produce a Rejected diagnostic"
        );
    }

    /// A synthetic failure summary carries the error text and zeroed counters.
    #[test]
    fn synthetic_failed_summary_carries_error_and_status() {
        let err = anyhow::anyhow!("could not connect to source: timeout");
        let s = job::synthetic_failed_summary("orders", &err);
        assert_eq!(s.export_name, "orders");
        assert_eq!(s.status, "failed");
        assert_eq!(
            s.error_message.as_deref(),
            Some("could not connect to source: timeout")
        );
        assert!(
            s.run_id.starts_with("orders_"),
            "run_id must be derived from export name, got {}",
            s.run_id
        );
        assert_eq!(s.total_rows, 0);
        assert_eq!(s.files_produced, 0);
        assert_eq!(s.bytes_written, 0);
        assert_eq!(s.duration_ms, 0);
    }

    /// The aggregate entry copies each user-visible field from the summary.
    #[test]
    fn aggregate_entry_from_summary_copies_observable_fields() {
        let plan = minimal_plan();
        let mut summary = RunSummary::new(&plan);
        summary.status = "success".into();
        summary.total_rows = 12_345;
        summary.files_produced = 3;
        summary.bytes_written = 9_876_543;
        summary.duration_ms = 5_000;

        let entry = aggregate::entry_from_summary(&summary);
        assert_eq!(entry.export_name, summary.export_name);
        assert_eq!(entry.status, "success");
        assert_eq!(entry.run_id, summary.run_id);
        assert_eq!(entry.rows, 12_345);
        assert_eq!(entry.files, 3);
        assert_eq!(entry.bytes, 9_876_543);
        assert_eq!(entry.duration_ms, 5_000);
        assert_eq!(entry.mode, summary.mode);
        assert_eq!(entry.error_message, None);
    }
}