1use std::path::Path;
7
8use schemars::JsonSchema;
9use serde::{Deserialize, Serialize};
10
11use super::IncrementalCursorMode;
12use super::destination::DestinationConfig;
13use super::format::{CompressionProfile, CompressionType, FormatType, ParquetConfig};
14use super::resolve::{parse_file_size, resolve_vars};
15use crate::tuning::TuningConfig;
16
17#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
26#[serde(rename_all = "snake_case")]
27pub enum VerifyMode {
28 #[default]
30 Size,
31 Content,
34}
35
36impl VerifyMode {
37 pub fn requires_content(self) -> bool {
39 matches!(self, VerifyMode::Content)
40 }
41}
42
// Policy selected by an export's `on_schema_drift` key; `Warn` when omitted.
// Serialized in snake_case: "warn", "continue", "fail".
// NOTE(review): the per-variant behaviour is implemented elsewhere — the names suggest
// warn-and-proceed / proceed-silently / abort; confirm against the consumer.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum SchemaDriftPolicy {
    #[default]
    Warn,
    Continue,
    Fail,
}
// One export job as declared in the config file.
// `deny_unknown_fields` turns a misspelled key into a deserialization error instead of
// silently ignoring it. Comments here are plain `//` on purpose: `///` doc comments
// would be picked up by `JsonSchema` and change the generated schema descriptions.
#[derive(Debug, Deserialize, JsonSchema, Clone)]
#[serde(deny_unknown_fields)]
pub struct ExportConfig {
    // Export identifier; interpolated into error messages as `export '<name>'`.
    pub name: String,
    // SQL source. See `resolve_query`: `table` wins over `query`/`query_file`;
    // otherwise exactly one of `query` / `query_file` must be set.
    #[serde(default)]
    pub query: Option<String>,
    // SQL file path relative to the config directory (absolute paths and `..`
    // components are rejected by `resolve_query`).
    pub query_file: Option<String>,
    // `<name>` or `<schema>.<name>` shortcut expanded to `SELECT * FROM <table>`.
    #[serde(default)]
    pub table: Option<String>,
    // Extraction strategy; `ExportMode::Full` when omitted (`default_mode`).
    #[serde(default = "default_mode")]
    pub mode: ExportMode,
    pub cursor_column: Option<String>,
    #[serde(default)]
    pub cursor_fallback_column: Option<String>,
    #[serde(default)]
    pub incremental_cursor_mode: IncrementalCursorMode,
    pub chunk_column: Option<String>,
    #[serde(default)]
    pub chunk_dense: bool,
    // 100_000 when omitted (`default_chunk_size`).
    #[serde(default = "default_chunk_size")]
    pub chunk_size: usize,
    #[serde(default)]
    pub chunk_size_memory_mb: Option<u64>,
    pub chunk_count: Option<usize>,
    pub chunk_by_days: Option<u32>,
    pub chunk_by_key: Option<String>,
    // 1 when omitted (`default_parallel`).
    #[serde(default = "default_parallel")]
    pub parallel: usize,
    pub time_column: Option<String>,
    // `TimeColumnType::Timestamp` when omitted (`default_time_column_type`).
    #[serde(default = "default_time_column_type")]
    pub time_column_type: TimeColumnType,
    pub days_window: Option<u32>,

    #[serde(default)]
    pub partition_by: Option<String>,

    // `PartitionGranularity::Day` when omitted (the enum's `#[default]`).
    #[serde(default)]
    pub partition_granularity: PartitionGranularity,
    // Required: no serde default.
    pub format: FormatType,
    #[serde(default)]
    pub compression: CompressionType,
    pub compression_level: Option<u32>,
    // When set, overrides `compression` / `compression_level`
    // (see `effective_compression`).
    pub compression_profile: Option<CompressionProfile>,
    #[serde(default)]
    pub skip_empty: bool,
    // Required: no serde default.
    pub destination: DestinationConfig,
    // `VerifyMode::Size` when omitted.
    #[serde(default)]
    pub verify: VerifyMode,
    #[serde(default)]
    pub meta_columns: MetaColumns,
    #[serde(default)]
    pub quality: Option<QualityConfig>,
    // Human-readable size such as "128MB"; converted by `max_file_size_bytes`,
    // which yields `None` for unparseable values.
    pub max_file_size: Option<String>,
    #[serde(default)]
    pub chunk_checkpoint: bool,
    pub chunk_max_attempts: Option<u32>,
    #[serde(default)]
    pub tuning: Option<TuningConfig>,
    #[serde(default)]
    pub source_group: Option<String>,
    #[serde(default)]
    pub reconcile_required: bool,

