Skip to main content

rivet_cli/pipeline/
single.rs

1use std::path::Path;
2use std::time::Duration;
3
4use super::RunSummary;
5use super::chunked::{run_chunked_sequential, run_chunked_sequential_checkpoint};
6use super::retry::classify_error;
7use super::sink::{CompletedPart, ExportSink, extract_last_cursor_value};
8use super::validate::validate_output;
9use crate::config::{ExportConfig, ExportMode, SourceConfig, TimeColumnType};
10use crate::error::Result;
11use crate::source::{self, Source};
12use crate::state::StateStore;
13use crate::tuning::SourceTuning;
14use crate::{destination, format};
15
16#[allow(clippy::too_many_arguments)]
17pub(crate) fn run_with_reconnect(
18    source_config: &SourceConfig,
19    state: &StateStore,
20    export: &ExportConfig,
21    tuning: &SourceTuning,
22    config_dir: &Path,
23    validate: bool,
24    summary: &mut RunSummary,
25    params: Option<&std::collections::HashMap<String, String>>,
26    resume: bool,
27    config_path: &str,
28) -> Result<()> {
29    let mut last_err: Option<anyhow::Error> = None;
30
31    for attempt in 0..=tuning.max_retries {
32        if attempt > 0 {
33            summary.retries = attempt;
34            let (_, needs_reconnect, extra_delay) = last_err
35                .as_ref()
36                .map(classify_error)
37                .unwrap_or((false, false, 0));
38            let backoff = tuning.retry_backoff_ms * 2u64.pow(attempt - 1) + extra_delay;
39            log::warn!(
40                "export '{}': retry {}/{} in {}ms{}({})",
41                export.name,
42                attempt,
43                tuning.max_retries,
44                backoff,
45                if needs_reconnect {
46                    " [reconnecting] "
47                } else {
48                    " "
49                },
50                last_err
51                    .as_ref()
52                    .map(|e: &anyhow::Error| format!("{:#}", e))
53                    .unwrap_or_default(),
54            );
55            std::thread::sleep(Duration::from_millis(backoff));
56        }
57
58        let mut src = match source::create_source(source_config) {
59            Ok(s) => s,
60            Err(e) => {
61                let (transient, _, _) = classify_error(&e);
62                if attempt < tuning.max_retries && transient {
63                    log::warn!(
64                        "export '{}': connection failed, will retry: {:#}",
65                        export.name,
66                        e
67                    );
68                    last_err = Some(e);
69                    continue;
70                }
71                return Err(e);
72            }
73        };
74
75        match run_export(
76            &mut *src,
77            source_config,
78            state,
79            export,
80            tuning,
81            config_dir,
82            validate,
83            summary,
84            params,
85            resume,
86            config_path,
87        ) {
88            Ok(()) => return Ok(()),
89            Err(e) => {
90                let (transient, _, _) = classify_error(&e);
91                if attempt < tuning.max_retries && transient {
92                    last_err = Some(e);
93                    continue;
94                }
95                return Err(e);
96            }
97        }
98    }
99
100    Err(last_err.unwrap_or_else(|| anyhow::anyhow!("export failed after retries")))
101}
102
103#[allow(clippy::too_many_arguments)]
104pub(crate) fn run_export(
105    src: &mut dyn Source,
106    source_config: &SourceConfig,
107    state: &StateStore,
108    export: &ExportConfig,
109    tuning: &SourceTuning,
110    config_dir: &Path,
111    validate: bool,
112    summary: &mut RunSummary,
113    params: Option<&std::collections::HashMap<String, String>>,
114    resume: bool,
115    config_path: &str,
116) -> Result<()> {
117    let base_query = export.resolve_query(config_dir, params)?;
118
119    match export.mode {
120        ExportMode::Full => {
121            run_single_export(
122                src,
123                &base_query,
124                None,
125                None,
126                export,
127                tuning,
128                validate,
129                Some(state),
130                summary,
131            )?;
132        }
133        ExportMode::Incremental => {
134            let cursor_state = state.get(&export.name)?;
135            let cursor_col = export.cursor_column.as_deref();
136            run_single_export(
137                src,
138                &base_query,
139                cursor_col,
140                Some(&cursor_state),
141                export,
142                tuning,
143                validate,
144                Some(state),
145                summary,
146            )?;
147        }
148        ExportMode::Chunked => {
149            if export.chunk_checkpoint {
150                run_chunked_sequential_checkpoint(
151                    src,
152                    source_config,
153                    state,
154                    &base_query,
155                    export,
156                    tuning,
157                    validate,
158                    summary,
159                    Some(state),
160                    resume,
161                    config_path,
162                )?;
163            } else {
164                run_chunked_sequential(
165                    src,
166                    &base_query,
167                    export,
168                    tuning,
169                    validate,
170                    summary,
171                    Some(state),
172                )?;
173            }
174        }
175        ExportMode::TimeWindow => {
176            let windowed_query = build_time_window_query(
177                &base_query,
178                export
179                    .time_column
180                    .as_deref()
181                    .expect("time_column required for TimeWindow mode"),
182                export.time_column_type,
183                export
184                    .days_window
185                    .expect("days_window required for TimeWindow mode"),
186            );
187            run_single_export(
188                src,
189                &windowed_query,
190                None,
191                None,
192                export,
193                tuning,
194                validate,
195                Some(state),
196                summary,
197            )?;
198        }
199    }
200
201    Ok(())
202}
203
204#[allow(clippy::too_many_arguments)]
205pub(super) fn run_single_export(
206    src: &mut dyn Source,
207    query: &str,
208    cursor_column: Option<&str>,
209    cursor: Option<&crate::types::CursorState>,
210    export: &ExportConfig,
211    tuning: &SourceTuning,
212    validate: bool,
213    state: Option<&StateStore>,
214    summary: &mut RunSummary,
215) -> Result<()> {
216    let mut sink = ExportSink::new(export)?;
217
218    src.export(query, cursor_column, cursor, tuning, &mut sink)?;
219
220    if let Some(w) = sink.writer.take() {
221        w.finish()?;
222    }
223
224    summary.total_rows += sink.total_rows as i64;
225    log::info!("export '{}': {} rows written", export.name, sink.total_rows);
226
227    if sink.total_rows == 0 {
228        if export.skip_empty {
229            summary.status = "skipped".into();
230            log::info!(
231                "export '{}': skipped (0 rows, skip_empty=true)",
232                export.name
233            );
234        } else {
235            log::info!("export '{}': no data to export", export.name);
236        }
237        return Ok(());
238    }
239
240    let quality_issues = sink.run_quality_checks();
241    if !quality_issues.is_empty() {
242        for issue in &quality_issues {
243            let level = match issue.severity {
244                crate::quality::Severity::Fail => "FAIL",
245                crate::quality::Severity::Warn => "WARN",
246            };
247            log::warn!("quality {}: {}", level, issue.message);
248        }
249        if quality_issues
250            .iter()
251            .any(|i| i.severity == crate::quality::Severity::Fail)
252        {
253            summary.quality_passed = Some(false);
254            anyhow::bail!("export '{}': quality checks failed", export.name);
255        }
256    }
257    if export.quality.is_some() {
258        summary.quality_passed = Some(true);
259    }
260
261    if sink.part_rows > 0 {
262        sink.completed_parts.push(CompletedPart {
263            tmp: std::mem::replace(&mut sink.tmp, tempfile::NamedTempFile::new()?),
264            rows: sink.part_rows,
265        });
266    }
267
268    let fmt = format::create_format(export.format, export.compression, export.compression_level);
269    let ext = fmt.file_extension();
270    let dest = destination::create_destination(&export.destination)?;
271    let has_parts = sink.completed_parts.len() > 1;
272    let ts = chrono::Utc::now().format("%Y%m%d_%H%M%S");
273
274    for (part_idx, part) in sink.completed_parts.iter().enumerate() {
275        if validate {
276            validate_output(part.tmp.path(), export.format, part.rows)?;
277            summary.validated = Some(true);
278        }
279
280        let file_bytes = std::fs::metadata(part.tmp.path())
281            .map(|m| m.len())
282            .unwrap_or(0);
283        summary.bytes_written += file_bytes;
284        summary.files_produced += 1;
285
286        let file_name = if has_parts {
287            format!("{}_{}_part{}.{}", export.name, ts, part_idx, ext)
288        } else {
289            format!("{}_{}.{}", export.name, ts, ext)
290        };
291        dest.write(part.tmp.path(), &file_name)?;
292
293        if let Some(st) = state {
294            let _ = st.record_file(
295                &summary.run_id,
296                &export.name,
297                &file_name,
298                part.rows as i64,
299                file_bytes as i64,
300                &format!("{:?}", export.format).to_lowercase(),
301                Some(&format!("{:?}", export.compression).to_lowercase()),
302            );
303        }
304    }
305
306    if export.mode == ExportMode::Incremental
307        && let (Some(cursor_col), Some(batch), Some(schema), Some(st)) =
308            (&export.cursor_column, &sink.last_batch, &sink.schema, state)
309        && let Some(last_val) = extract_last_cursor_value(batch, cursor_col, schema)
310    {
311        st.update(&export.name, &last_val)?;
312        log::info!("export '{}': cursor updated to '{}'", export.name, last_val);
313    }
314
315    if let (Some(schema), Some(st)) = (&sink.schema, state) {
316        let columns: Vec<crate::state::SchemaColumn> = schema
317            .fields()
318            .iter()
319            .map(|f| crate::state::SchemaColumn {
320                name: f.name().clone(),
321                data_type: format!("{:?}", f.data_type()),
322            })
323            .collect();
324
325        match st.detect_schema_change(&export.name, &columns) {
326            Ok(Some(change)) => {
327                summary.schema_changed = Some(true);
328                log::warn!("export '{}': schema changed!", export.name);
329                if !change.added.is_empty() {
330                    log::warn!("  added columns: {}", change.added.join(", "));
331                }
332                if !change.removed.is_empty() {
333                    log::warn!("  removed columns: {}", change.removed.join(", "));
334                }
335                for (col, old, new) in &change.type_changed {
336                    log::warn!("  type changed: {} ({} -> {})", col, old, new);
337                }
338            }
339            Ok(None) => {
340                summary.schema_changed = Some(false);
341            }
342            Err(e) => log::warn!("schema tracking error: {:#}", e),
343        }
344    }
345
346    log::info!("export '{}' completed successfully", export.name);
347    Ok(())
348}
349
350pub fn build_time_window_query(
351    base_query: &str,
352    time_column: &str,
353    time_type: TimeColumnType,
354    days_window: u32,
355) -> String {
356    let now = chrono::Utc::now();
357    let window_start = now - chrono::Duration::days(days_window as i64);
358    let truncated = window_start
359        .date_naive()
360        .and_hms_opt(0, 0, 0)
361        .expect("midnight is always valid");
362
363    let condition = match time_type {
364        TimeColumnType::Timestamp => {
365            format!(
366                "{} >= '{}'",
367                time_column,
368                truncated.format("%Y-%m-%d %H:%M:%S")
369            )
370        }
371        TimeColumnType::Unix => {
372            format!("{} >= {}", time_column, truncated.and_utc().timestamp())
373        }
374    };
375
376    format!(
377        "SELECT * FROM ({base}) AS _rivet WHERE {cond}",
378        base = base_query,
379        cond = condition,
380    )
381}
382
#[cfg(test)]
mod tests {
    use super::*;

