Skip to main content

rusty_cat/
http_breakpoint.rs

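//! Resumable (breakpoint) upload/download protocol plugins:
//! - the executor only handles concurrency scheduling, chunk reads/writes, retries, progress,
//!   and pause/resume;
//! - a protocol plugin is only responsible for business semantics (initialization, chunk
//!   requests, complete/cancel, response parsing).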
4
5use async_trait::async_trait;
6use crate::error::{InnerErrorCode, MeowError};
7use crate::transfer_task::TransferTask;
8
9pub use crate::download_trait::{BreakpointDownload, DownloadHeadCtx, DownloadRangeGetCtx};
10pub use crate::upload_trait::{BreakpointUpload, UploadChunkCtx, UploadPrepareCtx};
11use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
12use reqwest::multipart;
13use reqwest::Method;
14
/// Resume state parsed from an upload response.
///
/// Both fields are optional; the derived [`Default`] leaves both as `None`.
#[derive(Clone, Debug, Default)]
pub struct UploadResumeInfo {
    /// File ID returned by the server when the upload has already completed.
    pub completed_file_id: Option<String>,
    /// Suggested offset of the next byte to send (`nextByte`).
    pub next_byte: Option<u64>,
}
22
23/// 一次上传 HTTP 请求体。
24#[derive(Debug)]
25pub enum UploadBody {
26    Multipart(multipart::Form),
27    Binary(Vec<u8>),
28}
29
30/// 协议插件返回给执行器的上传请求描述。
31#[derive(Debug)]
32pub struct UploadRequest {
33    pub method: Method,
34    pub url: String,
35    pub headers: HeaderMap,
36    pub body: UploadBody,
37}
38
39impl UploadRequest {
40    pub fn from_task(task: &TransferTask, body: UploadBody) -> Self {
41        Self {
42            method: task.method(),
43            url: task.url().to_string(),
44            headers: task.headers().clone(),
45            body,
46        }
47    }
48}
49
50
51fn parse_default_upload_response(body: &str) -> Result<UploadResumeInfo, MeowError> {
52    if body.trim().is_empty() {
53        crate::meow_flow_log!(
54            "upload_protocol",
55            "empty upload response body, fallback default"
56        );
57        return Ok(UploadResumeInfo::default());
58    }
59    let v: DefaultUploadResp = serde_json::from_str(body).map_err(|e| {
60        crate::meow_flow_log!(
61            "upload_protocol",
62            "upload response parse failed: body_len={} err={}",
63            body.len(),
64            e
65        );
66        MeowError::from_code(
67            InnerErrorCode::ResponseParseError,
68            format!("upload response json: {e}, body: {body}"),
69        )
70    })?;
71    crate::meow_flow_log!(
72        "upload_protocol",
73        "upload response parsed: file_id_present={} next_byte={:?}",
74        v.file_id.is_some(),
75        v.next_byte
76    );
77    Ok(UploadResumeInfo {
78        completed_file_id: v.file_id,
79        next_byte: v.next_byte.map(|n| if n < 0 { 0u64 } else { n as u64 }),
80    })
81}
82
83async fn send_upload_request(
84    client: &reqwest::Client,
85    req: UploadRequest,
86) -> Result<String, MeowError> {
87    let mut builder = client.request(req.method, req.url).headers(req.headers);
88    builder = match req.body {
89        UploadBody::Multipart(form) => builder.multipart(form),
90        UploadBody::Binary(bytes) => builder.body(bytes),
91    };
92    let resp = builder.send().await.map_err(map_reqwest)?;
93    let status = resp.status();
94    let body = resp.text().await.map_err(map_reqwest)?;
95    if !status.is_success() {
96        return Err(MeowError::from_code(
97            InnerErrorCode::ResponseStatusError,
98            format!("upload HTTP {status}: {body}"),
99        ));
100    }
101    Ok(body)
102}
103
104fn map_reqwest(e: reqwest::Error) -> MeowError {
105    MeowError::from_source(InnerErrorCode::HttpError, e.to_string(), e)
106}
107
/// Upload plugin for the default-style protocol.
///
/// The derived [`Default`] produces an empty `category`, exactly as the previous hand-written
/// implementation did.
#[derive(Debug, Clone, Default)]
pub struct DefaultStyleUpload {
    /// Value sent as the `category` form field with each request.
    pub category: String,
}
120
// Multipart form field names used by the default-style upload protocol.

/// Form field carrying the task's file signature.
const KEY_FILE_MD5: &str = "fileMd5";
/// Form field carrying the file name.
const KEY_FILE_NAME: &str = "fileName";
/// Form field carrying the upload category.
const KEY_CATEGORY: &str = "category";
/// Form field carrying the total size of the file.
const KEY_TOTAL_SIZE: &str = "totalSize";
/// Form field carrying the offset of the chunk being sent.
const KEY_OFFSET: &str = "offset";
/// Form field carrying the length of the chunk being sent.
const KEY_PART_SIZE: &str = "partSize";
/// Name of the multipart part that holds the chunk bytes.
const KEY_FILE: &str = "file";
/// File name attached to the chunk part.
const KEY_UPLOAD_CHUNK_DATA: &str = "upload_chunk_data";
129
130#[derive(serde::Deserialize)]
131struct DefaultUploadResp {
132    #[serde(rename = "fileId")]
133    file_id: Option<String>,
134    #[serde(rename = "nextByte")]
135    next_byte: Option<i64>,
136}
137
138#[async_trait]
139impl BreakpointUpload for DefaultStyleUpload {
140    async fn prepare(&self, ctx: UploadPrepareCtx<'_>) -> Result<UploadResumeInfo, MeowError> {
141        let form = multipart::Form::new()
142            .text(KEY_FILE_MD5, ctx.task.file_sign().to_string())
143            .text(KEY_FILE_NAME, ctx.task.file_name().to_string())
144            .text(KEY_CATEGORY, self.category.clone())
145            .text(KEY_TOTAL_SIZE, ctx.task.total_size().to_string());
146        let req = UploadRequest::from_task(ctx.task, UploadBody::Multipart(form));
147        let body = send_upload_request(ctx.client, req).await?;
148        parse_default_upload_response(&body)
149    }
150
151    async fn upload_chunk(&self, ctx: UploadChunkCtx<'_>) -> Result<UploadResumeInfo, MeowError> {
152        let part = multipart::Part::bytes(ctx.chunk.to_vec())
153            .file_name(KEY_UPLOAD_CHUNK_DATA)
154            .mime_str("application/octet-stream")
155            .map_err(|e| MeowError::from_code(InnerErrorCode::HttpError, e.to_string()))?;
156
157        let form = multipart::Form::new()
158            .part(KEY_FILE, part)
159            .text(KEY_FILE_MD5, ctx.task.file_sign().to_string())
160            .text(KEY_FILE_NAME, ctx.task.file_name().to_string())
161            .text(KEY_CATEGORY, self.category.clone())
162            .text(KEY_OFFSET, ctx.offset.to_string())
163            .text(KEY_PART_SIZE, ctx.chunk.len().to_string())
164            .text(KEY_TOTAL_SIZE, ctx.task.total_size().to_string());
165        let req = UploadRequest::from_task(ctx.task, UploadBody::Multipart(form));
166        let body = send_upload_request(ctx.client, req).await?;
167        parse_default_upload_response(&body)
168    }
169}
170
/// Configurable HTTP options for breakpoint downloads (the `Accept` header of range GETs,
/// HEAD behaviour, and so on).
///
/// Usually supplied by the caller on a [`TransferTask`]; when unset,
/// [`crate::meow_config::MeowConfig`] fills it in at enqueue time.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct BreakpointDownloadHttpConfig {
    /// `Accept` header used for chunk GET requests (those carrying `Range`).
    pub range_accept: String,
}
178
179impl Default for BreakpointDownloadHttpConfig {
180    fn default() -> Self {
181        Self {
182            range_accept: DEFAULT_RANGE_ACCEPT.to_string(),
183        }
184    }
185}
186
/// `Accept` value used for range GETs when the default download config is in effect.
pub(crate) const DEFAULT_RANGE_ACCEPT: &str = "application/octet-stream";
188
189pub(crate) fn insert_header(map: &mut HeaderMap, name: &str, value: &str) {
190    if let (Ok(n), Ok(v)) = (
191        HeaderName::from_bytes(name.as_bytes()),
192        HeaderValue::from_str(value),
193    ) {
194        map.insert(n, v);
195    }
196}
197
198/// 默认 Range 下载:仅设置 Range / Accept,总长度取自 `Content-Length`。
199#[derive(Debug, Clone, Default)]
200pub struct StandardRangeDownload;
201
202impl BreakpointDownload for StandardRangeDownload {}