cloudevents/binding/actix/
server_request.rs

1use crate::binding::http_0_2::{to_event, Headers};
2use crate::Event;
3use actix_web::dev::Payload;
4use actix_web::web::BytesMut;
5use actix_web::{web, HttpRequest};
6use async_trait::async_trait;
7use futures::{future::LocalBoxFuture, FutureExt, StreamExt};
8use http::header::{AsHeaderName, HeaderName, HeaderValue};
9use http_0_2 as http;
10
11/// Implement Headers for the actix HeaderMap
12impl<'a> Headers<'a> for actix_http::header::HeaderMap {
13    type Iterator = Box<dyn Iterator<Item = (&'a HeaderName, &'a HeaderValue)> + 'a>;
14    fn get<K: AsHeaderName>(&self, key: K) -> Option<&HeaderValue> {
15        self.get(key.as_str())
16    }
17    fn iter(&'a self) -> Self::Iterator {
18        Box::new(self.iter())
19    }
20}
21
22/// Method to transform an incoming [`HttpRequest`] to [`Event`].
23pub async fn request_to_event(
24    req: &HttpRequest,
25    mut payload: web::Payload,
26) -> std::result::Result<Event, actix_web::error::Error> {
27    let mut bytes = BytesMut::new();
28    while let Some(item) = payload.next().await {
29        bytes.extend_from_slice(&item?);
30    }
31    to_event(req.headers(), bytes.to_vec()).map_err(actix_web::error::ErrorBadRequest)
32}
33
34/// So that an actix-web handler may take an Event parameter
35impl actix_web::FromRequest for Event {
36    type Error = actix_web::Error;
37    type Future = LocalBoxFuture<'static, std::result::Result<Self, Self::Error>>;
38
39    fn from_request(r: &HttpRequest, p: &mut Payload) -> Self::Future {
40        let request = r.to_owned();
41        bytes::Bytes::from_request(&request, p)
42            .map(move |bytes| match bytes {
43                Ok(b) => to_event(request.headers(), b.to_vec())
44                    .map_err(actix_web::error::ErrorBadRequest),
45                Err(e) => Err(e),
46            })
47            .boxed_local()
48    }
49}
50
51/// Extension Trait for [`HttpRequest`] which acts as a wrapper for the function [`request_to_event()`].
52///
53/// This trait is sealed and cannot be implemented for types outside of this crate.
54#[async_trait(?Send)]
55pub trait HttpRequestExt: private::Sealed {
56    /// Convert this [`HttpRequest`] into an [`Event`].
57    async fn to_event(
58        &self,
59        mut payload: web::Payload,
60    ) -> std::result::Result<Event, actix_web::error::Error>;
61}
62
63#[async_trait(?Send)]
64impl HttpRequestExt for HttpRequest {
65    async fn to_event(
66        &self,
67        payload: web::Payload,
68    ) -> std::result::Result<Event, actix_web::error::Error> {
69        request_to_event(self, payload).await
70    }
71}
72
73mod private {
74    // Sealing the RequestExt
75    pub trait Sealed {}
76    impl Sealed for actix_web::HttpRequest {}
77}
78
79#[cfg(test)]
80mod tests {
81    use super::*;
82    use actix_web::{test, FromRequest};
83
84    use crate::test::fixtures;
85    use serde_json::json;
86
87    async fn to_event(req: &HttpRequest, mut payload: Payload) -> Event {
88        web::Payload::from_request(req, &mut payload)
89            .then(|p| req.to_event(p.unwrap()))
90            .await
91            .unwrap()
92    }
93
94    #[actix_rt::test]
95    async fn test_request() {
96        let expected = fixtures::v10::minimal_string_extension();
97
98        let (req, payload) = test::TestRequest::post()
99            .insert_header(("ce-specversion", "1.0"))
100            .insert_header(("ce-id", "0001"))
101            .insert_header(("ce-type", "test_event.test_application"))
102            .insert_header(("ce-source", "http://localhost/"))
103            .insert_header(("ce-someint", "10"))
104            .to_http_parts();
105
106        assert_eq!(expected, to_event(&req, payload).await);
107    }
108
109    #[actix_rt::test]
110    async fn test_request_with_full_data() {
111        let expected = fixtures::v10::full_binary_json_data_string_extension();
112
113        let (req, payload) = test::TestRequest::post()
114            .insert_header(("ce-specversion", "1.0"))
115            .insert_header(("ce-id", "0001"))
116            .insert_header(("ce-type", "test_event.test_application"))
117            .insert_header(("ce-subject", "cloudevents-sdk"))
118            .insert_header(("ce-source", "http://localhost/"))
119            .insert_header(("ce-time", fixtures::time().to_rfc3339()))
120            .insert_header(("ce-string_ex", "val"))
121            .insert_header(("ce-int_ex", "10"))
122            .insert_header(("ce-bool_ex", "true"))
123            .insert_header(("content-type", "application/json"))
124            .set_json(fixtures::json_data())
125            .to_http_parts();
126
127        assert_eq!(expected, to_event(&req, payload).await);
128    }
129
130    #[actix_rt::test]
131    async fn test_structured_request_with_full_data() {
132        let payload = json!({
133            "specversion": "1.0",
134            "id": "0001",
135            "type": "test_event.test_application",
136            "subject": "cloudevents-sdk",
137            "source": "http://localhost/",
138            "time": fixtures::time().to_rfc3339(),
139            "string_ex": "val",
140            "int_ex": "10",
141            "bool_ex": "true",
142            "datacontenttype": "application/json",
143            "data": fixtures::json_data()
144        });
145        let bytes = serde_json::to_string(&payload).expect("Failed to serialize test data to json");
146
147        let expected = fixtures::v10::full_json_data_string_extension();
148
149        let (req, payload) = test::TestRequest::post()
150            .insert_header(("content-type", "application/cloudevents+json"))
151            .set_payload(bytes)
152            .to_http_parts();
153
154        assert_eq!(expected, to_event(&req, payload).await);
155    }
156}