graph_http/upload_session/
upload_session_task.rs

1use crate::traits::AsyncIterator;
2use crate::upload_session::RangeIter;
3use async_stream::try_stream;
4use async_trait::async_trait;
5use futures::Stream;
6use graph_error::{GraphFailure, GraphResult};
7use reqwest::header::HeaderMap;
8use reqwest::RequestBuilder;
9use std::io::Read;
10use std::time::Duration;
11
12pub struct UploadSession {
13    url: reqwest::Url,
14    range_iter: RangeIter,
15    client: reqwest::Client,
16}
17
18impl UploadSession {
19    pub fn empty(url: reqwest::Url) -> UploadSession {
20        UploadSession {
21            url,
22            range_iter: Default::default(),
23            client: Default::default(),
24        }
25    }
26
27    pub(crate) fn new(url: reqwest::Url, range_iter: RangeIter) -> UploadSession {
28        UploadSession {
29            url,
30            range_iter,
31            client: Default::default(),
32        }
33    }
34
35    pub fn url(&self) -> &reqwest::Url {
36        &self.url
37    }
38
39    fn map_request_builder(
40        &self,
41        components: Vec<(HeaderMap, reqwest::Body)>,
42    ) -> Vec<RequestBuilder> {
43        components
44            .into_iter()
45            .map(|(header_map, body)| {
46                self.client
47                    .put(self.url.clone())
48                    .headers(header_map)
49                    .body(body)
50            })
51            .collect()
52    }
53
54    async fn send(
55        &self,
56        header_map: HeaderMap,
57        body: reqwest::Body,
58    ) -> GraphResult<reqwest::Response> {
59        self.client
60            .put(self.url.clone())
61            .headers(header_map)
62            .body(body)
63            .send()
64            .await
65            .map_err(GraphFailure::from)
66    }
67
68    pub fn status(&self) -> RequestBuilder {
69        self.client.get(self.url.clone())
70    }
71
72    pub fn cancel(&self) -> RequestBuilder {
73        self.client.delete(self.url.clone())
74    }
75
76    pub fn from_reader<U: AsRef<str>, R: Read>(
77        upload_url: U,
78        reader: R,
79    ) -> GraphResult<UploadSession> {
80        Ok(UploadSession {
81            url: reqwest::Url::parse(upload_url.as_ref())?,
82            range_iter: RangeIter::from_reader(reader)?,
83            client: Default::default(),
84        })
85    }
86
87    fn try_stream(&mut self) -> impl Stream<Item = GraphResult<reqwest::Response>> + '_ {
88        try_stream! {
89            let components = self.range_iter.map_all().ok_or(GraphFailure::invalid(
90                "Invalid Headers (internal error, please report)",
91            ))?;
92            let request_builders = self.map_request_builder(components);
93
94            for request_builder in request_builders {
95                yield request_builder.send()
96                    .await
97                    .map_err(GraphFailure::from)?
98            }
99        }
100    }
101
102    /// Stream upload session responses.
103    /// Each stream.next().await returns a [`Option<GraphResult<reqwest::Response>>`].
104    ///
105    /// No pinning is required. The stream is pinned before being returned to the caller.
106    ///
107    /// # Example
108    /// ```rust,ignore
109    /// use graph_rs_sdk::*;
110    /// use futures::stream::StreamExt;
111    /// use std::fs::OpenOptions;
112    ///
113    /// static ACCESS_TOKEN: &str = "ACCESS_TOKEN";
114    ///
115    /// // Path to upload file to in OneDrive.
116    /// static ONEDRIVE_PATH: &str = ":/file.txt:";
117    ///
118    /// static LOCAL_FILE_PATH: &str = "./file.txt";
119    ///
120    /// let client = Graph::new("ACCESS_TOKEN");
121    ///
122    ///  let response = client
123    ///     .me()
124    ///     .drive()
125    ///     .item_by_path(ONEDRIVE_PATH)
126    ///     .create_upload_session(&serde_json::json!({
127    ///         "@microsoft.graph.conflictBehavior": Some("fail".to_string())
128    ///     }))
129    ///     .send()
130    ///     .await?;
131    ///
132    ///     assert!(response.status().is_success());
133    ///
134    ///  let mut file = OpenOptions::new()
135    ///     .read(true)
136    ///     .open(LOCAL_FILE_PATH)?;
137    ///
138    ///  let mut upload_session = response.into_upload_session(file).await?;
139    ///  let mut stream = upload_session.stream()?;
140    ///
141    ///  while let Some(result) = stream.next().await {
142    ///     match result {
143    ///         Ok(response) => {
144    ///             println!("{response:#?}");
145    ///
146    ///             let body: serde_json::Value = response.json().await?;
147    ///             println!("{body:#?}");
148    ///         }
149    ///         Err(err) => panic!("Error on upload session {err:#?}")
150    ///     }
151    ///  }
152    /// ```
153    pub fn stream(
154        &mut self,
155    ) -> GraphResult<impl Stream<Item = GraphResult<reqwest::Response>> + '_> {
156        Ok(Box::pin(self.try_stream()))
157    }
158
159    pub fn channel(
160        &mut self,
161    ) -> GraphResult<tokio::sync::mpsc::Receiver<reqwest::Result<reqwest::Response>>> {
162        self.channel_buffer_timeout(self.range_iter.len() + 1, Duration::from_secs(30))
163    }
164
165    pub fn channel_timeout(
166        &mut self,
167        timeout: Duration,
168    ) -> GraphResult<tokio::sync::mpsc::Receiver<reqwest::Result<reqwest::Response>>> {
169        self.channel_buffer_timeout(self.range_iter.len() + 1, timeout)
170    }
171
172    pub fn channel_buffer_timeout(
173        &mut self,
174        buffer: usize,
175        timeout: Duration,
176    ) -> GraphResult<tokio::sync::mpsc::Receiver<reqwest::Result<reqwest::Response>>> {
177        let (sender, receiver) = tokio::sync::mpsc::channel(buffer);
178
179        let components = self.range_iter.map_all().ok_or(GraphFailure::invalid(
180            "Invalid Headers (internal error, please report)",
181        ))?;
182        let request_builders = self.map_request_builder(components);
183
184        tokio::spawn(async move {
185            for request_builder in request_builders {
186                let result = request_builder.send().await;
187                sender.send_timeout(result, timeout).await.unwrap();
188            }
189        });
190
191        Ok(receiver)
192    }
193}
194
195#[async_trait]
196impl AsyncIterator for UploadSession {
197    type Item = GraphResult<reqwest::Response>;
198
199    async fn next(&mut self) -> Option<Self::Item> {
200        let (header_map, body) = self.range_iter.pop_front()?;
201        Some(self.send(header_map, body).await)
202    }
203}