Skip to main content

rusty_cat/dflt/
default_http_transfer.rs

1use async_trait::async_trait;
2use reqwest::header::{CONTENT_LENGTH, ETAG};
3use reqwest::{Client, Method};
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::time::sleep;
7
8use crate::chunk_outcome::ChunkOutcome;
9use crate::direction::Direction;
10use crate::error::{InnerErrorCode, MeowError};
11use crate::http_breakpoint::{
12    BreakpointDownload, BreakpointUpload, DefaultStyleUpload, DownloadHeadCtx,
13    StandardRangeDownload, UploadPrepareCtx,
14};
15use crate::prepare_outcome::PrepareOutcome;
16use crate::transfer_executor_trait::TransferTrait;
17use crate::transfer_task::TransferTask;
18
19use super::default_http_transfer_chunks::{
20    download_one_chunk, map_reqwest, upload_one_chunk, upload_one_chunk_part,
21};
22
23/// Creates default breakpoint protocol instances.
24pub(crate) fn default_breakpoint_arcs() -> (
25    Arc<dyn BreakpointUpload + Send + Sync>,
26    Arc<dyn BreakpointDownload + Send + Sync>,
27) {
28    (
29        Arc::new(DefaultStyleUpload::default()),
30        Arc::new(StandardRangeDownload::default()),
31    )
32}
33
/// Cap on idle pooled connections retained per host by internally built clients.
///
/// Keeping connections around lets every chunk after the first on a host skip
/// the TCP+TLS handshake, which is the main per-chunk cost on high-latency
/// links. The bound keeps long-lived hosts from piling up idle sockets; callers
/// needing a different policy can supply their own `reqwest::Client` through
/// `MeowConfig::http_client`.
const DEFAULT_POOL_MAX_IDLE_PER_HOST: usize = 16;

/// Time an idle pooled connection may sit unused before it is evicted.
///
/// Chunks of one task go out back-to-back (only a local file read sits between
/// them), so 30 seconds keeps the connection warm for the whole transfer while
/// still dropping sockets that go idle across a pause. By design, a stale
/// socket the server already closed is expected to surface as a retryable
/// `HttpError` rather than a terminal failure.
const DEFAULT_POOL_IDLE_TIMEOUT: Duration = Duration::from_millis(30_000);

