1use std::collections::HashMap;
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use std::time::{Instant, SystemTime, UNIX_EPOCH};
9
10use anyhow::{Context, Result, bail};
11use bytes::Bytes;
12use futures::stream::{self, StreamExt};
13use serde::{Deserialize, Serialize};
14use tokio::sync::{Semaphore, mpsc};
15use tokio::task::JoinHandle;
16use tracing::{info, warn};
17
18use modde_core::manifest::wabbajack::{
19 ArchiveInstallBatch, ArchiveState, DownloadDirective, InstallDirective, WabbajackManifest,
20};
21
22use crate::cache::{ByteCacheKey, ByteLruCache};
23use crate::decompress::{ArchiveBatchExtractor, ArchiveInput, ArchiveRequest, ArchiveRequestKind};
24use crate::traits::{AnySource, DownloadSource};
25
26use super::bsa_repack;
27use super::diagnostics::{
28 ArchiveBatchRecord, ProcessSnapshot, ProgressEvent, WabbajackDiagnostics,
29 cgroup_memory_pressure_high, current_process_snapshot,
30};
31use super::impact::{MissingArchiveImpact, MissingArchivePolicy};
32use super::inline::InlineSource;
33use super::patcher;
34use super::staging::StagingStore;
35
/// Concurrency used by a freshly constructed installer (e.g. for staging
/// compression) until `set_concurrency` overrides it.
const DEFAULT_CONCURRENCY: usize = 4;
/// Version stamped into on-disk sidecars/sentinels; a stored value that does
/// not match invalidates the record so the work is redone.
const APPLY_STATE_VERSION: u32 = 1;
/// Archives no larger than this (256 MiB) may be loaded fully into memory;
/// overridable via `MODDE_ARCHIVE_MEMORY_MAX_BYTES`.
const DEFAULT_ARCHIVE_MEMORY_MAX_BYTES: u64 = 256 << 20;
40
/// Controls whether stored archives may be deleted once their directives
/// have been applied.
///
/// Serialized in kebab-case: `keep`, `prune-applied`, `auto`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum ArchiveRetentionPolicy {
    /// Never delete stored archives (the default).
    #[default]
    Keep,
    /// Allow `maybe_prune_archive` to delete an archive (and its verification
    /// sidecar), except archives backed by a game-file source.
    PruneApplied,
    /// Like `PruneApplied`, but keep archives whose manifest size exceeds the
    /// in-memory archive limit or whose batch has more than one directive.
    Auto,
}
51
52impl ArchiveRetentionPolicy {
53 #[must_use]
56 pub fn from_env() -> Self {
57 match std::env::var("MODDE_ARCHIVE_RETENTION")
58 .ok()
59 .as_deref()
60 .map(str::to_ascii_lowercase)
61 .as_deref()
62 {
63 Some("prune-applied" | "prune" | "delete") => Self::PruneApplied,
64 Some("auto") => Self::Auto,
65 _ => Self::Keep,
66 }
67 }
68}
69
/// On-disk marker recording that an archive batch was fully applied, so a
/// resumed install can report the batch's directives as done without
/// re-extracting them.
///
/// NOTE(review): serialized/checked by `write_archive_batch_sentinel` and
/// `archive_batch_sentinel_valid`, whose bodies are outside this view —
/// confirm which fields are compared on resume.
#[derive(Debug, Serialize, Deserialize)]
struct ArchiveBatchSentinel {
    // Presumably `APPLY_STATE_VERSION` at write time — confirm.
    pipeline_version: u32,
    // Hash of the source archive (the same value used to locate it in the store).
    archive_hash: u64,
    archive_size_bytes: u64,
    // Manifest install-directive indices covered by this batch.
    directive_indices: Vec<usize>,
}
77
/// On-disk marker recording that a `CreateBSA` directive completed, letting a
/// resumed install skip it.
///
/// NOTE(review): serialized/checked by `write_create_bsa_sentinel` and
/// `create_bsa_sentinel_valid`, whose bodies are outside this view.
#[derive(Debug, Serialize, Deserialize)]
struct CreateBsaSentinel {
    // Presumably `APPLY_STATE_VERSION` at write time — confirm.
    pipeline_version: u32,
    // Index of the `CreateBSA` directive in the manifest's install list.
    directive_index: usize,
    temp_id: String,
    // Output path of the created BSA.
    to: String,
}
85
86#[derive(Debug, Serialize, Deserialize)]
87struct VerifiedArchiveSidecar {
88 pipeline_version: u32,
89 archive_hash: u64,
90 size_bytes: u64,
91 modified_unix_ms: u128,
92 verified_unix_ms: u128,
93}
94
95#[derive(Debug, Clone)]
96enum TrustedArchive {
97 Path(PathBuf),
98 Bytes {
99 label: String,
100 bytes: Bytes,
101 fallback_path: PathBuf,
102 },
103}
104
/// Per-archive counters describing how trust was established and how the
/// archive will be read; passed to batch metrics.
#[derive(Debug, Default, Clone, Copy)]
struct ArchiveTrustStats {
    // A valid verification sidecar let the hash check be skipped.
    sidecar_hit: bool,
    // Bytes hashed when the archive had to be re-verified (0 otherwise).
    streamed_hash_bytes: u64,
    // The archive was loaded into memory (`TrustedArchive::Bytes`).
    memory_archive_hit: bool,
    // The archive will be read from disk (`TrustedArchive::Path`).
    disk_fallback: bool,
}
112
/// Counts of staging outputs from a previous run that were adopted instead of
/// being regenerated; reported via `InstallProgress::StagingAdopted`.
#[derive(Debug, Default, Clone, Copy)]
struct StagingAdoptionSummary {
    archive_batches: usize,
    create_bsa: usize,
}
118
119struct DiagnosticsHeartbeatGuard(Option<JoinHandle<()>>);
120
121impl Drop for DiagnosticsHeartbeatGuard {
122 fn drop(&mut self) {
123 if let Some(handle) = self.0.take() {
124 handle.abort();
125 }
126 }
127}
128
/// Progress events emitted by [`WabbajackInstaller`] while installing.
#[derive(Debug, Clone)]
pub enum InstallProgress {
    /// Sent once after preflight checks pass.
    Starting {
        // Number of download directives in the manifest.
        total_downloads: usize,
    },
    /// An archive download is starting. In this file it is sent with
    /// `bytes: 0`; `total` is the source's size hint, or 0 if unknown.
    Downloading {
        name: String,
        bytes: u64,
        total: u64,
    },
    /// An archive finished downloading and its sidecar was written.
    DownloadComplete {
        name: String,
    },
    /// An existing archive is being hash-verified; `name` is the hash as 16 hex digits.
    Verifying {
        name: String,
    },
    /// A directive is about to be applied.
    Applying {
        // Index into the manifest's install-directive list.
        directive_index: usize,
        // Total number of install directives.
        total: usize,
    },
    /// A patched-from-archive output is being produced; `name` is its target path.
    Patching {
        name: String,
    },
    /// A BSA is being created; `name` is its output path.
    CreatingBSA {
        name: String,
    },
    /// NOTE(review): not emitted in this part of the file — presumably sent
    /// once launcher configuration has run; confirm the emitter.
    LauncherConfigured {
        report: modde_games::launcher::LauncherConfigurationReport,
    },
    /// An inline file is being written; `name` is its target path.
    InlineFile {
        name: String,
    },
    /// Outputs from a previous run were adopted instead of regenerated.
    StagingAdopted {
        archive_batches: usize,
        create_bsa: usize,
    },
    /// Installation finished (sent after staging compression).
    Complete,
    /// NOTE(review): not emitted in this part of the file — presumably sent
    /// by a caller when `install` returns an error; confirm.
    Failed {
        error: String,
    },
}
171
/// Applies a parsed Wabbajack manifest into a staging directory, fetching and
/// verifying archives as needed.
pub struct WabbajackInstaller {
    manifest: WabbajackManifest,
    // Archive hash -> index in `manifest.archives` (the first occurrence wins).
    archives_by_hash: std::collections::HashMap<u64, usize>,
    // The `.wabbajack` file; opened as the inline-data source.
    wabbajack_path: PathBuf,
    // Directory holding stored archives and their verification sidecars.
    store_dir: PathBuf,
    // Directory receiving applied outputs.
    staging_dir: PathBuf,
    game_dir: Option<PathBuf>,
    // Download sources; `Arc::get_mut` in `add_source` requires this to be
    // unshared while sources are being added.
    sources: Arc<Vec<AnySource>>,
    // Concurrency used e.g. for staging compression; always >= 1 when set via
    // `set_concurrency`.
    concurrency: usize,
    // Log and skip failing directives instead of aborting the install.
    continue_on_error: bool,
    // Shared byte cache; its usage is reported in logs and the diagnostics heartbeat.
    byte_cache: Arc<ByteLruCache>,
    diagnostics: Option<WabbajackDiagnostics>,
    archive_retention: ArchiveRetentionPolicy,
    missing_archive_policy: MissingArchivePolicy,
    // Archives no larger than this may be loaded fully into memory.
    archive_memory_max_bytes: u64,
}
196
197impl WabbajackInstaller {
198 #[must_use]
202 pub fn new(
203 manifest: WabbajackManifest,
204 wabbajack_path: PathBuf,
205 store_dir: PathBuf,
206 staging_dir: PathBuf,
207 ) -> Self {
208 let mut archives_by_hash = std::collections::HashMap::new();
209 for (idx, archive) in manifest.archives.iter().enumerate() {
210 archives_by_hash.entry(archive.hash).or_insert(idx);
211 }
212 Self {
213 manifest,
214 archives_by_hash,
215 wabbajack_path,
216 store_dir,
217 staging_dir,
218 game_dir: None,
219 sources: Arc::new(Vec::new()),
220 concurrency: DEFAULT_CONCURRENCY,
221 continue_on_error: false,
222 byte_cache: Arc::new(ByteLruCache::from_env()),
223 diagnostics: None,
224 archive_retention: ArchiveRetentionPolicy::from_env(),
225 missing_archive_policy: MissingArchivePolicy::Fail,
226 archive_memory_max_bytes: std::env::var("MODDE_ARCHIVE_MEMORY_MAX_BYTES")
227 .ok()
228 .and_then(|v| v.parse::<u64>().ok())
229 .unwrap_or(DEFAULT_ARCHIVE_MEMORY_MAX_BYTES),
230 }
231 }
232
233 pub fn set_continue_on_error(&mut self, value: bool) {
235 self.continue_on_error = value;
236 }
237
238 pub fn set_diagnostics(&mut self, diagnostics: WabbajackDiagnostics) {
240 self.diagnostics = Some(diagnostics);
241 }
242
243 pub fn set_archive_retention(&mut self, policy: ArchiveRetentionPolicy) {
245 self.archive_retention = policy;
246 }
247
248 pub fn set_missing_archive_policy(&mut self, policy: MissingArchivePolicy) {
250 self.missing_archive_policy = policy;
251 }
252
253 pub fn set_game_dir(&mut self, game_dir: PathBuf) {
255 self.game_dir = Some(game_dir);
256 }
257
258 pub fn add_source(&mut self, source: AnySource) {
260 Arc::get_mut(&mut self.sources)
261 .expect("add_source must be called before install")
262 .push(source);
263 }
264
265 pub fn set_concurrency(&mut self, concurrency: usize) {
267 self.concurrency = concurrency.max(1);
268 }
269
270 pub async fn install(&self, progress_tx: mpsc::UnboundedSender<InstallProgress>) -> Result<()> {
272 let staging_store = StagingStore::new(&self.staging_dir);
273 staging_store.prepare_resumable().await?;
274 let _diagnostics_heartbeat = self.diagnostics.as_ref().map(|diagnostics| {
275 let cache = Arc::clone(&self.byte_cache);
276 DiagnosticsHeartbeatGuard(Some(
277 diagnostics.spawn_heartbeat(Arc::new(move || cache.bytes_used())),
278 ))
279 });
280 if let Some(diagnostics) = &self.diagnostics {
281 diagnostics.set_phase("preflight");
282 diagnostics.record_progress(ProgressEvent::Other);
283 }
284
285 let downloads = self.manifest.download_directives();
286 let installs = self.manifest.install_directives();
287
288 self.validate_game_file_sources()?;
289 self.verify_game_file_sources().await?;
290 self.validate_download_sources(&downloads)?;
291 self.preflight_authored_files().await?;
292 self.check_diagnostics_abort()?;
293
294 progress_tx
295 .send(InstallProgress::Starting {
296 total_downloads: downloads.len(),
297 })
298 .ok();
299
300 let total = installs.len();
308 let progress_for_parallel = progress_tx.clone();
309 let max_in_flight = std::env::var("MODDE_APPLY_MAX_IN_FLIGHT")
322 .ok()
323 .and_then(|v| v.parse::<usize>().ok())
324 .filter(|&n| n > 0)
325 .unwrap_or_else(|| {
326 std::thread::available_parallelism()
327 .map(std::num::NonZeroUsize::get)
328 .unwrap_or(8)
329 .clamp(1, 4)
330 });
331 let max_ram_fraction = std::env::var("MODDE_APPLY_RAM_FRACTION")
336 .ok()
337 .and_then(|v| v.parse::<f64>().ok())
338 .filter(|&v| v > 0.0 && v <= 1.0)
339 .unwrap_or(0.85);
340 let safety_reserve_bytes = std::env::var("MODDE_APPLY_SAFETY_RESERVE_GIB")
341 .ok()
342 .and_then(|v| v.parse::<u64>().ok())
343 .map_or(2 * (1_u64 << 30), |g| g * (1_u64 << 30));
344 let page_cache_fraction = std::env::var("MODDE_APPLY_PAGE_CACHE_FRACTION")
356 .ok()
357 .and_then(|v| v.parse::<f64>().ok())
358 .filter(|&v: &f64| v > 0.0 && v <= 1.0)
359 .unwrap_or(1.0);
360 let weighted_config = memory_admission::weighted::WeightedConfig {
361 base: memory_admission::Config {
362 max_ram_fraction,
363 ..memory_admission::Config::default()
364 }
365 .validate()
366 .expect("hard-coded default memory-admission config is valid"),
367 safety_reserve_bytes,
368 max_single_weight_bytes: 4 * (1_u64 << 30),
371 max_page_cache_fraction: page_cache_fraction,
372 ..memory_admission::weighted::WeightedConfig::default()
373 };
374 let provider: memory_admission::provider::SharedMemoryProvider = {
380 #[cfg(target_os = "linux")]
381 {
382 memory_admission::providers::CgroupV2Provider::shared()
383 .unwrap_or_else(|| memory_admission::providers::ProcMeminfoProvider::shared())
384 }
385 #[cfg(not(target_os = "linux"))]
386 {
387 memory_admission::providers::ProcMeminfoProvider::shared()
388 }
389 };
390 let gate =
391 memory_admission::weighted::AsyncWeightedAdmissionGate::new(weighted_config, provider);
392
393 let archive_size_by_hash: HashMap<u64, u64> = self
396 .manifest
397 .archives
398 .iter()
399 .map(|a| (a.hash, a.size))
400 .collect();
401 let archive_size_by_hash = Arc::new(archive_size_by_hash);
402
403 info!(
404 max_in_flight,
405 max_ram_fraction,
406 safety_reserve_gib = safety_reserve_bytes / (1 << 30),
407 page_cache_fraction,
408 byte_cache_used = self.byte_cache.bytes_used(),
409 "starting weighted parallel apply pass"
410 );
411 let inline_source = installs
412 .iter()
413 .any(|directive| {
414 matches!(
415 directive,
416 InstallDirective::InlineFile { .. }
417 | InstallDirective::PatchedFromArchive { .. }
418 )
419 })
420 .then(|| InlineSource::open(&self.wabbajack_path).map(Arc::new))
421 .transpose()?;
422
423 let impact = MissingArchiveImpact::analyze(&self.manifest, &self.store_dir);
429 let skip_plan = impact.skip_plan(&self.manifest, self.missing_archive_policy);
430 if !skip_plan.is_empty() {
431 warn!(
432 policy = ?self.missing_archive_policy,
433 missing_archives = impact.missing_archives.len(),
434 skipped_directives = skip_plan.skipped_directives.len(),
435 skipped_mod_roots = skip_plan.skipped_mod_roots.len(),
436 "omitting Wabbajack outputs for missing optional archives"
437 );
438 }
439
440 let mut inline_directives: Vec<(usize, &InstallDirective)> = Vec::new();
441 for (i, directive) in installs.iter().enumerate() {
442 if matches!(directive, InstallDirective::InlineFile { .. })
443 && !skip_plan.should_skip_directive(i, directive)
444 {
445 inline_directives.push((i, directive));
446 }
447 }
448 let mut archive_batches = self.manifest.install_directives_grouped_by_archive();
449 if !skip_plan.is_empty() {
450 for batch in &mut archive_batches {
451 batch.directives.retain(|indexed| {
452 !skip_plan.should_skip_directive(indexed.directive_index, &indexed.directive)
453 });
454 }
455 archive_batches.retain(|batch| !batch.directives.is_empty());
456 }
457 let archive_batch_count = archive_batches.len();
458 let patch_max_in_flight = std::env::var("MODDE_PATCH_MAX_IN_FLIGHT")
459 .ok()
460 .and_then(|v| v.parse::<usize>().ok())
461 .filter(|&n| n > 0)
462 .unwrap_or(1);
463 let patch_batch_gate = Arc::new(Semaphore::new(patch_max_in_flight));
464 let directive_weights = DirectiveWeights::from_env();
467 let archive_batch_weights = ArchiveBatchWeights::from_env();
468 info!(
469 archive_batch_count,
470 inline_directive_count = inline_directives.len(),
471 patch_max_in_flight,
472 "grouped apply directives by archive"
473 );
474 let adoption = self
475 .adopt_existing_staging(&archive_batches, &installs)
476 .await?;
477 self.check_diagnostics_abort()?;
478 if adoption.archive_batches > 0 || adoption.create_bsa > 0 {
479 info!(
480 archive_batches = adoption.archive_batches,
481 create_bsa = adoption.create_bsa,
482 "adopted existing Wabbajack staging outputs"
483 );
484 progress_tx
485 .send(InstallProgress::StagingAdopted {
486 archive_batches: adoption.archive_batches,
487 create_bsa: adoption.create_bsa,
488 })
489 .ok();
490 }
491
492 if let Some(diagnostics) = &self.diagnostics {
495 diagnostics.set_phase("apply-inline");
496 }
497 let inline_results: Vec<(usize, Result<()>)> = stream::iter(inline_directives)
498 .map(|(i, directive)| {
499 let progress_tx = progress_for_parallel.clone();
500 let gate = gate.clone();
501 let sizes = Arc::clone(&archive_size_by_hash);
502 let inline_source = inline_source.clone();
503 async move {
504 let weight = estimate_directive_weight(directive, &sizes, &directive_weights);
505 let _permit = gate.acquire(weight).await;
506 if let Err(e) = self.check_diagnostics_abort() {
507 return (i, Err(e));
508 }
509 progress_tx
510 .send(InstallProgress::Applying {
511 directive_index: i,
512 total,
513 })
514 .ok();
515 let InstallDirective::InlineFile { source_data_id, to } = directive else {
516 unreachable!("inline_directives only contains InlineFile");
517 };
518 progress_tx
519 .send(InstallProgress::InlineFile { name: to.clone() })
520 .ok();
521 let result = match inline_source.as_deref() {
522 Some(inline_source) => {
523 self.apply_inline_file(inline_source, source_data_id, to)
524 .await
525 }
526 None => Err(anyhow::anyhow!("inline source was not initialized")),
527 };
528 (i, result)
529 }
530 })
531 .buffer_unordered(max_in_flight)
532 .collect()
533 .await;
534
535 if let Some(diagnostics) = &self.diagnostics {
539 diagnostics.set_phase("apply-archive-batches");
540 }
541 let archive_results: Vec<Vec<(usize, Result<()>)>> = stream::iter(archive_batches)
542 .map(|batch| {
543 let progress_tx = progress_for_parallel.clone();
544 let gate = gate.clone();
545 let inline_source = inline_source.clone();
546 let patch_batch_gate = Arc::clone(&patch_batch_gate);
547 async move {
548 let _patch_permit = if archive_batch_has_patch(&batch) {
549 Some(
550 patch_batch_gate
551 .acquire_owned()
552 .await
553 .expect("semaphore open"),
554 )
555 } else {
556 None
557 };
558 let archive_weight =
562 estimate_archive_batch_weight(&batch, &archive_batch_weights);
563 let _permit = gate.acquire(archive_weight).await;
564 if let Err(e) = self.check_diagnostics_abort() {
565 return batch
566 .directives
567 .into_iter()
568 .map(|directive| {
569 (directive.directive_index, Err(anyhow::anyhow!("{e:#}")))
570 })
571 .collect();
572 }
573
574 for indexed in &batch.directives {
575 let i = indexed.directive_index;
576 progress_tx
577 .send(InstallProgress::Applying {
578 directive_index: i,
579 total,
580 })
581 .ok();
582 }
583 self.apply_archive_batch(batch, inline_source.as_deref(), &progress_tx)
584 .await
585 }
586 })
587 .buffer_unordered(max_in_flight)
588 .collect()
589 .await;
590
591 for (i, result) in inline_results
592 .into_iter()
593 .chain(archive_results.into_iter().flatten())
594 {
595 if let Err(e) = result {
596 if self.continue_on_error {
597 warn!(directive_index = i, "skipping directive after error: {e:#}");
598 } else {
599 return Err(e);
600 }
601 }
602 }
603
604 if let Some(diagnostics) = &self.diagnostics {
606 diagnostics.set_phase("create-bsa");
607 }
608 for (i, directive) in installs.iter().enumerate() {
609 self.check_diagnostics_abort()?;
610 let InstallDirective::CreateBSA {
611 temp_id,
612 to,
613 file_states,
614 } = directive
615 else {
616 continue;
617 };
618 if skip_plan.should_skip_create_bsa(i, temp_id, to) {
619 warn!(
620 directive_index = i,
621 temp_id,
622 to,
623 "skipping CreateBSA because an optional upstream archive is missing"
624 );
625 continue;
626 }
627 if self.create_bsa_sentinel_valid(i, temp_id, to).await {
628 continue;
629 }
630 progress_tx
631 .send(InstallProgress::Applying {
632 directive_index: i,
633 total,
634 })
635 .ok();
636 progress_tx
637 .send(InstallProgress::CreatingBSA { name: to.clone() })
638 .ok();
639 if let Err(e) = self.apply_create_bsa(temp_id, to, file_states).await {
640 if self.continue_on_error {
641 warn!(directive_index = i, "skipping CreateBSA after error: {e:#}");
642 } else {
643 return Err(e);
644 }
645 } else {
646 self.write_create_bsa_sentinel(i, temp_id, to).await?;
647 if let Some(diagnostics) = &self.diagnostics {
648 diagnostics.record_progress(ProgressEvent::CreateBsaComplete);
649 }
650 }
651 }
652
653 if let Some(diagnostics) = &self.diagnostics {
654 diagnostics.set_phase("compress-staging");
655 }
656 self.check_diagnostics_abort()?;
657 let summary = staging_store
658 .compress_eligible_files(self.concurrency)
659 .await?;
660 info!(
661 compressed_files = summary.compressed_files,
662 skipped_files = summary.skipped_files,
663 original_bytes = summary.original_bytes,
664 compressed_bytes = summary.compressed_bytes,
665 "compressed Wabbajack staging files"
666 );
667
668 progress_tx.send(InstallProgress::Complete).ok();
669 if let Some(diagnostics) = &self.diagnostics {
670 diagnostics.set_phase("complete");
671 diagnostics.record_progress(ProgressEvent::Other);
672 }
673 info!("wabbajack installation complete");
674 Ok(())
675 }
676
    /// Ensures the archive with `archive_hash` is present in the store and
    /// trusted, then wraps it as a [`TrustedArchive`].
    ///
    /// - If the archive already exists, a valid verification sidecar is
    ///   accepted as-is. Otherwise the file is re-hashed (emitting
    ///   `InstallProgress::Verifying`) and a fresh sidecar is written.
    /// - If it is missing, the manifest's download directive is resolved
    ///   through the first registered source that can handle it, downloaded
    ///   into the store, and given a sidecar.
    ///
    /// # Errors
    ///
    /// Fails if hash verification fails, if there is no download directive,
    /// if the directive is `Manual`, if no source can handle it, or if
    /// resolving, downloading or any filesystem operation fails.
    async fn ensure_archive_trusted(
        &self,
        archive_hash: u64,
        progress_tx: &mpsc::UnboundedSender<InstallProgress>,
    ) -> Result<(TrustedArchive, ArchiveTrustStats)> {
        let path = archive_path(&self.store_dir, &archive_hash);
        let mut stats = ArchiveTrustStats::default();

        if path.exists() {
            if self.verified_sidecar_valid(&path, archive_hash).await {
                // Size and mtime match the last verification; skip re-hashing.
                stats.sidecar_hit = true;
            } else {
                let metadata = tokio::fs::metadata(&path).await?;
                progress_tx
                    .send(InstallProgress::Verifying {
                        name: format!("{archive_hash:016x}"),
                    })
                    .ok();
                modde_core::hash::verify_xxh64(&path, archive_hash)
                    .await
                    .with_context(|| {
                        format!("hash verification failed for archive {archive_hash:016x}")
                    })?;
                stats.streamed_hash_bytes = metadata.len();
                self.write_verified_sidecar(&path, archive_hash).await?;
            }
            return self
                .trusted_archive_for_path(path, archive_hash, stats)
                .await;
        }

        let Some(directive) = self
            .archives_by_hash
            .get(&archive_hash)
            .and_then(|&i| self.manifest.archives[i].download_directive())
        else {
            bail!(
                "archive {archive_hash:016x} is not present in store and has no download directive"
            );
        };

        let name = directive.display_name().into_owned();
        // Manual archives cannot be fetched automatically.
        if matches!(directive, DownloadDirective::Manual { .. }) {
            bail!("manual archive {name} ({archive_hash:016x}) is missing from the store");
        }

        let Some(source) = self
            .sources
            .iter()
            .find(|source| source.can_handle(&directive))
        else {
            bail!("no download source registered for archive {name} ({archive_hash:016x})");
        };

        if let Some(parent) = path.parent() {
            tokio::fs::create_dir_all(parent).await?;
        }
        let handle = source
            .resolve(&directive)
            .await
            .with_context(|| format!("failed to resolve download: {name}"))?;
        progress_tx
            .send(InstallProgress::Downloading {
                name: name.clone(),
                bytes: 0,
                total: handle.size_hint.unwrap_or(0),
            })
            .ok();
        source
            .download(handle, &path)
            .await
            .with_context(|| format!("failed to download: {name}"))?;
        // NOTE(review): the sidecar marks the download as verified without a
        // `verify_xxh64` call here. This is only sound if `download`
        // validates the hash itself; confirm, otherwise a corrupt download is
        // trusted on every later run.
        self.write_verified_sidecar(&path, archive_hash).await?;
        progress_tx
            .send(InstallProgress::DownloadComplete { name })
            .ok();
        self.trusted_archive_for_path(path, archive_hash, stats)
            .await
    }
756
757 async fn trusted_archive_for_path(
758 &self,
759 path: PathBuf,
760 archive_hash: u64,
761 mut stats: ArchiveTrustStats,
762 ) -> Result<(TrustedArchive, ArchiveTrustStats)> {
763 let metadata = tokio::fs::metadata(&path).await?;
764 if metadata.len() <= self.archive_memory_max_bytes
765 && archive_path_is_memory_supported(&path).await
766 {
767 let bytes = tokio::fs::read(&path).await?;
768 stats.memory_archive_hit = true;
769 return Ok((
770 TrustedArchive::Bytes {
771 label: format!("{archive_hash:016x}.archive"),
772 bytes: Bytes::from(bytes),
773 fallback_path: path,
774 },
775 stats,
776 ));
777 }
778 stats.disk_fallback = true;
779 Ok((TrustedArchive::Path(path), stats))
780 }
781
782 async fn verified_sidecar_valid(&self, path: &Path, archive_hash: u64) -> bool {
783 let Ok(metadata) = tokio::fs::metadata(path).await else {
784 return false;
785 };
786 let Ok(data) = tokio::fs::read_to_string(verified_sidecar_path(path)).await else {
787 return false;
788 };
789 let Ok(sidecar) = serde_json::from_str::<VerifiedArchiveSidecar>(&data) else {
790 return false;
791 };
792 sidecar.pipeline_version == APPLY_STATE_VERSION
793 && sidecar.archive_hash == archive_hash
794 && sidecar.size_bytes == metadata.len()
795 && sidecar.modified_unix_ms == metadata_modified_unix_ms(&metadata)
796 }
797
798 async fn write_verified_sidecar(&self, path: &Path, archive_hash: u64) -> Result<()> {
799 let metadata = tokio::fs::metadata(path).await?;
800 let sidecar = VerifiedArchiveSidecar {
801 pipeline_version: APPLY_STATE_VERSION,
802 archive_hash,
803 size_bytes: metadata.len(),
804 modified_unix_ms: metadata_modified_unix_ms(&metadata),
805 verified_unix_ms: unix_ms(),
806 };
807 tokio::fs::write(
808 verified_sidecar_path(path),
809 serde_json::to_vec_pretty(&sidecar)?,
810 )
811 .await?;
812 Ok(())
813 }
814
815 async fn maybe_prune_archive(&self, batch: &ArchiveInstallBatch) -> Result<u64> {
816 if self.archive_retention == ArchiveRetentionPolicy::Keep {
817 return Ok(0);
818 }
819 if self.game_file_source_path(batch.archive_hash)?.is_some() {
820 return Ok(0);
821 }
822 let path = archive_path(&self.store_dir, &batch.archive_hash);
823 if !path.exists() {
824 return Ok(0);
825 }
826 if self.archive_retention == ArchiveRetentionPolicy::Auto
827 && (batch.archive_size_bytes > self.archive_memory_max_bytes
828 || batch.directives.len() > 1)
829 {
830 return Ok(0);
831 }
832 let size = tokio::fs::metadata(&path)
833 .await
834 .map(|m| m.len())
835 .unwrap_or(0);
836 tokio::fs::remove_file(&path).await?;
837 let _ = tokio::fs::remove_file(verified_sidecar_path(&path)).await;
838 Ok(size)
839 }
840
841 async fn apply_from_archive(
843 &self,
844 archive_hash: u64,
845 from: &str,
846 to: &str,
847 expected_size: u64,
848 ) -> Result<()> {
849 validate_archive_entry(from)?;
851 validate_archive_entry(to)?;
852
853 let output_path = self.staging_dir.join(normalize_path(to));
854
855 if let Some(parent) = output_path.parent() {
856 tokio::fs::create_dir_all(parent).await?;
857 }
858
859 if let Some(source) = self.game_file_source_path(archive_hash)? {
860 if game_file_source_is_whole_file(&source.path, &source.rel_path, from) {
861 modde_core::link::link_or_copy(&source.path, &output_path).await?;
862 } else {
863 let archive_path = source.path;
864 let from = from.to_string();
865 tokio::task::spawn_blocking(move || {
866 ArchiveBatchExtractor::extract_selected(
867 &archive_path,
868 &[ArchiveRequest {
869 directive_index: 0,
870 from,
871 inner_path: None,
872 kind: ArchiveRequestKind::WriteFile {
873 to: output_path,
874 expected_size: (expected_size > 0).then_some(expected_size),
875 },
876 }],
877 )
878 })
879 .await??;
880 }
881 info!(from = %from, to = %to, "extracted game-file source");
882 return Ok(());
883 }
884
885 let data = self
888 .read_archive_source(archive_hash, from)
889 .await
890 .with_context(|| {
891 format!("failed to extract '{from}' from archive {archive_hash:016x}")
892 })?;
893
894 tokio::fs::write(&output_path, &data).await?;
895
896 info!(from = %from, to = %to, "extracted file from archive");
897 Ok(())
898 }
899
900 async fn apply_archive_batch(
901 &self,
902 batch: ArchiveInstallBatch,
903 inline_source: Option<&InlineSource>,
904 progress_tx: &mpsc::UnboundedSender<InstallProgress>,
905 ) -> Vec<(usize, Result<()>)> {
906 if let Err(e) = self.check_diagnostics_abort() {
907 return batch
908 .directives
909 .into_iter()
910 .map(|directive| (directive.directive_index, Err(anyhow::anyhow!("{e:#}"))))
911 .collect();
912 }
913 if self.archive_batch_sentinel_valid(&batch).await {
914 return batch
915 .directives
916 .into_iter()
917 .map(|directive| (directive.directive_index, Ok(())))
918 .collect();
919 }
920
921 let mut native_requests = Vec::new();
922 let mut patch_directives = HashMap::new();
923 let mut results = Vec::with_capacity(batch.directives.len());
924 let batch_started = Instant::now();
925 let mut extraction_ms = 0;
926 let mut patch_ms = 0;
927 let mut trust_check_ms = 0;
928 let mut prune_ms = 0;
929 let mut pruned_bytes = 0;
930 let mut trust_stats = ArchiveTrustStats::default();
931 let mut extracted_patch_source_bytes = 0;
932 let byte_cache_used_before = self.byte_cache.bytes_used();
933 let process_before = current_process_snapshot();
934 let patch_count = batch
935 .directives
936 .iter()
937 .filter(|indexed| {
938 matches!(
939 indexed.directive,
940 InstallDirective::PatchedFromArchive { .. }
941 )
942 })
943 .count();
944 let _batch_guard = self.diagnostics.as_ref().map(|diagnostics| {
945 diagnostics.start_archive_batch(
946 batch.archive_hash,
947 batch.directives.len(),
948 patch_count,
949 batch.archive_size_bytes,
950 )
951 });
952 info!(
953 archive_hash = %format!("{:016x}", batch.archive_hash),
954 archive_size_bytes = batch.archive_size_bytes,
955 directives = batch.directives.len(),
956 patch_directives = patch_count,
957 byte_cache_used = self.byte_cache.bytes_used(),
958 "starting archive apply batch"
959 );
960
961 let trusted_archive = if self
962 .game_file_source_path(batch.archive_hash)
963 .ok()
964 .flatten()
965 .is_some()
966 {
967 None
968 } else {
969 let trust_started = Instant::now();
970 let trusted = self
971 .ensure_archive_trusted(batch.archive_hash, progress_tx)
972 .await;
973 trust_check_ms = trust_started.elapsed().as_millis();
974 match trusted {
975 Ok((archive, stats)) => {
976 trust_stats = stats;
977 Some(archive)
978 }
979 Err(e) => {
980 let msg = format!("{e:#}");
981 let results = batch
982 .directives
983 .iter()
984 .map(|indexed| {
985 (
986 indexed.directive_index,
987 Err(anyhow::anyhow!("archive trust check failed: {msg}")),
988 )
989 })
990 .collect::<Vec<_>>();
991 self.write_archive_batch_metrics(
992 &batch,
993 patch_count,
994 batch_started,
995 trust_check_ms,
996 extraction_ms,
997 patch_ms,
998 prune_ms,
999 extracted_patch_source_bytes,
1000 trust_stats,
1001 pruned_bytes,
1002 byte_cache_used_before,
1003 process_before,
1004 &results,
1005 )
1006 .await;
1007 return results;
1008 }
1009 }
1010 };
1011
1012 for indexed in &batch.directives {
1013 if let Err(e) = self.check_diagnostics_abort() {
1014 results.push((indexed.directive_index, Err(e)));
1015 continue;
1016 }
1017 match &indexed.directive {
1018 InstallDirective::FromArchive {
1019 archive_hash,
1020 from,
1021 inner_path,
1022 to,
1023 size,
1024 } => match self.game_file_source_path(*archive_hash) {
1025 Ok(Some(_)) => {
1026 results.push((
1027 indexed.directive_index,
1028 self.apply_from_archive(*archive_hash, from, to, *size)
1029 .await,
1030 ));
1031 }
1032 Ok(None) => {
1033 if let Err(e) =
1038 validate_archive_entry(from).and_then(|()| validate_archive_entry(to))
1039 {
1040 results.push((indexed.directive_index, Err(e)));
1041 continue;
1042 }
1043 let output_path = self.staging_dir.join(normalize_path(to));
1044 native_requests.push(ArchiveRequest {
1045 directive_index: indexed.directive_index,
1046 from: from.clone(),
1047 inner_path: inner_path.clone(),
1048 kind: ArchiveRequestKind::WriteFile {
1049 to: output_path,
1050 expected_size: (*size > 0).then_some(*size),
1051 },
1052 });
1053 }
1054 Err(e) => results.push((indexed.directive_index, Err(e))),
1055 },
1056 InstallDirective::PatchedFromArchive {
1057 archive_hash,
1058 from,
1059 inner_path,
1060 to,
1061 patch_id,
1062 size,
1063 } => {
1064 progress_tx
1065 .send(InstallProgress::Patching { name: to.clone() })
1066 .ok();
1067 match self.game_file_source_path(*archive_hash) {
1068 Ok(Some(_)) => {
1069 let Some(inline_source) = inline_source else {
1070 results.push((
1071 indexed.directive_index,
1072 Err(anyhow::anyhow!("inline source was not initialized")),
1073 ));
1074 continue;
1075 };
1076 results.push((
1077 indexed.directive_index,
1078 self.apply_patched_from_archive(
1079 inline_source,
1080 *archive_hash,
1081 from,
1082 to,
1083 patch_id,
1084 *size,
1085 )
1086 .await,
1087 ));
1088 }
1089 Ok(None) => {
1090 if let Some(bytes) = self.patch_source_from_cache(*archive_hash, from) {
1091 let Some(inline_source) = inline_source else {
1092 results.push((
1093 indexed.directive_index,
1094 Err(anyhow::anyhow!("inline source was not initialized")),
1095 ));
1096 continue;
1097 };
1098 results.push((
1099 indexed.directive_index,
1100 self.write_patched_output(
1101 inline_source,
1102 bytes,
1103 to,
1104 patch_id,
1105 *size,
1106 )
1107 .await,
1108 ));
1109 } else {
1110 patch_directives.insert(
1111 indexed.directive_index,
1112 (
1113 *archive_hash,
1114 from.clone(),
1115 inner_path.clone(),
1116 to.clone(),
1117 patch_id.clone(),
1118 *size,
1119 ),
1120 );
1121 native_requests.push(ArchiveRequest {
1122 directive_index: indexed.directive_index,
1123 from: from.clone(),
1124 inner_path: inner_path.clone(),
1125 kind: ArchiveRequestKind::Bytes,
1126 });
1127 }
1128 }
1129 Err(e) => results.push((indexed.directive_index, Err(e))),
1130 }
1131 }
1132 _ => unreachable!("archive batch contains only archive-backed directives"),
1133 }
1134 }
1135
1136 if native_requests.is_empty() {
1137 if results.iter().all(|(_, result)| result.is_ok())
1138 && let Err(e) = self.write_archive_batch_sentinel(&batch).await
1139 {
1140 let msg = format!("{e:#}");
1141 return results
1142 .into_iter()
1143 .map(|(idx, result)| {
1144 if result.is_ok() {
1145 (
1146 idx,
1147 Err(anyhow::anyhow!(
1148 "failed to write archive batch sentinel: {msg}"
1149 )),
1150 )
1151 } else {
1152 (idx, result)
1153 }
1154 })
1155 .collect();
1156 }
1157 if results.iter().all(|(_, result)| result.is_ok())
1158 && let Some(diagnostics) = &self.diagnostics
1159 {
1160 diagnostics.record_progress(ProgressEvent::ArchiveBatchComplete);
1161 }
1162 self.write_archive_batch_metrics(
1163 &batch,
1164 patch_count,
1165 batch_started,
1166 trust_check_ms,
1167 extraction_ms,
1168 patch_ms,
1169 prune_ms,
1170 extracted_patch_source_bytes,
1171 trust_stats,
1172 pruned_bytes,
1173 byte_cache_used_before,
1174 process_before,
1175 &results,
1176 )
1177 .await;
1178 return results;
1179 }
1180
1181 let mut write_requests = Vec::new();
1182 let mut patch_requests = Vec::new();
1183 for request in native_requests {
1184 if patch_directives.contains_key(&request.directive_index) {
1185 patch_requests.push(request);
1186 } else {
1187 write_requests.push(request);
1188 }
1189 }
1190
1191 if !write_requests.is_empty() {
1192 let extraction_started = Instant::now();
1193 let native_result =
1194 extract_trusted_archive_requests(trusted_archive.clone(), write_requests.clone())
1195 .await;
1196 extraction_ms += extraction_started.elapsed().as_millis();
1197 match native_result {
1198 Ok(_) => {
1199 results.extend(
1200 write_requests
1201 .iter()
1202 .map(|request| (request.directive_index, Ok(()))),
1203 );
1204 }
1205 Err(e) => {
1206 let msg = format!("{e:#}");
1207 results.extend(write_requests.iter().map(|request| {
1208 (
1209 request.directive_index,
1210 Err(anyhow::anyhow!("archive batch extraction failed: {msg}")),
1211 )
1212 }));
1213 }
1214 }
1215 }
1216
1217 let mut nested_patch_groups: HashMap<String, Vec<ArchiveRequest>> = HashMap::new();
1218 let mut plain_patch_requests = Vec::new();
1219 for request in patch_requests {
1220 if request.inner_path.is_some() {
1221 nested_patch_groups
1222 .entry(normalize_path(&request.from).to_lowercase())
1223 .or_default()
1224 .push(request);
1225 } else {
1226 plain_patch_requests.push(request);
1227 }
1228 }
1229
1230 let patch_chunk_size = archive_patch_chunk_size();
1231 for chunk in plain_patch_requests.chunks(patch_chunk_size) {
1232 let chunk_requests = chunk.to_vec();
1233 let extraction_started = Instant::now();
1234 let native_result =
1235 extract_trusted_archive_requests(trusted_archive.clone(), chunk_requests.clone())
1236 .await;
1237 extraction_ms += extraction_started.elapsed().as_millis();
1238 let mut output = match native_result {
1239 Ok(output) => output,
1240 Err(e) => {
1241 let msg = format!("{e:#}");
1242 results.extend(chunk_requests.iter().map(|request| {
1243 (
1244 request.directive_index,
1245 Err(anyhow::anyhow!("archive batch extraction failed: {msg}")),
1246 )
1247 }));
1248 continue;
1249 }
1250 };
1251
1252 for request in chunk_requests {
1253 if let Err(e) = self.check_diagnostics_abort() {
1254 results.push((request.directive_index, Err(e)));
1255 continue;
1256 }
1257 if let Some((archive_hash, from, inner_path, to, patch_id, size)) =
1258 patch_directives.remove(&request.directive_index)
1259 {
1260 let Some(inline_source) = inline_source else {
1261 results.push((
1262 request.directive_index,
1263 Err(anyhow::anyhow!("inline source was not initialized")),
1264 ));
1265 continue;
1266 };
1267 let Some(bytes) = output.bytes.remove(&request.directive_index) else {
1268 results.push((
1269 request.directive_index,
1270 Err(anyhow::anyhow!(
1271 "patch source '{}' missing from extractor output",
1272 inner_path.as_deref().unwrap_or(&from)
1273 )),
1274 ));
1275 continue;
1276 };
1277 extracted_patch_source_bytes += bytes.len() as u64;
1278 let bytes = self.cache_patch_source(archive_hash, &from, Bytes::from(bytes));
1279 let patch_started = Instant::now();
1280 results.push((
1281 request.directive_index,
1282 self.write_patched_output(inline_source, bytes, &to, &patch_id, size)
1283 .await,
1284 ));
1285 patch_ms += patch_started.elapsed().as_millis();
1286 } else {
1287 results.push((request.directive_index, Ok(())));
1288 }
1289 }
1290 }
1291
1292 for group_requests in nested_patch_groups.into_values() {
1293 let Some(first_request) = group_requests.first() else {
1294 continue;
1295 };
1296 let outer_directive_index = first_request.directive_index;
1297 let outer_from = first_request.from.clone();
1298 let outer_request = ArchiveRequest {
1299 directive_index: outer_directive_index,
1300 from: outer_from.clone(),
1301 inner_path: None,
1302 kind: ArchiveRequestKind::Bytes,
1303 };
1304 let extraction_started = Instant::now();
1305 let outer_result =
1306 extract_trusted_archive_requests(trusted_archive.clone(), vec![outer_request])
1307 .await;
1308 extraction_ms += extraction_started.elapsed().as_millis();
1309 let outer_bytes = match outer_result {
1310 Ok(mut output) => {
1311 if let Some(bytes) = output.bytes.remove(&outer_directive_index) {
1312 Bytes::from(bytes)
1313 } else {
1314 let msg = format!("nested archive '{outer_from}' missing from output");
1315 results.extend(group_requests.iter().map(|request| {
1316 (request.directive_index, Err(anyhow::anyhow!(msg.clone())))
1317 }));
1318 continue;
1319 }
1320 }
1321 Err(e) => {
1322 let msg = format!("{e:#}");
1323 results.extend(group_requests.iter().map(|request| {
1324 (
1325 request.directive_index,
1326 Err(anyhow::anyhow!(
1327 "nested archive extraction failed for '{outer_from}': {msg}"
1328 )),
1329 )
1330 }));
1331 continue;
1332 }
1333 };
1334
1335 let inner_requests = group_requests
1336 .iter()
1337 .filter_map(|request| {
1338 Some(ArchiveRequest {
1339 directive_index: request.directive_index,
1340 from: request.inner_path.clone()?,
1341 inner_path: None,
1342 kind: ArchiveRequestKind::Bytes,
1343 })
1344 })
1345 .collect::<Vec<_>>();
1346
1347 let nested_temp =
1348 if outer_bytes.starts_with(b"BSA\0") || outer_bytes.starts_with(b"BTDX") {
1349 let temp_result = (|| -> Result<tempfile::NamedTempFile> {
1350 let mut temp = tempfile::NamedTempFile::new().with_context(|| {
1351 format!("failed to create temp nested archive for {outer_from}")
1352 })?;
1353 std::io::Write::write_all(&mut temp, &outer_bytes).with_context(|| {
1354 format!("failed to write temp nested archive for {outer_from}")
1355 })?;
1356 std::io::Write::flush(&mut temp).with_context(|| {
1357 format!("failed to flush temp nested archive for {outer_from}")
1358 })?;
1359 Ok(temp)
1360 })();
1361 let Ok(temp) = temp_result else {
1362 let msg = format!("{:#}", temp_result.unwrap_err());
1363 results.extend(group_requests.iter().map(|request| {
1364 (
1365 request.directive_index,
1366 Err(anyhow::anyhow!(
1367 "failed to stage nested archive '{outer_from}': {msg}"
1368 )),
1369 )
1370 }));
1371 continue;
1372 };
1373 Some(temp)
1374 } else {
1375 None
1376 };
1377
1378 for chunk in inner_requests.chunks(patch_chunk_size) {
1379 let chunk_requests = chunk.to_vec();
1380 let extraction_started = Instant::now();
1381 let native_result = if let Some(temp) = &nested_temp {
1382 extract_archive_path_requests(temp.path().to_path_buf(), chunk_requests.clone())
1383 .await
1384 } else {
1385 extract_nested_archive_requests(
1386 outer_from.clone(),
1387 outer_bytes.clone(),
1388 chunk_requests.clone(),
1389 )
1390 .await
1391 };
1392 extraction_ms += extraction_started.elapsed().as_millis();
1393 let mut output = match native_result {
1394 Ok(output) => output,
1395 Err(e) => {
1396 let msg = format!("{e:#}");
1397 results.extend(chunk_requests.iter().map(|request| {
1398 (
1399 request.directive_index,
1400 Err(anyhow::anyhow!(
1401 "nested archive batch extraction failed: {msg}"
1402 )),
1403 )
1404 }));
1405 continue;
1406 }
1407 };
1408
1409 for request in chunk_requests {
1410 if let Err(e) = self.check_diagnostics_abort() {
1411 results.push((request.directive_index, Err(e)));
1412 continue;
1413 }
1414 if let Some((archive_hash, from, inner_path, to, patch_id, size)) =
1415 patch_directives.remove(&request.directive_index)
1416 {
1417 let Some(inline_source) = inline_source else {
1418 results.push((
1419 request.directive_index,
1420 Err(anyhow::anyhow!("inline source was not initialized")),
1421 ));
1422 continue;
1423 };
1424 let Some(bytes) = output.bytes.remove(&request.directive_index) else {
1425 results.push((
1426 request.directive_index,
1427 Err(anyhow::anyhow!(
1428 "patch source '{}' missing from nested extractor output",
1429 inner_path.as_deref().unwrap_or(&from)
1430 )),
1431 ));
1432 continue;
1433 };
1434 extracted_patch_source_bytes += bytes.len() as u64;
1435 let cache_key = inner_path.as_deref().unwrap_or(&from);
1436 let bytes =
1437 self.cache_patch_source(archive_hash, cache_key, Bytes::from(bytes));
1438 let patch_started = Instant::now();
1439 results.push((
1440 request.directive_index,
1441 self.write_patched_output(inline_source, bytes, &to, &patch_id, size)
1442 .await,
1443 ));
1444 patch_ms += patch_started.elapsed().as_millis();
1445 }
1446 }
1447 }
1448 }
1449
1450 if results.iter().all(|(_, result)| result.is_ok())
1451 && let Err(e) = self.write_archive_batch_sentinel(&batch).await
1452 {
1453 let msg = format!("{e:#}");
1454 return results
1455 .into_iter()
1456 .map(|(idx, result)| {
1457 if result.is_ok() {
1458 (
1459 idx,
1460 Err(anyhow::anyhow!(
1461 "failed to write archive batch sentinel: {msg}"
1462 )),
1463 )
1464 } else {
1465 (idx, result)
1466 }
1467 })
1468 .collect();
1469 }
1470 if results.iter().all(|(_, result)| result.is_ok())
1471 && let Some(diagnostics) = &self.diagnostics
1472 {
1473 diagnostics.record_progress(ProgressEvent::ArchiveBatchComplete);
1474 }
1475 if results.iter().all(|(_, result)| result.is_ok()) {
1476 let prune_started = Instant::now();
1477 match self.maybe_prune_archive(&batch).await {
1478 Ok(bytes) => pruned_bytes = bytes,
1479 Err(e) => warn!(
1480 archive_hash = %format!("{:016x}", batch.archive_hash),
1481 "failed to prune applied archive: {e:#}"
1482 ),
1483 }
1484 prune_ms = prune_started.elapsed().as_millis();
1485 }
1486 self.write_archive_batch_metrics(
1487 &batch,
1488 patch_count,
1489 batch_started,
1490 trust_check_ms,
1491 extraction_ms,
1492 patch_ms,
1493 prune_ms,
1494 extracted_patch_source_bytes,
1495 trust_stats,
1496 pruned_bytes,
1497 byte_cache_used_before,
1498 process_before,
1499 &results,
1500 )
1501 .await;
1502
1503 results
1504 }
1505
1506 async fn write_archive_batch_metrics(
1507 &self,
1508 batch: &ArchiveInstallBatch,
1509 patch_count: usize,
1510 batch_started: Instant,
1511 trust_check_ms: u128,
1512 extraction_ms: u128,
1513 patch_ms: u128,
1514 prune_ms: u128,
1515 extracted_patch_source_bytes: u64,
1516 trust_stats: ArchiveTrustStats,
1517 pruned_bytes: u64,
1518 byte_cache_used_before: u64,
1519 process_before: ProcessSnapshot,
1520 results: &[(usize, Result<()>)],
1521 ) {
1522 let Some(diagnostics) = &self.diagnostics else {
1523 return;
1524 };
1525 let process_after = current_process_snapshot();
1526 let record = ArchiveBatchRecord {
1527 kind: "archive_batch".to_string(),
1528 unix_ms: std::time::SystemTime::now()
1529 .duration_since(std::time::UNIX_EPOCH)
1530 .unwrap_or_default()
1531 .as_millis(),
1532 archive_hash: format!("{:016x}", batch.archive_hash),
1533 archive_size_bytes: batch.archive_size_bytes,
1534 directive_count: batch.directives.len(),
1535 patch_count,
1536 elapsed_ms: batch_started.elapsed().as_millis(),
1537 trust_check_ms,
1538 extraction_ms,
1539 patch_ms,
1540 prune_ms,
1541 extracted_patch_source_bytes,
1542 sidecar_hit: trust_stats.sidecar_hit,
1543 streamed_hash_bytes: trust_stats.streamed_hash_bytes,
1544 memory_archive_hit: trust_stats.memory_archive_hit,
1545 disk_archive_fallback: trust_stats.disk_fallback,
1546 pruned_bytes,
1547 byte_cache_used_before,
1548 byte_cache_used_after: self.byte_cache.bytes_used(),
1549 success_count: results.iter().filter(|(_, result)| result.is_ok()).count(),
1550 error_count: results.iter().filter(|(_, result)| result.is_err()).count(),
1551 first_error: results
1552 .iter()
1553 .find_map(|(_, result)| result.as_ref().err().map(|error| format!("{error:#}"))),
1554 rss_before_kib: process_before.vm_rss_kib,
1555 rss_after_kib: process_after.vm_rss_kib,
1556 swap_before_kib: process_before.vm_swap_kib,
1557 swap_after_kib: process_after.vm_swap_kib,
1558 };
1559 if let Err(e) = diagnostics.record_archive_batch(&record).await {
1560 warn!("failed to write Wabbajack archive batch diagnostics: {e:#}");
1561 }
1562 }
1563
1564 async fn adopt_existing_staging(
1565 &self,
1566 archive_batches: &[ArchiveInstallBatch],
1567 installs: &[InstallDirective],
1568 ) -> Result<StagingAdoptionSummary> {
1569 let mut summary = StagingAdoptionSummary::default();
1570
1571 for batch in archive_batches {
1572 if self.archive_batch_sentinel_valid(batch).await {
1573 continue;
1574 }
1575 if self.archive_batch_outputs_exist(batch).await {
1576 self.write_archive_batch_sentinel(batch).await?;
1577 summary.archive_batches += 1;
1578 }
1579 }
1580
1581 for (directive_index, directive) in installs.iter().enumerate() {
1582 let InstallDirective::CreateBSA { temp_id, to, .. } = directive else {
1583 continue;
1584 };
1585 if self
1586 .create_bsa_sentinel_valid(directive_index, temp_id, to)
1587 .await
1588 {
1589 continue;
1590 }
1591 if StagingStore::new(&self.staging_dir)
1592 .logical_exists(to)
1593 .await
1594 {
1595 self.write_create_bsa_sentinel(directive_index, temp_id, to)
1596 .await?;
1597 summary.create_bsa += 1;
1598 }
1599 }
1600
1601 Ok(summary)
1602 }
1603
1604 async fn archive_batch_sentinel_valid(&self, batch: &ArchiveInstallBatch) -> bool {
1605 let path = self.archive_batch_sentinel_path(batch.archive_hash);
1606 let Ok(data) = tokio::fs::read_to_string(&path).await else {
1607 return false;
1608 };
1609 let Ok(sentinel) = serde_json::from_str::<ArchiveBatchSentinel>(&data) else {
1610 return false;
1611 };
1612 let expected_indices = batch
1613 .directives
1614 .iter()
1615 .map(|d| d.directive_index)
1616 .collect::<Vec<_>>();
1617 sentinel.pipeline_version == APPLY_STATE_VERSION
1618 && sentinel.archive_hash == batch.archive_hash
1619 && sentinel.archive_size_bytes == batch.archive_size_bytes
1620 && sentinel.directive_indices == expected_indices
1621 && self.archive_batch_outputs_exist(batch).await
1622 }
1623
1624 async fn archive_batch_outputs_exist(&self, batch: &ArchiveInstallBatch) -> bool {
1625 let staging_store = StagingStore::new(&self.staging_dir);
1626 for indexed in &batch.directives {
1627 let to = match &indexed.directive {
1628 InstallDirective::FromArchive { to, .. }
1629 | InstallDirective::PatchedFromArchive { to, .. } => to,
1630 _ => continue,
1631 };
1632 if !staging_store.logical_exists(to).await {
1633 return false;
1634 }
1635 }
1636 true
1637 }
1638
1639 async fn write_archive_batch_sentinel(&self, batch: &ArchiveInstallBatch) -> Result<()> {
1640 let path = self.archive_batch_sentinel_path(batch.archive_hash);
1641 if let Some(parent) = path.parent() {
1642 tokio::fs::create_dir_all(parent).await?;
1643 }
1644 let sentinel = ArchiveBatchSentinel {
1645 pipeline_version: APPLY_STATE_VERSION,
1646 archive_hash: batch.archive_hash,
1647 archive_size_bytes: batch.archive_size_bytes,
1648 directive_indices: batch.directives.iter().map(|d| d.directive_index).collect(),
1649 };
1650 tokio::fs::write(&path, serde_json::to_vec_pretty(&sentinel)?).await?;
1651 Ok(())
1652 }
1653
1654 fn archive_batch_sentinel_path(&self, archive_hash: u64) -> PathBuf {
1655 self.staging_dir
1656 .join("_state/archive-batches")
1657 .join(format!("{archive_hash:016x}.json"))
1658 }
1659
1660 async fn create_bsa_sentinel_valid(
1661 &self,
1662 directive_index: usize,
1663 temp_id: &str,
1664 to: &str,
1665 ) -> bool {
1666 let path = self.create_bsa_sentinel_path(directive_index);
1667 let Ok(data) = tokio::fs::read_to_string(&path).await else {
1668 return false;
1669 };
1670 let Ok(sentinel) = serde_json::from_str::<CreateBsaSentinel>(&data) else {
1671 return false;
1672 };
1673 sentinel.pipeline_version == APPLY_STATE_VERSION
1674 && sentinel.directive_index == directive_index
1675 && sentinel.temp_id == temp_id
1676 && sentinel.to == to
1677 && StagingStore::new(&self.staging_dir)
1678 .logical_exists(to)
1679 .await
1680 }
1681
1682 async fn write_create_bsa_sentinel(
1683 &self,
1684 directive_index: usize,
1685 temp_id: &str,
1686 to: &str,
1687 ) -> Result<()> {
1688 let path = self.create_bsa_sentinel_path(directive_index);
1689 if let Some(parent) = path.parent() {
1690 tokio::fs::create_dir_all(parent).await?;
1691 }
1692 let sentinel = CreateBsaSentinel {
1693 pipeline_version: APPLY_STATE_VERSION,
1694 directive_index,
1695 temp_id: temp_id.to_string(),
1696 to: to.to_string(),
1697 };
1698 tokio::fs::write(&path, serde_json::to_vec_pretty(&sentinel)?).await?;
1699 Ok(())
1700 }
1701
1702 fn create_bsa_sentinel_path(&self, directive_index: usize) -> PathBuf {
1703 self.staging_dir
1704 .join("_state/create-bsa")
1705 .join(format!("{directive_index}.json"))
1706 }
1707
1708 async fn apply_inline_file(
1710 &self,
1711 inline_source: &InlineSource,
1712 source_data_id: &str,
1713 to: &str,
1714 ) -> Result<()> {
1715 let staging_store = StagingStore::new(&self.staging_dir);
1716
1717 validate_archive_entry(to)?;
1719
1720 let data = inline_source.read(source_data_id)?;
1721 let output_path = staging_store.write_path_for_logical(to, Some(data.len() as u64));
1722
1723 if let Some(parent) = output_path.parent() {
1724 tokio::fs::create_dir_all(parent).await?;
1725 }
1726
1727 write_bytes_maybe_zstd(&output_path, &data).await?;
1728
1729 info!(source_data_id = %source_data_id, to = %to, "wrote inline file");
1730 Ok(())
1731 }
1732
1733 async fn apply_patched_from_archive(
1735 &self,
1736 inline_source: &InlineSource,
1737 archive_hash: u64,
1738 from: &str,
1739 to: &str,
1740 patch_id: &str,
1741 expected_size: u64,
1742 ) -> Result<()> {
1743 let source_data = self
1745 .read_archive_source_cached(archive_hash, from)
1746 .await
1747 .with_context(|| {
1748 format!("failed to extract '{from}' from archive {archive_hash:016x} for patching")
1749 })?;
1750
1751 self.write_patched_output(inline_source, source_data, to, patch_id, expected_size)
1752 .await
1753 }
1754
1755 async fn write_patched_output(
1756 &self,
1757 inline_source: &InlineSource,
1758 source_data: Bytes,
1759 to: &str,
1760 patch_id: &str,
1761 expected_size: u64,
1762 ) -> Result<()> {
1763 validate_archive_entry(to)?;
1765
1766 let output_path = self.staging_dir.join(normalize_path(to));
1770 let part_path = output_path.with_extension(format!(
1771 "{}modde-part",
1772 output_path
1773 .extension()
1774 .and_then(|extension| extension.to_str())
1775 .map(|extension| format!("{extension}."))
1776 .unwrap_or_default()
1777 ));
1778 let stale_compressed = super::staging::compressed_path(&output_path);
1779
1780 if let Some(parent) = output_path.parent() {
1781 tokio::fs::create_dir_all(parent).await?;
1782 }
1783 if tokio::fs::metadata(&stale_compressed).await.is_ok() {
1784 tokio::fs::remove_file(&stale_compressed).await?;
1785 }
1786 if tokio::fs::metadata(&part_path).await.is_ok() {
1787 tokio::fs::remove_file(&part_path).await?;
1788 }
1789
1790 let patch_data = inline_source.read(patch_id)?;
1791 let _patch_guard = self.diagnostics.as_ref().map(|diagnostics| {
1792 diagnostics.start_patch(
1793 to.to_string(),
1794 patch_id.to_string(),
1795 source_data.len() as u64,
1796 expected_size,
1797 )
1798 });
1799
1800 let to_owned = to.to_string();
1801 let output_path_for_task = output_path.clone();
1802 let part_path_for_task = part_path.clone();
1803 let patch_result = tokio::task::spawn_blocking(move || {
1804 let mut output = std::fs::File::create(&part_path_for_task).with_context(|| {
1805 format!(
1806 "failed to create patched output: {}",
1807 part_path_for_task.display()
1808 )
1809 })?;
1810 let expected = (expected_size > 0).then_some(expected_size);
1811 let output_bytes = patcher::apply_patch_to_writer_limited(
1812 &source_data,
1813 &patch_data,
1814 &mut output,
1815 expected,
1816 )
1817 .with_context(|| format!("failed to apply patch for: {to_owned}"))?;
1818 output
1819 .sync_all()
1820 .with_context(|| format!("failed to sync patched output: {to_owned}"))?;
1821 drop(output);
1822 std::fs::rename(&part_path_for_task, &output_path_for_task).with_context(|| {
1823 format!(
1824 "failed to move patched output into place: {}",
1825 output_path_for_task.display()
1826 )
1827 })?;
1828 Ok::<u64, anyhow::Error>(output_bytes)
1829 })
1830 .await?;
1831 let output_bytes = match patch_result {
1832 Ok(output_bytes) => output_bytes,
1833 Err(e) => {
1834 let _ = tokio::fs::remove_file(&part_path).await;
1835 return Err(e);
1836 }
1837 };
1838 trim_process_allocator();
1839
1840 info!(
1841 to = %to,
1842 patch_id = %patch_id,
1843 expected_size,
1844 output_bytes,
1845 "applied binary patch"
1846 );
1847 Ok(())
1848 }
1849
1850 async fn apply_create_bsa(
1852 &self,
1853 temp_id: &str,
1854 to: &str,
1855 file_states: &[modde_core::manifest::wabbajack::BSAFileState],
1856 ) -> Result<()> {
1857 let bsa_staging = self.staging_dir.join(format!("bsa_temp_{temp_id}"));
1859 let output_path = self.staging_dir.join(normalize_path(to));
1860
1861 if let Some(parent) = output_path.parent() {
1862 tokio::fs::create_dir_all(parent).await?;
1863 }
1864
1865 bsa_repack::create_bsa(file_states, &bsa_staging, &output_path)
1866 .await
1867 .with_context(|| format!("failed to create BSA: {to}"))?;
1868
1869 info!(to = %to, files = file_states.len(), "created BSA archive");
1870 Ok(())
1871 }
1872
1873 fn validate_game_file_sources(&self) -> Result<()> {
1874 if !self.manifest.archives.iter().any(is_game_file_archive) {
1875 return Ok(());
1876 }
1877
1878 let Some(game_dir) = &self.game_dir else {
1879 bail!(
1880 "wabbajack modlist references local game files; pass --game-dir so modde can read the installed game files"
1881 );
1882 };
1883
1884 if !game_dir.is_dir() {
1885 bail!(
1886 "game directory for Wabbajack game-file sources does not exist: {}",
1887 game_dir.display()
1888 );
1889 }
1890
1891 Ok(())
1892 }
1893
1894 fn validate_download_sources(&self, downloads: &[DownloadDirective]) -> Result<()> {
1895 let missing: Vec<String> = downloads
1896 .iter()
1897 .filter(|directive| {
1898 !self
1899 .sources
1900 .iter()
1901 .any(|source| source.can_handle(directive))
1902 })
1903 .map(|directive| directive.display_name().into_owned())
1904 .collect();
1905
1906 if !missing.is_empty() {
1907 let shown = missing
1908 .iter()
1909 .take(20)
1910 .map(|name| format!(" - {name}"))
1911 .collect::<Vec<_>>()
1912 .join("\n");
1913 let omitted = missing.len().saturating_sub(20);
1914 let suffix = if omitted == 0 {
1915 String::new()
1916 } else {
1917 format!("\n ... and {omitted} more")
1918 };
1919 bail!(
1920 "Wabbajack manifest requires {} download(s) with no registered source:\n{}{}\nConfigure the missing source before running the install.",
1921 missing.len(),
1922 shown,
1923 suffix
1924 );
1925 }
1926
1927 Ok(())
1928 }
1929
1930 async fn preflight_authored_files(&self) -> Result<()> {
1931 for source in self.sources.iter() {
1932 if let AnySource::WabbajackCdn(source) = source {
1933 return source.preflight_archives(&self.manifest.archives).await;
1934 }
1935 }
1936 Ok(())
1937 }
1938
1939 async fn verify_game_file_sources(&self) -> Result<()> {
1940 let mut failures = Vec::new();
1941
1942 for archive in self
1943 .manifest
1944 .archives
1945 .iter()
1946 .filter(|a| is_game_file_archive(a))
1947 {
1948 let Some(source) = self.game_file_source_path(archive.hash)? else {
1949 continue;
1950 };
1951
1952 if !source.path.exists() {
1953 failures.push(format!(
1954 "game-file source '{}' is missing at {}",
1955 source.rel_path,
1956 source.path.display()
1957 ));
1958 continue;
1959 }
1960
1961 let actual = modde_core::hash::hash_file_xxh64(&source.path)
1962 .await
1963 .with_context(|| {
1964 format!("failed to hash game-file source '{}'", source.rel_path)
1965 })?;
1966 if actual != archive.hash {
1967 failures.push(format!(
1968 "game-file source '{}' failed hash verification (expected xxh64 {:016x}, got {:016x})",
1969 source.rel_path, archive.hash, actual
1970 ));
1971 }
1972 }
1973
1974 if !failures.is_empty() {
1975 bail!(
1976 "Wabbajack game-file source validation failed for {} file(s):\n{}",
1977 failures.len(),
1978 failures
1979 .iter()
1980 .map(|failure| format!(" - {failure}"))
1981 .collect::<Vec<_>>()
1982 .join("\n")
1983 );
1984 }
1985
1986 Ok(())
1987 }
1988
1989 async fn read_archive_source(&self, archive_hash: u64, from: &str) -> Result<Vec<u8>> {
1990 if let Some(source) = self.game_file_source_path(archive_hash)? {
1991 return read_game_file_source(&source.path, from).await;
1992 }
1993
1994 let archive_path = archive_path(&self.store_dir, &archive_hash);
1995 let from = from.to_string();
1996 tokio::task::spawn_blocking(move || {
1997 let output = ArchiveBatchExtractor::extract_selected(
1998 &archive_path,
1999 &[ArchiveRequest {
2000 directive_index: 0,
2001 from,
2002 inner_path: None,
2003 kind: ArchiveRequestKind::Bytes,
2004 }],
2005 )?;
2006 output
2007 .bytes
2008 .get(&0)
2009 .cloned()
2010 .ok_or_else(|| anyhow::anyhow!("archive extractor returned no bytes"))
2011 })
2012 .await?
2013 }
2014
2015 async fn read_archive_source_cached(&self, archive_hash: u64, from: &str) -> Result<Bytes> {
2016 if let Some(bytes) = self.patch_source_from_cache(archive_hash, from) {
2017 return Ok(bytes);
2018 }
2019 let data = self.read_archive_source(archive_hash, from).await?;
2020 Ok(self.cache_patch_source(archive_hash, from, Bytes::from(data)))
2021 }
2022
2023 fn patch_source_from_cache(&self, archive_hash: u64, from: &str) -> Option<Bytes> {
2024 self.byte_cache.get(&ByteCacheKey {
2025 archive_hash,
2026 inner_path: normalize_path(from),
2027 })
2028 }
2029
2030 fn cache_patch_source(&self, archive_hash: u64, from: &str, bytes: Bytes) -> Bytes {
2031 let disable_under_pressure = std::env::var("MODDE_BYTE_CACHE_DISABLE_UNDER_PRESSURE")
2032 .ok()
2033 .and_then(|v| v.parse::<bool>().ok())
2034 .unwrap_or(true);
2035 let pressure_threshold = std::env::var("MODDE_BYTE_CACHE_PRESSURE_FRACTION")
2036 .ok()
2037 .and_then(|v| v.parse::<f64>().ok())
2038 .filter(|&v| v > 0.0 && v <= 1.0)
2039 .unwrap_or(0.80);
2040 if disable_under_pressure && cgroup_memory_pressure_high(pressure_threshold) {
2041 warn!(
2042 archive_hash = %format!("{archive_hash:016x}"),
2043 from = %from,
2044 bytes = bytes.len(),
2045 pressure_threshold,
2046 "skipping patch source byte cache under cgroup memory pressure"
2047 );
2048 return bytes;
2049 }
2050 self.byte_cache.insert(
2051 ByteCacheKey {
2052 archive_hash,
2053 inner_path: normalize_path(from),
2054 },
2055 bytes,
2056 )
2057 }
2058
2059 fn check_diagnostics_abort(&self) -> Result<()> {
2060 if let Some(diagnostics) = &self.diagnostics {
2061 diagnostics.check_abort()?;
2062 }
2063 Ok(())
2064 }
2065
2066 fn game_file_source_path(&self, archive_hash: u64) -> Result<Option<GameFileSourcePath>> {
2067 let Some(archive) = self
2068 .archives_by_hash
2069 .get(&archive_hash)
2070 .map(|&i| &self.manifest.archives[i])
2071 else {
2072 return Ok(None);
2073 };
2074
2075 let Some(state) = archive.state.as_ref() else {
2076 return Ok(None);
2077 };
2078
2079 let rel_path = match state {
2080 ArchiveState::GameFileSourceDownloader { metadata } => state.game_file_path().ok_or_else(|| {
2081 anyhow::anyhow!(
2082 "game-file source archive '{}' does not contain a recognized file path field; metadata keys: {}",
2083 archive.name,
2084 metadata.keys().cloned().collect::<Vec<_>>().join(", ")
2085 )
2086 })?,
2087 _ => return Ok(None),
2088 };
2089
2090 validate_archive_entry(rel_path)?;
2091
2092 let Some(game_dir) = &self.game_dir else {
2093 bail!(
2094 "archive {} is a game-file source, but no game directory was provided",
2095 archive.name
2096 );
2097 };
2098
2099 let path = game_dir.join(normalize_path(rel_path));
2100 if path.exists() {
2101 return Ok(Some(GameFileSourcePath {
2102 rel_path: rel_path.to_string(),
2103 path,
2104 }));
2105 }
2106
2107 Ok(Some(GameFileSourcePath {
2108 rel_path: rel_path.to_string(),
2109 path: find_path_case_insensitive(game_dir, &normalize_path(rel_path)).unwrap_or(path),
2110 }))
2111 }
2112}
2113
/// Resolved location of a Wabbajack game-file source archive, as produced by
/// `game_file_source_path`.
#[derive(Debug)]
struct GameFileSourcePath {
    /// Path relative to the game directory, taken from the archive state's
    /// metadata before separator normalization; used in messages.
    rel_path: String,
    /// Path under the game directory (possibly found case-insensitively).
    /// It is not guaranteed to exist; callers check before reading.
    path: PathBuf,
}
2119
2120fn is_game_file_archive(archive: &modde_core::manifest::wabbajack::ArchiveEntry) -> bool {
2121 matches!(
2122 archive.state.as_ref(),
2123 Some(ArchiveState::GameFileSourceDownloader { .. })
2124 )
2125}
2126
2127fn validate_archive_entry(name: &str) -> Result<()> {
2130 let normalized = name.replace('\\', "/");
2132
2133 for component in normalized.split('/') {
2135 if component == ".." {
2136 bail!("archive entry contains path traversal: {name}");
2137 }
2138 }
2139
2140 if normalized.starts_with('/') {
2142 bail!("archive entry contains absolute path: {name}");
2143 }
2144
2145 Ok(())
2146}
2147
/// Test-only check for a ZIP entry: applies `validate_archive_entry` to its
/// name and rejects symlink entries.
#[cfg(test)]
fn validate_zip_entry<R: std::io::Read + ?Sized>(entry: &zip::read::ZipFile<'_, R>) -> Result<()> {
    let entry_name = entry.name();
    validate_archive_entry(entry_name)?;

    if !entry.is_symlink() {
        return Ok(());
    }
    bail!("archive entry is a symlink (rejected for security): {entry_name}");
}
2161
/// Converts Windows-style `\` separators to `/`; all other characters are kept.
fn normalize_path(path: &str) -> String {
    path.chars()
        .map(|ch| if ch == '\\' { '/' } else { ch })
        .collect()
}
2166
2167fn game_file_source_is_whole_file(path: &Path, rel_path: &str, from: &str) -> bool {
2168 if from.trim().is_empty() || from == "." {
2169 return true;
2170 }
2171
2172 let normalized_from = normalize_path(from);
2173 let normalized_rel_path = normalize_path(rel_path);
2174 let normalized_path = normalize_path(&path.to_string_lossy());
2175 let path_name = path
2176 .file_name()
2177 .and_then(|name| name.to_str())
2178 .map(normalize_path);
2179
2180 normalized_rel_path.eq_ignore_ascii_case(&normalized_from)
2181 || normalized_path.ends_with(&normalized_from)
2182 || path_name
2183 .as_deref()
2184 .is_some_and(|name| name.eq_ignore_ascii_case(&normalized_from))
2185}
2186
2187async fn read_game_file_source(path: &Path, from: &str) -> Result<Vec<u8>> {
2188 if !from.trim().is_empty() {
2189 validate_archive_entry(from)?;
2190 }
2191
2192 if path.symlink_metadata()?.file_type().is_symlink() {
2193 bail!(
2194 "game-file source is a symlink (rejected for security): {}",
2195 path.display()
2196 );
2197 }
2198
2199 if from.trim().is_empty() || from == "." {
2200 return tokio::fs::read(path)
2201 .await
2202 .with_context(|| format!("failed to read game-file source: {}", path.display()));
2203 }
2204
2205 if game_file_source_is_whole_file(path, &path.to_string_lossy(), from) {
2206 return tokio::fs::read(path)
2207 .await
2208 .with_context(|| format!("failed to read game-file source: {}", path.display()));
2209 }
2210
2211 let archive_path = path.to_path_buf();
2212 let from = from.to_string();
2213 tokio::task::spawn_blocking(move || {
2214 let output = ArchiveBatchExtractor::extract_selected(
2215 &archive_path,
2216 &[ArchiveRequest {
2217 directive_index: 0,
2218 from,
2219 inner_path: None,
2220 kind: ArchiveRequestKind::Bytes,
2221 }],
2222 )?;
2223 output
2224 .bytes
2225 .get(&0)
2226 .cloned()
2227 .ok_or_else(|| anyhow::anyhow!("archive extractor returned no bytes"))
2228 })
2229 .await?
2230}
2231
/// Store location of a downloaded archive: `<store_dir>/<hash as 16 hex digits>.archive`.
pub(crate) fn archive_path(store_dir: &Path, hash: &u64) -> PathBuf {
    let file_name = format!("{:016x}.archive", *hash);
    let mut path = store_dir.to_path_buf();
    path.push(file_name);
    path
}
2236
/// Path of the verification sidecar for `path`: the same path with
/// `.verified.json` appended to the full file name (not replacing the extension).
fn verified_sidecar_path(path: &Path) -> PathBuf {
    let mut raw = path.to_path_buf().into_os_string();
    raw.push(".verified.json");
    raw.into()
}
2242
/// Modification time as milliseconds since the Unix epoch.
///
/// Returns 0 when the platform cannot report it or it predates the epoch.
fn metadata_modified_unix_ms(metadata: &std::fs::Metadata) -> u128 {
    let Ok(modified) = metadata.modified() else {
        return 0;
    };
    modified
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_millis())
}
2251
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the clock is set before the epoch.
fn unix_ms() -> u128 {
    // `SystemTime::elapsed` is `SystemTime::now().duration_since(self)`.
    UNIX_EPOCH.elapsed().unwrap_or_default().as_millis()
}
2258
2259async fn archive_path_is_memory_supported(path: &Path) -> bool {
2260 let Ok(mut file) = tokio::fs::File::open(path).await else {
2261 return false;
2262 };
2263 let mut magic = [0_u8; 8];
2264 let Ok(read) = tokio::io::AsyncReadExt::read(&mut file, &mut magic).await else {
2265 return false;
2266 };
2267 let prefix = &magic[..read];
2268 prefix.starts_with(b"PK") || prefix.starts_with(&[0x37, 0x7a, 0xbc, 0xaf, 0x27, 0x1c])
2269}
2270
2271async fn extract_trusted_archive_requests(
2272 trusted_archive: Option<TrustedArchive>,
2273 requests: Vec<ArchiveRequest>,
2274) -> Result<crate::decompress::ArchiveBatchOutput> {
2275 let native_result = tokio::task::spawn_blocking(move || match trusted_archive {
2276 Some(TrustedArchive::Path(path)) => ArchiveBatchExtractor::extract_selected(&path, &requests),
2277 Some(TrustedArchive::Bytes {
2278 label,
2279 bytes,
2280 fallback_path,
2281 }) => {
2282 let memory_result = ArchiveBatchExtractor::extract_selected_from(
2283 ArchiveInput::Bytes {
2284 name: &label,
2285 bytes: &bytes,
2286 },
2287 &requests,
2288 );
2289 match memory_result {
2290 Ok(output) => Ok(output),
2291 Err(memory_error) if is_unsupported_in_memory_archive(&memory_error) => {
2292 warn!(
2293 archive = %label,
2294 "in-memory archive extraction failed, falling back to disk: {memory_error:#}"
2295 );
2296 ArchiveBatchExtractor::extract_selected(&fallback_path, &requests)
2297 }
2298 Err(memory_error) => Err(memory_error),
2299 }
2300 }
2301 None => unreachable!("native requests require a trusted archive"),
2302 })
2303 .await;
2304 trim_process_allocator();
2305
2306 match native_result {
2307 Ok(result) => result,
2308 Err(e) => Err(anyhow::anyhow!("archive extraction task failed: {e:#}")),
2309 }
2310}
2311
2312async fn extract_archive_path_requests(
2313 path: PathBuf,
2314 requests: Vec<ArchiveRequest>,
2315) -> Result<crate::decompress::ArchiveBatchOutput> {
2316 let native_result = tokio::task::spawn_blocking(move || {
2317 ArchiveBatchExtractor::extract_selected(&path, &requests)
2318 })
2319 .await;
2320 trim_process_allocator();
2321
2322 match native_result {
2323 Ok(result) => result,
2324 Err(e) => Err(anyhow::anyhow!("archive extraction task failed: {e:#}")),
2325 }
2326}
2327
2328async fn extract_nested_archive_requests(
2329 label: String,
2330 bytes: Bytes,
2331 requests: Vec<ArchiveRequest>,
2332) -> Result<crate::decompress::ArchiveBatchOutput> {
2333 let native_result = tokio::task::spawn_blocking(move || {
2334 if bytes.starts_with(b"BSA\0") || bytes.starts_with(b"BTDX") {
2335 let mut temp = tempfile::NamedTempFile::new()?;
2336 std::io::Write::write_all(&mut temp, &bytes)?;
2337 std::io::Write::flush(&mut temp)?;
2338 ArchiveBatchExtractor::extract_selected(temp.path(), &requests)
2339 } else {
2340 ArchiveBatchExtractor::extract_selected_from(
2341 ArchiveInput::Bytes {
2342 name: &label,
2343 bytes: &bytes,
2344 },
2345 &requests,
2346 )
2347 }
2348 })
2349 .await;
2350 trim_process_allocator();
2351
2352 match native_result {
2353 Ok(result) => result,
2354 Err(e) => Err(anyhow::anyhow!(
2355 "nested archive extraction task failed: {e:#}"
2356 )),
2357 }
2358}
2359
2360fn is_unsupported_in_memory_archive(error: &anyhow::Error) -> bool {
2361 format!("{error:#}").contains("unsupported in-memory archive format")
2362}
2363
/// Chunk size used when applying archive patches, read from
/// `MODDE_ARCHIVE_PATCH_CHUNK_SIZE`.
///
/// Unset, unparsable (including non-UTF-8) or zero values fall back to `8`.
fn archive_patch_chunk_size() -> usize {
    const DEFAULT_CHUNK_SIZE: usize = 8;

    match std::env::var("MODDE_ARCHIVE_PATCH_CHUNK_SIZE") {
        Ok(raw) => match raw.parse::<usize>() {
            Ok(size) if size > 0 => size,
            _ => DEFAULT_CHUNK_SIZE,
        },
        Err(_) => DEFAULT_CHUNK_SIZE,
    }
}
2371
2372async fn write_bytes_maybe_zstd(path: &Path, data: &[u8]) -> Result<()> {
2373 if super::staging::is_compressed_path(path) {
2374 let path = path.to_path_buf();
2375 let data = Bytes::copy_from_slice(data);
2376 tokio::task::spawn_blocking(move || -> Result<()> {
2377 let file = std::fs::File::create(&path)
2378 .with_context(|| format!("failed to create {}", path.display()))?;
2379 let mut encoder = zstd::stream::write::Encoder::new(file, 9)?;
2380 std::io::Write::write_all(&mut encoder, &data)?;
2381 encoder.finish()?;
2382 Ok(())
2383 })
2384 .await??;
2385 return Ok(());
2386 }
2387 tokio::fs::write(path, data).await?;
2388 Ok(())
2389}
2390
/// Tunable multipliers used by `estimate_directive_weight` to turn an install
/// directive into an approximate byte cost.
#[derive(Debug, Clone, Copy)]
struct DirectiveWeights {
    /// Factor applied to the source archive size for `FromArchive` directives.
    from_archive_factor: f64,
    /// Factor applied to the source archive size for `PatchedFromArchive` directives.
    patched_factor: f64,
    /// Minimum cost, in MiB, charged for a `CreateBSA` directive.
    create_bsa_floor_mb: u64,
}

