Skip to main content

rivet_cli/pipeline/
mod.rs

1mod chunked;
2mod cli;
3mod retry;
4mod single;
5mod sink;
6mod validate;
7
8#[allow(unused_imports)]
9pub use chunked::generate_chunks;
10pub use cli::{
11    reset_chunk_checkpoint, reset_state, show_chunk_checkpoint, show_files, show_metrics,
12    show_state,
13};
14#[allow(unused_imports)]
15pub use retry::classify_error;
16#[allow(unused_imports)]
17pub use single::build_time_window_query;
18#[allow(unused_imports)]
19pub use validate::validate_output;
20
21#[cfg(test)]
22#[allow(unused_imports)]
23pub(crate) use retry::is_transient;
24
25use std::path::Path;
26
27use crate::config::{Config, ExportConfig, ExportMode};
28use crate::error::Result;
29use crate::state::StateStore;
30use crate::tuning::{SourceTuning, TuningProfile, merge_tuning_config};
31
32use chunked::run_chunked_parallel_checkpoint;
33use single::run_with_reconnect;
34
/// Collects operational data during an export for end-of-run summary and metrics.
///
/// Created by `RunSummary::new` in the `running` state and filled in while the export runs.
/// At the end of `run_export_job` it is written to the state store via `record_metric`,
/// printed to stderr, and passed to `crate::notify::maybe_send`.
#[derive(Debug, Clone)]
pub struct RunSummary {
    /// `<export name>_<UTC timestamp, millisecond precision>`, assigned at construction.
    pub run_id: String,
    /// Name of the export this summary describes.
    pub export_name: String,
    /// Lifecycle label. Starts as `running`; `run_export_job` sets `success` (only when it is
    /// still `running`) or `failed`. NOTE(review): runners may set other values — confirm.
    pub status: String,
    /// Rows exported. Compared with `source_count` during reconciliation and checked against
    /// row-count bounds by the chunked quality gate.
    pub total_rows: i64,
    /// Number of output files. NOTE(review): presumably populated by the mode-specific runners.
    pub files_produced: usize,
    /// Total output bytes; the printed summary omits the line when this is zero.
    pub bytes_written: u64,
    /// Wall-clock time measured around the export runner, in milliseconds.
    pub duration_ms: i64,
    /// Largest of the sampled peak RSS and the RSS read before and after the run, in MB.
    pub peak_rss_mb: i64,
    /// Retry count; printed only when non-zero. NOTE(review): presumably bumped by retry logic.
    pub retries: u32,
    /// Output validation outcome; `None` until a result is recorded.
    pub validated: Option<bool>,
    /// Whether the output schema changed; `None` until a result is recorded.
    pub schema_changed: Option<bool>,
    /// Quality-check outcome; `None` until a result is recorded (the chunked gate sets it).
    pub quality_passed: Option<bool>,
    /// Alternate-format (`{:#}`) rendering of the error when the export failed.
    pub error_message: Option<String>,
    /// `profile` from YAML, or `balanced (default)` if omitted.
    pub tuning_profile: String,
    /// Configured `batch_size` from YAML/profile (FETCH cap before `batch_size_memory_mb` override).
    pub batch_size: usize,
    /// When set, actual FETCH size is derived from schema (see logs / `SourceTuning::effective_batch_size`).
    pub batch_size_memory_mb: Option<usize>,
    /// Lowercased `Debug` name of the configured output format.
    pub format: String,
    /// Lowercased `Debug` name of the configured export mode.
    pub mode: String,
    /// Lowercased `Debug` name of the configured compression.
    pub compression: String,
    /// Source COUNT(*) result for reconciliation (None = not requested or not applicable).
    pub source_count: Option<i64>,
    /// Whether reconciliation passed (Some(true) = match, Some(false) = mismatch, None = skipped).
    pub reconciled: Option<bool>,
}
65
66impl RunSummary {
67    fn new(export: &ExportConfig, tuning: &SourceTuning, yaml_profile_label: &str) -> Self {
68        let run_id = format!(
69            "{}_{}",
70            export.name,
71            chrono::Utc::now().format("%Y%m%dT%H%M%S%.3f"),
72        );
73        Self {
74            run_id,
75            export_name: export.name.clone(),
76            status: "running".into(),
77            total_rows: 0,
78            files_produced: 0,
79            bytes_written: 0,
80            duration_ms: 0,
81            peak_rss_mb: 0,
82            retries: 0,
83            validated: None,
84            schema_changed: None,
85            quality_passed: None,
86            error_message: None,
87            tuning_profile: yaml_profile_label.to_string(),
88            batch_size: tuning.batch_size,
89            batch_size_memory_mb: tuning.batch_size_memory_mb,
90            format: format!("{:?}", export.format).to_lowercase(),
91            mode: format!("{:?}", export.mode).to_lowercase(),
92            compression: format!("{:?}", export.compression).to_lowercase(),
93            source_count: None,
94            reconciled: None,
95        }
96    }
97
98    fn print(&self) {
99        eprintln!();
100        eprintln!("── {} ──", self.export_name);
101        eprintln!("  run_id:      {}", self.run_id);
102        eprintln!("  status:      {}", self.status);
103        if let Some(mem) = self.batch_size_memory_mb {
104            eprintln!(
105                "  tuning:      profile={}, batch_size={} (batch_size_memory_mb={}MiB → effective FETCH in logs)",
106                self.tuning_profile, self.batch_size, mem
107            );
108        } else {
109            eprintln!(
110                "  tuning:      profile={}, batch_size={}",
111                self.tuning_profile, self.batch_size
112            );
113        }
114        eprintln!("  rows:        {}", self.total_rows);
115        eprintln!("  files:       {}", self.files_produced);
116        if self.bytes_written > 0 {
117            eprintln!("  bytes:       {}", format_bytes(self.bytes_written));
118        }
119        let dur = if self.duration_ms >= 1000 {
120            format!("{:.1}s", self.duration_ms as f64 / 1000.0)
121        } else {
122            format!("{}ms", self.duration_ms)
123        };
124        eprintln!("  duration:    {}", dur);
125        if self.peak_rss_mb > 0 {
126            eprintln!("  peak RSS:    {}MB (sampled during run)", self.peak_rss_mb);
127        }
128        if self.format == "parquet" && self.compression != "zstd" {
129            eprintln!("  compression: {}", self.compression);
130        }
131        if self.retries > 0 {
132            eprintln!("  retries:     {}", self.retries);
133        }
134        if let Some(v) = self.validated {
135            eprintln!("  validated:   {}", if v { "pass" } else { "FAIL" });
136        }
137        if let Some(sc) = self.schema_changed {
138            eprintln!(
139                "  schema:      {}",
140                if sc { "CHANGED" } else { "unchanged" }
141            );
142        }
143        if let Some(q) = self.quality_passed {
144            eprintln!("  quality:     {}", if q { "pass" } else { "FAIL" });
145        }
146        if let Some(reconciled) = self.reconciled {
147            let src = self
148                .source_count
149                .map(|c| c.to_string())
150                .unwrap_or("?".into());
151            if reconciled {
152                eprintln!("  reconcile:   MATCH ({}/{})", self.total_rows, src);
153            } else {
154                eprintln!(
155                    "  reconcile:   MISMATCH (exported {} vs source {})",
156                    self.total_rows, src
157                );
158            }
159        }
160        if let Some(err) = &self.error_message {
161            eprintln!("  error:       {}", err);
162        }
163    }
164}
165
166/// For chunked mode: quality checks (row_count, null_ratio, uniqueness) cannot run per-batch
167/// inside the chunk workers because each chunk has its own `ExportSink`. This gate runs
168/// row_count bounds after all chunks complete. Null/unique checks are warned-and-skipped.
169fn run_chunked_quality_gate(
170    result: Result<()>,
171    export: &ExportConfig,
172    summary: &mut RunSummary,
173) -> Result<()> {
174    result?;
175
176    if export.mode != ExportMode::Chunked {
177        return Ok(());
178    }
179    let qc = match &export.quality {
180        Some(q) => q,
181        None => return Ok(()),
182    };
183
184    let total = summary.total_rows as usize;
185    let row_issues = crate::quality::check_row_count(total, qc);
186    let has_unsupported = !qc.null_ratio_max.is_empty() || !qc.unique_columns.is_empty();
187
188    if has_unsupported {
189        log::warn!(
190            "export '{}': quality checks null_ratio_max and unique_columns are not supported in chunked mode (each chunk processes independently); only row_count bounds are checked",
191            export.name
192        );
193    }
194
195    if !row_issues.is_empty() {
196        for issue in &row_issues {
197            log::warn!("quality FAIL: {}", issue.message);
198        }
199        summary.quality_passed = Some(false);
200        anyhow::bail!(
201            "export '{}': quality checks failed (chunked aggregate)",
202            export.name
203        );
204    }
205
206    summary.quality_passed = Some(true);
207    Ok(())
208}
209
210/// Run `SELECT COUNT(*) FROM ({query})` against the source and compare with exported rows.
211/// Skips reconciliation for incremental exports that used a cursor (moving target).
212fn reconcile_source_count(
213    source_config: &crate::config::SourceConfig,
214    export: &ExportConfig,
215    params: Option<&std::collections::HashMap<String, String>>,
216    summary: &mut RunSummary,
217) {
218    use crate::config::ExportMode;
219
220    if export.mode == ExportMode::Incremental {
221        log::info!(
222            "reconcile: skipping for incremental export '{}' (cursor-based, count may differ)",
223            export.name
224        );
225        return;
226    }
227
228    let base_query = match &export.query {
229        Some(q) => q.clone(),
230        None => {
231            log::warn!(
232                "reconcile: export '{}' has no inline query, skipping",
233                export.name
234            );
235            return;
236        }
237    };
238    let mut query = base_query;
239    if let Some(p) = params {
240        for (k, v) in p {
241            query = query.replace(&format!("${{{}}}", k), v);
242        }
243    }
244
245    let count_sql = format!("SELECT COUNT(*) FROM ({}) AS _rivet_reconcile", query);
246    log::info!(
247        "reconcile: running source count query for '{}'",
248        export.name
249    );
250
251    let mut src = match crate::source::create_source(source_config) {
252        Ok(s) => s,
253        Err(e) => {
254            log::warn!("reconcile: could not connect to source: {:#}", e);
255            return;
256        }
257    };
258
259    match src.query_scalar(&count_sql) {
260        Ok(Some(val)) => {
261            if let Ok(count) = val.parse::<i64>() {
262                summary.source_count = Some(count);
263                summary.reconciled = Some(summary.total_rows == count);
264                if summary.total_rows != count {
265                    log::warn!(
266                        "reconcile MISMATCH for '{}': exported {} rows, source has {}",
267                        export.name,
268                        summary.total_rows,
269                        count
270                    );
271                } else {
272                    log::info!(
273                        "reconcile MATCH for '{}': {}/{}",
274                        export.name,
275                        summary.total_rows,
276                        count
277                    );
278                }
279            } else {
280                log::warn!(
281                    "reconcile: could not parse count result '{}' as integer",
282                    val
283                );
284            }
285        }
286        Ok(None) => {
287            log::warn!("reconcile: COUNT(*) returned NULL for '{}'", export.name);
288        }
289        Err(e) => {
290            log::warn!(
291                "reconcile: count query failed for '{}': {:#}",
292                export.name,
293                e
294            );
295        }
296    }
297}
298
/// Renders a byte count with binary (1024-based) units: `B`, `KB`, `MB` or `GB`.
///
/// Values below 1 KiB are printed exactly (`"512 B"`). Larger values are scaled to the largest
/// unit they reach and shown with one decimal place (`"1.5 KB"`, `"2.5 GB"`).
pub(crate) fn format_bytes(b: u64) -> String {
    const KIB: u64 = 1 << 10;
    const MIB: u64 = 1 << 20;
    const GIB: u64 = 1 << 30;

    let scaled = |unit: u64| b as f64 / unit as f64;
    match b {
        n if n >= GIB => format!("{:.1} GB", scaled(GIB)),
        n if n >= MIB => format!("{:.1} MB", scaled(MIB)),
        n if n >= KIB => format!("{:.1} KB", scaled(KIB)),
        n => format!("{} B", n),
    }
}
310
311#[allow(clippy::too_many_arguments)]
312fn run_export_job(
313    config_path: &str,
314    config: &Config,
315    export: &ExportConfig,
316    state: &StateStore,
317    config_dir: &Path,
318    validate: bool,
319    reconcile: bool,
320    resume: bool,
321    params: Option<&std::collections::HashMap<String, String>>,
322) -> Result<()> {
323    let merged = merge_tuning_config(config.source.tuning.as_ref(), export.tuning.as_ref());
324    let tuning = SourceTuning::from_config(merged.as_ref());
325    let yaml_profile_label = match merged.as_ref().and_then(|t| t.profile) {
326        Some(TuningProfile::Fast) => "fast",
327        Some(TuningProfile::Balanced) => "balanced",
328        Some(TuningProfile::Safe) => "safe",
329        None => "balanced (default)",
330    };
331    log::info!(
332        "starting export '{}' (effective tuning: {})",
333        export.name,
334        tuning
335    );
336
337    let start = std::time::Instant::now();
338    let rss_before = crate::resource::get_rss_mb();
339    let rss_sampler = crate::resource::RssPeakSampler::start(rss_before, 100);
340    let mut summary = RunSummary::new(export, &tuning, yaml_profile_label);
341
342    let result = match export.mode {
343        ExportMode::Chunked if export.parallel > 1 && export.chunk_checkpoint => {
344            run_chunked_parallel_checkpoint(
345                config_path,
346                &config.source,
347                state,
348                export,
349                &tuning,
350                config_dir,
351                validate,
352                &mut summary,
353                params,
354                resume,
355            )
356        }
357        ExportMode::Chunked if export.parallel > 1 => chunked::run_chunked_parallel(
358            &config.source,
359            state,
360            export,
361            &tuning,
362            config_dir,
363            validate,
364            &mut summary,
365            params,
366        ),
367        _ => run_with_reconnect(
368            &config.source,
369            state,
370            export,
371            &tuning,
372            config_dir,
373            validate,
374            &mut summary,
375            params,
376            resume,
377            config_path,
378        ),
379    };
380
381    let rss_peak = rss_sampler.stop();
382    let rss_after = crate::resource::get_rss_mb();
383    summary.duration_ms = start.elapsed().as_millis() as i64;
384    summary.peak_rss_mb = rss_peak.max(rss_after).max(rss_before) as i64;
385
386    let tuning_class = tuning.profile_name().to_string();
387    let result = run_chunked_quality_gate(result, export, &mut summary);
388    let failed = result.is_err();
389    match &result {
390        Ok(()) => {
391            if summary.status == "running" {
392                summary.status = "success".into();
393            }
394        }
395        Err(e) => {
396            summary.status = "failed".into();
397            summary.error_message = Some(format!("{:#}", e));
398            log::error!("export '{}' failed: {:#}", export.name, e);
399        }
400    }
401
402    if reconcile && !failed {
403        reconcile_source_count(&config.source, export, params, &mut summary);
404    }
405
406    let _ = state.record_metric(
407        &summary.export_name,
408        &summary.run_id,
409        summary.duration_ms,
410        summary.total_rows,
411        Some(summary.peak_rss_mb),
412        &summary.status,
413        summary.error_message.as_deref(),
414        Some(&tuning_class),
415        Some(&summary.format),
416        Some(&summary.mode),
417        summary.files_produced as i64,
418        summary.bytes_written as i64,
419        summary.retries as i64,
420        summary.validated,
421        summary.schema_changed,
422    );
423
424    summary.print();
425    crate::notify::maybe_send(config.notifications.as_ref(), &summary);
426
427    if failed { result } else { Ok(()) }
428}
429
430/// Re-invoke this binary once per export. Children do not inherit parallel flags, so there is no recursion.
431fn run_exports_as_child_processes(
432    config_path: &str,
433    exports: &[&ExportConfig],
434    validate: bool,
435    reconcile: bool,
436    resume: bool,
437    params: Option<&std::collections::HashMap<String, String>>,
438) -> Result<()> {
439    use std::process::{Command, Stdio};
440
441    let exe = std::env::current_exe().map_err(|e| {
442        anyhow::anyhow!(
443            "failed to resolve rivet executable for child processes: {:#}",
444            e
445        )
446    })?;
447
448    let config_arg = std::path::Path::new(config_path)
449        .canonicalize()
450        .unwrap_or_else(|_| std::path::PathBuf::from(config_path));
451
452    log::info!(
453        "running {} exports as separate rivet processes (each child: single `--export`; SQLite state WAL allows concurrent writers)",
454        exports.len()
455    );
456
457    let mut children: Vec<(String, std::process::Child)> = Vec::with_capacity(exports.len());
458    for export in exports {
459        let mut cmd = Command::new(&exe);
460        cmd.arg("run")
461            .arg("--config")
462            .arg(&config_arg)
463            .arg("--export")
464            .arg(export.name.as_str());
465        if validate {
466            cmd.arg("--validate");
467        }
468        if reconcile {
469            cmd.arg("--reconcile");
470        }
471        if resume {
472            cmd.arg("--resume");
473        }
474        if let Some(p) = params {
475            for (k, v) in p {
476                cmd.arg("--param").arg(format!("{k}={v}"));
477            }
478        }
479        cmd.stdin(Stdio::null());
480        log::debug!("spawning child for export '{}': {:?}", export.name, cmd);
481        let child = cmd.spawn().map_err(|e| {
482            anyhow::anyhow!(
483                "failed to spawn rivet child for export '{}': {:#}",
484                export.name,
485                e
486            )
487        })?;
488        children.push((export.name.clone(), child));
489    }
490
491    let mut failures = Vec::new();
492    for (name, mut child) in children {
493        let status = match child.wait() {
494            Ok(s) => s,
495            Err(e) => {
496                failures.push(format!("export '{name}': wait failed: {e:#}"));
497                continue;
498            }
499        };
500        if !status.success() {
501            let code = status
502                .code()
503                .map(|c| c.to_string())
504                .unwrap_or_else(|| "signal".to_string());
505            failures.push(format!("export '{name}' exited with status {code}"));
506        }
507    }
508
509    if !failures.is_empty() {
510        anyhow::bail!("{}", failures.join("; "));
511    }
512    Ok(())
513}
514
515#[allow(clippy::too_many_arguments)]
516pub fn run(
517    config_path: &str,
518    export_name: Option<&str>,
519    validate: bool,
520    reconcile: bool,
521    resume: bool,
522    params: Option<&std::collections::HashMap<String, String>>,
523    parallel_exports_cli: bool,
524    parallel_export_processes_cli: bool,
525) -> Result<()> {
526    let config = Config::load_with_params(config_path, params)?;
527
528    let config_dir = Path::new(config_path)
529        .parent()
530        .unwrap_or(Path::new("."))
531        .to_path_buf();
532
533    let exports: Vec<&ExportConfig> = if let Some(name) = export_name {
534        let e = config
535            .exports
536            .iter()
537            .find(|e| e.name == name)
538            .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
539        vec![e]
540    } else {
541        config.exports.iter().collect()
542    };
543
544    let run_parallel_processes = (parallel_export_processes_cli
545        || config.parallel_export_processes)
546        && export_name.is_none()
547        && exports.len() > 1;
548
549    if run_parallel_processes {
550        return run_exports_as_child_processes(
551            config_path,
552            &exports,
553            validate,
554            reconcile,
555            resume,
556            params,
557        );
558    }
559
560    let run_parallel = (parallel_exports_cli || config.parallel_exports)
561        && export_name.is_none()
562        && exports.len() > 1;
563
564    if run_parallel {
565        log::info!(
566            "running {} exports in parallel (separate state DB connection per export)",
567            exports.len()
568        );
569        let mut export_errors: Vec<anyhow::Error> = Vec::new();
570        std::thread::scope(|s| {
571            let mut handles = Vec::new();
572            for &export in &exports {
573                handles.push(s.spawn(|| {
574                    let state = StateStore::open(config_path).map_err(|e| {
575                        anyhow::anyhow!(
576                            "export '{}': failed to open state database: {:#}",
577                            export.name,
578                            e
579                        )
580                    })?;
581                    run_export_job(
582                        config_path,
583                        &config,
584                        export,
585                        &state,
586                        &config_dir,
587                        validate,
588                        reconcile,
589                        resume,
590                        params,
591                    )
592                }));
593            }
594            for h in handles {
595                match h.join() {
596                    Ok(Ok(())) => {}
597                    Ok(Err(e)) => export_errors.push(e),
598                    Err(payload) => std::panic::resume_unwind(payload),
599                }
600            }
601        });
602        if !export_errors.is_empty() {
603            let text = export_errors
604                .into_iter()
605                .map(|e| format!("{e:#}"))
606                .collect::<Vec<_>>()
607                .join("; ");
608            return Err(anyhow::anyhow!(text));
609        }
610    } else {
611        let state = StateStore::open(config_path)?;
612        let mut failures = Vec::new();
613        for export in &exports {
614            if let Err(e) = run_export_job(
615                config_path,
616                &config,
617                export,
618                &state,
619                &config_dir,
620                validate,
621                reconcile,
622                resume,
623                params,
624            ) {
625                failures.push(format!("{:#}", e));
626            }
627        }
628        if !failures.is_empty() {
629            anyhow::bail!("{}", failures.join("; "));
630        }
631    }
632
633    Ok(())
634}
635
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{CompressionType, FormatType, MetaColumns, TimeColumnType};
    use crate::tuning::SourceTuning;

    /// Asserts `format_bytes` for every `(bytes, expected)` pair, naming the failing input.
    fn check_format_bytes(cases: &[(u64, &str)]) {
        for &(bytes, expected) in cases {
            assert_eq!(format_bytes(bytes), expected, "format_bytes({})", bytes);
        }
    }

    /// Minimal full-mode Parquet export named `name`, writing to `./out` on local disk.
    fn sample_export(name: &str) -> ExportConfig {
        ExportConfig {
            name: name.into(),
            query: Some("SELECT 1".into()),
            query_file: None,
            mode: ExportMode::Full,
            cursor_column: None,
            chunk_column: None,
            chunk_size: 100_000,
            parallel: 1,
            time_column: None,
            time_column_type: TimeColumnType::Timestamp,
            days_window: None,
            format: FormatType::Parquet,
            compression: CompressionType::default(),
            compression_level: None,
            skip_empty: false,
            destination: crate::config::DestinationConfig {
                destination_type: crate::config::DestinationType::Local,
                bucket: None,
                prefix: None,
                path: Some("./out".into()),
                region: None,
                endpoint: None,
                credentials_file: None,
                access_key_env: None,
                secret_key_env: None,
                aws_profile: None,
                allow_anonymous: false,
            },
            meta_columns: MetaColumns::default(),
            quality: None,
            max_file_size: None,
            chunk_checkpoint: false,
            chunk_max_attempts: None,
            tuning: None,
            chunk_dense: false,
        }
    }

    #[test]
    fn test_format_bytes() {
        check_format_bytes(&[
            (500, "500 B"),
            (1024, "1.0 KB"),
            (1536, "1.5 KB"),
            (1_048_576, "1.0 MB"),
            (1_073_741_824, "1.0 GB"),
            (2_684_354_560, "2.5 GB"),
        ]);
    }

    #[test]
    fn format_bytes_boundary_values() {
        // Each unit switches exactly at its power of 1024; just below it, the smaller unit
        // rounds up to 1024.0.
        check_format_bytes(&[
            (0, "0 B"),
            (1, "1 B"),
            (1023, "1023 B"),
            (1024, "1.0 KB"),
            (1025, "1.0 KB"),
            (1_048_575, "1024.0 KB"),
            (1_048_576, "1.0 MB"),
            (1_073_741_823, "1024.0 MB"),
            (1_073_741_824, "1.0 GB"),
        ]);
    }

    #[test]
    fn test_run_summary_fields() {
        let export = sample_export("test_export");
        let tuning = SourceTuning::from_config(None);
        let summary = RunSummary::new(&export, &tuning, "balanced (default)");

        assert_eq!(summary.export_name, "test_export");
        assert_eq!(summary.status, "running");
        assert_eq!(summary.total_rows, 0);
        assert_eq!(summary.files_produced, 0);
        assert_eq!(summary.tuning_profile, "balanced (default)");
        assert_eq!(summary.batch_size, 10_000);
        assert_eq!(summary.format, "parquet");
        assert_eq!(summary.mode, "full");
        assert!(
            summary.run_id.starts_with("test_export_"),
            "run_id should start with export name, got: {}",
            summary.run_id
        );
    }
}