Skip to main content

varpulis_cli/
config.rs

//! Configuration file support for Varpulis
//!
//! Supports both YAML and TOML configuration files.
//!
//! # Example YAML configuration:
//! ```yaml
//! # Varpulis configuration file
//!
//! # Query file to run
//! query_file: /path/to/queries.vql
//!
//! # Server settings
//! server:
//!   port: 9000
//!   bind: "0.0.0.0"
//!   metrics_enabled: true
//!   metrics_port: 9090
//!
//! # Kafka connector settings
//! kafka:
//!   bootstrap_servers: "localhost:9092"
//!   consumer_group: "varpulis-consumer"
//!
//! # Logging settings
//! logging:
//!   level: info
//!   format: json
//! ```
29
30use std::path::{Path, PathBuf};
31
32use serde::{Deserialize, Serialize};
33
34/// Main configuration structure
35#[derive(Debug, Clone, Serialize, Deserialize, Default)]
36#[serde(default)]
37pub struct Config {
38    /// Path to the query file (.vql)
39    pub query_file: Option<PathBuf>,
40
41    /// Server configuration
42    pub server: ServerConfig,
43
44    /// Simulation configuration
45    pub simulation: SimulationConfig,
46
47    /// Kafka connector configuration
48    pub kafka: Option<KafkaConfig>,
49
50    /// HTTP webhook configuration
51    pub http_webhook: Option<HttpWebhookConfig>,
52
53    /// Logging configuration
54    pub logging: LoggingConfig,
55
56    /// Processing configuration
57    pub processing: ProcessingConfig,
58
59    /// TLS configuration
60    pub tls: Option<TlsConfig>,
61
62    /// Authentication configuration
63    pub auth: Option<AuthConfig>,
64}
65
66/// Server configuration
67#[derive(Debug, Clone, Serialize, Deserialize)]
68#[serde(default)]
69pub struct ServerConfig {
70    /// Server port
71    pub port: u16,
72
73    /// Bind address
74    pub bind: String,
75
76    /// Enable metrics endpoint
77    pub metrics_enabled: bool,
78
79    /// Metrics port
80    pub metrics_port: u16,
81
82    /// Working directory
83    pub workdir: Option<PathBuf>,
84}
85
86impl Default for ServerConfig {
87    fn default() -> Self {
88        Self {
89            port: 9000,
90            bind: "127.0.0.1".to_string(),
91            metrics_enabled: false,
92            metrics_port: 9090,
93            workdir: None,
94        }
95    }
96}
97
98/// Simulation configuration
99#[derive(Debug, Clone, Serialize, Deserialize, Default)]
100#[serde(default)]
101pub struct SimulationConfig {
102    /// Replay events with real-time timing delays
103    pub timed: bool,
104
105    /// Stream events line-by-line instead of preloading
106    pub streaming: bool,
107
108    /// Verbose output
109    pub verbose: bool,
110
111    /// Event file path
112    pub events_file: Option<PathBuf>,
113}
114
115/// Kafka connector configuration
116#[derive(Debug, Clone, Serialize, Deserialize)]
117#[serde(default)]
118pub struct KafkaConfig {
119    /// Kafka bootstrap servers
120    pub bootstrap_servers: String,
121
122    /// Consumer group ID
123    pub consumer_group: Option<String>,
124
125    /// Input topic
126    pub input_topic: Option<String>,
127
128    /// Output topic
129    pub output_topic: Option<String>,
130
131    /// Enable auto-commit
132    pub auto_commit: bool,
133
134    /// Auto offset reset (earliest, latest)
135    pub auto_offset_reset: String,
136}
137
138impl Default for KafkaConfig {
139    fn default() -> Self {
140        Self {
141            bootstrap_servers: "localhost:9092".to_string(),
142            consumer_group: None,
143            input_topic: None,
144            output_topic: None,
145            auto_commit: true,
146            auto_offset_reset: "latest".to_string(),
147        }
148    }
149}
150
151/// HTTP webhook configuration
152#[derive(Debug, Clone, Serialize, Deserialize)]
153#[serde(default)]
154pub struct HttpWebhookConfig {
155    /// Enable HTTP webhook input
156    pub enabled: bool,
157
158    /// Webhook port
159    pub port: u16,
160
161    /// Bind address
162    pub bind: String,
163
164    /// API key for authentication
165    pub api_key: Option<String>,
166
167    /// Rate limit (requests per second, 0 = unlimited)
168    pub rate_limit: u32,
169
170    /// Maximum batch size
171    pub max_batch_size: usize,
172}
173
174impl Default for HttpWebhookConfig {
175    fn default() -> Self {
176        Self {
177            enabled: false,
178            port: 8080,
179            bind: "0.0.0.0".to_string(),
180            api_key: None,
181            rate_limit: 0,
182            max_batch_size: 1000,
183        }
184    }
185}
186
187/// Logging configuration
188#[derive(Debug, Clone, Serialize, Deserialize)]
189#[serde(default)]
190pub struct LoggingConfig {
191    /// Log level (trace, debug, info, warn, error)
192    pub level: String,
193
194    /// Log format (text, json)
195    pub format: String,
196
197    /// Include timestamps
198    pub timestamps: bool,
199}
200
201impl Default for LoggingConfig {
202    fn default() -> Self {
203        Self {
204            level: "info".to_string(),
205            format: "text".to_string(),
206            timestamps: true,
207        }
208    }
209}
210
211/// Processing configuration
212#[derive(Debug, Clone, Serialize, Deserialize, Default)]
213#[serde(default)]
214pub struct ProcessingConfig {
215    /// Number of worker threads
216    pub workers: Option<usize>,
217
218    /// Partition key field
219    pub partition_by: Option<String>,
220}
221
222/// TLS configuration
223#[derive(Debug, Clone, Serialize, Deserialize)]
224pub struct TlsConfig {
225    /// Path to certificate file
226    pub cert_file: PathBuf,
227
228    /// Path to private key file
229    pub key_file: PathBuf,
230}
231
232/// Authentication configuration
233#[derive(Debug, Clone, Serialize, Deserialize)]
234pub struct AuthConfig {
235    /// API key
236    pub api_key: Option<String>,
237}
238
239impl Config {
240    /// Load configuration from a file (YAML or TOML, auto-detected by extension)
241    pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, ConfigError> {
242        let path = path.as_ref();
243        let content = std::fs::read_to_string(path)
244            .map_err(|e| ConfigError::IoError(path.to_path_buf(), e.to_string()))?;
245
246        let extension = path
247            .extension()
248            .and_then(|e| e.to_str())
249            .unwrap_or("")
250            .to_lowercase();
251
252        match extension.as_str() {
253            "yaml" | "yml" => Self::from_yaml(&content),
254            "toml" => Self::from_toml(&content),
255            _ => {
256                // Try YAML first, then TOML
257                Self::from_yaml(&content).or_else(|_| Self::from_toml(&content))
258            }
259        }
260    }
261
262    /// Parse configuration from YAML string
263    pub fn from_yaml(content: &str) -> Result<Self, ConfigError> {
264        serde_yaml::from_str(content).map_err(|e| ConfigError::ParseError(e.to_string()))
265    }
266
267    /// Parse configuration from TOML string
268    pub fn from_toml(content: &str) -> Result<Self, ConfigError> {
269        toml::from_str(content).map_err(|e| ConfigError::ParseError(e.to_string()))
270    }
271
272    /// Merge another config into this one (other values take precedence if set)
273    pub fn merge(&mut self, other: Self) {
274        if other.query_file.is_some() {
275            self.query_file = other.query_file;
276        }
277
278        // Merge server config
279        if other.server.port != ServerConfig::default().port {
280            self.server.port = other.server.port;
281        }
282        if other.server.bind != ServerConfig::default().bind {
283            self.server.bind = other.server.bind;
284        }
285        if other.server.metrics_enabled {
286            self.server.metrics_enabled = true;
287        }
288        if other.server.metrics_port != ServerConfig::default().metrics_port {
289            self.server.metrics_port = other.server.metrics_port;
290        }
291        if other.server.workdir.is_some() {
292            self.server.workdir = other.server.workdir;
293        }
294
295        // Merge processing config
296        if other.processing.workers.is_some() {
297            self.processing.workers = other.processing.workers;
298        }
299        if other.processing.partition_by.is_some() {
300            self.processing.partition_by = other.processing.partition_by;
301        }
302
303        // Replace optional configs if provided
304        if other.kafka.is_some() {
305            self.kafka = other.kafka;
306        }
307        if other.http_webhook.is_some() {
308            self.http_webhook = other.http_webhook;
309        }
310        if other.tls.is_some() {
311            self.tls = other.tls;
312        }
313        if other.auth.is_some() {
314            self.auth = other.auth;
315        }
316    }
317
318    /// Create an example configuration
319    pub fn example() -> Self {
320        Self {
321            query_file: Some(PathBuf::from("/app/queries/queries.vql")),
322            server: ServerConfig {
323                port: 9000,
324                bind: "0.0.0.0".to_string(),
325                metrics_enabled: true,
326                metrics_port: 9090,
327                workdir: Some(PathBuf::from("/app")),
328            },
329            simulation: SimulationConfig::default(),
330            kafka: Some(KafkaConfig {
331                bootstrap_servers: "kafka:9092".to_string(),
332                consumer_group: Some("varpulis-consumer".to_string()),
333                input_topic: Some("events".to_string()),
334                output_topic: Some("alerts".to_string()),
335                ..Default::default()
336            }),
337            http_webhook: Some(HttpWebhookConfig {
338                enabled: true,
339                port: 8080,
340                bind: "0.0.0.0".to_string(),
341                api_key: Some("your-api-key-here".to_string()),
342                rate_limit: 1000,
343                max_batch_size: 100,
344            }),
345            logging: LoggingConfig {
346                level: "info".to_string(),
347                format: "json".to_string(),
348                timestamps: true,
349            },
350            processing: ProcessingConfig {
351                workers: Some(4),
352                partition_by: Some("source_id".to_string()),
353            },
354            tls: None,
355            auth: Some(AuthConfig {
356                api_key: Some("your-websocket-api-key".to_string()),
357            }),
358        }
359    }
360
361    /// Generate example YAML configuration
362    pub fn example_yaml() -> String {
363        serde_yaml::to_string(&Self::example()).unwrap_or_default()
364    }
365
366    /// Generate example TOML configuration
367    pub fn example_toml() -> String {
368        toml::to_string_pretty(&Self::example()).unwrap_or_default()
369    }
370}
371
372/// Configuration error types
373#[derive(Debug, thiserror::Error)]
374pub enum ConfigError {
375    #[error("Failed to read config file {0}: {1}")]
376    IoError(PathBuf, String),
377
378    #[error("Failed to parse config: {0}")]
379    ParseError(String),
380}
381
382// =============================================================================
383// Project Config (.varpulis.toml) — lightweight config for SaaS CLI commands
384// =============================================================================
385
386/// Project-level configuration stored in `.varpulis.toml`.
387///
388/// This file stores the remote server URL and API key so that CLI commands
389/// like `deploy`, `status`, `pipelines`, and `undeploy` don't need
390/// `--server` and `--api-key` flags on every invocation.
391///
392/// # Example `.varpulis.toml`:
393/// ```toml
394/// [remote]
395/// url = "http://localhost:9000"
396/// api_key = "my-secret-key"
397///
398/// [deploy]
399/// name = "my-pipeline"
400/// ```
401#[derive(Debug, Clone, Serialize, Deserialize, Default)]
402#[serde(default)]
403pub struct ProjectConfig {
404    /// Remote server connection settings
405    pub remote: RemoteConfig,
406
407    /// Default deploy settings
408    pub deploy: DeployConfig,
409}
410
411/// Remote server connection settings
412#[derive(Debug, Clone, Serialize, Deserialize, Default)]
413#[serde(default)]
414pub struct RemoteConfig {
415    /// Server URL (e.g. "http://localhost:9000")
416    pub url: Option<String>,
417
418    /// Tenant API key
419    pub api_key: Option<String>,
420}
421
422/// Default deployment settings
423#[derive(Debug, Clone, Serialize, Deserialize, Default)]
424#[serde(default)]
425pub struct DeployConfig {
426    /// Default pipeline name
427    pub name: Option<String>,
428}
429
430impl ProjectConfig {
431    /// Load project config from a `.varpulis.toml` file.
432    pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, ConfigError> {
433        let path = path.as_ref();
434        let content = std::fs::read_to_string(path)
435            .map_err(|e| ConfigError::IoError(path.to_path_buf(), e.to_string()))?;
436        toml::from_str(&content).map_err(|e| ConfigError::ParseError(e.to_string()))
437    }
438
439    /// Discover `.varpulis.toml` by walking up from the given directory.
440    /// Returns `None` if no config file is found.
441    pub fn discover(start_dir: &Path) -> Option<Self> {
442        let mut dir = start_dir;
443        loop {
444            let candidate = dir.join(".varpulis.toml");
445            if candidate.is_file() {
446                return Self::load(&candidate).ok();
447            }
448            dir = dir.parent()?;
449        }
450    }
451
452    /// Discover from the current working directory.
453    pub fn discover_cwd() -> Option<Self> {
454        let cwd = std::env::current_dir().ok()?;
455        Self::discover(&cwd)
456    }
457
458    /// Resolve the server URL: CLI flag > env var > project config.
459    pub fn resolve_url(&self, cli_flag: Option<&str>) -> Option<String> {
460        cli_flag
461            .map(|s| s.to_string())
462            .or_else(|| self.remote.url.clone())
463    }
464
465    /// Resolve the API key: CLI flag > env var > project config.
466    pub fn resolve_api_key(&self, cli_flag: Option<&str>) -> Option<String> {
467        cli_flag
468            .map(|s| s.to_string())
469            .or_else(|| self.remote.api_key.clone())
470    }
471
472    /// Generate example project config content.
473    pub fn example() -> String {
474        r#"# Varpulis project configuration
475# Place this file in your project root as .varpulis.toml
476
477[remote]
478url = "http://localhost:9000"
479api_key = "your-api-key-here"
480
481[deploy]
482name = "my-pipeline"
483"#
484        .to_string()
485    }
486}
487
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the values shared by the YAML and TOML parsing samples.
    fn assert_sample_values(parsed: &Config) {
        assert_eq!(parsed.query_file, Some(PathBuf::from("/app/queries.vql")));
        assert_eq!(parsed.server.port, 8080);
        assert_eq!(parsed.server.bind, "0.0.0.0");
        assert!(parsed.server.metrics_enabled);
        assert_eq!(parsed.processing.workers, Some(8));
    }

    /// Builds a project config whose only populated field is `remote.url`.
    fn project_with_url(url: &str) -> ProjectConfig {
        ProjectConfig {
            remote: RemoteConfig {
                url: Some(url.to_string()),
                ..Default::default()
            },
            ..Default::default()
        }
    }

    /// Creates a temporary directory holding a `.varpulis.toml` with `contents`.
    fn tempdir_with_project_file(contents: &str) -> tempfile::TempDir {
        let root = tempfile::tempdir().unwrap();
        let project_file = root.path().join(".varpulis.toml");
        std::fs::write(&project_file, contents).unwrap();
        root
    }

    #[test]
    fn test_default_config() {
        let defaults = Config::default();
        assert_eq!(defaults.server.port, 9000);
        assert_eq!(defaults.server.bind, "127.0.0.1");
    }

    #[test]
    fn test_yaml_parsing() {
        let source = r#"
query_file: /app/queries.vql
server:
  port: 8080
  bind: "0.0.0.0"
  metrics_enabled: true
processing:
  workers: 8
"#;
        let parsed = Config::from_yaml(source).unwrap();
        assert_sample_values(&parsed);
    }

    #[test]
    fn test_toml_parsing() {
        let source = r#"
query_file = "/app/queries.vql"

[server]
port = 8080
bind = "0.0.0.0"
metrics_enabled = true

[processing]
workers = 8
"#;
        let parsed = Config::from_toml(source).unwrap();
        assert_sample_values(&parsed);
    }

    #[test]
    fn test_config_merge() {
        let overrides = Config {
            server: ServerConfig {
                port: 8888,
                ..Default::default()
            },
            ..Default::default()
        };

        let mut merged = Config::default();
        merged.merge(overrides);
        assert_eq!(merged.server.port, 8888);
    }

    // Project config tests
    #[test]
    fn test_project_config_parse() {
        let source = r#"
[remote]
url = "http://prod.example.com:9000"
api_key = "secret-123"

[deploy]
name = "fraud-detection"
"#;
        let parsed: ProjectConfig = toml::from_str(source).unwrap();
        assert_eq!(
            parsed.remote.url.as_deref(),
            Some("http://prod.example.com:9000")
        );
        assert_eq!(parsed.remote.api_key.as_deref(), Some("secret-123"));
        assert_eq!(parsed.deploy.name.as_deref(), Some("fraud-detection"));
    }

    #[test]
    fn test_project_config_partial() {
        let source = r#"
[remote]
url = "http://localhost:9000"
"#;
        let parsed: ProjectConfig = toml::from_str(source).unwrap();
        assert_eq!(parsed.remote.url.as_deref(), Some("http://localhost:9000"));
        assert!(parsed.remote.api_key.is_none());
        assert!(parsed.deploy.name.is_none());
    }

    #[test]
    fn test_project_config_empty() {
        let parsed: ProjectConfig = toml::from_str("").unwrap();
        assert!(parsed.remote.url.is_none());
        assert!(parsed.remote.api_key.is_none());
    }

    #[test]
    fn test_project_config_resolve_url_flag_wins() {
        let project = project_with_url("http://config-server:9000");
        let resolved = project.resolve_url(Some("http://flag-server:9000"));
        assert_eq!(resolved, Some("http://flag-server:9000".to_string()));
    }

    #[test]
    fn test_project_config_resolve_url_config_fallback() {
        let project = project_with_url("http://config-server:9000");
        let resolved = project.resolve_url(None);
        assert_eq!(resolved, Some("http://config-server:9000".to_string()));
    }

    #[test]
    fn test_project_config_resolve_api_key() {
        let project = ProjectConfig {
            remote: RemoteConfig {
                api_key: Some("config-key".to_string()),
                ..Default::default()
            },
            ..Default::default()
        };

        // An explicit flag takes priority over the stored key.
        let from_flag = project.resolve_api_key(Some("flag-key"));
        assert_eq!(from_flag, Some("flag-key".to_string()));

        // Without a flag, the stored key is used.
        let from_config = project.resolve_api_key(None);
        assert_eq!(from_config, Some("config-key".to_string()));
    }

    #[test]
    fn test_project_config_discover_file() {
        let root = tempdir_with_project_file(
            r#"
[remote]
url = "http://test:9000"
api_key = "test-key"
"#,
        );

        let found = ProjectConfig::discover(root.path()).unwrap();
        assert_eq!(found.remote.url.as_deref(), Some("http://test:9000"));
        assert_eq!(found.remote.api_key.as_deref(), Some("test-key"));
    }

    #[test]
    fn test_project_config_discover_parent() {
        let root = tempdir_with_project_file(
            r#"
[remote]
url = "http://parent:9000"
"#,
        );

        // Start the search two levels below the directory holding the file.
        let nested = root.path().join("sub").join("dir");
        std::fs::create_dir_all(&nested).unwrap();

        let found = ProjectConfig::discover(&nested).unwrap();
        assert_eq!(found.remote.url.as_deref(), Some("http://parent:9000"));
    }

    #[test]
    fn test_project_config_discover_not_found() {
        // The temp dir is left empty: no .varpulis.toml is written.
        let root = tempfile::tempdir().unwrap();
        assert!(ProjectConfig::discover(root.path()).is_none());
    }

    #[test]
    fn test_project_config_example() {
        let text = ProjectConfig::example();
        for needle in ["[remote]", "url =", "api_key ="] {
            assert!(text.contains(needle));
        }
    }
}