1use std::path::Path;
2
3use serde::Deserialize;
4
5use super::resolve::{parse_file_size, resolve_env_vars, resolve_vars};
6use crate::tuning::TuningConfig;
7
/// Notification settings. The only channel defined here is Slack.
#[derive(Debug, Deserialize, Clone)]
pub struct NotificationsConfig {
    /// Slack notifications; `None` when the `slack` section is absent.
    pub slack: Option<SlackConfig>,
}
12
13#[derive(Debug, Deserialize, Clone)]
14pub struct SlackConfig {
15 pub webhook_url: Option<String>,
16 pub webhook_url_env: Option<String>,
17 #[serde(default)]
18 pub on: Vec<NotifyEvent>,
19}
20
/// Event kinds listed in `SlackConfig::on`, deserialized from snake_case names
/// (`failure`, `schema_change`, `degraded`).
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum NotifyEvent {
    Failure,
    SchemaChange,
    Degraded,
}
28
29#[derive(Debug, Deserialize, Clone)]
30pub struct SourceConfig {
31 #[serde(rename = "type")]
32 pub source_type: SourceType,
33
34 pub url: Option<String>,
35 pub url_env: Option<String>,
36 pub url_file: Option<String>,
37
38 pub host: Option<String>,
39 pub port: Option<u16>,
40 pub user: Option<String>,
41 pub password: Option<String>,
42 pub password_env: Option<String>,
43 pub database: Option<String>,
44
45 #[serde(default)]
46 pub tuning: Option<TuningConfig>,
47}
48
49impl SourceConfig {
50 pub(crate) fn has_structured_fields(&self) -> bool {
51 self.host.is_some()
52 || self.user.is_some()
53 || self.database.is_some()
54 || self.password.is_some()
55 || self.password_env.is_some()
56 }
57
58 pub(crate) fn has_url_fields(&self) -> bool {
59 self.url.is_some() || self.url_env.is_some() || self.url_file.is_some()
60 }
61
62 fn build_url_from_fields(&self) -> crate::error::Result<String> {
63 let host = self
64 .host
65 .as_deref()
66 .ok_or_else(|| anyhow::anyhow!("source: structured config requires 'host'"))?;
67 let user = self
68 .user
69 .as_deref()
70 .ok_or_else(|| anyhow::anyhow!("source: structured config requires 'user'"))?;
71 let database = self
72 .database
73 .as_deref()
74 .ok_or_else(|| anyhow::anyhow!("source: structured config requires 'database'"))?;
75
76 let password = match (&self.password, &self.password_env) {
77 (Some(_), Some(_)) => {
78 anyhow::bail!("source: specify 'password' or 'password_env', not both");
79 }
80 (Some(p), None) => {
81 log::warn!(
82 "source config contains plaintext password -- consider using password_env"
83 );
84 resolve_env_vars(p)
85 }
86 (None, Some(env)) => std::env::var(env)
87 .map_err(|_| anyhow::anyhow!("source: env var '{}' not set (password_env)", env))?,
88 (None, None) => String::new(),
89 };
90
91 let default_port = match self.source_type {
92 SourceType::Postgres => 5432,
93 SourceType::Mysql => 3306,
94 };
95 let port = self.port.unwrap_or(default_port);
96
97 let scheme = match self.source_type {
98 SourceType::Postgres => "postgresql",
99 SourceType::Mysql => "mysql",
100 };
101
102 if password.is_empty() {
103 Ok(format!(
104 "{}://{}@{}:{}/{}",
105 scheme, user, host, port, database
106 ))
107 } else {
108 Ok(format!(
109 "{}://{}:{}@{}:{}/{}",
110 scheme, user, password, host, port, database
111 ))
112 }
113 }
114
115 pub fn resolve_url(&self) -> crate::error::Result<String> {
116 if self.has_url_fields() && self.has_structured_fields() {
117 anyhow::bail!(
118 "source: use either URL-based config (url/url_env/url_file) or structured fields (host/user/database/...), not both"
119 );
120 }
121
122 if self.has_structured_fields() {
123 return self.build_url_from_fields();
124 }
125
126 let raw = match (&self.url, &self.url_env, &self.url_file) {
127 (Some(u), None, None) => u.clone(),
128 (None, Some(env), None) => {
129 std::env::var(env).map_err(|_| anyhow::anyhow!("env var '{}' not set", env))?
130 }
131 (None, None, Some(file)) => std::fs::read_to_string(file)
132 .map_err(|e| anyhow::anyhow!("cannot read url_file '{}': {}", file, e))?
133 .trim()
134 .to_string(),
135 _ => anyhow::bail!(
136 "source: specify exactly one of 'url', 'url_env', 'url_file', or structured fields (host/user/database)"
137 ),
138 };
139
140 let resolved = resolve_env_vars(&raw);
141
142 if resolved.contains('@')
143 && resolved.contains(':')
144 && let Some(userinfo) = resolved.split('@').next()
145 && userinfo.contains(':')
146 && !userinfo.ends_with(':')
147 {
148 log::warn!(
149 "source URL contains plaintext password -- consider using url_env or url_file"
150 );
151 }
152
153 Ok(resolved)
154 }
155}
156
/// Supported source database engines, deserialized from lowercase names
/// (`postgres`, `mysql`).
///
/// `SourceConfig` uses this to pick the URL scheme (`postgresql` / `mysql`) and the
/// default port (5432 / 3306).
#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum SourceType {
    Postgres,
    Mysql,
}
163
/// One export job: the query to run, how it is split or windowed, the output
/// format and compression, and where results are written.
#[derive(Debug, Deserialize, Clone)]
pub struct ExportConfig {
    /// Export name; used to identify the export in error messages.
    pub name: String,
    /// Inline SQL query; mutually exclusive with `query_file`.
    #[serde(default)]
    pub query: Option<String>,
    /// Path to a SQL file, resolved relative to the config directory;
    /// mutually exclusive with `query`.
    pub query_file: Option<String>,
    /// Export strategy; defaults to `ExportMode::Full`.
    #[serde(default = "default_mode")]
    pub mode: ExportMode,
    /// Cursor column — presumably used by `incremental` mode (TODO confirm).
    pub cursor_column: Option<String>,
    /// Column used for chunking — presumably by `chunked` mode (TODO confirm).
    pub chunk_column: Option<String>,
    /// Defaults to `false`. NOTE(review): meaning is defined by the chunking code.
    #[serde(default)]
    pub chunk_dense: bool,
    /// Chunk size; defaults to 100_000.
    #[serde(default = "default_chunk_size")]
    pub chunk_size: usize,
    /// Parallelism; defaults to 1.
    #[serde(default = "default_parallel")]
    pub parallel: usize,
    /// Time column — presumably used by `time_window` mode (TODO confirm).
    pub time_column: Option<String>,
    /// How `time_column` values are stored; defaults to `TimeColumnType::Timestamp`.
    #[serde(default = "default_time_column_type")]
    pub time_column_type: TimeColumnType,
    /// Window length in days — presumably used by `time_window` mode (TODO confirm).
    pub days_window: Option<u32>,
    /// Output file format (required).
    pub format: FormatType,
    /// Output compression; defaults to `CompressionType::Zstd`.
    #[serde(default)]
    pub compression: CompressionType,
    /// Optional compression level for the chosen codec.
    pub compression_level: Option<u32>,
    /// Defaults to `false`. Presumably skips writing output for empty results — TODO confirm.
    #[serde(default)]
    pub skip_empty: bool,
    /// Where output is written (required).
    pub destination: DestinationConfig,
    /// Optional metadata columns; all disabled by default.
    #[serde(default)]
    pub meta_columns: MetaColumns,
    /// Optional data-quality checks.
    #[serde(default)]
    pub quality: Option<QualityConfig>,
    /// Size limit string, parsed with `parse_file_size` by `max_file_size_bytes`.
    pub max_file_size: Option<String>,
    /// Defaults to `false`. NOTE(review): checkpoint behavior lives outside this file.
    #[serde(default)]
    pub chunk_checkpoint: bool,
    /// Optional attempt limit — presumably per chunk (TODO confirm).
    pub chunk_max_attempts: Option<u32>,
    /// Optional tuning settings for this export.
    #[serde(default)]
    pub tuning: Option<TuningConfig>,
}
202
203impl ExportConfig {
204 pub fn max_file_size_bytes(&self) -> Option<u64> {
205 self.max_file_size
206 .as_ref()
207 .and_then(|s| parse_file_size(s).ok())
208 }
209
210 pub fn resolve_query(
211 &self,
212 config_dir: &Path,
213 params: Option<&std::collections::HashMap<String, String>>,
214 ) -> crate::error::Result<String> {
215 match (&self.query, &self.query_file) {
216 (Some(q), None) => {
217 if params.is_some() {
218 Ok(resolve_vars(q, params))
219 } else {
220 Ok(q.clone())
221 }
222 }
223 (None, Some(file)) => {
224 let path = config_dir.join(file);
225 let raw = std::fs::read_to_string(&path)?;
226 Ok(resolve_vars(&raw, params))
227 }
228 (Some(_), Some(_)) => {
229 anyhow::bail!(
230 "export '{}': specify either 'query' or 'query_file', not both",
231 self.name
232 )
233 }
234 (None, None) => {
235 anyhow::bail!(
236 "export '{}': must specify 'query' or 'query_file'",
237 self.name
238 )
239 }
240 }
241 }
242}
243
/// Data-quality checks for an export's result.
/// NOTE(review): the checks are enforced outside this file.
#[derive(Debug, Deserialize, Clone)]
pub struct QualityConfig {
    /// Optional lower bound on the row count (inclusiveness not visible here — TODO confirm).
    pub row_count_min: Option<usize>,
    /// Optional upper bound on the row count (inclusiveness not visible here — TODO confirm).
    pub row_count_max: Option<usize>,
    /// Maximum allowed null ratio keyed by column name; empty when omitted.
    #[serde(default)]
    pub null_ratio_max: std::collections::HashMap<String, f64>,
    /// Columns expected to hold unique values; empty when omitted.
    #[serde(default)]
    pub unique_columns: Vec<String>,
}
253
/// Optional metadata columns for exported rows; each flag defaults to `false`.
#[derive(Debug, Deserialize, Clone, Default)]
pub struct MetaColumns {
    /// Presumably adds an export-timestamp column — TODO confirm.
    #[serde(default)]
    pub exported_at: bool,
    /// Presumably adds a per-row hash column — TODO confirm.
    #[serde(default)]
    pub row_hash: bool,
}
261
262fn default_mode() -> ExportMode {
263 ExportMode::Full
264}
265
/// Serde default for `ExportConfig::chunk_size`.
fn default_chunk_size() -> usize {
    const DEFAULT_CHUNK_SIZE: usize = 100 * 1_000;
    DEFAULT_CHUNK_SIZE
}

