1use std::path::Path;
7
8use schemars::JsonSchema;
9use serde::{Deserialize, Serialize};
10
11use super::IncrementalCursorMode;
12use super::destination::DestinationConfig;
13use super::format::{CompressionProfile, CompressionType, FormatType, ParquetConfig};
14use super::resolve::{parse_file_size, resolve_vars};
15use crate::tuning::TuningConfig;
16
/// Verification mode selected by an export's `verify` key.
///
/// Values are spelled in snake_case in configuration (`size`, `content`).
/// `Size` is the default when the key is omitted.
// NOTE(review): what is actually compared in each mode is decided by code
// outside this file — confirm before relying on the variant names alone.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum VerifyMode {
    /// Default mode; [`VerifyMode::requires_content`] returns `false` for it.
    #[default]
    Size,
    /// The only mode for which [`VerifyMode::requires_content`] returns `true`.
    Content,
}
35
36impl VerifyMode {
37 pub fn requires_content(self) -> bool {
39 matches!(self, VerifyMode::Content)
40 }
41}
42
/// Policy selected by an export's `on_schema_drift` key.
///
/// Values are spelled in snake_case in configuration (`warn`, `continue`,
/// `fail`). `Warn` is the default when the key is omitted.
// NOTE(review): the variant names suggest "log and proceed", "proceed" and
// "abort", but the code that applies the policy is not in this file — confirm.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum SchemaDriftPolicy {
    /// Default policy.
    #[default]
    Warn,
    /// Configured as `continue`.
    Continue,
    /// Configured as `fail`.
    Fail,
}
/// Settings for one named export, deserialized from user configuration.
///
/// `name`, `format` and `destination` are the only required keys; every other
/// field is either an `Option` or carries a serde default. Unknown keys are
/// rejected because of `deny_unknown_fields`.
// NOTE(review): fields documented below only by their type/default are consumed
// by code outside this file; confirm their exact semantics against that code.
#[derive(Debug, Deserialize, JsonSchema, Clone)]
#[serde(deny_unknown_fields)]
pub struct ExportConfig {
    /// Export name; used as the prefix of this export's error and warning messages.
    pub name: String,
    /// Inline SQL text. Cannot be combined with `query_file`
    /// (see [`ExportConfig::resolve_query`]).
    #[serde(default)]
    pub query: Option<String>,
    /// Path of a file holding the SQL, relative to the config directory.
    /// `resolve_query` rejects absolute paths, `..` components, and paths
    /// that canonicalize outside that directory.
    pub query_file: Option<String>,
    /// Table shortcut: `<name>` or `<schema>.<name>` made of plain ASCII
    /// identifiers. When set, `resolve_query` returns `SELECT * FROM <table>`
    /// and does not look at `query` / `query_file`.
    #[serde(default)]
    pub table: Option<String>,
    /// Export mode; [`ExportMode::Full`] when omitted.
    #[serde(default = "default_mode")]
    pub mode: ExportMode,
    /// CDC settings block (see [`CdcExportConfig`]).
    // NOTE(review): presumably only meaningful together with `mode: cdc` — confirm.
    #[serde(default)]
    pub cdc: Option<CdcExportConfig>,
    /// Name of the cursor column, if any.
    pub cursor_column: Option<String>,
    /// Name of a fallback cursor column, if any.
    #[serde(default)]
    pub cursor_fallback_column: Option<String>,
    /// Cursor mode; the type's `Default` value when omitted.
    #[serde(default)]
    pub incremental_cursor_mode: IncrementalCursorMode,
    /// Name of the chunking column, if any.
    pub chunk_column: Option<String>,
    /// `false` when omitted.
    #[serde(default)]
    pub chunk_dense: bool,
    /// Chunk size; `100_000` when omitted.
    // NOTE(review): unit is presumably rows — confirm.
    #[serde(default = "default_chunk_size")]
    pub chunk_size: usize,
    /// Optional memory-based chunk size, in megabytes per the field name.
    #[serde(default)]
    pub chunk_size_memory_mb: Option<u64>,
    /// Optional chunk count.
    pub chunk_count: Option<usize>,
    /// Optional number of days per chunk.
    pub chunk_by_days: Option<u32>,
    /// Optional key used for chunking.
    pub chunk_by_key: Option<String>,
    /// Parallelism setting; `1` when omitted.
    #[serde(default = "default_parallel")]
    pub parallel: usize,
    /// Name of the time column, if any.
    pub time_column: Option<String>,
    /// Representation of `time_column`; [`TimeColumnType::Timestamp`] when omitted.
    #[serde(default = "default_time_column_type")]
    pub time_column_type: TimeColumnType,
    /// Optional window length in days.
    pub days_window: Option<u32>,

