1use std::path::Path;
7
8use schemars::JsonSchema;
9use serde::{Deserialize, Serialize};
10
11use super::IncrementalCursorMode;
12use super::destination::DestinationConfig;
13use super::format::{CompressionProfile, CompressionType, FormatType, ParquetConfig};
14use super::resolve::{parse_file_size, resolve_vars};
15use crate::tuning::TuningConfig;
16
/// Verification mode carried by [`ExportConfig::verify`].
///
/// Variants are (de)serialized in `snake_case` (`size`, `content`); [`VerifyMode::Size`]
/// is the default.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum VerifyMode {
    /// Default mode. [`VerifyMode::requires_content`] returns `false` for it.
    // NOTE(review): presumably a size-only check of the written output — confirm
    // against the code that consumes this enum.
    #[default]
    Size,
    /// The only mode for which [`VerifyMode::requires_content`] returns `true`.
    Content,
}
35
36impl VerifyMode {
37 pub fn requires_content(self) -> bool {
39 matches!(self, VerifyMode::Content)
40 }
41}
42
/// Policy carried by [`ExportConfig::on_schema_drift`].
///
/// Variants are (de)serialized in `snake_case` (`warn`, `continue`, `fail`);
/// [`SchemaDriftPolicy::Warn`] is the default.
// NOTE(review): the exact runtime reaction to each variant is implemented outside this
// file — the per-variant notes below are inferred from the names; confirm.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum SchemaDriftPolicy {
    /// Default policy (presumably: report the drift and keep going).
    #[default]
    Warn,
    /// Presumably: proceed without treating drift as a problem.
    Continue,
    /// Presumably: abort the export when drift is detected.
    Fail,
}
/// One export entry as deserialized from configuration.
///
/// Unknown keys are rejected (`deny_unknown_fields`). Only `name`, `format` and
/// `destination` are required by the deserializer; every other field is either an
/// `Option` or carries a serde default. The SQL to run is selected by
/// [`ExportConfig::resolve_query`] from `table`, `query` or `query_file`.
///
/// The struct derives `Deserialize` but not `Serialize`.
#[derive(Debug, Deserialize, JsonSchema, Clone)]
#[serde(deny_unknown_fields)]
pub struct ExportConfig {
    /// Export name; used as the prefix of error and log messages produced by this type.
    pub name: String,
    /// Inline SQL text. `resolve_query` rejects configs that set both this and `query_file`.
    #[serde(default)]
    pub query: Option<String>,
    /// Path of a SQL file relative to the config directory. `resolve_query` rejects
    /// absolute paths, paths containing `..`, and paths that canonicalize outside the
    /// config directory.
    pub query_file: Option<String>,
    /// Table shortcut of the form `<name>` or `<schema>.<name>`. When set,
    /// `resolve_query` returns `SELECT * FROM <table>` and does not look at
    /// `query` / `query_file`.
    #[serde(default)]
    pub table: Option<String>,
    /// Export mode; defaults to [`ExportMode::Full`] (see `default_mode`).
    #[serde(default = "default_mode")]
    pub mode: ExportMode,
    /// Optional CDC settings.
    // NOTE(review): presumably only consulted when `mode` is `cdc` — confirm.
    #[serde(default)]
    pub cdc: Option<CdcExportConfig>,
    /// Optional cursor column name.
    // NOTE(review): presumably drives `incremental` mode — confirm against the runner.
    pub cursor_column: Option<String>,
    /// Optional fallback cursor column name (semantics not visible in this file).
    #[serde(default)]
    pub cursor_fallback_column: Option<String>,
    /// Cursor mode; defaults to `IncrementalCursorMode::default()`.
    #[serde(default)]
    pub incremental_cursor_mode: IncrementalCursorMode,
    /// Optional chunking column name.
    pub chunk_column: Option<String>,
    /// Defaults to `false` (meaning not visible in this file).
    #[serde(default)]
    pub chunk_dense: bool,
    /// Chunk size; defaults to `100_000` (see `default_chunk_size`).
    // NOTE(review): unit is presumably rows/key range — confirm.
    #[serde(default = "default_chunk_size")]
    pub chunk_size: usize,
    /// Optional chunk sizing figure in megabytes, per the field name.
    #[serde(default)]
    pub chunk_size_memory_mb: Option<u64>,
    /// Optional chunk count.
    pub chunk_count: Option<usize>,
    /// Optional chunk width in days, per the field name.
    pub chunk_by_days: Option<u32>,
    /// Optional chunking key (a string; presumably a column name — confirm).
    pub chunk_by_key: Option<String>,
    /// Parallelism setting; defaults to `1` (see `default_parallel`).
    #[serde(default = "default_parallel")]
    pub parallel: usize,

