graph_http/upload_session/
upload_session_task.rs1use crate::traits::AsyncIterator;
2use crate::upload_session::RangeIter;
3use async_stream::try_stream;
4use async_trait::async_trait;
5use futures::Stream;
6use graph_error::{GraphFailure, GraphResult};
7use reqwest::header::HeaderMap;
8use reqwest::RequestBuilder;
9use std::io::Read;
10use std::time::Duration;
11
12pub struct UploadSession {
13 url: reqwest::Url,
14 range_iter: RangeIter,
15 client: reqwest::Client,
16}
17
18impl UploadSession {
19 pub fn empty(url: reqwest::Url) -> UploadSession {
20 UploadSession {
21 url,
22 range_iter: Default::default(),
23 client: Default::default(),
24 }
25 }
26
27 pub(crate) fn new(url: reqwest::Url, range_iter: RangeIter) -> UploadSession {
28 UploadSession {
29 url,
30 range_iter,
31 client: Default::default(),
32 }
33 }
34
35 pub fn url(&self) -> &reqwest::Url {
36 &self.url
37 }
38
39 fn map_request_builder(
40 &self,
41 components: Vec<(HeaderMap, reqwest::Body)>,
42 ) -> Vec<RequestBuilder> {
43 components
44 .into_iter()
45 .map(|(header_map, body)| {
46 self.client
47 .put(self.url.clone())
48 .headers(header_map)
49 .body(body)
50 })
51 .collect()
52 }
53
54 async fn send(
55 &self,
56 header_map: HeaderMap,
57 body: reqwest::Body,
58 ) -> GraphResult<reqwest::Response> {
59 self.client
60 .put(self.url.clone())
61 .headers(header_map)
62 .body(body)
63 .send()
64 .await
65 .map_err(GraphFailure::from)
66 }
67
68 pub fn status(&self) -> RequestBuilder {
69 self.client.get(self.url.clone())
70 }
71
72 pub fn cancel(&self) -> RequestBuilder {
73 self.client.delete(self.url.clone())
74 }
75
76 pub fn from_reader<U: AsRef<str>, R: Read>(
77 upload_url: U,
78 reader: R,
79 ) -> GraphResult<UploadSession> {
80 Ok(UploadSession {
81 url: reqwest::Url::parse(upload_url.as_ref())?,
82 range_iter: RangeIter::from_reader(reader)?,
83 client: Default::default(),
84 })
85 }
86
87 fn try_stream(&mut self) -> impl Stream<Item = GraphResult<reqwest::Response>> + '_ {
88 try_stream! {
89 let components = self.range_iter.map_all().ok_or(GraphFailure::invalid(
90 "Invalid Headers (internal error, please report)",
91 ))?;
92 let request_builders = self.map_request_builder(components);
93
94 for request_builder in request_builders {
95 yield request_builder.send()
96 .await
97 .map_err(GraphFailure::from)?
98 }
99 }
100 }
101
102 pub fn stream(
154 &mut self,
155 ) -> GraphResult<impl Stream<Item = GraphResult<reqwest::Response>> + '_> {
156 Ok(Box::pin(self.try_stream()))
157 }
158
159 pub fn channel(
160 &mut self,
161 ) -> GraphResult<tokio::sync::mpsc::Receiver<reqwest::Result<reqwest::Response>>> {
162 self.channel_buffer_timeout(self.range_iter.len() + 1, Duration::from_secs(30))
163 }
164
165 pub fn channel_timeout(
166 &mut self,
167 timeout: Duration,
168 ) -> GraphResult<tokio::sync::mpsc::Receiver<reqwest::Result<reqwest::Response>>> {
169 self.channel_buffer_timeout(self.range_iter.len() + 1, timeout)
170 }
171
172 pub fn channel_buffer_timeout(
173 &mut self,
174 buffer: usize,
175 timeout: Duration,
176 ) -> GraphResult<tokio::sync::mpsc::Receiver<reqwest::Result<reqwest::Response>>> {
177 let (sender, receiver) = tokio::sync::mpsc::channel(buffer);
178
179 let components = self.range_iter.map_all().ok_or(GraphFailure::invalid(
180 "Invalid Headers (internal error, please report)",
181 ))?;
182 let request_builders = self.map_request_builder(components);
183
184 tokio::spawn(async move {
185 for request_builder in request_builders {
186 let result = request_builder.send().await;
187 sender.send_timeout(result, timeout).await.unwrap();
188 }
189 });
190
191 Ok(receiver)
192 }
193}
194
195#[async_trait]
196impl AsyncIterator for UploadSession {
197 type Item = GraphResult<reqwest::Response>;
198
199 async fn next(&mut self) -> Option<Self::Item> {
200 let (header_map, body) = self.range_iter.pop_front()?;
201 Some(self.send(header_map, body).await)
202 }
203}