firebase_rs_sdk/storage/
upload.rs

1use std::cmp;
2
3use crate::storage::error::{internal_error, invalid_argument, StorageError, StorageResult};
4use crate::storage::metadata::serde::ObjectMetadata;
5use crate::storage::reference::StorageReference;
6use crate::storage::request::{
7    continue_resumable_upload_request, create_resumable_upload_request, multipart_upload_request,
8    RESUMABLE_UPLOAD_CHUNK_SIZE,
9};
10use crate::storage::UploadMetadata;
11
/// Upper bound, in bytes, on a single resumable upload chunk (32 MiB).
///
/// The chunk size grows as an upload proceeds but is always clamped to this value.
const MAX_RESUMABLE_CHUNK_SIZE: usize = 32 << 20;
13
/// Lifecycle state of an [`UploadTask`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UploadTaskState {
    /// The task has been created but no upload request has been issued yet.
    Pending,
    /// At least one upload request has been started and the upload has not finished.
    Running,
    /// The upload finished and the resulting object metadata is stored on the task.
    Completed,
    /// A request failed; the task keeps the error and returns it on later calls.
    Error,
    /// The task was canceled; further upload attempts are rejected.
    Canceled,
}
23
/// Snapshot of how far an upload has advanced, handed to progress callbacks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct UploadProgress {
    /// Number of bytes reported as transferred so far.
    pub bytes_transferred: u64,
    /// Total size of the payload being uploaded, in bytes.
    pub total_bytes: u64,
}

