Skip to main content

net/adapter/net/behavior/
rules.rs

1//! Phase 4E: Device Autonomy Rules (DEVICE-RULES)
2//!
3//! This module provides a rule engine for autonomous device behavior:
4//! - Declarative rules with conditions and actions
5//! - Priority-based rule evaluation and conflict resolution
6//! - Context-aware condition matching
7//! - Action execution with rate limiting and cooldowns
8
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, HashSet};
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
14
15use super::metadata::NodeId;
16
/// Comparison operators for conditions.
///
/// The field value read from the context is the left operand and the value
/// configured on the condition is the right operand; the exact semantics of
/// each operator are implemented by [`CompareOp::evaluate`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum CompareOp {
    /// Equal to (JSON value equality)
    Eq,
    /// Not equal to (JSON value inequality)
    Ne,
    /// Less than (number/number or string/string operands only)
    Lt,
    /// Less than or equal (number/number or string/string operands only)
    Le,
    /// Greater than (number/number or string/string operands only)
    Gt,
    /// Greater than or equal (number/number or string/string operands only)
    Ge,
    /// Contains (substring for strings, element membership for arrays)
    Contains,
    /// Starts with (for strings)
    StartsWith,
    /// Ends with (for strings)
    EndsWith,
    /// Matches a pattern.
    ///
    /// NOTE: `evaluate` implements this as a plain substring test on
    /// strings, not as a regular-expression match.
    Matches,
    /// Value is in set (right operand must be an array)
    In,
    /// Value is not in set (also true when the right operand is not an array)
    NotIn,
    /// Value exists (is not null)
    Exists,
    /// Value does not exist (is null)
    NotExists,
}
49
50impl CompareOp {
51    /// Evaluate comparison between two JSON values
52    pub fn evaluate(&self, left: &serde_json::Value, right: &serde_json::Value) -> bool {
53        match self {
54            CompareOp::Eq => left == right,
55            CompareOp::Ne => left != right,
56            CompareOp::Lt => compare_values(left, right) == Some(std::cmp::Ordering::Less),
57            CompareOp::Le => {
58                matches!(
59                    compare_values(left, right),
60                    Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
61                )
62            }
63            CompareOp::Gt => compare_values(left, right) == Some(std::cmp::Ordering::Greater),
64            CompareOp::Ge => {
65                matches!(
66                    compare_values(left, right),
67                    Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
68                )
69            }
70            CompareOp::Contains => match (left, right) {
71                (serde_json::Value::String(s), serde_json::Value::String(sub)) => {
72                    s.contains(sub.as_str())
73                }
74                (serde_json::Value::Array(arr), val) => arr.contains(val),
75                _ => false,
76            },
77            CompareOp::StartsWith => match (left, right) {
78                (serde_json::Value::String(s), serde_json::Value::String(prefix)) => {
79                    s.starts_with(prefix.as_str())
80                }
81                _ => false,
82            },
83            CompareOp::EndsWith => match (left, right) {
84                (serde_json::Value::String(s), serde_json::Value::String(suffix)) => {
85                    s.ends_with(suffix.as_str())
86                }
87                _ => false,
88            },
89            CompareOp::Matches => {
90                // Simple pattern matching (not full regex for performance)
91                match (left, right) {
92                    (serde_json::Value::String(s), serde_json::Value::String(pattern)) => {
93                        s.contains(pattern.as_str())
94                    }
95                    _ => false,
96                }
97            }
98            CompareOp::In => match right {
99                serde_json::Value::Array(arr) => arr.contains(left),
100                _ => false,
101            },
102            CompareOp::NotIn => match right {
103                serde_json::Value::Array(arr) => !arr.contains(left),
104                _ => true,
105            },
106            CompareOp::Exists => !left.is_null(),
107            CompareOp::NotExists => left.is_null(),
108        }
109    }
110}
111
112fn compare_values(
113    left: &serde_json::Value,
114    right: &serde_json::Value,
115) -> Option<std::cmp::Ordering> {
116    match (left, right) {
117        (serde_json::Value::Number(a), serde_json::Value::Number(b)) => compare_numbers(a, b),
118        (serde_json::Value::String(a), serde_json::Value::String(b)) => Some(a.cmp(b)),
119        _ => None,
120    }
121}
122
123/// Compare two `serde_json::Number` values without losing precision
124/// when both fit in an integer type.
125///
126/// Pre-fix, `compare_values` always reduced both sides to
127/// `f64` via `as_f64()`. For integer fields above `2^53` (e.g. byte
128/// counts, ns timestamps, monotonic sequence numbers) the cast was
129/// lossy: two adjacent values like `9_007_199_254_740_992` and
130/// `9_007_199_254_740_993` compared `Equal`, so a `Gt` rule guarding
131/// a quota silently failed to fire. NaN/Infinity (legal under
132/// `arbitrary_precision`, though we don't enable it) yielded `None`,
133/// silently masking the bug.
134///
135/// Post-fix:
136///   1. Both i64 → exact i64 compare (handles negatives).
137///   2. Both u64 → exact u64 compare (handles values > i64::MAX).
138///   3. Mixed signed-i64 / large-u64 → resolved by sign:
139///      a negative i64 is always less than any u64; a u64 above
140///      i64::MAX is always greater than any negative i64.
141///   4. At least one is a float → f64 fallback with the documented
142///      lossy semantics.
143fn compare_numbers(a: &serde_json::Number, b: &serde_json::Number) -> Option<std::cmp::Ordering> {
144    // We don't enable serde_json's `arbitrary_precision` feature in
145    // this tree, but a transitive dependency could turn it on
146    // workspace-wide via Cargo's feature unification. With it, a
147    // big-integer literal stops fitting in any of `as_i64` /
148    // `as_u64` / `as_f64` (all three return `None`) and the rule
149    // would silently fail to fire — quota gates, threshold checks,
150    // and policy rules become no-ops with no diagnostic. The
151    // `debug_assert!` makes the misuse loud during dev/test; in
152    // release we still return `None` so the rule fails closed.
153    debug_assert!(
154        a.is_i64() || a.is_u64() || a.is_f64(),
155        "compare_numbers: lhs is neither i64/u64/f64 — likely \
156         `serde_json/arbitrary_precision` got enabled via feature \
157         unification. Rule will silently fail closed in release."
158    );
159    debug_assert!(
160        b.is_i64() || b.is_u64() || b.is_f64(),
161        "compare_numbers: rhs is neither i64/u64/f64 — likely \
162         `serde_json/arbitrary_precision` got enabled via feature \
163         unification. Rule will silently fail closed in release."
164    );
165
166    // 1. Both fit in i64.
167    if let (Some(ai), Some(bi)) = (a.as_i64(), b.as_i64()) {
168        return Some(ai.cmp(&bi));
169    }
170    // 2. Both fit in u64. Order matters: i64 takes precedence above
171    //    so non-negative integers that fit in both types use the
172    //    signed compare; only this branch handles values > i64::MAX.
173    if let (Some(au), Some(bu)) = (a.as_u64(), b.as_u64()) {
174        return Some(au.cmp(&bu));
175    }
176    // 3. Mixed: one is a negative i64 (as_u64 returned None for it)
177    //    and the other is a u64 above i64::MAX (as_i64 returned None
178    //    for it). The negative side is always less than the
179    //    non-negative side.
180    if a.as_i64().is_some() && b.as_u64().is_some() {
181        return Some(std::cmp::Ordering::Less);
182    }
183    if a.as_u64().is_some() && b.as_i64().is_some() {
184        return Some(std::cmp::Ordering::Greater);
185    }
186    // 4. Float fallback. NaN → None, which collapses comparing-to-
187    //    NaN rules to the existing `partial_cmp` semantics.
188    a.as_f64()?.partial_cmp(&b.as_f64()?)
189}
190
/// Logical operators for combining conditions.
///
/// This enum only names the combinators; nothing in the visible part of
/// this file evaluates it (composite evaluation shown here goes through
/// `ConditionExpr`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum LogicOp {
    /// All conditions must be true
    And,
    /// At least one condition must be true
    Or,
    /// No conditions must be true
    Not,
    /// Exactly one condition must be true
    Xor,
}
203
/// A single condition to evaluate: `<field> <op> <value>`.
///
/// The field is looked up in a `RuleContext` and compared against `value`
/// using `op`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Condition {
    /// Field path to evaluate (dot notation, e.g., "metrics.cpu_usage")
    pub field: String,
    /// Comparison operator
    pub op: CompareOp,
    /// Value to compare against (the right-hand operand of `op`)
    pub value: serde_json::Value,
    /// Optional human-readable description; omitted from serialized output
    /// when `None`
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
}
217
218impl Condition {
219    /// Create a new condition
220    pub fn new(field: impl Into<String>, op: CompareOp, value: serde_json::Value) -> Self {
221        Self {
222            field: field.into(),
223            op,
224            value,
225            description: None,
226        }
227    }
228
229    /// Add description
230    pub fn with_description(mut self, desc: impl Into<String>) -> Self {
231        self.description = Some(desc.into());
232        self
233    }
234
235    /// Evaluate condition against context
236    pub fn evaluate(&self, context: &RuleContext) -> bool {
237        let field_value = context.get_field(&self.field);
238        self.op.evaluate(&field_value, &self.value)
239    }
240
241    // Convenience constructors
242
243    /// Field equals value
244    pub fn eq(field: impl Into<String>, value: impl Into<serde_json::Value>) -> Self {
245        Self::new(field, CompareOp::Eq, value.into())
246    }
247
248    /// Field not equals value
249    pub fn ne(field: impl Into<String>, value: impl Into<serde_json::Value>) -> Self {
250        Self::new(field, CompareOp::Ne, value.into())
251    }
252
253    /// Field greater than value
254    pub fn gt(field: impl Into<String>, value: impl Into<serde_json::Value>) -> Self {
255        Self::new(field, CompareOp::Gt, value.into())
256    }
257
258    /// Field greater than or equal to value
259    pub fn ge(field: impl Into<String>, value: impl Into<serde_json::Value>) -> Self {
260        Self::new(field, CompareOp::Ge, value.into())
261    }
262
263    /// Field less than value
264    pub fn lt(field: impl Into<String>, value: impl Into<serde_json::Value>) -> Self {
265        Self::new(field, CompareOp::Lt, value.into())
266    }
267
268    /// Field less than or equal to value
269    pub fn le(field: impl Into<String>, value: impl Into<serde_json::Value>) -> Self {
270        Self::new(field, CompareOp::Le, value.into())
271    }
272
273    /// Field contains value
274    pub fn contains(field: impl Into<String>, value: impl Into<serde_json::Value>) -> Self {
275        Self::new(field, CompareOp::Contains, value.into())
276    }
277
278    /// Field is in set of values
279    pub fn is_in(field: impl Into<String>, values: Vec<serde_json::Value>) -> Self {
280        Self::new(field, CompareOp::In, serde_json::Value::Array(values))
281    }
282
283    /// Field exists
284    pub fn exists(field: impl Into<String>) -> Self {
285        Self::new(field, CompareOp::Exists, serde_json::Value::Null)
286    }
287}
288
/// A composite condition expression.
///
/// A tree whose leaves are single [`Condition`]s (or the constants
/// `Always` / `Never`) and whose inner nodes combine child expressions.
#[derive(Debug, Clone, PartialEq)]
pub enum ConditionExpr {
    /// Single condition
    Single(Condition),
    /// Combine conditions with AND (true when every child is true)
    And(Vec<ConditionExpr>),
    /// Combine conditions with OR (true when at least one child is true)
    Or(Vec<ConditionExpr>),
    /// Negate a condition
    Not(Box<ConditionExpr>),
    /// Always true
    Always,
    /// Always false
    Never,
}
305
306impl ConditionExpr {
307    /// Create from single condition
308    pub fn single(condition: Condition) -> Self {
309        ConditionExpr::Single(condition)
310    }
311
312    /// Combine with AND
313    #[expect(
314        clippy::unwrap_used,
315        reason = "len == 1 branch guarantees the iterator yields exactly one element"
316    )]
317    pub fn and(conditions: Vec<ConditionExpr>) -> Self {
318        if conditions.is_empty() {
319            ConditionExpr::Always
320        } else if conditions.len() == 1 {
321            conditions.into_iter().next().unwrap()
322        } else {
323            ConditionExpr::And(conditions)
324        }
325    }
326
327    /// Combine with OR
328    #[expect(
329        clippy::unwrap_used,
330        reason = "len == 1 branch guarantees the iterator yields exactly one element"
331    )]
332    pub fn or(conditions: Vec<ConditionExpr>) -> Self {
333        if conditions.is_empty() {
334            ConditionExpr::Never
335        } else if conditions.len() == 1 {
336            conditions.into_iter().next().unwrap()
337        } else {
338            ConditionExpr::Or(conditions)
339        }
340    }
341
342    /// Negate condition
343    pub fn negate(condition: ConditionExpr) -> Self {
344        ConditionExpr::Not(Box::new(condition))
345    }
346
347    /// Evaluate expression against context
348    pub fn evaluate(&self, context: &RuleContext) -> bool {
349        match self {
350            ConditionExpr::Single(c) => c.evaluate(context),
351            ConditionExpr::And(conditions) => conditions.iter().all(|c| c.evaluate(context)),
352            ConditionExpr::Or(conditions) => conditions.iter().any(|c| c.evaluate(context)),
353            ConditionExpr::Not(c) => !c.evaluate(context),
354            ConditionExpr::Always => true,
355            ConditionExpr::Never => false,
356        }
357    }
358
359    /// Count the number of conditions
360    pub fn condition_count(&self) -> usize {
361        match self {
362            ConditionExpr::Single(_) => 1,
363            ConditionExpr::And(conditions) | ConditionExpr::Or(conditions) => {
364                conditions.iter().map(|c| c.condition_count()).sum()
365            }
366            ConditionExpr::Not(c) => c.condition_count(),
367            ConditionExpr::Always | ConditionExpr::Never => 0,
368        }
369    }
370}
371
/// Action types that can be executed when rules match.
///
/// This enum is a pure description of what should happen; how each variant
/// is carried out is up to whatever consumes it (not visible in this part
/// of the file).
#[derive(Debug, Clone, PartialEq)]
pub enum Action {
    /// Log a message
    Log {
        /// Severity level at which to log the message
        level: LogLevel,
        /// Text content of the log entry
        message: String,
    },
    /// Emit an event
    Emit {
        /// Identifier for the type of event being emitted
        event_type: String,
        /// Arbitrary JSON data attached to the event
        payload: serde_json::Value,
    },
    /// Set a context value
    SetContext {
        /// Context key to set
        key: String,
        /// JSON value to store under the key
        value: serde_json::Value,
    },
    /// Increment a counter
    IncrementCounter {
        /// Name of the counter to increment
        name: String,
        /// Amount by which to increment the counter (signed)
        amount: i64,
    },
    /// Send alert/notification
    Alert {
        /// Severity level of the alert
        severity: AlertSeverity,
        /// Short human-readable title for the alert
        title: String,
        /// Detailed description of the alert
        message: String,
    },
    /// Throttle/rate limit
    Throttle {
        /// Identifier key used to track the rate limit bucket
        key: String,
        /// Maximum number of requests allowed per second
        max_per_second: f64,
    },
    /// Reject request
    Reject {
        /// Human-readable explanation for the rejection
        reason: String,
        /// Numeric error code returned with the rejection
        code: u32,
    },
    /// Redirect to another node
    Redirect {
        /// Specific node to redirect to, if known
        target_node: Option<NodeId>,
        /// Tags used to select a target node when no specific node is given
        target_tags: Vec<String>,
    },
    /// Scale resources
    Scale {
        /// Name of the resource to scale
        resource: String,
        /// Whether to scale up or down
        direction: ScaleDirection,
        /// Magnitude of the scaling adjustment
        amount: u32,
    },
    /// Execute custom action
    Custom {
        /// Identifier for the custom action type
        action_type: String,
        /// Key-value parameters passed to the custom action handler
        params: HashMap<String, serde_json::Value>,
    },
    /// Chain multiple actions (may be nested; see `Action::action_count`)
    Chain(Vec<Action>),
    /// No action (useful for monitoring rules)
    Noop,
}
454
/// Log levels used by `Action::Log`.
///
/// Serialized with lowercase variant names (`"debug"`, `"info"`, ...).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum LogLevel {
    /// Debug level
    Debug,
    /// Info level
    Info,
    /// Warning level
    Warn,
    /// Error level
    Error,
}
468
/// Alert severity levels used by `Action::Alert`.
///
/// Serialized with lowercase variant names (`"low"`, `"medium"`, ...).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum AlertSeverity {
    /// Low priority
    Low,
    /// Medium priority
    Medium,
    /// High priority
    High,
    /// Critical priority
    Critical,
}
482
/// Scale direction used by `Action::Scale`.
///
/// Serialized with lowercase variant names (`"up"`, `"down"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ScaleDirection {
    /// Scale up
    Up,
    /// Scale down
    Down,
}
492
493impl Action {
494    /// Create a log action
495    pub fn log(level: LogLevel, message: impl Into<String>) -> Self {
496        Action::Log {
497            level,
498            message: message.into(),
499        }
500    }
501
502    /// Create an emit action
503    pub fn emit(event_type: impl Into<String>, payload: serde_json::Value) -> Self {
504        Action::Emit {
505            event_type: event_type.into(),
506            payload,
507        }
508    }
509
510    /// Create a set context action
511    pub fn set_context(key: impl Into<String>, value: serde_json::Value) -> Self {
512        Action::SetContext {
513            key: key.into(),
514            value,
515        }
516    }
517
518    /// Create an alert action
519    pub fn alert(
520        severity: AlertSeverity,
521        title: impl Into<String>,
522        message: impl Into<String>,
523    ) -> Self {
524        Action::Alert {
525            severity,
526            title: title.into(),
527            message: message.into(),
528        }
529    }
530
531    /// Create a reject action
532    pub fn reject(reason: impl Into<String>, code: u32) -> Self {
533        Action::Reject {
534            reason: reason.into(),
535            code,
536        }
537    }
538
539    /// Create a redirect action
540    pub fn redirect_to_tags(tags: Vec<String>) -> Self {
541        Action::Redirect {
542            target_node: None,
543            target_tags: tags,
544        }
545    }
546
547    /// Create a chain of actions
548    pub fn chain(actions: Vec<Action>) -> Self {
549        Action::Chain(actions)
550    }
551
552    /// Count total actions (including nested)
553    pub fn action_count(&self) -> usize {
554        match self {
555            Action::Chain(actions) => actions.iter().map(|a| a.action_count()).sum(),
556            _ => 1,
557        }
558    }
559}
560
/// Rule priority levels.
///
/// Every variant maps to a numeric value through `Priority::value`
/// (`Lowest` = 0, `Low` = 25, `Normal` = 50, `High` = 75, `Highest` = 100,
/// `Custom(v)` = `v`); higher values are evaluated first.
///
/// Equality and hashing are derived, i.e. structural: `Custom(50)` and
/// `Normal` are different values even though both map to 50.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum Priority {
    /// Lowest priority (evaluated last)
    Lowest,
    /// Low priority
    Low,
    /// Normal priority (default)
    #[default]
    Normal,
    /// High priority
    High,
    /// Highest priority (evaluated first)
    Highest,
    /// Custom priority value (any `u8`, so it may fall between, on, or
    /// above the named levels)
    Custom(u8),
}
578
579impl PartialOrd for Priority {
580    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
581        Some(self.cmp(other))
582    }
583}
584
585impl Ord for Priority {
586    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
587        self.value().cmp(&other.value())
588    }
589}
590
591impl Priority {
592    /// Get numeric priority value
593    pub fn value(&self) -> u8 {
594        match self {
595            Priority::Lowest => 0,
596            Priority::Low => 25,
597            Priority::Normal => 50,
598            Priority::High => 75,
599            Priority::Highest => 100,
600            Priority::Custom(v) => *v,
601        }
602    }
603}
604
/// A complete rule definition: a condition, the action to run when it
/// matches, and the settings that govern evaluation (priority, enabled
/// flag, cooldown, rate limit, stop-on-match).
#[derive(Debug, Clone, PartialEq)]
pub struct Rule {
    /// Unique rule ID
    pub id: String,
    /// Human-readable name
    pub name: String,
    /// Description
    pub description: Option<String>,
    /// Rule priority
    pub priority: Priority,
    /// Condition expression
    pub condition: ConditionExpr,
    /// Action to execute when condition matches
    pub action: Action,
    /// Whether rule is enabled (a disabled rule never matches)
    pub enabled: bool,
    /// Tags for categorization
    pub tags: Vec<String>,
    /// Cooldown between executions (milliseconds)
    pub cooldown_ms: Option<u64>,
    /// Maximum executions per time window
    pub rate_limit: Option<RateLimit>,
    /// Stop processing further rules if this one matches
    pub stop_on_match: bool,
    /// Created timestamp (`Rule::new` stores milliseconds since the UNIX
    /// epoch)
    pub created_at: u64,
    /// Updated timestamp (same unit as `created_at`; `Rule::new` sets both
    /// to the same value)
    pub updated_at: u64,
}
635
/// Rate limit configuration: at most `max_executions` executions per
/// `window_secs`-second window.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct RateLimit {
    /// Maximum executions
    pub max_executions: u32,
    /// Time window in seconds
    pub window_secs: u32,
}
644
645impl Rule {
646    /// Create a new rule
647    pub fn new(id: impl Into<String>, name: impl Into<String>) -> Self {
648        // Pre-fix this used `.as_millis() as u64`, which
649        // silently truncated the u128 millis on overflow. Realistic
650        // dates (anything within u64::MAX milliseconds since UNIX
651        // epoch — year ~584,554,051) are unaffected, but `as`
652        // casts on durations are a footgun worth eliminating.
653        // `try_from` saturates on overflow so a far-future clock
654        // surfaces as a max-timestamp instead of wrapping to a
655        // small value that would invert "newer-rule-wins"
656        // comparisons.
657        let now = u64::try_from(
658            SystemTime::now()
659                .duration_since(UNIX_EPOCH)
660                .unwrap_or_default()
661                .as_millis(),
662        )
663        .unwrap_or(u64::MAX);
664
665        Self {
666            id: id.into(),
667            name: name.into(),
668            description: None,
669            priority: Priority::Normal,
670            condition: ConditionExpr::Always,
671            action: Action::Noop,
672            enabled: true,
673            tags: Vec::new(),
674            cooldown_ms: None,
675            rate_limit: None,
676            stop_on_match: false,
677            created_at: now,
678            updated_at: now,
679        }
680    }
681
682    /// Set description
683    pub fn with_description(mut self, desc: impl Into<String>) -> Self {
684        self.description = Some(desc.into());
685        self
686    }
687
688    /// Set priority
689    pub fn with_priority(mut self, priority: Priority) -> Self {
690        self.priority = priority;
691        self
692    }
693
694    /// Set condition
695    pub fn with_condition(mut self, condition: ConditionExpr) -> Self {
696        self.condition = condition;
697        self
698    }
699
700    /// Set action
701    pub fn with_action(mut self, action: Action) -> Self {
702        self.action = action;
703        self
704    }
705
706    /// Add tag
707    pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
708        self.tags.push(tag.into());
709        self
710    }
711
712    /// Set cooldown
713    pub fn with_cooldown(mut self, cooldown_ms: u64) -> Self {
714        self.cooldown_ms = Some(cooldown_ms);
715        self
716    }
717
718    /// Set rate limit
719    pub fn with_rate_limit(mut self, max_executions: u32, window_secs: u32) -> Self {
720        self.rate_limit = Some(RateLimit {
721            max_executions,
722            window_secs,
723        });
724        self
725    }
726
727    /// Stop processing further rules on match
728    pub fn stop_on_match(mut self) -> Self {
729        self.stop_on_match = true;
730        self
731    }
732
733    /// Disable rule
734    pub fn disabled(mut self) -> Self {
735        self.enabled = false;
736        self
737    }
738
739    /// Check if rule matches context
740    pub fn matches(&self, context: &RuleContext) -> bool {
741        self.enabled && self.condition.evaluate(context)
742    }
743}
744
/// Context for rule evaluation: the data that conditions are evaluated
/// against, plus free-form string metadata.
#[derive(Debug, Clone, Default)]
pub struct RuleContext {
    /// Context data (nested JSON-like structure): top-level keys map to
    /// arbitrary JSON values, which dot-notation field paths descend into
    data: HashMap<String, serde_json::Value>,
    /// Metadata about the evaluation (string key/value pairs; not consulted
    /// by field lookups)
    metadata: HashMap<String, String>,
}
753
754impl RuleContext {
755    /// Create empty context
756    pub fn new() -> Self {
757        Self::default()
758    }
759
760    /// Create from JSON value
761    pub fn from_value(value: serde_json::Value) -> Self {
762        let mut ctx = Self::new();
763        if let serde_json::Value::Object(map) = value {
764            for (k, v) in map {
765                ctx.data.insert(k, v);
766            }
767        }
768        ctx
769    }
770
771    /// Set a value
772    pub fn set(&mut self, key: impl Into<String>, value: serde_json::Value) {
773        self.data.insert(key.into(), value);
774    }
775
776    /// Get a value by key
777    pub fn get(&self, key: &str) -> Option<&serde_json::Value> {
778        self.data.get(key)
779    }
780
781    /// Get a field by dot-notation path
782    pub fn get_field(&self, path: &str) -> serde_json::Value {
783        let parts: Vec<&str> = path.split('.').collect();
784        if parts.is_empty() {
785            return serde_json::Value::Null;
786        }
787
788        let mut current = match self.data.get(parts[0]) {
789            Some(v) => v.clone(),
790            None => return serde_json::Value::Null,
791        };
792
793        for part in &parts[1..] {
794            current = match current {
795                serde_json::Value::Object(ref map) => {
796                    map.get(*part).cloned().unwrap_or(serde_json::Value::Null)
797                }
798                serde_json::Value::Array(ref arr) => {
799                    if let Ok(idx) = part.parse::<usize>() {
800                        arr.get(idx).cloned().unwrap_or(serde_json::Value::Null)
801                    } else {
802                        serde_json::Value::Null
803                    }
804                }
805                _ => serde_json::Value::Null,
806            };
807        }
808
809        current
810    }
811
812    /// Set metadata
813    pub fn set_metadata(&mut self, key: impl Into<String>, value: impl Into<String>) {
814        self.metadata.insert(key.into(), value.into());
815    }
816
817    /// Get metadata
818    pub fn get_metadata(&self, key: &str) -> Option<&str> {
819        self.metadata.get(key).map(|s| s.as_str())
820    }
821
822    /// Merge another context into this one
823    pub fn merge(&mut self, other: RuleContext) {
824        for (k, v) in other.data {
825            self.data.insert(k, v);
826        }
827        for (k, v) in other.metadata {
828            self.metadata.insert(k, v);
829        }
830    }
831
832    /// Convert to JSON value
833    pub fn to_value(&self) -> serde_json::Value {
834        serde_json::Value::Object(
835            self.data
836                .iter()
837                .map(|(k, v)| (k.clone(), v.clone()))
838                .collect(),
839        )
840    }
841}
842
/// Result of evaluating a single rule against a context.
#[derive(Debug, Clone)]
pub struct RuleResult {
    /// ID of the rule that was evaluated
    pub rule_id: String,
    /// Rule name
    pub rule_name: String,
    /// Whether the rule matched
    pub matched: bool,
    /// Action to execute (if matched)
    pub action: Option<Action>,
    /// Whether to stop processing more rules
    pub stop_processing: bool,
    /// Evaluation time in nanoseconds
    pub eval_time_ns: u64,
}
859
/// Per-rule bookkeeping used for cooldowns and rate limits.
#[derive(Debug)]
struct RuleExecutionState {
    /// When the rule last executed; `None` until the first execution
    last_execution: Option<Instant>,
    /// Number of executions counted in the current rate-limit window
    execution_count: u32,
    /// When the current rate-limit window began
    window_start: Instant,
}

