Skip to main content

varpulis_core/
event.rs

1//! Runtime event types
2//!
3//! This module defines the core `Event` type used throughout the Varpulis
4//! streaming analytics engine. Events are the fundamental unit of data
5//! processed by streams, pattern matchers, and connectors.
6
7use std::sync::Arc;
8
9use chrono::{DateTime, Utc};
10use indexmap::IndexMap;
11use rustc_hash::FxBuildHasher;
12use serde::{Deserialize, Serialize};
13
14pub use crate::value::FxIndexMap;
15use crate::Value;
16
/// Key type for event field names.
///
/// An `Arc<str>` is used so that cloning a key only bumps a reference count
/// instead of copying the underlying string.
pub type FieldKey = Arc<str>;
19
20/// A shared reference to an Event for efficient passing through pipelines.
21/// Using Arc avoids expensive deep clones when events are processed by
22/// multiple streams, windows, or pattern matchers.
23pub type SharedEvent = Arc<Event>;
24
25/// A runtime event
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct Event {
28    /// Event type name (`Arc<str>` for O(1) clone instead of O(n) String clone).
29    pub event_type: Arc<str>,
30    /// Timestamp of the event (defaults to current server time if not provided).
31    #[serde(default = "Utc::now")]
32    pub timestamp: DateTime<Utc>,
33    /// Event payload (uses `Arc<str>` keys for O(1) cloning, FxBuildHasher for faster access).
34    pub data: FxIndexMap<Arc<str>, Value>,
35}
36
37impl Event {
38    /// Creates a new event with the given type and current timestamp.
39    pub fn new(event_type: impl Into<Arc<str>>) -> Self {
40        Self {
41            event_type: event_type.into(),
42            timestamp: Utc::now(),
43            data: IndexMap::with_hasher(FxBuildHasher),
44        }
45    }
46
47    /// Creates a new event with a specific timestamp (avoids Utc::now() syscall).
48    pub fn new_at(event_type: impl Into<Arc<str>>, timestamp: DateTime<Utc>) -> Self {
49        Self {
50            event_type: event_type.into(),
51            timestamp,
52            data: IndexMap::with_hasher(FxBuildHasher),
53        }
54    }
55
56    /// Creates a new event with pre-allocated capacity for fields.
57    /// Use this when you know the approximate number of fields in advance.
58    pub fn with_capacity(event_type: impl Into<Arc<str>>, capacity: usize) -> Self {
59        Self {
60            event_type: event_type.into(),
61            timestamp: Utc::now(),
62            data: IndexMap::with_capacity_and_hasher(capacity, FxBuildHasher),
63        }
64    }
65
66    /// Creates a new event with pre-allocated capacity and a specific timestamp.
67    pub fn with_capacity_at(
68        event_type: impl Into<Arc<str>>,
69        capacity: usize,
70        timestamp: DateTime<Utc>,
71    ) -> Self {
72        Self {
73            event_type: event_type.into(),
74            timestamp,
75            data: IndexMap::with_capacity_and_hasher(capacity, FxBuildHasher),
76        }
77    }
78
79    /// Creates a new event from pre-built fields map.
80    /// Use this when you already have the fields constructed (e.g., from JSON parsing).
81    pub fn from_fields(event_type: impl Into<Arc<str>>, data: FxIndexMap<Arc<str>, Value>) -> Self {
82        Self {
83            event_type: event_type.into(),
84            timestamp: Utc::now(),
85            data,
86        }
87    }
88
89    /// Creates a new event from pre-built fields map with String keys (converts to `Arc<str>`).
90    pub fn from_string_fields(
91        event_type: impl Into<Arc<str>>,
92        data: FxIndexMap<String, Value>,
93    ) -> Self {
94        let converted: FxIndexMap<Arc<str>, Value> =
95            data.into_iter().map(|(k, v)| (Arc::from(k), v)).collect();
96        Self {
97            event_type: event_type.into(),
98            timestamp: Utc::now(),
99            data: converted,
100        }
101    }
102
103    /// Creates a new event from pre-built fields map with a specific timestamp.
104    pub fn from_fields_with_timestamp(
105        event_type: impl Into<Arc<str>>,
106        timestamp: DateTime<Utc>,
107        data: FxIndexMap<Arc<str>, Value>,
108    ) -> Self {
109        Self {
110            event_type: event_type.into(),
111            timestamp,
112            data,
113        }
114    }
115
116    /// Sets the event's timestamp (builder pattern).
117    pub const fn with_timestamp(mut self, ts: DateTime<Utc>) -> Self {
118        self.timestamp = ts;
119        self
120    }
121
122    /// Adds a field to the event (builder pattern).
123    pub fn with_field(mut self, key: impl Into<Arc<str>>, value: impl Into<Value>) -> Self {
124        self.data.insert(key.into(), value.into());
125        self
126    }
127
128    /// Looks up a field value by name.
129    pub fn get(&self, key: &str) -> Option<&Value> {
130        self.data.get(key)
131    }
132
133    /// Looks up a field and extracts it as `f64`.
134    pub fn get_float(&self, key: &str) -> Option<f64> {
135        self.data.get(key).and_then(|v| v.as_float())
136    }
137
138    /// Looks up a field and extracts it as `i64`.
139    pub fn get_int(&self, key: &str) -> Option<i64> {
140        self.data.get(key).and_then(|v| v.as_int())
141    }
142
143    /// Looks up a field and extracts it as a string slice.
144    pub fn get_str(&self, key: &str) -> Option<&str> {
145        self.data.get(key).and_then(|v| v.as_str())
146    }
147
148    /// Serialize for sink output: event_type + timestamp + data fields.
149    pub fn to_sink_payload(&self) -> Vec<u8> {
150        use serde::ser::SerializeMap;
151        use serde::Serializer;
152        let mut buf = Vec::with_capacity(256);
153        let mut ser = serde_json::Serializer::new(&mut buf);
154        let mut map = ser
155            .serialize_map(Some(2 + self.data.len()))
156            .expect("serialize_map to Vec<u8> should not fail");
157        map.serialize_entry("event_type", self.event_type.as_ref())
158            .expect("serializing event_type entry should not fail");
159        map.serialize_entry("timestamp", &self.timestamp)
160            .expect("serializing timestamp entry should not fail");
161        for (k, v) in &self.data {
162            if k.as_ref() != "timestamp" {
163                map.serialize_entry(k.as_ref(), v)
164                    .expect("serializing data field entry should not fail");
165            }
166        }
167        map.end()
168            .expect("finalizing JSON map serialization should not fail");
169        buf
170    }
171}
172
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use chrono::TimeZone;

    use super::*;

    /// Builds a whole-second UTC timestamp for use as a test fixture.
    fn utc(year: i32, month: u32, day: u32, hour: u32, min: u32, sec: u32) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(year, month, day, hour, min, sec)
            .unwrap()
    }

    #[test]
    fn test_event_new() {
        let ev = Event::new("TestEvent");
        assert!(ev.data.is_empty());
        assert_eq!(&*ev.event_type, "TestEvent");
    }

    #[test]
    fn test_event_new_from_string() {
        let owned_name = String::from("TestEvent");
        let ev = Event::new(owned_name);
        assert_eq!(&*ev.event_type, "TestEvent");
    }

    #[test]
    fn test_event_with_timestamp() {
        let expected = utc(2025, 1, 15, 10, 30, 0);
        let ev = Event::new("Test").with_timestamp(expected);
        assert_eq!(ev.timestamp, expected);
    }

    #[test]
    fn test_event_with_field() {
        let ev = Event::new("Test")
            .with_field("name", "value")
            .with_field("count", 42i64);

        assert_eq!(ev.get("count"), Some(&Value::Int(42)));
        assert_eq!(ev.get("name"), Some(&Value::Str("value".into())));
        assert_eq!(ev.data.len(), 2);
    }

    #[test]
    fn test_event_get_float() {
        let ev = Event::new("Test")
            .with_field("price", 19.99f64)
            .with_field("quantity", 5i64);

        let cases = [
            ("price", Some(19.99)),
            ("quantity", Some(5.0)),
            ("missing", None),
        ];
        for &(field, expected) in cases.iter() {
            assert_eq!(ev.get_float(field), expected, "field `{}`", field);
        }
    }

    #[test]
    fn test_event_get_int() {
        let ev = Event::new("Test")
            .with_field("count", 42i64)
            .with_field("ratio", 3.7f64);

        let cases = [("count", Some(42)), ("ratio", Some(3)), ("missing", None)];
        for &(field, expected) in cases.iter() {
            assert_eq!(ev.get_int(field), expected, "field `{}`", field);
        }
    }

    #[test]
    fn test_event_get_str() {
        let ev = Event::new("Test").with_field("name", "Alice");
        assert_eq!(ev.get_str("missing"), None);
        assert_eq!(ev.get_str("name"), Some("Alice"));
    }

    #[test]
    fn test_event_overwrite_field() {
        let ev = Event::new("Test")
            .with_field("key", "first")
            .with_field("key", "second");

        // The second insert replaces the first rather than adding a new entry.
        assert_eq!(ev.data.len(), 1);
        assert_eq!(ev.get_str("key"), Some("second"));
    }

    #[test]
    fn test_event_new_at_avoids_now() {
        let expected = utc(2020, 6, 15, 12, 0, 0);
        let ev = Event::new_at("Sensor", expected);
        assert!(ev.data.is_empty());
        assert_eq!(ev.timestamp, expected);
        assert_eq!(&*ev.event_type, "Sensor");
    }

    #[test]
    fn test_event_with_capacity_preallocates() {
        let ev = Event::with_capacity("BigEvent", 10)
            .with_field("a", 1i64)
            .with_field("b", 2i64);
        // The requested capacity is a lower bound; the map may reserve more.
        assert!(ev.data.capacity() >= 10);
        assert_eq!(ev.data.len(), 2);
    }

    #[test]
    fn test_event_with_capacity_at() {
        let expected = utc(2025, 3, 1, 0, 0, 0);
        let ev = Event::with_capacity_at("Batch", 5, expected);
        assert!(ev.data.capacity() >= 5);
        assert_eq!(ev.timestamp, expected);
        assert_eq!(&*ev.event_type, "Batch");
    }

    #[test]
    fn test_event_from_fields() {
        let coords = [("x", 1.5), ("y", 2.5)];
        let fields: FxIndexMap<Arc<str>, Value> = coords
            .iter()
            .map(|&(name, v)| (Arc::from(name), Value::Float(v)))
            .collect();

        let ev = Event::from_fields("Point", fields);
        assert_eq!(&*ev.event_type, "Point");
        for &(name, v) in coords.iter() {
            assert_eq!(ev.get_float(name), Some(v));
        }
    }

    #[test]
    fn test_event_from_string_fields_converts_keys() {
        let fields: FxIndexMap<String, Value> =
            std::iter::once((String::from("name"), Value::str("Alice"))).collect();
        let ev = Event::from_string_fields("User", fields);
        assert_eq!(ev.get_str("name"), Some("Alice"));
    }

    #[test]
    fn test_event_to_sink_payload_json() {
        let bytes = Event::new("Order")
            .with_timestamp(utc(2025, 1, 1, 0, 0, 0))
            .with_field("amount", 99i64)
            .to_sink_payload();
        let parsed: serde_json::Value = serde_json::from_slice(&bytes).unwrap();

        // The timestamp must be part of the flattened output.
        assert!(parsed.get("timestamp").is_some());
        assert_eq!(parsed["event_type"], "Order");
        assert_eq!(parsed["amount"], 99);
    }
}