backblaze_b2_client/tasks/upload/
file_upload.rs1use std::{
2 collections::HashMap,
3 convert::Infallible,
4 io::SeekFrom,
5 ops::Deref,
6 sync::{atomic::Ordering, Arc},
7 time::{Duration, Instant},
8};
9
10use async_stream::stream;
11use bytes::Bytes;
12use sha1_smol::Sha1;
13use tokio::{
14 io::{AsyncReadExt, AsyncSeekExt},
15 sync::{
16 mpsc::{self, Receiver, Sender},
17 Mutex, RwLock,
18 },
19 task::{AbortHandle, JoinHandle},
20 time::sleep,
21};
22
23use crate::{
24 definitions::{
25 bodies::{B2FinishLargeFileBody, B2StartLargeFileUploadBody},
26 headers::{B2UploadFileHeaders, B2UploadPartHeaders},
27 shared::B2File,
28 },
29 error::B2Error,
30 simple_client::B2SimpleClient,
31 tasks::upload::{large_file_sha1::LargeFileSha1, upload_buffer::UploadBuffer},
32 throttle::Throttle,
33 util::{write_lock_arc::WriteLockArc, B2Callback, IsValid, SizeUnit},
34};
35
36use crate::tasks::shared::{AsyncFileReader, FileNetworkStats, FileStatus};
37
38use super::{
39 error::FileUploadError, upload_details::UploadFileDetails, FileUploadOptions,
40 LargeFileLoadStrategy,
41};
42pub struct FileUpload {
43 id: u64,
44 client: Arc<B2SimpleClient>,
45 details: UploadFileDetails,
46 status: WriteLockArc<FileStatus>,
47 file: Arc<RwLock<dyn AsyncFileReader>>,
48 stats: Arc<FileNetworkStats>,
49 large_file_id: Arc<RwLock<Option<String>>>,
50 completion_callbacks: Arc<RwLock<Vec<B2Callback<()>>>>,
51 abort_channel: (WriteLockArc<Sender<()>>, WriteLockArc<Receiver<()>>),
52}
53
54impl FileUpload {
55 pub fn new<F: AsyncFileReader + 'static>(
56 file: F,
57 file_name: String,
58 bucket_id: String,
59 optional_info: Option<HashMap<String, String>>,
60 file_size: u64,
61 options: FileUploadOptions,
62 client: Arc<B2SimpleClient>,
63 ) -> Arc<Self> {
64 let (tx, rx) = mpsc::channel::<()>(1);
65
66 Arc::new(Self {
67 id: rand::random(),
68 client,
69 details: UploadFileDetails {
70 file_size,
71 file_name,
72 bucket_id,
73 optional_info,
74 options: Arc::new(options),
75 },
76 large_file_id: Arc::new(RwLock::new(None)),
77 status: WriteLockArc::new(FileStatus::Pending),
78 file: Arc::new(RwLock::new(file)),
79 stats: Arc::new(FileNetworkStats::new(file_size as f64)),
80 completion_callbacks: Arc::new(RwLock::new(vec![])),
81 abort_channel: (WriteLockArc::new(tx), WriteLockArc::new(rx)),
82 })
83 }
84
85 pub fn id(&self) -> u64 {
86 self.id
87 }
88
89 pub fn stats(&self) -> &FileNetworkStats {
90 &self.stats
91 }
92
93 pub fn status(&self) -> FileStatus {
94 (*self.status).clone()
95 }
96
97 pub fn has_stopped(&self) -> bool {
99 *self.status == FileStatus::Finished || *self.status == FileStatus::Aborted
100 }
101
102 pub async fn start(&self) -> Result<B2File, FileUploadError> {
104 if *self.status != FileStatus::Pending {
105 return Err(FileUploadError::AlreadyStarted);
106 }
107
108 self.details.options.is_valid()?;
109
110 self.status.set(FileStatus::Working).await;
111
112 let retry_count = self.details.options.retry_strategy.count();
113 let mut curr_retry_count = 1;
114 let abort_receiver = self.abort_channel.1.clone();
115
116 let result = loop {
117 curr_retry_count += 1;
118
119 let result = match self.details.file_size {
120 size if size <= self.details.options.large_file_cutoff => {
121 self.upload_small_file().await
122 }
123 _ => {
124 let file_strat = match &self.details.options.file_load_strategy {
125 LargeFileLoadStrategy::Constant(strat) => strat,
126 LargeFileLoadStrategy::Dynamic(strat) => {
127 &strat.get_load_strategy(self.details.file_size)
128 }
129 };
130
131 file_strat.is_valid()?;
132
133 self.upload_large_file().await
134 }
135 };
136
137 if *self.status == FileStatus::Aborted {
138 break Err(FileUploadError::Aborted);
139 }
140
141 if result.is_err() && curr_retry_count <= retry_count.get() {
142 let wait = self.details.options.retry_strategy.wait(curr_retry_count);
143 let mut receiver_lock = abort_receiver.lock_write().await;
144
145 let mut status = self.status.lock_write().await;
146 if *status == FileStatus::Working {
147 *status = FileStatus::Retrying;
148 }
149 drop(status);
150
151 tokio::select! {
152 _ = sleep(wait) => {},
153 _ = receiver_lock.recv() => {
154 break Err(FileUploadError::Aborted)
155 }
156 };
157
158 continue;
159 }
160
161 break result;
162 };
163
164 let mut status = self.status.lock_write().await;
165 if *status == FileStatus::Working {
166 *status = FileStatus::Finished;
167 }
168 drop(status);
169
170 self.call_finish_callbacks().await;
171
172 if *self.status == FileStatus::Aborted {
173 return Err(FileUploadError::Aborted);
174 }
175
176 return result;
177 }
178
179 pub async fn abort(&self) {
181 if *self.status != FileStatus::Working || *self.status != FileStatus::Retrying {
183 return;
184 }
185
186 self.status.set(FileStatus::Aborted).await;
187
188 let sender = &self.abort_channel.0;
189 sender.send(()).await.ok();
190
191 self.cancel_large_file().await;
192 }
193
194 pub async fn add_finish_callback(&self, callback: B2Callback<()>) {
195 let mut callbacks = self.completion_callbacks.write().await;
196 callbacks.push(callback);
197 }
198
199 async fn upload_large_file(&self) -> Result<B2File, FileUploadError> {
200 let file = self.file.clone();
201
202 let start_large_upload_body = B2StartLargeFileUploadBody::builder()
203 .bucket_id(self.details.bucket_id.clone())
204 .file_name(self.details.file_name.clone())
205 .content_type("b2/x-auto".into())
206 .file_info(self.details.optional_info.clone())
207 .build();
208
209 let start_large_upload_body = self
210 .details
211 .options
212 .options
213 .clone()
214 .apply_large_file_upload(start_large_upload_body);
215
216 let start_large_file_response = self
217 .client
218 .start_large_file(start_large_upload_body)
219 .await?;
220
221 let file_id = start_large_file_response.file_id;
222 let total_uploaded = self.stats.clone();
223
224 let mut large_file = self.large_file_id.write().await;
225 *large_file = Some(file_id.clone());
226 drop(large_file);
227
228 let file_strat = match &self.details.options.file_load_strategy {
229 LargeFileLoadStrategy::Constant(strat) => strat,
230 LargeFileLoadStrategy::Dynamic(strat) => {
231 &strat.get_load_strategy(self.details.file_size)
232 }
233 };
234
235 let mut parts: Vec<((u64, u64), u16)> = vec![];
236 let mut current_range_start: u16 = 0;
237
238 loop {
239 let start = file_strat.part_size * u64::from(current_range_start);
240 let end = file_strat.part_size * (u64::from(current_range_start) + 1);
241
242 current_range_start += 1;
243
244 if end >= self.details.file_size {
245 parts.push(((start, self.details.file_size), current_range_start));
246 break;
247 } else {
248 parts.push(((start, end), current_range_start));
249 }
250 }
251
252 let sha1s = Arc::new(LargeFileSha1::new(parts.len()));
253 let mut join_handles: Vec<JoinHandle<Result<(), FileUploadError>>> = vec![];
254 let abort_handles: Arc<RwLock<Vec<AbortHandle>>> = Arc::new(RwLock::new(vec![]));
255 self.start_timer().await;
256
257 let upload_throttle = Arc::new(
258 self.details
259 .options
260 .speed_throttle
261 .clone()
262 .map(|t| Mutex::new(t)),
263 );
264
265 let status = self.status.clone();
266
267 for chunk in parts.chunks(file_strat.chunk_size as usize) {
268 let task_chunk = chunk.to_owned();
269 let file_id = file_id.clone();
270 let sha1s = sha1s.clone();
271 let task_abort_handles = abort_handles.clone();
272 let total_uploaded = total_uploaded.clone();
273 let status = status.clone();
274
275 if *status == FileStatus::Aborted {
276 break;
277 }
278
279 let upload_throttle = upload_throttle.clone();
280 let file = file.clone();
281 let client = self.client.clone();
282
283 let options = self.details.options.clone();
284
285 let task_func = FileUpload::part_upload(
286 client,
287 file_id,
288 status,
289 task_chunk,
290 file,
291 sha1s,
292 total_uploaded,
293 upload_throttle,
294 options,
295 );
296
297 let join_handle = tokio::spawn(async move {
298 let result = task_func.await;
299
300 if let Err(err) = result {
301 for handle in task_abort_handles.read().await.iter() {
302 handle.abort();
303 }
304
305 return Err(err);
306 }
307
308 Ok(())
309 });
310
311 let abort_handle = join_handle.abort_handle();
312
313 join_handles.push(join_handle);
314 abort_handles.write().await.push(abort_handle);
315 }
316
317 for handle in join_handles {
318 match handle.await {
319 Ok(res) => res,
320 Err(err) => match err.is_cancelled() {
321 true => continue,
322 false => panic!("{:#?}", err),
323 },
324 }?;
325 }
326
327 Ok(self
328 .client
329 .finish_large_file(B2FinishLargeFileBody {
330 file_id: file_id.clone(),
331 part_sha1_array: Arc::into_inner(sha1s)
332 .expect("sha1s shouldn't be referenced any where else")
333 .into(),
334 })
335 .await?)
336 }
337
338 async fn upload_small_file(&self) -> Result<B2File, FileUploadError> {
339 let mut buffer = Vec::with_capacity(self.details.file_size as usize);
340 let mut file = self.file.write().await;
341 file.seek(SeekFrom::Start(0)).await?;
342 file.read_to_end(&mut buffer).await?;
343 drop(file);
344
345 let sha1 = Sha1::from(&buffer).digest().to_string();
346
347 let upload_url_response = self
348 .client
349 .get_upload_url(self.details.bucket_id.clone())
350 .await?;
351
352 let b2_upload_headers = B2UploadFileHeaders::builder()
353 .authorization(upload_url_response.authorization_token)
354 .file_name(self.details.file_name.clone())
355 .content_type("b2/x-auto".into())
356 .content_length(self.details.file_size)
357 .content_sha1(sha1)
358 .build();
359
360 let b2_upload_headers = self
361 .details
362 .options
363 .options
364 .clone()
365 .apply_file_upload(b2_upload_headers);
366
367 let buffer = UploadBuffer::new(buffer);
368 let uploaded = self.stats.clone();
369 let status = self.status.clone();
370 let upload_throttle = Arc::new(
371 self.details
372 .options
373 .speed_throttle
374 .clone()
375 .map(|t| Mutex::new(t)),
376 );
377
378 let stream = stream! {
379 for chunk in buffer.chunks((SizeUnit::KIBIBYTE * 80) as usize) {
380 if let Some(ref throttle) = upload_throttle.as_ref() {
381 let mut throttle = throttle.lock().await;
382 throttle.advance_by(chunk.len() as u64).await;
383 drop(throttle);
384 }
385
386
387 if *status == FileStatus::Aborted {
388 break;
389 }
390
391 uploaded.add_done_bytes(chunk.len() as u64).await;
392
393 yield Ok::<Bytes, Infallible>(chunk);
394 }
395 };
396
397 self.start_timer().await;
398
399 let file = self
400 .client
401 .upload_file(
402 reqwest::Body::wrap_stream(stream),
403 upload_url_response.upload_url,
404 b2_upload_headers,
405 self.details.optional_info.clone(),
406 )
407 .await?;
408
409 Ok(file)
410 }
411
412 async fn start_timer(&self) {
413 self.stats.start_time.set(Instant::now()).await;
414 }
415
416 async fn cancel_large_file(&self) {
417 let large_file = self.large_file_id.read().await;
418
419 if let Some(id) = large_file.deref() {
420 self.client.cancel_large_file(id.clone()).await.ok();
421 }
422 }
423
424 async fn call_finish_callbacks(&self) {
425 let callbacks = self.completion_callbacks.read().await;
426
427 for callback in callbacks.deref() {
428 match callback {
429 B2Callback::Fn(fun) => fun(()),
430 B2Callback::AsyncFn(fun) => fun(()).await,
431 }
432 }
433 }
434
435 async fn part_upload(
436 client: Arc<B2SimpleClient>,
437 file_id: String,
438 status: WriteLockArc<FileStatus>,
439 task_chunk: Vec<((u64, u64), u16)>,
440 file: Arc<RwLock<dyn AsyncFileReader>>,
441 sha1s: Arc<LargeFileSha1>,
442 total_uploaded: Arc<FileNetworkStats>,
443 upload_throttle: Arc<Option<Mutex<Throttle<u64>>>>,
444 options: Arc<FileUploadOptions>,
445 ) -> Result<(), FileUploadError> {
446 let mut upload_part_url_response = client.get_upload_part_url(file_id.clone()).await?;
447
448 for ((start, end), part_number) in task_chunk {
449 let status = status.clone();
450 let mut buffer = vec![0u8; (end - start) as usize];
451
452 let mut file = file.write().await;
453 file.seek(std::io::SeekFrom::Start(start)).await?;
454 file.read_exact(&mut buffer).await?;
455 drop(file);
456
457 let sha1 = Sha1::from(&buffer).digest().to_string();
458
459 sha1s.set_sha1((part_number - 1) as usize, sha1.clone());
460
461 let buffer = UploadBuffer::new(buffer);
462
463 if *status == FileStatus::Aborted {
464 break;
465 }
466
467 loop {
468 let status = status.clone();
469
470 if *status == FileStatus::Aborted {
471 break;
472 }
473
474 let total_uploaded = total_uploaded.clone();
475 let sha1 = sha1.clone();
476 let upload_part_headers = B2UploadPartHeaders::builder()
477 .authorization(upload_part_url_response.authorization_token.clone())
478 .part_number(part_number)
479 .content_length(end - start)
480 .content_sha1(sha1.clone())
481 .build();
482
483 let upload_part_headers = options
484 .options
485 .clone()
486 .apply_file_part_upload(upload_part_headers);
487
488 let upload_throttle = upload_throttle.clone();
489
490 let mut total_uploaded_here: u64 = 0;
491 let total_uploaded_other = total_uploaded.clone();
492 let buffer = buffer.chunks((SizeUnit::KIBIBYTE * 160) as usize);
493
494 let stream = stream! {
495 for chunk in buffer {
496 if *status == FileStatus::Aborted {
497 break;
498 }
499
500 if let Some(ref throttle) = upload_throttle.as_ref() {
501 let mut throttle = throttle.lock().await;
502 throttle.advance_by(chunk.len() as u64).await;
503 drop(throttle);
504 }
505
506 total_uploaded.add_done_bytes(chunk.len() as u64).await;
507 *(&mut total_uploaded_here) += chunk.len() as u64;
508
509 yield Ok::<_, Infallible>(chunk);
510 }
511
512 };
513
514 let stream = reqwest::Body::wrap_stream(stream);
515
516 let result = client
517 .upload_part(
518 upload_part_headers,
519 stream,
520 upload_part_url_response.upload_url.clone(),
521 )
522 .await;
523
524 match result {
525 Ok(_) => break,
526 Err(error) => match error {
527 B2Error::RequestError(error) => match error.status.get() {
528 503 => {
529 upload_part_url_response =
530 match client.get_upload_part_url(file_id.clone()).await {
531 Ok(resp) => resp,
532 Err(err) => return Err(err.into()),
533 };
534
535 total_uploaded_other
536 .done
537 .fetch_sub(total_uploaded_here, Ordering::Relaxed);
538
539 sleep(Duration::from_millis(200)).await;
540 }
541 _ => return Err(B2Error::RequestError(error).into()),
542 },
543 err => return Err(err.into()),
544 },
545 };
546 }
547 }
548
549 Ok(())
550 }
551}