Skip to main content

azoth_bus/
filter.rs

1/// Simple event representation for filtering
2#[derive(Debug, Clone)]
3pub struct Event {
4    pub id: u64,
5    pub event_type: String,
6    pub payload: Vec<u8>,
7}
8
9impl Event {
10    /// Decode from raw event bytes (format: "type:json_payload")
11    ///
12    /// The format from ctx.log is "event_type:json_payload" where json_payload
13    /// is the JSON-serialized form of the payload. Since event_type may contain
14    /// colons (e.g., "knowledge:fact_learned"), we need to find the LAST colon
15    /// that's followed by valid JSON by actually attempting to parse.
16    pub fn decode(id: u64, bytes: &[u8]) -> crate::Result<Self> {
17        let s = std::str::from_utf8(bytes)
18            .map_err(|e| crate::BusError::InvalidState(format!("Invalid UTF-8 in event: {}", e)))?;
19
20        // Find all colons and check which one precedes valid JSON
21        let colon_positions: Vec<usize> = s
22            .char_indices()
23            .filter(|(_, c)| *c == ':')
24            .map(|(i, _)| i)
25            .collect();
26
27        // Check colons from last to first to find the split point
28        for &split_pos in colon_positions.iter().rev() {
29            let payload = &s[split_pos + 1..];
30
31            // Try to parse as JSON to verify this is the correct split point
32            // We don't care about the actual parsed value, just that it's valid JSON
33            if serde_json::from_str::<serde_json::Value>(payload).is_ok() {
34                let event_type = &s[..split_pos];
35                return Ok(Self {
36                    id,
37                    event_type: event_type.to_string(),
38                    payload: payload.as_bytes().to_vec(),
39                });
40            }
41        }
42
43        // No colon found or no valid JSON after any colon - legacy format
44        Ok(Self {
45            id,
46            event_type: s.to_string(),
47            payload: Vec::new(),
48        })
49    }
50}
51
52/// Filter for events
53pub trait EventFilterTrait: Send + Sync {
54    /// Check if event matches filter
55    fn matches(&self, event: &Event) -> bool;
56}
57
58/// Event filter implementation
59#[derive(Clone)]
60pub enum EventFilter {
61    /// Match all events
62    All,
63
64    /// Match events with type prefix
65    Prefix(String),
66
67    /// Match exact event type
68    Exact(String),
69
70    /// Combine multiple filters with AND logic
71    And(Box<EventFilter>, Box<EventFilter>),
72
73    /// Combine multiple filters with OR logic
74    Or(Box<EventFilter>, Box<EventFilter>),
75}
76
77impl EventFilter {
78    /// Create a prefix filter
79    pub fn prefix(prefix: impl Into<String>) -> Self {
80        EventFilter::Prefix(prefix.into())
81    }
82
83    /// Create an exact match filter
84    pub fn exact(event_type: impl Into<String>) -> Self {
85        EventFilter::Exact(event_type.into())
86    }
87
88    /// Combine with another filter using AND logic
89    pub fn and(self, other: EventFilter) -> Self {
90        EventFilter::And(Box::new(self), Box::new(other))
91    }
92
93    /// Combine with another filter using OR logic
94    pub fn or(self, other: EventFilter) -> Self {
95        EventFilter::Or(Box::new(self), Box::new(other))
96    }
97}
98
99impl EventFilterTrait for EventFilter {
100    fn matches(&self, event: &Event) -> bool {
101        match self {
102            EventFilter::All => true,
103            EventFilter::Prefix(prefix) => event.event_type.starts_with(prefix),
104            EventFilter::Exact(event_type) => &event.event_type == event_type,
105            EventFilter::And(f1, f2) => f1.matches(event) && f2.matches(event),
106            EventFilter::Or(f1, f2) => f1.matches(event) || f2.matches(event),
107        }
108    }
109}
110
111#[cfg(test)]
112mod tests {
113    use super::*;
114
115    fn test_event(event_type: &str) -> Event {
116        Event {
117            id: 1,
118            event_type: event_type.to_string(),
119            payload: vec![],
120        }
121    }
122
123    #[test]
124    fn test_prefix_filter() {
125        let filter = EventFilter::prefix("knowledge:");
126        assert!(filter.matches(&test_event("knowledge:doc_updated")));
127        assert!(!filter.matches(&test_event("bus:message")));
128    }
129
130    #[test]
131    fn test_exact_filter() {
132        let filter = EventFilter::exact("knowledge:doc_updated");
133        assert!(filter.matches(&test_event("knowledge:doc_updated")));
134        assert!(!filter.matches(&test_event("knowledge:doc_deleted")));
135    }
136
137    #[test]
138    fn test_and_filter() {
139        let filter = EventFilter::prefix("knowledge:").and(EventFilter::prefix("knowledge:doc_"));
140
141        assert!(filter.matches(&test_event("knowledge:doc_updated")));
142        assert!(!filter.matches(&test_event("knowledge:index_updated")));
143    }
144
145    #[test]
146    fn test_or_filter() {
147        let filter = EventFilter::prefix("knowledge:").or(EventFilter::prefix("bus:"));
148
149        assert!(filter.matches(&test_event("knowledge:doc_updated")));
150        assert!(filter.matches(&test_event("bus:message")));
151        assert!(!filter.matches(&test_event("session:started")));
152    }
153}