cloudevents_sdk_actix_web/
server_request.rs1use super::headers;
2use actix_web::http::HeaderName;
3use actix_web::web::{Bytes, BytesMut};
4use actix_web::{web, HttpMessage, HttpRequest};
5use async_trait::async_trait;
6use cloudevents::event::SpecVersion;
7use cloudevents::message::{
8 BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
9 Result, StructuredDeserializer, StructuredSerializer,
10};
11use cloudevents::{message, Event};
12use futures::StreamExt;
13use std::convert::TryFrom;
14
15pub struct HttpRequestDeserializer<'a> {
17 req: &'a HttpRequest,
18 body: Bytes,
19}
20
21impl HttpRequestDeserializer<'_> {
22 pub fn new(req: &HttpRequest, body: Bytes) -> HttpRequestDeserializer {
23 HttpRequestDeserializer { req, body }
24 }
25}
26
27impl<'a> BinaryDeserializer for HttpRequestDeserializer<'a> {
28 fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
29 if self.encoding() != Encoding::BINARY {
30 return Err(message::Error::WrongEncoding {});
31 }
32
33 let spec_version = SpecVersion::try_from(
34 unwrap_optional_header!(self.req.headers(), headers::SPEC_VERSION_HEADER).unwrap()?,
35 )?;
36
37 visitor = visitor.set_spec_version(spec_version.clone())?;
38
39 let attributes = spec_version.attribute_names();
40
41 for (hn, hv) in
42 self.req.headers().iter().filter(|(hn, _)| {
43 headers::SPEC_VERSION_HEADER.ne(hn) && hn.as_str().starts_with("ce-")
44 })
45 {
46 let name = &hn.as_str()["ce-".len()..];
47
48 if attributes.contains(&name) {
49 visitor = visitor.set_attribute(
50 name,
51 MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
52 )?
53 } else {
54 visitor = visitor.set_extension(
55 name,
56 MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
57 )?
58 }
59 }
60
61 if let Some(hv) = self.req.headers().get("content-type") {
62 visitor = visitor.set_attribute(
63 "datacontenttype",
64 MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
65 )?
66 }
67
68 if !self.body.is_empty() {
69 visitor.end_with_data(self.body.to_vec())
70 } else {
71 visitor.end()
72 }
73 }
74}
75
76impl<'a> StructuredDeserializer for HttpRequestDeserializer<'a> {
77 fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
78 if self.encoding() != Encoding::STRUCTURED {
79 return Err(message::Error::WrongEncoding {});
80 }
81 visitor.set_structured_event(self.body.to_vec())
82 }
83}
84
85impl<'a> MessageDeserializer for HttpRequestDeserializer<'a> {
86 fn encoding(&self) -> Encoding {
87 if self.req.content_type() == "application/cloudevents+json" {
88 Encoding::STRUCTURED
89 } else if self
90 .req
91 .headers()
92 .get::<&'static HeaderName>(&super::headers::SPEC_VERSION_HEADER)
93 .is_some()
94 {
95 Encoding::BINARY
96 } else {
97 Encoding::UNKNOWN
98 }
99 }
100}
101
102pub async fn request_to_event(
104 req: &HttpRequest,
105 mut payload: web::Payload,
106) -> std::result::Result<Event, actix_web::error::Error> {
107 let mut bytes = BytesMut::new();
108 while let Some(item) = payload.next().await {
109 bytes.extend_from_slice(&item?);
110 }
111 MessageDeserializer::into_event(HttpRequestDeserializer::new(req, bytes.freeze()))
112 .map_err(actix_web::error::ErrorBadRequest)
113}
114
115#[async_trait(?Send)]
119pub trait HttpRequestExt: private::Sealed {
120 async fn to_event(
122 &self,
123 mut payload: web::Payload,
124 ) -> std::result::Result<Event, actix_web::error::Error>;
125}
126
127#[async_trait(?Send)]
128impl HttpRequestExt for HttpRequest {
129 async fn to_event(
130 &self,
131 payload: web::Payload,
132 ) -> std::result::Result<Event, actix_web::error::Error> {
133 request_to_event(self, payload).await
134 }
135}
136
137mod private {
138 pub trait Sealed {}
140 impl Sealed for actix_web::HttpRequest {}
141}
142
143#[cfg(test)]
144mod tests {
145 use super::*;
146 use actix_web::test;
147 use url::Url;
148
149 use chrono::Utc;
150 use cloudevents::{EventBuilder, EventBuilderV10};
151 use serde_json::json;
152 use std::str::FromStr;
153
154 #[actix_rt::test]
155 async fn test_request() {
156 let time = Utc::now();
157 let expected = EventBuilderV10::new()
158 .id("0001")
159 .ty("example.test")
160 .source("http://localhost/")
161 .time(time)
164 .extension("someint", "10")
165 .build()
166 .unwrap();
167
168 let (req, payload) = test::TestRequest::post()
169 .header("ce-specversion", "1.0")
170 .header("ce-id", "0001")
171 .header("ce-type", "example.test")
172 .header("ce-source", "http://localhost/")
173 .header("ce-someint", "10")
174 .header("ce-time", time.to_rfc3339())
175 .to_http_parts();
176
177 let resp = req.to_event(web::Payload(payload)).await.unwrap();
178 assert_eq!(expected, resp);
179 }
180
181 #[actix_rt::test]
182 async fn test_request_with_full_data() {
183 let time = Utc::now();
184 let j = json!({"hello": "world"});
185
186 let expected = EventBuilderV10::new()
187 .id("0001")
188 .ty("example.test")
189 .source(Url::from_str("http://localhost").unwrap())
190 .time(time)
193 .data("application/json", j.to_string().into_bytes())
194 .extension("someint", "10")
195 .build()
196 .unwrap();
197
198 let (req, payload) = test::TestRequest::post()
199 .header("ce-specversion", "1.0")
200 .header("ce-id", "0001")
201 .header("ce-type", "example.test")
202 .header("ce-source", "http://localhost")
203 .header("ce-someint", "10")
204 .header("ce-time", time.to_rfc3339())
205 .header("content-type", "application/json")
206 .set_json(&j)
207 .to_http_parts();
208
209 let resp = req.to_event(web::Payload(payload)).await.unwrap();
210 assert_eq!(expected, resp);
211 }
212}