    // NOTE(review): key/value meaning is not visible here — presumably
    // column name -> type override; confirm against the consumer.
    #[serde(default)]
    pub columns: std::collections::HashMap<String, String>,

    #[serde(default)]
    pub target: Option<String>,

    // `SchemaDriftPolicy::Warn` when omitted.
    #[serde(default)]
    pub on_schema_drift: SchemaDriftPolicy,

    #[serde(default)]
    pub shape_drift_warn_factor: Option<f64>,

    #[serde(default)]
    pub parquet: Option<ParquetConfig>,
}
231
232impl ExportConfig {
233 pub fn effective_compression(&self) -> (CompressionType, Option<u32>) {
236 if let Some(profile) = self.compression_profile {
237 profile.to_codec()
238 } else {
239 (self.compression, self.compression_level)
240 }
241 }
242
243 pub fn max_file_size_bytes(&self) -> Option<u64> {
244 self.max_file_size
245 .as_ref()
246 .and_then(|s| parse_file_size(s).ok())
247 }
248
249 pub fn resolve_query(
250 &self,
251 config_dir: &Path,
252 params: Option<&std::collections::HashMap<String, String>>,
253 ) -> crate::error::Result<String> {
254 if let Some(tbl) = &self.table {
257 validate_table_shortcut_ident(&self.name, tbl)?;
258 return Ok(format!("SELECT * FROM {tbl}"));
259 }
260 match (&self.query, &self.query_file) {
261 (Some(q), None) => {
262 if params.is_some() {
263 resolve_vars(q, params)
264 } else {
265 Ok(q.clone())
266 }
267 }
268 (None, Some(file)) => {
269 let file_path = std::path::Path::new(file);
270 if file_path.is_absolute() {
272 anyhow::bail!(
273 "export '{}': query_file must be a relative path: '{}'",
274 self.name,
275 file
276 );
277 }
278 if file_path
279 .components()
280 .any(|c| c == std::path::Component::ParentDir)
281 {
282 anyhow::bail!(
283 "export '{}': query_file path must not contain '..': '{}'",
284 self.name,
285 file
286 );
287 }
288 let joined = config_dir.join(file);
289 if let Ok(canonical) = joined.canonicalize() {
292 let base = config_dir
293 .canonicalize()
294 .unwrap_or_else(|_| config_dir.to_path_buf());
295 if !canonical.starts_with(&base) {
296 anyhow::bail!(
297 "export '{}': query_file '{}' resolves outside the config directory",
298 self.name,
299 file
300 );
301 }
302 }
303 let raw = std::fs::read_to_string(&joined)?;
304 resolve_vars(&raw, params)
305 }
306 (Some(_), Some(_)) => {
307 anyhow::bail!(
308 "export '{}': specify either 'query' or 'query_file', not both",
309 self.name
310 )
311 }
312 (None, None) => {
313 anyhow::bail!(
314 "export '{}': must specify exactly one of 'query', 'query_file', or 'table'",
315 self.name
316 )
317 }
318 }
319 }
320}
321
322fn validate_table_shortcut_ident(export_name: &str, raw: &str) -> crate::error::Result<()> {
333 let trimmed = raw.trim();
334 if trimmed.is_empty() {
335 anyhow::bail!("export '{export_name}': 'table' is empty");
336 }
337 let parts: Vec<&str> = trimmed.split('.').collect();
338 if parts.len() > 2 {
339 anyhow::bail!(
340 "export '{export_name}': 'table' must be `<name>` or `<schema>.<name>` (got '{raw}')"
341 );
342 }
343 for part in &parts {
344 if part.is_empty() {
345 anyhow::bail!("export '{export_name}': 'table' has an empty segment in '{raw}'");
346 }
347 let mut chars = part.chars();
348 let first = chars.next().unwrap();
349 if !(first.is_ascii_alphabetic() || first == '_') {
350 anyhow::bail!(
351 "export '{export_name}': 'table' segment '{part}' must start with a letter or underscore (use 'query:' for quoted identifiers)"
352 );
353 }
354 if !chars.all(|c| c.is_ascii_alphanumeric() || c == '_') {
355 anyhow::bail!(
356 "export '{export_name}': 'table' segment '{part}' contains non-identifier characters (use 'query:' for quoted identifiers)"
357 );
358 }
359 }
360 Ok(())
361}
362
// Optional data-quality settings attached to an export via its `quality:` key.
// Unknown keys are rejected (`deny_unknown_fields`).
// NOTE(review): where and how each check is evaluated is not visible in this file;
// the notes below describe field shapes only.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)]
#[serde(deny_unknown_fields)]
pub struct QualityConfig {
    // Presumably lower/upper row-count bounds; `None` leaves that side unchecked — confirm.
    pub row_count_min: Option<usize>,
    pub row_count_max: Option<usize>,
    // Presumably keyed by column name, value a maximum null fraction — confirm range.
    // Empty map when omitted.
    #[serde(default)]
    pub null_ratio_max: std::collections::HashMap<String, f64>,
    // Columns presumably checked for duplicates; empty when omitted.
    #[serde(default)]
    pub unique_columns: Vec<String>,
    // NOTE(review): looks like a cap on distinct values tracked by the uniqueness
    // check — confirm against the consumer.
    pub unique_max_entries: Option<usize>,
}
377
// Opt-in flags for extra metadata columns; each defaults to `false` (derived `Default`
// and per-field `#[serde(default)]`).
// NOTE(review): names suggest an export timestamp column and a per-row hash column —
// confirm against the writer that consumes these flags.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Default)]
#[serde(deny_unknown_fields)]
pub struct MetaColumns {
    #[serde(default)]
    pub exported_at: bool,
    #[serde(default)]
    pub row_hash: bool,
}
386
387fn default_mode() -> ExportMode {
388 ExportMode::Full
389}
390
391fn default_chunk_size() -> usize {
392 100_000
393}
394
395fn default_parallel() -> usize {
396 1
397}
398
399fn default_time_column_type() -> TimeColumnType {
400 TimeColumnType::Timestamp
401}
402
// Extraction strategy selected by an export's `mode` key.
// Deserialized from snake_case: "full", "incremental", "chunked", "time_window".
// Has no `Default` derive; `default_mode` supplies `Full` when the key is omitted.
// Deserialize-only (no `Serialize` derive).
#[derive(Debug, Deserialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ExportMode {
    Full,
    Incremental,
    Chunked,
    TimeWindow,
}
411
// How `time_column` values are typed; selected by `time_column_type`.
// Serialized lowercase: "timestamp", "unix". `default_time_column_type` supplies
// `Timestamp` when the key is omitted.
// NOTE(review): `Unix` presumably means an integer epoch value — confirm the unit.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum TimeColumnType {
    Timestamp,
    Unix,
}
418
// Granularity selected by `partition_granularity`; `Day` when omitted (`#[default]`).
// Serialized lowercase: "day", "month", "year".
// NOTE(review): presumably the bucket size applied to `partition_by` — confirm.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum PartitionGranularity {
    #[default]
    Day,
    Month,
    Year,
}
428
#[cfg(test)]
/// Builds a minimal, valid `ExportConfig` for unit tests.
///
/// The export runs the inline query `SELECT 1` in `ExportMode::Full`. It writes Parquet
/// with `CompressionType::None` to a local destination at `/tmp`. Every optional knob
/// is `None`/`false`/default. Tests override individual fields with struct-update
/// syntax (`..sample_export("x")`).
pub(crate) fn sample_export(name: &str) -> ExportConfig {
    ExportConfig {
        // Identity and SQL source.
        name: String::from(name),
        query: Some(String::from("SELECT 1")),
        query_file: None,
        table: None,
        // Mode and chunking.
        mode: ExportMode::Full,
        cursor_column: None,
        cursor_fallback_column: None,
        incremental_cursor_mode: IncrementalCursorMode::default(),
        chunk_column: None,
        chunk_dense: false,
        chunk_size: 100_000,
        chunk_size_memory_mb: None,
        chunk_count: None,
        chunk_by_days: None,
        chunk_by_key: None,
        parallel: 1,
        // Time windowing and partitioning.
        time_column: None,
        time_column_type: TimeColumnType::Timestamp,
        days_window: None,
        partition_by: None,
        partition_granularity: PartitionGranularity::Day,
        // Output format and destination.
        format: FormatType::Parquet,
        compression: CompressionType::None,
        compression_level: None,
        compression_profile: None,
        skip_empty: false,
        destination: crate::config::DestinationConfig {
            destination_type: crate::config::DestinationType::Local,
            path: Some("/tmp".into()),
            ..Default::default()
        },
        // Verification, quality, and runtime behaviour.
        verify: VerifyMode::Size,
        meta_columns: MetaColumns::default(),
        quality: None,
        max_file_size: None,
        chunk_checkpoint: false,
        chunk_max_attempts: None,
        tuning: None,
        source_group: None,
        reconcile_required: false,
        columns: std::collections::HashMap::new(),
        target: None,
        on_schema_drift: SchemaDriftPolicy::default(),
        shape_drift_warn_factor: None,
        parquet: None,
    }
}
485
#[cfg(test)]
mod tests {
    use super::*;