/// Ceiling for the connect phase of internally built clients.
///
/// `http_timeout` bounds the whole request and must leave room for a slow chunk
/// body, so on its own it would let a dead TCP/TLS handshake hang for that full
/// budget. The connect timeout actually applied is `min(http_timeout, cap)`:
/// unreachable peers fail fast, slow-but-alive transfers are unaffected, and a
/// total timeout already at or below the cap is used unchanged.
const DEFAULT_CONNECT_TIMEOUT_CAP: Duration = Duration::from_millis(10_000);
62
63/// Builds an internal `reqwest::Client` with the library's shared transport
64/// policy: a total request timeout, a bounded connect timeout for fast failure
65/// on unreachable peers, TCP keepalive, and a bounded idle connection pool for
66/// handshake reuse across chunks.
67///
68/// Centralizing this keeps every internally created client (the transfer
69/// backend and [`crate::MeowClient::http_client`]) on the exact same policy, so
70/// they can never drift apart.
71pub(crate) fn build_internal_client(
72    http_timeout: Duration,
73    tcp_keepalive: Duration,
74) -> Result<reqwest::Client, reqwest::Error> {
75    Client::builder()
76        .timeout(http_timeout)
77        // Fail fast on an unreachable peer while leaving the total budget for
78        // slow chunk bodies; see `DEFAULT_CONNECT_TIMEOUT_CAP`.
79        .connect_timeout(http_timeout.min(DEFAULT_CONNECT_TIMEOUT_CAP))
80        .tcp_keepalive(tcp_keepalive)
81        // Reuse idle connections to drop a handshake from every subsequent
82        // chunk. A stale socket reused after a pause comes back as `HttpError`,
83        // which all transfer paths retry, so reuse is safe; the bounded cap and
84        // idle timeout only keep idle sockets in check.
85        .pool_max_idle_per_host(DEFAULT_POOL_MAX_IDLE_PER_HOST)
86        .pool_idle_timeout(Some(DEFAULT_POOL_IDLE_TIMEOUT))
87        .build()
88}
89
/// Built-in HTTP transfer backend based on `reqwest` and async file I/O.
///
/// Per-task settings take precedence: when a task carries its own HTTP client
/// or breakpoint protocol, that one is used instead of the matching field below.
pub struct DefaultHttpTransfer {
    /// HTTP client used when the task does not supply its own.
    client: reqwest::Client,
    /// Upload protocol used when the task does not supply its own.
    fallback_upload: Arc<dyn BreakpointUpload + Send + Sync>,
    /// Download protocol used when the task does not supply its own.
    fallback_download: Arc<dyn BreakpointDownload + Send + Sync>,
}
99
100impl DefaultHttpTransfer {
101    /// Creates a backend with default HTTP timeout and keepalive values.
102    ///
103    /// # Examples
104    ///
105    /// ```no_run
106    /// use rusty_cat::DefaultHttpTransfer;
107    ///
108    /// let backend = DefaultHttpTransfer::new();
109    /// let _ = backend;
110    /// ```
111    pub fn new() -> Self {
112        Self::with_http_timeouts(Duration::from_secs(5), Duration::from_secs(30))
113    }
114
115    /// Creates built-in backend with explicit timeout and keepalive values.
116    ///
117    /// # Range guidance
118    ///
119    /// - `http_timeout`: recommended `1s..=120s`
120    /// - `tcp_keepalive`: recommended `10s..=300s`
121    ///
122    /// # Examples
123    ///
124    /// ```no_run
125    /// use std::time::Duration;
126    /// use rusty_cat::DefaultHttpTransfer;
127    ///
128    /// let backend = DefaultHttpTransfer::with_http_timeouts(
129    ///     Duration::from_secs(15),
130    ///     Duration::from_secs(60),
131    /// );
132    /// let _ = backend;
133    /// ```
134    pub fn with_http_timeouts(http_timeout: Duration, tcp_keepalive: Duration) -> Self {
135        // Keep non-fallible constructor for compatibility.
136        // Prefer `try_with_http_timeouts` in new code for explicit errors.
137        let client = match build_internal_client(http_timeout, tcp_keepalive) {
138            Ok(c) => c,
139            Err(e) => {
140                crate::meow_flow_log!(
141                    "http_client",
142                    "with_http_timeouts build failed, fallback to Client::new(): {}",
143                    e
144                );
145                Client::new()
146            }
147        };
148        Self {
149            client,
150            fallback_upload: Arc::new(DefaultStyleUpload::default()),
151            fallback_download: Arc::new(StandardRangeDownload::default()),
152        }
153    }
154
155    /// Preferred fallible constructor with explicit error propagation.
156    ///
157    /// # Errors
158    ///
159    /// Returns `HttpClientBuildFailed` when `reqwest::Client` cannot be
160    /// constructed with the provided timeout/keepalive values.
161    ///
162    /// # Examples
163    ///
164    /// ```no_run
165    /// use std::time::Duration;
166    /// use rusty_cat::DefaultHttpTransfer;
167    ///
168    /// let backend = DefaultHttpTransfer::try_with_http_timeouts(
169    ///     Duration::from_secs(10),
170    ///     Duration::from_secs(30),
171    /// )?;
172    /// let _ = backend;
173    /// # Ok::<(), rusty_cat::api::MeowError>(())
174    /// ```
175    pub fn try_with_http_timeouts(
176        http_timeout: Duration,
177        tcp_keepalive: Duration,
178    ) -> Result<Self, MeowError> {
179        let client = build_internal_client(http_timeout, tcp_keepalive)
180            .map_err(|e| {
181                MeowError::from_source(
182                    InnerErrorCode::HttpClientBuildFailed,
183                    format!(
184                        "build reqwest client failed (timeout={:?}, keepalive={:?})",
185                        http_timeout, tcp_keepalive
186                    ),
187                    e,
188                )
189            })?;
190        Ok(Self {
191            client,
192            fallback_upload: Arc::new(DefaultStyleUpload::default()),
193            fallback_download: Arc::new(StandardRangeDownload::default()),
194        })
195    }
196
197    /// Creates backend with an externally provided `reqwest::Client`.
198    ///
199    /// # Examples
200    ///
201    /// ```no_run
202    /// use rusty_cat::DefaultHttpTransfer;
203    ///
204    /// let reqwest_client = reqwest::Client::new();
205    /// let backend = DefaultHttpTransfer::with_client(reqwest_client);
206    /// let _ = backend;
207    /// ```
208    pub fn with_client(client: reqwest::Client) -> Self {
209        Self {
210            client,
211            fallback_upload: Arc::new(DefaultStyleUpload::default()),
212            fallback_download: Arc::new(StandardRangeDownload::default()),
213        }
214    }
215
216    /// Creates backend with explicit fallback upload/download protocol plugins.
217    ///
218    /// Task-level protocol instances still take precedence when present.
219    ///
220    /// # Examples
221    ///
222    /// ```no_run
223    /// use std::sync::Arc;
224    /// use rusty_cat::{DefaultHttpTransfer, DefaultStyleUpload, StandardRangeDownload};
225    ///
226    /// let backend = DefaultHttpTransfer::with_fallbacks(
227    ///     reqwest::Client::new(),
228    ///     Arc::new(DefaultStyleUpload::default()),
229    ///     Arc::new(StandardRangeDownload::default()),
230    /// );
231    /// let _ = backend;
232    /// ```
233    pub fn with_fallbacks(
234        client: reqwest::Client,
235        upload: Arc<dyn BreakpointUpload + Send + Sync>,
236        download: Arc<dyn BreakpointDownload + Send + Sync>,
237    ) -> Self {
238        Self {
239            client,
240            fallback_upload: upload,
241            fallback_download: download,
242        }
243    }
244
245    /// Selects HTTP client for a task.
246    fn client_for(&self, task: &TransferTask) -> reqwest::Client {
247        task.http_client_ref()
248            .cloned()
249            .unwrap_or_else(|| self.client.clone())
250    }
251
252    /// Selects upload protocol implementation for a task.
253    fn upload_arc(&self, task: &TransferTask) -> Arc<dyn BreakpointUpload + Send + Sync> {
254        match task.breakpoint_upload() {
255            Some(a) => a.clone(),
256            None => self.fallback_upload.clone(),
257        }
258    }
259
260    /// Selects download protocol implementation for a task.
261    fn download_arc(&self, task: &TransferTask) -> Arc<dyn BreakpointDownload + Send + Sync> {
262        match task.breakpoint_download() {
263            Some(a) => a.clone(),
264            None => self.fallback_download.clone(),
265        }
266    }
267}
268
269impl Default for DefaultHttpTransfer {
270    fn default() -> Self {
271        Self::new()
272    }
273}
274
275async fn upload_prepare(
276    client: &reqwest::Client,
277    task: &TransferTask,
278    upload: Arc<dyn BreakpointUpload + Send + Sync>,
279    local_offset: u64,
280) -> Result<PrepareOutcome, MeowError> {
281    let max_retries = task.max_upload_prepare_retries();
282    let mut attempt: u32 = 0;
283    loop {
284        crate::meow_flow_log!(
285            "upload_prepare",
286            "start: file={} local_offset={} total={} attempt={} max_retries={}",
287            task.file_name(),
288            local_offset,
289            task.total_size(),
290            attempt,
291            max_retries
292        );
293        match upload_prepare_once(client, task, upload.clone(), local_offset).await {
294            Ok(outcome) => {
295                if attempt > 0 {
296                    crate::meow_flow_log!(
297                        "upload_prepare",
298                        "prepare retry recovered: file={} attempts_used={}",
299                        task.file_name(),
300                        attempt
301                    );
302                }
303                return Ok(outcome);
304            }
305            Err(err) => {
306                let retryable = crate::inner::exec_impl::retry::is_transport_retryable(&err);
307                let reached_limit = attempt >= max_retries;
308                if !retryable || reached_limit {
309                    crate::meow_flow_log!(
310                        "upload_prepare",
311                        "prepare give up: file={} attempt={} max_retries={} retryable={} err={}",
312                        task.file_name(),
313                        attempt,
314                        max_retries,
315                        retryable,
316                        err
317                    );
318                    return Err(err);
319                }
320                let delay_ms = crate::inner::exec_impl::retry::calc_backoff_with_jitter_ms(attempt);
321                crate::meow_flow_log!(
322                    "upload_prepare",
323                    "prepare retry scheduled: file={} next_attempt={} delay_ms={} err={}",
324                    task.file_name(),
325                    attempt + 1,
326                    delay_ms,
327                    err
328                );
329                sleep(Duration::from_millis(delay_ms)).await;
330                attempt += 1;
331            }
332        }
333    }
334}
335
336async fn upload_prepare_once(
337    client: &reqwest::Client,
338    task: &TransferTask,
339    upload: Arc<dyn BreakpointUpload + Send + Sync>,
340    local_offset: u64,
341) -> Result<PrepareOutcome, MeowError> {
342    let info = upload
343        .prepare(UploadPrepareCtx {
344            client,
345            task,
346            local_offset,
347        })
348        .await?;
349    if info.completed_file_id.is_some() {
350        let total = task.total_size();
351        crate::meow_flow_log!(
352            "upload_prepare",
353            "server indicates upload already complete: file={} total={}",
354            task.file_name(),
355            total
356        );
357        return Ok(PrepareOutcome {
358            next_offset: total,
359            total_size: total,
360        });
361    }
362    let server_off = info.next_byte.unwrap_or(0);
363    let next = local_offset.max(server_off).min(task.total_size());
364    crate::meow_flow_log!(
365        "upload_prepare",
366        "prepared: server_next={} local_offset={} final_next={}",
367        server_off,
368        local_offset,
369        next
370    );
371    Ok(PrepareOutcome {
372        next_offset: next,
373        total_size: task.total_size(),
374    })
375}
376
377/// Runs download prepare stage and computes resume offset/total size.
378async fn download_prepare(
379    client: &reqwest::Client,
380    task: &TransferTask,
381    download: Arc<dyn BreakpointDownload + Send + Sync>,
382    _local_offset: u64,
383) -> Result<PrepareOutcome, MeowError> {
384    crate::meow_flow_log!(
385        "download_prepare",
386        "start: file={} path={}",
387        task.file_name(),
388        task.file_path().display()
389    );
390    let path = task.file_path();
391    let local_len = match tokio::fs::metadata(path).await {
392        Ok(meta) => meta.len(),
393        // A missing file here simply means "no local progress yet"; treat it as
394        // a fresh download rather than a mid-transfer removal.
395        Err(e) if e.kind() == std::io::ErrorKind::NotFound => 0u64,
396        Err(e) => {
397            return Err(MeowError::from_io(
398                format!("download_prepare stat failed: {}", path.display()),
399                e,
400            ));
401        }
402    };
403
404    // Use local persisted length as resume start to avoid sparse gaps.
405    let start = local_len;
406    if let Some(total) = download.total_size_hint(task) {
407        if start > total {
408            crate::meow_flow_log!(
409                "download_prepare",
410                "invalid local length larger than hinted remote: local={} remote={}",
411                start,
412                total
413            );
414            return Err(MeowError::from_code_str(
415                InnerErrorCode::InvalidRange,
416                "local file larger than hinted remote total size",
417            ));
418        }
419        crate::meow_flow_log!(
420            "download_prepare",
421            "prepared from total_size_hint: start={} remote_total={}",
422            start,
423            total
424        );
425        return Ok(PrepareOutcome {
426            next_offset: start.min(total),
427            total_size: total,
428        });
429    }
430
431    let head_url = download.head_url(task);
432    let mut head_headers = task.headers().clone();
433    download.merge_head_headers(DownloadHeadCtx {
434        task,
435        base: &mut head_headers,
436    })?;
437    let head_resp = client
438        .request(Method::HEAD, &head_url)
439        .headers(head_headers)
440        .send()
441        .await
442        .map_err(map_reqwest)?;
443    if !head_resp.status().is_success() {
444        crate::meow_flow_log!(
445            "download_prepare",
446            "head failed: status={}",
447            head_resp.status()
448        );
449        return Err(MeowError::from_code(
450            InnerErrorCode::ResponseStatusError,
451            format!("download_prepare HEAD failed: {}", head_resp.status()),
452        )
453        .with_http_status(head_resp.status().as_u16()));
454    }
455    let head_content_length = head_resp
456        .headers()
457        .get(CONTENT_LENGTH)
458        .and_then(|v| v.to_str().ok())
459        .unwrap_or("<missing>");
460    let head_etag = head_resp
461        .headers()
462        .get(ETAG)
463        .and_then(|v| v.to_str().ok())
464        .unwrap_or("<missing>");
465    crate::meow_flow_log!(
466        "download_prepare",
467        "head metadata: url={} content_length={} etag={}",
468        head_url,
469        head_content_length,
470        head_etag
471    );
472    let total = download.total_size_from_head(head_resp.headers())?;
473    if start > total {
474        crate::meow_flow_log!(
475            "download_prepare",
476            "invalid local length larger than remote: local={} remote={}",
477            start,
478            total
479        );
480        return Err(MeowError::from_code_str(
481            InnerErrorCode::InvalidRange,
482            "local file larger than remote content-length",
483        ));
484    }
485    if start >= total {
486        crate::meow_flow_log!(
487            "download_prepare",
488            "already complete by local length: local={} remote={}",
489            start,
490            total
491        );
492        return Ok(PrepareOutcome {
493            next_offset: total,
494            total_size: total,
495        });
496    }
497    crate::meow_flow_log!(
498        "download_prepare",
499        "prepared resume offset: start={} remote_total={}",
500        start,
501        total
502    );
503    Ok(PrepareOutcome {
504        next_offset: start,
505        total_size: total,
506    })
507}
508
509#[async_trait]
510impl TransferTrait for DefaultHttpTransfer {
511    /// Prepares transfer execution according to task direction.
512    async fn prepare(
513        &self,
514        task: &TransferTask,
515        local_offset: u64,
516    ) -> Result<PrepareOutcome, MeowError> {
517        let client = self.client_for(task);
518        match task.direction() {
519            Direction::Upload => {
520                upload_prepare(&client, task, self.upload_arc(task), local_offset).await
521            }
522            Direction::Download => {
523                download_prepare(&client, task, self.download_arc(task), local_offset).await
524            }
525        }
526    }
527
528    /// Transfers one chunk according to task direction.
529    async fn transfer_chunk(
530        &self,
531        task: &TransferTask,
532        offset: u64,
533        chunk_size: u64,
534        remote_total_size: u64,
535    ) -> Result<ChunkOutcome, MeowError> {
536        let client = self.client_for(task);
537        match task.direction() {
538            Direction::Upload => {
539                upload_one_chunk(&client, task, self.upload_arc(task), offset, chunk_size).await
540            }
541            Direction::Download => {
542                download_one_chunk(
543                    &client,
544                    task,
545                    self.download_arc(task),
546                    offset,
547                    chunk_size,
548                    remote_total_size,
549                )
550                .await
551            }
552        }
553    }
554
555    /// Handles task cancel; upload direction may trigger protocol abort.
556    async fn cancel(&self, task: &TransferTask) -> Result<(), MeowError> {
557        if task.direction() != Direction::Upload {
558            return Ok(());
559        }
560        let client = self.client_for(task);
561        self.upload_arc(task).abort_upload(&client, task).await
562    }
563
564    /// Parallel parts are only offered for uploads whose resolved protocol
565    /// proves out-of-order safety; downloads always stay serial.
566    fn supports_parallel_parts(&self, task: &TransferTask) -> bool {
567        task.direction() == Direction::Upload && self.upload_arc(task).supports_parallel_parts()
568    }
569
570    /// Uploads one chunk without finalizing (parallel path). Completion is run
571    /// exactly once by the scheduler via [`Self::complete`].
572    async fn transfer_chunk_part(
573        &self,
574        task: &TransferTask,
575        offset: u64,
576        chunk_size: u64,
577        remote_total_size: u64,
578    ) -> Result<ChunkOutcome, MeowError> {
579        let client = self.client_for(task);
580        match task.direction() {
581            Direction::Upload => {
582                upload_one_chunk_part(&client, task, self.upload_arc(task), offset, chunk_size).await
583            }
584            // Download has no finalize step; behaves exactly like transfer_chunk.
585            Direction::Download => {
586                download_one_chunk(
587                    &client,
588                    task,
589                    self.download_arc(task),
590                    offset,
591                    chunk_size,
592                    remote_total_size,
593                )
594                .await
595            }
596        }
597    }
598
599    /// Finalizes an upload after all parts have been uploaded; delegates to the
600    /// protocol's `complete_upload`.
601    async fn complete(&self, task: &TransferTask) -> Result<Option<String>, MeowError> {
602        if task.direction() != Direction::Upload {
603            return Ok(None);
604        }
605        let client = self.client_for(task);
606        self.upload_arc(task).complete_upload(&client, task).await
607    }
608}