1use std::sync::Arc;
8
9use chrono::{DateTime, Utc};
10use indexmap::IndexMap;
11use rustc_hash::FxBuildHasher;
12use serde::{Deserialize, Serialize};
13
14pub use crate::value::FxIndexMap;
15use crate::Value;
16
17pub type FieldKey = Arc<str>;
19
20pub type SharedEvent = Arc<Event>;
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct Event {
28 pub event_type: Arc<str>,
30 #[serde(default = "Utc::now")]
32 pub timestamp: DateTime<Utc>,
33 pub data: FxIndexMap<Arc<str>, Value>,
35}
36
37impl Event {
38 pub fn new(event_type: impl Into<Arc<str>>) -> Self {
40 Self {
41 event_type: event_type.into(),
42 timestamp: Utc::now(),
43 data: IndexMap::with_hasher(FxBuildHasher),
44 }
45 }
46
47 pub fn new_at(event_type: impl Into<Arc<str>>, timestamp: DateTime<Utc>) -> Self {
49 Self {
50 event_type: event_type.into(),
51 timestamp,
52 data: IndexMap::with_hasher(FxBuildHasher),
53 }
54 }
55
56 pub fn with_capacity(event_type: impl Into<Arc<str>>, capacity: usize) -> Self {
59 Self {
60 event_type: event_type.into(),
61 timestamp: Utc::now(),
62 data: IndexMap::with_capacity_and_hasher(capacity, FxBuildHasher),
63 }
64 }
65
66 pub fn with_capacity_at(
68 event_type: impl Into<Arc<str>>,
69 capacity: usize,
70 timestamp: DateTime<Utc>,
71 ) -> Self {
72 Self {
73 event_type: event_type.into(),
74 timestamp,
75 data: IndexMap::with_capacity_and_hasher(capacity, FxBuildHasher),
76 }
77 }
78
79 pub fn from_fields(event_type: impl Into<Arc<str>>, data: FxIndexMap<Arc<str>, Value>) -> Self {
82 Self {
83 event_type: event_type.into(),
84 timestamp: Utc::now(),
85 data,
86 }
87 }
88
89 pub fn from_string_fields(
91 event_type: impl Into<Arc<str>>,
92 data: FxIndexMap<String, Value>,
93 ) -> Self {
94 let converted: FxIndexMap<Arc<str>, Value> =
95 data.into_iter().map(|(k, v)| (Arc::from(k), v)).collect();
96 Self {
97 event_type: event_type.into(),
98 timestamp: Utc::now(),
99 data: converted,
100 }
101 }
102
103 pub fn from_fields_with_timestamp(
105 event_type: impl Into<Arc<str>>,
106 timestamp: DateTime<Utc>,
107 data: FxIndexMap<Arc<str>, Value>,
108 ) -> Self {
109 Self {
110 event_type: event_type.into(),
111 timestamp,
112 data,
113 }
114 }
115
116 pub const fn with_timestamp(mut self, ts: DateTime<Utc>) -> Self {
118 self.timestamp = ts;
119 self
120 }
121
122 pub fn with_field(mut self, key: impl Into<Arc<str>>, value: impl Into<Value>) -> Self {
124 self.data.insert(key.into(), value.into());
125 self
126 }
127
128 pub fn get(&self, key: &str) -> Option<&Value> {
130 self.data.get(key)
131 }
132
133 pub fn get_float(&self, key: &str) -> Option<f64> {
135 self.data.get(key).and_then(|v| v.as_float())
136 }
137
138 pub fn get_int(&self, key: &str) -> Option<i64> {
140 self.data.get(key).and_then(|v| v.as_int())
141 }
142
143 pub fn get_str(&self, key: &str) -> Option<&str> {
145 self.data.get(key).and_then(|v| v.as_str())
146 }
147
148 pub fn to_sink_payload(&self) -> Vec<u8> {
150 use serde::ser::SerializeMap;
151 use serde::Serializer;
152 let mut buf = Vec::with_capacity(256);
153 let mut ser = serde_json::Serializer::new(&mut buf);
154 let mut map = ser
155 .serialize_map(Some(2 + self.data.len()))
156 .expect("serialize_map to Vec<u8> should not fail");
157 map.serialize_entry("event_type", self.event_type.as_ref())
158 .expect("serializing event_type entry should not fail");
159 map.serialize_entry("timestamp", &self.timestamp)
160 .expect("serializing timestamp entry should not fail");
161 for (k, v) in &self.data {
162 if k.as_ref() != "timestamp" {
163 map.serialize_entry(k.as_ref(), v)
164 .expect("serializing data field entry should not fail");
165 }
166 }
167 map.end()
168 .expect("finalizing JSON map serialization should not fail");
169 buf
170 }
171}
172
173#[cfg(test)]
174#[allow(clippy::unwrap_used)]
175mod tests {
176 use chrono::TimeZone;
177
178 use super::*;
179
180 #[test]
181 fn test_event_new() {
182 let event = Event::new("TestEvent");
183 assert_eq!(&*event.event_type, "TestEvent");
184 assert!(event.data.is_empty());
185 }
186
187 #[test]
188 fn test_event_new_from_string() {
189 let event = Event::new("TestEvent".to_string());
190 assert_eq!(&*event.event_type, "TestEvent");
191 }
192
193 #[test]
194 fn test_event_with_timestamp() {
195 let ts = Utc.with_ymd_and_hms(2025, 1, 15, 10, 30, 0).unwrap();
196 let event = Event::new("Test").with_timestamp(ts);
197 assert_eq!(event.timestamp, ts);
198 }
199
200 #[test]
201 fn test_event_with_field() {
202 let event = Event::new("Test")
203 .with_field("name", "value")
204 .with_field("count", 42i64);
205
206 assert_eq!(event.data.len(), 2);
207 assert_eq!(event.get("name"), Some(&Value::Str("value".into())));
208 assert_eq!(event.get("count"), Some(&Value::Int(42)));
209 }
210
211 #[test]
212 fn test_event_get_float() {
213 let event = Event::new("Test")
214 .with_field("price", 19.99f64)
215 .with_field("quantity", 5i64);
216
217 assert_eq!(event.get_float("price"), Some(19.99));
218 assert_eq!(event.get_float("quantity"), Some(5.0));
219 assert_eq!(event.get_float("missing"), None);
220 }
221
222 #[test]
223 fn test_event_get_int() {
224 let event = Event::new("Test")
225 .with_field("count", 42i64)
226 .with_field("ratio", 3.7f64);
227
228 assert_eq!(event.get_int("count"), Some(42));
229 assert_eq!(event.get_int("ratio"), Some(3));
230 assert_eq!(event.get_int("missing"), None);
231 }
232
233 #[test]
234 fn test_event_get_str() {
235 let event = Event::new("Test").with_field("name", "Alice");
236 assert_eq!(event.get_str("name"), Some("Alice"));
237 assert_eq!(event.get_str("missing"), None);
238 }
239
240 #[test]
241 fn test_event_overwrite_field() {
242 let event = Event::new("Test")
243 .with_field("key", "first")
244 .with_field("key", "second");
245
246 assert_eq!(event.get_str("key"), Some("second"));
247 assert_eq!(event.data.len(), 1);
248 }
249
250 #[test]
251 fn test_event_new_at_avoids_now() {
252 let ts = Utc.with_ymd_and_hms(2020, 6, 15, 12, 0, 0).unwrap();
253 let event = Event::new_at("Sensor", ts);
254 assert_eq!(&*event.event_type, "Sensor");
255 assert_eq!(event.timestamp, ts);
256 assert!(event.data.is_empty());
257 }
258
259 #[test]
260 fn test_event_with_capacity_preallocates() {
261 let event = Event::with_capacity("BigEvent", 10)
262 .with_field("a", 1i64)
263 .with_field("b", 2i64);
264 assert_eq!(event.data.len(), 2);
265 assert!(event.data.capacity() >= 10);
267 }
268
269 #[test]
270 fn test_event_with_capacity_at() {
271 let ts = Utc.with_ymd_and_hms(2025, 3, 1, 0, 0, 0).unwrap();
272 let event = Event::with_capacity_at("Batch", 5, ts);
273 assert_eq!(&*event.event_type, "Batch");
274 assert_eq!(event.timestamp, ts);
275 assert!(event.data.capacity() >= 5);
276 }
277
278 #[test]
279 fn test_event_from_fields() {
280 let mut data: FxIndexMap<Arc<str>, Value> = IndexMap::with_hasher(FxBuildHasher);
281 data.insert(Arc::from("x"), Value::Float(1.5));
282 data.insert(Arc::from("y"), Value::Float(2.5));
283 let event = Event::from_fields("Point", data);
284 assert_eq!(&*event.event_type, "Point");
285 assert_eq!(event.get_float("x"), Some(1.5));
286 assert_eq!(event.get_float("y"), Some(2.5));
287 }
288
289 #[test]
290 fn test_event_from_string_fields_converts_keys() {
291 let mut data: FxIndexMap<String, Value> = IndexMap::with_hasher(FxBuildHasher);
292 data.insert("name".to_string(), Value::str("Alice"));
293 let event = Event::from_string_fields("User", data);
294 assert_eq!(event.get_str("name"), Some("Alice"));
295 }
296
297 #[test]
298 fn test_event_to_sink_payload_json() {
299 let ts = Utc.with_ymd_and_hms(2025, 1, 1, 0, 0, 0).unwrap();
300 let event = Event::new("Order")
301 .with_timestamp(ts)
302 .with_field("amount", 99i64);
303 let payload = event.to_sink_payload();
304 let json: serde_json::Value = serde_json::from_slice(&payload).unwrap();
305 assert_eq!(json["event_type"], "Order");
306 assert_eq!(json["amount"], 99);
307 assert!(json.get("timestamp").is_some());
309 }
310}