//! zccache_download_client/artifact.rs — artifact fetch, validation, and expansion client.
1use std::fs::{self, File, OpenOptions};
2use std::io::{self, Read, Write};
3use std::path::{Component, Path, PathBuf};
4
5use reqwest::header::ACCEPT_ENCODING;
6use serde::{Deserialize, Serialize};
7use sha2::{Digest, Sha256};
8use tokio::io::AsyncWriteExt;
9use zccache_download::{canonical_destination, stable_download_id, DownloadOptions, DownloadPhase};
10
11use crate::DownloadClient;
12
/// Controls how `fetch` behaves when another process already holds the
/// per-destination fetch lock.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum WaitMode {
    /// Block until the exclusive lock can be acquired.
    Block,
    /// Give up immediately; `fetch` then returns `FetchStatus::Locked`.
    NoWait,
}
18
/// Archive/compression format of a cached artifact, controlling how
/// `extract_archive` materializes the expanded destination.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ArchiveFormat {
    /// Detect the format from the cache file name (see `auto_archive_format`).
    Auto,
    /// Not an archive; the file is copied verbatim to the expanded path.
    None,
    /// Single zstd-compressed file (decoded with `ruzstd`).
    Zst,
    /// Zip archive.
    Zip,
    /// Single xz-compressed file (decoded with `lzma_rs`).
    Xz,
    /// Gzip-compressed tarball.
    TarGz,
    /// Xz-compressed tarball.
    TarXz,
    /// Zstd-compressed tarball.
    TarZst,
    /// 7-Zip archive (extracted with `sevenz_rust`).
    SevenZip,
}
31
/// Outcome reported by `DownloadClient::fetch`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum FetchStatus {
    /// Artifact was downloaded now; no expansion target was configured.
    Downloaded,
    /// A valid artifact was already cached; nothing was downloaded.
    AlreadyPresent,
    /// Artifact was extracted to the expanded destination in this call.
    Expanded,
    /// Both the artifact and its expanded destination were already valid.
    AlreadyExpanded,
    /// Artifact was downloaded now and the expansion state already matched.
    Ready,
    /// `WaitMode::NoWait` was set and another process held the fetch lock.
    Locked,
    /// `dry_run` was set; state was inspected but nothing was changed.
    DryRun,
}
42
/// Classification of the on-disk state reported by `exists`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum FetchStateKind {
    /// No cached artifact at the destination.
    Missing,
    /// Cached artifact exists and passes validation; expansion (if any) not ready.
    ArtifactReady,
    /// Cached artifact and expanded destination are both valid.
    ExpandedReady,
    /// Something exists at the destination but failed validation.
    Invalid,
}
50
/// Where an artifact is downloaded from: a single URL, or an ordered list of
/// part URLs that are concatenated into one file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum DownloadSource {
    /// One URL fetched via the client's regular download path.
    Url(String),
    /// Ordered part URLs; bodies are appended in sequence (see
    /// `download_explicit_parts`).
    MultipartUrls(Vec<String>),
}
56
57impl DownloadSource {
58    #[must_use]
59    pub fn primary_url(&self) -> &str {
60        match self {
61            Self::Url(url) => url,
62            Self::MultipartUrls(urls) => urls.first().map(String::as_str).unwrap_or(""),
63        }
64    }
65}
66
67impl From<String> for DownloadSource {
68    fn from(value: String) -> Self {
69        Self::Url(value)
70    }
71}
72
73impl From<&str> for DownloadSource {
74    fn from(value: &str) -> Self {
75        Self::Url(value.to_string())
76    }
77}
78
79impl From<Vec<String>> for DownloadSource {
80    fn from(value: Vec<String>) -> Self {
81        Self::MultipartUrls(value)
82    }
83}
84
/// Caller-facing description of one fetch operation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FetchRequest {
    // Where the artifact comes from.
    pub source: DownloadSource,
    // Cache location the raw artifact is written to.
    pub destination_path: PathBuf,
    // Optional location the artifact is extracted to after download.
    pub destination_path_expanded: Option<PathBuf>,
    // Optional sha256 pin; when set, validation fails on mismatch.
    pub expected_sha256: Option<String>,
    // How to interpret the artifact for expansion (Auto = detect by name).
    pub archive_format: ArchiveFormat,
    // Whether to wait for, or bail on, a contended fetch lock.
    pub wait_mode: WaitMode,
    // Inspect state only; perform no downloads or writes.
    pub dry_run: bool,
    // NOTE(review): `force` does not overwrite — fetch errors out if any
    // state already exists (see the force check at the top of `fetch`).
    pub force: bool,
    // Options forwarded to the underlying single-URL download path.
    pub download_options: DownloadOptions,
}
97
98impl FetchRequest {
99    #[must_use]
100    pub fn new(source: impl Into<DownloadSource>, destination_path: impl Into<PathBuf>) -> Self {
101        Self {
102            source: source.into(),
103            destination_path: destination_path.into(),
104            destination_path_expanded: None,
105            expected_sha256: None,
106            archive_format: ArchiveFormat::Auto,
107            wait_mode: WaitMode::Block,
108            dry_run: false,
109            force: false,
110            download_options: DownloadOptions::default(),
111        }
112    }
113}
114
/// Result of a successful `fetch`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FetchResult {
    // What action (if any) was taken.
    pub status: FetchStatus,
    // Resolved cache location of the raw artifact.
    pub cache_path: PathBuf,
    // Resolved expansion location, when one was requested.
    pub expanded_path: Option<PathBuf>,
    // Artifact size in bytes; None when not measured (dry-run/locked paths).
    pub bytes: Option<u64>,
    // Artifact sha256 hex digest; may be empty on dry-run/locked paths
    // where no fingerprint was available.
    pub sha256: String,
}
123
/// Snapshot of on-disk state for a request, as reported by `exists`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FetchState {
    // Overall classification (missing / ready / expanded / invalid).
    pub kind: FetchStateKind,
    // Resolved cache location that was inspected.
    pub cache_path: PathBuf,
    // Resolved expansion location, when the request configured one.
    pub expanded_path: Option<PathBuf>,
    // Size of the cached artifact, when it exists.
    pub bytes: Option<u64>,
    // sha256 of the cached artifact, when it exists.
    pub sha256: Option<String>,
    // Human-readable explanation for non-ready states.
    pub reason: Option<String>,
}
133
/// Persisted (JSON) record that an artifact was successfully expanded;
/// compared against the live request in `expanded_marker_matches` so stale
/// expansions are detected when source/artifact/format change.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct ExpandedMarker {
    source: DownloadSource,
    // Lossy string form of the cache path the expansion came from.
    cache_path: String,
    // sha256 of the *archive* that was expanded (not of the expanded tree).
    artifact_sha256: String,
    archive_format: ArchiveFormat,
}
141
/// Persisted (JSON) record of a validated download; cross-checked against
/// the on-disk payload in `read_or_compute_artifact_fingerprint`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct ArtifactMarker {
    source: DownloadSource,
    // Lossy string form of the cache path at write time.
    cache_path: String,
    sha256: String,
    bytes: u64,
}
149
/// Content identity of a cached artifact: sha256 hex digest plus byte length.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ArtifactFingerprint {
    sha256: String,
    bytes: u64,
}
155
/// A `FetchRequest` after normalization: source validated, paths made
/// absolute/canonical, expected sha256 lowercased. All internal logic works
/// on this form rather than on the raw request.
#[derive(Debug, Clone)]
struct ResolvedFetchRequest {
    source: DownloadSource,
    // Normalized cache destination (was `destination_path`).
    cache_path: PathBuf,
    expanded_path: Option<PathBuf>,
    // Trimmed, lowercase hex digest, when the caller pinned one.
    expected_sha256: Option<String>,
    archive_format: ArchiveFormat,
    wait_mode: WaitMode,
    dry_run: bool,
    force: bool,
    download_options: DownloadOptions,
}
168
impl DownloadClient {
    /// Fetches the requested artifact into its cache path and, when an
    /// expansion destination is configured, extracts it there.
    ///
    /// Flow: resolve the request → fast-path on already-valid state →
    /// honor `dry_run` → take the per-destination fetch lock → re-check
    /// state under the lock → download if needed → validate and record the
    /// fingerprint → expand if requested.
    ///
    /// `force` does not replace existing state: it errors out instead, so
    /// callers must purge explicitly before re-fetching.
    pub fn fetch(&self, request: FetchRequest) -> Result<FetchResult, String> {
        let resolved = resolve_request(&request)?;
        // Pre-lock snapshot; used for fast paths and for reporting when we
        // bail out early (dry-run / lock contention).
        let initial = exists_resolved(&resolved)?;
        if resolved.force && initial.kind != FetchStateKind::Missing {
            return Err(format!(
                "artifact state already exists at {}; purge it before forcing replacement",
                resolved.cache_path.display()
            ));
        }
        // Lock-free fast paths when everything needed is already valid.
        if !resolved.force {
            match initial.kind {
                FetchStateKind::ExpandedReady => {
                    return Ok(FetchResult {
                        status: FetchStatus::AlreadyExpanded,
                        cache_path: resolved.cache_path,
                        expanded_path: resolved.expanded_path,
                        bytes: initial.bytes,
                        sha256: initial
                            .sha256
                            .ok_or_else(|| "missing artifact sha256 fingerprint".to_string())?,
                    });
                }
                // ArtifactReady is only terminal when no expansion is wanted.
                FetchStateKind::ArtifactReady if resolved.expanded_path.is_none() => {
                    return Ok(FetchResult {
                        status: FetchStatus::AlreadyPresent,
                        cache_path: resolved.cache_path,
                        expanded_path: None,
                        bytes: initial.bytes,
                        sha256: initial
                            .sha256
                            .ok_or_else(|| "missing artifact sha256 fingerprint".to_string())?,
                    });
                }
                _ => {}
            }
        }

        if resolved.dry_run {
            return Ok(FetchResult {
                status: FetchStatus::DryRun,
                cache_path: resolved.cache_path,
                expanded_path: resolved.expanded_path,
                bytes: initial.bytes,
                sha256: initial.sha256.unwrap_or_default(),
            });
        }

        // "locked" is a sentinel error string from acquire_fetch_lock,
        // produced only under WaitMode::NoWait.
        let _lock = match acquire_fetch_lock(&resolved) {
            Ok(lock) => lock,
            Err(message) if message == "locked" => {
                return Ok(FetchResult {
                    status: FetchStatus::Locked,
                    cache_path: resolved.cache_path,
                    expanded_path: resolved.expanded_path,
                    bytes: initial.bytes,
                    sha256: initial.sha256.unwrap_or_default(),
                });
            }
            Err(message) => return Err(message),
        };
        // Re-check under the lock: another process may have completed (or
        // corrupted) the state between the snapshot above and lock acquisition.
        let current = exists_resolved(&resolved)?;
        if current.kind == FetchStateKind::Invalid {
            return Err(format!(
                "{}; purge the artifact state before retrying",
                current
                    .reason
                    .clone()
                    .unwrap_or_else(|| "artifact exists but failed validation".to_string())
            ));
        }
        if !resolved.force {
            match current.kind {
                FetchStateKind::ExpandedReady => {
                    return Ok(FetchResult {
                        status: FetchStatus::AlreadyExpanded,
                        cache_path: resolved.cache_path,
                        expanded_path: resolved.expanded_path,
                        bytes: current.bytes,
                        sha256: current
                            .sha256
                            .ok_or_else(|| "missing artifact sha256 fingerprint".to_string())?,
                    });
                }
                FetchStateKind::ArtifactReady if resolved.expanded_path.is_none() => {
                    return Ok(FetchResult {
                        status: FetchStatus::AlreadyPresent,
                        cache_path: resolved.cache_path,
                        expanded_path: None,
                        bytes: current.bytes,
                        sha256: current
                            .sha256
                            .ok_or_else(|| "missing artifact sha256 fingerprint".to_string())?,
                    });
                }
                _ => {}
            }
        }

        let mut downloaded_now = false;
        if resolved.force || current.kind != FetchStateKind::ArtifactReady {
            match &resolved.source {
                DownloadSource::Url(url) => {
                    let mut handle = self.download(
                        url,
                        &resolved.cache_path,
                        resolved.download_options.clone(),
                    )?;
                    // Poll the handle until it reaches a terminal phase.
                    let status = loop {
                        let status = handle.wait(None)?;
                        if crate::is_terminal(&status) {
                            break status;
                        }
                    };
                    if status.phase != DownloadPhase::Completed {
                        return Err(status.error.unwrap_or_else(|| {
                            format!("download finished in unexpected phase {:?}", status.phase)
                        }));
                    }
                    handle.close()?;
                }
                DownloadSource::MultipartUrls(urls) => {
                    download_explicit_parts(urls, &resolved.cache_path)?;
                }
            }
            downloaded_now = true;
        }

        // Validate the payload (size + sha256); on failure, best-effort
        // removal of the partial state so a retry starts clean.
        let fingerprint = match validate_artifact(&resolved) {
            Ok(fingerprint) => fingerprint,
            Err(err) => {
                cleanup_invalid_fetch_state(&resolved);
                return Err(err);
            }
        };
        write_artifact_marker(&resolved, &fingerprint)?;

        if let Some(expanded_path) = &resolved.expanded_path {
            let expanded_ready = expanded_marker_matches(&resolved, &fingerprint)?;
            if !resolved.force && expanded_ready {
                return Ok(FetchResult {
                    // Ready = freshly downloaded but expansion already valid.
                    status: if downloaded_now {
                        FetchStatus::Ready
                    } else {
                        FetchStatus::AlreadyExpanded
                    },
                    cache_path: resolved.cache_path.clone(),
                    expanded_path: Some(expanded_path.clone()),
                    bytes: Some(fingerprint.bytes),
                    sha256: fingerprint.sha256.clone(),
                });
            }

            // Never extract on top of unvalidated existing content.
            if expanded_path.exists() {
                return Err(format!(
                    "expanded destination {} already exists but is not validated; purge it before retrying",
                    expanded_path.display()
                ));
            }

            // Drop any stale marker before extracting, so a crash mid-extract
            // cannot leave a marker pointing at a partial tree.
            remove_path_if_exists(&expanded_marker_path(expanded_path))?;
            extract_archive(&resolved, expanded_path)?;
            write_expanded_marker(&resolved, &fingerprint)?;
            return Ok(FetchResult {
                status: FetchStatus::Expanded,
                cache_path: resolved.cache_path.clone(),
                expanded_path: Some(expanded_path.clone()),
                bytes: Some(fingerprint.bytes),
                sha256: fingerprint.sha256,
            });
        }

        Ok(FetchResult {
            status: if downloaded_now {
                FetchStatus::Downloaded
            } else {
                FetchStatus::AlreadyPresent
            },
            cache_path: resolved.cache_path.clone(),
            expanded_path: None,
            bytes: Some(fingerprint.bytes),
            sha256: fingerprint.sha256,
        })
    }

