evidentsource_client/conversions/
events.rs

1//! Event and Transaction type conversions.
2
3use evidentsource_core::domain::{
4    Event, EventData, ExtensionValue, ProspectiveEvent, Transaction, TransactionSummary,
5};
6
7use crate::com::evidentsource as proto;
8use crate::io::cloudevents::v1 as proto_ce;
9
10use super::error::ConversionError;
11
12// =============================================================================
13// Event: Domain -> Proto
14// =============================================================================
15
16impl From<Event> for proto_ce::CloudEvent {
17    fn from(event: Event) -> Self {
18        use proto_ce::cloud_event::cloud_event_attribute_value::Attr;
19        use proto_ce::cloud_event::CloudEventAttributeValue;
20
21        let mut attributes = std::collections::HashMap::new();
22
23        // Add optional attributes
24        if let Some(subject) = event.subject {
25            attributes.insert(
26                "subject".to_string(),
27                CloudEventAttributeValue {
28                    attr: Some(Attr::CeString(subject)),
29                },
30            );
31        }
32
33        if let Some(time) = event.time {
34            attributes.insert(
35                "time".to_string(),
36                CloudEventAttributeValue {
37                    attr: Some(Attr::CeTimestamp(prost_types::Timestamp {
38                        seconds: time.timestamp(),
39                        nanos: time.timestamp_subsec_nanos() as i32,
40                    })),
41                },
42            );
43        }
44
45        if let Some(datacontenttype) = event.datacontenttype {
46            attributes.insert(
47                "datacontenttype".to_string(),
48                CloudEventAttributeValue {
49                    attr: Some(Attr::CeString(datacontenttype)),
50                },
51            );
52        }
53
54        if let Some(dataschema) = event.dataschema {
55            attributes.insert(
56                "dataschema".to_string(),
57                CloudEventAttributeValue {
58                    attr: Some(Attr::CeUri(dataschema)),
59                },
60            );
61        }
62
63        // Add extension attributes
64        for (key, value) in event.extensions {
65            let attr = match value {
66                ExtensionValue::String(s) => Some(Attr::CeString(s)),
67                ExtensionValue::Boolean(b) => Some(Attr::CeBoolean(b)),
68                ExtensionValue::Integer(i) => Some(Attr::CeInteger(i as i32)),
69            };
70            if let Some(attr) = attr {
71                attributes.insert(key, CloudEventAttributeValue { attr: Some(attr) });
72            }
73        }
74
75        // Handle data
76        let data = event.data.map(|d| match d {
77            EventData::Binary(bytes) => proto_ce::cloud_event::Data::BinaryData(bytes),
78            EventData::String(s) => proto_ce::cloud_event::Data::TextData(s),
79        });
80
81        proto_ce::CloudEvent {
82            spec_version: "1.0".to_string(),
83            id: event.id,
84            source: event.source,
85            r#type: event.event_type,
86            attributes,
87            data,
88        }
89    }
90}
91
92// =============================================================================
93// ProspectiveEvent: Domain -> Proto
94// =============================================================================
95
96impl From<ProspectiveEvent> for proto_ce::CloudEvent {
97    fn from(event: ProspectiveEvent) -> Self {
98        use proto_ce::cloud_event::cloud_event_attribute_value::Attr;
99        use proto_ce::cloud_event::CloudEventAttributeValue;
100
101        let mut attributes = std::collections::HashMap::new();
102
103        // Add optional attributes
104        if let Some(subject) = event.subject {
105            attributes.insert(
106                "subject".to_string(),
107                CloudEventAttributeValue {
108                    attr: Some(Attr::CeString(subject)),
109                },
110            );
111        }
112
113        if let Some(time) = event.time {
114            attributes.insert(
115                "time".to_string(),
116                CloudEventAttributeValue {
117                    attr: Some(Attr::CeTimestamp(prost_types::Timestamp {
118                        seconds: time.timestamp(),
119                        nanos: time.timestamp_subsec_nanos() as i32,
120                    })),
121                },
122            );
123        }
124
125        if let Some(datacontenttype) = event.datacontenttype {
126            attributes.insert(
127                "datacontenttype".to_string(),
128                CloudEventAttributeValue {
129                    attr: Some(Attr::CeString(datacontenttype)),
130                },
131            );
132        }
133
134        if let Some(dataschema) = event.dataschema {
135            attributes.insert(
136                "dataschema".to_string(),
137                CloudEventAttributeValue {
138                    attr: Some(Attr::CeUri(dataschema)),
139                },
140            );
141        }
142
143        // Add extension attributes
144        for (key, value) in event.extensions {
145            let attr = match value {
146                ExtensionValue::String(s) => Some(Attr::CeString(s)),
147                ExtensionValue::Boolean(b) => Some(Attr::CeBoolean(b)),
148                ExtensionValue::Integer(i) => Some(Attr::CeInteger(i as i32)),
149            };
150            if let Some(attr) = attr {
151                attributes.insert(key, CloudEventAttributeValue { attr: Some(attr) });
152            }
153        }
154
155        // Handle data
156        let data = event.data.map(|d| match d {
157            EventData::Binary(bytes) => proto_ce::cloud_event::Data::BinaryData(bytes),
158            EventData::String(s) => proto_ce::cloud_event::Data::TextData(s),
159        });
160
161        proto_ce::CloudEvent {
162            spec_version: "1.0".to_string(),
163            id: event.id,
164            source: event.stream, // ProspectiveEvent uses stream as source
165            r#type: event.event_type,
166            attributes,
167            data,
168        }
169    }
170}
171
172// =============================================================================
173// Event: Proto -> Domain
174// =============================================================================
175
176impl TryFrom<proto_ce::CloudEvent> for Event {
177    type Error = ConversionError;
178
179    fn try_from(proto: proto_ce::CloudEvent) -> Result<Self, Self::Error> {
180        use proto_ce::cloud_event::cloud_event_attribute_value::Attr;
181
182        let mut subject = None;
183        let mut time = None;
184        let mut datacontenttype = None;
185        let mut dataschema = None;
186        let mut extensions = Vec::new();
187
188        // Process attributes
189        for (key, value) in proto.attributes {
190            if let Some(attr) = value.attr {
191                match (key.as_str(), attr) {
192                    ("subject", Attr::CeString(s)) => {
193                        subject = Some(s);
194                    }
195                    ("time", Attr::CeTimestamp(ts)) => {
196                        time = chrono::DateTime::from_timestamp(ts.seconds, ts.nanos as u32);
197                    }
198                    ("datacontenttype", Attr::CeString(s)) => {
199                        datacontenttype = Some(s);
200                    }
201                    ("dataschema", Attr::CeUri(s)) => {
202                        dataschema = Some(s);
203                    }
204                    (key, Attr::CeString(s)) => {
205                        extensions.push((key.to_string(), ExtensionValue::String(s)));
206                    }
207                    (key, Attr::CeBoolean(b)) => {
208                        extensions.push((key.to_string(), ExtensionValue::Boolean(b)));
209                    }
210                    (key, Attr::CeInteger(i)) => {
211                        extensions.push((key.to_string(), ExtensionValue::Integer(i as i64)));
212                    }
213                    _ => {}
214                }
215            }
216        }
217
218        // Handle data
219        let data = proto.data.map(|d| match d {
220            proto_ce::cloud_event::Data::BinaryData(bytes) => EventData::Binary(bytes),
221            proto_ce::cloud_event::Data::TextData(text) => EventData::String(text),
222            proto_ce::cloud_event::Data::ProtoData(_) => {
223                // Proto data not directly supported - treat as binary
224                EventData::Binary(vec![])
225            }
226        });
227
228        Ok(Event {
229            id: proto.id,
230            source: proto.source,
231            event_type: proto.r#type,
232            subject,
233            data,
234            time,
235            datacontenttype,
236            dataschema,
237            extensions,
238        })
239    }
240}
241
242// =============================================================================
243// Transaction and TransactionSummary: Proto -> Domain
244// =============================================================================
245
246impl TryFrom<proto::Transaction> for Transaction {
247    type Error = ConversionError;
248
249    fn try_from(proto: proto::Transaction) -> Result<Self, Self::Error> {
250        let event_count = proto.events.len();
251        let events: Result<Vec<Event>, _> = proto.events.into_iter().map(Event::try_from).collect();
252
253        Ok(Transaction {
254            events: events?,
255            summary: TransactionSummary {
256                transaction_id: if proto.id.is_empty() {
257                    None
258                } else {
259                    Some(proto.id)
260                },
261                revision: proto.basis, // Note: basis in proto is the revision after the transaction
262                event_count,
263            },
264        })
265    }
266}
267
268impl From<proto::TransactionSummary> for TransactionSummary {
269    fn from(proto: proto::TransactionSummary) -> Self {
270        TransactionSummary {
271            transaction_id: if proto.id.is_empty() {
272                None
273            } else {
274                Some(proto.id)
275            },
276            revision: proto.revision,
277            event_count: 0, // Not available in proto summary, will be filled elsewhere
278        }
279    }
280}
281
282// =============================================================================
283// CloudEvents SDK compatibility (feature-gated)
284// =============================================================================
285//
286// Note: We use standalone functions instead of From/TryFrom trait impls because
287// both Event/ProspectiveEvent (from evidentsource_core) and cloudevents::Event
288// (from cloudevents-sdk) are external types, which would violate Rust's orphan rules.
289
290#[cfg(feature = "cloudevents")]
291use cloudevents::{AttributesReader, Data, EventBuilder, EventBuilderV10};
292
/// Convert an [`Event`] to a `cloudevents::Event`.
///
/// `id`, `source` and `event_type` become the required CloudEvents attributes;
/// `subject`, `time` and all extensions are copied when present.
///
/// When the event has data it is attached with the event's `datacontenttype`,
/// falling back to `application/octet-stream`. The `dataschema` is forwarded
/// together with the data if it parses as a URL; an unparseable schema is
/// left out. Without data, neither `datacontenttype` nor `dataschema` is
/// carried over, because the builder only accepts them alongside a payload.
///
/// # Panics
///
/// Panics if the CloudEvents builder rejects the assembled event.
#[cfg(feature = "cloudevents")]
pub fn event_to_cloudevent(event: Event) -> cloudevents::Event {
    let mut builder = EventBuilderV10::new()
        .id(event.id)
        .source(event.source)
        .ty(event.event_type);

    if let Some(subject) = event.subject {
        builder = builder.subject(subject);
    }

    if let Some(time) = event.time {
        builder = builder.time(time);
    }

    // Add extension attributes
    for (key, value) in event.extensions {
        builder = match value {
            ExtensionValue::String(s) => builder.extension(&key, s),
            ExtensionValue::Boolean(b) => builder.extension(&key, b),
            ExtensionValue::Integer(i) => builder.extension(&key, i),
        };
    }

    // Add data, together with its schema when one is usable
    if let Some(data) = event.data {
        let content_type = event
            .datacontenttype
            .unwrap_or_else(|| "application/octet-stream".to_string());
        let payload: Data = match data {
            EventData::Binary(bytes) => bytes.into(),
            EventData::String(s) => s.into(),
        };
        // Parse up front so a malformed schema is skipped instead of turning
        // into a builder error (and therefore a panic) at `build()`.
        let schema = event
            .dataschema
            .and_then(|raw| cloudevents::event::TryIntoUrl::into_url(raw).ok());
        builder = match schema {
            Some(url) => builder.data_with_schema(content_type, url, payload),
            None => builder.data(content_type, payload),
        };
    }

    builder
        .build()
        .expect("Failed to build CloudEvent from Event")
}
333
/// Convert a [`ProspectiveEvent`] to a `cloudevents::Event`.
///
/// The event's `stream` is used as the CloudEvents `source`; `id` and
/// `event_type` become the other required attributes, and `subject`, `time`
/// and all extensions are copied when present.
///
/// When the event has data it is attached with the event's `datacontenttype`,
/// falling back to `application/octet-stream`. The `dataschema` is forwarded
/// together with the data if it parses as a URL; an unparseable schema is
/// left out. Without data, neither `datacontenttype` nor `dataschema` is
/// carried over, because the builder only accepts them alongside a payload.
///
/// # Panics
///
/// Panics if the CloudEvents builder rejects the assembled event.
#[cfg(feature = "cloudevents")]
pub fn prospective_event_to_cloudevent(event: ProspectiveEvent) -> cloudevents::Event {
    let mut builder = EventBuilderV10::new()
        .id(event.id)
        .source(event.stream) // ProspectiveEvent uses stream as source
        .ty(event.event_type);

    if let Some(subject) = event.subject {
        builder = builder.subject(subject);
    }

    if let Some(time) = event.time {
        builder = builder.time(time);
    }

    // Add extension attributes
    for (key, value) in event.extensions {
        builder = match value {
            ExtensionValue::String(s) => builder.extension(&key, s),
            ExtensionValue::Boolean(b) => builder.extension(&key, b),
            ExtensionValue::Integer(i) => builder.extension(&key, i),
        };
    }

    // Add data, together with its schema when one is usable
    if let Some(data) = event.data {
        let content_type = event
            .datacontenttype
            .unwrap_or_else(|| "application/octet-stream".to_string());
        let payload: Data = match data {
            EventData::Binary(bytes) => bytes.into(),
            EventData::String(s) => s.into(),
        };
        // Parse up front so a malformed schema is skipped instead of turning
        // into a builder error (and therefore a panic) at `build()`.
        let schema = event
            .dataschema
            .and_then(|raw| cloudevents::event::TryIntoUrl::into_url(raw).ok());
        builder = match schema {
            Some(url) => builder.data_with_schema(content_type, url, payload),
            None => builder.data(content_type, payload),
        };
    }

    builder
        .build()
        .expect("Failed to build CloudEvent from ProspectiveEvent")
}
374
/// Convert a `cloudevents::Event` to an [`Event`].
///
/// All context attributes and extensions are copied. Binary and string data
/// map directly; JSON data is serialized to its string form.
///
/// The declared `datacontenttype` is always preserved. Only when the event
/// carries JSON data without any declared content type does the result
/// default to `application/json`.
///
/// # Errors
///
/// The current implementation always returns `Ok`.
#[cfg(feature = "cloudevents")]
pub fn cloudevent_to_event(ce: cloudevents::Event) -> Result<Event, ConversionError> {
    // Extract extension attributes
    let extensions: Vec<_> = ce
        .iter_extensions()
        .map(|(key, value)| {
            let converted = match value {
                cloudevents::event::ExtensionValue::String(s) => {
                    ExtensionValue::String(s.to_string())
                }
                cloudevents::event::ExtensionValue::Boolean(b) => ExtensionValue::Boolean(*b),
                cloudevents::event::ExtensionValue::Integer(i) => ExtensionValue::Integer(*i),
            };
            (key.to_string(), converted)
        })
        .collect();

    // Extract data
    let data = ce.data().map(|payload| match payload {
        Data::Binary(bytes) => EventData::Binary(bytes.clone()),
        Data::String(s) => EventData::String(s.clone()),
        Data::Json(v) => EventData::String(v.to_string()),
    });

    // Keep the declared content type (for example a `+json` media type)
    // instead of overwriting it; fall back only when none was declared.
    let datacontenttype = match (ce.datacontenttype(), ce.data()) {
        (Some(declared), _) => Some(declared.to_string()),
        (None, Some(Data::Json(_))) => Some("application/json".to_string()),
        (None, _) => None,
    };

    Ok(Event {
        id: ce.id().to_string(),
        source: ce.source().to_string(),
        event_type: ce.ty().to_string(),
        subject: ce.subject().map(|s| s.to_string()),
        data,
        time: ce.time().copied(),
        datacontenttype,
        dataschema: ce.dataschema().map(|u| u.to_string()),
        extensions,
    })
}
419
/// Convert a `cloudevents::Event` to a [`ProspectiveEvent`].
///
/// The CloudEvents `source` becomes the prospective event's `stream`. All
/// other context attributes and extensions are copied. Binary and string data
/// map directly; JSON data is serialized to its string form.
///
/// The declared `datacontenttype` is always preserved. Only when the event
/// carries JSON data without any declared content type does the result
/// default to `application/json`.
///
/// # Errors
///
/// The current implementation always returns `Ok`.
#[cfg(feature = "cloudevents")]
pub fn cloudevent_to_prospective_event(
    ce: cloudevents::Event,
) -> Result<ProspectiveEvent, ConversionError> {
    // Extract extension attributes
    let extensions: Vec<_> = ce
        .iter_extensions()
        .map(|(key, value)| {
            let converted = match value {
                cloudevents::event::ExtensionValue::String(s) => {
                    ExtensionValue::String(s.to_string())
                }
                cloudevents::event::ExtensionValue::Boolean(b) => ExtensionValue::Boolean(*b),
                cloudevents::event::ExtensionValue::Integer(i) => ExtensionValue::Integer(*i),
            };
            (key.to_string(), converted)
        })
        .collect();

    // Extract data
    let data = ce.data().map(|payload| match payload {
        Data::Binary(bytes) => EventData::Binary(bytes.clone()),
        Data::String(s) => EventData::String(s.clone()),
        Data::Json(v) => EventData::String(v.to_string()),
    });

    // Keep the declared content type (for example a `+json` media type)
    // instead of overwriting it; fall back only when none was declared.
    let datacontenttype = match (ce.datacontenttype(), ce.data()) {
        (Some(declared), _) => Some(declared.to_string()),
        (None, Some(Data::Json(_))) => Some("application/json".to_string()),
        (None, _) => None,
    };

    Ok(ProspectiveEvent {
        id: ce.id().to_string(),
        stream: ce.source().to_string(), // source becomes stream
        event_type: ce.ty().to_string(),
        subject: ce.subject().map(|s| s.to_string()),
        data,
        time: ce.time().copied(),
        datacontenttype,
        dataschema: ce.dataschema().map(|u| u.to_string()),
        extensions,
    })
}
466
#[cfg(test)]
mod tests {
    use super::*;

