graph_http/blocking/
upload_session_blocking.rs

1use crate::upload_session::RangeIter;
2use graph_error::{GraphFailure, GraphResult};
3use reqwest::header::HeaderMap;
4use std::io::Read;
5use std::thread;
6
7pub struct UploadSessionBlocking {
8    url: reqwest::Url,
9    range_iter: RangeIter,
10    client: reqwest::blocking::Client,
11}
12
13impl UploadSessionBlocking {
14    pub fn empty(url: reqwest::Url) -> UploadSessionBlocking {
15        UploadSessionBlocking {
16            url,
17            range_iter: Default::default(),
18            client: reqwest::blocking::Client::new(),
19        }
20    }
21
22    pub(crate) fn new(url: reqwest::Url, range_iter: RangeIter) -> UploadSessionBlocking {
23        UploadSessionBlocking {
24            url,
25            range_iter,
26            client: reqwest::blocking::Client::new(),
27        }
28    }
29
30    pub fn url(&self) -> &reqwest::Url {
31        &self.url
32    }
33
34    fn map_request_builder(
35        &self,
36        components: Vec<(HeaderMap, reqwest::blocking::Body)>,
37    ) -> Vec<reqwest::blocking::RequestBuilder> {
38        components
39            .into_iter()
40            .map(|(header_map, body)| {
41                self.client
42                    .put(self.url.clone())
43                    .headers(header_map)
44                    .body(body)
45            })
46            .collect()
47    }
48
49    fn send(
50        &self,
51        header_map: HeaderMap,
52        body: reqwest::blocking::Body,
53    ) -> reqwest::Result<reqwest::blocking::Response> {
54        self.client
55            .put(self.url.clone())
56            .headers(header_map)
57            .body(body)
58            .send()
59    }
60
61    pub fn status(&self) -> reqwest::blocking::RequestBuilder {
62        self.client.get(self.url.clone())
63    }
64
65    pub fn cancel(&self) -> reqwest::blocking::RequestBuilder {
66        self.client.delete(self.url.clone())
67    }
68
69    pub fn from_reader<U: AsRef<str>, R: Read>(
70        upload_url: U,
71        reader: R,
72    ) -> GraphResult<UploadSessionBlocking> {
73        Ok(UploadSessionBlocking {
74            url: reqwest::Url::parse(upload_url.as_ref())?,
75            range_iter: RangeIter::from_reader(reader)?,
76            client: reqwest::blocking::Client::new(),
77        })
78    }
79
80    pub fn channel(
81        &mut self,
82    ) -> GraphResult<std::sync::mpsc::Receiver<reqwest::Result<reqwest::blocking::Response>>> {
83        self.channel_buffer(self.range_iter.len() + 1)
84    }
85
86    pub fn channel_buffer(
87        &mut self,
88        bound: usize,
89    ) -> GraphResult<std::sync::mpsc::Receiver<reqwest::Result<reqwest::blocking::Response>>> {
90        let components = self
91            .range_iter
92            .map_all_blocking()
93            .ok_or(GraphFailure::invalid(
94                "Invalid Headers (internal error, please report)",
95            ))?;
96        let request_builders = self.map_request_builder(components);
97        let (sender, receiver) = std::sync::mpsc::sync_channel(bound);
98
99        thread::spawn(move || {
100            for request_builder in request_builders {
101                let result = request_builder.send();
102                sender.send(result).unwrap();
103            }
104        });
105
106        Ok(receiver)
107    }
108}
109
110impl Iterator for UploadSessionBlocking {
111    type Item = reqwest::Result<reqwest::blocking::Response>;
112
113    fn next(&mut self) -> Option<Self::Item> {
114        let (header_map, body) = self.range_iter.pop_front_blocking()?;
115        Some(self.send(header_map, body))
116    }
117}