1use async_trait::async_trait;
2use reqwest::header::{CONTENT_LENGTH, ETAG};
3use reqwest::{Client, Method};
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::time::sleep;
7
8use crate::chunk_outcome::ChunkOutcome;
9use crate::direction::Direction;
10use crate::error::{InnerErrorCode, MeowError};
11use crate::http_breakpoint::{
12 BreakpointDownload, BreakpointUpload, DefaultStyleUpload, DownloadHeadCtx,
13 StandardRangeDownload, UploadPrepareCtx,
14};
15use crate::prepare_outcome::PrepareOutcome;
16use crate::transfer_executor_trait::TransferTrait;
17use crate::transfer_task::TransferTask;
18
19use super::default_http_transfer_chunks::{
20 download_one_chunk, map_reqwest, upload_one_chunk, upload_one_chunk_part,
21};
22
23pub(crate) fn default_breakpoint_arcs() -> (
25 Arc<dyn BreakpointUpload + Send + Sync>,
26 Arc<dyn BreakpointDownload + Send + Sync>,
27) {
28 (
29 Arc::new(DefaultStyleUpload::default()),
30 Arc::new(StandardRangeDownload::default()),
31 )
32}
33
/// Value handed to `pool_max_idle_per_host` when `build_internal_client` configures the
/// internal reqwest client.
const DEFAULT_POOL_MAX_IDLE_PER_HOST: usize = 16;

/// Value handed to `pool_idle_timeout` when `build_internal_client` configures the internal
/// reqwest client (30 seconds).
const DEFAULT_POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(30);

