// yab2/lib.rs

//! Yet Another Backblaze B2 Client
//! ===============================
//!
//! Opinionated Backblaze B2 Client.
//!
//! ## Features
//!
//! - Simple API making use of Rust's ownership for API constraints
//! - Automatic re-authentication and refreshing of Upload URLs
//!
//! ## Cargo Features
//!
//! - `fs` (enables optimized routine for uploading from filesystem)
//! - `pool` (enables non-large `UploadURL` object pool for reuse)
//! - `reqwest_compression` (enables deflate/gzip features on `reqwest`)
//! - `large_buffers` (enables large buffer support, 64KiB instead of 8KiB)
18#![allow(clippy::redundant_pattern_matching)]
19
20#[macro_use]
21extern crate serde;
22
23use std::{borrow::Cow, future::Future, num::NonZeroU32, sync::Arc, time::Duration};
24
25use tokio::sync::RwLock;
26
27use headers::HeaderMapExt;
28use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
29use reqwest::Method;
30
/// Inserts a header with a static (compile-time) name into a `HeaderMap`.
///
/// Usage: `h!(headers."x-bz-some-header" => value_str)`.
///
/// Panics if `$value` contains bytes that are not valid in an HTTP header value.
macro_rules! h {
    ($headers:ident.$key:literal => $value:expr) => {
        $headers.insert(
            reqwest::header::HeaderName::from_static($key), // NOTE: Header names must be lowercase
            reqwest::header::HeaderValue::from_str($value).expect("Unable to use header value"),
        );
    };
}
39
40mod types;
41
42pub mod error;
43pub mod models;
44
45pub use types::{sse, DownloadFileBy, FileRetention, ListFiles, NewFileInfo, NewLargeFileInfo, NewPartInfo};
46
47/// Autogenerated builders for various types.
48pub mod builders {
49    pub use crate::types::{
50        FileRetentionBuilder, ListFilesBuilder, NewFileInfoBuilder, NewLargeFileInfoBuilder, NewPartInfoBuilder,
51    };
52}
53
54#[cfg(feature = "pool")]
55pub mod pool;
56
57#[cfg(feature = "fs")]
58mod fs;
59
60pub use error::B2Error;
61pub use fs::NewFileFromPath;
62
/// Authorization state shared behind the [`Client`]'s `RwLock`.
///
/// Replaced wholesale whenever the client re-authorizes.
struct ClientState {
    /// The builder used to create the client.
    ///
    /// Kept so `b2_authorize_account` can be re-run with the same credentials
    /// and retry settings.
    config: ClientBuilder,

    /// The authorization data returned from the B2 API `b2_authorize_account` endpoint
    account: crate::models::B2Authorized,

    /// The authorization header to use for requests
    /// (built from `account.auth_token`).
    auth: HeaderValue,
}
73
74impl ClientState {
75    fn check_capability(&self, capability: &'static str) -> Result<(), B2Error> {
76        if !self.account.allowed(capability) {
77            return Err(B2Error::MissingCapability(capability));
78        }
79
80        Ok(())
81    }
82
83    fn url(&self, path: &str) -> String {
84        format!("{}/b2api/v3/{}", self.account.api.storage.api_url, path)
85    }
86
87    #[inline]
88    fn bucket_id<'a>(&'a self, bucket_id: Option<&'a str>) -> Result<&'a str, B2Error> {
89        #[allow(clippy::unnecessary_lazy_evaluations)]
90        bucket_id.or_else(|| self.account.api.storage.bucket_id.as_deref()).ok_or(B2Error::MissingBucketId)
91    }
92
93    fn check_prefix(&self, name: Option<&str>) -> Result<(), B2Error> {
94        match (name, self.account.api.storage.name_prefix.as_ref()) {
95            (Some(name), Some(prefix)) if !name.starts_with(prefix as &str) => Err(B2Error::InvalidPrefix),
96            _ => Ok(()),
97        }
98    }
99}
100
/// A client for interacting with the B2 API
///
/// Cheap to clone: the HTTP client and authorization state are shared
/// by reference counting.
#[derive(Clone)]
pub struct Client {
    /// Shared authorization state; write-locked only while re-authorizing.
    state: Arc<RwLock<ClientState>>,
    /// Underlying `reqwest` HTTP client (built HTTPS-only).
    client: reqwest::Client,
}
107
/// A builder for creating a [`Client`]
#[derive(Clone)]
pub struct ClientBuilder {
    /// `Authorization` header derived from the key ID and application key.
    auth: HeaderValue,
    /// Optional `User-Agent` for the underlying HTTP client.
    ua: Option<Cow<'static, str>>,
    /// Maximum authorization retries (defaults to 5 in [`ClientBuilder::new`]).
    max_retries: u8,
    /// Delay between authorization retries (defaults to 1 second).
    retry_delay: Duration,
}
116
/// Wrapper around a response and the file's parsed headers.
pub struct DownloadedFile {
    /// The raw HTTP response; read/stream the file contents from here.
    pub resp: reqwest::Response,
    /// File metadata parsed from the response headers.
    pub info: models::B2FileHeaders,
}
122
123impl ClientBuilder {
124    /// Creates a new client builder with the given key ID and application key.
125    pub fn new(key_id: &str, app_key: &str) -> ClientBuilder {
126        ClientBuilder {
127            auth: models::create_auth_header(key_id, app_key),
128            ua: None,
129            max_retries: 5,
130            retry_delay: Duration::from_secs(1),
131        }
132    }
133
134    /// Sets the `User-Agent` header to be used for requests.
135    #[inline]
136    pub fn user_agent(mut self, ua: impl Into<Cow<'static, str>>) -> Self {
137        self.ua = Some(ua.into());
138        self
139    }
140
141    /// Sets the maximum number of times to retry requests if they fail with a 401 Unauthorized error.
142    #[inline]
143    pub fn max_retries(mut self, max_retries: u8) -> Self {
144        self.max_retries = max_retries;
145        self
146    }
147
148    /// Sets the delay between authorization retries if a request fails.
149    pub fn retry_delay(mut self, delay: Duration) -> Self {
150        self.retry_delay = delay;
151        self
152    }
153
154    /// Builds and authorizes the client for first use.
155    pub async fn authorize(self) -> Result<Client, B2Error> {
156        let mut builder = reqwest::ClientBuilder::new().https_only(true);
157
158        if let Some(ref ua) = self.ua {
159            builder = builder.user_agent(ua.as_ref());
160        }
161
162        let client = builder.build()?;
163
164        Ok(Client {
165            state: Arc::new(RwLock::new(Client::do_auth(&client, self).await?)),
166            client,
167        })
168    }
169}
170
/// Zero-sized stand-in for endpoints whose JSON response body is irrelevant.
///
/// Deserializes successfully from any input without inspecting it
/// (comparable in spirit to `serde::de::IgnoredAny`).
struct DummyValue;

