1use async_trait::async_trait;
2use reqwest::header::{CONTENT_LENGTH, ETAG};
3use reqwest::{Client, Method};
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::time::sleep;
7
8use crate::chunk_outcome::ChunkOutcome;
9use crate::direction::Direction;
10use crate::error::{InnerErrorCode, MeowError};
11use crate::http_breakpoint::{
12 BreakpointDownload, BreakpointUpload, DefaultStyleUpload, DownloadHeadCtx,
13 StandardRangeDownload, UploadPrepareCtx,
14};
15use crate::prepare_outcome::PrepareOutcome;
16use crate::transfer_executor_trait::TransferTrait;
17use crate::transfer_task::TransferTask;
18
19use super::default_http_transfer_chunks::{download_one_chunk, map_reqwest, upload_one_chunk};
20
21pub(crate) fn default_breakpoint_arcs() -> (
23 Arc<dyn BreakpointUpload + Send + Sync>,
24 Arc<dyn BreakpointDownload + Send + Sync>,
25) {
26 (
27 Arc::new(DefaultStyleUpload::default()),
28 Arc::new(StandardRangeDownload::default()),
29 )
30}
31
32pub struct DefaultHttpTransfer {
34 client: reqwest::Client,
36 fallback_upload: Arc<dyn BreakpointUpload + Send + Sync>,
38 fallback_download: Arc<dyn BreakpointDownload + Send + Sync>,
40}
41
42impl DefaultHttpTransfer {
43 pub fn new() -> Self {
54 Self::with_http_timeouts(Duration::from_secs(5), Duration::from_secs(30))
55 }
56
57 pub fn with_http_timeouts(http_timeout: Duration, tcp_keepalive: Duration) -> Self {
77 let client = match Client::builder()
80 .timeout(http_timeout)
81 .tcp_keepalive(tcp_keepalive)
82 .pool_max_idle_per_host(0)
86 .build()
87 {
88 Ok(c) => c,
89 Err(e) => {
90 crate::meow_flow_log!(
91 "http_client",
92 "with_http_timeouts build failed, fallback to Client::new(): {}",
93 e
94 );
95 Client::new()
96 }
97 };
98 Self {
99 client,
100 fallback_upload: Arc::new(DefaultStyleUpload::default()),
101 fallback_download: Arc::new(StandardRangeDownload::default()),
102 }
103 }
104
105 pub fn try_with_http_timeouts(
126 http_timeout: Duration,
127 tcp_keepalive: Duration,
128 ) -> Result<Self, MeowError> {
129 let client = Client::builder()
130 .timeout(http_timeout)
131 .tcp_keepalive(tcp_keepalive)
132 .pool_max_idle_per_host(0)
133 .build()
134 .map_err(|e| {
135 MeowError::from_source(
136 InnerErrorCode::HttpClientBuildFailed,
137 format!(
138 "build reqwest client failed (timeout={:?}, keepalive={:?})",
139 http_timeout, tcp_keepalive
140 ),
141 e,
142 )
143 })?;
144 Ok(Self {
145 client,
146 fallback_upload: Arc::new(DefaultStyleUpload::default()),
147 fallback_download: Arc::new(StandardRangeDownload::default()),
148 })
149 }
150
151 pub fn with_client(client: reqwest::Client) -> Self {
163 Self {
164 client,
165 fallback_upload: Arc::new(DefaultStyleUpload::default()),
166 fallback_download: Arc::new(StandardRangeDownload::default()),
167 }
168 }
169
170 pub fn with_fallbacks(
188 client: reqwest::Client,
189 upload: Arc<dyn BreakpointUpload + Send + Sync>,
190 download: Arc<dyn BreakpointDownload + Send + Sync>,
191 ) -> Self {
192 Self {
193 client,
194 fallback_upload: upload,
195 fallback_download: download,
196 }
197 }
198
199 fn client_for(&self, task: &TransferTask) -> reqwest::Client {
201 task.http_client_ref()
202 .cloned()
203 .unwrap_or_else(|| self.client.clone())
204 }
205
206 fn upload_arc(&self, task: &TransferTask) -> Arc<dyn BreakpointUpload + Send + Sync> {
208 match task.breakpoint_upload() {
209 Some(a) => a.clone(),
210 None => self.fallback_upload.clone(),
211 }
212 }
213
214 fn download_arc(&self, task: &TransferTask) -> Arc<dyn BreakpointDownload + Send + Sync> {
216 match task.breakpoint_download() {
217 Some(a) => a.clone(),
218 None => self.fallback_download.clone(),
219 }
220 }
221}
222
223impl Default for DefaultHttpTransfer {
224 fn default() -> Self {
225 Self::new()
226 }
227}
228
229async fn upload_prepare(
230 client: &reqwest::Client,
231 task: &TransferTask,
232 upload: Arc<dyn BreakpointUpload + Send + Sync>,
233 local_offset: u64,
234) -> Result<PrepareOutcome, MeowError> {
235 let max_retries = task.max_upload_prepare_retries();
236 let mut attempt: u32 = 0;
237 loop {
238 crate::meow_flow_log!(
239 "upload_prepare",
240 "start: file={} local_offset={} total={} attempt={} max_retries={}",
241 task.file_name(),
242 local_offset,
243 task.total_size(),
244 attempt,
245 max_retries
246 );
247 match upload_prepare_once(client, task, upload.clone(), local_offset).await {
248 Ok(outcome) => {
249 if attempt > 0 {
250 crate::meow_flow_log!(
251 "upload_prepare",
252 "prepare retry recovered: file={} attempts_used={}",
253 task.file_name(),
254 attempt
255 );
256 }
257 return Ok(outcome);
258 }
259 Err(err) => {
260 let retryable = crate::inner::exec_impl::retry::is_transport_retryable(&err);
261 let reached_limit = attempt >= max_retries;
262 if !retryable || reached_limit {
263 crate::meow_flow_log!(
264 "upload_prepare",
265 "prepare give up: file={} attempt={} max_retries={} retryable={} err={}",
266 task.file_name(),
267 attempt,
268 max_retries,
269 retryable,
270 err
271 );
272 return Err(err);
273 }
274 let delay_ms = crate::inner::exec_impl::retry::calc_backoff_with_jitter_ms(attempt);
275 crate::meow_flow_log!(
276 "upload_prepare",
277 "prepare retry scheduled: file={} next_attempt={} delay_ms={} err={}",
278 task.file_name(),
279 attempt + 1,
280 delay_ms,
281 err
282 );
283 sleep(Duration::from_millis(delay_ms)).await;
284 attempt += 1;
285 }
286 }
287 }
288}
289
290async fn upload_prepare_once(
291 client: &reqwest::Client,
292 task: &TransferTask,
293 upload: Arc<dyn BreakpointUpload + Send + Sync>,
294 local_offset: u64,
295) -> Result<PrepareOutcome, MeowError> {
296 let info = upload
297 .prepare(UploadPrepareCtx {
298 client,
299 task,
300 local_offset,
301 })
302 .await?;
303 if info.completed_file_id.is_some() {
304 let total = task.total_size();
305 crate::meow_flow_log!(
306 "upload_prepare",
307 "server indicates upload already complete: file={} total={}",
308 task.file_name(),
309 total
310 );
311 return Ok(PrepareOutcome {
312 next_offset: total,
313 total_size: total,
314 });
315 }
316 let server_off = info.next_byte.unwrap_or(0);
317 let next = local_offset.max(server_off).min(task.total_size());
318 crate::meow_flow_log!(
319 "upload_prepare",
320 "prepared: server_next={} local_offset={} final_next={}",
321 server_off,
322 local_offset,
323 next
324 );
325 Ok(PrepareOutcome {
326 next_offset: next,
327 total_size: task.total_size(),
328 })
329}
330
331async fn download_prepare(
333 client: &reqwest::Client,
334 task: &TransferTask,
335 download: Arc<dyn BreakpointDownload + Send + Sync>,
336 _local_offset: u64,
337) -> Result<PrepareOutcome, MeowError> {
338 crate::meow_flow_log!(
339 "download_prepare",
340 "start: file={} path={}",
341 task.file_name(),
342 task.file_path().display()
343 );
344 let path = task.file_path();
345 let local_len = match tokio::fs::metadata(path).await {
346 Ok(meta) => meta.len(),
347 Err(e) if e.kind() == std::io::ErrorKind::NotFound => 0u64,
348 Err(e) => {
349 return Err(MeowError::from_source(
350 InnerErrorCode::IoError,
351 format!("download_prepare stat failed: {}", path.display()),
352 e,
353 ));
354 }
355 };
356
357 let start = local_len;
359 let head_url = download.head_url(task);
360 let mut head_headers = task.headers().clone();
361 download.merge_head_headers(DownloadHeadCtx {
362 task,
363 base: &mut head_headers,
364 })?;
365 let head_resp = client
366 .request(Method::HEAD, &head_url)
367 .headers(head_headers)
368 .send()
369 .await
370 .map_err(map_reqwest)?;
371 if !head_resp.status().is_success() {
372 crate::meow_flow_log!(
373 "download_prepare",
374 "head failed: status={}",
375 head_resp.status()
376 );
377 return Err(MeowError::from_code(
378 InnerErrorCode::ResponseStatusError,
379 format!("download_prepare HEAD failed: {}", head_resp.status()),
380 ));
381 }
382 let head_content_length = head_resp
383 .headers()
384 .get(CONTENT_LENGTH)
385 .and_then(|v| v.to_str().ok())
386 .unwrap_or("<missing>");
387 let head_etag = head_resp
388 .headers()
389 .get(ETAG)
390 .and_then(|v| v.to_str().ok())
391 .unwrap_or("<missing>");
392 crate::meow_flow_log!(
393 "download_prepare",
394 "head metadata: url={} content_length={} etag={}",
395 head_url,
396 head_content_length,
397 head_etag
398 );
399 let total = download.total_size_from_head(head_resp.headers())?;
400 if start > total {
401 crate::meow_flow_log!(
402 "download_prepare",
403 "invalid local length larger than remote: local={} remote={}",
404 start,
405 total
406 );
407 return Err(MeowError::from_code_str(
408 InnerErrorCode::InvalidRange,
409 "local file larger than remote content-length",
410 ));
411 }
412 if start >= total {
413 crate::meow_flow_log!(
414 "download_prepare",
415 "already complete by local length: local={} remote={}",
416 start,
417 total
418 );
419 return Ok(PrepareOutcome {
420 next_offset: total,
421 total_size: total,
422 });
423 }
424 crate::meow_flow_log!(
425 "download_prepare",
426 "prepared resume offset: start={} remote_total={}",
427 start,
428 total
429 );
430 Ok(PrepareOutcome {
431 next_offset: start,
432 total_size: total,
433 })
434}
435
436#[async_trait]
437impl TransferTrait for DefaultHttpTransfer {
438 async fn prepare(
440 &self,
441 task: &TransferTask,
442 local_offset: u64,
443 ) -> Result<PrepareOutcome, MeowError> {
444 let client = self.client_for(task);
445 match task.direction() {
446 Direction::Upload => {
447 upload_prepare(&client, task, self.upload_arc(task), local_offset).await
448 }
449 Direction::Download => {
450 download_prepare(&client, task, self.download_arc(task), local_offset).await
451 }
452 }
453 }
454
455 async fn transfer_chunk(
457 &self,
458 task: &TransferTask,
459 offset: u64,
460 chunk_size: u64,
461 remote_total_size: u64,
462 ) -> Result<ChunkOutcome, MeowError> {
463 let client = self.client_for(task);
464 match task.direction() {
465 Direction::Upload => {
466 upload_one_chunk(&client, task, self.upload_arc(task), offset, chunk_size).await
467 }
468 Direction::Download => {
469 download_one_chunk(
470 &client,
471 task,
472 self.download_arc(task),
473 offset,
474 chunk_size,
475 remote_total_size,
476 )
477 .await
478 }
479 }
480 }
481
482 async fn cancel(&self, task: &TransferTask) -> Result<(), MeowError> {
484 if task.direction() != Direction::Upload {
485 return Ok(());
486 }
487 let client = self.client_for(task);
488 self.upload_arc(task).abort_upload(&client, task).await
489 }
490}