Skip to main content

zipatch_rs/index/
source.rs

1//! [`PatchSource`] trait and built-in implementations.
2//!
3//! [`PatchSource`] is the byte-fetching seam between a [`crate::index::Plan`]
4//! and the patch files the plan was built from. [`IndexApplier`](crate::index::IndexApplier)
5//! never touches the network or owns a file handle — it calls
6//! [`PatchSource::read`] with a `(patch, offset)` tuple and an exact-size
7//! buffer, and the caller decides how those reads are served (local files,
8//! in-memory cache, HTTP multi-range request, …).
9//!
10//! # Chain semantics
11//!
12//! `patch: PatchIndex` indexes into [`crate::index::Plan::patches`]. A single-patch
13//! plan uses index `0`; multi-patch chains use successive indices in the
14//! order patches were fed to [`crate::index::PlanBuilder::add_patch`]. The
15//! built-in sources store one file/buffer per patch and reject out-of-range
16//! indices with [`crate::IndexError::PatchIndexOutOfRange`].
17//!
18//! # Caller-fills-buffer
19//!
20//! The destination buffer is supplied by the applier at the exact length it
21//! needs (every region carries its length). Returning bytes via `&[u8]` would
22//! force either an extra copy or a `Cow` that allocates for compressed
23//! regions; this shape composes cleanly with HTTP multi-range responses and
24//! pre-allocated scratch buffers without either compromise.
25//!
26//! # Built-in implementations
27//!
28//! - [`FilePatchSource`] — wraps one or more owned [`std::fs::File`]s,
29//!   single-threaded. Use [`FilePatchSource::open`] for one patch or
30//!   [`FilePatchSource::open_chain`] for many. Callers wanting concurrent
31//!   reads should construct one source per thread.
32//! - `MemoryPatchSource` (test-only) — wraps one or more `Vec<u8>`s.
33//!   Gated behind the `test-utils` feature and exposed publicly only via
34//!   [`crate::test_utils::MemoryPatchSource`].
35
36use crate::newtypes::PatchIndex;
37use crate::{IndexError, IndexResult as Result};
38use std::fs::File;
39use std::io::{Read, Seek, SeekFrom};
40use std::path::Path;
41
42/// Source of patch-file bytes for an [`IndexApplier`](crate::index::IndexApplier).
43///
44/// Implementations must fill `dst` completely with `dst.len()` source bytes
45/// starting at `offset` in the patch identified by `patch`. A short read —
46/// fewer than `dst.len()` bytes available — must surface as
47/// [`IndexError::PatchSourceTooShort`], not as a partial fill. An
48/// out-of-range `patch` must surface as
49/// [`IndexError::PatchIndexOutOfRange`].
50///
51/// # Async usage
52///
53/// `PatchSource` is intentionally synchronous. The indexed-apply driver
54/// is dominated by DEFLATE decompression and filesystem syscalls — both
55/// fundamentally blocking workloads — and keeping the trait sync lets the
56/// crate stay trivially embeddable in either a sync binary or an async
57/// runtime without dragging a runtime dependency into every consumer.
58///
59/// Async callers (e.g. a tokio-based launcher) drive the apply by parking
60/// the whole [`IndexApplier::execute`](crate::index::IndexApplier::execute)
61/// call on a blocking-pool thread:
62///
63/// ```ignore
64/// // pseudo-code; the crate has no tokio dependency
65/// let result = tokio::task::spawn_blocking(move || {
66///     applier.execute(&mut my_patch_source)
67/// }).await?;
68/// ```
69///
70/// Implementors whose backing storage is itself async (e.g. an
71/// in-progress download from an `axum` / `reqwest` task) can satisfy the
72/// sync trait by blocking on a channel against a separate async task
73/// that performs the I/O: each [`PatchSource::read`] call sends a
74/// `(patch, offset, len)` request to the async side and blocks on the
75/// response. Because `read` is called from inside the `spawn_blocking`
76/// thread the driver lives on, blocking on a channel does not stall the
77/// runtime's reactor.
78pub trait PatchSource {
79    /// Fill `dst` with `dst.len()` source bytes starting at `offset` in the
80    /// patch indexed by `patch`.
81    ///
82    /// # Errors
83    ///
84    /// - [`IndexError::PatchIndexOutOfRange`] — `patch` is `>= count`.
85    /// - [`IndexError::PatchSourceTooShort`] — the source has fewer than
86    ///   `dst.len()` bytes at `offset` in patch `patch`.
87    /// - [`IndexError::Io`] — underlying I/O failure (only meaningful for
88    ///   sources backed by real I/O, e.g. [`FilePatchSource`]).
89    fn read(&mut self, patch: PatchIndex, offset: u64, dst: &mut [u8]) -> Result<()>;
90}
91
92/// [`PatchSource`] backed by one or more owned [`std::fs::File`]s, one per
93/// patch in the chain.
94///
95/// Single-threaded — no internal `Mutex`. Callers that need concurrent reads
96/// should construct one source per thread.
97#[derive(Debug)]
98pub struct FilePatchSource {
99    files: Vec<File>,
100}
101
102impl FilePatchSource {
103    /// Open a single patch file and wrap it as a one-patch chain.
104    ///
105    /// Equivalent to [`FilePatchSource::open_chain`] with a one-element iterator.
106    ///
107    /// # Errors
108    ///
109    /// Returns [`IndexError::Io`] if the file cannot be opened.
110    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
111        let file = File::open(path)?;
112        Ok(Self { files: vec![file] })
113    }
114
115    /// Open every patch file in `paths` and wrap them as a chain. The first
116    /// path becomes patch index `0`, the second index `1`, and so on, matching
117    /// the order [`crate::index::PlanBuilder::add_patch`] consumed them.
118    ///
119    /// # Errors
120    ///
121    /// Returns [`IndexError::Io`] on the first path that fails to open.
122    pub fn open_chain<I, P>(paths: I) -> Result<Self>
123    where
124        I: IntoIterator<Item = P>,
125        P: AsRef<Path>,
126    {
127        let iter = paths.into_iter();
128        // `IntoIterator` doesn't promise `ExactSizeIterator`, but `Vec` /
129        // `&[P]` inputs (the common case) do report an accurate lower bound
130        // through `size_hint().0` and so skip the geometric regrow.
131        let mut files = Vec::with_capacity(iter.size_hint().0);
132        for p in iter {
133            files.push(File::open(p).map_err(|e| IndexError::Io {
134                path: None,
135                source: e,
136            })?);
137        }
138        Ok(Self { files })
139    }
140
141    /// Wrap an already-open [`File`] as a one-patch chain.
142    #[must_use]
143    pub fn from_file(file: File) -> Self {
144        Self { files: vec![file] }
145    }
146
147    /// Number of patch files in the chain.
148    #[must_use]
149    pub fn patch_count(&self) -> usize {
150        self.files.len()
151    }
152}
153
154impl PatchSource for FilePatchSource {
155    fn read(&mut self, patch: PatchIndex, offset: u64, dst: &mut [u8]) -> Result<()> {
156        let count = self.files.len();
157        let file = self
158            .files
159            .get_mut(patch.get() as usize)
160            .ok_or(IndexError::PatchIndexOutOfRange { patch, count })?;
161        file.seek(SeekFrom::Start(offset))?;
162        match file.read_exact(dst) {
163            Ok(()) => Ok(()),
164            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
165                Err(IndexError::PatchSourceTooShort {
166                    offset,
167                    requested: dst.len(),
168                })
169            }
170            Err(e) => Err(IndexError::Io {
171                source: e,
172                path: None,
173            }),
174        }
175    }
176}
177
/// In-memory [`PatchSource`]: holds one byte buffer for every patch in the
/// chain, stored in chain order.
///
/// Test-only. The type is gated behind the `test-utils` feature and is
/// exposed publicly only via [`crate::test_utils::MemoryPatchSource`]; it is
/// not part of the stable API.
#[cfg(any(test, feature = "test-utils"))]
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct MemoryPatchSource {
    bufs: Vec<std::sync::Arc<[u8]>>,
}

