Skip to main content

takanawa_http/
downloader.rs

1use std::collections::VecDeque;
2use std::future::Future;
3use std::path::PathBuf;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, Instant};
7
8use bytes::Bytes;
9use futures_util::StreamExt;
10use reqwest::header::{
11    ACCEPT_ENCODING, CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_RANGE, ETAG, HeaderMap,
12    LAST_MODIFIED, RANGE,
13};
14use reqwest::{Client, StatusCode};
15use takanawa_core::{
16    Chunk, ChunkPlan, DEFAULT_CHUNK_SIZE, HashConfig, PartFile, PartMetadata, RemoteInfo, Result,
17    TakanawaError,
18};
19use tokio::runtime::Runtime;
20use tokio::sync::{mpsc, oneshot};
21use tokio::task::JoinSet;
22
23use crate::content_range::{parse_content_range, parse_unsatisfied_total};
24use crate::limiter::IoLimiter;
25use crate::state::{DownloadSnapshot, SharedState};
26
/// Retry count used by `RetryConfig::default`.
const DEFAULT_MAX_RETRIES: u32 = 4;
/// Default first backoff delay; also substituted when a configured initial backoff is zero.
const DEFAULT_BACKOFF_INITIAL: Duration = Duration::from_millis(100);
/// Default backoff ceiling; also substituted when a configured ceiling is zero.
const DEFAULT_BACKOFF_MAX: Duration = Duration::from_secs(3);
/// Connect timeout substituted when `TimeoutConfig::connect` is zero.
const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
/// Multiplied by the effective parallelism to size the part-writer command queue.
const WRITE_QUEUE_DEPTH_PER_CHUNK: usize = 8;
32
33#[derive(Debug, Clone)]
34pub struct DownloadConfig {
35    pub url: String,
36    pub target_path: PathBuf,
37    pub chunk_size: u64,
38    pub parallelism: usize,
39    pub max_parallel_chunks: usize,
40    pub retry: RetryConfig,
41    pub timeout: TimeoutConfig,
42    pub bytes_per_second_limit: u64,
43    pub hash: HashConfig,
44}
45
46impl DownloadConfig {
47    #[must_use]
48    pub fn normalized(mut self) -> Self {
49        if self.chunk_size == 0 {
50            self.chunk_size = DEFAULT_CHUNK_SIZE;
51        }
52        self.retry = self.retry.normalized();
53        self.timeout = self.timeout.normalized();
54        self
55    }
56}
57
58#[derive(Debug, Clone, Copy, PartialEq, Eq)]
59pub struct RetryConfig {
60    pub max_retries: u32,
61    pub backoff_initial: Duration,
62    pub backoff_max: Duration,
63}
64
65impl Default for RetryConfig {
66    fn default() -> Self {
67        Self {
68            max_retries: DEFAULT_MAX_RETRIES,
69            backoff_initial: DEFAULT_BACKOFF_INITIAL,
70            backoff_max: DEFAULT_BACKOFF_MAX,
71        }
72    }
73}
74
75impl RetryConfig {
76    #[must_use]
77    pub fn normalized(self) -> Self {
78        let default = Self::default();
79        let backoff_initial = if self.backoff_initial.is_zero() {
80            default.backoff_initial
81        } else {
82            self.backoff_initial
83        };
84        let backoff_max = if self.backoff_max.is_zero() {
85            default.backoff_max
86        } else {
87            self.backoff_max.max(backoff_initial)
88        };
89        Self {
90            max_retries: self.max_retries,
91            backoff_initial,
92            backoff_max,
93        }
94    }
95
96    fn max_attempts(self) -> u32 {
97        self.max_retries.saturating_add(1).max(1)
98    }
99}
100
101#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
102pub struct TimeoutConfig {
103    pub connect: Duration,
104    pub read: Duration,
105    pub total: Duration,
106}
107
108impl TimeoutConfig {
109    #[must_use]
110    pub fn normalized(self) -> Self {
111        Self {
112            connect: if self.connect.is_zero() {
113                DEFAULT_CONNECT_TIMEOUT
114            } else {
115                self.connect
116            },
117            read: self.read,
118            total: self.total,
119        }
120    }
121}
122
123#[derive(Debug, Clone)]
124pub struct DownloadEngine {
125    client: Client,
126    limiter: IoLimiter,
127}
128
129impl DownloadEngine {
130    pub fn new(max_io: usize) -> Result<Self> {
131        let client = build_client(TimeoutConfig::default().normalized())?;
132        Ok(Self {
133            client,
134            limiter: IoLimiter::new(max_io.max(1)),
135        })
136    }
137
138    #[must_use]
139    pub fn max_io(&self) -> usize {
140        self.limiter.max()
141    }
142
143    pub fn set_max_io(&self, max_io: usize) {
144        self.limiter.set_max(max_io);
145    }
146
147    fn default_parallelism(&self) -> usize {
148        self.max_io().clamp(1, 4)
149    }
150
151    fn with_timeout(&self, timeout: TimeoutConfig) -> Result<Self> {
152        Ok(Self {
153            client: build_client(timeout)?,
154            limiter: self.limiter.clone(),
155        })
156    }
157}
158
159fn client_builder() -> reqwest::ClientBuilder {
160    let builder = Client::builder();
161    #[cfg(feature = "tls-rustls")]
162    {
163        let roots = rustls::RootCertStore {
164            roots: webpki_roots::TLS_SERVER_ROOTS.to_vec(),
165        };
166        let tls_config = rustls::ClientConfig::builder()
167            .with_root_certificates(roots)
168            .with_no_client_auth();
169        return builder.tls_backend_preconfigured(tls_config);
170    }
171    #[allow(unreachable_code)]
172    builder
173}
174
175fn build_client(timeout: TimeoutConfig) -> Result<Client> {
176    let mut builder = client_builder()
177        .user_agent("takanawa/0.1")
178        .connect_timeout(timeout.connect);
179    if !timeout.read.is_zero() {
180        builder = builder.read_timeout(timeout.read);
181    }
182    builder
183        .build()
184        .map_err(|err| TakanawaError::InvalidConfig(format!("failed to build HTTP client: {err}")))
185}
186
/// Controllable handle for one download job: start, pause, cancel and inspect.
#[derive(Debug)]
pub struct DownloadHandle {
    /// Engine cloned into the background task on every start.
    engine: DownloadEngine,
    /// Job configuration, normalized when the handle is created.
    config: DownloadConfig,
    /// Progress/phase state shared with the background task.
    state: SharedState,
    /// Pause and cancel flags shared with the background task.
    control: Arc<Control>,
    /// Join handle of the spawned download task; `None` until the job is started.
    join: Mutex<Option<tokio::task::JoinHandle<()>>>,
}
195
/// Cooperative control flags polled by the download loop and by chunk tasks.
#[derive(Debug, Default)]
struct Control {
    /// Set to request a pause.
    pause: AtomicBool,
    /// Set to request cancellation.
    cancel: AtomicBool,
}
201
202impl DownloadHandle {
203    #[must_use]
204    pub fn new(engine: DownloadEngine, config: DownloadConfig) -> Self {
205        Self {
206            engine,
207            config: config.normalized(),
208            state: SharedState::new(),
209            control: Arc::new(Control::default()),
210            join: Mutex::new(None),
211        }
212    }
213
214    pub fn start_on(&self, runtime: &Runtime) -> Result<()> {
215        let mut join = self.join.lock().expect("download join mutex poisoned");
216        if join.as_ref().is_some_and(|handle| !handle.is_finished()) {
217            return Err(TakanawaError::AlreadyStarted);
218        }
219        if join
220            .as_ref()
221            .is_some_and(tokio::task::JoinHandle::is_finished)
222        {
223            let _ = join.take();
224        }
225
226        self.control.pause.store(false, Ordering::Relaxed);
227        self.control.cancel.store(false, Ordering::Relaxed);
228        self.state.clear_error();
229        self.state.mark_running();
230
231        let engine = self.engine.clone();
232        let config = self.config.clone();
233        let state = self.state.clone();
234        let control = Arc::clone(&self.control);
235        *join = Some(runtime.spawn(async move {
236            if let Err(err) = run_download(engine, config, state.clone(), control).await {
237                match err {
238                    TakanawaError::Cancelled => state.mark_cancelled(),
239                    err => state.mark_failed(err.to_string()),
240                }
241            }
242        }));
243        Ok(())
244    }
245
246    pub fn pause(&self) -> Result<()> {
247        self.control.pause.store(true, Ordering::Relaxed);
248        self.state.request_pause();
249        Ok(())
250    }
251
252    pub fn cancel(&self) -> Result<()> {
253        self.control.cancel.store(true, Ordering::Relaxed);
254        self.state.request_cancel();
255        if self
256            .join
257            .lock()
258            .expect("download join mutex poisoned")
259            .as_ref()
260            .is_none_or(tokio::task::JoinHandle::is_finished)
261        {
262            self.state.mark_cancelled();
263        }
264        Ok(())
265    }
266
267    #[must_use]
268    pub fn snapshot(&self) -> DownloadSnapshot {
269        self.state.snapshot()
270    }
271
272    #[must_use]
273    pub fn bitmap(&self) -> Vec<u8> {
274        self.state.bitmap()
275    }
276}
277
278impl Drop for DownloadHandle {
279    fn drop(&mut self) {
280        self.control.cancel.store(true, Ordering::Relaxed);
281        if let Some(join) = self
282            .join
283            .lock()
284            .expect("download join mutex poisoned")
285            .take()
286        {
287            join.abort();
288        }
289    }
290}
291
292pub async fn download_to_completion(
293    engine: DownloadEngine,
294    config: DownloadConfig,
295) -> Result<DownloadSnapshot> {
296    let state = SharedState::new();
297    let control = Arc::new(Control::default());
298    run_download(engine, config.normalized(), state.clone(), control).await?;
299    Ok(state.snapshot())
300}
301
/// Drives one download: probe the remote, open (or resume) the part file, fetch
/// the incomplete chunks with bounded parallelism and finalize the target.
///
/// Returns `Ok(())` both when the download completes and when it stops because a
/// pause was requested (the state is then marked paused).
///
/// # Errors
/// Returns `TakanawaError::Cancelled` when the cancel flag is observed by the
/// scheduling loop, and otherwise propagates probe, part-file, chunk-task and
/// writer errors.
#[allow(clippy::too_many_lines)]
async fn run_download(
    engine: DownloadEngine,
    config: DownloadConfig,
    state: SharedState,
    control: Arc<Control>,
) -> Result<()> {
    let config = config.normalized();
    // Rebuild the HTTP client so it honours this job's timeouts; the limiter is shared.
    let engine = engine.with_timeout(config.timeout)?;
    let bandwidth = Arc::new(BandwidthLimiter::new(config.bytes_per_second_limit));
    state.mark_running();
    let remote = probe_with_retry(&engine, &config, &state, &control).await?;
    let chunk_plan = ChunkPlan::new(remote.content_len, config.chunk_size)?;
    let target_path = config.target_path.clone();
    let url = config.url.clone();
    let hash = config.hash;
    let chunk_size = config.chunk_size;

    // Opening the part file runs on the blocking pool.
    let part = tokio::task::spawn_blocking(move || {
        PartFile::open_or_create(&target_path, &url, &remote, chunk_size, hash)
    })
    .await
    .map_err(|err| TakanawaError::Ffi(format!("part open task failed: {err}")))??;
    state.update_from_metadata(part.metadata());

    // Nothing left to fetch: go straight to finalization.
    if part.metadata().all_complete() {
        finalize_part(part, &config, &state).await?;
        return Ok(());
    }

    let mut pending: VecDeque<u64> = part.incomplete_chunks().into();
    // `max_parallel_chunks` wins over `parallelism`; if both are zero the engine default is used.
    let requested_parallelism = if config.max_parallel_chunks == 0 {
        config.parallelism
    } else {
        config.max_parallel_chunks
    };
    let parallelism = if requested_parallelism == 0 {
        engine.default_parallelism()
    } else {
        requested_parallelism.max(1)
    };
    let writer_capacity = parallelism
        .max(1)
        .saturating_mul(WRITE_QUEUE_DEPTH_PER_CHUNK);
    // From here on the part file is owned by the writer task; chunk tasks reach it
    // only through `writer_tx`.
    let (writer_tx, writer_join) = spawn_part_writer(part, writer_capacity);
    let mut tasks = JoinSet::new();

    loop {
        // Cancel takes priority over pause; both stop in-flight tasks and drain the writer.
        if control.cancel.load(Ordering::Relaxed) {
            tasks.shutdown().await;
            let _part = finish_part_writer(writer_tx, writer_join).await?;
            state.mark_cancelled();
            return Err(TakanawaError::Cancelled);
        }
        if control.pause.load(Ordering::Relaxed) {
            tasks.shutdown().await;
            let _part = finish_part_writer(writer_tx, writer_join).await?;
            state.mark_paused();
            return Ok(());
        }

        // Top the task set up to `parallelism` from the pending queue.
        while !control.pause.load(Ordering::Relaxed)
            && !control.cancel.load(Ordering::Relaxed)
            && tasks.len() < parallelism
        {
            let Some(index) = pending.pop_front() else {
                break;
            };
            let chunk = chunk_plan.chunk(index)?;
            let engine = engine.clone();
            let config = config.clone();
            let state = state.clone();
            let control = Arc::clone(&control);
            let bandwidth = Arc::clone(&bandwidth);
            let writer_tx = writer_tx.clone();
            tasks.spawn(async move {
                let result = fetch_chunk_with_retry(
                    &engine, &config, chunk, &state, &control, &bandwidth, &writer_tx,
                )
                .await?;
                Ok::<_, TakanawaError>(result)
            });
        }

        // No task running and nothing could be spawned: scheduling is done.
        if tasks.is_empty() {
            break;
        }

        let Some(result) = tasks.join_next().await else {
            break;
        };
        let task_result = match result {
            Ok(Ok(task_result)) => task_result,
            // The chunk task returned an error: stop the others and drain the writer.
            Ok(Err(err)) => {
                tasks.shutdown().await;
                let err = finish_part_writer_after_error(writer_tx, writer_join, err).await;
                return Err(err);
            }
            // The task itself failed to join (panic or abort).
            Err(err) => {
                tasks.shutdown().await;
                let err = finish_part_writer_after_error(
                    writer_tx,
                    writer_join,
                    TakanawaError::Ffi(format!("download task failed: {err}")),
                )
                .await;
                return Err(err);
            }
        };
        match task_result {
            ChunkTaskResult::Committed(metadata) => state.update_from_metadata(&metadata),
            ChunkTaskResult::Paused => {
                tasks.shutdown().await;
                let _part = finish_part_writer(writer_tx, writer_join).await?;
                state.mark_paused();
                return Ok(());
            }
        }

        if control.pause.load(Ordering::Relaxed) && tasks.is_empty() {
            let _part = finish_part_writer(writer_tx, writer_join).await?;
            state.mark_paused();
            return Ok(());
        }
    }

    // Dropping the last sender lets the writer exit and hand the part file back.
    let part = finish_part_writer(writer_tx, writer_join).await?;

    if control.pause.load(Ordering::Relaxed) && !part.metadata().all_complete() {
        state.mark_paused();
        return Ok(());
    }

    finalize_part(part, &config, &state).await
}
437
/// Outcome of one chunk task as reported to the scheduling loop.
enum ChunkTaskResult {
    /// The chunk was committed; carries the metadata returned by the writer.
    Committed(PartMetadata),
    /// The task stopped because a pause was requested.
    Paused,
}
442
/// Outcome of a single fetch attempt for one chunk.
enum FetchChunkStatus {
    /// The whole chunk body was forwarded to the writer.
    Complete,
    /// A pause request was observed while streaming the body.
    Paused,
}
447
/// Commands sent from chunk tasks to the part-writer task.
enum WriterCommand {
    /// Write `bytes` into chunk `index` at `chunk_offset`.
    Write {
        index: u64,
        chunk_offset: u64,
        bytes: Bytes,
    },
    /// Commit chunk `index`; the outcome is reported back through `result`.
    Commit {
        index: u64,
        result: oneshot::Sender<Result<PartMetadata>>,
    },
}
459
460fn spawn_part_writer(
461    part: PartFile,
462    capacity: usize,
463) -> (
464    mpsc::Sender<WriterCommand>,
465    tokio::task::JoinHandle<Result<PartFile>>,
466) {
467    let (writer_tx, mut writer_rx) = mpsc::channel(capacity.max(1));
468    let writer_join = tokio::task::spawn_blocking(move || {
469        let mut part = part;
470        while let Some(command) = writer_rx.blocking_recv() {
471            match command {
472                WriterCommand::Write {
473                    index,
474                    chunk_offset,
475                    bytes,
476                } => {
477                    part.write_chunk_bytes(index, chunk_offset, &bytes)?;
478                }
479                WriterCommand::Commit { index, result } => {
480                    let metadata = match part.commit_chunk(index) {
481                        Ok(()) => part.metadata().clone(),
482                        Err(err) => {
483                            let message = err.to_string();
484                            let _ = result.send(Err(err));
485                            return Err(TakanawaError::Ffi(format!(
486                                "part writer commit failed: {message}"
487                            )));
488                        }
489                    };
490                    let _ = result.send(Ok(metadata));
491                }
492            }
493        }
494        Ok(part)
495    });
496    (writer_tx, writer_join)
497}
498
499async fn finish_part_writer(
500    writer_tx: mpsc::Sender<WriterCommand>,
501    writer_join: tokio::task::JoinHandle<Result<PartFile>>,
502) -> Result<PartFile> {
503    drop(writer_tx);
504    writer_join
505        .await
506        .map_err(|err| TakanawaError::Ffi(format!("part writer task failed: {err}")))?
507}
508
509async fn finish_part_writer_after_error(
510    writer_tx: mpsc::Sender<WriterCommand>,
511    writer_join: tokio::task::JoinHandle<Result<PartFile>>,
512    err: TakanawaError,
513) -> TakanawaError {
514    match finish_part_writer(writer_tx, writer_join).await {
515        Err(writer_err) if matches!(err, TakanawaError::Ffi(_)) => writer_err,
516        Ok(_) | Err(TakanawaError::Ffi(_)) => err,
517        Err(writer_err) => writer_err,
518    }
519}
520
521async fn finalize_part(part: PartFile, config: &DownloadConfig, state: &SharedState) -> Result<()> {
522    let target_path = config.target_path.clone();
523    tokio::task::spawn_blocking(move || part.finalize(&target_path))
524        .await
525        .map_err(|err| TakanawaError::Ffi(format!("finalize task failed: {err}")))??;
526    state.mark_completed();
527    Ok(())
528}
529
530async fn probe_with_retry(
531    engine: &DownloadEngine,
532    config: &DownloadConfig,
533    state: &SharedState,
534    control: &Control,
535) -> Result<RemoteInfo> {
536    let retry = config.retry.normalized();
537    let mut delay = retry.backoff_initial;
538    for attempt in 1..=retry.max_attempts() {
539        if control.cancel.load(Ordering::Relaxed) {
540            return Err(TakanawaError::Cancelled);
541        }
542        match with_total_timeout(config.timeout.total, probe_once(engine, &config.url, state)).await
543        {
544            Ok(remote) => return Ok(remote),
545            Err(err) if err.is_retryable() && attempt < retry.max_attempts() => {
546                tokio::time::sleep(delay).await;
547                delay = (delay * 2).min(retry.backoff_max);
548            }
549            Err(err) => return Err(err),
550        }
551    }
552    Err(TakanawaError::Network(
553        "probe exhausted retry attempts".to_owned(),
554    ))
555}
556
557async fn fetch_chunk_with_retry(
558    engine: &DownloadEngine,
559    config: &DownloadConfig,
560    chunk: Chunk,
561    state: &SharedState,
562    control: &Control,
563    bandwidth: &BandwidthLimiter,
564    writer_tx: &mpsc::Sender<WriterCommand>,
565) -> Result<ChunkTaskResult> {
566    let retry = config.retry.normalized();
567    let mut delay = retry.backoff_initial;
568    for attempt in 1..=retry.max_attempts() {
569        if control.cancel.load(Ordering::Relaxed) {
570            return Err(TakanawaError::Cancelled);
571        }
572        if control.pause.load(Ordering::Relaxed) {
573            return Ok(ChunkTaskResult::Paused);
574        }
575        match with_total_timeout(
576            config.timeout.total,
577            fetch_chunk_once(
578                engine,
579                &config.url,
580                chunk,
581                state,
582                control,
583                bandwidth,
584                writer_tx,
585            ),
586        )
587        .await
588        {
589            Ok(FetchChunkStatus::Complete) => {
590                if control.cancel.load(Ordering::Relaxed) {
591                    return Err(TakanawaError::Cancelled);
592                }
593                if control.pause.load(Ordering::Relaxed) {
594                    return Ok(ChunkTaskResult::Paused);
595                }
596                let metadata = commit_written_chunk(writer_tx, chunk.index).await?;
597                return Ok(ChunkTaskResult::Committed(metadata));
598            }
599            Ok(FetchChunkStatus::Paused) => return Ok(ChunkTaskResult::Paused),
600            Err(err) if err.is_retryable() && attempt < retry.max_attempts() => {
601                tokio::time::sleep(delay).await;
602                delay = (delay * 2).min(retry.backoff_max);
603            }
604            Err(err) => return Err(err),
605        }
606    }
607    Err(TakanawaError::Network(format!(
608        "chunk {} exhausted retry attempts",
609        chunk.index
610    )))
611}
612
613async fn probe_once(engine: &DownloadEngine, url: &str, state: &SharedState) -> Result<RemoteInfo> {
614    let _permit = engine.limiter.acquire().await;
615    let _active_io = ActiveIoGuard::new(state.clone());
616    let response = engine
617        .client
618        .get(url)
619        .header(RANGE, "bytes=0-0")
620        .header(ACCEPT_ENCODING, "identity")
621        .send()
622        .await
623        .map_err(map_reqwest_error)?;
624
625    if response.status() == StatusCode::RANGE_NOT_SATISFIABLE {
626        let total = response
627            .headers()
628            .get(CONTENT_RANGE)
629            .ok_or_else(|| {
630                TakanawaError::HttpProtocol("416 response missing Content-Range".to_owned())
631            })?
632            .to_str()
633            .map_err(|err| {
634                TakanawaError::HttpProtocol(format!("invalid Content-Range header: {err}"))
635            })
636            .and_then(parse_unsatisfied_total)?;
637        if total == 0 {
638            return Ok(RemoteInfo {
639                content_len: 0,
640                etag: header_to_string(response.headers(), ETAG)?,
641                last_modified: header_to_string(response.headers(), LAST_MODIFIED)?,
642            });
643        }
644        return Err(TakanawaError::HttpProtocol(format!(
645            "probe range was unsatisfied for non-empty resource length {total}"
646        )));
647    }
648
649    validate_status(response.status())?;
650    validate_identity(response.headers())?;
651    let range = response_content_range(&response, 0, 0)?;
652    let content_len = response_content_length(&response)?;
653    if content_len != 1 {
654        return Err(TakanawaError::HttpProtocol(format!(
655            "probe Content-Length mismatch: expected 1, got {content_len}"
656        )));
657    }
658    let headers = response.headers().clone();
659    let body = response.bytes().await.map_err(map_reqwest_error)?;
660    if body.len() != 1 {
661        return Err(TakanawaError::HttpProtocol(format!(
662            "probe body length mismatch: expected 1, got {}",
663            body.len()
664        )));
665    }
666
667    Ok(RemoteInfo {
668        content_len: range.total,
669        etag: header_to_string(&headers, ETAG)?,
670        last_modified: header_to_string(&headers, LAST_MODIFIED)?,
671    })
672}
673
674async fn fetch_chunk_once(
675    engine: &DownloadEngine,
676    url: &str,
677    chunk: Chunk,
678    state: &SharedState,
679    control: &Control,
680    bandwidth: &BandwidthLimiter,
681    writer_tx: &mpsc::Sender<WriterCommand>,
682) -> Result<FetchChunkStatus> {
683    let _permit = engine.limiter.acquire().await;
684    let _active_io = ActiveIoGuard::new(state.clone());
685    let response = engine
686        .client
687        .get(url)
688        .header(RANGE, format!("bytes={}-{}", chunk.start, chunk.end))
689        .header(ACCEPT_ENCODING, "identity")
690        .send()
691        .await
692        .map_err(map_reqwest_error)?;
693
694    validate_status(response.status())?;
695    validate_identity(response.headers())?;
696    let _range = response_content_range(&response, chunk.start, chunk.end)?;
697    let content_len = response_content_length(&response)?;
698    if content_len != chunk.len {
699        return Err(TakanawaError::HttpProtocol(format!(
700            "chunk {} Content-Length mismatch: expected {}, got {content_len}",
701            chunk.index, chunk.len
702        )));
703    }
704    stream_body_to_writer(response, chunk, control, bandwidth, writer_tx).await
705}
706
707async fn stream_body_to_writer(
708    response: reqwest::Response,
709    chunk: Chunk,
710    control: &Control,
711    bandwidth: &BandwidthLimiter,
712    writer_tx: &mpsc::Sender<WriterCommand>,
713) -> Result<FetchChunkStatus> {
714    let mut written = 0_u64;
715    let mut stream = response.bytes_stream();
716    while let Some(bytes) = stream.next().await {
717        if control.cancel.load(Ordering::Relaxed) {
718            return Err(TakanawaError::Cancelled);
719        }
720        if control.pause.load(Ordering::Relaxed) {
721            return Ok(FetchChunkStatus::Paused);
722        }
723        let bytes = bytes.map_err(map_reqwest_error)?;
724        if bytes.is_empty() {
725            continue;
726        }
727        let len = u64::try_from(bytes.len()).map_err(|_| {
728            TakanawaError::HttpProtocol(format!(
729                "chunk {} body fragment length does not fit in file offsets",
730                chunk.index
731            ))
732        })?;
733        let next_written = written.checked_add(len).ok_or_else(|| {
734            TakanawaError::HttpProtocol(format!("chunk {} body length overflow", chunk.index))
735        })?;
736        if next_written > chunk.len {
737            return Err(TakanawaError::HttpProtocol(format!(
738                "chunk {} body length exceeded expected {} bytes",
739                chunk.index, chunk.len
740            )));
741        }
742        bandwidth.throttle(bytes.len()).await;
743        send_writer_write(writer_tx, chunk.index, written, bytes).await?;
744        written = next_written;
745    }
746
747    if written != chunk.len {
748        return Err(TakanawaError::HttpProtocol(format!(
749            "chunk {} body length mismatch: expected {}, got {}",
750            chunk.index, chunk.len, written
751        )));
752    }
753    Ok(FetchChunkStatus::Complete)
754}
755
756async fn send_writer_write(
757    writer_tx: &mpsc::Sender<WriterCommand>,
758    index: u64,
759    chunk_offset: u64,
760    bytes: Bytes,
761) -> Result<()> {
762    writer_tx
763        .send(WriterCommand::Write {
764            index,
765            chunk_offset,
766            bytes,
767        })
768        .await
769        .map_err(|_| TakanawaError::Ffi("part writer stopped before write".to_owned()))
770}
771
772async fn commit_written_chunk(
773    writer_tx: &mpsc::Sender<WriterCommand>,
774    index: u64,
775) -> Result<PartMetadata> {
776    let (result_tx, result_rx) = oneshot::channel();
777    writer_tx
778        .send(WriterCommand::Commit {
779            index,
780            result: result_tx,
781        })
782        .await
783        .map_err(|_| TakanawaError::Ffi("part writer stopped before commit".to_owned()))?;
784    result_rx
785        .await
786        .map_err(|_| TakanawaError::Ffi("part writer stopped during commit".to_owned()))?
787}
788
789async fn with_total_timeout<T>(
790    timeout: Duration,
791    future: impl Future<Output = Result<T>>,
792) -> Result<T> {
793    if timeout.is_zero() {
794        return future.await;
795    }
796    tokio::time::timeout(timeout, future).await.map_err(|_| {
797        TakanawaError::Network(format!("request exceeded {} ms", timeout.as_millis()))
798    })?
799}
800
801#[derive(Debug)]
802struct BandwidthLimiter {
803    bytes_per_second: u64,
804    next_available: Mutex<Instant>,
805}
806
807impl BandwidthLimiter {
808    fn new(bytes_per_second: u64) -> Self {
809        Self {
810            bytes_per_second,
811            next_available: Mutex::new(Instant::now()),
812        }
813    }
814
815    async fn throttle(&self, bytes: usize) {
816        if self.bytes_per_second == 0 || bytes == 0 {
817            return;
818        }
819
820        let now = Instant::now();
821        let wait_until = {
822            let mut next_available = self
823                .next_available
824                .lock()
825                .expect("bandwidth limiter mutex poisoned");
826            let start = (*next_available).max(now);
827            let nanos = (bytes as u128)
828                .saturating_mul(1_000_000_000)
829                .div_ceil(u128::from(self.bytes_per_second));
830            let duration = Duration::from_nanos(u64::try_from(nanos).unwrap_or(u64::MAX));
831            *next_available = start + duration;
832            start
833        };
834
835        if wait_until > now {
836            tokio::time::sleep_until(tokio::time::Instant::from_std(wait_until)).await;
837        }
838    }
839}
840
841fn validate_status(status: StatusCode) -> Result<()> {
842    if status == StatusCode::PARTIAL_CONTENT {
843        return Ok(());
844    }
845    if status == StatusCode::REQUEST_TIMEOUT
846        || status == StatusCode::TOO_MANY_REQUESTS
847        || status.is_server_error()
848    {
849        return Err(TakanawaError::RetryableHttpStatus(status.as_u16()));
850    }
851    Err(TakanawaError::HttpProtocol(format!(
852        "expected 206 Partial Content, got {status}"
853    )))
854}
855
856fn validate_identity(headers: &HeaderMap) -> Result<()> {
857    if let Some(value) = headers.get(CONTENT_ENCODING) {
858        let value = value.to_str().map_err(|err| {
859            TakanawaError::HttpProtocol(format!("invalid Content-Encoding: {err}"))
860        })?;
861        if !value.eq_ignore_ascii_case("identity") {
862            return Err(TakanawaError::HttpProtocol(format!(
863                "unexpected Content-Encoding {value}"
864            )));
865        }
866    }
867    Ok(())
868}
869
870fn response_content_range(
871    response: &reqwest::Response,
872    start: u64,
873    end: u64,
874) -> Result<crate::content_range::ContentRange> {
875    let value = response
876        .headers()
877        .get(CONTENT_RANGE)
878        .ok_or_else(|| TakanawaError::HttpProtocol("missing Content-Range".to_owned()))?
879        .to_str()
880        .map_err(|err| {
881            TakanawaError::HttpProtocol(format!("invalid Content-Range header: {err}"))
882        })?;
883    let range = parse_content_range(value)?;
884    if range.start != start || range.end != end {
885        return Err(TakanawaError::HttpProtocol(format!(
886            "Content-Range mismatch: expected {start}-{end}, got {}-{}",
887            range.start, range.end
888        )));
889    }
890    Ok(range)
891}
892
893fn response_content_length(response: &reqwest::Response) -> Result<u64> {
894    response
895        .headers()
896        .get(CONTENT_LENGTH)
897        .ok_or_else(|| TakanawaError::HttpProtocol("missing Content-Length".to_owned()))?
898        .to_str()
899        .map_err(|err| {
900            TakanawaError::HttpProtocol(format!("invalid Content-Length header: {err}"))
901        })?
902        .parse::<u64>()
903        .map_err(|err| TakanawaError::HttpProtocol(format!("invalid Content-Length: {err}")))
904}
905
906fn header_to_string(
907    headers: &HeaderMap,
908    name: reqwest::header::HeaderName,
909) -> Result<Option<String>> {
910    headers
911        .get(name)
912        .map(|value| {
913            value.to_str().map(str::to_owned).map_err(|err| {
914                TakanawaError::HttpProtocol(format!("invalid response header: {err}"))
915            })
916        })
917        .transpose()
918}
919
920#[allow(clippy::needless_pass_by_value)]
921fn map_reqwest_error(err: reqwest::Error) -> TakanawaError {
922    if err.is_timeout() || err.is_connect() || err.is_request() || err.is_body() || err.is_decode()
923    {
924        TakanawaError::Network(err.to_string())
925    } else {
926        TakanawaError::HttpProtocol(err.to_string())
927    }
928}
929
/// Scope guard that keeps the shared active-I/O counter incremented while alive.
struct ActiveIoGuard {
    state: SharedState,
}

