// cloudevents_sdk_actix_web/server_response.rs
use super::headers;
2use actix_web::dev::HttpResponseBuilder;
3use actix_web::http::{HeaderName, HeaderValue};
4use actix_web::HttpResponse;
5use async_trait::async_trait;
6use cloudevents::event::SpecVersion;
7use cloudevents::message::{
8 BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
9};
10use cloudevents::Event;
11use std::str::FromStr;
12
13pub struct HttpResponseSerializer {
15 builder: HttpResponseBuilder,
16}
17
18impl HttpResponseSerializer {
19 pub fn new(builder: HttpResponseBuilder) -> HttpResponseSerializer {
20 HttpResponseSerializer { builder }
21 }
22}
23
24impl BinarySerializer<HttpResponse> for HttpResponseSerializer {
25 fn set_spec_version(mut self, spec_version: SpecVersion) -> Result<Self> {
26 self.builder.set_header(
27 headers::SPEC_VERSION_HEADER.clone(),
28 str_to_header_value!(spec_version.as_str())?,
29 );
30 Ok(self)
31 }
32
33 fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
34 self.builder.set_header(
35 headers::ATTRIBUTES_TO_HEADERS.get(name).unwrap().clone(),
36 str_to_header_value!(value.to_string().as_str())?,
37 );
38 Ok(self)
39 }
40
41 fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
42 self.builder.set_header(
43 attribute_name_to_header!(name)?,
44 str_to_header_value!(value.to_string().as_str())?,
45 );
46 Ok(self)
47 }
48
49 fn end_with_data(mut self, bytes: Vec<u8>) -> Result<HttpResponse> {
50 Ok(self.builder.body(bytes))
51 }
52
53 fn end(mut self) -> Result<HttpResponse> {
54 Ok(self.builder.finish())
55 }
56}
57
58impl StructuredSerializer<HttpResponse> for HttpResponseSerializer {
59 fn set_structured_event(mut self, bytes: Vec<u8>) -> Result<HttpResponse> {
60 Ok(self
61 .builder
62 .set_header(
63 actix_web::http::header::CONTENT_TYPE,
64 headers::CLOUDEVENTS_JSON_HEADER.clone(),
65 )
66 .body(bytes))
67 }
68}
69
70pub async fn event_to_response(
72 event: Event,
73 response: HttpResponseBuilder,
74) -> std::result::Result<HttpResponse, actix_web::error::Error> {
75 BinaryDeserializer::deserialize_binary(event, HttpResponseSerializer::new(response))
76 .map_err(actix_web::error::ErrorBadRequest)
77}
78
79#[async_trait(?Send)]
83pub trait HttpResponseBuilderExt: private::Sealed {
84 async fn event(
86 self,
87 event: Event,
88 ) -> std::result::Result<HttpResponse, actix_web::error::Error>;
89}
90
91#[async_trait(?Send)]
92impl HttpResponseBuilderExt for HttpResponseBuilder {
93 async fn event(
94 self,
95 event: Event,
96 ) -> std::result::Result<HttpResponse, actix_web::error::Error> {
97 event_to_response(event, self).await
98 }
99}
100
101mod private {
103 pub trait Sealed {}
104 impl Sealed for actix_web::dev::HttpResponseBuilder {}
105}
106
#[cfg(test)]
mod tests {
    use super::*;
    use url::Url;

    use actix_web::http::StatusCode;
    use actix_web::test;
    use cloudevents::{EventBuilder, EventBuilderV10};
    use futures::TryStreamExt;
    use serde_json::json;
    use std::str::FromStr;

    /// Returns the value of header `name` on `resp` as a string slice.
    ///
    /// Panics when the header is missing or its value cannot be read as a
    /// string, which is the desired failure mode inside a test.
    fn header_str<'a>(resp: &'a HttpResponse, name: &str) -> &'a str {
        resp.headers().get(name).unwrap().to_str().unwrap()
    }

    /// An event without data is written as `ce-*` headers.
    #[actix_rt::test]
    async fn test_response() {
        let input = EventBuilderV10::new()
            .id("0001")
            .ty("example.test")
            .source(Url::from_str("http://localhost/").unwrap())
            .extension("someint", "10")
            .build()
            .unwrap();

        let resp = HttpResponseBuilder::new(StatusCode::OK)
            .event(input)
            .await
            .unwrap();

        assert_eq!(header_str(&resp, "ce-specversion"), "1.0");
        assert_eq!(header_str(&resp, "ce-id"), "0001");
        assert_eq!(header_str(&resp, "ce-type"), "example.test");
        assert_eq!(header_str(&resp, "ce-source"), "http://localhost/");
        assert_eq!(header_str(&resp, "ce-someint"), "10");
    }

    /// An event with JSON data additionally sets `content-type` and carries
    /// the payload in the response body.
    #[actix_rt::test]
    async fn test_response_with_full_data() {
        let j = json!({"hello": "world"});

        let input = EventBuilderV10::new()
            .id("0001")
            .ty("example.test")
            .source(Url::from_str("http://localhost").unwrap())
            .data("application/json", j.clone())
            .extension("someint", "10")
            .build()
            .unwrap();

        let mut resp = HttpResponseBuilder::new(StatusCode::OK)
            .event(input)
            .await
            .unwrap();

        assert_eq!(header_str(&resp, "ce-specversion"), "1.0");
        assert_eq!(header_str(&resp, "ce-id"), "0001");
        assert_eq!(header_str(&resp, "ce-type"), "example.test");
        assert_eq!(header_str(&resp, "ce-source"), "http://localhost/");
        assert_eq!(header_str(&resp, "content-type"), "application/json");
        assert_eq!(header_str(&resp, "ce-someint"), "10");

        let bytes = test::load_stream(resp.take_body().into_stream())
            .await
            .unwrap();
        assert_eq!(j.to_string().as_bytes(), bytes.as_ref())
    }
}