Skip to main content

modde_sources/wabbajack/
staging.rs

1//! Manages the on-disk Wabbajack staging area via [`StagingStore`], transparently
2//! zstd-compressing eligible large files while resolving, hashing, and
3//! materializing logical paths regardless of their compressed state, and tracking
4//! a versioned [`StagingCompressionPolicy`] layout for resumable installs.
5
6use std::ffi::OsString;
7use std::fs::File;
8use std::io::{BufReader, BufWriter, Read, Write};
9use std::path::{Path, PathBuf};
10
11use anyhow::{Context, Result, bail};
12use serde::{Deserialize, Serialize};
13use tracing::{debug, info};
14use xxhash_rust::xxh3::Xxh3;
15use xxhash_rust::xxh64::Xxh64;
16
/// Layout revision written to, and expected in, the staging layout file.
const LAYOUT_VERSION: u32 = 1;
/// Suffix appended to a logical path to name its zstd-compressed counterpart.
const COMPRESSED_SUFFIX: &str = ".modde-zst";
/// Size threshold (1 MiB) used when `MODDE_ZSTD_MIN_BYTES` is unset or unparsable.
const DEFAULT_MIN_BYTES: u64 = 1 << 20;
/// zstd level used when `MODDE_ZSTD_LEVEL` is unset or unparsable.
const DEFAULT_LEVEL: i32 = 9;
21
22#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
23pub struct StagingCompressionPolicy {
24    pub min_bytes: u64,
25    pub level: i32,
26    pub suffix: String,
27}
28
29impl StagingCompressionPolicy {
30    #[must_use]
31    pub fn from_env() -> Self {
32        let min_bytes = std::env::var("MODDE_ZSTD_MIN_BYTES")
33            .ok()
34            .and_then(|v| v.parse::<u64>().ok())
35            .unwrap_or(DEFAULT_MIN_BYTES);
36        let level = std::env::var("MODDE_ZSTD_LEVEL")
37            .ok()
38            .and_then(|v| v.parse::<i32>().ok())
39            .unwrap_or(DEFAULT_LEVEL)
40            .clamp(1, 22);
41        Self {
42            min_bytes,
43            level,
44            suffix: COMPRESSED_SUFFIX.to_string(),
45        }
46    }
47}
48
49#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
50struct StagingLayout {
51    version: u32,
52    compression: StagingCompressionPolicy,
53}
54
55#[derive(Debug, Clone)]
56pub struct StagingStore {
57    root: PathBuf,
58    policy: StagingCompressionPolicy,
59}
60
/// Totals reported by one compression pass over the staging area.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CompressionSummary {
    /// Number of files that were compressed and had their original removed.
    pub compressed_files: usize,
    /// Number of visited files that were not eligible for compression.
    pub skipped_files: usize,
    /// Combined size, in bytes, of the compressed files before compression.
    pub original_bytes: u64,
    /// Combined size, in bytes, of the compressed outputs.
    pub compressed_bytes: u64,
}
68
/// Outcome of preparing a staging directory.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StagingPrepareStatus {
    /// The staging root did not exist and was created with a fresh layout file.
    Created,
    /// The staging root already carried a matching layout file.
    Compatible,
    /// The staging root existed without a matching layout file; its contents
    /// were kept and a new layout file was written.
    Adopted,
    /// The staging root was removed (if present) and recreated from scratch.
    Reset,
}
76
77impl StagingStore {
78    #[must_use]
79    pub fn new(root: impl Into<PathBuf>) -> Self {
80        Self {
81            root: root.into(),
82            policy: StagingCompressionPolicy::from_env(),
83        }
84    }
85
86    #[must_use]
87    pub fn with_policy(root: impl Into<PathBuf>, policy: StagingCompressionPolicy) -> Self {
88        Self {
89            root: root.into(),
90            policy,
91        }
92    }
93
94    pub async fn prepare_fresh(&self) -> Result<()> {
95        if self.root.exists() && !self.layout_compatible().await {
96            info!(
97                staging = %self.root.display(),
98                "removing incompatible Wabbajack staging layout"
99            );
100            tokio::fs::remove_dir_all(&self.root)
101                .await
102                .with_context(|| format!("failed to remove {}", self.root.display()))?;
103        }
104        tokio::fs::create_dir_all(self.state_dir()).await?;
105        self.write_layout().await
106    }
107
108    pub async fn prepare_resumable(&self) -> Result<StagingPrepareStatus> {
109        if !self.root.exists() {
110            tokio::fs::create_dir_all(self.state_dir()).await?;
111            self.write_layout().await?;
112            return Ok(StagingPrepareStatus::Created);
113        }
114
115        if self.layout_compatible().await {
116            return Ok(StagingPrepareStatus::Compatible);
117        }
118
119        info!(
120            staging = %self.root.display(),
121            "adopting existing Wabbajack staging layout"
122        );
123        tokio::fs::create_dir_all(self.state_dir()).await?;
124        self.write_layout().await?;
125        Ok(StagingPrepareStatus::Adopted)
126    }
127
128    pub async fn reset_and_prepare(&self) -> Result<StagingPrepareStatus> {
129        if self.root.exists() {
130            tokio::fs::remove_dir_all(&self.root)
131                .await
132                .with_context(|| format!("failed to remove {}", self.root.display()))?;
133        }
134        tokio::fs::create_dir_all(self.state_dir()).await?;
135        self.write_layout().await?;
136        Ok(StagingPrepareStatus::Reset)
137    }
138
139    pub async fn has_compatible_layout(&self) -> bool {
140        self.layout_compatible().await
141    }
142
143    pub async fn logical_exists(&self, relative: &str) -> bool {
144        let logical = self.logical_path(relative);
145        tokio::fs::metadata(&logical).await.is_ok()
146            || tokio::fs::metadata(compressed_path(&logical)).await.is_ok()
147    }
148
149    pub async fn hash_logical_file(&self, relative: &str) -> Result<u64> {
150        let root = self.root.clone();
151        let relative = normalize_relative(relative);
152        tokio::task::spawn_blocking(move || {
153            let store = StagingStore::new(root);
154            let mut reader = store.open_logical_reader(&relative)?;
155            hash_reader(&mut reader)
156        })
157        .await?
158    }
159
160    pub async fn hash_logical_file_compat(&self, relative: &str) -> Result<(u64, u64)> {
161        let root = self.root.clone();
162        let relative = normalize_relative(relative);
163        tokio::task::spawn_blocking(move || {
164            let store = StagingStore::new(root);
165            let mut reader = store.open_logical_reader(&relative)?;
166            hash_reader_compat(&mut reader)
167        })
168        .await?
169    }
170
171    pub fn open_logical_reader(&self, relative: &str) -> Result<Box<dyn Read + Send>> {
172        let logical = self.logical_path(relative);
173        if logical.exists() {
174            let file = File::open(&logical)
175                .with_context(|| format!("failed to open {}", logical.display()))?;
176            return Ok(Box::new(BufReader::new(file)));
177        }
178        let compressed = compressed_path(&logical);
179        if compressed.exists() {
180            let file = File::open(&compressed)
181                .with_context(|| format!("failed to open {}", compressed.display()))?;
182            let decoder = zstd::stream::read::Decoder::new(file)
183                .with_context(|| format!("failed to decode {}", compressed.display()))?;
184            return Ok(Box::new(BufReader::new(decoder)));
185        }
186        bail!("logical staging file is missing: {}", logical.display())
187    }
188
189    pub async fn materialize_logical_file(&self, relative: &str, dest: &Path) -> Result<()> {
190        let src = self.logical_path(relative);
191        if src.exists() {
192            modde_core::link::link_or_copy(&src, dest).await?;
193            return Ok(());
194        }
195
196        let compressed = compressed_path(&src);
197        if let Some(parent) = dest.parent() {
198            tokio::fs::create_dir_all(parent).await?;
199        }
200        if dest.exists() || dest.symlink_metadata().is_ok() {
201            tokio::fs::remove_file(dest).await.ok();
202        }
203        let dest = dest.to_path_buf();
204        tokio::task::spawn_blocking(move || decode_file_to(&compressed, &dest)).await??;
205        Ok(())
206    }
207
208    #[must_use]
209    pub fn write_path_for_logical(&self, relative: &str, expected_size: Option<u64>) -> PathBuf {
210        let logical = self.logical_path(relative);
211        if self.should_compress_logical(relative, expected_size) {
212            compressed_path(&logical)
213        } else {
214            logical
215        }
216    }
217
218    #[must_use]
219    pub fn should_compress_logical(&self, relative: &str, expected_size: Option<u64>) -> bool {
220        let logical = self.logical_path(relative);
221        eligible_for_compression_path(&self.root, &logical, &self.policy, expected_size)
222    }
223
224    pub async fn compress_eligible_files(&self, workers: usize) -> Result<CompressionSummary> {
225        let root = self.root.clone();
226        let policy = self.policy.clone();
227        tokio::task::spawn_blocking(move || {
228            let mut files = Vec::new();
229            collect_files(&root, &root, &mut files)?;
230
231            let mut summary = CompressionSummary {
232                compressed_files: 0,
233                skipped_files: 0,
234                original_bytes: 0,
235                compressed_bytes: 0,
236            };
237            let zstd_workers = workers.clamp(1, 32) as u32;
238
239            for path in files {
240                if !eligible_for_compression(&root, &path, &policy)? {
241                    summary.skipped_files += 1;
242                    continue;
243                }
244                let metadata = std::fs::metadata(&path)?;
245                let original_len = metadata.len();
246                let compressed = compressed_path(&path);
247                compress_file(&path, &compressed, policy.level, zstd_workers)?;
248                let compressed_len = std::fs::metadata(&compressed)?.len();
249                std::fs::remove_file(&path)?;
250                summary.compressed_files += 1;
251                summary.original_bytes += original_len;
252                summary.compressed_bytes += compressed_len;
253                debug!(
254                    src = %path.display(),
255                    dst = %compressed.display(),
256                    original_len,
257                    compressed_len,
258                    "compressed staging file"
259                );
260            }
261
262            Ok(summary)
263        })
264        .await?
265    }
266
267    fn logical_path(&self, relative: &str) -> PathBuf {
268        self.root.join(normalize_relative(relative))
269    }
270
271    fn state_dir(&self) -> PathBuf {
272        self.root.join("_state")
273    }
274
275    fn layout_path(&self) -> PathBuf {
276        self.state_dir().join("staging-layout.json")
277    }
278
279    async fn layout_compatible(&self) -> bool {
280        let Ok(data) = tokio::fs::read_to_string(self.layout_path()).await else {
281            return false;
282        };
283        let Ok(layout) = serde_json::from_str::<StagingLayout>(&data) else {
284            return false;
285        };
286        layout
287            == StagingLayout {
288                version: LAYOUT_VERSION,
289                compression: self.policy.clone(),
290            }
291    }
292
293    async fn write_layout(&self) -> Result<()> {
294        let layout = StagingLayout {
295            version: LAYOUT_VERSION,
296            compression: self.policy.clone(),
297        };
298        tokio::fs::write(self.layout_path(), serde_json::to_vec_pretty(&layout)?).await?;
299        Ok(())
300    }
301}
302
303#[must_use]
304pub fn compressed_path(path: &Path) -> PathBuf {
305    let mut name: OsString = path.as_os_str().to_os_string();
306    name.push(COMPRESSED_SUFFIX);
307    PathBuf::from(name)
308}
309
310#[must_use]
311pub fn is_compressed_path(path: &Path) -> bool {
312    path.as_os_str()
313        .to_string_lossy()
314        .ends_with(COMPRESSED_SUFFIX)
315}
316
317#[must_use]
318pub fn logical_path_from_compressed(path: &Path) -> PathBuf {
319    let text = path.as_os_str().to_string_lossy();
320    if let Some(stripped) = text.strip_suffix(COMPRESSED_SUFFIX) {
321        PathBuf::from(stripped)
322    } else {
323        path.to_path_buf()
324    }
325}
326
/// Converts every backslash in `path` to a forward slash; nothing else changes.
fn normalize_relative(path: &str) -> String {
    path.chars()
        .map(|ch| if ch == '\\' { '/' } else { ch })
        .collect()
}
330
331fn hash_reader(reader: &mut dyn Read) -> Result<u64> {
332    let mut hasher = Xxh3::new();
333    let mut buf = vec![0_u8; 1024 * 1024];
334    loop {
335        let read = reader.read(&mut buf)?;
336        if read == 0 {
337            break;
338        }
339        hasher.update(&buf[..read]);
340    }
341    Ok(hasher.digest())
342}
343
344fn hash_reader_compat(reader: &mut dyn Read) -> Result<(u64, u64)> {
345    let mut xxh64 = Xxh64::new(0);
346    let mut xxh3 = Xxh3::new();
347    let mut buf = vec![0_u8; 1024 * 1024];
348    loop {
349        let read = reader.read(&mut buf)?;
350        if read == 0 {
351            break;
352        }
353        xxh64.update(&buf[..read]);
354        xxh3.update(&buf[..read]);
355    }
356    Ok((xxh64.digest(), xxh3.digest()))
357}
358
359fn collect_files(root: &Path, dir: &Path, files: &mut Vec<PathBuf>) -> Result<()> {
360    for entry in std::fs::read_dir(dir)? {
361        let entry = entry?;
362        let path = entry.path();
363        let file_type = entry.file_type()?;
364        if file_type.is_dir() {
365            let name = path
366                .file_name()
367                .and_then(|name| name.to_str())
368                .unwrap_or("");
369            if path == root.join("_state") || name.starts_with("bsa_temp_") {
370                continue;
371            }
372            collect_files(root, &path, files)?;
373        } else if file_type.is_file() {
374            files.push(path);
375        }
376    }
377    Ok(())
378}
379
380fn eligible_for_compression(
381    root: &Path,
382    path: &Path,
383    policy: &StagingCompressionPolicy,
384) -> Result<bool> {
385    if !eligible_for_compression_path(root, path, policy, None) {
386        return Ok(false);
387    }
388    Ok(std::fs::metadata(path)?.len() >= policy.min_bytes)
389}
390
391fn eligible_for_compression_path(
392    root: &Path,
393    path: &Path,
394    policy: &StagingCompressionPolicy,
395    expected_size: Option<u64>,
396) -> bool {
397    if is_compressed_path(path) {
398        return false;
399    }
400    let relative = path.strip_prefix(root).unwrap_or(path);
401    if relative
402        .components()
403        .any(|component| component.as_os_str() == "_state")
404    {
405        return false;
406    }
407    if relative.components().any(|component| {
408        component
409            .as_os_str()
410            .to_string_lossy()
411            .starts_with("bsa_temp_")
412    }) {
413        return false;
414    }
415    let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else {
416        return false;
417    };
418    if matches!(file_name, "meta.ini" | "meta.json") {
419        return false;
420    }
421    let extension = path
422        .extension()
423        .and_then(|ext| ext.to_str())
424        .map(str::to_ascii_lowercase);
425    if matches!(
426        extension.as_deref(),
427        Some(
428            "dll"
429                | "exe"
430                | "esp"
431                | "esm"
432                | "esl"
433                | "ini"
434                | "json"
435                | "toml"
436                | "yaml"
437                | "yml"
438                | "xml"
439                | "cfg"
440                | "conf"
441        )
442    ) {
443        return false;
444    }
445    expected_size.is_none_or(|size| size >= policy.min_bytes)
446}
447
448fn compress_file(src: &Path, dest: &Path, level: i32, workers: u32) -> Result<()> {
449    let input = File::open(src).with_context(|| format!("failed to open {}", src.display()))?;
450    let output =
451        File::create(dest).with_context(|| format!("failed to create {}", dest.display()))?;
452    let mut encoder = zstd::stream::write::Encoder::new(BufWriter::new(output), level)?;
453    encoder.multithread(workers)?;
454    let mut reader = BufReader::new(input);
455    std::io::copy(&mut reader, &mut encoder)?;
456    let mut output = encoder.finish()?;
457    output.flush()?;
458    Ok(())
459}
460
461fn decode_file_to(src: &Path, dest: &Path) -> Result<()> {
462    let input = File::open(src).with_context(|| format!("failed to open {}", src.display()))?;
463    let mut decoder = zstd::stream::read::Decoder::new(input)
464        .with_context(|| format!("failed to decode {}", src.display()))?;
465    let output =
466        File::create(dest).with_context(|| format!("failed to create {}", dest.display()))?;
467    let mut writer = BufWriter::new(output);
468    std::io::copy(&mut decoder, &mut writer)?;
469    writer.flush()?;
470    Ok(())
471}
472
#[cfg(test)]
mod tests {
    use super::*;
    use xxhash_rust::xxh3::xxh3_64;