    // Values shared by the fixtures below.
    const SOURCE_URL: &str = "https://test.local/db/test/streams/test-stream";
    const PAYLOAD: &str = r#"{"key": "value"}"#;

    /// Builds the committed-event fixture used by several tests.
    fn sample_event() -> Event {
        Event {
            id: "test-id".to_string(),
            source: SOURCE_URL.to_string(),
            event_type: "test.type".to_string(),
            subject: Some("test-subject".to_string()),
            data: Some(EventData::String(PAYLOAD.to_string())),
            time: None,
            datacontenttype: Some("application/json".to_string()),
            dataschema: None,
            extensions: vec![],
        }
    }

    #[test]
    fn test_event_to_proto_roundtrip() {
        let original = sample_event();

        // Domain -> proto -> domain must keep the core attributes intact.
        let encoded = proto_ce::CloudEvent::from(original.clone());
        let decoded = Event::try_from(encoded).unwrap();

        assert_eq!(original.id, decoded.id);
        assert_eq!(original.source, decoded.source);
        assert_eq!(original.event_type, decoded.event_type);
        assert_eq!(original.subject, decoded.subject);
    }

    #[test]
    fn test_prospective_event_to_proto() {
        let prospective = ProspectiveEvent {
            id: "test-id".to_string(),
            stream: "test-stream".to_string(),
            event_type: "test.type".to_string(),
            subject: Some("test-subject".to_string()),
            data: Some(EventData::String(PAYLOAD.to_string())),
            time: None,
            datacontenttype: Some("application/json".to_string()),
            dataschema: None,
            extensions: vec![],
        };

        let encoded = proto_ce::CloudEvent::from(prospective);

        assert_eq!(encoded.id, "test-id");
        assert_eq!(encoded.source, "test-stream"); // stream becomes source
        assert_eq!(encoded.r#type, "test.type");
    }

    #[cfg(feature = "cloudevents")]
    mod cloudevents_tests {
        use super::*;
        use cloudevents::{AttributesReader, EventBuilder, EventBuilderV10};

        #[test]
        fn test_event_to_cloudevents_sdk() {
            let converted = event_to_cloudevent(sample_event());

            assert_eq!(converted.id(), "test-id");
            assert_eq!(converted.source(), SOURCE_URL);
            assert_eq!(converted.ty(), "test.type");
            assert_eq!(converted.subject(), Some("test-subject"));
        }

        #[test]
        fn test_cloudevents_sdk_to_event() {
            let sdk_event = EventBuilderV10::new()
                .id("test-id")
                .source(SOURCE_URL)
                .ty("test.type")
                .subject("test-subject")
                .data("application/json", PAYLOAD)
                .build()
                .unwrap();

            let converted = cloudevent_to_event(sdk_event).unwrap();

            assert_eq!(converted.id, "test-id");
            assert_eq!(converted.source, SOURCE_URL);
            assert_eq!(converted.event_type, "test.type");
            assert_eq!(converted.subject, Some("test-subject".to_string()));
        }
    }
}