Skip to main content

a3s_event/
cloudevents.rs

//! CloudEvents v1.0 envelope for A3S events
//!
//! Provides a `CloudEvent` struct conforming to the CloudEvents v1.0 specification,
//! with lossless conversion to/from the internal `Event` type. A3S-specific fields
//! are stored as extension attributes with the `a3s` prefix.
//!
//! Manual implementation — no dependency on `cloudevents-sdk`.
8
9use crate::error::{EventError, Result};
10use crate::types::Event;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13
/// CloudEvents specification version.
///
/// Written into the `specversion` attribute of every envelope built by
/// `CloudEvent::new` and by the `From<Event>` conversion.
pub const SPEC_VERSION: &str = "1.0";

/// Default data content type for A3S events.
///
/// Used as `datacontenttype` by `CloudEvent::with_data` and by the
/// `From<Event>` conversion.
pub const DEFAULT_DATA_CONTENT_TYPE: &str = "application/json";
19
20/// CloudEvents v1.0 envelope
21///
22/// Required attributes: `specversion`, `id`, `source`, `type`.
23/// Optional attributes: `datacontenttype`, `dataschema`, `subject`, `time`.
24/// Extension attributes stored in `extensions`.
25#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
26#[serde(rename_all = "camelCase")]
27pub struct CloudEvent {
28    // ── Required attributes ──
29
30    /// CloudEvents spec version (always "1.0")
31    pub specversion: String,
32
33    /// Event identifier (maps to Event.id)
34    pub id: String,
35
36    /// Event source (maps to Event.source)
37    pub source: String,
38
39    /// Event type (maps to Event.event_type, or "a3s.event" for untyped)
40    #[serde(rename = "type")]
41    pub event_type: String,
42
43    // ── Optional attributes ──
44
45    /// Data content type
46    #[serde(default, skip_serializing_if = "Option::is_none")]
47    pub datacontenttype: Option<String>,
48
49    /// Data schema URI
50    #[serde(default, skip_serializing_if = "Option::is_none")]
51    pub dataschema: Option<String>,
52
53    /// Event subject (maps to Event.subject)
54    #[serde(default, skip_serializing_if = "Option::is_none")]
55    pub subject: Option<String>,
56
57    /// Timestamp in RFC 3339 format
58    #[serde(default, skip_serializing_if = "Option::is_none")]
59    pub time: Option<String>,
60
61    // ── Data ──
62
63    /// Event payload
64    #[serde(default, skip_serializing_if = "Option::is_none")]
65    pub data: Option<serde_json::Value>,
66
67    // ── Extension attributes ──
68
69    /// Extension attributes (includes a3s-prefixed fields)
70    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
71    pub extensions: HashMap<String, serde_json::Value>,
72}
73
74impl CloudEvent {
75    /// Create a new CloudEvent with required attributes
76    pub fn new(
77        id: impl Into<String>,
78        source: impl Into<String>,
79        event_type: impl Into<String>,
80    ) -> Self {
81        Self {
82            specversion: SPEC_VERSION.to_string(),
83            id: id.into(),
84            source: source.into(),
85            event_type: event_type.into(),
86            datacontenttype: None,
87            dataschema: None,
88            subject: None,
89            time: None,
90            data: None,
91            extensions: HashMap::new(),
92        }
93    }
94
95    /// Set the data payload
96    pub fn with_data(mut self, data: serde_json::Value) -> Self {
97        self.datacontenttype = Some(DEFAULT_DATA_CONTENT_TYPE.to_string());
98        self.data = Some(data);
99        self
100    }
101
102    /// Set the subject
103    pub fn with_subject(mut self, subject: impl Into<String>) -> Self {
104        self.subject = Some(subject.into());
105        self
106    }
107
108    /// Set the time
109    pub fn with_time(mut self, time: impl Into<String>) -> Self {
110        self.time = Some(time.into());
111        self
112    }
113
114    /// Add an extension attribute
115    pub fn with_extension(
116        mut self,
117        key: impl Into<String>,
118        value: impl Into<serde_json::Value>,
119    ) -> Self {
120        self.extensions.insert(key.into(), value.into());
121        self
122    }
123}
124
125/// Convert an A3S Event to a CloudEvent (lossless)
126///
127/// A3S-specific fields are stored as extension attributes:
128/// - `a3scategory` — Event.category
129/// - `a3sversion` — Event.version
130/// - `a3ssummary` — Event.summary
131/// - `a3stimestamp` — Event.timestamp (Unix ms)
132/// - `a3smeta_<key>` — each metadata entry
133impl From<Event> for CloudEvent {
134    fn from(event: Event) -> Self {
135        let event_type = if event.event_type.is_empty() {
136            "a3s.event".to_string()
137        } else {
138            event.event_type.clone()
139        };
140
141        let time = millis_to_rfc3339(event.timestamp);
142
143        let mut ce = CloudEvent {
144            specversion: SPEC_VERSION.to_string(),
145            id: event.id.clone(),
146            source: event.source.clone(),
147            event_type,
148            datacontenttype: Some(DEFAULT_DATA_CONTENT_TYPE.to_string()),
149            dataschema: None,
150            subject: Some(event.subject.clone()),
151            time: Some(time),
152            data: Some(event.payload.clone()),
153            extensions: HashMap::new(),
154        };
155
156        // Store A3S-specific fields as extensions
157        ce.extensions.insert(
158            "a3scategory".to_string(),
159            serde_json::Value::String(event.category.clone()),
160        );
161        ce.extensions.insert(
162            "a3sversion".to_string(),
163            serde_json::Value::Number(event.version.into()),
164        );
165        ce.extensions.insert(
166            "a3ssummary".to_string(),
167            serde_json::Value::String(event.summary.clone()),
168        );
169        ce.extensions.insert(
170            "a3stimestamp".to_string(),
171            serde_json::Value::Number(event.timestamp.into()),
172        );
173
174        // Store original event_type if it was set
175        if !event.event_type.is_empty() {
176            ce.extensions.insert(
177                "a3seventtype".to_string(),
178                serde_json::Value::String(event.event_type),
179            );
180        }
181
182        // Store metadata as prefixed extensions
183        for (key, value) in &event.metadata {
184            ce.extensions.insert(
185                format!("a3smeta_{}", key),
186                serde_json::Value::String(value.clone()),
187            );
188        }
189
190        ce
191    }
192}
193
194/// Convert a CloudEvent back to an A3S Event
195///
196/// Extracts A3S-specific fields from extension attributes.
197/// Fails if required A3S extensions are missing.
198impl TryFrom<CloudEvent> for Event {
199    type Error = EventError;
200
201    fn try_from(ce: CloudEvent) -> Result<Self> {
202        let category = ce
203            .extensions
204            .get("a3scategory")
205            .and_then(|v| v.as_str())
206            .unwrap_or("")
207            .to_string();
208
209        let version = ce
210            .extensions
211            .get("a3sversion")
212            .and_then(|v| v.as_u64())
213            .unwrap_or(1) as u32;
214
215        let summary = ce
216            .extensions
217            .get("a3ssummary")
218            .and_then(|v| v.as_str())
219            .unwrap_or("")
220            .to_string();
221
222        let timestamp = ce
223            .extensions
224            .get("a3stimestamp")
225            .and_then(|v| v.as_u64())
226            .unwrap_or_else(|| {
227                // Fall back to parsing RFC 3339 time
228                ce.time
229                    .as_deref()
230                    .and_then(rfc3339_to_millis)
231                    .unwrap_or(0)
232            });
233
234        // Recover original event_type
235        let event_type = ce
236            .extensions
237            .get("a3seventtype")
238            .and_then(|v| v.as_str())
239            .map(|s| s.to_string())
240            .unwrap_or_else(|| {
241                if ce.event_type == "a3s.event" {
242                    String::new()
243                } else {
244                    ce.event_type.clone()
245                }
246            });
247
248        let subject = ce.subject.unwrap_or_default();
249        let payload = ce.data.unwrap_or(serde_json::Value::Null);
250
251        // Recover metadata from a3smeta_ prefixed extensions
252        let mut metadata = HashMap::new();
253        for (key, value) in &ce.extensions {
254            if let Some(meta_key) = key.strip_prefix("a3smeta_") {
255                if let Some(meta_val) = value.as_str() {
256                    metadata.insert(meta_key.to_string(), meta_val.to_string());
257                }
258            }
259        }
260
261        Ok(Event {
262            id: ce.id,
263            subject,
264            category,
265            event_type,
266            version,
267            payload,
268            summary,
269            source: ce.source,
270            timestamp,
271            metadata,
272        })
273    }
274}
275
276/// Convert Unix milliseconds to RFC 3339 string
277fn millis_to_rfc3339(millis: u64) -> String {
278    let secs = (millis / 1000) as i64;
279    let nanos = ((millis % 1000) * 1_000_000) as u32;
280    let dt = chrono::DateTime::from_timestamp(secs, nanos);
281    match dt {
282        Some(dt) => dt.to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
283        None => String::new(),
284    }
285}
286
287/// Parse RFC 3339 string to Unix milliseconds
288fn rfc3339_to_millis(s: &str) -> Option<u64> {
289    let dt = chrono::DateTime::parse_from_rfc3339(s).ok()?;
290    Some(dt.timestamp_millis() as u64)
291}
292
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` fills the required attributes and leaves everything else unset.
    #[test]
    fn test_cloudevent_creation() {
        let ce = CloudEvent::new("evt-123", "test-source", "test.type");
        assert_eq!(ce.specversion, "1.0");
        assert_eq!(ce.id, "evt-123");
        assert_eq!(ce.source, "test-source");
        assert_eq!(ce.event_type, "test.type");
        assert!(ce.datacontenttype.is_none());
        assert!(ce.dataschema.is_none());
        assert!(ce.subject.is_none());
        assert!(ce.time.is_none());
        assert!(ce.data.is_none());
        assert!(ce.extensions.is_empty());
    }

    /// Each builder method sets exactly the attribute it names.
    #[test]
    fn test_cloudevent_builder() {
        let ce = CloudEvent::new("evt-1", "src", "type.a")
            .with_data(serde_json::json!({"key": "value"}))
            .with_subject("events.test.a")
            .with_time("2024-01-01T00:00:00.000Z")
            .with_extension("custom", serde_json::json!("ext-value"));

        assert_eq!(ce.subject.as_deref(), Some("events.test.a"));
        assert_eq!(ce.data.as_ref().unwrap()["key"], "value");
        assert_eq!(
            ce.datacontenttype.as_deref(),
            Some("application/json")
        );
        assert_eq!(ce.time.as_deref(), Some("2024-01-01T00:00:00.000Z"));
        assert_eq!(ce.extensions["custom"], "ext-value");
        assert_eq!(ce.extensions.len(), 1);
    }

    /// Serializing and parsing back yields an equal envelope.
    #[test]
    fn test_cloudevent_serialization_roundtrip() {
        let ce = CloudEvent::new("evt-1", "src", "type.a")
            .with_data(serde_json::json!({"rate": 7.35}))
            .with_subject("events.market.forex");

        let json = serde_json::to_string(&ce).unwrap();
        assert!(json.contains("\"specversion\":\"1.0\""));
        assert!(json.contains("\"type\":\"type.a\""));

        let parsed: CloudEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed, ce);
    }

    /// A typed event maps onto core attributes plus `a3s*` extensions.
    #[test]
    fn test_event_to_cloudevent_typed() {
        let event = Event::typed(
            "events.market.forex",
            "market",
            "forex.rate_change",
            2,
            "USD/CNY rate change",
            "reuters",
            serde_json::json!({"rate": 7.35}),
        )
        .with_metadata("region", "asia");

        let ce: CloudEvent = event.clone().into();

        assert_eq!(ce.specversion, "1.0");
        assert_eq!(ce.id, event.id);
        assert_eq!(ce.source, "reuters");
        assert_eq!(ce.event_type, "forex.rate_change");
        assert_eq!(ce.subject.as_deref(), Some("events.market.forex"));
        assert_eq!(ce.datacontenttype.as_deref(), Some("application/json"));
        assert_eq!(ce.data.as_ref().unwrap()["rate"], 7.35);
        assert_eq!(ce.extensions["a3scategory"], "market");
        assert_eq!(ce.extensions["a3sversion"], 2);
        assert_eq!(ce.extensions["a3ssummary"], "USD/CNY rate change");
        assert_eq!(ce.extensions["a3seventtype"], "forex.rate_change");
        assert_eq!(ce.extensions["a3stimestamp"], event.timestamp);
        assert_eq!(ce.extensions["a3smeta_region"], "asia");
        assert!(ce.time.is_some());
    }

    /// An untyped event gets the generic type and no `a3seventtype`.
    #[test]
    fn test_event_to_cloudevent_untyped() {
        let event = Event::new(
            "events.test.a",
            "test",
            "Test event",
            "test-src",
            serde_json::json!({}),
        );

        let ce: CloudEvent = event.into();
        assert_eq!(ce.event_type, "a3s.event");
        // Untyped events should NOT have a3seventtype extension
        assert!(!ce.extensions.contains_key("a3seventtype"));
    }

    /// Event -> CloudEvent -> Event preserves every field of a typed event.
    #[test]
    fn test_cloudevent_to_event_roundtrip_typed() {
        let original = Event::typed(
            "events.market.forex",
            "market",
            "forex.rate_change",
            2,
            "Rate change",
            "reuters",
            serde_json::json!({"rate": 7.35}),
        )
        .with_metadata("env", "prod");

        let ce: CloudEvent = original.clone().into();
        let recovered: Event = ce.try_into().unwrap();

        assert_eq!(recovered.id, original.id);
        assert_eq!(recovered.subject, original.subject);
        assert_eq!(recovered.category, original.category);
        assert_eq!(recovered.event_type, original.event_type);
        assert_eq!(recovered.version, original.version);
        assert_eq!(recovered.summary, original.summary);
        assert_eq!(recovered.source, original.source);
        assert_eq!(recovered.timestamp, original.timestamp);
        assert_eq!(recovered.payload, original.payload);
        assert_eq!(recovered.metadata["env"], "prod");
        assert_eq!(recovered.metadata, original.metadata);
    }

    /// Event -> CloudEvent -> Event preserves an untyped event, including the
    /// empty event type.
    #[test]
    fn test_cloudevent_to_event_roundtrip_untyped() {
        let original = Event::new(
            "events.test.a",
            "test",
            "Test",
            "src",
            serde_json::json!({"key": "val"}),
        );

        let ce: CloudEvent = original.clone().into();
        let recovered: Event = ce.try_into().unwrap();

        assert_eq!(recovered.id, original.id);
        assert_eq!(recovered.event_type, ""); // restored to empty
        assert_eq!(recovered.version, 1);
        assert_eq!(recovered.subject, original.subject);
        assert_eq!(recovered.category, original.category);
        assert_eq!(recovered.summary, original.summary);
        assert_eq!(recovered.source, original.source);
        assert_eq!(recovered.timestamp, original.timestamp);
        assert_eq!(recovered.payload, original.payload);
    }

    /// A CloudEvent without any `a3s*` extensions converts using defaults.
    #[test]
    fn test_cloudevent_from_external_source() {
        // Simulate a CloudEvent not created from an A3S Event
        let ce = CloudEvent::new("ext-1", "external-service", "com.example.order.created")
            .with_data(serde_json::json!({"order_id": "ORD-123"}))
            .with_subject("orders")
            .with_time("2024-06-15T10:30:00.000Z");

        let event: Event = ce.try_into().unwrap();

        assert_eq!(event.id, "ext-1");
        assert_eq!(event.source, "external-service");
        assert_eq!(event.event_type, "com.example.order.created");
        assert_eq!(event.subject, "orders");
        assert_eq!(event.category, ""); // no a3scategory extension
        assert_eq!(event.summary, ""); // no a3ssummary extension
        assert_eq!(event.version, 1); // default
        assert_eq!(event.payload["order_id"], "ORD-123");
        // No a3stimestamp extension: the timestamp comes from `time`.
        assert_eq!(event.timestamp, 1_718_447_400_000);
        assert!(event.metadata.is_empty());
    }

    /// Missing `data` becomes a JSON null payload.
    #[test]
    fn test_cloudevent_no_data() {
        let ce = CloudEvent::new("evt-1", "src", "type.a");
        let event: Event = ce.try_into().unwrap();
        assert_eq!(event.payload, serde_json::Value::Null);
        assert_eq!(event.subject, "");
        assert_eq!(event.timestamp, 0); // neither a3stimestamp nor time
    }

    /// An unparseable `time` without `a3stimestamp` falls back to 0.
    #[test]
    fn test_cloudevent_unparseable_time_defaults_timestamp() {
        let ce = CloudEvent::new("evt-1", "src", "type.a").with_time("not-a-timestamp");
        let event: Event = ce.try_into().unwrap();
        assert_eq!(event.timestamp, 0);
    }

    /// Only string-valued `a3smeta_` extensions become metadata.
    #[test]
    fn test_cloudevent_non_string_metadata_ignored() {
        let ce = CloudEvent::new("evt-1", "src", "type.a")
            .with_extension("a3smeta_env", serde_json::json!("prod"))
            .with_extension("a3smeta_count", serde_json::json!(3))
            .with_extension("unrelated", serde_json::json!("x"));

        let event: Event = ce.try_into().unwrap();

        assert_eq!(event.metadata.len(), 1);
        assert_eq!(event.metadata["env"], "prod");
    }

    #[test]
    fn test_millis_to_rfc3339() {
        let time = millis_to_rfc3339(1700000000000);
        assert_eq!(time, "2023-11-14T22:13:20.000Z");
    }

    #[test]
    fn test_rfc3339_to_millis() {
        let millis = rfc3339_to_millis("2023-11-14T22:13:20.000Z").unwrap();
        assert_eq!(millis, 1700000000000);
    }

    /// A non-UTC offset is normalized to the same instant.
    #[test]
    fn test_rfc3339_to_millis_with_offset() {
        let millis = rfc3339_to_millis("2023-11-14T23:13:20.000+01:00").unwrap();
        assert_eq!(millis, 1700000000000);
    }

    #[test]
    fn test_rfc3339_to_millis_invalid() {
        assert!(rfc3339_to_millis("not-a-timestamp").is_none());
        assert!(rfc3339_to_millis("").is_none());
    }

    #[test]
    fn test_rfc3339_roundtrip() {
        let original_ms = 1700000000123u64;
        let rfc = millis_to_rfc3339(original_ms);
        let recovered = rfc3339_to_millis(&rfc).unwrap();
        assert_eq!(recovered, original_ms);
    }

    #[test]
    fn test_cloudevent_multiple_metadata_roundtrip() {
        let original = Event::new(
            "events.test.a",
            "test",
            "Test",
            "src",
            serde_json::json!({}),
        )
        .with_metadata("key1", "val1")
        .with_metadata("key2", "val2")
        .with_metadata("key3", "val3");

        let ce: CloudEvent = original.clone().into();
        let recovered: Event = ce.try_into().unwrap();

        assert_eq!(recovered.metadata.len(), 3);
        assert_eq!(recovered.metadata["key1"], "val1");
        assert_eq!(recovered.metadata["key2"], "val2");
        assert_eq!(recovered.metadata["key3"], "val3");
    }

    /// Required attributes and `data` appear at the top level of the JSON.
    #[test]
    fn test_cloudevent_json_wire_format() {
        let ce = CloudEvent::new("evt-wire", "a3s", "a3s.gateway.scale.up")
            .with_data(serde_json::json!({"replicas": 3}))
            .with_subject("scaling.gateway");

        let json = serde_json::to_value(&ce).unwrap();
        // Verify CE required fields present at top level
        assert_eq!(json["specversion"], "1.0");
        assert_eq!(json["id"], "evt-wire");
        assert_eq!(json["source"], "a3s");
        assert_eq!(json["type"], "a3s.gateway.scale.up");
        assert_eq!(json["subject"], "scaling.gateway");
        assert_eq!(json["datacontenttype"], "application/json");
        assert_eq!(json["data"]["replicas"], 3);
        // Unset optional attributes are omitted rather than written as null.
        assert!(json.get("time").is_none());
        assert!(json.get("dataschema").is_none());
    }

    #[test]
    fn test_spec_version_constant() {
        assert_eq!(SPEC_VERSION, "1.0");
        assert_eq!(DEFAULT_DATA_CONTENT_TYPE, "application/json");
    }
}