rust_rule_engine/streaming/
event.rs

//! Stream event types and metadata.
//!
//! Core data structures for representing events in the streaming rule engine:
//! [`StreamEvent`], its [`EventMetadata`], and the [`EventPattern`] used to match events.
4
5use crate::types::Value;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::time::{SystemTime, UNIX_EPOCH};
9
10/// A streaming event with payload and metadata
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct StreamEvent {
13    /// Unique event identifier
14    pub id: String,
15    /// Event type/category
16    pub event_type: String,
17    /// Event payload data
18    pub data: HashMap<String, Value>,
19    /// Event metadata
20    pub metadata: EventMetadata,
21}
22
23/// Event metadata for tracking and processing
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct EventMetadata {
26    /// When the event occurred (milliseconds since epoch)
27    pub timestamp: u64,
28    /// Event source identifier
29    pub source: String,
30    /// Event sequence number
31    pub sequence: u64,
32    /// Processing hints and tags
33    pub tags: HashMap<String, String>,
34}
35
36impl StreamEvent {
37    /// Create a new stream event
38    pub fn new(
39        event_type: impl Into<String>,
40        data: HashMap<String, Value>,
41        source: impl Into<String>,
42    ) -> Self {
43        let timestamp = SystemTime::now()
44            .duration_since(UNIX_EPOCH)
45            .unwrap()
46            .as_millis() as u64;
47
48        Self {
49            id: format!("evt_{}", uuid_v4()),
50            event_type: event_type.into(),
51            data,
52            metadata: EventMetadata {
53                timestamp,
54                source: source.into(),
55                sequence: 0, // Will be set by stream processor
56                tags: HashMap::new(),
57            },
58        }
59    }
60
61    /// Create event with specific timestamp
62    pub fn with_timestamp(
63        event_type: impl Into<String>,
64        data: HashMap<String, Value>,
65        source: impl Into<String>,
66        timestamp: u64,
67    ) -> Self {
68        Self {
69            id: format!("evt_{}", uuid_v4()),
70            event_type: event_type.into(),
71            data,
72            metadata: EventMetadata {
73                timestamp,
74                source: source.into(),
75                sequence: 0,
76                tags: HashMap::new(),
77            },
78        }
79    }
80
81    /// Get event age in milliseconds
82    pub fn age_ms(&self) -> u64 {
83        let now = SystemTime::now()
84            .duration_since(UNIX_EPOCH)
85            .unwrap()
86            .as_millis() as u64;
87
88        now.saturating_sub(self.metadata.timestamp)
89    }
90
91    /// Check if event matches a pattern
92    pub fn matches_pattern(&self, pattern: &EventPattern) -> bool {
93        // Check event type
94        if let Some(ref expected_type) = pattern.event_type {
95            if &self.event_type != expected_type {
96                return false;
97            }
98        }
99
100        // Check data fields
101        for (key, expected_value) in &pattern.required_fields {
102            if let Some(actual_value) = self.data.get(key) {
103                if actual_value != expected_value {
104                    return false;
105                }
106            } else {
107                return false;
108            }
109        }
110
111        // Check source
112        if let Some(ref expected_source) = pattern.source {
113            if &self.metadata.source != expected_source {
114                return false;
115            }
116        }
117
118        true
119    }
120
121    /// Add tag to event metadata
122    pub fn add_tag(&mut self, key: impl Into<String>, value: impl Into<String>) {
123        self.metadata.tags.insert(key.into(), value.into());
124    }
125
126    /// Get numeric value from event data
127    pub fn get_numeric(&self, field: &str) -> Option<f64> {
128        self.data.get(field).and_then(|v| match v {
129            Value::Number(n) => Some(*n),
130            Value::Integer(i) => Some(*i as f64),
131            _ => None,
132        })
133    }
134
135    /// Get string value from event data
136    pub fn get_string(&self, field: &str) -> Option<&str> {
137        self.data.get(field).and_then(|v| match v {
138            Value::String(s) => Some(s.as_str()),
139            _ => None,
140        })
141    }
142
143    /// Get boolean value from event data
144    pub fn get_boolean(&self, field: &str) -> Option<bool> {
145        self.data.get(field).and_then(|v| match v {
146            Value::Boolean(b) => Some(*b),
147            _ => None,
148        })
149    }
150}
151
152/// Pattern for matching events
153#[derive(Debug, Clone)]
154pub struct EventPattern {
155    /// Expected event type (optional)
156    pub event_type: Option<String>,
157    /// Required data fields with expected values
158    pub required_fields: HashMap<String, Value>,
159    /// Expected source (optional)
160    pub source: Option<String>,
161}
162
163impl EventPattern {
164    /// Create a new event pattern
165    pub fn new() -> Self {
166        Self {
167            event_type: None,
168            required_fields: HashMap::new(),
169            source: None,
170        }
171    }
172
173    /// Set expected event type
174    pub fn with_event_type(mut self, event_type: impl Into<String>) -> Self {
175        self.event_type = Some(event_type.into());
176        self
177    }
178
179    /// Add required field
180    pub fn with_field(mut self, key: impl Into<String>, value: Value) -> Self {
181        self.required_fields.insert(key.into(), value);
182        self
183    }
184
185    /// Set expected source
186    pub fn with_source(mut self, source: impl Into<String>) -> Self {
187        self.source = Some(source.into());
188        self
189    }
190}
191
192impl Default for EventPattern {
193    fn default() -> Self {
194        Self::new()
195    }
196}
197
198// Simple UUID v4 generator (basic implementation)
199fn uuid_v4() -> String {
200    use std::time::{SystemTime, UNIX_EPOCH};
201
202    let timestamp = SystemTime::now()
203        .duration_since(UNIX_EPOCH)
204        .unwrap()
205        .as_nanos();
206
207    let random_part = fastrand::u64(..);
208
209    format!("{:x}-{:x}", timestamp, random_part)
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Value;

    /// Payload shared by the trade-event tests: a numeric price and a string symbol.
    fn trade_payload() -> HashMap<String, Value> {
        vec![
            ("price".to_string(), Value::Number(100.5)),
            ("symbol".to_string(), Value::String("AAPL".to_string())),
        ]
        .into_iter()
        .collect()
    }

    /// Pattern requiring a `TradeEvent` whose `symbol` field equals `symbol`.
    fn trade_pattern_for(symbol: &str) -> EventPattern {
        EventPattern::new()
            .with_event_type("TradeEvent")
            .with_field("symbol", Value::String(symbol.to_string()))
    }

    #[test]
    fn test_stream_event_creation() {
        let event = StreamEvent::new("TradeEvent", trade_payload(), "trading_system");

        assert_eq!(event.event_type, "TradeEvent");
        assert_eq!(event.metadata.source, "trading_system");
        assert!(event.id.starts_with("evt_"));
        assert_eq!(event.get_numeric("price"), Some(100.5));
        assert_eq!(event.get_string("symbol"), Some("AAPL"));
    }

    #[test]
    fn test_event_pattern_matching() {
        let event = StreamEvent::new("TradeEvent", trade_payload(), "trading_system");

        // Same type and same symbol: must match.
        let pattern = trade_pattern_for("AAPL");
        assert!(event.matches_pattern(&pattern));

        // Same type but a different symbol: must not match.
        let wrong_pattern = trade_pattern_for("GOOGL");
        assert!(!event.matches_pattern(&wrong_pattern));
    }

    #[test]
    fn test_event_age() {
        let event = StreamEvent::new("TestEvent", HashMap::new(), "test");

        // Age should be very small for a just-created event
        let age = event.age_ms();
        assert!(age < 100);
    }
}