graph_http/blocking/
blocking_request_handler.rs1use crate::blocking::blocking_client::BlockingClient;
2use crate::internal::*;
3use graph_error::{ErrorMessage, GraphFailure, GraphResult};
4use http::header::CONTENT_TYPE;
5use http::{HeaderMap, HeaderName, HeaderValue};
6use serde::de::DeserializeOwned;
7use std::collections::VecDeque;
8use url::Url;
9
10#[derive(Default)]
11pub struct BlockingRequestHandler {
12 pub(crate) inner: BlockingClient,
13 pub(crate) request_components: RequestComponents,
14 pub(crate) error: Option<GraphFailure>,
15 pub(crate) body: Option<BodyRead>,
16}
17
18impl BlockingRequestHandler {
19 pub fn new(
20 inner: BlockingClient,
21 mut request_components: RequestComponents,
22 err: Option<GraphFailure>,
23 body: Option<BodyRead>,
24 ) -> BlockingRequestHandler {
25 request_components.headers.extend(inner.headers.clone());
26
27 let mut error = None;
28 if let Some(err) = err {
29 let message = err.to_string();
30 error = Some(GraphFailure::PreFlightError {
31 url: Some(request_components.url.clone()),
32 headers: Some(request_components.headers.clone()),
33 error: Some(Box::new(err)),
34 message,
35 });
36 }
37
38 BlockingRequestHandler {
39 inner,
40 request_components,
41 error,
42 body,
43 }
44 }
45
46 pub fn is_err(&self) -> bool {
55 self.error.is_some()
56 }
57
58 pub fn err(&self) -> Option<&GraphFailure> {
67 self.error.as_ref()
68 }
69
70 #[inline]
71 pub fn url(&self) -> Url {
72 self.request_components.url.clone()
73 }
74
75 #[inline]
76 pub fn query<T: serde::Serialize + ?Sized>(mut self, query: &T) -> Self {
77 if let Err(err) = self.request_components.query(query) {
78 if self.error.is_none() {
79 self.error = Some(err);
80 }
81 }
82
83 if let Some("") = self.request_components.url.query() {
84 self.request_components.url.set_query(None);
85 }
86 self
87 }
88
89 #[inline]
90 pub fn append_query_pair<KV: AsRef<str>>(mut self, key: KV, value: KV) -> Self {
91 self.request_components
92 .url
93 .query_pairs_mut()
94 .append_pair(key.as_ref(), value.as_ref());
95 self
96 }
97
98 #[inline]
99 pub fn extend_path<I: AsRef<str>>(mut self, path: &[I]) -> Self {
100 if let Ok(mut p) = self.request_components.url.path_segments_mut() {
101 p.extend(path);
102 }
103 self
104 }
105
106 #[inline]
108 pub fn header<K: Into<HeaderName>, V: Into<HeaderValue>>(
109 mut self,
110 header_name: K,
111 header_value: V,
112 ) -> Self {
113 self.request_components
114 .headers
115 .insert(header_name.into(), header_value.into());
116 self
117 }
118
119 #[inline]
121 pub fn headers(mut self, header_map: HeaderMap) -> Self {
122 self.request_components.headers.extend(header_map);
123 self
124 }
125
126 #[inline]
128 pub fn headers_mut(&mut self) -> &mut HeaderMap {
129 self.request_components.as_mut()
130 }
131
132 pub fn paging(self) -> BlockingPaging {
133 BlockingPaging(self)
134 }
135
136 #[inline]
137 fn default_request_builder(&mut self) -> GraphResult<reqwest::blocking::RequestBuilder> {
138 let access_token = self.inner.client_application.get_token_silent()?;
139
140 let request_builder = self
141 .inner
142 .inner
143 .request(
144 self.request_components.method.clone(),
145 self.request_components.url.clone(),
146 )
147 .bearer_auth(access_token.as_str())
148 .headers(self.request_components.headers.clone());
149
150 if let Some(body) = self.body.take() {
151 if body.has_byte_buf() {
152 self.request_components
153 .headers
154 .entry(CONTENT_TYPE)
155 .or_insert(HeaderValue::from_static("application/octet-stream"));
156 } else if body.has_string_buf() {
157 self.request_components
158 .headers
159 .entry(CONTENT_TYPE)
160 .or_insert(HeaderValue::from_static("application/json"));
161 }
162 return Ok(request_builder
163 .body::<reqwest::blocking::Body>(body.into())
164 .headers(self.request_components.headers.clone()));
165 }
166 Ok(request_builder)
167 }
168
169 #[inline]
171 pub fn build(mut self) -> GraphResult<reqwest::blocking::RequestBuilder> {
172 if let Some(err) = self.error {
173 return Err(err);
174 }
175 self.default_request_builder()
176 }
177
178 #[inline]
179 pub fn send(self) -> GraphResult<reqwest::blocking::Response> {
180 let request_builder = self.build()?;
181 request_builder.send().map_err(GraphFailure::from)
182 }
183}
184
185impl ODataQuery for BlockingRequestHandler {
186 fn append_query_pair<KV: AsRef<str>>(self, key: KV, value: KV) -> Self {
187 self.append_query_pair(key.as_ref(), value.as_ref())
188 }
189}
190
191impl AsRef<Url> for BlockingRequestHandler {
192 fn as_ref(&self) -> &Url {
193 self.request_components.as_ref()
194 }
195}
196
197impl AsMut<Url> for BlockingRequestHandler {
198 fn as_mut(&mut self) -> &mut Url {
199 self.request_components.as_mut()
200 }
201}
202
203pub struct BlockingPaging(BlockingRequestHandler);
204
205impl BlockingPaging {
206 fn http_response<T: DeserializeOwned>(
207 response: reqwest::blocking::Response,
208 ) -> GraphResult<(Option<String>, PagingResponse<T>)> {
209 let status = response.status();
210 let url = response.url().clone();
211 let headers = response.headers().clone();
212 let version = response.version();
213
214 let body: serde_json::Value = response.json()?;
215 let next_link = body.odata_next_link();
216 let json = body.clone();
217 let body_result: Result<T, ErrorMessage> = serde_json::from_value(body)
218 .map_err(|_| serde_json::from_value(json.clone()).unwrap_or(ErrorMessage::default()));
219
220 let mut builder = http::Response::builder()
221 .url(url)
222 .json(&json)
223 .status(http::StatusCode::from(&status))
224 .version(version);
225
226 for builder_header in builder.headers_mut().iter_mut() {
227 builder_header.extend(headers.clone());
228 }
229
230 Ok((next_link, builder.body(body_result)?))
231 }
232
233 pub fn json<T: DeserializeOwned>(mut self) -> GraphResult<VecDeque<PagingResponse<T>>> {
256 if let Some(err) = self.0.error {
257 return Err(err);
258 }
259
260 let request = self.0.default_request_builder()?;
261 let response = request.send()?;
262
263 let (next, http_response) = BlockingPaging::http_response(response)?;
264 let mut next_link = next;
265 let mut vec = VecDeque::new();
266 vec.push_back(http_response);
267
268 let client = self.0.inner.inner.clone();
269 let access_token = self.0.inner.client_application.get_token_silent()?;
270 while let Some(next) = next_link {
271 let response = client
272 .get(next)
273 .bearer_auth(access_token.as_str())
274 .send()
275 .map_err(GraphFailure::from)?;
276
277 let (next, http_response) = BlockingPaging::http_response(response)?;
278
279 next_link = next;
280 vec.push_back(http_response);
281 }
282
283 Ok(vec)
284 }
285
286 fn send_channel_request<T: DeserializeOwned>(
287 client: &reqwest::blocking::Client,
288 next: &str,
289 access_token: &str,
290 ) -> GraphResult<(Option<String>, PagingResponse<T>)> {
291 let response = client
292 .get(next)
293 .bearer_auth(access_token)
294 .send()
295 .map_err(GraphFailure::from)?;
296
297 BlockingPaging::http_response(response)
298 }
299
300 pub fn channel<T: DeserializeOwned + Send + 'static>(
301 mut self,
302 ) -> GraphResult<std::sync::mpsc::Receiver<Option<PagingResult<T>>>> {
303 let (sender, receiver) = std::sync::mpsc::channel();
304 let request = self.0.default_request_builder()?;
305 let response = request.send()?;
306
307 let (next, http_response) = BlockingPaging::http_response(response)?;
308 let mut next_link = next;
309 sender.send(Some(Ok(http_response))).unwrap();
310
311 let client = self.0.inner.inner.clone();
312 let access_token = self.0.inner.client_application.get_token_silent()?;
313
314 std::thread::spawn(move || {
315 while let Some(next) = next_link.as_ref() {
316 let result = BlockingPaging::send_channel_request(
317 &client,
318 next.as_str(),
319 access_token.as_str(),
320 );
321 if let Ok((next_option, http_response)) = result {
322 next_link = next_option;
323 sender.send(Some(Ok(http_response))).unwrap();
324 } else if let Err(err) = result {
325 sender.send(Some(Err(err))).unwrap();
326 break;
327 }
328 }
329 sender.send(None).unwrap();
330 });
331
332 Ok(receiver)
333 }
334}