cloudevents/binding/warp/
filter.rs

1use warp_lib as warp;
2
3use crate::binding::http_0_2 as http;
4
5use crate::Event;
6use warp::http::HeaderMap;
7use warp::Filter;
8use warp::Rejection;
9
10#[derive(Debug)]
11#[allow(dead_code)]
12pub struct EventFilterError {
13    error: crate::message::Error,
14}
15
16impl warp::reject::Reject for EventFilterError {}
17
18///
19/// # Extracts [`crate::Event`] from incoming request
20///
21/// ```
22/// # use warp_lib as warp;
23/// use cloudevents::binding::warp::filter::to_event;
24/// use warp::Filter;
25/// use warp::Reply;
26///
27/// let routes = warp::any()
28///    .and(to_event())
29///    .map(|event| {
30///         // do something with the event
31///     }
32/// );
33/// ```
34///
35pub fn to_event() -> impl Filter<Extract = (Event,), Error = Rejection> + Copy {
36    warp::header::headers_cloned()
37        .and(warp::body::bytes())
38        .and_then(create_event)
39}
40
41async fn create_event(headers: HeaderMap, body: bytes::Bytes) -> Result<Event, Rejection> {
42    http::to_event(&headers, body.to_vec())
43        .map_err(|error| warp::reject::custom(EventFilterError { error }))
44}
45
#[cfg(test)]
mod tests {
    use super::{to_event, EventFilterError};
    use crate::test::fixtures;
    use std::convert::TryInto;
    use warp_lib as warp;

    /// A body-less request carrying the `ce-*` attribute headers is decoded
    /// into the `minimal_string_extension` fixture event.
    #[tokio::test]
    async fn test_request() {
        let expected = fixtures::v10::minimal_string_extension();

        let actual = warp::test::request()
            .method("POST")
            .header("ce-specversion", "1.0")
            .header("ce-id", "0001")
            .header("ce-type", "test_event.test_application")
            .header("ce-source", "http://localhost/")
            .header("ce-someint", "10")
            .filter(&to_event())
            .await
            .unwrap();

        assert_eq!(expected, actual);
    }

    /// An unknown `ce-specversion` value makes the filter reject the request
    /// with an `EventFilterError` describing the invalid version.
    #[tokio::test]
    async fn test_bad_request() {
        let outcome = warp::test::request()
            .method("POST")
            .header("ce-specversion", "BAD SPECIFICATION")
            .header("ce-id", "0001")
            .header("ce-type", "example.test")
            .header("ce-source", "http://localhost/")
            .header("ce-someint", "10")
            .header("ce-time", fixtures::time().to_rfc3339())
            .filter(&to_event())
            .await;

        assert!(outcome.is_err());
        let rejection = outcome.unwrap_err();

        let cause = rejection.find::<EventFilterError>().unwrap();
        assert_eq!(
            cause.error.to_string(),
            "Invalid specversion BAD SPECIFICATION"
        )
    }

    /// A request with the full attribute set and a JSON body is decoded into
    /// the matching fixture event, and its payload equals the serialized JSON.
    #[tokio::test]
    async fn test_request_with_full_data() {
        let expected = fixtures::v10::full_binary_json_data_string_extension();

        let decoded = warp::test::request()
            .method("POST")
            .header("ce-specversion", "1.0")
            .header("ce-id", "0001")
            .header("ce-type", "test_event.test_application")
            .header("ce-source", "http://localhost/")
            .header("ce-subject", "cloudevents-sdk")
            .header("content-type", "application/json")
            .header("ce-string_ex", "val")
            .header("ce-int_ex", "10")
            .header("ce-bool_ex", "true")
            .header("ce-time", &fixtures::time().to_rfc3339())
            .json(&fixtures::json_data())
            .filter(&to_event())
            .await
            .unwrap();

        // Take the data out of a copy so `decoded` stays intact for the
        // whole-event comparison below.
        let mut event_copy = decoded.clone();
        let (_, _, data) = event_copy.take_data();
        let actual_payload: Vec<u8> = data.unwrap().try_into().unwrap();
        let expected_payload: Vec<u8> = serde_json::to_vec(&fixtures::json_data()).unwrap();
        assert_eq!(expected_payload, actual_payload);

        assert_eq!(expected, decoded);
    }
}