Skip to main content

zipatch_rs/apply/
mod.rs

1//! Filesystem application of parsed `ZiPatch` chunks.
2//!
3//! # Parse / apply separation
4//!
5//! The crate is intentionally split into two independent layers:
6//!
7//! - **Parsing** (`src/chunk/`) — reads the binary wire format and produces
8//!   [`Chunk`] values. Nothing in the parser allocates file handles, stats
9//!   paths, or performs I/O against the install tree.
10//! - **Applying** (this module) — takes a stream of [`Chunk`] values and
11//!   writes the patch changes to disk (or to whatever backing the configured
12//!   [`Vfs`](crate::apply::Vfs) provides).
13//!
14//! The only bridge between the two layers is [`Chunk::apply`], which
15//! dispatches each parsed chunk to its filesystem-side logic. Callers that
16//! only need to inspect patch contents can use the parser without ever
17//! touching this module.
18//!
19//! # Two-type API: [`ApplyConfig`] and [`ApplySession`]
20//!
21//! The apply layer is split into two complementary types, each with one job:
22//!
23//! - [`ApplyConfig`] — the frozen *configuration* of an apply: install root,
24//!   target platform, ignore flags, the [`Vfs`](crate::apply::Vfs) backing, observer, and
25//!   checkpoint sink. Constructed via [`ApplyConfig::new`] and configured
26//!   via `with_*` builder methods. Holds no runtime state and performs no
27//!   I/O.
28//! - [`ApplySession`] — the *runtime* state of an active apply: open
29//!   file-handle cache, memoised directory and `SqPack` path caches, the
30//!   reusable DEFLATE decompressor, and per-chunk progress bookkeeping.
31//!   Created by consuming an [`ApplyConfig`] via
32//!   [`ApplyConfig::into_session`], or implicitly by the high-level
33//!   driver entry points.
34//!
35//! # Pluggable filesystem
36//!
37//! Every filesystem effect — open, write, truncate, fsync, mkdir, unlink,
38//! readdir — is routed through the [`Vfs`](crate::apply::Vfs) trait. The default backing
39//! ([`StdFs`](crate::apply::StdFs)) wraps `std::fs`. Swap in
40//! [`InMemoryFs`](crate::apply::InMemoryFs) for tests, dry-run
41//! previews, or sandboxed/embedded scenarios:
42//!
43//! ```no_run
44//! use zipatch_rs::{ApplyConfig, ZiPatchReader};
45//! use zipatch_rs::apply::InMemoryFs;
46//!
47//! let fs = InMemoryFs::new();
48//! let cfg = ApplyConfig::new("/virtual/game").with_vfs(fs.clone());
49//! // cfg.apply_patch(reader)?;
50//! // fs.snapshot_files() — inspect the resulting in-memory layout.
51//! # let _ = (cfg, fs);
52//! ```
53//!
54//! ## Driver entry points
55//!
56//! The high-level drivers live on [`ApplyConfig`] as consuming methods so
57//! the typical "build config, run patch, drop" flow stays a one-liner:
58//!
59//! ```no_run
60//! use std::fs::File;
61//! use zipatch_rs::{ApplyConfig, ZiPatchReader};
62//!
63//! let reader = ZiPatchReader::new(File::open("game.patch").unwrap()).unwrap();
64//! ApplyConfig::new("/opt/ffxiv/game").apply_patch(reader).unwrap();
65//! ```
66//!
67//! Callers that want to dispatch individual chunks (or apply several patches
68//! back-to-back against the same install) materialise a session explicitly:
69//!
70//! ```no_run
71//! use zipatch_rs::{ApplyConfig, Chunk};
72//!
73//! let mut session = ApplyConfig::new("/opt/ffxiv/game").into_session();
74//! // chunk.apply(&mut session)?;
75//! # let _ = &mut session;
76//! ```
77//!
78//! # File-handle cache
79//!
80//! Every apply path that writes to a `SqPack` file calls an internal
81//! `open_cached` method on [`ApplySession`] rather than opening the file
82//! directly. The cache transparently returns an existing writable handle or
83//! opens a new one via the configured [`Vfs`](crate::apply::Vfs).
84//!
85//! Cached handles are wrapped in a [`std::io::BufWriter`] with a per-handle
86//! buffer (default 64 KiB, see [`DEFAULT_BUFFER_CAPACITY`](crate::apply::DEFAULT_BUFFER_CAPACITY))
87//! to coalesce the many small writes the SQPK pipeline emits. Override via
88//! [`ApplyConfig::with_buffer_capacity`].
89//!
90//! The cache is capped at [`DEFAULT_MAX_CACHED_FDS`](crate::apply::DEFAULT_MAX_CACHED_FDS)
91//! (256) entries by default; override via [`ApplyConfig::with_max_cached_fds`]. When it is
92//! full and a new, uncached path is requested, **all** cached handles are
93//! flushed and closed at once before the new one is inserted.
94//!
95//! # Errors
96//!
97//! Every [`Chunk::apply`] call returns [`crate::ApplyResult`], which is
98//! `Result<(), `[`crate::ApplyError`]`>`. On error, the apply operation
99//! aborts at the failing chunk.
100
101mod cancel;
102pub mod checkpoint;
103mod driver;
104pub(crate) mod observer;
105pub(crate) mod path;
106pub(crate) mod sqpk;
107pub mod vfs;
108
109pub use cancel::CancelToken;
110pub use checkpoint::{
111    Checkpoint, CheckpointPolicy, CheckpointSink, InFlightAddFile, IndexedCheckpoint,
112    NoopCheckpointSink, SequentialCheckpoint,
113};
114pub use observer::{ApplyObserver, ChunkEvent, NoopObserver};
115pub use vfs::{InMemoryFs, StdFs, Vfs, VfsMetadata, VfsRead, VfsWrite};
116
117use crate::ApplyResult as Result;
118use crate::Platform;
119use crate::chunk::Chunk;
120use crate::chunk::adir::AddDirectory;
121use crate::chunk::aply::{ApplyOption, ApplyOptionKind};
122use crate::chunk::ddir::DeleteDirectory;
123use std::collections::{HashMap, HashSet};
124use std::io::{BufWriter, Seek, SeekFrom, Write};
125use std::path::{Path, PathBuf};
126use tracing::{trace, warn};
127
128/// Buffered, seekable wrapper around a [`VfsWrite`] handle.
129///
130/// Stored in [`ApplySession`]'s file-handle cache. The `BufWriter` coalesces
131/// the many small writes the SQPK pipeline emits into a smaller number of
132/// underlying `write()` calls against the vfs handle.
133pub(crate) struct CachedWriter {
134    inner: BufWriter<Box<dyn VfsWrite>>,
135}
136
137impl std::fmt::Debug for CachedWriter {
138    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139        f.debug_struct("CachedWriter").finish_non_exhaustive()
140    }
141}
142
143impl Write for CachedWriter {
144    #[inline]
145    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
146        self.inner.write(buf)
147    }
148    #[inline]
149    fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
150        self.inner.write_all(buf)
151    }
152    #[inline]
153    fn flush(&mut self) -> std::io::Result<()> {
154        self.inner.flush()
155    }
156}
157
158impl Seek for CachedWriter {
159    #[inline]
160    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
161        self.inner.seek(pos)
162    }
163    #[inline]
164    fn stream_position(&mut self) -> std::io::Result<u64> {
165        self.inner.stream_position()
166    }
167}
168
169impl CachedWriter {
170    /// Truncate the underlying file to length zero.
171    pub(crate) fn truncate_to_zero(&mut self) -> std::io::Result<()> {
172        self.inner.flush()?;
173        self.inner.get_mut().set_len(0)
174    }
175
176    /// Force buffered data through and then `sync_all` the underlying handle.
177    fn sync_all_inner(&mut self) -> std::io::Result<()> {
178        self.inner.flush()?;
179        self.inner.get_mut().sync_all()
180    }
181}
182
183/// Panics if `policy` is `FsyncEveryN(0)`. Called from both
184/// [`ApplyConfig::with_checkpoint_sink`] and
185/// [`crate::IndexApplier::with_checkpoint_sink`] so the two install points
186/// surface the same diagnostic.
187pub(crate) fn validate_checkpoint_policy(policy: CheckpointPolicy) {
188    assert!(
189        !matches!(policy, CheckpointPolicy::FsyncEveryN(0)),
190        "CheckpointPolicy::FsyncEveryN(0) is invalid; use CheckpointPolicy::Fsync \
191         for an every-record fsync cadence"
192    );
193}
194
195/// Discriminator for the `path_cache` key: which `SqPack` file kind is being
196/// resolved.
197#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
198pub(crate) enum PathKind {
199    Dat,
200    Index,
201}
202
203/// Cache key for resolved `SqPack` `.dat`/`.index` paths.
204#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
205pub(crate) struct PathCacheKey {
206    pub(crate) main_id: u16,
207    pub(crate) sub_id: u16,
208    pub(crate) file_id: u32,
209    pub(crate) kind: PathKind,
210}
211
212/// Default cap on the [`ApplySession`] open-file-handle cache.
213///
214/// Used by [`ApplyConfig::new`]; override with
215/// [`ApplyConfig::with_max_cached_fds`].
216pub const DEFAULT_MAX_CACHED_FDS: usize = 256;
217
218/// Default per-handle write buffer capacity (64 KiB) wrapped around every
219/// cached [`Vfs`] handle.
220///
221/// Chosen to comfortably absorb the largest single writes the SQPK pipeline
222/// emits. Used by [`ApplyConfig::new`]; override with
223/// [`ApplyConfig::with_buffer_capacity`].
224pub const DEFAULT_BUFFER_CAPACITY: usize = 64 * 1024;
225
226/// Frozen apply-time configuration: install root, target platform, ignore
227/// flags, the [`Vfs`] backing, observer, and checkpoint sink.
228///
229/// `ApplyConfig` is the *configuration* half of the apply API. It owns
230/// everything that should be settled before an apply starts; the *runtime*
231/// half — open file handles, scratch buffers, per-chunk progress — lives
232/// on [`ApplySession`].
233///
234/// # Construction
235///
236/// Build with [`ApplyConfig::new`], then chain the `with_*` builder methods
237/// to override defaults:
238///
239/// ```
240/// use zipatch_rs::{ApplyConfig, Platform};
241///
242/// let cfg = ApplyConfig::new("/opt/ffxiv/game")
243///     .with_platform(Platform::Win32)
244///     .with_ignore_missing(true);
245///
246/// assert_eq!(cfg.game_path().to_str().unwrap(), "/opt/ffxiv/game");
247/// assert_eq!(cfg.platform(), Platform::Win32);
248/// assert!(cfg.ignore_missing());
249/// ```
250///
251/// # Pluggable filesystem
252///
253/// Defaults to [`StdFs`]. Override with [`ApplyConfig::with_vfs`] to swap in
254/// [`InMemoryFs`] (for tests / previews) or any custom [`Vfs`] backing.
255///
256/// # Running a patch
257///
258/// The high-level drivers — [`ApplyConfig::apply_patch`] and
259/// [`ApplyConfig::resume_apply_patch`] — consume the config, materialise
260/// an [`ApplySession`] internally, run the patch end-to-end, and drop the
261/// session on completion.
262///
263/// # Threading
264///
265/// [`ApplyObserver`], [`CheckpointSink`], and [`Vfs`] all carry `Send + Sync`
266/// supertrait bounds, so an `ApplyConfig` can be constructed on one thread
267/// and shipped to a worker for the actual apply.
268pub struct ApplyConfig {
269    game_path: PathBuf,
270    platform: Platform,
271    ignore_missing: bool,
272    ignore_old_mismatch: bool,
273    vfs: Box<dyn Vfs>,
274    observer: Box<dyn ApplyObserver>,
275    checkpoint_sink: Box<dyn CheckpointSink>,
276    cancel_token: Option<CancelToken>,
277    max_cached_fds: usize,
278    buffer_capacity: usize,
279}
280
281impl std::fmt::Debug for ApplyConfig {
282    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
283        f.debug_struct("ApplyConfig")
284            .field("game_path", &self.game_path)
285            .field("platform", &self.platform)
286            .field("ignore_missing", &self.ignore_missing)
287            .field("ignore_old_mismatch", &self.ignore_old_mismatch)
288            .field("vfs", &"<dyn Vfs>")
289            .field("observer", &"<dyn ApplyObserver>")
290            .field("checkpoint_sink", &"<dyn CheckpointSink>")
291            .field("cancel_token", &self.cancel_token)
292            .field("max_cached_fds", &self.max_cached_fds)
293            .field("buffer_capacity", &self.buffer_capacity)
294            .finish()
295    }
296}
297
298impl ApplyConfig {
299    /// Create a configuration targeting the given game install directory.
300    ///
301    /// Defaults: platform is [`Platform::Win32`], both ignore-flags are off,
302    /// vfs is [`StdFs`], observer is [`NoopObserver`], checkpoint sink is
303    /// [`NoopCheckpointSink`].
304    pub fn new(game_path: impl Into<PathBuf>) -> Self {
305        Self {
306            game_path: game_path.into(),
307            platform: Platform::Win32,
308            ignore_missing: false,
309            ignore_old_mismatch: false,
310            vfs: Box::new(StdFs::new()),
311            observer: Box::new(NoopObserver),
312            checkpoint_sink: Box::new(NoopCheckpointSink),
313            cancel_token: None,
314            max_cached_fds: DEFAULT_MAX_CACHED_FDS,
315            buffer_capacity: DEFAULT_BUFFER_CAPACITY,
316        }
317    }
318
319    /// Returns the configured cap on the open-file-handle cache.
320    #[must_use]
321    pub fn max_cached_fds(&self) -> usize {
322        self.max_cached_fds
323    }
324
325    /// Returns the configured per-handle write buffer capacity, in bytes.
326    #[must_use]
327    pub fn buffer_capacity(&self) -> usize {
328        self.buffer_capacity
329    }
330
331    /// Returns the game installation directory.
332    #[must_use]
333    pub fn game_path(&self) -> &Path {
334        &self.game_path
335    }
336
337    /// Returns the configured target platform.
338    #[must_use]
339    pub fn platform(&self) -> Platform {
340        self.platform
341    }
342
343    /// Returns the configured `ignore_missing` flag.
344    #[must_use]
345    pub fn ignore_missing(&self) -> bool {
346        self.ignore_missing
347    }
348
349    /// Returns the configured `ignore_old_mismatch` flag.
350    #[must_use]
351    pub fn ignore_old_mismatch(&self) -> bool {
352        self.ignore_old_mismatch
353    }
354
355    /// Sets the target platform. Defaults to [`Platform::Win32`].
356    ///
357    /// A [`crate::chunk::sqpk::SqpkTargetInfo`] chunk encountered during
358    /// apply overrides this value on the active [`ApplySession`].
359    #[must_use]
360    pub fn with_platform(mut self, platform: Platform) -> Self {
361        self.platform = platform;
362        self
363    }
364
365    /// Silently ignore missing files instead of returning an error during apply.
366    #[must_use]
367    pub fn with_ignore_missing(mut self, v: bool) -> Self {
368        self.ignore_missing = v;
369        self
370    }
371
372    /// Silently ignore old-data mismatches instead of returning an error
373    /// during apply.
374    #[must_use]
375    pub fn with_ignore_old_mismatch(mut self, v: bool) -> Self {
376        self.ignore_old_mismatch = v;
377        self
378    }
379
380    /// Install a [`Vfs`] implementation. Defaults to [`StdFs`].
381    ///
382    /// Swap in [`InMemoryFs`] for tests, dry-run previews, or sandboxed
383    /// environments where the apply must not touch the host filesystem.
384    #[must_use]
385    pub fn with_vfs(mut self, vfs: impl Vfs + 'static) -> Self {
386        self.vfs = Box::new(vfs);
387        self
388    }
389
390    /// Install an [`ApplyObserver`] for progress reporting and cancellation.
391    #[must_use]
392    pub fn with_observer(mut self, observer: impl ApplyObserver + 'static) -> Self {
393        self.observer = Box::new(observer);
394        self
395    }
396
397    /// Install a [`CancelToken`] the apply driver polls between chunks (and
398    /// inside long-running SQPK `AddFile` block loops) to abort cleanly.
399    ///
400    /// Hold a clone on whichever thread initiates cancellation (typically a
401    /// UI handler) and pass the original here. The driver checks both the
402    /// token and any installed [`ApplyObserver::should_cancel`] at every
403    /// cancel point; either source aborts the apply with
404    /// [`ApplyError::Cancelled`](crate::ApplyError::Cancelled).
405    ///
406    /// Consumers that only want cancellation do not need to implement
407    /// [`ApplyObserver`] at all.
408    #[must_use]
409    pub fn with_cancel_token(mut self, token: CancelToken) -> Self {
410        self.cancel_token = Some(token);
411        self
412    }
413
414    /// Install a [`CheckpointSink`] to receive apply-time checkpoints.
415    ///
416    /// # Panics
417    ///
418    /// Panics if the sink reports [`CheckpointPolicy::FsyncEveryN`] with
419    /// `n == 0`.
420    #[must_use]
421    pub fn with_checkpoint_sink(mut self, sink: impl CheckpointSink + 'static) -> Self {
422        validate_checkpoint_policy(sink.policy());
423        self.checkpoint_sink = Box::new(sink);
424        self
425    }
426
427    /// Install a pre-boxed observer. Crate-internal escape hatch for
428    /// callers (notably [`crate::IndexApplier`]) that already hold a
429    /// `Box<dyn ApplyObserver>`.
430    pub(crate) fn set_boxed_observer(&mut self, observer: Box<dyn ApplyObserver>) {
431        self.observer = observer;
432    }
433
434    /// Install a pre-boxed checkpoint sink. Crate-internal escape hatch.
435    pub(crate) fn set_boxed_checkpoint_sink(&mut self, sink: Box<dyn CheckpointSink>) {
436        validate_checkpoint_policy(sink.policy());
437        self.checkpoint_sink = sink;
438    }
439
440    /// Install a pre-boxed vfs. Crate-internal escape hatch.
441    pub(crate) fn set_boxed_vfs(&mut self, vfs: Box<dyn Vfs>) {
442        self.vfs = vfs;
443    }
444
445    /// Install a cancellation token. Crate-internal escape hatch mirroring
446    /// the other `set_boxed_*` setters.
447    pub(crate) fn set_cancel_token(&mut self, token: CancelToken) {
448        self.cancel_token = Some(token);
449    }
450
451    /// Set the maximum number of writable file handles cached by the
452    /// [`ApplySession`]. When the cache reaches this size and a new path is
453    /// requested, every cached handle is flushed and dropped before the new
454    /// one is inserted.
455    ///
456    /// Defaults to [`DEFAULT_MAX_CACHED_FDS`] (256). Lower this for hosts
457    /// with tight FD limits; raise it for high-throughput appliers writing
458    /// to many distinct `SqPack` files.
459    ///
460    /// # Panics
461    ///
462    /// Panics if `n` is zero — a zero-sized cache would force eviction on
463    /// every open and is a programming error.
464    #[must_use]
465    pub fn with_max_cached_fds(mut self, n: usize) -> Self {
466        assert!(n > 0, "with_max_cached_fds(0) is invalid");
467        self.max_cached_fds = n;
468        self
469    }
470
471    /// Set the per-handle [`std::io::BufWriter`] capacity, in bytes, wrapped
472    /// around every cached [`Vfs`] handle.
473    ///
474    /// Defaults to [`DEFAULT_BUFFER_CAPACITY`] (64 KiB). Raise it for
475    /// high-throughput batch appliers; lower it to reduce per-handle memory
476    /// when many handles are cached concurrently.
477    ///
478    /// # Panics
479    ///
480    /// Panics if `bytes` is zero — a zero-sized buffer defeats the purpose
481    /// of wrapping the handle and is a programming error.
482    #[must_use]
483    pub fn with_buffer_capacity(mut self, bytes: usize) -> Self {
484        assert!(bytes > 0, "with_buffer_capacity(0) is invalid");
485        self.buffer_capacity = bytes;
486        self
487    }
488
489    /// Consume this config and materialise a fresh [`ApplySession`].
490    ///
491    /// Follows the `into_X` convention: same configuration data, handed off
492    /// to the type that owns the per-apply runtime state.
493    #[must_use]
494    pub fn into_session(self) -> ApplySession {
495        ApplySession::new(self)
496    }
497}
498
499/// Active apply-time runtime state.
500///
501/// Holds the mutable scratch and bookkeeping a running apply needs: open
502/// file-handle cache, memoised directory and path caches, the reusable
503/// DEFLATE decompressor, and per-chunk progress counters.
504pub struct ApplySession {
505    config: ApplyConfig,
506    pub(crate) file_cache: HashMap<PathBuf, CachedWriter>,
507    pub(crate) dirs_created: HashSet<PathBuf>,
508    pub(crate) path_cache: HashMap<PathCacheKey, PathBuf>,
509    pub(crate) decompressor: flate2::Decompress,
510    pub(crate) checkpoints_since_fsync: u32,
511    #[cfg(any(test, feature = "test-utils"))]
512    pub(crate) test_flush_count: usize,
513    #[cfg(any(test, feature = "test-utils"))]
514    pub(crate) test_sync_count: usize,
515    pub(crate) current_chunk_index: u64,
516    pub(crate) current_chunk_bytes_read: u64,
517    pub(crate) patch_name: Option<String>,
518    pub(crate) patch_size: Option<u64>,
519}
520
521impl std::fmt::Debug for ApplySession {
522    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
523        let mut s = f.debug_struct("ApplySession");
524        s.field("config", &self.config)
525            .field("file_cache_len", &self.file_cache.len())
526            .field("dirs_created_len", &self.dirs_created.len())
527            .field("path_cache_len", &self.path_cache.len())
528            .field("decompressor", &"<flate2::Decompress>")
529            .field("checkpoints_since_fsync", &self.checkpoints_since_fsync)
530            .field("current_chunk_index", &self.current_chunk_index)
531            .field("current_chunk_bytes_read", &self.current_chunk_bytes_read)
532            .field("patch_name", &self.patch_name)
533            .field("patch_size", &self.patch_size);
534        #[cfg(any(test, feature = "test-utils"))]
535        s.field("test_flush_count", &self.test_flush_count)
536            .field("test_sync_count", &self.test_sync_count);
537        s.finish()
538    }
539}
540
541impl ApplySession {
542    fn new(config: ApplyConfig) -> Self {
543        Self {
544            config,
545            file_cache: HashMap::new(),
546            dirs_created: HashSet::new(),
547            path_cache: HashMap::new(),
548            decompressor: flate2::Decompress::new(false),
549            checkpoints_since_fsync: 0,
550            #[cfg(any(test, feature = "test-utils"))]
551            test_flush_count: 0,
552            #[cfg(any(test, feature = "test-utils"))]
553            test_sync_count: 0,
554            current_chunk_index: 0,
555            current_chunk_bytes_read: 0,
556            patch_name: None,
557            patch_size: None,
558        }
559    }
560
561    /// Returns the underlying [`ApplyConfig`].
562    #[must_use]
563    pub fn config(&self) -> &ApplyConfig {
564        &self.config
565    }
566
567    /// Returns the game installation directory.
568    #[must_use]
569    pub fn game_path(&self) -> &Path {
570        &self.config.game_path
571    }
572
573    /// Returns the current target platform.
574    #[must_use]
575    pub fn platform(&self) -> Platform {
576        self.config.platform
577    }
578
579    /// Returns whether missing files are silently ignored.
580    #[must_use]
581    pub fn ignore_missing(&self) -> bool {
582        self.config.ignore_missing
583    }
584
585    /// Returns whether old-data mismatches are silently ignored.
586    #[must_use]
587    pub fn ignore_old_mismatch(&self) -> bool {
588        self.config.ignore_old_mismatch
589    }
590
591    /// Borrow the configured [`Vfs`] backing.
592    pub(crate) fn vfs(&self) -> &dyn Vfs {
593        &*self.config.vfs
594    }
595
596    /// Test-only: return `(flush_count, sync_count)` recorded since this
597    /// session was constructed. Quarantined behind the `test-utils` feature
598    /// and not part of the stable API.
599    #[cfg(any(test, feature = "test-utils"))]
600    #[doc(hidden)]
601    #[must_use]
602    pub fn test_counters(&self) -> (usize, usize) {
603        (self.test_flush_count, self.test_sync_count)
604    }
605
606    pub(crate) fn set_platform(&mut self, platform: Platform) {
607        self.config.platform = platform;
608    }
609
610    pub(crate) fn set_ignore_missing(&mut self, v: bool) {
611        self.config.ignore_missing = v;
612    }
613
614    pub(crate) fn set_ignore_old_mismatch(&mut self, v: bool) {
615        self.config.ignore_old_mismatch = v;
616    }
617
618    pub(crate) fn observer_mut(&mut self) -> &mut dyn ApplyObserver {
619        &mut *self.config.observer
620    }
621
622    /// Returns `true` if cancellation is requested by either the installed
623    /// [`CancelToken`] or the observer's
624    /// [`ApplyObserver::should_cancel`]. The token is polled first; if it
625    /// fires, the observer poll is skipped.
626    pub(crate) fn cancel_requested(&mut self) -> bool {
627        if let Some(token) = self.config.cancel_token.as_ref() {
628            if token.is_cancelled() {
629                return true;
630            }
631        }
632        self.config.observer.should_cancel()
633    }
634
635    /// Flush every cached `BufWriter`, then `sync_all` the underlying handles.
636    pub fn sync_all(&mut self) -> std::io::Result<()> {
637        #[cfg(any(test, feature = "test-utils"))]
638        {
639            self.test_sync_count += 1;
640        }
641        let mut first_err: Option<std::io::Error> = None;
642        for writer in self.file_cache.values_mut() {
643            if let Err(e) = writer.sync_all_inner() {
644                first_err.get_or_insert(e);
645            }
646        }
647        match first_err {
648            Some(e) => Err(e),
649            None => Ok(()),
650        }
651    }
652
653    /// Record a chunk-boundary `checkpoint` to the installed sink.
654    pub(crate) fn record_checkpoint(&mut self, checkpoint: &Checkpoint) -> Result<()> {
655        self.config.checkpoint_sink.record(checkpoint)?;
656        match self.config.checkpoint_sink.policy() {
657            CheckpointPolicy::Flush => {
658                self.flush()?;
659            }
660            CheckpointPolicy::Fsync => {
661                self.sync_all()?;
662                self.checkpoints_since_fsync = 0;
663            }
664            CheckpointPolicy::FsyncEveryN(n) => {
665                debug_assert!(n >= 1, "FsyncEveryN(0) must be rejected at install time");
666                self.checkpoints_since_fsync = self.checkpoints_since_fsync.saturating_add(1);
667                if self.checkpoints_since_fsync >= n {
668                    self.sync_all()?;
669                    self.checkpoints_since_fsync = 0;
670                } else {
671                    self.flush()?;
672                }
673            }
674        }
675        Ok(())
676    }
677
678    /// Record an in-flight mid-DEFLATE-block `checkpoint`. No flush, no fsync.
679    pub(crate) fn record_checkpoint_mid_block(&mut self, checkpoint: &Checkpoint) -> Result<()> {
680        self.config.checkpoint_sink.record(checkpoint)?;
681        Ok(())
682    }
683
684    /// Return a writable handle to `path`, opening it via the configured
685    /// [`Vfs`] if not already cached.
686    pub(crate) fn open_cached(&mut self, path: &Path) -> std::io::Result<&mut CachedWriter> {
687        if self.file_cache.contains_key(path) {
688            return Ok(self
689                .file_cache
690                .get_mut(path)
691                .expect("contains_key returned true above"));
692        }
693        if self.file_cache.len() >= self.config.max_cached_fds {
694            self.drain_and_flush()?;
695        }
696        let handle = self.config.vfs.open_write(path)?;
697        let writer = CachedWriter {
698            inner: BufWriter::with_capacity(self.config.buffer_capacity, handle),
699        };
700        Ok(self.file_cache.entry(path.to_path_buf()).or_insert(writer))
701    }
702
703    /// Flush and remove the cached handle for `path`, if any.
704    pub(crate) fn evict_cached(&mut self, path: &Path) -> std::io::Result<()> {
705        if let Some(mut writer) = self.file_cache.remove(path) {
706            writer.flush()?;
707        }
708        Ok(())
709    }
710
711    /// Flush and drop every cached file handle.
712    pub(crate) fn clear_file_cache(&mut self) -> std::io::Result<()> {
713        self.drain_and_flush()
714    }
715
716    /// Create `path` and every missing ancestor via the configured [`Vfs`],
717    /// memoising the call.
718    pub(crate) fn ensure_dir_all(&mut self, path: &Path) -> std::io::Result<()> {
719        if self.dirs_created.contains(path) {
720            return Ok(());
721        }
722        self.config.vfs.create_dir_all(path)?;
723        self.dirs_created.insert(path.to_path_buf());
724        Ok(())
725    }
726
727    /// Drop every memoised entry in the created-directories set.
728    pub(crate) fn invalidate_dirs_created(&mut self) {
729        self.dirs_created.clear();
730    }
731
732    /// Drop every memoised entry in the `SqPack` path cache.
733    pub(crate) fn invalidate_path_cache(&mut self) {
734        self.path_cache.clear();
735    }
736
737    fn drain_and_flush(&mut self) -> std::io::Result<()> {
738        let mut first_err: Option<std::io::Error> = None;
739        for (_, mut writer) in self.file_cache.drain() {
740            if let Err(e) = writer.flush() {
741                first_err.get_or_insert(e);
742            }
743        }
744        match first_err {
745            Some(e) => Err(e),
746            None => Ok(()),
747        }
748    }
749
750    /// Flush every buffered write through to the underlying [`Vfs`] handle.
751    pub fn flush(&mut self) -> std::io::Result<()> {
752        #[cfg(any(test, feature = "test-utils"))]
753        {
754            self.test_flush_count += 1;
755        }
756        let mut first_err: Option<std::io::Error> = None;
757        for writer in self.file_cache.values_mut() {
758            if let Err(e) = writer.flush() {
759                first_err.get_or_insert(e);
760            }
761        }
762        match first_err {
763            Some(e) => Err(e),
764            None => Ok(()),
765        }
766    }
767}
768
769impl Chunk {
770    /// Apply this chunk to `session`.
771    pub fn apply(&self, session: &mut ApplySession) -> Result<()> {
772        match self {
773            Chunk::FileHeader(_) | Chunk::ApplyFreeSpace(_) | Chunk::EndOfFile => Ok(()),
774            Chunk::Sqpk(c) => c.apply(session),
775            Chunk::ApplyOption(c) => apply_option(c, session),
776            Chunk::AddDirectory(c) => apply_add_directory(c, session),
777            Chunk::DeleteDirectory(c) => apply_delete_directory(c, session),
778        }
779    }
780}
781
782#[allow(clippy::unnecessary_wraps)]
783pub(crate) fn apply_option(opt: &ApplyOption, session: &mut ApplySession) -> Result<()> {
784    trace!(kind = ?opt.kind, value = opt.value, "apply option");
785    match opt.kind {
786        ApplyOptionKind::IgnoreMissing => session.set_ignore_missing(opt.value),
787        ApplyOptionKind::IgnoreOldMismatch => session.set_ignore_old_mismatch(opt.value),
788    }
789    Ok(())
790}
791
792pub(crate) fn apply_add_directory(c: &AddDirectory, session: &mut ApplySession) -> Result<()> {
793    trace!(name = %c.name, "create directory");
794    let path = session.game_path().join(&c.name);
795    session.ensure_dir_all(&path)?;
796    Ok(())
797}
798
799pub(crate) fn apply_delete_directory(
800    c: &DeleteDirectory,
801    session: &mut ApplySession,
802) -> Result<()> {
803    let path = session.game_path().join(&c.name);
804    match session.vfs().remove_dir(&path) {
805        Ok(()) => {
806            trace!(name = %c.name, "delete directory");
807            session.invalidate_dirs_created();
808            Ok(())
809        }
810        Err(e) if e.kind() == std::io::ErrorKind::NotFound && session.ignore_missing() => {
811            warn!(name = %c.name, "delete directory: not found, ignored");
812            Ok(())
813        }
814        Err(e) => Err(e.into()),
815    }
816}
817
818#[cfg(test)]
819mod tests {
820    use super::*;
821
822    fn session(path: impl Into<PathBuf>) -> ApplySession {
823        ApplyConfig::new(path).into_session()
824    }
825
826    // --- Cache semantics ---
827
828    #[test]
829    fn cache_eviction_clears_all_entries_when_at_capacity() {
830        let tmp = tempfile::tempdir().unwrap();
831        let mut s = session(tmp.path());
832
833        for i in 0..DEFAULT_MAX_CACHED_FDS {
834            s.open_cached(&tmp.path().join(format!("{i}.dat"))).unwrap();
835        }
836        assert_eq!(s.file_cache.len(), DEFAULT_MAX_CACHED_FDS);
837
838        s.open_cached(&tmp.path().join("new.dat")).unwrap();
839        assert_eq!(s.file_cache.len(), 1);
840    }
841
842    #[test]
843    fn cache_hit_does_not_trigger_eviction_when_full() {
844        let tmp = tempfile::tempdir().unwrap();
845        let mut s = session(tmp.path());
846
847        for i in 0..DEFAULT_MAX_CACHED_FDS {
848            s.open_cached(&tmp.path().join(format!("{i}.dat"))).unwrap();
849        }
850        s.open_cached(&tmp.path().join("0.dat")).unwrap();
851        assert_eq!(s.file_cache.len(), DEFAULT_MAX_CACHED_FDS);
852    }
853
854    #[test]
855    fn evict_cached_removes_only_target_path() {
856        let tmp = tempfile::tempdir().unwrap();
857        let mut s = session(tmp.path());
858        let a = tmp.path().join("a.dat");
859        let b = tmp.path().join("b.dat");
860        s.open_cached(&a).unwrap();
861        s.open_cached(&b).unwrap();
862        assert_eq!(s.file_cache.len(), 2);
863
864        s.evict_cached(&a).unwrap();
865        assert_eq!(s.file_cache.len(), 1);
866        assert!(s.file_cache.contains_key(&b));
867    }
868
869    #[test]
870    fn evict_cached_is_noop_for_absent_path() {
871        let tmp = tempfile::tempdir().unwrap();
872        let mut s = session(tmp.path());
873        s.open_cached(&tmp.path().join("a.dat")).unwrap();
874        s.evict_cached(&tmp.path().join("nonexistent.dat")).unwrap();
875        assert_eq!(s.file_cache.len(), 1);
876    }
877
878    #[test]
879    fn clear_file_cache_removes_all_handles() {
880        let tmp = tempfile::tempdir().unwrap();
881        let mut s = session(tmp.path());
882        s.open_cached(&tmp.path().join("a.dat")).unwrap();
883        s.open_cached(&tmp.path().join("b.dat")).unwrap();
884        s.clear_file_cache().unwrap();
885        assert_eq!(s.file_cache.len(), 0);
886    }
887
888    // --- Tuning-knob builders ---
889
890    #[test]
891    fn default_max_cached_fds_matches_constant() {
892        let cfg = ApplyConfig::new("/irrelevant");
893        assert_eq!(cfg.max_cached_fds(), DEFAULT_MAX_CACHED_FDS);
894    }
895
896    #[test]
897    fn default_buffer_capacity_matches_constant() {
898        let cfg = ApplyConfig::new("/irrelevant");
899        assert_eq!(cfg.buffer_capacity(), DEFAULT_BUFFER_CAPACITY);
900    }
901
902    #[test]
903    fn with_max_cached_fds_overrides_default() {
904        let cfg = ApplyConfig::new("/irrelevant").with_max_cached_fds(16);
905        assert_eq!(cfg.max_cached_fds(), 16);
906    }
907
908    #[test]
909    fn with_buffer_capacity_overrides_default() {
910        let cfg = ApplyConfig::new("/irrelevant").with_buffer_capacity(1 << 20);
911        assert_eq!(cfg.buffer_capacity(), 1 << 20);
912    }
913
914    #[test]
915    #[should_panic(expected = "with_max_cached_fds(0) is invalid")]
916    fn with_max_cached_fds_zero_panics() {
917        let _ = ApplyConfig::new("/irrelevant").with_max_cached_fds(0);
918    }
919
920    #[test]
921    #[should_panic(expected = "with_buffer_capacity(0) is invalid")]
922    fn with_buffer_capacity_zero_panics() {
923        let _ = ApplyConfig::new("/irrelevant").with_buffer_capacity(0);
924    }
925
926    #[test]
927    fn custom_max_cached_fds_changes_eviction_threshold() {
928        let tmp = tempfile::tempdir().unwrap();
929        let cfg = ApplyConfig::new(tmp.path()).with_max_cached_fds(4);
930        let mut s = cfg.into_session();
931        for i in 0..4 {
932            s.open_cached(&tmp.path().join(format!("{i}.dat"))).unwrap();
933        }
934        assert_eq!(s.file_cache.len(), 4);
935        s.open_cached(&tmp.path().join("new.dat")).unwrap();
936        // Cache was drained on the 5th distinct open; only the new entry remains.
937        assert_eq!(s.file_cache.len(), 1);
938    }
939
940    // --- Builder accessors ---
941
942    #[test]
943    fn game_path_returns_install_root_unchanged() {
944        let tmp = tempfile::tempdir().unwrap();
945        let cfg = ApplyConfig::new(tmp.path());
946        assert_eq!(cfg.game_path(), tmp.path());
947    }
948
949    #[test]
950    fn default_platform_is_win32() {
951        let cfg = ApplyConfig::new("/irrelevant");
952        assert_eq!(cfg.platform(), Platform::Win32);
953    }
954
955    #[test]
956    fn with_platform_overrides_default() {
957        let cfg = ApplyConfig::new("/irrelevant").with_platform(Platform::Ps4);
958        assert_eq!(cfg.platform(), Platform::Ps4);
959    }
960
961    #[test]
962    fn default_ignore_missing_is_false() {
963        let cfg = ApplyConfig::new("/irrelevant");
964        assert!(!cfg.ignore_missing());
965    }
966
967    #[test]
968    fn with_ignore_missing_toggles_flag_both_ways() {
969        let cfg = ApplyConfig::new("/irrelevant").with_ignore_missing(true);
970        assert!(cfg.ignore_missing());
971        let cfg = cfg.with_ignore_missing(false);
972        assert!(!cfg.ignore_missing());
973    }
974
975    #[test]
976    fn default_ignore_old_mismatch_is_false() {
977        let cfg = ApplyConfig::new("/irrelevant");
978        assert!(!cfg.ignore_old_mismatch());
979    }
980
981    #[test]
982    fn with_ignore_old_mismatch_toggles_flag_both_ways() {
983        let cfg = ApplyConfig::new("/irrelevant").with_ignore_old_mismatch(true);
984        assert!(cfg.ignore_old_mismatch());
985        let cfg = cfg.with_ignore_old_mismatch(false);
986        assert!(!cfg.ignore_old_mismatch());
987    }
988
989    // --- BufWriter cache ---
990
991    #[test]
992    fn buffered_writes_are_invisible_before_flush() {
993        use std::io::Write;
994
995        let tmp = tempfile::tempdir().unwrap();
996        let mut s = session(tmp.path());
997        let path = tmp.path().join("buffered.dat");
998
999        let writer = s.open_cached(&path).unwrap();
1000        writer.write_all(&[0xAB]).unwrap();
1001
1002        assert_eq!(std::fs::metadata(&path).unwrap().len(), 0);
1003
1004        s.flush().unwrap();
1005        assert_eq!(std::fs::read(&path).unwrap(), vec![0xAB]);
1006    }
1007
1008    #[test]
1009    fn flush_keeps_handles_in_cache() {
1010        let tmp = tempfile::tempdir().unwrap();
1011        let mut s = session(tmp.path());
1012        s.open_cached(&tmp.path().join("a.dat")).unwrap();
1013        s.open_cached(&tmp.path().join("b.dat")).unwrap();
1014        s.flush().unwrap();
1015        assert_eq!(s.file_cache.len(), 2);
1016    }
1017
1018    #[test]
1019    fn evict_cached_flushes_pending_writes_to_disk() {
1020        use std::io::Write;
1021
1022        let tmp = tempfile::tempdir().unwrap();
1023        let mut s = session(tmp.path());
1024        let path = tmp.path().join("evict.dat");
1025
1026        let writer = s.open_cached(&path).unwrap();
1027        writer.write_all(b"queued").unwrap();
1028        assert_eq!(std::fs::metadata(&path).unwrap().len(), 0);
1029
1030        s.evict_cached(&path).unwrap();
1031        assert_eq!(std::fs::read(&path).unwrap(), b"queued");
1032        assert!(!s.file_cache.contains_key(&path));
1033    }
1034
1035    #[test]
1036    fn clear_file_cache_flushes_every_pending_write() {
1037        use std::io::Write;
1038
1039        let tmp = tempfile::tempdir().unwrap();
1040        let mut s = session(tmp.path());
1041        let a = tmp.path().join("a.dat");
1042        let b = tmp.path().join("b.dat");
1043
1044        s.open_cached(&a).unwrap().write_all(b"AA").unwrap();
1045        s.open_cached(&b).unwrap().write_all(b"BB").unwrap();
1046
1047        s.clear_file_cache().unwrap();
1048
1049        assert_eq!(std::fs::read(&a).unwrap(), b"AA");
1050        assert_eq!(std::fs::read(&b).unwrap(), b"BB");
1051        assert!(s.file_cache.is_empty());
1052    }
1053
1054    // --- Debug impl ---
1055
1056    #[test]
1057    fn apply_session_debug_renders_all_fields() {
1058        let tmp = tempfile::tempdir().unwrap();
1059        let s = ApplyConfig::new(tmp.path())
1060            .with_platform(Platform::Ps4)
1061            .with_ignore_missing(true)
1062            .into_session();
1063
1064        let rendered = format!("{s:?}");
1065        for needle in [
1066            "ApplySession",
1067            "ApplyConfig",
1068            "game_path",
1069            "platform",
1070            "Ps4",
1071            "ignore_missing",
1072            "true",
1073            "ignore_old_mismatch",
1074            "file_cache_len",
1075            "path_cache_len",
1076            "decompressor",
1077        ] {
1078            assert!(
1079                rendered.contains(needle),
1080                "Debug output must mention {needle:?}; got: {rendered}"
1081            );
1082        }
1083    }
1084
1085    // --- DeleteDirectory happy path ---
1086
1087    #[test]
1088    fn delete_directory_success_removes_existing_dir() {
1089        let tmp = tempfile::tempdir().unwrap();
1090        let target = tmp.path().join("to_remove");
1091        std::fs::create_dir(&target).unwrap();
1092
1093        let mut s = session(tmp.path());
1094        apply_delete_directory(
1095            &DeleteDirectory {
1096                name: "to_remove".into(),
1097            },
1098            &mut s,
1099        )
1100        .expect("delete on an existing directory must succeed");
1101
1102        assert!(!target.exists());
1103    }
1104
1105    // --- ensure_dir_all cache-hit branch ---
1106
1107    #[test]
1108    fn ensure_dir_all_cache_hit_returns_early_without_syscall() {
1109        let tmp = tempfile::tempdir().unwrap();
1110        let mut s = session(tmp.path());
1111
1112        let path = tmp.path().join("cached_dir");
1113        s.ensure_dir_all(&path).unwrap();
1114        assert!(path.is_dir());
1115        assert_eq!(s.dirs_created.len(), 1);
1116
1117        let p2 = tmp.path().join("cached_dir");
1118        s.ensure_dir_all(&p2).unwrap();
1119        assert_eq!(s.dirs_created.len(), 1);
1120    }
1121
1122    // --- InMemoryFs end-to-end ---
1123
1124    #[test]
1125    fn in_memory_fs_records_directory_creation() {
1126        let fs = InMemoryFs::new();
1127        let mut s = ApplyConfig::new("/g").with_vfs(fs.clone()).into_session();
1128        apply_add_directory(&AddDirectory { name: "sub".into() }, &mut s).unwrap();
1129        assert!(fs.snapshot_dirs().contains(&PathBuf::from("/g/sub")));
1130    }
1131}