1pub mod cursor;
2mod destination;
3mod export;
4mod format;
5mod lints;
6mod notifications;
7pub mod resolve;
8pub mod schema;
9mod source;
10
11pub use cursor::IncrementalCursorMode;
12pub use destination::*;
13pub use export::*;
14pub use format::*;
15pub use notifications::*;
16#[allow(unused_imports)]
17pub(crate) use resolve::resolve_env_vars;
18pub use resolve::{parse_file_size, resolve_vars};
19pub use schema::generate_config_schema_pretty;
20pub use source::*;
21
22use schemars::JsonSchema;
23use serde::Deserialize;
24
/// Top-level configuration: a single `source` connection plus the list of `exports`.
///
/// Unknown keys are rejected. Values produced by [`Config::load`],
/// [`Config::load_with_params`] and [`Config::from_yaml`] have already passed
/// validation.
#[derive(Debug, Deserialize, JsonSchema, Clone)]
#[serde(deny_unknown_fields)]
pub struct Config {
    /// Connection settings (URL-based or structured fields) and optional shared `tuning`.
    pub source: SourceConfig,
    /// Export definitions; must be non-empty and every `name` must be unique.
    pub exports: Vec<ExportConfig>,
    /// Optional notification settings; `None` when omitted.
    #[serde(default)]
    pub notifications: Option<NotificationsConfig>,
    /// Opt-in flag; defaults to `false` when omitted.
    #[serde(default)]
    pub parallel_exports: bool,
    /// Opt-in flag; defaults to `false` when omitted.
    #[serde(default)]
    pub parallel_export_processes: bool,
}
42
43impl Config {
44 pub fn load(path: &str) -> crate::error::Result<Self> {
45 Self::load_with_params(path, None)
46 }
47
48 pub fn load_with_params(
49 path: &str,
50 params: Option<&std::collections::HashMap<String, String>>,
51 ) -> crate::error::Result<Self> {
52 let contents = std::fs::read_to_string(path).map_err(|e| {
56 if e.kind() == std::io::ErrorKind::NotFound {
57 anyhow::anyhow!(
58 "config file '{}' not found.\n Hint: check the path, or run `rivet init` to generate one.",
59 path
60 )
61 } else {
62 anyhow::anyhow!("cannot read config file '{}': {}", path, e)
63 }
64 })?;
65 resolve::warn_unused_params(&contents, params);
70 let resolved = resolve_vars(&contents, params)?;
71 Self::from_yaml(&resolved).map_err(|e| anyhow::anyhow!("config file '{}': {:#}", path, e))
75 }
76
77 pub fn from_yaml(yaml: &str) -> crate::error::Result<Self> {
78 Self::check_misplaced_tuning_fields(yaml)?;
79 let config: Config = serde_yaml_ng::from_str(yaml).map_err(lints::enhance_parse_error)?;
80 config.validate()?;
81 Ok(config)
82 }
83
84 fn check_misplaced_tuning_fields(yaml: &str) -> crate::error::Result<()> {
89 const TUNING_FIELDS: &[&str] = &[
90 "batch_size",
91 "batch_size_memory_mb",
92 "throttle_ms",
93 "statement_timeout_s",
94 "max_retries",
95 "retry_backoff_ms",
96 "lock_timeout_s",
97 "memory_threshold_mb",
98 "profile",
99 ];
100
101 let root: serde_yaml_ng::Value = serde_yaml_ng::from_str(yaml)?;
102
103 if let Some(source) = root.get("source") {
104 let misplaced: Vec<&str> = TUNING_FIELDS
105 .iter()
106 .copied()
107 .filter(|&f| source.get(f).is_some())
108 .collect();
109 if !misplaced.is_empty() {
110 anyhow::bail!(
111 "source: field(s) [{}] belong under 'source.tuning:', not directly under 'source:'. \
112 Example:\n source:\n tuning:\n {}: <value>",
113 misplaced.join(", "),
114 misplaced[0],
115 );
116 }
117 }
118
119 if let Some(exports) = root.get("exports").and_then(|e| e.as_sequence()) {
120 for (i, export) in exports.iter().enumerate() {
121 let name = export
122 .get("name")
123 .and_then(|n| n.as_str())
124 .unwrap_or("<unnamed>");
125 let misplaced: Vec<&str> = TUNING_FIELDS
126 .iter()
127 .copied()
128 .filter(|&f| export.get(f).is_some())
129 .collect();
130 if !misplaced.is_empty() {
131 anyhow::bail!(
132 "export '{}' (index {}): field(s) [{}] belong under 'exports[].tuning:', \
133 not directly in the export. Example:\n exports:\n - name: {}\n tuning:\n {}: <value>",
134 name,
135 i,
136 misplaced.join(", "),
137 name,
138 misplaced[0],
139 );
140 }
141 }
142 }
143
144 Ok(())
145 }
146
147 fn validate(&self) -> crate::error::Result<()> {
154 self.validate_exports_list()?;
155 self.validate_source_connection()?;
156 for export in &self.exports {
157 self.validate_export(export)?;
158 }
159 Ok(())
160 }
161
162 fn validate_exports_list(&self) -> crate::error::Result<()> {
164 if self.exports.is_empty() {
169 anyhow::bail!("exports: at least one export must be defined (got empty list)");
170 }
171
172 let mut seen: std::collections::HashSet<&str> =
177 std::collections::HashSet::with_capacity(self.exports.len());
178 for e in &self.exports {
179 if !seen.insert(e.name.as_str()) {
180 anyhow::bail!(
181 "exports: duplicate export name '{}' (each export must have a unique name; state is keyed by name)",
182 e.name
183 );
184 }
185 }
186 Ok(())
187 }
188
189 fn validate_source_connection(&self) -> crate::error::Result<()> {
192 if let Some(t) = &self.source.tuning
193 && t.batch_size.is_some()
194 && t.batch_size_memory_mb.is_some()
195 {
196 anyhow::bail!("tuning: batch_size and batch_size_memory_mb are mutually exclusive");
197 }
198
199 if !self.source.has_url_fields() && !self.source.has_structured_fields() {
200 anyhow::bail!(
205 "source: no connection method configured. Add one of:\n url_env: DATABASE_URL (URL from env var — recommended)\n url: 'postgresql://user:pass@host:5432/db' (inline — not recommended for committed configs)\n url_file: /etc/rivet/source.url (URL from file — rotation-friendly)\n host/user/database/... (structured fields under `source:`)"
206 );
207 }
208
209 if self.source.has_url_fields() {
210 let url_count = [
211 &self.source.url,
212 &self.source.url_env,
213 &self.source.url_file,
214 ]
215 .iter()
216 .filter(|u| u.is_some())
217 .count();
218 if url_count > 1 {
219 anyhow::bail!(
220 "source: specify exactly one of 'url', 'url_env', or 'url_file' (got {} set).\n Hint: pick one — `url_env` is recommended so credentials never enter the YAML.",
221 url_count
222 );
223 }
224 }
225
226 if self.source.has_url_fields() && self.source.has_structured_fields() {
227 anyhow::bail!(
228 "source: pick either URL-based config (url/url_env/url_file) OR structured fields (host/user/database/port/password_env), not both.\n Hint: remove whichever block you don't want; mixing the two is ambiguous."
229 );
230 }
231
232 if self.source.has_structured_fields() {
233 if self.source.host.is_none() {
234 anyhow::bail!(
235 "source: structured config is missing 'host'.\n Hint: add `host: localhost` (or your DB host) under `source:` in rivet.yaml.\n Or switch to URL-based config: `url_env: DATABASE_URL`."
236 );
237 }
238 if self.source.user.is_none() {
239 anyhow::bail!(
240 "source: structured config is missing 'user'.\n Hint: add `user: <username>` under `source:` in rivet.yaml."
241 );
242 }
243 if self.source.database.is_none() {
244 anyhow::bail!(
245 "source: structured config is missing 'database'.\n Hint: add `database: <dbname>` under `source:` in rivet.yaml."
246 );
247 }
248 if self.source.password.is_some() && self.source.password_env.is_some() {
249 anyhow::bail!(
250 "source: specify 'password' OR 'password_env', not both.\n Hint: prefer `password_env: DB_PASSWORD` so credentials never enter the YAML."
251 );
252 }
253 }
254 Ok(())
255 }
256
257 fn validate_export(&self, export: &ExportConfig) -> crate::error::Result<()> {
261 let merged =
262 crate::tuning::merge_tuning_config(self.source.tuning.as_ref(), export.tuning.as_ref());
263 if let Some(t) = merged
264 && t.batch_size.is_some()
265 && t.batch_size_memory_mb.is_some()
266 {
267 anyhow::bail!(
268 "export '{}': effective tuning has both batch_size and batch_size_memory_mb (mutually exclusive)",
269 export.name
270 );
271 }
272 if let Some(et) = &export.tuning
273 && et.batch_size.is_some()
274 && et.batch_size_memory_mb.is_some()
275 {
276 anyhow::bail!(
277 "export '{}': tuning.batch_size and tuning.batch_size_memory_mb are mutually exclusive",
278 export.name
279 );
280 }
281
282 let set_count = [
283 export.query.is_some(),
284 export.query_file.is_some(),
285 export.table.is_some(),
286 ]
287 .iter()
288 .filter(|b| **b)
289 .count();
290 if set_count == 0 {
291 anyhow::bail!(
292 "export '{}': must specify exactly one of 'query', 'query_file', or 'table'",
293 export.name
294 );
295 }
296 if set_count > 1 {
297 anyhow::bail!(
298 "export '{}': specify exactly one of 'query', 'query_file', or 'table' (got {} set)",
299 export.name,
300 set_count
301 );
302 }
303 if let Some(file) = &export.query_file {
309 let p = std::path::Path::new(file);
310 if p.is_absolute() {
311 anyhow::bail!(
312 "export '{}': query_file must be a relative path: '{}'",
313 export.name,
314 file
315 );
316 }
317 if p.components().any(|c| c == std::path::Component::ParentDir) {
318 anyhow::bail!(
319 "export '{}': query_file path must not contain '..': '{}'",
320 export.name,
321 file
322 );
323 }
324 }
325 if export.destination.destination_type == DestinationType::S3 {
326 let ak = export.destination.access_key_env.is_some();
327 let sk = export.destination.secret_key_env.is_some();
328 if ak != sk {
329 anyhow::bail!(
330 "export '{}': S3 requires both access_key_env and secret_key_env, or neither (use default AWS credential chain)",
331 export.name
332 );
333 }
334 }
335
336 if export.destination.destination_type == DestinationType::Gcs
337 && export.destination.allow_anonymous
338 && export.destination.credentials_file.is_some()
339 {
340 anyhow::bail!(
341 "export '{}': GCS allow_anonymous cannot be used together with credentials_file",
342 export.name
343 );
344 }
345
346 if export.destination.destination_type == DestinationType::Azure {
347 let has_name = export.destination.account_name.is_some();
348 let has_key = export.destination.account_key_env.is_some();
349 let has_sas = export.destination.sas_token_env.is_some();
350 if export.destination.allow_anonymous {
351 if has_name || has_key || has_sas {
352 anyhow::bail!(
353 "export '{}': Azure allow_anonymous cannot be combined with account_name/account_key_env/sas_token_env",
354 export.name
355 );
356 }
357 } else if has_key && has_sas {
358 anyhow::bail!(
359 "export '{}': Azure account_key_env and sas_token_env are mutually exclusive — pick one auth mode",
360 export.name
361 );
362 } else if !has_name {
363 anyhow::bail!(
364 "export '{}': Azure requires account_name (plus account_key_env or sas_token_env), or allow_anonymous: true for Azurite",
365 export.name
366 );
367 } else if !has_key && !has_sas {
368 anyhow::bail!(
369 "export '{}': Azure requires account_key_env or sas_token_env (or allow_anonymous: true for Azurite)",
370 export.name
371 );
372 }
373 }
374
375 if let Some(cred_path) = &export.destination.credentials_file
376 && !std::path::Path::new(cred_path).exists()
377 {
378 anyhow::bail!(
379 "export '{}': credentials_file '{}' does not exist",
380 export.name,
381 cred_path
382 );
383 }
384
385 if let Some(ref size_str) = export.max_file_size {
386 parse_file_size(size_str).map_err(|_| {
387 anyhow::anyhow!(
388 "export '{}': invalid max_file_size '{}'",
389 export.name,
390 size_str
391 )
392 })?;
393 }
394
395 if let Some(level) = export.compression_level {
396 match export.compression {
397 CompressionType::Zstd => {
398 if !(1..=22).contains(&level) {
399 anyhow::bail!(
400 "export '{}': zstd compression_level must be 1..22, got {}",
401 export.name,
402 level
403 );
404 }
405 }
406 CompressionType::Gzip => {
407 if level > 10 {
408 anyhow::bail!(
409 "export '{}': gzip compression_level must be 0..10, got {}",
410 export.name,
411 level
412 );
413 }
414 }
415 _ => {
416 anyhow::bail!(
417 "export '{}': compression_level is only supported for zstd and gzip",
418 export.name
419 );
420 }
421 }
422 }
423
424 match export.mode {
425 ExportMode::Incremental => {
426 if export.cursor_column.is_none() {
427 anyhow::bail!(
428 "export '{}': incremental mode requires cursor_column",
429 export.name
430 );
431 }
432 match export.incremental_cursor_mode {
433 IncrementalCursorMode::Coalesce => {
434 if export.cursor_fallback_column.is_none() {
435 anyhow::bail!(
436 "export '{}': incremental_cursor_mode: coalesce requires cursor_fallback_column",
437 export.name
438 );
439 }
440 }
441 IncrementalCursorMode::SingleColumn => {
442 if export.cursor_fallback_column.is_some() {
443 anyhow::bail!(
444 "export '{}': cursor_fallback_column is only valid with incremental_cursor_mode: coalesce",
445 export.name
446 );
447 }
448 }
449 }
450 }
451 ExportMode::Chunked => {
452 if export.chunk_column.is_none() && export.table.is_none() {
457 anyhow::bail!(
458 "export '{}': chunked mode requires chunk_column \
459 (or use `table:` shortcut on a Postgres source to auto-resolve from PK)",
460 export.name
461 );
462 }
463 if export.chunk_size == 0 {
468 anyhow::bail!(
469 "export '{}': chunked mode requires chunk_size >= 1 (got 0)",
470 export.name
471 );
472 }
473 if export.parallel == 0 {
476 anyhow::bail!(
477 "export '{}': chunked mode requires parallel >= 1 (got 0)",
478 export.name
479 );
480 }
481 if let Some(0) = export.chunk_count {
482 anyhow::bail!("export '{}': chunk_count must be >= 1", export.name);
483 }
484 if export.chunk_count.is_some() && export.chunk_dense {
485 anyhow::bail!(
486 "export '{}': chunk_count and chunk_dense are mutually exclusive",
487 export.name
488 );
489 }
490 if export.chunk_count.is_some() && export.chunk_by_days.is_some() {
491 anyhow::bail!(
492 "export '{}': chunk_count and chunk_by_days are mutually exclusive",
493 export.name
494 );
495 }
496 }
497 ExportMode::TimeWindow => {
498 if export.time_column.is_none() {
499 anyhow::bail!(
500 "export '{}': time_window mode requires time_column",
501 export.name
502 );
503 }
504 if export.days_window.is_none() {
505 anyhow::bail!(
506 "export '{}': time_window mode requires days_window",
507 export.name
508 );
509 }
510 }
511 ExportMode::Full => {}
512 }
513
514 if export.chunk_dense && export.mode != ExportMode::Chunked {
515 anyhow::bail!(
516 "export '{}': chunk_dense is only valid with mode: chunked",
517 export.name
518 );
519 }
520
521 if let Some(days) = export.chunk_by_days {
522 if export.mode != ExportMode::Chunked {
523 anyhow::bail!(
524 "export '{}': chunk_by_days requires mode: chunked",
525 export.name
526 );
527 }
528 if export.chunk_dense {
529 anyhow::bail!(
530 "export '{}': chunk_by_days cannot be combined with chunk_dense",
531 export.name
532 );
533 }
534 if days == 0 {
535 anyhow::bail!("export '{}': chunk_by_days must be at least 1", export.name);
536 }
537 }
538 Ok(())
539 }
540}
541
542#[cfg(test)]
543mod tests;