graph_http/blocking/
upload_session_blocking.rs1use crate::upload_session::RangeIter;
2use graph_error::{GraphFailure, GraphResult};
3use reqwest::header::HeaderMap;
4use std::io::Read;
5use std::thread;
6
7pub struct UploadSessionBlocking {
8 url: reqwest::Url,
9 range_iter: RangeIter,
10 client: reqwest::blocking::Client,
11}
12
13impl UploadSessionBlocking {
14 pub fn empty(url: reqwest::Url) -> UploadSessionBlocking {
15 UploadSessionBlocking {
16 url,
17 range_iter: Default::default(),
18 client: reqwest::blocking::Client::new(),
19 }
20 }
21
22 pub(crate) fn new(url: reqwest::Url, range_iter: RangeIter) -> UploadSessionBlocking {
23 UploadSessionBlocking {
24 url,
25 range_iter,
26 client: reqwest::blocking::Client::new(),
27 }
28 }
29
30 pub fn url(&self) -> &reqwest::Url {
31 &self.url
32 }
33
34 fn map_request_builder(
35 &self,
36 components: Vec<(HeaderMap, reqwest::blocking::Body)>,
37 ) -> Vec<reqwest::blocking::RequestBuilder> {
38 components
39 .into_iter()
40 .map(|(header_map, body)| {
41 self.client
42 .put(self.url.clone())
43 .headers(header_map)
44 .body(body)
45 })
46 .collect()
47 }
48
49 fn send(
50 &self,
51 header_map: HeaderMap,
52 body: reqwest::blocking::Body,
53 ) -> reqwest::Result<reqwest::blocking::Response> {
54 self.client
55 .put(self.url.clone())
56 .headers(header_map)
57 .body(body)
58 .send()
59 }
60
61 pub fn status(&self) -> reqwest::blocking::RequestBuilder {
62 self.client.get(self.url.clone())
63 }
64
65 pub fn cancel(&self) -> reqwest::blocking::RequestBuilder {
66 self.client.delete(self.url.clone())
67 }
68
69 pub fn from_reader<U: AsRef<str>, R: Read>(
70 upload_url: U,
71 reader: R,
72 ) -> GraphResult<UploadSessionBlocking> {
73 Ok(UploadSessionBlocking {
74 url: reqwest::Url::parse(upload_url.as_ref())?,
75 range_iter: RangeIter::from_reader(reader)?,
76 client: reqwest::blocking::Client::new(),
77 })
78 }
79
80 pub fn channel(
81 &mut self,
82 ) -> GraphResult<std::sync::mpsc::Receiver<reqwest::Result<reqwest::blocking::Response>>> {
83 self.channel_buffer(self.range_iter.len() + 1)
84 }
85
86 pub fn channel_buffer(
87 &mut self,
88 bound: usize,
89 ) -> GraphResult<std::sync::mpsc::Receiver<reqwest::Result<reqwest::blocking::Response>>> {
90 let components = self
91 .range_iter
92 .map_all_blocking()
93 .ok_or(GraphFailure::invalid(
94 "Invalid Headers (internal error, please report)",
95 ))?;
96 let request_builders = self.map_request_builder(components);
97 let (sender, receiver) = std::sync::mpsc::sync_channel(bound);
98
99 thread::spawn(move || {
100 for request_builder in request_builders {
101 let result = request_builder.send();
102 sender.send(result).unwrap();
103 }
104 });
105
106 Ok(receiver)
107 }
108}
109
110impl Iterator for UploadSessionBlocking {
111 type Item = reqwest::Result<reqwest::blocking::Response>;
112
113 fn next(&mut self) -> Option<Self::Item> {
114 let (header_map, body) = self.range_iter.pop_front_blocking()?;
115 Some(self.send(header_map, body))
116 }
117}