Skip to main content

varpulis_core/
event.rs

1//! Runtime event types
2//!
3//! This module defines the core `Event` type used throughout the Varpulis
4//! streaming analytics engine. Events are the fundamental unit of data
5//! processed by streams, pattern matchers, and connectors.
6
7pub use crate::value::FxIndexMap;
8use crate::Value;
9use chrono::{DateTime, Utc};
10use indexmap::IndexMap;
11use rustc_hash::FxBuildHasher;
12use serde::{Deserialize, Serialize};
13use std::sync::Arc;
14
15/// Type alias for field name keys using `Arc<str>` for O(1) cloning.
16pub type FieldKey = Arc<str>;
17
18/// A shared reference to an Event for efficient passing through pipelines.
19/// Using Arc avoids expensive deep clones when events are processed by
20/// multiple streams, windows, or pattern matchers.
21pub type SharedEvent = Arc<Event>;
22
23/// A runtime event
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct Event {
26    /// Event type name (`Arc<str>` for O(1) clone instead of O(n) String clone).
27    pub event_type: Arc<str>,
28    /// Timestamp of the event (defaults to current server time if not provided).
29    #[serde(default = "Utc::now")]
30    pub timestamp: DateTime<Utc>,
31    /// Event payload (uses `Arc<str>` keys for O(1) cloning, FxBuildHasher for faster access).
32    pub data: FxIndexMap<Arc<str>, Value>,
33}
34
35impl Event {
36    /// Creates a new event with the given type and current timestamp.
37    pub fn new(event_type: impl Into<Arc<str>>) -> Self {
38        Self {
39            event_type: event_type.into(),
40            timestamp: Utc::now(),
41            data: IndexMap::with_hasher(FxBuildHasher),
42        }
43    }
44
45    /// Creates a new event with a specific timestamp (avoids Utc::now() syscall).
46    pub fn new_at(event_type: impl Into<Arc<str>>, timestamp: DateTime<Utc>) -> Self {
47        Self {
48            event_type: event_type.into(),
49            timestamp,
50            data: IndexMap::with_hasher(FxBuildHasher),
51        }
52    }
53
54    /// Creates a new event with pre-allocated capacity for fields.
55    /// Use this when you know the approximate number of fields in advance.
56    pub fn with_capacity(event_type: impl Into<Arc<str>>, capacity: usize) -> Self {
57        Self {
58            event_type: event_type.into(),
59            timestamp: Utc::now(),
60            data: IndexMap::with_capacity_and_hasher(capacity, FxBuildHasher),
61        }
62    }
63
64    /// Creates a new event with pre-allocated capacity and a specific timestamp.
65    pub fn with_capacity_at(
66        event_type: impl Into<Arc<str>>,
67        capacity: usize,
68        timestamp: DateTime<Utc>,
69    ) -> Self {
70        Self {
71            event_type: event_type.into(),
72            timestamp,
73            data: IndexMap::with_capacity_and_hasher(capacity, FxBuildHasher),
74        }
75    }
76
77    /// Creates a new event from pre-built fields map.
78    /// Use this when you already have the fields constructed (e.g., from JSON parsing).
79    pub fn from_fields(event_type: impl Into<Arc<str>>, data: FxIndexMap<Arc<str>, Value>) -> Self {
80        Self {
81            event_type: event_type.into(),
82            timestamp: Utc::now(),
83            data,
84        }
85    }
86
87    /// Creates a new event from pre-built fields map with String keys (converts to `Arc<str>`).
88    pub fn from_string_fields(
89        event_type: impl Into<Arc<str>>,
90        data: FxIndexMap<String, Value>,
91    ) -> Self {
92        let converted: FxIndexMap<Arc<str>, Value> =
93            data.into_iter().map(|(k, v)| (Arc::from(k), v)).collect();
94        Self {
95            event_type: event_type.into(),
96            timestamp: Utc::now(),
97            data: converted,
98        }
99    }
100
101    /// Creates a new event from pre-built fields map with a specific timestamp.
102    pub fn from_fields_with_timestamp(
103        event_type: impl Into<Arc<str>>,
104        timestamp: DateTime<Utc>,
105        data: FxIndexMap<Arc<str>, Value>,
106    ) -> Self {
107        Self {
108            event_type: event_type.into(),
109            timestamp,
110            data,
111        }
112    }
113
114    /// Sets the event's timestamp (builder pattern).
115    pub const fn with_timestamp(mut self, ts: DateTime<Utc>) -> Self {
116        self.timestamp = ts;
117        self
118    }
119
120    /// Adds a field to the event (builder pattern).
121    pub fn with_field(mut self, key: impl Into<Arc<str>>, value: impl Into<Value>) -> Self {
122        self.data.insert(key.into(), value.into());
123        self
124    }
125
126    /// Looks up a field value by name.
127    pub fn get(&self, key: &str) -> Option<&Value> {
128        self.data.get(key)
129    }
130
131    /// Looks up a field and extracts it as `f64`.
132    pub fn get_float(&self, key: &str) -> Option<f64> {
133        self.data.get(key).and_then(|v| v.as_float())
134    }
135
136    /// Looks up a field and extracts it as `i64`.
137    pub fn get_int(&self, key: &str) -> Option<i64> {
138        self.data.get(key).and_then(|v| v.as_int())
139    }
140
141    /// Looks up a field and extracts it as a string slice.
142    pub fn get_str(&self, key: &str) -> Option<&str> {
143        self.data.get(key).and_then(|v| v.as_str())
144    }
145
146    /// Serialize for sink output: event_type + timestamp + data fields.
147    pub fn to_sink_payload(&self) -> Vec<u8> {
148        use serde::ser::SerializeMap;
149        use serde::Serializer;
150        let mut buf = Vec::with_capacity(256);
151        let mut ser = serde_json::Serializer::new(&mut buf);
152        let mut map = ser.serialize_map(Some(2 + self.data.len())).unwrap();
153        map.serialize_entry("event_type", self.event_type.as_ref())
154            .unwrap();
155        map.serialize_entry("timestamp", &self.timestamp).unwrap();
156        for (k, v) in &self.data {
157            if k.as_ref() != "timestamp" {
158                map.serialize_entry(k.as_ref(), v).unwrap();
159            }
160        }
161        map.end().unwrap();
162        buf
163    }
164}
165
166#[cfg(test)]
167mod tests {
168    use super::*;
169    use chrono::TimeZone;
170
171    #[test]
172    fn test_event_new() {
173        let event = Event::new("TestEvent");
174        assert_eq!(&*event.event_type, "TestEvent");
175        assert!(event.data.is_empty());
176    }
177
178    #[test]
179    fn test_event_new_from_string() {
180        let event = Event::new("TestEvent".to_string());
181        assert_eq!(&*event.event_type, "TestEvent");
182    }
183
184    #[test]
185    fn test_event_with_timestamp() {
186        let ts = Utc.with_ymd_and_hms(2025, 1, 15, 10, 30, 0).unwrap();
187        let event = Event::new("Test").with_timestamp(ts);
188        assert_eq!(event.timestamp, ts);
189    }
190
191    #[test]
192    fn test_event_with_field() {
193        let event = Event::new("Test")
194            .with_field("name", "value")
195            .with_field("count", 42i64);
196
197        assert_eq!(event.data.len(), 2);
198        assert_eq!(event.get("name"), Some(&Value::Str("value".into())));
199        assert_eq!(event.get("count"), Some(&Value::Int(42)));
200    }
201
202    #[test]
203    fn test_event_get_float() {
204        let event = Event::new("Test")
205            .with_field("price", 19.99f64)
206            .with_field("quantity", 5i64);
207
208        assert_eq!(event.get_float("price"), Some(19.99));
209        assert_eq!(event.get_float("quantity"), Some(5.0));
210        assert_eq!(event.get_float("missing"), None);
211    }
212
213    #[test]
214    fn test_event_get_int() {
215        let event = Event::new("Test")
216            .with_field("count", 42i64)
217            .with_field("ratio", 3.7f64);
218
219        assert_eq!(event.get_int("count"), Some(42));
220        assert_eq!(event.get_int("ratio"), Some(3));
221        assert_eq!(event.get_int("missing"), None);
222    }
223
224    #[test]
225    fn test_event_get_str() {
226        let event = Event::new("Test").with_field("name", "Alice");
227        assert_eq!(event.get_str("name"), Some("Alice"));
228        assert_eq!(event.get_str("missing"), None);
229    }
230
231    #[test]
232    fn test_event_overwrite_field() {
233        let event = Event::new("Test")
234            .with_field("key", "first")
235            .with_field("key", "second");
236
237        assert_eq!(event.get_str("key"), Some("second"));
238        assert_eq!(event.data.len(), 1);
239    }
240}