1use std::path::{Path, PathBuf};
31
32use serde::{Deserialize, Serialize};
33
/// Top-level configuration, deserializable from YAML or TOML
/// (see `Config::from_yaml` / `Config::from_toml`).
///
/// Because of `#[serde(default)]`, any section missing from the input is
/// filled from `Config::default()`: the sub-struct defaults for `server`,
/// `simulation`, `logging` and `processing`, and `None` for every optional
/// section.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct Config {
    /// Path to a query file, if one is configured.
    pub query_file: Option<PathBuf>,

    /// Server settings (always present; see `ServerConfig::default`).
    pub server: ServerConfig,

    /// Simulation settings (always present; all flags default to `false`).
    pub simulation: SimulationConfig,

    /// Kafka settings; `None` when the section is absent.
    pub kafka: Option<KafkaConfig>,

    /// HTTP webhook settings; `None` when the section is absent.
    pub http_webhook: Option<HttpWebhookConfig>,

    /// Logging settings (always present; see `LoggingConfig::default`).
    pub logging: LoggingConfig,

    /// Processing settings (always present; both fields default to `None`).
    pub processing: ProcessingConfig,

    /// TLS settings; `None` when the section is absent.
    pub tls: Option<TlsConfig>,

    /// Authentication settings; `None` when the section is absent.
    pub auth: Option<AuthConfig>,
}
65
/// Settings for the `server` section.
///
/// `#[serde(default)]` fills any field missing from the input with the value
/// from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct ServerConfig {
    /// Server port (default `9000`).
    pub port: u16,

    /// Bind address, kept as a plain string (default `"127.0.0.1"`).
    pub bind: String,

    /// Whether metrics are enabled (default `false`).
    pub metrics_enabled: bool,

    /// Port associated with metrics (default `9090`).
    pub metrics_port: u16,

    /// Optional working directory (default `None`).
    // NOTE(review): how the directory is used is decided by the consumer of
    // this config, which is not visible in this file.
    pub workdir: Option<PathBuf>,
}
85
86impl Default for ServerConfig {
87 fn default() -> Self {
88 Self {
89 port: 9000,
90 bind: "127.0.0.1".to_string(),
91 metrics_enabled: false,
92 metrics_port: 9090,
93 workdir: None,
94 }
95 }
96}
97
/// Settings for the `simulation` section.
///
/// `Default` is derived, so every flag defaults to `false` and `events_file`
/// to `None`; `#[serde(default)]` applies those values to missing fields.
// NOTE(review): the precise meaning of each flag is defined by the code that
// consumes this struct, which is not visible in this file.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct SimulationConfig {
    /// `timed` flag (default `false`).
    pub timed: bool,

    /// `streaming` flag (default `false`).
    pub streaming: bool,

    /// `verbose` flag (default `false`).
    pub verbose: bool,

    /// Optional path to an events file (default `None`).
    pub events_file: Option<PathBuf>,
}
114
/// Settings for the optional `kafka` section.
///
/// `#[serde(default)]` fills any field missing from the input with the value
/// from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct KafkaConfig {
    /// Bootstrap server list as a single string (default `"localhost:9092"`).
    pub bootstrap_servers: String,

    /// Consumer group name, if any (default `None`).
    pub consumer_group: Option<String>,

    /// Input topic name, if any (default `None`).
    pub input_topic: Option<String>,

    /// Output topic name, if any (default `None`).
    pub output_topic: Option<String>,

    /// Auto-commit flag (default `true`).
    pub auto_commit: bool,

    /// Offset reset policy, kept as a free-form string (default `"latest"`).
    // NOTE(review): the value is not validated here; presumably it is passed
    // through to the Kafka client — confirm accepted values at the call site.
    pub auto_offset_reset: String,
}
137
138impl Default for KafkaConfig {
139 fn default() -> Self {
140 Self {
141 bootstrap_servers: "localhost:9092".to_string(),
142 consumer_group: None,
143 input_topic: None,
144 output_topic: None,
145 auto_commit: true,
146 auto_offset_reset: "latest".to_string(),
147 }
148 }
149}
150
/// Settings for the optional `http_webhook` section.
///
/// `#[serde(default)]` fills any field missing from the input with the value
/// from this type's `Default` implementation.
///
/// Note that `Debug` is derived, so formatting this struct with `{:?}` prints
/// `api_key` in clear text.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct HttpWebhookConfig {
    /// Whether the webhook is enabled (default `false`).
    pub enabled: bool,

    /// Webhook port (default `8080`).
    pub port: u16,

    /// Bind address, kept as a plain string (default `"0.0.0.0"`).
    pub bind: String,

    /// Optional API key (default `None`).
    pub api_key: Option<String>,

    /// Rate limit (default `0`).
    // NOTE(review): `0` presumably means "no limit", and the unit is not
    // visible in this file — confirm against the code that enforces it.
    pub rate_limit: u32,

    /// Maximum batch size (default `1000`).
    pub max_batch_size: usize,
}
173
174impl Default for HttpWebhookConfig {
175 fn default() -> Self {
176 Self {
177 enabled: false,
178 port: 8080,
179 bind: "0.0.0.0".to_string(),
180 api_key: None,
181 rate_limit: 0,
182 max_batch_size: 1000,
183 }
184 }
185}
186
/// Settings for the `logging` section.
///
/// `#[serde(default)]` fills any field missing from the input with the value
/// from this type's `Default` implementation.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct LoggingConfig {
    /// Log level as a free-form string (default `"info"`).
    pub level: String,

    /// Log format as a free-form string (default `"text"`; the built-in
    /// example configuration uses `"json"`).
    // NOTE(review): neither `level` nor `format` is validated in this file.
    pub format: String,

    /// Whether timestamps are included (default `true`).
    pub timestamps: bool,
}
200
201impl Default for LoggingConfig {
202 fn default() -> Self {
203 Self {
204 level: "info".to_string(),
205 format: "text".to_string(),
206 timestamps: true,
207 }
208 }
209}
210
/// Settings for the `processing` section.
///
/// `Default` is derived, so both fields default to `None`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct ProcessingConfig {
    /// Number of workers, if explicitly configured (default `None`).
    // NOTE(review): what happens when this is `None` is decided by the
    // consumer of this config, not visible here.
    pub workers: Option<usize>,

    /// Partitioning key, if configured (default `None`).
    // NOTE(review): presumably the name of an event field (the built-in
    // example uses "source_id") — confirm against the consumer.
    pub partition_by: Option<String>,
}
221
/// Settings for the optional `tls` section.
///
/// Unlike most other sections this struct has no `#[serde(default)]` and no
/// `Default` implementation, so when the section is present both paths must
/// be supplied.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TlsConfig {
    /// Path to the certificate file.
    pub cert_file: PathBuf,

    /// Path to the key file.
    pub key_file: PathBuf,
}
231
/// Settings for the optional `auth` section.
///
/// `Debug` is derived, so formatting this struct with `{:?}` prints
/// `api_key` in clear text.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthConfig {
    /// Optional API key.
    pub api_key: Option<String>,
}
238
239impl Config {
240 pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, ConfigError> {
242 let path = path.as_ref();
243 let content = std::fs::read_to_string(path)
244 .map_err(|e| ConfigError::IoError(path.to_path_buf(), e.to_string()))?;
245
246 let extension = path
247 .extension()
248 .and_then(|e| e.to_str())
249 .unwrap_or("")
250 .to_lowercase();
251
252 match extension.as_str() {
253 "yaml" | "yml" => Self::from_yaml(&content),
254 "toml" => Self::from_toml(&content),
255 _ => {
256 Self::from_yaml(&content).or_else(|_| Self::from_toml(&content))
258 }
259 }
260 }
261
262 pub fn from_yaml(content: &str) -> Result<Self, ConfigError> {
264 serde_yaml::from_str(content).map_err(|e| ConfigError::ParseError(e.to_string()))
265 }
266
267 pub fn from_toml(content: &str) -> Result<Self, ConfigError> {
269 toml::from_str(content).map_err(|e| ConfigError::ParseError(e.to_string()))
270 }
271
272 pub fn merge(&mut self, other: Self) {
274 if other.query_file.is_some() {
275 self.query_file = other.query_file;
276 }
277
278 if other.server.port != ServerConfig::default().port {
280 self.server.port = other.server.port;
281 }
282 if other.server.bind != ServerConfig::default().bind {
283 self.server.bind = other.server.bind;
284 }
285 if other.server.metrics_enabled {
286 self.server.metrics_enabled = true;
287 }
288 if other.server.metrics_port != ServerConfig::default().metrics_port {
289 self.server.metrics_port = other.server.metrics_port;
290 }
291 if other.server.workdir.is_some() {
292 self.server.workdir = other.server.workdir;
293 }
294
295 if other.processing.workers.is_some() {
297 self.processing.workers = other.processing.workers;
298 }
299 if other.processing.partition_by.is_some() {
300 self.processing.partition_by = other.processing.partition_by;
301 }
302
303 if other.kafka.is_some() {
305 self.kafka = other.kafka;
306 }
307 if other.http_webhook.is_some() {
308 self.http_webhook = other.http_webhook;
309 }
310 if other.tls.is_some() {
311 self.tls = other.tls;
312 }
313 if other.auth.is_some() {
314 self.auth = other.auth;
315 }
316 }
317
318 pub fn example() -> Self {
320 Self {
321 query_file: Some(PathBuf::from("/app/queries/queries.vql")),
322 server: ServerConfig {
323 port: 9000,
324 bind: "0.0.0.0".to_string(),
325 metrics_enabled: true,
326 metrics_port: 9090,
327 workdir: Some(PathBuf::from("/app")),
328 },
329 simulation: SimulationConfig::default(),
330 kafka: Some(KafkaConfig {
331 bootstrap_servers: "kafka:9092".to_string(),
332 consumer_group: Some("varpulis-consumer".to_string()),
333 input_topic: Some("events".to_string()),
334 output_topic: Some("alerts".to_string()),
335 ..Default::default()
336 }),
337 http_webhook: Some(HttpWebhookConfig {
338 enabled: true,
339 port: 8080,
340 bind: "0.0.0.0".to_string(),
341 api_key: Some("your-api-key-here".to_string()),
342 rate_limit: 1000,
343 max_batch_size: 100,
344 }),
345 logging: LoggingConfig {
346 level: "info".to_string(),
347 format: "json".to_string(),
348 timestamps: true,
349 },
350 processing: ProcessingConfig {
351 workers: Some(4),
352 partition_by: Some("source_id".to_string()),
353 },
354 tls: None,
355 auth: Some(AuthConfig {
356 api_key: Some("your-websocket-api-key".to_string()),
357 }),
358 }
359 }
360
361 pub fn example_yaml() -> String {
363 serde_yaml::to_string(&Self::example()).unwrap_or_default()
364 }
365
366 pub fn example_toml() -> String {
368 toml::to_string_pretty(&Self::example()).unwrap_or_default()
369 }
370}
371
/// Errors produced while loading or parsing configuration files.
///
/// Both variants carry the underlying failure as an already-formatted
/// `String`, so the original error value is not kept as a source.
#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
    /// The file at the given path could not be read; the second field is the
    /// I/O error's message.
    #[error("Failed to read config file {0}: {1}")]
    IoError(PathBuf, String),

    /// The file contents could not be parsed; the field is the parser's
    /// message.
    #[error("Failed to parse config: {0}")]
    ParseError(String),
}
381
/// Per-project configuration read from a `.varpulis.toml` file
/// (see `ProjectConfig::discover`).
///
/// `#[serde(default)]` together with the derived `Default` means an empty
/// file, or one missing either section, still parses successfully.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct ProjectConfig {
    /// The `[remote]` section.
    pub remote: RemoteConfig,

    /// The `[deploy]` section.
    pub deploy: DeployConfig,
}
410
/// The `[remote]` section of a project configuration.
///
/// Both fields default to `None`. `Debug` is derived, so formatting this
/// struct with `{:?}` prints `api_key` in clear text.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct RemoteConfig {
    /// Remote URL; used by `ProjectConfig::resolve_url` when no CLI flag is
    /// given.
    pub url: Option<String>,

    /// API key; used by `ProjectConfig::resolve_api_key` when no CLI flag is
    /// given.
    pub api_key: Option<String>,
}
421
/// The `[deploy]` section of a project configuration.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct DeployConfig {
    /// Optional name (default `None`).
    // NOTE(review): presumably the pipeline/deployment name (the example file
    // uses "my-pipeline") — confirm against the deploy command.
    pub name: Option<String>,
}
429
430impl ProjectConfig {
431 pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, ConfigError> {
433 let path = path.as_ref();
434 let content = std::fs::read_to_string(path)
435 .map_err(|e| ConfigError::IoError(path.to_path_buf(), e.to_string()))?;
436 toml::from_str(&content).map_err(|e| ConfigError::ParseError(e.to_string()))
437 }
438
439 pub fn discover(start_dir: &Path) -> Option<Self> {
442 let mut dir = start_dir;
443 loop {
444 let candidate = dir.join(".varpulis.toml");
445 if candidate.is_file() {
446 return Self::load(&candidate).ok();
447 }
448 dir = dir.parent()?;
449 }
450 }
451
452 pub fn discover_cwd() -> Option<Self> {
454 let cwd = std::env::current_dir().ok()?;
455 Self::discover(&cwd)
456 }
457
458 pub fn resolve_url(&self, cli_flag: Option<&str>) -> Option<String> {
460 cli_flag
461 .map(|s| s.to_string())
462 .or_else(|| self.remote.url.clone())
463 }
464
465 pub fn resolve_api_key(&self, cli_flag: Option<&str>) -> Option<String> {
467 cli_flag
468 .map(|s| s.to_string())
469 .or_else(|| self.remote.api_key.clone())
470 }
471
472 pub fn example() -> String {
474 r#"# Varpulis project configuration
475# Place this file in your project root as .varpulis.toml
476
477[remote]
478url = "http://localhost:9000"
479api_key = "your-api-key-here"
480
481[deploy]
482name = "my-pipeline"
483"#
484 .to_string()
485 }
486}
487
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a project configuration whose `[remote]` section carries the
    /// given values; everything else stays at its default.
    fn project_with_remote(url: Option<&str>, api_key: Option<&str>) -> ProjectConfig {
        ProjectConfig {
            remote: RemoteConfig {
                url: url.map(String::from),
                api_key: api_key.map(String::from),
            },
            ..Default::default()
        }
    }

    /// Writes `body` as `.varpulis.toml` directly inside `dir`.
    fn write_project_file(dir: &Path, body: &str) {
        std::fs::write(dir.join(".varpulis.toml"), body).unwrap();
    }

    /// Shared expectations for the YAML and TOML parsing samples, which
    /// describe the same configuration.
    fn assert_sample_config(parsed: &Config) {
        assert_eq!(parsed.query_file, Some(PathBuf::from("/app/queries.vql")));
        assert_eq!(parsed.server.port, 8080);
        assert_eq!(parsed.server.bind, "0.0.0.0");
        assert!(parsed.server.metrics_enabled);
        assert_eq!(parsed.processing.workers, Some(8));
    }

    #[test]
    fn test_default_config() {
        let ServerConfig { port, bind, .. } = Config::default().server;
        assert_eq!(port, 9000);
        assert_eq!(bind, "127.0.0.1");
    }

    #[test]
    fn test_yaml_parsing() {
        let source = r#"
query_file: /app/queries.vql
server:
  port: 8080
  bind: "0.0.0.0"
  metrics_enabled: true
processing:
  workers: 8
"#;
        let parsed = Config::from_yaml(source).unwrap();
        assert_sample_config(&parsed);
    }

    #[test]
    fn test_toml_parsing() {
        let source = r#"
query_file = "/app/queries.vql"

[server]
port = 8080
bind = "0.0.0.0"
metrics_enabled = true

[processing]
workers = 8
"#;
        let parsed = Config::from_toml(source).unwrap();
        assert_sample_config(&parsed);
    }

    #[test]
    fn test_config_merge() {
        let mut target = Config::default();
        let mut overrides = Config::default();
        overrides.server.port = 8888;

        target.merge(overrides);
        assert_eq!(target.server.port, 8888);
    }

    #[test]
    fn test_project_config_parse() {
        let source = r#"
[remote]
url = "http://prod.example.com:9000"
api_key = "secret-123"

[deploy]
name = "fraud-detection"
"#;
        let parsed: ProjectConfig = toml::from_str(source).unwrap();
        assert_eq!(
            parsed.remote.url.as_deref(),
            Some("http://prod.example.com:9000")
        );
        assert_eq!(parsed.remote.api_key.as_deref(), Some("secret-123"));
        assert_eq!(parsed.deploy.name.as_deref(), Some("fraud-detection"));
    }

    #[test]
    fn test_project_config_partial() {
        let source = r#"
[remote]
url = "http://localhost:9000"
"#;
        let parsed: ProjectConfig = toml::from_str(source).unwrap();
        assert_eq!(parsed.remote.url.as_deref(), Some("http://localhost:9000"));
        assert!(parsed.remote.api_key.is_none());
        assert!(parsed.deploy.name.is_none());
    }

    #[test]
    fn test_project_config_empty() {
        let parsed: ProjectConfig = toml::from_str("").unwrap();
        assert!(parsed.remote.url.is_none());
        assert!(parsed.remote.api_key.is_none());
    }

    #[test]
    fn test_project_config_resolve_url_flag_wins() {
        let project = project_with_remote(Some("http://config-server:9000"), None);
        let resolved = project.resolve_url(Some("http://flag-server:9000"));
        assert_eq!(resolved, Some("http://flag-server:9000".to_string()));
    }

    #[test]
    fn test_project_config_resolve_url_config_fallback() {
        let project = project_with_remote(Some("http://config-server:9000"), None);
        let resolved = project.resolve_url(None);
        assert_eq!(resolved, Some("http://config-server:9000".to_string()));
    }

    #[test]
    fn test_project_config_resolve_api_key() {
        let project = project_with_remote(None, Some("config-key"));

        // The CLI flag takes precedence over the configured key.
        assert_eq!(
            project.resolve_api_key(Some("flag-key")),
            Some("flag-key".to_string())
        );
        // Without a flag the configured key is used.
        assert_eq!(
            project.resolve_api_key(None),
            Some("config-key".to_string())
        );
    }

    #[test]
    fn test_project_config_discover_file() {
        let workspace = tempfile::tempdir().unwrap();
        write_project_file(
            workspace.path(),
            r#"
[remote]
url = "http://test:9000"
api_key = "test-key"
"#,
        );

        let found = ProjectConfig::discover(workspace.path()).unwrap();
        assert_eq!(found.remote.url.as_deref(), Some("http://test:9000"));
        assert_eq!(found.remote.api_key.as_deref(), Some("test-key"));
    }

    #[test]
    fn test_project_config_discover_parent() {
        let workspace = tempfile::tempdir().unwrap();
        write_project_file(
            workspace.path(),
            r#"
[remote]
url = "http://parent:9000"
"#,
        );

        // Start the search two levels below the directory holding the file.
        let nested = workspace.path().join("sub").join("dir");
        std::fs::create_dir_all(&nested).unwrap();

        let found = ProjectConfig::discover(&nested).unwrap();
        assert_eq!(found.remote.url.as_deref(), Some("http://parent:9000"));
    }

    #[test]
    fn test_project_config_discover_not_found() {
        let workspace = tempfile::tempdir().unwrap();
        let found = ProjectConfig::discover(workspace.path());
        assert!(found.is_none());
    }

    #[test]
    fn test_project_config_example() {
        let text = ProjectConfig::example();
        for needle in ["[remote]", "url =", "api_key ="] {
            assert!(text.contains(needle));
        }
    }
}