firebase_rs_sdk/storage/
upload.rs1use std::cmp;
2
3use crate::storage::error::{internal_error, invalid_argument, StorageError, StorageResult};
4use crate::storage::metadata::serde::ObjectMetadata;
5use crate::storage::reference::StorageReference;
6use crate::storage::request::{
7 continue_resumable_upload_request, create_resumable_upload_request, multipart_upload_request,
8 RESUMABLE_UPLOAD_CHUNK_SIZE,
9};
10use crate::storage::UploadMetadata;
11
12const MAX_RESUMABLE_CHUNK_SIZE: usize = 32 * 1024 * 1024;
13
14#[derive(Clone, Copy, Debug, PartialEq, Eq)]
16pub enum UploadTaskState {
17 Pending,
18 Running,
19 Completed,
20 Error,
21 Canceled,
22}
23
24#[derive(Clone, Copy, Debug, PartialEq, Eq)]
26pub struct UploadProgress {
27 pub bytes_transferred: u64,
28 pub total_bytes: u64,
29}
30
31impl UploadProgress {
32 pub fn new(bytes_transferred: u64, total_bytes: u64) -> Self {
33 Self {
34 bytes_transferred,
35 total_bytes,
36 }
37 }
38}
39
40pub struct UploadTask {
46 reference: StorageReference,
47 data: Vec<u8>,
48 metadata: Option<UploadMetadata>,
49 total_bytes: u64,
50 transferred: u64,
51 resumable: bool,
52 upload_url: Option<String>,
53 state: UploadTaskState,
54 last_error: Option<StorageError>,
55 result_metadata: Option<ObjectMetadata>,
56 chunk_multiplier: usize,
57}
58
59impl UploadTask {
60 pub(crate) fn new(
61 reference: StorageReference,
62 data: Vec<u8>,
63 metadata: Option<UploadMetadata>,
64 ) -> Self {
65 let total_bytes = data.len() as u64;
66 let resumable = total_bytes as usize > RESUMABLE_UPLOAD_CHUNK_SIZE;
67 Self {
68 reference,
69 data,
70 metadata,
71 total_bytes,
72 transferred: 0,
73 resumable,
74 upload_url: None,
75 state: UploadTaskState::Pending,
76 last_error: None,
77 result_metadata: None,
78 chunk_multiplier: 1,
79 }
80 }
81
82 pub fn total_bytes(&self) -> u64 {
84 self.total_bytes
85 }
86
87 pub fn bytes_transferred(&self) -> u64 {
89 self.transferred
90 }
91
92 pub fn state(&self) -> UploadTaskState {
94 self.state
95 }
96
97 pub fn last_error(&self) -> Option<&StorageError> {
99 self.last_error.as_ref()
100 }
101
102 pub fn metadata(&self) -> Option<&ObjectMetadata> {
104 self.result_metadata.as_ref()
105 }
106
107 pub fn upload_session_url(&self) -> Option<&str> {
109 self.upload_url.as_deref()
110 }
111
112 pub async fn upload_next_with_progress<F>(
116 &mut self,
117 mut progress: F,
118 ) -> StorageResult<Option<ObjectMetadata>>
119 where
120 F: FnMut(UploadProgress),
121 {
122 match self.state {
123 UploadTaskState::Completed => {
124 return Ok(self.result_metadata.clone());
125 }
126 UploadTaskState::Error => {
127 return Err(self
128 .last_error
129 .clone()
130 .unwrap_or_else(|| internal_error("upload task failed")));
131 }
132 UploadTaskState::Canceled => {
133 return Err(invalid_argument("upload task was canceled"));
134 }
135 _ => {}
136 }
137
138 if !self.resumable {
139 return self.upload_multipart(progress).await;
140 }
141
142 self.ensure_resumable_session().await?;
143 self.state = UploadTaskState::Running;
144
145 let storage = self.reference.storage();
146 let upload_url = self
147 .upload_url
148 .clone()
149 .ok_or_else(|| internal_error("resumable session url missing"))?;
150 let start_offset = self.transferred;
151 let chunk_size = self.current_chunk_size() as u64;
152 let end_offset = cmp::min(self.total_bytes, start_offset + chunk_size);
153 let finalize = end_offset == self.total_bytes;
154 let chunk = self
155 .data
156 .get(start_offset as usize..end_offset as usize)
157 .map(|slice| slice.to_vec())
158 .unwrap_or_default();
159
160 let request = continue_resumable_upload_request(
161 &storage,
162 self.reference.location(),
163 &upload_url,
164 start_offset,
165 self.total_bytes,
166 chunk,
167 finalize,
168 );
169 let status = match storage.run_upload_request(request).await {
170 Ok(status) => status,
171 Err(err) => {
172 self.reset_multiplier();
173 return self.fail(err);
174 }
175 };
176
177 self.transferred = status.current;
178 progress(UploadProgress::new(self.transferred, self.total_bytes));
179
180 if status.finalized {
181 let metadata = status
182 .metadata
183 .ok_or_else(|| internal_error("resumable upload completed without metadata"))?;
184 self.state = UploadTaskState::Completed;
185 self.result_metadata = Some(metadata.clone());
186 Ok(Some(metadata))
187 } else {
188 self.bump_multiplier();
189 Ok(None)
190 }
191 }
192
193 pub async fn upload_next(&mut self) -> StorageResult<Option<ObjectMetadata>> {
195 self.upload_next_with_progress(|_| {}).await
196 }
197
198 pub async fn run_to_completion_with_progress<F>(
200 mut self,
201 mut progress: F,
202 ) -> StorageResult<ObjectMetadata>
203 where
204 F: FnMut(UploadProgress),
205 {
206 loop {
207 match self.upload_next_with_progress(&mut progress).await? {
208 Some(metadata) => return Ok(metadata),
209 None => continue,
210 }
211 }
212 }
213
214 pub async fn run_to_completion(self) -> StorageResult<ObjectMetadata> {
216 self.run_to_completion_with_progress(|_| {}).await
217 }
218
219 async fn ensure_resumable_session(&mut self) -> StorageResult<()> {
220 if !self.resumable || self.upload_url.is_some() {
221 return Ok(());
222 }
223 let storage = self.reference.storage();
224 let request = create_resumable_upload_request(
225 &storage,
226 self.reference.location(),
227 self.metadata.clone(),
228 self.total_bytes,
229 );
230 let url = storage.run_upload_request(request).await?;
231 self.upload_url = Some(url);
232 Ok(())
233 }
234
235 async fn upload_multipart<F>(
236 &mut self,
237 mut progress: F,
238 ) -> StorageResult<Option<ObjectMetadata>>
239 where
240 F: FnMut(UploadProgress),
241 {
242 if self.state == UploadTaskState::Completed {
243 return Ok(self.result_metadata.clone());
244 }
245
246 self.state = UploadTaskState::Running;
247 let storage = self.reference.storage();
248 let request = multipart_upload_request(
249 &storage,
250 self.reference.location(),
251 self.data.clone(),
252 self.metadata.clone(),
253 );
254
255 match storage.run_upload_request(request).await {
256 Ok(metadata) => {
257 self.transferred = self.total_bytes;
258 self.state = UploadTaskState::Completed;
259 self.result_metadata = Some(metadata.clone());
260 progress(UploadProgress::new(self.transferred, self.total_bytes));
261 Ok(Some(metadata))
262 }
263 Err(err) => self.fail(err),
264 }
265 }
266
267 fn current_chunk_size(&self) -> usize {
268 cmp::min(
269 RESUMABLE_UPLOAD_CHUNK_SIZE * self.chunk_multiplier,
270 MAX_RESUMABLE_CHUNK_SIZE,
271 )
272 }
273
274 fn bump_multiplier(&mut self) {
275 let next = self.chunk_multiplier * 2;
276 if next * RESUMABLE_UPLOAD_CHUNK_SIZE <= MAX_RESUMABLE_CHUNK_SIZE {
277 self.chunk_multiplier = next;
278 }
279 }
280
281 fn reset_multiplier(&mut self) {
282 self.chunk_multiplier = 1;
283 }
284
285 fn fail<T>(&mut self, error: StorageError) -> StorageResult<T> {
286 self.state = UploadTaskState::Error;
287 self.last_error = Some(error.clone());
288 Err(error)
289 }
290}