1use evidentsource_core::domain::{
4 Event, EventData, ExtensionValue, ProspectiveEvent, Transaction, TransactionSummary,
5};
6
7use crate::com::evidentsource as proto;
8use crate::io::cloudevents::v1 as proto_ce;
9
10use super::error::ConversionError;
11
12impl From<Event> for proto_ce::CloudEvent {
17 fn from(event: Event) -> Self {
18 use proto_ce::cloud_event::cloud_event_attribute_value::Attr;
19 use proto_ce::cloud_event::CloudEventAttributeValue;
20
21 let mut attributes = std::collections::HashMap::new();
22
23 if let Some(subject) = event.subject {
25 attributes.insert(
26 "subject".to_string(),
27 CloudEventAttributeValue {
28 attr: Some(Attr::CeString(subject)),
29 },
30 );
31 }
32
33 if let Some(time) = event.time {
34 attributes.insert(
35 "time".to_string(),
36 CloudEventAttributeValue {
37 attr: Some(Attr::CeTimestamp(prost_types::Timestamp {
38 seconds: time.timestamp(),
39 nanos: time.timestamp_subsec_nanos() as i32,
40 })),
41 },
42 );
43 }
44
45 if let Some(datacontenttype) = event.datacontenttype {
46 attributes.insert(
47 "datacontenttype".to_string(),
48 CloudEventAttributeValue {
49 attr: Some(Attr::CeString(datacontenttype)),
50 },
51 );
52 }
53
54 if let Some(dataschema) = event.dataschema {
55 attributes.insert(
56 "dataschema".to_string(),
57 CloudEventAttributeValue {
58 attr: Some(Attr::CeUri(dataschema)),
59 },
60 );
61 }
62
63 for (key, value) in event.extensions {
65 let attr = match value {
66 ExtensionValue::String(s) => Some(Attr::CeString(s)),
67 ExtensionValue::Boolean(b) => Some(Attr::CeBoolean(b)),
68 ExtensionValue::Integer(i) => Some(Attr::CeInteger(i as i32)),
69 };
70 if let Some(attr) = attr {
71 attributes.insert(key, CloudEventAttributeValue { attr: Some(attr) });
72 }
73 }
74
75 let data = event.data.map(|d| match d {
77 EventData::Binary(bytes) => proto_ce::cloud_event::Data::BinaryData(bytes),
78 EventData::String(s) => proto_ce::cloud_event::Data::TextData(s),
79 });
80
81 proto_ce::CloudEvent {
82 spec_version: "1.0".to_string(),
83 id: event.id,
84 source: event.source,
85 r#type: event.event_type,
86 attributes,
87 data,
88 }
89 }
90}
91
92impl From<ProspectiveEvent> for proto_ce::CloudEvent {
97 fn from(event: ProspectiveEvent) -> Self {
98 use proto_ce::cloud_event::cloud_event_attribute_value::Attr;
99 use proto_ce::cloud_event::CloudEventAttributeValue;
100
101 let mut attributes = std::collections::HashMap::new();
102
103 if let Some(subject) = event.subject {
105 attributes.insert(
106 "subject".to_string(),
107 CloudEventAttributeValue {
108 attr: Some(Attr::CeString(subject)),
109 },
110 );
111 }
112
113 if let Some(time) = event.time {
114 attributes.insert(
115 "time".to_string(),
116 CloudEventAttributeValue {
117 attr: Some(Attr::CeTimestamp(prost_types::Timestamp {
118 seconds: time.timestamp(),
119 nanos: time.timestamp_subsec_nanos() as i32,
120 })),
121 },
122 );
123 }
124
125 if let Some(datacontenttype) = event.datacontenttype {
126 attributes.insert(
127 "datacontenttype".to_string(),
128 CloudEventAttributeValue {
129 attr: Some(Attr::CeString(datacontenttype)),
130 },
131 );
132 }
133
134 if let Some(dataschema) = event.dataschema {
135 attributes.insert(
136 "dataschema".to_string(),
137 CloudEventAttributeValue {
138 attr: Some(Attr::CeUri(dataschema)),
139 },
140 );
141 }
142
143 for (key, value) in event.extensions {
145 let attr = match value {
146 ExtensionValue::String(s) => Some(Attr::CeString(s)),
147 ExtensionValue::Boolean(b) => Some(Attr::CeBoolean(b)),
148 ExtensionValue::Integer(i) => Some(Attr::CeInteger(i as i32)),
149 };
150 if let Some(attr) = attr {
151 attributes.insert(key, CloudEventAttributeValue { attr: Some(attr) });
152 }
153 }
154
155 let data = event.data.map(|d| match d {
157 EventData::Binary(bytes) => proto_ce::cloud_event::Data::BinaryData(bytes),
158 EventData::String(s) => proto_ce::cloud_event::Data::TextData(s),
159 });
160
161 proto_ce::CloudEvent {
162 spec_version: "1.0".to_string(),
163 id: event.id,
164 source: event.stream, r#type: event.event_type,
166 attributes,
167 data,
168 }
169 }
170}
171
172impl TryFrom<proto_ce::CloudEvent> for Event {
177 type Error = ConversionError;
178
179 fn try_from(proto: proto_ce::CloudEvent) -> Result<Self, Self::Error> {
180 use proto_ce::cloud_event::cloud_event_attribute_value::Attr;
181
182 let mut subject = None;
183 let mut time = None;
184 let mut datacontenttype = None;
185 let mut dataschema = None;
186 let mut extensions = Vec::new();
187
188 for (key, value) in proto.attributes {
190 if let Some(attr) = value.attr {
191 match (key.as_str(), attr) {
192 ("subject", Attr::CeString(s)) => {
193 subject = Some(s);
194 }
195 ("time", Attr::CeTimestamp(ts)) => {
196 time = chrono::DateTime::from_timestamp(ts.seconds, ts.nanos as u32);
197 }
198 ("datacontenttype", Attr::CeString(s)) => {
199 datacontenttype = Some(s);
200 }
201 ("dataschema", Attr::CeUri(s)) => {
202 dataschema = Some(s);
203 }
204 (key, Attr::CeString(s)) => {
205 extensions.push((key.to_string(), ExtensionValue::String(s)));
206 }
207 (key, Attr::CeBoolean(b)) => {
208 extensions.push((key.to_string(), ExtensionValue::Boolean(b)));
209 }
210 (key, Attr::CeInteger(i)) => {
211 extensions.push((key.to_string(), ExtensionValue::Integer(i as i64)));
212 }
213 _ => {}
214 }
215 }
216 }
217
218 let data = proto.data.map(|d| match d {
220 proto_ce::cloud_event::Data::BinaryData(bytes) => EventData::Binary(bytes),
221 proto_ce::cloud_event::Data::TextData(text) => EventData::String(text),
222 proto_ce::cloud_event::Data::ProtoData(_) => {
223 EventData::Binary(vec![])
225 }
226 });
227
228 Ok(Event {
229 id: proto.id,
230 source: proto.source,
231 event_type: proto.r#type,
232 subject,
233 data,
234 time,
235 datacontenttype,
236 dataschema,
237 extensions,
238 })
239 }
240}
241
242impl TryFrom<proto::Transaction> for Transaction {
247 type Error = ConversionError;
248
249 fn try_from(proto: proto::Transaction) -> Result<Self, Self::Error> {
250 let event_count = proto.events.len();
251 let events: Result<Vec<Event>, _> = proto.events.into_iter().map(Event::try_from).collect();
252
253 Ok(Transaction {
254 events: events?,
255 summary: TransactionSummary {
256 transaction_id: if proto.id.is_empty() {
257 None
258 } else {
259 Some(proto.id)
260 },
261 revision: proto.basis, event_count,
263 },
264 })
265 }
266}
267
268impl From<proto::TransactionSummary> for TransactionSummary {
269 fn from(proto: proto::TransactionSummary) -> Self {
270 TransactionSummary {
271 transaction_id: if proto.id.is_empty() {
272 None
273 } else {
274 Some(proto.id)
275 },
276 revision: proto.revision,
277 event_count: 0, }
279 }
280}
281
282#[cfg(feature = "cloudevents")]
291use cloudevents::{AttributesReader, Data, EventBuilder, EventBuilderV10};
292
293#[cfg(feature = "cloudevents")]
295pub fn event_to_cloudevent(event: Event) -> cloudevents::Event {
296 let mut builder = EventBuilderV10::new()
297 .id(event.id)
298 .source(event.source)
299 .ty(event.event_type);
300
301 if let Some(subject) = event.subject {
302 builder = builder.subject(subject);
303 }
304
305 if let Some(time) = event.time {
306 builder = builder.time(time);
307 }
308
309 for (key, value) in event.extensions {
311 builder = match value {
312 ExtensionValue::String(s) => builder.extension(&key, s),
313 ExtensionValue::Boolean(b) => builder.extension(&key, b),
314 ExtensionValue::Integer(i) => builder.extension(&key, i),
315 };
316 }
317
318 if let Some(data) = event.data {
320 let content_type = event
321 .datacontenttype
322 .unwrap_or_else(|| "application/octet-stream".to_string());
323 builder = match data {
324 EventData::Binary(bytes) => builder.data(content_type, bytes),
325 EventData::String(s) => builder.data(content_type, s),
326 };
327 }
328
329 builder
330 .build()
331 .expect("Failed to build CloudEvent from Event")
332}
333
334#[cfg(feature = "cloudevents")]
336pub fn prospective_event_to_cloudevent(event: ProspectiveEvent) -> cloudevents::Event {
337 let mut builder = EventBuilderV10::new()
338 .id(event.id)
339 .source(event.stream) .ty(event.event_type);
341
342 if let Some(subject) = event.subject {
343 builder = builder.subject(subject);
344 }
345
346 if let Some(time) = event.time {
347 builder = builder.time(time);
348 }
349
350 for (key, value) in event.extensions {
352 builder = match value {
353 ExtensionValue::String(s) => builder.extension(&key, s),
354 ExtensionValue::Boolean(b) => builder.extension(&key, b),
355 ExtensionValue::Integer(i) => builder.extension(&key, i),
356 };
357 }
358
359 if let Some(data) = event.data {
361 let content_type = event
362 .datacontenttype
363 .unwrap_or_else(|| "application/octet-stream".to_string());
364 builder = match data {
365 EventData::Binary(bytes) => builder.data(content_type, bytes),
366 EventData::String(s) => builder.data(content_type, s),
367 };
368 }
369
370 builder
371 .build()
372 .expect("Failed to build CloudEvent from ProspectiveEvent")
373}
374
375#[cfg(feature = "cloudevents")]
377pub fn cloudevent_to_event(ce: cloudevents::Event) -> Result<Event, ConversionError> {
378 let mut extensions = Vec::new();
379
380 for (key, value) in ce.iter_extensions() {
382 let ext_value = match value {
383 cloudevents::event::ExtensionValue::String(s) => ExtensionValue::String(s.to_string()),
384 cloudevents::event::ExtensionValue::Boolean(b) => ExtensionValue::Boolean(*b),
385 cloudevents::event::ExtensionValue::Integer(i) => ExtensionValue::Integer(*i),
386 };
387 extensions.push((key.to_string(), ext_value));
388 }
389
390 let (data, datacontenttype) = match ce.data() {
392 Some(Data::Binary(bytes)) => (
393 Some(EventData::Binary(bytes.clone())),
394 ce.datacontenttype().map(|s| s.to_string()),
395 ),
396 Some(Data::String(s)) => (
397 Some(EventData::String(s.clone())),
398 ce.datacontenttype().map(|s| s.to_string()),
399 ),
400 Some(Data::Json(v)) => (
401 Some(EventData::String(v.to_string())),
402 Some("application/json".to_string()),
403 ),
404 None => (None, ce.datacontenttype().map(|s| s.to_string())),
405 };
406
407 Ok(Event {
408 id: ce.id().to_string(),
409 source: ce.source().to_string(),
410 event_type: ce.ty().to_string(),
411 subject: ce.subject().map(|s| s.to_string()),
412 data,
413 time: ce.time().copied(),
414 datacontenttype,
415 dataschema: ce.dataschema().map(|u| u.to_string()),
416 extensions,
417 })
418}
419
420#[cfg(feature = "cloudevents")]
422pub fn cloudevent_to_prospective_event(
423 ce: cloudevents::Event,
424) -> Result<ProspectiveEvent, ConversionError> {
425 let mut extensions = Vec::new();
426
427 for (key, value) in ce.iter_extensions() {
429 let ext_value = match value {
430 cloudevents::event::ExtensionValue::String(s) => ExtensionValue::String(s.to_string()),
431 cloudevents::event::ExtensionValue::Boolean(b) => ExtensionValue::Boolean(*b),
432 cloudevents::event::ExtensionValue::Integer(i) => ExtensionValue::Integer(*i),
433 };
434 extensions.push((key.to_string(), ext_value));
435 }
436
437 let (data, datacontenttype) = match ce.data() {
439 Some(Data::Binary(bytes)) => (
440 Some(EventData::Binary(bytes.clone())),
441 ce.datacontenttype().map(|s| s.to_string()),
442 ),
443 Some(Data::String(s)) => (
444 Some(EventData::String(s.clone())),
445 ce.datacontenttype().map(|s| s.to_string()),
446 ),
447 Some(Data::Json(v)) => (
448 Some(EventData::String(v.to_string())),
449 Some("application/json".to_string()),
450 ),
451 None => (None, ce.datacontenttype().map(|s| s.to_string())),
452 };
453
454 Ok(ProspectiveEvent {
455 id: ce.id().to_string(),
456 stream: ce.source().to_string(), event_type: ce.ty().to_string(),
458 subject: ce.subject().map(|s| s.to_string()),
459 data,
460 time: ce.time().copied(),
461 datacontenttype,
462 dataschema: ce.dataschema().map(|u| u.to_string()),
463 extensions,
464 })
465}
466
467#[cfg(test)]
468mod tests {
469 use super::*;
470
471 #[test]
472 fn test_event_to_proto_roundtrip() {
473 let event = Event {
474 id: "test-id".to_string(),
475 source: "https://test.local/db/test/streams/test-stream".to_string(),
476 event_type: "test.type".to_string(),
477 subject: Some("test-subject".to_string()),
478 data: Some(EventData::String(r#"{"key": "value"}"#.to_string())),
479 time: None,
480 datacontenttype: Some("application/json".to_string()),
481 dataschema: None,
482 extensions: vec![],
483 };
484
485 let proto: proto_ce::CloudEvent = event.clone().into();
486 let back: Event = proto.try_into().unwrap();
487
488 assert_eq!(event.id, back.id);
489 assert_eq!(event.source, back.source);
490 assert_eq!(event.event_type, back.event_type);
491 assert_eq!(event.subject, back.subject);
492 }
493
494 #[test]
495 fn test_prospective_event_to_proto() {
496 let event = ProspectiveEvent {
497 id: "test-id".to_string(),
498 stream: "test-stream".to_string(),
499 event_type: "test.type".to_string(),
500 subject: Some("test-subject".to_string()),
501 data: Some(EventData::String(r#"{"key": "value"}"#.to_string())),
502 time: None,
503 datacontenttype: Some("application/json".to_string()),
504 dataschema: None,
505 extensions: vec![],
506 };
507
508 let proto: proto_ce::CloudEvent = event.into();
509
510 assert_eq!(proto.id, "test-id");
511 assert_eq!(proto.source, "test-stream"); assert_eq!(proto.r#type, "test.type");
513 }
514
515 #[cfg(feature = "cloudevents")]
516 mod cloudevents_tests {
517 use super::*;
518 use cloudevents::{AttributesReader, EventBuilder, EventBuilderV10};
519
520 #[test]
521 fn test_event_to_cloudevents_sdk() {
522 let event = Event {
523 id: "test-id".to_string(),
524 source: "https://test.local/db/test/streams/test-stream".to_string(),
525 event_type: "test.type".to_string(),
526 subject: Some("test-subject".to_string()),
527 data: Some(EventData::String(r#"{"key": "value"}"#.to_string())),
528 time: None,
529 datacontenttype: Some("application/json".to_string()),
530 dataschema: None,
531 extensions: vec![],
532 };
533
534 let ce = event_to_cloudevent(event);
535
536 assert_eq!(ce.id(), "test-id");
537 assert_eq!(
538 ce.source(),
539 "https://test.local/db/test/streams/test-stream"
540 );
541 assert_eq!(ce.ty(), "test.type");
542 assert_eq!(ce.subject(), Some("test-subject"));
543 }
544
545 #[test]
546 fn test_cloudevents_sdk_to_event() {
547 let ce = EventBuilderV10::new()
548 .id("test-id")
549 .source("https://test.local/db/test/streams/test-stream")
550 .ty("test.type")
551 .subject("test-subject")
552 .data("application/json", r#"{"key": "value"}"#)
553 .build()
554 .unwrap();
555
556 let event = cloudevent_to_event(ce).unwrap();
557
558 assert_eq!(event.id, "test-id");
559 assert_eq!(
560 event.source,
561 "https://test.local/db/test/streams/test-stream"
562 );
563 assert_eq!(event.event_type, "test.type");
564 assert_eq!(event.subject, Some("test-subject".to_string()));
565 }
566 }
567}