Skip to main content

modde_sources/wabbajack/
installer.rs

1//! The Wabbajack install engine: drives the full pipeline of downloading
2//! archives, extracting and patching files, repacking BSAs, and writing the
3//! finished modlist into the install directory, with resumable apply state.
4
5use std::collections::HashMap;
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use std::time::{Instant, SystemTime, UNIX_EPOCH};
9
10use anyhow::{Context, Result, bail};
11use bytes::Bytes;
12use futures::stream::{self, StreamExt};
13use serde::{Deserialize, Serialize};
14use tokio::sync::{Semaphore, mpsc};
15use tokio::task::JoinHandle;
16use tracing::{info, warn};
17
18use modde_core::manifest::wabbajack::{
19    ArchiveInstallBatch, ArchiveState, DownloadDirective, InstallDirective, WabbajackManifest,
20};
21
22use crate::cache::{ByteCacheKey, ByteLruCache};
23use crate::decompress::{ArchiveBatchExtractor, ArchiveInput, ArchiveRequest, ArchiveRequestKind};
24use crate::traits::{AnySource, DownloadSource};
25
26use super::bsa_repack;
27use super::diagnostics::{
28    ArchiveBatchRecord, ProcessSnapshot, ProgressEvent, WabbajackDiagnostics,
29    cgroup_memory_pressure_high, current_process_snapshot,
30};
31use super::impact::{MissingArchiveImpact, MissingArchivePolicy};
32use super::inline::InlineSource;
33use super::patcher;
34use super::staging::StagingStore;
35
/// Default maximum number of concurrent downloads.
const DEFAULT_CONCURRENCY: usize = 4;
/// Version stamped into persisted apply-state records (e.g. the verified-archive
/// sidecar); a record whose stored version differs is treated as invalid.
const APPLY_STATE_VERSION: u32 = 1;
/// Fallback for `MODDE_ARCHIVE_MEMORY_MAX_BYTES` (256 MiB): the largest archive
/// size that may be read fully into memory instead of being used from disk.
const DEFAULT_ARCHIVE_MEMORY_MAX_BYTES: u64 = 256 * 1024 * 1024;
40
/// What to do with downloaded source archives once they have been applied:
/// keep them, prune the applied ones, or decide automatically.
///
/// Serialized with kebab-case variant names (`keep`, `prune-applied`, `auto`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum ArchiveRetentionPolicy {
    /// Keep every downloaded archive. This is the default.
    #[default]
    Keep,
    /// Prune archives that have been applied.
    PruneApplied,
    /// Decide automatically whether to keep or prune.
    Auto,
}
51
52impl ArchiveRetentionPolicy {
53    /// Read the policy from the `MODDE_ARCHIVE_RETENTION` environment variable,
54    /// defaulting to [`ArchiveRetentionPolicy::Keep`].
55    #[must_use]
56    pub fn from_env() -> Self {
57        match std::env::var("MODDE_ARCHIVE_RETENTION")
58            .ok()
59            .as_deref()
60            .map(str::to_ascii_lowercase)
61            .as_deref()
62        {
63            Some("prune-applied" | "prune" | "delete") => Self::PruneApplied,
64            Some("auto") => Self::Auto,
65            _ => Self::Keep,
66        }
67    }
68}
69
/// Serializable record describing one archive batch: the archive it came from
/// and the install directives it covered.
///
/// NOTE(review): presumably persisted in staging so a finished batch can be
/// recognised on a resumed run — the read/write sites are outside this view.
#[derive(Debug, Serialize, Deserialize)]
struct ArchiveBatchSentinel {
    // Version of the apply pipeline that produced the record.
    pipeline_version: u32,
    // Hash identifying the source archive.
    archive_hash: u64,
    // Size of the source archive in bytes.
    archive_size_bytes: u64,
    // Indices of the install directives served by this batch.
    directive_indices: Vec<usize>,
}
77
/// Serializable record describing one `CreateBSA` directive by its index,
/// `temp_id` and destination.
///
/// NOTE(review): presumably what `write_create_bsa_sentinel` stores and
/// `create_bsa_sentinel_valid` checks — both bodies are outside this view.
#[derive(Debug, Serialize, Deserialize)]
struct CreateBsaSentinel {
    // Version of the apply pipeline that produced the record.
    pipeline_version: u32,
    // Index of the `CreateBSA` directive among the install directives.
    directive_index: usize,
    // The directive's `temp_id`.
    temp_id: String,
    // The directive's destination (`to`).
    to: String,
}
85
/// JSON sidecar stored next to a store archive recording that its hash has
/// been accepted, so later runs can skip re-hashing the file.
///
/// A sidecar is honoured only while `pipeline_version`, `archive_hash`,
/// `size_bytes` and `modified_unix_ms` all still match the archive on disk;
/// `verified_unix_ms` is informational and is not part of that check.
#[derive(Debug, Serialize, Deserialize)]
struct VerifiedArchiveSidecar {
    // `APPLY_STATE_VERSION` at the time the sidecar was written.
    pipeline_version: u32,
    // Hash the archive was accepted under.
    archive_hash: u64,
    // Archive file length when the sidecar was written.
    size_bytes: u64,
    // Archive modification time (Unix milliseconds) when the sidecar was written.
    modified_unix_ms: u128,
    // Wall-clock time (Unix milliseconds) at which the sidecar was written.
    verified_unix_ms: u128,
}
94
/// A store archive that has passed the trust check, in the form it should be
/// consumed: either as a path on disk or as its full contents held in memory.
#[derive(Debug, Clone)]
enum TrustedArchive {
    /// The archive is used directly from this path on disk.
    Path(PathBuf),
    /// The whole archive has been read into memory.
    Bytes {
        // Display label of the form `<hash as 16 hex digits>.archive`.
        label: String,
        // Complete file contents.
        bytes: Bytes,
        // On-disk location the bytes were read from.
        fallback_path: PathBuf,
    },
}
104
/// Bookkeeping about how a single archive was made trusted and how it will be
/// read.
#[derive(Debug, Default, Clone, Copy)]
struct ArchiveTrustStats {
    // A valid verified-sidecar was found, so the archive was not re-hashed.
    sidecar_hit: bool,
    // Length of the archive when it had to be re-hashed; 0 otherwise.
    streamed_hash_bytes: u64,
    // The archive was loaded fully into memory.
    memory_archive_hit: bool,
    // The archive is consumed from its path on disk.
    disk_fallback: bool,
}
112
/// Counts returned by `adopt_existing_staging`, reported to callers through
/// `InstallProgress::StagingAdopted` when either count is non-zero.
#[derive(Debug, Default, Clone, Copy)]
struct StagingAdoptionSummary {
    // Number of archive batches adopted from existing staging output.
    archive_batches: usize,
    // Number of `CreateBSA` outputs adopted from existing staging output.
    create_bsa: usize,
}
118
/// Owns the join handle of the diagnostics heartbeat task; the `Drop` impl
/// aborts that task so it cannot outlive the install call that spawned it.
struct DiagnosticsHeartbeatGuard(Option<JoinHandle<()>>);
120
121impl Drop for DiagnosticsHeartbeatGuard {
122    fn drop(&mut self) {
123        if let Some(handle) = self.0.take() {
124            handle.abort();
125        }
126    }
127}
128
/// Progress update sent during installation.
#[derive(Debug, Clone)]
pub enum InstallProgress {
    /// Preflight finished; `total_downloads` is the number of download
    /// directives in the manifest.
    Starting {
        total_downloads: usize,
    },
    /// A download of archive `name` is in progress. `total` is the source's
    /// size hint, or 0 when the source did not provide one.
    Downloading {
        name: String,
        bytes: u64,
        total: u64,
    },
    /// The download of archive `name` finished.
    DownloadComplete {
        name: String,
    },
    /// An archive already in the store is being re-hashed; `name` is the
    /// archive hash formatted as 16 hex digits.
    Verifying {
        name: String,
    },
    /// Install directive `directive_index` (out of `total` install directives)
    /// is about to be applied.
    Applying {
        directive_index: usize,
        total: usize,
    },
    /// A patch is being applied.
    /// NOTE(review): emitted outside this view — confirm what `name` holds.
    Patching {
        name: String,
    },
    /// A `CreateBSA` directive is running; `name` is its destination.
    CreatingBSA {
        name: String,
    },
    /// Launcher configuration finished with the given report.
    /// NOTE(review): emitted outside this view.
    LauncherConfigured {
        report: modde_games::launcher::LauncherConfigurationReport,
    },
    /// An `InlineFile` directive is being written; `name` is its destination.
    InlineFile {
        name: String,
    },
    /// Existing staging output was adopted instead of being rebuilt.
    StagingAdopted {
        archive_batches: usize,
        create_bsa: usize,
    },
    /// The install pipeline finished successfully.
    Complete,
    /// The install failed with the given error text.
    /// NOTE(review): emitted outside this view.
    Failed {
        error: String,
    },
}
171
/// Orchestrate a full Wabbajack install pipeline.
pub struct WabbajackInstaller {
    /// The parsed modlist manifest being installed.
    manifest: WabbajackManifest,
    /// Maps each archive hash to its index into `manifest.archives` (keep-first
    /// semantics, matching the legacy `.iter().find()` lookups).
    archives_by_hash: std::collections::HashMap<u64, usize>,
    /// Path to the `.wabbajack` zip file (needed for `InlineFile` and `PatchedFromArchive` data).
    wabbajack_path: PathBuf,
    /// Directory holding downloaded archives; archive paths are derived from
    /// it and the archive hash.
    store_dir: PathBuf,
    /// Root of the staging store that install output is written into.
    staging_dir: PathBuf,
    /// Game install directory, if one was set via `set_game_dir`.
    game_dir: Option<PathBuf>,
    /// Registered download sources; the first one that can handle a directive
    /// is used to fetch a missing archive.
    sources: Arc<Vec<AnySource>>,
    /// Concurrency limit (at least 1); also passed to staging compression.
    concurrency: usize,
    /// When true, per-archive download and per-directive apply failures are logged
    /// rather than fatal. Used for very large modlists where a handful of archives
    /// require manual intervention (deleted Nexus mods, rate-limited cloud hosts)
    /// but the rest of the modlist should still install.
    continue_on_error: bool,
    /// Shared byte cache; its usage is reported to the diagnostics heartbeat.
    byte_cache: Arc<ByteLruCache>,
    /// Optional diagnostics sink for phase, progress and abort signalling.
    diagnostics: Option<WabbajackDiagnostics>,
    /// Retention policy for downloaded archives.
    archive_retention: ArchiveRetentionPolicy,
    /// How directives that depend on missing archives are handled.
    missing_archive_policy: MissingArchivePolicy,
    /// Archives no larger than this many bytes may be loaded fully into memory.
    archive_memory_max_bytes: u64,
}
196
197impl WabbajackInstaller {
198    /// Create an installer for `manifest`, reading archive data from
199    /// `wabbajack_path` and using `store_dir` and `staging_dir` as the download
200    /// store and working staging area.
201    #[must_use]
202    pub fn new(
203        manifest: WabbajackManifest,
204        wabbajack_path: PathBuf,
205        store_dir: PathBuf,
206        staging_dir: PathBuf,
207    ) -> Self {
208        let mut archives_by_hash = std::collections::HashMap::new();
209        for (idx, archive) in manifest.archives.iter().enumerate() {
210            archives_by_hash.entry(archive.hash).or_insert(idx);
211        }
212        Self {
213            manifest,
214            archives_by_hash,
215            wabbajack_path,
216            store_dir,
217            staging_dir,
218            game_dir: None,
219            sources: Arc::new(Vec::new()),
220            concurrency: DEFAULT_CONCURRENCY,
221            continue_on_error: false,
222            byte_cache: Arc::new(ByteLruCache::from_env()),
223            diagnostics: None,
224            archive_retention: ArchiveRetentionPolicy::from_env(),
225            missing_archive_policy: MissingArchivePolicy::Fail,
226            archive_memory_max_bytes: std::env::var("MODDE_ARCHIVE_MEMORY_MAX_BYTES")
227                .ok()
228                .and_then(|v| v.parse::<u64>().ok())
229                .unwrap_or(DEFAULT_ARCHIVE_MEMORY_MAX_BYTES),
230        }
231    }
232
    /// When set, log per-archive failures rather than aborting the install.
    ///
    /// With `value == true`, a failing directive in the apply passes or a
    /// failing `CreateBSA` is logged as a warning and skipped. Defaults to
    /// `false`.
    pub fn set_continue_on_error(&mut self, value: bool) {
        self.continue_on_error = value;
    }
237
    /// Attach a diagnostics sink for heartbeat and progress reporting.
    ///
    /// Replaces any previously attached sink. Without one, no heartbeat task is
    /// spawned and no phases or progress events are recorded.
    pub fn set_diagnostics(&mut self, diagnostics: WabbajackDiagnostics) {
        self.diagnostics = Some(diagnostics);
    }
242
    /// Set the retention policy applied to downloaded archives after install.
    ///
    /// Overrides the value the constructor read from `MODDE_ARCHIVE_RETENTION`.
    pub fn set_archive_retention(&mut self, policy: ArchiveRetentionPolicy) {
        self.archive_retention = policy;
    }
247
    /// Set how missing source archives are handled during install.
    ///
    /// The policy decides which directives the install skips when archives are
    /// missing from the store. Defaults to `MissingArchivePolicy::Fail`.
    pub fn set_missing_archive_policy(&mut self, policy: MissingArchivePolicy) {
        self.missing_archive_policy = policy;
    }
252
    /// Set the game install directory used by Wabbajack game-file sources.
    ///
    /// No game directory is configured until this is called.
    pub fn set_game_dir(&mut self, game_dir: PathBuf) {
        self.game_dir = Some(game_dir);
    }
257
258    /// Register a download source implementation.
259    pub fn add_source(&mut self, source: AnySource) {
260        Arc::get_mut(&mut self.sources)
261            .expect("add_source must be called before install")
262            .push(source);
263    }
264
265    /// Set the maximum number of concurrent downloads.
266    pub fn set_concurrency(&mut self, concurrency: usize) {
267        self.concurrency = concurrency.max(1);
268    }
269
270    /// Run the full install pipeline, sending progress updates via channel.
271    pub async fn install(&self, progress_tx: mpsc::UnboundedSender<InstallProgress>) -> Result<()> {
272        let staging_store = StagingStore::new(&self.staging_dir);
273        staging_store.prepare_resumable().await?;
274        let _diagnostics_heartbeat = self.diagnostics.as_ref().map(|diagnostics| {
275            let cache = Arc::clone(&self.byte_cache);
276            DiagnosticsHeartbeatGuard(Some(
277                diagnostics.spawn_heartbeat(Arc::new(move || cache.bytes_used())),
278            ))
279        });
280        if let Some(diagnostics) = &self.diagnostics {
281            diagnostics.set_phase("preflight");
282            diagnostics.record_progress(ProgressEvent::Other);
283        }
284
285        let downloads = self.manifest.download_directives();
286        let installs = self.manifest.install_directives();
287
288        self.validate_game_file_sources()?;
289        self.verify_game_file_sources().await?;
290        self.validate_download_sources(&downloads)?;
291        self.preflight_authored_files().await?;
292        self.check_diagnostics_abort()?;
293
294        progress_tx
295            .send(InstallProgress::Starting {
296                total_downloads: downloads.len(),
297            })
298            .ok();
299
300        // Apply install directives.
301        //
302        // FromArchive / InlineFile / PatchedFromArchive directives each touch a
303        // unique destination path inside the staging tree, so they can run in
304        // parallel. CreateBSA is held back to a second pass because it depends
305        // on the staging directory being fully populated for its `temp_id`
306        // bucket.
307        let total = installs.len();
308        let progress_for_parallel = progress_tx.clone();
309        // Apply concurrency.
310        //
311        // Bsdiff patches and large archive extractions can each peak at
312        // hundreds of MB of resident memory; multiplied by a fixed worker
313        // count this can swamp swap on machines that are otherwise healthy.
314        //
315        // We use a host-RAM-aware admission gate (`memory-admission` crate) so
316        // workers self-throttle when the system runs hot, and a generous
317        // structural cap on `buffer_unordered` so the gate has waiters to
318        // release once memory frees up. `MODDE_APPLY_MAX_IN_FLIGHT` overrides
319        // the cap for benchmarking; `MODDE_APPLY_RAM_FRACTION` tunes the
320        // throttle threshold (default 0.80).
321        let max_in_flight = std::env::var("MODDE_APPLY_MAX_IN_FLIGHT")
322            .ok()
323            .and_then(|v| v.parse::<usize>().ok())
324            .filter(|&n| n > 0)
325            .unwrap_or_else(|| {
326                std::thread::available_parallelism()
327                    .map(std::num::NonZeroUsize::get)
328                    .unwrap_or(8)
329                    .clamp(1, 4)
330            });
331        // RAM fraction defaults are aggressive: the recommended deployment is
332        // inside a cgroup (`systemd-run --user --scope -p MemoryHigh=…`) so
333        // the kernel is the hard ceiling. The application-level gate just
334        // prevents pathological in-process spikes.
335        let max_ram_fraction = std::env::var("MODDE_APPLY_RAM_FRACTION")
336            .ok()
337            .and_then(|v| v.parse::<f64>().ok())
338            .filter(|&v| v > 0.0 && v <= 1.0)
339            .unwrap_or(0.85);
340        let safety_reserve_bytes = std::env::var("MODDE_APPLY_SAFETY_RESERVE_GIB")
341            .ok()
342            .and_then(|v| v.parse::<u64>().ok())
343            .map_or(2 * (1_u64 << 30), |g| g * (1_u64 << 30));
344        // Page-cache throttle is an extra defence against the I/O-induced
345        // thrash failure mode (huge readahead folios + slow reclaim). Default
346        // is permissive since fadvise(DONTNEED) on archives already prevents
347        // most readahead build-up; tighten it only if the safety reserve
348        // alone is not enough.
349        // Default page-cache check is OFF (1.0) — when modde runs inside a
350        // cgroup MemoryHigh, the kernel handles page-cache pressure without
351        // application-level help, and the in-application throttle just
352        // misfires when the cache is hot from prior activity. Operators
353        // running outside a cgroup can opt back in by setting
354        // `MODDE_APPLY_PAGE_CACHE_FRACTION=0.6` or similar.
355        let page_cache_fraction = std::env::var("MODDE_APPLY_PAGE_CACHE_FRACTION")
356            .ok()
357            .and_then(|v| v.parse::<f64>().ok())
358            .filter(|&v: &f64| v > 0.0 && v <= 1.0)
359            .unwrap_or(1.0);
360        let weighted_config = memory_admission::weighted::WeightedConfig {
361            base: memory_admission::Config {
362                max_ram_fraction,
363                ..memory_admission::Config::default()
364            }
365            .validate()
366            .expect("hard-coded default memory-admission config is valid"),
367            safety_reserve_bytes,
368            // Anything beyond this size runs alone — typical Wabbajack lists
369            // top out around 8-12 GiB single archives.
370            max_single_weight_bytes: 4 * (1_u64 << 30),
371            max_page_cache_fraction: page_cache_fraction,
372            ..memory_admission::weighted::WeightedConfig::default()
373        };
374        // Prefer the cgroup-v2 provider when modde runs inside a memory-bounded
375        // scope (e.g. `systemd-run --user --scope -p MemoryHigh=…`). The host
376        // `MemAvailable` view is the wrong signal in that case — the cgroup
377        // gets throttled long before host memory runs out, and a host-level
378        // gate happily admits work that the kernel then forces into swap.
379        let provider: memory_admission::provider::SharedMemoryProvider = {
380            #[cfg(target_os = "linux")]
381            {
382                memory_admission::providers::CgroupV2Provider::shared()
383                    .unwrap_or_else(|| memory_admission::providers::ProcMeminfoProvider::shared())
384            }
385            #[cfg(not(target_os = "linux"))]
386            {
387                memory_admission::providers::ProcMeminfoProvider::shared()
388            }
389        };
390        let gate =
391            memory_admission::weighted::AsyncWeightedAdmissionGate::new(weighted_config, provider);
392
393        // Build a hash → size table from the manifest so per-directive weight
394        // estimation is a constant-time lookup.
395        let archive_size_by_hash: HashMap<u64, u64> = self
396            .manifest
397            .archives
398            .iter()
399            .map(|a| (a.hash, a.size))
400            .collect();
401        let archive_size_by_hash = Arc::new(archive_size_by_hash);
402
403        info!(
404            max_in_flight,
405            max_ram_fraction,
406            safety_reserve_gib = safety_reserve_bytes / (1 << 30),
407            page_cache_fraction,
408            byte_cache_used = self.byte_cache.bytes_used(),
409            "starting weighted parallel apply pass"
410        );
411        let inline_source = installs
412            .iter()
413            .any(|directive| {
414                matches!(
415                    directive,
416                    InstallDirective::InlineFile { .. }
417                        | InstallDirective::PatchedFromArchive { .. }
418                )
419            })
420            .then(|| InlineSource::open(&self.wabbajack_path).map(Arc::new))
421            .transpose()?;
422
423        // Phase 1 (per-archive batching): group FromArchive and
424        // PatchedFromArchive directives by archive_hash so each archive is
425        // touched at most once concurrently. Phase 2's native decoder can
426        // then turn each batch into a single archive reader session. InlineFile
427        // directives are independent of archives so they fan out separately.
428        let impact = MissingArchiveImpact::analyze(&self.manifest, &self.store_dir);
429        let skip_plan = impact.skip_plan(&self.manifest, self.missing_archive_policy);
430        if !skip_plan.is_empty() {
431            warn!(
432                policy = ?self.missing_archive_policy,
433                missing_archives = impact.missing_archives.len(),
434                skipped_directives = skip_plan.skipped_directives.len(),
435                skipped_mod_roots = skip_plan.skipped_mod_roots.len(),
436                "omitting Wabbajack outputs for missing optional archives"
437            );
438        }
439
440        let mut inline_directives: Vec<(usize, &InstallDirective)> = Vec::new();
441        for (i, directive) in installs.iter().enumerate() {
442            if matches!(directive, InstallDirective::InlineFile { .. })
443                && !skip_plan.should_skip_directive(i, directive)
444            {
445                inline_directives.push((i, directive));
446            }
447        }
448        let mut archive_batches = self.manifest.install_directives_grouped_by_archive();
449        if !skip_plan.is_empty() {
450            for batch in &mut archive_batches {
451                batch.directives.retain(|indexed| {
452                    !skip_plan.should_skip_directive(indexed.directive_index, &indexed.directive)
453                });
454            }
455            archive_batches.retain(|batch| !batch.directives.is_empty());
456        }
457        let archive_batch_count = archive_batches.len();
458        let patch_max_in_flight = std::env::var("MODDE_PATCH_MAX_IN_FLIGHT")
459            .ok()
460            .and_then(|v| v.parse::<usize>().ok())
461            .filter(|&n| n > 0)
462            .unwrap_or(1);
463        let patch_batch_gate = Arc::new(Semaphore::new(patch_max_in_flight));
464        // Read apply-weight tunables once; env does not change mid-run, so we
465        // avoid re-parsing it per directive/per batch in the fan-out below.
466        let directive_weights = DirectiveWeights::from_env();
467        let archive_batch_weights = ArchiveBatchWeights::from_env();
468        info!(
469            archive_batch_count,
470            inline_directive_count = inline_directives.len(),
471            patch_max_in_flight,
472            "grouped apply directives by archive"
473        );
474        let adoption = self
475            .adopt_existing_staging(&archive_batches, &installs)
476            .await?;
477        self.check_diagnostics_abort()?;
478        if adoption.archive_batches > 0 || adoption.create_bsa > 0 {
479            info!(
480                archive_batches = adoption.archive_batches,
481                create_bsa = adoption.create_bsa,
482                "adopted existing Wabbajack staging outputs"
483            );
484            progress_tx
485                .send(InstallProgress::StagingAdopted {
486                    archive_batches: adoption.archive_batches,
487                    create_bsa: adoption.create_bsa,
488                })
489                .ok();
490        }
491
492        // Inline-file pass — these have no archive contention so we can
493        // saturate the gate's RAM budget without backpressuring on archives.
494        if let Some(diagnostics) = &self.diagnostics {
495            diagnostics.set_phase("apply-inline");
496        }
497        let inline_results: Vec<(usize, Result<()>)> = stream::iter(inline_directives)
498            .map(|(i, directive)| {
499                let progress_tx = progress_for_parallel.clone();
500                let gate = gate.clone();
501                let sizes = Arc::clone(&archive_size_by_hash);
502                let inline_source = inline_source.clone();
503                async move {
504                    let weight = estimate_directive_weight(directive, &sizes, &directive_weights);
505                    let _permit = gate.acquire(weight).await;
506                    if let Err(e) = self.check_diagnostics_abort() {
507                        return (i, Err(e));
508                    }
509                    progress_tx
510                        .send(InstallProgress::Applying {
511                            directive_index: i,
512                            total,
513                        })
514                        .ok();
515                    let InstallDirective::InlineFile { source_data_id, to } = directive else {
516                        unreachable!("inline_directives only contains InlineFile");
517                    };
518                    progress_tx
519                        .send(InstallProgress::InlineFile { name: to.clone() })
520                        .ok();
521                    let result = match inline_source.as_deref() {
522                        Some(inline_source) => {
523                            self.apply_inline_file(inline_source, source_data_id, to)
524                                .await
525                        }
526                        None => Err(anyhow::anyhow!("inline source was not initialized")),
527                    };
528                    (i, result)
529                }
530            })
531            .buffer_unordered(max_in_flight)
532            .collect()
533            .await;
534
535        // Archive-batch pass — one task per archive_hash, each task processes
536        // its own directives sequentially. Concurrency is over batches, so
537        // the same archive is never decompressed twice in parallel.
538        if let Some(diagnostics) = &self.diagnostics {
539            diagnostics.set_phase("apply-archive-batches");
540        }
541        let archive_results: Vec<Vec<(usize, Result<()>)>> = stream::iter(archive_batches)
542            .map(|batch| {
543                let progress_tx = progress_for_parallel.clone();
544                let gate = gate.clone();
545                let inline_source = inline_source.clone();
546                let patch_batch_gate = Arc::clone(&patch_batch_gate);
547                async move {
548                    let _patch_permit = if archive_batch_has_patch(&batch) {
549                        Some(
550                            patch_batch_gate
551                                .acquire_owned()
552                                .await
553                                .expect("semaphore open"),
554                        )
555                    } else {
556                        None
557                    };
558                    // The batch's whole-archive memory cost is the archive
559                    // size itself (decompression working set, conservatively
560                    // half), regardless of how many directives it serves.
561                    let archive_weight =
562                        estimate_archive_batch_weight(&batch, &archive_batch_weights);
563                    let _permit = gate.acquire(archive_weight).await;
564                    if let Err(e) = self.check_diagnostics_abort() {
565                        return batch
566                            .directives
567                            .into_iter()
568                            .map(|directive| {
569                                (directive.directive_index, Err(anyhow::anyhow!("{e:#}")))
570                            })
571                            .collect();
572                    }
573
574                    for indexed in &batch.directives {
575                        let i = indexed.directive_index;
576                        progress_tx
577                            .send(InstallProgress::Applying {
578                                directive_index: i,
579                                total,
580                            })
581                            .ok();
582                    }
583                    self.apply_archive_batch(batch, inline_source.as_deref(), &progress_tx)
584                        .await
585                }
586            })
587            .buffer_unordered(max_in_flight)
588            .collect()
589            .await;
590
591        for (i, result) in inline_results
592            .into_iter()
593            .chain(archive_results.into_iter().flatten())
594        {
595            if let Err(e) = result {
596                if self.continue_on_error {
597                    warn!(directive_index = i, "skipping directive after error: {e:#}");
598                } else {
599                    return Err(e);
600                }
601            }
602        }
603
604        // Second pass: CreateBSA (depends on staged temp directories).
605        if let Some(diagnostics) = &self.diagnostics {
606            diagnostics.set_phase("create-bsa");
607        }
608        for (i, directive) in installs.iter().enumerate() {
609            self.check_diagnostics_abort()?;
610            let InstallDirective::CreateBSA {
611                temp_id,
612                to,
613                file_states,
614            } = directive
615            else {
616                continue;
617            };
618            if skip_plan.should_skip_create_bsa(i, temp_id, to) {
619                warn!(
620                    directive_index = i,
621                    temp_id,
622                    to,
623                    "skipping CreateBSA because an optional upstream archive is missing"
624                );
625                continue;
626            }
627            if self.create_bsa_sentinel_valid(i, temp_id, to).await {
628                continue;
629            }
630            progress_tx
631                .send(InstallProgress::Applying {
632                    directive_index: i,
633                    total,
634                })
635                .ok();
636            progress_tx
637                .send(InstallProgress::CreatingBSA { name: to.clone() })
638                .ok();
639            if let Err(e) = self.apply_create_bsa(temp_id, to, file_states).await {
640                if self.continue_on_error {
641                    warn!(directive_index = i, "skipping CreateBSA after error: {e:#}");
642                } else {
643                    return Err(e);
644                }
645            } else {
646                self.write_create_bsa_sentinel(i, temp_id, to).await?;
647                if let Some(diagnostics) = &self.diagnostics {
648                    diagnostics.record_progress(ProgressEvent::CreateBsaComplete);
649                }
650            }
651        }
652
653        if let Some(diagnostics) = &self.diagnostics {
654            diagnostics.set_phase("compress-staging");
655        }
656        self.check_diagnostics_abort()?;
657        let summary = staging_store
658            .compress_eligible_files(self.concurrency)
659            .await?;
660        info!(
661            compressed_files = summary.compressed_files,
662            skipped_files = summary.skipped_files,
663            original_bytes = summary.original_bytes,
664            compressed_bytes = summary.compressed_bytes,
665            "compressed Wabbajack staging files"
666        );
667
668        progress_tx.send(InstallProgress::Complete).ok();
669        if let Some(diagnostics) = &self.diagnostics {
670            diagnostics.set_phase("complete");
671            diagnostics.record_progress(ProgressEvent::Other);
672        }
673        info!("wabbajack installation complete");
674        Ok(())
675    }
676
677    async fn ensure_archive_trusted(
678        &self,
679        archive_hash: u64,
680        progress_tx: &mpsc::UnboundedSender<InstallProgress>,
681    ) -> Result<(TrustedArchive, ArchiveTrustStats)> {
682        let path = archive_path(&self.store_dir, &archive_hash);
683        let mut stats = ArchiveTrustStats::default();
684
685        if path.exists() {
686            if self.verified_sidecar_valid(&path, archive_hash).await {
687                stats.sidecar_hit = true;
688            } else {
689                let metadata = tokio::fs::metadata(&path).await?;
690                progress_tx
691                    .send(InstallProgress::Verifying {
692                        name: format!("{archive_hash:016x}"),
693                    })
694                    .ok();
695                modde_core::hash::verify_xxh64(&path, archive_hash)
696                    .await
697                    .with_context(|| {
698                        format!("hash verification failed for archive {archive_hash:016x}")
699                    })?;
700                stats.streamed_hash_bytes = metadata.len();
701                self.write_verified_sidecar(&path, archive_hash).await?;
702            }
703            return self
704                .trusted_archive_for_path(path, archive_hash, stats)
705                .await;
706        }
707
708        let Some(directive) = self
709            .archives_by_hash
710            .get(&archive_hash)
711            .and_then(|&i| self.manifest.archives[i].download_directive())
712        else {
713            bail!(
714                "archive {archive_hash:016x} is not present in store and has no download directive"
715            );
716        };
717
718        let name = directive.display_name().into_owned();
719        if matches!(directive, DownloadDirective::Manual { .. }) {
720            bail!("manual archive {name} ({archive_hash:016x}) is missing from the store");
721        }
722
723        let Some(source) = self
724            .sources
725            .iter()
726            .find(|source| source.can_handle(&directive))
727        else {
728            bail!("no download source registered for archive {name} ({archive_hash:016x})");
729        };
730
731        if let Some(parent) = path.parent() {
732            tokio::fs::create_dir_all(parent).await?;
733        }
734        let handle = source
735            .resolve(&directive)
736            .await
737            .with_context(|| format!("failed to resolve download: {name}"))?;
738        progress_tx
739            .send(InstallProgress::Downloading {
740                name: name.clone(),
741                bytes: 0,
742                total: handle.size_hint.unwrap_or(0),
743            })
744            .ok();
745        source
746            .download(handle, &path)
747            .await
748            .with_context(|| format!("failed to download: {name}"))?;
749        self.write_verified_sidecar(&path, archive_hash).await?;
750        progress_tx
751            .send(InstallProgress::DownloadComplete { name })
752            .ok();
753        self.trusted_archive_for_path(path, archive_hash, stats)
754            .await
755    }
756
757    async fn trusted_archive_for_path(
758        &self,
759        path: PathBuf,
760        archive_hash: u64,
761        mut stats: ArchiveTrustStats,
762    ) -> Result<(TrustedArchive, ArchiveTrustStats)> {
763        let metadata = tokio::fs::metadata(&path).await?;
764        if metadata.len() <= self.archive_memory_max_bytes
765            && archive_path_is_memory_supported(&path).await
766        {
767            let bytes = tokio::fs::read(&path).await?;
768            stats.memory_archive_hit = true;
769            return Ok((
770                TrustedArchive::Bytes {
771                    label: format!("{archive_hash:016x}.archive"),
772                    bytes: Bytes::from(bytes),
773                    fallback_path: path,
774                },
775                stats,
776            ));
777        }
778        stats.disk_fallback = true;
779        Ok((TrustedArchive::Path(path), stats))
780    }
781
782    async fn verified_sidecar_valid(&self, path: &Path, archive_hash: u64) -> bool {
783        let Ok(metadata) = tokio::fs::metadata(path).await else {
784            return false;
785        };
786        let Ok(data) = tokio::fs::read_to_string(verified_sidecar_path(path)).await else {
787            return false;
788        };
789        let Ok(sidecar) = serde_json::from_str::<VerifiedArchiveSidecar>(&data) else {
790            return false;
791        };
792        sidecar.pipeline_version == APPLY_STATE_VERSION
793            && sidecar.archive_hash == archive_hash
794            && sidecar.size_bytes == metadata.len()
795            && sidecar.modified_unix_ms == metadata_modified_unix_ms(&metadata)
796    }
797
798    async fn write_verified_sidecar(&self, path: &Path, archive_hash: u64) -> Result<()> {
799        let metadata = tokio::fs::metadata(path).await?;
800        let sidecar = VerifiedArchiveSidecar {
801            pipeline_version: APPLY_STATE_VERSION,
802            archive_hash,
803            size_bytes: metadata.len(),
804            modified_unix_ms: metadata_modified_unix_ms(&metadata),
805            verified_unix_ms: unix_ms(),
806        };
807        tokio::fs::write(
808            verified_sidecar_path(path),
809            serde_json::to_vec_pretty(&sidecar)?,
810        )
811        .await?;
812        Ok(())
813    }
814
815    async fn maybe_prune_archive(&self, batch: &ArchiveInstallBatch) -> Result<u64> {
816        if self.archive_retention == ArchiveRetentionPolicy::Keep {
817            return Ok(0);
818        }
819        if self.game_file_source_path(batch.archive_hash)?.is_some() {
820            return Ok(0);
821        }
822        let path = archive_path(&self.store_dir, &batch.archive_hash);
823        if !path.exists() {
824            return Ok(0);
825        }
826        if self.archive_retention == ArchiveRetentionPolicy::Auto
827            && (batch.archive_size_bytes > self.archive_memory_max_bytes
828                || batch.directives.len() > 1)
829        {
830            return Ok(0);
831        }
832        let size = tokio::fs::metadata(&path)
833            .await
834            .map(|m| m.len())
835            .unwrap_or(0);
836        tokio::fs::remove_file(&path).await?;
837        let _ = tokio::fs::remove_file(verified_sidecar_path(&path)).await;
838        Ok(size)
839    }
840
841    /// Extract a file from a downloaded archive and place it in the staging directory.
842    async fn apply_from_archive(
843        &self,
844        archive_hash: u64,
845        from: &str,
846        to: &str,
847        expected_size: u64,
848    ) -> Result<()> {
849        // Validate paths against traversal attacks
850        validate_archive_entry(from)?;
851        validate_archive_entry(to)?;
852
853        let output_path = self.staging_dir.join(normalize_path(to));
854
855        if let Some(parent) = output_path.parent() {
856            tokio::fs::create_dir_all(parent).await?;
857        }
858
859        if let Some(source) = self.game_file_source_path(archive_hash)? {
860            if game_file_source_is_whole_file(&source.path, &source.rel_path, from) {
861                modde_core::link::link_or_copy(&source.path, &output_path).await?;
862            } else {
863                let archive_path = source.path;
864                let from = from.to_string();
865                tokio::task::spawn_blocking(move || {
866                    ArchiveBatchExtractor::extract_selected(
867                        &archive_path,
868                        &[ArchiveRequest {
869                            directive_index: 0,
870                            from,
871                            inner_path: None,
872                            kind: ArchiveRequestKind::WriteFile {
873                                to: output_path,
874                                expected_size: (expected_size > 0).then_some(expected_size),
875                            },
876                        }],
877                    )
878                })
879                .await??;
880            }
881            info!(from = %from, to = %to, "extracted game-file source");
882            return Ok(());
883        }
884
885        // Extract the specific file from a downloaded archive or resolve a
886        // Wabbajack game-file source from the local game install.
887        let data = self
888            .read_archive_source(archive_hash, from)
889            .await
890            .with_context(|| {
891                format!("failed to extract '{from}' from archive {archive_hash:016x}")
892            })?;
893
894        tokio::fs::write(&output_path, &data).await?;
895
896        info!(from = %from, to = %to, "extracted file from archive");
897        Ok(())
898    }
899
900    async fn apply_archive_batch(
901        &self,
902        batch: ArchiveInstallBatch,
903        inline_source: Option<&InlineSource>,
904        progress_tx: &mpsc::UnboundedSender<InstallProgress>,
905    ) -> Vec<(usize, Result<()>)> {
906        if let Err(e) = self.check_diagnostics_abort() {
907            return batch
908                .directives
909                .into_iter()
910                .map(|directive| (directive.directive_index, Err(anyhow::anyhow!("{e:#}"))))
911                .collect();
912        }
913        if self.archive_batch_sentinel_valid(&batch).await {
914            return batch
915                .directives
916                .into_iter()
917                .map(|directive| (directive.directive_index, Ok(())))
918                .collect();
919        }
920
921        let mut native_requests = Vec::new();
922        let mut patch_directives = HashMap::new();
923        let mut results = Vec::with_capacity(batch.directives.len());
924        let batch_started = Instant::now();
925        let mut extraction_ms = 0;
926        let mut patch_ms = 0;
927        let mut trust_check_ms = 0;
928        let mut prune_ms = 0;
929        let mut pruned_bytes = 0;
930        let mut trust_stats = ArchiveTrustStats::default();
931        let mut extracted_patch_source_bytes = 0;
932        let byte_cache_used_before = self.byte_cache.bytes_used();
933        let process_before = current_process_snapshot();
934        let patch_count = batch
935            .directives
936            .iter()
937            .filter(|indexed| {
938                matches!(
939                    indexed.directive,
940                    InstallDirective::PatchedFromArchive { .. }
941                )
942            })
943            .count();
944        let _batch_guard = self.diagnostics.as_ref().map(|diagnostics| {
945            diagnostics.start_archive_batch(
946                batch.archive_hash,
947                batch.directives.len(),
948                patch_count,
949                batch.archive_size_bytes,
950            )
951        });
952        info!(
953            archive_hash = %format!("{:016x}", batch.archive_hash),
954            archive_size_bytes = batch.archive_size_bytes,
955            directives = batch.directives.len(),
956            patch_directives = patch_count,
957            byte_cache_used = self.byte_cache.bytes_used(),
958            "starting archive apply batch"
959        );
960
961        let trusted_archive = if self
962            .game_file_source_path(batch.archive_hash)
963            .ok()
964            .flatten()
965            .is_some()
966        {
967            None
968        } else {
969            let trust_started = Instant::now();
970            let trusted = self
971                .ensure_archive_trusted(batch.archive_hash, progress_tx)
972                .await;
973            trust_check_ms = trust_started.elapsed().as_millis();
974            match trusted {
975                Ok((archive, stats)) => {
976                    trust_stats = stats;
977                    Some(archive)
978                }
979                Err(e) => {
980                    let msg = format!("{e:#}");
981                    let results = batch
982                        .directives
983                        .iter()
984                        .map(|indexed| {
985                            (
986                                indexed.directive_index,
987                                Err(anyhow::anyhow!("archive trust check failed: {msg}")),
988                            )
989                        })
990                        .collect::<Vec<_>>();
991                    self.write_archive_batch_metrics(
992                        &batch,
993                        patch_count,
994                        batch_started,
995                        trust_check_ms,
996                        extraction_ms,
997                        patch_ms,
998                        prune_ms,
999                        extracted_patch_source_bytes,
1000                        trust_stats,
1001                        pruned_bytes,
1002                        byte_cache_used_before,
1003                        process_before,
1004                        &results,
1005                    )
1006                    .await;
1007                    return results;
1008                }
1009            }
1010        };
1011
1012        for indexed in &batch.directives {
1013            if let Err(e) = self.check_diagnostics_abort() {
1014                results.push((indexed.directive_index, Err(e)));
1015                continue;
1016            }
1017            match &indexed.directive {
1018                InstallDirective::FromArchive {
1019                    archive_hash,
1020                    from,
1021                    inner_path,
1022                    to,
1023                    size,
1024                } => match self.game_file_source_path(*archive_hash) {
1025                    Ok(Some(_)) => {
1026                        results.push((
1027                            indexed.directive_index,
1028                            self.apply_from_archive(*archive_hash, from, to, *size)
1029                                .await,
1030                        ));
1031                    }
1032                    Ok(None) => {
1033                        // Validate against path traversal / zip-slip before joining
1034                        // onto the staging dir (mirrors apply_from_archive). The batch
1035                        // extractor validates `from`/`inner_path` but never the `to`
1036                        // destination, so a malicious manifest `to` must be rejected here.
1037                        if let Err(e) =
1038                            validate_archive_entry(from).and_then(|()| validate_archive_entry(to))
1039                        {
1040                            results.push((indexed.directive_index, Err(e)));
1041                            continue;
1042                        }
1043                        let output_path = self.staging_dir.join(normalize_path(to));
1044                        native_requests.push(ArchiveRequest {
1045                            directive_index: indexed.directive_index,
1046                            from: from.clone(),
1047                            inner_path: inner_path.clone(),
1048                            kind: ArchiveRequestKind::WriteFile {
1049                                to: output_path,
1050                                expected_size: (*size > 0).then_some(*size),
1051                            },
1052                        });
1053                    }
1054                    Err(e) => results.push((indexed.directive_index, Err(e))),
1055                },
1056                InstallDirective::PatchedFromArchive {
1057                    archive_hash,
1058                    from,
1059                    inner_path,
1060                    to,
1061                    patch_id,
1062                    size,
1063                } => {
1064                    progress_tx
1065                        .send(InstallProgress::Patching { name: to.clone() })
1066                        .ok();
1067                    match self.game_file_source_path(*archive_hash) {
1068                        Ok(Some(_)) => {
1069                            let Some(inline_source) = inline_source else {
1070                                results.push((
1071                                    indexed.directive_index,
1072                                    Err(anyhow::anyhow!("inline source was not initialized")),
1073                                ));
1074                                continue;
1075                            };
1076                            results.push((
1077                                indexed.directive_index,
1078                                self.apply_patched_from_archive(
1079                                    inline_source,
1080                                    *archive_hash,
1081                                    from,
1082                                    to,
1083                                    patch_id,
1084                                    *size,
1085                                )
1086                                .await,
1087                            ));
1088                        }
1089                        Ok(None) => {
1090                            if let Some(bytes) = self.patch_source_from_cache(*archive_hash, from) {
1091                                let Some(inline_source) = inline_source else {
1092                                    results.push((
1093                                        indexed.directive_index,
1094                                        Err(anyhow::anyhow!("inline source was not initialized")),
1095                                    ));
1096                                    continue;
1097                                };
1098                                results.push((
1099                                    indexed.directive_index,
1100                                    self.write_patched_output(
1101                                        inline_source,
1102                                        bytes,
1103                                        to,
1104                                        patch_id,
1105                                        *size,
1106                                    )
1107                                    .await,
1108                                ));
1109                            } else {
1110                                patch_directives.insert(
1111                                    indexed.directive_index,
1112                                    (
1113                                        *archive_hash,
1114                                        from.clone(),
1115                                        inner_path.clone(),
1116                                        to.clone(),
1117                                        patch_id.clone(),
1118                                        *size,
1119                                    ),
1120                                );
1121                                native_requests.push(ArchiveRequest {
1122                                    directive_index: indexed.directive_index,
1123                                    from: from.clone(),
1124                                    inner_path: inner_path.clone(),
1125                                    kind: ArchiveRequestKind::Bytes,
1126                                });
1127                            }
1128                        }
1129                        Err(e) => results.push((indexed.directive_index, Err(e))),
1130                    }
1131                }
1132                _ => unreachable!("archive batch contains only archive-backed directives"),
1133            }
1134        }
1135
1136        if native_requests.is_empty() {
1137            if results.iter().all(|(_, result)| result.is_ok())
1138                && let Err(e) = self.write_archive_batch_sentinel(&batch).await
1139            {
1140                let msg = format!("{e:#}");
1141                return results
1142                    .into_iter()
1143                    .map(|(idx, result)| {
1144                        if result.is_ok() {
1145                            (
1146                                idx,
1147                                Err(anyhow::anyhow!(
1148                                    "failed to write archive batch sentinel: {msg}"
1149                                )),
1150                            )
1151                        } else {
1152                            (idx, result)
1153                        }
1154                    })
1155                    .collect();
1156            }
1157            if results.iter().all(|(_, result)| result.is_ok())
1158                && let Some(diagnostics) = &self.diagnostics
1159            {
1160                diagnostics.record_progress(ProgressEvent::ArchiveBatchComplete);
1161            }
1162            self.write_archive_batch_metrics(
1163                &batch,
1164                patch_count,
1165                batch_started,
1166                trust_check_ms,
1167                extraction_ms,
1168                patch_ms,
1169                prune_ms,
1170                extracted_patch_source_bytes,
1171                trust_stats,
1172                pruned_bytes,
1173                byte_cache_used_before,
1174                process_before,
1175                &results,
1176            )
1177            .await;
1178            return results;
1179        }
1180
1181        let mut write_requests = Vec::new();
1182        let mut patch_requests = Vec::new();
1183        for request in native_requests {
1184            if patch_directives.contains_key(&request.directive_index) {
1185                patch_requests.push(request);
1186            } else {
1187                write_requests.push(request);
1188            }
1189        }
1190
1191        if !write_requests.is_empty() {
1192            let extraction_started = Instant::now();
1193            let native_result =
1194                extract_trusted_archive_requests(trusted_archive.clone(), write_requests.clone())
1195                    .await;
1196            extraction_ms += extraction_started.elapsed().as_millis();
1197            match native_result {
1198                Ok(_) => {
1199                    results.extend(
1200                        write_requests
1201                            .iter()
1202                            .map(|request| (request.directive_index, Ok(()))),
1203                    );
1204                }
1205                Err(e) => {
1206                    let msg = format!("{e:#}");
1207                    results.extend(write_requests.iter().map(|request| {
1208                        (
1209                            request.directive_index,
1210                            Err(anyhow::anyhow!("archive batch extraction failed: {msg}")),
1211                        )
1212                    }));
1213                }
1214            }
1215        }
1216
1217        let mut nested_patch_groups: HashMap<String, Vec<ArchiveRequest>> = HashMap::new();
1218        let mut plain_patch_requests = Vec::new();
1219        for request in patch_requests {
1220            if request.inner_path.is_some() {
1221                nested_patch_groups
1222                    .entry(normalize_path(&request.from).to_lowercase())
1223                    .or_default()
1224                    .push(request);
1225            } else {
1226                plain_patch_requests.push(request);
1227            }
1228        }
1229
1230        let patch_chunk_size = archive_patch_chunk_size();
1231        for chunk in plain_patch_requests.chunks(patch_chunk_size) {
1232            let chunk_requests = chunk.to_vec();
1233            let extraction_started = Instant::now();
1234            let native_result =
1235                extract_trusted_archive_requests(trusted_archive.clone(), chunk_requests.clone())
1236                    .await;
1237            extraction_ms += extraction_started.elapsed().as_millis();
1238            let mut output = match native_result {
1239                Ok(output) => output,
1240                Err(e) => {
1241                    let msg = format!("{e:#}");
1242                    results.extend(chunk_requests.iter().map(|request| {
1243                        (
1244                            request.directive_index,
1245                            Err(anyhow::anyhow!("archive batch extraction failed: {msg}")),
1246                        )
1247                    }));
1248                    continue;
1249                }
1250            };
1251
1252            for request in chunk_requests {
1253                if let Err(e) = self.check_diagnostics_abort() {
1254                    results.push((request.directive_index, Err(e)));
1255                    continue;
1256                }
1257                if let Some((archive_hash, from, inner_path, to, patch_id, size)) =
1258                    patch_directives.remove(&request.directive_index)
1259                {
1260                    let Some(inline_source) = inline_source else {
1261                        results.push((
1262                            request.directive_index,
1263                            Err(anyhow::anyhow!("inline source was not initialized")),
1264                        ));
1265                        continue;
1266                    };
1267                    let Some(bytes) = output.bytes.remove(&request.directive_index) else {
1268                        results.push((
1269                            request.directive_index,
1270                            Err(anyhow::anyhow!(
1271                                "patch source '{}' missing from extractor output",
1272                                inner_path.as_deref().unwrap_or(&from)
1273                            )),
1274                        ));
1275                        continue;
1276                    };
1277                    extracted_patch_source_bytes += bytes.len() as u64;
1278                    let bytes = self.cache_patch_source(archive_hash, &from, Bytes::from(bytes));
1279                    let patch_started = Instant::now();
1280                    results.push((
1281                        request.directive_index,
1282                        self.write_patched_output(inline_source, bytes, &to, &patch_id, size)
1283                            .await,
1284                    ));
1285                    patch_ms += patch_started.elapsed().as_millis();
1286                } else {
1287                    results.push((request.directive_index, Ok(())));
1288                }
1289            }
1290        }
1291
1292        for group_requests in nested_patch_groups.into_values() {
1293            let Some(first_request) = group_requests.first() else {
1294                continue;
1295            };
1296            let outer_directive_index = first_request.directive_index;
1297            let outer_from = first_request.from.clone();
1298            let outer_request = ArchiveRequest {
1299                directive_index: outer_directive_index,
1300                from: outer_from.clone(),
1301                inner_path: None,
1302                kind: ArchiveRequestKind::Bytes,
1303            };
1304            let extraction_started = Instant::now();
1305            let outer_result =
1306                extract_trusted_archive_requests(trusted_archive.clone(), vec![outer_request])
1307                    .await;
1308            extraction_ms += extraction_started.elapsed().as_millis();
1309            let outer_bytes = match outer_result {
1310                Ok(mut output) => {
1311                    if let Some(bytes) = output.bytes.remove(&outer_directive_index) {
1312                        Bytes::from(bytes)
1313                    } else {
1314                        let msg = format!("nested archive '{outer_from}' missing from output");
1315                        results.extend(group_requests.iter().map(|request| {
1316                            (request.directive_index, Err(anyhow::anyhow!(msg.clone())))
1317                        }));
1318                        continue;
1319                    }
1320                }
1321                Err(e) => {
1322                    let msg = format!("{e:#}");
1323                    results.extend(group_requests.iter().map(|request| {
1324                        (
1325                            request.directive_index,
1326                            Err(anyhow::anyhow!(
1327                                "nested archive extraction failed for '{outer_from}': {msg}"
1328                            )),
1329                        )
1330                    }));
1331                    continue;
1332                }
1333            };
1334
1335            let inner_requests = group_requests
1336                .iter()
1337                .filter_map(|request| {
1338                    Some(ArchiveRequest {
1339                        directive_index: request.directive_index,
1340                        from: request.inner_path.clone()?,
1341                        inner_path: None,
1342                        kind: ArchiveRequestKind::Bytes,
1343                    })
1344                })
1345                .collect::<Vec<_>>();
1346
1347            let nested_temp =
1348                if outer_bytes.starts_with(b"BSA\0") || outer_bytes.starts_with(b"BTDX") {
1349                    let temp_result = (|| -> Result<tempfile::NamedTempFile> {
1350                        let mut temp = tempfile::NamedTempFile::new().with_context(|| {
1351                            format!("failed to create temp nested archive for {outer_from}")
1352                        })?;
1353                        std::io::Write::write_all(&mut temp, &outer_bytes).with_context(|| {
1354                            format!("failed to write temp nested archive for {outer_from}")
1355                        })?;
1356                        std::io::Write::flush(&mut temp).with_context(|| {
1357                            format!("failed to flush temp nested archive for {outer_from}")
1358                        })?;
1359                        Ok(temp)
1360                    })();
1361                    let Ok(temp) = temp_result else {
1362                        let msg = format!("{:#}", temp_result.unwrap_err());
1363                        results.extend(group_requests.iter().map(|request| {
1364                            (
1365                                request.directive_index,
1366                                Err(anyhow::anyhow!(
1367                                    "failed to stage nested archive '{outer_from}': {msg}"
1368                                )),
1369                            )
1370                        }));
1371                        continue;
1372                    };
1373                    Some(temp)
1374                } else {
1375                    None
1376                };
1377
1378            for chunk in inner_requests.chunks(patch_chunk_size) {
1379                let chunk_requests = chunk.to_vec();
1380                let extraction_started = Instant::now();
1381                let native_result = if let Some(temp) = &nested_temp {
1382                    extract_archive_path_requests(temp.path().to_path_buf(), chunk_requests.clone())
1383                        .await
1384                } else {
1385                    extract_nested_archive_requests(
1386                        outer_from.clone(),
1387                        outer_bytes.clone(),
1388                        chunk_requests.clone(),
1389                    )
1390                    .await
1391                };
1392                extraction_ms += extraction_started.elapsed().as_millis();
1393                let mut output = match native_result {
1394                    Ok(output) => output,
1395                    Err(e) => {
1396                        let msg = format!("{e:#}");
1397                        results.extend(chunk_requests.iter().map(|request| {
1398                            (
1399                                request.directive_index,
1400                                Err(anyhow::anyhow!(
1401                                    "nested archive batch extraction failed: {msg}"
1402                                )),
1403                            )
1404                        }));
1405                        continue;
1406                    }
1407                };
1408
1409                for request in chunk_requests {
1410                    if let Err(e) = self.check_diagnostics_abort() {
1411                        results.push((request.directive_index, Err(e)));
1412                        continue;
1413                    }
1414                    if let Some((archive_hash, from, inner_path, to, patch_id, size)) =
1415                        patch_directives.remove(&request.directive_index)
1416                    {
1417                        let Some(inline_source) = inline_source else {
1418                            results.push((
1419                                request.directive_index,
1420                                Err(anyhow::anyhow!("inline source was not initialized")),
1421                            ));
1422                            continue;
1423                        };
1424                        let Some(bytes) = output.bytes.remove(&request.directive_index) else {
1425                            results.push((
1426                                request.directive_index,
1427                                Err(anyhow::anyhow!(
1428                                    "patch source '{}' missing from nested extractor output",
1429                                    inner_path.as_deref().unwrap_or(&from)
1430                                )),
1431                            ));
1432                            continue;
1433                        };
1434                        extracted_patch_source_bytes += bytes.len() as u64;
1435                        let cache_key = inner_path.as_deref().unwrap_or(&from);
1436                        let bytes =
1437                            self.cache_patch_source(archive_hash, cache_key, Bytes::from(bytes));
1438                        let patch_started = Instant::now();
1439                        results.push((
1440                            request.directive_index,
1441                            self.write_patched_output(inline_source, bytes, &to, &patch_id, size)
1442                                .await,
1443                        ));
1444                        patch_ms += patch_started.elapsed().as_millis();
1445                    }
1446                }
1447            }
1448        }
1449
1450        if results.iter().all(|(_, result)| result.is_ok())
1451            && let Err(e) = self.write_archive_batch_sentinel(&batch).await
1452        {
1453            let msg = format!("{e:#}");
1454            return results
1455                .into_iter()
1456                .map(|(idx, result)| {
1457                    if result.is_ok() {
1458                        (
1459                            idx,
1460                            Err(anyhow::anyhow!(
1461                                "failed to write archive batch sentinel: {msg}"
1462                            )),
1463                        )
1464                    } else {
1465                        (idx, result)
1466                    }
1467                })
1468                .collect();
1469        }
1470        if results.iter().all(|(_, result)| result.is_ok())
1471            && let Some(diagnostics) = &self.diagnostics
1472        {
1473            diagnostics.record_progress(ProgressEvent::ArchiveBatchComplete);
1474        }
1475        if results.iter().all(|(_, result)| result.is_ok()) {
1476            let prune_started = Instant::now();
1477            match self.maybe_prune_archive(&batch).await {
1478                Ok(bytes) => pruned_bytes = bytes,
1479                Err(e) => warn!(
1480                    archive_hash = %format!("{:016x}", batch.archive_hash),
1481                    "failed to prune applied archive: {e:#}"
1482                ),
1483            }
1484            prune_ms = prune_started.elapsed().as_millis();
1485        }
1486        self.write_archive_batch_metrics(
1487            &batch,
1488            patch_count,
1489            batch_started,
1490            trust_check_ms,
1491            extraction_ms,
1492            patch_ms,
1493            prune_ms,
1494            extracted_patch_source_bytes,
1495            trust_stats,
1496            pruned_bytes,
1497            byte_cache_used_before,
1498            process_before,
1499            &results,
1500        )
1501        .await;
1502
1503        results
1504    }
1505
1506    async fn write_archive_batch_metrics(
1507        &self,
1508        batch: &ArchiveInstallBatch,
1509        patch_count: usize,
1510        batch_started: Instant,
1511        trust_check_ms: u128,
1512        extraction_ms: u128,
1513        patch_ms: u128,
1514        prune_ms: u128,
1515        extracted_patch_source_bytes: u64,
1516        trust_stats: ArchiveTrustStats,
1517        pruned_bytes: u64,
1518        byte_cache_used_before: u64,
1519        process_before: ProcessSnapshot,
1520        results: &[(usize, Result<()>)],
1521    ) {
1522        let Some(diagnostics) = &self.diagnostics else {
1523            return;
1524        };
1525        let process_after = current_process_snapshot();
1526        let record = ArchiveBatchRecord {
1527            kind: "archive_batch".to_string(),
1528            unix_ms: std::time::SystemTime::now()
1529                .duration_since(std::time::UNIX_EPOCH)
1530                .unwrap_or_default()
1531                .as_millis(),
1532            archive_hash: format!("{:016x}", batch.archive_hash),
1533            archive_size_bytes: batch.archive_size_bytes,
1534            directive_count: batch.directives.len(),
1535            patch_count,
1536            elapsed_ms: batch_started.elapsed().as_millis(),
1537            trust_check_ms,
1538            extraction_ms,
1539            patch_ms,
1540            prune_ms,
1541            extracted_patch_source_bytes,
1542            sidecar_hit: trust_stats.sidecar_hit,
1543            streamed_hash_bytes: trust_stats.streamed_hash_bytes,
1544            memory_archive_hit: trust_stats.memory_archive_hit,
1545            disk_archive_fallback: trust_stats.disk_fallback,
1546            pruned_bytes,
1547            byte_cache_used_before,
1548            byte_cache_used_after: self.byte_cache.bytes_used(),
1549            success_count: results.iter().filter(|(_, result)| result.is_ok()).count(),
1550            error_count: results.iter().filter(|(_, result)| result.is_err()).count(),
1551            first_error: results
1552                .iter()
1553                .find_map(|(_, result)| result.as_ref().err().map(|error| format!("{error:#}"))),
1554            rss_before_kib: process_before.vm_rss_kib,
1555            rss_after_kib: process_after.vm_rss_kib,
1556            swap_before_kib: process_before.vm_swap_kib,
1557            swap_after_kib: process_after.vm_swap_kib,
1558        };
1559        if let Err(e) = diagnostics.record_archive_batch(&record).await {
1560            warn!("failed to write Wabbajack archive batch diagnostics: {e:#}");
1561        }
1562    }
1563
1564    async fn adopt_existing_staging(
1565        &self,
1566        archive_batches: &[ArchiveInstallBatch],
1567        installs: &[InstallDirective],
1568    ) -> Result<StagingAdoptionSummary> {
1569        let mut summary = StagingAdoptionSummary::default();
1570
1571        for batch in archive_batches {
1572            if self.archive_batch_sentinel_valid(batch).await {
1573                continue;
1574            }
1575            if self.archive_batch_outputs_exist(batch).await {
1576                self.write_archive_batch_sentinel(batch).await?;
1577                summary.archive_batches += 1;
1578            }
1579        }
1580
1581        for (directive_index, directive) in installs.iter().enumerate() {
1582            let InstallDirective::CreateBSA { temp_id, to, .. } = directive else {
1583                continue;
1584            };
1585            if self
1586                .create_bsa_sentinel_valid(directive_index, temp_id, to)
1587                .await
1588            {
1589                continue;
1590            }
1591            if StagingStore::new(&self.staging_dir)
1592                .logical_exists(to)
1593                .await
1594            {
1595                self.write_create_bsa_sentinel(directive_index, temp_id, to)
1596                    .await?;
1597                summary.create_bsa += 1;
1598            }
1599        }
1600
1601        Ok(summary)
1602    }
1603
1604    async fn archive_batch_sentinel_valid(&self, batch: &ArchiveInstallBatch) -> bool {
1605        let path = self.archive_batch_sentinel_path(batch.archive_hash);
1606        let Ok(data) = tokio::fs::read_to_string(&path).await else {
1607            return false;
1608        };
1609        let Ok(sentinel) = serde_json::from_str::<ArchiveBatchSentinel>(&data) else {
1610            return false;
1611        };
1612        let expected_indices = batch
1613            .directives
1614            .iter()
1615            .map(|d| d.directive_index)
1616            .collect::<Vec<_>>();
1617        sentinel.pipeline_version == APPLY_STATE_VERSION
1618            && sentinel.archive_hash == batch.archive_hash
1619            && sentinel.archive_size_bytes == batch.archive_size_bytes
1620            && sentinel.directive_indices == expected_indices
1621            && self.archive_batch_outputs_exist(batch).await
1622    }
1623
1624    async fn archive_batch_outputs_exist(&self, batch: &ArchiveInstallBatch) -> bool {
1625        let staging_store = StagingStore::new(&self.staging_dir);
1626        for indexed in &batch.directives {
1627            let to = match &indexed.directive {
1628                InstallDirective::FromArchive { to, .. }
1629                | InstallDirective::PatchedFromArchive { to, .. } => to,
1630                _ => continue,
1631            };
1632            if !staging_store.logical_exists(to).await {
1633                return false;
1634            }
1635        }
1636        true
1637    }
1638
1639    async fn write_archive_batch_sentinel(&self, batch: &ArchiveInstallBatch) -> Result<()> {
1640        let path = self.archive_batch_sentinel_path(batch.archive_hash);
1641        if let Some(parent) = path.parent() {
1642            tokio::fs::create_dir_all(parent).await?;
1643        }
1644        let sentinel = ArchiveBatchSentinel {
1645            pipeline_version: APPLY_STATE_VERSION,
1646            archive_hash: batch.archive_hash,
1647            archive_size_bytes: batch.archive_size_bytes,
1648            directive_indices: batch.directives.iter().map(|d| d.directive_index).collect(),
1649        };
1650        tokio::fs::write(&path, serde_json::to_vec_pretty(&sentinel)?).await?;
1651        Ok(())
1652    }
1653
1654    fn archive_batch_sentinel_path(&self, archive_hash: u64) -> PathBuf {
1655        self.staging_dir
1656            .join("_state/archive-batches")
1657            .join(format!("{archive_hash:016x}.json"))
1658    }
1659
1660    async fn create_bsa_sentinel_valid(
1661        &self,
1662        directive_index: usize,
1663        temp_id: &str,
1664        to: &str,
1665    ) -> bool {
1666        let path = self.create_bsa_sentinel_path(directive_index);
1667        let Ok(data) = tokio::fs::read_to_string(&path).await else {
1668            return false;
1669        };
1670        let Ok(sentinel) = serde_json::from_str::<CreateBsaSentinel>(&data) else {
1671            return false;
1672        };
1673        sentinel.pipeline_version == APPLY_STATE_VERSION
1674            && sentinel.directive_index == directive_index
1675            && sentinel.temp_id == temp_id
1676            && sentinel.to == to
1677            && StagingStore::new(&self.staging_dir)
1678                .logical_exists(to)
1679                .await
1680    }
1681
1682    async fn write_create_bsa_sentinel(
1683        &self,
1684        directive_index: usize,
1685        temp_id: &str,
1686        to: &str,
1687    ) -> Result<()> {
1688        let path = self.create_bsa_sentinel_path(directive_index);
1689        if let Some(parent) = path.parent() {
1690            tokio::fs::create_dir_all(parent).await?;
1691        }
1692        let sentinel = CreateBsaSentinel {
1693            pipeline_version: APPLY_STATE_VERSION,
1694            directive_index,
1695            temp_id: temp_id.to_string(),
1696            to: to.to_string(),
1697        };
1698        tokio::fs::write(&path, serde_json::to_vec_pretty(&sentinel)?).await?;
1699        Ok(())
1700    }
1701
1702    fn create_bsa_sentinel_path(&self, directive_index: usize) -> PathBuf {
1703        self.staging_dir
1704            .join("_state/create-bsa")
1705            .join(format!("{directive_index}.json"))
1706    }
1707
1708    /// Extract inline file data from the `.wabbajack` zip and write to staging.
1709    async fn apply_inline_file(
1710        &self,
1711        inline_source: &InlineSource,
1712        source_data_id: &str,
1713        to: &str,
1714    ) -> Result<()> {
1715        let staging_store = StagingStore::new(&self.staging_dir);
1716
1717        // Validate the output path against traversal attacks
1718        validate_archive_entry(to)?;
1719
1720        let data = inline_source.read(source_data_id)?;
1721        let output_path = staging_store.write_path_for_logical(to, Some(data.len() as u64));
1722
1723        if let Some(parent) = output_path.parent() {
1724            tokio::fs::create_dir_all(parent).await?;
1725        }
1726
1727        write_bytes_maybe_zstd(&output_path, &data).await?;
1728
1729        info!(source_data_id = %source_data_id, to = %to, "wrote inline file");
1730        Ok(())
1731    }
1732
1733    /// Extract a file from archive, apply a binary delta patch, and write to staging.
1734    async fn apply_patched_from_archive(
1735        &self,
1736        inline_source: &InlineSource,
1737        archive_hash: u64,
1738        from: &str,
1739        to: &str,
1740        patch_id: &str,
1741        expected_size: u64,
1742    ) -> Result<()> {
1743        // Extract source file from a downloaded archive or local game-file source.
1744        let source_data = self
1745            .read_archive_source_cached(archive_hash, from)
1746            .await
1747            .with_context(|| {
1748                format!("failed to extract '{from}' from archive {archive_hash:016x} for patching")
1749            })?;
1750
1751        self.write_patched_output(inline_source, source_data, to, patch_id, expected_size)
1752            .await
1753    }
1754
1755    async fn write_patched_output(
1756        &self,
1757        inline_source: &InlineSource,
1758        source_data: Bytes,
1759        to: &str,
1760        patch_id: &str,
1761        expected_size: u64,
1762    ) -> Result<()> {
1763        // Validate the output path against traversal attacks
1764        validate_archive_entry(to)?;
1765
1766        // Patched outputs can expand far beyond the source size. Keep the hot
1767        // patch path plain and let the compatibility compression sweep handle
1768        // eligible files later with bounded worker controls.
1769        let output_path = self.staging_dir.join(normalize_path(to));
1770        let part_path = output_path.with_extension(format!(
1771            "{}modde-part",
1772            output_path
1773                .extension()
1774                .and_then(|extension| extension.to_str())
1775                .map(|extension| format!("{extension}."))
1776                .unwrap_or_default()
1777        ));
1778        let stale_compressed = super::staging::compressed_path(&output_path);
1779
1780        if let Some(parent) = output_path.parent() {
1781            tokio::fs::create_dir_all(parent).await?;
1782        }
1783        if tokio::fs::metadata(&stale_compressed).await.is_ok() {
1784            tokio::fs::remove_file(&stale_compressed).await?;
1785        }
1786        if tokio::fs::metadata(&part_path).await.is_ok() {
1787            tokio::fs::remove_file(&part_path).await?;
1788        }
1789
1790        let patch_data = inline_source.read(patch_id)?;
1791        let _patch_guard = self.diagnostics.as_ref().map(|diagnostics| {
1792            diagnostics.start_patch(
1793                to.to_string(),
1794                patch_id.to_string(),
1795                source_data.len() as u64,
1796                expected_size,
1797            )
1798        });
1799
1800        let to_owned = to.to_string();
1801        let output_path_for_task = output_path.clone();
1802        let part_path_for_task = part_path.clone();
1803        let patch_result = tokio::task::spawn_blocking(move || {
1804            let mut output = std::fs::File::create(&part_path_for_task).with_context(|| {
1805                format!(
1806                    "failed to create patched output: {}",
1807                    part_path_for_task.display()
1808                )
1809            })?;
1810            let expected = (expected_size > 0).then_some(expected_size);
1811            let output_bytes = patcher::apply_patch_to_writer_limited(
1812                &source_data,
1813                &patch_data,
1814                &mut output,
1815                expected,
1816            )
1817            .with_context(|| format!("failed to apply patch for: {to_owned}"))?;
1818            output
1819                .sync_all()
1820                .with_context(|| format!("failed to sync patched output: {to_owned}"))?;
1821            drop(output);
1822            std::fs::rename(&part_path_for_task, &output_path_for_task).with_context(|| {
1823                format!(
1824                    "failed to move patched output into place: {}",
1825                    output_path_for_task.display()
1826                )
1827            })?;
1828            Ok::<u64, anyhow::Error>(output_bytes)
1829        })
1830        .await?;
1831        let output_bytes = match patch_result {
1832            Ok(output_bytes) => output_bytes,
1833            Err(e) => {
1834                let _ = tokio::fs::remove_file(&part_path).await;
1835                return Err(e);
1836            }
1837        };
1838        trim_process_allocator();
1839
1840        info!(
1841            to = %to,
1842            patch_id = %patch_id,
1843            expected_size,
1844            output_bytes,
1845            "applied binary patch"
1846        );
1847        Ok(())
1848    }
1849
1850    /// Create a BSA/BA2 archive from file states.
1851    async fn apply_create_bsa(
1852        &self,
1853        temp_id: &str,
1854        to: &str,
1855        file_states: &[modde_core::manifest::wabbajack::BSAFileState],
1856    ) -> Result<()> {
1857        // BSA source files are expected to be in a temp subdirectory of staging
1858        let bsa_staging = self.staging_dir.join(format!("bsa_temp_{temp_id}"));
1859        let output_path = self.staging_dir.join(normalize_path(to));
1860
1861        if let Some(parent) = output_path.parent() {
1862            tokio::fs::create_dir_all(parent).await?;
1863        }
1864
1865        bsa_repack::create_bsa(file_states, &bsa_staging, &output_path)
1866            .await
1867            .with_context(|| format!("failed to create BSA: {to}"))?;
1868
1869        info!(to = %to, files = file_states.len(), "created BSA archive");
1870        Ok(())
1871    }
1872
1873    fn validate_game_file_sources(&self) -> Result<()> {
1874        if !self.manifest.archives.iter().any(is_game_file_archive) {
1875            return Ok(());
1876        }
1877
1878        let Some(game_dir) = &self.game_dir else {
1879            bail!(
1880                "wabbajack modlist references local game files; pass --game-dir so modde can read the installed game files"
1881            );
1882        };
1883
1884        if !game_dir.is_dir() {
1885            bail!(
1886                "game directory for Wabbajack game-file sources does not exist: {}",
1887                game_dir.display()
1888            );
1889        }
1890
1891        Ok(())
1892    }
1893
1894    fn validate_download_sources(&self, downloads: &[DownloadDirective]) -> Result<()> {
1895        let missing: Vec<String> = downloads
1896            .iter()
1897            .filter(|directive| {
1898                !self
1899                    .sources
1900                    .iter()
1901                    .any(|source| source.can_handle(directive))
1902            })
1903            .map(|directive| directive.display_name().into_owned())
1904            .collect();
1905
1906        if !missing.is_empty() {
1907            let shown = missing
1908                .iter()
1909                .take(20)
1910                .map(|name| format!("  - {name}"))
1911                .collect::<Vec<_>>()
1912                .join("\n");
1913            let omitted = missing.len().saturating_sub(20);
1914            let suffix = if omitted == 0 {
1915                String::new()
1916            } else {
1917                format!("\n  ... and {omitted} more")
1918            };
1919            bail!(
1920                "Wabbajack manifest requires {} download(s) with no registered source:\n{}{}\nConfigure the missing source before running the install.",
1921                missing.len(),
1922                shown,
1923                suffix
1924            );
1925        }
1926
1927        Ok(())
1928    }
1929
1930    async fn preflight_authored_files(&self) -> Result<()> {
1931        for source in self.sources.iter() {
1932            if let AnySource::WabbajackCdn(source) = source {
1933                return source.preflight_archives(&self.manifest.archives).await;
1934            }
1935        }
1936        Ok(())
1937    }
1938
1939    async fn verify_game_file_sources(&self) -> Result<()> {
1940        let mut failures = Vec::new();
1941
1942        for archive in self
1943            .manifest
1944            .archives
1945            .iter()
1946            .filter(|a| is_game_file_archive(a))
1947        {
1948            let Some(source) = self.game_file_source_path(archive.hash)? else {
1949                continue;
1950            };
1951
1952            if !source.path.exists() {
1953                failures.push(format!(
1954                    "game-file source '{}' is missing at {}",
1955                    source.rel_path,
1956                    source.path.display()
1957                ));
1958                continue;
1959            }
1960
1961            let actual = modde_core::hash::hash_file_xxh64(&source.path)
1962                .await
1963                .with_context(|| {
1964                    format!("failed to hash game-file source '{}'", source.rel_path)
1965                })?;
1966            if actual != archive.hash {
1967                failures.push(format!(
1968                    "game-file source '{}' failed hash verification (expected xxh64 {:016x}, got {:016x})",
1969                    source.rel_path, archive.hash, actual
1970                ));
1971            }
1972        }
1973
1974        if !failures.is_empty() {
1975            bail!(
1976                "Wabbajack game-file source validation failed for {} file(s):\n{}",
1977                failures.len(),
1978                failures
1979                    .iter()
1980                    .map(|failure| format!("  - {failure}"))
1981                    .collect::<Vec<_>>()
1982                    .join("\n")
1983            );
1984        }
1985
1986        Ok(())
1987    }
1988
1989    async fn read_archive_source(&self, archive_hash: u64, from: &str) -> Result<Vec<u8>> {
1990        if let Some(source) = self.game_file_source_path(archive_hash)? {
1991            return read_game_file_source(&source.path, from).await;
1992        }
1993
1994        let archive_path = archive_path(&self.store_dir, &archive_hash);
1995        let from = from.to_string();
1996        tokio::task::spawn_blocking(move || {
1997            let output = ArchiveBatchExtractor::extract_selected(
1998                &archive_path,
1999                &[ArchiveRequest {
2000                    directive_index: 0,
2001                    from,
2002                    inner_path: None,
2003                    kind: ArchiveRequestKind::Bytes,
2004                }],
2005            )?;
2006            output
2007                .bytes
2008                .get(&0)
2009                .cloned()
2010                .ok_or_else(|| anyhow::anyhow!("archive extractor returned no bytes"))
2011        })
2012        .await?
2013    }
2014
2015    async fn read_archive_source_cached(&self, archive_hash: u64, from: &str) -> Result<Bytes> {
2016        if let Some(bytes) = self.patch_source_from_cache(archive_hash, from) {
2017            return Ok(bytes);
2018        }
2019        let data = self.read_archive_source(archive_hash, from).await?;
2020        Ok(self.cache_patch_source(archive_hash, from, Bytes::from(data)))
2021    }
2022
2023    fn patch_source_from_cache(&self, archive_hash: u64, from: &str) -> Option<Bytes> {
2024        self.byte_cache.get(&ByteCacheKey {
2025            archive_hash,
2026            inner_path: normalize_path(from),
2027        })
2028    }
2029
2030    fn cache_patch_source(&self, archive_hash: u64, from: &str, bytes: Bytes) -> Bytes {
2031        let disable_under_pressure = std::env::var("MODDE_BYTE_CACHE_DISABLE_UNDER_PRESSURE")
2032            .ok()
2033            .and_then(|v| v.parse::<bool>().ok())
2034            .unwrap_or(true);
2035        let pressure_threshold = std::env::var("MODDE_BYTE_CACHE_PRESSURE_FRACTION")
2036            .ok()
2037            .and_then(|v| v.parse::<f64>().ok())
2038            .filter(|&v| v > 0.0 && v <= 1.0)
2039            .unwrap_or(0.80);
2040        if disable_under_pressure && cgroup_memory_pressure_high(pressure_threshold) {
2041            warn!(
2042                archive_hash = %format!("{archive_hash:016x}"),
2043                from = %from,
2044                bytes = bytes.len(),
2045                pressure_threshold,
2046                "skipping patch source byte cache under cgroup memory pressure"
2047            );
2048            return bytes;
2049        }
2050        self.byte_cache.insert(
2051            ByteCacheKey {
2052                archive_hash,
2053                inner_path: normalize_path(from),
2054            },
2055            bytes,
2056        )
2057    }
2058
2059    fn check_diagnostics_abort(&self) -> Result<()> {
2060        if let Some(diagnostics) = &self.diagnostics {
2061            diagnostics.check_abort()?;
2062        }
2063        Ok(())
2064    }
2065
2066    fn game_file_source_path(&self, archive_hash: u64) -> Result<Option<GameFileSourcePath>> {
2067        let Some(archive) = self
2068            .archives_by_hash
2069            .get(&archive_hash)
2070            .map(|&i| &self.manifest.archives[i])
2071        else {
2072            return Ok(None);
2073        };
2074
2075        let Some(state) = archive.state.as_ref() else {
2076            return Ok(None);
2077        };
2078
2079        let rel_path = match state {
2080            ArchiveState::GameFileSourceDownloader { metadata } => state.game_file_path().ok_or_else(|| {
2081                anyhow::anyhow!(
2082                    "game-file source archive '{}' does not contain a recognized file path field; metadata keys: {}",
2083                    archive.name,
2084                    metadata.keys().cloned().collect::<Vec<_>>().join(", ")
2085                )
2086            })?,
2087            _ => return Ok(None),
2088        };
2089
2090        validate_archive_entry(rel_path)?;
2091
2092        let Some(game_dir) = &self.game_dir else {
2093            bail!(
2094                "archive {} is a game-file source, but no game directory was provided",
2095                archive.name
2096            );
2097        };
2098
2099        let path = game_dir.join(normalize_path(rel_path));
2100        if path.exists() {
2101            return Ok(Some(GameFileSourcePath {
2102                rel_path: rel_path.to_string(),
2103                path,
2104            }));
2105        }
2106
2107        Ok(Some(GameFileSourcePath {
2108            rel_path: rel_path.to_string(),
2109            path: find_path_case_insensitive(game_dir, &normalize_path(rel_path)).unwrap_or(path),
2110        }))
2111    }
2112}
2113
/// Resolved location of a game-file source archive.
#[derive(Debug)]
struct GameFileSourcePath {
    /// File path relative to the game directory, as reported by the archive state.
    rel_path: String,
    /// `rel_path` joined onto the game directory, or its case-insensitive
    /// match; the file is not guaranteed to exist.
    path: PathBuf,
}
2119
2120fn is_game_file_archive(archive: &modde_core::manifest::wabbajack::ArchiveEntry) -> bool {
2121    matches!(
2122        archive.state.as_ref(),
2123        Some(ArchiveState::GameFileSourceDownloader { .. })
2124    )
2125}
2126
2127/// Validate that an archive entry name does not contain path traversal components
2128/// or represent a symlink entry, preventing zip-slip and symlink attacks.
2129fn validate_archive_entry(name: &str) -> Result<()> {
2130    // Normalize separators for consistent checking
2131    let normalized = name.replace('\\', "/");
2132
2133    // Reject entries with ".." path components (path traversal / zip-slip)
2134    for component in normalized.split('/') {
2135        if component == ".." {
2136            bail!("archive entry contains path traversal: {name}");
2137        }
2138    }
2139
2140    // Reject absolute paths
2141    if normalized.starts_with('/') {
2142        bail!("archive entry contains absolute path: {name}");
2143    }
2144
2145    Ok(())
2146}
2147
/// Validate a zip entry: its name must pass `validate_archive_entry` and the
/// entry must not be a symlink.
#[cfg(test)]
fn validate_zip_entry<R: std::io::Read + ?Sized>(entry: &zip::read::ZipFile<'_, R>) -> Result<()> {
    let name = entry.name();
    validate_archive_entry(name)?;

    // Anything that is not a symlink is acceptable at this point.
    if !entry.is_symlink() {
        return Ok(());
    }

    bail!("archive entry is a symlink (rejected for security): {name}")
}
2161
/// Convert Windows-style `\` separators to `/`, leaving everything else as is.
fn normalize_path(path: &str) -> String {
    path.chars()
        .map(|ch| if ch == '\\' { '/' } else { ch })
        .collect()
}
2166
2167fn game_file_source_is_whole_file(path: &Path, rel_path: &str, from: &str) -> bool {
2168    if from.trim().is_empty() || from == "." {
2169        return true;
2170    }
2171
2172    let normalized_from = normalize_path(from);
2173    let normalized_rel_path = normalize_path(rel_path);
2174    let normalized_path = normalize_path(&path.to_string_lossy());
2175    let path_name = path
2176        .file_name()
2177        .and_then(|name| name.to_str())
2178        .map(normalize_path);
2179
2180    normalized_rel_path.eq_ignore_ascii_case(&normalized_from)
2181        || normalized_path.ends_with(&normalized_from)
2182        || path_name
2183            .as_deref()
2184            .is_some_and(|name| name.eq_ignore_ascii_case(&normalized_from))
2185}
2186
2187async fn read_game_file_source(path: &Path, from: &str) -> Result<Vec<u8>> {
2188    if !from.trim().is_empty() {
2189        validate_archive_entry(from)?;
2190    }
2191
2192    if path.symlink_metadata()?.file_type().is_symlink() {
2193        bail!(
2194            "game-file source is a symlink (rejected for security): {}",
2195            path.display()
2196        );
2197    }
2198
2199    if from.trim().is_empty() || from == "." {
2200        return tokio::fs::read(path)
2201            .await
2202            .with_context(|| format!("failed to read game-file source: {}", path.display()));
2203    }
2204
2205    if game_file_source_is_whole_file(path, &path.to_string_lossy(), from) {
2206        return tokio::fs::read(path)
2207            .await
2208            .with_context(|| format!("failed to read game-file source: {}", path.display()));
2209    }
2210
2211    let archive_path = path.to_path_buf();
2212    let from = from.to_string();
2213    tokio::task::spawn_blocking(move || {
2214        let output = ArchiveBatchExtractor::extract_selected(
2215            &archive_path,
2216            &[ArchiveRequest {
2217                directive_index: 0,
2218                from,
2219                inner_path: None,
2220                kind: ArchiveRequestKind::Bytes,
2221            }],
2222        )?;
2223        output
2224            .bytes
2225            .get(&0)
2226            .cloned()
2227            .ok_or_else(|| anyhow::anyhow!("archive extractor returned no bytes"))
2228    })
2229    .await?
2230}
2231
/// Build the store location of an archive from its hash:
/// `<store_dir>/<hash as 16 lowercase hex digits>.archive`.
pub(crate) fn archive_path(store_dir: &Path, hash: &u64) -> PathBuf {
    let file_name = format!("{hash:016x}.archive");
    let mut location = store_dir.to_path_buf();
    location.push(file_name);
    location
}
2236
/// Path of the sidecar that belongs to `path`: the same path with
/// `.verified.json` appended to it (the existing extension is kept).
fn verified_sidecar_path(path: &Path) -> PathBuf {
    let mut raw = path.to_path_buf().into_os_string();
    raw.push(".verified.json");
    raw.into()
}
2242
/// Modification time of `metadata` in milliseconds since the Unix epoch.
///
/// Yields `0` when the modification time is unavailable or lies before the
/// epoch.
fn metadata_modified_unix_ms(metadata: &std::fs::Metadata) -> u128 {
    match metadata.modified() {
        Ok(modified) => modified
            .duration_since(UNIX_EPOCH)
            .map_or(0, |since_epoch| since_epoch.as_millis()),
        Err(_) => 0,
    }
}
2251
/// Current wall-clock time in milliseconds since the Unix epoch, or `0` if
/// the system clock reports a time before the epoch.
fn unix_ms() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis(),
        Err(_) => 0,
    }
}
2258
2259async fn archive_path_is_memory_supported(path: &Path) -> bool {
2260    let Ok(mut file) = tokio::fs::File::open(path).await else {
2261        return false;
2262    };
2263    let mut magic = [0_u8; 8];
2264    let Ok(read) = tokio::io::AsyncReadExt::read(&mut file, &mut magic).await else {
2265        return false;
2266    };
2267    let prefix = &magic[..read];
2268    prefix.starts_with(b"PK") || prefix.starts_with(&[0x37, 0x7a, 0xbc, 0xaf, 0x27, 0x1c])
2269}
2270
2271async fn extract_trusted_archive_requests(
2272    trusted_archive: Option<TrustedArchive>,
2273    requests: Vec<ArchiveRequest>,
2274) -> Result<crate::decompress::ArchiveBatchOutput> {
2275    let native_result = tokio::task::spawn_blocking(move || match trusted_archive {
2276        Some(TrustedArchive::Path(path)) => ArchiveBatchExtractor::extract_selected(&path, &requests),
2277        Some(TrustedArchive::Bytes {
2278            label,
2279            bytes,
2280            fallback_path,
2281        }) => {
2282            let memory_result = ArchiveBatchExtractor::extract_selected_from(
2283                ArchiveInput::Bytes {
2284                    name: &label,
2285                    bytes: &bytes,
2286                },
2287                &requests,
2288            );
2289            match memory_result {
2290                Ok(output) => Ok(output),
2291                Err(memory_error) if is_unsupported_in_memory_archive(&memory_error) => {
2292                    warn!(
2293                        archive = %label,
2294                        "in-memory archive extraction failed, falling back to disk: {memory_error:#}"
2295                    );
2296                    ArchiveBatchExtractor::extract_selected(&fallback_path, &requests)
2297                }
2298                Err(memory_error) => Err(memory_error),
2299            }
2300        }
2301        None => unreachable!("native requests require a trusted archive"),
2302    })
2303    .await;
2304    trim_process_allocator();
2305
2306    match native_result {
2307        Ok(result) => result,
2308        Err(e) => Err(anyhow::anyhow!("archive extraction task failed: {e:#}")),
2309    }
2310}
2311
2312async fn extract_archive_path_requests(
2313    path: PathBuf,
2314    requests: Vec<ArchiveRequest>,
2315) -> Result<crate::decompress::ArchiveBatchOutput> {
2316    let native_result = tokio::task::spawn_blocking(move || {
2317        ArchiveBatchExtractor::extract_selected(&path, &requests)
2318    })
2319    .await;
2320    trim_process_allocator();
2321
2322    match native_result {
2323        Ok(result) => result,
2324        Err(e) => Err(anyhow::anyhow!("archive extraction task failed: {e:#}")),
2325    }
2326}
2327
2328async fn extract_nested_archive_requests(
2329    label: String,
2330    bytes: Bytes,
2331    requests: Vec<ArchiveRequest>,
2332) -> Result<crate::decompress::ArchiveBatchOutput> {
2333    let native_result = tokio::task::spawn_blocking(move || {
2334        if bytes.starts_with(b"BSA\0") || bytes.starts_with(b"BTDX") {
2335            let mut temp = tempfile::NamedTempFile::new()?;
2336            std::io::Write::write_all(&mut temp, &bytes)?;
2337            std::io::Write::flush(&mut temp)?;
2338            ArchiveBatchExtractor::extract_selected(temp.path(), &requests)
2339        } else {
2340            ArchiveBatchExtractor::extract_selected_from(
2341                ArchiveInput::Bytes {
2342                    name: &label,
2343                    bytes: &bytes,
2344                },
2345                &requests,
2346            )
2347        }
2348    })
2349    .await;
2350    trim_process_allocator();
2351
2352    match native_result {
2353        Ok(result) => result,
2354        Err(e) => Err(anyhow::anyhow!(
2355            "nested archive extraction task failed: {e:#}"
2356        )),
2357    }
2358}
2359
2360fn is_unsupported_in_memory_archive(error: &anyhow::Error) -> bool {
2361    format!("{error:#}").contains("unsupported in-memory archive format")
2362}
2363
/// Chunk size taken from `MODDE_ARCHIVE_PATCH_CHUNK_SIZE`.
///
/// Falls back to 8 when the variable is unset, not valid Unicode, not a
/// parseable `usize`, or zero.
fn archive_patch_chunk_size() -> usize {
    const DEFAULT_CHUNK_SIZE: usize = 8;

    match std::env::var("MODDE_ARCHIVE_PATCH_CHUNK_SIZE") {
        Ok(raw) => match raw.parse::<usize>() {
            Ok(size) if size > 0 => size,
            _ => DEFAULT_CHUNK_SIZE,
        },
        Err(_) => DEFAULT_CHUNK_SIZE,
    }
}
2371
2372async fn write_bytes_maybe_zstd(path: &Path, data: &[u8]) -> Result<()> {
2373    if super::staging::is_compressed_path(path) {
2374        let path = path.to_path_buf();
2375        let data = Bytes::copy_from_slice(data);
2376        tokio::task::spawn_blocking(move || -> Result<()> {
2377            let file = std::fs::File::create(&path)
2378                .with_context(|| format!("failed to create {}", path.display()))?;
2379            let mut encoder = zstd::stream::write::Encoder::new(file, 9)?;
2380            std::io::Write::write_all(&mut encoder, &data)?;
2381            encoder.finish()?;
2382            Ok(())
2383        })
2384        .await??;
2385        return Ok(());
2386    }
2387    tokio::fs::write(path, data).await?;
2388    Ok(())
2389}
2390
/// Apply-weight tunables for per-directive weighting.
///
/// These feed `estimate_directive_weight`, the best-effort estimate of the
/// peak resident bytes a directive will need during its apply step. Most
/// apply work is dominated by archive operations — either streaming a single
/// file out of a native decoder, or feeding a source file plus its patch into
/// bsdiff. We intentionally over-count rather than under-count: being
/// throttled too aggressively just slows the install, while admitting a
/// worker that the host can't satisfy crashes it.
///
/// Read once from the environment before the apply fan-out so that
/// `estimate_directive_weight` does not touch `std::env` per directive.
#[derive(Debug, Clone, Copy)]
struct DirectiveWeights {
    /// Multiplier applied to the source archive size for `FromArchive` directives.
    from_archive_factor: f64,
    /// Multiplier applied to the source archive size for `PatchedFromArchive` directives.
    patched_factor: f64,
    /// Minimum weight, in MiB, charged for a `CreateBSA` directive.
    create_bsa_floor_mb: u64,
}