    /// Optional partitioning expression or column.
    #[serde(default)]
    pub partition_by: Option<String>,

    /// Partition granularity; [`PartitionGranularity::Day`] when omitted.
    #[serde(default)]
    pub partition_granularity: PartitionGranularity,
    /// Output format (required).
    pub format: FormatType,
    /// Explicit codec. A value equal to `CompressionType::default()` is treated
    /// as "not explicitly set" by [`ExportConfig::effective_compression`].
    #[serde(default)]
    pub compression: CompressionType,
    /// Explicit compression level paired with `compression`.
    pub compression_level: Option<u32>,
    /// When set, `effective_compression` returns this profile's codec and
    /// level instead of `compression` / `compression_level`.
    pub compression_profile: Option<CompressionProfile>,
    /// `false` when omitted.
    #[serde(default)]
    pub skip_empty: bool,
    /// Where the export is written (required).
    pub destination: DestinationConfig,
    /// Verification mode; [`VerifyMode::Size`] when omitted.
    #[serde(default)]
    pub verify: VerifyMode,
    /// Meta-column switches; all off when omitted.
    #[serde(default)]
    pub meta_columns: MetaColumns,
    /// Optional quality settings (see [`QualityConfig`]).
    #[serde(default)]
    pub quality: Option<QualityConfig>,
    /// Size string such as `"128MB"` or `"2GB"`. Read through
    /// [`ExportConfig::max_file_size_bytes`], which yields `None` when the
    /// string cannot be parsed.
    pub max_file_size: Option<String>,
    /// `false` when omitted.
    #[serde(default)]
    pub chunk_checkpoint: bool,
    /// Optional attempt limit per chunk.
    pub chunk_max_attempts: Option<u32>,
    /// Optional per-export tuning block.
    #[serde(default)]
    pub tuning: Option<TuningConfig>,
    /// Optional source group name.
    #[serde(default)]
    pub source_group: Option<String>,
    /// `false` when omitted.
    #[serde(default)]
    pub reconcile_required: bool,

    /// String-to-string map; empty when omitted.
    // NOTE(review): meaning of keys and values is not visible in this file — confirm.
    #[serde(default)]
    pub columns: std::collections::HashMap<String, String>,

    /// Optional target name.
    #[serde(default)]
    pub target: Option<String>,

    /// Schema-drift policy; [`SchemaDriftPolicy::Warn`] when omitted.
    #[serde(default)]
    pub on_schema_drift: SchemaDriftPolicy,

    /// Optional warning factor for shape drift.
    #[serde(default)]
    pub shape_drift_warn_factor: Option<f64>,

