cloudevents/binding/http_0_2/
deserializer.rs

1use super::{Headers, SPEC_VERSION_HEADER};
2use crate::{
3    binding::CLOUDEVENTS_JSON_HEADER,
4    event::SpecVersion,
5    header_value_to_str, message,
6    message::{
7        BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
8        Result, StructuredDeserializer, StructuredSerializer,
9    },
10};
11use http_0_2 as http;
12use std::convert::TryFrom;
13
14pub struct Deserializer<'a, T: Headers<'a>> {
15    headers: &'a T,
16    body: Vec<u8>,
17}
18
19impl<'a, T: Headers<'a>> Deserializer<'a, T> {
20    pub fn new(headers: &'a T, body: Vec<u8>) -> Deserializer<'a, T> {
21        Deserializer { headers, body }
22    }
23}
24
25impl<'a, T: Headers<'a>> BinaryDeserializer for Deserializer<'a, T> {
26    fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
27        if self.encoding() != Encoding::BINARY {
28            return Err(message::Error::WrongEncoding {});
29        }
30
31        let spec_version = SpecVersion::try_from(
32            self.headers
33                .get(SPEC_VERSION_HEADER)
34                .map(|a| header_value_to_str!(a))
35                .unwrap()?,
36        )?;
37
38        let attributes = spec_version.attribute_names();
39
40        visitor = visitor.set_spec_version(spec_version)?;
41
42        for (hn, hv) in self.headers.iter().filter(|(hn, _)| {
43            let key = hn.as_str();
44            SPEC_VERSION_HEADER.ne(key) && key.starts_with("ce-")
45        }) {
46            let name = &hn.as_str()["ce-".len()..];
47
48            if attributes.contains(&name) {
49                visitor = visitor.set_attribute(
50                    name,
51                    MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
52                )?
53            } else {
54                visitor = visitor.set_extension(
55                    name,
56                    MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
57                )?
58            }
59        }
60
61        if let Some(hv) = self.headers.get(http::header::CONTENT_TYPE) {
62            visitor = visitor.set_attribute(
63                "datacontenttype",
64                MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
65            )?
66        }
67
68        if !self.body.is_empty() {
69            visitor.end_with_data(self.body)
70        } else {
71            visitor.end()
72        }
73    }
74}
75
76impl<'a, T: Headers<'a>> StructuredDeserializer for Deserializer<'a, T> {
77    fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
78        if self.encoding() != Encoding::STRUCTURED {
79            return Err(message::Error::WrongEncoding {});
80        }
81        visitor.set_structured_event(self.body)
82    }
83}
84
85impl<'a, T: Headers<'a>> MessageDeserializer for Deserializer<'a, T> {
86    fn encoding(&self) -> Encoding {
87        if self
88            .headers
89            .get(http::header::CONTENT_TYPE)
90            .and_then(|v| v.to_str().ok())
91            .filter(|&v| v.starts_with(CLOUDEVENTS_JSON_HEADER))
92            .is_some()
93        {
94            Encoding::STRUCTURED
95        } else if self.headers.get(SPEC_VERSION_HEADER).is_some() {
96            Encoding::BINARY
97        } else {
98            Encoding::UNKNOWN
99        }
100    }
101}