1mod aggregate;
10mod apply_cmd;
11pub(crate) mod chunked;
12mod cli;
13mod commit;
14mod finalize;
15pub(crate) mod ipc;
16mod job;
17mod keyset;
18mod manifest_reconcile;
19mod manifest_writer;
20mod parallel_children;
21pub(crate) mod parent_ui;
22mod plan_cmd;
23pub(crate) mod progress;
24mod reconcile_cmd;
25mod repair_cmd;
26pub(crate) mod report;
27mod resume_decisions;
28mod retry;
29mod run;
35mod run_store;
36mod single;
37mod sink;
38mod summary;
39mod validate;
40mod validate_cmd;
41mod validate_manifest;
42
43pub use apply_cmd::run_apply_command;
50pub use cli::{
51 reset_chunk_checkpoint, reset_chunk_checkpoints_stuck, reset_state, show_chunk_checkpoint,
52 show_files, show_journal, show_metrics, show_progression, show_state,
53};
54pub use plan_cmd::{PlanOutputFormat, run_plan_command};
55pub use reconcile_cmd::{ReconcileOutputFormat, run_reconcile_command};
56pub use repair_cmd::{RepairOutputFormat, RepairReportSource, run_repair_command};
57pub use validate_cmd::{ValidateOutputFormat, ValidateTarget, run_validate_command};
58
59pub use summary::RunSummary;
63
64pub(crate) use job::run_export_job_with_chunk_source;
67#[cfg(test)]
68#[allow(unused_imports)]
69pub(crate) use retry::is_transient;
70
71#[doc(hidden)]
86pub mod for_tests {
87 pub use super::chunked::generate_chunks;
88 pub use super::manifest_writer::{ManifestBuilder, WriteOutcome, write_manifest};
89 pub use super::report::{RunReport, report_dir, write_run_report};
90 pub use super::resume_decisions::{
91 PartDecision, QuarantineReason, ResumeDecision, ResumePlan, UntrackedDecision,
92 build_resume_plan,
93 };
94 pub use super::retry::{RetryClass, classify_error};
95 pub use super::validate::validate_output;
96 pub use super::validate_manifest::{
97 Failure as ManifestVerificationFailure, ManifestVerification, verify_at_destination,
98 };
99 pub use crate::plan::build_time_window_query;
100}
101
102#[doc(hidden)]
110#[allow(unused_imports)]
111pub use for_tests::{
112 ManifestBuilder, ManifestVerification, ManifestVerificationFailure, PartDecision,
113 QuarantineReason, ResumeDecision, ResumePlan, RetryClass, RunReport, UntrackedDecision,
114 WriteOutcome, build_resume_plan, build_time_window_query, classify_error, generate_chunks,
115 report_dir, validate_output, verify_at_destination, write_manifest, write_run_report,
116};
117
118pub use run::{RunOptions, run};
123#[allow(unused_imports)] pub(crate) use run::{multi_export_concurrent, multi_export_mode};
125
/// Renders a byte count as a short human-readable string.
///
/// Units are binary multiples (1 KB = 1024 B, 1 MB = 1024 KB, 1 GB = 1024 MB).
/// Values of at least 1024 bytes are shown with one decimal place in the
/// largest unit whose threshold they reach; smaller values are shown as an
/// exact integer followed by ` B`.
///
/// Because the unit is picked before rounding, a value just below a threshold
/// can render as `1024.0` of the smaller unit (e.g. `1_048_575` -> `"1024.0 KB"`).
pub(crate) fn format_bytes(b: u64) -> String {
    const KB: u64 = 1024;
    const MB: u64 = 1_048_576;
    const GB: u64 = 1_073_741_824;

    // Pick the largest unit whose threshold the value reaches.
    let (divisor, unit) = match b {
        n if n >= GB => (GB, "GB"),
        n if n >= MB => (MB, "MB"),
        n if n >= KB => (KB, "KB"),
        // Below one KB there is nothing to scale: print the raw count.
        _ => return format!("{} B", b),
    };

    format!("{:.1} {}", b as f64 / divisor as f64, unit)
}
137
/// Splits a trailing recovery hint off an error message.
///
/// Scans the `"; "` separators in `msg` from left to right and cuts at the
/// first one whose remainder contains a backtick-quoted rivet command
/// (the literal text `` `rivet ``).
///
/// Returns `(cause, true)` with `cause` being everything before that
/// separator, or `(msg, false)` unchanged when no separator is followed by
/// such a command.
///
/// Note: every later remainder is a suffix of the first one, so whenever a
/// hint is present the cut lands on the *first* `"; "` in the message.
pub(crate) fn strip_chunked_recovery_hint(msg: &str) -> (&str, bool) {
    const SEPARATOR: &str = "; ";
    const COMMAND_MARKER: &str = "`rivet ";

    msg.match_indices(SEPARATOR)
        .map(|(at, _)| at)
        .find(|&at| msg[at + SEPARATOR.len()..].contains(COMMAND_MARKER))
        .map_or((msg, false), |at| (&msg[..at], true))
}
162
/// Limits `s` to at most `max_chars` characters (Unicode scalar values, not bytes).
///
/// * `max_chars == 0` yields an empty string.
/// * A string that already fits is returned as an owned copy, unchanged.
/// * Otherwise the first `max_chars - 1` characters are kept and a single
///   `…` is appended, so the result is exactly `max_chars` characters long.
pub(crate) fn clamp_line(s: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }

    // Byte offsets at which each character starts.
    let mut starts = s.char_indices().map(|(offset, _)| offset);

    // Offset of the character at index `max_chars - 1`: this is where the
    // ellipsis goes if the string turns out to be too long.
    let cut = match starts.nth(max_chars - 1) {
        Some(offset) => offset,
        // Fewer than `max_chars` characters: nothing to trim.
        None => return s.to_string(),
    };

    // Exactly `max_chars` characters: still fits.
    if starts.next().is_none() {
        return s.to_string();
    }

    let mut clamped = String::with_capacity(cut + '…'.len_utf8());
    clamped.push_str(&s[..cut]);
    clamped.push('…');
    clamped
}
180
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{SourceConfig, SourceType};
    use crate::plan::{
        CompressionType, DestinationConfig, DestinationType, DiagnosticLevel, ExtractionStrategy,
        FormatType, MetaColumns, ResolvedRunPlan, validate_plan,
    };
    use crate::tuning::SourceTuning;

    /// Representative values from every unit bucket of `format_bytes`.
    #[test]
    fn test_format_bytes() {
        let cases: [(u64, &str); 6] = [
            (500, "500 B"),
            (1024, "1.0 KB"),
            (1536, "1.5 KB"),
            (1_048_576, "1.0 MB"),
            (1_073_741_824, "1.0 GB"),
            (2_684_354_560, "2.5 GB"),
        ];
        for &(bytes, expected) in cases.iter() {
            assert_eq!(format_bytes(bytes), expected);
        }
    }

    /// A hint introduced by "use" is cut off at the separator before it.
    #[test]
    fn strip_chunked_recovery_hint_strips_use_form() {
        let message = "export 'users': chunk checkpoint run 'users_x' still in progress; \
                       use `rivet run --config foo.yaml --export users --resume` or \
                       `rivet state reset-chunks --config foo.yaml --export users`";
        let (cause, hinted) = strip_chunked_recovery_hint(message);
        assert!(hinted);
        assert_eq!(
            cause,
            "export 'users': chunk checkpoint run 'users_x' still in progress"
        );
    }

    /// A hint introduced by "fix errors and" is cut off the same way.
    #[test]
    fn strip_chunked_recovery_hint_strips_fix_errors_form() {
        let message = "export 'a': chunk checkpoint incomplete (3 tasks not completed); \
                       fix errors and `rivet run --config c.yaml --export a --resume` or \
                       `rivet state reset-chunks --config c.yaml --export a`";
        let (cause, hinted) = strip_chunked_recovery_hint(message);
        assert!(hinted);
        assert_eq!(
            cause,
            "export 'a': chunk checkpoint incomplete (3 tasks not completed)"
        );
    }

    /// A message with separators but no rivet command is returned untouched.
    #[test]
    fn strip_chunked_recovery_hint_passthrough_when_no_hint() {
        let message = "export 'q': source connection refused; retry exhausted";
        let (cause, hinted) = strip_chunked_recovery_hint(message);
        assert!(!hinted);
        assert_eq!(cause, message);
    }

    /// Short input passes through; long input (ASCII and multi-byte) is cut
    /// to the requested character count and ends with an ellipsis.
    #[test]
    fn clamp_line_truncates_with_ellipsis() {
        assert_eq!(clamp_line("short", 80), "short");
        assert_eq!(clamp_line("hello world", 8), "hello w…");

        let greek = "αβγδ".repeat(50);
        let clamped = clamp_line(&greek, 10);
        assert_eq!(clamped.chars().count(), 10);
        assert!(clamped.ends_with('…'));
    }

    /// Values on and immediately around each unit threshold of `format_bytes`.
    #[test]
    fn format_bytes_boundary_values() {
        let cases: [(u64, &str); 9] = [
            (0, "0 B"),
            (1, "1 B"),
            (1023, "1023 B"),
            (1024, "1.0 KB"),
            (1025, "1.0 KB"),
            (1_048_575, "1024.0 KB"),
            (1_048_576, "1.0 MB"),
            (1_073_741_823, "1024.0 MB"),
            (1_073_741_824, "1.0 GB"),
        ];
        for &(bytes, expected) in cases.iter() {
            assert_eq!(format_bytes(bytes), expected);
        }
    }

    /// Fixture: a snapshot export named `test_export` running `SELECT 1`
    /// against a Postgres URL source, written as Parquet to the local path
    /// `./out`, with validation, reconciliation and resume all switched off.
    fn minimal_plan() -> ResolvedRunPlan {
        let source = SourceConfig {
            source_type: SourceType::Postgres,
            url: Some("postgresql://localhost/test".into()),
            url_env: None,
            url_file: None,
            host: None,
            port: None,
            user: None,
            password: None,
            password_env: None,
            database: None,
            environment: None,
            tuning: None,
            tls: None,
        };

        let destination = DestinationConfig {
            destination_type: DestinationType::Local,
            path: Some("./out".into()),
            ..Default::default()
        };

        ResolvedRunPlan {
            export_name: "test_export".into(),
            base_query: "SELECT 1".into(),
            strategy: ExtractionStrategy::Snapshot,
            format: FormatType::Parquet,
            compression: CompressionType::default(),
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination,
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source,
            column_overrides: Default::default(),
            verify: crate::config::VerifyMode::Size,
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    /// A freshly created summary reflects the plan and starts out "running"
    /// with zeroed counters.
    #[test]
    fn test_run_summary_fields() {
        let summary = RunSummary::new(&minimal_plan());

        assert_eq!(summary.export_name, "test_export");
        assert_eq!(summary.status, "running");
        assert_eq!(summary.total_rows, 0);
        assert_eq!(summary.files_produced, 0);
        assert_eq!(summary.tuning_profile, "balanced (default)");
        assert_eq!(summary.batch_size, 10_000);
        assert_eq!(summary.format, "parquet");
        assert_eq!(summary.mode, "full");
        assert!(
            summary.run_id.starts_with("test_export_"),
            "run_id should start with export name, got: {}",
            summary.run_id
        );
    }

    /// `RunSummary::new` must seed the journal, and the first entry must be
    /// the `PlanResolved` event.
    #[test]
    fn run_summary_new_records_plan_resolved_as_first_event() {
        let summary = RunSummary::new(&minimal_plan());
        let entries = &summary.journal.entries;

        assert!(
            !entries.is_empty(),
            "journal must have at least one entry after RunSummary::new()"
        );

        let first = &entries[0];
        assert!(
            matches!(first.event, crate::journal::RunEvent::PlanResolved(_)),
            "first journal event must be PlanResolved, got: {:?}",
            first.event
        );
    }

    /// The plan snapshot stored in the journal mirrors the plan it was built from.
    #[test]
    fn run_summary_plan_snapshot_matches_plan_fields() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);

        let snapshot = summary
            .journal
            .plan_snapshot()
            .expect("plan_snapshot() must be Some after RunSummary::new()");

        assert_eq!(snapshot.export_name, plan.export_name);
        assert_eq!(snapshot.validate, plan.validate);
        assert_eq!(snapshot.reconcile, plan.reconcile);
        assert_eq!(snapshot.resume, plan.resume);
        assert_eq!(snapshot.batch_size, plan.tuning.batch_size);
    }

    /// The journal and the summary must share one run id.
    #[test]
    fn run_summary_journal_run_id_matches_summary_run_id() {
        let summary = RunSummary::new(&minimal_plan());
        assert_eq!(
            summary.journal.run_id, summary.run_id,
            "journal run_id must match summary run_id"
        );
    }

    /// A stdout destination combined with `max_file_size_bytes` must make
    /// `validate_plan` emit at least one `Rejected` diagnostic.
    #[test]
    fn rejected_plan_produces_rejected_diagnostic_blocking_run_export_job() {
        let plan = {
            let mut p = minimal_plan();
            p.destination.destination_type = DestinationType::Stdout;
            p.max_file_size_bytes = Some(10 * 1024 * 1024);
            p
        };

        let diags = validate_plan(&plan);
        let has_rejection = diags
            .iter()
            .any(|d| d.level == DiagnosticLevel::Rejected);

        assert!(
            has_rejection,
            "stdout + max_file_size must produce a Rejected diagnostic so that \
             run_export_job bails before calling run_with_reconnect; got: {:?}",
            diags
                .iter()
                .map(|d| (&d.rule, &d.level))
                .collect::<Vec<_>>()
        );
    }

    /// A stdout destination combined with a chunked strategy must likewise
    /// be rejected by `validate_plan`.
    #[test]
    fn rejected_plan_stdout_chunked_blocks_run_export_job() {
        use crate::plan::ChunkedPlan;

        let chunked = ChunkedPlan {
            column: "id".into(),
            chunk_size: 1000,
            chunk_count: None,
            parallel: 1,
            dense: false,
            by_days: None,
            max_attempts: 3,
            checkpoint: false,
        };

        let mut plan = minimal_plan();
        plan.destination.destination_type = DestinationType::Stdout;
        plan.strategy = ExtractionStrategy::Chunked(chunked);

        let diags = validate_plan(&plan);
        let has_rejection = diags
            .iter()
            .any(|d| d.level == DiagnosticLevel::Rejected);
        assert!(
            has_rejection,
            "stdout + chunked must produce a Rejected diagnostic"
        );
    }

    /// The synthetic failure summary carries the export name, a "failed"
    /// status, the error text, a run id derived from the export name, and
    /// zeroed counters.
    #[test]
    fn synthetic_failed_summary_carries_error_and_status() {
        let error = anyhow::anyhow!("could not connect to source: timeout");
        let summary = job::synthetic_failed_summary("orders", &error);

        assert_eq!(summary.export_name, "orders");
        assert_eq!(summary.status, "failed");
        assert_eq!(
            summary.error_message.as_deref(),
            Some("could not connect to source: timeout")
        );
        assert!(
            summary.run_id.starts_with("orders_"),
            "run_id must be derived from export name, got {}",
            summary.run_id
        );
        assert_eq!(summary.total_rows, 0);
        assert_eq!(summary.files_produced, 0);
        assert_eq!(summary.bytes_written, 0);
        assert_eq!(summary.duration_ms, 0);
    }

    /// `aggregate::entry_from_summary` copies the observable summary fields
    /// into the aggregate entry.
    #[test]
    fn aggregate_entry_from_summary_copies_observable_fields() {
        let mut summary = RunSummary::new(&minimal_plan());
        summary.status = "success".into();
        summary.total_rows = 12_345;
        summary.files_produced = 3;
        summary.bytes_written = 9_876_543;
        summary.duration_ms = 5_000;

        let entry = aggregate::entry_from_summary(&summary);

        assert_eq!(entry.export_name, summary.export_name);
        assert_eq!(entry.status, "success");
        assert_eq!(entry.run_id, summary.run_id);
        assert_eq!(entry.rows, 12_345);
        assert_eq!(entry.files, 3);
        assert_eq!(entry.bytes, 9_876_543);
        assert_eq!(entry.duration_ms, 5_000);
        assert_eq!(entry.mode, summary.mode);
        assert_eq!(entry.error_message, None);
    }
}