fuse_rule/
lib.rs

1//! # FuseRule - High-Performance Rule Engine
2//!
3//! FuseRule is a high-performance, Arrow-native Complex Event Processing (CEP) engine
4//! with SQL-powered rules for real-time data auditing and event processing.
5//!
6//! ## Quick Start
7//!
8//! ```no_run
9//! use fuse_rule::{RuleEngine, config::FuseRuleConfig};
10//!
11//! # async fn example() -> anyhow::Result<()> {
12//! let config = FuseRuleConfig::from_file("fuse_rule_config.yaml")?;
13//! let mut engine = RuleEngine::from_config(config).await?;
14//! // Process batches and evaluate rules...
15//! # Ok(())
16//! # }
17//! ```
18//!
19//! ## Features
20//!
21//! - **Arrow-Native**: Zero-copy columnar data processing
22//! - **SQL-Powered Rules**: Write predicates using standard SQL
23//! - **Stateful Transitions**: Track rule activation/deactivation
24//! - **Time Windows**: Sliding windows for aggregate functions
25//! - **Pluggable Architecture**: Custom state stores, evaluators, and agents
26
27pub mod agent;
28pub mod agent_queue;
29pub mod cli;
30pub mod config;
31pub mod debugger;
32pub mod evaluator;
33pub mod ingestion;
34pub mod metrics;
35pub mod repl;
36pub mod rule;
37pub mod server;
38pub mod state;
39pub mod window;
40
41use crate::agent::{Activation, Agent};
42use crate::agent_queue::{AgentQueue, AgentTask, CircuitBreaker};
43use crate::config::FuseRuleConfig;
44use crate::evaluator::{CompiledRuleEdge, RuleEvaluator};
45use crate::rule::Rule;
46use crate::state::{PredicateResult, RuleTransition, StateStore};
47use crate::window::WindowBuffer;
48use anyhow::Result;
49use arrow::record_batch::RecordBatch;
50use std::collections::HashMap;
51use std::sync::Arc;
52use std::time::Instant;
53use tracing::{debug, error, info, warn};
54
55/// A trace of what happened during an evaluation batch.
56///
57/// This provides observability into rule evaluation, showing which rules
58/// were evaluated, their results, and whether actions were fired.
59///
60/// # Example
61///
62/// ```no_run
63/// # use fuse_rule::RuleEngine;
64/// # use arrow::array::Float64Array;
65/// # use arrow::datatypes::{DataType, Field, Schema};
66/// # use arrow::record_batch::RecordBatch;
67/// # use std::sync::Arc;
68/// # async fn example(engine: &mut RuleEngine) -> anyhow::Result<()> {
69/// // Create a test batch
70/// let schema = Schema::new(vec![Field::new("price", DataType::Float64, true)]);
71/// let batch = RecordBatch::try_new(
72///     Arc::new(schema),
73///     vec![Arc::new(Float64Array::from(vec![150.0, 50.0]))],
74/// )?;
75/// let traces = engine.process_batch(&batch).await?;
76/// for trace in traces {
77///     if trace.action_fired {
78///         println!("Rule '{}' activated!", trace.rule_name);
79///     }
80/// }
81/// # Ok(())
82/// # }
83/// ```
84#[derive(Debug, Clone, serde::Serialize)]
85pub struct EvaluationTrace {
86    /// Unique identifier for the rule
87    pub rule_id: String,
88    /// Human-readable rule name
89    pub rule_name: String,
90    /// Rule version number
91    pub rule_version: u32,
92    /// Result of predicate evaluation
93    pub result: PredicateResult,
94    /// State transition: "None", "Activated", or "Deactivated"
95    pub transition: String,
96    /// Whether the action/agent was fired
97    pub action_fired: bool,
98    /// Optional status message from the agent
99    pub agent_status: Option<String>,
100}
101
102/// The core rule engine that evaluates rules against data batches.
103///
104/// `RuleEngine` is the main entry point for using FuseRule. It manages
105/// rules, state, agents, and provides methods to process data batches.
106///
107/// # Example
108///
109/// ```no_run
110/// use fuse_rule::{RuleEngine, config::FuseRuleConfig};
111/// use arrow::array::Float64Array;
112/// use arrow::datatypes::{DataType, Field, Schema};
113/// use arrow::record_batch::RecordBatch;
114/// use std::sync::Arc;
115///
116/// # async fn example() -> anyhow::Result<()> {
117/// // Create engine from configuration
118/// let config = FuseRuleConfig::from_file("config.yaml")?;
119/// let mut engine = RuleEngine::from_config(config).await?;
120///
121/// // Create a test batch
122/// let schema = Schema::new(vec![Field::new("price", DataType::Float64, true)]);
123/// let batch = RecordBatch::try_new(
124///     Arc::new(schema),
125///     vec![Arc::new(Float64Array::from(vec![150.0, 50.0]))],
126/// )?;
127///
128/// // Process a batch of data
129/// let traces = engine.process_batch(&batch).await?;
130/// # Ok(())
131/// # }
132/// ```
133pub struct RuleEngine {
134    evaluator: Box<dyn RuleEvaluator>,
135    state: Box<dyn StateStore>,
136    rules: Vec<CompiledRuleEdge>,
137    window_buffers: HashMap<String, WindowBuffer>,
138    agents: HashMap<String, Arc<dyn Agent>>,
139    schema: Arc<arrow::datatypes::Schema>,
140    agent_queue: Option<AgentQueue>,
141    circuit_breakers: HashMap<String, Arc<CircuitBreaker>>,
142}
143
144impl RuleEngine {
145    pub fn new(
146        evaluator: Box<dyn RuleEvaluator>,
147        state: Box<dyn StateStore>,
148        schema: Arc<arrow::datatypes::Schema>,
149        max_pending_batches: usize,
150        agent_concurrency: usize,
151    ) -> Self {
152        // Create agent queue with bounded channel for backpressure
153        let (agent_queue, worker) = AgentQueue::new(Some(max_pending_batches));
154        let worker = worker.with_concurrency(agent_concurrency);
155        // Spawn worker in background
156        let _worker_handle = tokio::spawn(async move {
157            worker.run().await;
158        });
159        // Note: worker_handle is dropped but task continues running
160
161        Self {
162            evaluator,
163            state,
164            rules: Vec::new(),
165            window_buffers: HashMap::new(),
166            agents: HashMap::new(),
167            schema,
168            agent_queue: Some(agent_queue),
169            circuit_breakers: HashMap::new(),
170        }
171    }
172
173    pub fn get_or_create_circuit_breaker(&mut self, agent_name: &str) -> Arc<CircuitBreaker> {
174        self.circuit_breakers
175            .entry(agent_name.to_string())
176            .or_insert_with(|| {
177                Arc::new(CircuitBreaker::new(
178                    5, // 5 failures before opening
179                    std::time::Duration::from_secs(30),
180                ))
181            })
182            .clone()
183    }
184
185    pub async fn from_config(config: FuseRuleConfig) -> Result<Self> {
186        // 1. Build Schema (with evolution support - fields can be added/removed)
187        let mut fields = Vec::new();
188        for f in config.schema {
189            let dt = match f.data_type.as_str() {
190                "int32" => arrow::datatypes::DataType::Int32,
191                "int64" => arrow::datatypes::DataType::Int64,
192                "float32" => arrow::datatypes::DataType::Float32,
193                "float64" => arrow::datatypes::DataType::Float64,
194                "bool" => arrow::datatypes::DataType::Boolean,
195                "utf8" | "string" => arrow::datatypes::DataType::Utf8,
196                _ => arrow::datatypes::DataType::Utf8,
197            };
198            fields.push(arrow::datatypes::Field::new(f.name, dt, true));
199        }
200        let schema = Arc::new(arrow::datatypes::Schema::new(fields));
201
202        // 2. Build Components (Edges)
203        let evaluator = Box::new(crate::evaluator::DataFusionEvaluator::new());
204        let state_store = crate::state::SledStateStore::new(&config.engine.persistence_path)?;
205
206        // Start state cleanup task for rules with TTL
207        let mut rules_ttl = std::collections::HashMap::new();
208        for rule in &config.rules {
209            if let Some(ttl) = rule.state_ttl_seconds {
210                rules_ttl.insert(rule.id.clone(), ttl);
211            }
212        }
213        if !rules_ttl.is_empty() {
214            state_store.start_cleanup_task(rules_ttl);
215        }
216
217        let state = Box::new(state_store);
218
219        let max_pending = config.engine.max_pending_batches;
220        let agent_concurrency = config.engine.agent_concurrency;
221        let mut engine = Self::new(
222            evaluator,
223            state,
224            Arc::clone(&schema),
225            max_pending,
226            agent_concurrency,
227        );
228
229        // 3. Add Agents
230        for agent_cfg in config.agents {
231            match agent_cfg.r#type.as_str() {
232                "logger" => {
233                    engine.add_agent(agent_cfg.name, Arc::new(crate::agent::LoggerAgent));
234                }
235                "webhook" => {
236                    if let Some(url) = agent_cfg.url {
237                        engine.add_agent(
238                            agent_cfg.name,
239                            Arc::new(crate::agent::WebhookAgent::new(
240                                url,
241                                agent_cfg.template.clone(),
242                            )),
243                        );
244                    }
245                }
246                _ => println!("Warning: Unknown agent type '{}'", agent_cfg.r#type),
247            }
248        }
249
250        // 4. Add Rules
251        for r_cfg in config.rules {
252            engine
253                .add_rule(Rule {
254                    id: r_cfg.id,
255                    name: r_cfg.name,
256                    predicate: r_cfg.predicate,
257                    action: r_cfg.action,
258                    window_seconds: r_cfg.window_seconds,
259                    version: r_cfg.version,
260                    enabled: r_cfg.enabled,
261                })
262                .await?;
263        }
264
265        Ok(engine)
266    }
267
268    pub async fn reload_from_config(&mut self, config: FuseRuleConfig) -> Result<()> {
269        info!("🔄 Reloading engine configuration...");
270
271        // 1. Update Agents
272        let mut new_agents = HashMap::new();
273        let mut new_circuit_breakers = HashMap::new();
274        for agent_cfg in config.agents {
275            match agent_cfg.r#type.as_str() {
276                "logger" => {
277                    new_agents.insert(
278                        agent_cfg.name.clone(),
279                        Arc::new(crate::agent::LoggerAgent) as Arc<dyn Agent>,
280                    );
281                }
282                "webhook" => {
283                    if let Some(url) = agent_cfg.url {
284                        let agent_name = agent_cfg.name.clone();
285                        new_agents.insert(
286                            agent_name.clone(),
287                            Arc::new(crate::agent::WebhookAgent::new(
288                                url,
289                                agent_cfg.template.clone(),
290                            )) as Arc<dyn Agent>,
291                        );
292                        // Create circuit breaker for webhook agents
293                        new_circuit_breakers.insert(
294                            agent_name,
295                            Arc::new(CircuitBreaker::new(5, std::time::Duration::from_secs(30))),
296                        );
297                    }
298                }
299                _ => warn!("Unknown agent type '{}' during reload", agent_cfg.r#type),
300            }
301        }
302        self.agents = new_agents;
303        self.circuit_breakers = new_circuit_breakers;
304
305        // 2. Update Rules
306        let mut new_rules = Vec::new();
307        let mut new_window_buffers = HashMap::new();
308
309        for r_cfg in config.rules {
310            let rule = Rule {
311                id: r_cfg.id,
312                name: r_cfg.name,
313                predicate: r_cfg.predicate,
314                action: r_cfg.action,
315                window_seconds: r_cfg.window_seconds,
316                version: r_cfg.version,
317                enabled: r_cfg.enabled,
318            };
319
320            // Preserve existing window buffers if possible, otherwise create new
321            if let Some(secs) = rule.window_seconds {
322                if let Some(existing_buffer) = self.window_buffers.remove(&rule.id) {
323                    new_window_buffers.insert(rule.id.clone(), existing_buffer);
324                } else {
325                    new_window_buffers.insert(rule.id.clone(), WindowBuffer::new(secs));
326                }
327            }
328
329            let compiled = self.evaluator.compile(rule, &self.schema)?;
330            new_rules.push(compiled);
331        }
332
333        self.rules = new_rules;
334        self.window_buffers = new_window_buffers;
335
336        info!(
337            "✅ Engine reloaded: {} rules, {} agents",
338            self.rules.len(),
339            self.agents.len()
340        );
341        Ok(())
342    }
343
344    pub fn schema(&self) -> Arc<arrow::datatypes::Schema> {
345        Arc::clone(&self.schema)
346    }
347
348    pub fn add_agent(&mut self, name: String, agent: Arc<dyn Agent>) {
349        self.agents.insert(name, agent);
350    }
351
352    pub async fn add_rule(&mut self, rule: Rule) -> Result<()> {
353        if let Some(secs) = rule.window_seconds {
354            self.window_buffers
355                .insert(rule.id.clone(), WindowBuffer::new(secs));
356        }
357        let compiled = self.evaluator.compile(rule, &self.schema)?;
358        self.rules.push(compiled);
359        Ok(())
360    }
361
362    pub async fn update_rule(&mut self, rule_id: &str, new_rule: Rule) -> Result<()> {
363        // Find existing rule
364        let rule_idx = self.rules.iter().position(|r| r.rule.id == rule_id);
365        if rule_idx.is_none() {
366            anyhow::bail!("Rule not found: {}", rule_id);
367        }
368
369        let rule_idx = rule_idx.unwrap();
370        let old_rule = &self.rules[rule_idx].rule;
371
372        // Preserve window buffer if window_seconds unchanged
373        let preserve_buffer = old_rule.window_seconds == new_rule.window_seconds;
374        let existing_buffer = if preserve_buffer {
375            self.window_buffers.remove(rule_id)
376        } else {
377            None
378        };
379
380        // Compile new rule
381        let compiled = self.evaluator.compile(new_rule, &self.schema)?;
382
383        // Replace rule
384        self.rules[rule_idx] = compiled;
385
386        // Restore or create window buffer
387        if let Some(buffer) = existing_buffer {
388            self.window_buffers.insert(rule_id.to_string(), buffer);
389        } else if let Some(secs) = self.rules[rule_idx].rule.window_seconds {
390            self.window_buffers
391                .insert(rule_id.to_string(), WindowBuffer::new(secs));
392        }
393
394        Ok(())
395    }
396
397    pub async fn toggle_rule(&mut self, rule_id: &str, enabled: bool) -> Result<()> {
398        let rule_idx = self.rules.iter().position(|r| r.rule.id == rule_id);
399        if let Some(idx) = rule_idx {
400            self.rules[idx].rule.enabled = enabled;
401            Ok(())
402        } else {
403            anyhow::bail!("Rule not found: {}", rule_id)
404        }
405    }
406
407    pub async fn process_batch(&mut self, batch: &RecordBatch) -> Result<Vec<EvaluationTrace>> {
408        let _start = Instant::now();
409
410        let mut windowed_data = Vec::with_capacity(self.rules.len());
411        for rule in &self.rules {
412            if let Some(buffer) = self.window_buffers.get(&rule.rule.id) {
413                windowed_data.push(buffer.get_batches());
414            } else {
415                windowed_data.push(vec![]);
416            }
417        }
418
419        // Filter to only enabled rules for evaluation, tracking original indices
420        let mut enabled_indices = Vec::new();
421        let mut enabled_compiled_rules = Vec::new();
422        let mut enabled_window_data = Vec::new();
423
424        for (i, rule) in self.rules.iter().enumerate() {
425            if rule.rule.enabled {
426                enabled_indices.push(i);
427                enabled_compiled_rules.push(rule.clone());
428                enabled_window_data.push(windowed_data.get(i).cloned().unwrap_or_default());
429            }
430        }
431
432        // Parallel rule evaluation using tokio::join_all
433        let evaluation_start = Instant::now();
434        let results_with_context = match self
435            .evaluator
436            .evaluate_batch(batch, &enabled_compiled_rules, &enabled_window_data)
437            .await
438        {
439            Ok(results) => {
440                let eval_duration = evaluation_start.elapsed();
441                crate::metrics::METRICS.record_evaluation_duration(eval_duration.as_secs_f64());
442                results
443            }
444            Err(e) => {
445                error!("Rule evaluation error: {}", e);
446                crate::metrics::METRICS.record_evaluation_error();
447                return Err(e);
448            }
449        };
450        crate::metrics::METRICS
451            .batches_processed
452            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
453        let mut traces = Vec::new();
454
455        // Process enabled rules and create traces for all rules (disabled ones get None transition)
456        let enabled_count = enabled_indices.len();
457        let mut enabled_results_iter = results_with_context.into_iter();
458        let mut enabled_idx_iter = enabled_indices.into_iter();
459
460        // Record per-rule evaluation duration (approximate by dividing total time)
461        // In a more sophisticated implementation, we'd track each rule individually
462        let eval_duration = evaluation_start.elapsed();
463        let per_rule_duration = if enabled_count > 0 {
464            eval_duration.as_secs_f64() / enabled_count as f64
465        } else {
466            0.0
467        };
468
469        for rule in self.rules.iter() {
470            if rule.rule.enabled {
471                let original_idx = enabled_idx_iter.next().unwrap();
472                let (result, context) = enabled_results_iter.next().unwrap();
473                let rule = &self.rules[original_idx].rule;
474
475                // Record rule evaluation
476                crate::metrics::METRICS.record_rule_evaluation(&rule.id);
477                // Record per-rule evaluation duration histogram
478                crate::metrics::METRICS
479                    .record_rule_evaluation_duration(&rule.id, per_rule_duration);
480
481                let transition = self.state.update_result(&rule.id, result).await?;
482
483                if transition != RuleTransition::None {
484                    info!(
485                        "Rule '{}' ({} v{}): {:?} -> {:?}",
486                        rule.name, rule.id, rule.version, result, transition
487                    );
488                }
489
490                let mut agent_status = None;
491                let mut action_fired = false;
492
493                if let RuleTransition::Activated = transition {
494                    action_fired = true;
495                    crate::metrics::METRICS
496                        .activations_total
497                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
498                    crate::metrics::METRICS.record_rule_activation(&rule.id);
499                    let activation = Activation {
500                        rule_id: rule.id.clone(),
501                        rule_name: rule.name.clone(),
502                        action: rule.action.clone(),
503                        context,
504                    };
505
506                    // Use async agent queue if available, otherwise execute synchronously
507                    if let Some(agent_queue) = &self.agent_queue {
508                        if let Some(agent) = self.agents.get(&rule.action) {
509                            let circuit_breaker = self.circuit_breakers.get(&rule.action).cloned();
510
511                            // Get DLQ sender from queue
512                            let dlq_sender = Some(agent_queue.dlq_sender.clone());
513
514                            let task = AgentTask::new(
515                                activation,
516                                agent.clone(),
517                                3, // max_retries
518                                circuit_breaker,
519                                dlq_sender,
520                            );
521
522                            if let Err(e) = agent_queue.enqueue(task).await {
523                                error!("Failed to enqueue agent task: {}", e);
524                                agent_status = Some(format!("enqueue_failed: {}", e));
525                                crate::metrics::METRICS
526                                    .agent_failures
527                                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
528                            } else {
529                                agent_status = Some("queued".to_string());
530                            }
531                        } else {
532                            agent_status = Some("agent_not_found".to_string());
533                            crate::metrics::METRICS
534                                .agent_failures
535                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
536                        }
537                    } else {
538                        // Fallback to synchronous execution
539                        if let Some(agent) = self.agents.get(&rule.action) {
540                            match agent.execute(&activation).await {
541                                Ok(_) => {
542                                    debug!(
543                                        "Agent '{}' executed successfully for rule '{}'",
544                                        rule.action, rule.id
545                                    );
546                                    agent_status = Some("success".to_string());
547                                }
548                                Err(e) => {
549                                    error!("Error executing agent '{}': {}", rule.action, e);
550                                    agent_status = Some(format!("failed: {}", e));
551                                    crate::metrics::METRICS
552                                        .agent_failures
553                                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
554                                }
555                            }
556                        } else {
557                            agent_status = Some("agent_not_found".to_string());
558                            crate::metrics::METRICS
559                                .agent_failures
560                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
561                        }
562                    }
563                }
564
565                traces.push(EvaluationTrace {
566                    rule_id: rule.id.clone(),
567                    rule_name: rule.name.clone(),
568                    rule_version: rule.version,
569                    result,
570                    transition: match transition {
571                        RuleTransition::None => "None".to_string(),
572                        RuleTransition::Activated => "Activated".to_string(),
573                        RuleTransition::Deactivated => {
574                            crate::metrics::METRICS.record_deactivation();
575                            "Deactivated".to_string()
576                        }
577                    },
578                    action_fired,
579                    agent_status,
580                });
581
582                if let Some(buffer) = self.window_buffers.get_mut(&rule.id) {
583                    buffer.add_batch(batch.clone());
584                }
585            } else {
586                // Disabled rule - create trace with None transition
587                let rule = &rule.rule;
588                let last_result = self
589                    .state
590                    .get_last_result(&rule.id)
591                    .await
592                    .unwrap_or(PredicateResult::False);
593                traces.push(EvaluationTrace {
594                    rule_id: rule.id.clone(),
595                    rule_name: rule.name.clone(),
596                    rule_version: rule.version,
597                    result: last_result,
598                    transition: "None".to_string(),
599                    action_fired: false,
600                    agent_status: Some("rule_disabled".to_string()),
601                });
602            }
603        }
604
605        Ok(traces)
606    }
607}