// backblaze_b2_client/tasks/upload/file_upload.rs

use std::{
    collections::HashMap,
    convert::Infallible,
    io::SeekFrom,
    ops::Deref,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
    time::{Duration, Instant},
};

use async_stream::stream;
use bytes::Bytes;
use sha1_smol::Sha1;
use tokio::{
    io::{AsyncReadExt, AsyncSeekExt},
    sync::{
        mpsc::{self, Receiver, Sender},
        Mutex, RwLock,
    },
    task::{AbortHandle, JoinHandle},
    time::sleep,
};

use crate::{
    definitions::{
        bodies::{B2FinishLargeFileBody, B2StartLargeFileUploadBody},
        headers::{B2UploadFileHeaders, B2UploadPartHeaders},
        shared::B2File,
    },
    error::B2Error,
    simple_client::B2SimpleClient,
    tasks::upload::{large_file_sha1::LargeFileSha1, upload_buffer::UploadBuffer},
    throttle::Throttle,
    util::{write_lock_arc::WriteLockArc, B2Callback, IsValid, SizeUnit},
};

use crate::tasks::shared::{AsyncFileReader, FileNetworkStats, FileStatus};

use super::{
    error::FileUploadError, upload_details::UploadFileDetails, FileUploadOptions,
    LargeFileLoadStrategy,
};
42pub struct FileUpload {
43    id: u64,
44    client: Arc<B2SimpleClient>,
45    details: UploadFileDetails,
46    status: WriteLockArc<FileStatus>,
47    file: Arc<RwLock<dyn AsyncFileReader>>,
48    stats: Arc<FileNetworkStats>,
49    large_file_id: Arc<RwLock<Option<String>>>,
50    completion_callbacks: Arc<RwLock<Vec<B2Callback<()>>>>,
51    abort_channel: (WriteLockArc<Sender<()>>, WriteLockArc<Receiver<()>>),
52}
53
54impl FileUpload {
55    pub fn new<F: AsyncFileReader + 'static>(
56        file: F,
57        file_name: String,
58        bucket_id: String,
59        optional_info: Option<HashMap<String, String>>,
60        file_size: u64,
61        options: FileUploadOptions,
62        client: Arc<B2SimpleClient>,
63    ) -> Arc<Self> {
64        let (tx, rx) = mpsc::channel::<()>(1);
65
66        Arc::new(Self {
67            id: rand::random(),
68            client,
69            details: UploadFileDetails {
70                file_size,
71                file_name,
72                bucket_id,
73                optional_info,
74                options: Arc::new(options),
75            },
76            large_file_id: Arc::new(RwLock::new(None)),
77            status: WriteLockArc::new(FileStatus::Pending),
78            file: Arc::new(RwLock::new(file)),
79            stats: Arc::new(FileNetworkStats::new(file_size as f64)),
80            completion_callbacks: Arc::new(RwLock::new(vec![])),
81            abort_channel: (WriteLockArc::new(tx), WriteLockArc::new(rx)),
82        })
83    }
84
85    pub fn id(&self) -> u64 {
86        self.id
87    }
88
89    pub fn stats(&self) -> &FileNetworkStats {
90        &self.stats
91    }
92
93    pub fn status(&self) -> FileStatus {
94        (*self.status).clone()
95    }
96
97    /// Returns true when the file has finished or has been aborted.
98    pub fn has_stopped(&self) -> bool {
99        *self.status == FileStatus::Finished || *self.status == FileStatus::Aborted
100    }
101
102    /// Whether it was started or not, will only start if status is [`Pending`](FileStatus::Pending)
103    pub async fn start(&self) -> Result<B2File, FileUploadError> {
104        if *self.status != FileStatus::Pending {
105            return Err(FileUploadError::AlreadyStarted);
106        }
107
108        self.details.options.is_valid()?;
109
110        self.status.set(FileStatus::Working).await;
111
112        let retry_count = self.details.options.retry_strategy.count();
113        let mut curr_retry_count = 1;
114        let abort_receiver = self.abort_channel.1.clone();
115
116        let result = loop {
117            curr_retry_count += 1;
118
119            let result = match self.details.file_size {
120                size if size <= self.details.options.large_file_cutoff => {
121                    self.upload_small_file().await
122                }
123                _ => {
124                    let file_strat = match &self.details.options.file_load_strategy {
125                        LargeFileLoadStrategy::Constant(strat) => strat,
126                        LargeFileLoadStrategy::Dynamic(strat) => {
127                            &strat.get_load_strategy(self.details.file_size)
128                        }
129                    };
130
131                    file_strat.is_valid()?;
132
133                    self.upload_large_file().await
134                }
135            };
136
137            if *self.status == FileStatus::Aborted {
138                break Err(FileUploadError::Aborted);
139            }
140
141            if result.is_err() && curr_retry_count <= retry_count.get() {
142                let wait = self.details.options.retry_strategy.wait(curr_retry_count);
143                let mut receiver_lock = abort_receiver.lock_write().await;
144
145                let mut status = self.status.lock_write().await;
146                if *status == FileStatus::Working {
147                    *status = FileStatus::Retrying;
148                }
149                drop(status);
150
151                tokio::select! {
152                    _ = sleep(wait) => {},
153                    _ = receiver_lock.recv() => {
154                        break Err(FileUploadError::Aborted)
155                    }
156                };
157
158                continue;
159            }
160
161            break result;
162        };
163
164        let mut status = self.status.lock_write().await;
165        if *status == FileStatus::Working {
166            *status = FileStatus::Finished;
167        }
168        drop(status);
169
170        self.call_finish_callbacks().await;
171
172        if *self.status == FileStatus::Aborted {
173            return Err(FileUploadError::Aborted);
174        }
175
176        return result;
177    }
178
179    /// Will abort ongoing upload if status is [`Working`](FileStatus::Working) or [`Retrying`](FileStatus::Retrying), does nothing otherwise.
180    pub async fn abort(&self) {
181        // If its not working there's nothing to do
182        if *self.status != FileStatus::Working || *self.status != FileStatus::Retrying {
183            return;
184        }
185
186        self.status.set(FileStatus::Aborted).await;
187
188        let sender = &self.abort_channel.0;
189        sender.send(()).await.ok();
190
191        self.cancel_large_file().await;
192    }
193
194    pub async fn add_finish_callback(&self, callback: B2Callback<()>) {
195        let mut callbacks = self.completion_callbacks.write().await;
196        callbacks.push(callback);
197    }
198
199    async fn upload_large_file(&self) -> Result<B2File, FileUploadError> {
200        let file = self.file.clone();
201
202        let start_large_upload_body = B2StartLargeFileUploadBody::builder()
203            .bucket_id(self.details.bucket_id.clone())
204            .file_name(self.details.file_name.clone())
205            .content_type("b2/x-auto".into())
206            .file_info(self.details.optional_info.clone())
207            .build();
208
209        let start_large_upload_body = self
210            .details
211            .options
212            .options
213            .clone()
214            .apply_large_file_upload(start_large_upload_body);
215
216        let start_large_file_response = self
217            .client
218            .start_large_file(start_large_upload_body)
219            .await?;
220
221        let file_id = start_large_file_response.file_id;
222        let total_uploaded = self.stats.clone();
223
224        let mut large_file = self.large_file_id.write().await;
225        *large_file = Some(file_id.clone());
226        drop(large_file);
227
228        let file_strat = match &self.details.options.file_load_strategy {
229            LargeFileLoadStrategy::Constant(strat) => strat,
230            LargeFileLoadStrategy::Dynamic(strat) => {
231                &strat.get_load_strategy(self.details.file_size)
232            }
233        };
234
235        let mut parts: Vec<((u64, u64), u16)> = vec![];
236        let mut current_range_start: u16 = 0;
237
238        loop {
239            let start = file_strat.part_size * u64::from(current_range_start);
240            let end = file_strat.part_size * (u64::from(current_range_start) + 1);
241
242            current_range_start += 1;
243
244            if end >= self.details.file_size {
245                parts.push(((start, self.details.file_size), current_range_start));
246                break;
247            } else {
248                parts.push(((start, end), current_range_start));
249            }
250        }
251
252        let sha1s = Arc::new(LargeFileSha1::new(parts.len()));
253        let mut join_handles: Vec<JoinHandle<Result<(), FileUploadError>>> = vec![];
254        let abort_handles: Arc<RwLock<Vec<AbortHandle>>> = Arc::new(RwLock::new(vec![]));
255        self.start_timer().await;
256
257        let upload_throttle = Arc::new(
258            self.details
259                .options
260                .speed_throttle
261                .clone()
262                .map(|t| Mutex::new(t)),
263        );
264
265        let status = self.status.clone();
266
267        for chunk in parts.chunks(file_strat.chunk_size as usize) {
268            let task_chunk = chunk.to_owned();
269            let file_id = file_id.clone();
270            let sha1s = sha1s.clone();
271            let task_abort_handles = abort_handles.clone();
272            let total_uploaded = total_uploaded.clone();
273            let status = status.clone();
274
275            if *status == FileStatus::Aborted {
276                break;
277            }
278
279            let upload_throttle = upload_throttle.clone();
280            let file = file.clone();
281            let client = self.client.clone();
282
283            let options = self.details.options.clone();
284
285            let task_func = FileUpload::part_upload(
286                client,
287                file_id,
288                status,
289                task_chunk,
290                file,
291                sha1s,
292                total_uploaded,
293                upload_throttle,
294                options,
295            );
296
297            let join_handle = tokio::spawn(async move {
298                let result = task_func.await;
299
300                if let Err(err) = result {
301                    for handle in task_abort_handles.read().await.iter() {
302                        handle.abort();
303                    }
304
305                    return Err(err);
306                }
307
308                Ok(())
309            });
310
311            let abort_handle = join_handle.abort_handle();
312
313            join_handles.push(join_handle);
314            abort_handles.write().await.push(abort_handle);
315        }
316
317        for handle in join_handles {
318            match handle.await {
319                Ok(res) => res,
320                Err(err) => match err.is_cancelled() {
321                    true => continue,
322                    false => panic!("{:#?}", err),
323                },
324            }?;
325        }
326
327        Ok(self
328            .client
329            .finish_large_file(B2FinishLargeFileBody {
330                file_id: file_id.clone(),
331                part_sha1_array: Arc::into_inner(sha1s)
332                    .expect("sha1s shouldn't be referenced any where else")
333                    .into(),
334            })
335            .await?)
336    }
337
338    async fn upload_small_file(&self) -> Result<B2File, FileUploadError> {
339        let mut buffer = Vec::with_capacity(self.details.file_size as usize);
340        let mut file = self.file.write().await;
341        file.seek(SeekFrom::Start(0)).await?;
342        file.read_to_end(&mut buffer).await?;
343        drop(file);
344
345        let sha1 = Sha1::from(&buffer).digest().to_string();
346
347        let upload_url_response = self
348            .client
349            .get_upload_url(self.details.bucket_id.clone())
350            .await?;
351
352        let b2_upload_headers = B2UploadFileHeaders::builder()
353            .authorization(upload_url_response.authorization_token)
354            .file_name(self.details.file_name.clone())
355            .content_type("b2/x-auto".into())
356            .content_length(self.details.file_size)
357            .content_sha1(sha1)
358            .build();
359
360        let b2_upload_headers = self
361            .details
362            .options
363            .options
364            .clone()
365            .apply_file_upload(b2_upload_headers);
366
367        let buffer = UploadBuffer::new(buffer);
368        let uploaded = self.stats.clone();
369        let status = self.status.clone();
370        let upload_throttle = Arc::new(
371            self.details
372                .options
373                .speed_throttle
374                .clone()
375                .map(|t| Mutex::new(t)),
376        );
377
378        let stream = stream! {
379            for chunk in buffer.chunks((SizeUnit::KIBIBYTE * 80) as usize) {
380                if let Some(ref throttle) = upload_throttle.as_ref() {
381                    let mut throttle = throttle.lock().await;
382                    throttle.advance_by(chunk.len() as u64).await;
383                    drop(throttle);
384                }
385
386
387                if *status == FileStatus::Aborted {
388                    break;
389                }
390
391                uploaded.add_done_bytes(chunk.len() as u64).await;
392
393                yield Ok::<Bytes, Infallible>(chunk);
394            }
395        };
396
397        self.start_timer().await;
398
399        let file = self
400            .client
401            .upload_file(
402                reqwest::Body::wrap_stream(stream),
403                upload_url_response.upload_url,
404                b2_upload_headers,
405                self.details.optional_info.clone(),
406            )
407            .await?;
408
409        Ok(file)
410    }
411
412    async fn start_timer(&self) {
413        self.stats.start_time.set(Instant::now()).await;
414    }
415
416    async fn cancel_large_file(&self) {
417        let large_file = self.large_file_id.read().await;
418
419        if let Some(id) = large_file.deref() {
420            self.client.cancel_large_file(id.clone()).await.ok();
421        }
422    }
423
424    async fn call_finish_callbacks(&self) {
425        let callbacks = self.completion_callbacks.read().await;
426
427        for callback in callbacks.deref() {
428            match callback {
429                B2Callback::Fn(fun) => fun(()),
430                B2Callback::AsyncFn(fun) => fun(()).await,
431            }
432        }
433    }
434
435    async fn part_upload(
436        client: Arc<B2SimpleClient>,
437        file_id: String,
438        status: WriteLockArc<FileStatus>,
439        task_chunk: Vec<((u64, u64), u16)>,
440        file: Arc<RwLock<dyn AsyncFileReader>>,
441        sha1s: Arc<LargeFileSha1>,
442        total_uploaded: Arc<FileNetworkStats>,
443        upload_throttle: Arc<Option<Mutex<Throttle<u64>>>>,
444        options: Arc<FileUploadOptions>,
445    ) -> Result<(), FileUploadError> {
446        let mut upload_part_url_response = client.get_upload_part_url(file_id.clone()).await?;
447
448        for ((start, end), part_number) in task_chunk {
449            let status = status.clone();
450            let mut buffer = vec![0u8; (end - start) as usize];
451
452            let mut file = file.write().await;
453            file.seek(std::io::SeekFrom::Start(start)).await?;
454            file.read_exact(&mut buffer).await?;
455            drop(file);
456
457            let sha1 = Sha1::from(&buffer).digest().to_string();
458
459            sha1s.set_sha1((part_number - 1) as usize, sha1.clone());
460
461            let buffer = UploadBuffer::new(buffer);
462
463            if *status == FileStatus::Aborted {
464                break;
465            }
466
467            loop {
468                let status = status.clone();
469
470                if *status == FileStatus::Aborted {
471                    break;
472                }
473
474                let total_uploaded = total_uploaded.clone();
475                let sha1 = sha1.clone();
476                let upload_part_headers = B2UploadPartHeaders::builder()
477                    .authorization(upload_part_url_response.authorization_token.clone())
478                    .part_number(part_number)
479                    .content_length(end - start)
480                    .content_sha1(sha1.clone())
481                    .build();
482
483                let upload_part_headers = options
484                    .options
485                    .clone()
486                    .apply_file_part_upload(upload_part_headers);
487
488                let upload_throttle = upload_throttle.clone();
489
490                let mut total_uploaded_here: u64 = 0;
491                let total_uploaded_other = total_uploaded.clone();
492                let buffer = buffer.chunks((SizeUnit::KIBIBYTE * 160) as usize);
493
494                let stream = stream! {
495                    for chunk in buffer {
496                        if *status == FileStatus::Aborted {
497                            break;
498                        }
499
500                        if let Some(ref throttle) = upload_throttle.as_ref() {
501                            let mut throttle = throttle.lock().await;
502                            throttle.advance_by(chunk.len() as u64).await;
503                            drop(throttle);
504                        }
505
506                        total_uploaded.add_done_bytes(chunk.len() as u64).await;
507                        *(&mut total_uploaded_here) += chunk.len() as u64;
508
509                        yield Ok::<_, Infallible>(chunk);
510                    }
511
512                };
513
514                let stream = reqwest::Body::wrap_stream(stream);
515
516                let result = client
517                    .upload_part(
518                        upload_part_headers,
519                        stream,
520                        upload_part_url_response.upload_url.clone(),
521                    )
522                    .await;
523
524                match result {
525                    Ok(_) => break,
526                    Err(error) => match error {
527                        B2Error::RequestError(error) => match error.status.get() {
528                            503 => {
529                                upload_part_url_response =
530                                    match client.get_upload_part_url(file_id.clone()).await {
531                                        Ok(resp) => resp,
532                                        Err(err) => return Err(err.into()),
533                                    };
534
535                                total_uploaded_other
536                                    .done
537                                    .fetch_sub(total_uploaded_here, Ordering::Relaxed);
538
539                                sleep(Duration::from_millis(200)).await;
540                            }
541                            _ => return Err(B2Error::RequestError(error).into()),
542                        },
543                        err => return Err(err.into()),
544                    },
545                };
546            }
547        }
548
549        Ok(())
550    }
551}