1pub mod agent;
28pub mod agent_queue;
29pub mod cli;
30pub mod config;
31pub mod debugger;
32pub mod evaluator;
33pub mod ingestion;
34pub mod metrics;
35pub mod repl;
36pub mod rule;
37pub mod server;
38pub mod state;
39pub mod window;
40
41use crate::agent::{Activation, Agent};
42use crate::agent_queue::{AgentQueue, AgentTask, CircuitBreaker};
43use crate::config::FuseRuleConfig;
44use crate::evaluator::{CompiledRuleEdge, RuleEvaluator};
45use crate::rule::Rule;
46use crate::state::{PredicateResult, RuleTransition, StateStore};
47use crate::window::WindowBuffer;
48use anyhow::Result;
49use arrow::record_batch::RecordBatch;
50use std::collections::HashMap;
51use std::sync::Arc;
52use std::time::Instant;
53use tracing::{debug, error, info, warn};
54
55#[derive(Debug, Clone, serde::Serialize)]
85pub struct EvaluationTrace {
86 pub rule_id: String,
88 pub rule_name: String,
90 pub rule_version: u32,
92 pub result: PredicateResult,
94 pub transition: String,
96 pub action_fired: bool,
98 pub agent_status: Option<String>,
100}
101
102pub struct RuleEngine {
134 evaluator: Box<dyn RuleEvaluator>,
135 state: Box<dyn StateStore>,
136 rules: Vec<CompiledRuleEdge>,
137 window_buffers: HashMap<String, WindowBuffer>,
138 agents: HashMap<String, Arc<dyn Agent>>,
139 schema: Arc<arrow::datatypes::Schema>,
140 agent_queue: Option<AgentQueue>,
141 circuit_breakers: HashMap<String, Arc<CircuitBreaker>>,
142}
143
144impl RuleEngine {
145 pub fn new(
146 evaluator: Box<dyn RuleEvaluator>,
147 state: Box<dyn StateStore>,
148 schema: Arc<arrow::datatypes::Schema>,
149 max_pending_batches: usize,
150 agent_concurrency: usize,
151 ) -> Self {
152 let (agent_queue, worker) = AgentQueue::new(Some(max_pending_batches));
154 let worker = worker.with_concurrency(agent_concurrency);
155 let _worker_handle = tokio::spawn(async move {
157 worker.run().await;
158 });
159 Self {
162 evaluator,
163 state,
164 rules: Vec::new(),
165 window_buffers: HashMap::new(),
166 agents: HashMap::new(),
167 schema,
168 agent_queue: Some(agent_queue),
169 circuit_breakers: HashMap::new(),
170 }
171 }
172
173 pub fn get_or_create_circuit_breaker(&mut self, agent_name: &str) -> Arc<CircuitBreaker> {
174 self.circuit_breakers
175 .entry(agent_name.to_string())
176 .or_insert_with(|| {
177 Arc::new(CircuitBreaker::new(
178 5, std::time::Duration::from_secs(30),
180 ))
181 })
182 .clone()
183 }
184
185 pub async fn from_config(config: FuseRuleConfig) -> Result<Self> {
186 let mut fields = Vec::new();
188 for f in config.schema {
189 let dt = match f.data_type.as_str() {
190 "int32" => arrow::datatypes::DataType::Int32,
191 "int64" => arrow::datatypes::DataType::Int64,
192 "float32" => arrow::datatypes::DataType::Float32,
193 "float64" => arrow::datatypes::DataType::Float64,
194 "bool" => arrow::datatypes::DataType::Boolean,
195 "utf8" | "string" => arrow::datatypes::DataType::Utf8,
196 _ => arrow::datatypes::DataType::Utf8,
197 };
198 fields.push(arrow::datatypes::Field::new(f.name, dt, true));
199 }
200 let schema = Arc::new(arrow::datatypes::Schema::new(fields));
201
202 let evaluator = Box::new(crate::evaluator::DataFusionEvaluator::new());
204 let state_store = crate::state::SledStateStore::new(&config.engine.persistence_path)?;
205
206 let mut rules_ttl = std::collections::HashMap::new();
208 for rule in &config.rules {
209 if let Some(ttl) = rule.state_ttl_seconds {
210 rules_ttl.insert(rule.id.clone(), ttl);
211 }
212 }
213 if !rules_ttl.is_empty() {
214 state_store.start_cleanup_task(rules_ttl);
215 }
216
217 let state = Box::new(state_store);
218
219 let max_pending = config.engine.max_pending_batches;
220 let agent_concurrency = config.engine.agent_concurrency;
221 let mut engine = Self::new(
222 evaluator,
223 state,
224 Arc::clone(&schema),
225 max_pending,
226 agent_concurrency,
227 );
228
229 for agent_cfg in config.agents {
231 match agent_cfg.r#type.as_str() {
232 "logger" => {
233 engine.add_agent(agent_cfg.name, Arc::new(crate::agent::LoggerAgent));
234 }
235 "webhook" => {
236 if let Some(url) = agent_cfg.url {
237 engine.add_agent(
238 agent_cfg.name,
239 Arc::new(crate::agent::WebhookAgent::new(
240 url,
241 agent_cfg.template.clone(),
242 )),
243 );
244 }
245 }
246 _ => println!("Warning: Unknown agent type '{}'", agent_cfg.r#type),
247 }
248 }
249
250 for r_cfg in config.rules {
252 engine
253 .add_rule(Rule {
254 id: r_cfg.id,
255 name: r_cfg.name,
256 predicate: r_cfg.predicate,
257 action: r_cfg.action,
258 window_seconds: r_cfg.window_seconds,
259 version: r_cfg.version,
260 enabled: r_cfg.enabled,
261 })
262 .await?;
263 }
264
265 Ok(engine)
266 }
267
268 pub async fn reload_from_config(&mut self, config: FuseRuleConfig) -> Result<()> {
269 info!("🔄 Reloading engine configuration...");
270
271 let mut new_agents = HashMap::new();
273 let mut new_circuit_breakers = HashMap::new();
274 for agent_cfg in config.agents {
275 match agent_cfg.r#type.as_str() {
276 "logger" => {
277 new_agents.insert(
278 agent_cfg.name.clone(),
279 Arc::new(crate::agent::LoggerAgent) as Arc<dyn Agent>,
280 );
281 }
282 "webhook" => {
283 if let Some(url) = agent_cfg.url {
284 let agent_name = agent_cfg.name.clone();
285 new_agents.insert(
286 agent_name.clone(),
287 Arc::new(crate::agent::WebhookAgent::new(
288 url,
289 agent_cfg.template.clone(),
290 )) as Arc<dyn Agent>,
291 );
292 new_circuit_breakers.insert(
294 agent_name,
295 Arc::new(CircuitBreaker::new(5, std::time::Duration::from_secs(30))),
296 );
297 }
298 }
299 _ => warn!("Unknown agent type '{}' during reload", agent_cfg.r#type),
300 }
301 }
302 self.agents = new_agents;
303 self.circuit_breakers = new_circuit_breakers;
304
305 let mut new_rules = Vec::new();
307 let mut new_window_buffers = HashMap::new();
308
309 for r_cfg in config.rules {
310 let rule = Rule {
311 id: r_cfg.id,
312 name: r_cfg.name,
313 predicate: r_cfg.predicate,
314 action: r_cfg.action,
315 window_seconds: r_cfg.window_seconds,
316 version: r_cfg.version,
317 enabled: r_cfg.enabled,
318 };
319
320 if let Some(secs) = rule.window_seconds {
322 if let Some(existing_buffer) = self.window_buffers.remove(&rule.id) {
323 new_window_buffers.insert(rule.id.clone(), existing_buffer);
324 } else {
325 new_window_buffers.insert(rule.id.clone(), WindowBuffer::new(secs));
326 }
327 }
328
329 let compiled = self.evaluator.compile(rule, &self.schema)?;
330 new_rules.push(compiled);
331 }
332
333 self.rules = new_rules;
334 self.window_buffers = new_window_buffers;
335
336 info!(
337 "✅ Engine reloaded: {} rules, {} agents",
338 self.rules.len(),
339 self.agents.len()
340 );
341 Ok(())
342 }
343
344 pub fn schema(&self) -> Arc<arrow::datatypes::Schema> {
345 Arc::clone(&self.schema)
346 }
347
348 pub fn add_agent(&mut self, name: String, agent: Arc<dyn Agent>) {
349 self.agents.insert(name, agent);
350 }
351
352 pub async fn add_rule(&mut self, rule: Rule) -> Result<()> {
353 if let Some(secs) = rule.window_seconds {
354 self.window_buffers
355 .insert(rule.id.clone(), WindowBuffer::new(secs));
356 }
357 let compiled = self.evaluator.compile(rule, &self.schema)?;
358 self.rules.push(compiled);
359 Ok(())
360 }
361
362 pub async fn update_rule(&mut self, rule_id: &str, new_rule: Rule) -> Result<()> {
363 let rule_idx = self.rules.iter().position(|r| r.rule.id == rule_id);
365 if rule_idx.is_none() {
366 anyhow::bail!("Rule not found: {}", rule_id);
367 }
368
369 let rule_idx = rule_idx.unwrap();
370 let old_rule = &self.rules[rule_idx].rule;
371
372 let preserve_buffer = old_rule.window_seconds == new_rule.window_seconds;
374 let existing_buffer = if preserve_buffer {
375 self.window_buffers.remove(rule_id)
376 } else {
377 None
378 };
379
380 let compiled = self.evaluator.compile(new_rule, &self.schema)?;
382
383 self.rules[rule_idx] = compiled;
385
386 if let Some(buffer) = existing_buffer {
388 self.window_buffers.insert(rule_id.to_string(), buffer);
389 } else if let Some(secs) = self.rules[rule_idx].rule.window_seconds {
390 self.window_buffers
391 .insert(rule_id.to_string(), WindowBuffer::new(secs));
392 }
393
394 Ok(())
395 }
396
397 pub async fn toggle_rule(&mut self, rule_id: &str, enabled: bool) -> Result<()> {
398 let rule_idx = self.rules.iter().position(|r| r.rule.id == rule_id);
399 if let Some(idx) = rule_idx {
400 self.rules[idx].rule.enabled = enabled;
401 Ok(())
402 } else {
403 anyhow::bail!("Rule not found: {}", rule_id)
404 }
405 }
406
407 pub async fn process_batch(&mut self, batch: &RecordBatch) -> Result<Vec<EvaluationTrace>> {
408 let _start = Instant::now();
409
410 let mut windowed_data = Vec::with_capacity(self.rules.len());
411 for rule in &self.rules {
412 if let Some(buffer) = self.window_buffers.get(&rule.rule.id) {
413 windowed_data.push(buffer.get_batches());
414 } else {
415 windowed_data.push(vec![]);
416 }
417 }
418
419 let mut enabled_indices = Vec::new();
421 let mut enabled_compiled_rules = Vec::new();
422 let mut enabled_window_data = Vec::new();
423
424 for (i, rule) in self.rules.iter().enumerate() {
425 if rule.rule.enabled {
426 enabled_indices.push(i);
427 enabled_compiled_rules.push(rule.clone());
428 enabled_window_data.push(windowed_data.get(i).cloned().unwrap_or_default());
429 }
430 }
431
432 let evaluation_start = Instant::now();
434 let results_with_context = match self
435 .evaluator
436 .evaluate_batch(batch, &enabled_compiled_rules, &enabled_window_data)
437 .await
438 {
439 Ok(results) => {
440 let eval_duration = evaluation_start.elapsed();
441 crate::metrics::METRICS.record_evaluation_duration(eval_duration.as_secs_f64());
442 results
443 }
444 Err(e) => {
445 error!("Rule evaluation error: {}", e);
446 crate::metrics::METRICS.record_evaluation_error();
447 return Err(e);
448 }
449 };
450 crate::metrics::METRICS
451 .batches_processed
452 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
453 let mut traces = Vec::new();
454
455 let enabled_count = enabled_indices.len();
457 let mut enabled_results_iter = results_with_context.into_iter();
458 let mut enabled_idx_iter = enabled_indices.into_iter();
459
460 let eval_duration = evaluation_start.elapsed();
463 let per_rule_duration = if enabled_count > 0 {
464 eval_duration.as_secs_f64() / enabled_count as f64
465 } else {
466 0.0
467 };
468
469 for rule in self.rules.iter() {
470 if rule.rule.enabled {
471 let original_idx = enabled_idx_iter.next().unwrap();
472 let (result, context) = enabled_results_iter.next().unwrap();
473 let rule = &self.rules[original_idx].rule;
474
475 crate::metrics::METRICS.record_rule_evaluation(&rule.id);
477 crate::metrics::METRICS
479 .record_rule_evaluation_duration(&rule.id, per_rule_duration);
480
481 let transition = self.state.update_result(&rule.id, result).await?;
482
483 if transition != RuleTransition::None {
484 info!(
485 "Rule '{}' ({} v{}): {:?} -> {:?}",
486 rule.name, rule.id, rule.version, result, transition
487 );
488 }
489
490 let mut agent_status = None;
491 let mut action_fired = false;
492
493 if let RuleTransition::Activated = transition {
494 action_fired = true;
495 crate::metrics::METRICS
496 .activations_total
497 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
498 crate::metrics::METRICS.record_rule_activation(&rule.id);
499 let activation = Activation {
500 rule_id: rule.id.clone(),
501 rule_name: rule.name.clone(),
502 action: rule.action.clone(),
503 context,
504 };
505
506 if let Some(agent_queue) = &self.agent_queue {
508 if let Some(agent) = self.agents.get(&rule.action) {
509 let circuit_breaker = self.circuit_breakers.get(&rule.action).cloned();
510
511 let dlq_sender = Some(agent_queue.dlq_sender.clone());
513
514 let task = AgentTask::new(
515 activation,
516 agent.clone(),
517 3, circuit_breaker,
519 dlq_sender,
520 );
521
522 if let Err(e) = agent_queue.enqueue(task).await {
523 error!("Failed to enqueue agent task: {}", e);
524 agent_status = Some(format!("enqueue_failed: {}", e));
525 crate::metrics::METRICS
526 .agent_failures
527 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
528 } else {
529 agent_status = Some("queued".to_string());
530 }
531 } else {
532 agent_status = Some("agent_not_found".to_string());
533 crate::metrics::METRICS
534 .agent_failures
535 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
536 }
537 } else {
538 if let Some(agent) = self.agents.get(&rule.action) {
540 match agent.execute(&activation).await {
541 Ok(_) => {
542 debug!(
543 "Agent '{}' executed successfully for rule '{}'",
544 rule.action, rule.id
545 );
546 agent_status = Some("success".to_string());
547 }
548 Err(e) => {
549 error!("Error executing agent '{}': {}", rule.action, e);
550 agent_status = Some(format!("failed: {}", e));
551 crate::metrics::METRICS
552 .agent_failures
553 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
554 }
555 }
556 } else {
557 agent_status = Some("agent_not_found".to_string());
558 crate::metrics::METRICS
559 .agent_failures
560 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
561 }
562 }
563 }
564
565 traces.push(EvaluationTrace {
566 rule_id: rule.id.clone(),
567 rule_name: rule.name.clone(),
568 rule_version: rule.version,
569 result,
570 transition: match transition {
571 RuleTransition::None => "None".to_string(),
572 RuleTransition::Activated => "Activated".to_string(),
573 RuleTransition::Deactivated => {
574 crate::metrics::METRICS.record_deactivation();
575 "Deactivated".to_string()
576 }
577 },
578 action_fired,
579 agent_status,
580 });
581
582 if let Some(buffer) = self.window_buffers.get_mut(&rule.id) {
583 buffer.add_batch(batch.clone());
584 }
585 } else {
586 let rule = &rule.rule;
588 let last_result = self
589 .state
590 .get_last_result(&rule.id)
591 .await
592 .unwrap_or(PredicateResult::False);
593 traces.push(EvaluationTrace {
594 rule_id: rule.id.clone(),
595 rule_name: rule.name.clone(),
596 rule_version: rule.version,
597 result: last_result,
598 transition: "None".to_string(),
599 action_fired: false,
600 agent_status: Some("rule_disabled".to_string()),
601 });
602 }
603 }
604
605 Ok(traces)
606 }
607}