cloudevents/binding/http_0_2/
deserializer.rs1use super::{Headers, SPEC_VERSION_HEADER};
2use crate::{
3 binding::CLOUDEVENTS_JSON_HEADER,
4 event::SpecVersion,
5 header_value_to_str, message,
6 message::{
7 BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
8 Result, StructuredDeserializer, StructuredSerializer,
9 },
10};
11use http_0_2 as http;
12use std::convert::TryFrom;
13
14pub struct Deserializer<'a, T: Headers<'a>> {
15 headers: &'a T,
16 body: Vec<u8>,
17}
18
19impl<'a, T: Headers<'a>> Deserializer<'a, T> {
20 pub fn new(headers: &'a T, body: Vec<u8>) -> Deserializer<'a, T> {
21 Deserializer { headers, body }
22 }
23}
24
25impl<'a, T: Headers<'a>> BinaryDeserializer for Deserializer<'a, T> {
26 fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
27 if self.encoding() != Encoding::BINARY {
28 return Err(message::Error::WrongEncoding {});
29 }
30
31 let spec_version = SpecVersion::try_from(
32 self.headers
33 .get(SPEC_VERSION_HEADER)
34 .map(|a| header_value_to_str!(a))
35 .unwrap()?,
36 )?;
37
38 let attributes = spec_version.attribute_names();
39
40 visitor = visitor.set_spec_version(spec_version)?;
41
42 for (hn, hv) in self.headers.iter().filter(|(hn, _)| {
43 let key = hn.as_str();
44 SPEC_VERSION_HEADER.ne(key) && key.starts_with("ce-")
45 }) {
46 let name = &hn.as_str()["ce-".len()..];
47
48 if attributes.contains(&name) {
49 visitor = visitor.set_attribute(
50 name,
51 MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
52 )?
53 } else {
54 visitor = visitor.set_extension(
55 name,
56 MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
57 )?
58 }
59 }
60
61 if let Some(hv) = self.headers.get(http::header::CONTENT_TYPE) {
62 visitor = visitor.set_attribute(
63 "datacontenttype",
64 MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
65 )?
66 }
67
68 if !self.body.is_empty() {
69 visitor.end_with_data(self.body)
70 } else {
71 visitor.end()
72 }
73 }
74}
75
76impl<'a, T: Headers<'a>> StructuredDeserializer for Deserializer<'a, T> {
77 fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
78 if self.encoding() != Encoding::STRUCTURED {
79 return Err(message::Error::WrongEncoding {});
80 }
81 visitor.set_structured_event(self.body)
82 }
83}
84
85impl<'a, T: Headers<'a>> MessageDeserializer for Deserializer<'a, T> {
86 fn encoding(&self) -> Encoding {
87 if self
88 .headers
89 .get(http::header::CONTENT_TYPE)
90 .and_then(|v| v.to_str().ok())
91 .filter(|&v| v.starts_with(CLOUDEVENTS_JSON_HEADER))
92 .is_some()
93 {
94 Encoding::STRUCTURED
95 } else if self.headers.get(SPEC_VERSION_HEADER).is_some() {
96 Encoding::BINARY
97 } else {
98 Encoding::UNKNOWN
99 }
100 }
101}