1mod aggregate;
10mod apply_cmd;
11pub(crate) mod chunked;
12mod cli;
13mod commit;
14mod finalize;
15pub(crate) mod ipc;
16mod job;
17mod keyset;
18mod manifest_writer;
19mod parallel_children;
20pub(crate) mod parent_ui;
21mod plan_cmd;
22pub(crate) mod progress;
23mod reconcile_cmd;
24mod repair_cmd;
25pub(crate) mod report;
26mod resume_decisions;
27mod retry;
28mod run;
34mod run_store;
35mod single;
36mod sink;
37mod summary;
38mod validate;
39mod validate_cmd;
40mod validate_manifest;
41
42pub use apply_cmd::run_apply_command;
49pub use cli::{
50 reset_chunk_checkpoint, reset_chunk_checkpoints_stuck, reset_state, show_chunk_checkpoint,
51 show_files, show_journal, show_metrics, show_progression, show_state,
52};
53pub use plan_cmd::{PlanOutputFormat, run_plan_command};
54pub use reconcile_cmd::{ReconcileOutputFormat, run_reconcile_command};
55pub use repair_cmd::{RepairOutputFormat, RepairReportSource, run_repair_command};
56pub use validate_cmd::{ValidateOutputFormat, ValidateTarget, run_validate_command};
57
58pub use summary::RunSummary;
62
63pub(crate) use job::run_export_job_with_chunk_source;
66#[cfg(test)]
67#[allow(unused_imports)]
68pub(crate) use retry::is_transient;
69
70#[doc(hidden)]
85pub mod for_tests {
86 pub use super::chunked::generate_chunks;
87 pub use super::manifest_writer::{ManifestBuilder, WriteOutcome, write_manifest};
88 pub use super::report::{RunReport, report_dir, write_run_report};
89 pub use super::resume_decisions::{
90 PartDecision, QuarantineReason, ResumeDecision, ResumePlan, UntrackedDecision,
91 build_resume_plan,
92 };
93 pub use super::retry::{RetryClass, classify_error};
94 pub use super::validate::validate_output;
95 pub use super::validate_manifest::{
96 Failure as ManifestVerificationFailure, ManifestVerification, verify_at_destination,
97 };
98 pub use crate::plan::build_time_window_query;
99}
100
101#[doc(hidden)]
109#[allow(unused_imports)]
110pub use for_tests::{
111 ManifestBuilder, ManifestVerification, ManifestVerificationFailure, PartDecision,
112 QuarantineReason, ResumeDecision, ResumePlan, RetryClass, RunReport, UntrackedDecision,
113 WriteOutcome, build_resume_plan, build_time_window_query, classify_error, generate_chunks,
114 report_dir, validate_output, verify_at_destination, write_manifest, write_run_report,
115};
116
117pub use run::{RunOptions, run};
122#[allow(unused_imports)] pub(crate) use run::{multi_export_concurrent, multi_export_mode};
124
/// Renders a byte count as a short human-readable string.
///
/// Units are binary multiples (1024-based) labelled `KB`, `MB` and `GB`,
/// printed with one decimal place. Values below 1024 are printed as an exact
/// integer followed by ` B`. `GB` is the largest unit, so bigger values simply
/// produce a larger `GB` figure.
///
/// The unit is picked from the raw value *before* rounding, so a value just
/// under a threshold can render as e.g. `"1024.0 KB"`.
pub(crate) fn format_bytes(b: u64) -> String {
    const KIB: u64 = 1 << 10;
    const MIB: u64 = 1 << 20;
    const GIB: u64 = 1 << 30;

    // Largest unit whose threshold the value reaches; plain bytes bail out early.
    let (divisor, unit) = match b {
        GIB.. => (GIB, "GB"),
        MIB.. => (MIB, "MB"),
        KIB.. => (KIB, "KB"),
        _ => return format!("{b} B"),
    };
    format!("{:.1} {unit}", b as f64 / divisor as f64)
}
136
/// Splits a trailing recovery hint off an error message.
///
/// The message is cut at its first `"; "` separator. If the text after that
/// separator contains a backtick-quoted `rivet` command (the literal
/// "`` `rivet ``" followed by a space), the part before the separator is
/// returned together with `true`. Otherwise the whole message is returned
/// unchanged together with `false`.
///
/// Only the first separator needs checking: every later tail is a suffix of
/// the first one, so if the first tail has no command, none of them do.
///
/// NOTE(review): because the cut is at the *first* `"; "`, a cause that itself
/// contains `"; "` ahead of the hint is truncated at that earlier point —
/// confirm that this is the intended behaviour.
pub(crate) fn strip_chunked_recovery_hint(msg: &str) -> (&str, bool) {
    match msg.split_once("; ") {
        Some((cause, rest)) if rest.contains("`rivet ") => (cause, true),
        _ => (msg, false),
    }
}
161
/// Shortens `s` to at most `max_chars` Unicode scalar values (`char`s).
///
/// * `max_chars == 0` yields an empty string.
/// * A string that already fits is returned as an owned copy.
/// * Otherwise the first `max_chars - 1` chars are kept and a single `…` is
///   appended, so the result is exactly `max_chars` chars long.
///
/// Lengths are counted in `char`s, not bytes, so multi-byte text is never cut
/// in the middle of a code point.
pub(crate) fn clamp_line(s: &str, max_chars: usize) -> String {
    if max_chars == 0 {
        return String::new();
    }

    // Byte offsets of the chars at index `max_chars - 1` and `max_chars`.
    let mut offsets = s
        .char_indices()
        .map(|(byte_at, _)| byte_at)
        .skip(max_chars - 1);
    let last_slot = offsets.next();
    let overflow = offsets.next();

    match (last_slot, overflow) {
        // A char exists past the limit: the last slot is given to the ellipsis.
        (Some(cut), Some(_)) => {
            let mut clamped = String::with_capacity(cut + '…'.len_utf8());
            clamped.push_str(&s[..cut]);
            clamped.push('…');
            clamped
        }
        // `s` has at most `max_chars` chars.
        _ => s.to_owned(),
    }
}
179
#[cfg(test)]
mod tests {
    //! Unit tests for the helpers defined in this module and for a few
    //! invariants of `RunSummary`, `validate_plan` and the summary helpers in
    //! the `job` / `aggregate` submodules.

