cloudevents/binding/reqwest/
client_request.rs1use reqwest_lib as reqwest;
2
3use crate::binding::{
4 http::{header_prefix, SPEC_VERSION_HEADER},
5 CLOUDEVENTS_BATCH_JSON_HEADER, CLOUDEVENTS_JSON_HEADER,
6};
7use crate::event::SpecVersion;
8use crate::message::{
9 BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, StructuredSerializer,
10};
11use crate::Event;
12use reqwest::RequestBuilder;
13
14pub struct RequestSerializer {
25 req: RequestBuilder,
26}
27
28impl RequestSerializer {
29 pub fn new(req: RequestBuilder) -> RequestSerializer {
30 RequestSerializer { req }
31 }
32}
33
34impl BinarySerializer<RequestBuilder> for RequestSerializer {
35 fn set_spec_version(mut self, spec_ver: SpecVersion) -> Result<Self> {
36 self.req = self.req.header(SPEC_VERSION_HEADER, spec_ver.to_string());
37 Ok(self)
38 }
39
40 fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
41 let key = &header_prefix(name);
42 self.req = self.req.header(key, value.to_string());
43 Ok(self)
44 }
45
46 fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
47 let key = &header_prefix(name);
48 self.req = self.req.header(key, value.to_string());
49 Ok(self)
50 }
51
52 fn end_with_data(self, bytes: Vec<u8>) -> Result<RequestBuilder> {
53 Ok(self.req.body(bytes))
54 }
55
56 fn end(self) -> Result<RequestBuilder> {
57 Ok(self.req)
58 }
59}
60
61impl StructuredSerializer<RequestBuilder> for RequestSerializer {
62 fn set_structured_event(self, bytes: Vec<u8>) -> Result<RequestBuilder> {
63 Ok(self
64 .req
65 .header(reqwest::header::CONTENT_TYPE, CLOUDEVENTS_JSON_HEADER)
66 .body(bytes))
67 }
68}
69
70pub fn event_to_request(event: Event, request_builder: RequestBuilder) -> Result<RequestBuilder> {
72 BinaryDeserializer::deserialize_binary(event, RequestSerializer::new(request_builder))
73}
74
75pub fn events_to_request(
77 events: Vec<Event>,
78 request_builder: RequestBuilder,
79) -> Result<RequestBuilder> {
80 let bytes = serde_json::to_vec(&events)?;
81 Ok(request_builder
82 .header(reqwest::header::CONTENT_TYPE, CLOUDEVENTS_BATCH_JSON_HEADER)
83 .body(bytes))
84}
85
86pub trait RequestBuilderExt: private::Sealed {
90 fn event(self, event: Event) -> Result<RequestBuilder>;
92 fn events(self, events: Vec<Event>) -> Result<RequestBuilder>;
94}
95
96impl RequestBuilderExt for RequestBuilder {
97 fn event(self, event: Event) -> Result<RequestBuilder> {
98 event_to_request(event, self)
99 }
100
101 fn events(self, events: Vec<Event>) -> Result<RequestBuilder> {
102 events_to_request(events, self)
103 }
104}
105
106mod private {
108 use reqwest_lib as reqwest;
109
110 pub trait Sealed {}
111 impl Sealed for reqwest::RequestBuilder {}
112}
113
114#[cfg(test)]
115mod tests {
116 use super::*;
117 use mockito::Matcher;
118 use reqwest_lib as reqwest;
119
120 use crate::message::StructuredDeserializer;
121 use crate::test::fixtures;
122
123 #[tokio::test]
124 async fn test_request() {
125 let url = mockito::server_url();
126 let m = mockito::mock("POST", "/")
127 .match_header("ce-specversion", "1.0")
128 .match_header("ce-id", "0001")
129 .match_header("ce-type", "test_event.test_application")
130 .match_header("ce-source", "http://localhost/")
131 .match_header("ce-someint", "10")
132 .match_body(Matcher::Missing)
133 .create();
134
135 let input = fixtures::v10::minimal_string_extension();
136
137 let client = reqwest::Client::new();
138 client
139 .post(&url)
140 .event(input)
141 .unwrap()
142 .send()
143 .await
144 .unwrap();
145
146 m.assert();
147 }
148
149 #[tokio::test]
150 async fn test_request_with_full_data() {
151 let url = mockito::server_url();
152 let m = mockito::mock("POST", "/")
153 .match_header("ce-specversion", "1.0")
154 .match_header("ce-id", "0001")
155 .with_header("ce-type", "test_event.test_application")
156 .with_header("ce-source", "http://localhost/")
157 .with_header("ce-subject", "cloudevents-sdk")
158 .with_header("content-type", "application/json")
159 .with_header("ce-string_ex", "val")
160 .with_header("ce-int_ex", "10")
161 .with_header("ce-bool_ex", "true")
162 .with_header("ce-time", &fixtures::time().to_rfc3339())
163 .match_body(Matcher::Exact(fixtures::json_data().to_string()))
164 .create();
165
166 let input = fixtures::v10::full_binary_json_data_string_extension();
167
168 let client = reqwest::Client::new();
169
170 client
171 .post(&url)
172 .event(input)
173 .unwrap()
174 .send()
175 .await
176 .unwrap();
177
178 m.assert();
179 }
180
181 #[tokio::test]
182 async fn test_structured_request_with_full_data() {
183 let input = fixtures::v10::full_json_data_string_extension();
184
185 let url = mockito::server_url();
186 let m = mockito::mock("POST", "/")
187 .match_header("content-type", "application/cloudevents+json")
188 .match_body(Matcher::Exact(serde_json::to_string(&input).unwrap()))
189 .create();
190
191 let client = reqwest::Client::new();
192 StructuredDeserializer::deserialize_structured(
193 input,
194 RequestSerializer::new(client.post(&url)),
195 )
196 .unwrap()
197 .send()
198 .await
199 .unwrap();
200
201 m.assert();
202 }
203
204 #[tokio::test]
205 async fn test_batched_request() {
206 let input = vec![fixtures::v10::full_json_data_string_extension()];
207
208 let url = mockito::server_url();
209 let m = mockito::mock("POST", "/")
210 .match_header("content-type", "application/cloudevents-batch+json")
211 .match_body(Matcher::Exact(serde_json::to_string(&input).unwrap()))
212 .create();
213
214 let client = reqwest::Client::new();
215 client
216 .post(&url)
217 .events(input)
218 .unwrap()
219 .send()
220 .await
221 .unwrap();
222
223 m.assert();
224 }
225}