impl DirectiveWeights {
    /// Read the tunables from the environment (site operators can dial without rebuilding).
    ///
    /// * `MODDE_APPLY_WEIGHT_FROM_ARCHIVE_FACTOR` — default `0.5`
    /// * `MODDE_APPLY_WEIGHT_PATCHED_FACTOR` — default `3.0`
    /// * `MODDE_APPLY_WEIGHT_BSA_FLOOR_MB` — default `64`
    ///
    /// Unset or unparseable values fall back to the default. Factors must be
    /// finite and strictly positive; anything else also falls back.
    fn from_env() -> Self {
        Self {
            from_archive_factor: Self::positive_factor_from_env(
                "MODDE_APPLY_WEIGHT_FROM_ARCHIVE_FACTOR",
                0.5,
            ),
            patched_factor: Self::positive_factor_from_env(
                "MODDE_APPLY_WEIGHT_PATCHED_FACTOR",
                3.0,
            ),
            create_bsa_floor_mb: std::env::var("MODDE_APPLY_WEIGHT_BSA_FLOOR_MB")
                .ok()
                .and_then(|raw| raw.parse().ok())
                .unwrap_or(64),
        }
    }

    /// Parse env var `name` as a finite, strictly positive `f64`, else `default`.
    fn positive_factor_from_env(name: &str, default: f64) -> f64 {
        std::env::var(name)
            .ok()
            .and_then(|raw| raw.parse::<f64>().ok())
            // `"inf"` and `"1e999"` parse to infinity and `"NaN"` to NaN; none
            // of them is a usable multiplier, so they are rejected here.
            .filter(|factor| factor.is_finite() && *factor > 0.0)
            .unwrap_or(default)
    }
}
2434
/// Apply-weight tunables for per-archive-batch weighting.
///
/// Read once from the environment before the apply fan-out so that
/// `estimate_archive_batch_weight` does not touch `std::env` per batch.
#[derive(Debug, Clone, Copy)]
struct ArchiveBatchWeights {
    /// Multiplier on the archive size for batches containing at least one patch directive.
    patched_factor: f64,
    /// Multiplier on the archive size for batches without any patch directive.
    archive_factor: f64,
}