    use super::*;
    use crate::config::{SourceConfig, SourceType};
    use crate::plan::{
        CompressionType, DestinationConfig, DestinationType, DiagnosticLevel, ExtractionStrategy,
        FormatType, MetaColumns, ResolvedRunPlan, validate_plan,
    };
    use crate::tuning::SourceTuning;

    /// One representative value per unit.
    #[test]
    fn test_format_bytes() {
        assert_eq!(format_bytes(500), "500 B");
        assert_eq!(format_bytes(1024), "1.0 KB");
        assert_eq!(format_bytes(1536), "1.5 KB");
        assert_eq!(format_bytes(1_048_576), "1.0 MB");
        assert_eq!(format_bytes(1_073_741_824), "1.0 GB");
        assert_eq!(format_bytes(2_684_354_560), "2.5 GB");
    }

    /// A hint introduced by "use `rivet …`" is cut off at the separator.
    #[test]
    fn strip_chunked_recovery_hint_strips_use_form() {
        let m = "export 'users': chunk checkpoint run 'users_x' still in progress; \
                 use `rivet run --config foo.yaml --export users --resume` or \
                 `rivet state reset-chunks --config foo.yaml --export users`";
        let (cause, hinted) = strip_chunked_recovery_hint(m);
        assert!(hinted);
        assert_eq!(
            cause,
            "export 'users': chunk checkpoint run 'users_x' still in progress"
        );
    }

    /// A hint introduced by "fix errors and `rivet …`" is cut off as well.
    #[test]
    fn strip_chunked_recovery_hint_strips_fix_errors_form() {
        let m = "export 'a': chunk checkpoint incomplete (3 tasks not completed); \
                 fix errors and `rivet run --config c.yaml --export a --resume` or \
                 `rivet state reset-chunks --config c.yaml --export a`";
        let (cause, hinted) = strip_chunked_recovery_hint(m);
        assert!(hinted);
        assert_eq!(
            cause,
            "export 'a': chunk checkpoint incomplete (3 tasks not completed)"
        );
    }

    /// A `; ` separator without any `rivet` command after it leaves the
    /// message untouched.
    #[test]
    fn strip_chunked_recovery_hint_passthrough_when_no_hint() {
        let m = "export 'q': source connection refused; retry exhausted";
        let (cause, hinted) = strip_chunked_recovery_hint(m);
        assert!(!hinted);
        assert_eq!(cause, m);
    }

    /// A `rivet` command with no `; ` separator in the message is not treated
    /// as a strippable hint.
    #[test]
    fn strip_chunked_recovery_hint_passthrough_without_separator() {
        let m = "run `rivet doctor` for details";
        let (cause, hinted) = strip_chunked_recovery_hint(m);
        assert!(!hinted);
        assert_eq!(cause, m);
    }

