1use anyhow::Result;
2use serde::Deserialize;
3use std::path::Path;
4
5#[derive(Debug, Deserialize, Clone)]
6pub struct FuseRuleConfig {
7 pub engine: EngineConfig,
8 pub schema: Vec<FieldDef>,
9 pub rules: Vec<RuleConfig>,
10 pub agents: Vec<AgentConfig>,
11 #[serde(default)]
12 pub sources: Vec<SourceConfig>, }
14
15#[derive(Debug, Deserialize, Clone)]
16pub struct FieldDef {
17 pub name: String,
18 pub data_type: String, }
20
21#[derive(Debug, Deserialize, Clone)]
22pub struct EngineConfig {
23 pub persistence_path: String,
24 #[serde(default = "default_max_pending_batches")]
25 pub max_pending_batches: usize, #[serde(default = "default_agent_concurrency")]
27 pub agent_concurrency: usize, #[serde(default)]
29 pub ingest_rate_limit: Option<u32>, #[serde(default)]
31 pub api_keys: Vec<String>, }
33
/// Serde default for `EngineConfig::max_pending_batches`.
const fn default_max_pending_batches() -> usize {
    1000
}
37
/// Serde default for `EngineConfig::agent_concurrency`.
const fn default_agent_concurrency() -> usize {
    10
}
41
42#[derive(Debug, Deserialize, Clone)]
43pub struct RuleConfig {
44 pub id: String,
45 pub name: String,
46 pub predicate: String,
47 pub action: String,
48 pub window_seconds: Option<u64>,
49 #[serde(default = "default_version")]
50 pub version: u32,
51 #[serde(default = "default_enabled")]
52 pub enabled: bool,
53 #[serde(default)]
54 pub state_ttl_seconds: Option<u64>, }
56
/// Serde default for `RuleConfig::enabled`.
const fn default_enabled() -> bool {
    true
}
60
/// Serde default for `RuleConfig::version`.
const fn default_version() -> u32 {
    1
}
64
65#[derive(Debug, Deserialize, Clone)]
66pub struct AgentConfig {
67 pub name: String,
68 pub r#type: String, pub url: Option<String>,
70 pub template: Option<String>, }
72
73#[derive(Debug, Deserialize, Clone)]
74#[serde(tag = "type")]
75pub enum SourceConfig {
76 #[serde(rename = "kafka")]
77 Kafka {
78 brokers: Vec<String>,
79 topic: String,
80 group_id: String,
81 #[serde(default = "default_auto_commit")]
82 auto_commit: bool,
83 },
84 #[serde(rename = "websocket")]
85 WebSocket {
86 bind: String,
87 #[serde(default = "default_max_connections")]
88 max_connections: usize,
89 },
90}
91
/// Serde default for the `auto_commit` field of `SourceConfig::Kafka`.
const fn default_auto_commit() -> bool {
    true
}
95
/// Serde default for the `max_connections` field of `SourceConfig::WebSocket`.
const fn default_max_connections() -> usize {
    1000
}
99
100impl FuseRuleConfig {
101 pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
102 let settings = config::Config::builder()
103 .add_source(config::File::from(path.as_ref()))
104 .build()?;
105
106 let config: FuseRuleConfig = settings.try_deserialize()?;
107 config.validate()?;
108 Ok(config)
109 }
110
111 pub fn validate(&self) -> Result<()> {
112 let mut agent_names = std::collections::HashSet::new();
113 for agent in &self.agents {
114 if !agent_names.insert(&agent.name) {
115 anyhow::bail!("Duplicate agent name: {}", agent.name);
116 }
117 if agent.r#type == "webhook" && agent.url.is_none() {
118 anyhow::bail!("Webhook agent '{}' missing URL", agent.name);
119 }
120 }
121
122 let mut rule_ids = std::collections::HashSet::new();
123 for rule in &self.rules {
124 if !rule_ids.insert(&rule.id) {
125 anyhow::bail!("Duplicate rule ID: {}", rule.id);
126 }
127 if !agent_names.contains(&rule.action) {
128 anyhow::bail!(
129 "Rule '{}' references unknown agent '{}'",
130 rule.id,
131 rule.action
132 );
133 }
134 if rule.predicate.trim().is_empty() {
135 anyhow::bail!("Rule '{}' has an empty predicate", rule.id);
136 }
137 }
138
139 if self.schema.is_empty() {
140 anyhow::bail!("Configuration must define a schema");
141 }
142
143 Ok(())
144 }
145}