    /// Level-1 policy with the given size threshold and the standard suffix.
    fn test_policy(min_bytes: u64) -> StagingCompressionPolicy {
        StagingCompressionPolicy {
            min_bytes,
            level: 1,
            suffix: COMPRESSED_SUFFIX.to_string(),
        }
    }

    /// Writes `bytes` to `path`, creating missing parent directories first.
    async fn write_file(path: &Path, bytes: &[u8]) {
        if let Some(parent) = path.parent() {
            tokio::fs::create_dir_all(parent).await.unwrap();
        }
        tokio::fs::write(path, bytes).await.unwrap();
    }

    #[tokio::test]
    async fn staging_store_resolves_plain_and_compressed_logical_paths() {
        let dir = tempfile::tempdir().unwrap();
        let store = StagingStore::with_policy(dir.path(), test_policy(1));
        store.prepare_fresh().await.unwrap();

        let rel = "mods/Test/textures/large.dds";
        let plain = dir.path().join(rel);
        write_file(&plain, b"plain bytes").await;
        assert!(store.logical_exists(rel).await);

        store.compress_eligible_files(1).await.unwrap();
        assert!(!plain.exists());
        assert!(compressed_path(&plain).exists());
        assert!(store.logical_exists(rel).await);

        let mut decoded = Vec::new();
        store
            .open_logical_reader(rel)
            .unwrap()
            .read_to_end(&mut decoded)
            .unwrap();
        assert_eq!(decoded, b"plain bytes");
    }

