1use std::path::Path;
7
8use schemars::JsonSchema;
9use serde::{Deserialize, Serialize};
10
11use super::IncrementalCursorMode;
12use super::destination::DestinationConfig;
13use super::format::{CompressionProfile, CompressionType, FormatType, ParquetConfig};
14use super::resolve::{parse_file_size, resolve_vars};
15use crate::tuning::TuningConfig;
16
/// Verification mode selected by `ExportConfig::verify`.
///
/// Serialized in `snake_case` (`size`, `content`).
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum VerifyMode {
    /// Default mode; `requires_content()` returns `false` for it.
    #[default]
    Size,
    /// The only mode for which `requires_content()` returns `true`.
    Content,
}
35
36impl VerifyMode {
37 pub fn requires_content(self) -> bool {
39 matches!(self, VerifyMode::Content)
40 }
41}
42
/// Policy selected by `ExportConfig::on_schema_drift`.
///
/// Serialized in `snake_case` (`warn`, `continue`, `fail`); defaults to `Warn`.
/// NOTE(review): the handling of each variant lives outside this file.
/// Presumably `Warn` logs and proceeds, `Continue` proceeds silently, and
/// `Fail` aborts the export — confirm against the drift-detection code.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum SchemaDriftPolicy {
    #[default]
    Warn,
    Continue,
    Fail,
}

/// Configuration of a single export job, deserialized from the config file.
///
/// Unknown keys are rejected (`deny_unknown_fields`). The SQL to execute is
/// chosen by [`ExportConfig::resolve_query`] from `table`, `query`, or
/// `query_file`.
#[derive(Debug, Deserialize, JsonSchema, Clone)]
#[serde(deny_unknown_fields)]
pub struct ExportConfig {
    /// Export name; used as the `export '<name>': ...` prefix of errors and warnings.
    pub name: String,
    /// Inline SQL. Mutually exclusive with `query_file`.
    #[serde(default)]
    pub query: Option<String>,
    /// SQL file path relative to the config directory. `resolve_query` rejects
    /// absolute paths and paths containing `..`. Mutually exclusive with `query`.
    pub query_file: Option<String>,
    /// `<name>` or `<schema>.<name>` shortcut expanding to `SELECT * FROM <table>`.
    /// When set it takes precedence over `query` / `query_file`.
    #[serde(default)]
    pub table: Option<String>,
    /// Export strategy; defaults to [`ExportMode::Full`].
    #[serde(default = "default_mode")]
    pub mode: ExportMode,
    // NOTE(review): the cursor_* settings are presumably consumed by
    // `ExportMode::Incremental` — confirm in the incremental export code.
    pub cursor_column: Option<String>,
    #[serde(default)]
    pub cursor_fallback_column: Option<String>,
    #[serde(default)]
    pub incremental_cursor_mode: IncrementalCursorMode,
    // NOTE(review): the chunk_* settings are presumably consumed by
    // `ExportMode::Chunked` — confirm in the chunked export code.
    pub chunk_column: Option<String>,
    #[serde(default)]
    pub chunk_dense: bool,
    /// Defaults to 100_000 (`default_chunk_size`).
    #[serde(default = "default_chunk_size")]
    pub chunk_size: usize,
    #[serde(default)]
    pub chunk_size_memory_mb: Option<u64>,
    pub chunk_count: Option<usize>,
    pub chunk_by_days: Option<u32>,
    pub chunk_by_key: Option<String>,
    /// Degree of parallelism; defaults to 1 (`default_parallel`).
    #[serde(default = "default_parallel")]
    pub parallel: usize,
    // NOTE(review): time_column / days_window are presumably used by
    // `ExportMode::TimeWindow` — confirm.
    pub time_column: Option<String>,
    /// How `time_column` values are encoded; defaults to [`TimeColumnType::Timestamp`].
    #[serde(default = "default_time_column_type")]
    pub time_column_type: TimeColumnType,
    pub days_window: Option<u32>,

    /// Optional column used to partition output.
    #[serde(default)]
    pub partition_by: Option<String>,

