1use crate::error::{InnerErrorCode, MeowError};
7use crate::transfer_task::TransferTask;
8use async_trait::async_trait;
9
10pub use crate::download_trait::{BreakpointDownload, DownloadHeadCtx, DownloadRangeGetCtx};
11pub use crate::upload_trait::{BreakpointUpload, UploadChunkCtx, UploadPrepareCtx};
12use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
13use reqwest::multipart;
14use reqwest::Method;
15
16#[derive(Debug, Clone, Default)]
17pub struct UploadResumeInfo {
18 pub completed_file_id: Option<String>,
20 pub next_byte: Option<u64>,
24 pub provider_upload_id: Option<String>,
39}
40
41#[derive(Debug)]
43pub enum UploadBody {
44 Multipart(multipart::Form),
45 Binary(Vec<u8>),
46}
47
48#[derive(Debug)]
50pub struct UploadRequest {
51 pub method: Method,
53 pub url: String,
55 pub headers: HeaderMap,
57 pub body: UploadBody,
59}
60
61impl UploadRequest {
62 pub fn from_task(task: &TransferTask, body: UploadBody) -> Self {
75 Self {
76 method: task.method(),
77 url: task.url().to_string(),
78 headers: task.headers().clone(),
79 body,
80 }
81 }
82}
83
84fn parse_default_upload_response(body: &str) -> Result<UploadResumeInfo, MeowError> {
86 if body.trim().is_empty() {
87 crate::meow_flow_log!(
88 "upload_protocol",
89 "empty upload response body, fallback default"
90 );
91 return Ok(UploadResumeInfo::default());
92 }
93 let v: DefaultUploadResp = serde_json::from_str(body).map_err(|e| {
94 crate::meow_flow_log!(
95 "upload_protocol",
96 "upload response parse failed: body_len={} err={}",
97 body.len(),
98 e
99 );
100 MeowError::from_code(
101 InnerErrorCode::ResponseParseError,
102 format!("upload response json: {e}, body: {body}"),
103 )
104 })?;
105 crate::meow_flow_log!(
106 "upload_protocol",
107 "upload response parsed: file_id_present={} next_byte={:?}",
108 v.file_id.is_some(),
109 v.next_byte
110 );
111 Ok(UploadResumeInfo {
112 completed_file_id: v.file_id,
113 next_byte: v.next_byte.map(|n| if n < 0 { 0u64 } else { n as u64 }),
114 provider_upload_id: None,
116 })
117}
118
119async fn send_upload_request(
121 client: &reqwest::Client,
122 req: UploadRequest,
123) -> Result<String, MeowError> {
124 let mut builder = client.request(req.method, req.url).headers(req.headers);
125 builder = match req.body {
126 UploadBody::Multipart(form) => builder.multipart(form),
127 UploadBody::Binary(bytes) => builder.body(bytes),
128 };
129 let resp = builder.send().await.map_err(map_reqwest)?;
130 let status = resp.status();
131 let body = resp.text().await.map_err(map_reqwest)?;
132 if !status.is_success() {
133 return Err(MeowError::from_code(
134 InnerErrorCode::ResponseStatusError,
135 format!("upload HTTP {status}: {body}"),
136 )
137 .with_http_status(status.as_u16()));
138 }
139 Ok(body)
140}
141
142fn map_reqwest(e: reqwest::Error) -> MeowError {
144 MeowError::from_source(InnerErrorCode::HttpError, e.to_string(), e)
145}
146
147#[derive(Debug, Clone)]
148pub struct DefaultStyleUpload {
149 pub category: String,
151}
152
153impl Default for DefaultStyleUpload {
154 fn default() -> Self {
155 Self {
156 category: String::new(),
157 }
158 }
159}
160
161const KEY_FILE_MD5: &str = "fileMd5";
163const KEY_FILE_NAME: &str = "fileName";
165const KEY_CATEGORY: &str = "category";
167const KEY_TOTAL_SIZE: &str = "totalSize";
169const KEY_OFFSET: &str = "offset";
171const KEY_PART_SIZE: &str = "partSize";
173const KEY_FILE: &str = "file";
175const KEY_UPLOAD_CHUNK_DATA: &str = "upload_chunk_data";
177
178#[derive(serde::Deserialize)]
179struct DefaultUploadResp {
180 #[serde(rename = "fileId")]
181 file_id: Option<String>,
182 #[serde(rename = "nextByte")]
183 next_byte: Option<i64>,
184}
185
186#[async_trait]
187impl BreakpointUpload for DefaultStyleUpload {
188 async fn prepare(&self, ctx: UploadPrepareCtx<'_>) -> Result<UploadResumeInfo, MeowError> {
190 let form = multipart::Form::new()
191 .text(KEY_FILE_MD5, ctx.task.file_sign().to_string())
192 .text(KEY_FILE_NAME, ctx.task.file_name().to_string())
193 .text(KEY_CATEGORY, self.category.clone())
194 .text(KEY_TOTAL_SIZE, ctx.task.total_size().to_string());
195 let req = UploadRequest::from_task(ctx.task, UploadBody::Multipart(form));
196 let body = send_upload_request(ctx.client, req).await?;
197 parse_default_upload_response(&body)
198 }
199
200 async fn upload_chunk(&self, ctx: UploadChunkCtx<'_>) -> Result<UploadResumeInfo, MeowError> {
207 let chunk_len = ctx.chunk.len();
208 let body = reqwest::Body::from(ctx.chunk.clone());
209 let part = multipart::Part::stream_with_length(body, chunk_len as u64)
210 .file_name(KEY_UPLOAD_CHUNK_DATA)
211 .mime_str("application/octet-stream")
212 .map_err(|e| MeowError::from_code(InnerErrorCode::HttpError, e.to_string()))?;
213
214 let form = multipart::Form::new()
215 .part(KEY_FILE, part)
216 .text(KEY_FILE_MD5, ctx.task.file_sign().to_string())
217 .text(KEY_FILE_NAME, ctx.task.file_name().to_string())
218 .text(KEY_CATEGORY, self.category.clone())
219 .text(KEY_OFFSET, ctx.offset.to_string())
220 .text(KEY_PART_SIZE, chunk_len.to_string())
221 .text(KEY_TOTAL_SIZE, ctx.task.total_size().to_string());
222 let req = UploadRequest::from_task(ctx.task, UploadBody::Multipart(form));
223 let body = send_upload_request(ctx.client, req).await?;
224 parse_default_upload_response(&body)
225 }
226}
227
228#[derive(Debug, Clone, PartialEq, Eq)]
233pub struct BreakpointDownloadHttpConfig {
234 pub range_accept: String,
238}
239
240impl Default for BreakpointDownloadHttpConfig {
241 fn default() -> Self {
242 Self {
243 range_accept: DEFAULT_RANGE_ACCEPT.to_string(),
244 }
245 }
246}
247
248pub(crate) const DEFAULT_RANGE_ACCEPT: &str = "application/octet-stream";
249
250pub(crate) fn insert_header(map: &mut HeaderMap, name: &str, value: &str) {
252 if let (Ok(n), Ok(v)) = (
253 HeaderName::from_bytes(name.as_bytes()),
254 HeaderValue::from_str(value),
255 ) {
256 map.insert(n, v);
257 }
258}
259
260#[derive(Debug, Clone, Default)]
265pub struct StandardRangeDownload;
266
267impl BreakpointDownload for StandardRangeDownload {}
268
269#[cfg(test)]
270mod tests {
271 use super::{BreakpointUpload, DefaultStyleUpload};
272
273 #[test]
274 fn default_style_upload_is_serial_only() {
275 assert!(!DefaultStyleUpload::default().supports_parallel_parts());
278 }
279}