#![allow(clippy::redundant_pattern_matching)]
#[macro_use]
extern crate serde;
use std::{borrow::Cow, future::Future, num::NonZeroU32, sync::Arc, time::Duration};
use tokio::sync::RwLock;
use headers::HeaderMapExt;
use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
use reqwest::Method;
/// Inserts a statically-named header into a `HeaderMap`, e.g.
/// `h!(headers."x-bz-file-name" => &name)`.
///
/// Panics if `$value` cannot be encoded as an HTTP header value.
macro_rules! h {
    ($headers:ident.$key:literal => $value:expr) => {
        $headers.insert(
            reqwest::header::HeaderName::from_static($key), reqwest::header::HeaderValue::from_str($value).expect("Unable to use header value"),
        );
    };
}
mod types;
pub mod error;
pub mod models;
// Re-export the request/info types users interact with directly.
pub use types::{sse, DownloadFileBy, FileRetention, ListFiles, NewFileInfo, NewLargeFileInfo, NewPartInfo};
/// Builder types for the request/info structs re-exported above.
pub mod builders {
    pub use crate::types::{
        FileRetentionBuilder, ListFilesBuilder, NewFileInfoBuilder, NewLargeFileInfoBuilder, NewPartInfoBuilder,
    };
}
#[cfg(feature = "pool")]
pub mod pool;
#[cfg(feature = "fs")]
mod fs;
pub use error::B2Error;
pub use fs::NewFileFromPath;
/// Per-authorization state shared by all clones of a [`Client`]: the original
/// builder config (kept so the account can be re-authorized), the authorized
/// account payload, and the ready-made `Authorization` header.
struct ClientState {
    config: ClientBuilder,
    account: crate::models::B2Authorized,
    // Pre-built header value derived from `account.auth_token`.
    auth: HeaderValue,
}
impl ClientState {
    /// Fails with `MissingCapability` unless the key grants `capability`.
    fn check_capability(&self, capability: &'static str) -> Result<(), B2Error> {
        if self.account.allowed(capability) {
            Ok(())
        } else {
            Err(B2Error::MissingCapability(capability))
        }
    }
    /// Expands an endpoint name into a full `/b2api/v3/` URL on the API host.
    fn url(&self, path: &str) -> String {
        format!("{}/b2api/v3/{}", self.account.api.storage.api_url, path)
    }
    /// Resolves an explicit bucket id, falling back to the key's default one.
    #[inline]
    fn bucket_id<'a>(&'a self, bucket_id: Option<&'a str>) -> Result<&'a str, B2Error> {
        match bucket_id {
            Some(id) => Ok(id),
            None => self.account.api.storage.bucket_id.as_deref().ok_or(B2Error::MissingBucketId),
        }
    }
    /// Rejects file names that violate the key's name-prefix restriction.
    fn check_prefix(&self, name: Option<&str>) -> Result<(), B2Error> {
        if let (Some(name), Some(prefix)) = (name, self.account.api.storage.name_prefix.as_deref()) {
            if !name.starts_with(prefix) {
                return Err(B2Error::InvalidPrefix);
            }
        }
        Ok(())
    }
}
/// Cheaply clonable B2 API client; all clones share one authorization state.
#[derive(Clone)]
pub struct Client {
    // RwLock: concurrent requests read; re-authorization takes the write lock.
    state: Arc<RwLock<ClientState>>,
    client: reqwest::Client,
}
/// Configuration used to create an authorized [`Client`].
#[derive(Clone)]
pub struct ClientBuilder {
    // Basic-auth header built from the key id / application key pair.
    auth: HeaderValue,
    ua: Option<Cow<'static, str>>,
    // Retry policy for the initial/renewed authorization round-trip.
    max_retries: u8,
    retry_delay: Duration,
}
/// A streaming download response paired with the parsed B2 file headers.
pub struct DownloadedFile {
    pub resp: reqwest::Response,
    pub info: models::B2FileHeaders,
}
impl ClientBuilder {
pub fn new(key_id: &str, app_key: &str) -> ClientBuilder {
ClientBuilder {
auth: models::create_auth_header(key_id, app_key),
ua: None,
max_retries: 5,
retry_delay: Duration::from_secs(1),
}
}
#[inline]
pub fn user_agent(mut self, ua: impl Into<Cow<'static, str>>) -> Self {
self.ua = Some(ua.into());
self
}
#[inline]
pub fn max_retries(mut self, max_retries: u8) -> Self {
self.max_retries = max_retries;
self
}
pub fn retry_delay(mut self, delay: Duration) -> Self {
self.retry_delay = delay;
self
}
pub async fn authorize(self) -> Result<Client, B2Error> {
let mut builder = reqwest::ClientBuilder::new().https_only(true);
if let Some(ref ua) = self.ua {
builder = builder.user_agent(ua.as_ref());
}
let client = builder.build()?;
Ok(Client {
state: Arc::new(RwLock::new(Client::do_auth(&client, self).await?)),
client,
})
}
}
/// Placeholder target for endpoints whose JSON response body is irrelevant.
struct DummyValue;
impl<'de> serde::Deserialize<'de> for DummyValue {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        // Bug fix: the previous implementation returned `Ok(DummyValue)`
        // without touching the deserializer, leaving the entire response body
        // unconsumed. `serde_json::from_str` verifies the input is fully
        // consumed, so any non-empty body made it fail with "trailing
        // characters". Drive the deserializer through `IgnoredAny` so the
        // value is consumed and discarded.
        serde::de::IgnoredAny::deserialize(deserializer)?;
        Ok(DummyValue)
    }
}
impl Client {
/// Starts a request with the method, URL, and `Authorization` header set.
fn req(&self, method: Method, auth: &HeaderValue, url: impl AsRef<str>) -> reqwest::RequestBuilder {
    let builder = self.client.request(method, url.as_ref());
    builder.header(AUTHORIZATION, auth)
}
/// Sends `builder` and deserializes the JSON response into `T`.
///
/// Non-success statuses are decoded as a B2 error payload instead.
async fn json<T>(builder: reqwest::RequestBuilder) -> Result<T, B2Error>
where
    T: serde::de::DeserializeOwned,
{
    let resp = builder.send().await?;
    if resp.status().is_success() {
        let body = resp.text().await?;
        Ok(serde_json::from_str(&body)?)
    } else {
        Err(B2Error::B2ErrorMessage(resp.json().await?))
    }
}
/// Performs the `b2_authorize_account` call, retrying (behind a circuit
/// breaker) until it succeeds or the retry budget is exhausted.
async fn do_auth(client: &reqwest::Client, config: ClientBuilder) -> Result<ClientState, B2Error> {
    use failsafe::{futures::CircuitBreaker, Config, Error as FailsafeError};
    // Default-policy breaker: reports `Rejected` while open after failures.
    let cb = Config::new().build();
    let mut attempts = 0;
    'try_auth: loop {
        let do_auth_inner = Client::json::<models::B2Authorized>(
            client
                .get("https://api.backblazeb2.com/b2api/v3/b2_authorize_account")
                .header(AUTHORIZATION, &config.auth),
        );
        return match cb.call(do_auth_inner).await {
            Ok(account) => Ok(ClientState {
                config,
                // The returned token authorizes every subsequent API call.
                auth: HeaderValue::from_str(&account.auth_token)
                    .expect("Unable to use auth token in header value"),
                account,
            }),
            Err(FailsafeError::Rejected) => {
                // Circuit open: back off, then retry up to `max_retries` times.
                attempts += 1;
                if attempts >= config.max_retries {
                    return Err(B2Error::Unauthorized);
                }
                tokio::time::sleep(config.retry_delay).await;
                continue 'try_auth;
            }
            // A real request/deserialization error is surfaced immediately.
            Err(FailsafeError::Inner(e)) => Err(e),
        };
    }
}
/// Re-authorizes the account and swaps in the fresh shared state.
///
/// NOTE(review): the `state.read()` guard is a temporary that lives for the
/// whole first statement, so the read lock is held across the authorization
/// round-trip; writers queue behind it until that statement completes.
async fn reauthorize(&self) -> Result<(), B2Error> {
    let new_state = Self::do_auth(&self.client, self.state.read().await.config.clone()).await?;
    *self.state.write().await = new_state;
    Ok(())
}
/// Runs `f` and, on a single 401 response, refreshes the auth token and
/// retries the request exactly once.
///
/// `f` receives a cheap clone of the client (state is behind an `Arc`), and
/// must be a `Fn` because it may be invoked twice.
async fn run_request_with_reauth<'a, F, R, T>(&self, f: F) -> Result<T, B2Error>
where
    F: Fn(Self) -> R + 'a,
    R: Future<Output = Result<T, B2Error>> + 'a,
{
    let mut retried = false;
    loop {
        return match f(self.clone()).await {
            Ok(t) => Ok(t),
            Err(B2Error::B2ErrorMessage(e)) if !retried && e.status == 401 => {
                // Boxed so this future's size doesn't include reauthorize's.
                Box::pin(self.reauthorize()).await?;
                retried = true;
                continue;
            }
            Err(e) => Err(e),
        };
    }
}
/// Fetches metadata for a single file version by its file id.
///
/// Requires the `readFiles` capability.
pub async fn get_file_info(&self, file_id: &str) -> Result<models::B2FileInfo, B2Error> {
    #[derive(Serialize)]
    #[serde(rename_all = "camelCase")]
    struct B2GetFileInfo<'a> {
        file_id: &'a str,
    }
    self.run_request_with_reauth(|b2| async move {
        let state = b2.state.read().await;
        state.check_capability("readFiles")?;
        // Bug fix: the endpoint name must be expanded into a full API URL via
        // `state.url` (as every other endpoint does); a bare "b2_get_file_info"
        // string is not a valid request URL and the request could never succeed.
        Client::json(
            b2.req(Method::GET, &state.auth, state.url("b2_get_file_info"))
                .query(&B2GetFileInfo { file_id }),
        )
        .await
    })
    .await
}
/// Downloads a file by id or by name, optionally restricted to a byte
/// `range` and using customer-managed server-side-encryption headers.
///
/// Requires the `readFiles` capability.
///
/// NOTE(review): the URL comes from `state.url`, i.e. the account's API
/// host — confirm downloads shouldn't go to B2's dedicated download host.
pub async fn download_file(
    &self,
    file: DownloadFileBy<'_>,
    range: Option<headers::Range>,
    encryption: Option<sse::ServerSideEncryptionCustomer>,
) -> Result<DownloadedFile, B2Error> {
    // Borrow the options so the retry closure (a `Fn`) can reuse them.
    let (range, encryption) = (&range, &encryption);
    // Wrapper so the id/name variant flattens into the query string.
    #[derive(Serialize)]
    struct DownloadFileBy2<'a> {
        #[serde(flatten)]
        file: DownloadFileBy<'a>,
    }
    self.run_request_with_reauth(|b2| async move {
        let state = b2.state.read().await;
        state.check_capability("readFiles")?;
        let resp = b2
            .req(Method::GET, &state.auth, {
                state.url(match file {
                    DownloadFileBy::FileId(_) => "b2_download_file_by_id",
                    DownloadFileBy::FileName(_) => "b2_download_file_by_name",
                })
            })
            .headers({
                let mut headers = HeaderMap::new();
                if let Some(ref range) = range {
                    headers.typed_insert(range.clone());
                }
                if let Some(ref encryption) = encryption {
                    encryption.add_headers(&mut headers);
                }
                headers
            })
            .query(&DownloadFileBy2 { file })
            .send()
            .await?;
        Ok(DownloadedFile {
            // Parse the B2 metadata headers before handing back the stream.
            info: models::B2FileHeaders::parse(resp.headers())?,
            resp,
        })
    })
    .await
}
/// Lists file names (or all file versions when `args.all_versions` is set).
///
/// Requires the `listFiles` capability. `start_file_id` only applies when
/// listing versions, so it is cleared otherwise.
pub async fn list_files(&self, mut args: ListFiles<'_>) -> Result<models::B2FileInfoList, B2Error> {
    if !args.all_versions {
        args.start_file_id = None;
    }
    self.run_request_with_reauth(move |b2| async move {
        let state = b2.state.read().await;
        state.check_capability("listFiles")?;
        // Copy `args` so the closure can be re-run on an auth retry.
        let mut args = ListFiles { ..args };
        args.bucket_id = Some(state.bucket_id(args.bucket_id)?);
        let path = if args.all_versions { "b2_list_file_versions" } else { "b2_list_file_names" };
        Client::json(b2.req(Method::GET, &state.auth, state.url(path)).query(&args)).await
    })
    .await
}
/// Hides a file so it no longer appears in (non-version) file listings.
///
/// Requires the `writeFiles` capability.
pub async fn hide_file(
    &self,
    bucket_id: Option<&str>,
    file_name: &str,
) -> Result<models::B2FileInfo, B2Error> {
    #[derive(Serialize)]
    #[serde(rename_all = "camelCase")]
    struct B2HideFile<'a> {
        #[serde(skip_serializing_if = "Option::is_none")]
        bucket_id: Option<&'a str>,
        file_name: &'a str,
    }
    self.run_request_with_reauth(|b2| async move {
        let state = b2.state.read().await;
        state.check_capability("writeFiles")?;
        state.check_prefix(Some(file_name))?;
        let request = b2
            .req(Method::POST, &state.auth, state.url("b2_hide_file"))
            .json(&B2HideFile {
                // Fall back on the key's default bucket; when neither is known
                // the field is simply omitted from the request body.
                bucket_id: state.bucket_id(bucket_id).ok(),
                file_name,
            });
        Self::json(request).await
    })
    .await
}
/// Deletes one specific version of a file.
///
/// Requires the `deleteFiles` capability, plus `bypassGovernance` when that
/// flag is set.
pub async fn delete_file(
    &self,
    file_id: &str,
    file_name: &str,
    bypass_governance: bool,
) -> Result<(), B2Error> {
    #[derive(Serialize)]
    #[serde(rename_all = "camelCase")]
    struct B2DeleteFile<'a> {
        file_id: &'a str,
        file_name: &'a str,
        bypass_governance: bool,
    }
    self.run_request_with_reauth(|b2| async move {
        let state = b2.state.read().await;
        state.check_capability("deleteFiles")?;
        state.check_prefix(Some(file_name))?;
        if bypass_governance {
            state.check_capability("bypassGovernance")?;
        }
        let request = b2
            .req(Method::POST, &state.auth, state.url("b2_delete_file_version"))
            .json(&B2DeleteFile { file_id, file_name, bypass_governance });
        // The response body carries no useful data; discard it.
        let _: DummyValue = Self::json(request).await?;
        Ok(())
    })
    .await
}
/// Turns the legal-hold flag on or off for a file version.
///
/// Requires the `writeFileLegalHolds` capability.
pub async fn update_legal_hold(
    &self,
    file_name: &str,
    file_id: &str,
    legal_hold: bool,
) -> Result<(), B2Error> {
    #[derive(Serialize)]
    #[serde(rename_all = "camelCase")]
    struct B2UpdateLegalHold<'a> {
        file_name: &'a str,
        file_id: &'a str,
        legal_hold: &'a str,
    }
    // B2 encodes the flag as the strings "on"/"off".
    let legal_hold = if legal_hold { "on" } else { "off" };
    self.run_request_with_reauth(|b2| async move {
        let state = b2.state.read().await;
        state.check_capability("writeFileLegalHolds")?;
        state.check_prefix(Some(file_name))?;
        let request = b2
            .req(Method::POST, &state.auth, state.url("b2_update_legal_hold"))
            .json(&B2UpdateLegalHold { file_name, file_id, legal_hold });
        // The response body carries no useful data; discard it.
        let _: DummyValue = Self::json(request).await?;
        Ok(())
    })
    .await
}
/// Changes the retention settings on a file version.
///
/// Requires the `writeFileRetentions` capability; governance bypass travels
/// inside `retention` and is serialized as its own top-level field.
pub async fn update_file_retention(
    &self,
    file_name: &str,
    file_id: &str,
    retention: FileRetention,
) -> Result<(), B2Error> {
    #[derive(Serialize)]
    #[serde(rename_all = "camelCase")]
    struct B2UpdateFileRetention<'a> {
        file_name: &'a str,
        file_id: &'a str,
        #[serde(flatten)]
        retention: FileRetention,
        bypass_governance: bool,
    }
    // Built once outside the closure; the retry closure only borrows it.
    let body = &B2UpdateFileRetention {
        file_name,
        file_id,
        bypass_governance: retention.bypass_governance,
        retention,
    };
    self.run_request_with_reauth(|b2| async move {
        let state = b2.state.read().await;
        state.check_capability("writeFileRetentions")?;
        state.check_prefix(Some(file_name))?;
        // Response body is ignored; only success/failure matters.
        Self::json(b2.req(Method::POST, &state.auth, state.url("b2_update_file_retention")).json(body))
            .await
            .map(|_: DummyValue| ())
    })
    .await
}
/// Fetches either an upload URL (from `bucket_id`) or an upload-part URL
/// (from `file_id`), plus the key's file-name prefix restriction.
///
/// Requires the `writeFiles` capability.
async fn get_b2_upload_url(
    &self,
    bucket_id: Option<&str>,
    file_id: Option<&str>,
) -> Result<(Option<Arc<str>>, models::B2UploadUrl), B2Error> {
    #[derive(Serialize)]
    #[serde(rename_all = "camelCase")]
    struct B2GetUploadUrlQuery<'a> {
        #[serde(skip_serializing_if = "Option::is_none")]
        bucket_id: Option<&'a str>,
        #[serde(skip_serializing_if = "Option::is_none")]
        file_id: Option<&'a str>,
    }
    self.run_request_with_reauth(|b2| async move {
        let state = b2.state.read().await;
        state.check_capability("writeFiles")?;
        // The two ids are mutually exclusive: a file id means "part URL" and
        // wins; otherwise use the bucket id, falling back to the default one.
        let mut query = B2GetUploadUrlQuery { bucket_id, file_id };
        if query.file_id.is_some() {
            query.bucket_id = None;
        } else if query.bucket_id.is_some() {
            query.file_id = None;
        } else {
            query.bucket_id = Some(state.bucket_id(query.bucket_id)?);
        }
        let path = state.url(if file_id.is_some() { "b2_get_upload_part_url" } else { "b2_get_upload_url" });
        Ok((
            // Returned alongside so callers can enforce the prefix locally.
            state.account.api.storage.name_prefix.clone(),
            Self::json::<models::B2UploadUrl>(b2.req(Method::GET, &state.auth, path).query(&query)).await?,
        ))
    })
    .await
}
/// Fetches an upload (or upload-part) URL and packages it together with its
/// auth header and the key's name-prefix restriction.
async fn get_raw_upload_url(
    &self,
    bucket_id: Option<&str>,
    file_id: Option<&str>,
) -> Result<RawUploadUrl, B2Error> {
    let (prefix, url) = self.get_b2_upload_url(bucket_id, file_id).await?;
    let auth = url.header();
    Ok(RawUploadUrl {
        client: self.clone(),
        url,
        auth,
        prefix,
    })
}
pub async fn get_upload_url(&self, bucket_id: Option<&str>) -> Result<UploadUrl, B2Error> {
Ok(UploadUrl(self.get_raw_upload_url(bucket_id, None).await?))
}
pub async fn get_upload_part_url(&self, file_id: &str) -> Result<UploadPartUrl, B2Error> {
Ok(UploadPartUrl(self.get_raw_upload_url(None, Some(file_id)).await?))
}
/// Begins a large (multipart) file upload and returns a handle for
/// uploading its parts.
///
/// Requires the `writeFiles` capability.
pub async fn start_large_file(
    &self,
    bucket_id: Option<&str>,
    info: &NewLargeFileInfo,
) -> Result<LargeFileUpload, B2Error> {
    #[derive(Serialize)]
    #[serde(rename_all = "camelCase")]
    struct B2StartLargeFile<'a> {
        bucket_id: &'a str,
        file_name: &'a str,
        content_type: Option<&'a str>,
        #[serde(skip_serializing_if = "Option::is_none")]
        file_retention: Option<&'a FileRetention>,
        #[serde(skip_serializing_if = "Option::is_none")]
        legal_hold: Option<&'a str>,
        #[serde(skip_serializing_if = "sse::ServerSideEncryption::is_default")]
        encryption: &'a sse::ServerSideEncryption,
    }
    let info = self
        .run_request_with_reauth(|b2| async move {
            let state = b2.state.read().await;
            state.check_capability("writeFiles")?;
            state.check_prefix(Some(&info.file_name))?;
            let body = B2StartLargeFile {
                bucket_id: state.bucket_id(bucket_id)?,
                file_name: &info.file_name,
                content_type: info.content_type.as_deref(),
                file_retention: info.retention.as_ref(),
                // B2 encodes the legal-hold flag as "on"/"off".
                legal_hold: info.legal_hold.map(|lh| if lh { "on" } else { "off" }),
                encryption: &info.encryption,
            };
            Client::json::<models::B2FileInfo>(
                b2.req(Method::POST, &state.auth, state.url("b2_start_large_file")).json(&body),
            )
            .await
        })
        .await?;
    Ok(LargeFileUpload {
        client: self.clone(),
        info,
    })
}
}
/// An upload URL plus everything needed to use it: the owning client (for
/// refreshing an expired URL), the URL's auth header, and the key's
/// file-name prefix restriction.
struct RawUploadUrl {
    client: Client,
    url: models::B2UploadUrl,
    auth: HeaderValue,
    prefix: Option<Arc<str>>,
}
/// Upload URL for small (single-request) files.
#[repr(transparent)]
pub struct UploadUrl(RawUploadUrl);
/// Upload URL bound to one unfinished large file's parts.
#[repr(transparent)]
pub struct UploadPartUrl(RawUploadUrl);
impl RawUploadUrl {
/// Performs an upload POST; when the upload URL's token has expired (401),
/// fetches a fresh URL/token pair and retries.
///
/// NOTE(review): unlike `run_request_with_reauth` this retries on *every*
/// 401 with no cap, so a persistently rejected upload would loop forever —
/// confirm whether a retry limit is wanted here.
async fn do_upload<F, T>(&mut self, f: F) -> Result<T, B2Error>
where
    F: Fn(reqwest::RequestBuilder) -> reqwest::RequestBuilder,
    T: serde::de::DeserializeOwned,
{
    loop {
        let res = Client::json(f(self.client.req(Method::POST, &self.auth, &self.url.upload_url)));
        return match res.await {
            Err(B2Error::B2ErrorMessage(e)) if e.status == 401 => {
                // Refresh the upload URL for the same bucket/file and retry.
                let get_new_url =
                    self.client.get_b2_upload_url(self.url.bucket_id.as_deref(), self.url.file_id.as_deref());
                let (prefix, url) = Box::pin(get_new_url).await?;
                self.auth = url.header();
                self.url = url;
                self.prefix = prefix;
                continue;
            }
            res => res,
        };
    }
}
fn check_prefix(&self, file_name: &str) -> Result<(), B2Error> {
match self.prefix {
Some(ref prefix) if !file_name.starts_with(prefix.as_ref()) => Err(B2Error::InvalidPrefix),
_ => Ok(()),
}
}
/// Uploads a single file body with the metadata headers from `info`.
/// `file` is a factory so the body can be rebuilt if the upload retries.
async fn upload_file<F, B>(&mut self, info: &NewFileInfo, file: F) -> Result<models::B2FileInfo, B2Error>
where
    F: Fn() -> B,
    B: Into<reqwest::Body>,
{
    self.check_prefix(&info.file_name)?;
    self.do_upload(|builder| {
        let mut headers = HeaderMap::new();
        info.add_headers(&mut headers);
        builder.body(file()).headers(headers)
    })
    .await
}
/// Uploads one part body with the part headers from `info`.
/// `body` is a factory so the part can be rebuilt if the upload retries.
async fn upload_part<F, B>(&mut self, info: &NewPartInfo, body: F) -> Result<models::B2PartInfo, B2Error>
where
    F: Fn() -> B,
    B: Into<reqwest::Body>,
{
    self.do_upload(|builder| {
        let mut headers = HeaderMap::new();
        info.add_headers(&mut headers);
        builder.body(body()).headers(headers)
    })
    .await
}
}
impl UploadUrl {
    /// Uploads a single (small) file through this upload URL.
    pub async fn upload_file<F, B>(&mut self, info: &NewFileInfo, file: F) -> Result<models::B2FileInfo, B2Error>
    where
        F: Fn() -> B,
        B: Into<reqwest::Body>,
    {
        self.0.upload_file(info, file).await
    }
    /// Convenience wrapper that uploads an in-memory byte buffer.
    pub async fn upload_file_bytes(
        &mut self,
        info: &NewFileInfo,
        bytes: impl Into<bytes::Bytes>,
    ) -> Result<models::B2FileInfo, B2Error> {
        let buf = bytes.into();
        // `Bytes` clones are refcounted, so retries don't copy the data.
        self.upload_file(info, move || buf.clone()).await
    }
}
/// Handle for an in-progress large (multipart) file upload.
pub struct LargeFileUpload {
    client: Client,
    // Metadata B2 returned from `b2_start_large_file` (includes the file id).
    info: models::B2FileInfo,
}
impl LargeFileUpload {
pub fn info(&self) -> &models::B2FileInfo {
&self.info
}
pub async fn start(
client: &Client,
bucket_id: Option<&str>,
info: &NewLargeFileInfo,
) -> Result<LargeFileUpload, B2Error> {
client.start_large_file(bucket_id, info).await
}
pub async fn get_upload_part_url(&self) -> Result<UploadPartUrl, B2Error> {
self.client.get_upload_part_url(&self.info.file_id).await
}
/// Uploads one part of this large file through `url`.
pub async fn upload_part<F, B>(
    &self,
    url: &mut UploadPartUrl,
    info: &NewPartInfo,
    part: F,
) -> Result<models::B2PartInfo, B2Error>
where
    F: Fn() -> B,
    B: Into<reqwest::Body>,
{
    // Guard against mixing part URLs between different large-file uploads.
    let same_file = url.0.url.file_id.as_deref() == Some(self.info.file_id.as_ref());
    if !same_file {
        return Err(B2Error::FileIdMismatch);
    }
    url.0.upload_part(info, part).await
}
/// Convenience wrapper that uploads one part from an in-memory buffer.
pub async fn upload_part_bytes(
    &self,
    url: &mut UploadPartUrl,
    info: &NewPartInfo,
    bytes: impl Into<bytes::Bytes>,
) -> Result<models::B2PartInfo, B2Error> {
    let buf = bytes.into();
    // `Bytes` clones are refcounted, so retries don't copy the data.
    self.upload_part(url, info, move || buf.clone()).await
}
/// Completes the large file from the uploaded parts.
///
/// Parts must be supplied in strictly ascending part-number order, since
/// the SHA1 array is sent positionally.
pub async fn finish(self, parts: &[models::B2PartInfo]) -> Result<models::B2FileInfo, B2Error> {
    if parts.windows(2).any(|w| w[0].part_number >= w[1].part_number) {
        return Err(B2Error::InvalidPartSorting);
    }
    #[derive(Serialize)]
    #[serde(rename_all = "camelCase")]
    struct B2FinishLargeFile<'a> {
        file_id: &'a str,
        part_sha1_array: Vec<&'a str>,
    }
    let body = &B2FinishLargeFile {
        file_id: &self.info.file_id,
        part_sha1_array: parts.iter().map(|part| &*part.content_sha1).collect(),
    };
    self.client
        .run_request_with_reauth(|b2| async move {
            let state = b2.state.read().await;
            Client::json(b2.req(Method::POST, &state.auth, state.url("b2_finish_large_file")).json(&body))
                .await
        })
        .await
}
/// Cancels the large file upload, discarding all uploaded parts.
pub async fn cancel(self) -> Result<models::B2CancelledFileInfo, B2Error> {
    #[derive(Serialize)]
    #[serde(rename_all = "camelCase")]
    struct B2CancelLargeFile<'a> {
        file_id: &'a str,
    }
    let body = &B2CancelLargeFile { file_id: &self.info.file_id };
    self.client
        .run_request_with_reauth(|b2| async move {
            let state = b2.state.read().await;
            let request = b2.req(Method::POST, &state.auth, state.url("b2_cancel_large_file"));
            Client::json(request.json(&body)).await
        })
        .await
}
}
#[cfg(test)]
mod tests {
    use tokio::io::AsyncReadExt;
    use super::*;
    /// `DownloadFileBy` must serialize to exactly the query keys B2 expects.
    #[test]
    fn test_downloadby_serialization() {
        let file_id = "4_zc1234567890abcdef1234f1";
        let file_name = "example.txt";
        let file_id_json = serde_json::to_string(&DownloadFileBy::FileId(file_id)).unwrap();
        let file_name_json = serde_json::to_string(&DownloadFileBy::FileName(file_name)).unwrap();
        assert_eq!(file_id_json, format!(r#"{{"fileId":"{}"}}"#, file_id));
        assert_eq!(file_name_json, format!(r#"{{"fileName":"{}"}}"#, file_name));
    }
    // NOTE: the remaining tests hit the live B2 API and require APP_ID /
    // APP_KEY in a local .env file (plus local fixture files).
    #[tokio::test]
    async fn test_auth() {
        use sha1::{Digest, Sha1};
        dotenv::dotenv().ok();
        let app_id = std::env::var("APP_ID").expect("APP_ID not found in .env");
        let app_key = std::env::var("APP_KEY").expect("APP_KEY not found in .env");
        let client = ClientBuilder::new(&app_id, &app_key).authorize().await.unwrap();
        let mut upload = client.get_upload_url(None).await.unwrap();
        let mut file = tokio::fs::OpenOptions::new().read(true).open("Cargo.toml").await.unwrap();
        let meta = file.metadata().await.unwrap();
        let mut bytes = Vec::with_capacity(meta.len() as usize);
        file.read_to_end(&mut bytes).await.unwrap();
        let bytes = bytes::Bytes::from(bytes);
        let info = NewFileInfo::builder()
            .file_name("testing/Cargo.toml".to_owned())
            .content_length(meta.len())
            .content_type("text/plain".to_owned())
            .content_sha1(hex::encode(Sha1::new().chain_update(&bytes).finalize()))
            .build();
        let file_info = upload.upload_file_bytes(&info, bytes).await.unwrap();
        println!("{:#?}", client.state.read().await.account);
        // Round-trip: download what we just uploaded and print it.
        let resp = client.download_file(DownloadFileBy::FileId(&file_info.file_id), None, None).await.unwrap();
        let text = resp.resp.text().await.unwrap();
        println!("OUTPUT: {text}");
    }
    #[tokio::test]
    async fn test_large_file() {
        dotenv::dotenv().ok();
        let app_id = std::env::var("APP_ID").expect("APP_ID not found in .env");
        let app_key = std::env::var("APP_KEY").expect("APP_KEY not found in .env");
        let client = ClientBuilder::new(&app_id, &app_key).authorize().await.unwrap();
        let info = NewFileFromPath::builder()
            .path(r#"./testing.webm"#.as_ref())
            .content_type("video/webm".to_owned())
            .file_name("testing.webm".to_owned())
            .build();
        let file = client.upload_from_path(info, None, None).await.unwrap();
        println!("{:?}", file);
    }
    #[tokio::test]
    async fn test_small_file() {
        dotenv::dotenv().ok();
        let app_id = std::env::var("APP_ID").expect("APP_ID not found in .env");
        let app_key = std::env::var("APP_KEY").expect("APP_KEY not found in .env");
        let client = ClientBuilder::new(&app_id, &app_key).authorize().await.unwrap();
        let info = NewFileFromPath::builder()
            .path(r#"Cargo.toml"#.as_ref())
            // Bug fix: was "test/plain", an invalid MIME type; the other tests
            // use "text/plain" for the same fixture.
            .content_type("text/plain".to_owned())
            .file_name("Cargo.toml".to_owned())
            .build();
        let file = client.upload_from_path(info, None, None).await.unwrap();
        println!("{:?}", file);
    }
    #[tokio::test]
    async fn test_list_files() {
        dotenv::dotenv().ok();
        let app_id = std::env::var("APP_ID").expect("APP_ID not found in .env");
        let app_key = std::env::var("APP_KEY").expect("APP_KEY not found in .env");
        let client = ClientBuilder::new(&app_id, &app_key).authorize().await.unwrap();
        let files = client.list_files(ListFiles::builder().all_versions(false).build()).await.unwrap();
        println!("{:#?}", files);
    }
}