1use std::fs::{self, File, OpenOptions};
2use std::io::{self, Read, Write};
3use std::path::{Component, Path, PathBuf};
4
5use reqwest::header::ACCEPT_ENCODING;
6use serde::{Deserialize, Serialize};
7use sha2::{Digest, Sha256};
8use tokio::io::AsyncWriteExt;
9use zccache_download::{canonical_destination, stable_download_id, DownloadOptions, DownloadPhase};
10
11use crate::DownloadClient;
12
/// How to behave when another process already holds the per-destination
/// fetch lock (see `acquire_fetch_lock`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum WaitMode {
    /// Block until the exclusive lock is granted.
    Block,
    /// Return immediately with `FetchStatus::Locked` instead of waiting.
    NoWait,
}
18
/// Archive/compression format of the cached artifact, controlling how
/// `extract_archive` expands it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ArchiveFormat {
    /// Detect from the file name (see `auto_archive_format`).
    Auto,
    /// Not an archive: the artifact is copied verbatim on expansion.
    None,
    /// Single zstd-compressed file.
    Zst,
    /// Zip archive.
    Zip,
    /// Single xz-compressed file.
    Xz,
    /// Gzip-compressed tarball.
    TarGz,
    /// Xz-compressed tarball.
    TarXz,
    /// Zstd-compressed tarball.
    TarZst,
    /// 7-Zip archive.
    SevenZip,
}
31
/// Outcome reported by `DownloadClient::fetch`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum FetchStatus {
    /// Artifact was downloaded in this call (no expansion requested).
    Downloaded,
    /// Artifact was already cached and valid; nothing was downloaded.
    AlreadyPresent,
    /// Artifact was expanded to the destination in this call.
    Expanded,
    /// Expanded destination was already present and validated.
    AlreadyExpanded,
    /// Downloaded in this call and the expansion was already valid.
    Ready,
    /// Another process holds the fetch lock and `WaitMode::NoWait` was set.
    Locked,
    /// `dry_run` was set; no filesystem changes were made.
    DryRun,
}
42
/// Classification of the on-disk state for a fetch destination,
/// as computed by `exists_resolved`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum FetchStateKind {
    /// Nothing cached yet.
    Missing,
    /// Cached artifact exists and matches the expected checksum/marker.
    ArtifactReady,
    /// Artifact is valid and the expanded destination is present and marked.
    ExpandedReady,
    /// Artifact exists but failed validation; must be purged before retry.
    Invalid,
}
50
/// Where to download an artifact from: a single URL, or an ordered list of
/// part URLs that are concatenated into one file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum DownloadSource {
    Url(String),
    MultipartUrls(Vec<String>),
}
56
57impl DownloadSource {
58 #[must_use]
59 pub fn primary_url(&self) -> &str {
60 match self {
61 Self::Url(url) => url,
62 Self::MultipartUrls(urls) => urls.first().map(String::as_str).unwrap_or(""),
63 }
64 }
65}
66
/// An owned string converts to a single-URL source.
impl From<String> for DownloadSource {
    fn from(value: String) -> Self {
        Self::Url(value)
    }
}
72
/// A string slice converts to a single-URL source (allocates an owned copy).
impl From<&str> for DownloadSource {
    fn from(value: &str) -> Self {
        Self::Url(value.to_string())
    }
}
78
/// A vector of URLs converts to a multipart source (parts downloaded in order).
impl From<Vec<String>> for DownloadSource {
    fn from(value: Vec<String>) -> Self {
        Self::MultipartUrls(value)
    }
}
84
/// Caller-facing description of a fetch: what to download, where to cache
/// it, and optionally where to expand it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FetchRequest {
    /// URL(s) to download from.
    pub source: DownloadSource,
    /// Where the raw artifact is cached on disk.
    pub destination_path: PathBuf,
    /// If set, the artifact is also extracted/copied to this path.
    pub destination_path_expanded: Option<PathBuf>,
    /// Optional checksum pin; validated after download (case-insensitive).
    pub expected_sha256: Option<String>,
    /// Archive handling; `Auto` infers from the cache file name.
    pub archive_format: ArchiveFormat,
    /// Behavior when the fetch lock is contended.
    pub wait_mode: WaitMode,
    /// If true, report state without touching the filesystem.
    pub dry_run: bool,
    /// If true, refuse to proceed unless no prior state exists.
    pub force: bool,
    /// Options forwarded to the underlying downloader.
    pub download_options: DownloadOptions,
}
97
impl FetchRequest {
    /// Create a request with defaults: no expansion, no checksum pin,
    /// auto-detected archive format, blocking lock wait, not a dry run,
    /// not forced, default download options.
    #[must_use]
    pub fn new(source: impl Into<DownloadSource>, destination_path: impl Into<PathBuf>) -> Self {
        Self {
            source: source.into(),
            destination_path: destination_path.into(),
            destination_path_expanded: None,
            expected_sha256: None,
            archive_format: ArchiveFormat::Auto,
            wait_mode: WaitMode::Block,
            dry_run: false,
            force: false,
            download_options: DownloadOptions::default(),
        }
    }
}
114
/// Result of a completed (or short-circuited) fetch.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FetchResult {
    /// What happened (downloaded, already present, locked, …).
    pub status: FetchStatus,
    /// Resolved cache location of the raw artifact.
    pub cache_path: PathBuf,
    /// Resolved expansion location, when one was requested.
    pub expanded_path: Option<PathBuf>,
    /// Artifact size in bytes, when known.
    pub bytes: Option<u64>,
    /// Artifact sha256; may be empty for `Locked`/`DryRun` short-circuits.
    pub sha256: String,
}
123
/// Snapshot of on-disk state for a destination, returned by
/// `DownloadClient::exists`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FetchState {
    pub kind: FetchStateKind,
    pub cache_path: PathBuf,
    pub expanded_path: Option<PathBuf>,
    /// Size of the cached artifact, when it exists.
    pub bytes: Option<u64>,
    /// Sha256 of the cached artifact, when it exists.
    pub sha256: Option<String>,
    /// Human-readable explanation for non-ready states.
    pub reason: Option<String>,
}
133
/// JSON sidecar recorded after a successful expansion; used to decide
/// whether an existing expanded destination is still valid.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct ExpandedMarker {
    source: DownloadSource,
    cache_path: String,
    /// Sha256 of the artifact the expansion was produced from.
    artifact_sha256: String,
    archive_format: ArchiveFormat,
}
141
/// JSON sidecar recorded after a validated download; cross-checked against
/// the on-disk payload by `read_or_compute_artifact_fingerprint`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct ArtifactMarker {
    source: DownloadSource,
    cache_path: String,
    sha256: String,
    bytes: u64,
}
149
/// Content identity of a cached artifact: hash plus size.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ArtifactFingerprint {
    sha256: String,
    bytes: u64,
}
155
/// A `FetchRequest` after path canonicalization and input normalization;
/// the internal working form used by the fetch pipeline.
#[derive(Debug, Clone)]
struct ResolvedFetchRequest {
    source: DownloadSource,
    /// Absolute, canonicalized cache location.
    cache_path: PathBuf,
    /// Absolute, canonicalized expansion location, if requested.
    expanded_path: Option<PathBuf>,
    /// Lowercased, trimmed expected checksum, if pinned.
    expected_sha256: Option<String>,
    archive_format: ArchiveFormat,
    wait_mode: WaitMode,
    dry_run: bool,
    force: bool,
    download_options: DownloadOptions,
}
168
impl DownloadClient {
    /// Fetch an artifact into the cache and optionally expand it.
    ///
    /// Flow: resolve the request, take a lock-free fast path when state is
    /// already ready, honor `dry_run`, acquire the per-destination file
    /// lock, re-check state under the lock, download if needed, validate
    /// the payload, record markers, and finally expand archives when an
    /// expanded destination was requested.
    ///
    /// # Errors
    /// Returns `Err` on invalid requests, download failures, checksum
    /// mismatches, pre-existing unvalidated state, or extraction failures.
    pub fn fetch(&self, request: FetchRequest) -> Result<FetchResult, String> {
        let resolved = resolve_request(&request)?;
        // Pre-lock snapshot; may be stale and is re-checked under the lock.
        let initial = exists_resolved(&resolved)?;
        // `force` refuses to clobber existing state silently: the caller
        // must purge first. This keeps force replacement an explicit step.
        if resolved.force && initial.kind != FetchStateKind::Missing {
            return Err(format!(
                "artifact state already exists at {}; purge it before forcing replacement",
                resolved.cache_path.display()
            ));
        }
        // Fast path without taking the lock: everything already ready.
        if !resolved.force {
            match initial.kind {
                FetchStateKind::ExpandedReady => {
                    return Ok(FetchResult {
                        status: FetchStatus::AlreadyExpanded,
                        cache_path: resolved.cache_path,
                        expanded_path: resolved.expanded_path,
                        bytes: initial.bytes,
                        sha256: initial
                            .sha256
                            .ok_or_else(|| "missing artifact sha256 fingerprint".to_string())?,
                    });
                }
                // Artifact alone suffices only when no expansion was asked for.
                FetchStateKind::ArtifactReady if resolved.expanded_path.is_none() => {
                    return Ok(FetchResult {
                        status: FetchStatus::AlreadyPresent,
                        cache_path: resolved.cache_path,
                        expanded_path: None,
                        bytes: initial.bytes,
                        sha256: initial
                            .sha256
                            .ok_or_else(|| "missing artifact sha256 fingerprint".to_string())?,
                    });
                }
                _ => {}
            }
        }

        // Dry run: report what we know without touching the filesystem.
        if resolved.dry_run {
            return Ok(FetchResult {
                status: FetchStatus::DryRun,
                cache_path: resolved.cache_path,
                expanded_path: resolved.expanded_path,
                bytes: initial.bytes,
                sha256: initial.sha256.unwrap_or_default(),
            });
        }

        // The "locked" sentinel string is produced by `acquire_fetch_lock`
        // when WaitMode::NoWait loses the race; surface it as a status, not
        // an error. The lock guard lives until `fetch` returns.
        let _lock = match acquire_fetch_lock(&resolved) {
            Ok(lock) => lock,
            Err(message) if message == "locked" => {
                return Ok(FetchResult {
                    status: FetchStatus::Locked,
                    cache_path: resolved.cache_path,
                    expanded_path: resolved.expanded_path,
                    bytes: initial.bytes,
                    sha256: initial.sha256.unwrap_or_default(),
                });
            }
            Err(message) => return Err(message),
        };
        // Re-check now that we hold the lock: another process may have
        // completed (or corrupted) the state since `initial` was taken.
        let current = exists_resolved(&resolved)?;
        if current.kind == FetchStateKind::Invalid {
            return Err(format!(
                "{}; purge the artifact state before retrying",
                current
                    .reason
                    .clone()
                    .unwrap_or_else(|| "artifact exists but failed validation".to_string())
            ));
        }
        // Same ready short-circuits as above, but against post-lock state.
        if !resolved.force {
            match current.kind {
                FetchStateKind::ExpandedReady => {
                    return Ok(FetchResult {
                        status: FetchStatus::AlreadyExpanded,
                        cache_path: resolved.cache_path,
                        expanded_path: resolved.expanded_path,
                        bytes: current.bytes,
                        sha256: current
                            .sha256
                            .ok_or_else(|| "missing artifact sha256 fingerprint".to_string())?,
                    });
                }
                FetchStateKind::ArtifactReady if resolved.expanded_path.is_none() => {
                    return Ok(FetchResult {
                        status: FetchStatus::AlreadyPresent,
                        cache_path: resolved.cache_path,
                        expanded_path: None,
                        bytes: current.bytes,
                        sha256: current
                            .sha256
                            .ok_or_else(|| "missing artifact sha256 fingerprint".to_string())?,
                    });
                }
                _ => {}
            }
        }

        // Download unless a valid artifact is already cached.
        let mut downloaded_now = false;
        if resolved.force || current.kind != FetchStateKind::ArtifactReady {
            match &resolved.source {
                DownloadSource::Url(url) => {
                    let mut handle = self.download(
                        url,
                        &resolved.cache_path,
                        resolved.download_options.clone(),
                    )?;
                    // Poll until the download reaches a terminal phase.
                    let status = loop {
                        let status = handle.wait(None)?;
                        if crate::is_terminal(&status) {
                            break status;
                        }
                    };
                    if status.phase != DownloadPhase::Completed {
                        return Err(status.error.unwrap_or_else(|| {
                            format!("download finished in unexpected phase {:?}", status.phase)
                        }));
                    }
                    handle.close()?;
                }
                DownloadSource::MultipartUrls(urls) => {
                    download_explicit_parts(urls, &resolved.cache_path)?;
                }
            }
            downloaded_now = true;
        }

        // Validate the payload; on failure remove all partial state so the
        // next attempt starts clean.
        let fingerprint = match validate_artifact(&resolved) {
            Ok(fingerprint) => fingerprint,
            Err(err) => {
                cleanup_invalid_fetch_state(&resolved);
                return Err(err);
            }
        };
        write_artifact_marker(&resolved, &fingerprint)?;

        if let Some(expanded_path) = &resolved.expanded_path {
            let expanded_ready = expanded_marker_matches(&resolved, &fingerprint)?;
            if !resolved.force && expanded_ready {
                return Ok(FetchResult {
                    // `Ready` when we just downloaded but the expansion
                    // already matched; otherwise everything pre-existed.
                    status: if downloaded_now {
                        FetchStatus::Ready
                    } else {
                        FetchStatus::AlreadyExpanded
                    },
                    cache_path: resolved.cache_path.clone(),
                    expanded_path: Some(expanded_path.clone()),
                    bytes: Some(fingerprint.bytes),
                    sha256: fingerprint.sha256.clone(),
                });
            }

            // Never overwrite an unvalidated expanded destination.
            if expanded_path.exists() {
                return Err(format!(
                    "expanded destination {} already exists but is not validated; purge it before retrying",
                    expanded_path.display()
                ));
            }

            // Drop any stale marker before extracting, so a crash mid-extract
            // cannot leave a marker that vouches for a partial tree.
            remove_path_if_exists(&expanded_marker_path(expanded_path))?;
            extract_archive(&resolved, expanded_path)?;
            write_expanded_marker(&resolved, &fingerprint)?;
            return Ok(FetchResult {
                status: FetchStatus::Expanded,
                cache_path: resolved.cache_path.clone(),
                expanded_path: Some(expanded_path.clone()),
                bytes: Some(fingerprint.bytes),
                sha256: fingerprint.sha256,
            });
        }

        Ok(FetchResult {
            status: if downloaded_now {
                FetchStatus::Downloaded
            } else {
                FetchStatus::AlreadyPresent
            },
            cache_path: resolved.cache_path.clone(),
            expanded_path: None,
            bytes: Some(fingerprint.bytes),
            sha256: fingerprint.sha256,
        })
    }