    #[tokio::test]
    async fn zstd_round_trip_preserves_hash() {
        let dir = tempfile::tempdir().unwrap();
        let store = StagingStore::with_policy(dir.path(), test_policy(1));
        store.prepare_fresh().await.unwrap();

        let rel = "mods/Test/meshes/blob.bin";
        let payload = vec![42_u8; 256 * 1024];
        write_file(&dir.path().join(rel), &payload).await;

        store.compress_eligible_files(1).await.unwrap();
        assert_eq!(
            store.hash_logical_file(rel).await.unwrap(),
            xxh3_64(&payload)
        );
    }

    #[tokio::test]
    async fn compression_policy_skips_metadata_plugins_configs_and_tiny_files() {
        let dir = tempfile::tempdir().unwrap();
        let store = StagingStore::with_policy(dir.path(), test_policy(8));
        store.prepare_fresh().await.unwrap();

        let skipped = [
            "_state/state.bin",
            "mods/Test/meta.ini",
            "bsa_temp_abc/large.dds",
            "mods/Test/plugin.esp",
            "mods/Test/skse.dll",
            "mods/Test/config.ini",
            "mods/Test/tiny.bin",
        ];
        for rel in skipped {
            write_file(&dir.path().join(rel), b"1234").await;
        }
        let compressible = dir.path().join("mods/Test/texture.dds");
        write_file(&compressible, b"123456789").await;

        let summary = store.compress_eligible_files(1).await.unwrap();
        assert_eq!(summary.compressed_files, 1);
        assert!(compressed_path(&compressible).exists());
        for untouched in [
            "mods/Test/plugin.esp",
            "mods/Test/skse.dll",
            "mods/Test/config.ini",
            "bsa_temp_abc/large.dds",
        ] {
            assert!(dir.path().join(untouched).exists());
        }
    }

