1mod aggregate;
10mod apply_cmd;
11mod cdc_job;
12pub(crate) mod chunked;
13mod cli;
14pub(crate) mod commit;
15mod finalize;
16pub(crate) mod ipc;
17mod job;
18mod keyset;
19mod manifest_reconcile;
20pub(crate) mod manifest_writer;
21mod parallel_children;
22pub(crate) mod parent_ui;
23mod partition_expand;
24mod plan_cmd;
25pub(crate) mod progress;
26mod reconcile_cmd;
27mod repair_cmd;
28pub(crate) mod report;
29mod resume_decisions;
30pub(crate) mod retry;
33mod run;
39mod run_store;
40mod schema_drift;
41mod single;
42mod sink;
43mod summary;
44mod validate;
45mod validate_cmd;
46mod validate_manifest;
47
48pub use apply_cmd::run_apply_command;
55pub use cli::{
56 reset_chunk_checkpoint, reset_chunk_checkpoints_stuck, reset_state, show_chunk_checkpoint,
57 show_files, show_journal, show_metrics, show_progression, show_state,
58};
59pub use plan_cmd::{PlanOutputFormat, run_plan_command};
60pub use reconcile_cmd::{ReconcileOutputFormat, run_reconcile_command};
61pub use repair_cmd::{RepairOutputFormat, RepairReportSource, run_repair_command};
62pub use validate_cmd::{ValidateOutputFormat, ValidateTarget, run_validate_command};
63pub use validate_manifest::ValidateDepth;
68
69pub use summary::RunSummary;
73
74pub(crate) use job::run_export_job_with_chunk_source;
77#[cfg(test)]
78#[allow(unused_imports)]
79pub(crate) use retry::is_transient;
80
// Re-export surface for test code: exposes helpers from this module's
// private submodules (plus `crate::plan::build_time_window_query`) under one
// path without making those submodules public. `#[doc(hidden)]` keeps it out
// of rustdoc. NOTE(review): presumably consumed by integration tests or
// benches outside this module — confirm before removing any entry.
#[doc(hidden)]
pub mod for_tests {
    pub use super::chunked::generate_chunks;
    pub use super::manifest_writer::{ManifestBuilder, WriteOutcome, write_manifest};
    pub use super::report::{RunReport, report_dir, write_run_report};
    pub use super::resume_decisions::{
        PartDecision, QuarantineReason, ResumeDecision, ResumePlan, UntrackedDecision,
        build_resume_plan,
    };
    pub use super::retry::{RetryClass, classify_error};
    pub use super::validate::validate_output;
    // `Failure` is renamed so the flattened re-export below is unambiguous.
    pub use super::validate_manifest::{
        Failure as ManifestVerificationFailure, ManifestVerification, verify_at_destination,
    };
    pub use crate::plan::build_time_window_query;
}
111
112#[doc(hidden)]
120#[allow(unused_imports)]
121pub use for_tests::{
122 ManifestBuilder, ManifestVerification, ManifestVerificationFailure, PartDecision,
123 QuarantineReason, ResumeDecision, ResumePlan, RetryClass, RunReport, UntrackedDecision,
124 WriteOutcome, build_resume_plan, build_time_window_query, classify_error, generate_chunks,
125 report_dir, validate_output, verify_at_destination, write_manifest, write_run_report,
126};
127
128pub use run::{RunOptions, run};
133#[allow(unused_imports)] pub(crate) use run::{multi_export_concurrent, multi_export_mode};
135
/// Renders a byte count as a short human-readable string.
///
/// Thresholds are binary (1 KB = 1024 B, 1 MB = 1024 KB, 1 GB = 1024 MB).
/// Sizes of at least 1 KB are printed with one decimal place and a `KB`,
/// `MB` or `GB` suffix. Smaller sizes are printed as a plain integer
/// followed by ` B`. The unit is chosen before rounding, so a value just
/// under a threshold renders as e.g. `1024.0 KB`.
pub(crate) fn format_bytes(b: u64) -> String {
    // Ordered largest-first so the first threshold that `b` reaches wins.
    const UNITS: [(u64, &str); 3] = [(1 << 30, "GB"), (1 << 20, "MB"), (1 << 10, "KB")];

    for &(threshold, unit) in &UNITS {
        if b >= threshold {
            let scaled = b as f64 / threshold as f64;
            return format!("{scaled:.1} {unit}");
        }
    }
    format!("{b} B")
}
147
/// Splits a trailing `rivet …` recovery hint off an error message.
///
/// Messages are expected to look like `"<cause>; <hint>"`. The message is
/// viewed as segments separated by `"; "`. It is cut at the separator that
/// introduces the first segment containing the marker "`rivet " (a backtick
/// followed by `rivet `). Everything from that separator onward is treated
/// as the hint.
///
/// A `"; "` that belongs to the cause itself is kept. Only the separator
/// directly in front of the hint segment is used as the cut point.
/// Previously the message was always cut at its *first* `"; "`, which
/// dropped part of a multi-clause cause.
///
/// Returns `(cause, true)` when a hint was found and stripped. Otherwise it
/// returns `(msg, false)` with the message untouched. A marker that appears
/// before the first separator does not count as a hint.
pub(crate) fn strip_chunked_recovery_hint(msg: &str) -> (&str, bool) {
    const SEP: &str = "; ";
    const HINT_MARKER: &str = "`rivet ";

    for (cut, _) in msg.match_indices(SEP) {
        let rest = &msg[cut + SEP.len()..];
        // Only inspect the segment that directly follows this separator, up to
        // the next separator. This way an earlier `"; "` inside the cause is
        // not mistaken for the start of the hint.
        let segment = rest.find(SEP).map_or(rest, |end| &rest[..end]);
        if segment.contains(HINT_MARKER) {
            return (&msg[..cut], true);
        }
    }
    (msg, false)
}
172
/// Limits `s` to at most `max_chars` Unicode scalar values (`char`s).
///
/// - A string that already fits is returned unchanged (as an owned `String`).
/// - A longer string keeps its first `max_chars - 1` chars and has `…`
///   appended, so the result is exactly `max_chars` chars long.
/// - `max_chars == 0` always yields an empty string.
pub(crate) fn clamp_line(s: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }
    // Only probe as far as the (max_chars + 1)-th char instead of counting
    // the whole string.
    let too_long = s.chars().nth(max_chars).is_some();
    if !too_long {
        return s.to_owned();
    }
    // `s` has more than `max_chars` chars here, so position `max_chars - 1`
    // exists. Its byte offset is a valid char boundary to cut at.
    let cut = s
        .char_indices()
        .nth(max_chars - 1)
        .map_or(s.len(), |(offset, _)| offset);
    let mut clamped = String::with_capacity(cut + '…'.len_utf8());
    clamped.push_str(&s[..cut]);
    clamped.push('…');
    clamped
}
190
// Unit tests for the helpers defined in this module (`format_bytes`,
// `strip_chunked_recovery_hint`, `clamp_line`) and for a few invariants of
// types re-exported or used here (`RunSummary`, `validate_plan`,
// `job::synthetic_failed_summary`, `aggregate::entry_from_summary`).
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{SourceConfig, SourceType};
    use crate::plan::{
        CompressionType, DestinationConfig, DestinationType, DiagnosticLevel, ExtractionStrategy,
        FormatType, MetaColumns, ResolvedRunPlan, validate_plan,
    };
    use crate::tuning::SourceTuning;

    // One representative value per unit band (B, KB, MB, GB).
    #[test]
    fn test_format_bytes() {
        assert_eq!(format_bytes(500), "500 B");
        assert_eq!(format_bytes(1024), "1.0 KB");
        assert_eq!(format_bytes(1536), "1.5 KB");
        assert_eq!(format_bytes(1_048_576), "1.0 MB");
        assert_eq!(format_bytes(1_073_741_824), "1.0 GB");
        assert_eq!(format_bytes(2_684_354_560), "2.5 GB");
    }

    // "use `rivet …`" hint form: everything from "; " onward is removed.
    #[test]
    fn strip_chunked_recovery_hint_strips_use_form() {
        // The `\` line continuations drop the leading whitespace of the next
        // line, so the literal is a single line with one "; " separator.
        let m = "export 'users': chunk checkpoint run 'users_x' still in progress; \
                 use `rivet run --config foo.yaml --export users --resume` or \
                 `rivet state reset-chunks --config foo.yaml --export users`";
        let (cause, hinted) = strip_chunked_recovery_hint(m);
        assert!(hinted);
        assert_eq!(
            cause,
            "export 'users': chunk checkpoint run 'users_x' still in progress"
        );
    }

    // "fix errors and `rivet …`" hint form.
    #[test]
    fn strip_chunked_recovery_hint_strips_fix_errors_form() {
        let m = "export 'a': chunk checkpoint incomplete (3 tasks not completed); \
                 fix errors and `rivet run --config c.yaml --export a --resume` or \
                 `rivet state reset-chunks --config c.yaml --export a`";
        let (cause, hinted) = strip_chunked_recovery_hint(m);
        assert!(hinted);
        assert_eq!(
            cause,
            "export 'a': chunk checkpoint incomplete (3 tasks not completed)"
        );
    }

    // A "; " with no backticked `rivet` command after it leaves the message intact.
    #[test]
    fn strip_chunked_recovery_hint_passthrough_when_no_hint() {
        let m = "export 'q': source connection refused; retry exhausted";
        let (cause, hinted) = strip_chunked_recovery_hint(m);
        assert!(!hinted);
        assert_eq!(cause, m);
    }

    // Truncation counts chars, not bytes: the multi-byte Greek input must still
    // come back as exactly `max_chars` chars ending in the ellipsis.
    #[test]
    fn clamp_line_truncates_with_ellipsis() {
        assert_eq!(clamp_line("short", 80), "short");
        assert_eq!(clamp_line("hello world", 8), "hello w…");
        let s = "αβγδ".repeat(50);
        let out = clamp_line(&s, 10);
        assert_eq!(out.chars().count(), 10);
        assert!(out.ends_with('…'));
    }

    // Values on and just below each threshold. The "1024.0 KB" / "1024.0 MB"
    // expectations pin the current behaviour: the unit is picked before the
    // value is rounded to one decimal place.
    #[test]
    fn format_bytes_boundary_values() {
        assert_eq!(format_bytes(0), "0 B");
        assert_eq!(format_bytes(1), "1 B");
        assert_eq!(format_bytes(1023), "1023 B");
        assert_eq!(format_bytes(1024), "1.0 KB");
        assert_eq!(format_bytes(1025), "1.0 KB");
        assert_eq!(format_bytes(1_048_575), "1024.0 KB");
        assert_eq!(format_bytes(1_048_576), "1.0 MB");
        assert_eq!(format_bytes(1_073_741_823), "1024.0 MB");
        assert_eq!(format_bytes(1_073_741_824), "1.0 GB");
    }

    // Fixture: a snapshot export of `SELECT 1` from a Postgres URL to a local
    // `./out` directory as Parquet, with validate/reconcile/resume all off.
    // Every `SourceConfig` field is spelled out explicitly, so adding a field
    // there will make this fail to compile until it is updated.
    fn minimal_plan() -> ResolvedRunPlan {
        ResolvedRunPlan {
            export_name: "test_export".into(),
            base_query: "SELECT 1".into(),
            strategy: ExtractionStrategy::Snapshot,
            format: FormatType::Parquet,
            compression: CompressionType::default(),
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".into()),
                ..Default::default()
            },
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source: SourceConfig {
                source_type: SourceType::Postgres,
                url: Some("postgresql://localhost/test".into()),
                url_env: None,
                url_file: None,
                host: None,
                port: None,
                user: None,
                password: None,
                password_env: None,
                database: None,
                environment: None,
                tuning: None,
                tls: None,
            },
            column_overrides: Default::default(),
            verify: crate::config::VerifyMode::Size,
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    // Initial state of a freshly created `RunSummary`.
    #[test]
    fn test_run_summary_fields() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);
        assert_eq!(summary.export_name, "test_export");
        assert_eq!(summary.status, "running");
        assert_eq!(summary.total_rows, 0);
        assert_eq!(summary.files_produced, 0);
        assert_eq!(summary.tuning_profile, "balanced (default)");
        // NOTE(review): presumably the default batch size produced by
        // `SourceTuning::from_config(None)` — confirm in `crate::tuning`.
        assert_eq!(summary.batch_size, 10_000);
        assert_eq!(summary.format, "parquet");
        // NOTE(review): "full" appears to be the mode label for a Snapshot
        // strategy — confirm against `RunSummary::new`.
        assert_eq!(summary.mode, "full");
        assert!(
            summary.run_id.starts_with("test_export_"),
            "run_id should start with export name, got: {}",
            summary.run_id
        );
    }

    // `RunSummary::new` must seed the journal with a `PlanResolved` event.
    #[test]
    fn run_summary_new_records_plan_resolved_as_first_event() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);

        assert!(
            !summary.journal.entries.is_empty(),
            "journal must have at least one entry after RunSummary::new()"
        );
        assert!(
            matches!(
                summary.journal.entries[0].event,
                crate::journal::RunEvent::PlanResolved(_)
            ),
            "first journal event must be PlanResolved, got: {:?}",
            summary.journal.entries[0].event
        );
    }

    // The journal's plan snapshot mirrors the plan fields it was built from.
    #[test]
    fn run_summary_plan_snapshot_matches_plan_fields() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);

        let snap = summary
            .journal
            .plan_snapshot()
            .expect("plan_snapshot() must be Some after RunSummary::new()");

        assert_eq!(snap.export_name, plan.export_name);
        assert_eq!(snap.validate, plan.validate);
        assert_eq!(snap.reconcile, plan.reconcile);
        assert_eq!(snap.resume, plan.resume);
        assert_eq!(snap.batch_size, plan.tuning.batch_size);
    }

    // Summary and journal must agree on the run id.
    #[test]
    fn run_summary_journal_run_id_matches_summary_run_id() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);
        assert_eq!(
            summary.journal.run_id, summary.run_id,
            "journal run_id must match summary run_id"
        );
    }

    // stdout destination + `max_file_size_bytes` must be rejected by
    // `validate_plan`. This test only exercises `validate_plan`; the claim that
    // `run_export_job` bails on a Rejected diagnostic is not checked here.
    #[test]
    fn rejected_plan_produces_rejected_diagnostic_blocking_run_export_job() {
        let mut plan = minimal_plan();
        plan.destination.destination_type = DestinationType::Stdout;
        plan.max_file_size_bytes = Some(10 * 1024 * 1024);

        let diags = validate_plan(&plan);
        let rejected_count = diags
            .iter()
            .filter(|d| d.level == DiagnosticLevel::Rejected)
            .count();

        assert!(
            rejected_count > 0,
            "stdout + max_file_size must produce a Rejected diagnostic so that \
             run_export_job bails before calling run_with_reconnect; got: {:?}",
            diags
                .iter()
                .map(|d| (&d.rule, &d.level))
                .collect::<Vec<_>>()
        );
    }

    // stdout destination + a chunked strategy must also be rejected.
    #[test]
    fn rejected_plan_stdout_chunked_blocks_run_export_job() {
        use crate::plan::ChunkedPlan;
        let mut plan = minimal_plan();
        plan.destination.destination_type = DestinationType::Stdout;
        plan.strategy = ExtractionStrategy::Chunked(ChunkedPlan {
            column: "id".into(),
            chunk_size: 1000,
            chunk_count: None,
            parallel: 1,
            dense: false,
            by_days: None,
            max_attempts: 3,
            checkpoint: false,
        });

        let diags = validate_plan(&plan);
        assert!(
            diags.iter().any(|d| d.level == DiagnosticLevel::Rejected),
            "stdout + chunked must produce a Rejected diagnostic"
        );
    }

    // A synthetic "failed" summary carries the error text and zeroed counters.
    #[test]
    fn synthetic_failed_summary_carries_error_and_status() {
        let err = anyhow::anyhow!("could not connect to source: timeout");
        let s = job::synthetic_failed_summary("orders", &err);
        assert_eq!(s.export_name, "orders");
        assert_eq!(s.status, "failed");
        assert_eq!(
            s.error_message.as_deref(),
            Some("could not connect to source: timeout")
        );
        assert!(
            s.run_id.starts_with("orders_"),
            "run_id must be derived from export name, got {}",
            s.run_id
        );
        assert_eq!(s.total_rows, 0);
        assert_eq!(s.files_produced, 0);
        assert_eq!(s.bytes_written, 0);
        assert_eq!(s.duration_ms, 0);
    }

    // `aggregate::entry_from_summary` copies the user-visible counters and ids
    // from a summary into an aggregate entry.
    #[test]
    fn aggregate_entry_from_summary_copies_observable_fields() {
        let plan = minimal_plan();
        let mut summary = RunSummary::new(&plan);
        summary.status = "success".into();
        summary.total_rows = 12_345;
        summary.files_produced = 3;
        summary.bytes_written = 9_876_543;
        summary.duration_ms = 5_000;

        let entry = aggregate::entry_from_summary(&summary);
        assert_eq!(entry.export_name, summary.export_name);
        assert_eq!(entry.status, "success");
        assert_eq!(entry.run_id, summary.run_id);
        assert_eq!(entry.rows, 12_345);
        assert_eq!(entry.files, 3);
        assert_eq!(entry.bytes, 9_876_543);
        assert_eq!(entry.duration_ms, 5_000);
        assert_eq!(entry.mode, summary.mode);
        assert_eq!(entry.error_message, None);
    }
}