impl DirectiveWeights {
    /// Builds the weights from the environment, falling back to defaults:
    ///
    /// - `MODDE_APPLY_WEIGHT_FROM_ARCHIVE_FACTOR` (default `0.5`)
    /// - `MODDE_APPLY_WEIGHT_PATCHED_FACTOR` (default `3.0`)
    /// - `MODDE_APPLY_WEIGHT_BSA_FLOOR_MB` (default `64`)
    ///
    /// Factors must parse as *finite* numbers greater than zero. Anything else
    /// (unset, unparsable, zero, negative, `NaN`, `inf`) uses the default; an
    /// infinite factor would otherwise turn every estimate into `u64::MAX`.
    /// The BSA floor accepts any `u64`.
    fn from_env() -> Self {
        Self {
            from_archive_factor: Self::factor_from_env(
                "MODDE_APPLY_WEIGHT_FROM_ARCHIVE_FACTOR",
                0.5,
            ),
            patched_factor: Self::factor_from_env("MODDE_APPLY_WEIGHT_PATCHED_FACTOR", 3.0),
            create_bsa_floor_mb: std::env::var("MODDE_APPLY_WEIGHT_BSA_FLOOR_MB")
                .ok()
                .and_then(|v| v.parse().ok())
                .unwrap_or(64),
        }
    }