    /// Report the on-disk state for a request without creating directories
    /// or acquiring locks.
    ///
    /// # Errors
    /// Returns `Err` when the request is invalid or state probing fails.
    pub fn exists(&self, request: &FetchRequest) -> Result<FetchState, String> {
        let resolved = resolve_request_no_create(request)?;
        exists_resolved(&resolved)
    }
}
359
360fn resolve_request(request: &FetchRequest) -> Result<ResolvedFetchRequest, String> {
361 Ok(ResolvedFetchRequest {
362 source: normalize_source(request.source.clone())?,
363 cache_path: canonical_destination(&request.destination_path)
364 .map_err(|e| e.to_string())?
365 .into_path_buf(),
366 expanded_path: request
367 .destination_path_expanded
368 .as_ref()
369 .map(|p| normalize_target(p, true))
370 .transpose()?,
371 expected_sha256: request.expected_sha256.clone().map(normalize_sha256),
372 archive_format: request.archive_format,
373 wait_mode: request.wait_mode,
374 dry_run: request.dry_run,
375 force: request.force,
376 download_options: request.download_options.clone(),
377 })
378}
379
380fn resolve_request_no_create(request: &FetchRequest) -> Result<ResolvedFetchRequest, String> {
381 Ok(ResolvedFetchRequest {
382 source: normalize_source(request.source.clone())?,
383 cache_path: normalize_target(&request.destination_path, false)?,
384 expanded_path: request
385 .destination_path_expanded
386 .as_ref()
387 .map(|p| normalize_target(p, false))
388 .transpose()?,
389 expected_sha256: request.expected_sha256.clone().map(normalize_sha256),
390 archive_format: request.archive_format,
391 wait_mode: request.wait_mode,
392 dry_run: request.dry_run,
393 force: request.force,
394 download_options: request.download_options.clone(),
395 })
396}
397
398fn normalize_target(path: &Path, create_parent: bool) -> Result<PathBuf, String> {
399 let absolute = if path.is_absolute() {
400 path.to_path_buf()
401 } else {
402 std::env::current_dir()
403 .map_err(|e| e.to_string())?
404 .join(path)
405 };
406 let file_name = absolute
407 .file_name()
408 .map(ToOwned::to_owned)
409 .ok_or_else(|| "path must include a terminal file or directory name".to_string())?;
410 let parent = absolute.parent().unwrap_or_else(|| Path::new("."));
411 let canonical_parent = if parent.exists() {
412 std::fs::canonicalize(parent).map_err(|e| e.to_string())?
413 } else if create_parent {
414 std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
415 std::fs::canonicalize(parent).map_err(|e| e.to_string())?
416 } else {
417 zccache_core::NormalizedPath::new(parent).into_path_buf()
418 };
419 Ok(canonical_parent.join(file_name))
420}
421
/// Canonicalize a user-supplied sha256 string: strip surrounding
/// whitespace and lowercase ASCII hex digits for comparison.
fn normalize_sha256(value: String) -> String {
    let stripped = value.trim();
    stripped.to_ascii_lowercase()
}
425
426fn normalize_source(source: DownloadSource) -> Result<DownloadSource, String> {
427 match source {
428 DownloadSource::Url(url) => {
429 if url.trim().is_empty() {
430 Err("download source URL must not be empty".to_string())
431 } else {
432 Ok(DownloadSource::Url(url))
433 }
434 }
435 DownloadSource::MultipartUrls(urls) => {
436 if urls.is_empty() {
437 return Err("multipart download source must include at least one URL".to_string());
438 }
439 if urls.iter().any(|url| url.trim().is_empty()) {
440 return Err("multipart download source contains an empty URL".to_string());
441 }
442 Ok(DownloadSource::MultipartUrls(urls))
443 }
444 }
445}
446
/// Classify the on-disk state for a resolved request without mutating it.
///
/// Precedence: ExpandedReady > ArtifactReady > Invalid > Missing. The
/// artifact is hashed (and cross-checked against its marker) only when the
/// cache file actually exists.
fn exists_resolved(request: &ResolvedFetchRequest) -> Result<FetchState, String> {
    let cache_exists = request.cache_path.exists();
    // Hash only when a payload is present so the Missing path stays cheap.
    let fingerprint = if cache_exists {
        Some(read_or_compute_artifact_fingerprint(request)?)
    } else {
        None
    };
    // Valid means: present, marker-consistent, and matching any pinned hash.
    let cache_valid = fingerprint
        .as_ref()
        .map(|fingerprint| artifact_matches_request(request, fingerprint))
        .unwrap_or(false);
    let bytes = fingerprint.as_ref().map(|fingerprint| fingerprint.bytes);
    let sha256 = fingerprint
        .as_ref()
        .map(|fingerprint| fingerprint.sha256.clone());

    if let Some(expanded_path) = &request.expanded_path {
        // Fully ready: valid artifact + matching expansion marker + the
        // expanded destination actually on disk.
        if cache_valid
            && expanded_marker_matches(
                request,
                fingerprint
                    .as_ref()
                    .ok_or_else(|| "missing artifact fingerprint".to_string())?,
            )?
            && expanded_path.exists()
        {
            return Ok(FetchState {
                kind: FetchStateKind::ExpandedReady,
                cache_path: request.cache_path.clone(),
                expanded_path: Some(expanded_path.clone()),
                bytes,
                sha256,
                reason: None,
            });
        }

        // Artifact is fine but the expansion still needs to happen.
        if cache_valid {
            return Ok(FetchState {
                kind: FetchStateKind::ArtifactReady,
                cache_path: request.cache_path.clone(),
                expanded_path: Some(expanded_path.clone()),
                bytes,
                sha256,
                reason: Some("expanded destination not ready".to_string()),
            });
        }
    } else if cache_valid {
        return Ok(FetchState {
            kind: FetchStateKind::ArtifactReady,
            cache_path: request.cache_path.clone(),
            expanded_path: None,
            bytes,
            sha256,
            reason: None,
        });
    }

    // Present but not valid (e.g. checksum mismatch): caller must purge.
    if cache_exists {
        return Ok(FetchState {
            kind: FetchStateKind::Invalid,
            cache_path: request.cache_path.clone(),
            expanded_path: request.expanded_path.clone(),
            bytes,
            sha256,
            reason: Some("artifact exists but failed validation".to_string()),
        });
    }

    Ok(FetchState {
        kind: FetchStateKind::Missing,
        cache_path: request.cache_path.clone(),
        expanded_path: request.expanded_path.clone(),
        bytes: None,
        sha256: None,
        reason: None,
    })
}
524
525fn artifact_matches_request(
526 request: &ResolvedFetchRequest,
527 fingerprint: &ArtifactFingerprint,
528) -> bool {
529 request
530 .expected_sha256
531 .as_ref()
532 .map(|expected_sha256| fingerprint.sha256 == *expected_sha256)
533 .unwrap_or(true)
534}
535
536fn validate_artifact(request: &ResolvedFetchRequest) -> Result<ArtifactFingerprint, String> {
537 if !request.cache_path.exists() {
538 return Err(format!(
539 "downloaded artifact missing at {}",
540 request.cache_path.display()
541 ));
542 }
543 let fingerprint =
544 compute_artifact_fingerprint(&request.cache_path).map_err(|e| e.to_string())?;
545 if let Some(expected_sha256) = &request.expected_sha256 {
546 if fingerprint.sha256 != *expected_sha256 {
547 return Err(format!(
548 "sha256 mismatch for {}: expected {}, got {}",
549 request.cache_path.display(),
550 expected_sha256,
551 fingerprint.sha256
552 ));
553 }
554 }
555 Ok(fingerprint)
556}
557
558fn cleanup_invalid_fetch_state(request: &ResolvedFetchRequest) {
559 let _ = remove_path_if_exists(&request.cache_path);
560 let _ = remove_path_if_exists(&artifact_marker_path(&request.cache_path));
561 if let Some(expanded_path) = &request.expanded_path {
562 let _ = remove_path_if_exists(expanded_path);
563 let _ = remove_path_if_exists(&expanded_marker_path(expanded_path));
564 }
565}
566
/// Download each part URL in order and concatenate them into `destination`.
///
/// Parts are streamed sequentially into a single `.part` temp file which is
/// atomically renamed into place on success; on any failure the temp file
/// is removed. Runs on a private current-thread tokio runtime so the
/// function stays synchronous for callers.
fn download_explicit_parts(part_urls: &[String], destination: &Path) -> Result<(), String> {
    let temp_path = temp_download_path(destination);
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(|e| format!("failed to create tokio runtime: {e}"))?;
    runtime.block_on(async move {
        let client = reqwest::Client::builder()
            .user_agent(format!("zccache-download/{}", zccache_core::VERSION))
            .build()
            .map_err(|e| e.to_string())?;

        if let Some(parent) = destination.parent() {
            tokio::fs::create_dir_all(parent)
                .await
                .map_err(|e| e.to_string())?;
        }

        // Discard any stale temp file from a previous failed attempt.
        let _ = tokio::fs::remove_file(&temp_path).await;

        let result = async {
            let mut output = tokio::fs::File::create(&temp_path)
                .await
                .map_err(|e| e.to_string())?;
            for url in part_urls {
                // `identity` disables transparent decompression so the bytes
                // written match the served payload exactly.
                let mut response = client
                    .get(url)
                    .header(ACCEPT_ENCODING, "identity")
                    .send()
                    .await
                    .map_err(|e| e.to_string())?;
                if !response.status().is_success() {
                    return Err(format!("unexpected status {} for {url}", response.status()));
                }
                while let Some(chunk) = response.chunk().await.map_err(|e| e.to_string())? {
                    output.write_all(&chunk).await.map_err(|e| e.to_string())?;
                }
            }
            output.flush().await.map_err(|e| e.to_string())?;
            // Close the handle before rename (required on some platforms).
            drop(output);
            if destination.exists() {
                let _ = tokio::fs::remove_file(destination).await;
            }
            tokio::fs::rename(&temp_path, destination)
                .await
                .map_err(|e| e.to_string())
        }
        .await;

        // Never leave a partial temp file behind on failure.
        if result.is_err() {
            let _ = tokio::fs::remove_file(&temp_path).await;
        }
        result
    })
}
622
623fn sha256_file(path: &Path) -> io::Result<String> {
624 let mut file = File::open(path)?;
625 let mut hasher = Sha256::new();
626 let mut buf = [0u8; 64 * 1024];
627 loop {
628 let n = file.read(&mut buf)?;
629 if n == 0 {
630 break;
631 }
632 hasher.update(&buf[..n]);
633 }
634 Ok(format!("{:x}", hasher.finalize()))
635}
636
637fn compute_artifact_fingerprint(path: &Path) -> io::Result<ArtifactFingerprint> {
638 let sha256 = sha256_file(path)?;
639 let bytes = fs::metadata(path)?.len();
640 Ok(ArtifactFingerprint { sha256, bytes })
641}
642
/// Derive the in-progress temp path for `destination` by appending a
/// `.part` suffix (e.g. `file.tar.gz` -> `file.tar.gz.part`).
fn temp_download_path(destination: &Path) -> PathBuf {
    // `with_extension` would otherwise *replace* the last extension, so the
    // existing one is folded into the suffix to preserve it.
    let suffix = match destination.extension() {
        Some(ext) => format!("{}.part", ext.to_string_lossy()),
        None => "part".to_string(),
    };
    destination.with_extension(suffix)
}
652
/// RAII guard for the per-destination advisory lock; the exclusive flock is
/// released when the wrapped file handle is dropped.
struct FetchLock {
    _file: File,
}
656
/// Acquire the per-destination advisory file lock.
///
/// # Errors
/// Returns the sentinel string `"locked"` — matched verbatim by
/// `DownloadClient::fetch` — when `WaitMode::NoWait` is set and another
/// holder wins; any other error is an I/O failure.
fn acquire_fetch_lock(request: &ResolvedFetchRequest) -> Result<FetchLock, String> {
    let lock_path = fetch_lock_path(request);
    if let Some(parent) = lock_path.parent() {
        fs::create_dir_all(parent).map_err(|e| e.to_string())?;
    }
    // create(true)/truncate(false): reuse the lock file across processes
    // without clobbering it while someone else may hold the lock.
    let file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(false)
        .open(&lock_path)
        .map_err(|e| e.to_string())?;
    match request.wait_mode {
        // Block until the exclusive lock is granted.
        WaitMode::Block => fs2::FileExt::lock_exclusive(&file).map_err(|e| e.to_string())?,
        WaitMode::NoWait => {
            if fs2::FileExt::try_lock_exclusive(&file).is_err() {
                return Err("locked".to_string());
            }
        }
    }
    Ok(FetchLock { _file: file })
}
679
/// Compute the lock-file path for a request.
///
/// The lock key hashes both the cache path and (when present) the expanded
/// path, newline-separated, so fetches targeting the same artifact but
/// different expansions do not share a lock.
fn fetch_lock_path(request: &ResolvedFetchRequest) -> PathBuf {
    let mut key = zccache_core::normalize_for_key(&request.cache_path);
    if let Some(expanded_path) = &request.expanded_path {
        key.push('\n');
        key.push_str(&zccache_core::normalize_for_key(expanded_path));
    }
    let hash = stable_download_id(Path::new(&key));
    zccache_core::config::default_cache_dir()
        .join("downloads")
        .join("locks")
        .join(format!("{hash}.lock"))
        .into_path_buf()
}
693
/// Location of the JSON `ArtifactMarker` sidecar for a given cache path,
/// keyed by a stable hash of that path.
fn artifact_marker_path(cache_path: &Path) -> PathBuf {
    let hash = stable_download_id(cache_path);
    zccache_core::config::default_cache_dir()
        .join("downloads")
        .join("artifact-state")
        .join(format!("{hash}.json"))
        .into_path_buf()
}
702
/// Location of the JSON `ExpandedMarker` sidecar for a given expanded
/// destination, keyed by a stable hash of that path.
fn expanded_marker_path(expanded_path: &Path) -> PathBuf {
    let hash = stable_download_id(expanded_path);
    zccache_core::config::default_cache_dir()
        .join("downloads")
        .join("expanded-state")
        .join(format!("{hash}.json"))
        .into_path_buf()
}
711
/// Hash the cached payload and, when an artifact marker exists, cross-check
/// the marker against the payload.
///
/// A missing or unreadable marker is tolerated (the computed fingerprint is
/// returned as-is); a marker that disagrees with the payload or the request
/// is an error, since it signals tampering or a stale cache entry.
fn read_or_compute_artifact_fingerprint(
    request: &ResolvedFetchRequest,
) -> Result<ArtifactFingerprint, String> {
    // Always hash the real bytes on disk; the marker is a cross-check only.
    let fingerprint =
        compute_artifact_fingerprint(&request.cache_path).map_err(|e| e.to_string())?;
    if let Ok(content) = fs::read_to_string(artifact_marker_path(&request.cache_path)) {
        let marker: ArtifactMarker = serde_json::from_str(&content).map_err(|e| e.to_string())?;
        if marker.source != request.source
            || marker.cache_path != request.cache_path.to_string_lossy()
            || marker.sha256 != fingerprint.sha256
            || marker.bytes != fingerprint.bytes
        {
            return Err(format!(
                "artifact marker for {} does not match the on-disk payload",
                request.cache_path.display()
            ));
        }
    }
    Ok(fingerprint)
}
732
733fn write_artifact_marker(
734 request: &ResolvedFetchRequest,
735 fingerprint: &ArtifactFingerprint,
736) -> Result<(), String> {
737 let marker_path = artifact_marker_path(&request.cache_path);
738 if let Some(parent) = marker_path.parent() {
739 fs::create_dir_all(parent).map_err(|e| e.to_string())?;
740 }
741 let marker = ArtifactMarker {
742 source: request.source.clone(),
743 cache_path: request.cache_path.to_string_lossy().into_owned(),
744 sha256: fingerprint.sha256.clone(),
745 bytes: fingerprint.bytes,
746 };
747 let json = serde_json::to_string(&marker).map_err(|e| e.to_string())?;
748 fs::write(marker_path, json).map_err(|e| e.to_string())
749}
750
751fn expanded_marker_matches(
752 request: &ResolvedFetchRequest,
753 fingerprint: &ArtifactFingerprint,
754) -> Result<bool, String> {
755 let Some(expanded_path) = &request.expanded_path else {
756 return Ok(false);
757 };
758 let marker_path = expanded_marker_path(expanded_path);
759 let marker: ExpandedMarker = match fs::read_to_string(&marker_path) {
760 Ok(content) => serde_json::from_str(&content).map_err(|e| e.to_string())?,
761 Err(_) => return Ok(false),
762 };
763 if marker.source != request.source {
764 return Ok(false);
765 }
766 if marker.cache_path != request.cache_path.to_string_lossy() {
767 return Ok(false);
768 }
769 if marker.artifact_sha256 != fingerprint.sha256 {
770 return Ok(false);
771 }
772 if marker.archive_format != detect_archive_format(request)? {
773 return Ok(false);
774 }
775 Ok(expanded_path.exists())
776}
777
778fn write_expanded_marker(
779 request: &ResolvedFetchRequest,
780 fingerprint: &ArtifactFingerprint,
781) -> Result<(), String> {
782 let Some(expanded_path) = &request.expanded_path else {
783 return Ok(());
784 };
785 let marker_path = expanded_marker_path(expanded_path);
786 if let Some(parent) = marker_path.parent() {
787 fs::create_dir_all(parent).map_err(|e| e.to_string())?;
788 }
789 let marker = ExpandedMarker {
790 source: request.source.clone(),
791 cache_path: request.cache_path.to_string_lossy().into_owned(),
792 artifact_sha256: fingerprint.sha256.clone(),
793 archive_format: detect_archive_format(request)?,
794 };
795 let json = serde_json::to_string(&marker).map_err(|e| e.to_string())?;
796 fs::write(marker_path, json).map_err(|e| e.to_string())
797}
798
799fn detect_archive_format(request: &ResolvedFetchRequest) -> Result<ArchiveFormat, String> {
800 match request.archive_format {
801 ArchiveFormat::Auto => auto_archive_format(&request.cache_path),
802 other => Ok(other),
803 }
804}
805
806fn auto_archive_format(path: &Path) -> Result<ArchiveFormat, String> {
807 let name = path
808 .file_name()
809 .map(|n| n.to_string_lossy().to_ascii_lowercase())
810 .unwrap_or_default();
811 if name.ends_with(".tar.gz") {
812 Ok(ArchiveFormat::TarGz)
813 } else if name.ends_with(".tar.xz") {
814 Ok(ArchiveFormat::TarXz)
815 } else if name.ends_with(".tar.zst") || name.ends_with(".tzst") {
816 Ok(ArchiveFormat::TarZst)
817 } else if name.ends_with(".zip") {
818 Ok(ArchiveFormat::Zip)
819 } else if name.ends_with(".zst") {
820 Ok(ArchiveFormat::Zst)
821 } else if name.ends_with(".xz") {
822 Ok(ArchiveFormat::Xz)
823 } else if name.ends_with(".7z") {
824 Ok(ArchiveFormat::SevenZip)
825 } else {
826 Ok(ArchiveFormat::None)
827 }
828}
829
/// Expand the cached artifact to `expanded_path` according to the detected
/// archive format.
///
/// Non-archives are copied verbatim; single-file compressions are decoded
/// to one output file; tarballs, zips, and 7z archives are extracted into a
/// directory. `Auto` is unreachable here because `detect_archive_format`
/// resolves it first.
fn extract_archive(request: &ResolvedFetchRequest, expanded_path: &Path) -> Result<(), String> {
    match detect_archive_format(request)? {
        ArchiveFormat::None => {
            copy_file(&request.cache_path, expanded_path).map_err(|e| e.to_string())
        }
        ArchiveFormat::Zst => {
            let input = File::open(&request.cache_path).map_err(|e| e.to_string())?;
            let mut decoder = ruzstd::StreamingDecoder::new(input).map_err(|e| e.to_string())?;
            write_decoded_to_file(&mut decoder, expanded_path).map_err(|e| e.to_string())
        }
        ArchiveFormat::Xz => {
            let input = File::open(&request.cache_path).map_err(|e| e.to_string())?;
            if let Some(parent) = expanded_path.parent() {
                fs::create_dir_all(parent).map_err(|e| e.to_string())?;
            }
            let mut output = File::create(expanded_path).map_err(|e| e.to_string())?;
            // lzma_rs requires a BufRead source.
            let mut input = io::BufReader::new(input);
            lzma_rs::xz_decompress(&mut input, &mut output).map_err(|e| e.to_string())
        }
        ArchiveFormat::Zip => extract_zip(&request.cache_path, expanded_path),
        ArchiveFormat::TarGz => {
            let input = File::open(&request.cache_path).map_err(|e| e.to_string())?;
            let decoder = flate2::read::GzDecoder::new(input);
            extract_tar(decoder, expanded_path)
        }
        ArchiveFormat::TarXz => {
            let input = File::open(&request.cache_path).map_err(|e| e.to_string())?;
            // lzma_rs has no streaming Read adapter, so the tarball is
            // decompressed fully into memory before extraction.
            let mut decoded = Vec::new();
            let mut input = io::BufReader::new(input);
            lzma_rs::xz_decompress(&mut input, &mut decoded).map_err(|e| e.to_string())?;
            extract_tar(io::Cursor::new(decoded), expanded_path)
        }
        ArchiveFormat::TarZst => {
            let input = File::open(&request.cache_path).map_err(|e| e.to_string())?;
            let decoder = ruzstd::StreamingDecoder::new(input).map_err(|e| e.to_string())?;
            extract_tar(decoder, expanded_path)
        }
        ArchiveFormat::SevenZip => extract_7z(&request.cache_path, expanded_path),
        // Defensive: detect_archive_format never returns Auto.
        ArchiveFormat::Auto => Err("archive format auto-detection failed".to_string()),
    }
}
871
/// Extract a 7z archive into `destination` using a custom per-entry
/// callback so every output path is confined via `safe_join` (guards
/// against path traversal from hostile entry names).
fn extract_7z(archive_path: &Path, destination: &Path) -> Result<(), String> {
    fs::create_dir_all(destination).map_err(|e| e.to_string())?;
    // Owned copy moved into the callback closure below.
    let base = destination.to_path_buf();
    sevenz_rust::decompress_file_with_extract_fn(
        archive_path,
        destination,
        move |entry, reader, _default_dest| {
            // Confine the entry inside `base`; reject traversal attempts.
            let relative = Path::new(entry.name());
            let out_path = safe_join(&base, relative).map_err(std::io::Error::other)?;
            if entry.is_directory() {
                fs::create_dir_all(&out_path)?;
                return Ok(true);
            }
            if let Some(parent) = out_path.parent() {
                fs::create_dir_all(parent)?;
            }
            let mut output = File::create(&out_path)?;
            io::copy(reader, &mut output)?;
            output.flush()?;
            // `true` tells the extractor to continue with the next entry.
            Ok(true)
        },
    )
    .map_err(|e| e.to_string())
}
896
/// Stream everything from `reader` into a freshly created file at
/// `destination`, creating parent directories as needed.
fn write_decoded_to_file(reader: &mut dyn Read, destination: &Path) -> io::Result<()> {
    match destination.parent() {
        Some(parent) => fs::create_dir_all(parent)?,
        None => {}
    }
    let mut sink = File::create(destination)?;
    io::copy(reader, &mut sink)?;
    sink.flush()
}
906
/// Copy `source` to `destination`, creating parent directories first.
fn copy_file(source: &Path, destination: &Path) -> io::Result<()> {
    // Type inference picks `&Path` for the generic create_dir_all here.
    destination.parent().map(fs::create_dir_all).transpose()?;
    fs::copy(source, destination).map(|_| ())
}
914
/// Extract a zip archive into `destination` with path-traversal and
/// symlink hardening.
///
/// Each entry name must pass `enclosed_name` (rejects `..`/absolute paths)
/// and is additionally confined via `safe_join`; entries whose unix mode
/// bits mark them as symlinks are rejected outright.
fn extract_zip(archive_path: &Path, destination: &Path) -> Result<(), String> {
    fs::create_dir_all(destination).map_err(|e| e.to_string())?;
    let file = File::open(archive_path).map_err(|e| e.to_string())?;
    let mut zip = zip::ZipArchive::new(file).map_err(|e| e.to_string())?;
    for i in 0..zip.len() {
        let mut entry = zip.by_index(i).map_err(|e| e.to_string())?;
        // enclosed_name returns None for unsafe (escaping) entry names.
        let name = entry
            .enclosed_name()
            .map(|p| p.to_path_buf())
            .ok_or_else(|| format!("unsafe zip entry: {}", entry.name()))?;
        let out_path = safe_join(destination, &name)?;
        if entry.is_dir() {
            fs::create_dir_all(&out_path).map_err(|e| e.to_string())?;
            continue;
        }
        if let Some(mode) = entry.unix_mode() {
            // 0o170000 masks the file-type bits; 0o120000 is S_IFLNK.
            if (mode & 0o170000) == 0o120000 {
                return Err(format!(
                    "zip symlink entries are not allowed: {}",
                    entry.name()
                ));
            }
        }
        if let Some(parent) = out_path.parent() {
            fs::create_dir_all(parent).map_err(|e| e.to_string())?;
        }
        let mut out = File::create(&out_path).map_err(|e| e.to_string())?;
        io::copy(&mut entry, &mut out).map_err(|e| e.to_string())?;
    }
    Ok(())
}
946
/// Extract a tar stream into `destination`.
///
/// Symlink and hard-link entries are rejected (they could point outside the
/// destination), and every output path is confined via `safe_join`.
fn extract_tar<R: Read>(reader: R, destination: &Path) -> Result<(), String> {
    fs::create_dir_all(destination).map_err(|e| e.to_string())?;
    let mut archive = tar::Archive::new(reader);
    let entries = archive.entries().map_err(|e| e.to_string())?;
    for item in entries {
        let mut entry = item.map_err(|e| e.to_string())?;
        let path = entry.path().map_err(|e| e.to_string())?;
        let out_path = safe_join(destination, &path)?;
        let entry_type = entry.header().entry_type();
        // Links are disallowed: both kinds can escape the destination tree.
        if entry_type.is_symlink() || entry_type.is_hard_link() {
            return Err(format!(
                "tar link entries are not allowed: {}",
                path.display()
            ));
        }
        if entry_type.is_dir() {
            fs::create_dir_all(&out_path).map_err(|e| e.to_string())?;
            continue;
        }
        if let Some(parent) = out_path.parent() {
            fs::create_dir_all(parent).map_err(|e| e.to_string())?;
        }
        let mut out = File::create(&out_path).map_err(|e| e.to_string())?;
        io::copy(&mut entry, &mut out).map_err(|e| e.to_string())?;
    }
    Ok(())
}
974
/// Join an archive entry path onto `base`, rejecting anything that could
/// escape it: absolute paths, `..` components, and platform prefixes.
/// `.` components are silently dropped.
fn safe_join(base: &Path, entry: &Path) -> Result<PathBuf, String> {
    if entry.is_absolute() {
        return Err(format!(
            "absolute archive entry is not allowed: {}",
            entry.display()
        ));
    }
    // Rebuild the relative path from its vetted components only.
    let sanitized = entry
        .components()
        .try_fold(PathBuf::new(), |mut acc, component| match component {
            Component::Normal(segment) => {
                acc.push(segment);
                Ok(acc)
            }
            Component::CurDir => Ok(acc),
            _ => Err(format!("unsafe archive entry: {}", entry.display())),
        })?;
    Ok(base.join(sanitized))
}
992
/// Delete a file or directory tree if present; a missing path is success.
fn remove_path_if_exists(path: &Path) -> Result<(), String> {
    if !path.exists() {
        return Ok(());
    }
    let outcome = if path.is_dir() {
        fs::remove_dir_all(path)
    } else {
        fs::remove_file(path)
    };
    outcome.map_err(|e| e.to_string())
}
1003
1004#[cfg(test)]
1005mod tests {
1006 use super::*;
1007
1008 use std::net::{TcpListener, TcpStream};
1009 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
1010 use std::sync::Arc;
1011 use std::thread;
1012 use std::time::{Duration, Instant};
1013
1014 use flate2::write::GzEncoder;
1015 use flate2::Compression;
1016 use zccache_download_daemon::DownloadDaemon;
1017
    /// Behavior knobs for the in-process test HTTP server.
    #[derive(Clone)]
    struct TestHttpConfig {
        /// Full response payload served to clients.
        body: Arc<Vec<u8>>,
        /// Whether to advertise range support to clients.
        accept_ranges: bool,
        /// Whether to include a Content-Length header in responses.
        send_content_length: bool,
        /// Bytes written per chunk when streaming the body.
        chunk_size: usize,
        /// Artificial delay between chunks (for pacing/timeout tests).
        chunk_delay: Duration,
        /// URL path (without leading slash) the server responds on.
        path: String,
        /// When set, flipped to true once a request begins.
        request_started: Option<Arc<AtomicBool>>,
        /// When set, the response is held until this flag becomes true.
        release_response: Option<Arc<AtomicBool>>,
    }
1029
    /// Handle to a running test HTTP server: its URL, request counters,
    /// and the shutdown flag/join handle for its accept loop thread.
    struct TestHttpServer {
        url: String,
        /// Total requests observed (reset after the readiness probe).
        request_count: Arc<AtomicUsize>,
        /// Requests that carried a Range header.
        range_request_count: Arc<AtomicUsize>,
        /// Set to stop the accept loop.
        shutdown: Arc<AtomicBool>,
        /// Accept-loop thread; Option so it can be joined on drop.
        thread: Option<thread::JoinHandle<()>>,
    }
1037
    impl TestHttpServer {
        /// Bind an ephemeral localhost port, spawn the accept loop, wait
        /// until the server answers a probe request, then zero the request
        /// counters so tests only see their own traffic.
        fn start(config: TestHttpConfig) -> Self {
            let listener = TcpListener::bind("127.0.0.1:0").unwrap();
            let addr = listener.local_addr().unwrap();
            // Non-blocking accept lets the loop poll the shutdown flag.
            listener.set_nonblocking(true).unwrap();
            let url = format!("http://{addr}/{}", config.path);
            let request_count = Arc::new(AtomicUsize::new(0));
            let range_request_count = Arc::new(AtomicUsize::new(0));
            let shutdown = Arc::new(AtomicBool::new(false));
            let request_count_clone = Arc::clone(&request_count);
            let range_request_count_clone = Arc::clone(&range_request_count);
            let shutdown_clone = Arc::clone(&shutdown);
            let config_for_thread = config.clone();
            let (ready_tx, ready_rx) = std::sync::mpsc::sync_channel(1);
            let thread = thread::spawn(move || {
                // Signal the spawning thread that the loop is alive.
                ready_tx.send(()).unwrap();
                while !shutdown_clone.load(Ordering::Relaxed) {
                    match listener.accept() {
                        Ok((stream, _)) => {
                            // One thread per connection; errors are ignored
                            // (tests assert via the shared counters).
                            let config = config_for_thread.clone();
                            let request_count = Arc::clone(&request_count_clone);
                            let range_request_count = Arc::clone(&range_request_count_clone);
                            thread::spawn(move || {
                                let _ = handle_test_http_connection(
                                    stream,
                                    config,
                                    request_count,
                                    range_request_count,
                                );
                            });
                        }
                        // No pending connection: back off briefly.
                        Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
                            thread::sleep(Duration::from_millis(5));
                        }
                        Err(_) => {
                            thread::sleep(Duration::from_millis(10));
                        }
                    }
                }
            });
            ready_rx
                .recv_timeout(Duration::from_secs(1))
                .expect("test http server failed to start");
            // Probe until the server actually serves; the probe itself
            // increments the counters, hence the resets below.
            wait_for_test_http_server(&addr, &config.path);
            request_count.store(0, Ordering::Relaxed);
            range_request_count.store(0, Ordering::Relaxed);
            Self {
                url,
                request_count,
                range_request_count,
                shutdown,
                thread: Some(thread),
            }
        }

        /// Total requests seen since startup probing finished.
        fn request_count(&self) -> usize {
            self.request_count.load(Ordering::Relaxed)
        }

        /// Requests that included a Range header.
        fn range_request_count(&self) -> usize {
            self.range_request_count.load(Ordering::Relaxed)
        }
    }