    const SECONDS_PER_DAY: i64 = 86_400;

    #[test]
    fn test_build_time_window_timestamp() {
        let q = build_time_window_query(
            "SELECT * FROM events",
            "created_at",
            TimeColumnType::Timestamp,
            7,
        );
        assert!(q.contains("created_at >= '"), "got: {}", q);
        assert!(q.contains("_rivet WHERE"));
        // The base query is wrapped verbatim as a subquery.
        assert!(
            q.starts_with("SELECT * FROM (SELECT * FROM events) AS _rivet WHERE "),
            "got: {}",
            q
        );
        // The lower bound is truncated to midnight and is the final token.
        assert!(q.ends_with(" 00:00:00'"), "got: {}", q);
    }

    #[test]
    fn test_build_time_window_unix() {
        let q = build_time_window_query("SELECT * FROM events", "ts", TimeColumnType::Unix, 30);
        assert!(q.contains("ts >= "), "got: {}", q);
        assert!(!q.contains("'"), "unix should not have quotes, got: {}", q);

        let bound: i64 = q
            .rsplit(">= ")
            .next()
            .and_then(|s| s.parse().ok())
            .unwrap_or_else(|| panic!("no numeric bound in: {}", q));
        // Midnight UTC is always a whole number of days after the epoch.
        assert_eq!(bound % SECONDS_PER_DAY, 0, "got: {}", q);
        // The bound is at most 30 days ago and at most one day earlier than
        // that (one extra day of slack for the clock advancing mid-test).
        let now = chrono::Utc::now().timestamp();
        assert!(bound <= now - 30 * SECONDS_PER_DAY, "got: {}", q);
        assert!(bound > now - 32 * SECONDS_PER_DAY, "got: {}", q);
    }
}