impl ArchiveBatchWeights {
    /// Read the tunables from the environment (site operators can dial without rebuilding).
    ///
    /// * `MODDE_APPLY_WEIGHT_PATCHED_BATCH_FACTOR` — default `1.0`
    /// * `MODDE_APPLY_WEIGHT_ARCHIVE_BATCH_FACTOR` — default `0.5`
    ///
    /// Unset, unparseable, non-finite, zero or negative values fall back to
    /// the default.
    fn from_env() -> Self {
        Self {
            patched_factor: Self::positive_factor_from_env(
                "MODDE_APPLY_WEIGHT_PATCHED_BATCH_FACTOR",
                1.0,
            ),
            archive_factor: Self::positive_factor_from_env(
                "MODDE_APPLY_WEIGHT_ARCHIVE_BATCH_FACTOR",
                0.5,
            ),
        }
    }

    /// Parse env var `name` as a finite, strictly positive `f64`, else `default`.
    fn positive_factor_from_env(name: &str, default: f64) -> f64 {
        std::env::var(name)
            .ok()
            .and_then(|raw| raw.parse::<f64>().ok())
            // Infinity and NaN both parse successfully but are not usable
            // multipliers, so they are rejected alongside non-positive values.
            .filter(|factor| factor.is_finite() && *factor > 0.0)
            .unwrap_or(default)
    }
}
2464
2465fn estimate_directive_weight(
2466    directive: &InstallDirective,
2467    archive_size_by_hash: &HashMap<u64, u64>,
2468    weights: &DirectiveWeights,
2469) -> u64 {
2470    const FLOOR_BYTES: u64 = 8 * 1024 * 1024; // 8 MiB
2471
2472    let raw = match directive {
2473        InstallDirective::InlineFile { .. } => FLOOR_BYTES,
2474        InstallDirective::FromArchive { archive_hash, .. } => {
2475            let size = archive_size_by_hash.get(archive_hash).copied().unwrap_or(0);
2476            ((size as f64) * weights.from_archive_factor) as u64
2477        }
2478        InstallDirective::PatchedFromArchive { archive_hash, .. } => {
2479            let size = archive_size_by_hash.get(archive_hash).copied().unwrap_or(0);
2480            ((size as f64) * weights.patched_factor) as u64
2481        }
2482        InstallDirective::CreateBSA { file_states, .. } => file_states
2483            .iter()
2484            .map(|fs| fs.size)
2485            .sum::<u64>()
2486            .max(weights.create_bsa_floor_mb * 1024 * 1024),
2487    };
2488
2489    raw.max(FLOOR_BYTES)
2490}
2491
2492fn estimate_archive_batch_weight(
2493    batch: &ArchiveInstallBatch,
2494    weights: &ArchiveBatchWeights,
2495) -> u64 {
2496    const FLOOR_BYTES: u64 = 8 * 1024 * 1024;
2497    let has_patch = archive_batch_has_patch(batch);
2498    let factor = if has_patch {
2499        weights.patched_factor
2500    } else {
2501        weights.archive_factor
2502    };
2503    (((batch.archive_size_bytes as f64) * factor) as u64).max(FLOOR_BYTES)
2504}
2505
2506fn archive_batch_has_patch(batch: &ArchiveInstallBatch) -> bool {
2507    batch.directives.iter().any(|directive| {
2508        matches!(
2509            &directive.directive,
2510            InstallDirective::PatchedFromArchive { .. }
2511        )
2512    })
2513}
2514
/// Ask glibc to return free heap memory to the operating system.
///
/// Called after archive extraction tasks finish. This variant is compiled
/// only for Unix targets using the GNU libc environment.
#[cfg(all(unix, target_env = "gnu"))]
fn trim_process_allocator() {
    // Native decoders can transiently allocate very large buffers for solid
    // archives. glibc often keeps those arenas mapped, which makes the next
    // batch look resident even after Rust values were dropped.
    // SAFETY: `malloc_trim` takes only an integer pad argument and no
    // caller-provided pointers, so there is no memory invariant for us to
    // uphold. Its return value (whether memory was released) is ignored.
    unsafe {
        libc::malloc_trim(0);
    }
}