#[cfg(any(test, feature = "test-utils"))]
impl MemoryPatchSource {
    /// Take ownership of `buf` and expose it as a chain of length one.
    #[must_use]
    pub fn new(buf: Vec<u8>) -> Self {
        let only = std::sync::Arc::<[u8]>::from(buf);
        Self { bufs: vec![only] }
    }

    /// Copy `buf` and expose the copy as a chain of length one.
    #[must_use]
    pub fn from_slice(buf: &[u8]) -> Self {
        let only = std::sync::Arc::<[u8]>::from(buf);
        Self { bufs: vec![only] }
    }

    /// Take ownership of every buffer in `bufs`; element `i` becomes patch
    /// index `i`.
    #[must_use]
    pub fn new_chain(bufs: Vec<Vec<u8>>) -> Self {
        let bufs = bufs
            .into_iter()
            .map(std::sync::Arc::<[u8]>::from)
            .collect();
        Self { bufs }
    }

    /// Copy every slice in `bufs`; element `i` becomes patch index `i`.
    #[must_use]
    pub fn from_slices(bufs: &[&[u8]]) -> Self {
        let bufs = bufs
            .iter()
            .map(|bytes| std::sync::Arc::<[u8]>::from(*bytes))
            .collect();
        Self { bufs }
    }

