// rivet_cli/config/models.rs

1use std::path::Path;
2
3use serde::Deserialize;
4
5use super::resolve::{parse_file_size, resolve_env_vars, resolve_vars};
6use crate::tuning::TuningConfig;
7
/// Notification settings as deserialized from the configuration.
#[derive(Debug, Deserialize, Clone)]
pub struct NotificationsConfig {
    /// Slack settings; `None` when the `slack` key is absent.
    pub slack: Option<SlackConfig>,
}
12
/// Slack notification settings.
///
/// NOTE(review): how `webhook_url` and `webhook_url_env` are combined or
/// validated is not visible in this file -- confirm at the use site.
#[derive(Debug, Deserialize, Clone)]
pub struct SlackConfig {
    /// Presumably the webhook URL given inline in the config -- confirm.
    pub webhook_url: Option<String>,
    /// Presumably the name of an environment variable holding the webhook URL
    /// (mirroring `SourceConfig::url_env`) -- confirm.
    pub webhook_url_env: Option<String>,
    /// Events listed for this target; an omitted key deserializes to an empty list.
    #[serde(default)]
    pub on: Vec<NotifyEvent>,
}
20
/// Event kinds that can be listed in `SlackConfig::on`.
///
/// Deserialized from snake_case names: `failure`, `schema_change`, `degraded`.
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum NotifyEvent {
    /// Config value `failure`.
    Failure,
    /// Config value `schema_change`.
    SchemaChange,
    /// Config value `degraded`.
    Degraded,
}
28
/// Database source connection settings.
///
/// A connection is described either by one URL-style field (`url`, `url_env`,
/// `url_file`) or by structured fields (`host`, `user`, `database`, ...);
/// `resolve_url` rejects configs that mix the two styles.
#[derive(Debug, Deserialize, Clone)]
pub struct SourceConfig {
    /// Database kind, read from the `type` key; selects the URL scheme and the
    /// default port when the URL is built from structured fields.
    #[serde(rename = "type")]
    pub source_type: SourceType,

    /// Connection URL given inline.
    pub url: Option<String>,
    /// Name of the environment variable that holds the connection URL.
    pub url_env: Option<String>,
    /// Path of a file whose trimmed contents are the connection URL.
    pub url_file: Option<String>,

    /// Host name; required when structured fields are used.
    pub host: Option<String>,
    /// Port; when omitted, 5432 is used for Postgres and 3306 for MySQL.
    pub port: Option<u16>,
    /// User name; required when structured fields are used.
    pub user: Option<String>,
    /// Inline password. Using it logs a warning recommending `password_env`;
    /// the value is passed through `resolve_env_vars` before use.
    pub password: Option<String>,
    /// Name of the environment variable that holds the password. Mutually
    /// exclusive with `password`.
    pub password_env: Option<String>,
    /// Database name; required when structured fields are used.
    pub database: Option<String>,

