cloudevents/binding/warp/
filter.rs1use warp_lib as warp;
2
3use crate::binding::http_0_2 as http;
4
5use crate::Event;
6use warp::http::HeaderMap;
7use warp::Filter;
8use warp::Rejection;
9
10#[derive(Debug)]
11#[allow(dead_code)]
12pub struct EventFilterError {
13 error: crate::message::Error,
14}
15
16impl warp::reject::Reject for EventFilterError {}
17
18pub fn to_event() -> impl Filter<Extract = (Event,), Error = Rejection> + Copy {
36 warp::header::headers_cloned()
37 .and(warp::body::bytes())
38 .and_then(create_event)
39}
40
41async fn create_event(headers: HeaderMap, body: bytes::Bytes) -> Result<Event, Rejection> {
42 http::to_event(&headers, body.to_vec())
43 .map_err(|error| warp::reject::custom(EventFilterError { error }))
44}
45
46#[cfg(test)]
47mod tests {
48 use super::to_event;
49 use crate::test::fixtures;
50 use std::convert::TryInto;
51 use warp::test;
52 use warp_lib as warp;
53
54 #[tokio::test]
55 async fn test_request() {
56 let expected = fixtures::v10::minimal_string_extension();
57
58 let result = test::request()
59 .method("POST")
60 .header("ce-specversion", "1.0")
61 .header("ce-id", "0001")
62 .header("ce-type", "test_event.test_application")
63 .header("ce-source", "http://localhost/")
64 .header("ce-someint", "10")
65 .filter(&to_event())
66 .await
67 .unwrap();
68
69 assert_eq!(expected, result);
70 }
71
72 #[tokio::test]
73 async fn test_bad_request() {
74 let result = test::request()
75 .method("POST")
76 .header("ce-specversion", "BAD SPECIFICATION")
77 .header("ce-id", "0001")
78 .header("ce-type", "example.test")
79 .header("ce-source", "http://localhost/")
80 .header("ce-someint", "10")
81 .header("ce-time", fixtures::time().to_rfc3339())
82 .filter(&to_event())
83 .await;
84
85 assert!(result.is_err());
86 let rejection = result.unwrap_err();
87
88 let reason = rejection.find::<super::EventFilterError>().unwrap();
89 assert_eq!(
90 reason.error.to_string(),
91 "Invalid specversion BAD SPECIFICATION"
92 )
93 }
94
95 #[tokio::test]
96 async fn test_request_with_full_data() {
97 let expected = fixtures::v10::full_binary_json_data_string_extension();
98
99 let result = test::request()
100 .method("POST")
101 .header("ce-specversion", "1.0")
102 .header("ce-id", "0001")
103 .header("ce-type", "test_event.test_application")
104 .header("ce-source", "http://localhost/")
105 .header("ce-subject", "cloudevents-sdk")
106 .header("content-type", "application/json")
107 .header("ce-string_ex", "val")
108 .header("ce-int_ex", "10")
109 .header("ce-bool_ex", "true")
110 .header("ce-time", &fixtures::time().to_rfc3339())
111 .json(&fixtures::json_data())
112 .filter(&to_event())
113 .await
114 .unwrap();
115
116 let mut event = result.clone();
117 let (_datacontenttype, _dataschema, data) = event.take_data();
118 let actual_payload: Vec<u8> = data.unwrap().try_into().unwrap();
119 let expected_payload: Vec<u8> = serde_json::to_vec(&fixtures::json_data()).unwrap();
120 assert_eq!(expected_payload, actual_payload);
121
122 assert_eq!(expected, result);
123 }
124}