    // Deserializes a minimal YAML export (inline `SELECT 1`, parquet format, local
    // `/tmp` destination) with `extra` appended verbatim as more top-level keys.
    // Panics if the YAML does not parse into an `ExportConfig`.
    fn make_export_yaml(name: &str, extra: &str) -> ExportConfig {
        let yaml = format!(
            "name: {name}\nquery: \"SELECT 1\"\nformat: parquet\ndestination:\n type: local\n path: /tmp\n{extra}"
        );
        serde_yaml_ng::from_str(&yaml).expect("parse ExportConfig")
    }

    // --- max_file_size_bytes ---------------------------------------------------

    #[test]
    fn max_file_size_bytes_none_when_unset() {
        let exp = make_export_yaml("no_limit", "");
        assert!(exp.max_file_size_bytes().is_none());
    }

    #[test]
    fn max_file_size_bytes_parses_mb() {
        // "MB" is expected to be binary (1024 * 1024).
        let exp = make_export_yaml("sized", "max_file_size: \"128MB\"\n");
        assert_eq!(exp.max_file_size_bytes(), Some(128 * 1024 * 1024));
    }

    #[test]
    fn max_file_size_bytes_parses_gb() {
        let exp = make_export_yaml("sized_gb", "max_file_size: \"2GB\"\n");
        assert_eq!(exp.max_file_size_bytes(), Some(2 * 1024 * 1024 * 1024));
    }