    /// Reads a strictly positive, finite `f64` from env var `name`, or `default`.
    fn factor_from_env(name: &str, default: f64) -> f64 {
        std::env::var(name)
            .ok()
            .and_then(|v| v.parse::<f64>().ok())
            .filter(|v| v.is_finite() && *v > 0.0)
            .unwrap_or(default)
    }
}
2434
/// Multipliers used by `estimate_archive_batch_weight` to scale an archive's
/// size into an approximate byte cost for a whole archive batch.
#[derive(Debug, Clone, Copy)]
struct ArchiveBatchWeights {
    /// Factor used when the batch contains at least one patched directive.
    patched_factor: f64,
    /// Factor used when the batch only copies files out of the archive.
    archive_factor: f64,
}

impl ArchiveBatchWeights {
    /// Builds the weights from the environment, falling back to defaults:
    ///
    /// - `MODDE_APPLY_WEIGHT_PATCHED_BATCH_FACTOR` (default `1.0`)
    /// - `MODDE_APPLY_WEIGHT_ARCHIVE_BATCH_FACTOR` (default `0.5`)
    ///
    /// Values must parse as *finite* numbers greater than zero. Anything else
    /// (unset, unparsable, zero, negative, `NaN`, `inf`) uses the default; an
    /// infinite factor would otherwise turn every estimate into `u64::MAX`.
    fn from_env() -> Self {
        Self {
            patched_factor: Self::factor_from_env("MODDE_APPLY_WEIGHT_PATCHED_BATCH_FACTOR", 1.0),
            archive_factor: Self::factor_from_env("MODDE_APPLY_WEIGHT_ARCHIVE_BATCH_FACTOR", 0.5),
        }
    }

