1use std::path::Path;
7
8use schemars::JsonSchema;
9use serde::{Deserialize, Serialize};
10
11use super::IncrementalCursorMode;
12use super::destination::DestinationConfig;
13use super::format::{CompressionProfile, CompressionType, FormatType, ParquetConfig};
14use super::resolve::{parse_file_size, resolve_vars};
15use crate::tuning::TuningConfig;
16
/// Verification mode for an export (`ExportConfig::verify`).
///
/// Deserialized from snake_case strings: `size` (the default) or `content`.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum VerifyMode {
    #[default]
    /// Default mode; `requires_content()` returns `false` for it.
    Size,
    /// The only mode for which `requires_content()` returns `true`.
    Content,
}
35
36impl VerifyMode {
37 pub fn requires_content(self) -> bool {
39 matches!(self, VerifyMode::Content)
40 }
41}
42
/// Policy applied on schema drift for an export (`ExportConfig::on_schema_drift`).
///
/// Deserialized from snake_case strings: `warn` (the default), `continue`, or `fail`.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum SchemaDriftPolicy {
    #[default]
    // NOTE(review): the handling of each variant lives outside this file;
    // presumably `Warn` logs and proceeds — confirm at the drift check site.
    Warn,
    // NOTE(review): presumably proceeds without warning — confirm.
    Continue,
    // NOTE(review): presumably aborts the export — confirm.
    Fail,
}
/// Configuration for a single export, deserialized from the config file.
///
/// Unknown keys are rejected (`deny_unknown_fields`). The SQL to run comes from
/// exactly one of `table`, `query`, or `query_file`; see
/// `ExportConfig::resolve_query`.
#[derive(Debug, Deserialize, JsonSchema, Clone)]
#[serde(deny_unknown_fields)]
pub struct ExportConfig {
    /// Export name; used to prefix error messages (`export '<name>': ...`).
    pub name: String,
    #[serde(default)]
    /// Inline SQL. Mutually exclusive with `query_file`; ignored when `table` is set.
    pub query: Option<String>,
    /// Path to a SQL file, relative to the config directory. Absolute paths and
    /// paths containing `..` are rejected.
    pub query_file: Option<String>,
    #[serde(default)]
    /// Shortcut for `SELECT * FROM <table>`. Accepts an unquoted `<name>` or
    /// `<schema>.<name>` and takes precedence over `query` / `query_file`.
    pub table: Option<String>,
    #[serde(default = "default_mode")]
    /// Export mode; defaults to `full`.
    pub mode: ExportMode,
    pub cursor_column: Option<String>,
    #[serde(default)]
    pub cursor_fallback_column: Option<String>,
    #[serde(default)]
    pub incremental_cursor_mode: IncrementalCursorMode,
    pub chunk_column: Option<String>,
    #[serde(default)]
    /// Defaults to `false`.
    pub chunk_dense: bool,
    #[serde(default = "default_chunk_size")]
    /// Defaults to `100000`.
    pub chunk_size: usize,
    #[serde(default)]
    pub chunk_size_memory_mb: Option<u64>,
    pub chunk_count: Option<usize>,
    pub chunk_by_days: Option<u32>,
    pub chunk_by_key: Option<String>,
    #[serde(default = "default_parallel")]
    /// Defaults to `1`.
    pub parallel: usize,
    pub time_column: Option<String>,
    #[serde(default = "default_time_column_type")]
    /// Type of `time_column`; defaults to `timestamp`.
    pub time_column_type: TimeColumnType,
    pub days_window: Option<u32>,
    pub format: FormatType,
    #[serde(default)]
    /// Used only when `compression_profile` is unset.
    pub compression: CompressionType,
    /// Used only when `compression_profile` is unset.
    pub compression_level: Option<u32>,
    /// When set, overrides `compression` and `compression_level`.
    pub compression_profile: Option<CompressionProfile>,
    #[serde(default)]
    /// Defaults to `false`.
    pub skip_empty: bool,
    pub destination: DestinationConfig,
    #[serde(default)]
    /// Verification mode; defaults to `size`.
    pub verify: VerifyMode,
    #[serde(default)]
    /// Optional meta columns; all disabled by default.
    pub meta_columns: MetaColumns,
    #[serde(default)]
    /// Optional data-quality checks.
    pub quality: Option<QualityConfig>,
    /// Size limit such as `"128MB"` or `"2GB"`. Values that fail to parse are
    /// treated the same as an unset limit by `ExportConfig::max_file_size_bytes`.
    pub max_file_size: Option<String>,
    #[serde(default)]
    /// Defaults to `false`.
    pub chunk_checkpoint: bool,
    pub chunk_max_attempts: Option<u32>,
    #[serde(default)]
    pub tuning: Option<TuningConfig>,
    #[serde(default)]
    pub source_group: Option<String>,
    #[serde(default)]
    /// Defaults to `false`.
    pub reconcile_required: bool,