    /// Short input passes through; long input is cut to the requested number
    /// of chars (not bytes) and ends with an ellipsis.
    #[test]
    fn clamp_line_truncates_with_ellipsis() {
        assert_eq!(clamp_line("short", 80), "short");
        assert_eq!(clamp_line("hello world", 8), "hello w…");
        let s = "αβγδ".repeat(50);
        let out = clamp_line(&s, 10);
        assert_eq!(out.chars().count(), 10);
        assert!(out.ends_with('…'));
    }

    /// Width 0, width 1, empty input and the exact-fit boundary.
    #[test]
    fn clamp_line_edge_widths() {
        assert_eq!(clamp_line("anything", 0), "");
        assert_eq!(clamp_line("", 0), "");
        assert_eq!(clamp_line("", 5), "");
        assert_eq!(clamp_line("a", 1), "a");
        assert_eq!(clamp_line("ab", 1), "…");
        assert_eq!(clamp_line("hello", 5), "hello");
        assert_eq!(clamp_line("hello!", 5), "hell…");
    }

    /// Values on and next to each unit threshold. The unit is chosen before
    /// rounding, hence the `1024.0 KB` / `1024.0 MB` expectations.
    #[test]
    fn format_bytes_boundary_values() {
        assert_eq!(format_bytes(0), "0 B");
        assert_eq!(format_bytes(1), "1 B");
        assert_eq!(format_bytes(1023), "1023 B");
        assert_eq!(format_bytes(1024), "1.0 KB");
        assert_eq!(format_bytes(1025), "1.0 KB");
        assert_eq!(format_bytes(1_048_575), "1024.0 KB");
        assert_eq!(format_bytes(1_048_576), "1.0 MB");
        assert_eq!(format_bytes(1_073_741_823), "1024.0 MB");
        assert_eq!(format_bytes(1_073_741_824), "1.0 GB");
    }

