1mod aggregate;
10mod apply_cmd;
11pub(crate) mod chunked;
12mod cli;
13mod commit;
14mod finalize;
15pub(crate) mod ipc;
16mod job;
17mod keyset;
18mod manifest_reconcile;
19mod manifest_writer;
20mod parallel_children;
21pub(crate) mod parent_ui;
22mod partition_expand;
23mod plan_cmd;
24pub(crate) mod progress;
25mod reconcile_cmd;
26mod repair_cmd;
27pub(crate) mod report;
28mod resume_decisions;
29pub(crate) mod retry;
32mod run;
38mod run_store;
39mod schema_drift;
40mod single;
41mod sink;
42mod summary;
43mod validate;
44mod validate_cmd;
45mod validate_manifest;
46
47pub use apply_cmd::run_apply_command;
54pub use cli::{
55 reset_chunk_checkpoint, reset_chunk_checkpoints_stuck, reset_state, show_chunk_checkpoint,
56 show_files, show_journal, show_metrics, show_progression, show_state,
57};
58pub use plan_cmd::{PlanOutputFormat, run_plan_command};
59pub use reconcile_cmd::{ReconcileOutputFormat, run_reconcile_command};
60pub use repair_cmd::{RepairOutputFormat, RepairReportSource, run_repair_command};
61pub use validate_cmd::{ValidateOutputFormat, ValidateTarget, run_validate_command};
62
63pub use summary::RunSummary;
67
68pub(crate) use job::run_export_job_with_chunk_source;
71#[cfg(test)]
72#[allow(unused_imports)]
73pub(crate) use retry::is_transient;
74
/// Re-exports of crate-internal run helpers under a single path: chunk
/// generation, manifest writing, run reports, resume planning, retry
/// classification, output/manifest validation, and
/// `crate::plan::build_time_window_query`.
///
/// Hidden from rustdoc via `#[doc(hidden)]`. NOTE(review): presumably meant for
/// test code (per the module name) rather than as a stable public API — confirm
/// with the callers before relying on it elsewhere.
#[doc(hidden)]
pub mod for_tests {
    pub use super::chunked::generate_chunks;
    pub use super::manifest_writer::{ManifestBuilder, WriteOutcome, write_manifest};
    pub use super::report::{RunReport, report_dir, write_run_report};
    pub use super::resume_decisions::{
        PartDecision, QuarantineReason, ResumeDecision, ResumePlan, UntrackedDecision,
        build_resume_plan,
    };
    pub use super::retry::{RetryClass, classify_error};
    pub use super::validate::validate_output;
    // Renamed on re-export so the generic `Failure` name is unambiguous outside
    // the `validate_manifest` module.
    pub use super::validate_manifest::{
        Failure as ManifestVerificationFailure, ManifestVerification, verify_at_destination,
    };
    pub use crate::plan::build_time_window_query;
}
105
106#[doc(hidden)]
114#[allow(unused_imports)]
115pub use for_tests::{
116 ManifestBuilder, ManifestVerification, ManifestVerificationFailure, PartDecision,
117 QuarantineReason, ResumeDecision, ResumePlan, RetryClass, RunReport, UntrackedDecision,
118 WriteOutcome, build_resume_plan, build_time_window_query, classify_error, generate_chunks,
119 report_dir, validate_output, verify_at_destination, write_manifest, write_run_report,
120};
121
122pub use run::{RunOptions, run};
127#[allow(unused_imports)] pub(crate) use run::{multi_export_concurrent, multi_export_mode};
129
/// Renders a byte count as a short human-readable string using 1024-based units.
///
/// Counts below 1024 are printed as a plain integer with a ` B` suffix. Larger
/// counts are divided by the biggest unit threshold they reach (`KB`, `MB`, `GB`)
/// and printed with one decimal place. The unit is chosen before rounding, so a
/// value just below a threshold renders as e.g. `1024.0 KB`.
pub(crate) fn format_bytes(b: u64) -> String {
    // Checked from largest to smallest so the first match is the biggest unit.
    const UNITS: [(u64, &str); 3] = [(1 << 30, "GB"), (1 << 20, "MB"), (1 << 10, "KB")];

    UNITS
        .iter()
        .find(|&&(threshold, _)| b >= threshold)
        .map(|&(threshold, suffix)| format!("{:.1} {}", b as f64 / threshold as f64, suffix))
        .unwrap_or_else(|| format!("{} B", b))
}
141
/// Splits a chunked-export error message into its cause and a trailing recovery hint.
///
/// The message is scanned as a sequence of segments separated by `"; "`. The first
/// segment that mentions a backtick-quoted `rivet` command (the text `` `rivet ``)
/// is treated as the start of the hint. Everything from the separator in front of
/// that segment onward is dropped.
///
/// Only the segment between two separators is inspected, never the whole remaining
/// tail. As a result, `"; "` that belong to the cause itself (for example
/// `"connection reset; retry exhausted; use `rivet run …`"`) are kept in the
/// returned cause.
///
/// Returns `(cause, true)` when a hint was found and stripped, or `(msg, false)`
/// when no segment references a `rivet` command.
pub(crate) fn strip_chunked_recovery_hint(msg: &str) -> (&str, bool) {
    const SEPARATOR: &str = "; ";
    const COMMAND_MARKER: &str = "`rivet ";

    let mut search_from = 0;
    while let Some(offset) = msg[search_from..].find(SEPARATOR) {
        let cut = search_from + offset;
        let segment_start = cut + SEPARATOR.len();
        // The segment runs up to the next separator (or the end of the message).
        let segment_end = msg[segment_start..]
            .find(SEPARATOR)
            .map_or(msg.len(), |next| segment_start + next);
        if msg[segment_start..segment_end].contains(COMMAND_MARKER) {
            return (&msg[..cut], true);
        }
        search_from = segment_start;
    }
    (msg, false)
}
166
/// Limits `s` to at most `max_chars` Unicode scalar values (`char`s).
///
/// Strings that already fit are returned unchanged (as an owned copy). Longer
/// strings keep their first `max_chars - 1` chars followed by a single `…`, so the
/// result is exactly `max_chars` chars long. A `max_chars` of zero yields an empty
/// string.
pub(crate) fn clamp_line(s: &str, max_chars: usize) -> String {
    match max_chars.checked_sub(1) {
        None => String::new(),
        Some(keep) => {
            // Byte offsets of each char start; one pass answers both "where to cut"
            // (offset of char `keep`) and "is there anything past `max_chars`".
            let mut offsets = s.char_indices().map(|(byte_idx, _)| byte_idx);
            let cut = offsets.nth(keep);
            match (cut, offsets.next()) {
                (Some(cut), Some(_)) => format!("{}…", &s[..cut]),
                _ => s.to_owned(),
            }
        }
    }
}
184
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{SourceConfig, SourceType};
    use crate::plan::{
        CompressionType, DestinationConfig, DestinationType, DiagnosticLevel, ExtractionStrategy,
        FormatType, MetaColumns, ResolvedRunPlan, validate_plan,
    };
    use crate::tuning::SourceTuning;

    /// Checks every `(input, expected)` pair against `format_bytes`, naming the
    /// offending input in the failure message.
    fn check_format_bytes(cases: &[(u64, &str)]) {
        for &(input, expected) in cases {
            assert_eq!(format_bytes(input), expected, "format_bytes({input})");
        }
    }

    #[test]
    fn test_format_bytes() {
        check_format_bytes(&[
            (500, "500 B"),
            (1024, "1.0 KB"),
            (1536, "1.5 KB"),
            (1_048_576, "1.0 MB"),
            (1_073_741_824, "1.0 GB"),
            (2_684_354_560, "2.5 GB"),
        ]);
    }

    #[test]
    fn strip_chunked_recovery_hint_strips_use_form() {
        let message = "export 'users': chunk checkpoint run 'users_x' still in progress; \
                       use `rivet run --config foo.yaml --export users --resume` or \
                       `rivet state reset-chunks --config foo.yaml --export users`";
        assert_eq!(
            strip_chunked_recovery_hint(message),
            (
                "export 'users': chunk checkpoint run 'users_x' still in progress",
                true
            )
        );
    }

    #[test]
    fn strip_chunked_recovery_hint_strips_fix_errors_form() {
        let message = "export 'a': chunk checkpoint incomplete (3 tasks not completed); \
                       fix errors and `rivet run --config c.yaml --export a --resume` or \
                       `rivet state reset-chunks --config c.yaml --export a`";
        assert_eq!(
            strip_chunked_recovery_hint(message),
            (
                "export 'a': chunk checkpoint incomplete (3 tasks not completed)",
                true
            )
        );
    }

    #[test]
    fn strip_chunked_recovery_hint_passthrough_when_no_hint() {
        let message = "export 'q': source connection refused; retry exhausted";
        assert_eq!(strip_chunked_recovery_hint(message), (message, false));
    }

    #[test]
    fn clamp_line_truncates_with_ellipsis() {
        assert_eq!(clamp_line("short", 80), "short");
        assert_eq!(clamp_line("hello world", 8), "hello w…");

        let greek = "αβγδ".repeat(50);
        let clamped = clamp_line(&greek, 10);
        assert_eq!(clamped.chars().count(), 10);
        assert!(clamped.ends_with('…'));
    }

    #[test]
    fn format_bytes_boundary_values() {
        check_format_bytes(&[
            (0, "0 B"),
            (1, "1 B"),
            (1023, "1023 B"),
            (1024, "1.0 KB"),
            (1025, "1.0 KB"),
            (1_048_575, "1024.0 KB"),
            (1_048_576, "1.0 MB"),
            (1_073_741_823, "1024.0 MB"),
            (1_073_741_824, "1.0 GB"),
        ]);
    }

    /// Smallest valid plan: a Postgres snapshot export of `SELECT 1` written as
    /// Parquet to a local `./out` directory, with validation/reconcile/resume off.
    fn minimal_plan() -> ResolvedRunPlan {
        let source = SourceConfig {
            source_type: SourceType::Postgres,
            url: Some("postgresql://localhost/test".into()),
            url_env: None,
            url_file: None,
            host: None,
            port: None,
            user: None,
            password: None,
            password_env: None,
            database: None,
            environment: None,
            tuning: None,
            tls: None,
        };
        let destination = DestinationConfig {
            destination_type: DestinationType::Local,
            path: Some("./out".into()),
            ..Default::default()
        };

        ResolvedRunPlan {
            export_name: "test_export".into(),
            base_query: "SELECT 1".into(),
            strategy: ExtractionStrategy::Snapshot,
            format: FormatType::Parquet,
            compression: CompressionType::default(),
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination,
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source,
            column_overrides: Default::default(),
            verify: crate::config::VerifyMode::Size,
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    #[test]
    fn test_run_summary_fields() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);

        assert_eq!(summary.export_name, "test_export");
        assert_eq!(summary.status, "running");
        assert_eq!(summary.total_rows, 0);
        assert_eq!(summary.files_produced, 0);
        assert_eq!(summary.tuning_profile, "balanced (default)");
        assert_eq!(summary.batch_size, 10_000);
        assert_eq!(summary.format, "parquet");
        assert_eq!(summary.mode, "full");

        let run_id = &summary.run_id;
        assert!(
            run_id.starts_with("test_export_"),
            "run_id should start with export name, got: {}",
            run_id
        );
    }

    #[test]
    fn run_summary_new_records_plan_resolved_as_first_event() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);
        let entries = &summary.journal.entries;

        assert!(
            !entries.is_empty(),
            "journal must have at least one entry after RunSummary::new()"
        );
        let first_event = &entries[0].event;
        assert!(
            matches!(first_event, crate::journal::RunEvent::PlanResolved(_)),
            "first journal event must be PlanResolved, got: {:?}",
            first_event
        );
    }

    #[test]
    fn run_summary_plan_snapshot_matches_plan_fields() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);

        let snapshot = summary
            .journal
            .plan_snapshot()
            .expect("plan_snapshot() must be Some after RunSummary::new()");

        assert_eq!(snapshot.export_name, plan.export_name);
        assert_eq!(snapshot.validate, plan.validate);
        assert_eq!(snapshot.reconcile, plan.reconcile);
        assert_eq!(snapshot.resume, plan.resume);
        assert_eq!(snapshot.batch_size, plan.tuning.batch_size);
    }

    #[test]
    fn run_summary_journal_run_id_matches_summary_run_id() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);

        let (journal_id, summary_id) = (&summary.journal.run_id, &summary.run_id);
        assert_eq!(
            journal_id, summary_id,
            "journal run_id must match summary run_id"
        );
    }

    #[test]
    fn rejected_plan_produces_rejected_diagnostic_blocking_run_export_job() {
        let mut plan = minimal_plan();
        plan.destination.destination_type = DestinationType::Stdout;
        plan.max_file_size_bytes = Some(10 * 1024 * 1024);

        let diags = validate_plan(&plan);
        let has_rejected = diags.iter().any(|d| d.level == DiagnosticLevel::Rejected);

        assert!(
            has_rejected,
            "stdout + max_file_size must produce a Rejected diagnostic so that \
             run_export_job bails before calling run_with_reconnect; got: {:?}",
            diags
                .iter()
                .map(|d| (&d.rule, &d.level))
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn rejected_plan_stdout_chunked_blocks_run_export_job() {
        use crate::plan::ChunkedPlan;

        let chunked = ChunkedPlan {
            column: "id".into(),
            chunk_size: 1000,
            chunk_count: None,
            parallel: 1,
            dense: false,
            by_days: None,
            max_attempts: 3,
            checkpoint: false,
        };
        let mut plan = minimal_plan();
        plan.destination.destination_type = DestinationType::Stdout;
        plan.strategy = ExtractionStrategy::Chunked(chunked);

        let diags = validate_plan(&plan);
        let has_rejected = diags.iter().any(|d| d.level == DiagnosticLevel::Rejected);
        assert!(
            has_rejected,
            "stdout + chunked must produce a Rejected diagnostic"
        );
    }

    #[test]
    fn synthetic_failed_summary_carries_error_and_status() {
        let cause = anyhow::anyhow!("could not connect to source: timeout");
        let failed = job::synthetic_failed_summary("orders", &cause);

        assert_eq!(failed.export_name, "orders");
        assert_eq!(failed.status, "failed");
        assert_eq!(
            failed.error_message.as_deref(),
            Some("could not connect to source: timeout")
        );
        assert!(
            failed.run_id.starts_with("orders_"),
            "run_id must be derived from export name, got {}",
            failed.run_id
        );

        // A synthetic failure never did any work, so every counter stays at zero.
        assert_eq!(failed.total_rows, 0);
        assert_eq!(failed.files_produced, 0);
        assert_eq!(failed.bytes_written, 0);
        assert_eq!(failed.duration_ms, 0);
    }

    #[test]
    fn aggregate_entry_from_summary_copies_observable_fields() {
        let plan = minimal_plan();
        let summary = {
            let mut finished = RunSummary::new(&plan);
            finished.status = "success".into();
            finished.total_rows = 12_345;
            finished.files_produced = 3;
            finished.bytes_written = 9_876_543;
            finished.duration_ms = 5_000;
            finished
        };

        let entry = aggregate::entry_from_summary(&summary);

        assert_eq!(entry.export_name, summary.export_name);
        assert_eq!(entry.status, "success");
        assert_eq!(entry.run_id, summary.run_id);
        assert_eq!(entry.rows, 12_345);
        assert_eq!(entry.files, 3);
        assert_eq!(entry.bytes, 9_876_543);
        assert_eq!(entry.duration_ms, 5_000);
        assert_eq!(entry.mode, summary.mode);
        assert_eq!(entry.error_message, None);
    }
}
488 }
489}