impl<'de> serde::Deserialize<'de> for DummyValue {
    // Accept and discard any payload.
    fn deserialize<D>(_: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        Ok(DummyValue)
    }
}
181
182impl Client {
183    fn req(&self, method: Method, auth: &HeaderValue, url: impl AsRef<str>) -> reqwest::RequestBuilder {
184        self.client.request(method, url.as_ref()).header(AUTHORIZATION, auth)
185    }
186
187    async fn json<T>(builder: reqwest::RequestBuilder) -> Result<T, B2Error>
188    where
189        T: serde::de::DeserializeOwned,
190    {
191        let resp = builder.send().await?;
192
193        if !resp.status().is_success() {
194            return Err(B2Error::B2ErrorMessage(resp.json().await?));
195        }
196
197        Ok(serde_json::from_str(&resp.text().await?)?)
198    }
199
200    async fn do_auth(client: &reqwest::Client, config: ClientBuilder) -> Result<ClientState, B2Error> {
201        use failsafe::{futures::CircuitBreaker, Config, Error as FailsafeError};
202
203        let cb = Config::new().build();
204        let mut attempts = 0;
205
206        'try_auth: loop {
207            let do_auth_inner = Client::json::<models::B2Authorized>(
208                client
209                    .get("https://api.backblazeb2.com/b2api/v3/b2_authorize_account")
210                    .header(AUTHORIZATION, &config.auth),
211            );
212
213            return match cb.call(do_auth_inner).await {
214                Ok(account) => Ok(ClientState {
215                    config,
216                    auth: HeaderValue::from_str(&account.auth_token)
217                        .expect("Unable to use auth token in header value"),
218                    account,
219                }),
220                Err(FailsafeError::Rejected) => {
221                    attempts += 1;
222                    if attempts >= config.max_retries {
223                        return Err(B2Error::Unauthorized);
224                    }
225
226                    tokio::time::sleep(config.retry_delay).await;
227
228                    continue 'try_auth;
229                }
230                Err(FailsafeError::Inner(e)) => Err(e),
231            };
232        }
233    }
234
235    /// Reauthorizes the client, updating the authorization token and account information.
236    async fn reauthorize(&self) -> Result<(), B2Error> {
237        let new_state = Self::do_auth(&self.client, self.state.read().await.config.clone()).await?;
238        *self.state.write().await = new_state;
239        Ok(())
240    }
241
242    /// Runs a request, reauthorizing if necessary.
243    async fn run_request_with_reauth<'a, F, R, T>(&self, f: F) -> Result<T, B2Error>
244    where
245        F: Fn(Self) -> R + 'a,
246        R: Future<Output = Result<T, B2Error>> + 'a,
247    {
248        let mut retried = false;
249        loop {
250            return match f(self.clone()).await {
251                Ok(t) => Ok(t),
252                Err(B2Error::B2ErrorMessage(e)) if !retried && e.status == 401 => {
253                    // box future to avoid stack bloat
254                    Box::pin(self.reauthorize()).await?;
255
256                    retried = true;
257                    continue;
258                }
259                Err(e) => Err(e),
260            };
261        }
262    }
263
264    /// Uses the `b2_get_file_info` API to get information about a file by its ID.
265    pub async fn get_file_info(&self, file_id: &str) -> Result<models::B2FileInfo, B2Error> {
266        #[derive(Serialize)]
267        #[serde(rename_all = "camelCase")]
268        struct B2GetFileInfo<'a> {
269            file_id: &'a str,
270        }
271
272        self.run_request_with_reauth(|b2| async move {
273            let state = b2.state.read().await;
274
275            state.check_capability("readFiles")?; // TODO: check if this is the right capability
276
277            Client::json(b2.req(Method::GET, &state.auth, "b2_get_file_info").query(&B2GetFileInfo { file_id }))
278                .await
279        })
280        .await
281    }
282
283    /// Downloads a file by its ID or name, returning a [`DownloadedFile`],
284    /// which is a wrapper around a [`reqwest::Response`] and the file's parsed headers.
285    ///
286    /// The `file` parameter can be either a file ID or a file name.
287    /// The `range` parameter can be used to download only a portion of the file. If `None`, the entire file will be downloaded.
288    /// The `encryption` parameter is only required if the file is encrypted with server-side encryption with a customer-provided key (SSE-C).
289    pub async fn download_file(
290        &self,
291        file: DownloadFileBy<'_>,
292        range: Option<headers::Range>,
293        encryption: Option<sse::ServerSideEncryptionCustomer>,
294    ) -> Result<DownloadedFile, B2Error> {
295        let (range, encryption) = (&range, &encryption);
296
297        // serde_urlencoded doesn't support top-level enums,
298        // so we need to use a wrapper struct
299        #[derive(Serialize)]
300        struct DownloadFileBy2<'a> {
301            #[serde(flatten)]
302            file: DownloadFileBy<'a>,
303        }
304
305        self.run_request_with_reauth(|b2| async move {
306            let state = b2.state.read().await;
307
308            state.check_capability("readFiles")?;
309
310            let resp = b2
311                .req(Method::GET, &state.auth, {
312                    state.url(match file {
313                        DownloadFileBy::FileId(_) => "b2_download_file_by_id",
314                        DownloadFileBy::FileName(_) => "b2_download_file_by_name",
315                    })
316                })
317                .headers({
318                    let mut headers = HeaderMap::new();
319                    if let Some(ref range) = range {
320                        headers.typed_insert(range.clone());
321                    }
322                    if let Some(ref encryption) = encryption {
323                        encryption.add_headers(&mut headers);
324                    }
325                    headers
326                })
327                .query(&DownloadFileBy2 { file })
328                .send()
329                .await?;
330
331            Ok(DownloadedFile {
332                info: models::B2FileHeaders::parse(resp.headers())?,
333                resp,
334            })
335        })
336        .await
337    }
338
339    /// Lists the names of all files in a bucket, optionally filtered by a prefix and/or delimiter.
340    ///
341    /// If [`ListFiles::bucket_id`] is `None`, the client's default bucket will be used. If there is no default bucket,
342    /// an error will be returned.
343    ///
344    /// Each time you call, it returns a `nextFileName` and `nextFileId` (only if `all_versions` is true)
345    /// that can be used as the starting point for the next call.
346    ///
347    /// NOTE: `b2_list_file_names`/`b2_list_file_versions` are Class C transactions. The maximum number of
348    /// files returned per transaction is 1000. If you set maxFileCount to more than 1000 and
349    /// more than 1000 are returned, the call will be billed as multiple transactions, as if you
350    /// had made requests in a loop asking for 1000 at a time. For example: if you set maxFileCount
351    /// to 10000 and 3123 items are returned, you will be billed for 4 Class C transactions.
352    ///
353    /// See the [B2 API documentation](https://www.backblaze.com/apidocs/b2-list-file-names) of this method
354    /// for more information on how to use the parameters such as `prefix` and `delimiter`.
355    ///
356    /// # Example
357    ///
358    /// ```ignore
359    /// let client = ClientBuilder::new(&app_id, &app_key).authorize().await?;
360    ///
361    /// let files = client.list_files(ListFiles::builder().all_versions(true).build()).await?;
362    ///
363    /// println!("{:#?}", files);
364    /// ```
365    pub async fn list_files(&self, mut args: ListFiles<'_>) -> Result<models::B2FileInfoList, B2Error> {
366        if !args.all_versions {
367            args.start_file_id = None; // not used
368        }
369
370        self.run_request_with_reauth(move |b2| async move {
371            let state = b2.state.read().await;
372
373            state.check_capability("listFiles")?;
374
375            let mut args = ListFiles { ..args }; // redefine lifetime of `args`
376
377            args.bucket_id = Some(state.bucket_id(args.bucket_id)?);
378
379            let path = if args.all_versions { "b2_list_file_versions" } else { "b2_list_file_names" };
380
381            Client::json(b2.req(Method::GET, &state.auth, state.url(path)).query(&args)).await
382        })
383        .await
384    }
385
386    /// Hides a file so that downloading by name will not find the file,
387    /// but previous versions of the file are still stored.
388    pub async fn hide_file(
389        &self,
390        bucket_id: Option<&str>,
391        file_name: &str,
392    ) -> Result<models::B2FileInfo, B2Error> {
393        #[derive(Serialize)]
394        #[serde(rename_all = "camelCase")]
395        struct B2HideFile<'a> {
396            #[serde(skip_serializing_if = "Option::is_none")]
397            bucket_id: Option<&'a str>,
398            file_name: &'a str,
399        }
400
401        self.run_request_with_reauth(|b2| async move {
402            let state = b2.state.read().await;
403
404            state.check_capability("writeFiles")?;
405            state.check_prefix(Some(file_name))?;
406
407            let body = B2HideFile {
408                bucket_id: state.bucket_id(bucket_id).ok(),
409                file_name,
410            };
411
412            Self::json(b2.req(Method::POST, &state.auth, state.url("b2_hide_file")).json(&body)).await
413        })
414        .await
415    }
416
417    /// Deletes one version of a file.
418    ///
419    /// If the version you delete is the latest version, and there are older versions,
420    /// then the most recent older version will become the current version,
421    /// and be the one that you'll get when downloading by name.
422    ///
423    /// When used on an unfinished large file, this call has the same effect as cancelling it.
424    ///
425    /// `bypass_governance` must be set to true if deleting a file version protected by Object Lock
426    /// governance mode retention settings. Setting the value requires the
427    /// `bypassGovernance` application key capability.
428    pub async fn delete_file(
429        &self,
430        file_id: &str,
431        file_name: &str,
432        bypass_governance: bool,
433    ) -> Result<(), B2Error> {
434        #[derive(Serialize)]
435        #[serde(rename_all = "camelCase")]
436        struct B2DeleteFile<'a> {
437            file_id: &'a str,
438            file_name: &'a str,
439            bypass_governance: bool,
440        }
441
442        self.run_request_with_reauth(|b2| async move {
443            let state = b2.state.read().await;
444
445            state.check_capability("deleteFiles")?;
446            state.check_prefix(Some(file_name))?;
447
448            if bypass_governance {
449                // TODO: check if this is the right capability
450                state.check_capability("bypassGovernance")?;
451            }
452
453            let body = B2DeleteFile {
454                file_id,
455                file_name,
456                bypass_governance,
457            };
458
459            Self::json(b2.req(Method::POST, &state.auth, state.url("b2_delete_file_version")).json(&body))
460                .await
461                .map(|_: DummyValue| ())
462        })
463        .await
464    }
465
466    /// Modifies the Object Lock legal hold status for an existing file.
467    ///
468    /// Used to enable legal hold for a file in an Object Lock-enabled bucket,
469    /// preventing it from being deleted, or to disable legal hold protections for a file.
470    ///
471    /// Backblaze B2 Cloud Storage Object Lock lets you make data immutable by preventing
472    /// a file from being changed or deleted until a given date to protect data that is
473    /// stored in Backblaze B2 from threats like ransomware or for regulatory compliance.
474    ///
475    /// `legal_hold` must be set to `true` to enable legal hold, and `false` to disable it.
476    pub async fn update_legal_hold(
477        &self,
478        file_name: &str,
479        file_id: &str,
480        legal_hold: bool,
481    ) -> Result<(), B2Error> {
482        #[derive(Serialize)]
483        #[serde(rename_all = "camelCase")]
484        struct B2UpdateLegalHold<'a> {
485            file_name: &'a str,
486            file_id: &'a str,
487            legal_hold: &'a str,
488        }
489
490        self.run_request_with_reauth(|b2| async move {
491            let state = b2.state.read().await;
492
493            state.check_capability("writeFileLegalHolds")?;
494            state.check_prefix(Some(file_name))?;
495
496            let body = B2UpdateLegalHold {
497                file_name,
498                file_id,
499                legal_hold: if legal_hold { "on" } else { "off" },
500            };
501
502            Self::json(b2.req(Method::POST, &state.auth, state.url("b2_update_legal_hold")).json(&body))
503                .await
504                .map(|_: DummyValue| ())
505        })
506        .await
507    }
508
509    /// Modifies the Object Lock retention settings for an existing file.
510    ///
511    /// After enabling file retention for a file in an Object Lock-enabled bucket,
512    /// any attempts to delete the file or make any changes to it before
513    /// the end of the retention period will fail.
514    ///
515    /// File retention settings can be configured in either governance or
516    /// compliance mode. For files protected in governance mode, file retention
517    /// settings can be deleted or the retention period shortened only by clients
518    /// with the appropriate application key capability (i.e., bypassGovernance).
519    ///
520    /// File retention settings for files protected in compliance mode cannot
521    /// removed by any user, but their retention dates can be extended by
522    /// clients with appropriate application key capabilities.
523    pub async fn update_file_retention(
524        &self,
525        file_name: &str,
526        file_id: &str,
527        retention: FileRetention,
528    ) -> Result<(), B2Error> {
529        #[derive(Serialize)]
530        #[serde(rename_all = "camelCase")]
531        struct B2UpdateFileRetention<'a> {
532            file_name: &'a str,
533            file_id: &'a str,
534
535            #[serde(flatten)]
536            retention: FileRetention,
537
538            bypass_governance: bool,
539        }
540
541        let body = &B2UpdateFileRetention {
542            file_name,
543            file_id,
544            bypass_governance: retention.bypass_governance,
545            retention,
546        };
547
548        self.run_request_with_reauth(|b2| async move {
549            let state = b2.state.read().await;
550
551            state.check_capability("writeFileRetentions")?;
552            state.check_prefix(Some(file_name))?;
553
554            Self::json(b2.req(Method::POST, &state.auth, state.url("b2_update_file_retention")).json(body))
555                .await
556                .map(|_: DummyValue| ())
557        })
558        .await
559    }
560
561    async fn get_b2_upload_url(
562        &self,
563        bucket_id: Option<&str>,
564        file_id: Option<&str>,
565    ) -> Result<(Option<Arc<str>>, models::B2UploadUrl), B2Error> {
566        #[derive(Serialize)]
567        #[serde(rename_all = "camelCase")]
568        struct B2GetUploadUrlQuery<'a> {
569            #[serde(skip_serializing_if = "Option::is_none")]
570            bucket_id: Option<&'a str>,
571
572            #[serde(skip_serializing_if = "Option::is_none")]
573            file_id: Option<&'a str>,
574        }
575
576        self.run_request_with_reauth(|b2| async move {
577            let state = b2.state.read().await;
578
579            state.check_capability("writeFiles")?;
580
581            let mut query = B2GetUploadUrlQuery { bucket_id, file_id };
582
583            if query.file_id.is_some() {
584                query.bucket_id = None;
585            } else if query.bucket_id.is_some() {
586                query.file_id = None;
587            } else {
588                query.bucket_id = Some(state.bucket_id(query.bucket_id)?);
589            }
590
591            let path = state.url(if file_id.is_some() { "b2_get_upload_part_url" } else { "b2_get_upload_url" });
592
593            Ok((
594                state.account.api.storage.name_prefix.clone(),
595                Self::json::<models::B2UploadUrl>(b2.req(Method::GET, &state.auth, path).query(&query)).await?,
596            ))
597        })
598        .await
599    }
600
601    async fn get_raw_upload_url(
602        &self,
603        bucket_id: Option<&str>,
604        file_id: Option<&str>,
605    ) -> Result<RawUploadUrl, B2Error> {
606        let (prefix, url) = self.get_b2_upload_url(bucket_id, file_id).await?;
607
608        Ok(RawUploadUrl {
609            client: self.clone(),
610            auth: url.header(),
611            url,
612            prefix,
613        })
614    }
615
616    /// Gets a URL for uploading files using the `b2_get_upload_url` API.
617    ///
618    /// If `bucket_id` is `None`, the client's default bucket will be used. If there is no default bucket, an error will be returned.
619    ///
620    /// The returned `UploadUrl` can be used to upload files to the B2 API for 24 hours. Only one file can be uploaded to a URL at a time.
621    /// You may acquire multiple URLs to upload multiple files in parallel.
622    pub async fn get_upload_url(&self, bucket_id: Option<&str>) -> Result<UploadUrl, B2Error> {
623        Ok(UploadUrl(self.get_raw_upload_url(bucket_id, None).await?))
624    }
625
626    /// Gets a URL for uploading parts of a large file using the `b2_get_upload_part_url` API.
627    ///
628    /// The returned `UploadPartUrl` can be used to upload parts of a large file to the B2 API for 24 hours.
629    /// Only one part can be uploaded to a URL at a time. You may acquire multiple URLs to upload multiple parts in parallel.
630    pub async fn get_upload_part_url(&self, file_id: &str) -> Result<UploadPartUrl, B2Error> {
631        Ok(UploadPartUrl(self.get_raw_upload_url(None, Some(file_id)).await?))
632    }
633
634    /// Prepares parts of a large file for uploading using the `b2_start_large_file` API.
635    pub async fn start_large_file(
636        &self,
637        bucket_id: Option<&str>,
638        info: &NewLargeFileInfo,
639    ) -> Result<LargeFileUpload, B2Error> {
640        #[derive(Serialize)]
641        #[serde(rename_all = "camelCase")]
642        struct B2StartLargeFile<'a> {
643            bucket_id: &'a str,
644            file_name: &'a str,
645
646            content_type: Option<&'a str>,
647
648            #[serde(skip_serializing_if = "Option::is_none")]
649            file_retention: Option<&'a FileRetention>,
650
651            #[serde(skip_serializing_if = "Option::is_none")]
652            legal_hold: Option<&'a str>,
653
654            #[serde(skip_serializing_if = "sse::ServerSideEncryption::is_default")]
655            encryption: &'a sse::ServerSideEncryption,
656        }
657
658        let info = self
659            .run_request_with_reauth(|b2| async move {
660                let state = b2.state.read().await;
661
662                state.check_capability("writeFiles")?;
663                state.check_prefix(Some(&info.file_name))?;
664
665                let body = B2StartLargeFile {
666                    bucket_id: state.bucket_id(bucket_id)?,
667                    file_name: &info.file_name,
668                    content_type: info.content_type.as_deref(),
669                    file_retention: info.retention.as_ref(),
670                    legal_hold: info.legal_hold.map(|lh| if lh { "on" } else { "off" }),
671                    encryption: &info.encryption,
672                };
673
674                Client::json::<models::B2FileInfo>(
675                    b2.req(Method::POST, &state.auth, state.url("b2_start_large_file")).json(&body),
676                )
677                .await
678            })
679            .await?;
680
681        Ok(LargeFileUpload {
682            client: self.clone(),
683            info,
684        })
685    }
686}
687
/// Shared implementation behind [`UploadUrl`] and [`UploadPartUrl`]:
/// an acquired upload URL plus what is needed to refresh it when it expires.
struct RawUploadUrl {
    /// Client used to re-acquire the URL after a 401.
    client: Client,
    /// The upload URL payload returned by B2 (URL, bucket/file association).
    url: models::B2UploadUrl,
    /// Authorization header for this specific upload URL (from `url.header()`).
    auth: HeaderValue,
    /// File-name prefix restriction of the authorizing key, if any.
    prefix: Option<Arc<str>>,
}
694
/// Temporarily acquired URL for uploading single files.
///
/// This is returned by [`Client::get_upload_url`].
///
/// The URL can be used to upload a file to the B2 API for 24 hours. Only one file can be uploaded to a URL at a time.
/// This is enforced via requiring mutable references to the URL when uploading a file.
// Newtype so single-file URLs cannot be confused with large-file part URLs.
#[repr(transparent)]
pub struct UploadUrl(RawUploadUrl);
703
/// Temporarily acquired URL for uploading parts of a large file.
///
/// This is returned by [`Client::get_upload_part_url`].
///
/// The URL can be used to upload parts of a large file to the B2 API for 24 hours. Only one part can be uploaded to a URL at a time.
/// This is enforced via requiring mutable references to the URL when uploading a part.
// Newtype so part-upload URLs cannot be confused with single-file upload URLs.
#[repr(transparent)]
pub struct UploadPartUrl(RawUploadUrl);
712
impl RawUploadUrl {
    /// Actually performs the upload, with automatic reauthorization if necessary.
    ///
    /// `f` finalizes a `POST` to the current upload URL (attaching body and
    /// headers). On a 401 response the upload URL, its token, and the prefix
    /// restriction are re-acquired and the upload is retried.
    ///
    /// NOTE(review): unlike `Client::run_request_with_reauth`, the 401 path
    /// here has no retry cap — confirm an endless-401 loop is impossible.
    async fn do_upload<F, T>(&mut self, f: F) -> Result<T, B2Error>
    where
        F: Fn(reqwest::RequestBuilder) -> reqwest::RequestBuilder,
        T: serde::de::DeserializeOwned,
    {
        loop {
            let res = Client::json(f(self.client.req(Method::POST, &self.auth, &self.url.upload_url)));
            return match res.await {
                Err(B2Error::B2ErrorMessage(e)) if e.status == 401 => {
                    // Re-acquire for the same bucket (regular upload) or
                    // file (large-file part upload).
                    let get_new_url =
                        self.client.get_b2_upload_url(self.url.bucket_id.as_deref(), self.url.file_id.as_deref());

                    // boxed to keep this future's stack frame small
                    let (prefix, url) = Box::pin(get_new_url).await?;

                    self.auth = url.header();
                    self.url = url;
                    self.prefix = prefix;

                    continue;
                }
                res => res,
            };
        }
    }

