rivet-cli 0.9.2

Rivet: PostgreSQL/MySQL/SQL Server → Parquet/CSV (local, S3, GCS, Azure). Crate name rivet-cli; binary rivet.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
pub mod cursor;
mod destination;
mod export;
mod format;
mod lints;
mod notifications;
pub mod resolve;
pub mod schema;
mod source;

pub use cursor::IncrementalCursorMode;
pub use destination::*;
pub use export::*;
pub use format::*;
pub use notifications::*;
#[allow(unused_imports)]
pub(crate) use resolve::resolve_env_vars;
pub use resolve::{parse_file_size, resolve_vars};
pub use schema::generate_config_schema_pretty;
pub use source::*;

use schemars::JsonSchema;
use serde::Deserialize;

/// Top-level Rivet configuration root.
///
/// Operators write this struct as YAML (typically `rivet.yaml`).  The
/// `JsonSchema` derive is the source of truth for the `schemas/rivet.schema.json`
/// artifact and the `rivet schema config` command's output (v0.7.3 P0).
//
// NOTE(review): the per-field notes below are deliberately plain `//`
// comments. `///` doc comments are read by the `JsonSchema` derive and
// emitted as schema `description`s, which would change the generated
// schema artifact. Field order is likewise reflected in the schema output,
// so do not reorder fields casually.
#[derive(Debug, Deserialize, JsonSchema, Clone)]
#[serde(deny_unknown_fields)]
pub struct Config {
    // Required. Database connection settings: either URL-based
    // (`url` / `url_env` / `url_file`) or structured (`host`, `user`,
    // `database`, ...). `Config::validate` enforces exactly one shape.
    pub source: SourceConfig,
    // Required. Export jobs. `Config::validate` rejects an empty list and
    // duplicate `name`s, because per-export state is keyed by name.
    pub exports: Vec<ExportConfig>,
    // Optional; `None` when the key is omitted.
    #[serde(default)]
    pub notifications: Option<NotificationsConfig>,
    // `false` when omitted. NOTE(review): presumably runs exports
    // concurrently within one process — confirm against the run loop.
    #[serde(default)]
    pub parallel_exports: bool,
    // `false` when omitted. NOTE(review): presumably runs each export in
    // its own process — confirm. `Config::validate` does not check whether
    // this may be combined with `parallel_exports`.
    #[serde(default)]
    pub parallel_export_processes: bool,
}

impl Config {
    pub fn load(path: &str) -> crate::error::Result<Self> {
        Self::load_with_params(path, None)
    }

    pub fn load_with_params(
        path: &str,
        params: Option<&std::collections::HashMap<String, String>>,
    ) -> crate::error::Result<Self> {
        // F11 (0.7.5 audit): raw `std::io::Error` lost the path on
        // not-found.  Wrap with the file path + a hint so the operator
        // can see *which* config the tool could not open.
        let contents = std::fs::read_to_string(path).map_err(|e| {
            if e.kind() == std::io::ErrorKind::NotFound {
                anyhow::anyhow!(
                    "config file '{}' not found.\n  Hint: check the path, or run `rivet init` to generate one.",
                    path
                )
            } else {
                anyhow::anyhow!("cannot read config file '{}': {}", path, e)
            }
        })?;
        // Warn about typo'd `--param` keys once per CLI invocation, using the
        // un-resolved YAML as the haystack so the placeholders are still there.
        // We pass the raw `contents` (not `resolved`) on purpose: after
        // resolution the placeholders are gone, and every key would look unused.
        resolve::warn_unused_params(&contents, params);
        let resolved = resolve_vars(&contents, params)?;
        // F12 (0.7.5 audit): YAML parse errors did not name the config
        // file.  When loading from disk we know the path — thread it
        // into the parse error.
        Self::from_yaml(&resolved).map_err(|e| anyhow::anyhow!("config file '{}': {:#}", path, e))
    }

    pub fn from_yaml(yaml: &str) -> crate::error::Result<Self> {
        Self::check_misplaced_tuning_fields(yaml)?;
        let config: Config = serde_yaml_ng::from_str(yaml).map_err(lints::enhance_parse_error)?;
        config.validate()?;
        Ok(config)
    }