1105
    /// Polls the test server with raw HEAD requests until it returns complete
    /// response headers, panicking if it does not respond within one second.
    /// Any connect/read/write hiccup sleeps 10ms and retries.
    fn wait_for_test_http_server(addr: &std::net::SocketAddr, path: &str) {
        let deadline = Instant::now() + Duration::from_secs(1);
        let request = format!("HEAD /{path} HTTP/1.1\r\nHost: {addr}\r\nConnection: close\r\n\r\n");
        while Instant::now() < deadline {
            if let Ok(mut stream) = TcpStream::connect(addr) {
                // Short timeouts keep a wedged socket from eating the budget.
                if stream
                    .set_read_timeout(Some(Duration::from_millis(100)))
                    .is_err()
                {
                    thread::sleep(Duration::from_millis(10));
                    continue;
                }
                if stream
                    .set_write_timeout(Some(Duration::from_millis(100)))
                    .is_err()
                {
                    thread::sleep(Duration::from_millis(10));
                    continue;
                }
                if stream.write_all(request.as_bytes()).is_err() {
                    thread::sleep(Duration::from_millis(10));
                    continue;
                }
                let mut response = Vec::new();
                let mut buf = [0u8; 256];
                loop {
                    match stream.read(&mut buf) {
                        Ok(0) => break,
                        Ok(n) => {
                            response.extend_from_slice(&buf[..n]);
                            // Blank line = end of headers: the server is up.
                            if response.windows(4).any(|window| window == b"\r\n\r\n") {
                                return;
                            }
                        }
                        // Timeout on this attempt; fall through to retry.
                        Err(err)
                            if err.kind() == io::ErrorKind::WouldBlock
                                || err.kind() == io::ErrorKind::TimedOut =>
                        {
                            break;
                        }
                        Err(_) => break,
                    }
                }
            }
            thread::sleep(Duration::from_millis(10));
        }
        panic!("test http server at {addr} did not respond in time");
    }