    /// How many patches the chain holds.
    #[must_use]
    pub fn patch_count(&self) -> usize {
        self.bufs.len()
    }
}
231
#[cfg(any(test, feature = "test-utils"))]
impl PatchSource for MemoryPatchSource {
    /// Copy `dst.len()` bytes starting at `offset` of buffer `patch` into
    /// `dst`.
    ///
    /// # Errors
    ///
    /// - [`IndexError::PatchIndexOutOfRange`] if no buffer is stored at
    ///   `patch`.
    /// - [`IndexError::PatchSourceTooShort`] if `offset` does not fit in
    ///   `usize`, if `offset + dst.len()` overflows, or if the requested
    ///   range extends past the end of the buffer.
    fn read(&mut self, patch: PatchIndex, offset: u64, dst: &mut [u8]) -> Result<()> {
        let count = self.bufs.len();
        let requested = dst.len();
        // Every range failure below maps to this one error value.
        let too_short = || IndexError::PatchSourceTooShort { offset, requested };

        let buf = match self.bufs.get(patch.get() as usize) {
            Some(buf) => buf,
            None => return Err(IndexError::PatchIndexOutOfRange { patch, count }),
        };

        // Build `start..end` with checked arithmetic, then let the slice's
        // own bounds check decide whether the range is inside the buffer.
        let window = usize::try_from(offset)
            .ok()
            .and_then(|start| Some(start..start.checked_add(requested)?))
            .and_then(|range| buf.get(range))
            .ok_or_else(too_short)?;

        dst.copy_from_slice(window);
        Ok(())
    }
}
260
#[cfg(test)]
mod tests {
    use super::*;

    /// Write `contents` to a file called `name` under `dir` and return the
    /// path of the new file.
    fn write_fixture(dir: &std::path::Path, name: &str, contents: &[u8]) -> std::path::PathBuf {
        let path = dir.join(name);
        std::fs::write(&path, contents).unwrap();
        path
    }

    #[test]
    fn memory_source_round_trips_arbitrary_ranges() {
        let payload: Vec<u8> = (0..=255u8).collect();
        let mut source = MemoryPatchSource::new(payload.clone());

        // Head, middle, and tail windows must each come back verbatim.
        for (start, len) in [(0usize, 16usize), (100, 32), (240, 16)] {
            let mut window = vec![0u8; len];
            source
                .read(PatchIndex::new(0), start as u64, &mut window)
                .unwrap();
            assert_eq!(&window[..], &payload[start..start + len]);
        }

        // Zero-length read at any in-range offset is a no-op success.
        let mut nothing = [0u8; 0];
        for offset in [0u64, 256] {
            source
                .read(PatchIndex::new(0), offset, &mut nothing)
                .unwrap();
        }
    }