    /// Optional wave number. Carries `skip_serializing_if` even though this struct does
    /// not derive `Serialize`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub wave: Option<u32>,

    /// Optional flag. Carries `skip_serializing_if` even though this struct does not
    /// derive `Serialize`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub parallel_safe: Option<bool>,
    /// Optional time column name.
    // NOTE(review): presumably used by `time_window` mode — confirm.
    pub time_column: Option<String>,
    /// How `time_column` is typed; defaults to [`TimeColumnType::Timestamp`]
    /// (see `default_time_column_type`).
    #[serde(default = "default_time_column_type")]
    pub time_column_type: TimeColumnType,
    /// Optional window length in days, per the field name.
    pub days_window: Option<u32>,

    /// Optional partitioning key (a string; semantics not visible in this file).
    #[serde(default)]
    pub partition_by: Option<String>,

    /// Partition granularity; defaults to [`PartitionGranularity::Day`].
    #[serde(default)]
    pub partition_granularity: PartitionGranularity,
    /// Output format (required).
    pub format: FormatType,
    /// Explicitly configured codec; defaults to `CompressionType::default()`.
    /// Superseded by `compression_profile` when that is set
    /// (see [`ExportConfig::effective_compression`]).
    #[serde(default)]
    pub compression: CompressionType,
    /// Optional codec level; not used when `compression_profile` is set.
    pub compression_level: Option<u32>,
    /// Optional profile. When set, `effective_compression` returns `profile.to_codec()`
    /// instead of the explicit codec/level pair.
    pub compression_profile: Option<CompressionProfile>,
    /// Defaults to `false`.
    // NOTE(review): presumably suppresses output when the result set is empty — confirm.
    #[serde(default)]
    pub skip_empty: bool,
    /// Where the export is written (required).
    pub destination: DestinationConfig,
    /// Verification mode; defaults to [`VerifyMode::Size`].
    #[serde(default)]
    pub verify: VerifyMode,
    /// Meta-column switches; all default to `false`.
    #[serde(default)]
    pub meta_columns: MetaColumns,
    /// Optional quality thresholds.
    #[serde(default)]
    pub quality: Option<QualityConfig>,
    /// Optional size string such as `"128MB"` or `"2GB"`. Parsed by
    /// [`ExportConfig::max_file_size_bytes`], which yields `None` for unparsable values.
    pub max_file_size: Option<String>,
    /// Defaults to `false` (semantics not visible in this file).
    #[serde(default)]
    pub chunk_checkpoint: bool,
    /// Optional attempt limit for chunks, per the field name.
    pub chunk_max_attempts: Option<u32>,
    /// Optional per-export tuning overrides.
    #[serde(default)]
    pub tuning: Option<TuningConfig>,
    /// Optional source group label (semantics not visible in this file).
    #[serde(default)]
    pub source_group: Option<String>,
    /// Defaults to `false` (semantics not visible in this file).
    #[serde(default)]
    pub reconcile_required: bool,

    /// String-to-string map; defaults to empty.
    // NOTE(review): presumably column name -> override; confirm what the values mean.
    #[serde(default)]
    pub columns: std::collections::HashMap<String, String>,

    /// Optional target name (semantics not visible in this file).
    #[serde(default)]
    pub target: Option<String>,

