1use async_trait::async_trait;
2use reqwest::header::{CONTENT_LENGTH, ETAG};
3use reqwest::{Client, Method};
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::time::sleep;
7
8use crate::chunk_outcome::ChunkOutcome;
9use crate::direction::Direction;
10use crate::error::{InnerErrorCode, MeowError};
11use crate::http_breakpoint::{
12 BreakpointDownload, BreakpointUpload, DefaultStyleUpload, DownloadHeadCtx,
13 StandardRangeDownload, UploadPrepareCtx,
14};
15use crate::prepare_outcome::PrepareOutcome;
16use crate::transfer_executor_trait::TransferTrait;
17use crate::transfer_task::TransferTask;
18
19use super::default_http_transfer_chunks::{download_one_chunk, map_reqwest, upload_one_chunk};
20
21pub(crate) fn default_breakpoint_arcs() -> (
23 Arc<dyn BreakpointUpload + Send + Sync>,
24 Arc<dyn BreakpointDownload + Send + Sync>,
25) {
26 (
27 Arc::new(DefaultStyleUpload::default()),
28 Arc::new(StandardRangeDownload::default()),
29 )
30}
31
32pub struct DefaultHttpTransfer {
34 client: reqwest::Client,
36 fallback_upload: Arc<dyn BreakpointUpload + Send + Sync>,
38 fallback_download: Arc<dyn BreakpointDownload + Send + Sync>,
40}
41
42impl DefaultHttpTransfer {
43 pub fn new() -> Self {
54 Self::with_http_timeouts(Duration::from_secs(5), Duration::from_secs(30))
55 }
56
57 pub fn with_http_timeouts(http_timeout: Duration, tcp_keepalive: Duration) -> Self {
77 let client = match Client::builder()
80 .timeout(http_timeout)
81 .tcp_keepalive(tcp_keepalive)
82 .pool_max_idle_per_host(0)
86 .build()
87 {
88 Ok(c) => c,
89 Err(e) => {
90 crate::meow_flow_log!(
91 "http_client",
92 "with_http_timeouts build failed, fallback to Client::new(): {}",
93 e
94 );
95 Client::new()
96 }
97 };
98 Self {
99 client,
100 fallback_upload: Arc::new(DefaultStyleUpload::default()),
101 fallback_download: Arc::new(StandardRangeDownload::default()),
102 }
103 }
104
105 pub fn try_with_http_timeouts(
126 http_timeout: Duration,
127 tcp_keepalive: Duration,
128 ) -> Result<Self, MeowError> {
129 let client = Client::builder()
130 .timeout(http_timeout)
131 .tcp_keepalive(tcp_keepalive)
132 .pool_max_idle_per_host(0)
133 .build()
134 .map_err(|e| {
135 MeowError::from_source(
136 InnerErrorCode::HttpClientBuildFailed,
137 format!(
138 "build reqwest client failed (timeout={:?}, keepalive={:?})",
139 http_timeout, tcp_keepalive
140 ),
141 e,
142 )
143 })?;
144 Ok(Self {
145 client,
146 fallback_upload: Arc::new(DefaultStyleUpload::default()),
147 fallback_download: Arc::new(StandardRangeDownload::default()),
148 })
149 }
150
151 pub fn with_client(client: reqwest::Client) -> Self {
163 Self {
164 client,
165 fallback_upload: Arc::new(DefaultStyleUpload::default()),
166 fallback_download: Arc::new(StandardRangeDownload::default()),
167 }
168 }
169
170 pub fn with_fallbacks(
188 client: reqwest::Client,
189 upload: Arc<dyn BreakpointUpload + Send + Sync>,
190 download: Arc<dyn BreakpointDownload + Send + Sync>,
191 ) -> Self {
192 Self {
193 client,
194 fallback_upload: upload,
195 fallback_download: download,
196 }
197 }
198
199 fn client_for(&self, task: &TransferTask) -> reqwest::Client {
201 task.http_client_ref()
202 .cloned()
203 .unwrap_or_else(|| self.client.clone())
204 }
205
206 fn upload_arc(&self, task: &TransferTask) -> Arc<dyn BreakpointUpload + Send + Sync> {
208 match task.breakpoint_upload() {
209 Some(a) => a.clone(),
210 None => self.fallback_upload.clone(),
211 }
212 }
213
214 fn download_arc(&self, task: &TransferTask) -> Arc<dyn BreakpointDownload + Send + Sync> {
216 match task.breakpoint_download() {
217 Some(a) => a.clone(),
218 None => self.fallback_download.clone(),
219 }
220 }
221}
222
223impl Default for DefaultHttpTransfer {
224 fn default() -> Self {
225 Self::new()
226 }
227}
228
229async fn upload_prepare(
230 client: &reqwest::Client,
231 task: &TransferTask,
232 upload: Arc<dyn BreakpointUpload + Send + Sync>,
233 local_offset: u64,
234) -> Result<PrepareOutcome, MeowError> {
235 let max_retries = task.max_upload_prepare_retries();
236 let mut attempt: u32 = 0;
237 loop {
238 crate::meow_flow_log!(
239 "upload_prepare",
240 "start: file={} local_offset={} total={} attempt={} max_retries={}",
241 task.file_name(),
242 local_offset,
243 task.total_size(),
244 attempt,
245 max_retries
246 );
247 match upload_prepare_once(client, task, upload.clone(), local_offset).await {
248 Ok(outcome) => {
249 if attempt > 0 {
250 crate::meow_flow_log!(
251 "upload_prepare",
252 "prepare retry recovered: file={} attempts_used={}",
253 task.file_name(),
254 attempt
255 );
256 }
257 return Ok(outcome);
258 }
259 Err(err) => {
260 let retryable = crate::inner::exec_impl::retry::is_transport_retryable(&err);
261 let reached_limit = attempt >= max_retries;
262 if !retryable || reached_limit {
263 crate::meow_flow_log!(
264 "upload_prepare",
265 "prepare give up: file={} attempt={} max_retries={} retryable={} err={}",
266 task.file_name(),
267 attempt,
268 max_retries,
269 retryable,
270 err
271 );
272 return Err(err);
273 }
274 let delay_ms = crate::inner::exec_impl::retry::calc_backoff_with_jitter_ms(attempt);
275 crate::meow_flow_log!(
276 "upload_prepare",
277 "prepare retry scheduled: file={} next_attempt={} delay_ms={} err={}",
278 task.file_name(),
279 attempt + 1,
280 delay_ms,
281 err
282 );
283 sleep(Duration::from_millis(delay_ms)).await;
284 attempt += 1;
285 }
286 }
287 }
288}
289
290async fn upload_prepare_once(
291 client: &reqwest::Client,
292 task: &TransferTask,
293 upload: Arc<dyn BreakpointUpload + Send + Sync>,
294 local_offset: u64,
295) -> Result<PrepareOutcome, MeowError> {
296 let info = upload
297 .prepare(UploadPrepareCtx {
298 client,
299 task,
300 local_offset,
301 })
302 .await?;
303 if info.completed_file_id.is_some() {
304 let total = task.total_size();
305 crate::meow_flow_log!(
306 "upload_prepare",
307 "server indicates upload already complete: file={} total={}",
308 task.file_name(),
309 total
310 );
311 return Ok(PrepareOutcome {
312 next_offset: total,
313 total_size: total,
314 });
315 }
316 let server_off = info.next_byte.unwrap_or(0);
317 let next = local_offset.max(server_off).min(task.total_size());
318 crate::meow_flow_log!(
319 "upload_prepare",
320 "prepared: server_next={} local_offset={} final_next={}",
321 server_off,
322 local_offset,
323 next
324 );
325 Ok(PrepareOutcome {
326 next_offset: next,
327 total_size: task.total_size(),
328 })
329}
330
331async fn download_prepare(
333 client: &reqwest::Client,
334 task: &TransferTask,
335 download: Arc<dyn BreakpointDownload + Send + Sync>,
336 _local_offset: u64,
337) -> Result<PrepareOutcome, MeowError> {
338 crate::meow_flow_log!(
339 "download_prepare",
340 "start: file={} path={}",
341 task.file_name(),
342 task.file_path().display()
343 );
344 let path = task.file_path();
345 let local_len = match tokio::fs::metadata(path).await {
346 Ok(meta) => meta.len(),
347 Err(e) if e.kind() == std::io::ErrorKind::NotFound => 0u64,
348 Err(e) => {
349 return Err(MeowError::from_source(
350 InnerErrorCode::IoError,
351 format!("download_prepare stat failed: {}", path.display()),
352 e,
353 ));
354 }
355 };
356
357 let start = local_len;
359 if let Some(total) = download.total_size_hint(task) {
360 if start > total {
361 crate::meow_flow_log!(
362 "download_prepare",
363 "invalid local length larger than hinted remote: local={} remote={}",
364 start,
365 total
366 );
367 return Err(MeowError::from_code_str(
368 InnerErrorCode::InvalidRange,
369 "local file larger than hinted remote total size",
370 ));
371 }
372 crate::meow_flow_log!(
373 "download_prepare",
374 "prepared from total_size_hint: start={} remote_total={}",
375 start,
376 total
377 );
378 return Ok(PrepareOutcome {
379 next_offset: start.min(total),
380 total_size: total,
381 });
382 }
383
384 let head_url = download.head_url(task);
385 let mut head_headers = task.headers().clone();
386 download.merge_head_headers(DownloadHeadCtx {
387 task,
388 base: &mut head_headers,
389 })?;
390 let head_resp = client
391 .request(Method::HEAD, &head_url)
392 .headers(head_headers)
393 .send()
394 .await
395 .map_err(map_reqwest)?;
396 if !head_resp.status().is_success() {
397 crate::meow_flow_log!(
398 "download_prepare",
399 "head failed: status={}",
400 head_resp.status()
401 );
402 return Err(MeowError::from_code(
403 InnerErrorCode::ResponseStatusError,
404 format!("download_prepare HEAD failed: {}", head_resp.status()),
405 ));
406 }
407 let head_content_length = head_resp
408 .headers()
409 .get(CONTENT_LENGTH)
410 .and_then(|v| v.to_str().ok())
411 .unwrap_or("<missing>");
412 let head_etag = head_resp
413 .headers()
414 .get(ETAG)
415 .and_then(|v| v.to_str().ok())
416 .unwrap_or("<missing>");
417 crate::meow_flow_log!(
418 "download_prepare",
419 "head metadata: url={} content_length={} etag={}",
420 head_url,
421 head_content_length,
422 head_etag
423 );
424 let total = download.total_size_from_head(head_resp.headers())?;
425 if start > total {
426 crate::meow_flow_log!(
427 "download_prepare",
428 "invalid local length larger than remote: local={} remote={}",
429 start,
430 total
431 );
432 return Err(MeowError::from_code_str(
433 InnerErrorCode::InvalidRange,
434 "local file larger than remote content-length",
435 ));
436 }
437 if start >= total {
438 crate::meow_flow_log!(
439 "download_prepare",
440 "already complete by local length: local={} remote={}",
441 start,
442 total
443 );
444 return Ok(PrepareOutcome {
445 next_offset: total,
446 total_size: total,
447 });
448 }
449 crate::meow_flow_log!(
450 "download_prepare",
451 "prepared resume offset: start={} remote_total={}",
452 start,
453 total
454 );
455 Ok(PrepareOutcome {
456 next_offset: start,
457 total_size: total,
458 })
459}
460
461#[async_trait]
462impl TransferTrait for DefaultHttpTransfer {
463 async fn prepare(
465 &self,
466 task: &TransferTask,
467 local_offset: u64,
468 ) -> Result<PrepareOutcome, MeowError> {
469 let client = self.client_for(task);
470 match task.direction() {
471 Direction::Upload => {
472 upload_prepare(&client, task, self.upload_arc(task), local_offset).await
473 }
474 Direction::Download => {
475 download_prepare(&client, task, self.download_arc(task), local_offset).await
476 }
477 }
478 }
479
480 async fn transfer_chunk(
482 &self,
483 task: &TransferTask,
484 offset: u64,
485 chunk_size: u64,
486 remote_total_size: u64,
487 ) -> Result<ChunkOutcome, MeowError> {
488 let client = self.client_for(task);
489 match task.direction() {
490 Direction::Upload => {
491 upload_one_chunk(&client, task, self.upload_arc(task), offset, chunk_size).await
492 }
493 Direction::Download => {
494 download_one_chunk(
495 &client,
496 task,
497 self.download_arc(task),
498 offset,
499 chunk_size,
500 remote_total_size,
501 )
502 .await
503 }
504 }
505 }
506
507 async fn cancel(&self, task: &TransferTask) -> Result<(), MeowError> {
509 if task.direction() != Direction::Upload {
510 return Ok(());
511 }
512 let client = self.client_for(task);
513 self.upload_arc(task).abort_upload(&client, task).await
514 }
515}