cloudevents_sdk_actix_web/
server_request.rs

1use super::headers;
2use actix_web::http::HeaderName;
3use actix_web::web::{Bytes, BytesMut};
4use actix_web::{web, HttpMessage, HttpRequest};
5use async_trait::async_trait;
6use cloudevents::event::SpecVersion;
7use cloudevents::message::{
8    BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
9    Result, StructuredDeserializer, StructuredSerializer,
10};
11use cloudevents::{message, Event};
12use futures::StreamExt;
13use std::convert::TryFrom;
14
15/// Wrapper for [`HttpRequest`] that implements [`MessageDeserializer`] trait.
16pub struct HttpRequestDeserializer<'a> {
17    req: &'a HttpRequest,
18    body: Bytes,
19}
20
21impl HttpRequestDeserializer<'_> {
22    pub fn new(req: &HttpRequest, body: Bytes) -> HttpRequestDeserializer {
23        HttpRequestDeserializer { req, body }
24    }
25}
26
27impl<'a> BinaryDeserializer for HttpRequestDeserializer<'a> {
28    fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
29        if self.encoding() != Encoding::BINARY {
30            return Err(message::Error::WrongEncoding {});
31        }
32
33        let spec_version = SpecVersion::try_from(
34            unwrap_optional_header!(self.req.headers(), headers::SPEC_VERSION_HEADER).unwrap()?,
35        )?;
36
37        visitor = visitor.set_spec_version(spec_version.clone())?;
38
39        let attributes = spec_version.attribute_names();
40
41        for (hn, hv) in
42            self.req.headers().iter().filter(|(hn, _)| {
43                headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-")
44            })
45        {
46            let name = &hn.as_str()["ce-".len()..];
47
48            if attributes.contains(&name) {
49                visitor = visitor.set_attribute(
50                    name,
51                    MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
52                )?
53            } else {
54                visitor = visitor.set_extension(
55                    name,
56                    MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
57                )?
58            }
59        }
60
61        if let Some(hv) = self.req.headers().get("content-type") {
62            visitor = visitor.set_attribute(
63                "datacontenttype",
64                MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
65            )?
66        }
67
68        if !self.body.is_empty() {
69            visitor.end_with_data(self.body.to_vec())
70        } else {
71            visitor.end()
72        }
73    }
74}
75
76impl<'a> StructuredDeserializer for HttpRequestDeserializer<'a> {
77    fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
78        if self.encoding() != Encoding::STRUCTURED {
79            return Err(message::Error::WrongEncoding {});
80        }
81        visitor.set_structured_event(self.body.to_vec())
82    }
83}
84
85impl<'a> MessageDeserializer for HttpRequestDeserializer<'a> {
86    fn encoding(&self) -> Encoding {
87        if self.req.content_type() == "application/cloudevents+json" {
88            Encoding::STRUCTURED
89        } else if self
90            .req
91            .headers()
92            .get::<&'static HeaderName>(&super::headers::SPEC_VERSION_HEADER)
93            .is_some()
94        {
95            Encoding::BINARY
96        } else {
97            Encoding::UNKNOWN
98        }
99    }
100}
101
102/// Method to transform an incoming [`HttpRequest`] to [`Event`].
103pub async fn request_to_event(
104    req: &HttpRequest,
105    mut payload: web::Payload,
106) -> std::result::Result<Event, actix_web::error::Error> {
107    let mut bytes = BytesMut::new();
108    while let Some(item) = payload.next().await {
109        bytes.extend_from_slice(&item?);
110    }
111    MessageDeserializer::into_event(HttpRequestDeserializer::new(req, bytes.freeze()))
112        .map_err(actix_web::error::ErrorBadRequest)
113}
114
115/// Extention Trait for [`HttpRequest`] which acts as a wrapper for the function [`request_to_event()`].
116///
117/// This trait is sealed and cannot be implemented for types outside of this crate.
118#[async_trait(?Send)]
119pub trait HttpRequestExt: private::Sealed {
120    /// Convert this [`HttpRequest`] into an [`Event`].
121    async fn to_event(
122        &self,
123        mut payload: web::Payload,
124    ) -> std::result::Result<Event, actix_web::error::Error>;
125}
126
127#[async_trait(?Send)]
128impl HttpRequestExt for HttpRequest {
129    async fn to_event(
130        &self,
131        payload: web::Payload,
132    ) -> std::result::Result<Event, actix_web::error::Error> {
133        request_to_event(self, payload).await
134    }
135}
136
137mod private {
138    // Sealing the RequestExt
139    pub trait Sealed {}
140    impl Sealed for actix_web::HttpRequest {}
141}
142
143#[cfg(test)]
144mod tests {
145    use super::*;
146    use actix_web::test;
147    use url::Url;
148
149    use chrono::Utc;
150    use cloudevents::{EventBuilder, EventBuilderV10};
151    use serde_json::json;
152    use std::str::FromStr;
153
154    #[actix_rt::test]
155    async fn test_request() {
156        let time = Utc::now();
157        let expected = EventBuilderV10::new()
158            .id("0001")
159            .ty("example.test")
160            .source("http://localhost/")
161            //TODO this is required now because the message deserializer implictly set default values
162            // As soon as this defaulting doesn't happen anymore, we can remove it (Issues #40/#41)
163            .time(time)
164            .extension("someint", "10")
165            .build()
166            .unwrap();
167
168        let (req, payload) = test::TestRequest::post()
169            .header("ce-specversion", "1.0")
170            .header("ce-id", "0001")
171            .header("ce-type", "example.test")
172            .header("ce-source", "http://localhost/")
173            .header("ce-someint", "10")
174            .header("ce-time", time.to_rfc3339())
175            .to_http_parts();
176
177        let resp = req.to_event(web::Payload(payload)).await.unwrap();
178        assert_eq!(expected, resp);
179    }
180
181    #[actix_rt::test]
182    async fn test_request_with_full_data() {
183        let time = Utc::now();
184        let j = json!({"hello": "world"});
185
186        let expected = EventBuilderV10::new()
187            .id("0001")
188            .ty("example.test")
189            .source(Url::from_str("http://localhost").unwrap())
190            //TODO this is required now because the message deserializer implictly set default values
191            // As soon as this defaulting doesn't happen anymore, we can remove it (Issues #40/#41)
192            .time(time)
193            .data("application/json", j.to_string().into_bytes())
194            .extension("someint", "10")
195            .build()
196            .unwrap();
197
198        let (req, payload) = test::TestRequest::post()
199            .header("ce-specversion", "1.0")
200            .header("ce-id", "0001")
201            .header("ce-type", "example.test")
202            .header("ce-source", "http://localhost")
203            .header("ce-someint", "10")
204            .header("ce-time", time.to_rfc3339())
205            .header("content-type", "application/json")
206            .set_json(&j)
207            .to_http_parts();
208
209        let resp = req.to_event(web::Payload(payload)).await.unwrap();
210        assert_eq!(expected, resp);
211    }
212}