1use crate::error::{EventError, Result};
10use crate::types::Event;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13
/// CloudEvents specification version written into the `specversion`
/// attribute of every [`CloudEvent`] constructed by this module.
pub const SPEC_VERSION: &str = "1.0";

/// Content type stored in `datacontenttype` whenever this module attaches a
/// JSON payload to a [`CloudEvent`].
pub const DEFAULT_DATA_CONTENT_TYPE: &str = "application/json";
19
/// A CloudEvents envelope that can be serialized to and from JSON via serde.
///
/// The first four fields are always present; the remaining attributes are
/// optional and are left out of the serialized form when unset (or, for
/// `extensions`, when the map is empty).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
// Every field name below is a single lowercase word (or is renamed
// explicitly), so this rule currently leaves all serialized names unchanged.
#[serde(rename_all = "camelCase")]
pub struct CloudEvent {
    /// Specification version; the constructors in this module set it to
    /// [`SPEC_VERSION`].
    pub specversion: String,

    /// Identifier of the event.
    pub id: String,

    /// Origin of the event.
    pub source: String,

    /// Event type, serialized under the JSON key `type`.
    #[serde(rename = "type")]
    pub event_type: String,

    /// Content type of `data`; omitted from the output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub datacontenttype: Option<String>,

    /// Schema reference for `data`; omitted from the output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub dataschema: Option<String>,

    /// Subject of the event; omitted from the output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub subject: Option<String>,

    /// Timestamp string; omitted from the output when `None`. The conversions
    /// in this file write and read it as RFC 3339.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub time: Option<String>,

    /// JSON payload; omitted from the output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub data: Option<serde_json::Value>,

