cloudevents/binding/actix/
server_request.rs1use crate::binding::http_0_2::{to_event, Headers};
2use crate::Event;
3use actix_web::dev::Payload;
4use actix_web::web::BytesMut;
5use actix_web::{web, HttpRequest};
6use async_trait::async_trait;
7use futures::{future::LocalBoxFuture, FutureExt, StreamExt};
8use http::header::{AsHeaderName, HeaderName, HeaderValue};
9use http_0_2 as http;
10
11impl<'a> Headers<'a> for actix_http::header::HeaderMap {
13 type Iterator = Box<dyn Iterator<Item = (&'a HeaderName, &'a HeaderValue)> + 'a>;
14 fn get<K: AsHeaderName>(&self, key: K) -> Option<&HeaderValue> {
15 self.get(key.as_str())
16 }
17 fn iter(&'a self) -> Self::Iterator {
18 Box::new(self.iter())
19 }
20}
21
22pub async fn request_to_event(
24 req: &HttpRequest,
25 mut payload: web::Payload,
26) -> std::result::Result<Event, actix_web::error::Error> {
27 let mut bytes = BytesMut::new();
28 while let Some(item) = payload.next().await {
29 bytes.extend_from_slice(&item?);
30 }
31 to_event(req.headers(), bytes.to_vec()).map_err(actix_web::error::ErrorBadRequest)
32}
33
34impl actix_web::FromRequest for Event {
36 type Error = actix_web::Error;
37 type Future = LocalBoxFuture<'static, std::result::Result<Self, Self::Error>>;
38
39 fn from_request(r: &HttpRequest, p: &mut Payload) -> Self::Future {
40 let request = r.to_owned();
41 bytes::Bytes::from_request(&request, p)
42 .map(move |bytes| match bytes {
43 Ok(b) => to_event(request.headers(), b.to_vec())
44 .map_err(actix_web::error::ErrorBadRequest),
45 Err(e) => Err(e),
46 })
47 .boxed_local()
48 }
49}
50
51#[async_trait(?Send)]
55pub trait HttpRequestExt: private::Sealed {
56 async fn to_event(
58 &self,
59 mut payload: web::Payload,
60 ) -> std::result::Result<Event, actix_web::error::Error>;
61}
62
63#[async_trait(?Send)]
64impl HttpRequestExt for HttpRequest {
65 async fn to_event(
66 &self,
67 payload: web::Payload,
68 ) -> std::result::Result<Event, actix_web::error::Error> {
69 request_to_event(self, payload).await
70 }
71}
72
73mod private {
74 pub trait Sealed {}
76 impl Sealed for actix_web::HttpRequest {}
77}
78
#[cfg(test)]
mod tests {
    use super::*;
    use actix_web::{test, FromRequest};

    use crate::test::fixtures;
    use serde_json::json;

    /// Test helper: extracts a `web::Payload` from the raw request parts and
    /// converts the request through `HttpRequestExt::to_event`, panicking if
    /// either step fails.
    async fn to_event(req: &HttpRequest, mut payload: Payload) -> Event {
        let body = web::Payload::from_request(req, &mut payload)
            .await
            .unwrap();
        req.to_event(body).await.unwrap()
    }

    /// Binary-mode request carrying only `ce-*` headers and no body.
    #[actix_rt::test]
    async fn test_request() {
        let expected = fixtures::v10::minimal_string_extension();

        let builder = test::TestRequest::post()
            .insert_header(("ce-specversion", "1.0"))
            .insert_header(("ce-id", "0001"))
            .insert_header(("ce-type", "test_event.test_application"))
            .insert_header(("ce-source", "http://localhost/"))
            .insert_header(("ce-someint", "10"));
        let (req, payload) = builder.to_http_parts();

        let actual = to_event(&req, payload).await;
        assert_eq!(expected, actual);
    }

    /// Binary-mode request with `ce-*` headers, extension headers and a JSON body.
    #[actix_rt::test]
    async fn test_request_with_full_data() {
        let expected = fixtures::v10::full_binary_json_data_string_extension();

        let builder = test::TestRequest::post()
            .insert_header(("ce-specversion", "1.0"))
            .insert_header(("ce-id", "0001"))
            .insert_header(("ce-type", "test_event.test_application"))
            .insert_header(("ce-subject", "cloudevents-sdk"))
            .insert_header(("ce-source", "http://localhost/"))
            .insert_header(("ce-time", fixtures::time().to_rfc3339()))
            .insert_header(("ce-string_ex", "val"))
            .insert_header(("ce-int_ex", "10"))
            .insert_header(("ce-bool_ex", "true"))
            .insert_header(("content-type", "application/json"))
            .set_json(fixtures::json_data());
        let (req, payload) = builder.to_http_parts();

        let actual = to_event(&req, payload).await;
        assert_eq!(expected, actual);
    }

    /// Structured-mode request: the whole event is serialized as the JSON body.
    #[actix_rt::test]
    async fn test_structured_request_with_full_data() {
        let structured = json!({
            "specversion": "1.0",
            "id": "0001",
            "type": "test_event.test_application",
            "subject": "cloudevents-sdk",
            "source": "http://localhost/",
            "time": fixtures::time().to_rfc3339(),
            "string_ex": "val",
            "int_ex": "10",
            "bool_ex": "true",
            "datacontenttype": "application/json",
            "data": fixtures::json_data()
        });
        let body =
            serde_json::to_string(&structured).expect("Failed to serialize test data to json");

        let expected = fixtures::v10::full_json_data_string_extension();

        let builder = test::TestRequest::post()
            .insert_header(("content-type", "application/cloudevents+json"))
            .set_payload(body);
        let (req, payload) = builder.to_http_parts();

        let actual = to_event(&req, payload).await;
        assert_eq!(expected, actual);
    }
}
156}