1pub mod cursor;
2mod destination;
3mod export;
4mod format;
5mod lints;
6mod notifications;
7pub mod resolve;
8pub mod schema;
9mod source;
10
11pub use cursor::IncrementalCursorMode;
12pub use destination::*;
13pub use export::*;
14pub use format::*;
15pub use notifications::*;
16#[allow(unused_imports)]
17pub(crate) use resolve::resolve_env_vars;
18pub use resolve::{parse_file_size, resolve_vars};
19pub use schema::generate_config_schema_pretty;
20pub use source::*;
21
22use schemars::JsonSchema;
23use serde::Deserialize;
24
// Top-level configuration: a `source` connection plus a list of `exports`.
//
// Built by `Config::load` / `Config::load_with_params` (read file, resolve
// variables) or directly by `Config::from_yaml` (parse + validate). Unknown
// top-level keys are rejected via `deny_unknown_fields`.
//
// NOTE: plain `//` comments are used on purpose. `///` doc comments would be
// picked up by the `JsonSchema` derive as `description` entries and change the
// generated JSON schema.
#[derive(Debug, Deserialize, JsonSchema, Clone)]
#[serde(deny_unknown_fields)]
pub struct Config {
    // Required. `Config::validate` enforces exactly one connection method
    // (url / url_env / url_file, or structured host/user/database fields).
    pub source: SourceConfig,
    // Required. `Config::validate` rejects an empty list and duplicate `name`s.
    pub exports: Vec<ExportConfig>,
    // Optional; an absent key deserializes to `None`.
    #[serde(default)]
    pub notifications: Option<NotificationsConfig>,
    // Defaults to `false` when omitted.
    // NOTE(review): presumably enables running exports concurrently — confirm in the runner.
    #[serde(default)]
    pub parallel_exports: bool,
    // Defaults to `false` when omitted.
    // NOTE(review): presumably runs exports in separate OS processes — confirm in the runner.
    #[serde(default)]
    pub parallel_export_processes: bool,
}
42
43impl Config {
44 pub fn load(path: &str) -> crate::error::Result<Self> {
45 Self::load_with_params(path, None)
46 }
47
48 pub fn load_with_params(
49 path: &str,
50 params: Option<&std::collections::HashMap<String, String>>,
51 ) -> crate::error::Result<Self> {
52 let contents = std::fs::read_to_string(path).map_err(|e| {
56 if e.kind() == std::io::ErrorKind::NotFound {
57 anyhow::anyhow!(
58 "config file '{}' not found.\n Hint: check the path, or run `rivet init` to generate one.",
59 path
60 )
61 } else {
62 anyhow::anyhow!("cannot read config file '{}': {}", path, e)
63 }
64 })?;
65 resolve::warn_unused_params(&contents, params);
70 let resolved = resolve_vars(&contents, params)?;
71 Self::from_yaml(&resolved).map_err(|e| anyhow::anyhow!("config file '{}': {:#}", path, e))
75 }
76
77 pub fn from_yaml(yaml: &str) -> crate::error::Result<Self> {
78 Self::check_misplaced_tuning_fields(yaml)?;
79 let config: Config = serde_yaml_ng::from_str(yaml).map_err(lints::enhance_parse_error)?;
80 config.validate()?;
81 Ok(config)
82 }
83
84 fn check_misplaced_tuning_fields(yaml: &str) -> crate::error::Result<()> {
89 const TUNING_FIELDS: &[&str] = &[
90 "batch_size",
91 "batch_size_memory_mb",
92 "throttle_ms",
93 "statement_timeout_s",
94 "max_retries",
95 "retry_backoff_ms",
96 "lock_timeout_s",
97 "memory_threshold_mb",
98 "profile",
99 ];
100
101 let root: serde_yaml_ng::Value = serde_yaml_ng::from_str(yaml)?;
102
103 if let Some(source) = root.get("source") {
104 let misplaced: Vec<&str> = TUNING_FIELDS
105 .iter()
106 .copied()
107 .filter(|&f| source.get(f).is_some())
108 .collect();
109 if !misplaced.is_empty() {
110 anyhow::bail!(
111 "source: field(s) [{}] belong under 'source.tuning:', not directly under 'source:'. \
112 Example:\n source:\n tuning:\n {}: <value>",
113 misplaced.join(", "),
114 misplaced[0],
115 );
116 }
117 }
118
119 if let Some(exports) = root.get("exports").and_then(|e| e.as_sequence()) {
120 for (i, export) in exports.iter().enumerate() {
121 let name = export
122 .get("name")
123 .and_then(|n| n.as_str())
124 .unwrap_or("<unnamed>");
125 let misplaced: Vec<&str> = TUNING_FIELDS
126 .iter()
127 .copied()
128 .filter(|&f| export.get(f).is_some())
129 .collect();
130 if !misplaced.is_empty() {
131 anyhow::bail!(
132 "export '{}' (index {}): field(s) [{}] belong under 'exports[].tuning:', \
133 not directly in the export. Example:\n exports:\n - name: {}\n tuning:\n {}: <value>",
134 name,
135 i,
136 misplaced.join(", "),
137 name,
138 misplaced[0],
139 );
140 }
141 }
142 }
143
144 Ok(())
145 }
146
147 fn validate(&self) -> crate::error::Result<()> {
148 if self.exports.is_empty() {
153 anyhow::bail!("exports: at least one export must be defined (got empty list)");
154 }
155
156 {
161 let mut seen: std::collections::HashSet<&str> =
162 std::collections::HashSet::with_capacity(self.exports.len());
163 for e in &self.exports {
164 if !seen.insert(e.name.as_str()) {
165 anyhow::bail!(
166 "exports: duplicate export name '{}' (each export must have a unique name; state is keyed by name)",
167 e.name
168 );
169 }
170 }
171 }
172
173 if let Some(t) = &self.source.tuning
174 && t.batch_size.is_some()
175 && t.batch_size_memory_mb.is_some()
176 {
177 anyhow::bail!("tuning: batch_size and batch_size_memory_mb are mutually exclusive");
178 }
179
180 for export in &self.exports {
181 let merged = crate::tuning::merge_tuning_config(
182 self.source.tuning.as_ref(),
183 export.tuning.as_ref(),
184 );
185 if let Some(t) = merged
186 && t.batch_size.is_some()
187 && t.batch_size_memory_mb.is_some()
188 {
189 anyhow::bail!(
190 "export '{}': effective tuning has both batch_size and batch_size_memory_mb (mutually exclusive)",
191 export.name
192 );
193 }
194 if let Some(et) = &export.tuning
195 && et.batch_size.is_some()
196 && et.batch_size_memory_mb.is_some()
197 {
198 anyhow::bail!(
199 "export '{}': tuning.batch_size and tuning.batch_size_memory_mb are mutually exclusive",
200 export.name
201 );
202 }
203 }
204
205 if !self.source.has_url_fields() && !self.source.has_structured_fields() {
206 anyhow::bail!(
211 "source: no connection method configured. Add one of:\n url_env: DATABASE_URL (URL from env var — recommended)\n url: 'postgresql://user:pass@host:5432/db' (inline — not recommended for committed configs)\n url_file: /etc/rivet/source.url (URL from file — rotation-friendly)\n host/user/database/... (structured fields under `source:`)"
212 );
213 }
214
215 if self.source.has_url_fields() {
216 let url_count = [
217 &self.source.url,
218 &self.source.url_env,
219 &self.source.url_file,
220 ]
221 .iter()
222 .filter(|u| u.is_some())
223 .count();
224 if url_count > 1 {
225 anyhow::bail!(
226 "source: specify exactly one of 'url', 'url_env', or 'url_file' (got {} set).\n Hint: pick one — `url_env` is recommended so credentials never enter the YAML.",
227 url_count
228 );
229 }
230 }
231
232 if self.source.has_url_fields() && self.source.has_structured_fields() {
233 anyhow::bail!(
234 "source: pick either URL-based config (url/url_env/url_file) OR structured fields (host/user/database/port/password_env), not both.\n Hint: remove whichever block you don't want; mixing the two is ambiguous."
235 );
236 }
237
238 if self.source.has_structured_fields() {
239 if self.source.host.is_none() {
240 anyhow::bail!(
241 "source: structured config is missing 'host'.\n Hint: add `host: localhost` (or your DB host) under `source:` in rivet.yaml.\n Or switch to URL-based config: `url_env: DATABASE_URL`."
242 );
243 }
244 if self.source.user.is_none() {
245 anyhow::bail!(
246 "source: structured config is missing 'user'.\n Hint: add `user: <username>` under `source:` in rivet.yaml."
247 );
248 }
249 if self.source.database.is_none() {
250 anyhow::bail!(
251 "source: structured config is missing 'database'.\n Hint: add `database: <dbname>` under `source:` in rivet.yaml."
252 );
253 }
254 if self.source.password.is_some() && self.source.password_env.is_some() {
255 anyhow::bail!(
256 "source: specify 'password' OR 'password_env', not both.\n Hint: prefer `password_env: DB_PASSWORD` so credentials never enter the YAML."
257 );
258 }
259 }
260
261 for export in &self.exports {
262 let set_count = [
263 export.query.is_some(),
264 export.query_file.is_some(),
265 export.table.is_some(),
266 ]
267 .iter()
268 .filter(|b| **b)
269 .count();
270 if set_count == 0 {
271 anyhow::bail!(
272 "export '{}': must specify exactly one of 'query', 'query_file', or 'table'",
273 export.name
274 );
275 }
276 if set_count > 1 {
277 anyhow::bail!(
278 "export '{}': specify exactly one of 'query', 'query_file', or 'table' (got {} set)",
279 export.name,
280 set_count
281 );
282 }
283 if let Some(file) = &export.query_file {
289 let p = std::path::Path::new(file);
290 if p.is_absolute() {
291 anyhow::bail!(
292 "export '{}': query_file must be a relative path: '{}'",
293 export.name,
294 file
295 );
296 }
297 if p.components().any(|c| c == std::path::Component::ParentDir) {
298 anyhow::bail!(
299 "export '{}': query_file path must not contain '..': '{}'",
300 export.name,
301 file
302 );
303 }
304 }
305 if export.destination.destination_type == DestinationType::S3 {
306 let ak = export.destination.access_key_env.is_some();
307 let sk = export.destination.secret_key_env.is_some();
308 if ak != sk {
309 anyhow::bail!(
310 "export '{}': S3 requires both access_key_env and secret_key_env, or neither (use default AWS credential chain)",
311 export.name
312 );
313 }
314 }
315
316 if export.destination.destination_type == DestinationType::Gcs
317 && export.destination.allow_anonymous
318 && export.destination.credentials_file.is_some()
319 {
320 anyhow::bail!(
321 "export '{}': GCS allow_anonymous cannot be used together with credentials_file",
322 export.name
323 );
324 }
325
326 if export.destination.destination_type == DestinationType::Azure {
327 let has_name = export.destination.account_name.is_some();
328 let has_key = export.destination.account_key_env.is_some();
329 let has_sas = export.destination.sas_token_env.is_some();
330 if export.destination.allow_anonymous {
331 if has_name || has_key || has_sas {
332 anyhow::bail!(
333 "export '{}': Azure allow_anonymous cannot be combined with account_name/account_key_env/sas_token_env",
334 export.name
335 );
336 }
337 } else if has_key && has_sas {
338 anyhow::bail!(
339 "export '{}': Azure account_key_env and sas_token_env are mutually exclusive — pick one auth mode",
340 export.name
341 );
342 } else if !has_name {
343 anyhow::bail!(
344 "export '{}': Azure requires account_name (plus account_key_env or sas_token_env), or allow_anonymous: true for Azurite",
345 export.name
346 );
347 } else if !has_key && !has_sas {
348 anyhow::bail!(
349 "export '{}': Azure requires account_key_env or sas_token_env (or allow_anonymous: true for Azurite)",
350 export.name
351 );
352 }
353 }
354
355 if let Some(cred_path) = &export.destination.credentials_file
356 && !std::path::Path::new(cred_path).exists()
357 {
358 anyhow::bail!(
359 "export '{}': credentials_file '{}' does not exist",
360 export.name,
361 cred_path
362 );
363 }
364
365 if let Some(ref size_str) = export.max_file_size {
366 parse_file_size(size_str).map_err(|_| {
367 anyhow::anyhow!(
368 "export '{}': invalid max_file_size '{}'",
369 export.name,
370 size_str
371 )
372 })?;
373 }
374
375 if let Some(level) = export.compression_level {
376 match export.compression {
377 CompressionType::Zstd => {
378 if !(1..=22).contains(&level) {
379 anyhow::bail!(
380 "export '{}': zstd compression_level must be 1..22, got {}",
381 export.name,
382 level
383 );
384 }
385 }
386 CompressionType::Gzip => {
387 if level > 10 {
388 anyhow::bail!(
389 "export '{}': gzip compression_level must be 0..10, got {}",
390 export.name,
391 level
392 );
393 }
394 }
395 _ => {
396 anyhow::bail!(
397 "export '{}': compression_level is only supported for zstd and gzip",
398 export.name
399 );
400 }
401 }
402 }
403
404 match export.mode {
405 ExportMode::Incremental => {
406 if export.cursor_column.is_none() {
407 anyhow::bail!(
408 "export '{}': incremental mode requires cursor_column",
409 export.name
410 );
411 }
412 match export.incremental_cursor_mode {
413 IncrementalCursorMode::Coalesce => {
414 if export.cursor_fallback_column.is_none() {
415 anyhow::bail!(
416 "export '{}': incremental_cursor_mode: coalesce requires cursor_fallback_column",
417 export.name
418 );
419 }
420 }
421 IncrementalCursorMode::SingleColumn => {
422 if export.cursor_fallback_column.is_some() {
423 anyhow::bail!(
424 "export '{}': cursor_fallback_column is only valid with incremental_cursor_mode: coalesce",
425 export.name
426 );
427 }
428 }
429 }
430 }
431 ExportMode::Chunked => {
432 if export.chunk_column.is_none() && export.table.is_none() {
437 anyhow::bail!(
438 "export '{}': chunked mode requires chunk_column \
439 (or use `table:` shortcut on a Postgres source to auto-resolve from PK)",
440 export.name
441 );
442 }
443 if export.chunk_size == 0 {
448 anyhow::bail!(
449 "export '{}': chunked mode requires chunk_size >= 1 (got 0)",
450 export.name
451 );
452 }
453 if export.parallel == 0 {
456 anyhow::bail!(
457 "export '{}': chunked mode requires parallel >= 1 (got 0)",
458 export.name
459 );
460 }
461 if let Some(0) = export.chunk_count {
462 anyhow::bail!("export '{}': chunk_count must be >= 1", export.name);
463 }
464 if export.chunk_count.is_some() && export.chunk_dense {
465 anyhow::bail!(
466 "export '{}': chunk_count and chunk_dense are mutually exclusive",
467 export.name
468 );
469 }
470 if export.chunk_count.is_some() && export.chunk_by_days.is_some() {
471 anyhow::bail!(
472 "export '{}': chunk_count and chunk_by_days are mutually exclusive",
473 export.name
474 );
475 }
476 }
477 ExportMode::TimeWindow => {
478 if export.time_column.is_none() {
479 anyhow::bail!(
480 "export '{}': time_window mode requires time_column",
481 export.name
482 );
483 }
484 if export.days_window.is_none() {
485 anyhow::bail!(
486 "export '{}': time_window mode requires days_window",
487 export.name
488 );
489 }
490 }
491 ExportMode::Full => {}
492 }
493
494 if export.chunk_dense && export.mode != ExportMode::Chunked {
495 anyhow::bail!(
496 "export '{}': chunk_dense is only valid with mode: chunked",
497 export.name
498 );
499 }
500
501 if let Some(days) = export.chunk_by_days {
502 if export.mode != ExportMode::Chunked {
503 anyhow::bail!(
504 "export '{}': chunk_by_days requires mode: chunked",
505 export.name
506 );
507 }
508 if export.chunk_dense {
509 anyhow::bail!(
510 "export '{}': chunk_by_days cannot be combined with chunk_dense",
511 export.name
512 );
513 }
514 if days == 0 {
515 anyhow::bail!("export '{}': chunk_by_days must be at least 1", export.name);
516 }
517 }
518 }
519 Ok(())
520 }
521}
522
523#[cfg(test)]
524mod tests;