/// A single event: a numeric id, a type name, and raw payload bytes.
///
/// Equality compares all three fields, which lets events be used directly in
/// `==` comparisons and `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Event {
    /// Identifier supplied by whoever constructs the event.
    pub id: u64,
    /// Event type name; this is the string that `EventFilter` matches against.
    pub event_type: String,
    /// Raw payload bytes; empty when the event carries no payload.
    pub payload: Vec<u8>,
}
8
9impl Event {
10 pub fn decode(id: u64, bytes: &[u8]) -> crate::Result<Self> {
17 let s = std::str::from_utf8(bytes)
18 .map_err(|e| crate::BusError::InvalidState(format!("Invalid UTF-8 in event: {}", e)))?;
19
20 let colon_positions: Vec<usize> = s
22 .char_indices()
23 .filter(|(_, c)| *c == ':')
24 .map(|(i, _)| i)
25 .collect();
26
27 for &split_pos in colon_positions.iter().rev() {
29 let payload = &s[split_pos + 1..];
30
31 if serde_json::from_str::<serde_json::Value>(payload).is_ok() {
34 let event_type = &s[..split_pos];
35 return Ok(Self {
36 id,
37 event_type: event_type.to_string(),
38 payload: payload.as_bytes().to_vec(),
39 });
40 }
41 }
42
43 Ok(Self {
45 id,
46 event_type: s.to_string(),
47 payload: Vec::new(),
48 })
49 }
50}
51
/// A predicate over [`Event`]s.
///
/// Implementors must be `Send + Sync`, as required by the supertrait bounds.
pub trait EventFilterTrait: Send + Sync {
    /// Returns whether `event` satisfies this filter.
    fn matches(&self, event: &Event) -> bool;
}
57
/// Composable predicate over an event's type name.
///
/// Derives `Debug` so filters can be logged and shown in assertion messages,
/// and `PartialEq`/`Eq` so two filters can be compared structurally.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EventFilter {
    /// Matches every event.
    All,

    /// Matches events whose type starts with the given string.
    Prefix(String),

    /// Matches events whose type is exactly the given string.
    Exact(String),

    /// Matches only when both inner filters match.
    And(Box<EventFilter>, Box<EventFilter>),

    /// Matches when at least one of the inner filters matches.
    Or(Box<EventFilter>, Box<EventFilter>),
}
76
77impl EventFilter {
78 pub fn prefix(prefix: impl Into<String>) -> Self {
80 EventFilter::Prefix(prefix.into())
81 }
82
83 pub fn exact(event_type: impl Into<String>) -> Self {
85 EventFilter::Exact(event_type.into())
86 }
87
88 pub fn and(self, other: EventFilter) -> Self {
90 EventFilter::And(Box::new(self), Box::new(other))
91 }
92
93 pub fn or(self, other: EventFilter) -> Self {
95 EventFilter::Or(Box::new(self), Box::new(other))
96 }
97}
98
99impl EventFilterTrait for EventFilter {
100 fn matches(&self, event: &Event) -> bool {
101 match self {
102 EventFilter::All => true,
103 EventFilter::Prefix(prefix) => event.event_type.starts_with(prefix),
104 EventFilter::Exact(event_type) => &event.event_type == event_type,
105 EventFilter::And(f1, f2) => f1.matches(event) && f2.matches(event),
106 EventFilter::Or(f1, f2) => f1.matches(event) || f2.matches(event),
107 }
108 }
109}
110
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an event of the given type with a fixed id and an empty payload.
    fn test_event(event_type: &str) -> Event {
        Event {
            id: 1,
            event_type: event_type.to_string(),
            payload: vec![],
        }
    }

    #[test]
    fn test_all_filter() {
        let filter = EventFilter::All;
        assert!(filter.matches(&test_event("knowledge:doc_updated")));
        assert!(filter.matches(&test_event("bus:message")));
        // Even an empty type name is accepted.
        assert!(filter.matches(&test_event("")));
    }

    #[test]
    fn test_prefix_filter() {
        let filter = EventFilter::prefix("knowledge:");
        assert!(filter.matches(&test_event("knowledge:doc_updated")));
        assert!(!filter.matches(&test_event("bus:message")));
    }

    #[test]
    fn test_empty_prefix_matches_everything() {
        let filter = EventFilter::prefix("");
        assert!(filter.matches(&test_event("knowledge:doc_updated")));
        assert!(filter.matches(&test_event("")));
    }

    #[test]
    fn test_exact_filter() {
        let filter = EventFilter::exact("knowledge:doc_updated");
        assert!(filter.matches(&test_event("knowledge:doc_updated")));
        assert!(!filter.matches(&test_event("knowledge:doc_deleted")));
    }

    #[test]
    fn test_exact_filter_is_not_a_prefix_match() {
        let filter = EventFilter::exact("knowledge:");
        assert!(filter.matches(&test_event("knowledge:")));
        assert!(!filter.matches(&test_event("knowledge:doc_updated")));
    }

    #[test]
    fn test_and_filter() {
        let filter = EventFilter::prefix("knowledge:").and(EventFilter::prefix("knowledge:doc_"));

        assert!(filter.matches(&test_event("knowledge:doc_updated")));
        assert!(!filter.matches(&test_event("knowledge:index_updated")));
    }

    #[test]
    fn test_or_filter() {
        let filter = EventFilter::prefix("knowledge:").or(EventFilter::prefix("bus:"));

        assert!(filter.matches(&test_event("knowledge:doc_updated")));
        assert!(filter.matches(&test_event("bus:message")));
        assert!(!filter.matches(&test_event("session:started")));
    }

    #[test]
    fn test_nested_filters() {
        // knowledge:* AND (doc_updated OR doc_deleted)
        let filter = EventFilter::prefix("knowledge:").and(
            EventFilter::exact("knowledge:doc_updated")
                .or(EventFilter::exact("knowledge:doc_deleted")),
        );

        assert!(filter.matches(&test_event("knowledge:doc_updated")));
        assert!(filter.matches(&test_event("knowledge:doc_deleted")));
        assert!(!filter.matches(&test_event("knowledge:doc_created")));
        assert!(!filter.matches(&test_event("bus:message")));
    }
}