1154
1155 fn try_wait_for_test_condition(
1156 timeout: Duration,
1157 description: &str,
1158 mut predicate: impl FnMut() -> bool,
1159 ) -> Result<(), String> {
1160 let deadline = Instant::now() + timeout;
1161 while Instant::now() < deadline {
1162 if predicate() {
1163 return Ok(());
1164 }
1165 thread::sleep(Duration::from_millis(10));
1166 }
1167 Err(format!("timed out waiting for {description}"))
1168 }
1169
1170 fn run_with_self_healing<F>(label: &str, mut attempt_fn: F)
1171 where
1172 F: FnMut(usize) -> Result<(), String>,
1173 {
1174 const MAX_ATTEMPTS: usize = 3;
1175 let mut last_err: Option<String> = None;
1176 for attempt in 1..=MAX_ATTEMPTS {
1177 match attempt_fn(attempt) {
1178 Ok(()) => return,
1179 Err(err) => {
1180 eprintln!("{label}: attempt {attempt}/{MAX_ATTEMPTS} failed: {err}");
1181 last_err = Some(err);
1182 }
1183 }
1184 }
1185 panic!(
1186 "{label} failed after {MAX_ATTEMPTS} attempts: {}",
1187 last_err.unwrap_or_else(|| "unknown error".to_string())
1188 );
1189 }
1190
1191 fn is_transient_http_error(err: &str) -> bool {
1199 err.contains("error sending request") || err.contains("error decoding response body")
1200 }
1201
1202 fn fetch_with_retry(http: &DownloadClient, req: FetchRequest) -> Result<FetchResult, String> {
1203 const MAX_ATTEMPTS: usize = 5;
1204 for attempt in 1..=MAX_ATTEMPTS {
1205 match http.fetch(req.clone()) {
1206 Ok(result) => return Ok(result),
1207 Err(err) => {
1208 if !is_transient_http_error(&err) || attempt == MAX_ATTEMPTS {
1209 return Err(err);
1210 }
1211 thread::sleep(Duration::from_millis(25 * attempt as u64));
1212 }
1213 }
1214 }
1215 unreachable!()
1216 }
1217
    impl Drop for TestHttpServer {
        /// Stops the accept loop and joins its thread so no test leaks a
        /// background listener.
        fn drop(&mut self) {
            self.shutdown.store(true, Ordering::Relaxed);
            // Nudge the listener with a throwaway connection (the host:port is
            // recovered from the url) so the loop notices shutdown promptly;
            // connect failures are irrelevant here.
            let _ = TcpStream::connect(
                self.url
                    .trim_start_matches("http://")
                    .split('/')
                    .next()
                    .unwrap_or_default(),
            );
            if let Some(thread) = self.thread.take() {
                let _ = thread.join();
            }
        }
    }
