cloudevents_sdk_actix_web/
server_response.rs

1use super::headers;
2use actix_web::dev::HttpResponseBuilder;
3use actix_web::http::{HeaderName, HeaderValue};
4use actix_web::HttpResponse;
5use async_trait::async_trait;
6use cloudevents::event::SpecVersion;
7use cloudevents::message::{
8    BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
9};
10use cloudevents::Event;
11use std::str::FromStr;
12
13/// Wrapper for [`HttpResponseBuilder`] that implements [`StructuredSerializer`] and [`BinarySerializer`].
14pub struct HttpResponseSerializer {
15    builder: HttpResponseBuilder,
16}
17
18impl HttpResponseSerializer {
19    pub fn new(builder: HttpResponseBuilder) -> HttpResponseSerializer {
20        HttpResponseSerializer { builder }
21    }
22}
23
24impl BinarySerializer<HttpResponse> for HttpResponseSerializer {
25    fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
26        self.builder.set_header(
27            headers::SPEC_VERSION_HEADER.clone(),
28            str_to_header_value!(spec_version.as_str())?,
29        );
30        Ok(self)
31    }
32
33    fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
34        self.builder.set_header(
35            headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(),
36            str_to_header_value!(value.to_string().as_str())?,
37        );
38        Ok(self)
39    }
40
41    fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
42        self.builder.set_header(
43            attribute_name_to_header!(name)?,
44            str_to_header_value!(value.to_string().as_str())?,
45        );
46        Ok(self)
47    }
48
49    fn end_with_data(mut self, bytes: Vec<u8>) -> Result<HttpResponse> {
50        Ok(self.builder.body(bytes))
51    }
52
53    fn end(mut self) -> Result<HttpResponse> {
54        Ok(self.builder.finish())
55    }
56}
57
58impl StructuredSerializer<HttpResponse> for HttpResponseSerializer {
59    fn set_structured_event(mut self, bytes: Vec<u8>) -> Result<HttpResponse> {
60        Ok(self
61            .builder
62            .set_header(
63                actix_web::http::header::CONTENT_TYPE,
64                headers::CLOUDEVENTS_JSON_HEADER.clone(),
65            )
66            .body(bytes))
67    }
68}
69
70/// Method to fill an [`HttpResponseBuilder`] with an [`Event`].
71pub async fn event_to_response(
72    event: Event,
73    response: HttpResponseBuilder,
74) -> std::result::Result<HttpResponse, actix_web::error::Error> {
75    BinaryDeserializer::deserialize_binary(event, HttpResponseSerializer::new(response))
76        .map_err(actix_web::error::ErrorBadRequest)
77}
78
79/// Extension Trait for [`HttpResponseBuilder`] which acts as a wrapper for the function [`event_to_response()`].
80///
81/// This trait is sealed and cannot be implemented for types outside of this crate.
82#[async_trait(?Send)]
83pub trait HttpResponseBuilderExt: private::Sealed {
84    /// Fill this [`HttpResponseBuilder`] with an [`Event`].
85    async fn event(
86        self,
87        event: Event,
88    ) -> std::result::Result<HttpResponse, actix_web::error::Error>;
89}
90
91#[async_trait(?Send)]
92impl HttpResponseBuilderExt for HttpResponseBuilder {
93    async fn event(
94        self,
95        event: Event,
96    ) -> std::result::Result<HttpResponse, actix_web::error::Error> {
97        event_to_response(event, self).await
98    }
99}
100
101// Sealing the HttpResponseBuilderExt
102mod private {
103    pub trait Sealed {}
104    impl Sealed for actix_web::dev::HttpResponseBuilder {}
105}
106
107#[cfg(test)]
108mod tests {
109    use super::*;
110    use url::Url;
111
112    use actix_web::http::StatusCode;
113    use actix_web::test;
114    use cloudevents::{EventBuilder, EventBuilderV10};
115    use futures::TryStreamExt;
116    use serde_json::json;
117    use std::str::FromStr;
118
119    #[actix_rt::test]
120    async fn test_response() {
121        let input = EventBuilderV10::new()
122            .id("0001")
123            .ty("example.test")
124            .source(Url::from_str("http://localhost/").unwrap())
125            .extension("someint", "10")
126            .build()
127            .unwrap();
128
129        let resp = HttpResponseBuilder::new(StatusCode::OK)
130            .event(input)
131            .await
132            .unwrap();
133
134        assert_eq!(
135            resp.headers()
136                .get("ce-specversion")
137                .unwrap()
138                .to_str()
139                .unwrap(),
140            "1.0"
141        );
142        assert_eq!(
143            resp.headers().get("ce-id").unwrap().to_str().unwrap(),
144            "0001"
145        );
146        assert_eq!(
147            resp.headers().get("ce-type").unwrap().to_str().unwrap(),
148            "example.test"
149        );
150        assert_eq!(
151            resp.headers().get("ce-source").unwrap().to_str().unwrap(),
152            "http://localhost/"
153        );
154        assert_eq!(
155            resp.headers().get("ce-someint").unwrap().to_str().unwrap(),
156            "10"
157        );
158    }
159
160    #[actix_rt::test]
161    async fn test_response_with_full_data() {
162        let j = json!({"hello": "world"});
163
164        let input = EventBuilderV10::new()
165            .id("0001")
166            .ty("example.test")
167            .source(Url::from_str("http://localhost").unwrap())
168            .data("application/json", j.clone())
169            .extension("someint", "10")
170            .build()
171            .unwrap();
172
173        let mut resp = HttpResponseBuilder::new(StatusCode::OK)
174            .event(input)
175            .await
176            .unwrap();
177
178        assert_eq!(
179            resp.headers()
180                .get("ce-specversion")
181                .unwrap()
182                .to_str()
183                .unwrap(),
184            "1.0"
185        );
186        assert_eq!(
187            resp.headers().get("ce-id").unwrap().to_str().unwrap(),
188            "0001"
189        );
190        assert_eq!(
191            resp.headers().get("ce-type").unwrap().to_str().unwrap(),
192            "example.test"
193        );
194        assert_eq!(
195            resp.headers().get("ce-source").unwrap().to_str().unwrap(),
196            "http://localhost/"
197        );
198        assert_eq!(
199            resp.headers()
200                .get("content-type")
201                .unwrap()
202                .to_str()
203                .unwrap(),
204            "application/json"
205        );
206        assert_eq!(
207            resp.headers().get("ce-someint").unwrap().to_str().unwrap(),
208            "10"
209        );
210
211        let bytes = test::load_stream(resp.take_body().into_stream())
212            .await
213            .unwrap();
214        assert_eq!(j.to_string().as_bytes(), bytes.as_ref())
215    }
216}