use crate::error::{CustomError, Result};
use crate::Video;
use futures::{Stream, StreamExt, TryStreamExt};
use reqwest::header::{AUTHORIZATION, CONTENT_LENGTH};
use reqwest::{header, Body};
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::policies::ExponentialBackoff;
use reqwest_retry::RetryTransientMiddleware;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::Path;
use std::time::Duration;
pub struct Cos {
client: ClientWithMiddleware,
raw_client: reqwest::Client,
bucket: Bucket,
upload_id: String,
}
impl Cos {
pub async fn form_post(bucket: Bucket) -> Result<Cos> {
let raw_client = reqwest::Client::builder()
.user_agent("Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/63.0.3239.108")
.timeout(Duration::new(300, 0))
.build()
.unwrap();
let retry_policy = ExponentialBackoff::builder().build_with_max_retries(5);
let client = ClientBuilder::new(raw_client.clone())
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
.build();
let upload_id = get_uploadid(&client, &bucket).await?;
Ok(Cos {
client,
raw_client,
bucket,
upload_id,
})
}
pub async fn upload_stream<F, B>(
&self,
stream: F,
total_size: u64,
limit: usize,
enable_internal: bool,
) -> Result<Vec<(usize, String)>>
where
F: Stream<Item = Result<(B, usize)>>,
B: Into<Body> + Clone, {
let chunk_size = 10485760;
let _chunks_num = (total_size as f64 / chunk_size as f64).ceil() as u32; let client = &self.raw_client;
let temp;
let url = if enable_internal {
temp = self
.bucket
.url
.replace("cos.accelerate", "cos-internal.ap-shanghai");
&temp
} else {
&self.bucket.url
};
let upload_id = &self.upload_id;
let stream = stream
.enumerate()
.map(move |(i, chunk)| async move {
let (chunk, len) = chunk?;
let params = Protocol {
upload_id,
part_number: (i + 1) as u32,
};
let response = super::retryable::retry(|| async {
let response = client
.put(url)
.header(AUTHORIZATION, &self.bucket.put_auth)
.header(CONTENT_LENGTH, len)
.query(¶ms)
.body(chunk.clone())
.send()
.await?;
response.error_for_status_ref()?;
Ok::<_, reqwest::Error>(response)
})
.await?;
let headers = response.headers();
let etag = match headers.get("Etag") {
None => {
return Err(CustomError::Custom(format!(
"upload chunk {i} error: {}",
response.text().await?
)))
}
Some(etag) => etag
.to_str()
.map_err(|e| CustomError::Custom(e.to_string()))?
.to_string(),
};
Ok::<_, CustomError>((i + 1, etag))
})
.buffer_unordered(limit);
let mut parts = Vec::new();
tokio::pin!(stream);
while let Some((part, etag)) = stream.try_next().await? {
parts.push((part, etag));
}
Ok(parts)
}
pub async fn merge_files(&self, mut parts: Vec<(usize, String)>) -> Result<Video> {
parts.sort_unstable_by_key(|annotate| annotate.0);
let complete_multipart_upload = parts
.iter()
.map(|(number, etag)| {
format!(
r#"
<Part>
<PartNumber>{number}</PartNumber>
<ETag>{etag}</ETag>
</Part>
"#
)
})
.reduce(|accum, item| accum + &item)
.unwrap();
let xml = format!(
r#"
<CompleteMultipartUpload>
{complete_multipart_upload}
</CompleteMultipartUpload>
"#
);
let mut headers = header::HeaderMap::new();
headers.insert(
"Authorization",
header::HeaderValue::from_str(&self.bucket.post_auth)?,
);
let response = self
.client
.post(&self.bucket.url)
.query(&[("uploadId", &self.upload_id)])
.body(xml)
.headers(headers)
.send()
.await?;
if !response.status().is_success() {
return Err(CustomError::Custom(response.text().await?));
}
let mut headers = header::HeaderMap::new();
headers.insert(
"X-Upos-Fetch-Source",
header::HeaderValue::from_str(
self.bucket
.fetch_headers
.get("X-Upos-Fetch-Source")
.unwrap(),
)?,
);
headers.insert(
"X-Upos-Auth",
header::HeaderValue::from_str(self.bucket.fetch_headers.get("X-Upos-Auth").unwrap())?,
);
headers.insert(
"Fetch-Header-Authorization",
header::HeaderValue::from_str(
self.bucket
.fetch_headers
.get("Fetch-Header-Authorization")
.unwrap(),
)?,
);
let res = self
.client
.post(format!("https:{}", self.bucket.fetch_url))
.headers(headers)
.send()
.await?;
if !res.status().is_success() {
return Err(CustomError::Custom(res.text().await?));
}
Ok(Video {
title: None,
filename: Path::new(&self.bucket.bili_filename)
.file_stem()
.unwrap()
.to_str()
.unwrap()
.into(),
desc: "".into(),
})
}
}
async fn get_uploadid(client: &ClientWithMiddleware, bucket: &Bucket) -> Result<String> {
let res = client
.post(format!("{}?uploads&output=json", bucket.url))
.header(reqwest::header::AUTHORIZATION, &bucket.post_auth)
.send()
.await?
.text()
.await?;
let start = res
.find(r"<UploadId>")
.ok_or_else(|| CustomError::Custom(res.clone()))?
+ "<UploadId>".len();
let end = res
.rfind(r"</UploadId>")
.ok_or_else(|| CustomError::Custom(res.clone()))?;
let uploadid = &res[start..end];
Ok(uploadid.to_string())
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Bucket {
#[serde(rename = "OK")]
ok: u8,
bili_filename: String,
biz_id: usize,
fetch_headers: HashMap<String, String>,
fetch_url: String,
fetch_urls: Vec<String>,
post_auth: String,
put_auth: String,
url: String,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Protocol<'a> {
upload_id: &'a str,
part_number: u32,
}