1233
    /// Serves a single HTTP/1.1 exchange according to `config`: reads the
    /// request headers, answers 200 (or 206 for an honored `Range`), and
    /// streams the body — optionally chunked, delayed, or gated on the
    /// `release_response` flag for the first body-bearing request.
    fn handle_test_http_connection(
        mut stream: TcpStream,
        config: TestHttpConfig,
        request_count: Arc<AtomicUsize>,
        range_request_count: Arc<AtomicUsize>,
    ) -> io::Result<()> {
        // Accumulate bytes until the blank line ending the header section.
        let mut request = Vec::new();
        let mut buf = [0u8; 4096];
        loop {
            let n = stream.read(&mut buf)?;
            if n == 0 {
                // Peer closed before completing the headers; nothing to serve.
                return Ok(());
            }
            request.extend_from_slice(&buf[..n]);
            if request.windows(4).any(|window| window == b"\r\n\r\n") {
                break;
            }
        }
        // Counted only after a complete header section arrived.
        request_count.fetch_add(1, Ordering::Relaxed);
        let request_text = String::from_utf8_lossy(&request);
        let mut lines = request_text.lines();
        let request_line = lines.next().unwrap_or_default();
        let mut parts = request_line.split_whitespace();
        let method = parts.next().unwrap_or_default();
        // First `Range:` header value, if any (case-insensitive name match).
        let range_header = request_text.lines().find_map(|line| {
            let (name, value) = line.split_once(':')?;
            if name.eq_ignore_ascii_case("range") {
                Some(value.trim().to_string())
            } else {
                None
            }
        });

        let mut body = (*config.body).clone();
        let mut status_line = "HTTP/1.1 200 OK\r\n".to_string();
        let mut content_range = None;
        // Honor the range only when configured; an unparsable range silently
        // falls back to a full 200 response.
        if let Some(range) = range_header {
            if config.accept_ranges {
                if let Some((start, end)) = parse_range(&range, body.len() as u64) {
                    range_request_count.fetch_add(1, Ordering::Relaxed);
                    status_line = "HTTP/1.1 206 Partial Content\r\n".to_string();
                    content_range = Some(format!("bytes {start}-{end}/{}", body.len()));
                    // `parse_range` guarantees the bounds are in range.
                    body = body[start as usize..=end as usize].to_vec();
                }
            }
        }

        let mut headers = String::new();
        headers.push_str("Connection: close\r\n");
        headers.push_str("Content-Type: application/octet-stream\r\n");
        if config.accept_ranges {
            headers.push_str("Accept-Ranges: bytes\r\n");
        }
        if config.send_content_length {
            headers.push_str(&format!("Content-Length: {}\r\n", body.len()));
        }
        if let Some(content_range) = content_range {
            headers.push_str(&format!("Content-Range: {content_range}\r\n"));
        }

        stream.write_all(status_line.as_bytes())?;
        stream.write_all(headers.as_bytes())?;
        stream.write_all(b"\r\n")?;

        // HEAD gets headers only (used by the warm-up probe).
        if method.eq_ignore_ascii_case("HEAD") {
            stream.flush()?;
            return Ok(());
        }

        // compare_exchange makes exactly one connection "the first": only that
        // one blocks on `release_response`, letting tests hold a download open.
        let first_body_request = config
            .request_started
            .as_ref()
            .map(|request_started| {
                request_started
                    .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
                    .is_ok()
            })
            .unwrap_or(false);
        if first_body_request {
            if let Some(release_response) = &config.release_response {
                while !release_response.load(Ordering::Acquire) {
                    thread::sleep(Duration::from_millis(5));
                }
            }
        }

        if config.chunk_size == 0 {
            stream.write_all(&body)?;
        } else {
            // Chunked + delayed writes simulate a slow origin server.
            for chunk in body.chunks(config.chunk_size) {
                stream.write_all(chunk)?;
                stream.flush()?;
                if !config.chunk_delay.is_zero() {
                    thread::sleep(config.chunk_delay);
                }
            }
        }
        stream.flush()?;
        Ok(())
    }
