Skip to main content

rusty_cat/
http_breakpoint.rs

1//! Breakpoint upload/download protocol plugins.
2//!
3//! - The executor handles scheduling, chunk I/O, retries, progress, and state.
4//! - Protocol plugins handle business-specific request/response semantics.
5
6use crate::error::{InnerErrorCode, MeowError};
7use crate::transfer_task::TransferTask;
8use async_trait::async_trait;
9
10pub use crate::download_trait::{BreakpointDownload, DownloadHeadCtx, DownloadRangeGetCtx};
11pub use crate::upload_trait::{BreakpointUpload, UploadChunkCtx, UploadPrepareCtx};
12use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
13use reqwest::multipart;
14use reqwest::Method;
15
16#[derive(Debug, Clone, Default)]
17pub struct UploadResumeInfo {
18    /// File ID returned by server when upload is already complete.
19    pub completed_file_id: Option<String>,
20    /// Suggested next byte offset (`nextByte`) from server.
21    ///
22    /// Range: `>= 0`.
23    pub next_byte: Option<u64>,
24}
25
26/// Upload HTTP request body payload.
27#[derive(Debug)]
28pub enum UploadBody {
29    Multipart(multipart::Form),
30    Binary(Vec<u8>),
31}
32
33/// Upload request description returned by protocol plugin.
34#[derive(Debug)]
35pub struct UploadRequest {
36    /// HTTP method for upload call.
37    pub method: Method,
38    /// Target request URL.
39    pub url: String,
40    /// Request headers.
41    pub headers: HeaderMap,
42    /// Body payload.
43    pub body: UploadBody,
44}
45
46impl UploadRequest {
47    /// Creates an upload request using URL/method/headers from task.
48    ///
49    /// # Examples
50    ///
51    /// ```no_run
52    /// use rusty_cat::api::{TransferTask, UploadBody, UploadRequest};
53    ///
54    /// fn make_request(task: &TransferTask, bytes: Vec<u8>) {
55    ///     let req = UploadRequest::from_task(task, UploadBody::Binary(bytes));
56    ///     let _ = req;
57    /// }
58    /// ```
59    pub fn from_task(task: &TransferTask, body: UploadBody) -> Self {
60        Self {
61            method: task.method(),
62            url: task.url().to_string(),
63            headers: task.headers().clone(),
64            body,
65        }
66    }
67}
68
69/// Parses default upload response JSON payload into [`UploadResumeInfo`].
70fn parse_default_upload_response(body: &str) -> Result<UploadResumeInfo, MeowError> {
71    if body.trim().is_empty() {
72        crate::meow_flow_log!(
73            "upload_protocol",
74            "empty upload response body, fallback default"
75        );
76        return Ok(UploadResumeInfo::default());
77    }
78    let v: DefaultUploadResp = serde_json::from_str(body).map_err(|e| {
79        crate::meow_flow_log!(
80            "upload_protocol",
81            "upload response parse failed: body_len={} err={}",
82            body.len(),
83            e
84        );
85        MeowError::from_code(
86            InnerErrorCode::ResponseParseError,
87            format!("upload response json: {e}, body: {body}"),
88        )
89    })?;
90    crate::meow_flow_log!(
91        "upload_protocol",
92        "upload response parsed: file_id_present={} next_byte={:?}",
93        v.file_id.is_some(),
94        v.next_byte
95    );
96    Ok(UploadResumeInfo {
97        completed_file_id: v.file_id,
98        next_byte: v.next_byte.map(|n| if n < 0 { 0u64 } else { n as u64 }),
99    })
100}
101
102/// Sends upload request and returns response body as string.
103async fn send_upload_request(
104    client: &reqwest::Client,
105    req: UploadRequest,
106) -> Result<String, MeowError> {
107    let mut builder = client.request(req.method, req.url).headers(req.headers);
108    builder = match req.body {
109        UploadBody::Multipart(form) => builder.multipart(form),
110        UploadBody::Binary(bytes) => builder.body(bytes),
111    };
112    let resp = builder.send().await.map_err(map_reqwest)?;
113    let status = resp.status();
114    let body = resp.text().await.map_err(map_reqwest)?;
115    if !status.is_success() {
116        return Err(MeowError::from_code(
117            InnerErrorCode::ResponseStatusError,
118            format!("upload HTTP {status}: {body}"),
119        ));
120    }
121    Ok(body)
122}
123
124/// Maps `reqwest` errors into SDK errors.
125fn map_reqwest(e: reqwest::Error) -> MeowError {
126    MeowError::from_source(InnerErrorCode::HttpError, e.to_string(), e)
127}
128
129#[derive(Debug, Clone)]
130pub struct DefaultStyleUpload {
131    /// Optional business category sent in multipart form.
132    pub category: String,
133}
134
135impl Default for DefaultStyleUpload {
136    fn default() -> Self {
137        Self {
138            category: String::new(),
139        }
140    }
141}
142
143/// Multipart field key: file md5/signature.
144const KEY_FILE_MD5: &str = "fileMd5";
145/// Multipart field key: file name.
146const KEY_FILE_NAME: &str = "fileName";
147/// Multipart field key: business category.
148const KEY_CATEGORY: &str = "category";
149/// Multipart field key: total file size.
150const KEY_TOTAL_SIZE: &str = "totalSize";
151/// Multipart field key: current chunk offset.
152const KEY_OFFSET: &str = "offset";
153/// Multipart field key: current chunk byte length.
154const KEY_PART_SIZE: &str = "partSize";
155/// Multipart field key: file binary part.
156const KEY_FILE: &str = "file";
157/// Multipart part file name used for chunk payload.
158const KEY_UPLOAD_CHUNK_DATA: &str = "upload_chunk_data";
159
160#[derive(serde::Deserialize)]
161struct DefaultUploadResp {
162    #[serde(rename = "fileId")]
163    file_id: Option<String>,
164    #[serde(rename = "nextByte")]
165    next_byte: Option<i64>,
166}
167
168#[async_trait]
169impl BreakpointUpload for DefaultStyleUpload {
170    /// Sends prepare request for default multipart upload protocol.
171    async fn prepare(&self, ctx: UploadPrepareCtx<'_>) -> Result<UploadResumeInfo, MeowError> {
172        let form = multipart::Form::new()
173            .text(KEY_FILE_MD5, ctx.task.file_sign().to_string())
174            .text(KEY_FILE_NAME, ctx.task.file_name().to_string())
175            .text(KEY_CATEGORY, self.category.clone())
176            .text(KEY_TOTAL_SIZE, ctx.task.total_size().to_string());
177        let req = UploadRequest::from_task(ctx.task, UploadBody::Multipart(form));
178        let body = send_upload_request(ctx.client, req).await?;
179        parse_default_upload_response(&body)
180    }
181
182    /// Sends one upload chunk for default multipart upload protocol.
183    ///
184    /// The chunk payload is forwarded to `reqwest` as a stream built from the
185    /// shared [`bytes::Bytes`] handle, so no per-chunk `Vec::to_vec` copy is
186    /// required. Retries clone the same `Bytes` (refcount only) instead of
187    /// re-allocating the chunk buffer.
188    async fn upload_chunk(&self, ctx: UploadChunkCtx<'_>) -> Result<UploadResumeInfo, MeowError> {
189        let chunk_len = ctx.chunk.len();
190        let body = reqwest::Body::from(ctx.chunk.clone());
191        let part = multipart::Part::stream_with_length(body, chunk_len as u64)
192            .file_name(KEY_UPLOAD_CHUNK_DATA)
193            .mime_str("application/octet-stream")
194            .map_err(|e| MeowError::from_code(InnerErrorCode::HttpError, e.to_string()))?;
195
196        let form = multipart::Form::new()
197            .part(KEY_FILE, part)
198            .text(KEY_FILE_MD5, ctx.task.file_sign().to_string())
199            .text(KEY_FILE_NAME, ctx.task.file_name().to_string())
200            .text(KEY_CATEGORY, self.category.clone())
201            .text(KEY_OFFSET, ctx.offset.to_string())
202            .text(KEY_PART_SIZE, chunk_len.to_string())
203            .text(KEY_TOTAL_SIZE, ctx.task.total_size().to_string());
204        let req = UploadRequest::from_task(ctx.task, UploadBody::Multipart(form));
205        let body = send_upload_request(ctx.client, req).await?;
206        parse_default_upload_response(&body)
207    }
208}
209
210/// HTTP behavior config for breakpoint range download.
211///
212/// This is usually provided by caller in task config. If missing, enqueue logic
213/// fills it from [`crate::meow_config::MeowConfig`].
214#[derive(Debug, Clone, PartialEq, Eq)]
215pub struct BreakpointDownloadHttpConfig {
216    /// `Accept` header used by range GET chunk requests.
217    ///
218    /// Typical value: `application/octet-stream`.
219    pub range_accept: String,
220}
221
222impl Default for BreakpointDownloadHttpConfig {
223    fn default() -> Self {
224        Self {
225            range_accept: DEFAULT_RANGE_ACCEPT.to_string(),
226        }
227    }
228}
229
230pub(crate) const DEFAULT_RANGE_ACCEPT: &str = "application/octet-stream";
231
232/// Inserts header pair into map when both name/value are valid.
233pub(crate) fn insert_header(map: &mut HeaderMap, name: &str, value: &str) {
234    if let (Ok(n), Ok(v)) = (
235        HeaderName::from_bytes(name.as_bytes()),
236        HeaderValue::from_str(value),
237    ) {
238        map.insert(n, v);
239    }
240}
241
242/// Default range download protocol.
243///
244/// It sets `Range` and `Accept` headers and reads total size from
245/// `Content-Length`.
246#[derive(Debug, Clone, Default)]
247pub struct StandardRangeDownload;
248
249impl BreakpointDownload for StandardRangeDownload {}