    /// Optional Parquet-specific settings.
    #[serde(default)]
    pub parquet: Option<ParquetConfig>,
}
253
254impl ExportConfig {
255 pub fn effective_compression(&self) -> (CompressionType, Option<u32>) {
264 if let Some(profile) = self.compression_profile {
265 let explicit_codec =
266 (self.compression != CompressionType::default()).then_some(self.compression);
267 if let Some(msg) = super::format::compression_profile_override_warning(
268 profile,
269 explicit_codec,
270 self.compression_level,
271 ) {
272 log::warn!("export '{}': {}", self.name, msg);
273 }
274 profile.to_codec()
275 } else {
276 (self.compression, self.compression_level)
277 }
278 }
279
280 pub fn max_file_size_bytes(&self) -> Option<u64> {
281 self.max_file_size
282 .as_ref()
283 .and_then(|s| parse_file_size(s).ok())
284 }
285
286 pub fn resolve_query(
287 &self,
288 config_dir: &Path,
289 params: Option<&std::collections::HashMap<String, String>>,
290 ) -> crate::error::Result<String> {
291 if let Some(tbl) = &self.table {
294 validate_table_shortcut_ident(&self.name, tbl)?;
295 return Ok(format!("SELECT * FROM {tbl}"));
296 }
297 match (&self.query, &self.query_file) {
298 (Some(q), None) => {
299 if params.is_some() {
300 resolve_vars(q, params)
301 } else {
302 Ok(q.clone())
303 }
304 }
305 (None, Some(file)) => {
306 let file_path = std::path::Path::new(file);
307 if file_path.is_absolute() {
309 anyhow::bail!(
310 "export '{}': query_file must be a relative path: '{}'",
311 self.name,
312 file
313 );
314 }
315 if file_path
316 .components()
317 .any(|c| c == std::path::Component::ParentDir)
318 {
319 anyhow::bail!(
320 "export '{}': query_file path must not contain '..': '{}'",
321 self.name,
322 file
323 );
324 }
325 let joined = config_dir.join(file);
326 if let Ok(canonical) = joined.canonicalize() {
329 let base = config_dir
330 .canonicalize()
331 .unwrap_or_else(|_| config_dir.to_path_buf());
332 if !canonical.starts_with(&base) {
333 anyhow::bail!(
334 "export '{}': query_file '{}' resolves outside the config directory",
335 self.name,
336 file
337 );
338 }
339 }
340 let raw = std::fs::read_to_string(&joined)?;
341 resolve_vars(&raw, params)
342 }
343 (Some(_), Some(_)) => {
344 anyhow::bail!(
345 "export '{}': specify either 'query' or 'query_file', not both",
346 self.name
347 )
348 }
349 (None, None) => {
350 anyhow::bail!(
351 "export '{}': must specify exactly one of 'query', 'query_file', or 'table'",
352 self.name
353 )
354 }
355 }
356 }
357}
358
359fn validate_table_shortcut_ident(export_name: &str, raw: &str) -> crate::error::Result<()> {
370 let trimmed = raw.trim();
371 if trimmed.is_empty() {
372 anyhow::bail!("export '{export_name}': 'table' is empty");
373 }
374 let parts: Vec<&str> = trimmed.split('.').collect();
375 if parts.len() > 2 {
376 anyhow::bail!(
377 "export '{export_name}': 'table' must be `<name>` or `<schema>.<name>` (got '{raw}')"
378 );
379 }
380 for part in &parts {
381 if part.is_empty() {
382 anyhow::bail!("export '{export_name}': 'table' has an empty segment in '{raw}'");
383 }
384 let mut chars = part.chars();
385 let first = chars.next().unwrap();
386 if !(first.is_ascii_alphabetic() || first == '_') {
387 anyhow::bail!(
388 "export '{export_name}': 'table' segment '{part}' must start with a letter or underscore (use 'query:' for quoted identifiers)"
389 );
390 }
391 if !chars.all(|c| c.is_ascii_alphanumeric() || c == '_') {
392 anyhow::bail!(
393 "export '{export_name}': 'table' segment '{part}' contains non-identifier characters (use 'query:' for quoted identifiers)"
394 );
395 }
396 }
397 Ok(())
398}
399
/// Optional quality settings attached to an export through its `quality` key.
///
/// Unknown keys are rejected because of `deny_unknown_fields`.
// NOTE(review): how each limit is enforced is decided outside this file —
// confirm against the code that consumes this struct.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)]
#[serde(deny_unknown_fields)]
pub struct QualityConfig {
    /// Optional lower bound on the row count.
    pub row_count_min: Option<usize>,
    /// Optional upper bound on the row count.
    pub row_count_max: Option<usize>,
    /// Map from name to a maximum null ratio; empty when omitted.
    // NOTE(review): keys are presumably column names and values fractions in 0..=1 — confirm.
    #[serde(default)]
    pub null_ratio_max: std::collections::HashMap<String, f64>,
    /// Columns listed for a uniqueness check; empty when omitted.
    #[serde(default)]
    pub unique_columns: Vec<String>,
    /// Optional cap on entries related to the uniqueness check.
    pub unique_max_entries: Option<usize>,
}
414
/// Switches set through an export's `meta_columns` key.
///
/// Both switches are `false` when omitted; unknown keys are rejected because
/// of `deny_unknown_fields`.
// NOTE(review): presumably each switch adds a column of the same name to the
// output — confirm against the writer code.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Default)]
#[serde(deny_unknown_fields)]
pub struct MetaColumns {
    /// `exported_at` switch.
    #[serde(default)]
    pub exported_at: bool,
    /// `row_hash` switch.
    #[serde(default)]
    pub row_hash: bool,
}
423
424fn default_mode() -> ExportMode {
425 ExportMode::Full
426}
427
/// Serde default for `ExportConfig::chunk_size`.
fn default_chunk_size() -> usize {
    const FALLBACK_CHUNK_SIZE: usize = 100_000;
    FALLBACK_CHUNK_SIZE
}
431
/// Serde default for `ExportConfig::parallel`.
fn default_parallel() -> usize {
    const FALLBACK_PARALLEL: usize = 1;
    FALLBACK_PARALLEL
}
435
436fn default_time_column_type() -> TimeColumnType {
437 TimeColumnType::Timestamp
438}
439
/// Export mode selected by an export's `mode` key.
///
/// Values are spelled in snake_case in configuration (`full`, `incremental`,
/// `chunked`, `time_window`, `cdc`). The enum has no `Default` impl; the
/// default for `ExportConfig::mode` comes from `default_mode`, which returns
/// `Full`.
// NOTE(review): what each mode does is implemented outside this file — confirm
// there before relying on the variant names alone.
#[derive(Debug, Deserialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ExportMode {
    /// Configured as `full`; the mode used when `mode` is omitted.
    Full,
    /// Configured as `incremental`.
    Incremental,
    /// Configured as `chunked`.
    Chunked,
    /// Configured as `time_window`.
    TimeWindow,
    /// Configured as `cdc`.
    Cdc,
}
452
/// Settings block set through an export's `cdc` key.
///
/// Every field is optional; `until_current` is `false` when omitted.
// NOTE(review): unlike the sibling config structs in this file, this one does
// not set `deny_unknown_fields`, so misspelled keys are silently ignored —
// confirm that this is intentional.
// NOTE(review): the per-field meanings below are taken from the field names
// only; the consuming code is outside this file.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Default)]
pub struct CdcExportConfig {
    /// Optional checkpoint identifier or location.
    pub checkpoint: Option<String>,
    /// `false` when omitted.
    #[serde(default)]
    pub until_current: bool,
    /// Optional cap on the number of events.
    pub max_events: Option<usize>,
    /// Optional rollover threshold.
    pub rollover: Option<usize>,
    /// Optional memory-based rollover threshold, in megabytes per the field name.
    pub rollover_memory_mb: Option<usize>,
    /// Optional server id.
    pub server_id: Option<u32>,
    /// Optional slot name.
    pub slot: Option<String>,
    /// Optional capture instance name.
    pub capture_instance: Option<String>,
}
485
/// Representation of an export's `time_column`, set through `time_column_type`.
///
/// Values are spelled in lowercase in configuration (`timestamp`, `unix`).
/// The enum has no `Default` impl; `ExportConfig` falls back to `Timestamp`
/// through `default_time_column_type`.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum TimeColumnType {
    /// Configured as `timestamp`.
    Timestamp,
    /// Configured as `unix`.
    // NOTE(review): presumably a numeric Unix epoch; the unit is not visible here — confirm.
    Unix,
}
492
/// Granularity set through an export's `partition_granularity` key.
///
/// Values are spelled in lowercase in configuration (`day`, `month`, `year`).
/// `Day` is the default when the key is omitted.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum PartitionGranularity {
    /// Default granularity.
    #[default]
    Day,
    /// Configured as `month`.
    Month,
    /// Configured as `year`.
    Year,
}
508
/// Builds a minimal [`ExportConfig`] for unit tests.
///
/// The result is a full-mode Parquet export of `SELECT 1` to the local path
/// `/tmp`, with no compression and every optional setting left unset. Tests
/// override individual fields with struct-update syntax.
#[cfg(test)]
pub(crate) fn sample_export(name: &str) -> ExportConfig {
    let destination = crate::config::DestinationConfig {
        destination_type: crate::config::DestinationType::Local,
        path: Some("/tmp".into()),
        ..Default::default()
    };

    // Fields are listed in the same order as the struct declaration so a
    // newly added field is easy to spot here.
    ExportConfig {
        name: name.to_owned(),
        query: Some(String::from("SELECT 1")),
        query_file: None,
        table: None,
        mode: ExportMode::Full,
        cdc: None,
        cursor_column: None,
        cursor_fallback_column: None,
        incremental_cursor_mode: IncrementalCursorMode::default(),
        chunk_column: None,
        chunk_dense: false,
        chunk_size: 100_000,
        chunk_size_memory_mb: None,
        chunk_count: None,
        chunk_by_days: None,
        chunk_by_key: None,
        parallel: 1,
        time_column: None,
        time_column_type: TimeColumnType::Timestamp,
        days_window: None,
        partition_by: None,
        partition_granularity: PartitionGranularity::Day,
        format: FormatType::Parquet,
        compression: CompressionType::None,
        compression_level: None,
        compression_profile: None,
        skip_empty: false,
        destination,
        verify: VerifyMode::Size,
        meta_columns: MetaColumns::default(),
        quality: None,
        max_file_size: None,
        chunk_checkpoint: false,
        chunk_max_attempts: None,
        tuning: None,
        source_group: None,
        reconcile_required: false,
        columns: std::collections::HashMap::new(),
        target: None,
        on_schema_drift: SchemaDriftPolicy::default(),
        shape_drift_warn_factor: None,
        parquet: None,
    }
}
566
// Unit tests for `ExportConfig::max_file_size_bytes` and
// `ExportConfig::resolve_query` (inline query, query file, and table shortcut).
#[cfg(test)]
mod tests {
    use super::*;

