1use async_trait::async_trait;
2use reqwest::header::{CONTENT_LENGTH, ETAG};
3use reqwest::{Client, Method};
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::time::sleep;
7
8use crate::chunk_outcome::ChunkOutcome;
9use crate::direction::Direction;
10use crate::error::{InnerErrorCode, MeowError};
11use crate::http_breakpoint::{
12 BreakpointDownload, BreakpointUpload, DefaultStyleUpload, DownloadHeadCtx,
13 StandardRangeDownload, UploadPrepareCtx,
14};
15use crate::prepare_outcome::PrepareOutcome;
16use crate::transfer_executor_trait::TransferTrait;
17use crate::transfer_task::TransferTask;
18
19use super::default_http_transfer_chunks::{download_one_chunk, map_reqwest, upload_one_chunk};
20
21pub(crate) fn default_breakpoint_arcs() -> (
23 Arc<dyn BreakpointUpload + Send + Sync>,
24 Arc<dyn BreakpointDownload + Send + Sync>,
25) {
26 (
27 Arc::new(DefaultStyleUpload::default()),
28 Arc::new(StandardRangeDownload::default()),
29 )
30}
31
32pub struct DefaultHttpTransfer {
34 client: reqwest::Client,
36 fallback_upload: Arc<dyn BreakpointUpload + Send + Sync>,
38 fallback_download: Arc<dyn BreakpointDownload + Send + Sync>,
40}
41
42impl DefaultHttpTransfer {
43 pub fn new() -> Self {
54 Self::with_http_timeouts(Duration::from_secs(5), Duration::from_secs(30))
55 }
56
57 pub fn with_http_timeouts(http_timeout: Duration, tcp_keepalive: Duration) -> Self {
77 let client = match Client::builder()
80 .timeout(http_timeout)
81 .tcp_keepalive(tcp_keepalive)
82 .pool_max_idle_per_host(0)
86 .build()
87 {
88 Ok(c) => c,
89 Err(e) => {
90 crate::meow_flow_log!(
91 "http_client",
92 "with_http_timeouts build failed, fallback to Client::new(): {}",
93 e
94 );
95 Client::new()
96 }
97 };
98 Self {
99 client,
100 fallback_upload: Arc::new(DefaultStyleUpload::default()),
101 fallback_download: Arc::new(StandardRangeDownload::default()),
102 }
103 }
104
105 pub fn try_with_http_timeouts(
126 http_timeout: Duration,
127 tcp_keepalive: Duration,
128 ) -> Result<Self, MeowError> {
129 let client = Client::builder()
130 .timeout(http_timeout)
131 .tcp_keepalive(tcp_keepalive)
132 .pool_max_idle_per_host(0)
133 .build()
134 .map_err(|e| {
135 MeowError::from_source(
136 InnerErrorCode::HttpClientBuildFailed,
137 format!(
138 "build reqwest client failed (timeout={:?}, keepalive={:?})",
139 http_timeout, tcp_keepalive
140 ),
141 e,
142 )
143 })?;
144 Ok(Self {
145 client,
146 fallback_upload: Arc::new(DefaultStyleUpload::default()),
147 fallback_download: Arc::new(StandardRangeDownload::default()),
148 })
149 }
150
151 pub fn with_client(client: reqwest::Client) -> Self {
163 Self {
164 client,
165 fallback_upload: Arc::new(DefaultStyleUpload::default()),
166 fallback_download: Arc::new(StandardRangeDownload::default()),
167 }
168 }
169
170 pub fn with_fallbacks(
188 client: reqwest::Client,
189 upload: Arc<dyn BreakpointUpload + Send + Sync>,
190 download: Arc<dyn BreakpointDownload + Send + Sync>,
191 ) -> Self {
192 Self {
193 client,
194 fallback_upload: upload,
195 fallback_download: download,
196 }
197 }
198
199 fn client_for(&self, task: &TransferTask) -> reqwest::Client {
201 task.http_client_ref()
202 .cloned()
203 .unwrap_or_else(|| self.client.clone())
204 }
205
206 fn upload_arc(&self, task: &TransferTask) -> Arc<dyn BreakpointUpload + Send + Sync> {
208 match task.breakpoint_upload() {
209 Some(a) => a.clone(),
210 None => self.fallback_upload.clone(),
211 }
212 }
213
214 fn download_arc(&self, task: &TransferTask) -> Arc<dyn BreakpointDownload + Send + Sync> {
216 match task.breakpoint_download() {
217 Some(a) => a.clone(),
218 None => self.fallback_download.clone(),
219 }
220 }
221}
222
223impl Default for DefaultHttpTransfer {
224 fn default() -> Self {
225 Self::new()
226 }
227}
228
229async fn upload_prepare(
230 client: &reqwest::Client,
231 task: &TransferTask,
232 upload: Arc<dyn BreakpointUpload + Send + Sync>,
233 local_offset: u64,
234) -> Result<PrepareOutcome, MeowError> {
235 let max_retries = task.max_upload_prepare_retries();
236 let mut attempt: u32 = 0;
237 loop {
238 crate::meow_flow_log!(
239 "upload_prepare",
240 "start: file={} local_offset={} total={} attempt={} max_retries={}",
241 task.file_name(),
242 local_offset,
243 task.total_size(),
244 attempt,
245 max_retries
246 );
247 match upload_prepare_once(client, task, upload.clone(), local_offset).await {
248 Ok(outcome) => {
249 if attempt > 0 {
250 crate::meow_flow_log!(
251 "upload_prepare",
252 "prepare retry recovered: file={} attempts_used={}",
253 task.file_name(),
254 attempt
255 );
256 }
257 return Ok(outcome);
258 }
259 Err(err) => {
260 let retryable = crate::inner::exec_impl::retry::is_transport_retryable(&err);
261 let reached_limit = attempt >= max_retries;
262 if !retryable || reached_limit {
263 crate::meow_flow_log!(
264 "upload_prepare",
265 "prepare give up: file={} attempt={} max_retries={} retryable={} err={}",
266 task.file_name(),
267 attempt,
268 max_retries,
269 retryable,
270 err
271 );
272 return Err(err);
273 }
274 let delay_ms = crate::inner::exec_impl::retry::calc_backoff_with_jitter_ms(attempt);
275 crate::meow_flow_log!(
276 "upload_prepare",
277 "prepare retry scheduled: file={} next_attempt={} delay_ms={} err={}",
278 task.file_name(),
279 attempt + 1,
280 delay_ms,
281 err
282 );
283 sleep(Duration::from_millis(delay_ms)).await;
284 attempt += 1;
285 }
286 }
287 }
288}
289
290async fn upload_prepare_once(
291 client: &reqwest::Client,
292 task: &TransferTask,
293 upload: Arc<dyn BreakpointUpload + Send + Sync>,
294 local_offset: u64,
295) -> Result<PrepareOutcome, MeowError> {
296 let info = upload
297 .prepare(UploadPrepareCtx {
298 client,
299 task,
300 local_offset,
301 })
302 .await?;
303 if info.completed_file_id.is_some() {
304 let total = task.total_size();
305 crate::meow_flow_log!(
306 "upload_prepare",
307 "server indicates upload already complete: file={} total={}",
308 task.file_name(),
309 total
310 );
311 return Ok(PrepareOutcome {
312 next_offset: total,
313 total_size: total,
314 });
315 }
316 let server_off = info.next_byte.unwrap_or(0);
317 let next = local_offset.max(server_off).min(task.total_size());
318 crate::meow_flow_log!(
319 "upload_prepare",
320 "prepared: server_next={} local_offset={} final_next={}",
321 server_off,
322 local_offset,
323 next
324 );
325 Ok(PrepareOutcome {
326 next_offset: next,
327 total_size: task.total_size(),
328 })
329}
330
331async fn download_prepare(
333 client: &reqwest::Client,
334 task: &TransferTask,
335 download: Arc<dyn BreakpointDownload + Send + Sync>,
336 _local_offset: u64,
337) -> Result<PrepareOutcome, MeowError> {
338 crate::meow_flow_log!(
339 "download_prepare",
340 "start: file={} path={}",
341 task.file_name(),
342 task.file_path().display()
343 );
344 let path = task.file_path();
345 let local_len = match tokio::fs::metadata(path).await {
346 Ok(meta) => meta.len(),
347 Err(e) if e.kind() == std::io::ErrorKind::NotFound => 0u64,
350 Err(e) => {
351 return Err(MeowError::from_io(
352 format!("download_prepare stat failed: {}", path.display()),
353 e,
354 ));
355 }
356 };
357
358 let start = local_len;
360 if let Some(total) = download.total_size_hint(task) {
361 if start > total {
362 crate::meow_flow_log!(
363 "download_prepare",
364 "invalid local length larger than hinted remote: local={} remote={}",
365 start,
366 total
367 );
368 return Err(MeowError::from_code_str(
369 InnerErrorCode::InvalidRange,
370 "local file larger than hinted remote total size",
371 ));
372 }
373 crate::meow_flow_log!(
374 "download_prepare",
375 "prepared from total_size_hint: start={} remote_total={}",
376 start,
377 total
378 );
379 return Ok(PrepareOutcome {
380 next_offset: start.min(total),
381 total_size: total,
382 });
383 }
384
385 let head_url = download.head_url(task);
386 let mut head_headers = task.headers().clone();
387 download.merge_head_headers(DownloadHeadCtx {
388 task,
389 base: &mut head_headers,
390 })?;
391 let head_resp = client
392 .request(Method::HEAD, &head_url)
393 .headers(head_headers)
394 .send()
395 .await
396 .map_err(map_reqwest)?;
397 if !head_resp.status().is_success() {
398 crate::meow_flow_log!(
399 "download_prepare",
400 "head failed: status={}",
401 head_resp.status()
402 );
403 return Err(MeowError::from_code(
404 InnerErrorCode::ResponseStatusError,
405 format!("download_prepare HEAD failed: {}", head_resp.status()),
406 ));
407 }
408 let head_content_length = head_resp
409 .headers()
410 .get(CONTENT_LENGTH)
411 .and_then(|v| v.to_str().ok())
412 .unwrap_or("<missing>");
413 let head_etag = head_resp
414 .headers()
415 .get(ETAG)
416 .and_then(|v| v.to_str().ok())
417 .unwrap_or("<missing>");
418 crate::meow_flow_log!(
419 "download_prepare",
420 "head metadata: url={} content_length={} etag={}",
421 head_url,
422 head_content_length,
423 head_etag
424 );
425 let total = download.total_size_from_head(head_resp.headers())?;
426 if start > total {
427 crate::meow_flow_log!(
428 "download_prepare",
429 "invalid local length larger than remote: local={} remote={}",
430 start,
431 total
432 );
433 return Err(MeowError::from_code_str(
434 InnerErrorCode::InvalidRange,
435 "local file larger than remote content-length",
436 ));
437 }
438 if start >= total {
439 crate::meow_flow_log!(
440 "download_prepare",
441 "already complete by local length: local={} remote={}",
442 start,
443 total
444 );
445 return Ok(PrepareOutcome {
446 next_offset: total,
447 total_size: total,
448 });
449 }
450 crate::meow_flow_log!(
451 "download_prepare",
452 "prepared resume offset: start={} remote_total={}",
453 start,
454 total
455 );
456 Ok(PrepareOutcome {
457 next_offset: start,
458 total_size: total,
459 })
460}
461
462#[async_trait]
463impl TransferTrait for DefaultHttpTransfer {
464 async fn prepare(
466 &self,
467 task: &TransferTask,
468 local_offset: u64,
469 ) -> Result<PrepareOutcome, MeowError> {
470 let client = self.client_for(task);
471 match task.direction() {
472 Direction::Upload => {
473 upload_prepare(&client, task, self.upload_arc(task), local_offset).await
474 }
475 Direction::Download => {
476 download_prepare(&client, task, self.download_arc(task), local_offset).await
477 }
478 }
479 }
480
481 async fn transfer_chunk(
483 &self,
484 task: &TransferTask,
485 offset: u64,
486 chunk_size: u64,
487 remote_total_size: u64,
488 ) -> Result<ChunkOutcome, MeowError> {
489 let client = self.client_for(task);
490 match task.direction() {
491 Direction::Upload => {
492 upload_one_chunk(&client, task, self.upload_arc(task), offset, chunk_size).await
493 }
494 Direction::Download => {
495 download_one_chunk(
496 &client,
497 task,
498 self.download_arc(task),
499 offset,
500 chunk_size,
501 remote_total_size,
502 )
503 .await
504 }
505 }
506 }
507
508 async fn cancel(&self, task: &TransferTask) -> Result<(), MeowError> {
510 if task.direction() != Direction::Upload {
511 return Ok(());
512 }
513 let client = self.client_for(task);
514 self.upload_arc(task).abort_upload(&client, task).await
515 }
516}