Skip to main content

varpulis_cli/
config.rs

1//! Configuration file support for Varpulis
2//!
3//! Supports both YAML and TOML configuration files.
4//!
5//! # Example YAML configuration:
6//! ```yaml
7//! # Varpulis configuration file
8//!
9//! # Query file to run
10//! query_file: /path/to/queries.vql
11//!
12//! # Server settings
13//! server:
14//!   port: 9000
15//!   bind: "0.0.0.0"
16//!   metrics_enabled: true
17//!   metrics_port: 9090
18//!
19//! # Kafka connector settings
20//! kafka:
21//!   bootstrap_servers: "localhost:9092"
22//!   consumer_group: "varpulis-consumer"
23//!
24//! # Logging settings
25//! logging:
26//!   level: info
27//!   format: json
28//! ```
29
30use serde::{Deserialize, Serialize};
31use std::path::{Path, PathBuf};
32
33/// Main configuration structure
34#[derive(Debug, Clone, Serialize, Deserialize, Default)]
35#[serde(default)]
36pub struct Config {
37    /// Path to the query file (.vql)
38    pub query_file: Option<PathBuf>,
39
40    /// Server configuration
41    pub server: ServerConfig,
42
43    /// Simulation configuration
44    pub simulation: SimulationConfig,
45
46    /// Kafka connector configuration
47    pub kafka: Option<KafkaConfig>,
48
49    /// HTTP webhook configuration
50    pub http_webhook: Option<HttpWebhookConfig>,
51
52    /// Logging configuration
53    pub logging: LoggingConfig,
54
55    /// Processing configuration
56    pub processing: ProcessingConfig,
57
58    /// TLS configuration
59    pub tls: Option<TlsConfig>,
60
61    /// Authentication configuration
62    pub auth: Option<AuthConfig>,
63}
64
65/// Server configuration
66#[derive(Debug, Clone, Serialize, Deserialize)]
67#[serde(default)]
68pub struct ServerConfig {
69    /// Server port
70    pub port: u16,
71
72    /// Bind address
73    pub bind: String,
74
75    /// Enable metrics endpoint
76    pub metrics_enabled: bool,
77
78    /// Metrics port
79    pub metrics_port: u16,
80
81    /// Working directory
82    pub workdir: Option<PathBuf>,
83}
84
85impl Default for ServerConfig {
86    fn default() -> Self {
87        Self {
88            port: 9000,
89            bind: "127.0.0.1".to_string(),
90            metrics_enabled: false,
91            metrics_port: 9090,
92            workdir: None,
93        }
94    }
95}
96
97/// Simulation configuration
98#[derive(Debug, Clone, Serialize, Deserialize, Default)]
99#[serde(default)]
100pub struct SimulationConfig {
101    /// Replay events with real-time timing delays
102    pub timed: bool,
103
104    /// Stream events line-by-line instead of preloading
105    pub streaming: bool,
106
107    /// Verbose output
108    pub verbose: bool,
109
110    /// Event file path
111    pub events_file: Option<PathBuf>,
112}
113
114/// Kafka connector configuration
115#[derive(Debug, Clone, Serialize, Deserialize)]
116#[serde(default)]
117pub struct KafkaConfig {
118    /// Kafka bootstrap servers
119    pub bootstrap_servers: String,
120
121    /// Consumer group ID
122    pub consumer_group: Option<String>,
123
124    /// Input topic
125    pub input_topic: Option<String>,
126
127    /// Output topic
128    pub output_topic: Option<String>,
129
130    /// Enable auto-commit
131    pub auto_commit: bool,
132
133    /// Auto offset reset (earliest, latest)
134    pub auto_offset_reset: String,
135}
136
137impl Default for KafkaConfig {
138    fn default() -> Self {
139        Self {
140            bootstrap_servers: "localhost:9092".to_string(),
141            consumer_group: None,
142            input_topic: None,
143            output_topic: None,
144            auto_commit: true,
145            auto_offset_reset: "latest".to_string(),
146        }
147    }
148}
149
150/// HTTP webhook configuration
151#[derive(Debug, Clone, Serialize, Deserialize)]
152#[serde(default)]
153pub struct HttpWebhookConfig {
154    /// Enable HTTP webhook input
155    pub enabled: bool,
156
157    /// Webhook port
158    pub port: u16,
159
160    /// Bind address
161    pub bind: String,
162
163    /// API key for authentication
164    pub api_key: Option<String>,
165
166    /// Rate limit (requests per second, 0 = unlimited)
167    pub rate_limit: u32,
168
169    /// Maximum batch size
170    pub max_batch_size: usize,
171}
172
173impl Default for HttpWebhookConfig {
174    fn default() -> Self {
175        Self {
176            enabled: false,
177            port: 8080,
178            bind: "0.0.0.0".to_string(),
179            api_key: None,
180            rate_limit: 0,
181            max_batch_size: 1000,
182        }
183    }
184}
185
186/// Logging configuration
187#[derive(Debug, Clone, Serialize, Deserialize)]
188#[serde(default)]
189pub struct LoggingConfig {
190    /// Log level (trace, debug, info, warn, error)
191    pub level: String,
192
193    /// Log format (text, json)
194    pub format: String,
195
196    /// Include timestamps
197    pub timestamps: bool,
198}
199
200impl Default for LoggingConfig {
201    fn default() -> Self {
202        Self {
203            level: "info".to_string(),
204            format: "text".to_string(),
205            timestamps: true,
206        }
207    }
208}
209
210/// Processing configuration
211#[derive(Debug, Clone, Serialize, Deserialize, Default)]
212#[serde(default)]
213pub struct ProcessingConfig {
214    /// Number of worker threads
215    pub workers: Option<usize>,
216
217    /// Partition key field
218    pub partition_by: Option<String>,
219}
220
221/// TLS configuration
222#[derive(Debug, Clone, Serialize, Deserialize)]
223pub struct TlsConfig {
224    /// Path to certificate file
225    pub cert_file: PathBuf,
226
227    /// Path to private key file
228    pub key_file: PathBuf,
229}
230
231/// Authentication configuration
232#[derive(Debug, Clone, Serialize, Deserialize)]
233pub struct AuthConfig {
234    /// API key
235    pub api_key: Option<String>,
236}
237
238impl Config {
239    /// Load configuration from a file (YAML or TOML, auto-detected by extension)
240    pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, ConfigError> {
241        let path = path.as_ref();
242        let content = std::fs::read_to_string(path)
243            .map_err(|e| ConfigError::IoError(path.to_path_buf(), e.to_string()))?;
244
245        let extension = path
246            .extension()
247            .and_then(|e| e.to_str())
248            .unwrap_or("")
249            .to_lowercase();
250
251        match extension.as_str() {
252            "yaml" | "yml" => Self::from_yaml(&content),
253            "toml" => Self::from_toml(&content),
254            _ => {
255                // Try YAML first, then TOML
256                Self::from_yaml(&content).or_else(|_| Self::from_toml(&content))
257            }
258        }
259    }
260
261    /// Parse configuration from YAML string
262    pub fn from_yaml(content: &str) -> Result<Self, ConfigError> {
263        serde_yaml::from_str(content).map_err(|e| ConfigError::ParseError(e.to_string()))
264    }
265
266    /// Parse configuration from TOML string
267    pub fn from_toml(content: &str) -> Result<Self, ConfigError> {
268        toml::from_str(content).map_err(|e| ConfigError::ParseError(e.to_string()))
269    }
270
271    /// Merge another config into this one (other values take precedence if set)
272    pub fn merge(&mut self, other: Self) {
273        if other.query_file.is_some() {
274            self.query_file = other.query_file;
275        }
276
277        // Merge server config
278        if other.server.port != ServerConfig::default().port {
279            self.server.port = other.server.port;
280        }
281        if other.server.bind != ServerConfig::default().bind {
282            self.server.bind = other.server.bind;
283        }
284        if other.server.metrics_enabled {
285            self.server.metrics_enabled = true;
286        }
287        if other.server.metrics_port != ServerConfig::default().metrics_port {
288            self.server.metrics_port = other.server.metrics_port;
289        }
290        if other.server.workdir.is_some() {
291            self.server.workdir = other.server.workdir;
292        }
293
294        // Merge processing config
295        if other.processing.workers.is_some() {
296            self.processing.workers = other.processing.workers;
297        }
298        if other.processing.partition_by.is_some() {
299            self.processing.partition_by = other.processing.partition_by;
300        }
301
302        // Replace optional configs if provided
303        if other.kafka.is_some() {
304            self.kafka = other.kafka;
305        }
306        if other.http_webhook.is_some() {
307            self.http_webhook = other.http_webhook;
308        }
309        if other.tls.is_some() {
310            self.tls = other.tls;
311        }
312        if other.auth.is_some() {
313            self.auth = other.auth;
314        }
315    }
316
317    /// Create an example configuration
318    pub fn example() -> Self {
319        Self {
320            query_file: Some(PathBuf::from("/app/queries/queries.vql")),
321            server: ServerConfig {
322                port: 9000,
323                bind: "0.0.0.0".to_string(),
324                metrics_enabled: true,
325                metrics_port: 9090,
326                workdir: Some(PathBuf::from("/app")),
327            },
328            simulation: SimulationConfig::default(),
329            kafka: Some(KafkaConfig {
330                bootstrap_servers: "kafka:9092".to_string(),
331                consumer_group: Some("varpulis-consumer".to_string()),
332                input_topic: Some("events".to_string()),
333                output_topic: Some("alerts".to_string()),
334                ..Default::default()
335            }),
336            http_webhook: Some(HttpWebhookConfig {
337                enabled: true,
338                port: 8080,
339                bind: "0.0.0.0".to_string(),
340                api_key: Some("your-api-key-here".to_string()),
341                rate_limit: 1000,
342                max_batch_size: 100,
343            }),
344            logging: LoggingConfig {
345                level: "info".to_string(),
346                format: "json".to_string(),
347                timestamps: true,
348            },
349            processing: ProcessingConfig {
350                workers: Some(4),
351                partition_by: Some("source_id".to_string()),
352            },
353            tls: None,
354            auth: Some(AuthConfig {
355                api_key: Some("your-websocket-api-key".to_string()),
356            }),
357        }
358    }
359
360    /// Generate example YAML configuration
361    pub fn example_yaml() -> String {
362        serde_yaml::to_string(&Self::example()).unwrap_or_default()
363    }
364
365    /// Generate example TOML configuration
366    pub fn example_toml() -> String {
367        toml::to_string_pretty(&Self::example()).unwrap_or_default()
368    }
369}
370
371/// Configuration error types
372#[derive(Debug, thiserror::Error)]
373pub enum ConfigError {
374    #[error("Failed to read config file {0}: {1}")]
375    IoError(PathBuf, String),
376
377    #[error("Failed to parse config: {0}")]
378    ParseError(String),
379}
380
381// =============================================================================
382// Project Config (.varpulis.toml) — lightweight config for SaaS CLI commands
383// =============================================================================
384
385/// Project-level configuration stored in `.varpulis.toml`.
386///
387/// This file stores the remote server URL and API key so that CLI commands
388/// like `deploy`, `status`, `pipelines`, and `undeploy` don't need
389/// `--server` and `--api-key` flags on every invocation.
390///
391/// # Example `.varpulis.toml`:
392/// ```toml
393/// [remote]
394/// url = "http://localhost:9000"
395/// api_key = "my-secret-key"
396///
397/// [deploy]
398/// name = "my-pipeline"
399/// ```
400#[derive(Debug, Clone, Serialize, Deserialize, Default)]
401#[serde(default)]
402pub struct ProjectConfig {
403    /// Remote server connection settings
404    pub remote: RemoteConfig,
405
406    /// Default deploy settings
407    pub deploy: DeployConfig,
408}
409
410/// Remote server connection settings
411#[derive(Debug, Clone, Serialize, Deserialize, Default)]
412#[serde(default)]
413pub struct RemoteConfig {
414    /// Server URL (e.g. "http://localhost:9000")
415    pub url: Option<String>,
416
417    /// Tenant API key
418    pub api_key: Option<String>,
419}
420
421/// Default deployment settings
422#[derive(Debug, Clone, Serialize, Deserialize, Default)]
423#[serde(default)]
424pub struct DeployConfig {
425    /// Default pipeline name
426    pub name: Option<String>,
427}
428
429impl ProjectConfig {
430    /// Load project config from a `.varpulis.toml` file.
431    pub fn load<P: AsRef<Path>>(path: P) -> Result<Self, ConfigError> {
432        let path = path.as_ref();
433        let content = std::fs::read_to_string(path)
434            .map_err(|e| ConfigError::IoError(path.to_path_buf(), e.to_string()))?;
435        toml::from_str(&content).map_err(|e| ConfigError::ParseError(e.to_string()))
436    }
437
438    /// Discover `.varpulis.toml` by walking up from the given directory.
439    /// Returns `None` if no config file is found.
440    pub fn discover(start_dir: &Path) -> Option<Self> {
441        let mut dir = start_dir;
442        loop {
443            let candidate = dir.join(".varpulis.toml");
444            if candidate.is_file() {
445                return Self::load(&candidate).ok();
446            }
447            dir = dir.parent()?;
448        }
449    }
450
451    /// Discover from the current working directory.
452    pub fn discover_cwd() -> Option<Self> {
453        let cwd = std::env::current_dir().ok()?;
454        Self::discover(&cwd)
455    }
456
457    /// Resolve the server URL: CLI flag > env var > project config.
458    pub fn resolve_url(&self, cli_flag: Option<&str>) -> Option<String> {
459        cli_flag
460            .map(|s| s.to_string())
461            .or_else(|| self.remote.url.clone())
462    }
463
464    /// Resolve the API key: CLI flag > env var > project config.
465    pub fn resolve_api_key(&self, cli_flag: Option<&str>) -> Option<String> {
466        cli_flag
467            .map(|s| s.to_string())
468            .or_else(|| self.remote.api_key.clone())
469    }
470
471    /// Generate example project config content.
472    pub fn example() -> String {
473        r#"# Varpulis project configuration
474# Place this file in your project root as .varpulis.toml
475
476[remote]
477url = "http://localhost:9000"
478api_key = "your-api-key-here"
479
480[deploy]
481name = "my-pipeline"
482"#
483        .to_string()
484    }
485}
486
#[cfg(test)]
mod tests {
    use super::*;