    /// Deserializes a minimal YAML export (name, inline query, Parquet format,
    /// local destination) with `extra` appended verbatim as additional
    /// top-level keys. Panics if the YAML does not parse.
    fn make_export_yaml(name: &str, extra: &str) -> ExportConfig {
        let yaml = format!(
            "name: {name}\nquery: \"SELECT 1\"\nformat: parquet\ndestination:\n type: local\n path: /tmp\n{extra}"
        );
        serde_yaml_ng::from_str(&yaml).expect("parse ExportConfig")
    }

    // --- max_file_size_bytes ---

    #[test]
    fn max_file_size_bytes_none_when_unset() {
        let exp = make_export_yaml("no_limit", "");
        assert!(exp.max_file_size_bytes().is_none());
    }

    // The expected values pin binary multiples (1 MB = 1024 * 1024 bytes).
    #[test]
    fn max_file_size_bytes_parses_mb() {
        let exp = make_export_yaml("sized", "max_file_size: \"128MB\"\n");
        assert_eq!(exp.max_file_size_bytes(), Some(128 * 1024 * 1024));
    }

    #[test]
    fn max_file_size_bytes_parses_gb() {
        let exp = make_export_yaml("sized_gb", "max_file_size: \"2GB\"\n");
        assert_eq!(exp.max_file_size_bytes(), Some(2 * 1024 * 1024 * 1024));
    }

