1use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::time::{SystemTime, UNIX_EPOCH};
11
12#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
18pub struct EventId(pub String);
19
20impl EventId {
21 pub fn new(id: impl Into<String>) -> Self {
22 Self(id.into())
23 }
24
25 pub fn generate() -> Self {
26 let timestamp = SystemTime::now()
27 .duration_since(UNIX_EPOCH)
28 .unwrap_or_default()
29 .as_nanos();
30 Self(format!("evt_{:032x}", timestamp))
31 }
32
33 pub fn as_str(&self) -> &str {
34 &self.0
35 }
36}
37
38impl std::fmt::Display for EventId {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 write!(f, "{}", self.0)
41 }
42}
43
44impl From<String> for EventId {
45 fn from(s: String) -> Self {
46 Self(s)
47 }
48}
49
50impl From<&str> for EventId {
51 fn from(s: &str) -> Self {
52 Self(s.to_string())
53 }
54}
55
56#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
62pub enum EventType {
63 Created,
65 Updated,
67 Deleted,
69 Custom(String),
71}
72
73impl EventType {
74 pub fn custom(name: impl Into<String>) -> Self {
75 Self::Custom(name.into())
76 }
77
78 pub fn as_str(&self) -> &str {
79 match self {
80 Self::Created => "created",
81 Self::Updated => "updated",
82 Self::Deleted => "deleted",
83 Self::Custom(s) => s,
84 }
85 }
86}
87
88impl Default for EventType {
89 fn default() -> Self {
90 Self::Custom("unknown".to_string())
91 }
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct Event {
101 pub id: EventId,
102 pub event_type: EventType,
103 pub source: String,
104 pub timestamp: u64,
105 pub data: EventData,
106 pub metadata: HashMap<String, String>,
107}
108
109impl Event {
110 pub fn new(event_type: EventType, source: impl Into<String>, data: EventData) -> Self {
112 Self {
113 id: EventId::generate(),
114 event_type,
115 source: source.into(),
116 timestamp: current_timestamp_millis(),
117 data,
118 metadata: HashMap::new(),
119 }
120 }
121
122 pub fn with_id(
124 id: EventId,
125 event_type: EventType,
126 source: impl Into<String>,
127 data: EventData,
128 ) -> Self {
129 Self {
130 id,
131 event_type,
132 source: source.into(),
133 timestamp: current_timestamp_millis(),
134 data,
135 metadata: HashMap::new(),
136 }
137 }
138
139 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
141 self.metadata.insert(key.into(), value.into());
142 self
143 }
144
145 pub fn get_metadata(&self, key: &str) -> Option<&String> {
147 self.metadata.get(key)
148 }
149
150 pub fn matches(&self, filter: &EventFilter) -> bool {
152 if let Some(ref event_type) = filter.event_type {
153 if &self.event_type != event_type {
154 return false;
155 }
156 }
157
158 if let Some(ref source) = filter.source {
159 if !self.source.starts_with(source) {
160 return false;
161 }
162 }
163
164 if let Some(after) = filter.after_timestamp {
165 if self.timestamp <= after {
166 return false;
167 }
168 }
169
170 true
171 }
172}
173
174#[derive(Debug, Clone, Serialize, Deserialize)]
180#[serde(untagged)]
181#[derive(Default)]
182pub enum EventData {
183 #[default]
184 Null,
185 Bool(bool),
186 Int(i64),
187 Float(f64),
188 String(String),
189 Bytes(Vec<u8>),
190 Json(serde_json::Value),
191}
192
193impl EventData {
194 pub fn is_null(&self) -> bool {
195 matches!(self, Self::Null)
196 }
197
198 pub fn as_str(&self) -> Option<&str> {
199 match self {
200 Self::String(s) => Some(s),
201 _ => None,
202 }
203 }
204
205 pub fn as_i64(&self) -> Option<i64> {
206 match self {
207 Self::Int(n) => Some(*n),
208 _ => None,
209 }
210 }
211
212 pub fn as_json(&self) -> Option<&serde_json::Value> {
213 match self {
214 Self::Json(v) => Some(v),
215 _ => None,
216 }
217 }
218}
219
220
221impl From<String> for EventData {
222 fn from(s: String) -> Self {
223 Self::String(s)
224 }
225}
226
227impl From<&str> for EventData {
228 fn from(s: &str) -> Self {
229 Self::String(s.to_string())
230 }
231}
232
233impl From<i64> for EventData {
234 fn from(n: i64) -> Self {
235 Self::Int(n)
236 }
237}
238
239impl From<f64> for EventData {
240 fn from(f: f64) -> Self {
241 Self::Float(f)
242 }
243}
244
245impl From<bool> for EventData {
246 fn from(b: bool) -> Self {
247 Self::Bool(b)
248 }
249}
250
251impl From<serde_json::Value> for EventData {
252 fn from(v: serde_json::Value) -> Self {
253 Self::Json(v)
254 }
255}
256
257impl From<Vec<u8>> for EventData {
258 fn from(bytes: Vec<u8>) -> Self {
259 Self::Bytes(bytes)
260 }
261}
262
263#[derive(Debug, Clone, Default, Serialize, Deserialize)]
269pub struct EventFilter {
270 pub event_type: Option<EventType>,
271 pub source: Option<String>,
272 pub after_timestamp: Option<u64>,
273}
274
275impl EventFilter {
276 pub fn new() -> Self {
277 Self::default()
278 }
279
280 pub fn with_type(mut self, event_type: EventType) -> Self {
281 self.event_type = Some(event_type);
282 self
283 }
284
285 pub fn with_source(mut self, source: impl Into<String>) -> Self {
286 self.source = Some(source.into());
287 self
288 }
289
290 pub fn after(mut self, timestamp: u64) -> Self {
291 self.after_timestamp = Some(timestamp);
292 self
293 }
294}
295
296#[derive(Debug, Clone, Serialize, Deserialize)]
302pub struct EventBatch {
303 pub events: Vec<Event>,
304 pub batch_id: String,
305 pub timestamp: u64,
306}
307
308impl EventBatch {
309 pub fn new(events: Vec<Event>) -> Self {
310 let batch_id = format!("batch_{}", current_timestamp_millis());
311 Self {
312 events,
313 batch_id,
314 timestamp: current_timestamp_millis(),
315 }
316 }
317
318 pub fn len(&self) -> usize {
319 self.events.len()
320 }
321
322 pub fn is_empty(&self) -> bool {
323 self.events.is_empty()
324 }
325}
326
327fn current_timestamp_millis() -> u64 {
328 SystemTime::now()
329 .duration_since(UNIX_EPOCH)
330 .map(|d| d.as_millis() as u64)
331 .unwrap_or(0)
332}
333
334#[cfg(test)]
339mod tests {
340 use super::*;
341
342 #[test]
343 fn test_event_id() {
344 let id1 = EventId::generate();
345 let id2 = EventId::generate();
346 assert_ne!(id1, id2);
347 assert!(id1.as_str().starts_with("evt_"));
348 }
349
350 #[test]
351 fn test_event_creation() {
352 let event = Event::new(
353 EventType::Created,
354 "users",
355 EventData::String("test data".to_string()),
356 );
357
358 assert_eq!(event.event_type, EventType::Created);
359 assert_eq!(event.source, "users");
360 assert!(event.timestamp > 0);
361 }
362
363 #[test]
364 fn test_event_metadata() {
365 let event = Event::new(EventType::Updated, "orders", EventData::Null)
366 .with_metadata("user_id", "123")
367 .with_metadata("action", "update");
368
369 assert_eq!(event.get_metadata("user_id"), Some(&"123".to_string()));
370 assert_eq!(event.get_metadata("action"), Some(&"update".to_string()));
371 }
372
373 #[test]
374 fn test_event_filter() {
375 let event = Event::new(EventType::Created, "users.profile", EventData::Null);
376
377 let filter = EventFilter::new().with_type(EventType::Created);
378 assert!(event.matches(&filter));
379
380 let filter = EventFilter::new().with_source("users");
381 assert!(event.matches(&filter));
382
383 let filter = EventFilter::new().with_type(EventType::Deleted);
384 assert!(!event.matches(&filter));
385 }
386
387 #[test]
388 fn test_event_data() {
389 let data = EventData::String("hello".to_string());
390 assert_eq!(data.as_str(), Some("hello"));
391
392 let data = EventData::Int(42);
393 assert_eq!(data.as_i64(), Some(42));
394
395 let data: EventData = "test".into();
396 assert_eq!(data.as_str(), Some("test"));
397 }
398}