cloudevents/binding/http/
deserializer.rs

1use super::{Headers, SPEC_VERSION_HEADER};
2use crate::{
3    binding::CLOUDEVENTS_JSON_HEADER,
4    event::SpecVersion,
5    header_value_to_str, message,
6    message::{
7        BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
8        Result, StructuredDeserializer, StructuredSerializer,
9    },
10};
11
12use http;
13use std::convert::TryFrom;
14
15pub struct Deserializer<'a, T: Headers<'a>> {
16    headers: &'a T,
17    body: Vec<u8>,
18}
19
20impl<'a, T: Headers<'a>> Deserializer<'a, T> {
21    pub fn new(headers: &'a T, body: Vec<u8>) -> Deserializer<'a, T> {
22        Deserializer { headers, body }
23    }
24}
25
26impl<'a, T: Headers<'a>> BinaryDeserializer for Deserializer<'a, T> {
27    fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
28        if self.encoding() != Encoding::BINARY {
29            return Err(message::Error::WrongEncoding {});
30        }
31
32        let spec_version = SpecVersion::try_from(
33            self.headers
34                .get(SPEC_VERSION_HEADER)
35                .map(|a| header_value_to_str!(a))
36                .unwrap()?,
37        )?;
38
39        let attributes = spec_version.attribute_names();
40
41        visitor = visitor.set_spec_version(spec_version)?;
42
43        for (hn, hv) in self.headers.iter().filter(|(hn, _)| {
44            let key = hn.as_str();
45            SPEC_VERSION_HEADER.ne(key) && key.starts_with("ce-")
46        }) {
47            let name = &hn.as_str()["ce-".len()..];
48
49            if attributes.contains(&name) {
50                visitor = visitor.set_attribute(
51                    name,
52                    MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
53                )?
54            } else {
55                visitor = visitor.set_extension(
56                    name,
57                    MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
58                )?
59            }
60        }
61
62        if let Some(hv) = self.headers.get(http::header::CONTENT_TYPE) {
63            visitor = visitor.set_attribute(
64                "datacontenttype",
65                MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
66            )?
67        }
68
69        if !self.body.is_empty() {
70            visitor.end_with_data(self.body)
71        } else {
72            visitor.end()
73        }
74    }
75}
76
77impl<'a, T: Headers<'a>> StructuredDeserializer for Deserializer<'a, T> {
78    fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
79        if self.encoding() != Encoding::STRUCTURED {
80            return Err(message::Error::WrongEncoding {});
81        }
82        visitor.set_structured_event(self.body)
83    }
84}
85
86impl<'a, T: Headers<'a>> MessageDeserializer for Deserializer<'a, T> {
87    fn encoding(&self) -> Encoding {
88        if self
89            .headers
90            .get(http::header::CONTENT_TYPE)
91            .and_then(|v| v.to_str().ok())
92            .filter(|&v| v.starts_with(CLOUDEVENTS_JSON_HEADER))
93            .is_some()
94        {
95            Encoding::STRUCTURED
96        } else if self.headers.get(SPEC_VERSION_HEADER).is_some() {
97            Encoding::BINARY
98        } else {
99            Encoding::UNKNOWN
100        }
101    }
102}