    /// Rejects file names that violate the key's name-prefix restriction;
    /// keys without a restriction accept any name.
    fn check_prefix(&self, file_name: &str) -> Result<(), B2Error> {
        match self.prefix {
            Some(ref prefix) if !file_name.starts_with(prefix.as_ref()) => Err(B2Error::InvalidPrefix),
            _ => Ok(()),
        }
    }

    /// Uploads a single file, validating the name prefix first.
    ///
    /// `file` may be invoked more than once if the request is retried, so it
    /// should be cheap to call repeatedly.
    async fn upload_file<F, B>(&mut self, info: &NewFileInfo, file: F) -> Result<models::B2FileInfo, B2Error>
    where
        F: Fn() -> B,
        B: Into<reqwest::Body>,
    {
        self.check_prefix(&info.file_name)?;

        self.do_upload(|builder| {
            builder.body(file()).headers({
                let mut headers = HeaderMap::new();
                info.add_headers(&mut headers);
                headers
            })
        })
        .await
    }

    /// Uploads one part of a large file (no name-prefix check: parts carry
    /// no file name of their own).
    ///
    /// `body` may be invoked more than once if the request is retried.
    async fn upload_part<F, B>(&mut self, info: &NewPartInfo, body: F) -> Result<models::B2PartInfo, B2Error>
    where
        F: Fn() -> B,
        B: Into<reqwest::Body>,
    {
        self.do_upload(|builder| {
            builder.body(body()).headers({
                let mut headers = HeaderMap::new();
                info.add_headers(&mut headers);
                headers
            })
        })
        .await
    }
}
779
780impl UploadUrl {
781    /// Uploads a file to the B2 API using the URL acquired from [`Client::get_upload_url`].
782    ///
783    /// The `file` parameter is a closure that returns a value to be converted into a `reqwest::Body`.
784    /// This method may need to retry the request if the URL or authorization token has expired, therefore
785    /// it is recommended the body-creation closure be cheap to call multiple times.
786    pub async fn upload_file<F, B>(&mut self, info: &NewFileInfo, file: F) -> Result<models::B2FileInfo, B2Error>
787    where
788        F: Fn() -> B,
789        B: Into<reqwest::Body>,
790    {
791        self.0.upload_file(info, file).await
792    }
793
794    /// Uploads a file to the B2 API using the URL acquired from [`Client::get_upload_url`].
795    ///
796    /// The `bytes` parameter is a value to be converted into the body of the request.
797    pub async fn upload_file_bytes(
798        &mut self,
799        info: &NewFileInfo,
800        bytes: impl Into<bytes::Bytes>,
801    ) -> Result<models::B2FileInfo, B2Error> {
802        let bytes = bytes.into();
803        self.upload_file(info, || bytes.clone()).await
804    }
805}
806
/// A large file that is being uploaded in parts.
///
/// Any [`UploadPartUrl`] can be used to upload a part of the file. Once all parts have been uploaded,
/// call [`LargeFileUpload::finish`] to complete the upload.
pub struct LargeFileUpload {
    /// Client used for the follow-up API calls on this file.
    client: Client,
    /// Metadata of the started large file, including its `file_id`.
    info: models::B2FileInfo,
}
815
impl LargeFileUpload {
    /// Returns the metadata of the in-progress large file, including the
    /// `file_id` used to associate uploaded parts with this upload.
    pub fn info(&self) -> &models::B2FileInfo {
        &self.info
    }

