cloudevents/binding/http/
deserializer.rs1use super::{Headers, SPEC_VERSION_HEADER};
2use crate::{
3 binding::CLOUDEVENTS_JSON_HEADER,
4 event::SpecVersion,
5 header_value_to_str, message,
6 message::{
7 BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
8 Result, StructuredDeserializer, StructuredSerializer,
9 },
10};
11
12use http;
13use std::convert::TryFrom;
14
15pub struct Deserializer<'a, T: Headers<'a>> {
16 headers: &'a T,
17 body: Vec<u8>,
18}
19
20impl<'a, T: Headers<'a>> Deserializer<'a, T> {
21 pub fn new(headers: &'a T, body: Vec<u8>) -> Deserializer<'a, T> {
22 Deserializer { headers, body }
23 }
24}
25
26impl<'a, T: Headers<'a>> BinaryDeserializer for Deserializer<'a, T> {
27 fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
28 if self.encoding() != Encoding::BINARY {
29 return Err(message::Error::WrongEncoding {});
30 }
31
32 let spec_version = SpecVersion::try_from(
33 self.headers
34 .get(SPEC_VERSION_HEADER)
35 .map(|a| header_value_to_str!(a))
36 .unwrap()?,
37 )?;
38
39 let attributes = spec_version.attribute_names();
40
41 visitor = visitor.set_spec_version(spec_version)?;
42
43 for (hn, hv) in self.headers.iter().filter(|(hn, _)| {
44 let key = hn.as_str();
45 SPEC_VERSION_HEADER.ne(key) && key.starts_with("ce-")
46 }) {
47 let name = &hn.as_str()["ce-".len()..];
48
49 if attributes.contains(&name) {
50 visitor = visitor.set_attribute(
51 name,
52 MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
53 )?
54 } else {
55 visitor = visitor.set_extension(
56 name,
57 MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
58 )?
59 }
60 }
61
62 if let Some(hv) = self.headers.get(http::header::CONTENT_TYPE) {
63 visitor = visitor.set_attribute(
64 "datacontenttype",
65 MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
66 )?
67 }
68
69 if !self.body.is_empty() {
70 visitor.end_with_data(self.body)
71 } else {
72 visitor.end()
73 }
74 }
75}
76
77impl<'a, T: Headers<'a>> StructuredDeserializer for Deserializer<'a, T> {
78 fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
79 if self.encoding() != Encoding::STRUCTURED {
80 return Err(message::Error::WrongEncoding {});
81 }
82 visitor.set_structured_event(self.body)
83 }
84}
85
86impl<'a, T: Headers<'a>> MessageDeserializer for Deserializer<'a, T> {
87 fn encoding(&self) -> Encoding {
88 if self
89 .headers
90 .get(http::header::CONTENT_TYPE)
91 .and_then(|v| v.to_str().ok())
92 .filter(|&v| v.starts_with(CLOUDEVENTS_JSON_HEADER))
93 .is_some()
94 {
95 Encoding::STRUCTURED
96 } else if self.headers.get(SPEC_VERSION_HEADER).is_some() {
97 Encoding::BINARY
98 } else {
99 Encoding::UNKNOWN
100 }
101 }
102}