cloudevents/binding/axum/
extract.rs

1use axum::body::Bytes;
2use axum::extract::{FromRequest, Request};
3use axum::response::Response;
4use axum_lib as axum;
5use http;
6use http::StatusCode;
7
8use crate::binding::http::to_event;
9use crate::event::Event;
10
11impl<S> FromRequest<S> for Event
12where
13    Bytes: FromRequest<S>,
14    S: Send + Sync,
15{
16    type Rejection = Response;
17
18    async fn from_request(req: Request, _state: &S) -> Result<Self, Self::Rejection> {
19        let (parts, body) = req.into_parts();
20
21        let body = axum::body::to_bytes(body, usize::MAX).await.map_err(|e| {
22            Response::builder()
23                .status(StatusCode::INTERNAL_SERVER_ERROR)
24                .body(axum::body::Body::from(e.to_string()))
25                .unwrap()
26        })?;
27
28        to_event(&parts.headers, body.to_vec()).map_err(|e| {
29            Response::builder()
30                .status(StatusCode::BAD_REQUEST)
31                .body(axum::body::Body::from(e.to_string()))
32                .unwrap()
33        })
34    }
35}
36
37#[cfg(test)]
38mod tests {
39    use super::*;
40    use axum::body::Body;
41    use axum::extract::FromRequest;
42    use axum::http::{self, Request, StatusCode};
43
44    use crate::test::fixtures;
45
46    #[tokio::test]
47    async fn axum_test_request() {
48        let expected = fixtures::v10::minimal_string_extension();
49
50        let request = Request::builder()
51            .method(http::Method::POST)
52            .header("ce-specversion", "1.0")
53            .header("ce-id", "0001")
54            .header("ce-type", "test_event.test_application")
55            .header("ce-source", "http://localhost/")
56            .header("ce-someint", "10")
57            .body(Body::empty())
58            .unwrap();
59
60        let result = Event::from_request(request, &()).await.unwrap();
61
62        assert_eq!(expected, result);
63    }
64
65    #[tokio::test]
66    async fn axum_test_bad_request() {
67        let request = Request::builder()
68            .method(http::Method::POST)
69            .header("ce-specversion", "BAD SPECIFICATION")
70            .header("ce-id", "0001")
71            .header("ce-type", "example.test")
72            .header("ce-source", "http://localhost/")
73            .header("ce-someint", "10")
74            .header("ce-time", fixtures::time().to_rfc3339())
75            .body(Body::empty())
76            .unwrap();
77
78        let result = Event::from_request(request, &()).await;
79        assert!(result.is_err());
80        let rejection = result.unwrap_err();
81
82        let reason = rejection.status();
83        assert_eq!(reason, StatusCode::BAD_REQUEST)
84    }
85
86    #[tokio::test]
87    async fn axum_test_request_with_full_data() {
88        let expected = fixtures::v10::full_binary_json_data_string_extension();
89
90        let request = Request::builder()
91            .method(http::Method::POST)
92            .header("ce-specversion", "1.0")
93            .header("ce-id", "0001")
94            .header("ce-type", "test_event.test_application")
95            .header("ce-source", "http://localhost/")
96            .header("ce-subject", "cloudevents-sdk")
97            .header("content-type", "application/json")
98            .header("ce-string_ex", "val")
99            .header("ce-int_ex", "10")
100            .header("ce-bool_ex", "true")
101            .header("ce-time", &fixtures::time().to_rfc3339())
102            .body(Body::from(fixtures::json_data_binary()))
103            .unwrap();
104
105        let result = Event::from_request(request, &()).await.unwrap();
106
107        assert_eq!(expected, result);
108    }
109}