Skip to main content

aegis_streaming/
event.rs

1//! Aegis Streaming Events
2//!
3//! Core event types for the streaming system.
4//!
5//! @version 0.1.0
6//! @author AutomataNexus Development Team
7
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::time::{SystemTime, UNIX_EPOCH};
11
12// =============================================================================
13// Event ID
14// =============================================================================
15
16/// Unique identifier for an event.
17#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
18pub struct EventId(pub String);
19
20impl EventId {
21    pub fn new(id: impl Into<String>) -> Self {
22        Self(id.into())
23    }
24
25    pub fn generate() -> Self {
26        let timestamp = SystemTime::now()
27            .duration_since(UNIX_EPOCH)
28            .unwrap_or_default()
29            .as_nanos();
30        Self(format!("evt_{:032x}", timestamp))
31    }
32
33    pub fn as_str(&self) -> &str {
34        &self.0
35    }
36}
37
38impl std::fmt::Display for EventId {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        write!(f, "{}", self.0)
41    }
42}
43
44impl From<String> for EventId {
45    fn from(s: String) -> Self {
46        Self(s)
47    }
48}
49
50impl From<&str> for EventId {
51    fn from(s: &str) -> Self {
52        Self(s.to_string())
53    }
54}
55
56// =============================================================================
57// Event Type
58// =============================================================================
59
60/// Type of event.
61#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
62pub enum EventType {
63    /// Data was created.
64    Created,
65    /// Data was updated.
66    Updated,
67    /// Data was deleted.
68    Deleted,
69    /// Custom event type.
70    Custom(String),
71}
72
73impl EventType {
74    pub fn custom(name: impl Into<String>) -> Self {
75        Self::Custom(name.into())
76    }
77
78    pub fn as_str(&self) -> &str {
79        match self {
80            Self::Created => "created",
81            Self::Updated => "updated",
82            Self::Deleted => "deleted",
83            Self::Custom(s) => s,
84        }
85    }
86}
87
88impl Default for EventType {
89    fn default() -> Self {
90        Self::Custom("unknown".to_string())
91    }
92}
93
94// =============================================================================
95// Event
96// =============================================================================
97
98/// An event in the streaming system.
99#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct Event {
101    pub id: EventId,
102    pub event_type: EventType,
103    pub source: String,
104    pub timestamp: u64,
105    pub data: EventData,
106    pub metadata: HashMap<String, String>,
107}
108
109impl Event {
110    /// Create a new event.
111    pub fn new(event_type: EventType, source: impl Into<String>, data: EventData) -> Self {
112        Self {
113            id: EventId::generate(),
114            event_type,
115            source: source.into(),
116            timestamp: current_timestamp_millis(),
117            data,
118            metadata: HashMap::new(),
119        }
120    }
121
122    /// Create an event with a specific ID.
123    pub fn with_id(
124        id: EventId,
125        event_type: EventType,
126        source: impl Into<String>,
127        data: EventData,
128    ) -> Self {
129        Self {
130            id,
131            event_type,
132            source: source.into(),
133            timestamp: current_timestamp_millis(),
134            data,
135            metadata: HashMap::new(),
136        }
137    }
138
139    /// Add metadata to the event.
140    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
141        self.metadata.insert(key.into(), value.into());
142        self
143    }
144
145    /// Get metadata value.
146    pub fn get_metadata(&self, key: &str) -> Option<&String> {
147        self.metadata.get(key)
148    }
149
150    /// Check if the event matches a filter.
151    pub fn matches(&self, filter: &EventFilter) -> bool {
152        if let Some(ref event_type) = filter.event_type {
153            if &self.event_type != event_type {
154                return false;
155            }
156        }
157
158        if let Some(ref source) = filter.source {
159            if !self.source.starts_with(source) {
160                return false;
161            }
162        }
163
164        if let Some(after) = filter.after_timestamp {
165            if self.timestamp <= after {
166                return false;
167            }
168        }
169
170        true
171    }
172}
173
174// =============================================================================
175// Event Data
176// =============================================================================
177
178/// Data payload for an event.
179#[derive(Debug, Clone, Serialize, Deserialize)]
180#[serde(untagged)]
181#[derive(Default)]
182pub enum EventData {
183    #[default]
184    Null,
185    Bool(bool),
186    Int(i64),
187    Float(f64),
188    String(String),
189    Bytes(Vec<u8>),
190    Json(serde_json::Value),
191}
192
193impl EventData {
194    pub fn is_null(&self) -> bool {
195        matches!(self, Self::Null)
196    }
197
198    pub fn as_str(&self) -> Option<&str> {
199        match self {
200            Self::String(s) => Some(s),
201            _ => None,
202        }
203    }
204
205    pub fn as_i64(&self) -> Option<i64> {
206        match self {
207            Self::Int(n) => Some(*n),
208            _ => None,
209        }
210    }
211
212    pub fn as_json(&self) -> Option<&serde_json::Value> {
213        match self {
214            Self::Json(v) => Some(v),
215            _ => None,
216        }
217    }
218}
219
220
221impl From<String> for EventData {
222    fn from(s: String) -> Self {
223        Self::String(s)
224    }
225}
226
227impl From<&str> for EventData {
228    fn from(s: &str) -> Self {
229        Self::String(s.to_string())
230    }
231}
232
233impl From<i64> for EventData {
234    fn from(n: i64) -> Self {
235        Self::Int(n)
236    }
237}
238
239impl From<f64> for EventData {
240    fn from(f: f64) -> Self {
241        Self::Float(f)
242    }
243}
244
245impl From<bool> for EventData {
246    fn from(b: bool) -> Self {
247        Self::Bool(b)
248    }
249}
250
251impl From<serde_json::Value> for EventData {
252    fn from(v: serde_json::Value) -> Self {
253        Self::Json(v)
254    }
255}
256
257impl From<Vec<u8>> for EventData {
258    fn from(bytes: Vec<u8>) -> Self {
259        Self::Bytes(bytes)
260    }
261}
262
263// =============================================================================
264// Event Filter
265// =============================================================================
266
267/// Filter for events.
268#[derive(Debug, Clone, Default, Serialize, Deserialize)]
269pub struct EventFilter {
270    pub event_type: Option<EventType>,
271    pub source: Option<String>,
272    pub after_timestamp: Option<u64>,
273}
274
275impl EventFilter {
276    pub fn new() -> Self {
277        Self::default()
278    }
279
280    pub fn with_type(mut self, event_type: EventType) -> Self {
281        self.event_type = Some(event_type);
282        self
283    }
284
285    pub fn with_source(mut self, source: impl Into<String>) -> Self {
286        self.source = Some(source.into());
287        self
288    }
289
290    pub fn after(mut self, timestamp: u64) -> Self {
291        self.after_timestamp = Some(timestamp);
292        self
293    }
294}
295
296// =============================================================================
297// Event Batch
298// =============================================================================
299
300/// A batch of events.
301#[derive(Debug, Clone, Serialize, Deserialize)]
302pub struct EventBatch {
303    pub events: Vec<Event>,
304    pub batch_id: String,
305    pub timestamp: u64,
306}
307
308impl EventBatch {
309    pub fn new(events: Vec<Event>) -> Self {
310        let batch_id = format!("batch_{}", current_timestamp_millis());
311        Self {
312            events,
313            batch_id,
314            timestamp: current_timestamp_millis(),
315        }
316    }
317
318    pub fn len(&self) -> usize {
319        self.events.len()
320    }
321
322    pub fn is_empty(&self) -> bool {
323        self.events.is_empty()
324    }
325}
326
327fn current_timestamp_millis() -> u64 {
328    SystemTime::now()
329        .duration_since(UNIX_EPOCH)
330        .map(|d| d.as_millis() as u64)
331        .unwrap_or(0)
332}
333
334// =============================================================================
335// Tests
336// =============================================================================
337
338#[cfg(test)]
339mod tests {
340    use super::*;
341
342    #[test]
343    fn test_event_id() {
344        let id1 = EventId::generate();
345        let id2 = EventId::generate();
346        assert_ne!(id1, id2);
347        assert!(id1.as_str().starts_with("evt_"));
348    }
349
350    #[test]
351    fn test_event_creation() {
352        let event = Event::new(
353            EventType::Created,
354            "users",
355            EventData::String("test data".to_string()),
356        );
357
358        assert_eq!(event.event_type, EventType::Created);
359        assert_eq!(event.source, "users");
360        assert!(event.timestamp > 0);
361    }
362
363    #[test]
364    fn test_event_metadata() {
365        let event = Event::new(EventType::Updated, "orders", EventData::Null)
366            .with_metadata("user_id", "123")
367            .with_metadata("action", "update");
368
369        assert_eq!(event.get_metadata("user_id"), Some(&"123".to_string()));
370        assert_eq!(event.get_metadata("action"), Some(&"update".to_string()));
371    }
372
373    #[test]
374    fn test_event_filter() {
375        let event = Event::new(EventType::Created, "users.profile", EventData::Null);
376
377        let filter = EventFilter::new().with_type(EventType::Created);
378        assert!(event.matches(&filter));
379
380        let filter = EventFilter::new().with_source("users");
381        assert!(event.matches(&filter));
382
383        let filter = EventFilter::new().with_type(EventType::Deleted);
384        assert!(!event.matches(&filter));
385    }
386
387    #[test]
388    fn test_event_data() {
389        let data = EventData::String("hello".to_string());
390        assert_eq!(data.as_str(), Some("hello"));
391
392        let data = EventData::Int(42);
393        assert_eq!(data.as_i64(), Some(42));
394
395        let data: EventData = "test".into();
396        assert_eq!(data.as_str(), Some("test"));
397    }
398}