1use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::time::{SystemTime, UNIX_EPOCH};
11
12#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
18pub struct EventId(pub String);
19
20impl EventId {
21 pub fn new(id: impl Into<String>) -> Self {
22 Self(id.into())
23 }
24
25 pub fn generate() -> Self {
26 let timestamp = SystemTime::now()
27 .duration_since(UNIX_EPOCH)
28 .unwrap_or_default()
29 .as_nanos();
30 Self(format!("evt_{:032x}", timestamp))
31 }
32
33 pub fn as_str(&self) -> &str {
34 &self.0
35 }
36}
37
38impl std::fmt::Display for EventId {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 write!(f, "{}", self.0)
41 }
42}
43
44impl From<String> for EventId {
45 fn from(s: String) -> Self {
46 Self(s)
47 }
48}
49
50impl From<&str> for EventId {
51 fn from(s: &str) -> Self {
52 Self(s.to_string())
53 }
54}
55
56#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
62pub enum EventType {
63 Created,
65 Updated,
67 Deleted,
69 Custom(String),
71}
72
73impl EventType {
74 pub fn custom(name: impl Into<String>) -> Self {
75 Self::Custom(name.into())
76 }
77
78 pub fn as_str(&self) -> &str {
79 match self {
80 Self::Created => "created",
81 Self::Updated => "updated",
82 Self::Deleted => "deleted",
83 Self::Custom(s) => s,
84 }
85 }
86}
87
88impl Default for EventType {
89 fn default() -> Self {
90 Self::Custom("unknown".to_string())
91 }
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct Event {
101 pub id: EventId,
102 pub event_type: EventType,
103 pub source: String,
104 pub timestamp: u64,
105 pub data: EventData,
106 pub metadata: HashMap<String, String>,
107}
108
109impl Event {
110 pub fn new(event_type: EventType, source: impl Into<String>, data: EventData) -> Self {
112 Self {
113 id: EventId::generate(),
114 event_type,
115 source: source.into(),
116 timestamp: current_timestamp_millis(),
117 data,
118 metadata: HashMap::new(),
119 }
120 }
121
122 pub fn with_id(
124 id: EventId,
125 event_type: EventType,
126 source: impl Into<String>,
127 data: EventData,
128 ) -> Self {
129 Self {
130 id,
131 event_type,
132 source: source.into(),
133 timestamp: current_timestamp_millis(),
134 data,
135 metadata: HashMap::new(),
136 }
137 }
138
139 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
141 self.metadata.insert(key.into(), value.into());
142 self
143 }
144
145 pub fn get_metadata(&self, key: &str) -> Option<&String> {
147 self.metadata.get(key)
148 }
149
150 pub fn matches(&self, filter: &EventFilter) -> bool {
152 if let Some(ref event_type) = filter.event_type {
153 if &self.event_type != event_type {
154 return false;
155 }
156 }
157
158 if let Some(ref source) = filter.source {
159 if !self.source.starts_with(source) {
160 return false;
161 }
162 }
163
164 if let Some(after) = filter.after_timestamp {
165 if self.timestamp <= after {
166 return false;
167 }
168 }
169
170 true
171 }
172}
173
174#[derive(Debug, Clone, Serialize, Deserialize)]
180#[serde(untagged)]
181#[derive(Default)]
182pub enum EventData {
183 #[default]
184 Null,
185 Bool(bool),
186 Int(i64),
187 Float(f64),
188 String(String),
189 Bytes(Vec<u8>),
190 Json(serde_json::Value),
191}
192
193impl EventData {
194 pub fn is_null(&self) -> bool {
195 matches!(self, Self::Null)
196 }
197
198 pub fn as_str(&self) -> Option<&str> {
199 match self {
200 Self::String(s) => Some(s),
201 _ => None,
202 }
203 }
204
205 pub fn as_i64(&self) -> Option<i64> {
206 match self {
207 Self::Int(n) => Some(*n),
208 _ => None,
209 }
210 }
211
212 pub fn as_json(&self) -> Option<&serde_json::Value> {
213 match self {
214 Self::Json(v) => Some(v),
215 _ => None,
216 }
217 }
218}
219
220impl From<String> for EventData {
221 fn from(s: String) -> Self {
222 Self::String(s)
223 }
224}
225
226impl From<&str> for EventData {
227 fn from(s: &str) -> Self {
228 Self::String(s.to_string())
229 }
230}
231
232impl From<i64> for EventData {
233 fn from(n: i64) -> Self {
234 Self::Int(n)
235 }
236}
237
238impl From<f64> for EventData {
239 fn from(f: f64) -> Self {
240 Self::Float(f)
241 }
242}
243
244impl From<bool> for EventData {
245 fn from(b: bool) -> Self {
246 Self::Bool(b)
247 }
248}
249
250impl From<serde_json::Value> for EventData {
251 fn from(v: serde_json::Value) -> Self {
252 Self::Json(v)
253 }
254}
255
256impl From<Vec<u8>> for EventData {
257 fn from(bytes: Vec<u8>) -> Self {
258 Self::Bytes(bytes)
259 }
260}
261
262#[derive(Debug, Clone, Default, Serialize, Deserialize)]
268pub struct EventFilter {
269 pub event_type: Option<EventType>,
270 pub source: Option<String>,
271 pub after_timestamp: Option<u64>,
272}
273
274impl EventFilter {
275 pub fn new() -> Self {
276 Self::default()
277 }
278
279 pub fn with_type(mut self, event_type: EventType) -> Self {
280 self.event_type = Some(event_type);
281 self
282 }
283
284 pub fn with_source(mut self, source: impl Into<String>) -> Self {
285 self.source = Some(source.into());
286 self
287 }
288
289 pub fn after(mut self, timestamp: u64) -> Self {
290 self.after_timestamp = Some(timestamp);
291 self
292 }
293}
294
295#[derive(Debug, Clone, Serialize, Deserialize)]
301pub struct EventBatch {
302 pub events: Vec<Event>,
303 pub batch_id: String,
304 pub timestamp: u64,
305}
306
307impl EventBatch {
308 pub fn new(events: Vec<Event>) -> Self {
309 let batch_id = format!("batch_{}", current_timestamp_millis());
310 Self {
311 events,
312 batch_id,
313 timestamp: current_timestamp_millis(),
314 }
315 }
316
317 pub fn len(&self) -> usize {
318 self.events.len()
319 }
320
321 pub fn is_empty(&self) -> bool {
322 self.events.is_empty()
323 }
324}
325
326fn current_timestamp_millis() -> u64 {
327 SystemTime::now()
328 .duration_since(UNIX_EPOCH)
329 .map(|d| d.as_millis() as u64)
330 .unwrap_or(0)
331}
332
333#[cfg(test)]
338mod tests {
339 use super::*;
340
341 #[test]
342 fn test_event_id() {
343 let id1 = EventId::generate();
344 let id2 = EventId::generate();
345 assert_ne!(id1, id2);
346 assert!(id1.as_str().starts_with("evt_"));
347 }
348
349 #[test]
350 fn test_event_creation() {
351 let event = Event::new(
352 EventType::Created,
353 "users",
354 EventData::String("test data".to_string()),
355 );
356
357 assert_eq!(event.event_type, EventType::Created);
358 assert_eq!(event.source, "users");
359 assert!(event.timestamp > 0);
360 }
361
362 #[test]
363 fn test_event_metadata() {
364 let event = Event::new(EventType::Updated, "orders", EventData::Null)
365 .with_metadata("user_id", "123")
366 .with_metadata("action", "update");
367
368 assert_eq!(event.get_metadata("user_id"), Some(&"123".to_string()));
369 assert_eq!(event.get_metadata("action"), Some(&"update".to_string()));
370 }
371
372 #[test]
373 fn test_event_filter() {
374 let event = Event::new(EventType::Created, "users.profile", EventData::Null);
375
376 let filter = EventFilter::new().with_type(EventType::Created);
377 assert!(event.matches(&filter));
378
379 let filter = EventFilter::new().with_source("users");
380 assert!(event.matches(&filter));
381
382 let filter = EventFilter::new().with_type(EventType::Deleted);
383 assert!(!event.matches(&filter));
384 }
385
386 #[test]
387 fn test_event_data() {
388 let data = EventData::String("hello".to_string());
389 assert_eq!(data.as_str(), Some("hello"));
390
391 let data = EventData::Int(42);
392 assert_eq!(data.as_i64(), Some(42));
393
394 let data: EventData = "test".into();
395 assert_eq!(data.as_str(), Some("test"));
396 }
397}