1use std::path::Path;
7
8use schemars::JsonSchema;
9use serde::{Deserialize, Serialize};
10
11use super::IncrementalCursorMode;
12use super::destination::DestinationConfig;
13use super::format::{CompressionProfile, CompressionType, FormatType, ParquetConfig};
14use super::resolve::{parse_file_size, resolve_vars};
15use crate::tuning::TuningConfig;
16
/// Policy selected by the `on_schema_drift` key of an export.
///
/// Variants are (de)serialized in `snake_case`: `warn`, `continue`, `fail`.
/// `Warn` is the `Default`, so it is what an export gets when the key is omitted.
// NOTE(review): the exact runtime behaviour of each variant is implemented outside this
// file; the names suggest "log and proceed", "proceed silently" and "abort" — confirm.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum SchemaDriftPolicy {
    /// Default policy.
    #[default]
    Warn,
    Continue,
    Fail,
}
/// Configuration of a single export, deserialized with serde.
///
/// Unknown keys are rejected (`deny_unknown_fields`). `name`, `format` and `destination`
/// are the only keys that must always be present; every other field is either an
/// `Option` or carries a serde default. The SQL to run comes from one of `query`,
/// `query_file` or `table` (see [`ExportConfig::resolve_query`]).
#[derive(Debug, Deserialize, JsonSchema, Clone)]
#[serde(deny_unknown_fields)]
pub struct ExportConfig {
    /// Export name; used to prefix error messages produced for this export.
    pub name: String,
    /// Inline SQL text.
    #[serde(default)]
    pub query: Option<String>,
    /// Path to a file containing the SQL, relative to the config directory.
    /// Absolute paths and paths containing `..` are rejected by `resolve_query`.
    pub query_file: Option<String>,
    /// Shortcut for `SELECT * FROM <table>`; must be `<name>` or `<schema>.<name>`.
    /// When set, `resolve_query` uses it in preference to `query` / `query_file`.
    #[serde(default)]
    pub table: Option<String>,
    /// Export mode; defaults to `ExportMode::Full`.
    #[serde(default = "default_mode")]
    pub mode: ExportMode,
    // NOTE(review): the cursor_* fields are presumably used by incremental mode — their
    // consumers are outside this file; confirm.
    pub cursor_column: Option<String>,
    #[serde(default)]
    pub cursor_fallback_column: Option<String>,
    #[serde(default)]
    pub incremental_cursor_mode: IncrementalCursorMode,
    // NOTE(review): the chunk_* fields are presumably used by chunked mode — confirm
    // against the chunk planner.
    pub chunk_column: Option<String>,
    #[serde(default)]
    pub chunk_dense: bool,
    /// Defaults to `100_000`.
    #[serde(default = "default_chunk_size")]
    pub chunk_size: usize,
    #[serde(default)]
    pub chunk_size_memory_mb: Option<u64>,
    pub chunk_count: Option<usize>,
    pub chunk_by_days: Option<u32>,
    pub chunk_by_key: Option<String>,
    /// Defaults to `1`.
    #[serde(default = "default_parallel")]
    pub parallel: usize,
    // NOTE(review): time_column / time_column_type / days_window are presumably used by
    // time-window mode — confirm.
    pub time_column: Option<String>,
    /// Defaults to `TimeColumnType::Timestamp`.
    #[serde(default = "default_time_column_type")]
    pub time_column_type: TimeColumnType,
    pub days_window: Option<u32>,
    /// Output format (required).
    pub format: FormatType,
    /// Compression codec; ignored when `compression_profile` is set
    /// (see `effective_compression`).
    #[serde(default)]
    pub compression: CompressionType,
    /// Compression level paired with `compression`; ignored when `compression_profile`
    /// is set.
    pub compression_level: Option<u32>,
    /// When set, supplies both codec and level and overrides `compression` /
    /// `compression_level`.
    pub compression_profile: Option<CompressionProfile>,
    #[serde(default)]
    pub skip_empty: bool,
    /// Where the export is written (required).
    pub destination: DestinationConfig,
    #[serde(default)]
    pub meta_columns: MetaColumns,
    #[serde(default)]
    pub quality: Option<QualityConfig>,
    /// Size limit as a string such as `"128MB"` or `"2GB"`; converted to bytes by
    /// `max_file_size_bytes`.
    pub max_file_size: Option<String>,
    #[serde(default)]
    pub chunk_checkpoint: bool,
    pub chunk_max_attempts: Option<u32>,
    #[serde(default)]
    pub tuning: Option<TuningConfig>,
    #[serde(default)]
    pub source_group: Option<String>,
    #[serde(default)]
    pub reconcile_required: bool,

