Skip to main content

rivet/config/
mod.rs

1pub mod cursor;
2mod destination;
3mod export;
4mod format;
5mod lints;
6mod notifications;
7pub mod resolve;
8pub mod schema;
9mod source;
10
11pub use cursor::IncrementalCursorMode;
12pub use destination::*;
13pub use export::*;
14pub use format::*;
15pub use notifications::*;
16#[allow(unused_imports)]
17pub(crate) use resolve::resolve_env_vars;
18pub use resolve::{parse_file_size, resolve_vars};
19pub use schema::generate_config_schema_pretty;
20pub use source::*;
21
22use schemars::JsonSchema;
23use serde::Deserialize;
24
25/// Top-level Rivet configuration root.
26///
27/// Operators write this struct as YAML (typically `rivet.yaml`).  The
28/// `JsonSchema` derive is the source of truth for the `schemas/rivet.schema.json`
29/// artifact and the `rivet schema config` command's output (v0.7.3 P0).
30#[derive(Debug, Deserialize, JsonSchema, Clone)]
31#[serde(deny_unknown_fields)]
32pub struct Config {
33    pub source: SourceConfig,
34    pub exports: Vec<ExportConfig>,
35    #[serde(default)]
36    pub notifications: Option<NotificationsConfig>,
37    #[serde(default)]
38    pub parallel_exports: bool,
39    #[serde(default)]
40    pub parallel_export_processes: bool,
41}
42
43impl Config {
44    pub fn load(path: &str) -> crate::error::Result<Self> {
45        Self::load_with_params(path, None)
46    }
47
48    pub fn load_with_params(
49        path: &str,
50        params: Option<&std::collections::HashMap<String, String>>,
51    ) -> crate::error::Result<Self> {
52        // F11 (0.7.5 audit): raw `std::io::Error` lost the path on
53        // not-found.  Wrap with the file path + a hint so the operator
54        // can see *which* config the tool could not open.
55        let contents = std::fs::read_to_string(path).map_err(|e| {
56            if e.kind() == std::io::ErrorKind::NotFound {
57                anyhow::anyhow!(
58                    "config file '{}' not found.\n  Hint: check the path, or run `rivet init` to generate one.",
59                    path
60                )
61            } else {
62                anyhow::anyhow!("cannot read config file '{}': {}", path, e)
63            }
64        })?;
65        // Warn about typo'd `--param` keys once per CLI invocation, using the
66        // un-resolved YAML as the haystack so the placeholders are still there.
67        // We pass the raw `contents` (not `resolved`) on purpose: after
68        // resolution the placeholders are gone, and every key would look unused.
69        resolve::warn_unused_params(&contents, params);
70        let resolved = resolve_vars(&contents, params)?;
71        // F12 (0.7.5 audit): YAML parse errors did not name the config
72        // file.  When loading from disk we know the path — thread it
73        // into the parse error.
74        Self::from_yaml(&resolved).map_err(|e| anyhow::anyhow!("config file '{}': {:#}", path, e))
75    }
76
77    pub fn from_yaml(yaml: &str) -> crate::error::Result<Self> {
78        Self::check_misplaced_tuning_fields(yaml)?;
79        let config: Config = serde_yaml_ng::from_str(yaml).map_err(lints::enhance_parse_error)?;
80        config.validate()?;
81        Ok(config)
82    }
83
84    /// Detect tuning-related fields placed directly under `source:` or an
85    /// `exports[]` entry instead of inside the `tuning:` sub-key. Without this
86    /// check serde silently ignores unknown keys and the user gets unexpected
87    /// defaults (e.g. batch_size=10 000 instead of the intended 1 000).
88    fn check_misplaced_tuning_fields(yaml: &str) -> crate::error::Result<()> {
89        const TUNING_FIELDS: &[&str] = &[
90            "batch_size",
91            "batch_size_memory_mb",
92            "throttle_ms",
93            "statement_timeout_s",
94            "max_retries",
95            "retry_backoff_ms",
96            "lock_timeout_s",
97            "memory_threshold_mb",
98            "profile",
99        ];
100
101        let root: serde_yaml_ng::Value = serde_yaml_ng::from_str(yaml)?;
102
103        if let Some(source) = root.get("source") {
104            let misplaced: Vec<&str> = TUNING_FIELDS
105                .iter()
106                .copied()
107                .filter(|&f| source.get(f).is_some())
108                .collect();
109            if !misplaced.is_empty() {
110                anyhow::bail!(
111                    "source: field(s) [{}] belong under 'source.tuning:', not directly under 'source:'. \
112                     Example:\n  source:\n    tuning:\n      {}: <value>",
113                    misplaced.join(", "),
114                    misplaced[0],
115                );
116            }
117        }
118
119        if let Some(exports) = root.get("exports").and_then(|e| e.as_sequence()) {
120            for (i, export) in exports.iter().enumerate() {
121                let name = export
122                    .get("name")
123                    .and_then(|n| n.as_str())
124                    .unwrap_or("<unnamed>");
125                let misplaced: Vec<&str> = TUNING_FIELDS
126                    .iter()
127                    .copied()
128                    .filter(|&f| export.get(f).is_some())
129                    .collect();
130                if !misplaced.is_empty() {
131                    anyhow::bail!(
132                        "export '{}' (index {}): field(s) [{}] belong under 'exports[].tuning:', \
133                         not directly in the export. Example:\n  exports:\n    - name: {}\n      tuning:\n        {}: <value>",
134                        name,
135                        i,
136                        misplaced.join(", "),
137                        name,
138                        misplaced[0],
139                    );
140                }
141            }
142        }
143
144        Ok(())
145    }
146
147    fn validate(&self) -> crate::error::Result<()> {
148        // An empty `exports:` list is almost always a typo (wrong config file,
149        // dropped anchor, merged doc with the anchor section missing). Running
150        // with zero exports is a silent no-op that looks like success in CI;
151        // reject fast instead. See QA backlog Task 5.1.
152        if self.exports.is_empty() {
153            anyhow::bail!("exports: at least one export must be defined (got empty list)");
154        }
155
156        // Duplicate export names break state tracking: `export_state`,
157        // `file_log`, and `chunk_run` are all keyed by `export_name`, so
158        // two configs with the same name silently share cursor/file-log rows.
159        // QA backlog Task 5.1.
160        {
161            let mut seen: std::collections::HashSet<&str> =
162                std::collections::HashSet::with_capacity(self.exports.len());
163            for e in &self.exports {
164                if !seen.insert(e.name.as_str()) {
165                    anyhow::bail!(
166                        "exports: duplicate export name '{}' (each export must have a unique name; state is keyed by name)",
167                        e.name
168                    );
169                }
170            }
171        }
172
173        if let Some(t) = &self.source.tuning
174            && t.batch_size.is_some()
175            && t.batch_size_memory_mb.is_some()
176        {
177            anyhow::bail!("tuning: batch_size and batch_size_memory_mb are mutually exclusive");
178        }
179
180        for export in &self.exports {
181            let merged = crate::tuning::merge_tuning_config(
182                self.source.tuning.as_ref(),
183                export.tuning.as_ref(),
184            );
185            if let Some(t) = merged
186                && t.batch_size.is_some()
187                && t.batch_size_memory_mb.is_some()
188            {
189                anyhow::bail!(
190                    "export '{}': effective tuning has both batch_size and batch_size_memory_mb (mutually exclusive)",
191                    export.name
192                );
193            }
194            if let Some(et) = &export.tuning
195                && et.batch_size.is_some()
196                && et.batch_size_memory_mb.is_some()
197            {
198                anyhow::bail!(
199                    "export '{}': tuning.batch_size and tuning.batch_size_memory_mb are mutually exclusive",
200                    export.name
201                );
202            }
203        }
204
205        if !self.source.has_url_fields() && !self.source.has_structured_fields() {
206            // First-run footgun: a config that forgot the source block
207            // entirely.  Show the recommended path (`url_env`) up-front;
208            // operators who actually want structured fields know to look
209            // for them.
210            anyhow::bail!(
211                "source: no connection method configured. Add one of:\n  url_env: DATABASE_URL                          (URL from env var — recommended)\n  url: 'postgresql://user:pass@host:5432/db'      (inline — not recommended for committed configs)\n  url_file: /etc/rivet/source.url                 (URL from file — rotation-friendly)\n  host/user/database/...                          (structured fields under `source:`)"
212            );
213        }
214
215        if self.source.has_url_fields() {
216            let url_count = [
217                &self.source.url,
218                &self.source.url_env,
219                &self.source.url_file,
220            ]
221            .iter()
222            .filter(|u| u.is_some())
223            .count();
224            if url_count > 1 {
225                anyhow::bail!(
226                    "source: specify exactly one of 'url', 'url_env', or 'url_file' (got {} set).\n  Hint: pick one — `url_env` is recommended so credentials never enter the YAML.",
227                    url_count
228                );
229            }
230        }
231
232        if self.source.has_url_fields() && self.source.has_structured_fields() {
233            anyhow::bail!(
234                "source: pick either URL-based config (url/url_env/url_file) OR structured fields (host/user/database/port/password_env), not both.\n  Hint: remove whichever block you don't want; mixing the two is ambiguous."
235            );
236        }
237
238        if self.source.has_structured_fields() {
239            if self.source.host.is_none() {
240                anyhow::bail!(
241                    "source: structured config is missing 'host'.\n  Hint: add `host: localhost` (or your DB host) under `source:` in rivet.yaml.\n  Or switch to URL-based config: `url_env: DATABASE_URL`."
242                );
243            }
244            if self.source.user.is_none() {
245                anyhow::bail!(
246                    "source: structured config is missing 'user'.\n  Hint: add `user: <username>` under `source:` in rivet.yaml."
247                );
248            }
249            if self.source.database.is_none() {
250                anyhow::bail!(
251                    "source: structured config is missing 'database'.\n  Hint: add `database: <dbname>` under `source:` in rivet.yaml."
252                );
253            }
254            if self.source.password.is_some() && self.source.password_env.is_some() {
255                anyhow::bail!(
256                    "source: specify 'password' OR 'password_env', not both.\n  Hint: prefer `password_env: DB_PASSWORD` so credentials never enter the YAML."
257                );
258            }
259        }
260
261        for export in &self.exports {
262            let set_count = [
263                export.query.is_some(),
264                export.query_file.is_some(),
265                export.table.is_some(),
266            ]
267            .iter()
268            .filter(|b| **b)
269            .count();
270            if set_count == 0 {
271                anyhow::bail!(
272                    "export '{}': must specify exactly one of 'query', 'query_file', or 'table'",
273                    export.name
274                );
275            }
276            if set_count > 1 {
277                anyhow::bail!(
278                    "export '{}': specify exactly one of 'query', 'query_file', or 'table' (got {} set)",
279                    export.name,
280                    set_count
281                );
282            }
283            // SecOps: syntactic `query_file` checks must run at config-validate
284            // time so `rivet check` / `rivet doctor` catch them before any
285            // plan step. The same checks repeat (with a canonicalize-based
286            // symlink probe) in `ExportConfig::resolve_query` because the
287            // file may have been swapped between validation and read.
288            if let Some(file) = &export.query_file {
289                let p = std::path::Path::new(file);
290                if p.is_absolute() {
291                    anyhow::bail!(
292                        "export '{}': query_file must be a relative path: '{}'",
293                        export.name,
294                        file
295                    );
296                }
297                if p.components().any(|c| c == std::path::Component::ParentDir) {
298                    anyhow::bail!(
299                        "export '{}': query_file path must not contain '..': '{}'",
300                        export.name,
301                        file
302                    );
303                }
304            }
305            if export.destination.destination_type == DestinationType::S3 {
306                let ak = export.destination.access_key_env.is_some();
307                let sk = export.destination.secret_key_env.is_some();
308                if ak != sk {
309                    anyhow::bail!(
310                        "export '{}': S3 requires both access_key_env and secret_key_env, or neither (use default AWS credential chain)",
311                        export.name
312                    );
313                }
314            }
315
316            if export.destination.destination_type == DestinationType::Gcs
317                && export.destination.allow_anonymous
318                && export.destination.credentials_file.is_some()
319            {
320                anyhow::bail!(
321                    "export '{}': GCS allow_anonymous cannot be used together with credentials_file",
322                    export.name
323                );
324            }
325
326            if export.destination.destination_type == DestinationType::Azure {
327                let has_name = export.destination.account_name.is_some();
328                let has_key = export.destination.account_key_env.is_some();
329                let has_sas = export.destination.sas_token_env.is_some();
330                if export.destination.allow_anonymous {
331                    if has_name || has_key || has_sas {
332                        anyhow::bail!(
333                            "export '{}': Azure allow_anonymous cannot be combined with account_name/account_key_env/sas_token_env",
334                            export.name
335                        );
336                    }
337                } else if has_key && has_sas {
338                    anyhow::bail!(
339                        "export '{}': Azure account_key_env and sas_token_env are mutually exclusive — pick one auth mode",
340                        export.name
341                    );
342                } else if !has_name {
343                    anyhow::bail!(
344                        "export '{}': Azure requires account_name (plus account_key_env or sas_token_env), or allow_anonymous: true for Azurite",
345                        export.name
346                    );
347                } else if !has_key && !has_sas {
348                    anyhow::bail!(
349                        "export '{}': Azure requires account_key_env or sas_token_env (or allow_anonymous: true for Azurite)",
350                        export.name
351                    );
352                }
353            }
354
355            if let Some(cred_path) = &export.destination.credentials_file
356                && !std::path::Path::new(cred_path).exists()
357            {
358                anyhow::bail!(
359                    "export '{}': credentials_file '{}' does not exist",
360                    export.name,
361                    cred_path
362                );
363            }
364
365            if let Some(ref size_str) = export.max_file_size {
366                parse_file_size(size_str).map_err(|_| {
367                    anyhow::anyhow!(
368                        "export '{}': invalid max_file_size '{}'",
369                        export.name,
370                        size_str
371                    )
372                })?;
373            }
374
375            if let Some(level) = export.compression_level {
376                match export.compression {
377                    CompressionType::Zstd => {
378                        if !(1..=22).contains(&level) {
379                            anyhow::bail!(
380                                "export '{}': zstd compression_level must be 1..22, got {}",
381                                export.name,
382                                level
383                            );
384                        }
385                    }
386                    CompressionType::Gzip => {
387                        if level > 10 {
388                            anyhow::bail!(
389                                "export '{}': gzip compression_level must be 0..10, got {}",
390                                export.name,
391                                level
392                            );
393                        }
394                    }
395                    _ => {
396                        anyhow::bail!(
397                            "export '{}': compression_level is only supported for zstd and gzip",
398                            export.name
399                        );
400                    }
401                }
402            }
403
404            match export.mode {
405                ExportMode::Incremental => {
406                    if export.cursor_column.is_none() {
407                        anyhow::bail!(
408                            "export '{}': incremental mode requires cursor_column",
409                            export.name
410                        );
411                    }
412                    match export.incremental_cursor_mode {
413                        IncrementalCursorMode::Coalesce => {
414                            if export.cursor_fallback_column.is_none() {
415                                anyhow::bail!(
416                                    "export '{}': incremental_cursor_mode: coalesce requires cursor_fallback_column",
417                                    export.name
418                                );
419                            }
420                        }
421                        IncrementalCursorMode::SingleColumn => {
422                            if export.cursor_fallback_column.is_some() {
423                                anyhow::bail!(
424                                    "export '{}': cursor_fallback_column is only valid with incremental_cursor_mode: coalesce",
425                                    export.name
426                                );
427                            }
428                        }
429                    }
430                }
431                ExportMode::Chunked => {
432                    // `chunk_column` is mandatory unless the user used the `table:`
433                    // shortcut on a Postgres source — in that case it is auto-resolved
434                    // from the table's single-integer PK at plan-build time (see
435                    // `crate::plan::build::resolve_chunk_column`).
436                    if export.chunk_column.is_none() && export.table.is_none() {
437                        anyhow::bail!(
438                            "export '{}': chunked mode requires chunk_column \
439                             (or use `table:` shortcut on a Postgres source to auto-resolve from PK)",
440                            export.name
441                        );
442                    }
443                    // chunk_size == 0 would divide the range into zero-width
444                    // slices and (before the saturating fix in generate_chunks)
445                    // either infinite-loop or produce no progress. QA backlog
446                    // Task 5.1.
447                    if export.chunk_size == 0 {
448                        anyhow::bail!(
449                            "export '{}': chunked mode requires chunk_size >= 1 (got 0)",
450                            export.name
451                        );
452                    }
453                    // parallel == 0 means "spawn zero workers". Claiming tasks
454                    // with no workers stalls the pipeline. QA backlog Task 5.1.
455                    if export.parallel == 0 {
456                        anyhow::bail!(
457                            "export '{}': chunked mode requires parallel >= 1 (got 0)",
458                            export.name
459                        );
460                    }
461                    if let Some(0) = export.chunk_count {
462                        anyhow::bail!("export '{}': chunk_count must be >= 1", export.name);
463                    }
464                    if export.chunk_count.is_some() && export.chunk_dense {
465                        anyhow::bail!(
466                            "export '{}': chunk_count and chunk_dense are mutually exclusive",
467                            export.name
468                        );
469                    }
470                    if export.chunk_count.is_some() && export.chunk_by_days.is_some() {
471                        anyhow::bail!(
472                            "export '{}': chunk_count and chunk_by_days are mutually exclusive",
473                            export.name
474                        );
475                    }
476                }
477                ExportMode::TimeWindow => {
478                    if export.time_column.is_none() {
479                        anyhow::bail!(
480                            "export '{}': time_window mode requires time_column",
481                            export.name
482                        );
483                    }
484                    if export.days_window.is_none() {
485                        anyhow::bail!(
486                            "export '{}': time_window mode requires days_window",
487                            export.name
488                        );
489                    }
490                }
491                ExportMode::Full => {}
492            }
493
494            if export.chunk_dense && export.mode != ExportMode::Chunked {
495                anyhow::bail!(
496                    "export '{}': chunk_dense is only valid with mode: chunked",
497                    export.name
498                );
499            }
500
501            if let Some(days) = export.chunk_by_days {
502                if export.mode != ExportMode::Chunked {
503                    anyhow::bail!(
504                        "export '{}': chunk_by_days requires mode: chunked",
505                        export.name
506                    );
507                }
508                if export.chunk_dense {
509                    anyhow::bail!(
510                        "export '{}': chunk_by_days cannot be combined with chunk_dense",
511                        export.name
512                    );
513                }
514                if days == 0 {
515                    anyhow::bail!("export '{}': chunk_by_days must be at least 1", export.name);
516                }
517            }
518        }
519        Ok(())
520    }
521}
522
523#[cfg(test)]
524mod tests;