1use std::path::Path;
2use std::time::Duration;
3
4use super::RunSummary;
5use super::chunked::{run_chunked_sequential, run_chunked_sequential_checkpoint};
6use super::retry::classify_error;
7use super::sink::{CompletedPart, ExportSink, extract_last_cursor_value};
8use super::validate::validate_output;
9use crate::config::{ExportConfig, ExportMode, SourceConfig, TimeColumnType};
10use crate::error::Result;
11use crate::source::{self, Source};
12use crate::state::StateStore;
13use crate::tuning::SourceTuning;
14use crate::{destination, format};
15
16#[allow(clippy::too_many_arguments)]
17pub(crate) fn run_with_reconnect(
18 source_config: &SourceConfig,
19 state: &StateStore,
20 export: &ExportConfig,
21 tuning: &SourceTuning,
22 config_dir: &Path,
23 validate: bool,
24 summary: &mut RunSummary,
25 params: Option<&std::collections::HashMap<String, String>>,
26 resume: bool,
27 config_path: &str,
28) -> Result<()> {
29 let mut last_err: Option<anyhow::Error> = None;
30
31 for attempt in 0..=tuning.max_retries {
32 if attempt > 0 {
33 summary.retries = attempt;
34 let (_, needs_reconnect, extra_delay) = last_err
35 .as_ref()
36 .map(classify_error)
37 .unwrap_or((false, false, 0));
38 let backoff = tuning.retry_backoff_ms * 2u64.pow(attempt - 1) + extra_delay;
39 log::warn!(
40 "export '{}': retry {}/{} in {}ms{}({})",
41 export.name,
42 attempt,
43 tuning.max_retries,
44 backoff,
45 if needs_reconnect {
46 " [reconnecting] "
47 } else {
48 " "
49 },
50 last_err
51 .as_ref()
52 .map(|e: &anyhow::Error| format!("{:#}", e))
53 .unwrap_or_default(),
54 );
55 std::thread::sleep(Duration::from_millis(backoff));
56 }
57
58 let mut src = match source::create_source(source_config) {
59 Ok(s) => s,
60 Err(e) => {
61 let (transient, _, _) = classify_error(&e);
62 if attempt < tuning.max_retries && transient {
63 log::warn!(
64 "export '{}': connection failed, will retry: {:#}",
65 export.name,
66 e
67 );
68 last_err = Some(e);
69 continue;
70 }
71 return Err(e);
72 }
73 };
74
75 match run_export(
76 &mut *src,
77 source_config,
78 state,
79 export,
80 tuning,
81 config_dir,
82 validate,
83 summary,
84 params,
85 resume,
86 config_path,
87 ) {
88 Ok(()) => return Ok(()),
89 Err(e) => {
90 let (transient, _, _) = classify_error(&e);
91 if attempt < tuning.max_retries && transient {
92 last_err = Some(e);
93 continue;
94 }
95 return Err(e);
96 }
97 }
98 }
99
100 Err(last_err.unwrap_or_else(|| anyhow::anyhow!("export failed after retries")))
101}
102
103#[allow(clippy::too_many_arguments)]
104pub(crate) fn run_export(
105 src: &mut dyn Source,
106 source_config: &SourceConfig,
107 state: &StateStore,
108 export: &ExportConfig,
109 tuning: &SourceTuning,
110 config_dir: &Path,
111 validate: bool,
112 summary: &mut RunSummary,
113 params: Option<&std::collections::HashMap<String, String>>,
114 resume: bool,
115 config_path: &str,
116) -> Result<()> {
117 let base_query = export.resolve_query(config_dir, params)?;
118
119 match export.mode {
120 ExportMode::Full => {
121 run_single_export(
122 src,
123 &base_query,
124 None,
125 None,
126 export,
127 tuning,
128 validate,
129 Some(state),
130 summary,
131 )?;
132 }
133 ExportMode::Incremental => {
134 let cursor_state = state.get(&export.name)?;
135 let cursor_col = export.cursor_column.as_deref();
136 run_single_export(
137 src,
138 &base_query,
139 cursor_col,
140 Some(&cursor_state),
141 export,
142 tuning,
143 validate,
144 Some(state),
145 summary,
146 )?;
147 }
148 ExportMode::Chunked => {
149 if export.chunk_checkpoint {
150 run_chunked_sequential_checkpoint(
151 src,
152 source_config,
153 state,
154 &base_query,
155 export,
156 tuning,
157 validate,
158 summary,
159 Some(state),
160 resume,
161 config_path,
162 )?;
163 } else {
164 run_chunked_sequential(
165 src,
166 &base_query,
167 export,
168 tuning,
169 validate,
170 summary,
171 Some(state),
172 )?;
173 }
174 }
175 ExportMode::TimeWindow => {
176 let windowed_query = build_time_window_query(
177 &base_query,
178 export
179 .time_column
180 .as_deref()
181 .expect("time_column required for TimeWindow mode"),
182 export.time_column_type,
183 export
184 .days_window
185 .expect("days_window required for TimeWindow mode"),
186 );
187 run_single_export(
188 src,
189 &windowed_query,
190 None,
191 None,
192 export,
193 tuning,
194 validate,
195 Some(state),
196 summary,
197 )?;
198 }
199 }
200
201 Ok(())
202}
203
204#[allow(clippy::too_many_arguments)]
205pub(super) fn run_single_export(
206 src: &mut dyn Source,
207 query: &str,
208 cursor_column: Option<&str>,
209 cursor: Option<&crate::types::CursorState>,
210 export: &ExportConfig,
211 tuning: &SourceTuning,
212 validate: bool,
213 state: Option<&StateStore>,
214 summary: &mut RunSummary,
215) -> Result<()> {
216 let mut sink = ExportSink::new(export)?;
217
218 src.export(query, cursor_column, cursor, tuning, &mut sink)?;
219
220 if let Some(w) = sink.writer.take() {
221 w.finish()?;
222 }
223
224 summary.total_rows += sink.total_rows as i64;
225 log::info!("export '{}': {} rows written", export.name, sink.total_rows);
226
227 if sink.total_rows == 0 {
228 if export.skip_empty {
229 summary.status = "skipped".into();
230 log::info!(
231 "export '{}': skipped (0 rows, skip_empty=true)",
232 export.name
233 );
234 } else {
235 log::info!("export '{}': no data to export", export.name);
236 }
237 return Ok(());
238 }
239
240 let quality_issues = sink.run_quality_checks();
241 if !quality_issues.is_empty() {
242 for issue in &quality_issues {
243 let level = match issue.severity {
244 crate::quality::Severity::Fail => "FAIL",
245 crate::quality::Severity::Warn => "WARN",
246 };
247 log::warn!("quality {}: {}", level, issue.message);
248 }
249 if quality_issues
250 .iter()
251 .any(|i| i.severity == crate::quality::Severity::Fail)
252 {
253 summary.quality_passed = Some(false);
254 anyhow::bail!("export '{}': quality checks failed", export.name);
255 }
256 }
257 if export.quality.is_some() {
258 summary.quality_passed = Some(true);
259 }
260
261 if sink.part_rows > 0 {
262 sink.completed_parts.push(CompletedPart {
263 tmp: std::mem::replace(&mut sink.tmp, tempfile::NamedTempFile::new()?),
264 rows: sink.part_rows,
265 });
266 }
267
268 let fmt = format::create_format(export.format, export.compression, export.compression_level);
269 let ext = fmt.file_extension();
270 let dest = destination::create_destination(&export.destination)?;
271 let has_parts = sink.completed_parts.len() > 1;
272 let ts = chrono::Utc::now().format("%Y%m%d_%H%M%S");
273
274 for (part_idx, part) in sink.completed_parts.iter().enumerate() {
275 if validate {
276 validate_output(part.tmp.path(), export.format, part.rows)?;
277 summary.validated = Some(true);
278 }
279
280 let file_bytes = std::fs::metadata(part.tmp.path())
281 .map(|m| m.len())
282 .unwrap_or(0);
283 summary.bytes_written += file_bytes;
284 summary.files_produced += 1;
285
286 let file_name = if has_parts {
287 format!("{}_{}_part{}.{}", export.name, ts, part_idx, ext)
288 } else {
289 format!("{}_{}.{}", export.name, ts, ext)
290 };
291 dest.write(part.tmp.path(), &file_name)?;
292
293 if let Some(st) = state {
294 let _ = st.record_file(
295 &summary.run_id,
296 &export.name,
297 &file_name,
298 part.rows as i64,
299 file_bytes as i64,
300 &format!("{:?}", export.format).to_lowercase(),
301 Some(&format!("{:?}", export.compression).to_lowercase()),
302 );
303 }
304 }
305
306 if export.mode == ExportMode::Incremental
307 && let (Some(cursor_col), Some(batch), Some(schema), Some(st)) =
308 (&export.cursor_column, &sink.last_batch, &sink.schema, state)
309 && let Some(last_val) = extract_last_cursor_value(batch, cursor_col, schema)
310 {
311 st.update(&export.name, &last_val)?;
312 log::info!("export '{}': cursor updated to '{}'", export.name, last_val);
313 }
314
315 if let (Some(schema), Some(st)) = (&sink.schema, state) {
316 let columns: Vec<crate::state::SchemaColumn> = schema
317 .fields()
318 .iter()
319 .map(|f| crate::state::SchemaColumn {
320 name: f.name().clone(),
321 data_type: format!("{:?}", f.data_type()),
322 })
323 .collect();
324
325 match st.detect_schema_change(&export.name, &columns) {
326 Ok(Some(change)) => {
327 summary.schema_changed = Some(true);
328 log::warn!("export '{}': schema changed!", export.name);
329 if !change.added.is_empty() {
330 log::warn!(" added columns: {}", change.added.join(", "));
331 }
332 if !change.removed.is_empty() {
333 log::warn!(" removed columns: {}", change.removed.join(", "));
334 }
335 for (col, old, new) in &change.type_changed {
336 log::warn!(" type changed: {} ({} -> {})", col, old, new);
337 }
338 }
339 Ok(None) => {
340 summary.schema_changed = Some(false);
341 }
342 Err(e) => log::warn!("schema tracking error: {:#}", e),
343 }
344 }
345
346 log::info!("export '{}' completed successfully", export.name);
347 Ok(())
348}
349
350pub fn build_time_window_query(
351 base_query: &str,
352 time_column: &str,
353 time_type: TimeColumnType,
354 days_window: u32,
355) -> String {
356 let now = chrono::Utc::now();
357 let window_start = now - chrono::Duration::days(days_window as i64);
358 let truncated = window_start
359 .date_naive()
360 .and_hms_opt(0, 0, 0)
361 .expect("midnight is always valid");
362
363 let condition = match time_type {
364 TimeColumnType::Timestamp => {
365 format!(
366 "{} >= '{}'",
367 time_column,
368 truncated.format("%Y-%m-%d %H:%M:%S")
369 )
370 }
371 TimeColumnType::Unix => {
372 format!("{} >= {}", time_column, truncated.and_utc().timestamp())
373 }
374 };
375
376 format!(
377 "SELECT * FROM ({base}) AS _rivet WHERE {cond}",
378 base = base_query,
379 cond = condition,
380 )
381}
382
#[cfg(test)]
mod tests {
    use super::*;

    /// Base query wrapped by every test in this module.
    const BASE: &str = "SELECT * FROM events";

    /// A timestamp column is compared against a quoted literal.
    #[test]
    fn test_build_time_window_timestamp() {
        let sql = build_time_window_query(BASE, "created_at", TimeColumnType::Timestamp, 7);

        assert!(sql.contains("created_at >= '"), "got: {}", sql);
        assert!(sql.contains("_rivet WHERE"));
    }

    /// A unix column is compared against a bare number, without quotes.
    #[test]
    fn test_build_time_window_unix() {
        let sql = build_time_window_query(BASE, "ts", TimeColumnType::Unix, 30);

        assert!(sql.contains("ts >= "), "got: {}", sql);
        let has_quote = sql.contains("'");
        assert!(!has_quote, "unix should not have quotes, got: {}", sql);
    }
}