    #[serde(default)]
    // NOTE(review): a string-to-string map, empty by default; presumably column
    // name -> type/override — confirm against its consumer.
    pub columns: std::collections::HashMap<String, String>,

    #[serde(default)]
    pub target: Option<String>,

    #[serde(default)]
    /// Policy on schema drift; defaults to `warn`.
    pub on_schema_drift: SchemaDriftPolicy,

    #[serde(default)]
    pub shape_drift_warn_factor: Option<f64>,

    #[serde(default)]
    /// Parquet-specific settings.
    pub parquet: Option<ParquetConfig>,
}
202
203impl ExportConfig {
204 pub fn effective_compression(&self) -> (CompressionType, Option<u32>) {
207 if let Some(profile) = self.compression_profile {
208 profile.to_codec()
209 } else {
210 (self.compression, self.compression_level)
211 }
212 }
213
214 pub fn max_file_size_bytes(&self) -> Option<u64> {
215 self.max_file_size
216 .as_ref()
217 .and_then(|s| parse_file_size(s).ok())
218 }
219
220 pub fn resolve_query(
221 &self,
222 config_dir: &Path,
223 params: Option<&std::collections::HashMap<String, String>>,
224 ) -> crate::error::Result<String> {
225 if let Some(tbl) = &self.table {
228 validate_table_shortcut_ident(&self.name, tbl)?;
229 return Ok(format!("SELECT * FROM {tbl}"));
230 }
231 match (&self.query, &self.query_file) {
232 (Some(q), None) => {
233 if params.is_some() {
234 resolve_vars(q, params)
235 } else {
236 Ok(q.clone())
237 }
238 }
239 (None, Some(file)) => {
240 let file_path = std::path::Path::new(file);
241 if file_path.is_absolute() {
243 anyhow::bail!(
244 "export '{}': query_file must be a relative path: '{}'",
245 self.name,
246 file
247 );
248 }
249 if file_path
250 .components()
251 .any(|c| c == std::path::Component::ParentDir)
252 {
253 anyhow::bail!(
254 "export '{}': query_file path must not contain '..': '{}'",
255 self.name,
256 file
257 );
258 }
259 let joined = config_dir.join(file);
260 if let Ok(canonical) = joined.canonicalize() {
263 let base = config_dir
264 .canonicalize()
265 .unwrap_or_else(|_| config_dir.to_path_buf());
266 if !canonical.starts_with(&base) {
267 anyhow::bail!(
268 "export '{}': query_file '{}' resolves outside the config directory",
269 self.name,
270 file
271 );
272 }
273 }
274 let raw = std::fs::read_to_string(&joined)?;
275 resolve_vars(&raw, params)
276 }
277 (Some(_), Some(_)) => {
278 anyhow::bail!(
279 "export '{}': specify either 'query' or 'query_file', not both",
280 self.name
281 )
282 }
283 (None, None) => {
284 anyhow::bail!(
285 "export '{}': must specify exactly one of 'query', 'query_file', or 'table'",
286 self.name
287 )
288 }
289 }
290 }
291}
292
293fn validate_table_shortcut_ident(export_name: &str, raw: &str) -> crate::error::Result<()> {
304 let trimmed = raw.trim();
305 if trimmed.is_empty() {
306 anyhow::bail!("export '{export_name}': 'table' is empty");
307 }
308 let parts: Vec<&str> = trimmed.split('.').collect();
309 if parts.len() > 2 {
310 anyhow::bail!(
311 "export '{export_name}': 'table' must be `<name>` or `<schema>.<name>` (got '{raw}')"
312 );
313 }
314 for part in &parts {
315 if part.is_empty() {
316 anyhow::bail!("export '{export_name}': 'table' has an empty segment in '{raw}'");
317 }
318 let mut chars = part.chars();
319 let first = chars.next().unwrap();
320 if !(first.is_ascii_alphabetic() || first == '_') {
321 anyhow::bail!(
322 "export '{export_name}': 'table' segment '{part}' must start with a letter or underscore (use 'query:' for quoted identifiers)"
323 );
324 }
325 if !chars.all(|c| c.is_ascii_alphanumeric() || c == '_') {
326 anyhow::bail!(
327 "export '{export_name}': 'table' segment '{part}' contains non-identifier characters (use 'query:' for quoted identifiers)"
328 );
329 }
330 }
331 Ok(())
332}
333
/// Optional data-quality checks for an export (`ExportConfig::quality`).
///
/// Unknown keys are rejected (`deny_unknown_fields`).
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)]
#[serde(deny_unknown_fields)]
pub struct QualityConfig {
    // NOTE(review): presumably the minimum acceptable exported row count — confirm.
    pub row_count_min: Option<usize>,
    // NOTE(review): presumably the maximum acceptable exported row count — confirm.
    pub row_count_max: Option<usize>,
    #[serde(default)]
    // NOTE(review): keys are presumably column names and values the maximum
    // allowed null fraction (0.0..=1.0) — confirm against the quality checker.
    pub null_ratio_max: std::collections::HashMap<String, f64>,
    #[serde(default)]
    // Empty by default. NOTE(review): presumably columns checked for
    // duplicate values — confirm.
    pub unique_columns: Vec<String>,
    // NOTE(review): presumably caps how many distinct values uniqueness
    // tracking keeps; behavior when exceeded is not visible here — confirm.
    pub unique_max_entries: Option<usize>,
}
348
/// Opt-in meta columns for an export (`ExportConfig::meta_columns`).
///
/// Every flag defaults to `false`, and so does `MetaColumns::default()`.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Default)]
#[serde(deny_unknown_fields)]
pub struct MetaColumns {
    #[serde(default)]
    /// Enables the `exported_at` meta column; defaults to `false`.
    pub exported_at: bool,
    #[serde(default)]
    /// Enables the `row_hash` meta column; defaults to `false`.
    pub row_hash: bool,
}
357
358fn default_mode() -> ExportMode {
359 ExportMode::Full
360}
361
/// Serde default for `ExportConfig::chunk_size` (one hundred thousand).
fn default_chunk_size() -> usize {
    const HUNDRED_THOUSAND: usize = 100 * 1_000;
    HUNDRED_THOUSAND
}
365
/// Serde default for `ExportConfig::parallel` (`1`).
fn default_parallel() -> usize {
    const SERIAL: usize = 1;
    SERIAL
}
369
370fn default_time_column_type() -> TimeColumnType {
371 TimeColumnType::Timestamp
372}
373
/// Export strategy (`ExportConfig::mode`).
///
/// Deserialized from snake_case strings: `full`, `incremental`, `chunked`, or
/// `time_window`. When the key is omitted, serde uses `default_mode` (`full`).
#[derive(Debug, Deserialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ExportMode {
    Full,
    Incremental,
    Chunked,
    TimeWindow,
}
382
/// Type of `ExportConfig::time_column` (`ExportConfig::time_column_type`).
///
/// Deserialized from lowercase strings: `timestamp` or `unix`. When the key is
/// omitted, serde uses `default_time_column_type` (`timestamp`).
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum TimeColumnType {
    Timestamp,
    // NOTE(review): presumably a numeric epoch value; the unit (s/ms) is not
    // visible here — confirm.
    Unix,
}
389
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{DestinationConfig, DestinationType};

    // Builds an `ExportConfig` through the real YAML/serde path. It uses an
    // inline `SELECT 1` query, parquet format and a local `/tmp` destination.
    // `extra` is appended verbatim, so each extra field must end with `\n`.
    fn make_export_yaml(name: &str, extra: &str) -> ExportConfig {
        let yaml = format!(
            "name: {name}\nquery: \"SELECT 1\"\nformat: parquet\ndestination:\n type: local\n path: /tmp\n{extra}"
        );
        serde_yaml_ng::from_str(&yaml).expect("parse ExportConfig")
    }

    #[test]
    fn max_file_size_bytes_none_when_unset() {
        let exp = make_export_yaml("no_limit", "");
        assert!(exp.max_file_size_bytes().is_none());
    }

    #[test]
    fn max_file_size_bytes_parses_mb() {
        let exp = make_export_yaml("sized", "max_file_size: \"128MB\"\n");
        assert_eq!(exp.max_file_size_bytes(), Some(128 * 1024 * 1024));
    }

    #[test]
    fn max_file_size_bytes_parses_gb() {
        let exp = make_export_yaml("sized_gb", "max_file_size: \"2GB\"\n");
        assert_eq!(exp.max_file_size_bytes(), Some(2 * 1024 * 1024 * 1024));
    }

    // An unparseable size is treated like an unset one, not reported as an error.
    #[test]
    fn max_file_size_bytes_returns_none_on_invalid() {
        let exp = make_export_yaml("bad_size", "max_file_size: \"notanumber\"\n");
        assert!(exp.max_file_size_bytes().is_none());
    }

    // Builds an `ExportConfig` literal with every field spelled out, bypassing
    // serde. This lets tests set `query` / `query_file` (and later `table`)
    // directly. A field added to `ExportConfig` must be added here too.
    fn make_export_direct(query: Option<&str>, query_file: Option<&str>) -> ExportConfig {
        ExportConfig {
            name: "test".into(),
            target: None,
            verify: VerifyMode::Size,
            query: query.map(|s| s.to_string()),
            query_file: query_file.map(|s| s.to_string()),
            table: None,
            mode: ExportMode::Full,
            cursor_column: None,
            cursor_fallback_column: None,
            incremental_cursor_mode: Default::default(),
            chunk_column: None,
            chunk_dense: false,
            chunk_size: 100_000,
            chunk_size_memory_mb: None,
            chunk_count: None,
            chunk_by_days: None,
            chunk_by_key: None,
            parallel: 1,
            time_column: None,
            time_column_type: TimeColumnType::Timestamp,
            days_window: None,
            format: FormatType::Parquet,
            compression: CompressionType::None,
            compression_level: None,
            compression_profile: None,
            skip_empty: false,
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("/tmp".into()),
                ..Default::default()
            },
            meta_columns: MetaColumns::default(),
            quality: None,
            max_file_size: None,
            chunk_checkpoint: false,
            chunk_max_attempts: None,
            tuning: None,
            source_group: None,
            reconcile_required: false,
            columns: Default::default(),
            on_schema_drift: Default::default(),
            shape_drift_warn_factor: None,
            parquet: None,
        }
    }

    // Builds an owned params map from `(key, value)` string pairs.
    fn params(pairs: &[(&str, &str)]) -> std::collections::HashMap<String, String> {
        pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    #[test]
    fn resolve_query_inline_no_params_returns_query_as_is() {
        let exp = make_export_direct(Some("SELECT id FROM orders"), None);
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT id FROM orders");
    }

    #[test]
    fn resolve_query_inline_with_params_substitutes_vars() {
        let exp = make_export_direct(Some("SELECT ${col} FROM ${table}"), None);
        let p = params(&[("col", "id"), ("table", "orders")]);
        let q = exp.resolve_query(Path::new("/tmp"), Some(&p)).unwrap();
        assert_eq!(q, "SELECT id FROM orders");
    }

    #[test]
    fn resolve_query_inline_params_empty_map_is_noop() {
        let exp = make_export_direct(Some("SELECT 1"), None);
        let p = params(&[]);
        let q = exp.resolve_query(Path::new("/tmp"), Some(&p)).unwrap();
        assert_eq!(q, "SELECT 1");
    }

    #[test]
    fn resolve_query_inline_missing_var_returns_error() {
        // SAFETY: NOTE(review) — `remove_var` is `unsafe` because other threads
        // may access the process environment at the same time, and tests run
        // in parallel threads. This assumes no concurrently running test reads
        // or writes the environment. Confirm, or serialize env-touching tests.
        unsafe { std::env::remove_var("UNSET_RIVET_TEST_VAR") };
        // With `Some(params)`, an unresolvable `${...}` must be an error. The
        // env removal above suggests `resolve_vars` also consults the process
        // environment.
        let exp = make_export_direct(Some("SELECT ${UNSET_RIVET_TEST_VAR}"), None);
        let p = params(&[]);
        let result = exp.resolve_query(Path::new("/tmp"), Some(&p));
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("UNSET_RIVET_TEST_VAR") || msg.contains("not set"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_reads_content() {
        let dir = tempfile::TempDir::new().unwrap();
        let sql_path = dir.path().join("query.sql");
        std::fs::write(&sql_path, "SELECT * FROM customers").unwrap();
        let exp = make_export_direct(None, Some("query.sql"));
        let q = exp.resolve_query(dir.path(), None).unwrap();
        assert_eq!(q, "SELECT * FROM customers");
    }

    #[test]
    fn resolve_query_file_with_params_substitutes() {
        let dir = tempfile::TempDir::new().unwrap();
        let sql_path = dir.path().join("q.sql");
        std::fs::write(&sql_path, "SELECT ${col} FROM ${tbl}").unwrap();
        let exp = make_export_direct(None, Some("q.sql"));
        let p = params(&[("col", "name"), ("tbl", "users")]);
        let q = exp.resolve_query(dir.path(), Some(&p)).unwrap();
        assert_eq!(q, "SELECT name FROM users");
    }

    // `table:` shortcut: a `<schema>.<name>` value expands to `SELECT * FROM ...`.
    #[test]
    fn resolve_query_table_shortcut_qualified() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("public.users".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM public.users");
    }

    #[test]
    fn resolve_query_table_shortcut_unqualified() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("orders".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM orders");
    }

    #[test]
    fn resolve_query_table_shortcut_rejects_three_part_name() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("db.public.users".into());
        let err = exp.resolve_query(Path::new("/tmp"), None).unwrap_err();
        let msg = format!("{err:#}");
        assert!(msg.contains("<schema>.<name>"), "got: {msg}");
    }

    // The shortcut value is interpolated into SQL unquoted. Anything beyond a
    // plain identifier (statement separators, comments, quotes, bad segment
    // shapes) must be rejected.
    #[test]
    fn resolve_query_table_shortcut_rejects_sql_injection() {
        for bad in [
            "users; DROP TABLE x",
            "users--",
            "users'",
            "users\"",
            "public.\"My Table\"",
            "0starts_with_digit",
            "",
            ".trailing",
            "leading.",
            "two..dots",
        ] {
            let mut exp = make_export_direct(None, None);
            exp.table = Some(bad.into());
            assert!(
                exp.resolve_query(Path::new("/tmp"), None).is_err(),
                "should reject `table:` value '{bad}'",
            );
        }
    }

    #[test]
    fn resolve_query_table_shortcut_takes_precedence_over_query() {
        let mut exp = make_export_direct(Some("SELECT id FROM x"), None);
        exp.table = Some("public.y".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM public.y");
    }

    #[test]
    fn resolve_query_file_missing_returns_error() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("nonexistent.sql"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("nonexistent.sql") || msg.contains("No such file"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_both_set_returns_error() {
        let mut exp = make_export_direct(Some("SELECT 1"), None);
        exp.query_file = Some("file.sql".into());
        let result = exp.resolve_query(Path::new("/tmp"), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("not both") || msg.contains("query_file"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_neither_set_returns_error() {
        let exp = make_export_direct(None, None);
        let result = exp.resolve_query(Path::new("/tmp"), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("query") || msg.contains("query_file"),
            "got: {msg}"
        );
    }

    // Path-traversal guards for `query_file`: `..` components and absolute
    // paths are rejected before any file is read.
    #[test]
    fn resolve_query_file_dotdot_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("../secret.sql"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("..") || msg.contains("traversal"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_nested_dotdot_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("subdir/../../etc/passwd"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("..") || msg.contains("traversal"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_absolute_path_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("/etc/passwd"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("relative") || msg.contains("absolute"),
            "got: {msg}"
        );
    }

    // Relative paths into subdirectories of the config dir remain allowed.
    #[test]
    fn resolve_query_file_in_subdir_is_allowed() {
        let dir = tempfile::TempDir::new().unwrap();
        let subdir = dir.path().join("queries");
        std::fs::create_dir(&subdir).unwrap();
        std::fs::write(subdir.join("orders.sql"), "SELECT * FROM orders").unwrap();
        let exp = make_export_direct(None, Some("queries/orders.sql"));
        let q = exp.resolve_query(dir.path(), None).unwrap();
        assert_eq!(q, "SELECT * FROM orders");
    }
}