// rivet/config/mod.rs

pub mod cursor;
mod destination;
mod export;
mod format;
mod lints;
mod notifications;
pub mod resolve;
pub mod schema;
mod source;

// Re-export the sub-modules' public items at the `config::` level so callers
// do not need to know which private sub-module defines each type.
pub use cursor::IncrementalCursorMode;
pub use destination::*;
pub use export::*;
pub use format::*;
pub use notifications::*;
// NOTE(review): crate-internal re-export; the `allow` suggests it is unused in
// at least one build configuration (feature/cfg combination?) — confirm before
// removing either the import or the attribute.
#[allow(unused_imports)]
pub(crate) use resolve::resolve_env_vars;
pub use resolve::{parse_file_size, resolve_vars};
pub use schema::generate_config_schema_pretty;
pub use source::*;

use schemars::JsonSchema;
use serde::Deserialize;

/// Top-level Rivet configuration root.
///
/// Operators write this struct as YAML (typically `rivet.yaml`).  The
/// `JsonSchema` derive is the source of truth for the `schemas/rivet.schema.json`
/// artifact and the `rivet schema config` command's output (v0.7.3 P0).
//
// Maintainer notes use plain `//` comments on purpose. The `JsonSchema` derive
// is the source of the committed schema artifact, and schemars lifts `///` doc
// comments into schema `description`s. Editing or adding `///` text (or
// reordering fields) here would therefore change that artifact.
//
// - `deny_unknown_fields`: any top-level key not declared below is a parse
//   error rather than being silently ignored.
// - Deserializing only checks shape. Cross-field rules (non-empty `exports`,
//   unique export names, the source connection block, per-export rules) run
//   afterwards in `Config::validate`, called from `Config::from_yaml`.
#[derive(Debug, Deserialize, JsonSchema, Clone)]
#[serde(deny_unknown_fields)]
pub struct Config {
    // Required key (no `#[serde(default)]`).
    pub source: SourceConfig,
    // Required key; `Config::validate` additionally rejects an empty list.
    pub exports: Vec<ExportConfig>,
    // Optional; `None` when the key is absent.
    #[serde(default)]
    pub notifications: Option<NotificationsConfig>,
    // Optional; `false` when absent.
    // NOTE(review): presumably lets exports run concurrently — the consumer
    // lives outside this file; confirm.
    #[serde(default)]
    pub parallel_exports: bool,
    // Optional; `false` when absent.
    // NOTE(review): presumably runs exports in separate processes — confirm
    // with the runner.
    #[serde(default)]
    pub parallel_export_processes: bool,
}

