1use async_trait::async_trait;
6use crate::error::{InnerErrorCode, MeowError};
7use crate::transfer_task::TransferTask;
8
9pub use crate::download_trait::BreakpointDownload;
10pub use crate::upload_trait::BreakpointUpload;
11use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
12use reqwest::multipart;
13use reqwest::Method;
14
/// Resume information extracted from an upload response.
///
/// Both fields are optional; `UploadResumeInfo::default()` leaves them unset,
/// which is what the response parser in this file returns for an empty body.
#[derive(Clone, Debug, Default)]
pub struct UploadResumeInfo {
    /// File identifier taken from the response's `fileId` field, if present.
    pub completed_file_id: Option<String>,
    /// Byte offset taken from the response's `nextByte` field, if present.
    pub next_byte: Option<u64>,
}
22
23#[derive(Debug)]
25pub enum UploadBody {
26 Multipart(multipart::Form),
27 Binary(Vec<u8>),
28}
29
30#[derive(Debug)]
32pub struct UploadRequest {
33 pub method: Method,
34 pub url: String,
35 pub headers: HeaderMap,
36 pub body: UploadBody,
37}
38
39impl UploadRequest {
40 pub fn from_task(task: &TransferTask, body: UploadBody) -> Self {
41 Self {
42 method: task.method(),
43 url: task.url().to_string(),
44 headers: task.headers().clone(),
45 body,
46 }
47 }
48}
49
50
51fn parse_default_upload_response(body: &str) -> Result<UploadResumeInfo, MeowError> {
52 if body.trim().is_empty() {
53 crate::meow_flow_log!(
54 "upload_protocol",
55 "empty upload response body, fallback default"
56 );
57 return Ok(UploadResumeInfo::default());
58 }
59 let v: DefaultUploadResp = serde_json::from_str(body).map_err(|e| {
60 crate::meow_flow_log!(
61 "upload_protocol",
62 "upload response parse failed: body_len={} err={}",
63 body.len(),
64 e
65 );
66 MeowError::from_code(
67 InnerErrorCode::ResponseParseError,
68 format!("upload response json: {e}, body: {body}"),
69 )
70 })?;
71 crate::meow_flow_log!(
72 "upload_protocol",
73 "upload response parsed: file_id_present={} next_byte={:?}",
74 v.file_id.is_some(),
75 v.next_byte
76 );
77 Ok(UploadResumeInfo {
78 completed_file_id: v.file_id,
79 next_byte: v.next_byte.map(|n| if n < 0 { 0u64 } else { n as u64 }),
80 })
81}
82
83async fn send_upload_request(
84 client: &reqwest::Client,
85 req: UploadRequest,
86) -> Result<String, MeowError> {
87 let mut builder = client.request(req.method, req.url).headers(req.headers);
88 builder = match req.body {
89 UploadBody::Multipart(form) => builder.multipart(form),
90 UploadBody::Binary(bytes) => builder.body(bytes),
91 };
92 let resp = builder.send().await.map_err(map_reqwest)?;
93 let status = resp.status();
94 let body = resp.text().await.map_err(map_reqwest)?;
95 if !status.is_success() {
96 return Err(MeowError::from_code(
97 InnerErrorCode::ResponseStatusError,
98 format!("upload HTTP {status}: {body}"),
99 ));
100 }
101 Ok(body)
102}
103
104fn map_reqwest(e: reqwest::Error) -> MeowError {
105 MeowError::from_source(InnerErrorCode::HttpError, e.to_string(), e)
106}
107
/// Upload protocol implementation that talks to the server with multipart
/// forms (see its `BreakpointUpload` implementation in this file).
///
/// `Default` yields an empty `category`; it is derived because the previous
/// hand-written implementation was exactly what the derive generates.
#[derive(Debug, Clone, Default)]
pub struct DefaultStyleUpload {
    /// Value sent as the `category` form field on every request.
    pub category: String,
}
120
// Multipart form field names used by the default-style upload protocol.

