fuse_rule/
config.rs

1use anyhow::Result;
2use serde::Deserialize;
3use std::path::Path;
4
5#[derive(Debug, Deserialize, Clone)]
6pub struct FuseRuleConfig {
7    pub engine: EngineConfig,
8    pub schema: Vec<FieldDef>,
9    pub rules: Vec<RuleConfig>,
10    pub agents: Vec<AgentConfig>,
11    #[serde(default)]
12    pub sources: Vec<SourceConfig>, // Data ingestion sources
13}
14
15#[derive(Debug, Deserialize, Clone)]
16pub struct FieldDef {
17    pub name: String,
18    pub data_type: String, // "int32", "float64", "utf8", "bool"
19}
20
21#[derive(Debug, Deserialize, Clone)]
22pub struct EngineConfig {
23    pub persistence_path: String,
24    #[serde(default = "default_max_pending_batches")]
25    pub max_pending_batches: usize, // Backpressure: max batches queued before blocking
26    #[serde(default = "default_agent_concurrency")]
27    pub agent_concurrency: usize, // Number of concurrent agent workers
28    #[serde(default)]
29    pub ingest_rate_limit: Option<u32>, // Rate limit: requests per second (None = unlimited)
30    #[serde(default)]
31    pub api_keys: Vec<String>, // API keys from config file
32}
33
34fn default_max_pending_batches() -> usize {
35    1000
36}
37
38fn default_agent_concurrency() -> usize {
39    10
40}
41
42#[derive(Debug, Deserialize, Clone)]
43pub struct RuleConfig {
44    pub id: String,
45    pub name: String,
46    pub predicate: String,
47    pub action: String,
48    pub window_seconds: Option<u64>,
49    #[serde(default = "default_version")]
50    pub version: u32,
51    #[serde(default = "default_enabled")]
52    pub enabled: bool,
53    #[serde(default)]
54    pub state_ttl_seconds: Option<u64>, // TTL for rule state (None = never expire)
55}
56
57fn default_enabled() -> bool {
58    true
59}
60
61fn default_version() -> u32 {
62    1
63}
64
65#[derive(Debug, Deserialize, Clone)]
66pub struct AgentConfig {
67    pub name: String,
68    pub r#type: String, // "logger", "webhook", etc.
69    pub url: Option<String>,
70    pub template: Option<String>, // Handlebars template for webhook payloads
71}
72
73#[derive(Debug, Deserialize, Clone)]
74#[serde(tag = "type")]
75pub enum SourceConfig {
76    #[serde(rename = "kafka")]
77    Kafka {
78        brokers: Vec<String>,
79        topic: String,
80        group_id: String,
81        #[serde(default = "default_auto_commit")]
82        auto_commit: bool,
83    },
84    #[serde(rename = "websocket")]
85    WebSocket {
86        bind: String,
87        #[serde(default = "default_max_connections")]
88        max_connections: usize,
89    },
90}
91
92fn default_auto_commit() -> bool {
93    true
94}
95
96fn default_max_connections() -> usize {
97    1000
98}
99
100impl FuseRuleConfig {
101    pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
102        let settings = config::Config::builder()
103            .add_source(config::File::from(path.as_ref()))
104            .build()?;
105
106        let config: FuseRuleConfig = settings.try_deserialize()?;
107        config.validate()?;
108        Ok(config)
109    }
110
111    pub fn validate(&self) -> Result<()> {
112        let mut agent_names = std::collections::HashSet::new();
113        for agent in &self.agents {
114            if !agent_names.insert(&agent.name) {
115                anyhow::bail!("Duplicate agent name: {}", agent.name);
116            }
117            if agent.r#type == "webhook" && agent.url.is_none() {
118                anyhow::bail!("Webhook agent '{}' missing URL", agent.name);
119            }
120        }
121
122        let mut rule_ids = std::collections::HashSet::new();
123        for rule in &self.rules {
124            if !rule_ids.insert(&rule.id) {
125                anyhow::bail!("Duplicate rule ID: {}", rule.id);
126            }
127            if !agent_names.contains(&rule.action) {
128                anyhow::bail!(
129                    "Rule '{}' references unknown agent '{}'",
130                    rule.id,
131                    rule.action
132                );
133            }
134            if rule.predicate.trim().is_empty() {
135                anyhow::bail!("Rule '{}' has an empty predicate", rule.id);
136            }
137        }
138
139        if self.schema.is_empty() {
140            anyhow::bail!("Configuration must define a schema");
141        }
142
143        Ok(())
144    }
145}