use crate::error::{InnerErrorCode, MeowError};
use crate::transfer_task::TransferTask;
use async_trait::async_trait;
pub use crate::download_trait::{BreakpointDownload, DownloadHeadCtx, DownloadRangeGetCtx};
pub use crate::upload_trait::{BreakpointUpload, UploadChunkCtx, UploadPrepareCtx};
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use reqwest::multipart;
use reqwest::Method;
#[derive(Debug, Clone, Default)]
pub struct UploadResumeInfo {
pub completed_file_id: Option<String>,
pub next_byte: Option<u64>,
}
#[derive(Debug)]
pub enum UploadBody {
Multipart(multipart::Form),
Binary(Vec<u8>),
}
#[derive(Debug)]
pub struct UploadRequest {
pub method: Method,
pub url: String,
pub headers: HeaderMap,
pub body: UploadBody,
}
impl UploadRequest {
pub fn from_task(task: &TransferTask, body: UploadBody) -> Self {
Self {
method: task.method(),
url: task.url().to_string(),
headers: task.headers().clone(),
body,
}
}
}
fn parse_default_upload_response(body: &str) -> Result<UploadResumeInfo, MeowError> {
if body.trim().is_empty() {
crate::meow_flow_log!(
"upload_protocol",
"empty upload response body, fallback default"
);
return Ok(UploadResumeInfo::default());
}
let v: DefaultUploadResp = serde_json::from_str(body).map_err(|e| {
crate::meow_flow_log!(
"upload_protocol",
"upload response parse failed: body_len={} err={}",
body.len(),
e
);
MeowError::from_code(
InnerErrorCode::ResponseParseError,
format!("upload response json: {e}, body: {body}"),
)
})?;
crate::meow_flow_log!(
"upload_protocol",
"upload response parsed: file_id_present={} next_byte={:?}",
v.file_id.is_some(),
v.next_byte
);
Ok(UploadResumeInfo {
completed_file_id: v.file_id,
next_byte: v.next_byte.map(|n| if n < 0 { 0u64 } else { n as u64 }),
})
}
async fn send_upload_request(
client: &reqwest::Client,
req: UploadRequest,
) -> Result<String, MeowError> {
let mut builder = client.request(req.method, req.url).headers(req.headers);
builder = match req.body {
UploadBody::Multipart(form) => builder.multipart(form),
UploadBody::Binary(bytes) => builder.body(bytes),
};
let resp = builder.send().await.map_err(map_reqwest)?;
let status = resp.status();
let body = resp.text().await.map_err(map_reqwest)?;
if !status.is_success() {
return Err(MeowError::from_code(
InnerErrorCode::ResponseStatusError,
format!("upload HTTP {status}: {body}"),
));
}
Ok(body)
}
fn map_reqwest(e: reqwest::Error) -> MeowError {
MeowError::from_source(InnerErrorCode::HttpError, e.to_string(), e)
}
#[derive(Debug, Clone)]
pub struct DefaultStyleUpload {
pub category: String,
}
impl Default for DefaultStyleUpload {
fn default() -> Self {
Self {
category: String::new(),
}
}
}
const KEY_FILE_MD5: &str = "fileMd5";
const KEY_FILE_NAME: &str = "fileName";
const KEY_CATEGORY: &str = "category";
const KEY_TOTAL_SIZE: &str = "totalSize";
const KEY_OFFSET: &str = "offset";
const KEY_PART_SIZE: &str = "partSize";
const KEY_FILE: &str = "file";
const KEY_UPLOAD_CHUNK_DATA: &str = "upload_chunk_data";
#[derive(serde::Deserialize)]
struct DefaultUploadResp {
#[serde(rename = "fileId")]
file_id: Option<String>,
#[serde(rename = "nextByte")]
next_byte: Option<i64>,
}
#[async_trait]
impl BreakpointUpload for DefaultStyleUpload {
async fn prepare(&self, ctx: UploadPrepareCtx<'_>) -> Result<UploadResumeInfo, MeowError> {
let form = multipart::Form::new()
.text(KEY_FILE_MD5, ctx.task.file_sign().to_string())
.text(KEY_FILE_NAME, ctx.task.file_name().to_string())
.text(KEY_CATEGORY, self.category.clone())
.text(KEY_TOTAL_SIZE, ctx.task.total_size().to_string());
let req = UploadRequest::from_task(ctx.task, UploadBody::Multipart(form));
let body = send_upload_request(ctx.client, req).await?;
parse_default_upload_response(&body)
}
async fn upload_chunk(&self, ctx: UploadChunkCtx<'_>) -> Result<UploadResumeInfo, MeowError> {
let chunk_len = ctx.chunk.len();
let body = reqwest::Body::from(ctx.chunk.clone());
let part = multipart::Part::stream_with_length(body, chunk_len as u64)
.file_name(KEY_UPLOAD_CHUNK_DATA)
.mime_str("application/octet-stream")
.map_err(|e| MeowError::from_code(InnerErrorCode::HttpError, e.to_string()))?;
let form = multipart::Form::new()
.part(KEY_FILE, part)
.text(KEY_FILE_MD5, ctx.task.file_sign().to_string())
.text(KEY_FILE_NAME, ctx.task.file_name().to_string())
.text(KEY_CATEGORY, self.category.clone())
.text(KEY_OFFSET, ctx.offset.to_string())
.text(KEY_PART_SIZE, chunk_len.to_string())
.text(KEY_TOTAL_SIZE, ctx.task.total_size().to_string());
let req = UploadRequest::from_task(ctx.task, UploadBody::Multipart(form));
let body = send_upload_request(ctx.client, req).await?;
parse_default_upload_response(&body)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BreakpointDownloadHttpConfig {
pub range_accept: String,
}
impl Default for BreakpointDownloadHttpConfig {
fn default() -> Self {
Self {
range_accept: DEFAULT_RANGE_ACCEPT.to_string(),
}
}
}
pub(crate) const DEFAULT_RANGE_ACCEPT: &str = "application/octet-stream";
pub(crate) fn insert_header(map: &mut HeaderMap, name: &str, value: &str) {
if let (Ok(n), Ok(v)) = (
HeaderName::from_bytes(name.as_bytes()),
HeaderValue::from_str(value),
) {
map.insert(n, v);
}
}
#[derive(Debug, Clone, Default)]
pub struct StandardRangeDownload;
impl BreakpointDownload for StandardRangeDownload {}