1use std::collections::VecDeque;
2use std::future::Future;
3use std::path::PathBuf;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, Instant};
7
8use bytes::Bytes;
9use futures_util::StreamExt;
10use reqwest::header::{
11 ACCEPT_ENCODING, CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_RANGE, ETAG, HeaderMap,
12 LAST_MODIFIED, RANGE,
13};
14use reqwest::{Client, StatusCode};
15use takanawa_core::{
16 Chunk, ChunkPlan, DEFAULT_CHUNK_SIZE, HashConfig, PartFile, PartMetadata, RemoteInfo, Result,
17 TakanawaError,
18};
19use tokio::runtime::Runtime;
20use tokio::sync::{mpsc, oneshot};
21use tokio::task::JoinSet;
22
23use crate::content_range::{parse_content_range, parse_unsatisfied_total};
24use crate::limiter::IoLimiter;
25use crate::state::{DownloadSnapshot, SharedState};
26
27const DEFAULT_MAX_RETRIES: u32 = 4;
28const DEFAULT_BACKOFF_INITIAL: Duration = Duration::from_millis(100);
29const DEFAULT_BACKOFF_MAX: Duration = Duration::from_secs(3);
30const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(30);
31const WRITE_QUEUE_DEPTH_PER_CHUNK: usize = 8;
32
33#[derive(Debug, Clone)]
34pub struct DownloadConfig {
35 pub url: String,
36 pub target_path: PathBuf,
37 pub chunk_size: u64,
38 pub parallelism: usize,
39 pub max_parallel_chunks: usize,
40 pub retry: RetryConfig,
41 pub timeout: TimeoutConfig,
42 pub bytes_per_second_limit: u64,
43 pub hash: HashConfig,
44}
45
46impl DownloadConfig {
47 #[must_use]
48 pub fn normalized(mut self) -> Self {
49 if self.chunk_size == 0 {
50 self.chunk_size = DEFAULT_CHUNK_SIZE;
51 }
52 self.retry = self.retry.normalized();
53 self.timeout = self.timeout.normalized();
54 self
55 }
56}
57
58#[derive(Debug, Clone, Copy, PartialEq, Eq)]
59pub struct RetryConfig {
60 pub max_retries: u32,
61 pub backoff_initial: Duration,
62 pub backoff_max: Duration,
63}
64
65impl Default for RetryConfig {
66 fn default() -> Self {
67 Self {
68 max_retries: DEFAULT_MAX_RETRIES,
69 backoff_initial: DEFAULT_BACKOFF_INITIAL,
70 backoff_max: DEFAULT_BACKOFF_MAX,
71 }
72 }
73}
74
75impl RetryConfig {
76 #[must_use]
77 pub fn normalized(self) -> Self {
78 let default = Self::default();
79 let backoff_initial = if self.backoff_initial.is_zero() {
80 default.backoff_initial
81 } else {
82 self.backoff_initial
83 };
84 let backoff_max = if self.backoff_max.is_zero() {
85 default.backoff_max
86 } else {
87 self.backoff_max.max(backoff_initial)
88 };
89 Self {
90 max_retries: self.max_retries,
91 backoff_initial,
92 backoff_max,
93 }
94 }
95
96 fn max_attempts(self) -> u32 {
97 self.max_retries.saturating_add(1).max(1)
98 }
99}
100
101#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
102pub struct TimeoutConfig {
103 pub connect: Duration,
104 pub read: Duration,
105 pub total: Duration,
106}
107
108impl TimeoutConfig {
109 #[must_use]
110 pub fn normalized(self) -> Self {
111 Self {
112 connect: if self.connect.is_zero() {
113 DEFAULT_CONNECT_TIMEOUT
114 } else {
115 self.connect
116 },
117 read: self.read,
118 total: self.total,
119 }
120 }
121}
122
123#[derive(Debug, Clone)]
124pub struct DownloadEngine {
125 client: Client,
126 limiter: IoLimiter,
127}
128
129impl DownloadEngine {
130 pub fn new(max_io: usize) -> Result<Self> {
131 let client = build_client(TimeoutConfig::default().normalized())?;
132 Ok(Self {
133 client,
134 limiter: IoLimiter::new(max_io.max(1)),
135 })
136 }
137
138 #[must_use]
139 pub fn max_io(&self) -> usize {
140 self.limiter.max()
141 }
142
143 pub fn set_max_io(&self, max_io: usize) {
144 self.limiter.set_max(max_io);
145 }
146
147 fn default_parallelism(&self) -> usize {
148 self.max_io().clamp(1, 4)
149 }
150
151 fn with_timeout(&self, timeout: TimeoutConfig) -> Result<Self> {
152 Ok(Self {
153 client: build_client(timeout)?,
154 limiter: self.limiter.clone(),
155 })
156 }
157}
158
159fn client_builder() -> reqwest::ClientBuilder {
160 let builder = Client::builder();
161 #[cfg(feature = "tls-rustls")]
162 {
163 let roots = rustls::RootCertStore {
164 roots: webpki_roots::TLS_SERVER_ROOTS.to_vec(),
165 };
166 let tls_config = rustls::ClientConfig::builder()
167 .with_root_certificates(roots)
168 .with_no_client_auth();
169 return builder.tls_backend_preconfigured(tls_config);
170 }
171 #[allow(unreachable_code)]
172 builder
173}
174
175fn build_client(timeout: TimeoutConfig) -> Result<Client> {
176 let mut builder = client_builder()
177 .user_agent("takanawa/0.1")
178 .connect_timeout(timeout.connect);
179 if !timeout.read.is_zero() {
180 builder = builder.read_timeout(timeout.read);
181 }
182 builder
183 .build()
184 .map_err(|err| TakanawaError::InvalidConfig(format!("failed to build HTTP client: {err}")))
185}
186
187#[derive(Debug)]
188pub struct DownloadHandle {
189 engine: DownloadEngine,
190 config: DownloadConfig,
191 state: SharedState,
192 control: Arc<Control>,
193 join: Mutex<Option<tokio::task::JoinHandle<()>>>,
194}
195
196#[derive(Debug, Default)]
197struct Control {
198 pause: AtomicBool,
199 cancel: AtomicBool,
200}
201
202impl DownloadHandle {
203 #[must_use]
204 pub fn new(engine: DownloadEngine, config: DownloadConfig) -> Self {
205 Self {
206 engine,
207 config: config.normalized(),
208 state: SharedState::new(),
209 control: Arc::new(Control::default()),
210 join: Mutex::new(None),
211 }
212 }
213
214 pub fn start_on(&self, runtime: &Runtime) -> Result<()> {
215 let mut join = self.join.lock().expect("download join mutex poisoned");
216 if join.as_ref().is_some_and(|handle| !handle.is_finished()) {
217 return Err(TakanawaError::AlreadyStarted);
218 }
219 if join
220 .as_ref()
221 .is_some_and(tokio::task::JoinHandle::is_finished)
222 {
223 let _ = join.take();
224 }
225
226 self.control.pause.store(false, Ordering::Relaxed);
227 self.control.cancel.store(false, Ordering::Relaxed);
228 self.state.clear_error();
229 self.state.mark_running();
230
231 let engine = self.engine.clone();
232 let config = self.config.clone();
233 let state = self.state.clone();
234 let control = Arc::clone(&self.control);
235 *join = Some(runtime.spawn(async move {
236 if let Err(err) = run_download(engine, config, state.clone(), control).await {
237 match err {
238 TakanawaError::Cancelled => state.mark_cancelled(),
239 err => state.mark_failed(err.to_string()),
240 }
241 }
242 }));
243 Ok(())
244 }
245
246 pub fn pause(&self) -> Result<()> {
247 self.control.pause.store(true, Ordering::Relaxed);
248 self.state.request_pause();
249 Ok(())
250 }
251
252 pub fn cancel(&self) -> Result<()> {
253 self.control.cancel.store(true, Ordering::Relaxed);
254 self.state.request_cancel();
255 if self
256 .join
257 .lock()
258 .expect("download join mutex poisoned")
259 .as_ref()
260 .is_none_or(tokio::task::JoinHandle::is_finished)
261 {
262 self.state.mark_cancelled();
263 }
264 Ok(())
265 }
266
267 #[must_use]
268 pub fn snapshot(&self) -> DownloadSnapshot {
269 self.state.snapshot()
270 }
271
272 #[must_use]
273 pub fn bitmap(&self) -> Vec<u8> {
274 self.state.bitmap()
275 }
276}
277
278impl Drop for DownloadHandle {
279 fn drop(&mut self) {
280 self.control.cancel.store(true, Ordering::Relaxed);
281 if let Some(join) = self
282 .join
283 .lock()
284 .expect("download join mutex poisoned")
285 .take()
286 {
287 join.abort();
288 }
289 }
290}
291
292pub async fn download_to_completion(
293 engine: DownloadEngine,
294 config: DownloadConfig,
295) -> Result<DownloadSnapshot> {
296 let state = SharedState::new();
297 let control = Arc::new(Control::default());
298 run_download(engine, config.normalized(), state.clone(), control).await?;
299 Ok(state.snapshot())
300}
301
302#[allow(clippy::too_many_lines)]
303async fn run_download(
304 engine: DownloadEngine,
305 config: DownloadConfig,
306 state: SharedState,
307 control: Arc<Control>,
308) -> Result<()> {
309 let config = config.normalized();
310 let engine = engine.with_timeout(config.timeout)?;
311 let bandwidth = Arc::new(BandwidthLimiter::new(config.bytes_per_second_limit));
312 state.mark_running();
313 let remote = probe_with_retry(&engine, &config, &state, &control).await?;
314 let chunk_plan = ChunkPlan::new(remote.content_len, config.chunk_size)?;
315 let target_path = config.target_path.clone();
316 let url = config.url.clone();
317 let hash = config.hash;
318 let chunk_size = config.chunk_size;
319
320 let part = tokio::task::spawn_blocking(move || {
321 PartFile::open_or_create(&target_path, &url, &remote, chunk_size, hash)
322 })
323 .await
324 .map_err(|err| TakanawaError::Ffi(format!("part open task failed: {err}")))??;
325 state.update_from_metadata(part.metadata());
326
327 if part.metadata().all_complete() {
328 finalize_part(part, &config, &state).await?;
329 return Ok(());
330 }
331
332 let mut pending: VecDeque<u64> = part.incomplete_chunks().into();
333 let requested_parallelism = if config.max_parallel_chunks == 0 {
334 config.parallelism
335 } else {
336 config.max_parallel_chunks
337 };
338 let parallelism = if requested_parallelism == 0 {
339 engine.default_parallelism()
340 } else {
341 requested_parallelism.max(1)
342 };
343 let writer_capacity = parallelism
344 .max(1)
345 .saturating_mul(WRITE_QUEUE_DEPTH_PER_CHUNK);
346 let (writer_tx, writer_join) = spawn_part_writer(part, writer_capacity);
347 let mut tasks = JoinSet::new();
348
349 loop {
350 if control.cancel.load(Ordering::Relaxed) {
351 tasks.shutdown().await;
352 let _part = finish_part_writer(writer_tx, writer_join).await?;
353 state.mark_cancelled();
354 return Err(TakanawaError::Cancelled);
355 }
356 if control.pause.load(Ordering::Relaxed) {
357 tasks.shutdown().await;
358 let _part = finish_part_writer(writer_tx, writer_join).await?;
359 state.mark_paused();
360 return Ok(());
361 }
362
363 while !control.pause.load(Ordering::Relaxed)
364 && !control.cancel.load(Ordering::Relaxed)
365 && tasks.len() < parallelism
366 {
367 let Some(index) = pending.pop_front() else {
368 break;
369 };
370 let chunk = chunk_plan.chunk(index)?;
371 let engine = engine.clone();
372 let config = config.clone();
373 let state = state.clone();
374 let control = Arc::clone(&control);
375 let bandwidth = Arc::clone(&bandwidth);
376 let writer_tx = writer_tx.clone();
377 tasks.spawn(async move {
378 let result = fetch_chunk_with_retry(
379 &engine, &config, chunk, &state, &control, &bandwidth, &writer_tx,
380 )
381 .await?;
382 Ok::<_, TakanawaError>(result)
383 });
384 }
385
386 if tasks.is_empty() {
387 break;
388 }
389
390 let Some(result) = tasks.join_next().await else {
391 break;
392 };
393 let task_result = match result {
394 Ok(Ok(task_result)) => task_result,
395 Ok(Err(err)) => {
396 tasks.shutdown().await;
397 let err = finish_part_writer_after_error(writer_tx, writer_join, err).await;
398 return Err(err);
399 }
400 Err(err) => {
401 tasks.shutdown().await;
402 let err = finish_part_writer_after_error(
403 writer_tx,
404 writer_join,
405 TakanawaError::Ffi(format!("download task failed: {err}")),
406 )
407 .await;
408 return Err(err);
409 }
410 };
411 match task_result {
412 ChunkTaskResult::Committed(metadata) => state.update_from_metadata(&metadata),
413 ChunkTaskResult::Paused => {
414 tasks.shutdown().await;
415 let _part = finish_part_writer(writer_tx, writer_join).await?;
416 state.mark_paused();
417 return Ok(());
418 }
419 }
420
421 if control.pause.load(Ordering::Relaxed) && tasks.is_empty() {
422 let _part = finish_part_writer(writer_tx, writer_join).await?;
423 state.mark_paused();
424 return Ok(());
425 }
426 }
427
428 let part = finish_part_writer(writer_tx, writer_join).await?;
429
430 if control.pause.load(Ordering::Relaxed) && !part.metadata().all_complete() {
431 state.mark_paused();
432 return Ok(());
433 }
434
435 finalize_part(part, &config, &state).await
436}
437
438enum ChunkTaskResult {
439 Committed(PartMetadata),
440 Paused,
441}
442
443enum FetchChunkStatus {
444 Complete,
445 Paused,
446}
447
448enum WriterCommand {
449 Write {
450 index: u64,
451 chunk_offset: u64,
452 bytes: Bytes,
453 },
454 Commit {
455 index: u64,
456 result: oneshot::Sender<Result<PartMetadata>>,
457 },
458}
459
460fn spawn_part_writer(
461 part: PartFile,
462 capacity: usize,
463) -> (
464 mpsc::Sender<WriterCommand>,
465 tokio::task::JoinHandle<Result<PartFile>>,
466) {
467 let (writer_tx, mut writer_rx) = mpsc::channel(capacity.max(1));
468 let writer_join = tokio::task::spawn_blocking(move || {
469 let mut part = part;
470 while let Some(command) = writer_rx.blocking_recv() {
471 match command {
472 WriterCommand::Write {
473 index,
474 chunk_offset,
475 bytes,
476 } => {
477 part.write_chunk_bytes(index, chunk_offset, &bytes)?;
478 }
479 WriterCommand::Commit { index, result } => {
480 let metadata = match part.commit_chunk(index) {
481 Ok(()) => part.metadata().clone(),
482 Err(err) => {
483 let message = err.to_string();
484 let _ = result.send(Err(err));
485 return Err(TakanawaError::Ffi(format!(
486 "part writer commit failed: {message}"
487 )));
488 }
489 };
490 let _ = result.send(Ok(metadata));
491 }
492 }
493 }
494 Ok(part)
495 });
496 (writer_tx, writer_join)
497}
498
499async fn finish_part_writer(
500 writer_tx: mpsc::Sender<WriterCommand>,
501 writer_join: tokio::task::JoinHandle<Result<PartFile>>,
502) -> Result<PartFile> {
503 drop(writer_tx);
504 writer_join
505 .await
506 .map_err(|err| TakanawaError::Ffi(format!("part writer task failed: {err}")))?
507}
508
509async fn finish_part_writer_after_error(
510 writer_tx: mpsc::Sender<WriterCommand>,
511 writer_join: tokio::task::JoinHandle<Result<PartFile>>,
512 err: TakanawaError,
513) -> TakanawaError {
514 match finish_part_writer(writer_tx, writer_join).await {
515 Err(writer_err) if matches!(err, TakanawaError::Ffi(_)) => writer_err,
516 Ok(_) | Err(TakanawaError::Ffi(_)) => err,
517 Err(writer_err) => writer_err,
518 }
519}
520
521async fn finalize_part(part: PartFile, config: &DownloadConfig, state: &SharedState) -> Result<()> {
522 let target_path = config.target_path.clone();
523 tokio::task::spawn_blocking(move || part.finalize(&target_path))
524 .await
525 .map_err(|err| TakanawaError::Ffi(format!("finalize task failed: {err}")))??;
526 state.mark_completed();
527 Ok(())
528}
529
530async fn probe_with_retry(
531 engine: &DownloadEngine,
532 config: &DownloadConfig,
533 state: &SharedState,
534 control: &Control,
535) -> Result<RemoteInfo> {
536 let retry = config.retry.normalized();
537 let mut delay = retry.backoff_initial;
538 for attempt in 1..=retry.max_attempts() {
539 if control.cancel.load(Ordering::Relaxed) {
540 return Err(TakanawaError::Cancelled);
541 }
542 match with_total_timeout(config.timeout.total, probe_once(engine, &config.url, state)).await
543 {
544 Ok(remote) => return Ok(remote),
545 Err(err) if err.is_retryable() && attempt < retry.max_attempts() => {
546 tokio::time::sleep(delay).await;
547 delay = (delay * 2).min(retry.backoff_max);
548 }
549 Err(err) => return Err(err),
550 }
551 }
552 Err(TakanawaError::Network(
553 "probe exhausted retry attempts".to_owned(),
554 ))
555}
556
557async fn fetch_chunk_with_retry(
558 engine: &DownloadEngine,
559 config: &DownloadConfig,
560 chunk: Chunk,
561 state: &SharedState,
562 control: &Control,
563 bandwidth: &BandwidthLimiter,
564 writer_tx: &mpsc::Sender<WriterCommand>,
565) -> Result<ChunkTaskResult> {
566 let retry = config.retry.normalized();
567 let mut delay = retry.backoff_initial;
568 for attempt in 1..=retry.max_attempts() {
569 if control.cancel.load(Ordering::Relaxed) {
570 return Err(TakanawaError::Cancelled);
571 }
572 if control.pause.load(Ordering::Relaxed) {
573 return Ok(ChunkTaskResult::Paused);
574 }
575 match with_total_timeout(
576 config.timeout.total,
577 fetch_chunk_once(
578 engine,
579 &config.url,
580 chunk,
581 state,
582 control,
583 bandwidth,
584 writer_tx,
585 ),
586 )
587 .await
588 {
589 Ok(FetchChunkStatus::Complete) => {
590 if control.cancel.load(Ordering::Relaxed) {
591 return Err(TakanawaError::Cancelled);
592 }
593 if control.pause.load(Ordering::Relaxed) {
594 return Ok(ChunkTaskResult::Paused);
595 }
596 let metadata = commit_written_chunk(writer_tx, chunk.index).await?;
597 return Ok(ChunkTaskResult::Committed(metadata));
598 }
599 Ok(FetchChunkStatus::Paused) => return Ok(ChunkTaskResult::Paused),
600 Err(err) if err.is_retryable() && attempt < retry.max_attempts() => {
601 tokio::time::sleep(delay).await;
602 delay = (delay * 2).min(retry.backoff_max);
603 }
604 Err(err) => return Err(err),
605 }
606 }
607 Err(TakanawaError::Network(format!(
608 "chunk {} exhausted retry attempts",
609 chunk.index
610 )))
611}
612
613async fn probe_once(engine: &DownloadEngine, url: &str, state: &SharedState) -> Result<RemoteInfo> {
614 let _permit = engine.limiter.acquire().await;
615 let _active_io = ActiveIoGuard::new(state.clone());
616 let response = engine
617 .client
618 .get(url)
619 .header(RANGE, "bytes=0-0")
620 .header(ACCEPT_ENCODING, "identity")
621 .send()
622 .await
623 .map_err(map_reqwest_error)?;
624
625 if response.status() == StatusCode::RANGE_NOT_SATISFIABLE {
626 let total = response
627 .headers()
628 .get(CONTENT_RANGE)
629 .ok_or_else(|| {
630 TakanawaError::HttpProtocol("416 response missing Content-Range".to_owned())
631 })?
632 .to_str()
633 .map_err(|err| {
634 TakanawaError::HttpProtocol(format!("invalid Content-Range header: {err}"))
635 })
636 .and_then(parse_unsatisfied_total)?;
637 if total == 0 {
638 return Ok(RemoteInfo {
639 content_len: 0,
640 etag: header_to_string(response.headers(), ETAG)?,
641 last_modified: header_to_string(response.headers(), LAST_MODIFIED)?,
642 });
643 }
644 return Err(TakanawaError::HttpProtocol(format!(
645 "probe range was unsatisfied for non-empty resource length {total}"
646 )));
647 }
648
649 validate_status(response.status())?;
650 validate_identity(response.headers())?;
651 let range = response_content_range(&response, 0, 0)?;
652 let content_len = response_content_length(&response)?;
653 if content_len != 1 {
654 return Err(TakanawaError::HttpProtocol(format!(
655 "probe Content-Length mismatch: expected 1, got {content_len}"
656 )));
657 }
658 let headers = response.headers().clone();
659 let body = response.bytes().await.map_err(map_reqwest_error)?;
660 if body.len() != 1 {
661 return Err(TakanawaError::HttpProtocol(format!(
662 "probe body length mismatch: expected 1, got {}",
663 body.len()
664 )));
665 }
666
667 Ok(RemoteInfo {
668 content_len: range.total,
669 etag: header_to_string(&headers, ETAG)?,
670 last_modified: header_to_string(&headers, LAST_MODIFIED)?,
671 })
672}
673
674async fn fetch_chunk_once(
675 engine: &DownloadEngine,
676 url: &str,
677 chunk: Chunk,
678 state: &SharedState,
679 control: &Control,
680 bandwidth: &BandwidthLimiter,
681 writer_tx: &mpsc::Sender<WriterCommand>,
682) -> Result<FetchChunkStatus> {
683 let _permit = engine.limiter.acquire().await;
684 let _active_io = ActiveIoGuard::new(state.clone());
685 let response = engine
686 .client
687 .get(url)
688 .header(RANGE, format!("bytes={}-{}", chunk.start, chunk.end))
689 .header(ACCEPT_ENCODING, "identity")
690 .send()
691 .await
692 .map_err(map_reqwest_error)?;
693
694 validate_status(response.status())?;
695 validate_identity(response.headers())?;
696 let _range = response_content_range(&response, chunk.start, chunk.end)?;
697 let content_len = response_content_length(&response)?;
698 if content_len != chunk.len {
699 return Err(TakanawaError::HttpProtocol(format!(
700 "chunk {} Content-Length mismatch: expected {}, got {content_len}",
701 chunk.index, chunk.len
702 )));
703 }
704 stream_body_to_writer(response, chunk, control, bandwidth, writer_tx).await
705}
706
707async fn stream_body_to_writer(
708 response: reqwest::Response,
709 chunk: Chunk,
710 control: &Control,
711 bandwidth: &BandwidthLimiter,
712 writer_tx: &mpsc::Sender<WriterCommand>,
713) -> Result<FetchChunkStatus> {
714 let mut written = 0_u64;
715 let mut stream = response.bytes_stream();
716 while let Some(bytes) = stream.next().await {
717 if control.cancel.load(Ordering::Relaxed) {
718 return Err(TakanawaError::Cancelled);
719 }
720 if control.pause.load(Ordering::Relaxed) {
721 return Ok(FetchChunkStatus::Paused);
722 }
723 let bytes = bytes.map_err(map_reqwest_error)?;
724 if bytes.is_empty() {
725 continue;
726 }
727 let len = u64::try_from(bytes.len()).map_err(|_| {
728 TakanawaError::HttpProtocol(format!(
729 "chunk {} body fragment length does not fit in file offsets",
730 chunk.index
731 ))
732 })?;
733 let next_written = written.checked_add(len).ok_or_else(|| {
734 TakanawaError::HttpProtocol(format!("chunk {} body length overflow", chunk.index))
735 })?;
736 if next_written > chunk.len {
737 return Err(TakanawaError::HttpProtocol(format!(
738 "chunk {} body length exceeded expected {} bytes",
739 chunk.index, chunk.len
740 )));
741 }
742 bandwidth.throttle(bytes.len()).await;
743 send_writer_write(writer_tx, chunk.index, written, bytes).await?;
744 written = next_written;
745 }
746
747 if written != chunk.len {
748 return Err(TakanawaError::HttpProtocol(format!(
749 "chunk {} body length mismatch: expected {}, got {}",
750 chunk.index, chunk.len, written
751 )));
752 }
753 Ok(FetchChunkStatus::Complete)
754}
755
756async fn send_writer_write(
757 writer_tx: &mpsc::Sender<WriterCommand>,
758 index: u64,
759 chunk_offset: u64,
760 bytes: Bytes,
761) -> Result<()> {
762 writer_tx
763 .send(WriterCommand::Write {
764 index,
765 chunk_offset,
766 bytes,
767 })
768 .await
769 .map_err(|_| TakanawaError::Ffi("part writer stopped before write".to_owned()))
770}
771
772async fn commit_written_chunk(
773 writer_tx: &mpsc::Sender<WriterCommand>,
774 index: u64,
775) -> Result<PartMetadata> {
776 let (result_tx, result_rx) = oneshot::channel();
777 writer_tx
778 .send(WriterCommand::Commit {
779 index,
780 result: result_tx,
781 })
782 .await
783 .map_err(|_| TakanawaError::Ffi("part writer stopped before commit".to_owned()))?;
784 result_rx
785 .await
786 .map_err(|_| TakanawaError::Ffi("part writer stopped during commit".to_owned()))?
787}
788
789async fn with_total_timeout<T>(
790 timeout: Duration,
791 future: impl Future<Output = Result<T>>,
792) -> Result<T> {
793 if timeout.is_zero() {
794 return future.await;
795 }
796 tokio::time::timeout(timeout, future).await.map_err(|_| {
797 TakanawaError::Network(format!("request exceeded {} ms", timeout.as_millis()))
798 })?
799}
800
801#[derive(Debug)]
802struct BandwidthLimiter {
803 bytes_per_second: u64,
804 next_available: Mutex<Instant>,
805}
806
807impl BandwidthLimiter {
808 fn new(bytes_per_second: u64) -> Self {
809 Self {
810 bytes_per_second,
811 next_available: Mutex::new(Instant::now()),
812 }
813 }
814
815 async fn throttle(&self, bytes: usize) {
816 if self.bytes_per_second == 0 || bytes == 0 {
817 return;
818 }
819
820 let now = Instant::now();
821 let wait_until = {
822 let mut next_available = self
823 .next_available
824 .lock()
825 .expect("bandwidth limiter mutex poisoned");
826 let start = (*next_available).max(now);
827 let nanos = (bytes as u128)
828 .saturating_mul(1_000_000_000)
829 .div_ceil(u128::from(self.bytes_per_second));
830 let duration = Duration::from_nanos(u64::try_from(nanos).unwrap_or(u64::MAX));
831 *next_available = start + duration;
832 start
833 };
834
835 if wait_until > now {
836 tokio::time::sleep_until(tokio::time::Instant::from_std(wait_until)).await;
837 }
838 }
839}
840
841fn validate_status(status: StatusCode) -> Result<()> {
842 if status == StatusCode::PARTIAL_CONTENT {
843 return Ok(());
844 }
845 if status == StatusCode::REQUEST_TIMEOUT
846 || status == StatusCode::TOO_MANY_REQUESTS
847 || status.is_server_error()
848 {
849 return Err(TakanawaError::RetryableHttpStatus(status.as_u16()));
850 }
851 Err(TakanawaError::HttpProtocol(format!(
852 "expected 206 Partial Content, got {status}"
853 )))
854}
855
856fn validate_identity(headers: &HeaderMap) -> Result<()> {
857 if let Some(value) = headers.get(CONTENT_ENCODING) {
858 let value = value.to_str().map_err(|err| {
859 TakanawaError::HttpProtocol(format!("invalid Content-Encoding: {err}"))
860 })?;
861 if !value.eq_ignore_ascii_case("identity") {
862 return Err(TakanawaError::HttpProtocol(format!(
863 "unexpected Content-Encoding {value}"
864 )));
865 }
866 }
867 Ok(())
868}
869
870fn response_content_range(
871 response: &reqwest::Response,
872 start: u64,
873 end: u64,
874) -> Result<crate::content_range::ContentRange> {
875 let value = response
876 .headers()
877 .get(CONTENT_RANGE)
878 .ok_or_else(|| TakanawaError::HttpProtocol("missing Content-Range".to_owned()))?
879 .to_str()
880 .map_err(|err| {
881 TakanawaError::HttpProtocol(format!("invalid Content-Range header: {err}"))
882 })?;
883 let range = parse_content_range(value)?;
884 if range.start != start || range.end != end {
885 return Err(TakanawaError::HttpProtocol(format!(
886 "Content-Range mismatch: expected {start}-{end}, got {}-{}",
887 range.start, range.end
888 )));
889 }
890 Ok(range)
891}
892
893fn response_content_length(response: &reqwest::Response) -> Result<u64> {
894 response
895 .headers()
896 .get(CONTENT_LENGTH)
897 .ok_or_else(|| TakanawaError::HttpProtocol("missing Content-Length".to_owned()))?
898 .to_str()
899 .map_err(|err| {
900 TakanawaError::HttpProtocol(format!("invalid Content-Length header: {err}"))
901 })?
902 .parse::<u64>()
903 .map_err(|err| TakanawaError::HttpProtocol(format!("invalid Content-Length: {err}")))
904}
905
906fn header_to_string(
907 headers: &HeaderMap,
908 name: reqwest::header::HeaderName,
909) -> Result<Option<String>> {
910 headers
911 .get(name)
912 .map(|value| {
913 value.to_str().map(str::to_owned).map_err(|err| {
914 TakanawaError::HttpProtocol(format!("invalid response header: {err}"))
915 })
916 })
917 .transpose()
918}
919
920#[allow(clippy::needless_pass_by_value)]
921fn map_reqwest_error(err: reqwest::Error) -> TakanawaError {
922 if err.is_timeout() || err.is_connect() || err.is_request() || err.is_body() || err.is_decode()
923 {
924 TakanawaError::Network(err.to_string())
925 } else {
926 TakanawaError::HttpProtocol(err.to_string())
927 }
928}
929
930struct ActiveIoGuard {
931 state: SharedState,
932}
933
934impl ActiveIoGuard {
935 fn new(state: SharedState) -> Self {
936 state.increment_active_io();
937 Self { state }
938 }
939}
940
941impl Drop for ActiveIoGuard {
942 fn drop(&mut self) {
943 self.state.decrement_active_io();
944 }
945}
946
947#[cfg(test)]
948mod tests {
949 use std::io::{Read, Write};
950 use std::net::{SocketAddr, TcpListener};
951 use std::sync::Arc;
952 use std::thread;
953 use std::time::Duration;
954
955 use sha2::{Digest, Sha256};
956 use tempfile::TempDir;
957
958 use super::*;
959 use crate::DownloadPhase;
960 use crate::limiter::DEFAULT_MAX_IO;
961
962 #[tokio::test]
963 async fn downloads_file_with_ranges() {
964 let data = Arc::new(b"abcdefghijklmnopqrstuvwxyz".to_vec());
965 let addr = spawn_range_server(Arc::clone(&data), false);
966 let dir = TempDir::new().unwrap();
967 let target = dir.path().join("out.bin");
968 let engine = DownloadEngine::new(DEFAULT_MAX_IO).unwrap();
969 let config = DownloadConfig {
970 url: format!("http://{addr}/file"),
971 target_path: target.clone(),
972 chunk_size: 5,
973 parallelism: 2,
974 max_parallel_chunks: 0,
975 retry: RetryConfig::default(),
976 timeout: TimeoutConfig::default(),
977 bytes_per_second_limit: 0,
978 hash: HashConfig::None,
979 };
980
981 let snapshot = download_to_completion(engine, config).await.unwrap();
982
983 assert_eq!(snapshot.phase, DownloadPhase::Completed);
984 assert_eq!(std::fs::read(target).unwrap(), data.as_slice());
985 }
986
987 #[tokio::test]
988 async fn rejects_ignored_range() {
989 let data = Arc::new(b"abcdef".to_vec());
990 let addr = spawn_range_server(Arc::clone(&data), true);
991 let dir = TempDir::new().unwrap();
992 let target = dir.path().join("out.bin");
993 let engine = DownloadEngine::new(DEFAULT_MAX_IO).unwrap();
994 let config = DownloadConfig {
995 url: format!("http://{addr}/file"),
996 target_path: target,
997 chunk_size: 3,
998 parallelism: 1,
999 max_parallel_chunks: 0,
1000 retry: RetryConfig::default(),
1001 timeout: TimeoutConfig::default(),
1002 bytes_per_second_limit: 0,
1003 hash: HashConfig::None,
1004 };
1005
1006 let err = download_to_completion(engine, config).await.unwrap_err();
1007
1008 assert!(matches!(err, TakanawaError::HttpProtocol(_)));
1009 }
1010
1011 #[tokio::test]
1012 async fn resumes_from_existing_part() {
1013 let data = Arc::new(b"abcdefghijklmnopqrstuvwxyz".to_vec());
1014 let addr = spawn_range_server(Arc::clone(&data), false);
1015 let dir = TempDir::new().unwrap();
1016 let target = dir.path().join("out.bin");
1017 let remote = RemoteInfo {
1018 content_len: data.len() as u64,
1019 etag: None,
1020 last_modified: None,
1021 };
1022 let mut part = PartFile::open_or_create(
1023 &target,
1024 &format!("http://{addr}/file"),
1025 &remote,
1026 5,
1027 HashConfig::None,
1028 )
1029 .unwrap();
1030 part.write_chunk(0, &data[..5]).unwrap();
1031 drop(part);
1032
1033 let engine = DownloadEngine::new(DEFAULT_MAX_IO).unwrap();
1034 let config = DownloadConfig {
1035 url: format!("http://{addr}/file"),
1036 target_path: target.clone(),
1037 chunk_size: 5,
1038 parallelism: 2,
1039 max_parallel_chunks: 0,
1040 retry: RetryConfig::default(),
1041 timeout: TimeoutConfig::default(),
1042 bytes_per_second_limit: 0,
1043 hash: HashConfig::None,
1044 };
1045
1046 let snapshot = download_to_completion(engine, config).await.unwrap();
1047
1048 assert_eq!(snapshot.phase, DownloadPhase::Completed);
1049 assert_eq!(std::fs::read(target).unwrap(), data.as_slice());
1050 }
1051
1052 #[tokio::test]
1053 async fn retries_after_partial_stream_write() {
1054 let data = Arc::new(b"abcdefghij".to_vec());
1055 let addr = spawn_truncated_once_server(Arc::clone(&data), 2);
1056 let dir = TempDir::new().unwrap();
1057 let target = dir.path().join("out.bin");
1058 let engine = DownloadEngine::new(DEFAULT_MAX_IO).unwrap();
1059 let config = DownloadConfig {
1060 url: format!("http://{addr}/file"),
1061 target_path: target.clone(),
1062 chunk_size: 5,
1063 parallelism: 1,
1064 max_parallel_chunks: 0,
1065 retry: RetryConfig {
1066 max_retries: 1,
1067 backoff_initial: Duration::from_millis(1),
1068 backoff_max: Duration::from_millis(1),
1069 },
1070 timeout: TimeoutConfig::default(),
1071 bytes_per_second_limit: 0,
1072 hash: HashConfig::None,
1073 };
1074
1075 let snapshot = download_to_completion(engine, config).await.unwrap();
1076
1077 assert_eq!(snapshot.phase, DownloadPhase::Completed);
1078 assert_eq!(std::fs::read(target).unwrap(), data.as_slice());
1079 }
1080
1081 #[test]
1082 fn pause_discards_in_flight_chunk() {
1083 let data = Arc::new(b"abcdefghij".to_vec());
1084 let addr = spawn_delayed_chunk_server(Arc::clone(&data), Duration::from_millis(300));
1085 let dir = TempDir::new().unwrap();
1086 let target = dir.path().join("out.bin");
1087 let engine = DownloadEngine::new(DEFAULT_MAX_IO).unwrap();
1088 let runtime = Runtime::new().unwrap();
1089 let download = DownloadHandle::new(
1090 engine,
1091 DownloadConfig {
1092 url: format!("http://{addr}/file"),
1093 target_path: target,
1094 chunk_size: 5,
1095 parallelism: 1,
1096 max_parallel_chunks: 0,
1097 retry: RetryConfig::default(),
1098 timeout: TimeoutConfig::default(),
1099 bytes_per_second_limit: 0,
1100 hash: HashConfig::None,
1101 },
1102 );
1103
1104 download.start_on(&runtime).unwrap();
1105 thread::sleep(Duration::from_millis(100));
1106 download.pause().unwrap();
1107
1108 let snapshot = wait_for_phase(&download, DownloadPhase::Paused);
1109
1110 assert_eq!(snapshot.completed_chunks, 0);
1111 assert_eq!(snapshot.downloaded_bytes, 0);
1112 }
1113
1114 #[test]
1115 fn pause_mid_stream_discards_uncommitted_bytes_and_resume_completes() {
1116 let data = Arc::new(b"abcdefghij".to_vec());
1117 let addr = spawn_split_body_server(Arc::clone(&data), 2, Duration::from_millis(300));
1118 let dir = TempDir::new().unwrap();
1119 let target = dir.path().join("out.bin");
1120 let engine = DownloadEngine::new(DEFAULT_MAX_IO).unwrap();
1121 let runtime = Runtime::new().unwrap();
1122 let download = DownloadHandle::new(
1123 engine,
1124 DownloadConfig {
1125 url: format!("http://{addr}/file"),
1126 target_path: target.clone(),
1127 chunk_size: 5,
1128 parallelism: 1,
1129 max_parallel_chunks: 0,
1130 retry: RetryConfig::default(),
1131 timeout: TimeoutConfig::default(),
1132 bytes_per_second_limit: 0,
1133 hash: HashConfig::None,
1134 },
1135 );
1136
1137 download.start_on(&runtime).unwrap();
1138 thread::sleep(Duration::from_millis(100));
1139 download.pause().unwrap();
1140
1141 let snapshot = wait_for_phase(&download, DownloadPhase::Paused);
1142
1143 assert_eq!(snapshot.completed_chunks, 0);
1144 assert_eq!(snapshot.downloaded_bytes, 0);
1145
1146 download.start_on(&runtime).unwrap();
1147 let snapshot = wait_for_phase(&download, DownloadPhase::Completed);
1148
1149 assert_eq!(snapshot.phase, DownloadPhase::Completed);
1150 assert_eq!(std::fs::read(target).unwrap(), data.as_slice());
1151 }
1152
1153 #[tokio::test]
1154 async fn rejects_existing_target() {
1155 let data = Arc::new(b"abcdef".to_vec());
1156 let addr = spawn_range_server(Arc::clone(&data), false);
1157 let dir = TempDir::new().unwrap();
1158 let target = dir.path().join("out.bin");
1159 std::fs::write(&target, b"already here").unwrap();
1160 let engine = DownloadEngine::new(DEFAULT_MAX_IO).unwrap();
1161 let config = DownloadConfig {
1162 url: format!("http://{addr}/file"),
1163 target_path: target,
1164 chunk_size: 3,
1165 parallelism: 1,
1166 max_parallel_chunks: 0,
1167 retry: RetryConfig::default(),
1168 timeout: TimeoutConfig::default(),
1169 bytes_per_second_limit: 0,
1170 hash: HashConfig::None,
1171 };
1172
1173 let err = download_to_completion(engine, config).await.unwrap_err();
1174
1175 assert!(matches!(err, TakanawaError::TargetExists(_)));
1176 }
1177
1178 #[tokio::test]
1179 async fn verifies_sha256_before_finalize() {
1180 let data = Arc::new(b"abcdef".to_vec());
1181 let addr = spawn_range_server(Arc::clone(&data), false);
1182 let dir = TempDir::new().unwrap();
1183 let target = dir.path().join("out.bin");
1184 let expected: [u8; 32] = Sha256::digest(data.as_slice()).into();
1185 let engine = DownloadEngine::new(DEFAULT_MAX_IO).unwrap();
1186 let config = DownloadConfig {
1187 url: format!("http://{addr}/file"),
1188 target_path: target.clone(),
1189 chunk_size: 3,
1190 parallelism: 1,
1191 max_parallel_chunks: 0,
1192 retry: RetryConfig::default(),
1193 timeout: TimeoutConfig::default(),
1194 bytes_per_second_limit: 0,
1195 hash: HashConfig::Sha256(expected),
1196 };
1197
1198 download_to_completion(engine, config).await.unwrap();
1199
1200 assert_eq!(std::fs::read(target).unwrap(), data.as_slice());
1201 }
1202
1203 #[tokio::test]
1204 async fn total_timeout_aborts_slow_chunk() {
1205 let data = Arc::new(b"abcdef".to_vec());
1206 let addr = spawn_delayed_chunk_server(Arc::clone(&data), Duration::from_millis(300));
1207 let dir = TempDir::new().unwrap();
1208 let target = dir.path().join("out.bin");
1209 let engine = DownloadEngine::new(DEFAULT_MAX_IO).unwrap();
1210 let config = DownloadConfig {
1211 url: format!("http://{addr}/file"),
1212 target_path: target,
1213 chunk_size: 3,
1214 parallelism: 1,
1215 max_parallel_chunks: 0,
1216 retry: RetryConfig {
1217 max_retries: 0,
1218 backoff_initial: Duration::from_millis(1),
1219 backoff_max: Duration::from_millis(1),
1220 },
1221 timeout: TimeoutConfig {
1222 connect: Duration::from_secs(30),
1223 read: Duration::ZERO,
1224 total: Duration::from_millis(50),
1225 },
1226 bytes_per_second_limit: 0,
1227 hash: HashConfig::None,
1228 };
1229
1230 let err = download_to_completion(engine, config).await.unwrap_err();
1231
1232 assert!(matches!(err, TakanawaError::Network(_)));
1233 }
1234
1235 fn spawn_range_server(data: Arc<Vec<u8>>, ignore_range: bool) -> SocketAddr {
1236 spawn_range_server_with_chunk_delay(data, ignore_range, None)
1237 }
1238
1239 fn spawn_delayed_chunk_server(data: Arc<Vec<u8>>, delay: Duration) -> SocketAddr {
1240 spawn_range_server_with_chunk_delay(data, false, Some(delay))
1241 }
1242
1243 fn spawn_split_body_server(
1244 data: Arc<Vec<u8>>,
1245 first_body_bytes: usize,
1246 delay: Duration,
1247 ) -> SocketAddr {
1248 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
1249 let addr = listener.local_addr().unwrap();
1250 thread::spawn(move || {
1251 for stream in listener.incoming().flatten() {
1252 let data = Arc::clone(&data);
1253 thread::spawn(move || {
1254 handle_split_body_connection(stream, &data, first_body_bytes, delay);
1255 });
1256 }
1257 });
1258 addr
1259 }
1260
1261 fn spawn_truncated_once_server(
1262 data: Arc<Vec<u8>>,
1263 body_bytes_before_close: usize,
1264 ) -> SocketAddr {
1265 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
1266 let addr = listener.local_addr().unwrap();
1267 let truncated = Arc::new(std::sync::atomic::AtomicBool::new(false));
1268 thread::spawn(move || {
1269 for stream in listener.incoming().flatten() {
1270 let data = Arc::clone(&data);
1271 let truncated = Arc::clone(&truncated);
1272 thread::spawn(move || {
1273 handle_truncated_once_connection(
1274 stream,
1275 &data,
1276 body_bytes_before_close,
1277 &truncated,
1278 );
1279 });
1280 }
1281 });
1282 addr
1283 }
1284
1285 fn spawn_range_server_with_chunk_delay(
1286 data: Arc<Vec<u8>>,
1287 ignore_range: bool,
1288 chunk_delay: Option<Duration>,
1289 ) -> SocketAddr {
1290 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
1291 let addr = listener.local_addr().unwrap();
1292 thread::spawn(move || {
1293 for stream in listener.incoming().flatten() {
1294 let data = Arc::clone(&data);
1295 thread::spawn(move || handle_connection(stream, &data, ignore_range, chunk_delay));
1296 }
1297 });
1298 addr
1299 }
1300
1301 fn handle_connection(
1302 mut stream: std::net::TcpStream,
1303 data: &[u8],
1304 ignore_range: bool,
1305 chunk_delay: Option<Duration>,
1306 ) {
1307 let mut buffer = [0; 4096];
1308 let read = stream.read(&mut buffer).unwrap_or(0);
1309 let request = String::from_utf8_lossy(&buffer[..read]);
1310 let range = request_range(&request);
1311
1312 if ignore_range {
1313 let response = format!("HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", data.len());
1314 stream.write_all(response.as_bytes()).unwrap();
1315 stream.write_all(data).unwrap();
1316 return;
1317 }
1318
1319 let Some((start, end)) = range else {
1320 stream
1321 .write_all(b"HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n")
1322 .unwrap();
1323 return;
1324 };
1325 if start >= data.len() {
1326 let response = format!(
1327 "HTTP/1.1 416 Range Not Satisfiable\r\nContent-Range: bytes */{}\r\nContent-Length: 0\r\n\r\n",
1328 data.len()
1329 );
1330 stream.write_all(response.as_bytes()).unwrap();
1331 return;
1332 }
1333 let end = end.min(data.len() - 1);
1334 let body = &data[start..=end];
1335 if let Some(delay) = chunk_delay {
1336 if !(start == 0 && end == 0) {
1337 thread::sleep(delay);
1338 }
1339 }
1340 let response = format!(
1341 "HTTP/1.1 206 Partial Content\r\nContent-Range: bytes {start}-{end}/{}\r\nContent-Length: {}\r\nAccept-Ranges: bytes\r\n\r\n",
1342 data.len(),
1343 body.len()
1344 );
1345 stream.write_all(response.as_bytes()).unwrap();
1346 stream.write_all(body).unwrap();
1347 }
1348
1349 fn handle_split_body_connection(
1350 mut stream: std::net::TcpStream,
1351 data: &[u8],
1352 first_body_bytes: usize,
1353 delay: Duration,
1354 ) {
1355 let Some((start, end)) = read_request_range(&mut stream) else {
1356 let _ = stream.write_all(b"HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n");
1357 return;
1358 };
1359 let Some(body) = range_body(data, start, end, &mut stream) else {
1360 return;
1361 };
1362 let response = format!(
1363 "HTTP/1.1 206 Partial Content\r\nContent-Range: bytes {start}-{}/{}\r\nContent-Length: {}\r\nAccept-Ranges: bytes\r\n\r\n",
1364 start + body.len() - 1,
1365 data.len(),
1366 body.len()
1367 );
1368 let _ = stream.write_all(response.as_bytes());
1369 if body.len() <= 1 {
1370 let _ = stream.write_all(body);
1371 return;
1372 }
1373 let split_at = first_body_bytes.clamp(1, body.len());
1374 let _ = stream.write_all(&body[..split_at]);
1375 let _ = stream.flush();
1376 thread::sleep(delay);
1377 let _ = stream.write_all(&body[split_at..]);
1378 }
1379
1380 fn handle_truncated_once_connection(
1381 mut stream: std::net::TcpStream,
1382 data: &[u8],
1383 body_bytes_before_close: usize,
1384 truncated: &std::sync::atomic::AtomicBool,
1385 ) {
1386 let Some((start, end)) = read_request_range(&mut stream) else {
1387 let _ = stream.write_all(b"HTTP/1.1 400 Bad Request\r\nContent-Length: 0\r\n\r\n");
1388 return;
1389 };
1390 let Some(body) = range_body(data, start, end, &mut stream) else {
1391 return;
1392 };
1393 let response = format!(
1394 "HTTP/1.1 206 Partial Content\r\nContent-Range: bytes {start}-{}/{}\r\nContent-Length: {}\r\nAccept-Ranges: bytes\r\n\r\n",
1395 start + body.len() - 1,
1396 data.len(),
1397 body.len()
1398 );
1399 let _ = stream.write_all(response.as_bytes());
1400 if body.len() > 1 && !truncated.swap(true, std::sync::atomic::Ordering::SeqCst) {
1401 let end = body_bytes_before_close.min(body.len());
1402 let _ = stream.write_all(&body[..end]);
1403 return;
1404 }
1405 let _ = stream.write_all(body);
1406 }
1407
1408 fn read_request_range(stream: &mut std::net::TcpStream) -> Option<(usize, usize)> {
1409 let mut buffer = [0; 4096];
1410 let read = stream.read(&mut buffer).ok()?;
1411 let request = String::from_utf8_lossy(&buffer[..read]);
1412 request_range(&request)
1413 }
1414
1415 fn request_range(request: &str) -> Option<(usize, usize)> {
1416 let range = request.lines().find_map(|line| {
1417 let (name, value) = line.split_once(':')?;
1418 if name.eq_ignore_ascii_case("range") {
1419 value.trim().strip_prefix("bytes=")
1420 } else {
1421 None
1422 }
1423 })?;
1424 let (start, end) = range.split_once('-')?;
1425 Some((start.parse().ok()?, end.parse().ok()?))
1426 }
1427
1428 fn range_body<'a>(
1429 data: &'a [u8],
1430 start: usize,
1431 end: usize,
1432 stream: &mut std::net::TcpStream,
1433 ) -> Option<&'a [u8]> {
1434 if start >= data.len() {
1435 let response = format!(
1436 "HTTP/1.1 416 Range Not Satisfiable\r\nContent-Range: bytes */{}\r\nContent-Length: 0\r\n\r\n",
1437 data.len()
1438 );
1439 let _ = stream.write_all(response.as_bytes());
1440 return None;
1441 }
1442 let end = end.min(data.len() - 1);
1443 Some(&data[start..=end])
1444 }
1445
1446 fn wait_for_phase(download: &DownloadHandle, phase: DownloadPhase) -> DownloadSnapshot {
1447 for _ in 0..100 {
1448 let snapshot = download.snapshot();
1449 if snapshot.phase == phase {
1450 return snapshot;
1451 }
1452 thread::sleep(Duration::from_millis(20));
1453 }
1454 download.snapshot()
1455 }
1456}