/// Upper bound (10 seconds) applied to the connect timeout: `build_internal_client` uses the
/// smaller of the caller's `http_timeout` and this cap.
const DEFAULT_CONNECT_TIMEOUT_CAP: Duration = Duration::from_secs(10);
62
63pub(crate) fn build_internal_client(
72 http_timeout: Duration,
73 tcp_keepalive: Duration,
74) -> Result<reqwest::Client, reqwest::Error> {
75 Client::builder()
76 .timeout(http_timeout)
77 .connect_timeout(http_timeout.min(DEFAULT_CONNECT_TIMEOUT_CAP))
80 .tcp_keepalive(tcp_keepalive)
81 .pool_max_idle_per_host(DEFAULT_POOL_MAX_IDLE_PER_HOST)
86 .pool_idle_timeout(Some(DEFAULT_POOL_IDLE_TIMEOUT))
87 .build()
88}
89
/// HTTP-based transfer executor that implements `TransferTrait` for uploads and downloads.
///
/// Each field is a fallback: a task may carry its own HTTP client or breakpoint protocol
/// handler, and the values stored here are used only when the task does not provide one.
pub struct DefaultHttpTransfer {
    /// Client used when the task has no client of its own (`task.http_client_ref()` is `None`).
    client: reqwest::Client,
    /// Upload protocol handler used when `task.breakpoint_upload()` is `None`.
    fallback_upload: Arc<dyn BreakpointUpload + Send + Sync>,
    /// Download protocol handler used when `task.breakpoint_download()` is `None`.
    fallback_download: Arc<dyn BreakpointDownload + Send + Sync>,
}
99
100impl DefaultHttpTransfer {
101 pub fn new() -> Self {
112 Self::with_http_timeouts(Duration::from_secs(5), Duration::from_secs(30))
113 }
114
115 pub fn with_http_timeouts(http_timeout: Duration, tcp_keepalive: Duration) -> Self {
135 let client = match build_internal_client(http_timeout, tcp_keepalive) {
138 Ok(c) => c,
139 Err(e) => {
140 crate::meow_flow_log!(
141 "http_client",
142 "with_http_timeouts build failed, fallback to Client::new(): {}",
143 e
144 );
145 Client::new()
146 }
147 };
148 Self {
149 client,
150 fallback_upload: Arc::new(DefaultStyleUpload::default()),
151 fallback_download: Arc::new(StandardRangeDownload::default()),
152 }
153 }
154
155 pub fn try_with_http_timeouts(
176 http_timeout: Duration,
177 tcp_keepalive: Duration,
178 ) -> Result<Self, MeowError> {
179 let client = build_internal_client(http_timeout, tcp_keepalive)
180 .map_err(|e| {
181 MeowError::from_source(
182 InnerErrorCode::HttpClientBuildFailed,
183 format!(
184 "build reqwest client failed (timeout={:?}, keepalive={:?})",
185 http_timeout, tcp_keepalive
186 ),
187 e,
188 )
189 })?;
190 Ok(Self {
191 client,
192 fallback_upload: Arc::new(DefaultStyleUpload::default()),
193 fallback_download: Arc::new(StandardRangeDownload::default()),
194 })
195 }
196
197 pub fn with_client(client: reqwest::Client) -> Self {
209 Self {
210 client,
211 fallback_upload: Arc::new(DefaultStyleUpload::default()),
212 fallback_download: Arc::new(StandardRangeDownload::default()),
213 }
214 }
215
216 pub fn with_fallbacks(
234 client: reqwest::Client,
235 upload: Arc<dyn BreakpointUpload + Send + Sync>,
236 download: Arc<dyn BreakpointDownload + Send + Sync>,
237 ) -> Self {
238 Self {
239 client,
240 fallback_upload: upload,
241 fallback_download: download,
242 }
243 }
244
245 fn client_for(&self, task: &TransferTask) -> reqwest::Client {
247 task.http_client_ref()
248 .cloned()
249 .unwrap_or_else(|| self.client.clone())
250 }
251
252 fn upload_arc(&self, task: &TransferTask) -> Arc<dyn BreakpointUpload + Send + Sync> {
254 match task.breakpoint_upload() {
255 Some(a) => a.clone(),
256 None => self.fallback_upload.clone(),
257 }
258 }
259
260 fn download_arc(&self, task: &TransferTask) -> Arc<dyn BreakpointDownload + Send + Sync> {
262 match task.breakpoint_download() {
263 Some(a) => a.clone(),
264 None => self.fallback_download.clone(),
265 }
266 }
267}
268
269impl Default for DefaultHttpTransfer {
270 fn default() -> Self {
271 Self::new()
272 }
273}
274
275async fn upload_prepare(
276 client: &reqwest::Client,
277 task: &TransferTask,
278 upload: Arc<dyn BreakpointUpload + Send + Sync>,
279 local_offset: u64,
280) -> Result<PrepareOutcome, MeowError> {
281 let max_retries = task.max_upload_prepare_retries();
282 let mut attempt: u32 = 0;
283 loop {
284 crate::meow_flow_log!(
285 "upload_prepare",
286 "start: file={} local_offset={} total={} attempt={} max_retries={}",
287 task.file_name(),
288 local_offset,
289 task.total_size(),
290 attempt,
291 max_retries
292 );
293 match upload_prepare_once(client, task, upload.clone(), local_offset).await {
294 Ok(outcome) => {
295 if attempt > 0 {
296 crate::meow_flow_log!(
297 "upload_prepare",
298 "prepare retry recovered: file={} attempts_used={}",
299 task.file_name(),
300 attempt
301 );
302 }
303 return Ok(outcome);
304 }
305 Err(err) => {
306 let retryable = crate::inner::exec_impl::retry::is_transport_retryable(&err);
307 let reached_limit = attempt >= max_retries;
308 if !retryable || reached_limit {
309 crate::meow_flow_log!(
310 "upload_prepare",
311 "prepare give up: file={} attempt={} max_retries={} retryable={} err={}",
312 task.file_name(),
313 attempt,
314 max_retries,
315 retryable,
316 err
317 );
318 return Err(err);
319 }
320 let delay_ms = crate::inner::exec_impl::retry::calc_backoff_with_jitter_ms(attempt);
321 crate::meow_flow_log!(
322 "upload_prepare",
323 "prepare retry scheduled: file={} next_attempt={} delay_ms={} err={}",
324 task.file_name(),
325 attempt + 1,
326 delay_ms,
327 err
328 );
329 sleep(Duration::from_millis(delay_ms)).await;
330 attempt += 1;
331 }
332 }
333 }
334}
335
336async fn upload_prepare_once(
337 client: &reqwest::Client,
338 task: &TransferTask,
339 upload: Arc<dyn BreakpointUpload + Send + Sync>,
340 local_offset: u64,
341) -> Result<PrepareOutcome, MeowError> {
342 let info = upload
343 .prepare(UploadPrepareCtx {
344 client,
345 task,
346 local_offset,
347 })
348 .await?;
349 if info.completed_file_id.is_some() {
350 let total = task.total_size();
351 crate::meow_flow_log!(
352 "upload_prepare",
353 "server indicates upload already complete: file={} total={}",
354 task.file_name(),
355 total
356 );
357 return Ok(PrepareOutcome {
358 next_offset: total,
359 total_size: total,
360 });
361 }
362 let server_off = info.next_byte.unwrap_or(0);
363 let next = local_offset.max(server_off).min(task.total_size());
364 crate::meow_flow_log!(
365 "upload_prepare",
366 "prepared: server_next={} local_offset={} final_next={}",
367 server_off,
368 local_offset,
369 next
370 );
371 Ok(PrepareOutcome {
372 next_offset: next,
373 total_size: task.total_size(),
374 })
375}
376
377async fn download_prepare(
379 client: &reqwest::Client,
380 task: &TransferTask,
381 download: Arc<dyn BreakpointDownload + Send + Sync>,
382 _local_offset: u64,
383) -> Result<PrepareOutcome, MeowError> {
384 crate::meow_flow_log!(
385 "download_prepare",
386 "start: file={} path={}",
387 task.file_name(),
388 task.file_path().display()
389 );
390 let path = task.file_path();
391 let local_len = match tokio::fs::metadata(path).await {
392 Ok(meta) => meta.len(),
393 Err(e) if e.kind() == std::io::ErrorKind::NotFound => 0u64,
396 Err(e) => {
397 return Err(MeowError::from_io(
398 format!("download_prepare stat failed: {}", path.display()),
399 e,
400 ));
401 }
402 };
403
404 let start = local_len;
406 if let Some(total) = download.total_size_hint(task) {
407 if start > total {
408 crate::meow_flow_log!(
409 "download_prepare",
410 "invalid local length larger than hinted remote: local={} remote={}",
411 start,
412 total
413 );
414 return Err(MeowError::from_code_str(
415 InnerErrorCode::InvalidRange,
416 "local file larger than hinted remote total size",
417 ));
418 }
419 crate::meow_flow_log!(
420 "download_prepare",
421 "prepared from total_size_hint: start={} remote_total={}",
422 start,
423 total
424 );
425 return Ok(PrepareOutcome {
426 next_offset: start.min(total),
427 total_size: total,
428 });
429 }
430
431 let head_url = download.head_url(task);
432 let mut head_headers = task.headers().clone();
433 download.merge_head_headers(DownloadHeadCtx {
434 task,
435 base: &mut head_headers,
436 })?;
437 let head_resp = client
438 .request(Method::HEAD, &head_url)
439 .headers(head_headers)
440 .send()
441 .await
442 .map_err(map_reqwest)?;
443 if !head_resp.status().is_success() {
444 crate::meow_flow_log!(
445 "download_prepare",
446 "head failed: status={}",
447 head_resp.status()
448 );
449 return Err(MeowError::from_code(
450 InnerErrorCode::ResponseStatusError,
451 format!("download_prepare HEAD failed: {}", head_resp.status()),
452 )
453 .with_http_status(head_resp.status().as_u16()));
454 }
455 let head_content_length = head_resp
456 .headers()
457 .get(CONTENT_LENGTH)
458 .and_then(|v| v.to_str().ok())
459 .unwrap_or("<missing>");
460 let head_etag = head_resp
461 .headers()
462 .get(ETAG)
463 .and_then(|v| v.to_str().ok())
464 .unwrap_or("<missing>");
465 crate::meow_flow_log!(
466 "download_prepare",
467 "head metadata: url={} content_length={} etag={}",
468 head_url,
469 head_content_length,
470 head_etag
471 );
472 let total = download.total_size_from_head(head_resp.headers())?;
473 if start > total {
474 crate::meow_flow_log!(
475 "download_prepare",
476 "invalid local length larger than remote: local={} remote={}",
477 start,
478 total
479 );
480 return Err(MeowError::from_code_str(
481 InnerErrorCode::InvalidRange,
482 "local file larger than remote content-length",
483 ));
484 }
485 if start >= total {
486 crate::meow_flow_log!(
487 "download_prepare",
488 "already complete by local length: local={} remote={}",
489 start,
490 total
491 );
492 return Ok(PrepareOutcome {
493 next_offset: total,
494 total_size: total,
495 });
496 }
497 crate::meow_flow_log!(
498 "download_prepare",
499 "prepared resume offset: start={} remote_total={}",
500 start,
501 total
502 );
503 Ok(PrepareOutcome {
504 next_offset: start,
505 total_size: total,
506 })
507}
508
509#[async_trait]
510impl TransferTrait for DefaultHttpTransfer {
511 async fn prepare(
513 &self,
514 task: &TransferTask,
515 local_offset: u64,
516 ) -> Result<PrepareOutcome, MeowError> {
517 let client = self.client_for(task);
518 match task.direction() {
519 Direction::Upload => {
520 upload_prepare(&client, task, self.upload_arc(task), local_offset).await
521 }
522 Direction::Download => {
523 download_prepare(&client, task, self.download_arc(task), local_offset).await
524 }
525 }
526 }
527
528 async fn transfer_chunk(
530 &self,
531 task: &TransferTask,
532 offset: u64,
533 chunk_size: u64,
534 remote_total_size: u64,
535 ) -> Result<ChunkOutcome, MeowError> {
536 let client = self.client_for(task);
537 match task.direction() {
538 Direction::Upload => {
539 upload_one_chunk(&client, task, self.upload_arc(task), offset, chunk_size).await
540 }
541 Direction::Download => {
542 download_one_chunk(
543 &client,
544 task,
545 self.download_arc(task),
546 offset,
547 chunk_size,
548 remote_total_size,
549 )
550 .await
551 }
552 }
553 }
554
555 async fn cancel(&self, task: &TransferTask) -> Result<(), MeowError> {
557 if task.direction() != Direction::Upload {
558 return Ok(());
559 }
560 let client = self.client_for(task);
561 self.upload_arc(task).abort_upload(&client, task).await
562 }
563
564 fn supports_parallel_parts(&self, task: &TransferTask) -> bool {
567 task.direction() == Direction::Upload && self.upload_arc(task).supports_parallel_parts()
568 }
569
570 async fn transfer_chunk_part(
573 &self,
574 task: &TransferTask,
575 offset: u64,
576 chunk_size: u64,
577 remote_total_size: u64,
578 ) -> Result<ChunkOutcome, MeowError> {
579 let client = self.client_for(task);
580 match task.direction() {
581 Direction::Upload => {
582 upload_one_chunk_part(&client, task, self.upload_arc(task), offset, chunk_size).await
583 }
584 Direction::Download => {
586 download_one_chunk(
587 &client,
588 task,
589 self.download_arc(task),
590 offset,
591 chunk_size,
592 remote_total_size,
593 )
594 .await
595 }
596 }
597 }
598
599 async fn complete(&self, task: &TransferTask) -> Result<Option<String>, MeowError> {
602 if task.direction() != Direction::Upload {
603 return Ok(None);
604 }
605 let client = self.client_for(task);
606 self.upload_arc(task).complete_upload(&client, task).await
607 }
608}