1mod chunked;
2mod cli;
3mod retry;
4mod single;
5mod sink;
6mod validate;
7
8#[allow(unused_imports)]
9pub use chunked::generate_chunks;
10pub use cli::{
11 reset_chunk_checkpoint, reset_state, show_chunk_checkpoint, show_files, show_metrics,
12 show_state,
13};
14#[allow(unused_imports)]
15pub use retry::classify_error;
16#[allow(unused_imports)]
17pub use single::build_time_window_query;
18#[allow(unused_imports)]
19pub use validate::validate_output;
20
21#[cfg(test)]
22#[allow(unused_imports)]
23pub(crate) use retry::is_transient;
24
25use std::path::Path;
26
27use crate::config::{Config, ExportConfig, ExportMode};
28use crate::error::Result;
29use crate::state::StateStore;
30use crate::tuning::{SourceTuning, TuningProfile, merge_tuning_config};
31
32use chunked::run_chunked_parallel_checkpoint;
33use single::run_with_reconnect;
34
35#[derive(Debug, Clone)]
37pub struct RunSummary {
38 pub run_id: String,
39 pub export_name: String,
40 pub status: String,
41 pub total_rows: i64,
42 pub files_produced: usize,
43 pub bytes_written: u64,
44 pub duration_ms: i64,
45 pub peak_rss_mb: i64,
46 pub retries: u32,
47 pub validated: Option<bool>,
48 pub schema_changed: Option<bool>,
49 pub quality_passed: Option<bool>,
50 pub error_message: Option<String>,
51 pub tuning_profile: String,
53 pub batch_size: usize,
55 pub batch_size_memory_mb: Option<usize>,
57 pub format: String,
58 pub mode: String,
59 pub compression: String,
60 pub source_count: Option<i64>,
62 pub reconciled: Option<bool>,
64}
65
66impl RunSummary {
67 fn new(export: &ExportConfig, tuning: &SourceTuning, yaml_profile_label: &str) -> Self {
68 let run_id = format!(
69 "{}_{}",
70 export.name,
71 chrono::Utc::now().format("%Y%m%dT%H%M%S%.3f"),
72 );
73 Self {
74 run_id,
75 export_name: export.name.clone(),
76 status: "running".into(),
77 total_rows: 0,
78 files_produced: 0,
79 bytes_written: 0,
80 duration_ms: 0,
81 peak_rss_mb: 0,
82 retries: 0,
83 validated: None,
84 schema_changed: None,
85 quality_passed: None,
86 error_message: None,
87 tuning_profile: yaml_profile_label.to_string(),
88 batch_size: tuning.batch_size,
89 batch_size_memory_mb: tuning.batch_size_memory_mb,
90 format: format!("{:?}", export.format).to_lowercase(),
91 mode: format!("{:?}", export.mode).to_lowercase(),
92 compression: format!("{:?}", export.compression).to_lowercase(),
93 source_count: None,
94 reconciled: None,
95 }
96 }
97
98 fn print(&self) {
99 eprintln!();
100 eprintln!("── {} ──", self.export_name);
101 eprintln!(" run_id: {}", self.run_id);
102 eprintln!(" status: {}", self.status);
103 if let Some(mem) = self.batch_size_memory_mb {
104 eprintln!(
105 " tuning: profile={}, batch_size={} (batch_size_memory_mb={}MiB → effective FETCH in logs)",
106 self.tuning_profile, self.batch_size, mem
107 );
108 } else {
109 eprintln!(
110 " tuning: profile={}, batch_size={}",
111 self.tuning_profile, self.batch_size
112 );
113 }
114 eprintln!(" rows: {}", self.total_rows);
115 eprintln!(" files: {}", self.files_produced);
116 if self.bytes_written > 0 {
117 eprintln!(" bytes: {}", format_bytes(self.bytes_written));
118 }
119 let dur = if self.duration_ms >= 1000 {
120 format!("{:.1}s", self.duration_ms as f64 / 1000.0)
121 } else {
122 format!("{}ms", self.duration_ms)
123 };
124 eprintln!(" duration: {}", dur);
125 if self.peak_rss_mb > 0 {
126 eprintln!(" peak RSS: {}MB (sampled during run)", self.peak_rss_mb);
127 }
128 if self.format == "parquet" && self.compression != "zstd" {
129 eprintln!(" compression: {}", self.compression);
130 }
131 if self.retries > 0 {
132 eprintln!(" retries: {}", self.retries);
133 }
134 if let Some(v) = self.validated {
135 eprintln!(" validated: {}", if v { "pass" } else { "FAIL" });
136 }
137 if let Some(sc) = self.schema_changed {
138 eprintln!(
139 " schema: {}",
140 if sc { "CHANGED" } else { "unchanged" }
141 );
142 }
143 if let Some(q) = self.quality_passed {
144 eprintln!(" quality: {}", if q { "pass" } else { "FAIL" });
145 }
146 if let Some(reconciled) = self.reconciled {
147 let src = self
148 .source_count
149 .map(|c| c.to_string())
150 .unwrap_or("?".into());
151 if reconciled {
152 eprintln!(" reconcile: MATCH ({}/{})", self.total_rows, src);
153 } else {
154 eprintln!(
155 " reconcile: MISMATCH (exported {} vs source {})",
156 self.total_rows, src
157 );
158 }
159 }
160 if let Some(err) = &self.error_message {
161 eprintln!(" error: {}", err);
162 }
163 }
164}
165
166fn run_chunked_quality_gate(
170 result: Result<()>,
171 export: &ExportConfig,
172 summary: &mut RunSummary,
173) -> Result<()> {
174 result?;
175
176 if export.mode != ExportMode::Chunked {
177 return Ok(());
178 }
179 let qc = match &export.quality {
180 Some(q) => q,
181 None => return Ok(()),
182 };
183
184 let total = summary.total_rows as usize;
185 let row_issues = crate::quality::check_row_count(total, qc);
186 let has_unsupported = !qc.null_ratio_max.is_empty() || !qc.unique_columns.is_empty();
187
188 if has_unsupported {
189 log::warn!(
190 "export '{}': quality checks null_ratio_max and unique_columns are not supported in chunked mode (each chunk processes independently); only row_count bounds are checked",
191 export.name
192 );
193 }
194
195 if !row_issues.is_empty() {
196 for issue in &row_issues {
197 log::warn!("quality FAIL: {}", issue.message);
198 }
199 summary.quality_passed = Some(false);
200 anyhow::bail!(
201 "export '{}': quality checks failed (chunked aggregate)",
202 export.name
203 );
204 }
205
206 summary.quality_passed = Some(true);
207 Ok(())
208}
209
210fn reconcile_source_count(
213 source_config: &crate::config::SourceConfig,
214 export: &ExportConfig,
215 params: Option<&std::collections::HashMap<String, String>>,
216 summary: &mut RunSummary,
217) {
218 use crate::config::ExportMode;
219
220 if export.mode == ExportMode::Incremental {
221 log::info!(
222 "reconcile: skipping for incremental export '{}' (cursor-based, count may differ)",
223 export.name
224 );
225 return;
226 }
227
228 let base_query = match &export.query {
229 Some(q) => q.clone(),
230 None => {
231 log::warn!(
232 "reconcile: export '{}' has no inline query, skipping",
233 export.name
234 );
235 return;
236 }
237 };
238 let mut query = base_query;
239 if let Some(p) = params {
240 for (k, v) in p {
241 query = query.replace(&format!("${{{}}}", k), v);
242 }
243 }
244
245 let count_sql = format!("SELECT COUNT(*) FROM ({}) AS _rivet_reconcile", query);
246 log::info!(
247 "reconcile: running source count query for '{}'",
248 export.name
249 );
250
251 let mut src = match crate::source::create_source(source_config) {
252 Ok(s) => s,
253 Err(e) => {
254 log::warn!("reconcile: could not connect to source: {:#}", e);
255 return;
256 }
257 };
258
259 match src.query_scalar(&count_sql) {
260 Ok(Some(val)) => {
261 if let Ok(count) = val.parse::<i64>() {
262 summary.source_count = Some(count);
263 summary.reconciled = Some(summary.total_rows == count);
264 if summary.total_rows != count {
265 log::warn!(
266 "reconcile MISMATCH for '{}': exported {} rows, source has {}",
267 export.name,
268 summary.total_rows,
269 count
270 );
271 } else {
272 log::info!(
273 "reconcile MATCH for '{}': {}/{}",
274 export.name,
275 summary.total_rows,
276 count
277 );
278 }
279 } else {
280 log::warn!(
281 "reconcile: could not parse count result '{}' as integer",
282 val
283 );
284 }
285 }
286 Ok(None) => {
287 log::warn!("reconcile: COUNT(*) returned NULL for '{}'", export.name);
288 }
289 Err(e) => {
290 log::warn!(
291 "reconcile: count query failed for '{}': {:#}",
292 export.name,
293 e
294 );
295 }
296 }
297}
298
/// Renders a byte count using 1024-based units ("KB", "MB", "GB") with one decimal.
///
/// Counts below 1024 are printed as whole bytes ("N B"). Values just under a unit
/// boundary stay in the smaller unit, so they may round to e.g. "1024.0 KB".
pub(crate) fn format_bytes(b: u64) -> String {
    const KIB: u64 = 1024;
    // Largest unit first so the first matching threshold wins.
    const UNITS: [(u64, &str); 3] = [(KIB * KIB * KIB, "GB"), (KIB * KIB, "MB"), (KIB, "KB")];

    UNITS
        .iter()
        .find(|&&(threshold, _)| b >= threshold)
        .map(|&(threshold, unit)| format!("{:.1} {}", b as f64 / threshold as f64, unit))
        .unwrap_or_else(|| format!("{} B", b))
}
310
311#[allow(clippy::too_many_arguments)]
312fn run_export_job(
313 config_path: &str,
314 config: &Config,
315 export: &ExportConfig,
316 state: &StateStore,
317 config_dir: &Path,
318 validate: bool,
319 reconcile: bool,
320 resume: bool,
321 params: Option<&std::collections::HashMap<String, String>>,
322) -> Result<()> {
323 let merged = merge_tuning_config(config.source.tuning.as_ref(), export.tuning.as_ref());
324 let tuning = SourceTuning::from_config(merged.as_ref());
325 let yaml_profile_label = match merged.as_ref().and_then(|t| t.profile) {
326 Some(TuningProfile::Fast) => "fast",
327 Some(TuningProfile::Balanced) => "balanced",
328 Some(TuningProfile::Safe) => "safe",
329 None => "balanced (default)",
330 };
331 log::info!(
332 "starting export '{}' (effective tuning: {})",
333 export.name,
334 tuning
335 );
336
337 let start = std::time::Instant::now();
338 let rss_before = crate::resource::get_rss_mb();
339 let rss_sampler = crate::resource::RssPeakSampler::start(rss_before, 100);
340 let mut summary = RunSummary::new(export, &tuning, yaml_profile_label);
341
342 let result = match export.mode {
343 ExportMode::Chunked if export.parallel > 1 && export.chunk_checkpoint => {
344 run_chunked_parallel_checkpoint(
345 config_path,
346 &config.source,
347 state,
348 export,
349 &tuning,
350 config_dir,
351 validate,
352 &mut summary,
353 params,
354 resume,
355 )
356 }
357 ExportMode::Chunked if export.parallel > 1 => chunked::run_chunked_parallel(
358 &config.source,
359 state,
360 export,
361 &tuning,
362 config_dir,
363 validate,
364 &mut summary,
365 params,
366 ),
367 _ => run_with_reconnect(
368 &config.source,
369 state,
370 export,
371 &tuning,
372 config_dir,
373 validate,
374 &mut summary,
375 params,
376 resume,
377 config_path,
378 ),
379 };
380
381 let rss_peak = rss_sampler.stop();
382 let rss_after = crate::resource::get_rss_mb();
383 summary.duration_ms = start.elapsed().as_millis() as i64;
384 summary.peak_rss_mb = rss_peak.max(rss_after).max(rss_before) as i64;
385
386 let tuning_class = tuning.profile_name().to_string();
387 let result = run_chunked_quality_gate(result, export, &mut summary);
388 let failed = result.is_err();
389 match &result {
390 Ok(()) => {
391 if summary.status == "running" {
392 summary.status = "success".into();
393 }
394 }
395 Err(e) => {
396 summary.status = "failed".into();
397 summary.error_message = Some(format!("{:#}", e));
398 log::error!("export '{}' failed: {:#}", export.name, e);
399 }
400 }
401
402 if reconcile && !failed {
403 reconcile_source_count(&config.source, export, params, &mut summary);
404 }
405
406 let _ = state.record_metric(
407 &summary.export_name,
408 &summary.run_id,
409 summary.duration_ms,
410 summary.total_rows,
411 Some(summary.peak_rss_mb),
412 &summary.status,
413 summary.error_message.as_deref(),
414 Some(&tuning_class),
415 Some(&summary.format),
416 Some(&summary.mode),
417 summary.files_produced as i64,
418 summary.bytes_written as i64,
419 summary.retries as i64,
420 summary.validated,
421 summary.schema_changed,
422 );
423
424 summary.print();
425 crate::notify::maybe_send(config.notifications.as_ref(), &summary);
426
427 if failed { result } else { Ok(()) }
428}
429
430fn run_exports_as_child_processes(
432 config_path: &str,
433 exports: &[&ExportConfig],
434 validate: bool,
435 reconcile: bool,
436 resume: bool,
437 params: Option<&std::collections::HashMap<String, String>>,
438) -> Result<()> {
439 use std::process::{Command, Stdio};
440
441 let exe = std::env::current_exe().map_err(|e| {
442 anyhow::anyhow!(
443 "failed to resolve rivet executable for child processes: {:#}",
444 e
445 )
446 })?;
447
448 let config_arg = std::path::Path::new(config_path)
449 .canonicalize()
450 .unwrap_or_else(|_| std::path::PathBuf::from(config_path));
451
452 log::info!(
453 "running {} exports as separate rivet processes (each child: single `--export`; SQLite state WAL allows concurrent writers)",
454 exports.len()
455 );
456
457 let mut children: Vec<(String, std::process::Child)> = Vec::with_capacity(exports.len());
458 for export in exports {
459 let mut cmd = Command::new(&exe);
460 cmd.arg("run")
461 .arg("--config")
462 .arg(&config_arg)
463 .arg("--export")
464 .arg(export.name.as_str());
465 if validate {
466 cmd.arg("--validate");
467 }
468 if reconcile {
469 cmd.arg("--reconcile");
470 }
471 if resume {
472 cmd.arg("--resume");
473 }
474 if let Some(p) = params {
475 for (k, v) in p {
476 cmd.arg("--param").arg(format!("{k}={v}"));
477 }
478 }
479 cmd.stdin(Stdio::null());
480 log::debug!("spawning child for export '{}': {:?}", export.name, cmd);
481 let child = cmd.spawn().map_err(|e| {
482 anyhow::anyhow!(
483 "failed to spawn rivet child for export '{}': {:#}",
484 export.name,
485 e
486 )
487 })?;
488 children.push((export.name.clone(), child));
489 }
490
491 let mut failures = Vec::new();
492 for (name, mut child) in children {
493 let status = match child.wait() {
494 Ok(s) => s,
495 Err(e) => {
496 failures.push(format!("export '{name}': wait failed: {e:#}"));
497 continue;
498 }
499 };
500 if !status.success() {
501 let code = status
502 .code()
503 .map(|c| c.to_string())
504 .unwrap_or_else(|| "signal".to_string());
505 failures.push(format!("export '{name}' exited with status {code}"));
506 }
507 }
508
509 if !failures.is_empty() {
510 anyhow::bail!("{}", failures.join("; "));
511 }
512 Ok(())
513}
514
515#[allow(clippy::too_many_arguments)]
516pub fn run(
517 config_path: &str,
518 export_name: Option<&str>,
519 validate: bool,
520 reconcile: bool,
521 resume: bool,
522 params: Option<&std::collections::HashMap<String, String>>,
523 parallel_exports_cli: bool,
524 parallel_export_processes_cli: bool,
525) -> Result<()> {
526 let config = Config::load_with_params(config_path, params)?;
527
528 let config_dir = Path::new(config_path)
529 .parent()
530 .unwrap_or(Path::new("."))
531 .to_path_buf();
532
533 let exports: Vec<&ExportConfig> = if let Some(name) = export_name {
534 let e = config
535 .exports
536 .iter()
537 .find(|e| e.name == name)
538 .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
539 vec![e]
540 } else {
541 config.exports.iter().collect()
542 };
543
544 let run_parallel_processes = (parallel_export_processes_cli
545 || config.parallel_export_processes)
546 && export_name.is_none()
547 && exports.len() > 1;
548
549 if run_parallel_processes {
550 return run_exports_as_child_processes(
551 config_path,
552 &exports,
553 validate,
554 reconcile,
555 resume,
556 params,
557 );
558 }
559
560 let run_parallel = (parallel_exports_cli || config.parallel_exports)
561 && export_name.is_none()
562 && exports.len() > 1;
563
564 if run_parallel {
565 log::info!(
566 "running {} exports in parallel (separate state DB connection per export)",
567 exports.len()
568 );
569 let mut export_errors: Vec<anyhow::Error> = Vec::new();
570 std::thread::scope(|s| {
571 let mut handles = Vec::new();
572 for &export in &exports {
573 handles.push(s.spawn(|| {
574 let state = StateStore::open(config_path).map_err(|e| {
575 anyhow::anyhow!(
576 "export '{}': failed to open state database: {:#}",
577 export.name,
578 e
579 )
580 })?;
581 run_export_job(
582 config_path,
583 &config,
584 export,
585 &state,
586 &config_dir,
587 validate,
588 reconcile,
589 resume,
590 params,
591 )
592 }));
593 }
594 for h in handles {
595 match h.join() {
596 Ok(Ok(())) => {}
597 Ok(Err(e)) => export_errors.push(e),
598 Err(payload) => std::panic::resume_unwind(payload),
599 }
600 }
601 });
602 if !export_errors.is_empty() {
603 let text = export_errors
604 .into_iter()
605 .map(|e| format!("{e:#}"))
606 .collect::<Vec<_>>()
607 .join("; ");
608 return Err(anyhow::anyhow!(text));
609 }
610 } else {
611 let state = StateStore::open(config_path)?;
612 let mut failures = Vec::new();
613 for export in &exports {
614 if let Err(e) = run_export_job(
615 config_path,
616 &config,
617 export,
618 &state,
619 &config_dir,
620 validate,
621 reconcile,
622 resume,
623 params,
624 ) {
625 failures.push(format!("{:#}", e));
626 }
627 }
628 if !failures.is_empty() {
629 anyhow::bail!("{}", failures.join("; "));
630 }
631 }
632
633 Ok(())
634}
635
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{CompressionType, FormatType, MetaColumns, TimeColumnType};
    use crate::tuning::SourceTuning;

    // `format_bytes` picks the largest 1024-based unit the value reaches and
    // prints one decimal place; smaller values are whole bytes.
    #[test]
    fn test_format_bytes() {
        assert_eq!(format_bytes(500), "500 B");
        assert_eq!(format_bytes(1024), "1.0 KB");
        assert_eq!(format_bytes(1536), "1.5 KB");
        assert_eq!(format_bytes(1_048_576), "1.0 MB");
        assert_eq!(format_bytes(1_073_741_824), "1.0 GB");
        assert_eq!(format_bytes(2_684_354_560), "2.5 GB");
    }

    // Pins behavior at unit boundaries: values just below a threshold stay in the
    // smaller unit and may round up to "1024.0" of it (e.g. 1_048_575 -> "1024.0 KB").
    #[test]
    fn format_bytes_boundary_values() {
        assert_eq!(format_bytes(0), "0 B");
        assert_eq!(format_bytes(1), "1 B");
        assert_eq!(format_bytes(1023), "1023 B");
        assert_eq!(format_bytes(1024), "1.0 KB");
        assert_eq!(format_bytes(1025), "1.0 KB");
        assert_eq!(format_bytes(1_048_575), "1024.0 KB");
        assert_eq!(format_bytes(1_048_576), "1.0 MB");
        assert_eq!(format_bytes(1_073_741_823), "1024.0 MB");
        assert_eq!(format_bytes(1_073_741_824), "1.0 GB");
    }

    // `RunSummary::new` starts in the "running" state with zeroed counters, copies
    // the profile label and batch size, lower-cases format/mode, and prefixes
    // `run_id` with the export name.
    #[test]
    fn test_run_summary_fields() {
        // Minimal full-mode Parquet export writing to a local directory.
        let export = ExportConfig {
            name: "test_export".into(),
            query: Some("SELECT 1".into()),
            query_file: None,
            mode: ExportMode::Full,
            cursor_column: None,
            chunk_column: None,
            chunk_size: 100_000,
            parallel: 1,
            time_column: None,
            time_column_type: TimeColumnType::Timestamp,
            days_window: None,
            format: FormatType::Parquet,
            compression: CompressionType::default(),
            compression_level: None,
            skip_empty: false,
            destination: crate::config::DestinationConfig {
                destination_type: crate::config::DestinationType::Local,
                bucket: None,
                prefix: None,
                path: Some("./out".into()),
                region: None,
                endpoint: None,
                credentials_file: None,
                access_key_env: None,
                secret_key_env: None,
                aws_profile: None,
                allow_anonymous: false,
            },
            meta_columns: MetaColumns::default(),
            quality: None,
            max_file_size: None,
            chunk_checkpoint: false,
            chunk_max_attempts: None,
            tuning: None,
            chunk_dense: false,
        };
        // No tuning section; the batch_size assertion below expects 10_000 from this.
        let tuning = SourceTuning::from_config(None);
        let summary = RunSummary::new(&export, &tuning, "balanced (default)");
        assert_eq!(summary.export_name, "test_export");
        assert_eq!(summary.status, "running");
        assert_eq!(summary.total_rows, 0);
        assert_eq!(summary.files_produced, 0);
        assert_eq!(summary.tuning_profile, "balanced (default)");
        assert_eq!(summary.batch_size, 10_000);
        assert_eq!(summary.format, "parquet");
        assert_eq!(summary.mode, "full");
        assert!(
            summary.run_id.starts_with("test_export_"),
            "run_id should start with export name, got: {}",
            summary.run_id
        );
    }
}
720}