    /// Partition bucket size; defaults to [`PartitionGranularity::Day`].
    #[serde(default)]
    pub partition_granularity: PartitionGranularity,
    /// Output file format (required; there is no serde default).
    pub format: FormatType,
    /// Explicit codec. It is ignored in favour of `compression_profile` when
    /// that is set (see `effective_compression`).
    #[serde(default)]
    pub compression: CompressionType,
    pub compression_level: Option<u32>,
    /// When set, overrides `compression` / `compression_level`.
    pub compression_profile: Option<CompressionProfile>,
    #[serde(default)]
    pub skip_empty: bool,
    /// Output destination (see [`DestinationConfig`]).
    pub destination: DestinationConfig,
    /// Verification mode; defaults to [`VerifyMode::Size`].
    #[serde(default)]
    pub verify: VerifyMode,
    /// Optional extra metadata columns (all off by default).
    #[serde(default)]
    pub meta_columns: MetaColumns,
    #[serde(default)]
    pub quality: Option<QualityConfig>,
    /// Human-readable size such as `"128MB"` or `"2GB"`. It is parsed by
    /// `max_file_size_bytes`, which treats unparsable values as unset.
    pub max_file_size: Option<String>,
    #[serde(default)]
    pub chunk_checkpoint: bool,
    pub chunk_max_attempts: Option<u32>,
    #[serde(default)]
    pub tuning: Option<TuningConfig>,
    #[serde(default)]
    pub source_group: Option<String>,
    #[serde(default)]
    pub reconcile_required: bool,

    // NOTE(review): the meaning of keys/values is defined by the consumer of
    // this map (presumably column name -> type override) — confirm.
    #[serde(default)]
    pub columns: std::collections::HashMap<String, String>,

    #[serde(default)]
    pub target: Option<String>,

    /// Reaction to schema drift; defaults to [`SchemaDriftPolicy::Warn`].
    #[serde(default)]
    pub on_schema_drift: SchemaDriftPolicy,

    #[serde(default)]
    pub shape_drift_warn_factor: Option<f64>,