/// Serde default for `ExportConfig::parallel`: a parallelism of one.
fn default_parallel() -> usize {
    const SINGLE_WORKER: usize = 1;
    SINGLE_WORKER
}
273
274fn default_time_column_type() -> TimeColumnType {
275 TimeColumnType::Timestamp
276}
277
/// Export strategy, deserialized from snake_case names
/// (`full`, `incremental`, `chunked`, `time_window`).
/// `ExportConfig::mode` defaults to `Full`.
#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ExportMode {
    Full,
    Incremental,
    Chunked,
    TimeWindow,
}
286
/// Storage type of `ExportConfig::time_column`, deserialized from lowercase names
/// (`timestamp`, `unix`). `Unix` presumably means epoch-based integers — TODO confirm units.
#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum TimeColumnType {
    Timestamp,
    Unix,
}
293
/// Output file format, deserialized from lowercase names (`parquet`, `csv`).
#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum FormatType {
    Parquet,
    Csv,
}
300
/// Output compression codec, deserialized from lowercase names
/// (`zstd`, `snappy`, `gzip`, `lz4`, `none`); defaults to `Zstd`.
#[derive(Debug, Default, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum CompressionType {
    #[default]
    Zstd,
    Snappy,
    Gzip,
    Lz4,
    // This is an enum variant (config value `none`), not `Option::None`;
    // presumably it disables compression.
    None,
}
311
/// Where export output is written. Which optional fields apply presumably depends
/// on `destination_type` — TODO confirm against the writer code.
#[derive(Debug, Deserialize, Clone)]
pub struct DestinationConfig {
    /// Destination kind, read from the `type` key.
    #[serde(rename = "type")]
    pub destination_type: DestinationType,
    /// Bucket name (presumably for object stores).
    pub bucket: Option<String>,
    /// Key/path prefix for output objects (presumably).
    pub prefix: Option<String>,
    /// Filesystem path (presumably for `local`).
    pub path: Option<String>,
    /// Object-store region (presumably).
    pub region: Option<String>,
    /// Custom service endpoint (presumably for S3-compatible stores).
    pub endpoint: Option<String>,
    /// Path to a credentials file.
    pub credentials_file: Option<String>,
    /// Presumably the name of an env var holding the access key — TODO confirm.
    pub access_key_env: Option<String>,
    /// Presumably the name of an env var holding the secret key — TODO confirm.
    pub secret_key_env: Option<String>,
    /// AWS profile name.
    pub aws_profile: Option<String>,
    /// Defaults to `false`; presumably permits unauthenticated access — TODO confirm.
    #[serde(default)]
    pub allow_anonymous: bool,
}
328
/// Destination kind, deserialized from lowercase names (`local`, `s3`, `gcs`, `stdout`).
#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum DestinationType {
    Local,
    S3,
    Gcs,
    Stdout,
}