43impl Config {
44    pub fn load(path: &str) -> crate::error::Result<Self> {
45        Self::load_with_params(path, None)
46    }
47
48    pub fn load_with_params(
49        path: &str,
50        params: Option<&std::collections::HashMap<String, String>>,
51    ) -> crate::error::Result<Self> {
52        // F11 (0.7.5 audit): raw `std::io::Error` lost the path on
53        // not-found.  Wrap with the file path + a hint so the operator
54        // can see *which* config the tool could not open.
55        let contents = std::fs::read_to_string(path).map_err(|e| {
56            if e.kind() == std::io::ErrorKind::NotFound {
57                anyhow::anyhow!(
58                    "config file '{}' not found.\n  Hint: check the path, or run `rivet init` to generate one.",
59                    path
60                )
61            } else {
62                anyhow::anyhow!("cannot read config file '{}': {}", path, e)
63            }
64        })?;
65        // Warn about typo'd `--param` keys once per CLI invocation, using the
66        // un-resolved YAML as the haystack so the placeholders are still there.
67        // We pass the raw `contents` (not `resolved`) on purpose: after
68        // resolution the placeholders are gone, and every key would look unused.
69        resolve::warn_unused_params(&contents, params);
70        let resolved = resolve_vars(&contents, params)?;
71        // F12 (0.7.5 audit): YAML parse errors did not name the config
72        // file.  When loading from disk we know the path — thread it
73        // into the parse error.
74        Self::from_yaml(&resolved).map_err(|e| anyhow::anyhow!("config file '{}': {:#}", path, e))
75    }
76
77    pub fn from_yaml(yaml: &str) -> crate::error::Result<Self> {
78        Self::check_misplaced_tuning_fields(yaml)?;
79        let config: Config = serde_yaml_ng::from_str(yaml).map_err(lints::enhance_parse_error)?;
80        config.validate()?;
81        Ok(config)
82    }
83
84    /// Detect tuning-related fields placed directly under `source:` or an
85    /// `exports[]` entry instead of inside the `tuning:` sub-key. Without this
86    /// check serde silently ignores unknown keys and the user gets unexpected
87    /// defaults (e.g. batch_size=10 000 instead of the intended 1 000).
88    fn check_misplaced_tuning_fields(yaml: &str) -> crate::error::Result<()> {
89        const TUNING_FIELDS: &[&str] = &[
90            "batch_size",
91            "batch_size_memory_mb",
92            "throttle_ms",
93            "statement_timeout_s",
94            "max_retries",
95            "retry_backoff_ms",
96            "lock_timeout_s",
97            "memory_threshold_mb",
98            "profile",
99        ];
100
101        let root: serde_yaml_ng::Value = serde_yaml_ng::from_str(yaml)?;
102
103        if let Some(source) = root.get("source") {
104            let misplaced: Vec<&str> = TUNING_FIELDS
105                .iter()
106                .copied()
107                .filter(|&f| source.get(f).is_some())
108                .collect();
109            if !misplaced.is_empty() {
110                anyhow::bail!(
111                    "source: field(s) [{}] belong under 'source.tuning:', not directly under 'source:'. \
112                     Example:\n  source:\n    tuning:\n      {}: <value>",
113                    misplaced.join(", "),
114                    misplaced[0],
115                );
116            }
117        }
118
119        if let Some(exports) = root.get("exports").and_then(|e| e.as_sequence()) {
120            for (i, export) in exports.iter().enumerate() {
121                let name = export
122                    .get("name")
123                    .and_then(|n| n.as_str())
124                    .unwrap_or("<unnamed>");
125                let misplaced: Vec<&str> = TUNING_FIELDS
126                    .iter()
127                    .copied()
128                    .filter(|&f| export.get(f).is_some())
129                    .collect();
130                if !misplaced.is_empty() {
131                    anyhow::bail!(
132                        "export '{}' (index {}): field(s) [{}] belong under 'exports[].tuning:', \
133                         not directly in the export. Example:\n  exports:\n    - name: {}\n      tuning:\n        {}: <value>",
134                        name,
135                        i,
136                        misplaced.join(", "),
137                        name,
138                        misplaced[0],
139                    );
140                }
141            }
142        }
143
144        Ok(())
145    }
146
147    /// Reject a config before any plan/connect step. The body is split into
148    /// three cohesive validators so each can be read — and unit-tested — on its
149    /// own: the export-list shape, the source connection block, and the
150    /// per-export rules. The end-to-end surface (`Config::from_yaml`) is
151    /// covered by `config/tests/{validation,secops}.rs`; the split additionally
152    /// lets a rule be exercised directly via `validate_export`.
153    fn validate(&self) -> crate::error::Result<()> {
154        self.validate_exports_list()?;
155        self.validate_source_connection()?;
156        for export in &self.exports {
157            self.validate_export(export)?;
158        }
159        Ok(())
160    }
161
162    /// Whole-config shape: at least one export, names unique.
163    fn validate_exports_list(&self) -> crate::error::Result<()> {
164        // An empty `exports:` list is almost always a typo (wrong config file,
165        // dropped anchor, merged doc with the anchor section missing). Running
166        // with zero exports is a silent no-op that looks like success in CI;
167        // reject fast instead. See QA backlog Task 5.1.
168        if self.exports.is_empty() {
169            anyhow::bail!("exports: at least one export must be defined (got empty list)");
170        }
171
172        // Duplicate export names break state tracking: `export_state`,
173        // `file_log`, and `chunk_run` are all keyed by `export_name`, so
174        // two configs with the same name silently share cursor/file-log rows.
175        // QA backlog Task 5.1.
176        let mut seen: std::collections::HashSet<&str> =
177            std::collections::HashSet::with_capacity(self.exports.len());
178        for e in &self.exports {
179            if !seen.insert(e.name.as_str()) {
180                anyhow::bail!(
181                    "exports: duplicate export name '{}' (each export must have a unique name; state is keyed by name)",
182                    e.name
183                );
184            }
185        }
186        Ok(())
187    }
188
189    /// Source connection block: exactly one connection method, well-formed,
190    /// and the source-level tuning that is shared by every export.
191    fn validate_source_connection(&self) -> crate::error::Result<()> {
192        if let Some(t) = &self.source.tuning
193            && t.batch_size.is_some()
194            && t.batch_size_memory_mb.is_some()
195        {
196            anyhow::bail!("tuning: batch_size and batch_size_memory_mb are mutually exclusive");
197        }
198
199        if !self.source.has_url_fields() && !self.source.has_structured_fields() {
200            // First-run footgun: a config that forgot the source block
201            // entirely.  Show the recommended path (`url_env`) up-front;
202            // operators who actually want structured fields know to look
203            // for them.
204            anyhow::bail!(
205                "source: no connection method configured. Add one of:\n  url_env: DATABASE_URL                          (URL from env var — recommended)\n  url: 'postgresql://user:pass@host:5432/db'      (inline — not recommended for committed configs)\n  url_file: /etc/rivet/source.url                 (URL from file — rotation-friendly)\n  host/user/database/...                          (structured fields under `source:`)"
206            );
207        }
208
209        if self.source.has_url_fields() {
210            let url_count = [
211                &self.source.url,
212                &self.source.url_env,
213                &self.source.url_file,
214            ]
215            .iter()
216            .filter(|u| u.is_some())
217            .count();
218            if url_count > 1 {
219                anyhow::bail!(
220                    "source: specify exactly one of 'url', 'url_env', or 'url_file' (got {} set).\n  Hint: pick one — `url_env` is recommended so credentials never enter the YAML.",
221                    url_count
222                );
223            }
224        }
225
226        if self.source.has_url_fields() && self.source.has_structured_fields() {
227            anyhow::bail!(
228                "source: pick either URL-based config (url/url_env/url_file) OR structured fields (host/user/database/port/password_env), not both.\n  Hint: remove whichever block you don't want; mixing the two is ambiguous."
229            );
230        }
231
232        if self.source.has_structured_fields() {
233            if self.source.host.is_none() {
234                anyhow::bail!(
235                    "source: structured config is missing 'host'.\n  Hint: add `host: localhost` (or your DB host) under `source:` in rivet.yaml.\n  Or switch to URL-based config: `url_env: DATABASE_URL`."
236                );
237            }
238            if self.source.user.is_none() {
239                anyhow::bail!(
240                    "source: structured config is missing 'user'.\n  Hint: add `user: <username>` under `source:` in rivet.yaml."
241                );
242            }
243            if self.source.database.is_none() {
244                anyhow::bail!(
245                    "source: structured config is missing 'database'.\n  Hint: add `database: <dbname>` under `source:` in rivet.yaml."
246                );
247            }
248            if self.source.password.is_some() && self.source.password_env.is_some() {
249                anyhow::bail!(
250                    "source: specify 'password' OR 'password_env', not both.\n  Hint: prefer `password_env: DB_PASSWORD` so credentials never enter the YAML."
251                );
252            }
253        }
254        Ok(())
255    }
256
257    /// Per-export rules: effective tuning, query source, `query_file` SecOps,
258    /// destination auth, compression, and the mode/chunk matrix. Takes `&self`
259    /// because effective tuning merges the source-level block.
260    fn validate_export(&self, export: &ExportConfig) -> crate::error::Result<()> {
261        let merged =
262            crate::tuning::merge_tuning_config(self.source.tuning.as_ref(), export.tuning.as_ref());
263        if let Some(t) = merged
264            && t.batch_size.is_some()
265            && t.batch_size_memory_mb.is_some()
266        {
267            anyhow::bail!(
268                "export '{}': effective tuning has both batch_size and batch_size_memory_mb (mutually exclusive)",
269                export.name
270            );
271        }
272        if let Some(et) = &export.tuning
273            && et.batch_size.is_some()
274            && et.batch_size_memory_mb.is_some()
275        {
276            anyhow::bail!(
277                "export '{}': tuning.batch_size and tuning.batch_size_memory_mb are mutually exclusive",
278                export.name
279            );
280        }
281
282        let set_count = [
283            export.query.is_some(),
284            export.query_file.is_some(),
285            export.table.is_some(),
286        ]
287        .iter()
288        .filter(|b| **b)
289        .count();
290        if set_count == 0 {
291            anyhow::bail!(
292                "export '{}': must specify exactly one of 'query', 'query_file', or 'table'",
293                export.name
294            );
295        }
296        if set_count > 1 {
297            anyhow::bail!(
298                "export '{}': specify exactly one of 'query', 'query_file', or 'table' (got {} set)",
299                export.name,
300                set_count
301            );
302        }
303        // SecOps: syntactic `query_file` checks must run at config-validate
304        // time so `rivet check` / `rivet doctor` catch them before any
305        // plan step. The same checks repeat (with a canonicalize-based
306        // symlink probe) in `ExportConfig::resolve_query` because the
307        // file may have been swapped between validation and read.
308        if let Some(file) = &export.query_file {
309            let p = std::path::Path::new(file);
310            if p.is_absolute() {
311                anyhow::bail!(
312                    "export '{}': query_file must be a relative path: '{}'",
313                    export.name,
314                    file
315                );
316            }
317            if p.components().any(|c| c == std::path::Component::ParentDir) {
318                anyhow::bail!(
319                    "export '{}': query_file path must not contain '..': '{}'",
320                    export.name,
321                    file
322                );
323            }
324        }
325        if export.destination.destination_type == DestinationType::S3 {
326            let ak = export.destination.access_key_env.is_some();
327            let sk = export.destination.secret_key_env.is_some();
328            if ak != sk {
329                anyhow::bail!(
330                    "export '{}': S3 requires both access_key_env and secret_key_env, or neither (use default AWS credential chain)",
331                    export.name
332                );
333            }
334        }
335
336        if export.destination.destination_type == DestinationType::Gcs
337            && export.destination.allow_anonymous
338            && export.destination.credentials_file.is_some()
339        {
340            anyhow::bail!(
341                "export '{}': GCS allow_anonymous cannot be used together with credentials_file",
342                export.name
343            );
344        }
345
346        if export.destination.destination_type == DestinationType::Azure {
347            let has_name = export.destination.account_name.is_some();
348            let has_key = export.destination.account_key_env.is_some();
349            let has_sas = export.destination.sas_token_env.is_some();
350            if export.destination.allow_anonymous {
351                if has_name || has_key || has_sas {
352                    anyhow::bail!(
353                        "export '{}': Azure allow_anonymous cannot be combined with account_name/account_key_env/sas_token_env",
354                        export.name
355                    );
356                }
357            } else if has_key && has_sas {
358                anyhow::bail!(
359                    "export '{}': Azure account_key_env and sas_token_env are mutually exclusive — pick one auth mode",
360                    export.name
361                );
362            } else if !has_name {
363                anyhow::bail!(
364                    "export '{}': Azure requires account_name (plus account_key_env or sas_token_env), or allow_anonymous: true for Azurite",
365                    export.name
366                );
367            } else if !has_key && !has_sas {
368                anyhow::bail!(
369                    "export '{}': Azure requires account_key_env or sas_token_env (or allow_anonymous: true for Azurite)",
370                    export.name
371                );
372            }
373        }
374
375        if let Some(cred_path) = &export.destination.credentials_file
376            && !std::path::Path::new(cred_path).exists()
377        {
378            anyhow::bail!(
379                "export '{}': credentials_file '{}' does not exist",
380                export.name,
381                cred_path
382            );
383        }
384
385        if let Some(ref size_str) = export.max_file_size {
386            parse_file_size(size_str).map_err(|_| {
387                anyhow::anyhow!(
388                    "export '{}': invalid max_file_size '{}'",
389                    export.name,
390                    size_str
391                )
392            })?;
393        }
394
395        if let Some(level) = export.compression_level {
396            match export.compression {
397                CompressionType::Zstd => {
398                    if !(1..=22).contains(&level) {
399                        anyhow::bail!(
400                            "export '{}': zstd compression_level must be 1..22, got {}",
401                            export.name,
402                            level
403                        );
404                    }
405                }
406                CompressionType::Gzip => {
407                    if level > 10 {
408                        anyhow::bail!(
409                            "export '{}': gzip compression_level must be 0..10, got {}",
410                            export.name,
411                            level
412                        );
413                    }
414                }
415                _ => {
416                    anyhow::bail!(
417                        "export '{}': compression_level is only supported for zstd and gzip",
418                        export.name
419                    );
420                }
421            }
422        }
423
424        match export.mode {
425            ExportMode::Incremental => {
426                if export.cursor_column.is_none() {
427                    anyhow::bail!(
428                        "export '{}': incremental mode requires cursor_column",
429                        export.name
430                    );
431                }
432                match export.incremental_cursor_mode {
433                    IncrementalCursorMode::Coalesce => {
434                        if export.cursor_fallback_column.is_none() {
435                            anyhow::bail!(
436                                "export '{}': incremental_cursor_mode: coalesce requires cursor_fallback_column",
437                                export.name
438                            );
439                        }
440                    }
441                    IncrementalCursorMode::SingleColumn => {
442                        if export.cursor_fallback_column.is_some() {
443                            anyhow::bail!(
444                                "export '{}': cursor_fallback_column is only valid with incremental_cursor_mode: coalesce",
445                                export.name
446                            );
447                        }
448                    }
449                }
450            }
451            ExportMode::Chunked => {
452                // `chunk_column` is mandatory unless the user used the `table:`
453                // shortcut on a Postgres source — in that case it is auto-resolved
454                // from the table's single-integer PK at plan-build time (see
455                // `crate::plan::build::resolve_chunk_column`).
456                if export.chunk_column.is_none() && export.table.is_none() {
457                    anyhow::bail!(
458                        "export '{}': chunked mode requires chunk_column \
459                         (or use `table:` shortcut on a Postgres source to auto-resolve from PK)",
460                        export.name
461                    );
462                }
463                // chunk_size == 0 would divide the range into zero-width
464                // slices and (before the saturating fix in generate_chunks)
465                // either infinite-loop or produce no progress. QA backlog
466                // Task 5.1.
467                if export.chunk_size == 0 {
468                    anyhow::bail!(
469                        "export '{}': chunked mode requires chunk_size >= 1 (got 0)",
470                        export.name
471                    );
472                }
473                // parallel == 0 means "spawn zero workers". Claiming tasks
474                // with no workers stalls the pipeline. QA backlog Task 5.1.
475                if export.parallel == 0 {
476                    anyhow::bail!(
477                        "export '{}': chunked mode requires parallel >= 1 (got 0)",
478                        export.name
479                    );
480                }
481                if let Some(0) = export.chunk_count {
482                    anyhow::bail!("export '{}': chunk_count must be >= 1", export.name);
483                }
484                if export.chunk_count.is_some() && export.chunk_dense {
485                    anyhow::bail!(
486                        "export '{}': chunk_count and chunk_dense are mutually exclusive",
487                        export.name
488                    );
489                }
490                if export.chunk_count.is_some() && export.chunk_by_days.is_some() {
491                    anyhow::bail!(
492                        "export '{}': chunk_count and chunk_by_days are mutually exclusive",
493                        export.name
494                    );
495                }
496            }
497            ExportMode::TimeWindow => {
498                if export.time_column.is_none() {
499                    anyhow::bail!(
500                        "export '{}': time_window mode requires time_column",
501                        export.name
502                    );
503                }
504                if export.days_window.is_none() {
505                    anyhow::bail!(
506                        "export '{}': time_window mode requires days_window",
507                        export.name
508                    );
509                }
510            }
511            ExportMode::Full => {}
512        }
513
514        if export.chunk_dense && export.mode != ExportMode::Chunked {
515            anyhow::bail!(
516                "export '{}': chunk_dense is only valid with mode: chunked",
517                export.name
518            );
519        }
520
521        if let Some(days) = export.chunk_by_days {
522            if export.mode != ExportMode::Chunked {
523                anyhow::bail!(
524                    "export '{}': chunk_by_days requires mode: chunked",
525                    export.name
526                );
527            }
528            if export.chunk_dense {
529                anyhow::bail!(
530                    "export '{}': chunk_by_days cannot be combined with chunk_dense",
531                    export.name
532                );
533            }
534            if days == 0 {
535                anyhow::bail!("export '{}': chunk_by_days must be at least 1", export.name);
536            }
537        }
538        Ok(())
539    }
540}
541
// Unit tests for this module live in the `tests` sub-module, which is
// compiled only under `cfg(test)`.
#[cfg(test)]
mod tests;