    /// Optional Parquet writer settings.
    #[serde(default)]
    pub parquet: Option<ParquetConfig>,
}
248
249impl ExportConfig {
250 pub fn effective_compression(&self) -> (CompressionType, Option<u32>) {
259 if let Some(profile) = self.compression_profile {
260 let explicit_codec =
261 (self.compression != CompressionType::default()).then_some(self.compression);
262 if let Some(msg) = super::format::compression_profile_override_warning(
263 profile,
264 explicit_codec,
265 self.compression_level,
266 ) {
267 log::warn!("export '{}': {}", self.name, msg);
268 }
269 profile.to_codec()
270 } else {
271 (self.compression, self.compression_level)
272 }
273 }
274
275 pub fn max_file_size_bytes(&self) -> Option<u64> {
276 self.max_file_size
277 .as_ref()
278 .and_then(|s| parse_file_size(s).ok())
279 }
280
281 pub fn resolve_query(
282 &self,
283 config_dir: &Path,
284 params: Option<&std::collections::HashMap<String, String>>,
285 ) -> crate::error::Result<String> {
286 if let Some(tbl) = &self.table {
289 validate_table_shortcut_ident(&self.name, tbl)?;
290 return Ok(format!("SELECT * FROM {tbl}"));
291 }
292 match (&self.query, &self.query_file) {
293 (Some(q), None) => {
294 if params.is_some() {
295 resolve_vars(q, params)
296 } else {
297 Ok(q.clone())
298 }
299 }
300 (None, Some(file)) => {
301 let file_path = std::path::Path::new(file);
302 if file_path.is_absolute() {
304 anyhow::bail!(
305 "export '{}': query_file must be a relative path: '{}'",
306 self.name,
307 file
308 );
309 }
310 if file_path
311 .components()
312 .any(|c| c == std::path::Component::ParentDir)
313 {
314 anyhow::bail!(
315 "export '{}': query_file path must not contain '..': '{}'",
316 self.name,
317 file
318 );
319 }
320 let joined = config_dir.join(file);
321 if let Ok(canonical) = joined.canonicalize() {
324 let base = config_dir
325 .canonicalize()
326 .unwrap_or_else(|_| config_dir.to_path_buf());
327 if !canonical.starts_with(&base) {
328 anyhow::bail!(
329 "export '{}': query_file '{}' resolves outside the config directory",
330 self.name,
331 file
332 );
333 }
334 }
335 let raw = std::fs::read_to_string(&joined)?;
336 resolve_vars(&raw, params)
337 }
338 (Some(_), Some(_)) => {
339 anyhow::bail!(
340 "export '{}': specify either 'query' or 'query_file', not both",
341 self.name
342 )
343 }
344 (None, None) => {
345 anyhow::bail!(
346 "export '{}': must specify exactly one of 'query', 'query_file', or 'table'",
347 self.name
348 )
349 }
350 }
351 }
352}
353
354fn validate_table_shortcut_ident(export_name: &str, raw: &str) -> crate::error::Result<()> {
365 let trimmed = raw.trim();
366 if trimmed.is_empty() {
367 anyhow::bail!("export '{export_name}': 'table' is empty");
368 }
369 let parts: Vec<&str> = trimmed.split('.').collect();
370 if parts.len() > 2 {
371 anyhow::bail!(
372 "export '{export_name}': 'table' must be `<name>` or `<schema>.<name>` (got '{raw}')"
373 );
374 }
375 for part in &parts {
376 if part.is_empty() {
377 anyhow::bail!("export '{export_name}': 'table' has an empty segment in '{raw}'");
378 }
379 let mut chars = part.chars();
380 let first = chars.next().unwrap();
381 if !(first.is_ascii_alphabetic() || first == '_') {
382 anyhow::bail!(
383 "export '{export_name}': 'table' segment '{part}' must start with a letter or underscore (use 'query:' for quoted identifiers)"
384 );
385 }
386 if !chars.all(|c| c.is_ascii_alphanumeric() || c == '_') {
387 anyhow::bail!(
388 "export '{export_name}': 'table' segment '{part}' contains non-identifier characters (use 'query:' for quoted identifiers)"
389 );
390 }
391 }
392 Ok(())
393}
394
/// Optional data-quality checks attached to an export (`ExportConfig::quality`).
///
/// Unknown keys are rejected. NOTE(review): the checks themselves are
/// evaluated outside this file. The per-field notes below are inferred from
/// names — confirm against the quality checker.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)]
#[serde(deny_unknown_fields)]
pub struct QualityConfig {
    // Presumably the minimum acceptable row count.
    pub row_count_min: Option<usize>,
    // Presumably the maximum acceptable row count.
    pub row_count_max: Option<usize>,
    // Column name -> maximum null ratio (presumably in 0.0..=1.0); empty by default.
    #[serde(default)]
    pub null_ratio_max: std::collections::HashMap<String, f64>,
    // Columns expected to be unique; empty by default.
    #[serde(default)]
    pub unique_columns: Vec<String>,
    // Presumably caps memory used by the uniqueness check.
    pub unique_max_entries: Option<usize>,
}
409
/// Toggles for extra metadata columns (`ExportConfig::meta_columns`).
///
/// Both flags default to `false`; unknown keys are rejected.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Default)]
#[serde(deny_unknown_fields)]
pub struct MetaColumns {
    // Presumably adds an export-timestamp column — confirm in the writer.
    #[serde(default)]
    pub exported_at: bool,
    // Presumably adds a per-row hash column — confirm in the writer.
    #[serde(default)]
    pub row_hash: bool,
}
418
419fn default_mode() -> ExportMode {
420 ExportMode::Full
421}
422
/// Serde default for `ExportConfig::chunk_size`.
fn default_chunk_size() -> usize {
    const DEFAULT_CHUNK_SIZE: usize = 100_000;
    DEFAULT_CHUNK_SIZE
}
426
/// Serde default for `ExportConfig::parallel`: a single worker.
fn default_parallel() -> usize {
    const SINGLE_WORKER: usize = 1;
    SINGLE_WORKER
}
430
431fn default_time_column_type() -> TimeColumnType {
432 TimeColumnType::Timestamp
433}
434
/// Export strategy selected by `ExportConfig::mode`.
///
/// Deserialized from `snake_case` (`full`, `incremental`, `chunked`,
/// `time_window`); `default_mode` supplies `Full` when omitted.
/// NOTE(review): only `Deserialize` is derived (no `Serialize`), matching
/// `ExportConfig` itself.
#[derive(Debug, Deserialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ExportMode {
    Full,
    Incremental,
    Chunked,
    TimeWindow,
}
443
/// Encoding of `ExportConfig::time_column`, serialized in lowercase
/// (`timestamp`, `unix`); `default_time_column_type` picks `Timestamp`.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum TimeColumnType {
    Timestamp,
    // NOTE(review): presumably epoch-based integers (seconds?) — confirm unit.
    Unix,
}
450
/// Bucket size used with `ExportConfig::partition_by`.
///
/// Serialized in lowercase (`day`, `month`, `year`); defaults to `Day`.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum PartitionGranularity {
    #[default]
    Day,
    Month,
    Year,
}
466
/// Test fixture: a minimal valid [`ExportConfig`] named `name`.
///
/// It is a full-mode `SELECT 1` export to Parquet on a local `/tmp`
/// destination, with no compression and every optional feature switched off.
/// Tests override individual fields with struct-update syntax.
#[cfg(test)]
pub(crate) fn sample_export(name: &str) -> ExportConfig {
    let destination = crate::config::DestinationConfig {
        destination_type: crate::config::DestinationType::Local,
        path: Some("/tmp".into()),
        ..Default::default()
    };
    ExportConfig {
        name: name.into(),
        query: Some("SELECT 1".into()),
        query_file: None,
        table: None,
        mode: ExportMode::Full,
        cursor_column: None,
        cursor_fallback_column: None,
        incremental_cursor_mode: Default::default(),
        chunk_column: None,
        chunk_dense: false,
        chunk_size: 100_000,
        chunk_size_memory_mb: None,
        chunk_count: None,
        chunk_by_days: None,
        chunk_by_key: None,
        parallel: 1,
        time_column: None,
        time_column_type: TimeColumnType::Timestamp,
        days_window: None,
        partition_by: None,
        partition_granularity: PartitionGranularity::Day,
        format: FormatType::Parquet,
        compression: CompressionType::None,
        compression_level: None,
        compression_profile: None,
        skip_empty: false,
        destination,
        verify: VerifyMode::Size,
        meta_columns: MetaColumns::default(),
        quality: None,
        max_file_size: None,
        chunk_checkpoint: false,
        chunk_max_attempts: None,
        tuning: None,
        source_group: None,
        reconcile_required: false,
        columns: Default::default(),
        target: None,
        on_schema_drift: Default::default(),
        shape_drift_warn_factor: None,
        parquet: None,
    }
}
523
// Unit tests for `ExportConfig` helpers: file-size parsing, query resolution
// (inline, file, table shortcut) and the query_file path-safety checks.
#[cfg(test)]
mod tests {
    use super::*;