    /// Schema drift policy; defaults to [`SchemaDriftPolicy::Warn`].
    #[serde(default)]
    pub on_schema_drift: SchemaDriftPolicy,

    /// Optional factor for shape-drift warnings (semantics not visible in this file).
    #[serde(default)]
    pub shape_drift_warn_factor: Option<f64>,

    /// Optional Parquet-specific settings.
    #[serde(default)]
    pub parquet: Option<ParquetConfig>,
}
271
272impl ExportConfig {
273 pub fn effective_compression(&self) -> (CompressionType, Option<u32>) {
282 if let Some(profile) = self.compression_profile {
283 let explicit_codec =
284 (self.compression != CompressionType::default()).then_some(self.compression);
285 if let Some(msg) = super::format::compression_profile_override_warning(
286 profile,
287 explicit_codec,
288 self.compression_level,
289 ) {
290 log::warn!("export '{}': {}", self.name, msg);
291 }
292 profile.to_codec()
293 } else {
294 (self.compression, self.compression_level)
295 }
296 }
297
298 pub fn max_file_size_bytes(&self) -> Option<u64> {
299 self.max_file_size
300 .as_ref()
301 .and_then(|s| parse_file_size(s).ok())
302 }
303
304 pub fn resolve_query(
305 &self,
306 config_dir: &Path,
307 params: Option<&std::collections::HashMap<String, String>>,
308 ) -> crate::error::Result<String> {
309 if let Some(tbl) = &self.table {
312 validate_table_shortcut_ident(&self.name, tbl)?;
313 return Ok(format!("SELECT * FROM {tbl}"));
314 }
315 match (&self.query, &self.query_file) {
316 (Some(q), None) => {
317 if params.is_some() {
318 resolve_vars(q, params)
319 } else {
320 Ok(q.clone())
321 }
322 }
323 (None, Some(file)) => {
324 let file_path = std::path::Path::new(file);
325 if file_path.is_absolute() {
327 anyhow::bail!(
328 "export '{}': query_file must be a relative path: '{}'",
329 self.name,
330 file
331 );
332 }
333 if file_path
334 .components()
335 .any(|c| c == std::path::Component::ParentDir)
336 {
337 anyhow::bail!(
338 "export '{}': query_file path must not contain '..': '{}'",
339 self.name,
340 file
341 );
342 }
343 let joined = config_dir.join(file);
344 if let Ok(canonical) = joined.canonicalize() {
347 let base = config_dir
348 .canonicalize()
349 .unwrap_or_else(|_| config_dir.to_path_buf());
350 if !canonical.starts_with(&base) {
351 anyhow::bail!(
352 "export '{}': query_file '{}' resolves outside the config directory",
353 self.name,
354 file
355 );
356 }
357 }
358 let raw = std::fs::read_to_string(&joined)?;
359 resolve_vars(&raw, params)
360 }
361 (Some(_), Some(_)) => {
362 anyhow::bail!(
363 "export '{}': specify either 'query' or 'query_file', not both",
364 self.name
365 )
366 }
367 (None, None) => {
368 anyhow::bail!(
369 "export '{}': must specify exactly one of 'query', 'query_file', or 'table'",
370 self.name
371 )
372 }
373 }
374 }
375}
376
377fn validate_table_shortcut_ident(export_name: &str, raw: &str) -> crate::error::Result<()> {
388 let trimmed = raw.trim();
389 if trimmed.is_empty() {
390 anyhow::bail!("export '{export_name}': 'table' is empty");
391 }
392 let parts: Vec<&str> = trimmed.split('.').collect();
393 if parts.len() > 2 {
394 anyhow::bail!(
395 "export '{export_name}': 'table' must be `<name>` or `<schema>.<name>` (got '{raw}')"
396 );
397 }
398 for part in &parts {
399 if part.is_empty() {
400 anyhow::bail!("export '{export_name}': 'table' has an empty segment in '{raw}'");
401 }
402 let mut chars = part.chars();
403 let first = chars.next().unwrap();
404 if !(first.is_ascii_alphabetic() || first == '_') {
405 anyhow::bail!(
406 "export '{export_name}': 'table' segment '{part}' must start with a letter or underscore (use 'query:' for quoted identifiers)"
407 );
408 }
409 if !chars.all(|c| c.is_ascii_alphanumeric() || c == '_') {
410 anyhow::bail!(
411 "export '{export_name}': 'table' segment '{part}' contains non-identifier characters (use 'query:' for quoted identifiers)"
412 );
413 }
414 }
415 Ok(())
416}
417
/// Optional quality thresholds attached to an export via [`ExportConfig::quality`].
///
/// Unknown keys are rejected (`deny_unknown_fields`).
// NOTE(review): the checks themselves are implemented outside this file; the field
// notes below describe only what the types and names show.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)]
#[serde(deny_unknown_fields)]
pub struct QualityConfig {
    /// Optional lower bound on the row count.
    pub row_count_min: Option<usize>,
    /// Optional upper bound on the row count.
    pub row_count_max: Option<usize>,
    /// Map of string key to maximum null ratio; defaults to empty.
    // NOTE(review): keys are presumably column names and values fractions in 0..=1 — confirm.
    #[serde(default)]
    pub null_ratio_max: std::collections::HashMap<String, f64>,
    /// List of names that must be unique (presumably column names); defaults to empty.
    #[serde(default)]
    pub unique_columns: Vec<String>,
    /// Optional cap on the number of entries tracked for the uniqueness check, per the
    /// field name.
    pub unique_max_entries: Option<usize>,
}
432
/// Boolean switches carried by [`ExportConfig::meta_columns`].
///
/// Unknown keys are rejected (`deny_unknown_fields`); both switches default to `false`.
// NOTE(review): presumably each switch adds a synthetic column of the same name to the
// output — confirm against the writer.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Default)]
#[serde(deny_unknown_fields)]
pub struct MetaColumns {
    /// Switch for the `exported_at` meta column.
    #[serde(default)]
    pub exported_at: bool,
    /// Switch for the `row_hash` meta column.
    #[serde(default)]
    pub row_hash: bool,
}
441
442fn default_mode() -> ExportMode {
443 ExportMode::Full
444}
445
/// Serde default for [`ExportConfig::chunk_size`].
fn default_chunk_size() -> usize {
    const DEFAULT_CHUNK_SIZE: usize = 100_000;
    DEFAULT_CHUNK_SIZE
}
449
/// Serde default for [`ExportConfig::parallel`].
fn default_parallel() -> usize {
    const DEFAULT_PARALLEL: usize = 1;
    DEFAULT_PARALLEL
}
453
454fn default_time_column_type() -> TimeColumnType {
455 TimeColumnType::Timestamp
456}
457
/// Export mode carried by [`ExportConfig::mode`].
///
/// Variants are deserialized from `snake_case` names (`full`, `incremental`, `chunked`,
/// `time_window`, `cdc`). The enum derives `Deserialize` but not `Serialize`, and has no
/// `Default`; the config default comes from `default_mode`.
// NOTE(review): what each mode does at runtime is implemented outside this file.
#[derive(Debug, Deserialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ExportMode {
    /// The mode returned by `default_mode`.
    Full,
    /// `incremental` in configuration.
    Incremental,
    /// `chunked` in configuration.
    Chunked,
    /// `time_window` in configuration.
    TimeWindow,
    /// `cdc` in configuration.
    Cdc,
}
470
/// CDC settings carried by [`ExportConfig::cdc`].
///
/// Every field is optional or defaulted, and `Default` yields an all-unset value.
/// Unlike the sibling config structs in this file, this one does not set
/// `deny_unknown_fields`.
// NOTE(review): field semantics are implemented outside this file; the notes below are
// inferred from names and types and should be confirmed.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Default)]
pub struct CdcExportConfig {
    /// Optional checkpoint identifier or location (a string).
    pub checkpoint: Option<String>,
    /// Defaults to `false`.
    #[serde(default)]
    pub until_current: bool,
    /// Optional cap on the number of events.
    pub max_events: Option<usize>,
    /// Optional rollover threshold (unit not visible here).
    pub rollover: Option<usize>,
    /// Optional rollover threshold in megabytes, per the field name.
    pub rollover_memory_mb: Option<usize>,
    /// Optional server id (presumably a replication client id — confirm).
    pub server_id: Option<u32>,
    /// Optional slot name (presumably a replication slot — confirm).
    pub slot: Option<String>,
    /// Optional capture instance name.
    pub capture_instance: Option<String>,
}
503
/// Type tag carried by [`ExportConfig::time_column_type`].
///
/// Variants are (de)serialized in lowercase (`timestamp`, `unix`). There is no derived
/// `Default`; the config default comes from `default_time_column_type`.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum TimeColumnType {
    /// The variant returned by `default_time_column_type`.
    Timestamp,
    /// `unix` in configuration.
    // NOTE(review): presumably a numeric Unix epoch column — confirm seconds vs. milliseconds.
    Unix,
}
510
/// Granularity carried by [`ExportConfig::partition_granularity`].
///
/// Variants are (de)serialized in lowercase (`day`, `month`, `year`);
/// [`PartitionGranularity::Day`] is the default.
// NOTE(review): presumably the bucket width used with `partition_by` — confirm.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum PartitionGranularity {
    /// Default granularity.
    #[default]
    Day,
    /// `month` in configuration.
    Month,
    /// `year` in configuration.
    Year,
}
526
/// Test-only fixture: a minimal export named `name`.
///
/// It is a full-mode export with the inline query `SELECT 1`, Parquet output, no
/// compression, and a local destination at `/tmp`. Every optional field is unset and
/// every flag is off, so tests can override just what they need with struct-update
/// syntax. Fields are listed in declaration order.
#[cfg(test)]
pub(crate) fn sample_export(name: &str) -> ExportConfig {
    let destination = crate::config::DestinationConfig {
        destination_type: crate::config::DestinationType::Local,
        path: Some("/tmp".into()),
        ..Default::default()
    };

    ExportConfig {
        name: name.to_owned(),

        // Query source.
        query: Some(String::from("SELECT 1")),
        query_file: None,
        table: None,

        // Mode and cursor settings.
        mode: ExportMode::Full,
        cdc: None,
        cursor_column: None,
        cursor_fallback_column: None,
        incremental_cursor_mode: IncrementalCursorMode::default(),

        // Chunking and parallelism.
        chunk_column: None,
        chunk_dense: false,
        chunk_size: 100_000,
        chunk_size_memory_mb: None,
        chunk_count: None,
        chunk_by_days: None,
        chunk_by_key: None,
        parallel: 1,
        wave: None,
        parallel_safe: None,

        // Time window and partitioning.
        time_column: None,
        time_column_type: TimeColumnType::Timestamp,
        days_window: None,
        partition_by: None,
        partition_granularity: PartitionGranularity::Day,

        // Output.
        format: FormatType::Parquet,
        compression: CompressionType::None,
        compression_level: None,
        compression_profile: None,
        skip_empty: false,
        destination,
        verify: VerifyMode::Size,
        meta_columns: MetaColumns::default(),
        quality: None,
        max_file_size: None,

        // Everything else.
        chunk_checkpoint: false,
        chunk_max_attempts: None,
        tuning: None,
        source_group: None,
        reconcile_required: false,
        columns: std::collections::HashMap::new(),
        target: None,
        on_schema_drift: SchemaDriftPolicy::default(),
        shape_drift_warn_factor: None,
        parquet: None,
    }
}
586
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses an `ExportConfig` from a minimal YAML document plus `extra` top-level keys.
    fn make_export_yaml(name: &str, extra: &str) -> ExportConfig {
        let yaml = format!(
            "name: {name}\nquery: \"SELECT 1\"\nformat: parquet\ndestination:\n type: local\n path: /tmp\n{extra}"
        );
        serde_yaml_ng::from_str(&yaml).expect("parse ExportConfig")
    }

    #[test]
    fn max_file_size_bytes_none_when_unset() {
        let exp = make_export_yaml("no_limit", "");
        assert!(exp.max_file_size_bytes().is_none());
    }

    #[test]
    fn max_file_size_bytes_parses_mb() {
        let exp = make_export_yaml("sized", "max_file_size: \"128MB\"\n");
        assert_eq!(exp.max_file_size_bytes(), Some(128 * 1024 * 1024));
    }

    #[test]
    fn max_file_size_bytes_parses_gb() {
        let exp = make_export_yaml("sized_gb", "max_file_size: \"2GB\"\n");
        assert_eq!(exp.max_file_size_bytes(), Some(2 * 1024 * 1024 * 1024));
    }

    #[test]
    fn max_file_size_bytes_returns_none_on_invalid() {
        let exp = make_export_yaml("bad_size", "max_file_size: \"notanumber\"\n");
        assert!(exp.max_file_size_bytes().is_none());
    }

    /// Builds the `sample_export` fixture with `query` / `query_file` replaced.
    fn make_export_direct(query: Option<&str>, query_file: Option<&str>) -> ExportConfig {
        ExportConfig {
            query: query.map(|s| s.to_string()),
            query_file: query_file.map(|s| s.to_string()),
            ..sample_export("test")
        }
    }

    /// Turns `(key, value)` pairs into the owned map `resolve_query` expects.
    fn params(pairs: &[(&str, &str)]) -> std::collections::HashMap<String, String> {
        pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    #[test]
    fn resolve_query_inline_no_params_returns_query_as_is() {
        let exp = make_export_direct(Some("SELECT id FROM orders"), None);
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT id FROM orders");
    }

    #[test]
    fn resolve_query_inline_with_params_substitutes_vars() {
        let exp = make_export_direct(Some("SELECT ${col} FROM ${table}"), None);
        let p = params(&[("col", "id"), ("table", "orders")]);
        let q = exp.resolve_query(Path::new("/tmp"), Some(&p)).unwrap();
        assert_eq!(q, "SELECT id FROM orders");
    }

    #[test]
    fn resolve_query_inline_params_empty_map_is_noop() {
        let exp = make_export_direct(Some("SELECT 1"), None);
        let p = params(&[]);
        let q = exp.resolve_query(Path::new("/tmp"), Some(&p)).unwrap();
        assert_eq!(q, "SELECT 1");
    }

    #[test]
    fn resolve_query_inline_missing_var_returns_error() {
        // SAFETY: `remove_var` is only sound while no other thread reads or writes the
        // process environment outside Rust's own synchronized `std::env` accessors.
        // The variable name is unique to this test and is never set anywhere.
        // NOTE(review): the test harness is multi-threaded — confirm no concurrently
        // running test reaches into the environment through foreign code.
        unsafe { std::env::remove_var("UNSET_RIVET_TEST_VAR") };
        let exp = make_export_direct(Some("SELECT ${UNSET_RIVET_TEST_VAR}"), None);
        let p = params(&[]);
        let result = exp.resolve_query(Path::new("/tmp"), Some(&p));
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("UNSET_RIVET_TEST_VAR") || msg.contains("not set"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_reads_content() {
        let dir = tempfile::TempDir::new().unwrap();
        let sql_path = dir.path().join("query.sql");
        std::fs::write(&sql_path, "SELECT * FROM customers").unwrap();
        let exp = make_export_direct(None, Some("query.sql"));
        let q = exp.resolve_query(dir.path(), None).unwrap();
        assert_eq!(q, "SELECT * FROM customers");
    }

    #[test]
    fn resolve_query_file_with_params_substitutes() {
        let dir = tempfile::TempDir::new().unwrap();
        let sql_path = dir.path().join("q.sql");
        std::fs::write(&sql_path, "SELECT ${col} FROM ${tbl}").unwrap();
        let exp = make_export_direct(None, Some("q.sql"));
        let p = params(&[("col", "name"), ("tbl", "users")]);
        let q = exp.resolve_query(dir.path(), Some(&p)).unwrap();
        assert_eq!(q, "SELECT name FROM users");
    }

    #[test]
    fn resolve_query_table_shortcut_qualified() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("public.users".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM public.users");
    }

    #[test]
    fn resolve_query_table_shortcut_unqualified() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("orders".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM orders");
    }

    #[test]
    fn resolve_query_table_shortcut_rejects_three_part_name() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("db.public.users".into());
        let err = exp.resolve_query(Path::new("/tmp"), None).unwrap_err();
        let msg = format!("{err:#}");
        assert!(msg.contains("<schema>.<name>"), "got: {msg}");
    }

    #[test]
    fn resolve_query_table_shortcut_rejects_sql_injection() {
        for bad in [
            "users; DROP TABLE x",
            "users--",
            "users'",
            "users\"",
            "public.\"My Table\"",
            "0starts_with_digit",
            "",
            ".trailing",
            "leading.",
            "two..dots",
        ] {
            let mut exp = make_export_direct(None, None);
            exp.table = Some(bad.into());
            assert!(
                exp.resolve_query(Path::new("/tmp"), None).is_err(),
                "should reject `table:` value '{bad}'",
            );
        }
    }

    #[test]
    fn resolve_query_table_shortcut_takes_precedence_over_query() {
        let mut exp = make_export_direct(Some("SELECT id FROM x"), None);
        exp.table = Some("public.y".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM public.y");
    }

    #[test]
    fn resolve_query_file_missing_returns_error() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("nonexistent.sql"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("nonexistent.sql") || msg.contains("No such file"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_both_set_returns_error() {
        let mut exp = make_export_direct(Some("SELECT 1"), None);
        exp.query_file = Some("file.sql".into());
        let result = exp.resolve_query(Path::new("/tmp"), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("not both") || msg.contains("query_file"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_neither_set_returns_error() {
        let exp = make_export_direct(None, None);
        let result = exp.resolve_query(Path::new("/tmp"), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        // Pin the distinguishing phrase: a bare "query" also matches the
        // "both set" error, so it would not tell the two failure modes apart.
        assert!(msg.contains("exactly one of"), "got: {msg}");
    }

    #[test]
    fn resolve_query_file_dotdot_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("../secret.sql"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("..") || msg.contains("traversal"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_nested_dotdot_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("subdir/../../etc/passwd"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("..") || msg.contains("traversal"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_absolute_path_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("/etc/passwd"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("relative") || msg.contains("absolute"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_in_subdir_is_allowed() {
        let dir = tempfile::TempDir::new().unwrap();
        let subdir = dir.path().join("queries");
        std::fs::create_dir(&subdir).unwrap();
        std::fs::write(subdir.join("orders.sql"), "SELECT * FROM orders").unwrap();
        let exp = make_export_direct(None, Some("queries/orders.sql"));
        let q = exp.resolve_query(dir.path(), None).unwrap();
        assert_eq!(q, "SELECT * FROM orders");
    }

    /// A symlink inside the config directory that points outside it passes the
    /// lexical checks (relative, no `..`), so only the canonicalized containment
    /// check can reject it.
    #[cfg(unix)]
    #[test]
    fn resolve_query_file_symlink_escaping_config_dir_is_rejected() {
        let outside = tempfile::TempDir::new().unwrap();
        let secret = outside.path().join("secret.sql");
        std::fs::write(&secret, "SELECT 'outside'").unwrap();

        let dir = tempfile::TempDir::new().unwrap();
        std::os::unix::fs::symlink(&secret, dir.path().join("link.sql")).unwrap();

        let exp = make_export_direct(None, Some("link.sql"));
        let err = exp.resolve_query(dir.path(), None).unwrap_err();
        let msg = format!("{err:#}");
        assert!(msg.contains("outside the config directory"), "got: {msg}");
    }
}