impl Default for RuleExecutionState {
    /// A fresh state: never executed, zero executions counted, and a
    /// window that starts at the moment of construction.
    fn default() -> Self {
        let now = Instant::now();
        Self {
            window_start: now,
            execution_count: 0,
            last_execution: None,
        }
    }
}
880
/// Errors reported by the rule engine.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuleError {
    /// No rule with the given ID exists
    NotFound(String),
    /// A rule with the given ID already exists
    AlreadyExists(String),
    /// The rule is invalid; the payload explains why
    Invalid(String),
    /// The rule hit its rate limit
    RateLimited {
        /// Identifier of the rule that triggered the rate limit
        rule_id: String,
        /// Number of milliseconds to wait before retrying
        retry_after_ms: u64,
    },
    /// The rule is still in its cooldown period
    CooldownActive {
        /// Identifier of the rule currently in cooldown
        rule_id: String,
        /// Number of milliseconds remaining in the cooldown period
        remaining_ms: u64,
    },
}

impl std::fmt::Display for RuleError {
    /// Render a one-line, human-readable message for the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use RuleError::{AlreadyExists, CooldownActive, Invalid, NotFound, RateLimited};

        match self {
            NotFound(id) => write!(f, "Rule not found: {}", id),
            AlreadyExists(id) => write!(f, "Rule already exists: {}", id),
            Invalid(msg) => write!(f, "Invalid rule: {}", msg),
            RateLimited {
                rule_id: id,
                retry_after_ms: wait_ms,
            } => write!(f, "Rule {} rate limited, retry after {}ms", id, wait_ms),
            CooldownActive {
                rule_id: id,
                remaining_ms: left_ms,
            } => write!(f, "Rule {} on cooldown, {}ms remaining", id, left_ms),
        }
    }
}