    /// Optional tuning settings; `None` when omitted.
    /// NOTE(review): how this interacts with `ExportConfig::tuning` is not
    /// visible in this file.
    #[serde(default)]
    pub tuning: Option<TuningConfig>,
}
48
49impl SourceConfig {
50    pub(crate) fn has_structured_fields(&self) -> bool {
51        self.host.is_some()
52            || self.user.is_some()
53            || self.database.is_some()
54            || self.password.is_some()
55            || self.password_env.is_some()
56    }
57
58    pub(crate) fn has_url_fields(&self) -> bool {
59        self.url.is_some() || self.url_env.is_some() || self.url_file.is_some()
60    }
61
62    fn build_url_from_fields(&self) -> crate::error::Result<String> {
63        let host = self
64            .host
65            .as_deref()
66            .ok_or_else(|| anyhow::anyhow!("source: structured config requires 'host'"))?;
67        let user = self
68            .user
69            .as_deref()
70            .ok_or_else(|| anyhow::anyhow!("source: structured config requires 'user'"))?;
71        let database = self
72            .database
73            .as_deref()
74            .ok_or_else(|| anyhow::anyhow!("source: structured config requires 'database'"))?;
75
76        let password = match (&self.password, &self.password_env) {
77            (Some(_), Some(_)) => {
78                anyhow::bail!("source: specify 'password' or 'password_env', not both");
79            }
80            (Some(p), None) => {
81                log::warn!(
82                    "source config contains plaintext password -- consider using password_env"
83                );
84                resolve_env_vars(p)
85            }
86            (None, Some(env)) => std::env::var(env)
87                .map_err(|_| anyhow::anyhow!("source: env var '{}' not set (password_env)", env))?,
88            (None, None) => String::new(),
89        };
90
91        let default_port = match self.source_type {
92            SourceType::Postgres => 5432,
93            SourceType::Mysql => 3306,
94        };
95        let port = self.port.unwrap_or(default_port);
96
97        let scheme = match self.source_type {
98            SourceType::Postgres => "postgresql",
99            SourceType::Mysql => "mysql",
100        };
101
102        if password.is_empty() {
103            Ok(format!(
104                "{}://{}@{}:{}/{}",
105                scheme, user, host, port, database
106            ))
107        } else {
108            Ok(format!(
109                "{}://{}:{}@{}:{}/{}",
110                scheme, user, password, host, port, database
111            ))
112        }
113    }
114
115    pub fn resolve_url(&self) -> crate::error::Result<String> {
116        if self.has_url_fields() && self.has_structured_fields() {
117            anyhow::bail!(
118                "source: use either URL-based config (url/url_env/url_file) or structured fields (host/user/database/...), not both"
119            );
120        }
121
122        if self.has_structured_fields() {
123            return self.build_url_from_fields();
124        }
125
126        let raw = match (&self.url, &self.url_env, &self.url_file) {
127            (Some(u), None, None) => u.clone(),
128            (None, Some(env), None) => {
129                std::env::var(env).map_err(|_| anyhow::anyhow!("env var '{}' not set", env))?
130            }
131            (None, None, Some(file)) => std::fs::read_to_string(file)
132                .map_err(|e| anyhow::anyhow!("cannot read url_file '{}': {}", file, e))?
133                .trim()
134                .to_string(),
135            _ => anyhow::bail!(
136                "source: specify exactly one of 'url', 'url_env', 'url_file', or structured fields (host/user/database)"
137            ),
138        };
139
140        let resolved = resolve_env_vars(&raw);
141
142        if resolved.contains('@')
143            && resolved.contains(':')
144            && let Some(userinfo) = resolved.split('@').next()
145            && userinfo.contains(':')
146            && !userinfo.ends_with(':')
147        {
148            log::warn!(
149                "source URL contains plaintext password -- consider using url_env or url_file"
150            );
151        }
152
153        Ok(resolved)
154    }
155}
156
/// Supported source database kinds.
///
/// Deserialized from lowercase names: `postgres`, `mysql`.
#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum SourceType {
    /// Config value `postgres`.
    Postgres,
    /// Config value `mysql`.
    Mysql,
}
163
/// Settings for a single export, as deserialized from the configuration.
///
/// NOTE(review): most fields are only declared here; where a field's meaning is
/// not established by this file, the comment is hedged and should be confirmed
/// against the code that consumes it.
#[derive(Debug, Deserialize, Clone)]
pub struct ExportConfig {
    /// Export name; used in the error messages produced by `resolve_query`.
    pub name: String,
    /// Inline query text. Exactly one of `query` / `query_file` must be set.
    #[serde(default)]
    pub query: Option<String>,
    /// Path of a file containing the query, joined onto the config directory
    /// by `resolve_query`.
    pub query_file: Option<String>,
    /// Export mode; defaults to `ExportMode::Full` when omitted.
    #[serde(default = "default_mode")]
    pub mode: ExportMode,
    /// Presumably the column tracked in incremental mode -- confirm.
    pub cursor_column: Option<String>,
    /// Presumably the column used to split work in chunked mode -- confirm.
    pub chunk_column: Option<String>,
    /// Defaults to `false`. NOTE(review): exact effect not visible here.
    #[serde(default)]
    pub chunk_dense: bool,
    /// Defaults to 100_000 when omitted.
    #[serde(default = "default_chunk_size")]
    pub chunk_size: usize,
    /// Defaults to 1 when omitted.
    #[serde(default = "default_parallel")]
    pub parallel: usize,
    /// Presumably the column filtered on in time-window mode -- confirm.
    pub time_column: Option<String>,
    /// Representation of `time_column`; defaults to `TimeColumnType::Timestamp`.
    #[serde(default = "default_time_column_type")]
    pub time_column_type: TimeColumnType,
    /// Presumably the size of the time window in days -- confirm.
    pub days_window: Option<u32>,
    /// Output file format (required).
    pub format: FormatType,
    /// Output compression; defaults to `CompressionType::Zstd`.
    #[serde(default)]
    pub compression: CompressionType,
    /// Optional compression level. NOTE(review): valid range not visible here.
    pub compression_level: Option<u32>,
    /// Defaults to `false`. Presumably skips writing output for empty results -- confirm.
    #[serde(default)]
    pub skip_empty: bool,
    /// Where the export is written (required).
    pub destination: DestinationConfig,
    /// Extra metadata columns; all disabled when omitted.
    #[serde(default)]
    pub meta_columns: MetaColumns,
    /// Optional quality checks; `None` when omitted.
    #[serde(default)]
    pub quality: Option<QualityConfig>,
    /// Size limit as text, parsed by `parse_file_size` in `max_file_size_bytes`.
    pub max_file_size: Option<String>,
    /// Defaults to `false`. NOTE(review): exact effect not visible here.
    #[serde(default)]
    pub chunk_checkpoint: bool,
    /// Optional attempt limit. NOTE(review): exact semantics not visible here.
    pub chunk_max_attempts: Option<u32>,
    /// Optional tuning settings; `None` when omitted.
    #[serde(default)]
    pub tuning: Option<TuningConfig>,
}
202
203impl ExportConfig {
204    pub fn max_file_size_bytes(&self) -> Option<u64> {
205        self.max_file_size
206            .as_ref()
207            .and_then(|s| parse_file_size(s).ok())
208    }
209
210    pub fn resolve_query(
211        &self,
212        config_dir: &Path,
213        params: Option<&std::collections::HashMap<String, String>>,
214    ) -> crate::error::Result<String> {
215        match (&self.query, &self.query_file) {
216            (Some(q), None) => {
217                if params.is_some() {
218                    Ok(resolve_vars(q, params))
219                } else {
220                    Ok(q.clone())
221                }
222            }
223            (None, Some(file)) => {
224                let path = config_dir.join(file);
225                let raw = std::fs::read_to_string(&path)?;
226                Ok(resolve_vars(&raw, params))
227            }
228            (Some(_), Some(_)) => {
229                anyhow::bail!(
230                    "export '{}': specify either 'query' or 'query_file', not both",
231                    self.name
232                )
233            }
234            (None, None) => {
235                anyhow::bail!(
236                    "export '{}': must specify 'query' or 'query_file'",
237                    self.name
238                )
239            }
240        }
241    }
242}
243
/// Optional quality checks attached to an export.
///
/// NOTE(review): the checks themselves are implemented elsewhere; the field
/// descriptions below are inferred from names and types -- confirm.
#[derive(Debug, Deserialize, Clone)]
pub struct QualityConfig {
    /// Presumably the minimum acceptable row count.
    pub row_count_min: Option<usize>,
    /// Presumably the maximum acceptable row count.
    pub row_count_max: Option<usize>,
    /// Map keyed by string (presumably column name) to a ratio threshold;
    /// empty when omitted.
    #[serde(default)]
    pub null_ratio_max: std::collections::HashMap<String, f64>,
    /// Presumably columns expected to hold unique values; empty when omitted.
    #[serde(default)]
    pub unique_columns: Vec<String>,
}
253
/// Flags for optional metadata columns; every flag defaults to `false`.
#[derive(Debug, Deserialize, Clone, Default)]
pub struct MetaColumns {
    /// Presumably adds an export-timestamp column when `true` -- confirm.
    #[serde(default)]
    pub exported_at: bool,
    /// Presumably adds a per-row hash column when `true` -- confirm.
    #[serde(default)]
    pub row_hash: bool,
}
261
262fn default_mode() -> ExportMode {
263    ExportMode::Full
264}
265
/// Serde default for `ExportConfig::chunk_size`.
fn default_chunk_size() -> usize {
    const DEFAULT_CHUNK_SIZE: usize = 100_000;
    DEFAULT_CHUNK_SIZE
}
269
/// Serde default for `ExportConfig::parallel`.
fn default_parallel() -> usize {
    const DEFAULT_PARALLEL: usize = 1;
    DEFAULT_PARALLEL
}
273
274fn default_time_column_type() -> TimeColumnType {
275    TimeColumnType::Timestamp
276}
277
/// Export strategies selectable via `ExportConfig::mode`.
///
/// Deserialized from snake_case names: `full`, `incremental`, `chunked`,
/// `time_window`.
#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ExportMode {
    /// Config value `full`; the default when `mode` is omitted.
    Full,
    /// Config value `incremental`.
    Incremental,
    /// Config value `chunked`.
    Chunked,
    /// Config value `time_window`.
    TimeWindow,
}
286
/// Representation of the column named by `ExportConfig::time_column`.
///
/// Deserialized from lowercase names: `timestamp`, `unix`.
#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum TimeColumnType {
    /// Config value `timestamp`; the default when `time_column_type` is omitted.
    Timestamp,
    /// Config value `unix`. NOTE(review): presumably epoch-based; the unit is
    /// not visible here.
    Unix,
}
293
/// Output file formats.
///
/// Deserialized from lowercase names: `parquet`, `csv`.
#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum FormatType {
    /// Config value `parquet`.
    Parquet,
    /// Config value `csv`.
    Csv,
}
300
/// Output compression codecs.
///
/// Deserialized from lowercase names: `zstd`, `snappy`, `gzip`, `lz4`, `none`.
#[derive(Debug, Default, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum CompressionType {
    /// Config value `zstd`; the `Default` variant.
    #[default]
    Zstd,
    /// Config value `snappy`.
    Snappy,
    /// Config value `gzip`.
    Gzip,
    /// Config value `lz4`.
    Lz4,
    /// Config value `none`; presumably disables compression -- confirm.
    None,
}
311
/// Destination settings for an export.
///
/// NOTE(review): which fields apply to which `DestinationType` is decided
/// elsewhere; the descriptions below are inferred from names -- confirm.
#[derive(Debug, Deserialize, Clone)]
pub struct DestinationConfig {
    /// Destination kind, read from the `type` key.
    #[serde(rename = "type")]
    pub destination_type: DestinationType,
    /// Presumably the bucket name for object-store destinations.
    pub bucket: Option<String>,
    /// Presumably a key prefix within the bucket.
    pub prefix: Option<String>,
    /// Presumably a filesystem path for local destinations.
    pub path: Option<String>,
    /// Presumably the cloud region.
    pub region: Option<String>,
    /// Presumably a custom endpoint URL.
    pub endpoint: Option<String>,
    /// Presumably the path of a credentials file.
    pub credentials_file: Option<String>,
    /// Presumably the name of an environment variable holding the access key.
    pub access_key_env: Option<String>,
    /// Presumably the name of an environment variable holding the secret key.
    pub secret_key_env: Option<String>,
    /// Presumably the named AWS profile to use.
    pub aws_profile: Option<String>,
    /// Defaults to `false`. Presumably permits unauthenticated access -- confirm.
    #[serde(default)]
    pub allow_anonymous: bool,
}
328
/// Supported destination kinds.
///
/// Deserialized from lowercase names: `local`, `s3`, `gcs`, `stdout`.
#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum DestinationType {
    /// Config value `local`.
    Local,
    /// Config value `s3`.
    S3,
    /// Config value `gcs`.
    Gcs,
    /// Config value `stdout`.
    Stdout,
}