    #[test]
    fn max_file_size_bytes_returns_none_on_invalid() {
        // Pins the lenient contract: an unparseable size is reported as `None`, not an error.
        let exp = make_export_yaml("bad_size", "max_file_size: \"notanumber\"\n");
        assert!(exp.max_file_size_bytes().is_none());
    }

    // Builds an export from `sample_export` with only `query` / `query_file` overridden.
    fn make_export_direct(query: Option<&str>, query_file: Option<&str>) -> ExportConfig {
        ExportConfig {
            query: query.map(|s| s.to_string()),
            query_file: query_file.map(|s| s.to_string()),
            ..sample_export("test")
        }
    }

    // Builds an owned params map from borrowed key/value pairs.
    fn params(pairs: &[(&str, &str)]) -> std::collections::HashMap<String, String> {
        pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    // --- resolve_query: inline `query` -----------------------------------------

    #[test]
    fn resolve_query_inline_no_params_returns_query_as_is() {
        let exp = make_export_direct(Some("SELECT id FROM orders"), None);
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT id FROM orders");
    }

    #[test]
    fn resolve_query_inline_with_params_substitutes_vars() {
        let exp = make_export_direct(Some("SELECT ${col} FROM ${table}"), None);
        let p = params(&[("col", "id"), ("table", "orders")]);
        let q = exp.resolve_query(Path::new("/tmp"), Some(&p)).unwrap();
        assert_eq!(q, "SELECT id FROM orders");
    }

    #[test]
    fn resolve_query_inline_params_empty_map_is_noop() {
        let exp = make_export_direct(Some("SELECT 1"), None);
        let p = params(&[]);
        let q = exp.resolve_query(Path::new("/tmp"), Some(&p)).unwrap();
        assert_eq!(q, "SELECT 1");
    }

    #[test]
    fn resolve_query_inline_missing_var_returns_error() {
        // SAFETY: NOTE(review) — `remove_var` is unsafe because other threads may read the
        // environment concurrently; this test only removes a variable nothing else is
        // expected to set. Presumably `resolve_vars` falls back to the environment for
        // keys missing from `params`, hence the removal — confirm.
        unsafe { std::env::remove_var("UNSET_RIVET_TEST_VAR") };
        let exp = make_export_direct(Some("SELECT ${UNSET_RIVET_TEST_VAR}"), None);
        let p = params(&[]);
        let result = exp.resolve_query(Path::new("/tmp"), Some(&p));
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("UNSET_RIVET_TEST_VAR") || msg.contains("not set"),
            "got: {msg}"
        );
    }