    /// Assertions shared by the YAML and TOML parsing tests, which feed the
    /// same settings through different formats.
    fn assert_parsed_settings(parsed: &Config) {
        assert_eq!(parsed.query_file, Some(PathBuf::from("/app/queries.vql")));
        assert_eq!(parsed.server.port, 8080);
        assert_eq!(parsed.server.bind, "0.0.0.0");
        assert!(parsed.server.metrics_enabled);
        assert_eq!(parsed.processing.workers, Some(8));
    }

    /// Project config with only `remote.url` populated.
    fn project_with_url(url: &str) -> ProjectConfig {
        ProjectConfig {
            remote: RemoteConfig {
                url: Some(url.to_string()),
                ..Default::default()
            },
            ..Default::default()
        }
    }

    /// Creates a temp directory containing a `.varpulis.toml` with `body`.
    fn tempdir_with_project_file(body: &str) -> tempfile::TempDir {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join(".varpulis.toml"), body).unwrap();
        dir
    }

    #[test]
    fn test_default_config() {
        let defaults = Config::default();
        assert_eq!(defaults.server.port, 9000);
        assert_eq!(defaults.server.bind, "127.0.0.1");
    }

    #[test]
    fn test_yaml_parsing() {
        let input = r#"
query_file: /app/queries.vql
server:
  port: 8080
  bind: "0.0.0.0"
  metrics_enabled: true
processing:
  workers: 8
"#;
        let parsed = Config::from_yaml(input).unwrap();
        assert_parsed_settings(&parsed);
    }

    #[test]
    fn test_toml_parsing() {
        let input = r#"
query_file = "/app/queries.vql"

[server]
port = 8080
bind = "0.0.0.0"
metrics_enabled = true

[processing]
workers = 8
"#;
        let parsed = Config::from_toml(input).unwrap();
        assert_parsed_settings(&parsed);
    }

    #[test]
    fn test_config_merge() {
        let overrides = Config {
            server: ServerConfig {
                port: 8888,
                ..Default::default()
            },
            ..Default::default()
        };

        let mut merged = Config::default();
        merged.merge(overrides);
        assert_eq!(merged.server.port, 8888);
    }

    // Project config tests
    #[test]
    fn test_project_config_parse() {
        let input = r#"
[remote]
url = "http://prod.example.com:9000"
api_key = "secret-123"

[deploy]
name = "fraud-detection"
"#;
        let parsed: ProjectConfig = toml::from_str(input).unwrap();
        assert_eq!(
            parsed.remote.url.as_deref(),
            Some("http://prod.example.com:9000")
        );
        assert_eq!(parsed.remote.api_key.as_deref(), Some("secret-123"));
        assert_eq!(parsed.deploy.name.as_deref(), Some("fraud-detection"));
    }

    #[test]
    fn test_project_config_partial() {
        let input = r#"
[remote]
url = "http://localhost:9000"
"#;
        let parsed: ProjectConfig = toml::from_str(input).unwrap();
        assert_eq!(parsed.remote.url.as_deref(), Some("http://localhost:9000"));
        assert!(parsed.remote.api_key.is_none());
        assert!(parsed.deploy.name.is_none());
    }

    #[test]
    fn test_project_config_empty() {
        let parsed: ProjectConfig = toml::from_str("").unwrap();
        assert!(parsed.remote.url.is_none());
        assert!(parsed.remote.api_key.is_none());
    }

    #[test]
    fn test_project_config_resolve_url_flag_wins() {
        let project = project_with_url("http://config-server:9000");
        let resolved = project.resolve_url(Some("http://flag-server:9000"));
        assert_eq!(resolved, Some("http://flag-server:9000".to_string()));
    }

    #[test]
    fn test_project_config_resolve_url_config_fallback() {
        let project = project_with_url("http://config-server:9000");
        let resolved = project.resolve_url(None);
        assert_eq!(resolved, Some("http://config-server:9000".to_string()));
    }

    #[test]
    fn test_project_config_resolve_api_key() {
        let project = ProjectConfig {
            remote: RemoteConfig {
                api_key: Some("config-key".to_string()),
                ..Default::default()
            },
            ..Default::default()
        };

        // An explicit flag takes priority over the stored key.
        let from_flag = project.resolve_api_key(Some("flag-key"));
        assert_eq!(from_flag, Some("flag-key".to_string()));

        // Without a flag, the stored key is used.
        let from_config = project.resolve_api_key(None);
        assert_eq!(from_config, Some("config-key".to_string()));
    }

    #[test]
    fn test_project_config_discover_file() {
        let dir = tempdir_with_project_file(
            r#"
[remote]
url = "http://test:9000"
api_key = "test-key"
"#,
        );

        let found = ProjectConfig::discover(dir.path()).unwrap();
        assert_eq!(found.remote.url.as_deref(), Some("http://test:9000"));
        assert_eq!(found.remote.api_key.as_deref(), Some("test-key"));
    }

    #[test]
    fn test_project_config_discover_parent() {
        let dir = tempdir_with_project_file(
            r#"
[remote]
url = "http://parent:9000"
"#,
        );

        // Start the search two levels below the directory holding the file.
        let nested = dir.path().join("sub").join("dir");
        std::fs::create_dir_all(&nested).unwrap();

        let found = ProjectConfig::discover(&nested).unwrap();
        assert_eq!(found.remote.url.as_deref(), Some("http://parent:9000"));
    }

    #[test]
    fn test_project_config_discover_not_found() {
        // The temp directory is created empty: no .varpulis.toml inside it.
        let dir = tempfile::tempdir().unwrap();
        let found = ProjectConfig::discover(dir.path());
        assert!(found.is_none());
    }

    #[test]
    fn test_project_config_example() {
        let text = ProjectConfig::example();
        assert!(text.contains("[remote]"));
        assert!(text.contains("url ="));
        assert!(text.contains("api_key ="));
    }
}