    // Parses a minimal YAML export (inline `SELECT 1`, parquet, local /tmp)
    // with `extra` appended verbatim as additional top-level keys.
    fn make_export_yaml(name: &str, extra: &str) -> ExportConfig {
        let yaml = format!(
            "name: {name}\nquery: \"SELECT 1\"\nformat: parquet\ndestination:\n type: local\n path: /tmp\n{extra}"
        );
        serde_yaml_ng::from_str(&yaml).expect("parse ExportConfig")
    }

    #[test]
    fn max_file_size_bytes_none_when_unset() {
        let exp = make_export_yaml("no_limit", "");
        assert!(exp.max_file_size_bytes().is_none());
    }

    #[test]
    fn max_file_size_bytes_parses_mb() {
        let exp = make_export_yaml("sized", "max_file_size: \"128MB\"\n");
        assert_eq!(exp.max_file_size_bytes(), Some(128 * 1024 * 1024));
    }

    #[test]
    fn max_file_size_bytes_parses_gb() {
        let exp = make_export_yaml("sized_gb", "max_file_size: \"2GB\"\n");
        assert_eq!(exp.max_file_size_bytes(), Some(2 * 1024 * 1024 * 1024));
    }

    // Invalid sizes are swallowed and reported as "no limit".
    #[test]
    fn max_file_size_bytes_returns_none_on_invalid() {
        let exp = make_export_yaml("bad_size", "max_file_size: \"notanumber\"\n");
        assert!(exp.max_file_size_bytes().is_none());
    }

    // `sample_export` with only `query` / `query_file` overridden.
    fn make_export_direct(query: Option<&str>, query_file: Option<&str>) -> ExportConfig {
        ExportConfig {
            query: query.map(|s| s.to_string()),
            query_file: query_file.map(|s| s.to_string()),
            ..sample_export("test")
        }
    }