    // An unparseable size is swallowed into `None` rather than reported.
    #[test]
    fn max_file_size_bytes_returns_none_on_invalid() {
        let exp = make_export_yaml("bad_size", "max_file_size: \"notanumber\"\n");
        assert!(exp.max_file_size_bytes().is_none());
    }

    /// Builds an export named "test" from `sample_export`, overriding only
    /// `query` and `query_file`.
    fn make_export_direct(query: Option<&str>, query_file: Option<&str>) -> ExportConfig {
        ExportConfig {
            query: query.map(|s| s.to_string()),
            query_file: query_file.map(|s| s.to_string()),
            ..sample_export("test")
        }
    }

    /// Turns a slice of `(key, value)` pairs into an owned params map.
    fn params(pairs: &[(&str, &str)]) -> std::collections::HashMap<String, String> {
        pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    // --- resolve_query: inline `query` ---

    #[test]
    fn resolve_query_inline_no_params_returns_query_as_is() {
        let exp = make_export_direct(Some("SELECT id FROM orders"), None);
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT id FROM orders");
    }

    #[test]
    fn resolve_query_inline_with_params_substitutes_vars() {
        let exp = make_export_direct(Some("SELECT ${col} FROM ${table}"), None);
        let p = params(&[("col", "id"), ("table", "orders")]);
        let q = exp.resolve_query(Path::new("/tmp"), Some(&p)).unwrap();
        assert_eq!(q, "SELECT id FROM orders");
    }

    #[test]
    fn resolve_query_inline_params_empty_map_is_noop() {
        let exp = make_export_direct(Some("SELECT 1"), None);
        let p = params(&[]);
        let q = exp.resolve_query(Path::new("/tmp"), Some(&p)).unwrap();
        assert_eq!(q, "SELECT 1");
    }

    #[test]
    fn resolve_query_inline_missing_var_returns_error() {
        // The variable is removed from the process environment first.
        // NOTE(review): presumably `resolve_vars` falls back to environment
        // variables for names missing from `params` — confirm.
        //
        // SAFETY: NOTE(review): `remove_var` is only sound while no other
        // thread reads or writes the environment concurrently. Tests run in
        // parallel by default, so this relies on no other test touching the
        // environment at the same time — confirm.
        unsafe { std::env::remove_var("UNSET_RIVET_TEST_VAR") };
        let exp = make_export_direct(Some("SELECT ${UNSET_RIVET_TEST_VAR}"), None);
        let p = params(&[]);
        let result = exp.resolve_query(Path::new("/tmp"), Some(&p));
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("UNSET_RIVET_TEST_VAR") || msg.contains("not set"),
            "got: {msg}"
        );
    }

