cloudevents/binding/reqwest/
client_request.rs

1use reqwest_lib as reqwest;
2
3use crate::binding::{
4    http::{header_prefix, SPEC_VERSION_HEADER},
5    CLOUDEVENTS_BATCH_JSON_HEADER, CLOUDEVENTS_JSON_HEADER,
6};
7use crate::event::SpecVersion;
8use crate::message::{
9    BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
10};
11use crate::Event;
12use reqwest::RequestBuilder;
13
14// TODO: Ideally, we'd only need to implement binding::http::Builder
15// for reqwest::RequestBuilder here, but because the latter is a
16// consuming builder, we'd need an intermediate struct similar to
17// warp's to adapt that interface. Unfortunately, the reqwest builder
18// doesn't implement the Default trait, so I can't use take() as
19// warp's Adapter does, and I've yet to come up with another
20// solution. So for now, we continue to implement BinarySerializer
21// directly in here.
22
23/// Wrapper for [`RequestBuilder`] that implements [`StructuredSerializer`] & [`BinarySerializer`] traits.
24pub struct RequestSerializer {
25    req: RequestBuilder,
26}
27
28impl RequestSerializer {
29    pub fn new(req: RequestBuilder) -> RequestSerializer {
30        RequestSerializer { req }
31    }
32}
33
34impl BinarySerializer<RequestBuilder> for RequestSerializer {
35    fn set_spec_version(mut self, spec_ver: SpecVersion) -> Result<Self> {
36        self.req = self.req.header(SPEC_VERSION_HEADER, spec_ver.to_string());
37        Ok(self)
38    }
39
40    fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
41        let key = &header_prefix(name);
42        self.req = self.req.header(key, value.to_string());
43        Ok(self)
44    }
45
46    fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
47        let key = &header_prefix(name);
48        self.req = self.req.header(key, value.to_string());
49        Ok(self)
50    }
51
52    fn end_with_data(self, bytes: Vec<u8>) -> Result<RequestBuilder> {
53        Ok(self.req.body(bytes))
54    }
55
56    fn end(self) -> Result<RequestBuilder> {
57        Ok(self.req)
58    }
59}
60
61impl StructuredSerializer<RequestBuilder> for RequestSerializer {
62    fn set_structured_event(self, bytes: Vec<u8>) -> Result<RequestBuilder> {
63        Ok(self
64            .req
65            .header(reqwest::header::CONTENT_TYPE, CLOUDEVENTS_JSON_HEADER)
66            .body(bytes))
67    }
68}
69
70/// Method to fill a [`RequestBuilder`] with an [`Event`].
71pub fn event_to_request(event: Event, request_builder: RequestBuilder) -> Result<RequestBuilder> {
72    BinaryDeserializer::deserialize_binary(event, RequestSerializer::new(request_builder))
73}
74
75/// Method to fill a [`RequestBuilder`] with a batched [`Vec<Event>`].
76pub fn events_to_request(
77    events: Vec<Event>,
78    request_builder: RequestBuilder,
79) -> Result<RequestBuilder> {
80    let bytes = serde_json::to_vec(&events)?;
81    Ok(request_builder
82        .header(reqwest::header::CONTENT_TYPE, CLOUDEVENTS_BATCH_JSON_HEADER)
83        .body(bytes))
84}
85
86/// Extension Trait for [`RequestBuilder`] which acts as a wrapper for the function [`event_to_request()`].
87///
88/// This trait is sealed and cannot be implemented for types outside of this crate.
89pub trait RequestBuilderExt: private::Sealed {
90    /// Write in this [`RequestBuilder`] the provided [`Event`]. Similar to invoking [`Event`].
91    fn event(self, event: Event) -> Result<RequestBuilder>;
92    /// Write in this [`RequestBuilder`] the provided batched [`Vec<Event>`].
93    fn events(self, events: Vec<Event>) -> Result<RequestBuilder>;
94}
95
96impl RequestBuilderExt for RequestBuilder {
97    fn event(self, event: Event) -> Result<RequestBuilder> {
98        event_to_request(event, self)
99    }
100
101    fn events(self, events: Vec<Event>) -> Result<RequestBuilder> {
102        events_to_request(events, self)
103    }
104}
105
106// Sealing the RequestBuilderExt
107mod private {
108    use reqwest_lib as reqwest;
109
110    pub trait Sealed {}
111    impl Sealed for reqwest::RequestBuilder {}
112}
113
114#[cfg(test)]
115mod tests {
116    use super::*;
117    use mockito::Matcher;
118    use reqwest_lib as reqwest;
119
120    use crate::message::StructuredDeserializer;
121    use crate::test::fixtures;
122
123    #[tokio::test]
124    async fn test_request() {
125        let url = mockito::server_url();
126        let m = mockito::mock("POST", "/")
127            .match_header("ce-specversion", "1.0")
128            .match_header("ce-id", "0001")
129            .match_header("ce-type", "test_event.test_application")
130            .match_header("ce-source", "http://localhost/")
131            .match_header("ce-someint", "10")
132            .match_body(Matcher::Missing)
133            .create();
134
135        let input = fixtures::v10::minimal_string_extension();
136
137        let client = reqwest::Client::new();
138        client
139            .post(&url)
140            .event(input)
141            .unwrap()
142            .send()
143            .await
144            .unwrap();
145
146        m.assert();
147    }
148
149    #[tokio::test]
150    async fn test_request_with_full_data() {
151        let url = mockito::server_url();
152        let m = mockito::mock("POST", "/")
153            .match_header("ce-specversion", "1.0")
154            .match_header("ce-id", "0001")
155            .with_header("ce-type", "test_event.test_application")
156            .with_header("ce-source", "http://localhost/")
157            .with_header("ce-subject", "cloudevents-sdk")
158            .with_header("content-type", "application/json")
159            .with_header("ce-string_ex", "val")
160            .with_header("ce-int_ex", "10")
161            .with_header("ce-bool_ex", "true")
162            .with_header("ce-time", &fixtures::time().to_rfc3339())
163            .match_body(Matcher::Exact(fixtures::json_data().to_string()))
164            .create();
165
166        let input = fixtures::v10::full_binary_json_data_string_extension();
167
168        let client = reqwest::Client::new();
169
170        client
171            .post(&url)
172            .event(input)
173            .unwrap()
174            .send()
175            .await
176            .unwrap();
177
178        m.assert();
179    }
180
181    #[tokio::test]
182    async fn test_structured_request_with_full_data() {
183        let input = fixtures::v10::full_json_data_string_extension();
184
185        let url = mockito::server_url();
186        let m = mockito::mock("POST", "/")
187            .match_header("content-type", "application/cloudevents+json")
188            .match_body(Matcher::Exact(serde_json::to_string(&input).unwrap()))
189            .create();
190
191        let client = reqwest::Client::new();
192        StructuredDeserializer::deserialize_structured(
193            input,
194            RequestSerializer::new(client.post(&url)),
195        )
196        .unwrap()
197        .send()
198        .await
199        .unwrap();
200
201        m.assert();
202    }
203
204    #[tokio::test]
205    async fn test_batched_request() {
206        let input = vec![fixtures::v10::full_json_data_string_extension()];
207
208        let url = mockito::server_url();
209        let m = mockito::mock("POST", "/")
210            .match_header("content-type", "application/cloudevents-batch+json")
211            .match_body(Matcher::Exact(serde_json::to_string(&input).unwrap()))
212            .create();
213
214        let client = reqwest::Client::new();
215        client
216            .post(&url)
217            .events(input)
218            .unwrap()
219            .send()
220            .await
221            .unwrap();
222
223        m.assert();
224    }
225}