impl std::error::Error for RuleError {}
937
/// Statistics for the rule engine (a plain snapshot of counts; all fields
/// default to zero / empty).
#[derive(Debug, Clone, Default)]
pub struct RuleEngineStats {
    /// Total rules
    pub total_rules: usize,
    /// Enabled rules
    pub enabled_rules: usize,
    /// Total evaluations
    pub evaluations: u64,
    /// Total matches
    pub matches: u64,
    /// Total actions executed
    pub actions_executed: u64,
    /// Rules by priority (keyed by a `u8` — presumably the numeric
    /// `Priority::value`; confirm against the code that fills this in)
    pub by_priority: HashMap<u8, usize>,
    /// Rules by tag (tag → number of rules)
    pub by_tag: HashMap<String, usize>,
}
956
/// High-performance rule engine.
///
/// Holds the registered rules (shared via `Arc` between the ordered list
/// and the ID index), a tag index, per-rule execution state, and atomic
/// counters.
pub struct RuleEngine {
    /// Rules sorted by priority (highest first)
    rules: Vec<Arc<Rule>>,
    /// Rule index by ID
    rules_by_id: HashMap<String, Arc<Rule>>,
    /// Rule index by tag (tag → IDs of the rules carrying that tag)
    rules_by_tag: HashMap<String, HashSet<String>>,
    /// Execution state for rate limiting and cooldowns, keyed by a string
    /// (presumably the rule ID — confirm against the code that uses it)
    execution_state: HashMap<String, RuleExecutionState>,
    /// Evaluation counter
    eval_count: AtomicU64,
    /// Match counter
    match_count: AtomicU64,
    /// Action counter
    action_count: AtomicU64,
}
974
975impl RuleEngine {
976    /// Create a new rule engine
977    pub fn new() -> Self {
978        Self {
979            rules: Vec::new(),
980            rules_by_id: HashMap::new(),
981            rules_by_tag: HashMap::new(),
982            execution_state: HashMap::new(),
983            eval_count: AtomicU64::new(0),
984            match_count: AtomicU64::new(0),
985            action_count: AtomicU64::new(0),
986        }
987    }
988
989    /// Add a rule
990    pub fn add_rule(&mut self, rule: Rule) -> Result<(), RuleError> {
991        if self.rules_by_id.contains_key(&rule.id) {
992            return Err(RuleError::AlreadyExists(rule.id.clone()));
993        }
994
995        let rule_arc = Arc::new(rule);
996
997        // Add to tag index
998        for tag in &rule_arc.tags {
999            self.rules_by_tag
1000                .entry(tag.clone())
1001                .or_default()
1002                .insert(rule_arc.id.clone());
1003        }
1004
1005        // Add to ID index
1006        self.rules_by_id
1007            .insert(rule_arc.id.clone(), Arc::clone(&rule_arc));
1008
1009        // Add to sorted list and re-sort by priority
1010        self.rules.push(rule_arc);
1011        self.rules
1012            .sort_by_key(|r| std::cmp::Reverse(r.priority.value()));
1013
1014        Ok(())
1015    }
1016
1017    /// Remove a rule
1018    pub fn remove_rule(&mut self, rule_id: &str) -> Option<Arc<Rule>> {
1019        let rule = self.rules_by_id.remove(rule_id)?;
1020
1021        // Remove from tag index
1022        for tag in &rule.tags {
1023            if let Some(set) = self.rules_by_tag.get_mut(tag) {
1024                set.remove(rule_id);
1025            }
1026        }
1027
1028        // Remove from sorted list
1029        self.rules.retain(|r| r.id != rule_id);
1030
1031        // Remove execution state
1032        self.execution_state.remove(rule_id);
1033
1034        Some(rule)
1035    }
1036
1037    /// Get a rule by ID
1038    pub fn get_rule(&self, rule_id: &str) -> Option<Arc<Rule>> {
1039        self.rules_by_id.get(rule_id).cloned()
1040    }
1041
    /// Get all rules, in evaluation order (highest priority first).
    pub fn rules(&self) -> &[Arc<Rule>] {
        &self.rules
    }
1046
1047    /// Get rules by tag
1048    pub fn rules_by_tag(&self, tag: &str) -> Vec<Arc<Rule>> {
1049        self.rules_by_tag
1050            .get(tag)
1051            .map(|ids| {
1052                ids.iter()
1053                    .filter_map(|id| self.rules_by_id.get(id).cloned())
1054                    .collect()
1055            })
1056            .unwrap_or_default()
1057    }
1058
1059    /// Evaluate all rules against context
1060    pub fn evaluate(&mut self, context: &RuleContext) -> Vec<RuleResult> {
1061        self.eval_count.fetch_add(1, Ordering::Relaxed);
1062
1063        // First pass: evaluate rules and collect results
1064        let mut results = Vec::new();
1065        let mut rules_to_record = Vec::new();
1066        let mut should_stop = false;
1067
1068        for rule in &self.rules {
1069            if should_stop {
1070                break;
1071            }
1072
1073            let start = Instant::now();
1074            let matched = rule.matches(context);
1075            let eval_time = start.elapsed().as_nanos() as u64;
1076
1077            if matched {
1078                self.match_count.fetch_add(1, Ordering::Relaxed);
1079
1080                // Check cooldown and rate limit
1081                let can_execute = self.check_execution_allowed(&rule.id, rule.as_ref());
1082
1083                let action = if can_execute {
1084                    rules_to_record.push(rule.id.clone());
1085                    self.action_count
1086                        .fetch_add(rule.action.action_count() as u64, Ordering::Relaxed);
1087                    Some(rule.action.clone())
1088                } else {
1089                    None
1090                };
1091
1092                results.push(RuleResult {
1093                    rule_id: rule.id.clone(),
1094                    rule_name: rule.name.clone(),
1095                    matched: true,
1096                    action,
1097                    stop_processing: rule.stop_on_match,
1098                    eval_time_ns: eval_time,
1099                });
1100
1101                if rule.stop_on_match {
1102                    should_stop = true;
1103                }
1104            } else {
1105                results.push(RuleResult {
1106                    rule_id: rule.id.clone(),
1107                    rule_name: rule.name.clone(),
1108                    matched: false,
1109                    action: None,
1110                    stop_processing: false,
1111                    eval_time_ns: eval_time,
1112                });
1113            }
1114        }
1115
1116        // Second pass: record executions
1117        for rule_id in rules_to_record {
1118            self.record_execution(&rule_id);
1119        }
1120
1121        results
1122    }
1123
1124    /// Evaluate and return only matching rules
1125    pub fn evaluate_matching(&mut self, context: &RuleContext) -> Vec<RuleResult> {
1126        self.evaluate(context)
1127            .into_iter()
1128            .filter(|r| r.matched)
1129            .collect()
1130    }
1131
1132    /// Evaluate until first match
1133    pub fn evaluate_first(&mut self, context: &RuleContext) -> Option<RuleResult> {
1134        self.eval_count.fetch_add(1, Ordering::Relaxed);
1135
1136        let mut result = None;
1137        let mut rule_to_record = None;
1138
1139        for rule in &self.rules {
1140            let start = Instant::now();
1141            let matched = rule.matches(context);
1142            let eval_time = start.elapsed().as_nanos() as u64;
1143
1144            if matched {
1145                self.match_count.fetch_add(1, Ordering::Relaxed);
1146
1147                let can_execute = self.check_execution_allowed(&rule.id, rule.as_ref());
1148                let action = if can_execute {
1149                    rule_to_record = Some(rule.id.clone());
1150                    self.action_count
1151                        .fetch_add(rule.action.action_count() as u64, Ordering::Relaxed);
1152                    Some(rule.action.clone())
1153                } else {
1154                    None
1155                };
1156
1157                result = Some(RuleResult {
1158                    rule_id: rule.id.clone(),
1159                    rule_name: rule.name.clone(),
1160                    matched: true,
1161                    action,
1162                    stop_processing: rule.stop_on_match,
1163                    eval_time_ns: eval_time,
1164                });
1165                break;
1166            }
1167        }
1168
1169        // Record execution after iteration is complete
1170        if let Some(rule_id) = rule_to_record {
1171            self.record_execution(&rule_id);
1172        }
1173
1174        result
1175    }
1176
1177    /// Check if a specific rule would match
1178    pub fn would_match(&self, rule_id: &str, context: &RuleContext) -> bool {
1179        self.rules_by_id
1180            .get(rule_id)
1181            .map(|r| r.matches(context))
1182            .unwrap_or(false)
1183    }
1184
1185    /// Get statistics
1186    pub fn stats(&self) -> RuleEngineStats {
1187        let mut by_priority: HashMap<u8, usize> = HashMap::new();
1188        let mut by_tag: HashMap<String, usize> = HashMap::new();
1189        let mut enabled_count = 0;
1190
1191        for rule in &self.rules {
1192            if rule.enabled {
1193                enabled_count += 1;
1194            }
1195            *by_priority.entry(rule.priority.value()).or_default() += 1;
1196            for tag in &rule.tags {
1197                *by_tag.entry(tag.clone()).or_default() += 1;
1198            }
1199        }
1200
1201        RuleEngineStats {
1202            total_rules: self.rules.len(),
1203            enabled_rules: enabled_count,
1204            evaluations: self.eval_count.load(Ordering::Relaxed),
1205            matches: self.match_count.load(Ordering::Relaxed),
1206            actions_executed: self.action_count.load(Ordering::Relaxed),
1207            by_priority,
1208            by_tag,
1209        }
1210    }
1211
    /// Number of rules currently registered, counting disabled rules too.
    pub fn len(&self) -> usize {
        self.rules.len()
    }
1216
    /// Returns `true` when no rules are registered.
    pub fn is_empty(&self) -> bool {
        self.rules.is_empty()
    }
1221
1222    /// Clear all rules
1223    pub fn clear(&mut self) {
1224        self.rules.clear();
1225        self.rules_by_id.clear();
1226        self.rules_by_tag.clear();
1227        self.execution_state.clear();
1228    }
1229
    /// Reset execution state (cooldowns and rate limits).
    ///
    /// Rules themselves are kept; afterwards every rule is treated as if it
    /// had never executed.
    pub fn reset_execution_state(&mut self) {
        self.execution_state.clear();
    }
1234
1235    // Check if execution is allowed (cooldown and rate limit)
1236    fn check_execution_allowed(&self, rule_id: &str, rule: &Rule) -> bool {
1237        let state = match self.execution_state.get(rule_id) {
1238            Some(s) => s,
1239            None => return true, // No state means no restrictions yet
1240        };
1241
1242        let now = Instant::now();
1243
1244        // Check cooldown
1245        if let Some(cooldown_ms) = rule.cooldown_ms {
1246            if let Some(last) = state.last_execution {
1247                let elapsed = now.duration_since(last).as_millis() as u64;
1248                if elapsed < cooldown_ms {
1249                    return false;
1250                }
1251            }
1252        }
1253
1254        // Check rate limit
1255        if let Some(ref limit) = rule.rate_limit {
1256            let window_duration = Duration::from_secs(limit.window_secs as u64);
1257            if now.duration_since(state.window_start) < window_duration
1258                && state.execution_count >= limit.max_executions
1259            {
1260                return false;
1261            }
1262        }
1263
1264        true
1265    }
1266
1267    // Record an execution
1268    fn record_execution(&mut self, rule_id: &str) {
1269        let now = Instant::now();
1270
1271        // Pre-fix this incremented `execution_count`
1272        // unconditionally, even for rules without a rate_limit.
1273        // A rule that toggled rate-limited → unlimited carried
1274        // its old count forever; on toggle BACK to rate-limited
1275        // with the same window, the count was already at-or-
1276        // above max and every execution was immediately blocked
1277        // — silent stuck-rule on hot reload.
1278        //
1279        // Fix: only touch rate-limit-specific state when the
1280        // current rule actually has a rate_limit. The
1281        // last_execution timestamp is independently used by
1282        // cooldown-based gating, so it advances regardless.
1283        let has_rate_limit = self
1284            .rules_by_id
1285            .get(rule_id)
1286            .and_then(|r| r.rate_limit.as_ref())
1287            .is_some();
1288
1289        let state = self.execution_state.entry(rule_id.to_string()).or_default();
1290        state.last_execution = Some(now);
1291
1292        if has_rate_limit {
1293            state.execution_count += 1;
1294            // Reset window if needed.
1295            if let Some(rule) = self.rules_by_id.get(rule_id) {
1296                if let Some(ref limit) = rule.rate_limit {
1297                    let window_duration = Duration::from_secs(limit.window_secs as u64);
1298                    if now.duration_since(state.window_start) >= window_duration {
1299                        state.window_start = now;
1300                        state.execution_count = 1;
1301                    }
1302                }
1303            }
1304        }
1305    }
1306}
1307
impl Default for RuleEngine {
    /// Same as [`RuleEngine::new`].
    fn default() -> Self {
        Self::new()
    }
}
1313
/// Rule set for a node (collection of rules with metadata)
///
/// A plain data container: the rules only take effect once they are copied
/// into a [`RuleEngine`] with `load_into`.
#[derive(Debug, Clone, PartialEq)]
pub struct RuleSet {
    /// Rule set ID
    pub id: String,
    /// Rule set name
    pub name: String,
    /// Optional free-form description
    pub description: Option<String>,
    /// Rules in this set, in the order they were added
    pub rules: Vec<Rule>,
    /// Version; `RuleSet::new` starts it at 1
    pub version: u64,
    /// Created timestamp; `RuleSet::new` stores milliseconds since the Unix
    /// epoch
    pub created_at: u64,
    /// Updated timestamp; `RuleSet::new` sets it equal to `created_at`
    pub updated_at: u64,
    /// Tags attached to the set itself (separate from each rule's own tags)
    pub tags: Vec<String>,
}
1334
1335impl RuleSet {
1336    /// Create a new rule set
1337    pub fn new(id: impl Into<String>, name: impl Into<String>) -> Self {
1338        // See Rule::new for rationale on saturating
1339        // u128 → u64 instead of `as u64`.
1340        let now = u64::try_from(
1341            SystemTime::now()
1342                .duration_since(UNIX_EPOCH)
1343                .unwrap_or_default()
1344                .as_millis(),
1345        )
1346        .unwrap_or(u64::MAX);
1347
1348        Self {
1349            id: id.into(),
1350            name: name.into(),
1351            description: None,
1352            rules: Vec::new(),
1353            version: 1,
1354            created_at: now,
1355            updated_at: now,
1356            tags: Vec::new(),
1357        }
1358    }
1359
1360    /// Add a rule
1361    pub fn add_rule(mut self, rule: Rule) -> Self {
1362        self.rules.push(rule);
1363        self
1364    }
1365
1366    /// Set description
1367    pub fn with_description(mut self, desc: impl Into<String>) -> Self {
1368        self.description = Some(desc.into());
1369        self
1370    }
1371
1372    /// Add tag
1373    pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
1374        self.tags.push(tag.into());
1375        self
1376    }
1377
1378    /// Load into a rule engine
1379    pub fn load_into(&self, engine: &mut RuleEngine) -> Result<(), RuleError> {
1380        for rule in &self.rules {
1381            engine.add_rule(rule.clone())?;
1382        }
1383        Ok(())
1384    }
1385}
1386
1387#[cfg(test)]
1388mod tests {
1389    use super::*;
1390
1391    #[test]
1392    fn test_compare_op() {
1393        assert!(CompareOp::Eq.evaluate(&serde_json::json!(5), &serde_json::json!(5)));
1394        assert!(!CompareOp::Eq.evaluate(&serde_json::json!(5), &serde_json::json!(10)));
1395
1396        assert!(CompareOp::Gt.evaluate(&serde_json::json!(10), &serde_json::json!(5)));
1397        assert!(!CompareOp::Gt.evaluate(&serde_json::json!(5), &serde_json::json!(10)));
1398
1399        assert!(CompareOp::Contains.evaluate(
1400            &serde_json::json!("hello world"),
1401            &serde_json::json!("world")
1402        ));
1403
1404        assert!(
1405            CompareOp::In.evaluate(&serde_json::json!("a"), &serde_json::json!(["a", "b", "c"]))
1406        );
1407    }
1408
1409    /// Pre-fix, both sides were reduced to f64 via
1410    /// `as_f64()`. Two adjacent u64 values above 2^53 round to
1411    /// the same f64 — `9_007_199_254_740_992` and
1412    /// `9_007_199_254_740_993` both become `9007199254740992.0`,
1413    /// so `Gt` incorrectly returned false (they compared Equal).
1414    /// Real-world impact: rules guarding ns timestamps or byte
1415    /// counts silently failed to fire.
1416    #[test]
1417    fn gt_compares_large_u64_without_loss_of_precision() {
1418        // 2^53 + 1: just past the f64 mantissa boundary.
1419        let small = serde_json::json!(9_007_199_254_740_992u64);
1420        let big = serde_json::json!(9_007_199_254_740_993u64);
1421
1422        // Sanity — pre-fix this would have failed (both round to
1423        // the same f64).
1424        assert!(
1425            CompareOp::Gt.evaluate(&big, &small),
1426            "Gt must distinguish u64 values one apart at the f64 boundary"
1427        );
1428        assert!(
1429            !CompareOp::Gt.evaluate(&small, &big),
1430            "Gt must NOT report the smaller value as greater"
1431        );
1432        assert!(
1433            CompareOp::Lt.evaluate(&small, &big),
1434            "Lt must distinguish at the f64 boundary"
1435        );
1436        assert!(
1437            !CompareOp::Eq.evaluate(&small, &big),
1438            "Eq must NOT collapse two distinct u64 values; \
1439             pre-fix these compared equal because both round to the same f64"
1440        );
1441    }
1442
1443    /// Very large u64 values (> i64::MAX) must still
1444    /// compare correctly even though as_i64() returns None for
1445    /// them.
1446    #[test]
1447    fn gt_compares_u64_values_above_i64_max() {
1448        let a = serde_json::json!(u64::MAX);
1449        let b = serde_json::json!(u64::MAX - 1);
1450        assert!(CompareOp::Gt.evaluate(&a, &b));
1451        assert!(CompareOp::Lt.evaluate(&b, &a));
1452    }
1453
1454    /// Comparing a negative i64 against a u64 above
1455    /// i64::MAX must always say "negative is less". Pre-fix the
1456    /// f64 fallback could happen to give a numerically correct
1457    /// answer here (negatives < positives in f64), but only by
1458    /// accident — the helper's contract is now explicit.
1459    #[test]
1460    fn compares_negative_i64_against_huge_u64_correctly() {
1461        let neg = serde_json::json!(-1i64);
1462        let huge = serde_json::json!(u64::MAX);
1463        assert!(CompareOp::Lt.evaluate(&neg, &huge));
1464        assert!(CompareOp::Gt.evaluate(&huge, &neg));
1465    }
1466
1467    /// Floats still work via the f64 fallback.
1468    #[test]
1469    fn float_comparisons_still_work_via_fallback() {
1470        let a = serde_json::json!(1.5);
1471        let b = serde_json::json!(2.5);
1472        assert!(CompareOp::Lt.evaluate(&a, &b));
1473        assert!(CompareOp::Gt.evaluate(&b, &a));
1474    }
1475
1476    /// Integer-vs-float comparison falls back
1477    /// to f64 (with the documented loss of precision for huge
1478    /// integers, which is unavoidable when one side is a float).
1479    #[test]
1480    fn integer_vs_float_uses_f64_fallback() {
1481        let i = serde_json::json!(5i64);
1482        let f = serde_json::json!(4.5);
1483        assert!(CompareOp::Gt.evaluate(&i, &f));
1484        assert!(CompareOp::Lt.evaluate(&f, &i));
1485    }
1486
1487    #[test]
1488    fn test_condition() {
1489        let mut ctx = RuleContext::new();
1490        ctx.set("cpu_usage", serde_json::json!(85));
1491        ctx.set("status", serde_json::json!("running"));
1492
1493        let cond1 = Condition::gt("cpu_usage", serde_json::json!(80));
1494        assert!(cond1.evaluate(&ctx));
1495
1496        let cond2 = Condition::eq("status", serde_json::json!("running"));
1497        assert!(cond2.evaluate(&ctx));
1498
1499        let cond3 = Condition::lt("cpu_usage", serde_json::json!(50));
1500        assert!(!cond3.evaluate(&ctx));
1501    }
1502
1503    #[test]
1504    fn test_condition_expr() {
1505        let mut ctx = RuleContext::new();
1506        ctx.set("cpu", serde_json::json!(85));
1507        ctx.set("memory", serde_json::json!(70));
1508
1509        // AND: both conditions must be true
1510        let expr_and = ConditionExpr::and(vec![
1511            ConditionExpr::single(Condition::gt("cpu", serde_json::json!(80))),
1512            ConditionExpr::single(Condition::gt("memory", serde_json::json!(60))),
1513        ]);
1514        assert!(expr_and.evaluate(&ctx));
1515
1516        // OR: at least one must be true
1517        let expr_or = ConditionExpr::or(vec![
1518            ConditionExpr::single(Condition::gt("cpu", serde_json::json!(90))),
1519            ConditionExpr::single(Condition::gt("memory", serde_json::json!(60))),
1520        ]);
1521        assert!(expr_or.evaluate(&ctx));
1522
1523        // NOT: negate condition
1524        let expr_not = ConditionExpr::negate(ConditionExpr::single(Condition::lt(
1525            "cpu",
1526            serde_json::json!(50),
1527        )));
1528        assert!(expr_not.evaluate(&ctx));
1529    }
1530
1531    #[test]
1532    fn test_nested_field_access() {
1533        let mut ctx = RuleContext::new();
1534        ctx.set(
1535            "metrics",
1536            serde_json::json!({
1537                "cpu": {"usage": 85, "cores": 4},
1538                "memory": {"used": 8192, "total": 16384}
1539            }),
1540        );
1541
1542        let cond = Condition::gt("metrics.cpu.usage", serde_json::json!(80));
1543        assert!(cond.evaluate(&ctx));
1544
1545        let cond2 = Condition::eq("metrics.cpu.cores", serde_json::json!(4));
1546        assert!(cond2.evaluate(&ctx));
1547    }
1548
1549    #[test]
1550    fn test_rule() {
1551        let rule = Rule::new("high-cpu", "High CPU Alert")
1552            .with_description("Alert when CPU usage is high")
1553            .with_priority(Priority::High)
1554            .with_condition(ConditionExpr::single(Condition::gt(
1555                "cpu",
1556                serde_json::json!(80),
1557            )))
1558            .with_action(Action::alert(
1559                AlertSeverity::High,
1560                "High CPU",
1561                "CPU usage exceeded 80%",
1562            ))
1563            .with_tag("monitoring")
1564            .with_cooldown(60000);
1565
1566        let mut ctx = RuleContext::new();
1567        ctx.set("cpu", serde_json::json!(85));
1568
1569        assert!(rule.matches(&ctx));
1570
1571        ctx.set("cpu", serde_json::json!(50));
1572        assert!(!rule.matches(&ctx));
1573    }
1574
1575    #[test]
1576    fn test_rule_engine() {
1577        let mut engine = RuleEngine::new();
1578
1579        // Add rules with different priorities
1580        engine
1581            .add_rule(
1582                Rule::new("rule-low", "Low Priority")
1583                    .with_priority(Priority::Low)
1584                    .with_condition(ConditionExpr::Always)
1585                    .with_action(Action::log(LogLevel::Info, "Low priority")),
1586            )
1587            .unwrap();
1588
1589        engine
1590            .add_rule(
1591                Rule::new("rule-high", "High Priority")
1592                    .with_priority(Priority::High)
1593                    .with_condition(ConditionExpr::Always)
1594                    .with_action(Action::log(LogLevel::Info, "High priority")),
1595            )
1596            .unwrap();
1597
1598        engine
1599            .add_rule(
1600                Rule::new("rule-normal", "Normal Priority")
1601                    .with_priority(Priority::Normal)
1602                    .with_condition(ConditionExpr::Always)
1603                    .with_action(Action::log(LogLevel::Info, "Normal priority")),
1604            )
1605            .unwrap();
1606
1607        // Rules should be sorted by priority
1608        let results = engine.evaluate(&RuleContext::new());
1609        assert_eq!(results.len(), 3);
1610        assert_eq!(results[0].rule_id, "rule-high");
1611        assert_eq!(results[1].rule_id, "rule-normal");
1612        assert_eq!(results[2].rule_id, "rule-low");
1613    }
1614
1615    #[test]
1616    fn test_stop_on_match() {
1617        let mut engine = RuleEngine::new();
1618
1619        engine
1620            .add_rule(
1621                Rule::new("stopper", "Stopper")
1622                    .with_priority(Priority::High)
1623                    .with_condition(ConditionExpr::Always)
1624                    .with_action(Action::Noop)
1625                    .stop_on_match(),
1626            )
1627            .unwrap();
1628
1629        engine
1630            .add_rule(
1631                Rule::new("after", "After")
1632                    .with_priority(Priority::Normal)
1633                    .with_condition(ConditionExpr::Always)
1634                    .with_action(Action::Noop),
1635            )
1636            .unwrap();
1637
1638        let results = engine.evaluate(&RuleContext::new());
1639        assert_eq!(results.len(), 1);
1640        assert_eq!(results[0].rule_id, "stopper");
1641    }
1642
1643    #[test]
1644    fn test_disabled_rule() {
1645        let mut engine = RuleEngine::new();
1646
1647        engine
1648            .add_rule(
1649                Rule::new("disabled", "Disabled Rule")
1650                    .with_condition(ConditionExpr::Always)
1651                    .disabled(),
1652            )
1653            .unwrap();
1654
1655        let results = engine.evaluate_matching(&RuleContext::new());
1656        assert!(results.is_empty());
1657    }
1658
1659    #[test]
1660    fn test_rules_by_tag() {
1661        let mut engine = RuleEngine::new();
1662
1663        engine
1664            .add_rule(
1665                Rule::new("r1", "Rule 1")
1666                    .with_tag("monitoring")
1667                    .with_tag("cpu"),
1668            )
1669            .unwrap();
1670
1671        engine
1672            .add_rule(Rule::new("r2", "Rule 2").with_tag("monitoring"))
1673            .unwrap();
1674
1675        engine
1676            .add_rule(Rule::new("r3", "Rule 3").with_tag("network"))
1677            .unwrap();
1678
1679        let monitoring_rules = engine.rules_by_tag("monitoring");
1680        assert_eq!(monitoring_rules.len(), 2);
1681
1682        let cpu_rules = engine.rules_by_tag("cpu");
1683        assert_eq!(cpu_rules.len(), 1);
1684    }
1685
1686    #[test]
1687    fn test_rule_set() {
1688        let rule_set = RuleSet::new("default", "Default Rules")
1689            .with_description("Default monitoring rules")
1690            .with_tag("production")
1691            .add_rule(
1692                Rule::new("r1", "Rule 1")
1693                    .with_condition(ConditionExpr::Always)
1694                    .with_action(Action::Noop),
1695            )
1696            .add_rule(
1697                Rule::new("r2", "Rule 2")
1698                    .with_condition(ConditionExpr::Always)
1699                    .with_action(Action::Noop),
1700            );
1701
1702        let mut engine = RuleEngine::new();
1703        rule_set.load_into(&mut engine).unwrap();
1704
1705        assert_eq!(engine.len(), 2);
1706    }
1707
1708    #[test]
1709    fn test_stats() {
1710        let mut engine = RuleEngine::new();
1711
1712        engine
1713            .add_rule(
1714                Rule::new("r1", "Rule 1")
1715                    .with_priority(Priority::High)
1716                    .with_tag("tag1"),
1717            )
1718            .unwrap();
1719
1720        engine
1721            .add_rule(
1722                Rule::new("r2", "Rule 2")
1723                    .with_priority(Priority::Normal)
1724                    .with_tag("tag1"),
1725            )
1726            .unwrap();
1727
1728        engine
1729            .add_rule(
1730                Rule::new("r3", "Rule 3")
1731                    .with_priority(Priority::Normal)
1732                    .disabled(),
1733            )
1734            .unwrap();
1735
1736        let stats = engine.stats();
1737        assert_eq!(stats.total_rules, 3);
1738        assert_eq!(stats.enabled_rules, 2);
1739        assert_eq!(stats.by_tag.get("tag1"), Some(&2));
1740    }
1741
1742    #[test]
1743    fn test_action_chain() {
1744        let action = Action::chain(vec![
1745            Action::log(LogLevel::Info, "First"),
1746            Action::log(LogLevel::Info, "Second"),
1747            Action::emit("test", serde_json::json!({})),
1748        ]);
1749
1750        assert_eq!(action.action_count(), 3);
1751    }
1752
1753    // ---------- Cooldown and rate-limit gating ----------
1754    //
1755    // Existing tests cover the happy path: rule matches, action
1756    // fires. None exercise the gating that prevents action
1757    // execution when a rule is on cooldown or has spent its
1758    // rate-limit budget. These branches are load-bearing for any
1759    // production rule that throttles itself (alert fatigue, retry
1760    // storms) — a regression would silently bypass the throttle.
1761
1762    #[test]
1763    fn cooldown_blocks_action_on_second_match_within_window() {
1764        let mut engine = RuleEngine::new();
1765        engine
1766            .add_rule(
1767                Rule::new("cd", "Cooldown rule")
1768                    .with_condition(ConditionExpr::Always)
1769                    .with_action(Action::Noop)
1770                    .with_cooldown(60_000), // 60s — won't elapse in-test
1771            )
1772            .unwrap();
1773
1774        // First evaluation executes the action.
1775        let r1 = engine.evaluate(&RuleContext::new());
1776        assert_eq!(r1.len(), 1);
1777        assert!(r1[0].matched);
1778        assert!(r1[0].action.is_some());
1779
1780        // Second evaluation matches but is on cooldown — action
1781        // must be None. Pre-fix any regression in
1782        // `check_execution_allowed` would let it fire again.
1783        let r2 = engine.evaluate(&RuleContext::new());
1784        assert_eq!(r2.len(), 1);
1785        assert!(r2[0].matched, "rule must still match while gated");
1786        assert!(
1787            r2[0].action.is_none(),
1788            "cooldown must suppress the action, got {:?}",
1789            r2[0].action,
1790        );
1791    }
1792
1793    #[test]
1794    fn rate_limit_blocks_action_after_max_executions() {
1795        let mut engine = RuleEngine::new();
1796        engine
1797            .add_rule(
1798                Rule::new("rl", "Rate-limited rule")
1799                    .with_condition(ConditionExpr::Always)
1800                    .with_action(Action::Noop)
1801                    .with_rate_limit(2, 300), // 2 per 5min window
1802            )
1803            .unwrap();
1804
1805        // First two evaluations consume the budget.
1806        assert!(engine.evaluate(&RuleContext::new())[0].action.is_some());
1807        assert!(engine.evaluate(&RuleContext::new())[0].action.is_some());
1808
1809        // Third matches but rate limit fires.
1810        let r3 = engine.evaluate(&RuleContext::new());
1811        assert!(r3[0].matched);
1812        assert!(
1813            r3[0].action.is_none(),
1814            "rate limit must suppress the action, got {:?}",
1815            r3[0].action,
1816        );
1817    }
1818
1819    /// Regression pin for the rate-limit-state hot-reload bug
1820    /// noted in `record_execution` (L1271-L1282): pre-fix the
1821    /// `execution_count` was incremented for every rule on every
1822    /// match, even rules without a rate_limit. Toggling a rule
1823    /// from rate-limited → unlimited → rate-limited (same window)
1824    /// would carry a stale count and silently block forever.
1825    ///
1826    /// We pin the fix by verifying that an unlimited rule's
1827    /// repeated matches never increment any rate-limit count
1828    /// (observable via continuing to fire actions).
1829    #[test]
1830    fn unlimited_rule_keeps_firing_across_many_evaluations() {
1831        let mut engine = RuleEngine::new();
1832        engine
1833            .add_rule(
1834                Rule::new("unl", "Unlimited rule")
1835                    .with_condition(ConditionExpr::Always)
1836                    .with_action(Action::Noop),
1837            )
1838            .unwrap();
1839
1840        for i in 0..50 {
1841            let r = engine.evaluate(&RuleContext::new());
1842            assert!(
1843                r[0].action.is_some(),
1844                "unlimited rule must keep firing; blocked at iteration {i}",
1845            );
1846        }
1847    }
1848
1849    // ---------- evaluate_first vs evaluate ----------
1850
1851    #[test]
1852    fn evaluate_first_returns_first_matching_rule_only() {
1853        let mut engine = RuleEngine::new();
1854        engine
1855            .add_rule(
1856                Rule::new("a", "A")
1857                    .with_priority(Priority::High)
1858                    .with_condition(ConditionExpr::Always)
1859                    .with_action(Action::Noop),
1860            )
1861            .unwrap();
1862        engine
1863            .add_rule(
1864                Rule::new("b", "B")
1865                    .with_priority(Priority::Normal)
1866                    .with_condition(ConditionExpr::Always)
1867                    .with_action(Action::Noop),
1868            )
1869            .unwrap();
1870
1871        // Both match but evaluate_first returns one result —
1872        // higher-priority rule wins under rules-sorted-by-priority
1873        // semantics.
1874        let r = engine.evaluate_first(&RuleContext::new());
1875        assert!(r.is_some());
1876        let r = r.unwrap();
1877        assert_eq!(r.rule_id, "a", "highest-priority rule must win");
1878        assert!(r.action.is_some());
1879    }
1880
1881    #[test]
1882    fn evaluate_first_action_is_none_when_rule_is_rate_limited() {
1883        let mut engine = RuleEngine::new();
1884        engine
1885            .add_rule(
1886                Rule::new("rl", "Rate-limited")
1887                    .with_condition(ConditionExpr::Always)
1888                    .with_action(Action::Noop)
1889                    .with_rate_limit(1, 300),
1890            )
1891            .unwrap();
1892
1893        assert!(engine
1894            .evaluate_first(&RuleContext::new())
1895            .unwrap()
1896            .action
1897            .is_some());
1898        let r = engine.evaluate_first(&RuleContext::new()).unwrap();
1899        assert!(r.matched);
1900        assert!(
1901            r.action.is_none(),
1902            "evaluate_first must respect rate-limit gating just like evaluate",
1903        );
1904    }
1905
1906    // ---------- Action constructor coverage ----------
1907
1908    #[test]
1909    fn action_factory_methods_round_trip() {
1910        match Action::set_context("k", serde_json::json!(1)) {
1911            Action::SetContext { key, value } => {
1912                assert_eq!(key, "k");
1913                assert_eq!(value, serde_json::json!(1));
1914            }
1915            other => panic!("expected SetContext, got {:?}", other),
1916        }
1917
1918        match Action::reject("policy denied", 403) {
1919            Action::Reject { reason, code } => {
1920                assert_eq!(reason, "policy denied");
1921                assert_eq!(code, 403);
1922            }
1923            other => panic!("expected Reject, got {:?}", other),
1924        }
1925
1926        match Action::redirect_to_tags(vec!["gpu".into(), "fast".into()]) {
1927            Action::Redirect {
1928                target_node,
1929                target_tags,
1930            } => {
1931                assert!(target_node.is_none());
1932                assert_eq!(target_tags, vec!["gpu".to_string(), "fast".to_string()]);
1933            }
1934            other => panic!("expected Redirect, got {:?}", other),
1935        }
1936    }
1937
1938    // ---------- RuleContext::from_value ----------
1939
1940    #[test]
1941    fn rule_context_from_value_loads_object_keys() {
1942        let ctx = RuleContext::from_value(serde_json::json!({
1943            "user": "alice",
1944            "count": 42,
1945        }));
1946        assert_eq!(ctx.get_field("user"), serde_json::json!("alice"));
1947        assert_eq!(ctx.get_field("count"), serde_json::json!(42));
1948
1949        // Non-object inputs produce an empty context (no panic).
1950        let ctx = RuleContext::from_value(serde_json::json!("not-an-object"));
1951        assert_eq!(ctx.get_field("anything"), serde_json::json!(null));
1952    }
1953}