graph_http/blocking/
blocking_request_handler.rs

1use crate::blocking::blocking_client::BlockingClient;
2use crate::internal::*;
3use graph_error::{ErrorMessage, GraphFailure, GraphResult};
4use http::header::CONTENT_TYPE;
5use http::{HeaderMap, HeaderName, HeaderValue};
6use serde::de::DeserializeOwned;
7use std::collections::VecDeque;
8use url::Url;
9
/// Builder-style handler for a single blocking HTTP request.
///
/// Holds everything needed to assemble a [`reqwest::blocking::RequestBuilder`]:
/// the client, the request components (method, URL, headers), an optional body
/// and any error recorded before the request was sent.
#[derive(Default)]
pub struct BlockingRequestHandler {
    /// Client used to obtain the access token and to create the request.
    pub(crate) inner: BlockingClient,
    /// Method, URL and headers of the request being built.
    pub(crate) request_components: RequestComponents,
    /// Error recorded prior to sending the request, if any.
    pub(crate) error: Option<GraphFailure>,
    /// Optional request body; consumed when the request is built.
    pub(crate) body: Option<BodyRead>,
}
17
18impl BlockingRequestHandler {
19    pub fn new(
20        inner: BlockingClient,
21        mut request_components: RequestComponents,
22        err: Option<GraphFailure>,
23        body: Option<BodyRead>,
24    ) -> BlockingRequestHandler {
25        request_components.headers.extend(inner.headers.clone());
26
27        let mut error = None;
28        if let Some(err) = err {
29            let message = err.to_string();
30            error = Some(GraphFailure::PreFlightError {
31                url: Some(request_components.url.clone()),
32                headers: Some(request_components.headers.clone()),
33                error: Some(Box::new(err)),
34                message,
35            });
36        }
37
38        BlockingRequestHandler {
39            inner,
40            request_components,
41            error,
42            body,
43        }
44    }
45
46    /// Returns true if any errors occurred prior to sending the request.
47    ///
48    /// # Example
49    /// ```rust,ignore
50    /// let client = Graph::new("ACCESS_TOKEN");
51    /// let request_handler = client.groups().list_group();
52    /// println!("{:#?}", request_handler.is_err());
53    /// ```
54    pub fn is_err(&self) -> bool {
55        self.error.is_some()
56    }
57
58    /// Returns any error wrapped in an Option that occurred prior to sending a request
59    ///
60    /// # Example
61    /// ```rust,ignore
62    /// let client = Graph::new("ACCESS_TOKEN");
63    /// let request_handler = client.groups().list_group();
64    /// println!("{:#?}", request_handler.err());
65    /// ```
66    pub fn err(&self) -> Option<&GraphFailure> {
67        self.error.as_ref()
68    }
69
70    #[inline]
71    pub fn url(&self) -> Url {
72        self.request_components.url.clone()
73    }
74
75    #[inline]
76    pub fn query<T: serde::Serialize + ?Sized>(mut self, query: &T) -> Self {
77        if let Err(err) = self.request_components.query(query) {
78            if self.error.is_none() {
79                self.error = Some(err);
80            }
81        }
82
83        if let Some("") = self.request_components.url.query() {
84            self.request_components.url.set_query(None);
85        }
86        self
87    }
88
89    #[inline]
90    pub fn append_query_pair<KV: AsRef<str>>(mut self, key: KV, value: KV) -> Self {
91        self.request_components
92            .url
93            .query_pairs_mut()
94            .append_pair(key.as_ref(), value.as_ref());
95        self
96    }
97
98    #[inline]
99    pub fn extend_path<I: AsRef<str>>(mut self, path: &[I]) -> Self {
100        if let Ok(mut p) = self.request_components.url.path_segments_mut() {
101            p.extend(path);
102        }
103        self
104    }
105
106    /// Insert a header for the request.
107    #[inline]
108    pub fn header<K: Into<HeaderName>, V: Into<HeaderValue>>(
109        mut self,
110        header_name: K,
111        header_value: V,
112    ) -> Self {
113        self.request_components
114            .headers
115            .insert(header_name.into(), header_value.into());
116        self
117    }
118
119    /// Set the headers for the request using reqwest::HeaderMap
120    #[inline]
121    pub fn headers(mut self, header_map: HeaderMap) -> Self {
122        self.request_components.headers.extend(header_map);
123        self
124    }
125
126    /// Get a mutable reference to the headers.
127    #[inline]
128    pub fn headers_mut(&mut self) -> &mut HeaderMap {
129        self.request_components.as_mut()
130    }
131
132    pub fn paging(self) -> BlockingPaging {
133        BlockingPaging(self)
134    }
135
136    #[inline]
137    fn default_request_builder(&mut self) -> GraphResult<reqwest::blocking::RequestBuilder> {
138        let access_token = self.inner.client_application.get_token_silent()?;
139
140        let request_builder = self
141            .inner
142            .inner
143            .request(
144                self.request_components.method.clone(),
145                self.request_components.url.clone(),
146            )
147            .bearer_auth(access_token.as_str())
148            .headers(self.request_components.headers.clone());
149
150        if let Some(body) = self.body.take() {
151            if body.has_byte_buf() {
152                self.request_components
153                    .headers
154                    .entry(CONTENT_TYPE)
155                    .or_insert(HeaderValue::from_static("application/octet-stream"));
156            } else if body.has_string_buf() {
157                self.request_components
158                    .headers
159                    .entry(CONTENT_TYPE)
160                    .or_insert(HeaderValue::from_static("application/json"));
161            }
162            return Ok(request_builder
163                .body::<reqwest::blocking::Body>(body.into())
164                .headers(self.request_components.headers.clone()));
165        }
166        Ok(request_builder)
167    }
168
169    /// Builds the request and returns a [`reqwest::blocking::RequestBuilder`].
170    #[inline]
171    pub fn build(mut self) -> GraphResult<reqwest::blocking::RequestBuilder> {
172        if let Some(err) = self.error {
173            return Err(err);
174        }
175        self.default_request_builder()
176    }
177
178    #[inline]
179    pub fn send(self) -> GraphResult<reqwest::blocking::Response> {
180        let request_builder = self.build()?;
181        request_builder.send().map_err(GraphFailure::from)
182    }
183}
184
185impl ODataQuery for BlockingRequestHandler {
186    fn append_query_pair<KV: AsRef<str>>(self, key: KV, value: KV) -> Self {
187        self.append_query_pair(key.as_ref(), value.as_ref())
188    }
189}
190
191impl AsRef<Url> for BlockingRequestHandler {
192    fn as_ref(&self) -> &Url {
193        self.request_components.as_ref()
194    }
195}
196
197impl AsMut<Url> for BlockingRequestHandler {
198    fn as_mut(&mut self) -> &mut Url {
199        self.request_components.as_mut()
200    }
201}
202
/// Wrapper around a [`BlockingRequestHandler`] that sends the request and then
/// follows the next links found in each response body.
pub struct BlockingPaging(BlockingRequestHandler);
204
205impl BlockingPaging {
206    fn http_response<T: DeserializeOwned>(
207        response: reqwest::blocking::Response,
208    ) -> GraphResult<(Option<String>, PagingResponse<T>)> {
209        let status = response.status();
210        let url = response.url().clone();
211        let headers = response.headers().clone();
212        let version = response.version();
213
214        let body: serde_json::Value = response.json()?;
215        let next_link = body.odata_next_link();
216        let json = body.clone();
217        let body_result: Result<T, ErrorMessage> = serde_json::from_value(body)
218            .map_err(|_| serde_json::from_value(json.clone()).unwrap_or(ErrorMessage::default()));
219
220        let mut builder = http::Response::builder()
221            .url(url)
222            .json(&json)
223            .status(http::StatusCode::from(&status))
224            .version(version);
225
226        for builder_header in builder.headers_mut().iter_mut() {
227            builder_header.extend(headers.clone());
228        }
229
230        Ok((next_link, builder.body(body_result)?))
231    }
232
233    /// Returns all next links as [`VecDeque<http::Response<T>>`]. This method may
234    /// cause significant delay in returning when there is a high volume of next links.
235    ///
236    /// This method is mainly provided for convenience in cases where the caller is sure
237    /// the requests will return successful without issue or where the caller is ok with
238    /// a return delay or does not care if errors occur. It is not recommended to use this
239    /// method in production environments.
240    ///
241    ///
242    /// # Example
243    /// ```rust,ignore
244    /// let response = client
245    ///     .users()
246    ///     .delta()
247    ///     .into_blocking()
248    ///     .paging()
249    ///     .json::<serde_json::Value>()
250    ///     .unwrap();
251    ///
252    /// println!("{response:#?}");
253    /// println!("{:#?}", response.body());
254    /// ```
255    pub fn json<T: DeserializeOwned>(mut self) -> GraphResult<VecDeque<PagingResponse<T>>> {
256        if let Some(err) = self.0.error {
257            return Err(err);
258        }
259
260        let request = self.0.default_request_builder()?;
261        let response = request.send()?;
262
263        let (next, http_response) = BlockingPaging::http_response(response)?;
264        let mut next_link = next;
265        let mut vec = VecDeque::new();
266        vec.push_back(http_response);
267
268        let client = self.0.inner.inner.clone();
269        let access_token = self.0.inner.client_application.get_token_silent()?;
270        while let Some(next) = next_link {
271            let response = client
272                .get(next)
273                .bearer_auth(access_token.as_str())
274                .send()
275                .map_err(GraphFailure::from)?;
276
277            let (next, http_response) = BlockingPaging::http_response(response)?;
278
279            next_link = next;
280            vec.push_back(http_response);
281        }
282
283        Ok(vec)
284    }
285
286    fn send_channel_request<T: DeserializeOwned>(
287        client: &reqwest::blocking::Client,
288        next: &str,
289        access_token: &str,
290    ) -> GraphResult<(Option<String>, PagingResponse<T>)> {
291        let response = client
292            .get(next)
293            .bearer_auth(access_token)
294            .send()
295            .map_err(GraphFailure::from)?;
296
297        BlockingPaging::http_response(response)
298    }
299
300    pub fn channel<T: DeserializeOwned + Send + 'static>(
301        mut self,
302    ) -> GraphResult<std::sync::mpsc::Receiver<Option<PagingResult<T>>>> {
303        let (sender, receiver) = std::sync::mpsc::channel();
304        let request = self.0.default_request_builder()?;
305        let response = request.send()?;
306
307        let (next, http_response) = BlockingPaging::http_response(response)?;
308        let mut next_link = next;
309        sender.send(Some(Ok(http_response))).unwrap();
310
311        let client = self.0.inner.inner.clone();
312        let access_token = self.0.inner.client_application.get_token_silent()?;
313
314        std::thread::spawn(move || {
315            while let Some(next) = next_link.as_ref() {
316                let result = BlockingPaging::send_channel_request(
317                    &client,
318                    next.as_str(),
319                    access_token.as_str(),
320                );
321                if let Ok((next_option, http_response)) = result {
322                    next_link = next_option;
323                    sender.send(Some(Ok(http_response))).unwrap();
324                } else if let Err(err) = result {
325                    sender.send(Some(Err(err))).unwrap();
326                    break;
327                }
328            }
329            sender.send(None).unwrap();
330        });
331
332        Ok(receiver)
333    }
334}