    #[test]
    fn memory_source_out_of_range_returns_too_short() {
        let mut source = MemoryPatchSource::new(vec![0u8; 16]);
        let mut scratch = [0u8; 4];

        // Only one byte remains at offset 15, so a 4-byte read overruns.
        let straddling = source
            .read(PatchIndex::new(0), 15, &mut scratch)
            .expect_err("read past end must fail");
        if let IndexError::PatchSourceTooShort { offset, requested } = straddling {
            assert_eq!(offset, 15);
            assert_eq!(requested, 4);
        } else {
            panic!("expected PatchSourceTooShort, got {straddling:?}");
        }

        // An offset far beyond the buffer reports the same error variant.
        let distant = source
            .read(PatchIndex::new(0), 1_000_000, &mut scratch)
            .expect_err("read far past end must fail");
        assert!(matches!(distant, IndexError::PatchSourceTooShort { .. }));
    }

    #[test]
    fn memory_source_chain_indexes_each_patch() {
        // Two patches: patch 0 = [0xA0, 0xA1, ...], patch 1 = [0xB0, 0xB1, ...].
        let first: Vec<u8> = (0..16u8).map(|i| 0xA0 | i).collect();
        let second: Vec<u8> = (0..16u8).map(|i| 0xB0 | i).collect();
        let mut source = MemoryPatchSource::new_chain(vec![first.clone(), second.clone()]);

        let mut scratch = [0u8; 4];

        source.read(PatchIndex::new(0), 0, &mut scratch).unwrap();
        assert_eq!(&scratch[..], &first[..4]);

        source.read(PatchIndex::new(1), 0, &mut scratch).unwrap();
        assert_eq!(&scratch[..], &second[..4]);

        source.read(PatchIndex::new(0), 12, &mut scratch).unwrap();
        assert_eq!(&scratch[..], &first[12..16]);
    }

    #[test]
    fn memory_source_chain_rejects_out_of_range_patch() {
        let mut source = MemoryPatchSource::new_chain(vec![vec![0u8; 16]]);
        let mut scratch = [0u8; 4];

        let failure = source
            .read(PatchIndex::new(1), 0, &mut scratch)
            .expect_err("patch 1 must be out of range");
        if let IndexError::PatchIndexOutOfRange { patch, count } = failure {
            assert_eq!(patch, PatchIndex::new(1));
            assert_eq!(count, 1);
        } else {
            panic!("expected PatchIndexOutOfRange, got {failure:?}");
        }
    }

    #[test]
    fn file_source_round_trips_arbitrary_ranges() {
        let payload: Vec<u8> = (0..=255u8).collect();
        let dir = tempfile::tempdir().unwrap();
        let path = write_fixture(dir.path(), "source.bin", &payload);

        let mut source = FilePatchSource::open(&path).unwrap();

        // Head and middle windows must each come back verbatim.
        for (start, len) in [(0usize, 16usize), (100, 32)] {
            let mut window = vec![0u8; len];
            source
                .read(PatchIndex::new(0), start as u64, &mut window)
                .unwrap();
            assert_eq!(&window[..], &payload[start..start + len]);
        }
    }

    #[test]
    fn file_source_short_returns_too_short() {
        let dir = tempfile::tempdir().unwrap();
        let path = write_fixture(dir.path(), "source.bin", &[0u8; 16]);

        let mut source = FilePatchSource::open(&path).unwrap();

        // The file holds 16 bytes; asking for 32 must fail.
        let mut oversized = [0u8; 32];
        let failure = source
            .read(PatchIndex::new(0), 0, &mut oversized)
            .expect_err("read past end must fail");
        assert!(matches!(failure, IndexError::PatchSourceTooShort { .. }));
    }

    #[test]
    fn file_source_chain_indexes_each_file() {
        let dir = tempfile::tempdir().unwrap();
        let p0 = write_fixture(dir.path(), "p0.bin", b"AAAAAAAA");
        let p1 = write_fixture(dir.path(), "p1.bin", b"BBBBBBBB");

        let mut source = FilePatchSource::open_chain([&p0, &p1]).unwrap();
        assert_eq!(source.patch_count(), 2);

        let mut scratch = [0u8; 4];

        source.read(PatchIndex::new(0), 0, &mut scratch).unwrap();
        assert_eq!(&scratch, b"AAAA");

        source.read(PatchIndex::new(1), 4, &mut scratch).unwrap();
        assert_eq!(&scratch, b"BBBB");
    }
}