    /// Detect tuning-related fields placed directly under `source:` or an
    /// `exports[]` entry instead of inside the `tuning:` sub-key. Without this
    /// check serde silently ignores unknown keys and the user gets unexpected
    /// defaults (e.g. batch_size=10 000 instead of the intended 1 000).
    fn check_misplaced_tuning_fields(yaml: &str) -> crate::error::Result<()> {
        const TUNING_FIELDS: &[&str] = &[
            "batch_size",
            "batch_size_memory_mb",
            "throttle_ms",
            "statement_timeout_s",
            "max_retries",
            "retry_backoff_ms",
            "lock_timeout_s",
            "memory_threshold_mb",
            "profile",
        ];

        let root: serde_yaml_ng::Value = serde_yaml_ng::from_str(yaml)?;

        if let Some(source) = root.get("source") {
            let misplaced: Vec<&str> = TUNING_FIELDS
                .iter()
                .copied()
                .filter(|&f| source.get(f).is_some())
                .collect();
            if !misplaced.is_empty() {
                anyhow::bail!(
                    "source: field(s) [{}] belong under 'source.tuning:', not directly under 'source:'. \
                     Example:\n  source:\n    tuning:\n      {}: <value>",
                    misplaced.join(", "),
                    misplaced[0],
                );
            }
        }

        if let Some(exports) = root.get("exports").and_then(|e| e.as_sequence()) {
            for (i, export) in exports.iter().enumerate() {
                let name = export
                    .get("name")
                    .and_then(|n| n.as_str())
                    .unwrap_or("<unnamed>");
                let misplaced: Vec<&str> = TUNING_FIELDS
                    .iter()
                    .copied()
                    .filter(|&f| export.get(f).is_some())
                    .collect();
                if !misplaced.is_empty() {
                    anyhow::bail!(
                        "export '{}' (index {}): field(s) [{}] belong under 'exports[].tuning:', \
                         not directly in the export. Example:\n  exports:\n    - name: {}\n      tuning:\n        {}: <value>",
                        name,
                        i,
                        misplaced.join(", "),
                        name,
                        misplaced[0],
                    );
                }
            }
        }

        Ok(())
    }