    // --- resolve_query: `query_file` -------------------------------------------

    #[test]
    fn resolve_query_file_reads_content() {
        let dir = tempfile::TempDir::new().unwrap();
        let sql_path = dir.path().join("query.sql");
        std::fs::write(&sql_path, "SELECT * FROM customers").unwrap();
        let exp = make_export_direct(None, Some("query.sql"));
        let q = exp.resolve_query(dir.path(), None).unwrap();
        assert_eq!(q, "SELECT * FROM customers");
    }

    #[test]
    fn resolve_query_file_with_params_substitutes() {
        let dir = tempfile::TempDir::new().unwrap();
        let sql_path = dir.path().join("q.sql");
        std::fs::write(&sql_path, "SELECT ${col} FROM ${tbl}").unwrap();
        let exp = make_export_direct(None, Some("q.sql"));
        let p = params(&[("col", "name"), ("tbl", "users")]);
        let q = exp.resolve_query(dir.path(), Some(&p)).unwrap();
        assert_eq!(q, "SELECT name FROM users");
    }

    // --- resolve_query: `table` shortcut ---------------------------------------

    #[test]
    fn resolve_query_table_shortcut_qualified() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("public.users".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM public.users");
    }

    #[test]
    fn resolve_query_table_shortcut_unqualified() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("orders".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM orders");
    }

    #[test]
    fn resolve_query_table_shortcut_rejects_three_part_name() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("db.public.users".into());
        let err = exp.resolve_query(Path::new("/tmp"), None).unwrap_err();
        let msg = format!("{err:#}");
        assert!(msg.contains("<schema>.<name>"), "got: {msg}");
    }

    #[test]
    fn resolve_query_table_shortcut_rejects_sql_injection() {
        // Each value is either an injection attempt, a quoted identifier, or a
        // structurally malformed name; all must be refused.
        for bad in [
            "users; DROP TABLE x",
            "users--",
            "users'",
            "users\"",
            "public.\"My Table\"",
            "0starts_with_digit",
            "",
            ".trailing",
            "leading.",
            "two..dots",
        ] {
            let mut exp = make_export_direct(None, None);
            exp.table = Some(bad.into());
            assert!(
                exp.resolve_query(Path::new("/tmp"), None).is_err(),
                "should reject `table:` value '{bad}'",
            );
        }
    }

    #[test]
    fn resolve_query_table_shortcut_takes_precedence_over_query() {
        let mut exp = make_export_direct(Some("SELECT id FROM x"), None);
        exp.table = Some("public.y".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM public.y");
    }

    // --- resolve_query: error cases --------------------------------------------

    #[test]
    fn resolve_query_file_missing_returns_error() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("nonexistent.sql"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("nonexistent.sql") || msg.contains("No such file"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_both_set_returns_error() {
        let mut exp = make_export_direct(Some("SELECT 1"), None);
        exp.query_file = Some("file.sql".into());
        let result = exp.resolve_query(Path::new("/tmp"), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("not both") || msg.contains("query_file"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_neither_set_returns_error() {
        let exp = make_export_direct(None, None);
        let result = exp.resolve_query(Path::new("/tmp"), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("query") || msg.contains("query_file"),
            "got: {msg}"
        );
    }

    // --- resolve_query: query_file path confinement ----------------------------

    #[test]
    fn resolve_query_file_dotdot_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("../secret.sql"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("..") || msg.contains("traversal"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_nested_dotdot_is_rejected() {
        // `..` after a normal component must still be refused, not normalized away.
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("subdir/../../etc/passwd"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("..") || msg.contains("traversal"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_absolute_path_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("/etc/passwd"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("relative") || msg.contains("absolute"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_in_subdir_is_allowed() {
        // Confinement must not over-reject: nested paths inside config_dir are fine.
        let dir = tempfile::TempDir::new().unwrap();
        let subdir = dir.path().join("queries");
        std::fs::create_dir(&subdir).unwrap();
        std::fs::write(subdir.join("orders.sql"), "SELECT * FROM orders").unwrap();
        let exp = make_export_direct(None, Some("queries/orders.sql"));
        let q = exp.resolve_query(dir.path(), None).unwrap();
        assert_eq!(q, "SELECT * FROM orders");
    }
}