    /// Equivalent to [`Client::start_large_file`].
    pub async fn start(
        client: &Client,
        bucket_id: Option<&str>,
        info: &NewLargeFileInfo,
    ) -> Result<LargeFileUpload, B2Error> {
        client.start_large_file(bucket_id, info).await
    }

    /// Gets a URL for uploading a part of the large file.
    ///
    /// Equivalent to [`Client::get_upload_part_url`] with `self.info().file_id`.
    pub async fn get_upload_part_url(&self) -> Result<UploadPartUrl, B2Error> {
        self.client.get_upload_part_url(&self.info.file_id).await
    }

    /// Uploads a part of a large file to the given upload URL. Once all parts have been uploaded,
    /// call [`LargeFileUpload::finish`] to complete the upload.
    ///
    /// Parts can be uploaded in parallel, so long as each url is only used for one part at a time.
    ///
    /// The provided `url` must have been acquired from the same `LargeFileUpload` instance, as they
    /// are specific to the file being uploaded.
    ///
    /// The `part` parameter is a closure that returns a value to be converted into a [`reqwest::Body`], and
    /// may need to be called multiple times if the request needs to be retried. Therefore, it is recommended
    /// the body-creation closure be cheap to call multiple times.
    ///
    /// **NOTE**: This method does not check if the provided SHA1 hash is correct.
    pub async fn upload_part<F, B>(
        &self,
        url: &mut UploadPartUrl,
        info: &NewPartInfo,
        part: F,
    ) -> Result<models::B2PartInfo, B2Error>
    where
        F: Fn() -> B,
        B: Into<reqwest::Body>,
    {
        // Part-upload URLs are bound to one specific file; reject a URL that was
        // issued for a different (or absent) file ID before touching the network.
        if url.0.url.file_id.as_deref() != Some(self.info.file_id.as_ref()) {
            return Err(B2Error::FileIdMismatch);
        }

        url.0.upload_part(info, part).await
    }