    /// Reads a strictly positive, finite `f64` from env var `name`, or `default`.
    fn factor_from_env(name: &str, default: f64) -> f64 {
        std::env::var(name)
            .ok()
            .and_then(|v| v.parse::<f64>().ok())
            .filter(|v| v.is_finite() && *v > 0.0)
            .unwrap_or(default)
    }
}
2464
2465fn estimate_directive_weight(
2466 directive: &InstallDirective,
2467 archive_size_by_hash: &HashMap<u64, u64>,
2468 weights: &DirectiveWeights,
2469) -> u64 {
2470 const FLOOR_BYTES: u64 = 8 * 1024 * 1024; let raw = match directive {
2473 InstallDirective::InlineFile { .. } => FLOOR_BYTES,
2474 InstallDirective::FromArchive { archive_hash, .. } => {
2475 let size = archive_size_by_hash.get(archive_hash).copied().unwrap_or(0);
2476 ((size as f64) * weights.from_archive_factor) as u64
2477 }
2478 InstallDirective::PatchedFromArchive { archive_hash, .. } => {
2479 let size = archive_size_by_hash.get(archive_hash).copied().unwrap_or(0);
2480 ((size as f64) * weights.patched_factor) as u64
2481 }
2482 InstallDirective::CreateBSA { file_states, .. } => file_states
2483 .iter()
2484 .map(|fs| fs.size)
2485 .sum::<u64>()
2486 .max(weights.create_bsa_floor_mb * 1024 * 1024),
2487 };
2488
2489 raw.max(FLOOR_BYTES)
2490}
2491
2492fn estimate_archive_batch_weight(
2493 batch: &ArchiveInstallBatch,
2494 weights: &ArchiveBatchWeights,
2495) -> u64 {
2496 const FLOOR_BYTES: u64 = 8 * 1024 * 1024;
2497 let has_patch = archive_batch_has_patch(batch);
2498 let factor = if has_patch {
2499 weights.patched_factor
2500 } else {
2501 weights.archive_factor
2502 };
2503 (((batch.archive_size_bytes as f64) * factor) as u64).max(FLOOR_BYTES)
2504}
2505
2506fn archive_batch_has_patch(batch: &ArchiveInstallBatch) -> bool {
2507 batch.directives.iter().any(|directive| {
2508 matches!(
2509 &directive.directive,
2510 InstallDirective::PatchedFromArchive { .. }
2511 )
2512 })
2513}
2514
2515#[cfg(all(unix, target_env = "gnu"))]
2516fn trim_process_allocator() {
2517 unsafe {
2521 libc::malloc_trim(0);
2522 }
2523}
2524
2525#[cfg(not(all(unix, target_env = "gnu")))]
2526fn trim_process_allocator() {}
2527
2528fn find_path_case_insensitive(base: &Path, relative_path: &str) -> Result<PathBuf> {
2530 validate_archive_entry(relative_path)?;
2531
2532 let parts: Vec<&str> = relative_path.split('/').collect();
2533 let mut current = base.to_path_buf();
2534
2535 for part in &parts {
2536 let target_lower = part.to_lowercase();
2537 let mut found = false;
2538
2539 for entry in std::fs::read_dir(¤t)
2540 .with_context(|| format!("failed to read dir: {}", current.display()))?
2541 {
2542 let entry = entry?;
2543 if entry.file_name().to_string_lossy().to_lowercase() == target_lower {
2544 current = entry.path();
2545 if current.symlink_metadata()?.file_type().is_symlink() {
2547 anyhow::bail!("path component is a symlink (rejected for security): {part}");
2548 }
2549 found = true;
2550 break;
2551 }
2552 }
2553
2554 if !found {
2555 anyhow::bail!(
2556 "path component '{}' not found in {}",
2557 part,
2558 current.display()
2559 );
2560 }
2561 }
2562
2563 Ok(current)
2564}
2565
/// Test helper: reads the bytes of one entry from the zip at `archive_path`.
///
/// The entry is located with `find_entry_in_archive`, which accepts either path
/// separator and ignores case. It is checked with `validate_zip_entry` before
/// being read.
#[cfg(test)]
fn extract_from_zip(archive_path: &Path, inner_path: &str) -> Result<Vec<u8>> {
    let archive_file = std::fs::File::open(archive_path)
        .with_context(|| format!("failed to open archive: {}", archive_path.display()))?;
    let mut zip_archive = zip::ZipArchive::new(archive_file)
        .with_context(|| format!("failed to read zip archive: {}", archive_path.display()))?;

    let resolved_name = find_entry_in_archive(&zip_archive, inner_path).with_context(|| {
        format!(
            "file '{}' not found in archive {}",
            inner_path,
            archive_path.display()
        )
    })?;

    let mut zip_entry = zip_archive.by_name(&resolved_name)?;
    validate_zip_entry(&zip_entry)?;

    let mut contents = Vec::with_capacity(zip_entry.size() as usize);
    std::io::Read::read_to_end(&mut zip_entry, &mut contents)?;
    Ok(contents)
}
2590
/// Test helper: finds the stored name of the zip entry matching `path`.
///
/// `path` is tried as given, with `\` turned into `/`, and with `/` turned into
/// `\`. For each entry (in archive order) an exact match on any spelling wins.
/// Otherwise a case-insensitive match on any spelling is accepted, so an
/// earlier case-insensitive hit is returned before a later exact one.
///
/// # Errors
/// Returns `entry not found: <path>` when no entry matches.
#[cfg(test)]
fn find_entry_in_archive(archive: &zip::ZipArchive<std::fs::File>, path: &str) -> Result<String> {
    let forward = path.replace('\\', "/");
    let backward = path.replace('/', "\\");
    let spellings = [path, forward.as_str(), backward.as_str()];
    // Lowercase the candidates once instead of once per archive entry.
    let spellings_lower = spellings.map(str::to_lowercase);

    for i in 0..archive.len() {
        let name = archive.name_for_index(i).unwrap_or_default();
        if spellings.contains(&name) {
            return Ok(name.to_string());
        }
        if spellings_lower.contains(&name.to_lowercase()) {
            return Ok(name.to_string());
        }
    }

    anyhow::bail!("entry not found: {path}");
}
2615
2616#[cfg(test)]
2617mod tests {
2618 use super::*;
2619 use crate::wabbajack::staging::{StagingPrepareStatus, StagingStore, compressed_path};
2620
2621 #[test]
2622 fn validate_archive_entry_rejects_traversal_and_absolute() {
2623 assert!(validate_archive_entry("mods/Foo/textures/x.dds").is_ok());
2626 assert!(validate_archive_entry("../escape.txt").is_err());
2627 assert!(validate_archive_entry("a/b/../../../escape").is_err());
2628 assert!(validate_archive_entry("..\\escape.txt").is_err());
2629 assert!(validate_archive_entry("/etc/passwd").is_err());
2630 }
2631 use std::collections::HashMap;
2632 use std::io::{Read as _, Write as _};
2633 use std::net::TcpListener;
2634 use std::sync::atomic::{AtomicUsize, Ordering};
2635 use std::thread;
2636 use xxhash_rust::xxh64::xxh64;
2637
2638 fn create_zip_file(path: &std::path::Path, entries: &[(&str, &[u8])]) {
2640 let file = std::fs::File::create(path).unwrap();
2641 let mut writer = zip::ZipWriter::new(file);
2642 let options = zip::write::SimpleFileOptions::default()
2643 .compression_method(zip::CompressionMethod::Stored);
2644 for (name, data) in entries {
2645 writer.start_file(*name, options).unwrap();
2646 writer.write_all(data).unwrap();
2647 }
2648 writer.finish().unwrap();
2649 }
2650
2651 fn zip_bytes(entries: &[(&str, &[u8])]) -> Vec<u8> {
2652 let mut cursor = std::io::Cursor::new(Vec::new());
2653 {
2654 let mut writer = zip::ZipWriter::new(&mut cursor);
2655 let options = zip::write::SimpleFileOptions::default()
2656 .compression_method(zip::CompressionMethod::Stored);
2657 for (name, data) in entries {
2658 writer.start_file(*name, options).unwrap();
2659 writer.write_all(data).unwrap();
2660 }
2661 writer.finish().unwrap();
2662 }
2663 cursor.into_inner()
2664 }
2665
2666 fn data_patch(data: &[u8]) -> Vec<u8> {
2667 let mut patch = Vec::new();
2668 patch.extend_from_slice(b"OCTODELTA");
2669 patch.push(1);
2670 patch.push(4);
2671 patch.extend_from_slice(b"SHA1");
2672 patch.extend_from_slice(&20_u32.to_le_bytes());
2673 patch.extend_from_slice(&[0_u8; 20]);
2674 patch.extend_from_slice(b">>>");
2675 patch.push(0x80);
2676 patch.extend_from_slice(&(data.len() as u64).to_le_bytes());
2677 patch.extend_from_slice(data);
2678 patch
2679 }
2680
2681 fn open_zip(path: &std::path::Path) -> zip::ZipArchive<std::fs::File> {
2683 let file = std::fs::File::open(path).unwrap();
2684 zip::ZipArchive::new(file).unwrap()
2685 }
2686
2687 fn minimal_manifest() -> WabbajackManifest {
2688 WabbajackManifest {
2689 name: "test".into(),
2690 author: "a".into(),
2691 description: "d".into(),
2692 game: "SkyrimSE".into(),
2693 version: "1.0".into(),
2694 archives: vec![],
2695 directives: vec![],
2696 }
2697 }
2698
2699 fn game_file_archive(
2700 hash: u64,
2701 rel_path: &str,
2702 ) -> modde_core::manifest::wabbajack::ArchiveEntry {
2703 modde_core::manifest::wabbajack::ArchiveEntry {
2704 hash,
2705 name: rel_path.replace(['\\', '/'], "_"),
2706 size: 0,
2707 state: Some(ArchiveState::GameFileSourceDownloader {
2708 metadata: HashMap::from([(
2709 "File".to_string(),
2710 serde_json::Value::String(rel_path.to_string()),
2711 )]),
2712 }),
2713 }
2714 }
2715
2716 fn manifest_with_game_file(hash: u64, rel_path: &str, to: &str) -> WabbajackManifest {
2717 WabbajackManifest {
2718 archives: vec![game_file_archive(hash, rel_path)],
2719 directives: vec![modde_core::manifest::wabbajack::RawDirective::FromArchive {
2720 archive_hash_path: vec![serde_json::Value::Number(hash.into())],
2721 to: to.into(),
2722 size: 0,
2723 }],
2724 ..minimal_manifest()
2725 }
2726 }
2727
2728 fn manifest_with_nexus_download(hash: u64) -> WabbajackManifest {
2729 WabbajackManifest {
2730 archives: vec![modde_core::manifest::wabbajack::ArchiveEntry {
2731 hash,
2732 name: "nexus archive".into(),
2733 size: 1,
2734 state: Some(ArchiveState::NexusDownloader {
2735 game_name: "skyrimspecialedition".into(),
2736 mod_id: 123.into(),
2737 file_id: 456.into(),
2738 }),
2739 }],
2740 ..minimal_manifest()
2741 }
2742 }
2743
2744 #[tokio::test]
2745 async fn archive_trust_sidecar_skips_second_hash_pass() {
2746 let dir = tempfile::tempdir().unwrap();
2747 let store = dir.path().join("store");
2748 let staging = dir.path().join("staging");
2749 tokio::fs::create_dir_all(&store).await.unwrap();
2750 let bytes = b"trusted archive bytes";
2751 let hash = xxh64(bytes, 0);
2752 let path = archive_path(&store, &hash);
2753 tokio::fs::write(&path, bytes).await.unwrap();
2754
2755 let inst = WabbajackInstaller::new(
2756 minimal_manifest(),
2757 dir.path().join("list.wabbajack"),
2758 store,
2759 staging,
2760 );
2761 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
2762
2763 let (_archive, first) = inst.ensure_archive_trusted(hash, &tx).await.unwrap();
2764 assert_eq!(first.streamed_hash_bytes, bytes.len() as u64);
2765 assert!(!first.sidecar_hit);
2766 assert!(verified_sidecar_path(&path).exists());
2767
2768 let (_archive, second) = inst.ensure_archive_trusted(hash, &tx).await.unwrap();
2769 assert!(second.sidecar_hit);
2770 assert_eq!(second.streamed_hash_bytes, 0);
2771 }
2772
2773 #[tokio::test]
2774 async fn stale_archive_trust_sidecar_forces_rehash() {
2775 let dir = tempfile::tempdir().unwrap();
2776 let store = dir.path().join("store");
2777 let staging = dir.path().join("staging");
2778 tokio::fs::create_dir_all(&store).await.unwrap();
2779 let bytes = b"trusted archive bytes";
2780 let hash = xxh64(bytes, 0);
2781 let path = archive_path(&store, &hash);
2782 tokio::fs::write(&path, bytes).await.unwrap();
2783
2784 let inst = WabbajackInstaller::new(
2785 minimal_manifest(),
2786 dir.path().join("list.wabbajack"),
2787 store,
2788 staging,
2789 );
2790 inst.write_verified_sidecar(&path, hash).await.unwrap();
2791 let mut sidecar: serde_json::Value =
2792 serde_json::from_slice(&tokio::fs::read(verified_sidecar_path(&path)).await.unwrap())
2793 .unwrap();
2794 sidecar["size_bytes"] = serde_json::json!(1);
2795 tokio::fs::write(
2796 verified_sidecar_path(&path),
2797 serde_json::to_vec_pretty(&sidecar).unwrap(),
2798 )
2799 .await
2800 .unwrap();
2801
2802 let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
2803 let (_archive, stats) = inst.ensure_archive_trusted(hash, &tx).await.unwrap();
2804 assert!(!stats.sidecar_hit);
2805 assert_eq!(stats.streamed_hash_bytes, bytes.len() as u64);
2806 }
2807
2808 fn synthetic_server(
2809 cdn_archive: Vec<u8>,
2810 direct_archive: Vec<u8>,
2811 ) -> (String, Arc<AtomicUsize>) {
2812 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
2813 let addr = listener.local_addr().unwrap();
2814 let count = Arc::new(AtomicUsize::new(0));
2815 let thread_count = Arc::clone(&count);
2816 thread::spawn(move || {
2817 for stream in listener.incoming().take(8) {
2818 let mut stream = stream.unwrap();
2819 let mut buf = [0_u8; 2048];
2820 let n = stream.read(&mut buf).unwrap();
2821 let request = String::from_utf8_lossy(&buf[..n]);
2822 let path = request
2823 .lines()
2824 .next()
2825 .and_then(|line| line.split_whitespace().nth(1))
2826 .unwrap_or("/");
2827 thread_count.fetch_add(1, Ordering::SeqCst);
2828 match path {
2829 "/authored_files/download/cdn.zip_abc" => {
2830 let body = format!(
2831 r#"<script>
2832 const MUNGED_NAME = "cdn.zip_abc";
2833 const FILE_NAME = "cdn.zip";
2834 const FILE_SIZE_BYTES = {};
2835 const PARTS = [{{"Size":{},"Offset":0,"Index":0}}];
2836 </script>"#,
2837 cdn_archive.len(),
2838 cdn_archive.len()
2839 );
2840 write_response(&mut stream, "200 OK", body.as_bytes());
2841 }
2842 "/authored_files/cdn.zip_abc/parts/0" => {
2843 write_response(&mut stream, "200 OK", &cdn_archive);
2844 }
2845 "/direct.zip" => {
2846 write_response(&mut stream, "200 OK", &direct_archive);
2847 }
2848 _ => write_response(&mut stream, "404 Not Found", b"not found"),
2849 }
2850 }
2851 });
2852 (format!("http://{addr}"), count)
2853 }
2854
2855 fn write_response(stream: &mut std::net::TcpStream, status: &str, body: &[u8]) {
2856 let headers = format!(
2857 "HTTP/1.1 {status}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
2858 body.len()
2859 );
2860 stream.write_all(headers.as_bytes()).unwrap();
2861 stream.write_all(body).unwrap();
2862 }
2863
2864 #[test]
2868 fn new_stores_fields() {
2869 let store = PathBuf::from("/tmp/store");
2870 let staging = PathBuf::from("/tmp/staging");
2871 let inst = WabbajackInstaller::new(
2872 minimal_manifest(),
2873 PathBuf::from("/tmp/test.wabbajack"),
2874 store,
2875 staging,
2876 );
2877 assert_eq!(inst.concurrency, DEFAULT_CONCURRENCY);
2878 assert!(inst.sources.is_empty());
2879 assert_eq!(inst.store_dir, PathBuf::from("/tmp/store"));
2880 assert_eq!(inst.staging_dir, PathBuf::from("/tmp/staging"));
2881 assert_eq!(inst.manifest.name, "test");
2882 }
2883
2884 #[test]
2885 fn validate_download_sources_rejects_required_source_before_network() {
2886 let dir = tempfile::tempdir().unwrap();
2887 let inst = WabbajackInstaller::new(
2888 manifest_with_nexus_download(1),
2889 dir.path().join("test.wabbajack"),
2890 dir.path().join("store"),
2891 dir.path().join("staging"),
2892 );
2893
2894 let downloads = inst.manifest.download_directives();
2895 let err = inst.validate_download_sources(&downloads).unwrap_err();
2896 let msg = format!("{err:#}");
2897 assert!(
2898 msg.contains("no registered source"),
2899 "unexpected error: {msg}"
2900 );
2901 assert!(msg.contains("nexus:123"), "unexpected error: {msg}");
2902 }
2903
2904 #[tokio::test]
2905 async fn adoption_preserves_existing_outputs_and_writes_sentinels() {
2906 let dir = tempfile::tempdir().unwrap();
2907 let staging = dir.path().join("staging");
2908 let output = staging.join("mods/adopted/file.txt");
2909 tokio::fs::create_dir_all(output.parent().unwrap())
2910 .await
2911 .unwrap();
2912 tokio::fs::write(&output, b"already applied").await.unwrap();
2913 tokio::fs::write(staging.join("mods/adopted/output.bsa"), b"bsa")
2914 .await
2915 .unwrap();
2916
2917 let hash = 42_u64;
2918 let manifest = WabbajackManifest {
2919 archives: vec![modde_core::manifest::wabbajack::ArchiveEntry {
2920 hash,
2921 name: "source.zip".into(),
2922 size: 123,
2923 state: None,
2924 }],
2925 directives: vec![
2926 modde_core::manifest::wabbajack::RawDirective::FromArchive {
2927 archive_hash_path: vec![
2928 serde_json::Value::Number(hash.into()),
2929 serde_json::Value::String("file.txt".into()),
2930 ],
2931 to: "mods/adopted/file.txt".into(),
2932 size: 0,
2933 },
2934 modde_core::manifest::wabbajack::RawDirective::CreateBSA {
2935 temp_id: "bsa-temp".into(),
2936 to: "mods/adopted/output.bsa".into(),
2937 file_states: vec![],
2938 },
2939 ],
2940 ..minimal_manifest()
2941 };
2942 let inst = WabbajackInstaller::new(
2943 manifest,
2944 dir.path().join("test.wabbajack"),
2945 dir.path().join("store"),
2946 staging.clone(),
2947 );
2948
2949 let prepare_status = StagingStore::new(&staging)
2950 .prepare_resumable()
2951 .await
2952 .unwrap();
2953 assert_eq!(prepare_status, StagingPrepareStatus::Adopted);
2954
2955 let batches = inst.manifest.install_directives_grouped_by_archive();
2956 let installs = inst.manifest.install_directives();
2957 let adoption = inst
2958 .adopt_existing_staging(&batches, &installs)
2959 .await
2960 .unwrap();
2961
2962 assert_eq!(adoption.archive_batches, 1);
2963 assert_eq!(adoption.create_bsa, 1);
2964 assert!(output.exists());
2965 assert!(inst.archive_batch_sentinel_path(hash).exists());
2966 assert!(inst.create_bsa_sentinel_path(1).exists());
2967 assert!(inst.archive_batch_sentinel_valid(&batches[0]).await);
2968 assert!(
2969 inst.create_bsa_sentinel_valid(1, "bsa-temp", "mods/adopted/output.bsa")
2970 .await
2971 );
2972 }
2973
2974 #[tokio::test]
2975 async fn synthetic_wabbajack_pipeline_reaches_late_directives() {
2976 let dir = tempfile::tempdir().unwrap();
2977 let cdn_archive = zip_bytes(&[("source.txt", b"basis")]);
2978 let large_direct = vec![7_u8; 1024 * 1024 + 17];
2979 let direct_archive = zip_bytes(&[
2980 ("direct.txt", b"direct"),
2981 ("large.dds", large_direct.as_slice()),
2982 ]);
2983 let cdn_hash = xxh64(&cdn_archive, 0);
2984 let direct_hash = xxh64(&direct_archive, 0);
2985 let (base_url, _requests) = synthetic_server(cdn_archive, direct_archive);
2986 let wabbajack_path = dir.path().join("synthetic.wabbajack");
2987 create_zip_file(
2988 &wabbajack_path,
2989 &[
2990 ("inline-data", b"inline"),
2991 ("patch-data", &data_patch(b"patched")),
2992 ],
2993 );
2994
2995 let manifest = WabbajackManifest {
2996 archives: vec![
2997 modde_core::manifest::wabbajack::ArchiveEntry {
2998 hash: cdn_hash,
2999 name: "cdn.zip".into(),
3000 size: 0,
3001 state: Some(ArchiveState::WabbajackCDNDownloader {
3002 metadata: HashMap::from([(
3003 "Url".into(),
3004 serde_json::Value::String(format!(
3005 "{base_url}/authored_files/download/cdn.zip_abc"
3006 )),
3007 )]),
3008 }),
3009 },
3010 modde_core::manifest::wabbajack::ArchiveEntry {
3011 hash: direct_hash,
3012 name: "direct.zip".into(),
3013 size: 0,
3014 state: Some(ArchiveState::HttpDownloader {
3015 url: format!("{base_url}/direct.zip"),
3016 headers: HashMap::new(),
3017 }),
3018 },
3019 ],
3020 directives: vec![
3021 modde_core::manifest::wabbajack::RawDirective::FromArchive {
3022 archive_hash_path: vec![
3023 serde_json::Value::Number(cdn_hash.into()),
3024 serde_json::Value::String("source.txt".into()),
3025 ],
3026 to: "mods/cdn/source.txt".into(),
3027 size: 0,
3028 },
3029 modde_core::manifest::wabbajack::RawDirective::FromArchive {
3030 archive_hash_path: vec![
3031 serde_json::Value::Number(direct_hash.into()),
3032 serde_json::Value::String("direct.txt".into()),
3033 ],
3034 to: "mods/direct/direct.txt".into(),
3035 size: 0,
3036 },
3037 modde_core::manifest::wabbajack::RawDirective::FromArchive {
3038 archive_hash_path: vec![
3039 serde_json::Value::Number(direct_hash.into()),
3040 serde_json::Value::String("large.dds".into()),
3041 ],
3042 to: "mods/direct/large.dds".into(),
3043 size: 0,
3044 },
3045 modde_core::manifest::wabbajack::RawDirective::InlineFile {
3046 hash: 0,
3047 size: 6,
3048 source_data_id: "inline-data".into(),
3049 to: "mods/inline/inline.txt".into(),
3050 },
3051 modde_core::manifest::wabbajack::RawDirective::PatchedFromArchive {
3052 archive_hash_path: vec![
3053 serde_json::Value::Number(cdn_hash.into()),
3054 serde_json::Value::String("source.txt".into()),
3055 ],
3056 to: "mods/patched/patched.txt".into(),
3057 hash: 0,
3058 patch_id: "patch-data".into(),
3059 size: 0,
3060 },
3061 ],
3062 ..minimal_manifest()
3063 };
3064 let mut inst = WabbajackInstaller::new(
3065 manifest,
3066 wabbajack_path,
3067 dir.path().join("store"),
3068 dir.path().join("staging"),
3069 );
3070 let client = reqwest::Client::new();
3071 inst.add_source(crate::AnySource::WabbajackCdn(
3072 crate::wabbajack::cdn::WabbajackCdnSource::new(client.clone()),
3073 ));
3074 inst.add_source(crate::AnySource::Direct(crate::direct::DirectSource::new(
3075 client,
3076 )));
3077
3078 let (progress_tx, _progress_rx) = mpsc::unbounded_channel();
3079 inst.install(progress_tx).await.unwrap();
3080
3081 assert_eq!(
3082 tokio::fs::read(dir.path().join("staging/mods/cdn/source.txt"))
3083 .await
3084 .unwrap(),
3085 b"basis"
3086 );
3087 assert_eq!(
3088 tokio::fs::read(dir.path().join("staging/mods/direct/direct.txt"))
3089 .await
3090 .unwrap(),
3091 b"direct"
3092 );
3093 assert_eq!(
3094 tokio::fs::read(dir.path().join("staging/mods/inline/inline.txt"))
3095 .await
3096 .unwrap(),
3097 b"inline"
3098 );
3099 assert_eq!(
3100 tokio::fs::read(dir.path().join("staging/mods/patched/patched.txt"))
3101 .await
3102 .unwrap(),
3103 b"patched"
3104 );
3105
3106 let large_path = dir.path().join("staging/mods/direct/large.dds");
3107 assert!(compressed_path(&large_path).exists());
3108 assert!(!large_path.exists());
3109 let staging_store = StagingStore::new(dir.path().join("staging"));
3110 let mut reader = staging_store
3111 .open_logical_reader("mods/direct/large.dds")
3112 .unwrap();
3113 let mut decoded = Vec::new();
3114 reader.read_to_end(&mut decoded).unwrap();
3115 assert_eq!(decoded, large_direct);
3116
3117 assert!(
3118 dir.path()
3119 .join(format!(
3120 "staging/_state/archive-batches/{direct_hash:016x}.json"
3121 ))
3122 .exists()
3123 );
3124 let (progress_tx, _progress_rx) = mpsc::unbounded_channel();
3125 inst.install(progress_tx).await.unwrap();
3126 assert!(compressed_path(&large_path).exists());
3127 }
3128
3129 #[test]
3133 fn set_concurrency_changes_value() {
3134 let mut inst = WabbajackInstaller::new(
3135 minimal_manifest(),
3136 PathBuf::new(),
3137 PathBuf::new(),
3138 PathBuf::new(),
3139 );
3140 inst.set_concurrency(16);
3141 assert_eq!(inst.concurrency, 16);
3142 }
3143
3144 #[test]
3145 fn set_concurrency_clamps_zero_to_one() {
3146 let mut inst = WabbajackInstaller::new(
3147 minimal_manifest(),
3148 PathBuf::new(),
3149 PathBuf::new(),
3150 PathBuf::new(),
3151 );
3152 inst.set_concurrency(0);
3153 assert_eq!(inst.concurrency, 1);
3154 }
3155
3156 #[tokio::test]
3157 async fn install_reads_game_file_source_from_game_dir() {
3158 let dir = tempfile::tempdir().unwrap();
3159 let game_dir = dir.path().join("game");
3160 let store_dir = dir.path().join("store");
3161 let staging_dir = dir.path().join("staging");
3162 std::fs::create_dir_all(game_dir.join("Data")).unwrap();
3163 let source_bytes = b"game file bytes";
3164 std::fs::write(game_dir.join("Data/Update.esm"), source_bytes).unwrap();
3165
3166 let manifest = manifest_with_game_file(
3167 xxh64(source_bytes, 0),
3168 "Data\\Update.esm",
3169 "mods/Skyrim Base/Update.esm",
3170 );
3171
3172 let mut inst = WabbajackInstaller::new(
3173 manifest,
3174 dir.path().join("test.wabbajack"),
3175 store_dir,
3176 staging_dir.clone(),
3177 );
3178 inst.set_game_dir(game_dir);
3179
3180 let (progress_tx, _progress_rx) = mpsc::unbounded_channel();
3181 inst.install(progress_tx).await.unwrap();
3182
3183 assert_eq!(
3184 std::fs::read(staging_dir.join("mods/Skyrim Base/Update.esm")).unwrap(),
3185 b"game file bytes"
3186 );
3187 }
3188
3189 #[tokio::test]
3190 async fn install_requires_game_dir_for_game_file_sources() {
3191 let dir = tempfile::tempdir().unwrap();
3192 let manifest = WabbajackManifest {
3193 archives: vec![game_file_archive(0x1234, "Data\\Update.esm")],
3194 directives: vec![],
3195 ..minimal_manifest()
3196 };
3197
3198 let inst = WabbajackInstaller::new(
3199 manifest,
3200 dir.path().join("test.wabbajack"),
3201 dir.path().join("store"),
3202 dir.path().join("staging"),
3203 );
3204
3205 let (progress_tx, _progress_rx) = mpsc::unbounded_channel();
3206 let err = inst.install(progress_tx).await.unwrap_err();
3207 let msg = format!("{err:#}");
3208 assert!(
3209 msg.contains("pass --game-dir"),
3210 "unexpected error message: {msg}"
3211 );
3212 }
3213
3214 #[tokio::test]
3215 async fn install_rejects_mismatched_game_file_hash() {
3216 let dir = tempfile::tempdir().unwrap();
3217 let game_dir = dir.path().join("game");
3218 std::fs::create_dir_all(game_dir.join("Data")).unwrap();
3219 std::fs::write(game_dir.join("Data/Update.esm"), b"actual bytes").unwrap();
3220
3221 let manifest = manifest_with_game_file(
3222 xxh64(b"expected bytes", 0),
3223 "Data\\Update.esm",
3224 "mods/Update.esm",
3225 );
3226 let mut inst = WabbajackInstaller::new(
3227 manifest,
3228 dir.path().join("test.wabbajack"),
3229 dir.path().join("store"),
3230 dir.path().join("staging"),
3231 );
3232 inst.set_game_dir(game_dir);
3233
3234 let (progress_tx, _progress_rx) = mpsc::unbounded_channel();
3235 let err = inst.install(progress_tx).await.unwrap_err();
3236 let msg = format!("{err:#}");
3237 assert!(msg.contains("Data\\Update.esm"), "unexpected error: {msg}");
3238 assert!(
3239 msg.contains("expected xxh64"),
3240 "unexpected error message: {msg}"
3241 );
3242 }
3243
3244 #[tokio::test]
3245 async fn install_rejects_missing_game_file() {
3246 let dir = tempfile::tempdir().unwrap();
3247 let game_dir = dir.path().join("game");
3248 std::fs::create_dir_all(&game_dir).unwrap();
3249
3250 let manifest = manifest_with_game_file(
3251 xxh64(b"missing bytes", 0),
3252 "Data\\Update.esm",
3253 "mods/Update.esm",
3254 );
3255 let mut inst = WabbajackInstaller::new(
3256 manifest,
3257 dir.path().join("test.wabbajack"),
3258 dir.path().join("store"),
3259 dir.path().join("staging"),
3260 );
3261 inst.set_game_dir(game_dir);
3262
3263 let (progress_tx, _progress_rx) = mpsc::unbounded_channel();
3264 let err = inst.install(progress_tx).await.unwrap_err();
3265 let msg = format!("{err:#}");
3266 assert!(msg.contains("Data\\Update.esm"), "unexpected error: {msg}");
3267 assert!(msg.contains("missing"), "unexpected error: {msg}");
3268 }
3269
3270 #[tokio::test]
3271 async fn install_reports_all_invalid_game_files() {
3272 let dir = tempfile::tempdir().unwrap();
3273 let game_dir = dir.path().join("game");
3274 std::fs::create_dir_all(game_dir.join("Data")).unwrap();
3275 std::fs::write(game_dir.join("Data/Update.esm"), b"actual bytes").unwrap();
3276
3277 let mismatch_hash = xxh64(b"expected bytes", 0);
3278 let missing_hash = xxh64(b"missing bytes", 0);
3279 let manifest = WabbajackManifest {
3280 archives: vec![
3281 game_file_archive(mismatch_hash, "Data\\Update.esm"),
3282 game_file_archive(missing_hash, "SkyrimSE.exe"),
3283 ],
3284 directives: vec![
3285 modde_core::manifest::wabbajack::RawDirective::FromArchive {
3286 archive_hash_path: vec![serde_json::Value::Number(mismatch_hash.into())],
3287 to: "mods/Update.esm".into(),
3288 size: 0,
3289 },
3290 modde_core::manifest::wabbajack::RawDirective::FromArchive {
3291 archive_hash_path: vec![serde_json::Value::Number(missing_hash.into())],
3292 to: "mods/SkyrimSE.exe".into(),
3293 size: 0,
3294 },
3295 ],
3296 ..minimal_manifest()
3297 };
3298 let mut inst = WabbajackInstaller::new(
3299 manifest,
3300 dir.path().join("test.wabbajack"),
3301 dir.path().join("store"),
3302 dir.path().join("staging"),
3303 );
3304 inst.set_game_dir(game_dir);
3305
3306 let (progress_tx, _progress_rx) = mpsc::unbounded_channel();
3307 let err = inst.install(progress_tx).await.unwrap_err();
3308 let msg = format!("{err:#}");
3309 assert!(
3310 msg.contains("validation failed for 2 file(s)"),
3311 "unexpected error: {msg}"
3312 );
3313 assert!(msg.contains("Data\\Update.esm"), "unexpected error: {msg}");
3314 assert!(msg.contains("got"), "unexpected error: {msg}");
3315 assert!(msg.contains("SkyrimSE.exe"), "unexpected error: {msg}");
3316 assert!(msg.contains("missing"), "unexpected error: {msg}");
3317 }
3318
3319 #[tokio::test]
3320 async fn install_resolves_game_file_path_case_insensitively() {
3321 let dir = tempfile::tempdir().unwrap();
3322 let game_dir = dir.path().join("game");
3323 let store_dir = dir.path().join("store");
3324 let staging_dir = dir.path().join("staging");
3325 std::fs::create_dir_all(game_dir.join("DATA")).unwrap();
3326 let source_bytes = b"case insensitive game file";
3327 std::fs::write(game_dir.join("DATA/update.ESM"), source_bytes).unwrap();
3328
3329 let manifest = manifest_with_game_file(
3330 xxh64(source_bytes, 0),
3331 "Data\\Update.esm",
3332 "mods/Update.esm",
3333 );
3334 let mut inst = WabbajackInstaller::new(
3335 manifest,
3336 dir.path().join("test.wabbajack"),
3337 store_dir,
3338 staging_dir.clone(),
3339 );
3340 inst.set_game_dir(game_dir);
3341
3342 let (progress_tx, _progress_rx) = mpsc::unbounded_channel();
3343 inst.install(progress_tx).await.unwrap();
3344
3345 assert_eq!(
3346 std::fs::read(staging_dir.join("mods/Update.esm")).unwrap(),
3347 source_bytes
3348 );
3349 }
3350
3351 #[test]
3355 fn find_entry_exact_match() {
3356 let dir = tempfile::tempdir().unwrap();
3357 let zip_path = dir.path().join("test.zip");
3358 create_zip_file(&zip_path, &[("data/meshes/test.nif", b"mesh")]);
3359 let archive = open_zip(&zip_path);
3360
3361 let result = find_entry_in_archive(&archive, "data/meshes/test.nif").unwrap();
3362 assert_eq!(result, "data/meshes/test.nif");
3363 }
3364
/// The entry is stored with `/` separators but looked up with `\`; the stored
/// spelling is what comes back.
#[test]
fn find_entry_backslash_to_forward_slash() {
    let tmp = tempfile::tempdir().unwrap();
    let zip_file = tmp.path().join("test.zip");
    create_zip_file(&zip_file, &[("data/meshes/test.nif", b"mesh")]);
    let opened = open_zip(&zip_file);

    let found = find_entry_in_archive(&opened, "data\\meshes\\test.nif").unwrap();
    assert_eq!(found, "data/meshes/test.nif");
}
3378
/// The entry is stored with `\` separators but looked up with `/`; the stored
/// spelling is what comes back.
#[test]
fn find_entry_forward_slash_to_backslash() {
    let tmp = tempfile::tempdir().unwrap();
    let zip_file = tmp.path().join("test.zip");
    create_zip_file(&zip_file, &[("data\\meshes\\test.nif", b"mesh")]);
    let opened = open_zip(&zip_file);

    let found = find_entry_in_archive(&opened, "data/meshes/test.nif").unwrap();
    assert_eq!(found, "data\\meshes\\test.nif");
}
3389
/// A lowercase lookup matches a mixed-case entry, and the entry's original
/// casing is returned.
#[test]
fn find_entry_case_insensitive() {
    let tmp = tempfile::tempdir().unwrap();
    let zip_file = tmp.path().join("test.zip");
    create_zip_file(&zip_file, &[("Data/Meshes/Test.NIF", b"mesh")]);
    let opened = open_zip(&zip_file);

    let found = find_entry_in_archive(&opened, "data/meshes/test.nif").unwrap();
    assert_eq!(found, "Data/Meshes/Test.NIF");
}
3403
/// Looking up a name that is not in the archive yields an error.
#[test]
fn find_entry_not_found() {
    let dir = tempfile::tempdir().unwrap();
    let zip_path = dir.path().join("test.zip");
    create_zip_file(&zip_path, &[("other.txt", b"data")]);
    let archive = open_zip(&zip_path);

    let result = find_entry_in_archive(&archive, "nonexistent.txt");
    // Print whatever was returned so a spurious match is diagnosable from the
    // test output alone.
    assert!(result.is_err(), "expected lookup to fail, got {result:?}");
}
3414
/// A hash with fewer than 16 significant hex digits is zero-padded in the file name.
#[test]
fn archive_path_zero_padded_hex() {
    let root = PathBuf::from("/store");
    let hash: u64 = 0xDEAD_BEEF;
    let expected = PathBuf::from("/store/00000000deadbeef.archive");
    assert_eq!(archive_path(&root, &hash), expected);
}
3425
/// The largest possible hash fills all 16 hex digits in lowercase.
#[test]
fn archive_path_full_width_hash() {
    let root = PathBuf::from("/store");
    let hash = u64::MAX;
    let expected = PathBuf::from("/store/ffffffffffffffff.archive");
    assert_eq!(archive_path(&root, &hash), expected);
}
3433
/// A zero hash renders as sixteen zero digits rather than an empty name.
#[test]
fn archive_path_zero_hash() {
    let root = PathBuf::from("/store");
    let hash = 0_u64;
    let expected = PathBuf::from("/store/0000000000000000.archive");
    assert_eq!(archive_path(&root, &hash), expected);
}
3441
/// A Nexus directive is labelled `nexus:<mod_id>`; game and file ids do not
/// appear in the label.
#[test]
fn directive_name_nexus() {
    let directive = DownloadDirective::Nexus {
        hash: 0,
        game_id: "skyrimse".into(),
        file_id: 1.into(),
        mod_id: 12345.into(),
    };
    let label = directive.display_name();
    assert_eq!(label, "nexus:12345");
}
3455
/// A GitHub directive is labelled `github:<repo>`; user, tag and asset do not
/// appear in the label.
#[test]
fn directive_name_github() {
    let directive = DownloadDirective::GitHub {
        hash: 0,
        user: "user".into(),
        repo: "myrepo".into(),
        tag: "v1".into(),
        asset: "a.zip".into(),
    };
    let label = directive.display_name();
    assert_eq!(label, "github:myrepo");
}
3467
/// A Google Drive directive is labelled `gdrive:<id>`.
#[test]
fn directive_name_gdrive() {
    let directive = DownloadDirective::GoogleDrive {
        hash: 0,
        id: "abc123".into(),
    };
    let label = directive.display_name();
    assert_eq!(label, "gdrive:abc123");
}
3476
/// A long Mega URL yields a `mega:`-prefixed label capped at 35 bytes.
#[test]
fn directive_name_mega() {
    let d = DownloadDirective::Mega {
        url: "https://mega.nz/file/ABCDEF#key".into(),
        hash: 0,
    };
    let name = d.display_name();
    assert!(name.starts_with("mega:"), "unexpected name: {name}");
    // "mega:" plus this 31-byte URL would be 36 bytes, so the label must be shortened.
    assert!(name.len() <= 35, "name not shortened: {name}");
}
3487
/// A long direct URL yields an `http:`-prefixed label capped at 35 bytes.
#[test]
fn directive_name_direct_url() {
    let d = DownloadDirective::DirectURL {
        url: "https://example.com/files/mod.zip".into(),
        headers: HashMap::new(),
        mirror_resolver: None,
        hash: 0,
    };
    let name = d.display_name();
    assert!(name.starts_with("http:"), "unexpected name: {name}");
    assert!(name.len() <= 35, "name not shortened: {name}");
}
3500
/// A short Mega URL is kept verbatim after the `mega:` prefix.
#[test]
fn directive_name_mega_short_url() {
    let directive = DownloadDirective::Mega {
        hash: 0,
        url: "https://mega.nz/short".into(),
    };
    assert_eq!(directive.display_name(), "mega:https://mega.nz/short");
}
3510
/// `hash()` reports the hash stored on a Nexus directive.
#[test]
fn directive_hash_nexus() {
    let directive = DownloadDirective::Nexus {
        hash: 0xABCD,
        game_id: "s".into(),
        mod_id: 1.into(),
        file_id: 1.into(),
    };
    let reported = directive.hash();
    assert_eq!(reported, 0xABCD);
}
3524
/// `hash()` reports the hash stored on a GitHub directive.
#[test]
fn directive_hash_github() {
    let directive = DownloadDirective::GitHub {
        hash: 999,
        user: "u".into(),
        repo: "r".into(),
        tag: "t".into(),
        asset: "a".into(),
    };
    let reported = directive.hash();
    assert_eq!(reported, 999);
}
3536
/// `hash()` reports the hash stored on a Google Drive directive.
#[test]
fn directive_hash_gdrive() {
    let directive = DownloadDirective::GoogleDrive {
        hash: 42,
        id: "x".into(),
    };
    let reported = directive.hash();
    assert_eq!(reported, 42);
}
3545
/// `hash()` reports the hash stored on a Mega directive.
#[test]
fn directive_hash_mega() {
    let directive = DownloadDirective::Mega {
        hash: 7777,
        url: "u".into(),
    };
    let reported = directive.hash();
    assert_eq!(reported, 7777);
}
3554
/// `hash()` reports the hash stored on a direct-URL directive.
#[test]
fn directive_hash_direct_url() {
    let directive = DownloadDirective::DirectURL {
        hash: 0xFFFF,
        url: "u".into(),
        mirror_resolver: None,
        headers: HashMap::new(),
    };
    let reported = directive.hash();
    assert_eq!(reported, 0xFFFF);
}
3565
/// Extracting an existing entry returns exactly the bytes that were written.
#[test]
fn extract_from_zip_valid() {
    let tmp = tempfile::tempdir().unwrap();
    let zip_file = tmp.path().join("test.zip");
    create_zip_file(&zip_file, &[("inner/file.txt", b"hello world")]);

    assert_eq!(
        extract_from_zip(&zip_file, "inner/file.txt").unwrap(),
        b"hello world"
    );
}
3578
/// Asking for an entry the archive lacks fails, and the rendered error chain
/// mentions "not found".
#[test]
fn extract_from_zip_missing_entry() {
    let tmp = tempfile::tempdir().unwrap();
    let zip_file = tmp.path().join("test.zip");
    create_zip_file(&zip_file, &[("exists.txt", b"data")]);

    let outcome = extract_from_zip(&zip_file, "does_not_exist.txt");
    assert!(outcome.is_err());

    // `{:#}` renders the full error chain, including any attached context.
    let err_msg = format!("{:#}", outcome.unwrap_err());
    assert!(err_msg.contains("not found"), "unexpected error: {err_msg}");
}
3593
/// Backslash separators are rewritten to forward slashes.
#[test]
fn normalize_path_backslashes() {
    let normalized = normalize_path("mods\\test\\file.txt");
    assert_eq!(normalized, "mods/test/file.txt");
}
3601
/// A path that already uses forward slashes passes through unchanged.
#[test]
fn normalize_path_already_forward() {
    let normalized = normalize_path("mods/test/file.txt");
    assert_eq!(normalized, "mods/test/file.txt");
}
3606}