Skip to main content

rusty_cat/
http_breakpoint.rs

1//! Breakpoint upload/download protocol plugins.
2//!
3//! - The executor handles scheduling, chunk I/O, retries, progress, and state.
4//! - Protocol plugins handle business-specific request/response semantics.
5
6use crate::error::{InnerErrorCode, MeowError};
7use crate::transfer_task::TransferTask;
8use async_trait::async_trait;
9
10pub use crate::download_trait::{BreakpointDownload, DownloadHeadCtx, DownloadRangeGetCtx};
11pub use crate::upload_trait::{BreakpointUpload, UploadChunkCtx, UploadPrepareCtx};
12use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
13use reqwest::multipart;
14use reqwest::Method;
15
/// Resume state returned by an upload protocol plugin.
///
/// Every field is optional; `Default` produces a value with all fields `None`.
#[derive(Debug, Clone, Default)]
pub struct UploadResumeInfo {
    /// File ID handed back by the server once the upload is already complete.
    pub completed_file_id: Option<String>,
    /// Next byte offset suggested by the server (`nextByte`).
    ///
    /// Stored unsigned, so the value is always `>= 0`.
    pub next_byte: Option<u64>,
    /// Provider-defined, opaque identifier of the in-flight
    /// multipart/resumable upload session (an Aliyun OSS multipart `UploadId`,
    /// for example).
    ///
    /// It is **not** a credential and may safely be persisted. Keeping it
    /// around allows a caller to abort an orphaned session out-of-band after a
    /// crash, so uncommitted parts/blocks stop accruing storage cost.
    /// Providers without a separate session id (Azure Block Blob, for
    /// instance, whose resume state is the uncommitted block list keyed by the
    /// blob URL) leave this as `None`.
    ///
    /// Note: the bundled executor reads only
    /// [`UploadResumeInfo::completed_file_id`] and
    /// [`UploadResumeInfo::next_byte`]; this id is not forwarded to the
    /// progress pipeline. To persist it, either keep the protocol instance and
    /// query its accessor (for OSS,
    /// [`crate::aliyun_oss_direct::AliOssDirectUpload::current_upload_id`]),
    /// or consume this value from a custom executor.
    pub provider_upload_id: Option<String>,
}
40
41/// Upload HTTP request body payload.
42#[derive(Debug)]
43pub enum UploadBody {
44    Multipart(multipart::Form),
45    Binary(Vec<u8>),
46}
47
48/// Upload request description returned by protocol plugin.
49#[derive(Debug)]
50pub struct UploadRequest {
51    /// HTTP method for upload call.
52    pub method: Method,
53    /// Target request URL.
54    pub url: String,
55    /// Request headers.
56    pub headers: HeaderMap,
57    /// Body payload.
58    pub body: UploadBody,
59}
60
61impl UploadRequest {
62    /// Creates an upload request using URL/method/headers from task.
63    ///
64    /// # Examples
65    ///
66    /// ```no_run
67    /// use rusty_cat::api::{TransferTask, UploadBody, UploadRequest};
68    ///
69    /// fn make_request(task: &TransferTask, bytes: Vec<u8>) {
70    ///     let req = UploadRequest::from_task(task, UploadBody::Binary(bytes));
71    ///     let _ = req;
72    /// }
73    /// ```
74    pub fn from_task(task: &TransferTask, body: UploadBody) -> Self {
75        Self {
76            method: task.method(),
77            url: task.url().to_string(),
78            headers: task.headers().clone(),
79            body,
80        }
81    }
82}
83
84/// Parses default upload response JSON payload into [`UploadResumeInfo`].
85fn parse_default_upload_response(body: &str) -> Result<UploadResumeInfo, MeowError> {
86    if body.trim().is_empty() {
87        crate::meow_flow_log!(
88            "upload_protocol",
89            "empty upload response body, fallback default"
90        );
91        return Ok(UploadResumeInfo::default());
92    }
93    let v: DefaultUploadResp = serde_json::from_str(body).map_err(|e| {
94        crate::meow_flow_log!(
95            "upload_protocol",
96            "upload response parse failed: body_len={} err={}",
97            body.len(),
98            e
99        );
100        MeowError::from_code(
101            InnerErrorCode::ResponseParseError,
102            format!("upload response json: {e}, body: {body}"),
103        )
104    })?;
105    crate::meow_flow_log!(
106        "upload_protocol",
107        "upload response parsed: file_id_present={} next_byte={:?}",
108        v.file_id.is_some(),
109        v.next_byte
110    );
111    Ok(UploadResumeInfo {
112        completed_file_id: v.file_id,
113        next_byte: v.next_byte.map(|n| if n < 0 { 0u64 } else { n as u64 }),
114        // The default multipart protocol carries no provider-side session id.
115        provider_upload_id: None,
116    })
117}
118
119/// Sends upload request and returns response body as string.
120async fn send_upload_request(
121    client: &reqwest::Client,
122    req: UploadRequest,
123) -> Result<String, MeowError> {
124    let mut builder = client.request(req.method, req.url).headers(req.headers);
125    builder = match req.body {
126        UploadBody::Multipart(form) => builder.multipart(form),
127        UploadBody::Binary(bytes) => builder.body(bytes),
128    };
129    let resp = builder.send().await.map_err(map_reqwest)?;
130    let status = resp.status();
131    let body = resp.text().await.map_err(map_reqwest)?;
132    if !status.is_success() {
133        return Err(MeowError::from_code(
134            InnerErrorCode::ResponseStatusError,
135            format!("upload HTTP {status}: {body}"),
136        )
137        .with_http_status(status.as_u16()));
138    }
139    Ok(body)
140}
141
142/// Maps `reqwest` errors into SDK errors.
143fn map_reqwest(e: reqwest::Error) -> MeowError {
144    MeowError::from_source(InnerErrorCode::HttpError, e.to_string(), e)
145}
146
/// Upload protocol plugin for the default multipart style.
///
/// `Default` yields an instance whose `category` is the empty string.
#[derive(Debug, Clone, Default)]
pub struct DefaultStyleUpload {
    /// Optional business category sent in multipart form.
    ///
    /// Defaults to an empty string.
    pub category: String,
}
160
/// Form field name carrying the file md5/signature.
const KEY_FILE_MD5: &str = "fileMd5";
/// Form field name carrying the file name.
const KEY_FILE_NAME: &str = "fileName";
/// Form field name carrying the business category.
const KEY_CATEGORY: &str = "category";
/// Form field name carrying the total file size.
const KEY_TOTAL_SIZE: &str = "totalSize";
/// Form field name carrying the offset of the current chunk.
const KEY_OFFSET: &str = "offset";
/// Form field name carrying the byte length of the current chunk.
const KEY_PART_SIZE: &str = "partSize";
/// Form field name of the binary file part.
const KEY_FILE: &str = "file";
/// File name assigned to the multipart part that holds the chunk payload.
const KEY_UPLOAD_CHUNK_DATA: &str = "upload_chunk_data";
177
178#[derive(serde::Deserialize)]
179struct DefaultUploadResp {
180    #[serde(rename = "fileId")]
181    file_id: Option<String>,
182    #[serde(rename = "nextByte")]
183    next_byte: Option<i64>,
184}
185
186#[async_trait]
187impl BreakpointUpload for DefaultStyleUpload {
188    /// Sends prepare request for default multipart upload protocol.
189    async fn prepare(&self, ctx: UploadPrepareCtx<'_>) -> Result<UploadResumeInfo, MeowError> {
190        let form = multipart::Form::new()
191            .text(KEY_FILE_MD5, ctx.task.file_sign().to_string())
192            .text(KEY_FILE_NAME, ctx.task.file_name().to_string())
193            .text(KEY_CATEGORY, self.category.clone())
194            .text(KEY_TOTAL_SIZE, ctx.task.total_size().to_string());
195        let req = UploadRequest::from_task(ctx.task, UploadBody::Multipart(form));
196        let body = send_upload_request(ctx.client, req).await?;
197        parse_default_upload_response(&body)
198    }
199
200    /// Sends one upload chunk for default multipart upload protocol.
201    ///
202    /// The chunk payload is forwarded to `reqwest` as a stream built from the
203    /// shared [`bytes::Bytes`] handle, so no per-chunk `Vec::to_vec` copy is
204    /// required. Retries clone the same `Bytes` (refcount only) instead of
205    /// re-allocating the chunk buffer.
206    async fn upload_chunk(&self, ctx: UploadChunkCtx<'_>) -> Result<UploadResumeInfo, MeowError> {
207        let chunk_len = ctx.chunk.len();
208        let body = reqwest::Body::from(ctx.chunk.clone());
209        let part = multipart::Part::stream_with_length(body, chunk_len as u64)
210            .file_name(KEY_UPLOAD_CHUNK_DATA)
211            .mime_str("application/octet-stream")
212            .map_err(|e| MeowError::from_code(InnerErrorCode::HttpError, e.to_string()))?;
213
214        let form = multipart::Form::new()
215            .part(KEY_FILE, part)
216            .text(KEY_FILE_MD5, ctx.task.file_sign().to_string())
217            .text(KEY_FILE_NAME, ctx.task.file_name().to_string())
218            .text(KEY_CATEGORY, self.category.clone())
219            .text(KEY_OFFSET, ctx.offset.to_string())
220            .text(KEY_PART_SIZE, chunk_len.to_string())
221            .text(KEY_TOTAL_SIZE, ctx.task.total_size().to_string());
222        let req = UploadRequest::from_task(ctx.task, UploadBody::Multipart(form));
223        let body = send_upload_request(ctx.client, req).await?;
224        parse_default_upload_response(&body)
225    }
226}
227
/// HTTP settings that apply to breakpoint range downloads.
///
/// Callers normally supply this through the task config. When it is absent,
/// enqueue logic populates it from [`crate::meow_config::MeowConfig`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BreakpointDownloadHttpConfig {
    /// Value of the `Accept` header sent with range GET chunk requests.
    ///
    /// Typical value: `application/octet-stream`.
    pub range_accept: String,
}

impl Default for BreakpointDownloadHttpConfig {
    /// Uses [`DEFAULT_RANGE_ACCEPT`] as the `Accept` value.
    fn default() -> Self {
        let range_accept = String::from(DEFAULT_RANGE_ACCEPT);
        Self { range_accept }
    }
}

/// `Accept` value used when a download config is built via `Default`.
pub(crate) const DEFAULT_RANGE_ACCEPT: &str = "application/octet-stream";
249
250/// Inserts header pair into map when both name/value are valid.
251pub(crate) fn insert_header(map: &mut HeaderMap, name: &str, value: &str) {
252    if let (Ok(n), Ok(v)) = (
253        HeaderName::from_bytes(name.as_bytes()),
254        HeaderValue::from_str(value),
255    ) {
256        map.insert(n, v);
257    }
258}
259
260/// Default range download protocol.
261///
262/// It sets `Range` and `Accept` headers and reads total size from
263/// `Content-Length`.
264#[derive(Debug, Clone, Default)]
265pub struct StandardRangeDownload;
266
267impl BreakpointDownload for StandardRangeDownload {}
268
#[cfg(test)]
mod tests {
    use super::{BreakpointUpload, DefaultStyleUpload};

    /// The default multipart protocol relies on the server's single-cursor
    /// `nextByte`, so it must not claim that out-of-order parts are safe.
    #[test]
    fn default_style_upload_is_serial_only() {
        let protocol = DefaultStyleUpload::default();
        let parallel_ok = protocol.supports_parallel_parts();
        assert!(!parallel_ok);
    }
}