    /// Extension attributes keyed by name. Serialized as a nested
    /// `extensions` object and omitted entirely when the map is empty.
    // NOTE(review): the CloudEvents JSON format appears to place extension
    // attributes at the top level of the event object; confirm whether the
    // nested layout is intentional before exchanging events with other systems.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub extensions: HashMap<String, serde_json::Value>,
}
73
74impl CloudEvent {
75 pub fn new(
77 id: impl Into<String>,
78 source: impl Into<String>,
79 event_type: impl Into<String>,
80 ) -> Self {
81 Self {
82 specversion: SPEC_VERSION.to_string(),
83 id: id.into(),
84 source: source.into(),
85 event_type: event_type.into(),
86 datacontenttype: None,
87 dataschema: None,
88 subject: None,
89 time: None,
90 data: None,
91 extensions: HashMap::new(),
92 }
93 }
94
95 pub fn with_data(mut self, data: serde_json::Value) -> Self {
97 self.datacontenttype = Some(DEFAULT_DATA_CONTENT_TYPE.to_string());
98 self.data = Some(data);
99 self
100 }
101
102 pub fn with_subject(mut self, subject: impl Into<String>) -> Self {
104 self.subject = Some(subject.into());
105 self
106 }
107
108 pub fn with_time(mut self, time: impl Into<String>) -> Self {
110 self.time = Some(time.into());
111 self
112 }
113
114 pub fn with_extension(
116 mut self,
117 key: impl Into<String>,
118 value: impl Into<serde_json::Value>,
119 ) -> Self {
120 self.extensions.insert(key.into(), value.into());
121 self
122 }
123}
124
125impl From<Event> for CloudEvent {
134 fn from(event: Event) -> Self {
135 let event_type = if event.event_type.is_empty() {
136 "a3s.event".to_string()
137 } else {
138 event.event_type.clone()
139 };
140
141 let time = millis_to_rfc3339(event.timestamp);
142
143 let mut ce = CloudEvent {
144 specversion: SPEC_VERSION.to_string(),
145 id: event.id.clone(),
146 source: event.source.clone(),
147 event_type,
148 datacontenttype: Some(DEFAULT_DATA_CONTENT_TYPE.to_string()),
149 dataschema: None,
150 subject: Some(event.subject.clone()),
151 time: Some(time),
152 data: Some(event.payload.clone()),
153 extensions: HashMap::new(),
154 };
155
156 ce.extensions.insert(
158 "a3scategory".to_string(),
159 serde_json::Value::String(event.category.clone()),
160 );
161 ce.extensions.insert(
162 "a3sversion".to_string(),
163 serde_json::Value::Number(event.version.into()),
164 );
165 ce.extensions.insert(
166 "a3ssummary".to_string(),
167 serde_json::Value::String(event.summary.clone()),
168 );
169 ce.extensions.insert(
170 "a3stimestamp".to_string(),
171 serde_json::Value::Number(event.timestamp.into()),
172 );
173
174 if !event.event_type.is_empty() {
176 ce.extensions.insert(
177 "a3seventtype".to_string(),
178 serde_json::Value::String(event.event_type),
179 );
180 }
181
182 for (key, value) in &event.metadata {
184 ce.extensions.insert(
185 format!("a3smeta_{}", key),
186 serde_json::Value::String(value.clone()),
187 );
188 }
189
190 ce
191 }
192}
193
194impl TryFrom<CloudEvent> for Event {
199 type Error = EventError;
200
201 fn try_from(ce: CloudEvent) -> Result<Self> {
202 let category = ce
203 .extensions
204 .get("a3scategory")
205 .and_then(|v| v.as_str())
206 .unwrap_or("")
207 .to_string();
208
209 let version = ce
210 .extensions
211 .get("a3sversion")
212 .and_then(|v| v.as_u64())
213 .unwrap_or(1) as u32;
214
215 let summary = ce
216 .extensions
217 .get("a3ssummary")
218 .and_then(|v| v.as_str())
219 .unwrap_or("")
220 .to_string();
221
222 let timestamp = ce
223 .extensions
224 .get("a3stimestamp")
225 .and_then(|v| v.as_u64())
226 .unwrap_or_else(|| {
227 ce.time
229 .as_deref()
230 .and_then(rfc3339_to_millis)
231 .unwrap_or(0)
232 });
233
234 let event_type = ce
236 .extensions
237 .get("a3seventtype")
238 .and_then(|v| v.as_str())
239 .map(|s| s.to_string())
240 .unwrap_or_else(|| {
241 if ce.event_type == "a3s.event" {
242 String::new()
243 } else {
244 ce.event_type.clone()
245 }
246 });
247
248 let subject = ce.subject.unwrap_or_default();
249 let payload = ce.data.unwrap_or(serde_json::Value::Null);
250
251 let mut metadata = HashMap::new();
253 for (key, value) in &ce.extensions {
254 if let Some(meta_key) = key.strip_prefix("a3smeta_") {
255 if let Some(meta_val) = value.as_str() {
256 metadata.insert(meta_key.to_string(), meta_val.to_string());
257 }
258 }
259 }
260
261 Ok(Event {
262 id: ce.id,
263 subject,
264 category,
265 event_type,
266 version,
267 payload,
268 summary,
269 source: ce.source,
270 timestamp,
271 metadata,
272 })
273 }
274}
275
276fn millis_to_rfc3339(millis: u64) -> String {
278 let secs = (millis / 1000) as i64;
279 let nanos = ((millis % 1000) * 1_000_000) as u32;
280 let dt = chrono::DateTime::from_timestamp(secs, nanos);
281 match dt {
282 Some(dt) => dt.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
283 None => String::new(),
284 }
285}
286
287fn rfc3339_to_millis(s: &str) -> Option<u64> {
289 let dt = chrono::DateTime::parse_from_rfc3339(s).ok()?;
290 Some(dt.timestamp_millis() as u64)
291}
292
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` fills the required attributes and leaves the optional ones unset.
    #[test]
    fn test_cloudevent_creation() {
        let cloud_event = CloudEvent::new("evt-123", "test-source", "test.type");

        assert_eq!(cloud_event.specversion, "1.0");
        assert_eq!(cloud_event.id, "evt-123");
        assert_eq!(cloud_event.source, "test-source");
        assert_eq!(cloud_event.event_type, "test.type");
        assert!(cloud_event.subject.is_none());
        assert!(cloud_event.data.is_none());
    }

    /// Each builder method sets its own attribute; `with_data` also sets the
    /// content type.
    #[test]
    fn test_cloudevent_builder() {
        let payload = serde_json::json!({"key": "value"});
        let cloud_event = CloudEvent::new("evt-1", "src", "type.a")
            .with_data(payload)
            .with_subject("events.test.a")
            .with_time("2024-01-01T00:00:00.000Z")
            .with_extension("custom", serde_json::json!("ext-value"));

        assert_eq!(cloud_event.subject.as_deref(), Some("events.test.a"));
        let data = cloud_event.data.as_ref().unwrap();
        assert_eq!(data["key"], "value");
        assert_eq!(
            cloud_event.datacontenttype.as_deref(),
            Some("application/json")
        );
        assert_eq!(
            cloud_event.time.as_deref(),
            Some("2024-01-01T00:00:00.000Z")
        );
        assert_eq!(cloud_event.extensions["custom"], "ext-value");
    }

    /// Serializing and then deserializing yields an equal value.
    #[test]
    fn test_cloudevent_serialization_roundtrip() {
        let before = CloudEvent::new("evt-1", "src", "type.a")
            .with_data(serde_json::json!({"rate": 7.35}))
            .with_subject("events.market.forex");

        let encoded = serde_json::to_string(&before).unwrap();
        assert!(encoded.contains("\"specversion\":\"1.0\""));
        assert!(encoded.contains("\"type\":\"type.a\""));

        let after: CloudEvent = serde_json::from_str(&encoded).unwrap();
        assert_eq!(after, before);
    }

    /// A typed event maps onto the standard attributes plus `a3s*` extensions.
    #[test]
    fn test_event_to_cloudevent_typed() {
        let source_event = Event::typed(
            "events.market.forex",
            "market",
            "forex.rate_change",
            2,
            "USD/CNY rate change",
            "reuters",
            serde_json::json!({"rate": 7.35}),
        )
        .with_metadata("region", "asia");

        let cloud_event = CloudEvent::from(source_event.clone());

        assert_eq!(cloud_event.specversion, "1.0");
        assert_eq!(cloud_event.id, source_event.id);
        assert_eq!(cloud_event.source, "reuters");
        assert_eq!(cloud_event.event_type, "forex.rate_change");
        assert_eq!(
            cloud_event.subject.as_deref(),
            Some("events.market.forex")
        );
        assert_eq!(cloud_event.data.as_ref().unwrap()["rate"], 7.35);
        assert_eq!(cloud_event.extensions["a3scategory"], "market");
        assert_eq!(cloud_event.extensions["a3sversion"], 2);
        assert_eq!(
            cloud_event.extensions["a3ssummary"],
            "USD/CNY rate change"
        );
        assert_eq!(cloud_event.extensions["a3smeta_region"], "asia");
        assert!(cloud_event.time.is_some());
    }

    /// An untyped event gets the placeholder type and no `a3seventtype` entry.
    #[test]
    fn test_event_to_cloudevent_untyped() {
        let source_event = Event::new(
            "events.test.a",
            "test",
            "Test event",
            "test-src",
            serde_json::json!({}),
        );

        let cloud_event = CloudEvent::from(source_event);

        assert_eq!(cloud_event.event_type, "a3s.event");
        assert!(!cloud_event.extensions.contains_key("a3seventtype"));
    }

    /// Event -> CloudEvent -> Event preserves every field of a typed event.
    #[test]
    fn test_cloudevent_to_event_roundtrip_typed() {
        let before = Event::typed(
            "events.market.forex",
            "market",
            "forex.rate_change",
            2,
            "Rate change",
            "reuters",
            serde_json::json!({"rate": 7.35}),
        )
        .with_metadata("env", "prod");

        let envelope = CloudEvent::from(before.clone());
        let after = Event::try_from(envelope).unwrap();

        assert_eq!(after.id, before.id);
        assert_eq!(after.subject, before.subject);
        assert_eq!(after.category, before.category);
        assert_eq!(after.event_type, before.event_type);
        assert_eq!(after.version, before.version);
        assert_eq!(after.summary, before.summary);
        assert_eq!(after.source, before.source);
        assert_eq!(after.timestamp, before.timestamp);
        assert_eq!(after.payload, before.payload);
        assert_eq!(after.metadata["env"], "prod");
    }

    /// An untyped event comes back with an empty type and version 1.
    #[test]
    fn test_cloudevent_to_event_roundtrip_untyped() {
        let before = Event::new(
            "events.test.a",
            "test",
            "Test",
            "src",
            serde_json::json!({"key": "val"}),
        );

        let envelope = CloudEvent::from(before.clone());
        let after = Event::try_from(envelope).unwrap();

        assert_eq!(after.id, before.id);
        assert_eq!(after.event_type, "");
        assert_eq!(after.version, 1);
    }

    /// A CloudEvent without any `a3s*` extensions converts using defaults.
    #[test]
    fn test_cloudevent_from_external_source() {
        let external = CloudEvent::new("ext-1", "external-service", "com.example.order.created")
            .with_data(serde_json::json!({"order_id": "ORD-123"}))
            .with_subject("orders")
            .with_time("2024-06-15T10:30:00.000Z");

        let converted = Event::try_from(external).unwrap();

        assert_eq!(converted.id, "ext-1");
        assert_eq!(converted.source, "external-service");
        assert_eq!(converted.event_type, "com.example.order.created");
        assert_eq!(converted.subject, "orders");
        assert_eq!(converted.category, "");
        assert_eq!(converted.version, 1);
        assert_eq!(converted.payload["order_id"], "ORD-123");
    }

    /// A missing `data` attribute becomes a JSON `null` payload.
    #[test]
    fn test_cloudevent_no_data() {
        let bare = CloudEvent::new("evt-1", "src", "type.a");

        let converted = Event::try_from(bare).unwrap();

        assert_eq!(converted.payload, serde_json::Value::Null);
    }

    /// A known millisecond value renders on the expected calendar date.
    #[test]
    fn test_millis_to_rfc3339() {
        let rendered = millis_to_rfc3339(1700000000000);
        assert!(rendered.contains("2023-11-14"));
    }

    /// A known RFC 3339 string parses to the expected millisecond value.
    #[test]
    fn test_rfc3339_to_millis() {
        let parsed = rfc3339_to_millis("2023-11-14T22:13:20.000Z").unwrap();
        assert_eq!(parsed, 1700000000000);
    }

    /// Formatting then parsing keeps millisecond precision.
    #[test]
    fn test_rfc3339_roundtrip() {
        let start_ms = 1700000000123u64;

        let rendered = millis_to_rfc3339(start_ms);
        let end_ms = rfc3339_to_millis(&rendered).unwrap();

        assert_eq!(end_ms, start_ms);
    }

    /// All metadata pairs survive the round trip through the envelope.
    #[test]
    fn test_cloudevent_multiple_metadata_roundtrip() {
        let before = Event::new(
            "events.test.a",
            "test",
            "Test",
            "src",
            serde_json::json!({}),
        )
        .with_metadata("key1", "val1")
        .with_metadata("key2", "val2")
        .with_metadata("key3", "val3");

        let envelope = CloudEvent::from(before.clone());
        let after = Event::try_from(envelope).unwrap();

        assert_eq!(after.metadata.len(), 3);
        assert_eq!(after.metadata["key1"], "val1");
        assert_eq!(after.metadata["key2"], "val2");
        assert_eq!(after.metadata["key3"], "val3");
    }

    /// The serialized JSON uses the CloudEvents attribute names.
    #[test]
    fn test_cloudevent_json_wire_format() {
        let cloud_event = CloudEvent::new("evt-wire", "a3s", "a3s.gateway.scale.up")
            .with_data(serde_json::json!({"replicas": 3}))
            .with_subject("scaling.gateway");

        let wire = serde_json::to_value(&cloud_event).unwrap();

        assert_eq!(wire["specversion"], "1.0");
        assert_eq!(wire["id"], "evt-wire");
        assert_eq!(wire["source"], "a3s");
        assert_eq!(wire["type"], "a3s.gateway.scale.up");
        assert_eq!(wire["data"]["replicas"], 3);
    }

    /// Guards the literal values of the public constants.
    #[test]
    fn test_spec_version_constant() {
        assert_eq!(SPEC_VERSION, "1.0");
        assert_eq!(DEFAULT_DATA_CONTENT_TYPE, "application/json");
    }
}