    /// Reports the current on-disk state for `request` without downloading
    /// or creating directories (uses the non-creating resolver).
    pub fn exists(&self, request: &FetchRequest) -> Result<FetchState, String> {
        let resolved = resolve_request_no_create(request)?;
        exists_resolved(&resolved)
    }
}
359
/// Normalizes a request for the `fetch` path: validates the source, resolves
/// the cache path via `canonical_destination` (which may create parents),
/// resolves the expansion path with parent creation enabled, and lowercases
/// the expected sha256.
///
/// NOTE(review): the cache path goes through `canonical_destination` here but
/// through `normalize_target(.., false)` in `resolve_request_no_create`;
/// confirm the two produce identical paths, otherwise `exists()` may inspect
/// a different location than `fetch()` writes.
fn resolve_request(request: &FetchRequest) -> Result<ResolvedFetchRequest, String> {
    Ok(ResolvedFetchRequest {
        source: normalize_source(request.source.clone())?,
        cache_path: canonical_destination(&request.destination_path)
            .map_err(|e| e.to_string())?
            .into_path_buf(),
        expanded_path: request
            .destination_path_expanded
            .as_ref()
            .map(|p| normalize_target(p, true))
            .transpose()?,
        expected_sha256: request.expected_sha256.clone().map(normalize_sha256),
        archive_format: request.archive_format,
        wait_mode: request.wait_mode,
        dry_run: request.dry_run,
        force: request.force,
        download_options: request.download_options.clone(),
    })
}
379
/// Side-effect-free variant of `resolve_request` used by `exists`: both paths
/// are normalized with `create_parent = false`, so no directories are created
/// while merely inspecting state.
fn resolve_request_no_create(request: &FetchRequest) -> Result<ResolvedFetchRequest, String> {
    Ok(ResolvedFetchRequest {
        source: normalize_source(request.source.clone())?,
        cache_path: normalize_target(&request.destination_path, false)?,
        expanded_path: request
            .destination_path_expanded
            .as_ref()
            .map(|p| normalize_target(p, false))
            .transpose()?,
        expected_sha256: request.expected_sha256.clone().map(normalize_sha256),
        archive_format: request.archive_format,
        wait_mode: request.wait_mode,
        dry_run: request.dry_run,
        force: request.force,
        download_options: request.download_options.clone(),
    })
}
397
398fn normalize_target(path: &Path, create_parent: bool) -> Result<PathBuf, String> {
399    let absolute = if path.is_absolute() {
400        path.to_path_buf()
401    } else {
402        std::env::current_dir()
403            .map_err(|e| e.to_string())?
404            .join(path)
405    };
406    let file_name = absolute
407        .file_name()
408        .map(ToOwned::to_owned)
409        .ok_or_else(|| "path must include a terminal file or directory name".to_string())?;
410    let parent = absolute.parent().unwrap_or_else(|| Path::new("."));
411    let canonical_parent = if parent.exists() {
412        std::fs::canonicalize(parent).map_err(|e| e.to_string())?
413    } else if create_parent {
414        std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
415        std::fs::canonicalize(parent).map_err(|e| e.to_string())?
416    } else {
417        zccache_core::NormalizedPath::new(parent).into_path_buf()
418    };
419    Ok(canonical_parent.join(file_name))
420}
421
/// Canonicalizes a user-supplied sha256 digest: strips surrounding whitespace
/// and lowercases ASCII hex characters so comparisons are case-insensitive.
fn normalize_sha256(value: String) -> String {
    let trimmed = value.trim();
    trimmed.to_ascii_lowercase()
}
425
426fn normalize_source(source: DownloadSource) -> Result<DownloadSource, String> {
427    match source {
428        DownloadSource::Url(url) => {
429            if url.trim().is_empty() {
430                Err("download source URL must not be empty".to_string())
431            } else {
432                Ok(DownloadSource::Url(url))
433            }
434        }
435        DownloadSource::MultipartUrls(urls) => {
436            if urls.is_empty() {
437                return Err("multipart download source must include at least one URL".to_string());
438            }
439            if urls.iter().any(|url| url.trim().is_empty()) {
440                return Err("multipart download source contains an empty URL".to_string());
441            }
442            Ok(DownloadSource::MultipartUrls(urls))
443        }
444    }
445}
446
/// Classifies the on-disk state for a resolved request.
///
/// Decision order: ExpandedReady (valid artifact + matching expanded marker +
/// expanded path present) → ArtifactReady (valid artifact, with or without a
/// pending expansion) → Invalid (something exists but fails validation) →
/// Missing. Read-only apart from hashing the cached file.
fn exists_resolved(request: &ResolvedFetchRequest) -> Result<FetchState, String> {
    let cache_exists = request.cache_path.exists();
    // Hashing the whole file on every call — acceptable here, but note this
    // makes `exists` O(artifact size).
    let fingerprint = if cache_exists {
        Some(read_or_compute_artifact_fingerprint(request)?)
    } else {
        None
    };
    // Valid = fingerprint present and (if pinned) sha256 matches.
    let cache_valid = fingerprint
        .as_ref()
        .map(|fingerprint| artifact_matches_request(request, fingerprint))
        .unwrap_or(false);
    let bytes = fingerprint.as_ref().map(|fingerprint| fingerprint.bytes);
    let sha256 = fingerprint
        .as_ref()
        .map(|fingerprint| fingerprint.sha256.clone());

    if let Some(expanded_path) = &request.expanded_path {
        if cache_valid
            && expanded_marker_matches(
                request,
                fingerprint
                    .as_ref()
                    .ok_or_else(|| "missing artifact fingerprint".to_string())?,
            )?
            && expanded_path.exists()
        {
            return Ok(FetchState {
                kind: FetchStateKind::ExpandedReady,
                cache_path: request.cache_path.clone(),
                expanded_path: Some(expanded_path.clone()),
                bytes,
                sha256,
                reason: None,
            });
        }

        // Artifact is fine but the expansion still needs to happen.
        if cache_valid {
            return Ok(FetchState {
                kind: FetchStateKind::ArtifactReady,
                cache_path: request.cache_path.clone(),
                expanded_path: Some(expanded_path.clone()),
                bytes,
                sha256,
                reason: Some("expanded destination not ready".to_string()),
            });
        }
    } else if cache_valid {
        return Ok(FetchState {
            kind: FetchStateKind::ArtifactReady,
            cache_path: request.cache_path.clone(),
            expanded_path: None,
            bytes,
            sha256,
            reason: None,
        });
    }

    // File exists but failed fingerprint/pin validation.
    if cache_exists {
        return Ok(FetchState {
            kind: FetchStateKind::Invalid,
            cache_path: request.cache_path.clone(),
            expanded_path: request.expanded_path.clone(),
            bytes,
            sha256,
            reason: Some("artifact exists but failed validation".to_string()),
        });
    }

    Ok(FetchState {
        kind: FetchStateKind::Missing,
        cache_path: request.cache_path.clone(),
        expanded_path: request.expanded_path.clone(),
        bytes: None,
        sha256: None,
        reason: None,
    })
}
524
525fn artifact_matches_request(
526    request: &ResolvedFetchRequest,
527    fingerprint: &ArtifactFingerprint,
528) -> bool {
529    request
530        .expected_sha256
531        .as_ref()
532        .map(|expected_sha256| fingerprint.sha256 == *expected_sha256)
533        .unwrap_or(true)
534}
535
536fn validate_artifact(request: &ResolvedFetchRequest) -> Result<ArtifactFingerprint, String> {
537    if !request.cache_path.exists() {
538        return Err(format!(
539            "downloaded artifact missing at {}",
540            request.cache_path.display()
541        ));
542    }
543    let fingerprint =
544        compute_artifact_fingerprint(&request.cache_path).map_err(|e| e.to_string())?;
545    if let Some(expected_sha256) = &request.expected_sha256 {
546        if fingerprint.sha256 != *expected_sha256 {
547            return Err(format!(
548                "sha256 mismatch for {}: expected {}, got {}",
549                request.cache_path.display(),
550                expected_sha256,
551                fingerprint.sha256
552            ));
553        }
554    }
555    Ok(fingerprint)
556}
557
558fn cleanup_invalid_fetch_state(request: &ResolvedFetchRequest) {
559    let _ = remove_path_if_exists(&request.cache_path);
560    let _ = remove_path_if_exists(&artifact_marker_path(&request.cache_path));
561    if let Some(expanded_path) = &request.expanded_path {
562        let _ = remove_path_if_exists(expanded_path);
563        let _ = remove_path_if_exists(&expanded_marker_path(expanded_path));
564    }
565}
566
/// Downloads each part URL in order and concatenates the bodies into
/// `destination`.
///
/// Writes to a `.part` temp file first and renames into place on success, so
/// `destination` is never observed half-written; the temp file is removed on
/// failure. Spins up its own single-threaded tokio runtime, so this must be
/// called from a non-async context.
fn download_explicit_parts(part_urls: &[String], destination: &Path) -> Result<(), String> {
    let temp_path = temp_download_path(destination);
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(|e| format!("failed to create tokio runtime: {e}"))?;
    runtime.block_on(async move {
        let client = reqwest::Client::builder()
            .user_agent(format!("zccache-download/{}", zccache_core::VERSION))
            .build()
            .map_err(|e| e.to_string())?;

        if let Some(parent) = destination.parent() {
            tokio::fs::create_dir_all(parent)
                .await
                .map_err(|e| e.to_string())?;
        }

        // Discard any leftover temp file from a previous failed attempt.
        let _ = tokio::fs::remove_file(&temp_path).await;

        let result = async {
            let mut output = tokio::fs::File::create(&temp_path)
                .await
                .map_err(|e| e.to_string())?;
            for url in part_urls {
                // identity encoding: keep the raw bytes so the concatenated
                // file's size/sha256 match the served parts exactly.
                let mut response = client
                    .get(url)
                    .header(ACCEPT_ENCODING, "identity")
                    .send()
                    .await
                    .map_err(|e| e.to_string())?;
                if !response.status().is_success() {
                    return Err(format!("unexpected status {} for {url}", response.status()));
                }
                while let Some(chunk) = response.chunk().await.map_err(|e| e.to_string())? {
                    output.write_all(&chunk).await.map_err(|e| e.to_string())?;
                }
            }
            output.flush().await.map_err(|e| e.to_string())?;
            // Close the handle before renaming (required on Windows).
            drop(output);
            if destination.exists() {
                let _ = tokio::fs::remove_file(destination).await;
            }
            tokio::fs::rename(&temp_path, destination)
                .await
                .map_err(|e| e.to_string())
        }
        .await;

        // On any failure, don't leave a partial temp file behind.
        if result.is_err() {
            let _ = tokio::fs::remove_file(&temp_path).await;
        }
        result
    })
}
622
623fn sha256_file(path: &Path) -> io::Result<String> {
624    let mut file = File::open(path)?;
625    let mut hasher = Sha256::new();
626    let mut buf = [0u8; 64 * 1024];
627    loop {
628        let n = file.read(&mut buf)?;
629        if n == 0 {
630            break;
631        }
632        hasher.update(&buf[..n]);
633    }
634    Ok(format!("{:x}", hasher.finalize()))
635}
636
637fn compute_artifact_fingerprint(path: &Path) -> io::Result<ArtifactFingerprint> {
638    let sha256 = sha256_file(path)?;
639    let bytes = fs::metadata(path)?.len();
640    Ok(ArtifactFingerprint { sha256, bytes })
641}
642
/// Derives the in-progress download path for `destination` by appending a
/// `part` suffix after any existing extension:
/// `file.tar.gz` → `file.tar.gz.part`, `file` → `file.part`.
fn temp_download_path(destination: &Path) -> PathBuf {
    let suffix = match destination.extension() {
        Some(ext) => format!("{}.part", ext.to_string_lossy()),
        None => String::from("part"),
    };
    destination.with_extension(suffix)
}
652
/// Guard holding the exclusive fetch lock file open; per fs2's advisory-lock
/// semantics the lock is released when the file handle is dropped/closed.
struct FetchLock {
    _file: File,
}
656
657fn acquire_fetch_lock(request: &ResolvedFetchRequest) -> Result<FetchLock, String> {
658    let lock_path = fetch_lock_path(request);
659    if let Some(parent) = lock_path.parent() {
660        fs::create_dir_all(parent).map_err(|e| e.to_string())?;
661    }
662    let file = OpenOptions::new()
663        .read(true)
664        .write(true)
665        .create(true)
666        .truncate(false)
667        .open(&lock_path)
668        .map_err(|e| e.to_string())?;
669    match request.wait_mode {
670        WaitMode::Block => fs2::FileExt::lock_exclusive(&file).map_err(|e| e.to_string())?,
671        WaitMode::NoWait => {
672            if fs2::FileExt::try_lock_exclusive(&file).is_err() {
673                return Err("locked".to_string());
674            }
675        }
676    }
677    Ok(FetchLock { _file: file })
678}
679
/// Computes the lock-file path for a fetch, keyed on the normalized cache
/// path plus (when present) the expanded path, so distinct destination pairs
/// never contend on the same lock.
fn fetch_lock_path(request: &ResolvedFetchRequest) -> PathBuf {
    let mut key = zccache_core::normalize_for_key(&request.cache_path);
    if let Some(expanded_path) = &request.expanded_path {
        // Newline separator keeps concatenated keys unambiguous.
        key.push('\n');
        key.push_str(&zccache_core::normalize_for_key(expanded_path));
    }
    let hash = stable_download_id(Path::new(&key));
    zccache_core::config::default_cache_dir()
        .join("downloads")
        .join("locks")
        .join(format!("{hash}.lock"))
        .into_path_buf()
}
693
/// Location of the JSON `ArtifactMarker` for a cache path, stored under the
/// shared cache dir and keyed by the stable hash of the cache path.
fn artifact_marker_path(cache_path: &Path) -> PathBuf {
    let hash = stable_download_id(cache_path);
    zccache_core::config::default_cache_dir()
        .join("downloads")
        .join("artifact-state")
        .join(format!("{hash}.json"))
        .into_path_buf()
}
702
/// Location of the JSON `ExpandedMarker` for an expansion destination, keyed
/// by the stable hash of the expanded path (parallel to `artifact_marker_path`).
fn expanded_marker_path(expanded_path: &Path) -> PathBuf {
    let hash = stable_download_id(expanded_path);
    zccache_core::config::default_cache_dir()
        .join("downloads")
        .join("expanded-state")
        .join(format!("{hash}.json"))
        .into_path_buf()
}
711
/// Hashes the cached artifact and, when a persisted marker exists,
/// cross-checks it against the freshly computed fingerprint and the request.
///
/// A missing or unreadable marker is tolerated (the computed fingerprint is
/// returned as-is); a *present but mismatching* marker is treated as
/// corruption and surfaced as an error rather than silently recomputed.
fn read_or_compute_artifact_fingerprint(
    request: &ResolvedFetchRequest,
) -> Result<ArtifactFingerprint, String> {
    let fingerprint =
        compute_artifact_fingerprint(&request.cache_path).map_err(|e| e.to_string())?;
    if let Ok(content) = fs::read_to_string(artifact_marker_path(&request.cache_path)) {
        let marker: ArtifactMarker = serde_json::from_str(&content).map_err(|e| e.to_string())?;
        if marker.source != request.source
            || marker.cache_path != request.cache_path.to_string_lossy()
            || marker.sha256 != fingerprint.sha256
            || marker.bytes != fingerprint.bytes
        {
            return Err(format!(
                "artifact marker for {} does not match the on-disk payload",
                request.cache_path.display()
            ));
        }
    }
    Ok(fingerprint)
}
732
733fn write_artifact_marker(
734    request: &ResolvedFetchRequest,
735    fingerprint: &ArtifactFingerprint,
736) -> Result<(), String> {
737    let marker_path = artifact_marker_path(&request.cache_path);
738    if let Some(parent) = marker_path.parent() {
739        fs::create_dir_all(parent).map_err(|e| e.to_string())?;
740    }
741    let marker = ArtifactMarker {
742        source: request.source.clone(),
743        cache_path: request.cache_path.to_string_lossy().into_owned(),
744        sha256: fingerprint.sha256.clone(),
745        bytes: fingerprint.bytes,
746    };
747    let json = serde_json::to_string(&marker).map_err(|e| e.to_string())?;
748    fs::write(marker_path, json).map_err(|e| e.to_string())
749}
750
751fn expanded_marker_matches(
752    request: &ResolvedFetchRequest,
753    fingerprint: &ArtifactFingerprint,
754) -> Result<bool, String> {
755    let Some(expanded_path) = &request.expanded_path else {
756        return Ok(false);
757    };
758    let marker_path = expanded_marker_path(expanded_path);
759    let marker: ExpandedMarker = match fs::read_to_string(&marker_path) {
760        Ok(content) => serde_json::from_str(&content).map_err(|e| e.to_string())?,
761        Err(_) => return Ok(false),
762    };
763    if marker.source != request.source {
764        return Ok(false);
765    }
766    if marker.cache_path != request.cache_path.to_string_lossy() {
767        return Ok(false);
768    }
769    if marker.artifact_sha256 != fingerprint.sha256 {
770        return Ok(false);
771    }
772    if marker.archive_format != detect_archive_format(request)? {
773        return Ok(false);
774    }
775    Ok(expanded_path.exists())
776}
777
778fn write_expanded_marker(
779    request: &ResolvedFetchRequest,
780    fingerprint: &ArtifactFingerprint,
781) -> Result<(), String> {
782    let Some(expanded_path) = &request.expanded_path else {
783        return Ok(());
784    };
785    let marker_path = expanded_marker_path(expanded_path);
786    if let Some(parent) = marker_path.parent() {
787        fs::create_dir_all(parent).map_err(|e| e.to_string())?;
788    }
789    let marker = ExpandedMarker {
790        source: request.source.clone(),
791        cache_path: request.cache_path.to_string_lossy().into_owned(),
792        artifact_sha256: fingerprint.sha256.clone(),
793        archive_format: detect_archive_format(request)?,
794    };
795    let json = serde_json::to_string(&marker).map_err(|e| e.to_string())?;
796    fs::write(marker_path, json).map_err(|e| e.to_string())
797}
798
799fn detect_archive_format(request: &ResolvedFetchRequest) -> Result<ArchiveFormat, String> {
800    match request.archive_format {
801        ArchiveFormat::Auto => auto_archive_format(&request.cache_path),
802        other => Ok(other),
803    }
804}
805
806fn auto_archive_format(path: &Path) -> Result<ArchiveFormat, String> {
807    let name = path
808        .file_name()
809        .map(|n| n.to_string_lossy().to_ascii_lowercase())
810        .unwrap_or_default();
811    if name.ends_with(".tar.gz") {
812        Ok(ArchiveFormat::TarGz)
813    } else if name.ends_with(".tar.xz") {
814        Ok(ArchiveFormat::TarXz)
815    } else if name.ends_with(".tar.zst") || name.ends_with(".tzst") {
816        Ok(ArchiveFormat::TarZst)
817    } else if name.ends_with(".zip") {
818        Ok(ArchiveFormat::Zip)
819    } else if name.ends_with(".zst") {
820        Ok(ArchiveFormat::Zst)
821    } else if name.ends_with(".xz") {
822        Ok(ArchiveFormat::Xz)
823    } else if name.ends_with(".7z") {
824        Ok(ArchiveFormat::SevenZip)
825    } else {
826        Ok(ArchiveFormat::None)
827    }
828}
829
/// Materializes the cached artifact at `expanded_path` according to its
/// (possibly auto-detected) archive format.
///
/// Single-file formats (`None`/`Zst`/`Xz`) produce one file; tarball, zip,
/// and 7z formats produce a tree via the `extract_*` helpers (defined
/// elsewhere in this module).
fn extract_archive(request: &ResolvedFetchRequest, expanded_path: &Path) -> Result<(), String> {
    match detect_archive_format(request)? {
        // Not an archive: byte-for-byte copy.
        ArchiveFormat::None => {
            copy_file(&request.cache_path, expanded_path).map_err(|e| e.to_string())
        }
        ArchiveFormat::Zst => {
            let input = File::open(&request.cache_path).map_err(|e| e.to_string())?;
            let mut decoder = ruzstd::StreamingDecoder::new(input).map_err(|e| e.to_string())?;
            write_decoded_to_file(&mut decoder, expanded_path).map_err(|e| e.to_string())
        }
        ArchiveFormat::Xz => {
            let input = File::open(&request.cache_path).map_err(|e| e.to_string())?;
            if let Some(parent) = expanded_path.parent() {
                fs::create_dir_all(parent).map_err(|e| e.to_string())?;
            }
            let mut output = File::create(expanded_path).map_err(|e| e.to_string())?;
            let mut input = io::BufReader::new(input);
            lzma_rs::xz_decompress(&mut input, &mut output).map_err(|e| e.to_string())
        }
        ArchiveFormat::Zip => extract_zip(&request.cache_path, expanded_path),
        ArchiveFormat::TarGz => {
            let input = File::open(&request.cache_path).map_err(|e| e.to_string())?;
            let decoder = flate2::read::GzDecoder::new(input);
            extract_tar(decoder, expanded_path)
        }
        ArchiveFormat::TarXz => {
            let input = File::open(&request.cache_path).map_err(|e| e.to_string())?;
            // lzma_rs has no streaming reader here, so the decompressed
            // tarball is buffered fully in memory before untarring.
            let mut decoded = Vec::new();
            let mut input = io::BufReader::new(input);
            lzma_rs::xz_decompress(&mut input, &mut decoded).map_err(|e| e.to_string())?;
            extract_tar(io::Cursor::new(decoded), expanded_path)
        }
        ArchiveFormat::TarZst => {
            let input = File::open(&request.cache_path).map_err(|e| e.to_string())?;
            let decoder = ruzstd::StreamingDecoder::new(input).map_err(|e| e.to_string())?;
            extract_tar(decoder, expanded_path)
        }
        ArchiveFormat::SevenZip => extract_7z(&request.cache_path, expanded_path),
        // detect_archive_format resolves Auto; reaching it here is a bug.
        ArchiveFormat::Auto => Err("archive format auto-detection failed".to_string()),
    }
}
871
/// Extracts a 7z archive into `destination`, sanitising every entry path.
///
/// Entry names are re-rooted under `destination` via [`safe_join`], so
/// traversal sequences (`..`) or absolute names inside the archive are
/// rejected instead of escaping the extraction directory.
fn extract_7z(archive_path: &Path, destination: &Path) -> Result<(), String> {
    fs::create_dir_all(destination).map_err(|e| e.to_string())?;
    // Owned copy of the base path so the extraction closure can be `move`.
    let base = destination.to_path_buf();
    sevenz_rust::decompress_file_with_extract_fn(
        archive_path,
        destination,
        move |entry, reader, _default_dest| {
            let relative = Path::new(entry.name());
            // Unsafe entry names become an io::Error carrying the message.
            let out_path = safe_join(&base, relative).map_err(std::io::Error::other)?;
            if entry.is_directory() {
                fs::create_dir_all(&out_path)?;
                // Ok(true) tells sevenz_rust to continue with the next entry.
                return Ok(true);
            }
            if let Some(parent) = out_path.parent() {
                fs::create_dir_all(parent)?;
            }
            let mut output = File::create(&out_path)?;
            io::copy(reader, &mut output)?;
            output.flush()?;
            Ok(true)
        },
    )
    .map_err(|e| e.to_string())
}
896
/// Streams everything from `reader` into a freshly created file at
/// `destination`, creating parent directories as needed.
fn write_decoded_to_file(reader: &mut dyn Read, destination: &Path) -> io::Result<()> {
    // Make sure the destination's directory hierarchy exists first.
    if let Some(dir) = destination.parent() {
        fs::create_dir_all(dir)?;
    }
    let mut file = File::create(destination)?;
    io::copy(reader, &mut file)?;
    file.flush()
}
906
/// Copies `source` to `destination`, creating the destination's parent
/// directories first; the byte count from `fs::copy` is discarded.
fn copy_file(source: &Path, destination: &Path) -> io::Result<()> {
    match destination.parent() {
        Some(dir) => fs::create_dir_all(dir)?,
        None => {}
    }
    fs::copy(source, destination).map(|_| ())
}
914
/// Extracts a zip archive into `destination` with path-traversal and
/// symlink protections.
///
/// Each entry must have a safe `enclosed_name` (the zip crate returns `None`
/// for names with traversal or absolute components) and is additionally
/// re-rooted with [`safe_join`]. Entries whose unix mode marks them as
/// symlinks are rejected. Unix permission bits are not restored on the
/// extracted files.
fn extract_zip(archive_path: &Path, destination: &Path) -> Result<(), String> {
    fs::create_dir_all(destination).map_err(|e| e.to_string())?;
    let file = File::open(archive_path).map_err(|e| e.to_string())?;
    let mut zip = zip::ZipArchive::new(file).map_err(|e| e.to_string())?;
    for i in 0..zip.len() {
        let mut entry = zip.by_index(i).map_err(|e| e.to_string())?;
        let name = entry
            .enclosed_name()
            .map(|p| p.to_path_buf())
            .ok_or_else(|| format!("unsafe zip entry: {}", entry.name()))?;
        let out_path = safe_join(destination, &name)?;
        if entry.is_dir() {
            fs::create_dir_all(&out_path).map_err(|e| e.to_string())?;
            continue;
        }
        if let Some(mode) = entry.unix_mode() {
            // 0o170000 is the POSIX file-type mask; 0o120000 (S_IFLNK) marks
            // a symbolic link. Links created from archive data could point
            // outside the extraction root, so they are refused.
            if (mode & 0o170000) == 0o120000 {
                return Err(format!(
                    "zip symlink entries are not allowed: {}",
                    entry.name()
                ));
            }
        }
        if let Some(parent) = out_path.parent() {
            fs::create_dir_all(parent).map_err(|e| e.to_string())?;
        }
        let mut out = File::create(&out_path).map_err(|e| e.to_string())?;
        io::copy(&mut entry, &mut out).map_err(|e| e.to_string())?;
    }
    Ok(())
}
946
/// Extracts a tar stream (already decompressed by the caller) into
/// `destination`.
///
/// Symlink and hard-link entries are rejected, since archive-controlled link
/// targets could escape the extraction root; every entry path is re-rooted
/// with [`safe_join`]. Unix file modes from the archive headers are not
/// applied to the extracted files.
fn extract_tar<R: Read>(reader: R, destination: &Path) -> Result<(), String> {
    fs::create_dir_all(destination).map_err(|e| e.to_string())?;
    let mut archive = tar::Archive::new(reader);
    let entries = archive.entries().map_err(|e| e.to_string())?;
    for item in entries {
        let mut entry = item.map_err(|e| e.to_string())?;
        let path = entry.path().map_err(|e| e.to_string())?;
        let out_path = safe_join(destination, &path)?;
        let entry_type = entry.header().entry_type();
        if entry_type.is_symlink() || entry_type.is_hard_link() {
            return Err(format!(
                "tar link entries are not allowed: {}",
                path.display()
            ));
        }
        if entry_type.is_dir() {
            fs::create_dir_all(&out_path).map_err(|e| e.to_string())?;
            continue;
        }
        if let Some(parent) = out_path.parent() {
            fs::create_dir_all(parent).map_err(|e| e.to_string())?;
        }
        // Stream the entry body straight to disk.
        let mut out = File::create(&out_path).map_err(|e| e.to_string())?;
        io::copy(&mut entry, &mut out).map_err(|e| e.to_string())?;
    }
    Ok(())
}
974
/// Joins an archive entry path onto `base`, rejecting anything that could
/// escape it: absolute paths, `..` components, and platform path prefixes
/// or roots. `.` components are dropped.
fn safe_join(base: &Path, entry: &Path) -> Result<PathBuf, String> {
    if entry.is_absolute() {
        return Err(format!(
            "absolute archive entry is not allowed: {}",
            entry.display()
        ));
    }
    let mut sanitized = PathBuf::new();
    for part in entry.components() {
        match part {
            // `.` contributes nothing to the joined path.
            Component::CurDir => continue,
            Component::Normal(segment) => sanitized.push(segment),
            // ParentDir, RootDir, and Prefix are all escape hatches.
            _ => return Err(format!("unsafe archive entry: {}", entry.display())),
        }
    }
    Ok(base.join(sanitized))
}
992
/// Removes `path` if it exists, whether it is a file, directory, or symlink.
///
/// Uses `fs::symlink_metadata`, which does not follow links. The previous
/// `path.exists()` / `path.is_dir()` checks traversed symlinks, so a dangling
/// symlink looked absent (and was left behind), and a symlink to a directory
/// looked like a directory (making `remove_dir_all` operate on the link).
fn remove_path_if_exists(path: &Path) -> Result<(), String> {
    let metadata = match fs::symlink_metadata(path) {
        Ok(metadata) => metadata,
        // Nothing at this path: nothing to remove.
        Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(()),
        Err(err) => return Err(err.to_string()),
    };
    if metadata.is_dir() {
        fs::remove_dir_all(path).map_err(|e| e.to_string())
    } else {
        // Regular files and symlinks (dangling or not) are removed as files.
        fs::remove_file(path).map_err(|e| e.to_string())
    }
}
1003
1004#[cfg(test)]
1005mod tests {
1006    use super::*;
1007
1008    use std::net::{TcpListener, TcpStream};
1009    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
1010    use std::sync::Arc;
1011    use std::thread;
1012    use std::time::{Duration, Instant};
1013
1014    use flate2::write::GzEncoder;
1015    use flate2::Compression;
1016    use zccache_download_daemon::DownloadDaemon;
1017
    /// Behaviour knobs for [`TestHttpServer`]; cloned into each connection
    /// handler thread.
    #[derive(Clone)]
    struct TestHttpConfig {
        /// Response payload shared across handler threads.
        body: Arc<Vec<u8>>,
        /// Advertise `Accept-Ranges: bytes` and honour `Range` requests.
        accept_ranges: bool,
        /// Emit a `Content-Length` header on responses.
        send_content_length: bool,
        /// Body write granularity; 0 writes the body in a single call.
        chunk_size: usize,
        /// Pause between chunks to simulate a slow server.
        chunk_delay: Duration,
        /// URL path component the artifact is served under.
        path: String,
        /// Set by the first non-HEAD request, letting tests detect that a
        /// download is in flight.
        request_started: Option<Arc<AtomicBool>>,
        /// The first body response stalls until this flag becomes true.
        release_response: Option<Arc<AtomicBool>>,
    }
1029
    /// Handle for a running test HTTP server; shuts the listener down when
    /// dropped.
    struct TestHttpServer {
        /// Full URL of the served artifact (`http://addr/path`).
        url: String,
        /// Requests handled (startup probe traffic is excluded — the counter
        /// is reset after the readiness check).
        request_count: Arc<AtomicUsize>,
        /// Requests answered as 206 partial content.
        range_request_count: Arc<AtomicUsize>,
        /// Flag polled by the accept loop; set in `Drop`.
        shutdown: Arc<AtomicBool>,
        /// Listener thread handle, joined in `Drop`.
        thread: Option<thread::JoinHandle<()>>,
    }
1037
    impl TestHttpServer {
        /// Binds an ephemeral localhost listener, spawns the accept loop, and
        /// blocks until the server answers a probe request.
        fn start(config: TestHttpConfig) -> Self {
            let listener = TcpListener::bind("127.0.0.1:0").unwrap();
            let addr = listener.local_addr().unwrap();
            // Non-blocking accepts let the loop poll the shutdown flag.
            listener.set_nonblocking(true).unwrap();
            let url = format!("http://{addr}/{}", config.path);
            let request_count = Arc::new(AtomicUsize::new(0));
            let range_request_count = Arc::new(AtomicUsize::new(0));
            let shutdown = Arc::new(AtomicBool::new(false));
            let request_count_clone = Arc::clone(&request_count);
            let range_request_count_clone = Arc::clone(&range_request_count);
            let shutdown_clone = Arc::clone(&shutdown);
            let config_for_thread = config.clone();
            let (ready_tx, ready_rx) = std::sync::mpsc::sync_channel(1);
            let thread = thread::spawn(move || {
                // Signal that the accept loop is about to run.
                ready_tx.send(()).unwrap();
                while !shutdown_clone.load(Ordering::Relaxed) {
                    match listener.accept() {
                        Ok((stream, _)) => {
                            // One handler thread per connection.
                            let config = config_for_thread.clone();
                            let request_count = Arc::clone(&request_count_clone);
                            let range_request_count = Arc::clone(&range_request_count_clone);
                            thread::spawn(move || {
                                let _ = handle_test_http_connection(
                                    stream,
                                    config,
                                    request_count,
                                    range_request_count,
                                );
                            });
                        }
                        Err(err)
                            if matches!(
                                err.kind(),
                                io::ErrorKind::WouldBlock
                                    | io::ErrorKind::Interrupted
                                    | io::ErrorKind::ConnectionAborted
                                    | io::ErrorKind::ConnectionReset
                            ) =>
                        {
                            // Windows can surface transient listener errors while the probe
                            // connection is racing with the first real client request.
                            thread::sleep(Duration::from_millis(10));
                        }
                        Err(_) => break,
                    }
                }
            });
            ready_rx
                .recv_timeout(Duration::from_secs(1))
                .expect("test http server failed to start");
            wait_for_test_http_server(&addr, &config.path);
            // Drop probe traffic from the counters so tests only observe
            // their own requests.
            request_count.store(0, Ordering::Relaxed);
            range_request_count.store(0, Ordering::Relaxed);
            Self {
                url,
                request_count,
                range_request_count,
                shutdown,
                thread: Some(thread),
            }
        }

        /// Requests handled since the readiness reset.
        fn request_count(&self) -> usize {
            self.request_count.load(Ordering::Relaxed)
        }

        /// Requests answered as 206 partial content since the reset.
        fn range_request_count(&self) -> usize {
            self.range_request_count.load(Ordering::Relaxed)
        }
    }
1109
    /// Probes the freshly spawned test server with HEAD requests until a full
    /// response header arrives, panicking after one second.
    fn wait_for_test_http_server(addr: &std::net::SocketAddr, path: &str) {
        let deadline = Instant::now() + Duration::from_secs(1);
        let request = format!("HEAD /{path} HTTP/1.1\r\nHost: {addr}\r\nConnection: close\r\n\r\n");
        while Instant::now() < deadline {
            if let Ok(mut stream) = TcpStream::connect(addr) {
                // Short socket timeouts keep a single stuck probe from
                // burning the whole retry budget.
                if stream
                    .set_read_timeout(Some(Duration::from_millis(100)))
                    .is_err()
                {
                    thread::sleep(Duration::from_millis(10));
                    continue;
                }
                if stream
                    .set_write_timeout(Some(Duration::from_millis(100)))
                    .is_err()
                {
                    thread::sleep(Duration::from_millis(10));
                    continue;
                }
                if stream.write_all(request.as_bytes()).is_err() {
                    thread::sleep(Duration::from_millis(10));
                    continue;
                }
                let mut response = Vec::new();
                let mut buf = [0u8; 256];
                loop {
                    match stream.read(&mut buf) {
                        Ok(0) => break,
                        Ok(n) => {
                            response.extend_from_slice(&buf[..n]);
                            // Header terminator seen: the server is serving.
                            if response.windows(4).any(|window| window == b"\r\n\r\n") {
                                return;
                            }
                        }
                        Err(err)
                            if err.kind() == io::ErrorKind::WouldBlock
                                || err.kind() == io::ErrorKind::TimedOut =>
                        {
                            break;
                        }
                        Err(_) => break,
                    }
                }
            }
            thread::sleep(Duration::from_millis(10));
        }
        panic!("test http server at {addr} did not respond in time");
    }
1158
1159    fn wait_for_test_condition(
1160        timeout: Duration,
1161        description: &str,
1162        mut predicate: impl FnMut() -> bool,
1163    ) {
1164        let deadline = Instant::now() + timeout;
1165        while Instant::now() < deadline {
1166            if predicate() {
1167                return;
1168            }
1169            thread::sleep(Duration::from_millis(10));
1170        }
1171        panic!("timed out waiting for {description}");
1172    }
1173
    impl Drop for TestHttpServer {
        /// Signals the accept loop to stop and joins the listener thread.
        fn drop(&mut self) {
            self.shutdown.store(true, Ordering::Relaxed);
            // Best-effort wake-up connection to the listener address (the
            // host:port part of the URL). The accept loop also polls with a
            // 10ms sleep, so a failure here only delays shutdown slightly.
            let _ = TcpStream::connect(
                self.url
                    .trim_start_matches("http://")
                    .split('/')
                    .next()
                    .unwrap_or_default(),
            );
            if let Some(thread) = self.thread.take() {
                let _ = thread.join();
            }
        }
    }
1189
    /// Serves a single HTTP exchange according to `config`.
    ///
    /// Parses just enough of the request (method plus an optional `Range`
    /// header) to emulate a static file server with optional range support,
    /// slow chunked writes, and a gate that stalls the first body response.
    fn handle_test_http_connection(
        mut stream: TcpStream,
        config: TestHttpConfig,
        request_count: Arc<AtomicUsize>,
        range_request_count: Arc<AtomicUsize>,
    ) -> io::Result<()> {
        // Read until the blank line that terminates the request headers.
        let mut request = Vec::new();
        let mut buf = [0u8; 4096];
        loop {
            let n = stream.read(&mut buf)?;
            if n == 0 {
                return Ok(());
            }
            request.extend_from_slice(&buf[..n]);
            if request.windows(4).any(|window| window == b"\r\n\r\n") {
                break;
            }
        }
        request_count.fetch_add(1, Ordering::Relaxed);
        let request_text = String::from_utf8_lossy(&request);
        let mut lines = request_text.lines();
        let request_line = lines.next().unwrap_or_default();
        let mut parts = request_line.split_whitespace();
        let method = parts.next().unwrap_or_default();
        // Case-insensitive scan for a `Range:` header value.
        let range_header = request_text.lines().find_map(|line| {
            let (name, value) = line.split_once(':')?;
            if name.eq_ignore_ascii_case("range") {
                Some(value.trim().to_string())
            } else {
                None
            }
        });

        let mut body = (*config.body).clone();
        let mut status_line = "HTTP/1.1 200 OK\r\n".to_string();
        let mut content_range = None;
        if let Some(range) = range_header {
            if config.accept_ranges {
                // Valid range: answer 206 with the requested slice. Note the
                // `Content-Range` total is recorded before slicing `body`.
                if let Some((start, end)) = parse_range(&range, body.len() as u64) {
                    range_request_count.fetch_add(1, Ordering::Relaxed);
                    status_line = "HTTP/1.1 206 Partial Content\r\n".to_string();
                    content_range = Some(format!("bytes {start}-{end}/{}", body.len()));
                    body = body[start as usize..=end as usize].to_vec();
                }
            }
        }

        let mut headers = String::new();
        headers.push_str("Connection: close\r\n");
        headers.push_str("Content-Type: application/octet-stream\r\n");
        if config.accept_ranges {
            headers.push_str("Accept-Ranges: bytes\r\n");
        }
        if config.send_content_length {
            headers.push_str(&format!("Content-Length: {}\r\n", body.len()));
        }
        if let Some(content_range) = content_range {
            headers.push_str(&format!("Content-Range: {content_range}\r\n"));
        }

        stream.write_all(status_line.as_bytes())?;
        stream.write_all(headers.as_bytes())?;
        stream.write_all(b"\r\n")?;

        // HEAD responses carry headers only.
        if method.eq_ignore_ascii_case("HEAD") {
            stream.flush()?;
            return Ok(());
        }

        // Atomically claim "first body request" so exactly one response
        // stalls on the release flag.
        let first_body_request = config
            .request_started
            .as_ref()
            .map(|request_started| {
                request_started
                    .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
                    .is_ok()
            })
            .unwrap_or(false);
        if first_body_request {
            if let Some(release_response) = &config.release_response {
                // Hold the body back until the test releases it.
                while !release_response.load(Ordering::Acquire) {
                    thread::sleep(Duration::from_millis(5));
                }
            }
        }

        // Write the body either at once or in delayed chunks to simulate a
        // slow server.
        if config.chunk_size == 0 {
            stream.write_all(&body)?;
        } else {
            for chunk in body.chunks(config.chunk_size) {
                stream.write_all(chunk)?;
                stream.flush()?;
                if !config.chunk_delay.is_zero() {
                    thread::sleep(config.chunk_delay);
                }
            }
        }
        stream.flush()?;
        Ok(())
    }
1290
1291    fn parse_range(header: &str, total_len: u64) -> Option<(u64, u64)> {
1292        let range = header.strip_prefix("bytes=")?;
1293        let (start, end) = range.split_once('-')?;
1294        let start = start.parse::<u64>().ok()?;
1295        let end = if end.is_empty() {
1296            total_len.checked_sub(1)?
1297        } else {
1298            end.parse::<u64>().ok()?
1299        };
1300        if start > end || end >= total_len {
1301            return None;
1302        }
1303        Some((start, end))
1304    }
1305
    /// Handle for a download daemon running on a background thread; signals
    /// shutdown and joins the thread on drop.
    struct TestDaemon {
        /// Endpoint (pipe name / socket path) clients connect to.
        endpoint: String,
        /// Notifier that asks the daemon's run loop to stop.
        shutdown: Arc<tokio::sync::Notify>,
        /// Daemon thread handle, joined in `Drop`.
        thread: Option<thread::JoinHandle<()>>,
    }
1311
    impl TestDaemon {
        /// Spawns a `DownloadDaemon` on its own single-threaded tokio runtime
        /// and blocks until the daemon answers a status probe.
        fn start() -> Self {
            let endpoint = unique_test_endpoint();
            let (ready_tx, ready_rx) = std::sync::mpsc::sync_channel(1);
            let endpoint_for_thread = endpoint.clone();
            let thread = thread::spawn(move || {
                let runtime = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap();
                runtime.block_on(async move {
                    let mut daemon = DownloadDaemon::bind(&endpoint_for_thread).unwrap();
                    // Hand the shutdown handle back once the bind succeeded.
                    ready_tx.send(daemon.shutdown_handle()).unwrap();
                    daemon.run().await.unwrap();
                });
            });
            let shutdown = ready_rx
                .recv_timeout(Duration::from_secs(5))
                .expect("download daemon failed to bind");
            // Poll until the daemon actually answers requests; a successful
            // bind alone does not mean the serve loop is running yet.
            let client = DownloadClient::new(Some(endpoint.clone()));
            let deadline = Instant::now() + Duration::from_secs(5);
            while Instant::now() < deadline {
                if client.daemon_status().is_ok() {
                    return Self {
                        endpoint,
                        shutdown,
                        thread: Some(thread),
                    };
                }
                thread::sleep(Duration::from_millis(50));
            }
            panic!("download daemon did not start in time");
        }
    }
1346
    impl Drop for TestDaemon {
        /// Requests daemon shutdown and waits for its thread to exit.
        fn drop(&mut self) {
            self.shutdown.notify_one();
            if let Some(thread) = self.thread.take() {
                let _ = thread.join();
            }
        }
    }
1355
1356    fn unique_test_endpoint() -> String {
1357        static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
1358        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
1359        #[cfg(windows)]
1360        {
1361            format!(
1362                r"\\.\pipe\zccache-download-test-{}-{id}",
1363                std::process::id()
1364            )
1365        }
1366        #[cfg(unix)]
1367        {
1368            std::env::temp_dir()
1369                .join(format!(
1370                    "zccache-download-test-{}-{id}.sock",
1371                    std::process::id()
1372                ))
1373                .display()
1374                .to_string()
1375        }
1376    }
1377
1378    fn sha256_hex(data: &[u8]) -> String {
1379        let mut hasher = Sha256::new();
1380        hasher.update(data);
1381        format!("{:x}", hasher.finalize())
1382    }
1383
1384    #[test]
1385    fn auto_detect_archive_formats() {
1386        assert_eq!(
1387            auto_archive_format(Path::new("toolchain.tar.gz")).unwrap(),
1388            ArchiveFormat::TarGz
1389        );
1390        assert_eq!(
1391            auto_archive_format(Path::new("toolchain.tar.xz")).unwrap(),
1392            ArchiveFormat::TarXz
1393        );
1394        assert_eq!(
1395            auto_archive_format(Path::new("toolchain.tar.zst")).unwrap(),
1396            ArchiveFormat::TarZst
1397        );
1398        assert_eq!(
1399            auto_archive_format(Path::new("toolchain.zip")).unwrap(),
1400            ArchiveFormat::Zip
1401        );
1402        assert_eq!(
1403            auto_archive_format(Path::new("toolchain.7z")).unwrap(),
1404            ArchiveFormat::SevenZip
1405        );
1406    }
1407
1408    #[test]
1409    fn safe_join_rejects_parent_traversal() {
1410        let err = safe_join(Path::new("out"), Path::new("../evil")).unwrap_err();
1411        assert!(err.contains("unsafe"));
1412    }
1413
1414    #[test]
1415    fn zip_extraction_rejects_path_traversal() {
1416        let dir = tempfile::tempdir().unwrap();
1417        let archive = dir.path().join("bad.zip");
1418        {
1419            let file = File::create(&archive).unwrap();
1420            let mut zip = zip::ZipWriter::new(file);
1421            let options = zip::write::SimpleFileOptions::default();
1422            zip.start_file("../evil.txt", options).unwrap();
1423            zip.write_all(b"bad").unwrap();
1424            zip.finish().unwrap();
1425        }
1426        let out = dir.path().join("extract");
1427        let err = extract_zip(&archive, &out).unwrap_err();
1428        assert!(err.contains("unsafe zip entry"));
1429    }
1430
1431    #[test]
1432    fn tar_gz_extracts_regular_files() {
1433        let dir = tempfile::tempdir().unwrap();
1434        let archive = dir.path().join("ok.tar.gz");
1435        {
1436            let file = File::create(&archive).unwrap();
1437            let encoder = GzEncoder::new(file, Compression::default());
1438            let mut builder = tar::Builder::new(encoder);
1439            let data = b"hello";
1440            let mut header = tar::Header::new_gnu();
1441            header.set_size(data.len() as u64);
1442            header.set_mode(0o644);
1443            header.set_cksum();
1444            builder
1445                .append_data(&mut header, "bin/tool.txt", &data[..])
1446                .unwrap();
1447            builder.finish().unwrap();
1448        }
1449        let out = dir.path().join("extract");
1450        let file = File::open(&archive).unwrap();
1451        let decoder = flate2::read::GzDecoder::new(file);
1452        extract_tar(decoder, &out).unwrap();
1453        assert_eq!(
1454            fs::read(out.join("bin").join("tool.txt")).unwrap(),
1455            b"hello"
1456        );
1457    }
1458
    // End-to-end: the first fetch downloads through the daemon; a repeat
    // fetch and an `exists` query are then served from the local cache
    // without contacting the HTTP server again.
    #[test]
    fn fetch_cache_miss_then_hit_and_exists_stay_local() {
        let daemon = TestDaemon::start();
        let body = b"artifact payload".to_vec();
        let server = TestHttpServer::start(TestHttpConfig {
            body: Arc::new(body.clone()),
            accept_ranges: false,
            send_content_length: true,
            chunk_size: 0,
            chunk_delay: Duration::ZERO,
            path: "artifact.bin".to_string(),
            request_started: None,
            release_response: None,
        });
        let client = DownloadClient::new(Some(daemon.endpoint.clone()));
        let dir = tempfile::tempdir().unwrap();
        let mut request = FetchRequest::new(server.url.clone(), dir.path().join("artifact.bin"));
        request.expected_sha256 = Some(sha256_hex(&body));

        // Cache miss: must hit the server at least once.
        let first = client.fetch(request.clone()).unwrap();
        assert_eq!(first.status, FetchStatus::Downloaded);
        assert_eq!(first.sha256, sha256_hex(&body));
        let requests_after_first = server.request_count();
        assert!(requests_after_first > 0);

        // Cache hit: the server request count must not move.
        let second = client.fetch(request.clone()).unwrap();
        assert_eq!(second.status, FetchStatus::AlreadyPresent);
        assert_eq!(server.request_count(), requests_after_first);

        // `exists` is a purely local query as well.
        let state = client.exists(&request).unwrap();
        assert_eq!(state.kind, FetchStateKind::ArtifactReady);
        assert_eq!(state.sha256.as_deref(), Some(first.sha256.as_str()));
        assert_eq!(server.request_count(), requests_after_first);
    }
1493
    // A checksum mismatch must surface as an error, remove the delivered
    // artifact, and leave the cache reporting Missing.
    #[test]
    fn fetch_checksum_mismatch_cleans_up_invalid_artifact() {
        let daemon = TestDaemon::start();
        let body = b"wrong checksum body".to_vec();
        let server = TestHttpServer::start(TestHttpConfig {
            body: Arc::new(body),
            accept_ranges: false,
            send_content_length: true,
            chunk_size: 0,
            chunk_delay: Duration::ZERO,
            path: "bad.bin".to_string(),
            request_started: None,
            release_response: None,
        });
        let client = DownloadClient::new(Some(daemon.endpoint.clone()));
        let dir = tempfile::tempdir().unwrap();
        let destination = dir.path().join("bad.bin");
        let mut request = FetchRequest::new(server.url.clone(), &destination);
        // 64 hex zeros — guaranteed not to match the served body.
        request.expected_sha256 = Some("00".repeat(32));

        let err = client.fetch(request.clone()).unwrap_err();
        assert!(err.contains("sha256 mismatch"));
        // The invalid download must not be left on disk.
        assert!(!destination.exists());

        let state = client.exists(&request).unwrap();
        assert_eq!(state.kind, FetchStateKind::Missing);
    }
1521
    // With max_connections > 1 against a range-capable server, the download
    // should be split into multiple HTTP Range requests.
    #[test]
    fn fetch_single_url_max_connections_uses_range_requests() {
        let daemon = TestDaemon::start();
        // 128 KiB of position-dependent bytes, so any reassembly error is
        // caught by the sha256 verification.
        let body: Vec<u8> = (0..128 * 1024).map(|i| (i % 251) as u8).collect();
        let server = TestHttpServer::start(TestHttpConfig {
            body: Arc::new(body.clone()),
            accept_ranges: true,
            send_content_length: true,
            chunk_size: 4096,
            chunk_delay: Duration::ZERO,
            path: "multipart.bin".to_string(),
            request_started: None,
            release_response: None,
        });
        let client = DownloadClient::new(Some(daemon.endpoint.clone()));
        let dir = tempfile::tempdir().unwrap();
        let mut request = FetchRequest::new(server.url.clone(), dir.path().join("multipart.bin"));
        request.download_options.max_connections = Some(4);
        request.download_options.min_segment_size = Some(1024);
        request.expected_sha256 = Some(sha256_hex(&body));

        let result = client.fetch(request).unwrap();
        assert_eq!(result.status, FetchStatus::Downloaded);
        assert_eq!(result.sha256, sha256_hex(&body));
        // At least two segments must have been fetched via Range requests.
        assert!(server.range_request_count() >= 2);
    }
1548
    // Three explicit part URLs must be downloaded and concatenated in order,
    // and subsequent fetch/exists calls must stay local (no further requests
    // to any of the part servers).
    #[test]
    fn fetch_explicit_multipart_urls_concatenates_and_stays_local() {
        let daemon = TestDaemon::start();
        let part_a = b"hello ".to_vec();
        let part_b = b"multipart ".to_vec();
        let part_c = b"world".to_vec();
        // Expected concatenation of the three parts, in order.
        let mut full = Vec::new();
        full.extend_from_slice(&part_a);
        full.extend_from_slice(&part_b);
        full.extend_from_slice(&part_c);

        // One server per part.
        let server_a = TestHttpServer::start(TestHttpConfig {
            body: Arc::new(part_a),
            accept_ranges: false,
            send_content_length: true,
            chunk_size: 0,
            chunk_delay: Duration::ZERO,
            path: "artifact.part-aa".to_string(),
            request_started: None,
            release_response: None,
        });
        let server_b = TestHttpServer::start(TestHttpConfig {
            body: Arc::new(part_b),
            accept_ranges: false,
            send_content_length: true,
            chunk_size: 0,
            chunk_delay: Duration::ZERO,
            path: "artifact.part-ab".to_string(),
            request_started: None,
            release_response: None,
        });
        let server_c = TestHttpServer::start(TestHttpConfig {
            body: Arc::new(part_c),
            accept_ranges: false,
            send_content_length: true,
            chunk_size: 0,
            chunk_delay: Duration::ZERO,
            path: "artifact.part-ac".to_string(),
            request_started: None,
            release_response: None,
        });

        let client = DownloadClient::new(Some(daemon.endpoint.clone()));
        let dir = tempfile::tempdir().unwrap();
        let destination = dir.path().join("artifact.bin");
        let mut request = FetchRequest::new(
            vec![
                server_a.url.clone(),
                server_b.url.clone(),
                server_c.url.clone(),
            ],
            &destination,
        );
        request.expected_sha256 = Some(sha256_hex(&full));

        // First fetch downloads and concatenates all parts.
        let first = client.fetch(request.clone()).unwrap();
        assert_eq!(first.status, FetchStatus::Downloaded);
        assert_eq!(first.sha256, sha256_hex(&full));
        assert_eq!(fs::read(&destination).unwrap(), full);
        // Snapshot per-server request counts after the initial download.
        let request_counts = (
            server_a.request_count(),
            server_b.request_count(),
            server_c.request_count(),
        );

        // Repeat fetch: cache hit, no server must see more traffic.
        let second = client.fetch(request.clone()).unwrap();
        assert_eq!(second.status, FetchStatus::AlreadyPresent);
        assert_eq!(
            (
                server_a.request_count(),
                server_b.request_count(),
                server_c.request_count()
            ),
            request_counts
        );

        // `exists` also resolves locally.
        let state = client.exists(&request).unwrap();
        assert_eq!(state.kind, FetchStateKind::ArtifactReady);
        assert_eq!(state.sha256.as_deref(), Some(first.sha256.as_str()));
    }
1629
    // While one client holds the download in flight (the server stalls the
    // first body response), a NoWait fetch for the same destination must
    // return Locked instead of blocking; releasing the gate lets the
    // original download complete.
    #[test]
    fn fetch_no_wait_returns_locked_while_other_client_is_downloading() {
        let daemon = TestDaemon::start();
        let request_started = Arc::new(AtomicBool::new(false));
        let release_response = Arc::new(AtomicBool::new(false));
        let body: Vec<u8> = (0..512 * 1024).map(|i| (i % 251) as u8).collect();
        let server = TestHttpServer::start(TestHttpConfig {
            body: Arc::new(body),
            accept_ranges: false,
            send_content_length: true,
            chunk_size: 4096,
            chunk_delay: Duration::from_millis(2),
            path: "slow.bin".to_string(),
            request_started: Some(Arc::clone(&request_started)),
            release_response: Some(Arc::clone(&release_response)),
        });
        let dest_dir = tempfile::tempdir().unwrap();
        let destination = dest_dir.path().join("slow.bin");

        // First client fetches on a background thread; its body response is
        // held back by the release gate.
        let endpoint = daemon.endpoint.clone();
        let url = server.url.clone();
        let destination_for_thread = destination.clone();
        let download_thread = thread::spawn(move || {
            let client = DownloadClient::new(Some(endpoint));
            let request = FetchRequest::new(url, &destination_for_thread);
            client.fetch(request)
        });

        // Wait until the server has actually received the body request.
        wait_for_test_condition(Duration::from_secs(5), "initial download request", || {
            request_started.load(Ordering::Acquire)
        });

        // Second client with NoWait must observe the in-flight lock.
        let client = DownloadClient::new(Some(daemon.endpoint.clone()));
        let mut no_wait = FetchRequest::new(server.url.clone(), &destination);
        no_wait.wait_mode = WaitMode::NoWait;
        let locked = client.fetch(no_wait).unwrap();
        assert_eq!(locked.status, FetchStatus::Locked);

        // Unblock the stalled response and let the first fetch finish.
        release_response.store(true, Ordering::Release);
        let completed = download_thread.join().unwrap().unwrap();
        assert_eq!(completed.status, FetchStatus::Downloaded);
    }
1672
    #[test]
    fn fetch_multipart_no_wait_returns_locked_while_other_client_is_downloading() {
        // Multipart variant of the NoWait/Locked scenario: the first part is
        // served slowly and held open until `release_response` is flipped, so
        // a competing NoWait fetch of the same multipart source must return
        // FetchStatus::Locked instead of blocking.
        let daemon = TestDaemon::start();
        let request_started = Arc::new(AtomicBool::new(false));
        let release_response = Arc::new(AtomicBool::new(false));
        // Part one: 512 KiB streamed in delayed 4 KiB chunks, gated on the
        // release flag so the first download stays in flight.
        let slow_server = TestHttpServer::start(TestHttpConfig {
            body: Arc::new((0..512 * 1024).map(|i| (i % 251) as u8).collect()),
            accept_ranges: false,
            send_content_length: true,
            chunk_size: 4096,
            chunk_delay: Duration::from_millis(2),
            path: "slow.part-aa".to_string(),
            request_started: Some(Arc::clone(&request_started)),
            release_response: Some(Arc::clone(&release_response)),
        });
        // Part two: tiny payload served immediately and ungated.
        let fast_server = TestHttpServer::start(TestHttpConfig {
            body: Arc::new(b"tail".to_vec()),
            accept_ranges: false,
            send_content_length: true,
            chunk_size: 0,
            chunk_delay: Duration::ZERO,
            path: "slow.part-ab".to_string(),
            request_started: None,
            release_response: None,
        });
        let dest_dir = tempfile::tempdir().unwrap();
        let destination = dest_dir.path().join("slow.bin");

        // First client: start the multipart download on a separate thread.
        let endpoint = daemon.endpoint.clone();
        let source = vec![slow_server.url.clone(), fast_server.url.clone()];
        let destination_for_thread = destination.clone();
        let download_thread = thread::spawn(move || {
            let client = DownloadClient::new(Some(endpoint));
            let request = FetchRequest::new(source, &destination_for_thread);
            client.fetch(request)
        });

        // Wait until the slow part is actually being served before racing it.
        wait_for_test_condition(
            Duration::from_secs(5),
            "initial multipart download request",
            || request_started.load(Ordering::Acquire),
        );

        // Second client: identical multipart request with NoWait — expects
        // Locked rather than blocking behind the in-flight download.
        let client = DownloadClient::new(Some(daemon.endpoint.clone()));
        let mut no_wait = FetchRequest::new(
            vec![slow_server.url.clone(), fast_server.url.clone()],
            &destination,
        );
        no_wait.wait_mode = WaitMode::NoWait;
        let locked = client.fetch(no_wait).unwrap();
        assert_eq!(locked.status, FetchStatus::Locked);

        // Release the slow part and confirm the original download completes.
        release_response.store(true, Ordering::Release);
        let completed = download_thread.join().unwrap().unwrap();
        assert_eq!(completed.status, FetchStatus::Downloaded);
    }
1729
1730    #[test]
1731    fn fetch_dry_run_avoids_network_and_filesystem_mutation() {
1732        let daemon = TestDaemon::start();
1733        let server = TestHttpServer::start(TestHttpConfig {
1734            body: Arc::new(b"dry-run".to_vec()),
1735            accept_ranges: false,
1736            send_content_length: true,
1737            chunk_size: 0,
1738            chunk_delay: Duration::ZERO,
1739            path: "dry.bin".to_string(),
1740            request_started: None,
1741            release_response: None,
1742        });
1743        let client = DownloadClient::new(Some(daemon.endpoint.clone()));
1744        let dir = tempfile::tempdir().unwrap();
1745        let destination = dir.path().join("dry.bin");
1746        let mut request = FetchRequest::new(server.url.clone(), &destination);
1747        request.dry_run = true;
1748
1749        let result = client.fetch(request).unwrap();
1750        assert_eq!(result.status, FetchStatus::DryRun);
1751        assert_eq!(server.request_count(), 0);
1752        assert!(!destination.exists());
1753    }
1754
1755    #[test]
1756    fn fetch_expands_7z_and_exists_reports_expanded_ready() {
1757        let daemon = TestDaemon::start();
1758        let dir = tempfile::tempdir().unwrap();
1759        let source_dir = dir.path().join("source");
1760        fs::create_dir_all(source_dir.join("bin")).unwrap();
1761        fs::write(source_dir.join("bin").join("tool.txt"), b"tool data").unwrap();
1762        let archive_path = dir.path().join("toolchain.7z");
1763        sevenz_rust::compress_to_path(&source_dir, &archive_path).unwrap();
1764        let archive_bytes = fs::read(&archive_path).unwrap();
1765
1766        let server = TestHttpServer::start(TestHttpConfig {
1767            body: Arc::new(archive_bytes.clone()),
1768            accept_ranges: false,
1769            send_content_length: true,
1770            chunk_size: 0,
1771            chunk_delay: Duration::ZERO,
1772            path: "toolchain.7z".to_string(),
1773            request_started: None,
1774            release_response: None,
1775        });
1776        let client = DownloadClient::new(Some(daemon.endpoint.clone()));
1777        let cache_path = dir.path().join("cache").join("toolchain.7z");
1778        let expanded_path = dir.path().join("expanded");
1779        let mut request = FetchRequest::new(server.url.clone(), &cache_path);
1780        request.destination_path_expanded = Some(expanded_path.clone());
1781        request.expected_sha256 = Some(sha256_hex(&archive_bytes));
1782
1783        let first = client.fetch(request.clone()).unwrap();
1784        assert_eq!(first.status, FetchStatus::Expanded);
1785        assert_eq!(first.sha256, sha256_hex(&archive_bytes));
1786        let extracted = [
1787            expanded_path.join("source").join("bin").join("tool.txt"),
1788            expanded_path.join("bin").join("tool.txt"),
1789            expanded_path.join("tool.txt"),
1790        ]
1791        .into_iter()
1792        .find(|path| path.exists())
1793        .expect("expected extracted file in expanded directory");
1794        assert_eq!(fs::read(extracted).unwrap(), b"tool data");
1795
1796        let state = client.exists(&request).unwrap();
1797        assert_eq!(state.kind, FetchStateKind::ExpandedReady);
1798        assert_eq!(state.sha256.as_deref(), Some(first.sha256.as_str()));
1799
1800        let second = client.fetch(request).unwrap();
1801        assert_eq!(second.status, FetchStatus::AlreadyExpanded);
1802        assert_eq!(second.sha256, first.sha256);
1803    }
1804
1805    #[test]
1806    fn fetch_without_expected_sha_then_validate_later_uses_stored_fingerprint() {
1807        let daemon = TestDaemon::start();
1808        let body = b"artifact with delayed hash".to_vec();
1809        let server = TestHttpServer::start(TestHttpConfig {
1810            body: Arc::new(body.clone()),
1811            accept_ranges: false,
1812            send_content_length: true,
1813            chunk_size: 0,
1814            chunk_delay: Duration::ZERO,
1815            path: "delayed.bin".to_string(),
1816            request_started: None,
1817            release_response: None,
1818        });
1819        let client = DownloadClient::new(Some(daemon.endpoint.clone()));
1820        let dir = tempfile::tempdir().unwrap();
1821        let destination = dir.path().join("delayed.bin");
1822
1823        let first = client
1824            .fetch(FetchRequest::new(server.url.clone(), &destination))
1825            .unwrap();
1826        assert_eq!(first.status, FetchStatus::Downloaded);
1827        assert_eq!(first.sha256, sha256_hex(&body));
1828
1829        let mut later = FetchRequest::new(server.url.clone(), &destination);
1830        later.expected_sha256 = Some(first.sha256.clone());
1831        let second = client.fetch(later.clone()).unwrap();
1832        assert_eq!(second.status, FetchStatus::AlreadyPresent);
1833        assert_eq!(second.sha256, first.sha256);
1834
1835        let state = client.exists(&later).unwrap();
1836        assert_eq!(state.kind, FetchStateKind::ArtifactReady);
1837        assert_eq!(state.sha256.as_deref(), Some(second.sha256.as_str()));
1838    }
1839
1840    #[test]
1841    fn expanded_state_remains_valid_when_expected_sha_is_added_later() {
1842        let daemon = TestDaemon::start();
1843        let dir = tempfile::tempdir().unwrap();
1844        let archive_path = dir.path().join("bundle.zip");
1845        {
1846            let file = File::create(&archive_path).unwrap();
1847            let mut zip = zip::ZipWriter::new(file);
1848            let options = zip::write::SimpleFileOptions::default();
1849            zip.start_file("hello.txt", options).unwrap();
1850            zip.write_all(b"hello").unwrap();
1851            zip.finish().unwrap();
1852        }
1853        let archive_bytes = fs::read(&archive_path).unwrap();
1854        let server = TestHttpServer::start(TestHttpConfig {
1855            body: Arc::new(archive_bytes.clone()),
1856            accept_ranges: false,
1857            send_content_length: true,
1858            chunk_size: 0,
1859            chunk_delay: Duration::ZERO,
1860            path: "bundle.zip".to_string(),
1861            request_started: None,
1862            release_response: None,
1863        });
1864        let client = DownloadClient::new(Some(daemon.endpoint.clone()));
1865        let cache_path = dir.path().join("cache").join("bundle.zip");
1866        let expanded_path = dir.path().join("expanded");
1867
1868        let mut initial = FetchRequest::new(server.url.clone(), &cache_path);
1869        initial.destination_path_expanded = Some(expanded_path.clone());
1870        let first = client.fetch(initial).unwrap();
1871        assert_eq!(first.status, FetchStatus::Expanded);
1872
1873        let mut later = FetchRequest::new(server.url.clone(), &cache_path);
1874        later.destination_path_expanded = Some(expanded_path.clone());
1875        later.expected_sha256 = Some(first.sha256.clone());
1876        let second = client.fetch(later.clone()).unwrap();
1877        assert_eq!(second.status, FetchStatus::AlreadyExpanded);
1878        assert_eq!(second.sha256, first.sha256);
1879
1880        let state = client.exists(&later).unwrap();
1881        assert_eq!(state.kind, FetchStateKind::ExpandedReady);
1882        assert_eq!(state.sha256.as_deref(), Some(second.sha256.as_str()));
1883    }
1884
1885    #[test]
1886    fn force_is_rejected_for_existing_artifact_state() {
1887        let daemon = TestDaemon::start();
1888        let body = b"immutable".to_vec();
1889        let server = TestHttpServer::start(TestHttpConfig {
1890            body: Arc::new(body),
1891            accept_ranges: false,
1892            send_content_length: true,
1893            chunk_size: 0,
1894            chunk_delay: Duration::ZERO,
1895            path: "immutable.bin".to_string(),
1896            request_started: None,
1897            release_response: None,
1898        });
1899        let client = DownloadClient::new(Some(daemon.endpoint.clone()));
1900        let dir = tempfile::tempdir().unwrap();
1901        let destination = dir.path().join("immutable.bin");
1902
1903        let _ = client
1904            .fetch(FetchRequest::new(server.url.clone(), &destination))
1905            .unwrap();
1906
1907        let mut force = FetchRequest::new(server.url.clone(), &destination);
1908        force.force = true;
1909        let err = client.fetch(force).unwrap_err();
1910        assert!(err.contains("purge"));
1911    }
1912
1913    #[test]
1914    fn fetch_rejects_unsafe_zip_entries_end_to_end() {
1915        let daemon = TestDaemon::start();
1916        let dir = tempfile::tempdir().unwrap();
1917        let archive_path = dir.path().join("unsafe.zip");
1918        {
1919            let file = File::create(&archive_path).unwrap();
1920            let mut zip = zip::ZipWriter::new(file);
1921            let options = zip::write::SimpleFileOptions::default();
1922            zip.start_file("../evil.txt", options).unwrap();
1923            zip.write_all(b"bad").unwrap();
1924            zip.finish().unwrap();
1925        }
1926        let archive_bytes = fs::read(&archive_path).unwrap();
1927        let server = TestHttpServer::start(TestHttpConfig {
1928            body: Arc::new(archive_bytes),
1929            accept_ranges: false,
1930            send_content_length: true,
1931            chunk_size: 0,
1932            chunk_delay: Duration::ZERO,
1933            path: "unsafe.zip".to_string(),
1934            request_started: None,
1935            release_response: None,
1936        });
1937        let client = DownloadClient::new(Some(daemon.endpoint.clone()));
1938        let cache_path = dir.path().join("cache").join("unsafe.zip");
1939        let expanded_path = dir.path().join("expanded");
1940        let mut request = FetchRequest::new(server.url.clone(), &cache_path);
1941        request.destination_path_expanded = Some(expanded_path.clone());
1942
1943        let err = client.fetch(request).unwrap_err();
1944        assert!(err.contains("unsafe zip entry"));
1945        assert!(!dir.path().join("evil.txt").exists());
1946    }
1947}