use crate::error::{CustomError, Result};
use crate::Video;
use base64::URL_SAFE;
use futures::{Stream, StreamExt, TryStreamExt};
use reqwest::header::{HeaderMap, HeaderName, CONTENT_LENGTH};
use reqwest::{header, Body};
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::policies::ExponentialBackoff;
use reqwest_retry::RetryTransientMiddleware;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;
pub struct Kodo {
client: ClientWithMiddleware,
raw_client: reqwest::Client,
bucket: Bucket,
url: String,
}
impl Kodo {
pub async fn from(bucket: Bucket) -> Result<Self> {
let mut headers = header::HeaderMap::new();
headers.insert(
"Authorization",
format!("UpToken {}", bucket.uptoken).parse()?,
);
let raw_client = reqwest::Client::builder()
.user_agent("Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 Chrome/63.0.3239.108")
.default_headers(headers)
.timeout(Duration::new(60, 0))
.build()
.unwrap();
let retry_policy = ExponentialBackoff::builder().build_with_max_retries(3);
let client = ClientBuilder::new(raw_client.clone())
.with(RetryTransientMiddleware::new_with_policy(retry_policy))
.build();
let url = format!("https:{}/mkblk", bucket.endpoint); Ok(Kodo {
client,
raw_client,
bucket,
url,
})
}
pub async fn upload_stream<F, B>(
self,
stream: F,
total_size: u64,
limit: usize,
) -> Result<Video>
where
F: Stream<Item = Result<(B, usize)>>,
B: Into<Body> + Clone,
{
let _chunk_size = 4194304;
let mut parts = Vec::new();
let client = &self.raw_client;
let url = &self.url;
let stream = stream
.enumerate()
.map(|(i, chunk)| async move {
let (chunk, len) = chunk?;
let ctx: serde_json::Value = super::retryable::retry(|| async {
let url = format!("{url}/{len}");
let response = client
.post(url)
.header(CONTENT_LENGTH, len)
.body(chunk.clone())
.send()
.await?;
response.error_for_status_ref()?;
let res = response.json().await?;
Ok::<_, CustomError>(res)
})
.await?;
Ok::<_, CustomError>((
Ctx {
index: i,
ctx: ctx["ctx"].as_str().unwrap_or_default().into(),
},
len,
))
})
.buffer_unordered(limit);
tokio::pin!(stream);
while let Some((part, _size)) = stream.try_next().await? {
parts.push(part);
}
parts.sort_by_key(|x| x.index);
let key = base64::encode_config(self.bucket.key, URL_SAFE);
self.client
.post(format!(
"https:{}/mkfile/{total_size}/key/{key}",
self.bucket.endpoint,
))
.body(
parts
.iter()
.map(|x| &x.ctx[..])
.collect::<Vec<_>>()
.join(","),
)
.send()
.await?;
let mut headers = HeaderMap::new();
for (key, value) in self.bucket.fetch_headers {
headers.insert(HeaderName::from_str(&key)?, value.parse()?);
}
let result: serde_json::Value = self
.client
.post(format!("https:{}", self.bucket.fetch_url))
.headers(headers)
.send()
.await?
.json()
.await?;
Ok(match result.get("OK") {
Some(x) if x.as_i64().ok_or("kodo fetch err")? != 1 => {
return Err(CustomError::Custom(result.to_string()));
}
_ => Video {
title: None,
filename: self.bucket.bili_filename,
desc: "".into(),
},
})
}
}
#[derive(Serialize, Deserialize, Debug)]
struct Ctx {
index: usize,
ctx: String,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Bucket {
bili_filename: String,
fetch_url: String,
endpoint: String,
uptoken: String,
key: String,
fetch_headers: HashMap<String, String>,
}