1334
1335 fn parse_range(header: &str, total_len: u64) -> Option<(u64, u64)> {
1336 let range = header.strip_prefix("bytes=")?;
1337 let (start, end) = range.split_once('-')?;
1338 let start = start.parse::<u64>().ok()?;
1339 let end = if end.is_empty() {
1340 total_len.checked_sub(1)?
1341 } else {
1342 end.parse::<u64>().ok()?
1343 };
1344 if start > end || end >= total_len {
1345 return None;
1346 }
1347 Some((start, end))
1348 }
1349
    /// A download daemon running on a dedicated single-threaded Tokio runtime.
    struct TestDaemon {
        // IPC endpoint clients connect to (pipe name on Windows, socket path
        // on Unix).
        endpoint: String,
        // Handle used to ask the daemon's run loop to stop (see Drop).
        shutdown: Arc<tokio::sync::Notify>,
        // Runtime thread; joined on drop.
        thread: Option<thread::JoinHandle<()>>,
    }
1355
    impl TestDaemon {
        /// Binds a daemon on a unique endpoint inside its own runtime thread
        /// and blocks until a client can successfully query its status,
        /// panicking after five seconds.
        fn start() -> Self {
            let endpoint = unique_test_endpoint();
            let (ready_tx, ready_rx) = std::sync::mpsc::sync_channel(1);
            let endpoint_for_thread = endpoint.clone();
            let thread = thread::spawn(move || {
                let runtime = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap();
                runtime.block_on(async move {
                    let mut daemon = DownloadDaemon::bind(&endpoint_for_thread).unwrap();
                    // Hand the shutdown handle back once the bind succeeded.
                    ready_tx.send(daemon.shutdown_handle()).unwrap();
                    daemon.run().await.unwrap();
                });
            });
            let shutdown = ready_rx
                .recv_timeout(Duration::from_secs(5))
                .expect("download daemon failed to bind");
            // Bound is not the same as serving: poll status until it answers.
            let client = DownloadClient::new(Some(endpoint.clone()));
            let deadline = Instant::now() + Duration::from_secs(5);
            while Instant::now() < deadline {
                if client.daemon_status().is_ok() {
                    return Self {
                        endpoint,
                        shutdown,
                        thread: Some(thread),
                    };
                }
                thread::sleep(Duration::from_millis(50));
            }
            panic!("download daemon did not start in time");
        }
    }
1390
    impl Drop for TestDaemon {
        /// Signals the daemon to shut down and joins its runtime thread so
        /// tests never leak a live daemon.
        fn drop(&mut self) {
            self.shutdown.notify_one();
            if let Some(thread) = self.thread.take() {
                let _ = thread.join();
            }
        }
    }
1399
1400 fn unique_test_endpoint() -> String {
1401 static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
1402 let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
1403 #[cfg(windows)]
1404 {
1405 format!(
1406 r"\\.\pipe\zccache-download-test-{}-{id}",
1407 std::process::id()
1408 )
1409 }
1410 #[cfg(unix)]
1411 {
1412 std::env::temp_dir()
1413 .join(format!(
1414 "zccache-download-test-{}-{id}.sock",
1415 std::process::id()
1416 ))
1417 .display()
1418 .to_string()
1419 }
1420 }
1421
1422 fn sha256_hex(data: &[u8]) -> String {
1423 let mut hasher = Sha256::new();
1424 hasher.update(data);
1425 format!("{:x}", hasher.finalize())
1426 }
1427
1428 #[test]
1429 fn auto_detect_archive_formats() {
1430 assert_eq!(
1431 auto_archive_format(Path::new("toolchain.tar.gz")).unwrap(),
1432 ArchiveFormat::TarGz
1433 );
1434 assert_eq!(
1435 auto_archive_format(Path::new("toolchain.tar.xz")).unwrap(),
1436 ArchiveFormat::TarXz
1437 );
1438 assert_eq!(
1439 auto_archive_format(Path::new("toolchain.tar.zst")).unwrap(),
1440 ArchiveFormat::TarZst
1441 );
1442 assert_eq!(
1443 auto_archive_format(Path::new("toolchain.zip")).unwrap(),
1444 ArchiveFormat::Zip
1445 );
1446 assert_eq!(
1447 auto_archive_format(Path::new("toolchain.7z")).unwrap(),
1448 ArchiveFormat::SevenZip
1449 );
1450 }
1451
1452 #[test]
1453 fn safe_join_rejects_parent_traversal() {
1454 let err = safe_join(Path::new("out"), Path::new("../evil")).unwrap_err();
1455 assert!(err.contains("unsafe"));
1456 }
1457
    /// A zip entry escaping the extraction root via `..` must abort
    /// extraction with an "unsafe zip entry" error.
    #[test]
    fn zip_extraction_rejects_path_traversal() {
        let dir = tempfile::tempdir().unwrap();
        let archive = dir.path().join("bad.zip");
        {
            // Craft an archive whose single entry points outside the root.
            let file = File::create(&archive).unwrap();
            let mut zip = zip::ZipWriter::new(file);
            let options = zip::write::SimpleFileOptions::default();
            zip.start_file("../evil.txt", options).unwrap();
            zip.write_all(b"bad").unwrap();
            zip.finish().unwrap();
        }
        let out = dir.path().join("extract");
        let err = extract_zip(&archive, &out).unwrap_err();
        assert!(err.contains("unsafe zip entry"));
    }
1474
    /// A well-formed tar.gz entry should be extracted to its nested path with
    /// its contents intact.
    #[test]
    fn tar_gz_extracts_regular_files() {
        let dir = tempfile::tempdir().unwrap();
        let archive = dir.path().join("ok.tar.gz");
        {
            // Build a one-entry tar.gz ("bin/tool.txt" containing "hello").
            let file = File::create(&archive).unwrap();
            let encoder = GzEncoder::new(file, Compression::default());
            let mut builder = tar::Builder::new(encoder);
            let data = b"hello";
            let mut header = tar::Header::new_gnu();
            header.set_size(data.len() as u64);
            header.set_mode(0o644);
            header.set_cksum();
            builder
                .append_data(&mut header, "bin/tool.txt", &data[..])
                .unwrap();
            builder.finish().unwrap();
        }
        let out = dir.path().join("extract");
        let file = File::open(&archive).unwrap();
        let decoder = flate2::read::GzDecoder::new(file);
        extract_tar(decoder, &out).unwrap();
        assert_eq!(
            fs::read(out.join("bin").join("tool.txt")).unwrap(),
            b"hello"
        );
    }
1502
    /// First fetch downloads; a repeat fetch and an `exists` query are served
    /// from the local state without touching the network again.
    #[test]
    fn fetch_cache_miss_then_hit_and_exists_stay_local() {
        let daemon = TestDaemon::start();
        let body = b"artifact payload".to_vec();
        let server = TestHttpServer::start(TestHttpConfig {
            body: Arc::new(body.clone()),
            accept_ranges: false,
            send_content_length: true,
            chunk_size: 0,
            chunk_delay: Duration::ZERO,
            path: "artifact.bin".to_string(),
            request_started: None,
            release_response: None,
        });
        let client = DownloadClient::new(Some(daemon.endpoint.clone()));
        let dir = tempfile::tempdir().unwrap();
        let mut request = FetchRequest::new(server.url.clone(), dir.path().join("artifact.bin"));
        request.expected_sha256 = Some(sha256_hex(&body));

        // Cache miss: at least one HTTP request reaches the server.
        let first = fetch_with_retry(&client, request.clone()).unwrap();
        assert_eq!(first.status, FetchStatus::Downloaded);
        assert_eq!(first.sha256, sha256_hex(&body));
        let requests_after_first = server.request_count();
        assert!(requests_after_first > 0);

        // Cache hit: request count must not move.
        let second = fetch_with_retry(&client, request.clone()).unwrap();
        assert_eq!(second.status, FetchStatus::AlreadyPresent);
        assert_eq!(server.request_count(), requests_after_first);

        // `exists` likewise answers purely from local state.
        let state = client.exists(&request).unwrap();
        assert_eq!(state.kind, FetchStateKind::ArtifactReady);
        assert_eq!(state.sha256.as_deref(), Some(first.sha256.as_str()));
        assert_eq!(server.request_count(), requests_after_first);
    }
1537
    /// A checksum mismatch must fail the fetch, delete the partial artifact,
    /// and leave the state reported as missing.
    #[test]
    fn fetch_checksum_mismatch_cleans_up_invalid_artifact() {
        let daemon = TestDaemon::start();
        let body = b"wrong checksum body".to_vec();
        let server = TestHttpServer::start(TestHttpConfig {
            body: Arc::new(body),
            accept_ranges: false,
            send_content_length: true,
            chunk_size: 0,
            chunk_delay: Duration::ZERO,
            path: "bad.bin".to_string(),
            request_started: None,
            release_response: None,
        });
        let client = DownloadClient::new(Some(daemon.endpoint.clone()));
        let dir = tempfile::tempdir().unwrap();
        let destination = dir.path().join("bad.bin");
        let mut request = FetchRequest::new(server.url.clone(), &destination);
        // 64 hex zeros: a syntactically valid sha256 that cannot match.
        request.expected_sha256 = Some("00".repeat(32));

        let err = fetch_with_retry(&client, request.clone()).unwrap_err();
        assert!(err.contains("sha256 mismatch"));
        assert!(!destination.exists());

        let state = client.exists(&request).unwrap();
        assert_eq!(state.kind, FetchStateKind::Missing);
    }