/// No-op stand-in for targets where the glibc variant is not compiled.
#[cfg(not(all(unix, target_env = "gnu")))]
fn trim_process_allocator() {}
2527
2528/// Find a path by case-insensitive path matching in a directory tree.
2529fn find_path_case_insensitive(base: &Path, relative_path: &str) -> Result<PathBuf> {
2530    validate_archive_entry(relative_path)?;
2531
2532    let parts: Vec<&str> = relative_path.split('/').collect();
2533    let mut current = base.to_path_buf();
2534
2535    for part in &parts {
2536        let target_lower = part.to_lowercase();
2537        let mut found = false;
2538
2539        for entry in std::fs::read_dir(&current)
2540            .with_context(|| format!("failed to read dir: {}", current.display()))?
2541        {
2542            let entry = entry?;
2543            if entry.file_name().to_string_lossy().to_lowercase() == target_lower {
2544                current = entry.path();
2545                // Reject symlinks in intermediate path components
2546                if current.symlink_metadata()?.file_type().is_symlink() {
2547                    anyhow::bail!("path component is a symlink (rejected for security): {part}");
2548                }
2549                found = true;
2550                break;
2551            }
2552        }
2553
2554        if !found {
2555            anyhow::bail!(
2556                "path component '{}' not found in {}",
2557                part,
2558                current.display()
2559            );
2560        }
2561    }
2562
2563    Ok(current)
2564}
2565
/// Read one entry, located via `find_entry_in_archive`, out of the zip file
/// at `archive_path` and return its bytes (test-only helper).
#[cfg(test)]
fn extract_from_zip(archive_path: &Path, inner_path: &str) -> Result<Vec<u8>> {
    let mut archive = std::fs::File::open(archive_path)
        .with_context(|| format!("failed to open archive: {}", archive_path.display()))
        .and_then(|file| {
            zip::ZipArchive::new(file).with_context(|| {
                format!("failed to read zip archive: {}", archive_path.display())
            })
        })?;

    let entry_name = find_entry_in_archive(&archive, inner_path).with_context(|| {
        format!(
            "file '{}' not found in archive {}",
            inner_path,
            archive_path.display()
        )
    })?;

    let mut entry = archive.by_name(&entry_name)?;
    validate_zip_entry(&entry)?;

    // Pre-size the buffer from the size declared in the zip entry.
    let mut contents = Vec::with_capacity(entry.size() as usize);
    std::io::Read::read_to_end(&mut entry, &mut contents)?;
    Ok(contents)
}
2590
/// Return the stored name of the first zip entry that matches `path`.
///
/// `path` is tried as given, with backslashes turned into forward slashes,
/// and with forward slashes turned into backslashes; each form is compared
/// exactly and then case-insensitively.
#[cfg(test)]
fn find_entry_in_archive(archive: &zip::ZipArchive<std::fs::File>, path: &str) -> Result<String> {
    let forward = path.replace('\\', "/");
    let backward = path.replace('/', "\\");
    let exact = [path, forward.as_str(), backward.as_str()];
    let folded = exact.map(str::to_lowercase);

    (0..archive.len())
        .map(|index| archive.name_for_index(index).unwrap_or_default())
        .find(|name| exact.contains(name) || folded.contains(&name.to_lowercase()))
        .map(str::to_string)
        .ok_or_else(|| anyhow::anyhow!("entry not found: {path}"))
}
2615
2616#[cfg(test)]
2617mod tests {
2618    use super::*;
2619    use crate::wabbajack::staging::{StagingPrepareStatus, StagingStore, compressed_path};
2620
2621    #[test]
2622    fn validate_archive_entry_rejects_traversal_and_absolute() {
2623        // The batch FromArchive path (install_directives) relies on this to block
2624        // zip-slip via a malicious manifest `to` field.
2625        assert!(validate_archive_entry("mods/Foo/textures/x.dds").is_ok());
2626        assert!(validate_archive_entry("../escape.txt").is_err());
2627        assert!(validate_archive_entry("a/b/../../../escape").is_err());
2628        assert!(validate_archive_entry("..\\escape.txt").is_err());
2629        assert!(validate_archive_entry("/etc/passwd").is_err());
2630    }
2631    use std::collections::HashMap;
2632    use std::io::{Read as _, Write as _};
2633    use std::net::TcpListener;
2634    use std::sync::atomic::{AtomicUsize, Ordering};
2635    use std::thread;
2636    use xxhash_rust::xxh64::xxh64;
2637
2638    /// Helper: create a zip file on disk with the given entries.
2639    fn create_zip_file(path: &std::path::Path, entries: &[(&str, &[u8])]) {
2640        let file = std::fs::File::create(path).unwrap();
2641        let mut writer = zip::ZipWriter::new(file);
2642        let options = zip::write::SimpleFileOptions::default()
2643            .compression_method(zip::CompressionMethod::Stored);
2644        for (name, data) in entries {
2645            writer.start_file(*name, options).unwrap();
2646            writer.write_all(data).unwrap();
2647        }
2648        writer.finish().unwrap();
2649    }
2650
2651    fn zip_bytes(entries: &[(&str, &[u8])]) -> Vec<u8> {
2652        let mut cursor = std::io::Cursor::new(Vec::new());
2653        {
2654            let mut writer = zip::ZipWriter::new(&mut cursor);
2655            let options = zip::write::SimpleFileOptions::default()
2656                .compression_method(zip::CompressionMethod::Stored);
2657            for (name, data) in entries {
2658                writer.start_file(*name, options).unwrap();
2659                writer.write_all(data).unwrap();
2660            }
2661            writer.finish().unwrap();
2662        }
2663        cursor.into_inner()
2664    }
2665
2666    fn data_patch(data: &[u8]) -> Vec<u8> {
2667        let mut patch = Vec::new();
2668        patch.extend_from_slice(b"OCTODELTA");
2669        patch.push(1);
2670        patch.push(4);
2671        patch.extend_from_slice(b"SHA1");
2672        patch.extend_from_slice(&20_u32.to_le_bytes());
2673        patch.extend_from_slice(&[0_u8; 20]);
2674        patch.extend_from_slice(b">>>");
2675        patch.push(0x80);
2676        patch.extend_from_slice(&(data.len() as u64).to_le_bytes());
2677        patch.extend_from_slice(data);
2678        patch
2679    }
2680
2681    /// Helper: open a zip for `find_entry_in_archive` tests.
2682    fn open_zip(path: &std::path::Path) -> zip::ZipArchive<std::fs::File> {
2683        let file = std::fs::File::open(path).unwrap();
2684        zip::ZipArchive::new(file).unwrap()
2685    }
2686
    /// Helper: a manifest with placeholder metadata and no archives or
    /// directives. Other helpers extend it via struct-update syntax.
    fn minimal_manifest() -> WabbajackManifest {
        WabbajackManifest {
            name: "test".into(),
            author: "a".into(),
            description: "d".into(),
            game: "SkyrimSE".into(),
            version: "1.0".into(),
            archives: vec![],
            directives: vec![],
        }
    }
2698
    /// Helper: an archive entry with `GameFileSourceDownloader` state whose
    /// metadata holds a single `File` key set to `rel_path`.
    ///
    /// The entry name is `rel_path` with both kinds of path separator replaced
    /// by `_`, and the declared size is 0.
    fn game_file_archive(
        hash: u64,
        rel_path: &str,
    ) -> modde_core::manifest::wabbajack::ArchiveEntry {
        modde_core::manifest::wabbajack::ArchiveEntry {
            hash,
            name: rel_path.replace(['\\', '/'], "_"),
            size: 0,
            state: Some(ArchiveState::GameFileSourceDownloader {
                metadata: HashMap::from([(
                    "File".to_string(),
                    serde_json::Value::String(rel_path.to_string()),
                )]),
            }),
        }
    }
2715
    /// Helper: a manifest with one game-file archive (`hash`, `rel_path`) and
    /// one `FromArchive` directive that installs it to `to`.
    ///
    /// The directive's hash path contains only the archive hash, with no
    /// inner file path.
    fn manifest_with_game_file(hash: u64, rel_path: &str, to: &str) -> WabbajackManifest {
        WabbajackManifest {
            archives: vec![game_file_archive(hash, rel_path)],
            directives: vec![modde_core::manifest::wabbajack::RawDirective::FromArchive {
                archive_hash_path: vec![serde_json::Value::Number(hash.into())],
                to: to.into(),
                size: 0,
            }],
            ..minimal_manifest()
        }
    }
2727
    /// Helper: a manifest whose only archive uses `NexusDownloader` state
    /// (game `skyrimspecialedition`, mod 123, file 456) and which has no
    /// directives.
    fn manifest_with_nexus_download(hash: u64) -> WabbajackManifest {
        WabbajackManifest {
            archives: vec![modde_core::manifest::wabbajack::ArchiveEntry {
                hash,
                name: "nexus archive".into(),
                size: 1,
                state: Some(ArchiveState::NexusDownloader {
                    game_name: "skyrimspecialedition".into(),
                    mod_id: 123.into(),
                    file_id: 456.into(),
                }),
            }],
            ..minimal_manifest()
        }
    }
2743
    /// The first trust check of an archive streams the whole file and writes
    /// a verified sidecar; the second check hits the sidecar and streams
    /// nothing.
    #[tokio::test]
    async fn archive_trust_sidecar_skips_second_hash_pass() {
        let dir = tempfile::tempdir().unwrap();
        let store = dir.path().join("store");
        let staging = dir.path().join("staging");
        tokio::fs::create_dir_all(&store).await.unwrap();
        // Place the archive in the store under the path derived from its xxh64 hash.
        let bytes = b"trusted archive bytes";
        let hash = xxh64(bytes, 0);
        let path = archive_path(&store, &hash);
        tokio::fs::write(&path, bytes).await.unwrap();

        let inst = WabbajackInstaller::new(
            minimal_manifest(),
            dir.path().join("list.wabbajack"),
            store,
            staging,
        );
        // The receiver is kept alive but never read; progress events are irrelevant here.
        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();

        // First pass: no sidecar yet, so every byte is hashed and a sidecar appears.
        let (_archive, first) = inst.ensure_archive_trusted(hash, &tx).await.unwrap();
        assert_eq!(first.streamed_hash_bytes, bytes.len() as u64);
        assert!(!first.sidecar_hit);
        assert!(verified_sidecar_path(&path).exists());

        // Second pass: the sidecar is trusted and no bytes are re-hashed.
        let (_archive, second) = inst.ensure_archive_trusted(hash, &tx).await.unwrap();
        assert!(second.sidecar_hit);
        assert_eq!(second.streamed_hash_bytes, 0);
    }
2772
    /// A sidecar whose recorded `size_bytes` no longer matches the archive is
    /// not trusted: the archive is hashed again in full.
    #[tokio::test]
    async fn stale_archive_trust_sidecar_forces_rehash() {
        let dir = tempfile::tempdir().unwrap();
        let store = dir.path().join("store");
        let staging = dir.path().join("staging");
        tokio::fs::create_dir_all(&store).await.unwrap();
        let bytes = b"trusted archive bytes";
        let hash = xxh64(bytes, 0);
        let path = archive_path(&store, &hash);
        tokio::fs::write(&path, bytes).await.unwrap();

        let inst = WabbajackInstaller::new(
            minimal_manifest(),
            dir.path().join("list.wabbajack"),
            store,
            staging,
        );
        // Write a genuine sidecar, then corrupt its recorded size so it goes stale.
        inst.write_verified_sidecar(&path, hash).await.unwrap();
        let mut sidecar: serde_json::Value =
            serde_json::from_slice(&tokio::fs::read(verified_sidecar_path(&path)).await.unwrap())
                .unwrap();
        sidecar["size_bytes"] = serde_json::json!(1);
        tokio::fs::write(
            verified_sidecar_path(&path),
            serde_json::to_vec_pretty(&sidecar).unwrap(),
        )
        .await
        .unwrap();

        // The stale sidecar must be ignored and the full archive streamed again.
        let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
        let (_archive, stats) = inst.ensure_archive_trusted(hash, &tx).await.unwrap();
        assert!(!stats.sidecar_hit);
        assert_eq!(stats.streamed_hash_bytes, bytes.len() as u64);
    }
2807
    /// Helper: start a tiny blocking HTTP server on an ephemeral localhost
    /// port and return its base URL plus a counter of requests handled.
    ///
    /// Routes served by the background thread:
    /// * `/authored_files/download/cdn.zip_abc` — a script snippet describing
    ///   `cdn_archive` as a single part;
    /// * `/authored_files/cdn.zip_abc/parts/0` — the `cdn_archive` bytes;
    /// * `/direct.zip` — the `direct_archive` bytes;
    /// * anything else — `404 Not Found`.
    ///
    /// The thread handles at most 8 connections and then exits.
    fn synthetic_server(
        cdn_archive: Vec<u8>,
        direct_archive: Vec<u8>,
    ) -> (String, Arc<AtomicUsize>) {
        // Port 0 lets the OS pick a free port; the real address is read back below.
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap();
        let count = Arc::new(AtomicUsize::new(0));
        let thread_count = Arc::clone(&count);
        thread::spawn(move || {
            for stream in listener.incoming().take(8) {
                let mut stream = stream.unwrap();
                // A single read of up to 2 KiB is treated as the whole request.
                let mut buf = [0_u8; 2048];
                let n = stream.read(&mut buf).unwrap();
                let request = String::from_utf8_lossy(&buf[..n]);
                // The request target is the second whitespace-separated token
                // of the first line, defaulting to "/".
                let path = request
                    .lines()
                    .next()
                    .and_then(|line| line.split_whitespace().nth(1))
                    .unwrap_or("/");
                thread_count.fetch_add(1, Ordering::SeqCst);
                match path {
                    "/authored_files/download/cdn.zip_abc" => {
                        let body = format!(
                            r#"<script>
                              const MUNGED_NAME = "cdn.zip_abc";
                              const FILE_NAME = "cdn.zip";
                              const FILE_SIZE_BYTES = {};
                              const PARTS = [{{"Size":{},"Offset":0,"Index":0}}];
                            </script>"#,
                            cdn_archive.len(),
                            cdn_archive.len()
                        );
                        write_response(&mut stream, "200 OK", body.as_bytes());
                    }
                    "/authored_files/cdn.zip_abc/parts/0" => {
                        write_response(&mut stream, "200 OK", &cdn_archive);
                    }
                    "/direct.zip" => {
                        write_response(&mut stream, "200 OK", &direct_archive);
                    }
                    _ => write_response(&mut stream, "404 Not Found", b"not found"),
                }
            }
        });
        (format!("http://{addr}"), count)
    }
2854
2855    fn write_response(stream: &mut std::net::TcpStream, status: &str, body: &[u8]) {
2856        let headers = format!(
2857            "HTTP/1.1 {status}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
2858            body.len()
2859        );
2860        stream.write_all(headers.as_bytes()).unwrap();
2861        stream.write_all(body).unwrap();
2862    }
2863
    // -----------------------------------------------------------------------
    // 1. WabbajackInstaller::new() stores its arguments and applies defaults
    // -----------------------------------------------------------------------
    /// `new` keeps the given store and staging directories and manifest, starts
    /// with no registered sources, and uses `DEFAULT_CONCURRENCY`.
    #[test]
    fn new_stores_fields() {
        let store = PathBuf::from("/tmp/store");
        let staging = PathBuf::from("/tmp/staging");
        let inst = WabbajackInstaller::new(
            minimal_manifest(),
            PathBuf::from("/tmp/test.wabbajack"),
            store,
            staging,
        );
        assert_eq!(inst.concurrency, DEFAULT_CONCURRENCY);
        assert!(inst.sources.is_empty());
        assert_eq!(inst.store_dir, PathBuf::from("/tmp/store"));
        assert_eq!(inst.staging_dir, PathBuf::from("/tmp/staging"));
        assert_eq!(inst.manifest.name, "test");
    }
2883
    /// With no sources registered, validating a manifest that needs a Nexus
    /// download fails with an error naming the missing source and the mod.
    #[test]
    fn validate_download_sources_rejects_required_source_before_network() {
        let dir = tempfile::tempdir().unwrap();
        let inst = WabbajackInstaller::new(
            manifest_with_nexus_download(1),
            dir.path().join("test.wabbajack"),
            dir.path().join("store"),
            dir.path().join("staging"),
        );

        let downloads = inst.manifest.download_directives();
        let err = inst.validate_download_sources(&downloads).unwrap_err();
        // Render the full context chain so nested messages are searchable.
        let msg = format!("{err:#}");
        assert!(
            msg.contains("no registered source"),
            "unexpected error: {msg}"
        );
        assert!(msg.contains("nexus:123"), "unexpected error: {msg}");
    }
2903
    /// Adopting a staging directory that already contains the installed
    /// outputs leaves those files in place and writes valid sentinels for the
    /// archive batch and the `CreateBSA` directive.
    #[tokio::test]
    async fn adoption_preserves_existing_outputs_and_writes_sentinels() {
        let dir = tempfile::tempdir().unwrap();
        let staging = dir.path().join("staging");
        // Pre-populate staging with the outputs the manifest below would produce.
        let output = staging.join("mods/adopted/file.txt");
        tokio::fs::create_dir_all(output.parent().unwrap())
            .await
            .unwrap();
        tokio::fs::write(&output, b"already applied").await.unwrap();
        tokio::fs::write(staging.join("mods/adopted/output.bsa"), b"bsa")
            .await
            .unwrap();

        // One archive feeding one FromArchive directive, plus one CreateBSA directive.
        let hash = 42_u64;
        let manifest = WabbajackManifest {
            archives: vec![modde_core::manifest::wabbajack::ArchiveEntry {
                hash,
                name: "source.zip".into(),
                size: 123,
                state: None,
            }],
            directives: vec![
                modde_core::manifest::wabbajack::RawDirective::FromArchive {
                    archive_hash_path: vec![
                        serde_json::Value::Number(hash.into()),
                        serde_json::Value::String("file.txt".into()),
                    ],
                    to: "mods/adopted/file.txt".into(),
                    size: 0,
                },
                modde_core::manifest::wabbajack::RawDirective::CreateBSA {
                    temp_id: "bsa-temp".into(),
                    to: "mods/adopted/output.bsa".into(),
                    file_states: vec![],
                },
            ],
            ..minimal_manifest()
        };
        let inst = WabbajackInstaller::new(
            manifest,
            dir.path().join("test.wabbajack"),
            dir.path().join("store"),
            staging.clone(),
        );

        // Preparing the pre-populated staging directory reports it as adopted.
        let prepare_status = StagingStore::new(&staging)
            .prepare_resumable()
            .await
            .unwrap();
        assert_eq!(prepare_status, StagingPrepareStatus::Adopted);

        let batches = inst.manifest.install_directives_grouped_by_archive();
        let installs = inst.manifest.install_directives();
        let adoption = inst
            .adopt_existing_staging(&batches, &installs)
            .await
            .unwrap();

        // One archive batch and one CreateBSA directive were adopted, the
        // existing output survived, and both sentinels exist and validate.
        // NOTE(review): the `1` passed to the CreateBSA sentinel helpers
        // presumably is the directive's index in the manifest — confirm.
        assert_eq!(adoption.archive_batches, 1);
        assert_eq!(adoption.create_bsa, 1);
        assert!(output.exists());
        assert!(inst.archive_batch_sentinel_path(hash).exists());
        assert!(inst.create_bsa_sentinel_path(1).exists());
        assert!(inst.archive_batch_sentinel_valid(&batches[0]).await);
        assert!(
            inst.create_bsa_sentinel_valid(1, "bsa-temp", "mods/adopted/output.bsa")
                .await
        );
    }
2973
2974    #[tokio::test]
2975    async fn synthetic_wabbajack_pipeline_reaches_late_directives() {
2976        let dir = tempfile::tempdir().unwrap();
2977        let cdn_archive = zip_bytes(&[("source.txt", b"basis")]);
2978        let large_direct = vec![7_u8; 1024 * 1024 + 17];
2979        let direct_archive = zip_bytes(&[
2980            ("direct.txt", b"direct"),
2981            ("large.dds", large_direct.as_slice()),
2982        ]);
2983        let cdn_hash = xxh64(&cdn_archive, 0);
2984        let direct_hash = xxh64(&direct_archive, 0);
2985        let (base_url, _requests) = synthetic_server(cdn_archive, direct_archive);
2986        let wabbajack_path = dir.path().join("synthetic.wabbajack");
2987        create_zip_file(
2988            &wabbajack_path,
2989            &[
2990                ("inline-data", b"inline"),
2991                ("patch-data", &data_patch(b"patched")),
2992            ],
2993        );
2994
2995        let manifest = WabbajackManifest {
2996            archives: vec![
2997                modde_core::manifest::wabbajack::ArchiveEntry {
2998                    hash: cdn_hash,
2999                    name: "cdn.zip".into(),
3000                    size: 0,
3001                    state: Some(ArchiveState::WabbajackCDNDownloader {
3002                        metadata: HashMap::from([(
3003                            "Url".into(),
3004                            serde_json::Value::String(format!(
3005                                "{base_url}/authored_files/download/cdn.zip_abc"
3006                            )),
3007                        )]),
3008                    }),
3009                },
3010                modde_core::manifest::wabbajack::ArchiveEntry {
3011                    hash: direct_hash,
3012                    name: "direct.zip".into(),
3013                    size: 0,
3014                    state: Some(ArchiveState::HttpDownloader {
3015                        url: format!("{base_url}/direct.zip"),
3016                        headers: HashMap::new(),
3017                    }),
3018                },
3019            ],
3020            directives: vec![
3021                modde_core::manifest::wabbajack::RawDirective::FromArchive {
3022                    archive_hash_path: vec![
3023                        serde_json::Value::Number(cdn_hash.into()),
3024                        serde_json::Value::String("source.txt".into()),
3025                    ],
3026                    to: "mods/cdn/source.txt".into(),
3027                    size: 0,
3028                },
3029                modde_core::manifest::wabbajack::RawDirective::FromArchive {
3030                    archive_hash_path: vec![
3031                        serde_json::Value::Number(direct_hash.into()),
3032                        serde_json::Value::String("direct.txt".into()),
3033                    ],
3034                    to: "mods/direct/direct.txt".into(),
3035                    size: 0,
3036                },
3037                modde_core::manifest::wabbajack::RawDirective::FromArchive {
3038                    archive_hash_path: vec![
3039                        serde_json::Value::Number(direct_hash.into()),
3040                        serde_json::Value::String("large.dds".into()),
3041                    ],
3042                    to: "mods/direct/large.dds".into(),
3043                    size: 0,
3044                },
3045                modde_core::manifest::wabbajack::RawDirective::InlineFile {
3046                    hash: 0,
3047                    size: 6,
3048                    source_data_id: "inline-data".into(),
3049                    to: "mods/inline/inline.txt".into(),
3050                },
3051                modde_core::manifest::wabbajack::RawDirective::PatchedFromArchive {
3052                    archive_hash_path: vec![
3053                        serde_json::Value::Number(cdn_hash.into()),
3054                        serde_json::Value::String("source.txt".into()),
3055                    ],
3056                    to: "mods/patched/patched.txt".into(),
3057                    hash: 0,
3058                    patch_id: "patch-data".into(),
3059                    size: 0,
3060                },
3061            ],
3062            ..minimal_manifest()
3063        };
3064        let mut inst = WabbajackInstaller::new(
3065            manifest,
3066            wabbajack_path,
3067            dir.path().join("store"),
3068            dir.path().join("staging"),
3069        );
3070        let client = reqwest::Client::new();
3071        inst.add_source(crate::AnySource::WabbajackCdn(
3072            crate::wabbajack::cdn::WabbajackCdnSource::new(client.clone()),
3073        ));
3074        inst.add_source(crate::AnySource::Direct(crate::direct::DirectSource::new(
3075            client,
3076        )));
3077
3078        let (progress_tx, _progress_rx) = mpsc::unbounded_channel();
3079        inst.install(progress_tx).await.unwrap();
3080
3081        assert_eq!(
3082            tokio::fs::read(dir.path().join("staging/mods/cdn/source.txt"))
3083                .await
3084                .unwrap(),
3085            b"basis"
3086        );
3087        assert_eq!(
3088            tokio::fs::read(dir.path().join("staging/mods/direct/direct.txt"))
3089                .await
3090                .unwrap(),
3091            b"direct"
3092        );
3093        assert_eq!(
3094            tokio::fs::read(dir.path().join("staging/mods/inline/inline.txt"))
3095                .await
3096                .unwrap(),
3097            b"inline"
3098        );
3099        assert_eq!(
3100            tokio::fs::read(dir.path().join("staging/mods/patched/patched.txt"))
3101                .await
3102                .unwrap(),
3103            b"patched"
3104        );
3105
3106        let large_path = dir.path().join("staging/mods/direct/large.dds");
3107        assert!(compressed_path(&large_path).exists());
3108        assert!(!large_path.exists());
3109        let staging_store = StagingStore::new(dir.path().join("staging"));
3110        let mut reader = staging_store
3111            .open_logical_reader("mods/direct/large.dds")
3112            .unwrap();
3113        let mut decoded = Vec::new();
3114        reader.read_to_end(&mut decoded).unwrap();
3115        assert_eq!(decoded, large_direct);
3116
3117        assert!(
3118            dir.path()
3119                .join(format!(
3120                    "staging/_state/archive-batches/{direct_hash:016x}.json"
3121                ))
3122                .exists()
3123        );
3124        let (progress_tx, _progress_rx) = mpsc::unbounded_channel();
3125        inst.install(progress_tx).await.unwrap();
3126        assert!(compressed_path(&large_path).exists());
3127    }
3128
3129    // -----------------------------------------------------------------------
3130    // 2. set_concurrency changes and clamps the value
3131    // -----------------------------------------------------------------------
3132    #[test]
3133    fn set_concurrency_changes_value() {
3134        let mut inst = WabbajackInstaller::new(
3135            minimal_manifest(),
3136            PathBuf::new(),
3137            PathBuf::new(),
3138            PathBuf::new(),
3139        );
3140        inst.set_concurrency(16);
3141        assert_eq!(inst.concurrency, 16);
3142    }
3143
3144    #[test]
3145    fn set_concurrency_clamps_zero_to_one() {
3146        let mut inst = WabbajackInstaller::new(
3147            minimal_manifest(),
3148            PathBuf::new(),
3149            PathBuf::new(),
3150            PathBuf::new(),
3151        );
3152        inst.set_concurrency(0);
3153        assert_eq!(inst.concurrency, 1);
3154    }
3155
3156    #[tokio::test]
3157    async fn install_reads_game_file_source_from_game_dir() {
3158        let dir = tempfile::tempdir().unwrap();
3159        let game_dir = dir.path().join("game");
3160        let store_dir = dir.path().join("store");
3161        let staging_dir = dir.path().join("staging");
3162        std::fs::create_dir_all(game_dir.join("Data")).unwrap();
3163        let source_bytes = b"game file bytes";
3164        std::fs::write(game_dir.join("Data/Update.esm"), source_bytes).unwrap();
3165
3166        let manifest = manifest_with_game_file(
3167            xxh64(source_bytes, 0),
3168            "Data\\Update.esm",
3169            "mods/Skyrim Base/Update.esm",
3170        );
3171
3172        let mut inst = WabbajackInstaller::new(
3173            manifest,
3174            dir.path().join("test.wabbajack"),
3175            store_dir,
3176            staging_dir.clone(),
3177        );
3178        inst.set_game_dir(game_dir);
3179
3180        let (progress_tx, _progress_rx) = mpsc::unbounded_channel();
3181        inst.install(progress_tx).await.unwrap();
3182
3183        assert_eq!(
3184            std::fs::read(staging_dir.join("mods/Skyrim Base/Update.esm")).unwrap(),
3185            b"game file bytes"
3186        );
3187    }
3188
3189    #[tokio::test]
3190    async fn install_requires_game_dir_for_game_file_sources() {
3191        let dir = tempfile::tempdir().unwrap();
3192        let manifest = WabbajackManifest {
3193            archives: vec![game_file_archive(0x1234, "Data\\Update.esm")],
3194            directives: vec![],
3195            ..minimal_manifest()
3196        };
3197
3198        let inst = WabbajackInstaller::new(
3199            manifest,
3200            dir.path().join("test.wabbajack"),
3201            dir.path().join("store"),
3202            dir.path().join("staging"),
3203        );
3204
3205        let (progress_tx, _progress_rx) = mpsc::unbounded_channel();
3206        let err = inst.install(progress_tx).await.unwrap_err();
3207        let msg = format!("{err:#}");
3208        assert!(
3209            msg.contains("pass --game-dir"),
3210            "unexpected error message: {msg}"
3211        );
3212    }
3213
3214    #[tokio::test]
3215    async fn install_rejects_mismatched_game_file_hash() {
3216        let dir = tempfile::tempdir().unwrap();
3217        let game_dir = dir.path().join("game");
3218        std::fs::create_dir_all(game_dir.join("Data")).unwrap();
3219        std::fs::write(game_dir.join("Data/Update.esm"), b"actual bytes").unwrap();
3220
3221        let manifest = manifest_with_game_file(
3222            xxh64(b"expected bytes", 0),
3223            "Data\\Update.esm",
3224            "mods/Update.esm",
3225        );
3226        let mut inst = WabbajackInstaller::new(
3227            manifest,
3228            dir.path().join("test.wabbajack"),
3229            dir.path().join("store"),
3230            dir.path().join("staging"),
3231        );
3232        inst.set_game_dir(game_dir);
3233
3234        let (progress_tx, _progress_rx) = mpsc::unbounded_channel();
3235        let err = inst.install(progress_tx).await.unwrap_err();
3236        let msg = format!("{err:#}");
3237        assert!(msg.contains("Data\\Update.esm"), "unexpected error: {msg}");
3238        assert!(
3239            msg.contains("expected xxh64"),
3240            "unexpected error message: {msg}"
3241        );
3242    }
3243
3244    #[tokio::test]
3245    async fn install_rejects_missing_game_file() {
3246        let dir = tempfile::tempdir().unwrap();
3247        let game_dir = dir.path().join("game");
3248        std::fs::create_dir_all(&game_dir).unwrap();
3249
3250        let manifest = manifest_with_game_file(
3251            xxh64(b"missing bytes", 0),
3252            "Data\\Update.esm",
3253            "mods/Update.esm",
3254        );
3255        let mut inst = WabbajackInstaller::new(
3256            manifest,
3257            dir.path().join("test.wabbajack"),
3258            dir.path().join("store"),
3259            dir.path().join("staging"),
3260        );
3261        inst.set_game_dir(game_dir);
3262
3263        let (progress_tx, _progress_rx) = mpsc::unbounded_channel();
3264        let err = inst.install(progress_tx).await.unwrap_err();
3265        let msg = format!("{err:#}");
3266        assert!(msg.contains("Data\\Update.esm"), "unexpected error: {msg}");
3267        assert!(msg.contains("missing"), "unexpected error: {msg}");
3268    }
3269
3270    #[tokio::test]
3271    async fn install_reports_all_invalid_game_files() {
3272        let dir = tempfile::tempdir().unwrap();
3273        let game_dir = dir.path().join("game");
3274        std::fs::create_dir_all(game_dir.join("Data")).unwrap();
3275        std::fs::write(game_dir.join("Data/Update.esm"), b"actual bytes").unwrap();
3276
3277        let mismatch_hash = xxh64(b"expected bytes", 0);
3278        let missing_hash = xxh64(b"missing bytes", 0);
3279        let manifest = WabbajackManifest {
3280            archives: vec![
3281                game_file_archive(mismatch_hash, "Data\\Update.esm"),
3282                game_file_archive(missing_hash, "SkyrimSE.exe"),
3283            ],
3284            directives: vec![
3285                modde_core::manifest::wabbajack::RawDirective::FromArchive {
3286                    archive_hash_path: vec![serde_json::Value::Number(mismatch_hash.into())],
3287                    to: "mods/Update.esm".into(),
3288                    size: 0,
3289                },
3290                modde_core::manifest::wabbajack::RawDirective::FromArchive {
3291                    archive_hash_path: vec![serde_json::Value::Number(missing_hash.into())],
3292                    to: "mods/SkyrimSE.exe".into(),
3293                    size: 0,
3294                },
3295            ],
3296            ..minimal_manifest()
3297        };
3298        let mut inst = WabbajackInstaller::new(
3299            manifest,
3300            dir.path().join("test.wabbajack"),
3301            dir.path().join("store"),
3302            dir.path().join("staging"),
3303        );
3304        inst.set_game_dir(game_dir);
3305
3306        let (progress_tx, _progress_rx) = mpsc::unbounded_channel();
3307        let err = inst.install(progress_tx).await.unwrap_err();
3308        let msg = format!("{err:#}");
3309        assert!(
3310            msg.contains("validation failed for 2 file(s)"),
3311            "unexpected error: {msg}"
3312        );
3313        assert!(msg.contains("Data\\Update.esm"), "unexpected error: {msg}");
3314        assert!(msg.contains("got"), "unexpected error: {msg}");
3315        assert!(msg.contains("SkyrimSE.exe"), "unexpected error: {msg}");
3316        assert!(msg.contains("missing"), "unexpected error: {msg}");
3317    }
3318
3319    #[tokio::test]
3320    async fn install_resolves_game_file_path_case_insensitively() {
3321        let dir = tempfile::tempdir().unwrap();
3322        let game_dir = dir.path().join("game");
3323        let store_dir = dir.path().join("store");
3324        let staging_dir = dir.path().join("staging");
3325        std::fs::create_dir_all(game_dir.join("DATA")).unwrap();
3326        let source_bytes = b"case insensitive game file";
3327        std::fs::write(game_dir.join("DATA/update.ESM"), source_bytes).unwrap();
3328
3329        let manifest = manifest_with_game_file(
3330            xxh64(source_bytes, 0),
3331            "Data\\Update.esm",
3332            "mods/Update.esm",
3333        );
3334        let mut inst = WabbajackInstaller::new(
3335            manifest,
3336            dir.path().join("test.wabbajack"),
3337            store_dir,
3338            staging_dir.clone(),
3339        );
3340        inst.set_game_dir(game_dir);
3341
3342        let (progress_tx, _progress_rx) = mpsc::unbounded_channel();
3343        inst.install(progress_tx).await.unwrap();
3344
3345        assert_eq!(
3346            std::fs::read(staging_dir.join("mods/Update.esm")).unwrap(),
3347            source_bytes
3348        );
3349    }
3350
3351    // -----------------------------------------------------------------------
3352    // 3. find_entry_in_archive with exact match
3353    // -----------------------------------------------------------------------
3354    #[test]
3355    fn find_entry_exact_match() {
3356        let dir = tempfile::tempdir().unwrap();
3357        let zip_path = dir.path().join("test.zip");
3358        create_zip_file(&zip_path, &[("data/meshes/test.nif", b"mesh")]);
3359        let archive = open_zip(&zip_path);
3360
3361        let result = find_entry_in_archive(&archive, "data/meshes/test.nif").unwrap();
3362        assert_eq!(result, "data/meshes/test.nif");
3363    }
3364
3365    // -----------------------------------------------------------------------
3366    // 4. find_entry_in_archive with normalized paths (/ vs \)
3367    // -----------------------------------------------------------------------
3368    #[test]
3369    fn find_entry_backslash_to_forward_slash() {
3370        let dir = tempfile::tempdir().unwrap();
3371        let zip_path = dir.path().join("test.zip");
3372        create_zip_file(&zip_path, &[("data/meshes/test.nif", b"mesh")]);
3373        let archive = open_zip(&zip_path);
3374
3375        let result = find_entry_in_archive(&archive, "data\\meshes\\test.nif").unwrap();
3376        assert_eq!(result, "data/meshes/test.nif");
3377    }
3378
3379    #[test]
3380    fn find_entry_forward_slash_to_backslash() {
3381        let dir = tempfile::tempdir().unwrap();
3382        let zip_path = dir.path().join("test.zip");
3383        create_zip_file(&zip_path, &[("data\\meshes\\test.nif", b"mesh")]);
3384        let archive = open_zip(&zip_path);
3385
3386        let result = find_entry_in_archive(&archive, "data/meshes/test.nif").unwrap();
3387        assert_eq!(result, "data\\meshes\\test.nif");
3388    }
3389
3390    // -----------------------------------------------------------------------
3391    // 5. find_entry_in_archive with case-insensitive fallback
3392    // -----------------------------------------------------------------------
3393    #[test]
3394    fn find_entry_case_insensitive() {
3395        let dir = tempfile::tempdir().unwrap();
3396        let zip_path = dir.path().join("test.zip");
3397        create_zip_file(&zip_path, &[("Data/Meshes/Test.NIF", b"mesh")]);
3398        let archive = open_zip(&zip_path);
3399
3400        let result = find_entry_in_archive(&archive, "data/meshes/test.nif").unwrap();
3401        assert_eq!(result, "Data/Meshes/Test.NIF");
3402    }
3403
3404    #[test]
3405    fn find_entry_not_found() {
3406        let dir = tempfile::tempdir().unwrap();
3407        let zip_path = dir.path().join("test.zip");
3408        create_zip_file(&zip_path, &[("other.txt", b"data")]);
3409        let archive = open_zip(&zip_path);
3410
3411        let result = find_entry_in_archive(&archive, "nonexistent.txt");
3412        assert!(result.is_err());
3413    }
3414
3415    // -----------------------------------------------------------------------
3416    // 6. archive_path formatting (hash as 016x)
3417    // -----------------------------------------------------------------------
3418    #[test]
3419    fn archive_path_zero_padded_hex() {
3420        let store = PathBuf::from("/store");
3421        let hash: u64 = 0xDEADBEEF;
3422        let path = archive_path(&store, &hash);
3423        assert_eq!(path, PathBuf::from("/store/00000000deadbeef.archive"));
3424    }
3425
3426    #[test]
3427    fn archive_path_full_width_hash() {
3428        let store = PathBuf::from("/store");
3429        let hash: u64 = 0xFFFFFFFFFFFFFFFF;
3430        let path = archive_path(&store, &hash);
3431        assert_eq!(path, PathBuf::from("/store/ffffffffffffffff.archive"));
3432    }
3433
3434    #[test]
3435    fn archive_path_zero_hash() {
3436        let store = PathBuf::from("/store");
3437        let hash: u64 = 0;
3438        let path = archive_path(&store, &hash);
3439        assert_eq!(path, PathBuf::from("/store/0000000000000000.archive"));
3440    }
3441
3442    // -----------------------------------------------------------------------
3443    // 7. directive_name for each directive type
3444    // -----------------------------------------------------------------------
3445    #[test]
3446    fn directive_name_nexus() {
3447        let d = DownloadDirective::Nexus {
3448            game_id: "skyrimse".into(),
3449            mod_id: 12345.into(),
3450            file_id: 1.into(),
3451            hash: 0,
3452        };
3453        assert_eq!(d.display_name(), "nexus:12345");
3454    }
3455
3456    #[test]
3457    fn directive_name_github() {
3458        let d = DownloadDirective::GitHub {
3459            user: "user".into(),
3460            repo: "myrepo".into(),
3461            tag: "v1".into(),
3462            asset: "a.zip".into(),
3463            hash: 0,
3464        };
3465        assert_eq!(d.display_name(), "github:myrepo");
3466    }
3467
3468    #[test]
3469    fn directive_name_gdrive() {
3470        let d = DownloadDirective::GoogleDrive {
3471            id: "abc123".into(),
3472            hash: 0,
3473        };
3474        assert_eq!(d.display_name(), "gdrive:abc123");
3475    }
3476
3477    #[test]
3478    fn directive_name_mega() {
3479        let d = DownloadDirective::Mega {
3480            url: "https://mega.nz/file/ABCDEF#key".into(),
3481            hash: 0,
3482        };
3483        let name = d.display_name();
3484        assert!(name.starts_with("mega:"));
3485        assert!(name.len() <= 35); // "mega:" + up to 30 chars
3486    }
3487
3488    #[test]
3489    fn directive_name_direct_url() {
3490        let d = DownloadDirective::DirectURL {
3491            url: "https://example.com/files/mod.zip".into(),
3492            headers: HashMap::new(),
3493            mirror_resolver: None,
3494            hash: 0,
3495        };
3496        let name = d.display_name();
3497        assert!(name.starts_with("http:"));
3498        assert!(name.len() <= 35); // "http:" + up to 30 chars
3499    }
3500
3501    #[test]
3502    fn directive_name_mega_short_url() {
3503        let d = DownloadDirective::Mega {
3504            url: "https://mega.nz/short".into(),
3505            hash: 0,
3506        };
3507        let name = d.display_name();
3508        assert_eq!(name, "mega:https://mega.nz/short");
3509    }
3510
3511    // -----------------------------------------------------------------------
3512    // 8. directive_hash extraction for each directive type
3513    // -----------------------------------------------------------------------
3514    #[test]
3515    fn directive_hash_nexus() {
3516        let d = DownloadDirective::Nexus {
3517            game_id: "s".into(),
3518            mod_id: 1.into(),
3519            file_id: 1.into(),
3520            hash: 0xABCD,
3521        };
3522        assert_eq!(d.hash(), 0xABCD);
3523    }
3524
3525    #[test]
3526    fn directive_hash_github() {
3527        let d = DownloadDirective::GitHub {
3528            user: "u".into(),
3529            repo: "r".into(),
3530            tag: "t".into(),
3531            asset: "a".into(),
3532            hash: 999,
3533        };
3534        assert_eq!(d.hash(), 999);
3535    }
3536
3537    #[test]
3538    fn directive_hash_gdrive() {
3539        let d = DownloadDirective::GoogleDrive {
3540            id: "x".into(),
3541            hash: 42,
3542        };
3543        assert_eq!(d.hash(), 42);
3544    }
3545
3546    #[test]
3547    fn directive_hash_mega() {
3548        let d = DownloadDirective::Mega {
3549            url: "u".into(),
3550            hash: 7777,
3551        };
3552        assert_eq!(d.hash(), 7777);
3553    }
3554
3555    #[test]
3556    fn directive_hash_direct_url() {
3557        let d = DownloadDirective::DirectURL {
3558            url: "u".into(),
3559            headers: HashMap::new(),
3560            mirror_resolver: None,
3561            hash: 0xFFFF,
3562        };
3563        assert_eq!(d.hash(), 0xFFFF);
3564    }
3565
3566    // -----------------------------------------------------------------------
3567    // 9. extract_from_zip with valid zip
3568    // -----------------------------------------------------------------------
3569    #[test]
3570    fn extract_from_zip_valid() {
3571        let dir = tempfile::tempdir().unwrap();
3572        let zip_path = dir.path().join("test.zip");
3573        create_zip_file(&zip_path, &[("inner/file.txt", b"hello world")]);
3574
3575        let data = extract_from_zip(&zip_path, "inner/file.txt").unwrap();
3576        assert_eq!(data, b"hello world");
3577    }
3578
3579    // -----------------------------------------------------------------------
3580    // 10. extract_from_zip with nonexistent entry
3581    // -----------------------------------------------------------------------
3582    #[test]
3583    fn extract_from_zip_missing_entry() {
3584        let dir = tempfile::tempdir().unwrap();
3585        let zip_path = dir.path().join("test.zip");
3586        create_zip_file(&zip_path, &[("exists.txt", b"data")]);
3587
3588        let result = extract_from_zip(&zip_path, "does_not_exist.txt");
3589        assert!(result.is_err());
3590        let err_msg = format!("{:#}", result.unwrap_err());
3591        assert!(err_msg.contains("not found"), "unexpected error: {err_msg}");
3592    }
3593
3594    // -----------------------------------------------------------------------
3595    // 11. normalize_path
3596    // -----------------------------------------------------------------------
3597    #[test]
3598    fn normalize_path_backslashes() {
3599        assert_eq!(normalize_path("mods\\test\\file.txt"), "mods/test/file.txt");
3600    }
3601
3602    #[test]
3603    fn normalize_path_already_forward() {
3604        assert_eq!(normalize_path("mods/test/file.txt"), "mods/test/file.txt");
3605    }
3606}