/// Form field carrying the task's file signature.
const KEY_FILE_MD5: &str = "fileMd5";
/// Form field carrying the file name.
const KEY_FILE_NAME: &str = "fileName";
/// Form field carrying the upload category.
const KEY_CATEGORY: &str = "category";
/// Form field carrying the total file size in bytes.
const KEY_TOTAL_SIZE: &str = "totalSize";
/// Form field carrying the byte offset of the chunk being uploaded.
const KEY_OFFSET: &str = "offset";
/// Form field carrying the length of the chunk being uploaded.
const KEY_PART_SIZE: &str = "partSize";
/// Name of the form part that holds the chunk bytes.
const KEY_FILE: &str = "file";
/// File name attached to the chunk part.
const KEY_UPLOAD_CHUNK_DATA: &str = "upload_chunk_data";
129
130#[derive(serde::Deserialize)]
131struct DefaultUploadResp {
132 #[serde(rename = "fileId")]
133 file_id: Option<String>,
134 #[serde(rename = "nextByte")]
135 next_byte: Option<i64>,
136}
137
138#[async_trait]
139impl BreakpointUpload for DefaultStyleUpload {
140 async fn prepare(
141 &self,
142 client: &reqwest::Client,
143 task: &TransferTask,
144 _local_offset: u64,
145 ) -> Result<UploadResumeInfo, MeowError> {
146 let form = multipart::Form::new()
147 .text(KEY_FILE_MD5, task.file_sign().to_string())
148 .text(KEY_FILE_NAME, task.file_name().to_string())
149 .text(KEY_CATEGORY, self.category.clone())
150 .text(KEY_TOTAL_SIZE, task.total_size().to_string());
151 let req = UploadRequest::from_task(task, UploadBody::Multipart(form));
152 let body = send_upload_request(client, req).await?;
153 parse_default_upload_response(&body)
154 }
155
156 async fn upload_chunk(
157 &self,
158 client: &reqwest::Client,
159 task: &TransferTask,
160 chunk: &[u8],
161 offset: u64,
162 ) -> Result<UploadResumeInfo, MeowError> {
163 let part = multipart::Part::bytes(chunk.to_vec())
164 .file_name(KEY_UPLOAD_CHUNK_DATA)
165 .mime_str("application/octet-stream")
166 .map_err(|e| MeowError::from_code(InnerErrorCode::HttpError, e.to_string()))?;
167
168 let form = multipart::Form::new()
169 .part(KEY_FILE, part)
170 .text(KEY_FILE_MD5, task.file_sign().to_string())
171 .text(KEY_FILE_NAME, task.file_name().to_string())
172 .text(KEY_CATEGORY, self.category.clone())
173 .text(KEY_OFFSET, offset.to_string())
174 .text(KEY_PART_SIZE, chunk.len().to_string())
175 .text(KEY_TOTAL_SIZE, task.total_size().to_string());
176 let req = UploadRequest::from_task(task, UploadBody::Multipart(form));
177 let body = send_upload_request(client, req).await?;
178 parse_default_upload_response(&body)
179 }
180}
181
/// HTTP-level settings for breakpoint (resumable) downloads.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BreakpointDownloadHttpConfig {
    /// NOTE(review): presumably the `Accept` header value sent with ranged
    /// download requests — the consumer is not visible in this file; confirm.
    pub range_accept: String,
}
189
190impl Default for BreakpointDownloadHttpConfig {
191 fn default() -> Self {
192 Self {
193 range_accept: DEFAULT_RANGE_ACCEPT.to_string(),
194 }
195 }
196}
197
/// Default value for `BreakpointDownloadHttpConfig::range_accept`.
pub(crate) const DEFAULT_RANGE_ACCEPT: &str = "application/octet-stream";
199
200pub(crate) fn insert_header(map: &mut HeaderMap, name: &str, value: &str) {
201 if let (Ok(n), Ok(v)) = (
202 HeaderName::from_bytes(name.as_bytes()),
203 HeaderValue::from_str(value),
204 ) {
205 map.insert(n, v);
206 }
207}
208
/// Stateless marker type for the stock download behaviour; it implements
/// `BreakpointDownload` without overriding anything.
#[derive(Clone, Debug, Default)]
pub struct StandardRangeDownload;
212
213impl BreakpointDownload for StandardRangeDownload {}