1use std::ffi::OsString;
7use std::fs::File;
8use std::io::{BufReader, BufWriter, Read, Write};
9use std::path::{Path, PathBuf};
10
11use anyhow::{Context, Result, bail};
12use serde::{Deserialize, Serialize};
13use tracing::{debug, info};
14use xxhash_rust::xxh3::Xxh3;
15use xxhash_rust::xxh64::Xxh64;
16
17const LAYOUT_VERSION: u32 = 1;
18const COMPRESSED_SUFFIX: &str = ".modde-zst";
19const DEFAULT_MIN_BYTES: u64 = 1024 * 1024;
20const DEFAULT_LEVEL: i32 = 9;
21
22#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
23pub struct StagingCompressionPolicy {
24 pub min_bytes: u64,
25 pub level: i32,
26 pub suffix: String,
27}
28
29impl StagingCompressionPolicy {
30 #[must_use]
31 pub fn from_env() -> Self {
32 let min_bytes = std::env::var("MODDE_ZSTD_MIN_BYTES")
33 .ok()
34 .and_then(|v| v.parse::<u64>().ok())
35 .unwrap_or(DEFAULT_MIN_BYTES);
36 let level = std::env::var("MODDE_ZSTD_LEVEL")
37 .ok()
38 .and_then(|v| v.parse::<i32>().ok())
39 .unwrap_or(DEFAULT_LEVEL)
40 .clamp(1, 22);
41 Self {
42 min_bytes,
43 level,
44 suffix: COMPRESSED_SUFFIX.to_string(),
45 }
46 }
47}
48
49#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
50struct StagingLayout {
51 version: u32,
52 compression: StagingCompressionPolicy,
53}
54
55#[derive(Debug, Clone)]
56pub struct StagingStore {
57 root: PathBuf,
58 policy: StagingCompressionPolicy,
59}
60
61#[derive(Debug, Clone, Copy, PartialEq, Eq)]
62pub struct CompressionSummary {
63 pub compressed_files: usize,
64 pub skipped_files: usize,
65 pub original_bytes: u64,
66 pub compressed_bytes: u64,
67}
68
69#[derive(Debug, Clone, Copy, PartialEq, Eq)]
70pub enum StagingPrepareStatus {
71 Created,
72 Compatible,
73 Adopted,
74 Reset,
75}
76
77impl StagingStore {
78 #[must_use]
79 pub fn new(root: impl Into<PathBuf>) -> Self {
80 Self {
81 root: root.into(),
82 policy: StagingCompressionPolicy::from_env(),
83 }
84 }
85
86 #[must_use]
87 pub fn with_policy(root: impl Into<PathBuf>, policy: StagingCompressionPolicy) -> Self {
88 Self {
89 root: root.into(),
90 policy,
91 }
92 }
93
94 pub async fn prepare_fresh(&self) -> Result<()> {
95 if self.root.exists() && !self.layout_compatible().await {
96 info!(
97 staging = %self.root.display(),
98 "removing incompatible Wabbajack staging layout"
99 );
100 tokio::fs::remove_dir_all(&self.root)
101 .await
102 .with_context(|| format!("failed to remove {}", self.root.display()))?;
103 }
104 tokio::fs::create_dir_all(self.state_dir()).await?;
105 self.write_layout().await
106 }
107
108 pub async fn prepare_resumable(&self) -> Result<StagingPrepareStatus> {
109 if !self.root.exists() {
110 tokio::fs::create_dir_all(self.state_dir()).await?;
111 self.write_layout().await?;
112 return Ok(StagingPrepareStatus::Created);
113 }
114
115 if self.layout_compatible().await {
116 return Ok(StagingPrepareStatus::Compatible);
117 }
118
119 info!(
120 staging = %self.root.display(),
121 "adopting existing Wabbajack staging layout"
122 );
123 tokio::fs::create_dir_all(self.state_dir()).await?;
124 self.write_layout().await?;
125 Ok(StagingPrepareStatus::Adopted)
126 }
127
128 pub async fn reset_and_prepare(&self) -> Result<StagingPrepareStatus> {
129 if self.root.exists() {
130 tokio::fs::remove_dir_all(&self.root)
131 .await
132 .with_context(|| format!("failed to remove {}", self.root.display()))?;
133 }
134 tokio::fs::create_dir_all(self.state_dir()).await?;
135 self.write_layout().await?;
136 Ok(StagingPrepareStatus::Reset)
137 }
138
139 pub async fn has_compatible_layout(&self) -> bool {
140 self.layout_compatible().await
141 }
142
143 pub async fn logical_exists(&self, relative: &str) -> bool {
144 let logical = self.logical_path(relative);
145 tokio::fs::metadata(&logical).await.is_ok()
146 || tokio::fs::metadata(compressed_path(&logical)).await.is_ok()
147 }
148
149 pub async fn hash_logical_file(&self, relative: &str) -> Result<u64> {
150 let root = self.root.clone();
151 let relative = normalize_relative(relative);
152 tokio::task::spawn_blocking(move || {
153 let store = StagingStore::new(root);
154 let mut reader = store.open_logical_reader(&relative)?;
155 hash_reader(&mut reader)
156 })
157 .await?
158 }
159
160 pub async fn hash_logical_file_compat(&self, relative: &str) -> Result<(u64, u64)> {
161 let root = self.root.clone();
162 let relative = normalize_relative(relative);
163 tokio::task::spawn_blocking(move || {
164 let store = StagingStore::new(root);
165 let mut reader = store.open_logical_reader(&relative)?;
166 hash_reader_compat(&mut reader)
167 })
168 .await?
169 }
170
171 pub fn open_logical_reader(&self, relative: &str) -> Result<Box<dyn Read + Send>> {
172 let logical = self.logical_path(relative);
173 if logical.exists() {
174 let file = File::open(&logical)
175 .with_context(|| format!("failed to open {}", logical.display()))?;
176 return Ok(Box::new(BufReader::new(file)));
177 }
178 let compressed = compressed_path(&logical);
179 if compressed.exists() {
180 let file = File::open(&compressed)
181 .with_context(|| format!("failed to open {}", compressed.display()))?;
182 let decoder = zstd::stream::read::Decoder::new(file)
183 .with_context(|| format!("failed to decode {}", compressed.display()))?;
184 return Ok(Box::new(BufReader::new(decoder)));
185 }
186 bail!("logical staging file is missing: {}", logical.display())
187 }
188
189 pub async fn materialize_logical_file(&self, relative: &str, dest: &Path) -> Result<()> {
190 let src = self.logical_path(relative);
191 if src.exists() {
192 modde_core::link::link_or_copy(&src, dest).await?;
193 return Ok(());
194 }
195
196 let compressed = compressed_path(&src);
197 if let Some(parent) = dest.parent() {
198 tokio::fs::create_dir_all(parent).await?;
199 }
200 if dest.exists() || dest.symlink_metadata().is_ok() {
201 tokio::fs::remove_file(dest).await.ok();
202 }
203 let dest = dest.to_path_buf();
204 tokio::task::spawn_blocking(move || decode_file_to(&compressed, &dest)).await??;
205 Ok(())
206 }
207
208 #[must_use]
209 pub fn write_path_for_logical(&self, relative: &str, expected_size: Option<u64>) -> PathBuf {
210 let logical = self.logical_path(relative);
211 if self.should_compress_logical(relative, expected_size) {
212 compressed_path(&logical)
213 } else {
214 logical
215 }
216 }
217
218 #[must_use]
219 pub fn should_compress_logical(&self, relative: &str, expected_size: Option<u64>) -> bool {
220 let logical = self.logical_path(relative);
221 eligible_for_compression_path(&self.root, &logical, &self.policy, expected_size)
222 }
223
224 pub async fn compress_eligible_files(&self, workers: usize) -> Result<CompressionSummary> {
225 let root = self.root.clone();
226 let policy = self.policy.clone();
227 tokio::task::spawn_blocking(move || {
228 let mut files = Vec::new();
229 collect_files(&root, &root, &mut files)?;
230
231 let mut summary = CompressionSummary {
232 compressed_files: 0,
233 skipped_files: 0,
234 original_bytes: 0,
235 compressed_bytes: 0,
236 };
237 let zstd_workers = workers.clamp(1, 32) as u32;
238
239 for path in files {
240 if !eligible_for_compression(&root, &path, &policy)? {
241 summary.skipped_files += 1;
242 continue;
243 }
244 let metadata = std::fs::metadata(&path)?;
245 let original_len = metadata.len();
246 let compressed = compressed_path(&path);
247 compress_file(&path, &compressed, policy.level, zstd_workers)?;
248 let compressed_len = std::fs::metadata(&compressed)?.len();
249 std::fs::remove_file(&path)?;
250 summary.compressed_files += 1;
251 summary.original_bytes += original_len;
252 summary.compressed_bytes += compressed_len;
253 debug!(
254 src = %path.display(),
255 dst = %compressed.display(),
256 original_len,
257 compressed_len,
258 "compressed staging file"
259 );
260 }
261
262 Ok(summary)
263 })
264 .await?
265 }
266
267 fn logical_path(&self, relative: &str) -> PathBuf {
268 self.root.join(normalize_relative(relative))
269 }
270
271 fn state_dir(&self) -> PathBuf {
272 self.root.join("_state")
273 }
274
275 fn layout_path(&self) -> PathBuf {
276 self.state_dir().join("staging-layout.json")
277 }
278
279 async fn layout_compatible(&self) -> bool {
280 let Ok(data) = tokio::fs::read_to_string(self.layout_path()).await else {
281 return false;
282 };
283 let Ok(layout) = serde_json::from_str::<StagingLayout>(&data) else {
284 return false;
285 };
286 layout
287 == StagingLayout {
288 version: LAYOUT_VERSION,
289 compression: self.policy.clone(),
290 }
291 }
292
293 async fn write_layout(&self) -> Result<()> {
294 let layout = StagingLayout {
295 version: LAYOUT_VERSION,
296 compression: self.policy.clone(),
297 };
298 tokio::fs::write(self.layout_path(), serde_json::to_vec_pretty(&layout)?).await?;
299 Ok(())
300 }
301}
302
303#[must_use]
304pub fn compressed_path(path: &Path) -> PathBuf {
305 let mut name: OsString = path.as_os_str().to_os_string();
306 name.push(COMPRESSED_SUFFIX);
307 PathBuf::from(name)
308}
309
310#[must_use]
311pub fn is_compressed_path(path: &Path) -> bool {
312 path.as_os_str()
313 .to_string_lossy()
314 .ends_with(COMPRESSED_SUFFIX)
315}
316
317#[must_use]
318pub fn logical_path_from_compressed(path: &Path) -> PathBuf {
319 let text = path.as_os_str().to_string_lossy();
320 if let Some(stripped) = text.strip_suffix(COMPRESSED_SUFFIX) {
321 PathBuf::from(stripped)
322 } else {
323 path.to_path_buf()
324 }
325}
326
327fn normalize_relative(path: &str) -> String {
328 path.replace('\\', "/")
329}
330
331fn hash_reader(reader: &mut dyn Read) -> Result<u64> {
332 let mut hasher = Xxh3::new();
333 let mut buf = vec![0_u8; 1024 * 1024];
334 loop {
335 let read = reader.read(&mut buf)?;
336 if read == 0 {
337 break;
338 }
339 hasher.update(&buf[..read]);
340 }
341 Ok(hasher.digest())
342}
343
344fn hash_reader_compat(reader: &mut dyn Read) -> Result<(u64, u64)> {
345 let mut xxh64 = Xxh64::new(0);
346 let mut xxh3 = Xxh3::new();
347 let mut buf = vec![0_u8; 1024 * 1024];
348 loop {
349 let read = reader.read(&mut buf)?;
350 if read == 0 {
351 break;
352 }
353 xxh64.update(&buf[..read]);
354 xxh3.update(&buf[..read]);
355 }
356 Ok((xxh64.digest(), xxh3.digest()))
357}
358
359fn collect_files(root: &Path, dir: &Path, files: &mut Vec<PathBuf>) -> Result<()> {
360 for entry in std::fs::read_dir(dir)? {
361 let entry = entry?;
362 let path = entry.path();
363 let file_type = entry.file_type()?;
364 if file_type.is_dir() {
365 let name = path
366 .file_name()
367 .and_then(|name| name.to_str())
368 .unwrap_or("");
369 if path == root.join("_state") || name.starts_with("bsa_temp_") {
370 continue;
371 }
372 collect_files(root, &path, files)?;
373 } else if file_type.is_file() {
374 files.push(path);
375 }
376 }
377 Ok(())
378}
379
380fn eligible_for_compression(
381 root: &Path,
382 path: &Path,
383 policy: &StagingCompressionPolicy,
384) -> Result<bool> {
385 if !eligible_for_compression_path(root, path, policy, None) {
386 return Ok(false);
387 }
388 Ok(std::fs::metadata(path)?.len() >= policy.min_bytes)
389}
390
391fn eligible_for_compression_path(
392 root: &Path,
393 path: &Path,
394 policy: &StagingCompressionPolicy,
395 expected_size: Option<u64>,
396) -> bool {
397 if is_compressed_path(path) {
398 return false;
399 }
400 let relative = path.strip_prefix(root).unwrap_or(path);
401 if relative
402 .components()
403 .any(|component| component.as_os_str() == "_state")
404 {
405 return false;
406 }
407 if relative.components().any(|component| {
408 component
409 .as_os_str()
410 .to_string_lossy()
411 .starts_with("bsa_temp_")
412 }) {
413 return false;
414 }
415 let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else {
416 return false;
417 };
418 if matches!(file_name, "meta.ini" | "meta.json") {
419 return false;
420 }
421 let extension = path
422 .extension()
423 .and_then(|ext| ext.to_str())
424 .map(str::to_ascii_lowercase);
425 if matches!(
426 extension.as_deref(),
427 Some(
428 "dll"
429 | "exe"
430 | "esp"
431 | "esm"
432 | "esl"
433 | "ini"
434 | "json"
435 | "toml"
436 | "yaml"
437 | "yml"
438 | "xml"
439 | "cfg"
440 | "conf"
441 )
442 ) {
443 return false;
444 }
445 expected_size.is_none_or(|size| size >= policy.min_bytes)
446}
447
448fn compress_file(src: &Path, dest: &Path, level: i32, workers: u32) -> Result<()> {
449 let input = File::open(src).with_context(|| format!("failed to open {}", src.display()))?;
450 let output =
451 File::create(dest).with_context(|| format!("failed to create {}", dest.display()))?;
452 let mut encoder = zstd::stream::write::Encoder::new(BufWriter::new(output), level)?;
453 encoder.multithread(workers)?;
454 let mut reader = BufReader::new(input);
455 std::io::copy(&mut reader, &mut encoder)?;
456 let mut output = encoder.finish()?;
457 output.flush()?;
458 Ok(())
459}
460
461fn decode_file_to(src: &Path, dest: &Path) -> Result<()> {
462 let input = File::open(src).with_context(|| format!("failed to open {}", src.display()))?;
463 let mut decoder = zstd::stream::read::Decoder::new(input)
464 .with_context(|| format!("failed to decode {}", src.display()))?;
465 let output =
466 File::create(dest).with_context(|| format!("failed to create {}", dest.display()))?;
467 let mut writer = BufWriter::new(output);
468 std::io::copy(&mut decoder, &mut writer)?;
469 writer.flush()?;
470 Ok(())
471}
472
473#[cfg(test)]
474mod tests {
475 use super::*;
476 use xxhash_rust::xxh3::xxh3_64;
477
478 fn test_policy(min_bytes: u64) -> StagingCompressionPolicy {
479 StagingCompressionPolicy {
480 min_bytes,
481 level: 1,
482 suffix: COMPRESSED_SUFFIX.to_string(),
483 }
484 }
485
486 #[tokio::test]
487 async fn staging_store_resolves_plain_and_compressed_logical_paths() {
488 let temp = tempfile::tempdir().unwrap();
489 let store = StagingStore::with_policy(temp.path(), test_policy(1));
490 store.prepare_fresh().await.unwrap();
491 let rel = "mods/Test/textures/large.dds";
492 let logical = temp.path().join(rel);
493 tokio::fs::create_dir_all(logical.parent().unwrap())
494 .await
495 .unwrap();
496 tokio::fs::write(&logical, b"plain bytes").await.unwrap();
497 assert!(store.logical_exists(rel).await);
498
499 store.compress_eligible_files(1).await.unwrap();
500 assert!(!logical.exists());
501 assert!(compressed_path(&logical).exists());
502 assert!(store.logical_exists(rel).await);
503 let mut reader = store.open_logical_reader(rel).unwrap();
504 let mut data = Vec::new();
505 reader.read_to_end(&mut data).unwrap();
506 assert_eq!(data, b"plain bytes");
507 }
508
509 #[tokio::test]
510 async fn zstd_round_trip_preserves_hash() {
511 let temp = tempfile::tempdir().unwrap();
512 let store = StagingStore::with_policy(temp.path(), test_policy(1));
513 store.prepare_fresh().await.unwrap();
514 let rel = "mods/Test/meshes/blob.bin";
515 let logical = temp.path().join(rel);
516 tokio::fs::create_dir_all(logical.parent().unwrap())
517 .await
518 .unwrap();
519 let data = vec![42_u8; 256 * 1024];
520 tokio::fs::write(&logical, &data).await.unwrap();
521 store.compress_eligible_files(1).await.unwrap();
522 assert_eq!(store.hash_logical_file(rel).await.unwrap(), xxh3_64(&data));
523 }
524
525 #[tokio::test]
526 async fn compression_policy_skips_metadata_plugins_configs_and_tiny_files() {
527 let temp = tempfile::tempdir().unwrap();
528 let store = StagingStore::with_policy(temp.path(), test_policy(8));
529 store.prepare_fresh().await.unwrap();
530 for rel in [
531 "_state/state.bin",
532 "mods/Test/meta.ini",
533 "bsa_temp_abc/large.dds",
534 "mods/Test/plugin.esp",
535 "mods/Test/skse.dll",
536 "mods/Test/config.ini",
537 "mods/Test/tiny.bin",
538 ] {
539 let path = temp.path().join(rel);
540 tokio::fs::create_dir_all(path.parent().unwrap())
541 .await
542 .unwrap();
543 tokio::fs::write(&path, b"1234").await.unwrap();
544 }
545 let compressible = temp.path().join("mods/Test/texture.dds");
546 tokio::fs::write(&compressible, b"123456789").await.unwrap();
547
548 let summary = store.compress_eligible_files(1).await.unwrap();
549 assert_eq!(summary.compressed_files, 1);
550 assert!(compressed_path(&compressible).exists());
551 assert!(temp.path().join("mods/Test/plugin.esp").exists());
552 assert!(temp.path().join("mods/Test/skse.dll").exists());
553 assert!(temp.path().join("mods/Test/config.ini").exists());
554 assert!(temp.path().join("bsa_temp_abc/large.dds").exists());
555 }
556
557 #[tokio::test]
558 async fn fresh_layout_removes_incompatible_staging_but_keeps_compatible() {
559 let temp = tempfile::tempdir().unwrap();
560 let store = StagingStore::with_policy(temp.path(), test_policy(1));
561 tokio::fs::create_dir_all(temp.path()).await.unwrap();
562 tokio::fs::write(temp.path().join("old.txt"), b"old")
563 .await
564 .unwrap();
565 store.prepare_fresh().await.unwrap();
566 assert!(!temp.path().join("old.txt").exists());
567
568 tokio::fs::write(temp.path().join("kept.txt"), b"kept")
569 .await
570 .unwrap();
571 store.prepare_fresh().await.unwrap();
572 assert!(temp.path().join("kept.txt").exists());
573 }
574
575 #[tokio::test]
576 async fn resumable_layout_adopts_incompatible_staging_without_deleting_files() {
577 let temp = tempfile::tempdir().unwrap();
578 let store = StagingStore::with_policy(temp.path(), test_policy(1));
579 tokio::fs::create_dir_all(temp.path()).await.unwrap();
580 tokio::fs::write(temp.path().join("old.txt"), b"old")
581 .await
582 .unwrap();
583
584 assert_eq!(
585 store.prepare_resumable().await.unwrap(),
586 StagingPrepareStatus::Adopted
587 );
588 assert!(temp.path().join("old.txt").exists());
589 assert!(store.has_compatible_layout().await);
590
591 assert_eq!(
592 store.prepare_resumable().await.unwrap(),
593 StagingPrepareStatus::Compatible
594 );
595
596 assert_eq!(
597 store.reset_and_prepare().await.unwrap(),
598 StagingPrepareStatus::Reset
599 );
600 assert!(!temp.path().join("old.txt").exists());
601 }
602}