    // Builds an owned params map from string pairs.
    fn params(pairs: &[(&str, &str)]) -> std::collections::HashMap<String, String> {
        pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    #[test]
    fn resolve_query_inline_no_params_returns_query_as_is() {
        let exp = make_export_direct(Some("SELECT id FROM orders"), None);
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT id FROM orders");
    }

    #[test]
    fn resolve_query_inline_with_params_substitutes_vars() {
        let exp = make_export_direct(Some("SELECT ${col} FROM ${table}"), None);
        let p = params(&[("col", "id"), ("table", "orders")]);
        let q = exp.resolve_query(Path::new("/tmp"), Some(&p)).unwrap();
        assert_eq!(q, "SELECT id FROM orders");
    }

    #[test]
    fn resolve_query_inline_params_empty_map_is_noop() {
        let exp = make_export_direct(Some("SELECT 1"), None);
        let p = params(&[]);
        let q = exp.resolve_query(Path::new("/tmp"), Some(&p)).unwrap();
        assert_eq!(q, "SELECT 1");
    }

    // A `${VAR}` absent from params (and, per the message check, from the
    // environment) must surface as an error rather than be left in place.
    #[test]
    fn resolve_query_inline_missing_var_returns_error() {
        // SAFETY: `remove_var` is unsafe because other threads may read the
        // process environment concurrently. This variable name is used only
        // by this test in this module; removing it just guarantees it is unset.
        // NOTE(review): tests running in parallel that read the environment
        // could still race with this call — confirm that is acceptable.
        unsafe { std::env::remove_var("UNSET_RIVET_TEST_VAR") };
        let exp = make_export_direct(Some("SELECT ${UNSET_RIVET_TEST_VAR}"), None);
        let p = params(&[]);
        let result = exp.resolve_query(Path::new("/tmp"), Some(&p));
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("UNSET_RIVET_TEST_VAR") || msg.contains("not set"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_reads_content() {
        let dir = tempfile::TempDir::new().unwrap();
        let sql_path = dir.path().join("query.sql");
        std::fs::write(&sql_path, "SELECT * FROM customers").unwrap();
        let exp = make_export_direct(None, Some("query.sql"));
        let q = exp.resolve_query(dir.path(), None).unwrap();
        assert_eq!(q, "SELECT * FROM customers");
    }

    #[test]
    fn resolve_query_file_with_params_substitutes() {
        let dir = tempfile::TempDir::new().unwrap();
        let sql_path = dir.path().join("q.sql");
        std::fs::write(&sql_path, "SELECT ${col} FROM ${tbl}").unwrap();
        let exp = make_export_direct(None, Some("q.sql"));
        let p = params(&[("col", "name"), ("tbl", "users")]);
        let q = exp.resolve_query(dir.path(), Some(&p)).unwrap();
        assert_eq!(q, "SELECT name FROM users");
    }

    // `table:` shortcut: schema-qualified and bare names expand to SELECT *.
    #[test]
    fn resolve_query_table_shortcut_qualified() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("public.users".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM public.users");
    }

    #[test]
    fn resolve_query_table_shortcut_unqualified() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("orders".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM orders");
    }

    #[test]
    fn resolve_query_table_shortcut_rejects_three_part_name() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("db.public.users".into());
        let err = exp.resolve_query(Path::new("/tmp"), None).unwrap_err();
        let msg = format!("{err:#}");
        assert!(msg.contains("<schema>.<name>"), "got: {msg}");
    }

    // Anything that is not a plain identifier (quotes, punctuation, leading
    // digit, empty or dangling segments) must be refused, because the value is
    // spliced into SQL unquoted.
    #[test]
    fn resolve_query_table_shortcut_rejects_sql_injection() {
        for bad in [
            "users; DROP TABLE x",
            "users--",
            "users'",
            "users\"",
            "public.\"My Table\"",
            "0starts_with_digit",
            "",
            ".trailing",
            "leading.",
            "two..dots",
        ] {
            let mut exp = make_export_direct(None, None);
            exp.table = Some(bad.into());
            assert!(
                exp.resolve_query(Path::new("/tmp"), None).is_err(),
                "should reject `table:` value '{bad}'",
            );
        }
    }

    #[test]
    fn resolve_query_table_shortcut_takes_precedence_over_query() {
        let mut exp = make_export_direct(Some("SELECT id FROM x"), None);
        exp.table = Some("public.y".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM public.y");
    }

    #[test]
    fn resolve_query_file_missing_returns_error() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("nonexistent.sql"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("nonexistent.sql") || msg.contains("No such file"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_both_set_returns_error() {
        let mut exp = make_export_direct(Some("SELECT 1"), None);
        exp.query_file = Some("file.sql".into());
        let result = exp.resolve_query(Path::new("/tmp"), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("not both") || msg.contains("query_file"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_neither_set_returns_error() {
        let exp = make_export_direct(None, None);
        let result = exp.resolve_query(Path::new("/tmp"), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("query") || msg.contains("query_file"),
            "got: {msg}"
        );
    }

    // Path-traversal guards for `query_file`.
    #[test]
    fn resolve_query_file_dotdot_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("../secret.sql"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("..") || msg.contains("traversal"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_nested_dotdot_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("subdir/../../etc/passwd"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("..") || msg.contains("traversal"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_absolute_path_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("/etc/passwd"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("relative") || msg.contains("absolute"),
            "got: {msg}"
        );
    }

    // Relative paths into subdirectories of the config dir remain allowed.
    #[test]
    fn resolve_query_file_in_subdir_is_allowed() {
        let dir = tempfile::TempDir::new().unwrap();
        let subdir = dir.path().join("queries");
        std::fs::create_dir(&subdir).unwrap();
        std::fs::write(subdir.join("orders.sql"), "SELECT * FROM orders").unwrap();
        let exp = make_export_direct(None, Some("queries/orders.sql"));
        let q = exp.resolve_query(dir.path(), None).unwrap();
        assert_eq!(q, "SELECT * FROM orders");
    }
}