    /// Builds the plan fixture shared by the tests below: a snapshot export
    /// named `test_export`, Parquet output to the local path `./out`, a
    /// Postgres source given by URL, and `validate` / `reconcile` / `resume`
    /// all switched off.
    fn minimal_plan() -> ResolvedRunPlan {
        ResolvedRunPlan {
            export_name: "test_export".into(),
            base_query: "SELECT 1".into(),
            strategy: ExtractionStrategy::Snapshot,
            format: FormatType::Parquet,
            compression: CompressionType::default(),
            compression_level: None,
            max_file_size_bytes: None,
            skip_empty: false,
            meta_columns: MetaColumns::default(),
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".into()),
                ..Default::default()
            },
            quality: None,
            tuning: SourceTuning::from_config(None),
            tuning_profile_label: "balanced (default)".into(),
            validate: false,
            reconcile: false,
            resume: false,
            source: SourceConfig {
                source_type: SourceType::Postgres,
                url: Some("postgresql://localhost/test".into()),
                url_env: None,
                url_file: None,
                host: None,
                port: None,
                user: None,
                password: None,
                password_env: None,
                database: None,
                environment: None,
                tuning: None,
                tls: None,
            },
            column_overrides: Default::default(),
            schema_drift_policy: Default::default(),
            shape_drift_warn_factor: 2.0,
            parquet: None,
        }
    }

    /// Initial field values of a `RunSummary` created from the fixture plan.
    #[test]
    fn test_run_summary_fields() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);
        assert_eq!(summary.export_name, "test_export");
        assert_eq!(summary.status, "running");
        assert_eq!(summary.total_rows, 0);
        assert_eq!(summary.files_produced, 0);
        assert_eq!(summary.tuning_profile, "balanced (default)");
        assert_eq!(summary.batch_size, 10_000);
        assert_eq!(summary.format, "parquet");
        assert_eq!(summary.mode, "full");
        assert!(
            summary.run_id.starts_with("test_export_"),
            "run_id should start with export name, got: {}",
            summary.run_id
        );
    }

    /// The journal of a fresh summary is non-empty and opens with
    /// `RunEvent::PlanResolved`.
    #[test]
    fn run_summary_new_records_plan_resolved_as_first_event() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);

        assert!(
            !summary.journal.entries.is_empty(),
            "journal must have at least one entry after RunSummary::new()"
        );
        assert!(
            matches!(
                summary.journal.entries[0].event,
                crate::journal::RunEvent::PlanResolved(_)
            ),
            "first journal event must be PlanResolved, got: {:?}",
            summary.journal.entries[0].event
        );
    }

    /// The plan snapshot stored in the journal mirrors the plan it was
    /// built from.
    #[test]
    fn run_summary_plan_snapshot_matches_plan_fields() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);

        let snap = summary
            .journal
            .plan_snapshot()
            .expect("plan_snapshot() must be Some after RunSummary::new()");

        assert_eq!(snap.export_name, plan.export_name);
        assert_eq!(snap.validate, plan.validate);
        assert_eq!(snap.reconcile, plan.reconcile);
        assert_eq!(snap.resume, plan.resume);
        assert_eq!(snap.batch_size, plan.tuning.batch_size);
    }

    /// Journal and summary carry the same run id.
    #[test]
    fn run_summary_journal_run_id_matches_summary_run_id() {
        let plan = minimal_plan();
        let summary = RunSummary::new(&plan);
        assert_eq!(
            summary.journal.run_id, summary.run_id,
            "journal run_id must match summary run_id"
        );
    }

    /// `validate_plan` must report at least one `Rejected` diagnostic for a
    /// stdout destination combined with `max_file_size_bytes`.
    #[test]
    fn rejected_plan_produces_rejected_diagnostic_blocking_run_export_job() {
        let mut plan = minimal_plan();
        plan.destination.destination_type = DestinationType::Stdout;
        plan.max_file_size_bytes = Some(10 * 1024 * 1024);

        let diags = validate_plan(&plan);
        let rejected_count = diags
            .iter()
            .filter(|d| d.level == DiagnosticLevel::Rejected)
            .count();

        assert!(
            rejected_count > 0,
            "stdout + max_file_size must produce a Rejected diagnostic so that \
             run_export_job bails before calling run_with_reconnect; got: {:?}",
            diags
                .iter()
                .map(|d| (&d.rule, &d.level))
                .collect::<Vec<_>>()
        );
    }

    /// `validate_plan` must also reject a stdout destination combined with
    /// the chunked extraction strategy.
    #[test]
    fn rejected_plan_stdout_chunked_blocks_run_export_job() {
        use crate::plan::ChunkedPlan;
        let mut plan = minimal_plan();
        plan.destination.destination_type = DestinationType::Stdout;
        plan.strategy = ExtractionStrategy::Chunked(ChunkedPlan {
            column: "id".into(),
            chunk_size: 1000,
            chunk_count: None,
            parallel: 1,
            dense: false,
            by_days: None,
            max_attempts: 3,
            checkpoint: false,
        });

        let diags = validate_plan(&plan);
        assert!(
            diags.iter().any(|d| d.level == DiagnosticLevel::Rejected),
            "stdout + chunked must produce a Rejected diagnostic"
        );
    }

    /// A synthetic failed summary keeps the export name and error text, has
    /// status `failed`, a run id derived from the export name, and zeroed
    /// counters.
    #[test]
    fn synthetic_failed_summary_carries_error_and_status() {
        let err = anyhow::anyhow!("could not connect to source: timeout");
        let s = job::synthetic_failed_summary("orders", &err);
        assert_eq!(s.export_name, "orders");
        assert_eq!(s.status, "failed");
        assert_eq!(
            s.error_message.as_deref(),
            Some("could not connect to source: timeout")
        );
        assert!(
            s.run_id.starts_with("orders_"),
            "run_id must be derived from export name, got {}",
            s.run_id
        );
        assert_eq!(s.total_rows, 0);
        assert_eq!(s.files_produced, 0);
        assert_eq!(s.bytes_written, 0);
        assert_eq!(s.duration_ms, 0);
    }

    /// An aggregate entry copies the name, status, run id, counters, duration
    /// and mode from the summary, and has no error message when none was set.
    #[test]
    fn aggregate_entry_from_summary_copies_observable_fields() {
        let plan = minimal_plan();
        let mut summary = RunSummary::new(&plan);
        summary.status = "success".into();
        summary.total_rows = 12_345;
        summary.files_produced = 3;
        summary.bytes_written = 9_876_543;
        summary.duration_ms = 5_000;

        let entry = aggregate::entry_from_summary(&summary);
        assert_eq!(entry.export_name, summary.export_name);
        assert_eq!(entry.status, "success");
        assert_eq!(entry.run_id, summary.run_id);
        assert_eq!(entry.rows, 12_345);
        assert_eq!(entry.files, 3);
        assert_eq!(entry.bytes, 9_876_543);
        assert_eq!(entry.duration_ms, 5_000);
        assert_eq!(entry.mode, summary.mode);
        assert_eq!(entry.error_message, None);
    }
}