1565
    /// With multiple connections allowed against a range-capable server, the
    /// download should be segmented into at least two 206 range requests and
    /// still reassemble to the correct bytes.
    #[test]
    fn fetch_single_url_max_connections_uses_range_requests() {
        let daemon = TestDaemon::start();
        // 128 KiB of non-repeating-ish data so reassembly errors would show.
        let body: Vec<u8> = (0..128 * 1024).map(|i| (i % 251) as u8).collect();
        let server = TestHttpServer::start(TestHttpConfig {
            body: Arc::new(body.clone()),
            accept_ranges: true,
            send_content_length: true,
            chunk_size: 4096,
            chunk_delay: Duration::ZERO,
            path: "multipart.bin".to_string(),
            request_started: None,
            release_response: None,
        });
        let client = DownloadClient::new(Some(daemon.endpoint.clone()));
        let dir = tempfile::tempdir().unwrap();
        let mut request = FetchRequest::new(server.url.clone(), dir.path().join("multipart.bin"));
        request.download_options.max_connections = Some(4);
        request.download_options.min_segment_size = Some(1024);
        request.expected_sha256 = Some(sha256_hex(&body));

        let result = fetch_with_retry(&client, request).unwrap();
        assert_eq!(result.status, FetchStatus::Downloaded);
        assert_eq!(result.sha256, sha256_hex(&body));
        assert!(server.range_request_count() >= 2);
    }
1592
    /// Explicit multipart sources are downloaded from each server and
    /// concatenated in order; repeat fetches and `exists` stay local.
    #[test]
    fn fetch_explicit_multipart_urls_concatenates_and_stays_local() {
        let daemon = TestDaemon::start();
        let part_a = b"hello ".to_vec();
        let part_b = b"multipart ".to_vec();
        let part_c = b"world".to_vec();
        // Expected result: the three parts joined in declaration order.
        let mut full = Vec::new();
        full.extend_from_slice(&part_a);
        full.extend_from_slice(&part_b);
        full.extend_from_slice(&part_c);

        // One server per part, so per-part request counts are observable.
        let server_a = TestHttpServer::start(TestHttpConfig {
            body: Arc::new(part_a),
            accept_ranges: false,
            send_content_length: true,
            chunk_size: 0,
            chunk_delay: Duration::ZERO,
            path: "artifact.part-aa".to_string(),
            request_started: None,
            release_response: None,
        });
        let server_b = TestHttpServer::start(TestHttpConfig {
            body: Arc::new(part_b),
            accept_ranges: false,
            send_content_length: true,
            chunk_size: 0,
            chunk_delay: Duration::ZERO,
            path: "artifact.part-ab".to_string(),
            request_started: None,
            release_response: None,
        });
        let server_c = TestHttpServer::start(TestHttpConfig {
            body: Arc::new(part_c),
            accept_ranges: false,
            send_content_length: true,
            chunk_size: 0,
            chunk_delay: Duration::ZERO,
            path: "artifact.part-ac".to_string(),
            request_started: None,
            release_response: None,
        });

        let client = DownloadClient::new(Some(daemon.endpoint.clone()));
        let dir = tempfile::tempdir().unwrap();
        let destination = dir.path().join("artifact.bin");
        let mut request = FetchRequest::new(
            vec![
                server_a.url.clone(),
                server_b.url.clone(),
                server_c.url.clone(),
            ],
            &destination,
        );
        request.expected_sha256 = Some(sha256_hex(&full));

        let first = fetch_with_retry(&client, request.clone()).unwrap();
        assert_eq!(first.status, FetchStatus::Downloaded);
        assert_eq!(first.sha256, sha256_hex(&full));
        assert_eq!(fs::read(&destination).unwrap(), full);
        let request_counts = (
            server_a.request_count(),
            server_b.request_count(),
            server_c.request_count(),
        );

        // Cache hit: no server may see additional traffic.
        let second = fetch_with_retry(&client, request.clone()).unwrap();
        assert_eq!(second.status, FetchStatus::AlreadyPresent);
        assert_eq!(
            (
                server_a.request_count(),
                server_b.request_count(),
                server_c.request_count()
            ),
            request_counts
        );

        let state = client.exists(&request).unwrap();
        assert_eq!(state.kind, FetchStateKind::ArtifactReady);
        assert_eq!(state.sha256.as_deref(), Some(first.sha256.as_str()));
    }
1673
    /// While one client is mid-download, a `NoWait` fetch of the same
    /// destination must return `Locked` instead of blocking; the original
    /// download must still complete. Retried via self-healing because the
    /// scenario is timing-sensitive.
    #[test]
    fn fetch_no_wait_returns_locked_while_other_client_is_downloading() {
        run_with_self_healing(
            "fetch_no_wait_returns_locked_while_other_client_is_downloading",
            |attempt| {
                let daemon = TestDaemon::start();
                let request_started = Arc::new(AtomicBool::new(false));
                let release_response = Arc::new(AtomicBool::new(false));
                let body: Vec<u8> = (0..512 * 1024).map(|i| (i % 251) as u8).collect();
                // The server blocks the first body response until released,
                // guaranteeing the download stays "in flight".
                let server = TestHttpServer::start(TestHttpConfig {
                    body: Arc::new(body),
                    accept_ranges: false,
                    send_content_length: true,
                    chunk_size: 4096,
                    chunk_delay: Duration::from_millis(2),
                    path: "slow.bin".to_string(),
                    request_started: Some(Arc::clone(&request_started)),
                    release_response: Some(Arc::clone(&release_response)),
                });
                let dest_dir = tempfile::tempdir().map_err(|e| e.to_string())?;
                // Per-attempt file name avoids cross-attempt state reuse.
                let destination = dest_dir.path().join(format!("slow-{attempt}.bin"));

                let endpoint = daemon.endpoint.clone();
                let url = server.url.clone();
                let destination_for_thread = destination.clone();
                let download_thread = thread::spawn(move || {
                    let client = DownloadClient::new(Some(endpoint));
                    let request = FetchRequest::new(url, &destination_for_thread);
                    fetch_with_retry(&client, request)
                });

                // Closure so the release/join cleanup below always runs.
                let outcome = (|| -> Result<(), String> {
                    try_wait_for_test_condition(
                        Duration::from_secs(30),
                        "initial download request",
                        || request_started.load(Ordering::Acquire),
                    )?;
                    let client = DownloadClient::new(Some(daemon.endpoint.clone()));
                    let mut no_wait = FetchRequest::new(server.url.clone(), &destination);
                    no_wait.wait_mode = WaitMode::NoWait;
                    let locked = fetch_with_retry(&client, no_wait)
                        .map_err(|err| format!("no-wait fetch failed: {err}"))?;
                    if locked.status != FetchStatus::Locked {
                        return Err(format!("expected Locked status, got {:?}", locked.status));
                    }
                    Ok(())
                })();

                // Unblock the held response, then let the download finish.
                release_response.store(true, Ordering::Release);
                let join_result = download_thread.join();
                outcome?;
                let completed = match join_result {
                    Ok(Ok(result)) => result,
                    Ok(Err(err)) => return Err(format!("download thread returned error: {err}")),
                    Err(_) => return Err("download thread panicked".to_string()),
                };
                if completed.status != FetchStatus::Downloaded {
                    return Err(format!(
                        "expected Downloaded status, got {:?}",
                        completed.status
                    ));
                }
                Ok(())
            },
        );
    }
1740
    /// Multipart variant of the NoWait/Locked scenario: while a multipart
    /// download is held open on its first part, a `NoWait` fetch of the same
    /// multipart source must return `Locked`, and the original must complete.
    #[test]
    fn fetch_multipart_no_wait_returns_locked_while_other_client_is_downloading() {
        run_with_self_healing(
            "fetch_multipart_no_wait_returns_locked_while_other_client_is_downloading",
            |attempt| {
                let daemon = TestDaemon::start();
                let request_started = Arc::new(AtomicBool::new(false));
                let release_response = Arc::new(AtomicBool::new(false));
                // First part blocks until released, pinning the lock.
                let slow_server = TestHttpServer::start(TestHttpConfig {
                    body: Arc::new((0..512 * 1024).map(|i| (i % 251) as u8).collect()),
                    accept_ranges: false,
                    send_content_length: true,
                    chunk_size: 4096,
                    chunk_delay: Duration::from_millis(2),
                    path: "slow.part-aa".to_string(),
                    request_started: Some(Arc::clone(&request_started)),
                    release_response: Some(Arc::clone(&release_response)),
                });
                let fast_server = TestHttpServer::start(TestHttpConfig {
                    body: Arc::new(b"tail".to_vec()),
                    accept_ranges: false,
                    send_content_length: true,
                    chunk_size: 0,
                    chunk_delay: Duration::ZERO,
                    path: "slow.part-ab".to_string(),
                    request_started: None,
                    release_response: None,
                });
                let dest_dir = tempfile::tempdir().map_err(|e| e.to_string())?;
                // Per-attempt file name avoids cross-attempt state reuse.
                let destination = dest_dir.path().join(format!("slow-{attempt}.bin"));

                let endpoint = daemon.endpoint.clone();
                let source = vec![slow_server.url.clone(), fast_server.url.clone()];
                let destination_for_thread = destination.clone();
                let download_thread = thread::spawn(move || {
                    let client = DownloadClient::new(Some(endpoint));
                    let request = FetchRequest::new(source, &destination_for_thread);
                    fetch_with_retry(&client, request)
                });

                // Closure so the release/join cleanup below always runs.
                let outcome = (|| -> Result<(), String> {
                    try_wait_for_test_condition(
                        Duration::from_secs(30),
                        "initial multipart download request",
                        || request_started.load(Ordering::Acquire),
                    )?;
                    let client = DownloadClient::new(Some(daemon.endpoint.clone()));
                    let mut no_wait = FetchRequest::new(
                        vec![slow_server.url.clone(), fast_server.url.clone()],
                        &destination,
                    );
                    no_wait.wait_mode = WaitMode::NoWait;
                    let locked = fetch_with_retry(&client, no_wait)
                        .map_err(|err| format!("no-wait fetch failed: {err}"))?;
                    if locked.status != FetchStatus::Locked {
                        return Err(format!("expected Locked status, got {:?}", locked.status));
                    }
                    Ok(())
                })();

                // Unblock the held response, then let the download finish.
                release_response.store(true, Ordering::Release);
                let join_result = download_thread.join();
                outcome?;
                let completed = match join_result {
                    Ok(Ok(result)) => result,
                    Ok(Err(err)) => return Err(format!("download thread returned error: {err}")),
                    Err(_) => return Err("download thread panicked".to_string()),
                };
                if completed.status != FetchStatus::Downloaded {
                    return Err(format!(
                        "expected Downloaded status, got {:?}",
                        completed.status
                    ));
                }
                Ok(())
            },
        );
    }
