Skip to main content

aegis_streaming/
event.rs

1//! Aegis Streaming Events
2//!
3//! Core event types for the streaming system.
4//!
5//! @version 0.1.0
6//! @author AutomataNexus Development Team
7
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::time::{SystemTime, UNIX_EPOCH};
11
12// =============================================================================
13// Event ID
14// =============================================================================
15
16/// Unique identifier for an event.
17#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
18pub struct EventId(pub String);
19
20impl EventId {
21    pub fn new(id: impl Into<String>) -> Self {
22        Self(id.into())
23    }
24
25    pub fn generate() -> Self {
26        let timestamp = SystemTime::now()
27            .duration_since(UNIX_EPOCH)
28            .unwrap_or_default()
29            .as_nanos();
30        Self(format!("evt_{:032x}", timestamp))
31    }
32
33    pub fn as_str(&self) -> &str {
34        &self.0
35    }
36}
37
38impl std::fmt::Display for EventId {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        write!(f, "{}", self.0)
41    }
42}
43
44impl From<String> for EventId {
45    fn from(s: String) -> Self {
46        Self(s)
47    }
48}
49
50impl From<&str> for EventId {
51    fn from(s: &str) -> Self {
52        Self(s.to_string())
53    }
54}
55
56// =============================================================================
57// Event Type
58// =============================================================================
59
60/// Type of event.
61#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
62pub enum EventType {
63    /// Data was created.
64    Created,
65    /// Data was updated.
66    Updated,
67    /// Data was deleted.
68    Deleted,
69    /// Custom event type.
70    Custom(String),
71}
72
73impl EventType {
74    pub fn custom(name: impl Into<String>) -> Self {
75        Self::Custom(name.into())
76    }
77
78    pub fn as_str(&self) -> &str {
79        match self {
80            Self::Created => "created",
81            Self::Updated => "updated",
82            Self::Deleted => "deleted",
83            Self::Custom(s) => s,
84        }
85    }
86}
87
88impl Default for EventType {
89    fn default() -> Self {
90        Self::Custom("unknown".to_string())
91    }
92}
93
94// =============================================================================
95// Event
96// =============================================================================
97
98/// An event in the streaming system.
99#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct Event {
101    pub id: EventId,
102    pub event_type: EventType,
103    pub source: String,
104    pub timestamp: u64,
105    pub data: EventData,
106    pub metadata: HashMap<String, String>,
107}
108
109impl Event {
110    /// Create a new event.
111    pub fn new(event_type: EventType, source: impl Into<String>, data: EventData) -> Self {
112        Self {
113            id: EventId::generate(),
114            event_type,
115            source: source.into(),
116            timestamp: current_timestamp_millis(),
117            data,
118            metadata: HashMap::new(),
119        }
120    }
121
122    /// Create an event with a specific ID.
123    pub fn with_id(
124        id: EventId,
125        event_type: EventType,
126        source: impl Into<String>,
127        data: EventData,
128    ) -> Self {
129        Self {
130            id,
131            event_type,
132            source: source.into(),
133            timestamp: current_timestamp_millis(),
134            data,
135            metadata: HashMap::new(),
136        }
137    }
138
139    /// Add metadata to the event.
140    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
141        self.metadata.insert(key.into(), value.into());
142        self
143    }
144
145    /// Get metadata value.
146    pub fn get_metadata(&self, key: &str) -> Option<&String> {
147        self.metadata.get(key)
148    }
149
150    /// Check if the event matches a filter.
151    pub fn matches(&self, filter: &EventFilter) -> bool {
152        if let Some(ref event_type) = filter.event_type {
153            if &self.event_type != event_type {
154                return false;
155            }
156        }
157
158        if let Some(ref source) = filter.source {
159            if !self.source.starts_with(source) {
160                return false;
161            }
162        }
163
164        if let Some(after) = filter.after_timestamp {
165            if self.timestamp <= after {
166                return false;
167            }
168        }
169
170        true
171    }
172}
173
174// =============================================================================
175// Event Data
176// =============================================================================
177
178/// Data payload for an event.
179#[derive(Debug, Clone, Serialize, Deserialize)]
180#[serde(untagged)]
181#[derive(Default)]
182pub enum EventData {
183    #[default]
184    Null,
185    Bool(bool),
186    Int(i64),
187    Float(f64),
188    String(String),
189    Bytes(Vec<u8>),
190    Json(serde_json::Value),
191}
192
193impl EventData {
194    pub fn is_null(&self) -> bool {
195        matches!(self, Self::Null)
196    }
197
198    pub fn as_str(&self) -> Option<&str> {
199        match self {
200            Self::String(s) => Some(s),
201            _ => None,
202        }
203    }
204
205    pub fn as_i64(&self) -> Option<i64> {
206        match self {
207            Self::Int(n) => Some(*n),
208            _ => None,
209        }
210    }
211
212    pub fn as_json(&self) -> Option<&serde_json::Value> {
213        match self {
214            Self::Json(v) => Some(v),
215            _ => None,
216        }
217    }
218}
219
220impl From<String> for EventData {
221    fn from(s: String) -> Self {
222        Self::String(s)
223    }
224}
225
226impl From<&str> for EventData {
227    fn from(s: &str) -> Self {
228        Self::String(s.to_string())
229    }
230}
231
232impl From<i64> for EventData {
233    fn from(n: i64) -> Self {
234        Self::Int(n)
235    }
236}
237
238impl From<f64> for EventData {
239    fn from(f: f64) -> Self {
240        Self::Float(f)
241    }
242}
243
244impl From<bool> for EventData {
245    fn from(b: bool) -> Self {
246        Self::Bool(b)
247    }
248}
249
250impl From<serde_json::Value> for EventData {
251    fn from(v: serde_json::Value) -> Self {
252        Self::Json(v)
253    }
254}
255
256impl From<Vec<u8>> for EventData {
257    fn from(bytes: Vec<u8>) -> Self {
258        Self::Bytes(bytes)
259    }
260}
261
262// =============================================================================
263// Event Filter
264// =============================================================================
265
266/// Filter for events.
267#[derive(Debug, Clone, Default, Serialize, Deserialize)]
268pub struct EventFilter {
269    pub event_type: Option<EventType>,
270    pub source: Option<String>,
271    pub after_timestamp: Option<u64>,
272}
273
274impl EventFilter {
275    pub fn new() -> Self {
276        Self::default()
277    }
278
279    pub fn with_type(mut self, event_type: EventType) -> Self {
280        self.event_type = Some(event_type);
281        self
282    }
283
284    pub fn with_source(mut self, source: impl Into<String>) -> Self {
285        self.source = Some(source.into());
286        self
287    }
288
289    pub fn after(mut self, timestamp: u64) -> Self {
290        self.after_timestamp = Some(timestamp);
291        self
292    }
293}
294
295// =============================================================================
296// Event Batch
297// =============================================================================
298
299/// A batch of events.
300#[derive(Debug, Clone, Serialize, Deserialize)]
301pub struct EventBatch {
302    pub events: Vec<Event>,
303    pub batch_id: String,
304    pub timestamp: u64,
305}
306
307impl EventBatch {
308    pub fn new(events: Vec<Event>) -> Self {
309        let batch_id = format!("batch_{}", current_timestamp_millis());
310        Self {
311            events,
312            batch_id,
313            timestamp: current_timestamp_millis(),
314        }
315    }
316
317    pub fn len(&self) -> usize {
318        self.events.len()
319    }
320
321    pub fn is_empty(&self) -> bool {
322        self.events.is_empty()
323    }
324}
325
/// Current wall-clock time in whole milliseconds since the Unix epoch.
///
/// Returns 0 when the system clock reports a time before the epoch. The
/// `u128` millisecond count is converted with a checked conversion and
/// saturates at `u64::MAX` instead of being silently truncated by an `as` cast.
fn current_timestamp_millis() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
        })
}
332
333// =============================================================================
334// Tests
335// =============================================================================
336
#[cfg(test)]
mod tests {
    use super::*;

