1use serde::{Deserialize, Serialize};
31use std::path::{Path, PathBuf};
32
33#[derive(Debug, Clone, Serialize, Deserialize, Default)]
35#[serde(default)]
36pub struct Config {
37 pub query_file: Option<PathBuf>,
39
40 pub server: ServerConfig,
42
43 pub simulation: SimulationConfig,
45
46 pub kafka: Option<KafkaConfig>,
48
49 pub http_webhook: Option<HttpWebhookConfig>,
51
52 pub logging: LoggingConfig,
54
55 pub processing: ProcessingConfig,
57
58 pub tls: Option<TlsConfig>,
60
61 pub auth: Option<AuthConfig>,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
67#[serde(default)]
68pub struct ServerConfig {
69 pub port: u16,
71
72 pub bind: String,
74
75 pub metrics_enabled: bool,
77
78 pub metrics_port: u16,
80
81 pub workdir: Option<PathBuf>,
83}
84
85impl Default for ServerConfig {
86 fn default() -> Self {
87 Self {
88 port: 9000,
89 bind: "127.0.0.1".to_string(),
90 metrics_enabled: false,
91 metrics_port: 9090,
92 workdir: None,
93 }
94 }
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize, Default)]
99#[serde(default)]
100pub struct SimulationConfig {
101 pub timed: bool,
103
104 pub streaming: bool,
106
107 pub verbose: bool,
109
110 pub events_file: Option<PathBuf>,
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
116#[serde(default)]
117pub struct KafkaConfig {
118 pub bootstrap_servers: String,
120
121 pub consumer_group: Option<String>,
123
124 pub input_topic: Option<String>,
126
127 pub output_topic: Option<String>,
129
130 pub auto_commit: bool,
132
133 pub auto_offset_reset: String,
135}
136
137impl Default for KafkaConfig {
138 fn default() -> Self {
139 Self {
140 bootstrap_servers: "localhost:9092".to_string(),
141 consumer_group: None,
142 input_topic: None,
143 output_topic: None,
144 auto_commit: true,
145 auto_offset_reset: "latest".to_string(),
146 }
147 }
148}
149
150#[derive(Debug, Clone, Serialize, Deserialize)]
152#[serde(default)]
153pub struct HttpWebhookConfig {
154 pub enabled: bool,
156
157 pub port: u16,
159
160 pub bind: String,
162
163 pub api_key: Option<String>,
165
166 pub rate_limit: u32,
168
169 pub max_batch_size: usize,
171}
172
173impl Default for HttpWebhookConfig {
174 fn default() -> Self {
175 Self {
176 enabled: false,
177 port: 8080,
178 bind: "0.0.0.0".to_string(),
179 api_key: None,
180 rate_limit: 0,
181 max_batch_size: 1000,
182 }
183 }
184}
185
186#[derive(Debug, Clone, Serialize, Deserialize)]
188#[serde(default)]
189pub struct LoggingConfig {
190 pub level: String,
192
193 pub format: String,
195
196 pub timestamps: bool,
198}
199
200impl Default for LoggingConfig {
201 fn default() -> Self {
202 Self {
203 level: "info".to_string(),
204 format: "text".to_string(),
205 timestamps: true,
206 }
207 }
208}
209
210#[derive(Debug, Clone, Serialize, Deserialize, Default)]
212#[serde(default)]
213pub struct ProcessingConfig {
214 pub workers: Option<usize>,
216
217 pub partition_by: Option<String>,
219}
220
221#[derive(Debug, Clone, Serialize, Deserialize)]
223pub struct TlsConfig {
224 pub cert_file: PathBuf,
226
227 pub key_file: PathBuf,
229}
230
231#[derive(Debug, Clone, Serialize, Deserialize)]
233pub struct AuthConfig {
234 pub api_key: Option<String>,
236}
237
238impl Config {
239 pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, ConfigError> {
241 let path = path.as_ref();
242 let content = std::fs::read_to_string(path)
243 .map_err(|e| ConfigError::IoError(path.to_path_buf(), e.to_string()))?;
244
245 let extension = path
246 .extension()
247 .and_then(|e| e.to_str())
248 .unwrap_or("")
249 .to_lowercase();
250
251 match extension.as_str() {
252 "yaml" | "yml" => Self::from_yaml(&content),
253 "toml" => Self::from_toml(&content),
254 _ => {
255 Self::from_yaml(&content).or_else(|_| Self::from_toml(&content))
257 }
258 }
259 }
260
261 pub fn from_yaml(content: &str) -> Result<Self, ConfigError> {
263 serde_yaml::from_str(content).map_err(|e| ConfigError::ParseError(e.to_string()))
264 }
265
266 pub fn from_toml(content: &str) -> Result<Self, ConfigError> {
268 toml::from_str(content).map_err(|e| ConfigError::ParseError(e.to_string()))
269 }
270
271 pub fn merge(&mut self, other: Self) {
273 if other.query_file.is_some() {
274 self.query_file = other.query_file;
275 }
276
277 if other.server.port != ServerConfig::default().port {
279 self.server.port = other.server.port;
280 }
281 if other.server.bind != ServerConfig::default().bind {
282 self.server.bind = other.server.bind;
283 }
284 if other.server.metrics_enabled {
285 self.server.metrics_enabled = true;
286 }
287 if other.server.metrics_port != ServerConfig::default().metrics_port {
288 self.server.metrics_port = other.server.metrics_port;
289 }
290 if other.server.workdir.is_some() {
291 self.server.workdir = other.server.workdir;
292 }
293
294 if other.processing.workers.is_some() {
296 self.processing.workers = other.processing.workers;
297 }
298 if other.processing.partition_by.is_some() {
299 self.processing.partition_by = other.processing.partition_by;
300 }
301
302 if other.kafka.is_some() {
304 self.kafka = other.kafka;
305 }
306 if other.http_webhook.is_some() {
307 self.http_webhook = other.http_webhook;
308 }
309 if other.tls.is_some() {
310 self.tls = other.tls;
311 }
312 if other.auth.is_some() {
313 self.auth = other.auth;
314 }
315 }
316
317 pub fn example() -> Self {
319 Self {
320 query_file: Some(PathBuf::from("/app/queries/queries.vql")),
321 server: ServerConfig {
322 port: 9000,
323 bind: "0.0.0.0".to_string(),
324 metrics_enabled: true,
325 metrics_port: 9090,
326 workdir: Some(PathBuf::from("/app")),
327 },
328 simulation: SimulationConfig::default(),
329 kafka: Some(KafkaConfig {
330 bootstrap_servers: "kafka:9092".to_string(),
331 consumer_group: Some("varpulis-consumer".to_string()),
332 input_topic: Some("events".to_string()),
333 output_topic: Some("alerts".to_string()),
334 ..Default::default()
335 }),
336 http_webhook: Some(HttpWebhookConfig {
337 enabled: true,
338 port: 8080,
339 bind: "0.0.0.0".to_string(),
340 api_key: Some("your-api-key-here".to_string()),
341 rate_limit: 1000,
342 max_batch_size: 100,
343 }),
344 logging: LoggingConfig {
345 level: "info".to_string(),
346 format: "json".to_string(),
347 timestamps: true,
348 },
349 processing: ProcessingConfig {
350 workers: Some(4),
351 partition_by: Some("source_id".to_string()),
352 },
353 tls: None,
354 auth: Some(AuthConfig {
355 api_key: Some("your-websocket-api-key".to_string()),
356 }),
357 }
358 }
359
360 pub fn example_yaml() -> String {
362 serde_yaml::to_string(&Self::example()).unwrap_or_default()
363 }
364
365 pub fn example_toml() -> String {
367 toml::to_string_pretty(&Self::example()).unwrap_or_default()
368 }
369}
370
371#[derive(Debug, thiserror::Error)]
373pub enum ConfigError {
374 #[error("Failed to read config file {0}: {1}")]
375 IoError(PathBuf, String),
376
377 #[error("Failed to parse config: {0}")]
378 ParseError(String),
379}
380
381#[derive(Debug, Clone, Serialize, Deserialize, Default)]
401#[serde(default)]
402pub struct ProjectConfig {
403 pub remote: RemoteConfig,
405
406 pub deploy: DeployConfig,
408}
409
410#[derive(Debug, Clone, Serialize, Deserialize, Default)]
412#[serde(default)]
413pub struct RemoteConfig {
414 pub url: Option<String>,
416
417 pub api_key: Option<String>,
419}
420
421#[derive(Debug, Clone, Serialize, Deserialize, Default)]
423#[serde(default)]
424pub struct DeployConfig {
425 pub name: Option<String>,
427}
428
429impl ProjectConfig {
430 pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, ConfigError> {
432 let path = path.as_ref();
433 let content = std::fs::read_to_string(path)
434 .map_err(|e| ConfigError::IoError(path.to_path_buf(), e.to_string()))?;
435 toml::from_str(&content).map_err(|e| ConfigError::ParseError(e.to_string()))
436 }
437
438 pub fn discover(start_dir: &Path) -> Option<Self> {
441 let mut dir = start_dir;
442 loop {
443 let candidate = dir.join(".varpulis.toml");
444 if candidate.is_file() {
445 return Self::load(&candidate).ok();
446 }
447 dir = dir.parent()?;
448 }
449 }
450
451 pub fn discover_cwd() -> Option<Self> {
453 let cwd = std::env::current_dir().ok()?;
454 Self::discover(&cwd)
455 }
456
457 pub fn resolve_url(&self, cli_flag: Option<&str>) -> Option<String> {
459 cli_flag
460 .map(|s| s.to_string())
461 .or_else(|| self.remote.url.clone())
462 }
463
464 pub fn resolve_api_key(&self, cli_flag: Option<&str>) -> Option<String> {
466 cli_flag
467 .map(|s| s.to_string())
468 .or_else(|| self.remote.api_key.clone())
469 }
470
471 pub fn example() -> String {
473 r#"# Varpulis project configuration
474# Place this file in your project root as .varpulis.toml
475
476[remote]
477url = "http://localhost:9000"
478api_key = "your-api-key-here"
479
480[deploy]
481name = "my-pipeline"
482"#
483 .to_string()
484 }
485}
486
487#[cfg(test)]
488mod tests {
489 use super::*;
490
491 #[test]
492 fn test_default_config() {
493 let config = Config::default();
494 assert_eq!(config.server.port, 9000);
495 assert_eq!(config.server.bind, "127.0.0.1");
496 }
497
498 #[test]
499 fn test_yaml_parsing() {
500 let yaml = r#"
501query_file: /app/queries.vql
502server:
503 port: 8080
504 bind: "0.0.0.0"
505 metrics_enabled: true
506processing:
507 workers: 8
508"#;
509 let config = Config::from_yaml(yaml).unwrap();
510 assert_eq!(config.query_file, Some(PathBuf::from("/app/queries.vql")));
511 assert_eq!(config.server.port, 8080);
512 assert_eq!(config.server.bind, "0.0.0.0");
513 assert!(config.server.metrics_enabled);
514 assert_eq!(config.processing.workers, Some(8));
515 }
516
517 #[test]
518 fn test_toml_parsing() {
519 let toml = r#"
520query_file = "/app/queries.vql"
521
522[server]
523port = 8080
524bind = "0.0.0.0"
525metrics_enabled = true
526
527[processing]
528workers = 8
529"#;
530 let config = Config::from_toml(toml).unwrap();
531 assert_eq!(config.query_file, Some(PathBuf::from("/app/queries.vql")));
532 assert_eq!(config.server.port, 8080);
533 assert_eq!(config.server.bind, "0.0.0.0");
534 assert!(config.server.metrics_enabled);
535 assert_eq!(config.processing.workers, Some(8));
536 }
537
538 #[test]
539 fn test_config_merge() {
540 let mut base = Config::default();
541 let override_config = Config {
542 server: ServerConfig {
543 port: 8888,
544 ..Default::default()
545 },
546 ..Default::default()
547 };
548
549 base.merge(override_config);
550 assert_eq!(base.server.port, 8888);
551 }
552
553 #[test]
555 fn test_project_config_parse() {
556 let toml = r#"
557[remote]
558url = "http://prod.example.com:9000"
559api_key = "secret-123"
560
561[deploy]
562name = "fraud-detection"
563"#;
564 let config: ProjectConfig = toml::from_str(toml).unwrap();
565 assert_eq!(
566 config.remote.url.as_deref(),
567 Some("http://prod.example.com:9000")
568 );
569 assert_eq!(config.remote.api_key.as_deref(), Some("secret-123"));
570 assert_eq!(config.deploy.name.as_deref(), Some("fraud-detection"));
571 }
572
573 #[test]
574 fn test_project_config_partial() {
575 let toml = r#"
576[remote]
577url = "http://localhost:9000"
578"#;
579 let config: ProjectConfig = toml::from_str(toml).unwrap();
580 assert_eq!(config.remote.url.as_deref(), Some("http://localhost:9000"));
581 assert!(config.remote.api_key.is_none());
582 assert!(config.deploy.name.is_none());
583 }
584
585 #[test]
586 fn test_project_config_empty() {
587 let config: ProjectConfig = toml::from_str("").unwrap();
588 assert!(config.remote.url.is_none());
589 assert!(config.remote.api_key.is_none());
590 }
591
592 #[test]
593 fn test_project_config_resolve_url_flag_wins() {
594 let config = ProjectConfig {
595 remote: RemoteConfig {
596 url: Some("http://config-server:9000".to_string()),
597 ..Default::default()
598 },
599 ..Default::default()
600 };
601 assert_eq!(
602 config.resolve_url(Some("http://flag-server:9000")),
603 Some("http://flag-server:9000".to_string())
604 );
605 }
606
607 #[test]
608 fn test_project_config_resolve_url_config_fallback() {
609 let config = ProjectConfig {
610 remote: RemoteConfig {
611 url: Some("http://config-server:9000".to_string()),
612 ..Default::default()
613 },
614 ..Default::default()
615 };
616 assert_eq!(
617 config.resolve_url(None),
618 Some("http://config-server:9000".to_string())
619 );
620 }
621
622 #[test]
623 fn test_project_config_resolve_api_key() {
624 let config = ProjectConfig {
625 remote: RemoteConfig {
626 api_key: Some("config-key".to_string()),
627 ..Default::default()
628 },
629 ..Default::default()
630 };
631 assert_eq!(
633 config.resolve_api_key(Some("flag-key")),
634 Some("flag-key".to_string())
635 );
636 assert_eq!(config.resolve_api_key(None), Some("config-key".to_string()));
638 }
639
640 #[test]
641 fn test_project_config_discover_file() {
642 let dir = tempfile::tempdir().unwrap();
643 let config_path = dir.path().join(".varpulis.toml");
644 std::fs::write(
645 &config_path,
646 r#"
647[remote]
648url = "http://test:9000"
649api_key = "test-key"
650"#,
651 )
652 .unwrap();
653
654 let config = ProjectConfig::discover(dir.path()).unwrap();
655 assert_eq!(config.remote.url.as_deref(), Some("http://test:9000"));
656 assert_eq!(config.remote.api_key.as_deref(), Some("test-key"));
657 }
658
659 #[test]
660 fn test_project_config_discover_parent() {
661 let dir = tempfile::tempdir().unwrap();
662 let config_path = dir.path().join(".varpulis.toml");
663 std::fs::write(
664 &config_path,
665 r#"
666[remote]
667url = "http://parent:9000"
668"#,
669 )
670 .unwrap();
671
672 let subdir = dir.path().join("sub").join("dir");
674 std::fs::create_dir_all(&subdir).unwrap();
675
676 let config = ProjectConfig::discover(&subdir).unwrap();
677 assert_eq!(config.remote.url.as_deref(), Some("http://parent:9000"));
678 }
679
680 #[test]
681 fn test_project_config_discover_not_found() {
682 let dir = tempfile::tempdir().unwrap();
683 let config = ProjectConfig::discover(dir.path());
685 assert!(config.is_none());
686 }
687
688 #[test]
689 fn test_project_config_example() {
690 let example = ProjectConfig::example();
691 assert!(example.contains("[remote]"));
692 assert!(example.contains("url ="));
693 assert!(example.contains("api_key ="));
694 }
695}