cloudevents/binding/reqwest/
client_response.rs

1use reqwest_lib as reqwest;
2
3use crate::binding;
4use crate::message::{Error, Result};
5use crate::Event;
6use async_trait::async_trait;
7use http;
8use http::header;
9use reqwest::Response;
10
11/// Method to transform an incoming [`Response`] to [`Event`].
12pub async fn response_to_event(res: Response) -> Result<Event> {
13    let h = res.headers().to_owned();
14    let b = res.bytes().await.map_err(|e| Error::Other {
15        source: Box::new(e),
16    })?;
17    binding::http::to_event(&h, b.to_vec())
18}
19
20/// Method to transform an incoming [`Response`] to a batched [`Vec<Event>`]
21pub async fn response_to_events(res: Response) -> Result<Vec<Event>> {
22    if res
23        .headers()
24        .get(header::CONTENT_TYPE)
25        .and_then(|v| v.to_str().ok())
26        .filter(|&v| v.starts_with(binding::CLOUDEVENTS_BATCH_JSON_HEADER))
27        .is_none()
28    {
29        return Err(Error::WrongEncoding {});
30    }
31
32    let bytes = res.bytes().await.map_err(|e| Error::Other {
33        source: Box::new(e),
34    })?;
35
36    Ok(serde_json::from_slice(&bytes)?)
37}
38
39/// Extension Trait for [`Response`] which acts as a wrapper for the function [`response_to_event()`].
40///
41/// This trait is sealed and cannot be implemented for types outside of this crate.
42#[async_trait(?Send)]
43pub trait ResponseExt: private::Sealed {
44    /// Convert this [`Response`] to [`Event`].
45    async fn into_event(self) -> Result<Event>;
46    /// Convert this [`Response`] to a batched [`Vec<Event>`].
47    async fn into_events(self) -> Result<Vec<Event>>;
48}
49
50#[async_trait(?Send)]
51impl ResponseExt for Response {
52    async fn into_event(self) -> Result<Event> {
53        response_to_event(self).await
54    }
55
56    async fn into_events(self) -> Result<Vec<Event>> {
57        response_to_events(self).await
58    }
59}
60
61// Sealing the ResponseExt
62mod private {
63    use reqwest_lib as reqwest;
64
65    pub trait Sealed {}
66    impl Sealed for reqwest::Response {}
67}
68
#[cfg(test)]
mod tests {
    use super::*;
    use reqwest_lib as reqwest;
    use std::vec;

    use crate::test::fixtures;

    /// Sends a `GET` to `url` with a fresh client and returns the raw response.
    ///
    /// Panics when the request cannot be sent, which fails the calling test.
    async fn fetch(url: &str) -> reqwest::Response {
        reqwest::Client::new().get(url).send().await.unwrap()
    }

    /// Response carrying only `ce-*` headers and no body.
    #[tokio::test]
    async fn test_response() {
        let server = mockito::server_url();
        let _mock = mockito::mock("GET", "/")
            .with_status(200)
            .with_header("ce-specversion", "1.0")
            .with_header("ce-id", "0001")
            .with_header("ce-type", "test_event.test_application")
            .with_header("ce-source", "http://localhost/")
            .with_header("ce-someint", "10")
            .create();

        let actual = fetch(&server).await.into_event().await.unwrap();

        assert_eq!(fixtures::v10::minimal_string_extension(), actual);
    }

    /// Response with attributes and extensions in `ce-*` headers plus a JSON body.
    #[tokio::test]
    async fn test_response_with_full_data() {
        let server = mockito::server_url();
        let _mock = mockito::mock("GET", "/")
            .with_status(200)
            .with_header("ce-specversion", "1.0")
            .with_header("ce-id", "0001")
            .with_header("ce-type", "test_event.test_application")
            .with_header("ce-source", "http://localhost/")
            .with_header("ce-subject", "cloudevents-sdk")
            .with_header("content-type", "application/json")
            .with_header("ce-string_ex", "val")
            .with_header("ce-int_ex", "10")
            .with_header("ce-bool_ex", "true")
            .with_header("ce-time", &fixtures::time().to_rfc3339())
            .with_body(fixtures::json_data().to_string())
            .create();

        let actual = fetch(&server).await.into_event().await.unwrap();

        assert_eq!(
            fixtures::v10::full_binary_json_data_string_extension(),
            actual
        );
    }

    /// Response whose body is the whole event serialized as JSON.
    #[tokio::test]
    async fn test_structured_response_with_full_data() {
        let expected = fixtures::v10::full_json_data_string_extension();
        let payload = serde_json::to_string(&expected).unwrap();

        let server = mockito::server_url();
        let _mock = mockito::mock("GET", "/")
            .with_status(200)
            .with_header(
                "content-type",
                "application/cloudevents+json; charset=utf-8",
            )
            .with_body(payload)
            .create();

        let actual = fetch(&server).await.into_event().await.unwrap();

        assert_eq!(expected, actual);
    }

    /// Response whose body is a JSON list of events, decoded through `into_events`.
    #[tokio::test]
    async fn test_batched_response() {
        let expected = vec![fixtures::v10::full_json_data_string_extension()];
        let payload = serde_json::to_string(&expected).unwrap();

        let server = mockito::server_url();
        let _mock = mockito::mock("GET", "/")
            .with_status(200)
            .with_header(
                "content-type",
                "application/cloudevents-batch+json; charset=utf-8",
            )
            .with_body(payload)
            .create();

        let actual = fetch(&server).await.into_events().await.unwrap();

        assert_eq!(expected, actual);
    }
}