    /// Two generated IDs differ and carry the `evt_` prefix.
    #[test]
    fn test_event_id() {
        let first = EventId::generate();
        let second = EventId::generate();

        assert_ne!(first, second);
        assert!(first.as_str().starts_with("evt_"));
    }

    /// A new event keeps the supplied type and source and gets a non-zero timestamp.
    #[test]
    fn test_event_creation() {
        let payload = EventData::String(String::from("test data"));
        let created = Event::new(EventType::Created, "users", payload);

        assert_eq!(created.event_type, EventType::Created);
        assert_eq!(created.source, "users");
        assert!(created.timestamp > 0);
    }

    /// Metadata added through the builder can be read back by key.
    #[test]
    fn test_event_metadata() {
        let base = Event::new(EventType::Updated, "orders", EventData::Null);
        let annotated = base
            .with_metadata("user_id", "123")
            .with_metadata("action", "update");

        let expected_user = String::from("123");
        let expected_action = String::from("update");
        assert_eq!(annotated.get_metadata("user_id"), Some(&expected_user));
        assert_eq!(annotated.get_metadata("action"), Some(&expected_action));
    }

    /// Filters match on equal type and on source prefix, and reject another type.
    #[test]
    fn test_event_filter() {
        let subject = Event::new(EventType::Created, "users.profile", EventData::Null);

        let same_type = EventFilter::new().with_type(EventType::Created);
        assert!(subject.matches(&same_type));

        let source_prefix = EventFilter::new().with_source("users");
        assert!(subject.matches(&source_prefix));

        let other_type = EventFilter::new().with_type(EventType::Deleted);
        assert!(!subject.matches(&other_type));
    }

    /// Accessors return the inner value for the matching variant.
    #[test]
    fn test_event_data() {
        let text = EventData::String(String::from("hello"));
        assert_eq!(text.as_str(), Some("hello"));

        let number = EventData::Int(42);
        assert_eq!(number.as_i64(), Some(42));

        let converted = EventData::from("test");
        assert_eq!(converted.as_str(), Some("test"));
    }
}