impl UploadProgress {
    /// Builds a progress snapshot from the transferred and total byte counts.
    pub fn new(bytes_transferred: u64, total_bytes: u64) -> Self {
        UploadProgress {
            bytes_transferred,
            total_bytes,
        }
    }
}
39
40/// Stateful helper that mirrors the Firebase Web SDK's resumable upload behaviour.
41///
42/// A task is created via [`StorageReference::upload_bytes_resumable`](crate::storage::StorageReference::upload_bytes_resumable)
43/// and can then be polled chunk-by-chunk (`upload_next`) or allowed to run to completion (`run_to_completion`).
44/// Small payloads are uploaded with a single multipart request, whereas larger blobs utilise the resumable REST API.
45pub struct UploadTask {
46    reference: StorageReference,
47    data: Vec<u8>,
48    metadata: Option<UploadMetadata>,
49    total_bytes: u64,
50    transferred: u64,
51    resumable: bool,
52    upload_url: Option<String>,
53    state: UploadTaskState,
54    last_error: Option<StorageError>,
55    result_metadata: Option<ObjectMetadata>,
56    chunk_multiplier: usize,
57}
58
59impl UploadTask {
60    pub(crate) fn new(
61        reference: StorageReference,
62        data: Vec<u8>,
63        metadata: Option<UploadMetadata>,
64    ) -> Self {
65        let total_bytes = data.len() as u64;
66        let resumable = total_bytes as usize > RESUMABLE_UPLOAD_CHUNK_SIZE;
67        Self {
68            reference,
69            data,
70            metadata,
71            total_bytes,
72            transferred: 0,
73            resumable,
74            upload_url: None,
75            state: UploadTaskState::Pending,
76            last_error: None,
77            result_metadata: None,
78            chunk_multiplier: 1,
79        }
80    }
81
82    /// Returns the total number of bytes that will be uploaded.
83    pub fn total_bytes(&self) -> u64 {
84        self.total_bytes
85    }
86
87    /// Returns the number of bytes that have been successfully uploaded so far.
88    pub fn bytes_transferred(&self) -> u64 {
89        self.transferred
90    }
91
92    /// Current task state.
93    pub fn state(&self) -> UploadTaskState {
94        self.state
95    }
96
97    /// Last error reported by the task, if any.
98    pub fn last_error(&self) -> Option<&StorageError> {
99        self.last_error.as_ref()
100    }
101
102    /// Resulting object metadata after a successful upload.
103    pub fn metadata(&self) -> Option<&ObjectMetadata> {
104        self.result_metadata.as_ref()
105    }
106
107    /// The resumable session URL, if one has been established.
108    pub fn upload_session_url(&self) -> Option<&str> {
109        self.upload_url.as_deref()
110    }
111
112    /// Uploads the next chunk and invokes the provided progress callback.
113    ///
114    /// Returns `Ok(Some(metadata))` when the upload finishes and the remote metadata is available.
115    pub async fn upload_next_with_progress<F>(
116        &mut self,
117        mut progress: F,
118    ) -> StorageResult<Option<ObjectMetadata>>
119    where
120        F: FnMut(UploadProgress),
121    {
122        match self.state {
123            UploadTaskState::Completed => {
124                return Ok(self.result_metadata.clone());
125            }
126            UploadTaskState::Error => {
127                return Err(self
128                    .last_error
129                    .clone()
130                    .unwrap_or_else(|| internal_error("upload task failed")));
131            }
132            UploadTaskState::Canceled => {
133                return Err(invalid_argument("upload task was canceled"));
134            }
135            _ => {}
136        }
137
138        if !self.resumable {
139            return self.upload_multipart(progress).await;
140        }
141
142        self.ensure_resumable_session().await?;
143        self.state = UploadTaskState::Running;
144
145        let storage = self.reference.storage();
146        let upload_url = self
147            .upload_url
148            .clone()
149            .ok_or_else(|| internal_error("resumable session url missing"))?;
150        let start_offset = self.transferred;
151        let chunk_size = self.current_chunk_size() as u64;
152        let end_offset = cmp::min(self.total_bytes, start_offset + chunk_size);
153        let finalize = end_offset == self.total_bytes;
154        let chunk = self
155            .data
156            .get(start_offset as usize..end_offset as usize)
157            .map(|slice| slice.to_vec())
158            .unwrap_or_default();
159
160        let request = continue_resumable_upload_request(
161            &storage,
162            self.reference.location(),
163            &upload_url,
164            start_offset,
165            self.total_bytes,
166            chunk,
167            finalize,
168        );
169        let status = match storage.run_upload_request(request).await {
170            Ok(status) => status,
171            Err(err) => {
172                self.reset_multiplier();
173                return self.fail(err);
174            }
175        };
176
177        self.transferred = status.current;
178        progress(UploadProgress::new(self.transferred, self.total_bytes));
179
180        if status.finalized {
181            let metadata = status
182                .metadata
183                .ok_or_else(|| internal_error("resumable upload completed without metadata"))?;
184            self.state = UploadTaskState::Completed;
185            self.result_metadata = Some(metadata.clone());
186            Ok(Some(metadata))
187        } else {
188            self.bump_multiplier();
189            Ok(None)
190        }
191    }
192
193    /// Uploads the next chunk without emitting progress callbacks.
194    pub async fn upload_next(&mut self) -> StorageResult<Option<ObjectMetadata>> {
195        self.upload_next_with_progress(|_| {}).await
196    }
197
198    /// Runs the task to completion while notifying `progress` for each chunk.
199    pub async fn run_to_completion_with_progress<F>(
200        mut self,
201        mut progress: F,
202    ) -> StorageResult<ObjectMetadata>
203    where
204        F: FnMut(UploadProgress),
205    {
206        loop {
207            match self.upload_next_with_progress(&mut progress).await? {
208                Some(metadata) => return Ok(metadata),
209                None => continue,
210            }
211        }
212    }
213
214    /// Runs the task to completion without progress callbacks.
215    pub async fn run_to_completion(self) -> StorageResult<ObjectMetadata> {
216        self.run_to_completion_with_progress(|_| {}).await
217    }
218
219    async fn ensure_resumable_session(&mut self) -> StorageResult<()> {
220        if !self.resumable || self.upload_url.is_some() {
221            return Ok(());
222        }
223        let storage = self.reference.storage();
224        let request = create_resumable_upload_request(
225            &storage,
226            self.reference.location(),
227            self.metadata.clone(),
228            self.total_bytes,
229        );
230        let url = storage.run_upload_request(request).await?;
231        self.upload_url = Some(url);
232        Ok(())
233    }
234
235    async fn upload_multipart<F>(
236        &mut self,
237        mut progress: F,
238    ) -> StorageResult<Option<ObjectMetadata>>
239    where
240        F: FnMut(UploadProgress),
241    {
242        if self.state == UploadTaskState::Completed {
243            return Ok(self.result_metadata.clone());
244        }
245
246        self.state = UploadTaskState::Running;
247        let storage = self.reference.storage();
248        let request = multipart_upload_request(
249            &storage,
250            self.reference.location(),
251            self.data.clone(),
252            self.metadata.clone(),
253        );
254
255        match storage.run_upload_request(request).await {
256            Ok(metadata) => {
257                self.transferred = self.total_bytes;
258                self.state = UploadTaskState::Completed;
259                self.result_metadata = Some(metadata.clone());
260                progress(UploadProgress::new(self.transferred, self.total_bytes));
261                Ok(Some(metadata))
262            }
263            Err(err) => self.fail(err),
264        }
265    }
266
267    fn current_chunk_size(&self) -> usize {
268        cmp::min(
269            RESUMABLE_UPLOAD_CHUNK_SIZE * self.chunk_multiplier,
270            MAX_RESUMABLE_CHUNK_SIZE,
271        )
272    }
273
274    fn bump_multiplier(&mut self) {
275        let next = self.chunk_multiplier * 2;
276        if next * RESUMABLE_UPLOAD_CHUNK_SIZE <= MAX_RESUMABLE_CHUNK_SIZE {
277            self.chunk_multiplier = next;
278        }
279    }
280
281    fn reset_multiplier(&mut self) {
282        self.chunk_multiplier = 1;
283    }
284
285    fn fail<T>(&mut self, error: StorageError) -> StorageResult<T> {
286        self.state = UploadTaskState::Error;
287        self.last_error = Some(error.clone());
288        Err(error)
289    }
290}