Skip to main content

rusty_cat/dflt/
default_http_transfer.rs

1use async_trait::async_trait;
2use reqwest::header::{CONTENT_LENGTH, ETAG};
3use reqwest::{Client, Method};
4use std::sync::Arc;
5use std::time::Duration;
6use tokio::time::sleep;
7
8use crate::chunk_outcome::ChunkOutcome;
9use crate::direction::Direction;
10use crate::error::{InnerErrorCode, MeowError};
11use crate::http_breakpoint::{
12    BreakpointDownload, BreakpointUpload, DefaultStyleUpload, DownloadHeadCtx,
13    StandardRangeDownload, UploadPrepareCtx,
14};
15use crate::prepare_outcome::PrepareOutcome;
16use crate::transfer_executor_trait::TransferTrait;
17use crate::transfer_task::TransferTask;
18
19use super::default_http_transfer_chunks::{download_one_chunk, map_reqwest, upload_one_chunk};
20
21/// Creates default breakpoint protocol instances.
22pub(crate) fn default_breakpoint_arcs() -> (
23    Arc<dyn BreakpointUpload + Send + Sync>,
24    Arc<dyn BreakpointDownload + Send + Sync>,
25) {
26    (
27        Arc::new(DefaultStyleUpload::default()),
28        Arc::new(StandardRangeDownload::default()),
29    )
30}
31
32/// Built-in HTTP transfer backend based on `reqwest` and async file I/O.
33pub struct DefaultHttpTransfer {
34    /// Default shared HTTP client.
35    client: reqwest::Client,
36    /// Fallback upload protocol when task does not provide one.
37    fallback_upload: Arc<dyn BreakpointUpload + Send + Sync>,
38    /// Fallback download protocol when task does not provide one.
39    fallback_download: Arc<dyn BreakpointDownload + Send + Sync>,
40}
41
42impl DefaultHttpTransfer {
43    /// Creates a backend with default HTTP timeout and keepalive values.
44    ///
45    /// # Examples
46    ///
47    /// ```no_run
48    /// use rusty_cat::DefaultHttpTransfer;
49    ///
50    /// let backend = DefaultHttpTransfer::new();
51    /// let _ = backend;
52    /// ```
53    pub fn new() -> Self {
54        Self::with_http_timeouts(Duration::from_secs(5), Duration::from_secs(30))
55    }
56
57    /// Creates built-in backend with explicit timeout and keepalive values.
58    ///
59    /// # Range guidance
60    ///
61    /// - `http_timeout`: recommended `1s..=120s`
62    /// - `tcp_keepalive`: recommended `10s..=300s`
63    ///
64    /// # Examples
65    ///
66    /// ```no_run
67    /// use std::time::Duration;
68    /// use rusty_cat::DefaultHttpTransfer;
69    ///
70    /// let backend = DefaultHttpTransfer::with_http_timeouts(
71    ///     Duration::from_secs(15),
72    ///     Duration::from_secs(60),
73    /// );
74    /// let _ = backend;
75    /// ```
76    pub fn with_http_timeouts(http_timeout: Duration, tcp_keepalive: Duration) -> Self {
77        // Keep non-fallible constructor for compatibility.
78        // Prefer `try_with_http_timeouts` in new code for explicit errors.
79        let client = match Client::builder()
80            .timeout(http_timeout)
81            .tcp_keepalive(tcp_keepalive)
82            // Avoid reusing idle TCP connections after user cancel/pause: a pooled conn can be
83            // half-closed and then fail the next HEAD/GET with reset/incomplete message (flaky
84            // under pause/resume and slow range servers).
85            .pool_max_idle_per_host(0)
86            .build()
87        {
88            Ok(c) => c,
89            Err(e) => {
90                crate::meow_flow_log!(
91                    "http_client",
92                    "with_http_timeouts build failed, fallback to Client::new(): {}",
93                    e
94                );
95                Client::new()
96            }
97        };
98        Self {
99            client,
100            fallback_upload: Arc::new(DefaultStyleUpload::default()),
101            fallback_download: Arc::new(StandardRangeDownload::default()),
102        }
103    }
104
105    /// Preferred fallible constructor with explicit error propagation.
106    ///
107    /// # Errors
108    ///
109    /// Returns `HttpClientBuildFailed` when `reqwest::Client` cannot be
110    /// constructed with the provided timeout/keepalive values.
111    ///
112    /// # Examples
113    ///
114    /// ```no_run
115    /// use std::time::Duration;
116    /// use rusty_cat::DefaultHttpTransfer;
117    ///
118    /// let backend = DefaultHttpTransfer::try_with_http_timeouts(
119    ///     Duration::from_secs(10),
120    ///     Duration::from_secs(30),
121    /// )?;
122    /// let _ = backend;
123    /// # Ok::<(), rusty_cat::api::MeowError>(())
124    /// ```
125    pub fn try_with_http_timeouts(
126        http_timeout: Duration,
127        tcp_keepalive: Duration,
128    ) -> Result<Self, MeowError> {
129        let client = Client::builder()
130            .timeout(http_timeout)
131            .tcp_keepalive(tcp_keepalive)
132            .pool_max_idle_per_host(0)
133            .build()
134            .map_err(|e| {
135                MeowError::from_source(
136                    InnerErrorCode::HttpClientBuildFailed,
137                    format!(
138                        "build reqwest client failed (timeout={:?}, keepalive={:?})",
139                        http_timeout, tcp_keepalive
140                    ),
141                    e,
142                )
143            })?;
144        Ok(Self {
145            client,
146            fallback_upload: Arc::new(DefaultStyleUpload::default()),
147            fallback_download: Arc::new(StandardRangeDownload::default()),
148        })
149    }
150
151    /// Creates backend with an externally provided `reqwest::Client`.
152    ///
153    /// # Examples
154    ///
155    /// ```no_run
156    /// use rusty_cat::DefaultHttpTransfer;
157    ///
158    /// let reqwest_client = reqwest::Client::new();
159    /// let backend = DefaultHttpTransfer::with_client(reqwest_client);
160    /// let _ = backend;
161    /// ```
162    pub fn with_client(client: reqwest::Client) -> Self {
163        Self {
164            client,
165            fallback_upload: Arc::new(DefaultStyleUpload::default()),
166            fallback_download: Arc::new(StandardRangeDownload::default()),
167        }
168    }
169
170    /// Creates backend with explicit fallback upload/download protocol plugins.
171    ///
172    /// Task-level protocol instances still take precedence when present.
173    ///
174    /// # Examples
175    ///
176    /// ```no_run
177    /// use std::sync::Arc;
178    /// use rusty_cat::{DefaultHttpTransfer, DefaultStyleUpload, StandardRangeDownload};
179    ///
180    /// let backend = DefaultHttpTransfer::with_fallbacks(
181    ///     reqwest::Client::new(),
182    ///     Arc::new(DefaultStyleUpload::default()),
183    ///     Arc::new(StandardRangeDownload::default()),
184    /// );
185    /// let _ = backend;
186    /// ```
187    pub fn with_fallbacks(
188        client: reqwest::Client,
189        upload: Arc<dyn BreakpointUpload + Send + Sync>,
190        download: Arc<dyn BreakpointDownload + Send + Sync>,
191    ) -> Self {
192        Self {
193            client,
194            fallback_upload: upload,
195            fallback_download: download,
196        }
197    }
198
199    /// Selects HTTP client for a task.
200    fn client_for(&self, task: &TransferTask) -> reqwest::Client {
201        task.http_client_ref()
202            .cloned()
203            .unwrap_or_else(|| self.client.clone())
204    }
205
206    /// Selects upload protocol implementation for a task.
207    fn upload_arc(&self, task: &TransferTask) -> Arc<dyn BreakpointUpload + Send + Sync> {
208        match task.breakpoint_upload() {
209            Some(a) => a.clone(),
210            None => self.fallback_upload.clone(),
211        }
212    }
213
214    /// Selects download protocol implementation for a task.
215    fn download_arc(&self, task: &TransferTask) -> Arc<dyn BreakpointDownload + Send + Sync> {
216        match task.breakpoint_download() {
217            Some(a) => a.clone(),
218            None => self.fallback_download.clone(),
219        }
220    }
221}
222
223impl Default for DefaultHttpTransfer {
224    fn default() -> Self {
225        Self::new()
226    }
227}
228
229async fn upload_prepare(
230    client: &reqwest::Client,
231    task: &TransferTask,
232    upload: Arc<dyn BreakpointUpload + Send + Sync>,
233    local_offset: u64,
234) -> Result<PrepareOutcome, MeowError> {
235    let max_retries = task.max_upload_prepare_retries();
236    let mut attempt: u32 = 0;
237    loop {
238        crate::meow_flow_log!(
239            "upload_prepare",
240            "start: file={} local_offset={} total={} attempt={} max_retries={}",
241            task.file_name(),
242            local_offset,
243            task.total_size(),
244            attempt,
245            max_retries
246        );
247        match upload_prepare_once(client, task, upload.clone(), local_offset).await {
248            Ok(outcome) => {
249                if attempt > 0 {
250                    crate::meow_flow_log!(
251                        "upload_prepare",
252                        "prepare retry recovered: file={} attempts_used={}",
253                        task.file_name(),
254                        attempt
255                    );
256                }
257                return Ok(outcome);
258            }
259            Err(err) => {
260                let retryable = crate::inner::exec_impl::retry::is_transport_retryable(&err);
261                let reached_limit = attempt >= max_retries;
262                if !retryable || reached_limit {
263                    crate::meow_flow_log!(
264                        "upload_prepare",
265                        "prepare give up: file={} attempt={} max_retries={} retryable={} err={}",
266                        task.file_name(),
267                        attempt,
268                        max_retries,
269                        retryable,
270                        err
271                    );
272                    return Err(err);
273                }
274                let delay_ms = crate::inner::exec_impl::retry::calc_backoff_with_jitter_ms(attempt);
275                crate::meow_flow_log!(
276                    "upload_prepare",
277                    "prepare retry scheduled: file={} next_attempt={} delay_ms={} err={}",
278                    task.file_name(),
279                    attempt + 1,
280                    delay_ms,
281                    err
282                );
283                sleep(Duration::from_millis(delay_ms)).await;
284                attempt += 1;
285            }
286        }
287    }
288}
289
290async fn upload_prepare_once(
291    client: &reqwest::Client,
292    task: &TransferTask,
293    upload: Arc<dyn BreakpointUpload + Send + Sync>,
294    local_offset: u64,
295) -> Result<PrepareOutcome, MeowError> {
296    let info = upload
297        .prepare(UploadPrepareCtx {
298            client,
299            task,
300            local_offset,
301        })
302        .await?;
303    if info.completed_file_id.is_some() {
304        let total = task.total_size();
305        crate::meow_flow_log!(
306            "upload_prepare",
307            "server indicates upload already complete: file={} total={}",
308            task.file_name(),
309            total
310        );
311        return Ok(PrepareOutcome {
312            next_offset: total,
313            total_size: total,
314        });
315    }
316    let server_off = info.next_byte.unwrap_or(0);
317    let next = local_offset.max(server_off).min(task.total_size());
318    crate::meow_flow_log!(
319        "upload_prepare",
320        "prepared: server_next={} local_offset={} final_next={}",
321        server_off,
322        local_offset,
323        next
324    );
325    Ok(PrepareOutcome {
326        next_offset: next,
327        total_size: task.total_size(),
328    })
329}
330
331/// Runs download prepare stage and computes resume offset/total size.
332async fn download_prepare(
333    client: &reqwest::Client,
334    task: &TransferTask,
335    download: Arc<dyn BreakpointDownload + Send + Sync>,
336    _local_offset: u64,
337) -> Result<PrepareOutcome, MeowError> {
338    crate::meow_flow_log!(
339        "download_prepare",
340        "start: file={} path={}",
341        task.file_name(),
342        task.file_path().display()
343    );
344    let path = task.file_path();
345    let local_len = match tokio::fs::metadata(path).await {
346        Ok(meta) => meta.len(),
347        Err(e) if e.kind() == std::io::ErrorKind::NotFound => 0u64,
348        Err(e) => {
349            return Err(MeowError::from_source(
350                InnerErrorCode::IoError,
351                format!("download_prepare stat failed: {}", path.display()),
352                e,
353            ));
354        }
355    };
356
357    // Use local persisted length as resume start to avoid sparse gaps.
358    let start = local_len;
359    if let Some(total) = download.total_size_hint(task) {
360        if start > total {
361            crate::meow_flow_log!(
362                "download_prepare",
363                "invalid local length larger than hinted remote: local={} remote={}",
364                start,
365                total
366            );
367            return Err(MeowError::from_code_str(
368                InnerErrorCode::InvalidRange,
369                "local file larger than hinted remote total size",
370            ));
371        }
372        crate::meow_flow_log!(
373            "download_prepare",
374            "prepared from total_size_hint: start={} remote_total={}",
375            start,
376            total
377        );
378        return Ok(PrepareOutcome {
379            next_offset: start.min(total),
380            total_size: total,
381        });
382    }
383
384    let head_url = download.head_url(task);
385    let mut head_headers = task.headers().clone();
386    download.merge_head_headers(DownloadHeadCtx {
387        task,
388        base: &mut head_headers,
389    })?;
390    let head_resp = client
391        .request(Method::HEAD, &head_url)
392        .headers(head_headers)
393        .send()
394        .await
395        .map_err(map_reqwest)?;
396    if !head_resp.status().is_success() {
397        crate::meow_flow_log!(
398            "download_prepare",
399            "head failed: status={}",
400            head_resp.status()
401        );
402        return Err(MeowError::from_code(
403            InnerErrorCode::ResponseStatusError,
404            format!("download_prepare HEAD failed: {}", head_resp.status()),
405        ));
406    }
407    let head_content_length = head_resp
408        .headers()
409        .get(CONTENT_LENGTH)
410        .and_then(|v| v.to_str().ok())
411        .unwrap_or("<missing>");
412    let head_etag = head_resp
413        .headers()
414        .get(ETAG)
415        .and_then(|v| v.to_str().ok())
416        .unwrap_or("<missing>");
417    crate::meow_flow_log!(
418        "download_prepare",
419        "head metadata: url={} content_length={} etag={}",
420        head_url,
421        head_content_length,
422        head_etag
423    );
424    let total = download.total_size_from_head(head_resp.headers())?;
425    if start > total {
426        crate::meow_flow_log!(
427            "download_prepare",
428            "invalid local length larger than remote: local={} remote={}",
429            start,
430            total
431        );
432        return Err(MeowError::from_code_str(
433            InnerErrorCode::InvalidRange,
434            "local file larger than remote content-length",
435        ));
436    }
437    if start >= total {
438        crate::meow_flow_log!(
439            "download_prepare",
440            "already complete by local length: local={} remote={}",
441            start,
442            total
443        );
444        return Ok(PrepareOutcome {
445            next_offset: total,
446            total_size: total,
447        });
448    }
449    crate::meow_flow_log!(
450        "download_prepare",
451        "prepared resume offset: start={} remote_total={}",
452        start,
453        total
454    );
455    Ok(PrepareOutcome {
456        next_offset: start,
457        total_size: total,
458    })
459}
460
461#[async_trait]
462impl TransferTrait for DefaultHttpTransfer {
463    /// Prepares transfer execution according to task direction.
464    async fn prepare(
465        &self,
466        task: &TransferTask,
467        local_offset: u64,
468    ) -> Result<PrepareOutcome, MeowError> {
469        let client = self.client_for(task);
470        match task.direction() {
471            Direction::Upload => {
472                upload_prepare(&client, task, self.upload_arc(task), local_offset).await
473            }
474            Direction::Download => {
475                download_prepare(&client, task, self.download_arc(task), local_offset).await
476            }
477        }
478    }
479
480    /// Transfers one chunk according to task direction.
481    async fn transfer_chunk(
482        &self,
483        task: &TransferTask,
484        offset: u64,
485        chunk_size: u64,
486        remote_total_size: u64,
487    ) -> Result<ChunkOutcome, MeowError> {
488        let client = self.client_for(task);
489        match task.direction() {
490            Direction::Upload => {
491                upload_one_chunk(&client, task, self.upload_arc(task), offset, chunk_size).await
492            }
493            Direction::Download => {
494                download_one_chunk(
495                    &client,
496                    task,
497                    self.download_arc(task),
498                    offset,
499                    chunk_size,
500                    remote_total_size,
501                )
502                .await
503            }
504        }
505    }
506
507    /// Handles task cancel; upload direction may trigger protocol abort.
508    async fn cancel(&self, task: &TransferTask) -> Result<(), MeowError> {
509        if task.direction() != Direction::Upload {
510            return Ok(());
511        }
512        let client = self.client_for(task);
513        self.upload_arc(task).abort_upload(&client, task).await
514    }
515}