use crate::types::Value;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
/// A single event in a stream: a typed payload plus bookkeeping metadata.
///
/// Derives `Serialize`/`Deserialize`, so the field names below are part of the
/// serialized representation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamEvent {
/// Event identifier. The constructors in this file build it as `evt_`
/// followed by the output of `uuid_v4()`.
pub id: String,
/// Name of the event kind; compared for equality by `matches_pattern`.
pub event_type: String,
/// Payload fields keyed by name; read by the typed `get_*` accessors.
pub data: HashMap<String, Value>,
/// Timestamp, origin, sequence number and tags attached to the event.
pub metadata: EventMetadata,
}
/// Bookkeeping information carried alongside a `StreamEvent` payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventMetadata {
/// Event time. `StreamEvent::new` stores milliseconds since the Unix epoch
/// and `StreamEvent::age_ms` interprets the value in that unit;
/// `StreamEvent::with_timestamp` stores whatever the caller passes.
pub timestamp: u64,
/// Name of the component that produced the event.
pub source: String,
/// Sequence number. Both constructors in this file initialise it to 0 and
/// nothing visible here updates it afterwards.
pub sequence: u64,
/// Free-form key/value labels; starts empty and is filled by
/// `StreamEvent::add_tag`.
pub tags: HashMap<String, String>,
}
impl StreamEvent {
    /// Creates an event stamped with the current wall-clock time.
    ///
    /// The timestamp is the number of milliseconds since the Unix epoch. The
    /// event receives a fresh `evt_`-prefixed id, sequence number 0 and no tags.
    pub fn new(
        event_type: impl Into<String>,
        data: HashMap<String, Value>,
        source: impl Into<String>,
    ) -> Self {
        Self::with_timestamp(event_type, data, source, Self::now_millis())
    }

    /// Creates an event with a caller-supplied timestamp.
    ///
    /// `timestamp` is stored as given; `age_ms` treats it as milliseconds
    /// since the Unix epoch. The event receives a fresh `evt_`-prefixed id,
    /// sequence number 0 and no tags.
    pub fn with_timestamp(
        event_type: impl Into<String>,
        data: HashMap<String, Value>,
        source: impl Into<String>,
        timestamp: u64,
    ) -> Self {
        Self {
            id: format!("evt_{}", uuid_v4()),
            event_type: event_type.into(),
            data,
            metadata: EventMetadata {
                timestamp,
                source: source.into(),
                sequence: 0,
                tags: HashMap::new(),
            },
        }
    }

    /// Returns how many milliseconds have passed since the event's timestamp.
    ///
    /// The subtraction saturates, so a timestamp that lies in the future
    /// yields 0 rather than wrapping around.
    pub fn age_ms(&self) -> u64 {
        Self::now_millis().saturating_sub(self.metadata.timestamp)
    }

    /// Returns `true` when the event satisfies every constraint in `pattern`.
    ///
    /// An unset (`None`) event type or source places no constraint. Every
    /// entry of `required_fields` must be present in the payload with an
    /// equal value.
    pub fn matches_pattern(&self, pattern: &EventPattern) -> bool {
        let type_matches = pattern
            .event_type
            .as_ref()
            .map_or(true, |expected| self.event_type == *expected);
        let source_matches = pattern
            .source
            .as_ref()
            .map_or(true, |expected| self.metadata.source == *expected);

        type_matches
            && source_matches
            && pattern.required_fields.iter().all(|(key, expected)| {
                // A missing field counts as a mismatch.
                self.data.get(key).map_or(false, |actual| actual == expected)
            })
    }

    /// Attaches a tag to the event, replacing any existing tag with the same key.
    pub fn add_tag(&mut self, key: impl Into<String>, value: impl Into<String>) {
        self.metadata.tags.insert(key.into(), value.into());
    }

    /// Returns the payload field `field` as an `f64`.
    ///
    /// `Value::Number` is returned as is and `Value::Integer` is converted
    /// with `as f64`; a missing field or any other variant yields `None`.
    pub fn get_numeric(&self, field: &str) -> Option<f64> {
        match self.data.get(field)? {
            Value::Number(n) => Some(*n),
            Value::Integer(i) => Some(*i as f64),
            _ => None,
        }
    }

    /// Returns the payload field `field` as a string slice, or `None` when
    /// the field is missing or is not a `Value::String`.
    pub fn get_string(&self, field: &str) -> Option<&str> {
        match self.data.get(field)? {
            Value::String(s) => Some(s.as_str()),
            _ => None,
        }
    }

