1use std::path::Path;
7
8use schemars::JsonSchema;
9use serde::{Deserialize, Serialize};
10
11use super::IncrementalCursorMode;
12use super::destination::DestinationConfig;
13use super::format::{CompressionProfile, CompressionType, FormatType, ParquetConfig};
14use super::resolve::{parse_file_size, resolve_vars};
15use crate::tuning::TuningConfig;
16
17#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
25#[serde(rename_all = "snake_case")]
26pub enum SchemaDriftPolicy {
27 #[default]
29 Warn,
30 Continue,
32 Fail,
35}
36#[derive(Debug, Deserialize, JsonSchema, Clone)]
37#[serde(deny_unknown_fields)]
38pub struct ExportConfig {
39 pub name: String,
40 #[serde(default)]
41 pub query: Option<String>,
42 pub query_file: Option<String>,
43 #[serde(default)]
52 pub table: Option<String>,
53 #[serde(default = "default_mode")]
54 pub mode: ExportMode,
55 pub cursor_column: Option<String>,
56 #[serde(default)]
58 pub cursor_fallback_column: Option<String>,
59 #[serde(default)]
61 pub incremental_cursor_mode: IncrementalCursorMode,
62 pub chunk_column: Option<String>,
63 #[serde(default)]
64 pub chunk_dense: bool,
65 #[serde(default = "default_chunk_size")]
66 pub chunk_size: usize,
67 #[serde(default)]
83 pub chunk_size_memory_mb: Option<u64>,
84 pub chunk_count: Option<usize>,
88 pub chunk_by_days: Option<u32>,
89 #[serde(default = "default_parallel")]
90 pub parallel: usize,
91 pub time_column: Option<String>,
92 #[serde(default = "default_time_column_type")]
93 pub time_column_type: TimeColumnType,
94 pub days_window: Option<u32>,
95 pub format: FormatType,
96 #[serde(default)]
97 pub compression: CompressionType,
98 pub compression_level: Option<u32>,
99 pub compression_profile: Option<CompressionProfile>,
100 #[serde(default)]
101 pub skip_empty: bool,
102 pub destination: DestinationConfig,
103 #[serde(default)]
104 pub meta_columns: MetaColumns,
105 #[serde(default)]
106 pub quality: Option<QualityConfig>,
107 pub max_file_size: Option<String>,
108 #[serde(default)]
109 pub chunk_checkpoint: bool,
110 pub chunk_max_attempts: Option<u32>,
111 #[serde(default)]
112 pub tuning: Option<TuningConfig>,
113 #[serde(default)]
115 pub source_group: Option<String>,
116 #[serde(default)]
119 pub reconcile_required: bool,
120
121 #[serde(default)]
136 pub columns: std::collections::HashMap<String, String>,
137
138 #[serde(default)]
141 pub on_schema_drift: SchemaDriftPolicy,
142
143 #[serde(default)]
148 pub shape_drift_warn_factor: Option<f64>,
149
150 #[serde(default)]
153 pub parquet: Option<ParquetConfig>,
154}
155
156impl ExportConfig {
157 pub fn effective_compression(&self) -> (CompressionType, Option<u32>) {
160 if let Some(profile) = self.compression_profile {
161 profile.to_codec()
162 } else {
163 (self.compression, self.compression_level)
164 }
165 }
166
167 pub fn max_file_size_bytes(&self) -> Option<u64> {
168 self.max_file_size
169 .as_ref()
170 .and_then(|s| parse_file_size(s).ok())
171 }
172
173 pub fn resolve_query(
174 &self,
175 config_dir: &Path,
176 params: Option<&std::collections::HashMap<String, String>>,
177 ) -> crate::error::Result<String> {
178 if let Some(tbl) = &self.table {
181 validate_table_shortcut_ident(&self.name, tbl)?;
182 return Ok(format!("SELECT * FROM {tbl}"));
183 }
184 match (&self.query, &self.query_file) {
185 (Some(q), None) => {
186 if params.is_some() {
187 resolve_vars(q, params)
188 } else {
189 Ok(q.clone())
190 }
191 }
192 (None, Some(file)) => {
193 let file_path = std::path::Path::new(file);
194 if file_path.is_absolute() {
196 anyhow::bail!(
197 "export '{}': query_file must be a relative path: '{}'",
198 self.name,
199 file
200 );
201 }
202 if file_path
203 .components()
204 .any(|c| c == std::path::Component::ParentDir)
205 {
206 anyhow::bail!(
207 "export '{}': query_file path must not contain '..': '{}'",
208 self.name,
209 file
210 );
211 }
212 let joined = config_dir.join(file);
213 if let Ok(canonical) = joined.canonicalize() {
216 let base = config_dir
217 .canonicalize()
218 .unwrap_or_else(|_| config_dir.to_path_buf());
219 if !canonical.starts_with(&base) {
220 anyhow::bail!(
221 "export '{}': query_file '{}' resolves outside the config directory",
222 self.name,
223 file
224 );
225 }
226 }
227 let raw = std::fs::read_to_string(&joined)?;
228 resolve_vars(&raw, params)
229 }
230 (Some(_), Some(_)) => {
231 anyhow::bail!(
232 "export '{}': specify either 'query' or 'query_file', not both",
233 self.name
234 )
235 }
236 (None, None) => {
237 anyhow::bail!(
238 "export '{}': must specify exactly one of 'query', 'query_file', or 'table'",
239 self.name
240 )
241 }
242 }
243 }
244}
245
246fn validate_table_shortcut_ident(export_name: &str, raw: &str) -> crate::error::Result<()> {
257 let trimmed = raw.trim();
258 if trimmed.is_empty() {
259 anyhow::bail!("export '{export_name}': 'table' is empty");
260 }
261 let parts: Vec<&str> = trimmed.split('.').collect();
262 if parts.len() > 2 {
263 anyhow::bail!(
264 "export '{export_name}': 'table' must be `<name>` or `<schema>.<name>` (got '{raw}')"
265 );
266 }
267 for part in &parts {
268 if part.is_empty() {
269 anyhow::bail!("export '{export_name}': 'table' has an empty segment in '{raw}'");
270 }
271 let mut chars = part.chars();
272 let first = chars.next().unwrap();
273 if !(first.is_ascii_alphabetic() || first == '_') {
274 anyhow::bail!(
275 "export '{export_name}': 'table' segment '{part}' must start with a letter or underscore (use 'query:' for quoted identifiers)"
276 );
277 }
278 if !chars.all(|c| c.is_ascii_alphanumeric() || c == '_') {
279 anyhow::bail!(
280 "export '{export_name}': 'table' segment '{part}' contains non-identifier characters (use 'query:' for quoted identifiers)"
281 );
282 }
283 }
284 Ok(())
285}
286
287#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)]
288#[serde(deny_unknown_fields)]
289pub struct QualityConfig {
290 pub row_count_min: Option<usize>,
291 pub row_count_max: Option<usize>,
292 #[serde(default)]
293 pub null_ratio_max: std::collections::HashMap<String, f64>,
294 #[serde(default)]
295 pub unique_columns: Vec<String>,
296 pub unique_max_entries: Option<usize>,
300}
301
302#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Default)]
303#[serde(deny_unknown_fields)]
304pub struct MetaColumns {
305 #[serde(default)]
306 pub exported_at: bool,
307 #[serde(default)]
308 pub row_hash: bool,
309}
310
311fn default_mode() -> ExportMode {
312 ExportMode::Full
313}
314
/// Serde fallback for `ExportConfig::chunk_size` when the key is absent.
fn default_chunk_size() -> usize {
    const FALLBACK: usize = 100_000;
    FALLBACK
}
318
/// Serde fallback for `ExportConfig::parallel` when the key is absent.
fn default_parallel() -> usize {
    const FALLBACK: usize = 1;
    FALLBACK
}
322
323fn default_time_column_type() -> TimeColumnType {
324 TimeColumnType::Timestamp
325}
326
327#[derive(Debug, Deserialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
328#[serde(rename_all = "snake_case")]
329pub enum ExportMode {
330 Full,
331 Incremental,
332 Chunked,
333 TimeWindow,
334}
335
336#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
337#[serde(rename_all = "lowercase")]
338pub enum TimeColumnType {
339 Timestamp,
340 Unix,
341}
342
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{DestinationConfig, DestinationType};

    /// Deserializes an `ExportConfig` from a minimal YAML document; `extra`
    /// is appended verbatim after the mandatory keys.
    fn make_export_yaml(name: &str, extra: &str) -> ExportConfig {
        let document = format!(
            "name: {name}\nquery: \"SELECT 1\"\nformat: parquet\ndestination:\n type: local\n path: /tmp\n{extra}"
        );
        serde_yaml_ng::from_str(&document).expect("parse ExportConfig")
    }

    /// Builds an `ExportConfig` field by field so the query-source
    /// combinations can be set directly (including ones a test wants to be
    /// invalid).
    fn make_export_direct(query: Option<&str>, query_file: Option<&str>) -> ExportConfig {
        ExportConfig {
            name: "test".into(),
            query: query.map(str::to_string),
            query_file: query_file.map(str::to_string),
            table: None,
            mode: ExportMode::Full,
            cursor_column: None,
            cursor_fallback_column: None,
            incremental_cursor_mode: Default::default(),
            chunk_column: None,
            chunk_dense: false,
            chunk_size: 100_000,
            chunk_size_memory_mb: None,
            chunk_count: None,
            chunk_by_days: None,
            parallel: 1,
            time_column: None,
            time_column_type: TimeColumnType::Timestamp,
            days_window: None,
            format: FormatType::Parquet,
            compression: CompressionType::None,
            compression_level: None,
            compression_profile: None,
            skip_empty: false,
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("/tmp".into()),
                ..Default::default()
            },
            meta_columns: MetaColumns::default(),
            quality: None,
            max_file_size: None,
            chunk_checkpoint: false,
            chunk_max_attempts: None,
            tuning: None,
            source_group: None,
            reconcile_required: false,
            columns: Default::default(),
            on_schema_drift: Default::default(),
            shape_drift_warn_factor: None,
            parquet: None,
        }
    }

    /// Export with neither `query` nor `query_file`, using the `table` shortcut.
    fn table_export(table: &str) -> ExportConfig {
        let mut exp = make_export_direct(None, None);
        exp.table = Some(table.into());
        exp
    }

    /// Turns `(key, value)` pairs into the owned map `resolve_query` expects.
    fn params(pairs: &[(&str, &str)]) -> std::collections::HashMap<String, String> {
        let mut map = std::collections::HashMap::new();
        for &(key, value) in pairs {
            map.insert(key.to_string(), value.to_string());
        }
        map
    }

    /// Asserts that `outcome` failed and renders the error with its cause chain.
    #[track_caller]
    fn rendered_error(outcome: crate::error::Result<String>) -> String {
        assert!(outcome.is_err());
        format!("{:#}", outcome.unwrap_err())
    }

    // ---- max_file_size_bytes ------------------------------------------------

    #[test]
    fn max_file_size_bytes_none_when_unset() {
        let exp = make_export_yaml("no_limit", "");
        assert_eq!(exp.max_file_size_bytes(), None);
    }

    #[test]
    fn max_file_size_bytes_parses_mb() {
        let exp = make_export_yaml("sized", "max_file_size: \"128MB\"\n");
        let bytes = exp.max_file_size_bytes();
        assert_eq!(bytes, Some(128 * 1024 * 1024));
    }

    #[test]
    fn max_file_size_bytes_parses_gb() {
        let exp = make_export_yaml("sized_gb", "max_file_size: \"2GB\"\n");
        let bytes = exp.max_file_size_bytes();
        assert_eq!(bytes, Some(2 * 1024 * 1024 * 1024));
    }

    #[test]
    fn max_file_size_bytes_returns_none_on_invalid() {
        let exp = make_export_yaml("bad_size", "max_file_size: \"notanumber\"\n");
        assert_eq!(exp.max_file_size_bytes(), None);
    }

    // ---- resolve_query: inline query ----------------------------------------

    #[test]
    fn resolve_query_inline_no_params_returns_query_as_is() {
        let exp = make_export_direct(Some("SELECT id FROM orders"), None);
        let sql = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(sql, "SELECT id FROM orders");
    }

    #[test]
    fn resolve_query_inline_with_params_substitutes_vars() {
        let exp = make_export_direct(Some("SELECT ${col} FROM ${table}"), None);
        let vars = params(&[("col", "id"), ("table", "orders")]);
        let sql = exp.resolve_query(Path::new("/tmp"), Some(&vars)).unwrap();
        assert_eq!(sql, "SELECT id FROM orders");
    }

    #[test]
    fn resolve_query_inline_params_empty_map_is_noop() {
        let exp = make_export_direct(Some("SELECT 1"), None);
        let vars = params(&[]);
        let sql = exp.resolve_query(Path::new("/tmp"), Some(&vars)).unwrap();
        assert_eq!(sql, "SELECT 1");
    }

    #[test]
    fn resolve_query_inline_missing_var_returns_error() {
        // SAFETY(review): mutating the process environment is only sound when
        // no other thread accesses it concurrently. The variable name is
        // specific to this test — TODO confirm no other test touches it.
        unsafe { std::env::remove_var("UNSET_RIVET_TEST_VAR") };
        let exp = make_export_direct(Some("SELECT ${UNSET_RIVET_TEST_VAR}"), None);
        let vars = params(&[]);
        let msg = rendered_error(exp.resolve_query(Path::new("/tmp"), Some(&vars)));
        assert!(
            msg.contains("UNSET_RIVET_TEST_VAR") || msg.contains("not set"),
            "got: {msg}"
        );
    }

    // ---- resolve_query: query_file ------------------------------------------

    #[test]
    fn resolve_query_file_reads_content() {
        let dir = tempfile::TempDir::new().unwrap();
        std::fs::write(dir.path().join("query.sql"), "SELECT * FROM customers").unwrap();
        let exp = make_export_direct(None, Some("query.sql"));
        let sql = exp.resolve_query(dir.path(), None).unwrap();
        assert_eq!(sql, "SELECT * FROM customers");
    }

    #[test]
    fn resolve_query_file_with_params_substitutes() {
        let dir = tempfile::TempDir::new().unwrap();
        std::fs::write(dir.path().join("q.sql"), "SELECT ${col} FROM ${tbl}").unwrap();
        let exp = make_export_direct(None, Some("q.sql"));
        let vars = params(&[("col", "name"), ("tbl", "users")]);
        let sql = exp.resolve_query(dir.path(), Some(&vars)).unwrap();
        assert_eq!(sql, "SELECT name FROM users");
    }

    // ---- resolve_query: table shortcut --------------------------------------

    #[test]
    fn resolve_query_table_shortcut_qualified() {
        let exp = table_export("public.users");
        let sql = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(sql, "SELECT * FROM public.users");
    }

    #[test]
    fn resolve_query_table_shortcut_unqualified() {
        let exp = table_export("orders");
        let sql = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(sql, "SELECT * FROM orders");
    }

    #[test]
    fn resolve_query_table_shortcut_rejects_three_part_name() {
        let exp = table_export("db.public.users");
        let err = exp.resolve_query(Path::new("/tmp"), None).unwrap_err();
        let msg = format!("{err:#}");
        assert!(msg.contains("<schema>.<name>"), "got: {msg}");
    }

    #[test]
    fn resolve_query_table_shortcut_rejects_sql_injection() {
        let rejected = [
            "users; DROP TABLE x",
            "users--",
            "users'",
            "users\"",
            "public.\"My Table\"",
            "0starts_with_digit",
            "",
            ".trailing",
            "leading.",
            "two..dots",
        ];
        for bad in rejected {
            let exp = table_export(bad);
            let outcome = exp.resolve_query(Path::new("/tmp"), None);
            assert!(outcome.is_err(), "should reject `table:` value '{bad}'",);
        }
    }

    #[test]
    fn resolve_query_table_shortcut_takes_precedence_over_query() {
        let mut exp = make_export_direct(Some("SELECT id FROM x"), None);
        exp.table = Some("public.y".into());
        let sql = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(sql, "SELECT * FROM public.y");
    }

    // ---- resolve_query: error cases -----------------------------------------

    #[test]
    fn resolve_query_file_missing_returns_error() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("nonexistent.sql"));
        let msg = rendered_error(exp.resolve_query(dir.path(), None));
        assert!(
            msg.contains("nonexistent.sql") || msg.contains("No such file"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_both_set_returns_error() {
        let mut exp = make_export_direct(Some("SELECT 1"), None);
        exp.query_file = Some("file.sql".into());
        let msg = rendered_error(exp.resolve_query(Path::new("/tmp"), None));
        assert!(
            msg.contains("not both") || msg.contains("query_file"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_neither_set_returns_error() {
        let exp = make_export_direct(None, None);
        let msg = rendered_error(exp.resolve_query(Path::new("/tmp"), None));
        assert!(
            msg.contains("query") || msg.contains("query_file"),
            "got: {msg}"
        );
    }

    // ---- resolve_query: path containment ------------------------------------

    #[test]
    fn resolve_query_file_dotdot_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("../secret.sql"));
        let msg = rendered_error(exp.resolve_query(dir.path(), None));
        assert!(
            msg.contains("..") || msg.contains("traversal"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_nested_dotdot_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("subdir/../../etc/passwd"));
        let msg = rendered_error(exp.resolve_query(dir.path(), None));
        assert!(
            msg.contains("..") || msg.contains("traversal"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_absolute_path_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("/etc/passwd"));
        let msg = rendered_error(exp.resolve_query(dir.path(), None));
        assert!(
            msg.contains("relative") || msg.contains("absolute"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_in_subdir_is_allowed() {
        let dir = tempfile::TempDir::new().unwrap();
        let queries = dir.path().join("queries");
        std::fs::create_dir(&queries).unwrap();
        std::fs::write(queries.join("orders.sql"), "SELECT * FROM orders").unwrap();
        let exp = make_export_direct(None, Some("queries/orders.sql"));
        let sql = exp.resolve_query(dir.path(), None).unwrap();
        assert_eq!(sql, "SELECT * FROM orders");
    }
}