    fn validate(&self) -> crate::error::Result<()> {
        // An empty `exports:` list is almost always a typo (wrong config file,
        // dropped anchor, merged doc with the anchor section missing). Running
        // with zero exports is a silent no-op that looks like success in CI;
        // reject fast instead. See QA backlog Task 5.1.
        if self.exports.is_empty() {
            anyhow::bail!("exports: at least one export must be defined (got empty list)");
        }

        // Duplicate export names break state tracking: `export_state`,
        // `file_log`, and `chunk_run` are all keyed by `export_name`, so
        // two configs with the same name silently share cursor/file-log rows.
        // QA backlog Task 5.1.
        {
            let mut seen: std::collections::HashSet<&str> =
                std::collections::HashSet::with_capacity(self.exports.len());
            for e in &self.exports {
                if !seen.insert(e.name.as_str()) {
                    anyhow::bail!(
                        "exports: duplicate export name '{}' (each export must have a unique name; state is keyed by name)",
                        e.name
                    );
                }
            }
        }

        if let Some(t) = &self.source.tuning
            && t.batch_size.is_some()
            && t.batch_size_memory_mb.is_some()
        {
            anyhow::bail!("tuning: batch_size and batch_size_memory_mb are mutually exclusive");
        }

        for export in &self.exports {
            let merged = crate::tuning::merge_tuning_config(
                self.source.tuning.as_ref(),
                export.tuning.as_ref(),
            );
            if let Some(t) = merged
                && t.batch_size.is_some()
                && t.batch_size_memory_mb.is_some()
            {
                anyhow::bail!(
                    "export '{}': effective tuning has both batch_size and batch_size_memory_mb (mutually exclusive)",
                    export.name
                );
            }
            if let Some(et) = &export.tuning
                && et.batch_size.is_some()
                && et.batch_size_memory_mb.is_some()
            {
                anyhow::bail!(
                    "export '{}': tuning.batch_size and tuning.batch_size_memory_mb are mutually exclusive",
                    export.name
                );
            }
        }

        if !self.source.has_url_fields() && !self.source.has_structured_fields() {
            // First-run footgun: a config that forgot the source block
            // entirely.  Show the recommended path (`url_env`) up-front;
            // operators who actually want structured fields know to look
            // for them.
            anyhow::bail!(
                "source: no connection method configured. Add one of:\n  url_env: DATABASE_URL                          (URL from env var — recommended)\n  url: 'postgresql://user:pass@host:5432/db'      (inline — not recommended for committed configs)\n  url_file: /etc/rivet/source.url                 (URL from file — rotation-friendly)\n  host/user/database/...                          (structured fields under `source:`)"
            );
        }

        if self.source.has_url_fields() {
            let url_count = [
                &self.source.url,
                &self.source.url_env,
                &self.source.url_file,
            ]
            .iter()
            .filter(|u| u.is_some())
            .count();
            if url_count > 1 {
                anyhow::bail!(
                    "source: specify exactly one of 'url', 'url_env', or 'url_file' (got {} set).\n  Hint: pick one — `url_env` is recommended so credentials never enter the YAML.",
                    url_count
                );
            }
        }

        if self.source.has_url_fields() && self.source.has_structured_fields() {
            anyhow::bail!(
                "source: pick either URL-based config (url/url_env/url_file) OR structured fields (host/user/database/port/password_env), not both.\n  Hint: remove whichever block you don't want; mixing the two is ambiguous."
            );
        }

        if self.source.has_structured_fields() {
            if self.source.host.is_none() {
                anyhow::bail!(
                    "source: structured config is missing 'host'.\n  Hint: add `host: localhost` (or your DB host) under `source:` in rivet.yaml.\n  Or switch to URL-based config: `url_env: DATABASE_URL`."
                );
            }
            if self.source.user.is_none() {
                anyhow::bail!(
                    "source: structured config is missing 'user'.\n  Hint: add `user: <username>` under `source:` in rivet.yaml."
                );
            }
            if self.source.database.is_none() {
                anyhow::bail!(
                    "source: structured config is missing 'database'.\n  Hint: add `database: <dbname>` under `source:` in rivet.yaml."
                );
            }
            if self.source.password.is_some() && self.source.password_env.is_some() {
                anyhow::bail!(
                    "source: specify 'password' OR 'password_env', not both.\n  Hint: prefer `password_env: DB_PASSWORD` so credentials never enter the YAML."
                );
            }
        }

        for export in &self.exports {
            let set_count = [
                export.query.is_some(),
                export.query_file.is_some(),
                export.table.is_some(),
            ]
            .iter()
            .filter(|b| **b)
            .count();
            if set_count == 0 {
                anyhow::bail!(
                    "export '{}': must specify exactly one of 'query', 'query_file', or 'table'",
                    export.name
                );
            }
            if set_count > 1 {
                anyhow::bail!(
                    "export '{}': specify exactly one of 'query', 'query_file', or 'table' (got {} set)",
                    export.name,
                    set_count
                );
            }
            // SecOps: syntactic `query_file` checks must run at config-validate
            // time so `rivet check` / `rivet doctor` catch them before any
            // plan step. The same checks repeat (with a canonicalize-based
            // symlink probe) in `ExportConfig::resolve_query` because the
            // file may have been swapped between validation and read.
            if let Some(file) = &export.query_file {
                let p = std::path::Path::new(file);
                if p.is_absolute() {
                    anyhow::bail!(
                        "export '{}': query_file must be a relative path: '{}'",
                        export.name,
                        file
                    );
                }
                if p.components().any(|c| c == std::path::Component::ParentDir) {
                    anyhow::bail!(
                        "export '{}': query_file path must not contain '..': '{}'",
                        export.name,
                        file
                    );
                }
            }
            if export.destination.destination_type == DestinationType::S3 {
                let ak = export.destination.access_key_env.is_some();
                let sk = export.destination.secret_key_env.is_some();
                if ak != sk {
                    anyhow::bail!(
                        "export '{}': S3 requires both access_key_env and secret_key_env, or neither (use default AWS credential chain)",
                        export.name
                    );
                }
            }

            if export.destination.destination_type == DestinationType::Gcs
                && export.destination.allow_anonymous
                && export.destination.credentials_file.is_some()
            {
                anyhow::bail!(
                    "export '{}': GCS allow_anonymous cannot be used together with credentials_file",
                    export.name
                );
            }

            if export.destination.destination_type == DestinationType::Azure {
                let has_name = export.destination.account_name.is_some();
                let has_key = export.destination.account_key_env.is_some();
                let has_sas = export.destination.sas_token_env.is_some();
                if export.destination.allow_anonymous {
                    if has_name || has_key || has_sas {
                        anyhow::bail!(
                            "export '{}': Azure allow_anonymous cannot be combined with account_name/account_key_env/sas_token_env",
                            export.name
                        );
                    }
                } else if has_key && has_sas {
                    anyhow::bail!(
                        "export '{}': Azure account_key_env and sas_token_env are mutually exclusive — pick one auth mode",
                        export.name
                    );
                } else if !has_name {
                    anyhow::bail!(
                        "export '{}': Azure requires account_name (plus account_key_env or sas_token_env), or allow_anonymous: true for Azurite",
                        export.name
                    );
                } else if !has_key && !has_sas {
                    anyhow::bail!(
                        "export '{}': Azure requires account_key_env or sas_token_env (or allow_anonymous: true for Azurite)",
                        export.name
                    );
                }
            }

            if let Some(cred_path) = &export.destination.credentials_file
                && !std::path::Path::new(cred_path).exists()
            {
                anyhow::bail!(
                    "export '{}': credentials_file '{}' does not exist",
                    export.name,
                    cred_path
                );
            }

            if let Some(ref size_str) = export.max_file_size {
                parse_file_size(size_str).map_err(|_| {
                    anyhow::anyhow!(
                        "export '{}': invalid max_file_size '{}'",
                        export.name,
                        size_str
                    )
                })?;
            }

            if let Some(level) = export.compression_level {
                match export.compression {
                    CompressionType::Zstd => {
                        if !(1..=22).contains(&level) {
                            anyhow::bail!(
                                "export '{}': zstd compression_level must be 1..22, got {}",
                                export.name,
                                level
                            );
                        }
                    }
                    CompressionType::Gzip => {
                        if level > 10 {
                            anyhow::bail!(
                                "export '{}': gzip compression_level must be 0..10, got {}",
                                export.name,
                                level
                            );
                        }
                    }
                    _ => {
                        anyhow::bail!(
                            "export '{}': compression_level is only supported for zstd and gzip",
                            export.name
                        );
                    }
                }
            }

            match export.mode {
                ExportMode::Incremental => {
                    if export.cursor_column.is_none() {
                        anyhow::bail!(
                            "export '{}': incremental mode requires cursor_column",
                            export.name
                        );
                    }
                    match export.incremental_cursor_mode {
                        IncrementalCursorMode::Coalesce => {
                            if export.cursor_fallback_column.is_none() {
                                anyhow::bail!(
                                    "export '{}': incremental_cursor_mode: coalesce requires cursor_fallback_column",
                                    export.name
                                );
                            }
                        }
                        IncrementalCursorMode::SingleColumn => {
                            if export.cursor_fallback_column.is_some() {
                                anyhow::bail!(
                                    "export '{}': cursor_fallback_column is only valid with incremental_cursor_mode: coalesce",
                                    export.name
                                );
                            }
                        }
                    }
                }
                ExportMode::Chunked => {
                    // `chunk_column` is mandatory unless the user used the `table:`
                    // shortcut on a Postgres source — in that case it is auto-resolved
                    // from the table's single-integer PK at plan-build time (see
                    // `crate::plan::build::resolve_chunk_column`).
                    if export.chunk_column.is_none() && export.table.is_none() {
                        anyhow::bail!(
                            "export '{}': chunked mode requires chunk_column \
                             (or use `table:` shortcut on a Postgres source to auto-resolve from PK)",
                            export.name
                        );
                    }
                    // chunk_size == 0 would divide the range into zero-width
                    // slices and (before the saturating fix in generate_chunks)
                    // either infinite-loop or produce no progress. QA backlog
                    // Task 5.1.
                    if export.chunk_size == 0 {
                        anyhow::bail!(
                            "export '{}': chunked mode requires chunk_size >= 1 (got 0)",
                            export.name
                        );
                    }
                    // parallel == 0 means "spawn zero workers". Claiming tasks
                    // with no workers stalls the pipeline. QA backlog Task 5.1.
                    if export.parallel == 0 {
                        anyhow::bail!(
                            "export '{}': chunked mode requires parallel >= 1 (got 0)",
                            export.name
                        );
                    }
                    if let Some(0) = export.chunk_count {
                        anyhow::bail!("export '{}': chunk_count must be >= 1", export.name);
                    }
                    if export.chunk_count.is_some() && export.chunk_dense {
                        anyhow::bail!(
                            "export '{}': chunk_count and chunk_dense are mutually exclusive",
                            export.name
                        );
                    }
                    if export.chunk_count.is_some() && export.chunk_by_days.is_some() {
                        anyhow::bail!(
                            "export '{}': chunk_count and chunk_by_days are mutually exclusive",
                            export.name
                        );
                    }
                }
                ExportMode::TimeWindow => {
                    if export.time_column.is_none() {
                        anyhow::bail!(
                            "export '{}': time_window mode requires time_column",
                            export.name
                        );
                    }
                    if export.days_window.is_none() {
                        anyhow::bail!(
                            "export '{}': time_window mode requires days_window",
                            export.name
                        );
                    }
                }
                ExportMode::Full => {}
            }

            if export.chunk_dense && export.mode != ExportMode::Chunked {
                anyhow::bail!(
                    "export '{}': chunk_dense is only valid with mode: chunked",
                    export.name
                );
            }

            if let Some(days) = export.chunk_by_days {
                if export.mode != ExportMode::Chunked {
                    anyhow::bail!(
                        "export '{}': chunk_by_days requires mode: chunked",
                        export.name
                    );
                }
                if export.chunk_dense {
                    anyhow::bail!(
                        "export '{}': chunk_by_days cannot be combined with chunk_dense",
                        export.name
                    );
                }
                if days == 0 {
                    anyhow::bail!("export '{}': chunk_by_days must be at least 1", export.name);
                }
            }
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests;