rusty_cat/
http_breakpoint.rs1use crate::error::{InnerErrorCode, MeowError};
7use crate::transfer_task::TransferTask;
8use async_trait::async_trait;
9
10pub use crate::download_trait::{BreakpointDownload, DownloadHeadCtx, DownloadRangeGetCtx};
11pub use crate::upload_trait::{BreakpointUpload, UploadChunkCtx, UploadPrepareCtx};
12use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
13use reqwest::multipart;
14use reqwest::Method;
15
16#[derive(Debug, Clone, Default)]
17pub struct UploadResumeInfo {
18 pub completed_file_id: Option<String>,
20 pub next_byte: Option<u64>,
24}
25
26#[derive(Debug)]
28pub enum UploadBody {
29 Multipart(multipart::Form),
30 Binary(Vec<u8>),
31}
32
33#[derive(Debug)]
35pub struct UploadRequest {
36 pub method: Method,
38 pub url: String,
40 pub headers: HeaderMap,
42 pub body: UploadBody,
44}
45
46impl UploadRequest {
47 pub fn from_task(task: &TransferTask, body: UploadBody) -> Self {
60 Self {
61 method: task.method(),
62 url: task.url().to_string(),
63 headers: task.headers().clone(),
64 body,
65 }
66 }
67}
68
69fn parse_default_upload_response(body: &str) -> Result<UploadResumeInfo, MeowError> {
71 if body.trim().is_empty() {
72 crate::meow_flow_log!(
73 "upload_protocol",
74 "empty upload response body, fallback default"
75 );
76 return Ok(UploadResumeInfo::default());
77 }
78 let v: DefaultUploadResp = serde_json::from_str(body).map_err(|e| {
79 crate::meow_flow_log!(
80 "upload_protocol",
81 "upload response parse failed: body_len={} err={}",
82 body.len(),
83 e
84 );
85 MeowError::from_code(
86 InnerErrorCode::ResponseParseError,
87 format!("upload response json: {e}, body: {body}"),
88 )
89 })?;
90 crate::meow_flow_log!(
91 "upload_protocol",
92 "upload response parsed: file_id_present={} next_byte={:?}",
93 v.file_id.is_some(),
94 v.next_byte
95 );
96 Ok(UploadResumeInfo {
97 completed_file_id: v.file_id,
98 next_byte: v.next_byte.map(|n| if n < 0 { 0u64 } else { n as u64 }),
99 })
100}
101
102async fn send_upload_request(
104 client: &reqwest::Client,
105 req: UploadRequest,
106) -> Result<String, MeowError> {
107 let mut builder = client.request(req.method, req.url).headers(req.headers);
108 builder = match req.body {
109 UploadBody::Multipart(form) => builder.multipart(form),
110 UploadBody::Binary(bytes) => builder.body(bytes),
111 };
112 let resp = builder.send().await.map_err(map_reqwest)?;
113 let status = resp.status();
114 let body = resp.text().await.map_err(map_reqwest)?;
115 if !status.is_success() {
116 return Err(MeowError::from_code(
117 InnerErrorCode::ResponseStatusError,
118 format!("upload HTTP {status}: {body}"),
119 ));
120 }
121 Ok(body)
122}
123
124fn map_reqwest(e: reqwest::Error) -> MeowError {
126 MeowError::from_source(InnerErrorCode::HttpError, e.to_string(), e)
127}
128
129#[derive(Debug, Clone)]
130pub struct DefaultStyleUpload {
131 pub category: String,
133}
134
135impl Default for DefaultStyleUpload {
136 fn default() -> Self {
137 Self {
138 category: String::new(),
139 }
140 }
141}
142
143const KEY_FILE_MD5: &str = "fileMd5";
145const KEY_FILE_NAME: &str = "fileName";
147const KEY_CATEGORY: &str = "category";
149const KEY_TOTAL_SIZE: &str = "totalSize";
151const KEY_OFFSET: &str = "offset";
153const KEY_PART_SIZE: &str = "partSize";
155const KEY_FILE: &str = "file";
157const KEY_UPLOAD_CHUNK_DATA: &str = "upload_chunk_data";
159
160#[derive(serde::Deserialize)]
161struct DefaultUploadResp {
162 #[serde(rename = "fileId")]
163 file_id: Option<String>,
164 #[serde(rename = "nextByte")]
165 next_byte: Option<i64>,
166}
167
168#[async_trait]
169impl BreakpointUpload for DefaultStyleUpload {
170 async fn prepare(&self, ctx: UploadPrepareCtx<'_>) -> Result<UploadResumeInfo, MeowError> {
172 let form = multipart::Form::new()
173 .text(KEY_FILE_MD5, ctx.task.file_sign().to_string())
174 .text(KEY_FILE_NAME, ctx.task.file_name().to_string())
175 .text(KEY_CATEGORY, self.category.clone())
176 .text(KEY_TOTAL_SIZE, ctx.task.total_size().to_string());
177 let req = UploadRequest::from_task(ctx.task, UploadBody::Multipart(form));
178 let body = send_upload_request(ctx.client, req).await?;
179 parse_default_upload_response(&body)
180 }
181
182 async fn upload_chunk(&self, ctx: UploadChunkCtx<'_>) -> Result<UploadResumeInfo, MeowError> {
189 let chunk_len = ctx.chunk.len();
190 let body = reqwest::Body::from(ctx.chunk.clone());
191 let part = multipart::Part::stream_with_length(body, chunk_len as u64)
192 .file_name(KEY_UPLOAD_CHUNK_DATA)
193 .mime_str("application/octet-stream")
194 .map_err(|e| MeowError::from_code(InnerErrorCode::HttpError, e.to_string()))?;
195
196 let form = multipart::Form::new()
197 .part(KEY_FILE, part)
198 .text(KEY_FILE_MD5, ctx.task.file_sign().to_string())
199 .text(KEY_FILE_NAME, ctx.task.file_name().to_string())
200 .text(KEY_CATEGORY, self.category.clone())
201 .text(KEY_OFFSET, ctx.offset.to_string())
202 .text(KEY_PART_SIZE, chunk_len.to_string())
203 .text(KEY_TOTAL_SIZE, ctx.task.total_size().to_string());
204 let req = UploadRequest::from_task(ctx.task, UploadBody::Multipart(form));
205 let body = send_upload_request(ctx.client, req).await?;
206 parse_default_upload_response(&body)
207 }
208}
209
210#[derive(Debug, Clone, PartialEq, Eq)]
215pub struct BreakpointDownloadHttpConfig {
216 pub range_accept: String,
220}
221
222impl Default for BreakpointDownloadHttpConfig {
223 fn default() -> Self {
224 Self {
225 range_accept: DEFAULT_RANGE_ACCEPT.to_string(),
226 }
227 }
228}
229
230pub(crate) const DEFAULT_RANGE_ACCEPT: &str = "application/octet-stream";
231
232pub(crate) fn insert_header(map: &mut HeaderMap, name: &str, value: &str) {
234 if let (Ok(n), Ok(v)) = (
235 HeaderName::from_bytes(name.as_bytes()),
236 HeaderValue::from_str(value),
237 ) {
238 map.insert(n, v);
239 }
240}
241
242#[derive(Debug, Clone, Default)]
247pub struct StandardRangeDownload;
248
249impl BreakpointDownload for StandardRangeDownload {}