pub mod agent;
pub mod agent_queue;
pub mod cli;
pub mod config;
pub mod debugger;
pub mod evaluator;
pub mod ingestion;
pub mod metrics;
pub mod repl;
pub mod rule;
pub mod server;
pub mod state;
pub mod window;
use crate::agent::{Activation, Agent};
use crate::agent_queue::{AgentQueue, AgentTask, CircuitBreaker};
use crate::config::FuseRuleConfig;
use crate::evaluator::{CompiledRuleEdge, RuleEvaluator};
use crate::rule::Rule;
use crate::state::{PredicateResult, RuleTransition, StateStore};
use crate::window::WindowBuffer;
use anyhow::Result;
use arrow::record_batch::RecordBatch;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use tracing::{debug, error, info, warn};
/// Per-rule record of what happened while processing one batch.
///
/// `RuleEngine::process_batch` returns one trace per registered rule
/// (enabled or disabled), in the order the rules are stored in the engine.
#[derive(Debug, Clone, serde::Serialize)]
pub struct EvaluationTrace {
/// Identifier of the rule this trace belongs to.
pub rule_id: String,
/// Human-readable rule name, copied from the rule.
pub rule_name: String,
/// Version of the rule, copied from the rule.
pub rule_version: u32,
/// Predicate result for this batch. For a disabled rule this is the last
/// result stored in the state store (falling back to `False`).
pub result: PredicateResult,
/// State transition as a string: `"None"`, `"Activated"` or `"Deactivated"`.
pub transition: String,
/// `true` when the rule transitioned to `Activated` on this batch.
pub action_fired: bool,
/// Outcome of dispatching the rule's action. Values written by
/// `process_batch`: `"queued"`, `"enqueue_failed: …"`, `"success"`,
/// `"failed: …"`, `"agent_not_found"`, `"rule_disabled"`; `None` when the
/// rule was evaluated but did not activate.
pub agent_status: Option<String>,
}
/// Rule engine: compiles rules, evaluates incoming record batches against
/// them, tracks per-rule state transitions and dispatches actions to agents.
pub struct RuleEngine {
/// Compiles rules against `schema` and evaluates batches.
evaluator: Box<dyn RuleEvaluator>,
/// Stores the last predicate result per rule and reports transitions.
state: Box<dyn StateStore>,
/// Compiled rules; `process_batch` emits traces in this order.
rules: Vec<CompiledRuleEdge>,
/// Window buffers keyed by rule id; only rules with `window_seconds` have one.
window_buffers: HashMap<String, WindowBuffer>,
/// Registered agents keyed by name; looked up via a rule's `action`.
agents: HashMap<String, Arc<dyn Agent>>,
/// Schema handed to the evaluator when compiling rules.
schema: Arc<arrow::datatypes::Schema>,
/// Queue used to hand activations to the agent worker. When `None`,
/// `process_batch` executes agents inline instead.
agent_queue: Option<AgentQueue>,
/// Circuit breakers keyed by agent name; attached to queued agent tasks.
circuit_breakers: HashMap<String, Arc<CircuitBreaker>>,
}
impl RuleEngine {
/// Creates an engine with no rules and no agents, and starts the agent worker.
///
/// An [`AgentQueue`] bounded by `max_pending_batches` is created and its
/// worker, configured with `agent_concurrency`, is spawned onto the Tokio
/// runtime. The task handle is dropped, so the worker runs detached.
///
/// # Panics
///
/// `tokio::spawn` needs a running Tokio runtime; calling this outside of one
/// panics.
pub fn new(
    evaluator: Box<dyn RuleEvaluator>,
    state: Box<dyn StateStore>,
    schema: Arc<arrow::datatypes::Schema>,
    max_pending_batches: usize,
    agent_concurrency: usize,
) -> Self {
    let (queue, queue_worker) = AgentQueue::new(Some(max_pending_batches));
    let queue_worker = queue_worker.with_concurrency(agent_concurrency);

    // Fire and forget: nothing ever joins this task.
    let _ = tokio::spawn(async move {
        queue_worker.run().await;
    });

    Self {
        evaluator,
        state,
        schema,
        rules: Vec::new(),
        agents: HashMap::new(),
        window_buffers: HashMap::new(),
        circuit_breakers: HashMap::new(),
        agent_queue: Some(queue),
    }
}
/// Returns the circuit breaker registered for `agent_name`, creating and
/// registering one first if none exists yet.
///
/// New breakers are built with `CircuitBreaker::new(5, 30s)`.
// NOTE(review): presumably 5 = failure threshold and 30s = reset timeout —
// confirm against `CircuitBreaker::new`.
pub fn get_or_create_circuit_breaker(&mut self, agent_name: &str) -> Arc<CircuitBreaker> {
    if let Some(existing) = self.circuit_breakers.get(agent_name) {
        return Arc::clone(existing);
    }

    let breaker = Arc::new(CircuitBreaker::new(
        5,
        std::time::Duration::from_secs(30),
    ));
    self.circuit_breakers
        .insert(agent_name.to_string(), Arc::clone(&breaker));
    breaker
}
pub async fn from_config(config: FuseRuleConfig) -> Result<Self> {
let mut fields = Vec::new();
for f in config.schema {
let dt = match f.data_type.as_str() {
"int32" => arrow::datatypes::DataType::Int32,
"int64" => arrow::datatypes::DataType::Int64,
"float32" => arrow::datatypes::DataType::Float32,
"float64" => arrow::datatypes::DataType::Float64,
"bool" => arrow::datatypes::DataType::Boolean,
"utf8" | "string" => arrow::datatypes::DataType::Utf8,
_ => arrow::datatypes::DataType::Utf8,
};
fields.push(arrow::datatypes::Field::new(f.name, dt, true));
}
let schema = Arc::new(arrow::datatypes::Schema::new(fields));
let evaluator = Box::new(crate::evaluator::DataFusionEvaluator::new());
let state_store = crate::state::SledStateStore::new(&config.engine.persistence_path)?;
let mut rules_ttl = std::collections::HashMap::new();
for rule in &config.rules {
if let Some(ttl) = rule.state_ttl_seconds {
rules_ttl.insert(rule.id.clone(), ttl);
}
}
if !rules_ttl.is_empty() {
state_store.start_cleanup_task(rules_ttl);
}
let state = Box::new(state_store);
let max_pending = config.engine.max_pending_batches;
let agent_concurrency = config.engine.agent_concurrency;
let mut engine = Self::new(
evaluator,
state,
Arc::clone(&schema),
max_pending,
agent_concurrency,
);
for agent_cfg in config.agents {
match agent_cfg.r#type.as_str() {
"logger" => {
engine.add_agent(agent_cfg.name, Arc::new(crate::agent::LoggerAgent));
}
"webhook" => {
if let Some(url) = agent_cfg.url {
engine.add_agent(
agent_cfg.name,
Arc::new(crate::agent::WebhookAgent::new(
url,
agent_cfg.template.clone(),
)),
);
}
}
_ => println!("Warning: Unknown agent type '{}'", agent_cfg.r#type),
}
}
for r_cfg in config.rules {
engine
.add_rule(Rule {
id: r_cfg.id,
name: r_cfg.name,
predicate: r_cfg.predicate,
action: r_cfg.action,
window_seconds: r_cfg.window_seconds,
version: r_cfg.version,
enabled: r_cfg.enabled,
})
.await?;
}
Ok(engine)
}
/// Replaces agents, circuit breakers, rules and window buffers with the
/// contents of `config`.
///
/// The reload is all-or-nothing: everything is built into temporaries first
/// and the engine is only modified once every rule has compiled. If a rule
/// fails to compile, the engine keeps its previous agents, rules and window
/// history untouched.
///
/// A rule's window buffer is carried over only when a rule with the same id
/// existed before with the same `window_seconds` (the same policy
/// `update_rule` applies); otherwise a fresh buffer is created. Circuit
/// breakers are recreated for every webhook agent.
///
/// The schema, evaluator and state store are not affected by a reload.
///
/// # Errors
///
/// Returns the compile error of the first rule that fails to compile.
pub async fn reload_from_config(&mut self, config: FuseRuleConfig) -> Result<()> {
    info!("🔄 Reloading engine configuration...");

    let mut new_agents = HashMap::new();
    let mut new_circuit_breakers = HashMap::new();
    for agent_cfg in config.agents {
        match agent_cfg.r#type.as_str() {
            "logger" => {
                new_agents.insert(
                    agent_cfg.name.clone(),
                    Arc::new(crate::agent::LoggerAgent) as Arc<dyn Agent>,
                );
            }
            "webhook" => {
                if let Some(url) = agent_cfg.url {
                    let agent_name = agent_cfg.name.clone();
                    new_agents.insert(
                        agent_name.clone(),
                        Arc::new(crate::agent::WebhookAgent::new(
                            url,
                            agent_cfg.template.clone(),
                        )) as Arc<dyn Agent>,
                    );
                    new_circuit_breakers.insert(
                        agent_name,
                        Arc::new(CircuitBreaker::new(5, std::time::Duration::from_secs(30))),
                    );
                }
            }
            _ => warn!("Unknown agent type '{}' during reload", agent_cfg.r#type),
        }
    }

    // Compile every rule before touching `self`: this is the only fallible
    // step, so failing here must leave the running engine unchanged.
    let mut new_rules = Vec::new();
    for r_cfg in config.rules {
        let rule = Rule {
            id: r_cfg.id,
            name: r_cfg.name,
            predicate: r_cfg.predicate,
            action: r_cfg.action,
            window_seconds: r_cfg.window_seconds,
            version: r_cfg.version,
            enabled: r_cfg.enabled,
        };
        new_rules.push(self.evaluator.compile(rule, &self.schema)?);
    }

    // Window configuration of the rules being replaced, keyed by rule id.
    let previous_windows: HashMap<&str, _> = self
        .rules
        .iter()
        .map(|old| (old.rule.id.as_str(), old.rule.window_seconds))
        .collect();

    let mut new_window_buffers = HashMap::new();
    for compiled in &new_rules {
        let rule = &compiled.rule;
        if let Some(secs) = rule.window_seconds {
            let window_unchanged =
                previous_windows.get(rule.id.as_str()) == Some(&rule.window_seconds);
            // A buffer built for a different window length must not be reused.
            let carried_over = if window_unchanged {
                self.window_buffers.remove(&rule.id)
            } else {
                None
            };
            new_window_buffers.insert(
                rule.id.clone(),
                carried_over.unwrap_or_else(|| WindowBuffer::new(secs)),
            );
        }
    }

    // Commit: nothing below can fail.
    self.agents = new_agents;
    self.circuit_breakers = new_circuit_breakers;
    self.rules = new_rules;
    self.window_buffers = new_window_buffers;

    info!(
        "✅ Engine reloaded: {} rules, {} agents",
        self.rules.len(),
        self.agents.len()
    );
    Ok(())
}
/// Returns a shared handle to the schema rules are compiled against.
///
/// Only the `Arc` is cloned; the schema itself is not copied.
pub fn schema(&self) -> Arc<arrow::datatypes::Schema> {
    self.schema.clone()
}
/// Registers `agent` under `name`.
///
/// An agent already registered under the same name is replaced.
pub fn add_agent(&mut self, name: String, agent: Arc<dyn Agent>) {
    let _previous = self.agents.insert(name, agent);
}
/// Compiles `rule` and appends it to the engine.
///
/// Rules with `window_seconds` get a fresh, empty window buffer keyed by the
/// rule id. The buffer is only created after compilation succeeded, so a rule
/// that fails to compile leaves the engine (including any existing buffer
/// under the same id) untouched.
///
/// # Errors
///
/// Returns the evaluator's error if the rule does not compile.
pub async fn add_rule(&mut self, rule: Rule) -> Result<()> {
    // Capture what the buffer needs before `rule` is moved into the compiler.
    let window = rule.window_seconds.map(|secs| (rule.id.clone(), secs));

    let compiled = self.evaluator.compile(rule, &self.schema)?;

    if let Some((rule_id, secs)) = window {
        self.window_buffers.insert(rule_id, WindowBuffer::new(secs));
    }
    self.rules.push(compiled);
    Ok(())
}
/// Replaces the rule identified by `rule_id` with `new_rule`, keeping its
/// position in the rule list.
///
/// Window handling:
/// - same `window_seconds` as before: the existing buffer (and its history)
///   is kept;
/// - different `window_seconds`: a fresh buffer is created;
/// - the new rule has no window: any buffer of the old rule is dropped.
///
/// The buffer is stored under the new rule's id, which is the key
/// `process_batch` uses to look it up.
///
/// Nothing is modified unless `new_rule` compiles successfully.
///
/// # Errors
///
/// Returns an error if no rule with `rule_id` exists or if `new_rule` fails
/// to compile.
pub async fn update_rule(&mut self, rule_id: &str, new_rule: Rule) -> Result<()> {
    let rule_idx = self
        .rules
        .iter()
        .position(|r| r.rule.id == rule_id)
        .ok_or_else(|| anyhow::anyhow!("Rule not found: {}", rule_id))?;

    let window_unchanged =
        self.rules[rule_idx].rule.window_seconds == new_rule.window_seconds;

    // Fallible step first; state is only mutated after it succeeded.
    let compiled = self.evaluator.compile(new_rule, &self.schema)?;

    // Always detach the old buffer so no stale entry survives when the window
    // is removed or resized.
    let previous_buffer = self.window_buffers.remove(rule_id);
    if let Some(secs) = compiled.rule.window_seconds {
        let buffer = match previous_buffer {
            Some(existing) if window_unchanged => existing,
            _ => WindowBuffer::new(secs),
        };
        self.window_buffers.insert(compiled.rule.id.clone(), buffer);
    }

    self.rules[rule_idx] = compiled;
    Ok(())
}
/// Enables or disables the rule identified by `rule_id`.
///
/// Only the `enabled` flag of the first rule with a matching id is changed.
///
/// # Errors
///
/// Returns an error if no rule with `rule_id` exists.
pub async fn toggle_rule(&mut self, rule_id: &str, enabled: bool) -> Result<()> {
    match self.rules.iter_mut().find(|r| r.rule.id == rule_id) {
        Some(entry) => {
            entry.rule.enabled = enabled;
            Ok(())
        }
        None => anyhow::bail!("Rule not found: {}", rule_id),
    }
}
pub async fn process_batch(&mut self, batch: &RecordBatch) -> Result<Vec<EvaluationTrace>> {
let _start = Instant::now();
let mut windowed_data = Vec::with_capacity(self.rules.len());
for rule in &self.rules {
if let Some(buffer) = self.window_buffers.get(&rule.rule.id) {
windowed_data.push(buffer.get_batches());
} else {
windowed_data.push(vec![]);
}
}
let mut enabled_indices = Vec::new();
let mut enabled_compiled_rules = Vec::new();
let mut enabled_window_data = Vec::new();
for (i, rule) in self.rules.iter().enumerate() {
if rule.rule.enabled {
enabled_indices.push(i);
enabled_compiled_rules.push(rule.clone());
enabled_window_data.push(windowed_data.get(i).cloned().unwrap_or_default());
}
}
let evaluation_start = Instant::now();
let results_with_context = match self
.evaluator
.evaluate_batch(batch, &enabled_compiled_rules, &enabled_window_data)
.await
{
Ok(results) => {
let eval_duration = evaluation_start.elapsed();
crate::metrics::METRICS.record_evaluation_duration(eval_duration.as_secs_f64());
results
}
Err(e) => {
error!("Rule evaluation error: {}", e);
crate::metrics::METRICS.record_evaluation_error();
return Err(e);
}
};
crate::metrics::METRICS
.batches_processed
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let mut traces = Vec::new();
let enabled_count = enabled_indices.len();
let mut enabled_results_iter = results_with_context.into_iter();
let mut enabled_idx_iter = enabled_indices.into_iter();
let eval_duration = evaluation_start.elapsed();
let per_rule_duration = if enabled_count > 0 {
eval_duration.as_secs_f64() / enabled_count as f64
} else {
0.0
};
for rule in self.rules.iter() {
if rule.rule.enabled {
let original_idx = enabled_idx_iter.next().unwrap();
let (result, context) = enabled_results_iter.next().unwrap();
let rule = &self.rules[original_idx].rule;
crate::metrics::METRICS.record_rule_evaluation(&rule.id);
crate::metrics::METRICS
.record_rule_evaluation_duration(&rule.id, per_rule_duration);
let transition = self.state.update_result(&rule.id, result).await?;
if transition != RuleTransition::None {
info!(
"Rule '{}' ({} v{}): {:?} -> {:?}",
rule.name, rule.id, rule.version, result, transition
);
}
let mut agent_status = None;
let mut action_fired = false;
if let RuleTransition::Activated = transition {
action_fired = true;
crate::metrics::METRICS
.activations_total
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
crate::metrics::METRICS.record_rule_activation(&rule.id);
let activation = Activation {
rule_id: rule.id.clone(),
rule_name: rule.name.clone(),
action: rule.action.clone(),
context,
};
if let Some(agent_queue) = &self.agent_queue {
if let Some(agent) = self.agents.get(&rule.action) {
let circuit_breaker = self.circuit_breakers.get(&rule.action).cloned();
let dlq_sender = Some(agent_queue.dlq_sender.clone());
let task = AgentTask::new(
activation,
agent.clone(),
3, circuit_breaker,
dlq_sender,
);
if let Err(e) = agent_queue.enqueue(task).await {
error!("Failed to enqueue agent task: {}", e);
agent_status = Some(format!("enqueue_failed: {}", e));
crate::metrics::METRICS
.agent_failures
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
} else {
agent_status = Some("queued".to_string());
}
} else {
agent_status = Some("agent_not_found".to_string());
crate::metrics::METRICS
.agent_failures
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
} else {
if let Some(agent) = self.agents.get(&rule.action) {
match agent.execute(&activation).await {
Ok(_) => {
debug!(
"Agent '{}' executed successfully for rule '{}'",
rule.action, rule.id
);
agent_status = Some("success".to_string());
}
Err(e) => {
error!("Error executing agent '{}': {}", rule.action, e);
agent_status = Some(format!("failed: {}", e));
crate::metrics::METRICS
.agent_failures
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}
} else {
agent_status = Some("agent_not_found".to_string());
crate::metrics::METRICS
.agent_failures
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}
}
traces.push(EvaluationTrace {
rule_id: rule.id.clone(),
rule_name: rule.name.clone(),
rule_version: rule.version,
result,
transition: match transition {
RuleTransition::None => "None".to_string(),
RuleTransition::Activated => "Activated".to_string(),
RuleTransition::Deactivated => {
crate::metrics::METRICS.record_deactivation();
"Deactivated".to_string()
}
},
action_fired,
agent_status,
});
if let Some(buffer) = self.window_buffers.get_mut(&rule.id) {
buffer.add_batch(batch.clone());
}
} else {
let rule = &rule.rule;
let last_result = self
.state
.get_last_result(&rule.id)
.await
.unwrap_or(PredicateResult::False);
traces.push(EvaluationTrace {
rule_id: rule.id.clone(),
rule_name: rule.name.clone(),
rule_version: rule.version,
result: last_result,
transition: "None".to_string(),
action_fired: false,
agent_status: Some("rule_disabled".to_string()),
});
}
}
Ok(traces)
}
}