    /// Returns the payload field `field` as a `bool`, or `None` when the
    /// field is missing or is not a `Value::Boolean`.
    pub fn get_boolean(&self, field: &str) -> Option<bool> {
        match self.data.get(field)? {
            Value::Boolean(b) => Some(*b),
            _ => None,
        }
    }

    /// Current wall-clock time in milliseconds since the Unix epoch.
    ///
    /// Never panics: a system clock set before the epoch reads as 0, and a
    /// value too large for `u64` is clamped to `u64::MAX` instead of being
    /// silently truncated.
    fn now_millis() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |elapsed| {
                elapsed.as_millis().min(u128::from(u64::MAX)) as u64
            })
    }
}
/// Set of constraints an event is checked against by
/// `StreamEvent::matches_pattern`.
#[derive(Debug, Clone)]
pub struct EventPattern {
/// When `Some`, the event's `event_type` must equal this value.
pub event_type: Option<String>,
/// Each key must exist in the event's payload with an equal value.
pub required_fields: HashMap<String, Value>,
/// When `Some`, the event's metadata `source` must equal this value.
pub source: Option<String>,
}
impl EventPattern {
    /// Creates a pattern with no event type, no source and no required
    /// fields.
    pub fn new() -> Self {
        let required_fields = HashMap::new();
        Self {
            event_type: None,
            required_fields,
            source: None,
        }
    }

    /// Returns the pattern with its event type set to `event_type`.
    pub fn with_event_type(self, event_type: impl Into<String>) -> Self {
        Self {
            event_type: Some(event_type.into()),
            ..self
        }
    }

    /// Returns the pattern with `key` added to the required fields, replacing
    /// any previous requirement for the same key.
    pub fn with_field(self, key: impl Into<String>, value: Value) -> Self {
        let mut pattern = self;
        pattern.required_fields.insert(key.into(), value);
        pattern
    }

    /// Returns the pattern with its source set to `source`.
    pub fn with_source(self, source: impl Into<String>) -> Self {
        Self {
            source: Some(source.into()),
            ..self
        }
    }
}
impl Default for EventPattern {
fn default() -> Self {
Self::new()
}
}
/// Generates an identifier of the form `<hex nanoseconds>-<hex hash>`.
///
/// Despite the name, the result is not an RFC 4122 version-4 UUID. The first
/// part is the current time in nanoseconds since the Unix epoch and the
/// second is that value hashed with a freshly created `RandomState`.
///
/// Never panics: if the system clock is set before the Unix epoch the time
/// component is 0, and the hash still varies between calls.
fn uuid_v4() -> String {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};
    use std::time::{SystemTime, UNIX_EPOCH};

    // `duration_since` fails when the clock reads earlier than the epoch;
    // fall back to zero rather than aborting id generation.
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos());

    // Each `RandomState::new()` carries its own keys, so equal time readings
    // still hash to different values.
    let mut hasher = RandomState::new().build_hasher();
    hasher.write_u128(nanos);

    format!("{:x}-{:x}", nanos, hasher.finish())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Value;

    /// Payload shared by the trade-event tests: a price and a ticker symbol.
    fn trade_payload() -> HashMap<String, Value> {
        let mut payload = HashMap::new();
        payload.insert(String::from("price"), Value::Number(100.5));
        payload.insert(String::from("symbol"), Value::String(String::from("AAPL")));
        payload
    }

    /// A trade-event pattern requiring the given ticker symbol.
    fn symbol_pattern(symbol: &str) -> EventPattern {
        EventPattern::new()
            .with_event_type("TradeEvent")
            .with_field("symbol", Value::String(String::from(symbol)))
    }

    // Construction fills in type, source, id prefix and payload accessors.
    #[test]
    fn test_stream_event_creation() {
        let trade = StreamEvent::new("TradeEvent", trade_payload(), "trading_system");

        assert_eq!(trade.event_type, "TradeEvent");
        assert_eq!(trade.metadata.source, "trading_system");
        assert!(trade.id.starts_with("evt_"));
        assert_eq!(trade.get_numeric("price"), Some(100.5));
        assert_eq!(trade.get_string("symbol"), Some("AAPL"));
    }

    // A pattern matches only when the required field value is equal.
    #[test]
    fn test_event_pattern_matching() {
        let trade = StreamEvent::new("TradeEvent", trade_payload(), "trading_system");

        assert!(trade.matches_pattern(&symbol_pattern("AAPL")));
        assert!(!trade.matches_pattern(&symbol_pattern("GOOGL")));
    }

    // A freshly created event is only a few milliseconds old.
    #[test]
    fn test_event_age() {
        let fresh = StreamEvent::new("TestEvent", HashMap::new(), "test");
        let age = fresh.age_ms();
        assert!(age < 100);
    }
}