aegis_streaming/
event.rs

1//! Aegis Streaming Events
2//!
3//! Core event types for the streaming system.
4//!
5//! @version 0.1.0
6//! @author AutomataNexus Development Team
7
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::time::{SystemTime, UNIX_EPOCH};
11
12// =============================================================================
13// Event ID
14// =============================================================================
15
16/// Unique identifier for an event.
17#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
18pub struct EventId(pub String);
19
20impl EventId {
21    pub fn new(id: impl Into<String>) -> Self {
22        Self(id.into())
23    }
24
25    pub fn generate() -> Self {
26        let timestamp = SystemTime::now()
27            .duration_since(UNIX_EPOCH)
28            .unwrap_or_default()
29            .as_nanos();
30        Self(format!("evt_{:032x}", timestamp))
31    }
32
33    pub fn as_str(&self) -> &str {
34        &self.0
35    }
36}
37
38impl std::fmt::Display for EventId {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        write!(f, "{}", self.0)
41    }
42}
43
44impl From<String> for EventId {
45    fn from(s: String) -> Self {
46        Self(s)
47    }
48}
49
50impl From<&str> for EventId {
51    fn from(s: &str) -> Self {
52        Self(s.to_string())
53    }
54}
55
56// =============================================================================
57// Event Type
58// =============================================================================
59
60/// Type of event.
61#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
62pub enum EventType {
63    /// Data was created.
64    Created,
65    /// Data was updated.
66    Updated,
67    /// Data was deleted.
68    Deleted,
69    /// Custom event type.
70    Custom(String),
71}
72
73impl EventType {
74    pub fn custom(name: impl Into<String>) -> Self {
75        Self::Custom(name.into())
76    }
77
78    pub fn as_str(&self) -> &str {
79        match self {
80            Self::Created => "created",
81            Self::Updated => "updated",
82            Self::Deleted => "deleted",
83            Self::Custom(s) => s,
84        }
85    }
86}
87
88impl Default for EventType {
89    fn default() -> Self {
90        Self::Custom("unknown".to_string())
91    }
92}
93
94// =============================================================================
95// Event
96// =============================================================================
97
98/// An event in the streaming system.
99#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct Event {
101    pub id: EventId,
102    pub event_type: EventType,
103    pub source: String,
104    pub timestamp: u64,
105    pub data: EventData,
106    pub metadata: HashMap<String, String>,
107}
108
109impl Event {
110    /// Create a new event.
111    pub fn new(event_type: EventType, source: impl Into<String>, data: EventData) -> Self {
112        Self {
113            id: EventId::generate(),
114            event_type,
115            source: source.into(),
116            timestamp: current_timestamp_millis(),
117            data,
118            metadata: HashMap::new(),
119        }
120    }
121
122    /// Create an event with a specific ID.
123    pub fn with_id(
124        id: EventId,
125        event_type: EventType,
126        source: impl Into<String>,
127        data: EventData,
128    ) -> Self {
129        Self {
130            id,
131            event_type,
132            source: source.into(),
133            timestamp: current_timestamp_millis(),
134            data,
135            metadata: HashMap::new(),
136        }
137    }
138
139    /// Add metadata to the event.
140    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
141        self.metadata.insert(key.into(), value.into());
142        self
143    }
144
145    /// Get metadata value.
146    pub fn get_metadata(&self, key: &str) -> Option<&String> {
147        self.metadata.get(key)
148    }
149
150    /// Check if the event matches a filter.
151    pub fn matches(&self, filter: &EventFilter) -> bool {
152        if let Some(ref event_type) = filter.event_type {
153            if &self.event_type != event_type {
154                return false;
155            }
156        }
157
158        if let Some(ref source) = filter.source {
159            if !self.source.starts_with(source) {
160                return false;
161            }
162        }
163
164        if let Some(after) = filter.after_timestamp {
165            if self.timestamp <= after {
166                return false;
167            }
168        }
169
170        true
171    }
172}
173
174// =============================================================================
175// Event Data
176// =============================================================================
177
178/// Data payload for an event.
179#[derive(Debug, Clone, Serialize, Deserialize)]
180#[serde(untagged)]
181pub enum EventData {
182    Null,
183    Bool(bool),
184    Int(i64),
185    Float(f64),
186    String(String),
187    Bytes(Vec<u8>),
188    Json(serde_json::Value),
189}
190
191impl EventData {
192    pub fn is_null(&self) -> bool {
193        matches!(self, Self::Null)
194    }
195
196    pub fn as_str(&self) -> Option<&str> {
197        match self {
198            Self::String(s) => Some(s),
199            _ => None,
200        }
201    }
202
203    pub fn as_i64(&self) -> Option<i64> {
204        match self {
205            Self::Int(n) => Some(*n),
206            _ => None,
207        }
208    }
209
210    pub fn as_json(&self) -> Option<&serde_json::Value> {
211        match self {
212            Self::Json(v) => Some(v),
213            _ => None,
214        }
215    }
216}
217
218impl Default for EventData {
219    fn default() -> Self {
220        Self::Null
221    }
222}
223
224impl From<String> for EventData {
225    fn from(s: String) -> Self {
226        Self::String(s)
227    }
228}
229
230impl From<&str> for EventData {
231    fn from(s: &str) -> Self {
232        Self::String(s.to_string())
233    }
234}
235
236impl From<i64> for EventData {
237    fn from(n: i64) -> Self {
238        Self::Int(n)
239    }
240}
241
242impl From<f64> for EventData {
243    fn from(f: f64) -> Self {
244        Self::Float(f)
245    }
246}
247
248impl From<bool> for EventData {
249    fn from(b: bool) -> Self {
250        Self::Bool(b)
251    }
252}
253
254impl From<serde_json::Value> for EventData {
255    fn from(v: serde_json::Value) -> Self {
256        Self::Json(v)
257    }
258}
259
260impl From<Vec<u8>> for EventData {
261    fn from(bytes: Vec<u8>) -> Self {
262        Self::Bytes(bytes)
263    }
264}
265
266// =============================================================================
267// Event Filter
268// =============================================================================
269
270/// Filter for events.
271#[derive(Debug, Clone, Default, Serialize, Deserialize)]
272pub struct EventFilter {
273    pub event_type: Option<EventType>,
274    pub source: Option<String>,
275    pub after_timestamp: Option<u64>,
276}
277
278impl EventFilter {
279    pub fn new() -> Self {
280        Self::default()
281    }
282
283    pub fn with_type(mut self, event_type: EventType) -> Self {
284        self.event_type = Some(event_type);
285        self
286    }
287
288    pub fn with_source(mut self, source: impl Into<String>) -> Self {
289        self.source = Some(source.into());
290        self
291    }
292
293    pub fn after(mut self, timestamp: u64) -> Self {
294        self.after_timestamp = Some(timestamp);
295        self
296    }
297}
298
299// =============================================================================
300// Event Batch
301// =============================================================================
302
303/// A batch of events.
304#[derive(Debug, Clone, Serialize, Deserialize)]
305pub struct EventBatch {
306    pub events: Vec<Event>,
307    pub batch_id: String,
308    pub timestamp: u64,
309}
310
311impl EventBatch {
312    pub fn new(events: Vec<Event>) -> Self {
313        let batch_id = format!("batch_{}", current_timestamp_millis());
314        Self {
315            events,
316            batch_id,
317            timestamp: current_timestamp_millis(),
318        }
319    }
320
321    pub fn len(&self) -> usize {
322        self.events.len()
323    }
324
325    pub fn is_empty(&self) -> bool {
326        self.events.is_empty()
327    }
328}
329
330fn current_timestamp_millis() -> u64 {
331    SystemTime::now()
332        .duration_since(UNIX_EPOCH)
333        .map(|d| d.as_millis() as u64)
334        .unwrap_or(0)
335}
336
337// =============================================================================
338// Tests
339// =============================================================================
340
341#[cfg(test)]
342mod tests {
343    use super::*;
344
345    #[test]
346    fn test_event_id() {
347        let id1 = EventId::generate();
348        let id2 = EventId::generate();
349        assert_ne!(id1, id2);
350        assert!(id1.as_str().starts_with("evt_"));
351    }
352
353    #[test]
354    fn test_event_creation() {
355        let event = Event::new(
356            EventType::Created,
357            "users",
358            EventData::String("test data".to_string()),
359        );
360
361        assert_eq!(event.event_type, EventType::Created);
362        assert_eq!(event.source, "users");
363        assert!(event.timestamp > 0);
364    }
365
366    #[test]
367    fn test_event_metadata() {
368        let event = Event::new(EventType::Updated, "orders", EventData::Null)
369            .with_metadata("user_id", "123")
370            .with_metadata("action", "update");
371
372        assert_eq!(event.get_metadata("user_id"), Some(&"123".to_string()));
373        assert_eq!(event.get_metadata("action"), Some(&"update".to_string()));
374    }
375
376    #[test]
377    fn test_event_filter() {
378        let event = Event::new(EventType::Created, "users.profile", EventData::Null);
379
380        let filter = EventFilter::new().with_type(EventType::Created);
381        assert!(event.matches(&filter));
382
383        let filter = EventFilter::new().with_source("users");
384        assert!(event.matches(&filter));
385
386        let filter = EventFilter::new().with_type(EventType::Deleted);
387        assert!(!event.matches(&filter));
388    }
389
390    #[test]
391    fn test_event_data() {
392        let data = EventData::String("hello".to_string());
393        assert_eq!(data.as_str(), Some("hello"));
394
395        let data = EventData::Int(42);
396        assert_eq!(data.as_i64(), Some(42));
397
398        let data: EventData = "test".into();
399        assert_eq!(data.as_str(), Some("test"));
400    }
401}