    /// Uploads a part of a large file to the given upload URL. Once all parts have been uploaded,
    /// call [`LargeFileUpload::finish`] to complete the upload.
    ///
    /// Parts can be uploaded in parallel, so long as each url is only used for one part at a time.
    ///
    /// The provided `url` must have been acquired from the same `LargeFileUpload` instance, as they
    /// are specific to the file being uploaded.
    ///
    /// The `bytes` parameter is a value to be converted into the body of the request.
    ///
    /// **NOTE**: This method does not check if the provided SHA1 hash is correct.
    pub async fn upload_part_bytes(
        &self,
        url: &mut UploadPartUrl,
        info: &NewPartInfo,
        bytes: impl Into<bytes::Bytes>,
    ) -> Result<models::B2PartInfo, B2Error> {
        // `Bytes` clones are cheap (refcounted), so the retry closure can
        // produce a fresh request body on every attempt without copying data.
        let bytes = bytes.into();
        self.upload_part(url, info, || bytes.clone()).await
    }

    /// Converts the parts that have been uploaded into a single B2 file.
    ///
    /// It may be that the call to finish a large file succeeds, but you don't know it because the
    /// request timed out, or the connection was broken. In that case, retrying will result in a
    /// 400 Bad Request response because the file is already finished. If that happens, we recommend
    /// calling `b2_get_file_info`/[`Client::get_file_info`] to see if the file is there. If the file is there,
    /// you can count the upload as a success.
    ///
    /// `parts` must be sorted by `part_number`.
    pub async fn finish(self, parts: &[models::B2PartInfo]) -> Result<models::B2FileInfo, B2Error> {
        // Require strictly increasing part numbers; `>=` also rejects duplicates.
        if parts.windows(2).any(|w| w[0].part_number >= w[1].part_number) {
            return Err(B2Error::InvalidPartSorting);
        }

        // Request body for the `b2_finish_large_file` endpoint.
        #[derive(Serialize)]
        #[serde(rename_all = "camelCase")]
        struct B2FinishLargeFile<'a> {
            file_id: &'a str,
            part_sha1_array: Vec<&'a str>,
        }