1819
    /// A dry-run fetch must report `DryRun` without issuing any HTTP request
    /// or creating the destination file.
    #[test]
    fn fetch_dry_run_avoids_network_and_filesystem_mutation() {
        let daemon = TestDaemon::start();
        let server = TestHttpServer::start(TestHttpConfig {
            body: Arc::new(b"dry-run".to_vec()),
            accept_ranges: false,
            send_content_length: true,
            chunk_size: 0,
            chunk_delay: Duration::ZERO,
            path: "dry.bin".to_string(),
            request_started: None,
            release_response: None,
        });
        let client = DownloadClient::new(Some(daemon.endpoint.clone()));
        let dir = tempfile::tempdir().unwrap();
        let destination = dir.path().join("dry.bin");
        let mut request = FetchRequest::new(server.url.clone(), &destination);
        request.dry_run = true;

        let result = fetch_with_retry(&client, request).unwrap();
        assert_eq!(result.status, FetchStatus::DryRun);
        // Zero requests and no file: nothing was touched.
        assert_eq!(server.request_count(), 0);
        assert!(!destination.exists());
    }
1844
    /// Downloading a 7z with an expansion destination should extract it,
    /// report `Expanded`, then answer `ExpandedReady`/`AlreadyExpanded` on
    /// subsequent queries.
    #[test]
    fn fetch_expands_7z_and_exists_reports_expanded_ready() {
        let daemon = TestDaemon::start();
        let dir = tempfile::tempdir().unwrap();
        // Build a 7z archive containing source/bin/tool.txt.
        let source_dir = dir.path().join("source");
        fs::create_dir_all(source_dir.join("bin")).unwrap();
        fs::write(source_dir.join("bin").join("tool.txt"), b"tool data").unwrap();
        let archive_path = dir.path().join("toolchain.7z");
        sevenz_rust::compress_to_path(&source_dir, &archive_path).unwrap();
        let archive_bytes = fs::read(&archive_path).unwrap();

        let server = TestHttpServer::start(TestHttpConfig {
            body: Arc::new(archive_bytes.clone()),
            accept_ranges: false,
            send_content_length: true,
            chunk_size: 0,
            chunk_delay: Duration::ZERO,
            path: "toolchain.7z".to_string(),
            request_started: None,
            release_response: None,
        });
        let client = DownloadClient::new(Some(daemon.endpoint.clone()));
        let cache_path = dir.path().join("cache").join("toolchain.7z");
        let expanded_path = dir.path().join("expanded");
        let mut request = FetchRequest::new(server.url.clone(), &cache_path);
        request.destination_path_expanded = Some(expanded_path.clone());
        request.expected_sha256 = Some(sha256_hex(&archive_bytes));

        let first = fetch_with_retry(&client, request.clone()).unwrap();
        assert_eq!(first.status, FetchStatus::Expanded);
        assert_eq!(first.sha256, sha256_hex(&archive_bytes));
        // The 7z root layout may or may not include the source dir prefix;
        // accept the file at any of the plausible depths.
        let extracted = [
            expanded_path.join("source").join("bin").join("tool.txt"),
            expanded_path.join("bin").join("tool.txt"),
            expanded_path.join("tool.txt"),
        ]
        .into_iter()
        .find(|path| path.exists())
        .expect("expected extracted file in expanded directory");
        assert_eq!(fs::read(extracted).unwrap(), b"tool data");

        let state = client.exists(&request).unwrap();
        assert_eq!(state.kind, FetchStateKind::ExpandedReady);
        assert_eq!(state.sha256.as_deref(), Some(first.sha256.as_str()));

        let second = fetch_with_retry(&client, request).unwrap();
        assert_eq!(second.status, FetchStatus::AlreadyExpanded);
        assert_eq!(second.sha256, first.sha256);
    }
1894
    /// A fetch without an expected hash should still record the artifact's
    /// sha256, so a later request that pins that hash validates against the
    /// stored fingerprint instead of re-downloading.
    #[test]
    fn fetch_without_expected_sha_then_validate_later_uses_stored_fingerprint() {
        let daemon = TestDaemon::start();
        let body = b"artifact with delayed hash".to_vec();
        let server = TestHttpServer::start(TestHttpConfig {
            body: Arc::new(body.clone()),
            accept_ranges: false,
            send_content_length: true,
            chunk_size: 0,
            chunk_delay: Duration::ZERO,
            path: "delayed.bin".to_string(),
            request_started: None,
            release_response: None,
        });
        let client = DownloadClient::new(Some(daemon.endpoint.clone()));
        let dir = tempfile::tempdir().unwrap();
        let destination = dir.path().join("delayed.bin");

        // Initial fetch: no expected hash supplied.
        let first =
            fetch_with_retry(&client, FetchRequest::new(server.url.clone(), &destination)).unwrap();
        assert_eq!(first.status, FetchStatus::Downloaded);
        assert_eq!(first.sha256, sha256_hex(&body));

        // Same destination, now pinning the hash learned from the first fetch.
        let mut later = FetchRequest::new(server.url.clone(), &destination);
        later.expected_sha256 = Some(first.sha256.clone());
        let second = fetch_with_retry(&client, later.clone()).unwrap();
        assert_eq!(second.status, FetchStatus::AlreadyPresent);
        assert_eq!(second.sha256, first.sha256);

        let state = client.exists(&later).unwrap();
        assert_eq!(state.kind, FetchStateKind::ArtifactReady);
        assert_eq!(state.sha256.as_deref(), Some(second.sha256.as_str()));
    }
1928
    /// An archive expanded without an expected hash must still count as
    /// expanded when a later request adds the (matching) hash.
    #[test]
    fn expanded_state_remains_valid_when_expected_sha_is_added_later() {
        let daemon = TestDaemon::start();
        let dir = tempfile::tempdir().unwrap();
        // Build a trivial one-file zip to expand.
        let archive_path = dir.path().join("bundle.zip");
        {
            let file = File::create(&archive_path).unwrap();
            let mut zip = zip::ZipWriter::new(file);
            let options = zip::write::SimpleFileOptions::default();
            zip.start_file("hello.txt", options).unwrap();
            zip.write_all(b"hello").unwrap();
            zip.finish().unwrap();
        }
        let archive_bytes = fs::read(&archive_path).unwrap();
        let server = TestHttpServer::start(TestHttpConfig {
            body: Arc::new(archive_bytes.clone()),
            accept_ranges: false,
            send_content_length: true,
            chunk_size: 0,
            chunk_delay: Duration::ZERO,
            path: "bundle.zip".to_string(),
            request_started: None,
            release_response: None,
        });
        let client = DownloadClient::new(Some(daemon.endpoint.clone()));
        let cache_path = dir.path().join("cache").join("bundle.zip");
        let expanded_path = dir.path().join("expanded");

        // First fetch + expand without any hash pinned.
        let mut initial = FetchRequest::new(server.url.clone(), &cache_path);
        initial.destination_path_expanded = Some(expanded_path.clone());
        let first = fetch_with_retry(&client, initial).unwrap();
        assert_eq!(first.status, FetchStatus::Expanded);

        // Same request but now with the expected hash added after the fact.
        let mut later = FetchRequest::new(server.url.clone(), &cache_path);
        later.destination_path_expanded = Some(expanded_path.clone());
        later.expected_sha256 = Some(first.sha256.clone());
        let second = fetch_with_retry(&client, later.clone()).unwrap();
        assert_eq!(second.status, FetchStatus::AlreadyExpanded);
        assert_eq!(second.sha256, first.sha256);

        let state = client.exists(&later).unwrap();
        assert_eq!(state.kind, FetchStateKind::ExpandedReady);
        assert_eq!(state.sha256.as_deref(), Some(second.sha256.as_str()));
    }
1973
    /// Re-fetching an existing artifact with `force = true` must be refused
    /// with an error directing the caller to purge instead.
    #[test]
    fn force_is_rejected_for_existing_artifact_state() {
        let daemon = TestDaemon::start();
        let body = b"immutable".to_vec();
        let server = TestHttpServer::start(TestHttpConfig {
            body: Arc::new(body),
            accept_ranges: false,
            send_content_length: true,
            chunk_size: 0,
            chunk_delay: Duration::ZERO,
            path: "immutable.bin".to_string(),
            request_started: None,
            release_response: None,
        });
        let client = DownloadClient::new(Some(daemon.endpoint.clone()));
        let dir = tempfile::tempdir().unwrap();
        let destination = dir.path().join("immutable.bin");

        // Seed the artifact with a normal fetch.
        let _ =
            fetch_with_retry(&client, FetchRequest::new(server.url.clone(), &destination)).unwrap();

        let mut force = FetchRequest::new(server.url.clone(), &destination);
        force.force = true;
        let err = fetch_with_retry(&client, force).unwrap_err();
        assert!(err.contains("purge"));
    }
2000
    /// End-to-end traversal defense: a downloaded zip containing a `..` entry
    /// must fail expansion and must not write outside the extraction root.
    #[test]
    fn fetch_rejects_unsafe_zip_entries_end_to_end() {
        let daemon = TestDaemon::start();
        let dir = tempfile::tempdir().unwrap();
        // Craft a malicious zip whose entry escapes the extraction dir.
        let archive_path = dir.path().join("unsafe.zip");
        {
            let file = File::create(&archive_path).unwrap();
            let mut zip = zip::ZipWriter::new(file);
            let options = zip::write::SimpleFileOptions::default();
            zip.start_file("../evil.txt", options).unwrap();
            zip.write_all(b"bad").unwrap();
            zip.finish().unwrap();
        }
        let archive_bytes = fs::read(&archive_path).unwrap();
        let server = TestHttpServer::start(TestHttpConfig {
            body: Arc::new(archive_bytes),
            accept_ranges: false,
            send_content_length: true,
            chunk_size: 0,
            chunk_delay: Duration::ZERO,
            path: "unsafe.zip".to_string(),
            request_started: None,
            release_response: None,
        });
        let client = DownloadClient::new(Some(daemon.endpoint.clone()));
        let cache_path = dir.path().join("cache").join("unsafe.zip");
        let expanded_path = dir.path().join("expanded");
        let mut request = FetchRequest::new(server.url.clone(), &cache_path);
        request.destination_path_expanded = Some(expanded_path.clone());

        let err = fetch_with_retry(&client, request).unwrap_err();
        assert!(err.contains("unsafe zip entry"));
        // The escaped path ("../evil.txt" relative to `expanded`) must not exist.
        assert!(!dir.path().join("evil.txt").exists());
    }
2035}