    // NOTE(review): string-to-string map whose meaning (renames? type overrides?) is not
    // visible in this file — confirm before documenting publicly.
    #[serde(default)]
    pub columns: std::collections::HashMap<String, String>,

    /// Defaults to `SchemaDriftPolicy::Warn`.
    #[serde(default)]
    pub on_schema_drift: SchemaDriftPolicy,

    #[serde(default)]
    pub shape_drift_warn_factor: Option<f64>,

    #[serde(default)]
    pub parquet: Option<ParquetConfig>,
}
160
161impl ExportConfig {
162 pub fn effective_compression(&self) -> (CompressionType, Option<u32>) {
165 if let Some(profile) = self.compression_profile {
166 profile.to_codec()
167 } else {
168 (self.compression, self.compression_level)
169 }
170 }
171
172 pub fn max_file_size_bytes(&self) -> Option<u64> {
173 self.max_file_size
174 .as_ref()
175 .and_then(|s| parse_file_size(s).ok())
176 }
177
178 pub fn resolve_query(
179 &self,
180 config_dir: &Path,
181 params: Option<&std::collections::HashMap<String, String>>,
182 ) -> crate::error::Result<String> {
183 if let Some(tbl) = &self.table {
186 validate_table_shortcut_ident(&self.name, tbl)?;
187 return Ok(format!("SELECT * FROM {tbl}"));
188 }
189 match (&self.query, &self.query_file) {
190 (Some(q), None) => {
191 if params.is_some() {
192 resolve_vars(q, params)
193 } else {
194 Ok(q.clone())
195 }
196 }
197 (None, Some(file)) => {
198 let file_path = std::path::Path::new(file);
199 if file_path.is_absolute() {
201 anyhow::bail!(
202 "export '{}': query_file must be a relative path: '{}'",
203 self.name,
204 file
205 );
206 }
207 if file_path
208 .components()
209 .any(|c| c == std::path::Component::ParentDir)
210 {
211 anyhow::bail!(
212 "export '{}': query_file path must not contain '..': '{}'",
213 self.name,
214 file
215 );
216 }
217 let joined = config_dir.join(file);
218 if let Ok(canonical) = joined.canonicalize() {
221 let base = config_dir
222 .canonicalize()
223 .unwrap_or_else(|_| config_dir.to_path_buf());
224 if !canonical.starts_with(&base) {
225 anyhow::bail!(
226 "export '{}': query_file '{}' resolves outside the config directory",
227 self.name,
228 file
229 );
230 }
231 }
232 let raw = std::fs::read_to_string(&joined)?;
233 resolve_vars(&raw, params)
234 }
235 (Some(_), Some(_)) => {
236 anyhow::bail!(
237 "export '{}': specify either 'query' or 'query_file', not both",
238 self.name
239 )
240 }
241 (None, None) => {
242 anyhow::bail!(
243 "export '{}': must specify exactly one of 'query', 'query_file', or 'table'",
244 self.name
245 )
246 }
247 }
248 }
249}
250
251fn validate_table_shortcut_ident(export_name: &str, raw: &str) -> crate::error::Result<()> {
262 let trimmed = raw.trim();
263 if trimmed.is_empty() {
264 anyhow::bail!("export '{export_name}': 'table' is empty");
265 }
266 let parts: Vec<&str> = trimmed.split('.').collect();
267 if parts.len() > 2 {
268 anyhow::bail!(
269 "export '{export_name}': 'table' must be `<name>` or `<schema>.<name>` (got '{raw}')"
270 );
271 }
272 for part in &parts {
273 if part.is_empty() {
274 anyhow::bail!("export '{export_name}': 'table' has an empty segment in '{raw}'");
275 }
276 let mut chars = part.chars();
277 let first = chars.next().unwrap();
278 if !(first.is_ascii_alphabetic() || first == '_') {
279 anyhow::bail!(
280 "export '{export_name}': 'table' segment '{part}' must start with a letter or underscore (use 'query:' for quoted identifiers)"
281 );
282 }
283 if !chars.all(|c| c.is_ascii_alphanumeric() || c == '_') {
284 anyhow::bail!(
285 "export '{export_name}': 'table' segment '{part}' contains non-identifier characters (use 'query:' for quoted identifiers)"
286 );
287 }
288 }
289 Ok(())
290}
291
/// Optional per-export quality settings (the `quality` key of an export).
///
/// Unknown keys are rejected (`deny_unknown_fields`). Every field is optional:
/// the `Option`s default to `None`, the map and the list default to empty.
// NOTE(review): how these thresholds are enforced is implemented outside this file; the
// per-field notes below are inferred from the names and types — confirm.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)]
#[serde(deny_unknown_fields)]
pub struct QualityConfig {
    // Presumably the lower bound on exported row count.
    pub row_count_min: Option<usize>,
    // Presumably the upper bound on exported row count.
    pub row_count_max: Option<usize>,
    // Presumably column name -> maximum allowed NULL ratio.
    #[serde(default)]
    pub null_ratio_max: std::collections::HashMap<String, f64>,
    // Presumably columns whose values are expected to be unique.
    #[serde(default)]
    pub unique_columns: Vec<String>,
    // Presumably a cap on entries tracked for the uniqueness check.
    pub unique_max_entries: Option<usize>,
}
306
/// Switches for the `meta_columns` key of an export.
///
/// Unknown keys are rejected (`deny_unknown_fields`). Both flags default to `false`,
/// and the struct as a whole defaults to "all off" when the key is omitted.
// NOTE(review): presumably each flag adds an extra column to the output (export
// timestamp, per-row hash) — the writer lives outside this file; confirm.
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Default)]
#[serde(deny_unknown_fields)]
pub struct MetaColumns {
    #[serde(default)]
    pub exported_at: bool,
    #[serde(default)]
    pub row_hash: bool,
}
315
316fn default_mode() -> ExportMode {
317 ExportMode::Full
318}
319
/// Serde default for `ExportConfig::chunk_size`.
fn default_chunk_size() -> usize {
    const DEFAULT_CHUNK_SIZE: usize = 100_000;
    DEFAULT_CHUNK_SIZE
}
323
/// Serde default for `ExportConfig::parallel`.
fn default_parallel() -> usize {
    const DEFAULT_PARALLEL: usize = 1;
    DEFAULT_PARALLEL
}
327
328fn default_time_column_type() -> TimeColumnType {
329 TimeColumnType::Timestamp
330}
331
/// Value of the `mode` key of an export.
///
/// Variants are deserialized in `snake_case`: `full`, `incremental`, `chunked`,
/// `time_window`. The serde default for `ExportConfig::mode` is `Full`.
// NOTE(review): the execution strategy behind each variant is implemented outside this
// file; see the cursor_*, chunk_* and time_* fields of `ExportConfig` for the settings
// each mode presumably consumes — confirm.
#[derive(Debug, Deserialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ExportMode {
    Full,
    Incremental,
    Chunked,
    TimeWindow,
}
340
/// Value of the `time_column_type` key of an export.
///
/// Variants are (de)serialized in lowercase: `timestamp`, `unix`. The serde default for
/// `ExportConfig::time_column_type` is `Timestamp`.
// NOTE(review): presumably distinguishes a native timestamp column from one holding
// Unix epoch numbers — the consumer is outside this file; confirm (including the unit).
#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum TimeColumnType {
    Timestamp,
    Unix,
}
347
// Unit tests for `ExportConfig::max_file_size_bytes` and `ExportConfig::resolve_query`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{DestinationConfig, DestinationType};

    /// Builds an `ExportConfig` by parsing YAML: a minimal valid export plus whatever
    /// extra top-level keys the test appends via `extra`.
    fn make_export_yaml(name: &str, extra: &str) -> ExportConfig {
        let yaml = format!(
            "name: {name}\nquery: \"SELECT 1\"\nformat: parquet\ndestination:\n type: local\n path: /tmp\n{extra}"
        );
        serde_yaml_ng::from_str(&yaml).expect("parse ExportConfig")
    }

    // --- max_file_size_bytes -------------------------------------------------------

    #[test]
    fn max_file_size_bytes_none_when_unset() {
        let exp = make_export_yaml("no_limit", "");
        assert!(exp.max_file_size_bytes().is_none());
    }

    #[test]
    fn max_file_size_bytes_parses_mb() {
        let exp = make_export_yaml("sized", "max_file_size: \"128MB\"\n");
        assert_eq!(exp.max_file_size_bytes(), Some(128 * 1024 * 1024));
    }

    #[test]
    fn max_file_size_bytes_parses_gb() {
        let exp = make_export_yaml("sized_gb", "max_file_size: \"2GB\"\n");
        assert_eq!(exp.max_file_size_bytes(), Some(2 * 1024 * 1024 * 1024));
    }

    // An unparsable size is reported as `None`, not as an error.
    #[test]
    fn max_file_size_bytes_returns_none_on_invalid() {
        let exp = make_export_yaml("bad_size", "max_file_size: \"notanumber\"\n");
        assert!(exp.max_file_size_bytes().is_none());
    }

    /// Builds an `ExportConfig` directly (no YAML parsing), which allows combinations
    /// such as "neither `query` nor `query_file`". All other fields get neutral values.
    fn make_export_direct(query: Option<&str>, query_file: Option<&str>) -> ExportConfig {
        ExportConfig {
            name: "test".into(),
            query: query.map(|s| s.to_string()),
            query_file: query_file.map(|s| s.to_string()),
            table: None,
            mode: ExportMode::Full,
            cursor_column: None,
            cursor_fallback_column: None,
            incremental_cursor_mode: Default::default(),
            chunk_column: None,
            chunk_dense: false,
            chunk_size: 100_000,
            chunk_size_memory_mb: None,
            chunk_count: None,
            chunk_by_days: None,
            chunk_by_key: None,
            parallel: 1,
            time_column: None,
            time_column_type: TimeColumnType::Timestamp,
            days_window: None,
            format: FormatType::Parquet,
            compression: CompressionType::None,
            compression_level: None,
            compression_profile: None,
            skip_empty: false,
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("/tmp".into()),
                ..Default::default()
            },
            meta_columns: MetaColumns::default(),
            quality: None,
            max_file_size: None,
            chunk_checkpoint: false,
            chunk_max_attempts: None,
            tuning: None,
            source_group: None,
            reconcile_required: false,
            columns: Default::default(),
            on_schema_drift: Default::default(),
            shape_drift_warn_factor: None,
            parquet: None,
        }
    }

    /// Turns a slice of `(key, value)` pairs into the owned map `resolve_query` expects.
    fn params(pairs: &[(&str, &str)]) -> std::collections::HashMap<String, String> {
        pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    // --- resolve_query: inline `query` ----------------------------------------------

    #[test]
    fn resolve_query_inline_no_params_returns_query_as_is() {
        let exp = make_export_direct(Some("SELECT id FROM orders"), None);
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT id FROM orders");
    }

    #[test]
    fn resolve_query_inline_with_params_substitutes_vars() {
        let exp = make_export_direct(Some("SELECT ${col} FROM ${table}"), None);
        let p = params(&[("col", "id"), ("table", "orders")]);
        let q = exp.resolve_query(Path::new("/tmp"), Some(&p)).unwrap();
        assert_eq!(q, "SELECT id FROM orders");
    }

    #[test]
    fn resolve_query_inline_params_empty_map_is_noop() {
        let exp = make_export_direct(Some("SELECT 1"), None);
        let p = params(&[]);
        let q = exp.resolve_query(Path::new("/tmp"), Some(&p)).unwrap();
        assert_eq!(q, "SELECT 1");
    }

    #[test]
    fn resolve_query_inline_missing_var_returns_error() {
        // The variable is removed from the process environment first so that the
        // placeholder cannot be satisfied from there.
        // NOTE(review): this presumes `resolve_vars` falls back to environment
        // variables — confirm.
        // SAFETY: NOTE(review): `remove_var` is only sound while no other thread reads or
        // writes the environment; tests run in parallel by default, so this relies on
        // no concurrent environment access from other tests — confirm.
        unsafe { std::env::remove_var("UNSET_RIVET_TEST_VAR") };
        let exp = make_export_direct(Some("SELECT ${UNSET_RIVET_TEST_VAR}"), None);
        let p = params(&[]);
        let result = exp.resolve_query(Path::new("/tmp"), Some(&p));
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("UNSET_RIVET_TEST_VAR") || msg.contains("not set"),
            "got: {msg}"
        );
    }

    // --- resolve_query: `query_file` -------------------------------------------------

    #[test]
    fn resolve_query_file_reads_content() {
        let dir = tempfile::TempDir::new().unwrap();
        let sql_path = dir.path().join("query.sql");
        std::fs::write(&sql_path, "SELECT * FROM customers").unwrap();
        let exp = make_export_direct(None, Some("query.sql"));
        let q = exp.resolve_query(dir.path(), None).unwrap();
        assert_eq!(q, "SELECT * FROM customers");
    }

    #[test]
    fn resolve_query_file_with_params_substitutes() {
        let dir = tempfile::TempDir::new().unwrap();
        let sql_path = dir.path().join("q.sql");
        std::fs::write(&sql_path, "SELECT ${col} FROM ${tbl}").unwrap();
        let exp = make_export_direct(None, Some("q.sql"));
        let p = params(&[("col", "name"), ("tbl", "users")]);
        let q = exp.resolve_query(dir.path(), Some(&p)).unwrap();
        assert_eq!(q, "SELECT name FROM users");
    }

    // --- resolve_query: `table` shortcut ---------------------------------------------

    #[test]
    fn resolve_query_table_shortcut_qualified() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("public.users".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM public.users");
    }

    #[test]
    fn resolve_query_table_shortcut_unqualified() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("orders".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM orders");
    }

    #[test]
    fn resolve_query_table_shortcut_rejects_three_part_name() {
        let mut exp = make_export_direct(None, None);
        exp.table = Some("db.public.users".into());
        let err = exp.resolve_query(Path::new("/tmp"), None).unwrap_err();
        let msg = format!("{err:#}");
        assert!(msg.contains("<schema>.<name>"), "got: {msg}");
    }

    // Anything that is not a plain identifier must be refused: statement separators,
    // comment markers, quotes, leading digits, and empty segments.
    #[test]
    fn resolve_query_table_shortcut_rejects_sql_injection() {
        for bad in [
            "users; DROP TABLE x",
            "users--",
            "users'",
            "users\"",
            "public.\"My Table\"",
            "0starts_with_digit",
            "",
            ".trailing",
            "leading.",
            "two..dots",
        ] {
            let mut exp = make_export_direct(None, None);
            exp.table = Some(bad.into());
            assert!(
                exp.resolve_query(Path::new("/tmp"), None).is_err(),
                "should reject `table:` value '{bad}'",
            );
        }
    }

    // `table` wins even when `query` is also set.
    #[test]
    fn resolve_query_table_shortcut_takes_precedence_over_query() {
        let mut exp = make_export_direct(Some("SELECT id FROM x"), None);
        exp.table = Some("public.y".into());
        let q = exp.resolve_query(Path::new("/tmp"), None).unwrap();
        assert_eq!(q, "SELECT * FROM public.y");
    }

    // --- resolve_query: error cases ---------------------------------------------------

    // Accepts either the file name or the OS "No such file" text in the message.
    #[test]
    fn resolve_query_file_missing_returns_error() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("nonexistent.sql"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("nonexistent.sql") || msg.contains("No such file"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_both_set_returns_error() {
        let mut exp = make_export_direct(Some("SELECT 1"), None);
        exp.query_file = Some("file.sql".into());
        let result = exp.resolve_query(Path::new("/tmp"), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("not both") || msg.contains("query_file"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_neither_set_returns_error() {
        let exp = make_export_direct(None, None);
        let result = exp.resolve_query(Path::new("/tmp"), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("query") || msg.contains("query_file"),
            "got: {msg}"
        );
    }

    // --- resolve_query: path-traversal protection for `query_file` --------------------

    #[test]
    fn resolve_query_file_dotdot_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("../secret.sql"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("..") || msg.contains("traversal"),
            "got: {msg}"
        );
    }

    // `..` is rejected anywhere in the path, not only as the first component.
    #[test]
    fn resolve_query_file_nested_dotdot_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("subdir/../../etc/passwd"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("..") || msg.contains("traversal"),
            "got: {msg}"
        );
    }

    #[test]
    fn resolve_query_file_absolute_path_is_rejected() {
        let dir = tempfile::TempDir::new().unwrap();
        let exp = make_export_direct(None, Some("/etc/passwd"));
        let result = exp.resolve_query(dir.path(), None);
        assert!(result.is_err());
        let msg = format!("{:#}", result.unwrap_err());
        assert!(
            msg.contains("relative") || msg.contains("absolute"),
            "got: {msg}"
        );
    }

    // A relative path into a subdirectory of the config directory stays allowed.
    #[test]
    fn resolve_query_file_in_subdir_is_allowed() {
        let dir = tempfile::TempDir::new().unwrap();
        let subdir = dir.path().join("queries");
        std::fs::create_dir(&subdir).unwrap();
        std::fs::write(subdir.join("orders.sql"), "SELECT * FROM orders").unwrap();
        let exp = make_export_direct(None, Some("queries/orders.sql"));
        let q = exp.resolve_query(dir.path(), None).unwrap();
        assert_eq!(q, "SELECT * FROM orders");
    }
}