impl ActiveIoGuard {
    /// Increments the active-I/O counter on `state` and returns the guard that
    /// will decrement it again when dropped.
    fn new(state: SharedState) -> Self {
        state.increment_active_io();
        Self { state }
    }
}

impl Drop for ActiveIoGuard {
    /// Undoes the increment performed in `new`.
    fn drop(&mut self) {
        self.state.decrement_active_io();
    }
}
946
947#[cfg(test)]
948mod tests {
949    use std::io::{Read, Write};
950    use std::net::{SocketAddr, TcpListener};
951    use std::sync::Arc;
952    use std::thread;
953    use std::time::Duration;
954
955    use sha2::{Digest, Sha256};
956    use tempfile::TempDir;
957
958    use super::*;
959    use crate::DownloadPhase;
960    use crate::limiter::DEFAULT_MAX_IO;
961
962    #[tokio::test]
963    async fn downloads_file_with_ranges() {
964        let data = Arc::new(b"abcdefghijklmnopqrstuvwxyz".to_vec());
965        let addr = spawn_range_server(Arc::clone(&data), false);
966        let dir = TempDir::new().unwrap();
967        let target = dir.path().join("out.bin");
968        let engine = DownloadEngine::new(DEFAULT_MAX_IO).unwrap();
969        let config = DownloadConfig {
970            url: format!("http://{addr}/file"),
971            target_path: target.clone(),
972            chunk_size: 5,
973            parallelism: 2,
974            max_parallel_chunks: 0,
975            retry: RetryConfig::default(),
976            timeout: TimeoutConfig::default(),
977            bytes_per_second_limit: 0,
978            hash: HashConfig::None,
979        };
980
981        let snapshot = download_to_completion(engine, config).await.unwrap();
982
983        assert_eq!(snapshot.phase, DownloadPhase::Completed);
984        assert_eq!(std::fs::read(target).unwrap(), data.as_slice());
985    }
986
987    #[tokio::test]
988    async fn rejects_ignored_range() {
989        let data = Arc::new(b"abcdef".to_vec());
990        let addr = spawn_range_server(Arc::clone(&data), true);
991        let dir = TempDir::new().unwrap();
992        let target = dir.path().join("out.bin");
993        let engine = DownloadEngine::new(DEFAULT_MAX_IO).unwrap();
994        let config = DownloadConfig {
995            url: format!("http://{addr}/file"),
996            target_path: target,
997            chunk_size: 3,
998            parallelism: 1,
999            max_parallel_chunks: 0,
1000            retry: RetryConfig::default(),
1001            timeout: TimeoutConfig::default(),
1002            bytes_per_second_limit: 0,
1003            hash: HashConfig::None,
1004        };
1005
1006        let err = download_to_completion(engine, config).await.unwrap_err();
1007
1008        assert!(matches!(err, TakanawaError::HttpProtocol(_)));
1009    }
1010
1011    #[tokio::test]
1012    async fn resumes_from_existing_part() {
1013        let data = Arc::new(b"abcdefghijklmnopqrstuvwxyz".to_vec());
1014        let addr = spawn_range_server(Arc::clone(&data), false);
1015        let dir = TempDir::new().unwrap();
1016        let target = dir.path().join("out.bin");
1017        let remote = RemoteInfo {
1018            content_len: data.len() as u64,
1019            etag: None,
1020            last_modified: None,
1021        };
1022        let mut part = PartFile::open_or_create(
1023            &target,
1024            &format!("http://{addr}/file"),
1025            &remote,
1026            5,
1027            HashConfig::None,
1028        )
1029        .unwrap();
1030        part.write_chunk(0, &data[..5]).unwrap();
1031        drop(part);
1032
1033        let engine = DownloadEngine::new(DEFAULT_MAX_IO).unwrap();
1034        let config = DownloadConfig {
1035            url: format!("http://{addr}/file"),
1036            target_path: target.clone(),
1037            chunk_size: 5,
1038            parallelism: 2,
1039            max_parallel_chunks: 0,
1040            retry: RetryConfig::default(),
1041            timeout: TimeoutConfig::default(),
1042            bytes_per_second_limit: 0,
1043            hash: HashConfig::None,
1044        };
1045
1046        let snapshot = download_to_completion(engine, config).await.unwrap();
1047
1048        assert_eq!(snapshot.phase, DownloadPhase::Completed);
1049        assert_eq!(std::fs::read(target).unwrap(), data.as_slice());
1050    }
1051
1052    #[tokio::test]
1053    async fn retries_after_partial_stream_write() {
1054        let data = Arc::new(b"abcdefghij".to_vec());
1055        let addr = spawn_truncated_once_server(Arc::clone(&data), 2);
1056        let dir = TempDir::new().unwrap();
1057        let target = dir.path().join("out.bin");
1058        let engine = DownloadEngine::new(DEFAULT_MAX_IO).unwrap();
1059        let config = DownloadConfig {
1060            url: format!("http://{addr}/file"),
1061            target_path: target.clone(),
1062            chunk_size: 5,
1063            parallelism: 1,
1064            max_parallel_chunks: 0,
1065            retry: RetryConfig {
1066                max_retries: 1,
1067                backoff_initial: Duration::from_millis(1),
1068                backoff_max: Duration::from_millis(1),
1069            },
1070            timeout: TimeoutConfig::default(),
1071            bytes_per_second_limit: 0,
1072            hash: HashConfig::None,
1073        };
1074
1075        let snapshot = download_to_completion(engine, config).await.unwrap();
1076
1077        assert_eq!(snapshot.phase, DownloadPhase::Completed);
1078        assert_eq!(std::fs::read(target).unwrap(), data.as_slice());
1079    }
1080
1081    #[test]
1082    fn pause_discards_in_flight_chunk() {
1083        let data = Arc::new(b"abcdefghij".to_vec());
1084        let addr = spawn_delayed_chunk_server(Arc::clone(&data), Duration::from_millis(300));
1085        let dir = TempDir::new().unwrap();
1086        let target = dir.path().join("out.bin");
1087        let engine = DownloadEngine::new(DEFAULT_MAX_IO).unwrap();
1088        let runtime = Runtime::new().unwrap();
1089        let download = DownloadHandle::new(
1090            engine,
1091            DownloadConfig {
1092                url: format!("http://{addr}/file"),
1093                target_path: target,
1094                chunk_size: 5,
1095                parallelism: 1,
1096                max_parallel_chunks: 0,
1097                retry: RetryConfig::default(),
1098                timeout: TimeoutConfig::default(),
1099                bytes_per_second_limit: 0,
1100                hash: HashConfig::None,
1101            },
1102        );
1103
1104        download.start_on(&runtime).unwrap();
1105        thread::sleep(Duration::from_millis(100));
1106        download.pause().unwrap();
1107
1108        let snapshot = wait_for_phase(&download, DownloadPhase::Paused);
1109
1110        assert_eq!(snapshot.completed_chunks, 0);
1111        assert_eq!(snapshot.downloaded_bytes, 0);
1112    }
1113
1114    #[test]
1115    fn pause_mid_stream_discards_uncommitted_bytes_and_resume_completes() {
1116        let data = Arc::new(b"abcdefghij".to_vec());
1117        let addr = spawn_split_body_server(Arc::clone(&data), 2, Duration::from_millis(300));
1118        let dir = TempDir::new().unwrap();
1119        let target = dir.path().join("out.bin");
1120        let engine = DownloadEngine::new(DEFAULT_MAX_IO).unwrap();
1121        let runtime = Runtime::new().unwrap();
1122        let download = DownloadHandle::new(
1123            engine,
1124            DownloadConfig {
1125                url: format!("http://{addr}/file"),
1126                target_path: target.clone(),
1127                chunk_size: 5,
1128                parallelism: 1,
1129                max_parallel_chunks: 0,
1130                retry: RetryConfig::default(),
1131                timeout: TimeoutConfig::default(),
1132                bytes_per_second_limit: 0,
1133                hash: HashConfig::None,
1134            },
1135        );
1136
1137        download.start_on(&runtime).unwrap();
1138        thread::sleep(Duration::from_millis(100));
1139        download.pause().unwrap();
1140
1141        let snapshot = wait_for_phase(&download, DownloadPhase::Paused);
1142
1143        assert_eq!(snapshot.completed_chunks, 0);
1144        assert_eq!(snapshot.downloaded_bytes, 0);
1145
1146        download.start_on(&runtime).unwrap();
1147        let snapshot = wait_for_phase(&download, DownloadPhase::Completed);
1148
1149        assert_eq!(snapshot.phase, DownloadPhase::Completed);
1150        assert_eq!(std::fs::read(target).unwrap(), data.as_slice());
1151    }
1152
1153    #[tokio::test]
1154    async fn rejects_existing_target() {
1155        let data = Arc::new(b"abcdef".to_vec());
1156        let addr = spawn_range_server(Arc::clone(&data), false);
1157        let dir = TempDir::new().unwrap();
1158        let target = dir.path().join("out.bin");
1159        std::fs::write(&target, b"already here").unwrap();
1160        let engine = DownloadEngine::new(DEFAULT_MAX_IO).unwrap();
1161        let config = DownloadConfig {
1162            url: format!("http://{addr}/file"),
1163            target_path: target,
1164            chunk_size: 3,
1165            parallelism: 1,
1166            max_parallel_chunks: 0,
1167            retry: RetryConfig::default(),
1168            timeout: TimeoutConfig::default(),
1169            bytes_per_second_limit: 0,
1170            hash: HashConfig::None,
1171        };
1172
1173        let err = download_to_completion(engine, config).await.unwrap_err();
1174
1175        assert!(matches!(err, TakanawaError::TargetExists(_)));
1176    }
1177
1178    #[tokio::test]
1179    async fn verifies_sha256_before_finalize() {
1180        let data = Arc::new(b"abcdef".to_vec());
1181        let addr = spawn_range_server(Arc::clone(&data), false);
1182        let dir = TempDir::new().unwrap();
1183        let target = dir.path().join("out.bin");
1184        let expected: [u8; 32] = Sha256::digest(data.as_slice()).into();
1185        let engine = DownloadEngine::new(DEFAULT_MAX_IO).unwrap();
1186        let config = DownloadConfig {
1187            url: format!("http://{addr}/file"),
1188            target_path: target.clone(),
1189            chunk_size: 3,
1190            parallelism: 1,
1191            max_parallel_chunks: 0,
1192            retry: RetryConfig::default(),
1193            timeout: TimeoutConfig::default(),
1194            bytes_per_second_limit: 0,
1195            hash: HashConfig::Sha256(expected),
1196        };
1197
1198        download_to_completion(engine, config).await.unwrap();
1199
1200        assert_eq!(std::fs::read(target).unwrap(), data.as_slice());
1201    }
1202
1203    #[tokio::test]
1204    async fn total_timeout_aborts_slow_chunk() {
1205        let data = Arc::new(b"abcdef".to_vec());
1206        let addr = spawn_delayed_chunk_server(Arc::clone(&data), Duration::from_millis(300));
1207        let dir = TempDir::new().unwrap();
1208        let target = dir.path().join("out.bin");
1209        let engine = DownloadEngine::new(DEFAULT_MAX_IO).unwrap();
1210        let config = DownloadConfig {
1211            url: format!("http://{addr}/file"),
1212            target_path: target,
1213            chunk_size: 3,
1214            parallelism: 1,
1215            max_parallel_chunks: 0,
1216            retry: RetryConfig {
1217                max_retries: 0,
1218                backoff_initial: Duration::from_millis(1),
1219                backoff_max: Duration::from_millis(1),
1220            },
1221            timeout: TimeoutConfig {
1222                connect: Duration::from_secs(30),
1223                read: Duration::ZERO,
1224                total: Duration::from_millis(50),
1225            },
1226            bytes_per_second_limit: 0,
1227            hash: HashConfig::None,
1228        };
1229
1230        let err = download_to_completion(engine, config).await.unwrap_err();
1231
1232        assert!(matches!(err, TakanawaError::Network(_)));
1233    }
1234
1235    fn spawn_range_server(data: Arc<Vec<u8>>, ignore_range: bool) -> SocketAddr {
1236        spawn_range_server_with_chunk_delay(data, ignore_range, None)
1237    }
1238
1239    fn spawn_delayed_chunk_server(data: Arc<Vec<u8>>, delay: Duration) -> SocketAddr {
1240        spawn_range_server_with_chunk_delay(data, false, Some(delay))
1241    }
1242
1243    fn spawn_split_body_server(
1244        data: Arc<Vec<u8>>,
1245        first_body_bytes: usize,
1246        delay: Duration,
1247    ) -> SocketAddr {
1248        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
1249        let addr = listener.local_addr().unwrap();
1250        thread::spawn(move || {
1251            for stream in listener.incoming().flatten() {
1252                let data = Arc::clone(&data);
1253                thread::spawn(move || {
1254                    handle_split_body_connection(stream, &data, first_body_bytes, delay);
1255                });
1256            }
1257        });
1258        addr
1259    }
1260
1261    fn spawn_truncated_once_server(
1262        data: Arc<Vec<u8>>,
1263        body_bytes_before_close: usize,
1264    ) -> SocketAddr {
1265        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
1266        let addr = listener.local_addr().unwrap();
1267        let truncated = Arc::new(std::sync::atomic::AtomicBool::new(false));
1268        thread::spawn(move || {
1269            for stream in listener.incoming().flatten() {
1270                let data = Arc::clone(&data);
1271                let truncated = Arc::clone(&truncated);
1272                thread::spawn(move || {
1273                    handle_truncated_once_connection(
1274                        stream,
1275                        &data,
1276                        body_bytes_before_close,
1277                        &truncated,
1278                    );
1279                });
1280            }
1281        });
1282        addr
1283    }
1284
1285    fn spawn_range_server_with_chunk_delay(
1286        data: Arc<Vec<u8>>,
1287        ignore_range: bool,
1288        chunk_delay: Option<Duration>,
1289    ) -> SocketAddr {
1290        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
1291        let addr = listener.local_addr().unwrap();
1292        thread::spawn(move || {
1293            for stream in listener.incoming().flatten() {
1294                let data = Arc::clone(&data);
1295                thread::spawn(move || handle_connection(stream, &data, ignore_range, chunk_delay));
1296            }
1297        });
1298        addr
1299    }
1300
1301    fn handle_connection(
1302        mut stream: std::net::TcpStream,
1303        data: &[u8],
1304        ignore_range: bool,
1305        chunk_delay: Option<Duration>,
1306    ) {
1307        let mut buffer = [0; 4096];
1308        let read = stream.read(&mut buffer).unwrap_or(0);
1309        let request = String::from_utf8_lossy(&buffer[..read]);
1310        let range = request_range(&request);
1311
1312        if ignore_range {
1313            let response = format!("HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", data.len());
1314            stream.write_all(response.as_bytes()).unwrap();
1315            stream.write_all(data).unwrap();
1316            return;
1317        }
1318
1319        let Some((start, end)) = range else {
1320            stream
1321                .write_all(b"HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n")
1322                .unwrap();
1323            return;
1324        };
1325        if start >= data.len() {
1326            let response = format!(
1327                "HTTP/1.1 416 Range Not Satisfiable\r\nContent-Range: bytes */{}\r\nContent-Length: 0\r\n\r\n",
1328                data.len()
1329            );
1330            stream.write_all(response.as_bytes()).unwrap();
1331            return;
1332        }
1333        let end = end.min(data.len() - 1);
1334        let body = &data[start..=end];
1335        if let Some(delay) = chunk_delay {
1336            if !(start == 0 && end == 0) {
1337                thread::sleep(delay);
1338            }
1339        }
1340        let response = format!(
1341            "HTTP/1.1 206 Partial Content\r\nContent-Range: bytes {start}-{end}/{}\r\nContent-Length: {}\r\nAccept-Ranges: bytes\r\n\r\n",
1342            data.len(),
1343            body.len()
1344        );
1345        stream.write_all(response.as_bytes()).unwrap();
1346        stream.write_all(body).unwrap();
1347    }
1348
1349    fn handle_split_body_connection(
1350        mut stream: std::net::TcpStream,
1351        data: &[u8],
1352        first_body_bytes: usize,
1353        delay: Duration,
1354    ) {
1355        let Some((start, end)) = read_request_range(&mut stream) else {
1356            let _ = stream.write_all(b"HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n");
1357            return;
1358        };
1359        let Some(body) = range_body(data, start, end, &mut stream) else {
1360            return;
1361        };
1362        let response = format!(
1363            "HTTP/1.1 206 Partial Content\r\nContent-Range: bytes {start}-{}/{}\r\nContent-Length: {}\r\nAccept-Ranges: bytes\r\n\r\n",
1364            start + body.len() - 1,
1365            data.len(),
1366            body.len()
1367        );
1368        let _ = stream.write_all(response.as_bytes());
1369        if body.len() <= 1 {
1370            let _ = stream.write_all(body);
1371            return;
1372        }
1373        let split_at = first_body_bytes.clamp(1, body.len());
1374        let _ = stream.write_all(&body[..split_at]);
1375        let _ = stream.flush();
1376        thread::sleep(delay);
1377        let _ = stream.write_all(&body[split_at..]);
1378    }
1379
1380    fn handle_truncated_once_connection(
1381        mut stream: std::net::TcpStream,
1382        data: &[u8],
1383        body_bytes_before_close: usize,
1384        truncated: &std::sync::atomic::AtomicBool,
1385    ) {
1386        let Some((start, end)) = read_request_range(&mut stream) else {
1387            let _ = stream.write_all(b"HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n");
1388            return;
1389        };
1390        let Some(body) = range_body(data, start, end, &mut stream) else {
1391            return;
1392        };
1393        let response = format!(
1394            "HTTP/1.1 206 Partial Content\r\nContent-Range: bytes {start}-{}/{}\r\nContent-Length: {}\r\nAccept-Ranges: bytes\r\n\r\n",
1395            start + body.len() - 1,
1396            data.len(),
1397            body.len()
1398        );
1399        let _ = stream.write_all(response.as_bytes());
1400        if body.len() > 1 && !truncated.swap(true, std::sync::atomic::Ordering::SeqCst) {
1401            let end = body_bytes_before_close.min(body.len());
1402            let _ = stream.write_all(&body[..end]);
1403            return;
1404        }
1405        let _ = stream.write_all(body);
1406    }
1407
1408    fn read_request_range(stream: &mut std::net::TcpStream) -> Option<(usize, usize)> {
1409        let mut buffer = [0; 4096];
1410        let read = stream.read(&mut buffer).ok()?;
1411        let request = String::from_utf8_lossy(&buffer[..read]);
1412        request_range(&request)
1413    }
1414
1415    fn request_range(request: &str) -> Option<(usize, usize)> {
1416        let range = request.lines().find_map(|line| {
1417            let (name, value) = line.split_once(':')?;
1418            if name.eq_ignore_ascii_case("range") {
1419                value.trim().strip_prefix("bytes=")
1420            } else {
1421                None
1422            }
1423        })?;
1424        let (start, end) = range.split_once('-')?;
1425        Some((start.parse().ok()?, end.parse().ok()?))
1426    }
1427
1428    fn range_body<'a>(
1429        data: &'a [u8],
1430        start: usize,
1431        end: usize,
1432        stream: &mut std::net::TcpStream,
1433    ) -> Option<&'a [u8]> {
1434        if start >= data.len() {
1435            let response = format!(
1436                "HTTP/1.1 416 Range Not Satisfiable\r\nContent-Range: bytes */{}\r\nContent-Length: 0\r\n\r\n",
1437                data.len()
1438            );
1439            let _ = stream.write_all(response.as_bytes());
1440            return None;
1441        }
1442        let end = end.min(data.len() - 1);
1443        Some(&data[start..=end])
1444    }
1445
1446    fn wait_for_phase(download: &DownloadHandle, phase: DownloadPhase) -> DownloadSnapshot {
1447        for _ in 0..100 {
1448            let snapshot = download.snapshot();
1449            if snapshot.phase == phase {
1450                return snapshot;
1451            }
1452            thread::sleep(Duration::from_millis(20));
1453        }
1454        download.snapshot()
1455    }
1456}