    #[tokio::test]
    async fn fresh_layout_removes_incompatible_staging_but_keeps_compatible() {
        let dir = tempfile::tempdir().unwrap();
        let store = StagingStore::with_policy(dir.path(), test_policy(1));

        let old = dir.path().join("old.txt");
        write_file(&old, b"old").await;
        store.prepare_fresh().await.unwrap();
        assert!(!old.exists());

        let kept = dir.path().join("kept.txt");
        write_file(&kept, b"kept").await;
        store.prepare_fresh().await.unwrap();
        assert!(kept.exists());
    }

    #[tokio::test]
    async fn resumable_layout_adopts_incompatible_staging_without_deleting_files() {
        let dir = tempfile::tempdir().unwrap();
        let store = StagingStore::with_policy(dir.path(), test_policy(1));

        let old = dir.path().join("old.txt");
        write_file(&old, b"old").await;

        let first = store.prepare_resumable().await.unwrap();
        assert_eq!(first, StagingPrepareStatus::Adopted);
        assert!(old.exists());
        assert!(store.has_compatible_layout().await);

        let second = store.prepare_resumable().await.unwrap();
        assert_eq!(second, StagingPrepareStatus::Compatible);

        let third = store.reset_and_prepare().await.unwrap();
        assert_eq!(third, StagingPrepareStatus::Reset);
        assert!(!old.exists());
    }
}