rust_rule_engine/streaming/
event.rs1use crate::types::Value;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::time::{SystemTime, UNIX_EPOCH};
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct StreamEvent {
13 pub id: String,
15 pub event_type: String,
17 pub data: HashMap<String, Value>,
19 pub metadata: EventMetadata,
21}
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct EventMetadata {
26 pub timestamp: u64,
28 pub source: String,
30 pub sequence: u64,
32 pub tags: HashMap<String, String>,
34}
35
36impl StreamEvent {
37 pub fn new(
39 event_type: impl Into<String>,
40 data: HashMap<String, Value>,
41 source: impl Into<String>,
42 ) -> Self {
43 let timestamp = SystemTime::now()
44 .duration_since(UNIX_EPOCH)
45 .unwrap()
46 .as_millis() as u64;
47
48 Self {
49 id: format!("evt_{}", uuid_v4()),
50 event_type: event_type.into(),
51 data,
52 metadata: EventMetadata {
53 timestamp,
54 source: source.into(),
55 sequence: 0, tags: HashMap::new(),
57 },
58 }
59 }
60
61 pub fn with_timestamp(
63 event_type: impl Into<String>,
64 data: HashMap<String, Value>,
65 source: impl Into<String>,
66 timestamp: u64,
67 ) -> Self {
68 Self {
69 id: format!("evt_{}", uuid_v4()),
70 event_type: event_type.into(),
71 data,
72 metadata: EventMetadata {
73 timestamp,
74 source: source.into(),
75 sequence: 0,
76 tags: HashMap::new(),
77 },
78 }
79 }
80
81 pub fn age_ms(&self) -> u64 {
83 let now = SystemTime::now()
84 .duration_since(UNIX_EPOCH)
85 .unwrap()
86 .as_millis() as u64;
87
88 now.saturating_sub(self.metadata.timestamp)
89 }
90
91 pub fn matches_pattern(&self, pattern: &EventPattern) -> bool {
93 if let Some(ref expected_type) = pattern.event_type {
95 if &self.event_type != expected_type {
96 return false;
97 }
98 }
99
100 for (key, expected_value) in &pattern.required_fields {
102 if let Some(actual_value) = self.data.get(key) {
103 if actual_value != expected_value {
104 return false;
105 }
106 } else {
107 return false;
108 }
109 }
110
111 if let Some(ref expected_source) = pattern.source {
113 if &self.metadata.source != expected_source {
114 return false;
115 }
116 }
117
118 true
119 }
120
121 pub fn add_tag(&mut self, key: impl Into<String>, value: impl Into<String>) {
123 self.metadata.tags.insert(key.into(), value.into());
124 }
125
126 pub fn get_numeric(&self, field: &str) -> Option<f64> {
128 self.data.get(field).and_then(|v| match v {
129 Value::Number(n) => Some(*n),
130 Value::Integer(i) => Some(*i as f64),
131 _ => None,
132 })
133 }
134
135 pub fn get_string(&self, field: &str) -> Option<&str> {
137 self.data.get(field).and_then(|v| match v {
138 Value::String(s) => Some(s.as_str()),
139 _ => None,
140 })
141 }
142
143 pub fn get_boolean(&self, field: &str) -> Option<bool> {
145 self.data.get(field).and_then(|v| match v {
146 Value::Boolean(b) => Some(*b),
147 _ => None,
148 })
149 }
150}
151
152#[derive(Debug, Clone)]
154pub struct EventPattern {
155 pub event_type: Option<String>,
157 pub required_fields: HashMap<String, Value>,
159 pub source: Option<String>,
161}
162
163impl EventPattern {
164 pub fn new() -> Self {
166 Self {
167 event_type: None,
168 required_fields: HashMap::new(),
169 source: None,
170 }
171 }
172
173 pub fn with_event_type(mut self, event_type: impl Into<String>) -> Self {
175 self.event_type = Some(event_type.into());
176 self
177 }
178
179 pub fn with_field(mut self, key: impl Into<String>, value: Value) -> Self {
181 self.required_fields.insert(key.into(), value);
182 self
183 }
184
185 pub fn with_source(mut self, source: impl Into<String>) -> Self {
187 self.source = Some(source.into());
188 self
189 }
190}
191
192impl Default for EventPattern {
193 fn default() -> Self {
194 Self::new()
195 }
196}
197
198fn uuid_v4() -> String {
200 use std::time::{SystemTime, UNIX_EPOCH};
201
202 let timestamp = SystemTime::now()
203 .duration_since(UNIX_EPOCH)
204 .unwrap()
205 .as_nanos();
206
207 let random_part = fastrand::u64(..);
208
209 format!("{:x}-{:x}", timestamp, random_part)
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Value;

    /// Builds the two-field trade payload shared by the tests below.
    fn trade_payload() -> HashMap<String, Value> {
        vec![
            ("price".to_string(), Value::Number(100.5)),
            ("symbol".to_string(), Value::String("AAPL".to_string())),
        ]
        .into_iter()
        .collect()
    }

    /// Builds a pattern for `TradeEvent`s whose `symbol` equals `symbol`.
    fn trade_pattern_for(symbol: &str) -> EventPattern {
        EventPattern::new()
            .with_event_type("TradeEvent")
            .with_field("symbol", Value::String(symbol.to_string()))
    }

    #[test]
    fn test_stream_event_creation() {
        let trade = StreamEvent::new("TradeEvent", trade_payload(), "trading_system");

        assert_eq!(trade.event_type, "TradeEvent");
        assert_eq!(trade.metadata.source, "trading_system");
        assert!(trade.id.starts_with("evt_"));
        assert_eq!(trade.get_numeric("price"), Some(100.5));
        assert_eq!(trade.get_string("symbol"), Some("AAPL"));
    }

    #[test]
    fn test_event_pattern_matching() {
        let trade = StreamEvent::new("TradeEvent", trade_payload(), "trading_system");

        // Same type and same symbol: the pattern must match.
        assert!(trade.matches_pattern(&trade_pattern_for("AAPL")));

        // Same type but a different symbol: the pattern must be rejected.
        assert!(!trade.matches_pattern(&trade_pattern_for("GOOGL")));
    }

    #[test]
    fn test_event_age() {
        let fresh = StreamEvent::new("TestEvent", HashMap::new(), "test");

        // A just-created event should be well under 100 ms old.
        assert!(fresh.age_ms() < 100);
    }
}