        let body = &B2FinishLargeFile {
            file_id: &self.info.file_id,
            // SHA1s must be sent in part order; the sorting check above ensures it.
            part_sha1_array: parts.iter().map(|part| &*part.content_sha1).collect(),
        };

        // The closure may run more than once (re-auth + retry), which is why it
        // borrows `body` rather than consuming it.
        self.client
            .run_request_with_reauth(|b2| async move {
                let state = b2.state.read().await;

                Client::json(b2.req(Method::POST, &state.auth, state.url("b2_finish_large_file")).json(&body))
                    .await
            })
            .await
    }

    /// Cancels the upload of a large file, and deletes all of the parts that have been uploaded.
    ///
    /// This will return an error if there is no active upload with the given file ID.
    pub async fn cancel(self) -> Result<models::B2CancelledFileInfo, B2Error> {
        // Request body for the `b2_cancel_large_file` endpoint.
        #[derive(Serialize)]
        #[serde(rename_all = "camelCase")]
        struct B2CancelLargeFile<'a> {
            file_id: &'a str,
        }

        let body = &B2CancelLargeFile {
            file_id: &self.info.file_id,
        };

        self.client
            .run_request_with_reauth(|b2| async move {
                let state = b2.state.read().await;

                Client::json(b2.req(Method::POST, &state.auth, state.url("b2_cancel_large_file")).json(&body))
                    .await
            })
            .await
    }
}
949
#[cfg(test)]
mod tests {
    use tokio::io::AsyncReadExt;