    // --- resolve_query: `query_file` ---

    #[test]
    fn resolve_query_file_reads_content() {
        let dir = tempfile::TempDir::new().unwrap();
        let sql_path = dir.path().join("query.sql");
        std::fs::write(&sql_path, "SELECT * FROM customers").unwrap();
        let exp = make_export_direct(None, Some("query.sql"));
        let q = exp.resolve_query(dir.path(), None).unwrap();
        assert_eq!(q, "SELECT * FROM customers");
    }

    #[test]
    fn resolve_query_file_with_params_substitutes() {
        let dir = tempfile::TempDir::new().unwrap();
        let sql_path = dir.path().join("q.sql");
        std::fs::write(&sql_path, "SELECT ${col} FROM ${tbl}").unwrap();
        let exp = make_export_direct(None, Some("q.sql"));
        let p = params(&[("col", "name"), ("tbl", "users")]);
        let q = exp.resolve_query(dir.path(), Some(&p)).unwrap();
        assert_eq!(q, "SELECT name FROM users");
    }

    // --- resolve_query: `table` shortcut ---

    #[test]
    fn resolve_query_table_shortcut_qualified() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("public.users".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM public.users");
    }

    #[test]
    fn resolve_query_table_shortcut_unqualified() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("orders".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM orders");
    }

    #[test]
    fn resolve_query_table_shortcut_rejects_three_part_name() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("db.public.users".into());
        let err = exp.resolve_query(Path::new("/tmp"), None).unwrap_err();
        let msg = format!("{err:#}");
        assert!(msg.contains("<schema>.<name>"), "got: {msg}");
    }

    // Anything that is not a bare ASCII identifier (or two joined by one dot)
    // must be rejected, because the value is spliced into SQL unquoted.
    #[test]
    fn resolve_query_table_shortcut_rejects_sql_injection() {
        for bad in [
            "users; DROP TABLE x",
            "users--",
            "users'",
            "users\"",
            "public.\"My Table\"",
            "0starts_with_digit",
            "",
            ".trailing",
            "leading.",
            "two..dots",
        ] {
            let mut exp = make_export_direct(None, None);
            exp.table = Some(bad.into());
            assert!(
                exp.resolve_query(Path::new("/tmp"), None).is_err(),
                "should reject `table:` value '{bad}'",
            );
        }
    }

    // `table` wins silently when `query` is also set.
    #[test]
    fn resolve_query_table_shortcut_takes_precedence_over_query() {
        let mut exp = make_export_direct(Some("SELECT id FROM x"), None);
        exp.table = Some("public.y".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM public.y");
    }

    // --- resolve_query: error cases ---

    #[test]
    fn resolve_query_file_missing_returns_error() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("nonexistent.sql"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        // "No such file" is the OS error text on Unix-like systems.
        assert!(
            msg.contains("nonexistent.sql") || msg.contains("No such file"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_both_set_returns_error() {
        let mut exp = make_export_direct(Some("SELECT 1"), None);
        exp.query_file = Some("file.sql".into());
        let result = exp.resolve_query(Path::new("/tmp"), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("not both") || msg.contains("query_file"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_neither_set_returns_error() {
        let exp = make_export_direct(None, None);
        let result = exp.resolve_query(Path::new("/tmp"), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("query") || msg.contains("query_file"),
            "got: {msg}"
        );
    }

    // --- resolve_query: path containment for `query_file` ---

    #[test]
    fn resolve_query_file_dotdot_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("../secret.sql"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("..") || msg.contains("traversal"),
            "got: {msg}"
        );
    }

    // `..` is rejected anywhere in the path, not only as the first component.
    #[test]
    fn resolve_query_file_nested_dotdot_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("subdir/../../etc/passwd"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("..") || msg.contains("traversal"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_absolute_path_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("/etc/passwd"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("relative") || msg.contains("absolute"),
            "got: {msg}"
        );
    }

    // Relative paths into subdirectories of the config directory stay allowed.
    #[test]
    fn resolve_query_file_in_subdir_is_allowed() {
        let dir = tempfile::TempDir::new().unwrap();
        let subdir = dir.path().join("queries");
        std::fs::create_dir(&subdir).unwrap();
        std::fs::write(subdir.join("orders.sql"), "SELECT * FROM orders").unwrap();
        let exp = make_export_direct(None, Some("queries/orders.sql"));
        let q = exp.resolve_query(dir.path(), None).unwrap();
        assert_eq!(q, "SELECT * FROM orders");
    }
}