    use super::*;

    /// Pure serialization test: `DownloadFileBy` must serialize to the single-key
    /// JSON objects the B2 API expects (`{"fileId": ...}` / `{"fileName": ...}`).
    #[test]
    fn test_downloadby_serialization() {
        let file_id = "4_zc1234567890abcdef1234f1";
        let file_name = "example.txt";

        let file_id_json = serde_json::to_string(&DownloadFileBy::FileId(file_id)).unwrap();
        let file_name_json = serde_json::to_string(&DownloadFileBy::FileName(file_name)).unwrap();

        assert_eq!(file_id_json, format!(r#"{{"fileId":"{}"}}"#, file_id));
        assert_eq!(file_name_json, format!(r#"{{"fileName":"{}"}}"#, file_name));
    }

    // NOTE: The tests below are integration tests that hit the live B2 API.
    // They require APP_ID/APP_KEY in the environment (or a `.env` file) and
    // will create files on the authorized account.

    /// Round-trip: authorize, upload `Cargo.toml` as bytes, then download it back.
    #[tokio::test]
    async fn test_auth() {
        use sha1::{Digest, Sha1};

        dotenv::dotenv().ok();

        let app_id = std::env::var("APP_ID").expect("APP_ID not found in .env");
        let app_key = std::env::var("APP_KEY").expect("APP_KEY not found in .env");

        let client = ClientBuilder::new(&app_id, &app_key).authorize().await.unwrap();

        // must be mut because `upload_file` requires exclusive access to the url
        let mut upload = client.get_upload_url(None).await.unwrap();

        let mut file = tokio::fs::OpenOptions::new().read(true).open("Cargo.toml").await.unwrap();
        let meta = file.metadata().await.unwrap();

        let mut bytes = Vec::with_capacity(meta.len() as usize);
        file.read_to_end(&mut bytes).await.unwrap();

        // `Bytes` so the upload body can be cheaply cloned on retry.
        let bytes = bytes::Bytes::from(bytes);

        let info = NewFileInfo::builder()
            .file_name("testing/Cargo.toml".to_owned())
            .content_length(meta.len())
            .content_type("text/plain".to_owned())
            .content_sha1(hex::encode(Sha1::new().chain_update(&bytes).finalize()))
            .build();

        let file_info = upload.upload_file_bytes(&info, bytes).await.unwrap();

        println!("{:#?}", client.state.read().await.account);

        let resp = client.download_file(DownloadFileBy::FileId(&file_info.file_id), None, None).await.unwrap();

        let text = resp.resp.text().await.unwrap();

        println!("OUTPUT: {text}");
    }

    /// Uploads a local file large enough to exercise the multi-part upload path.
    #[tokio::test]
    async fn test_large_file() {
        dotenv::dotenv().ok();

        let app_id = std::env::var("APP_ID").expect("APP_ID not found in .env");
        let app_key = std::env::var("APP_KEY").expect("APP_KEY not found in .env");

        let client = ClientBuilder::new(&app_id, &app_key).authorize().await.unwrap();

        let info = NewFileFromPath::builder()
            .path("./testing.webm".as_ref())
            .content_type("video/webm".to_owned())
            .file_name("testing.webm".to_owned())
            .build();

        let file = client.upload_from_path(info, None, None).await.unwrap();

        println!("{:?}", file);
    }

    /// Uploads a small local file via the filesystem path helper.
    #[tokio::test]
    async fn test_small_file() {
        dotenv::dotenv().ok();

        let app_id = std::env::var("APP_ID").expect("APP_ID not found in .env");
        let app_key = std::env::var("APP_KEY").expect("APP_KEY not found in .env");

        let client = ClientBuilder::new(&app_id, &app_key).authorize().await.unwrap();

        let info = NewFileFromPath::builder()
            .path("Cargo.toml".as_ref())
            // FIX: was "test/plain" — an invalid media type; Cargo.toml is plain text.
            .content_type("text/plain".to_owned())
            .file_name("Cargo.toml".to_owned())
            .build();

        let file = client.upload_from_path(info, None, None).await.unwrap();

        println!("{:?}", file);
    }

    /// Lists the latest version of each file in the default bucket.
    #[tokio::test]
    async fn test_list_files() {
        dotenv::dotenv().ok();

        let app_id = std::env::var("APP_ID").expect("APP_ID not found in .env");
        let app_key = std::env::var("APP_KEY").expect("APP_KEY not found in .env");

        let client = ClientBuilder::new(&app_id, &app_key).authorize().await.unwrap();

        let files = client.list_files(ListFiles::builder().all_versions(false).build()).await.unwrap();

        println!("{:#?}", files);
    }
}