znippy-common 0.9.7

Core logic and data structures for Znippy, a parallel chunked compression system.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
use anyhow::Result;
use arrow_array::{BooleanArray, FixedSizeBinaryArray, StringArray, UInt64Array};
use std::{
    collections::{HashMap, HashSet},
    fs::{File, OpenOptions},
    os::unix::fs::FileExt,
    path::{Path, PathBuf},
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
    thread,
};

use crate::{
    common_config::CONFIG,
    index::VerifyReport,
};

/// Resolve an archive entry's relative path to its destination under `out_dir`.
///
/// `rel_path` comes from the archive index and is therefore untrusted; this is
/// the path-traversal (zip-slip) guard for extraction. The path is walked
/// component by component:
///
/// * ordinary names are kept,
/// * `.` components are dropped,
/// * any `..` component is an error (even one that would not actually leave
///   `out_dir`),
/// * a root or prefix component (an absolute path) is an error.
///
/// # Errors
///
/// Fails on the first `..` or absolute component encountered, or when nothing
/// remains after normalization (an empty path, or one made only of `.`).
///
/// The check is purely lexical: the file system is never consulted, so
/// symlinks already present below `out_dir` are not resolved here.
fn safe_output_path(out_dir: &Path, rel_path: &str) -> Result<PathBuf> {
    use std::path::Component;

    let mut relative = PathBuf::new();
    for part in Path::new(rel_path).components() {
        let name = match part {
            Component::Normal(name) => name,
            // `.` adds nothing to the destination; skip it.
            Component::CurDir => continue,
            Component::ParentDir => {
                anyhow::bail!("unsafe archive path escapes output dir: {:?}", rel_path)
            }
            Component::RootDir | Component::Prefix(_) => {
                anyhow::bail!("unsafe absolute archive path: {:?}", rel_path)
            }
        };
        relative.push(name);
    }

    if relative.as_os_str().is_empty() {
        anyhow::bail!("empty archive path");
    }
    Ok(out_dir.join(relative))
}

/// Counters gathered by a single decompression worker.
///
/// Each worker fills in its own instance while it processes rows; the
/// per-worker values are summed once the worker threads have been joined.
#[derive(Default)]
struct WorkerStats {
    /// Number of index rows (chunks) this worker claimed.
    total_chunks: u64,
    /// Index rows whose chunk failed to decompress or failed its checksum.
    corrupt_rows: Vec<u64>,
    /// Uncompressed bytes produced across all chunks that decoded, whether or
    /// not they later verified.
    total_written_bytes: u64,
    /// Uncompressed bytes of chunks whose checksum matched.
    verified_bytes: u64,
    /// Uncompressed bytes of chunks whose checksum did not match.
    corrupt_bytes: u64,
}

/// Verify every chunk of a znippy archive and, when `save_data` is set,
/// extract the files below `out_dir`.
///
/// This is the unfiltered entry point: it forwards to
/// [`decompress_archive_filtered`] with a default-constructed
/// `IndexFilter`, so the whole index is processed.
///
/// The pipeline behind it treats each index row as independent work. Worker
/// threads pull row numbers from a single atomic cursor, read the row's blob
/// from the archive with a positioned read, decompress it when the row is
/// flagged as compressed, check the blake3 checksum of the uncompressed bytes
/// and write the chunk into its output file at `fdata_offset` with a
/// positioned write.
///
/// # Errors
///
/// Returns whatever error [`decompress_archive_filtered`] reports.
pub fn decompress_archive(
    index_path: &Path,
    save_data: bool,
    out_dir: &Path,
) -> Result<VerifyReport> {
    let no_filter = crate::index::IndexFilter::default();
    decompress_archive_filtered(index_path, save_data, out_dir, &no_filter)
}

/// Like [`decompress_archive`] but extracts only the files whose `(pkg_type,
/// repo)` sub-index matches `filter` — a selective extract. An empty filter is
/// equivalent to [`decompress_archive`].
pub fn decompress_archive_filtered(
    index_path: &Path,
    save_data: bool,
    out_dir: &Path,
    filter: &crate::index::IndexFilter,
) -> Result<VerifyReport> {
    let (schema, batches) = crate::index::read_znippy_index_filtered(index_path, filter)?;

    let batch = Arc::new(match batches.len() {
        0 => arrow::record_batch::RecordBatch::new_empty(Arc::new(
            crate::index::ZNIPPY_INDEX_SCHEMA.as_ref().clone(),
        )),
        1 => batches.into_iter().next().unwrap(),
        _ => arrow_select::concat::concat_batches(&schema, batches.iter())
            .map_err(|e| anyhow::anyhow!("failed to merge index batches: {}", e))?,
    });

    let total_rows = batch.num_rows();

    let paths_col = batch
        .column_by_name("relative_path")
        .unwrap()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();

    let mut unique_files = HashSet::new();
    for i in 0..total_rows {
        unique_files.insert(paths_col.value(i));
    }
    let total_files = unique_files.len();
    drop(unique_files);

    // Pre-create output files, indexed by row for O(1) lock-free lookup in
    // workers. Rows sharing a path share one Arc<File> (pwrite is positioned, so
    // many threads can write disjoint regions of the same file concurrently).
    let output_files: Arc<Vec<Option<Arc<File>>>> = if save_data {
        let mut path_to_file: HashMap<&str, Arc<File>> = HashMap::new();
        let mut created_dirs: HashSet<PathBuf> = HashSet::new();
        let mut files: Vec<Option<Arc<File>>> = Vec::with_capacity(total_rows);
        for i in 0..total_rows {
            let rel_path = paths_col.value(i);
            let f = match path_to_file.get(rel_path) {
                Some(f) => Arc::clone(f),
                None => {
                    // Reject zip-slip / absolute paths before touching the FS —
                    // on any unsafe entry we return Err and extract nothing.
                    let full = safe_output_path(out_dir, rel_path)?;
                    if let Some(parent) = full.parent() {
                        if created_dirs.insert(parent.to_path_buf()) {
                            std::fs::create_dir_all(parent).map_err(|e| {
                                anyhow::anyhow!(
                                    "failed to create output dir {}: {}",
                                    parent.display(),
                                    e
                                )
                            })?;
                        }
                    }
                    let file = Arc::new(
                        OpenOptions::new()
                            .create(true)
                            .write(true)
                            .truncate(true)
                            .open(&full)
                            .map_err(|e| {
                                anyhow::anyhow!(
                                    "failed to open output file {}: {}",
                                    full.display(),
                                    e
                                )
                            })?,
                    );
                    path_to_file.insert(rel_path, Arc::clone(&file));
                    file
                }
            };
            files.push(Some(f));
        }
        Arc::new(files)
    } else {
        Arc::new(vec![])
    };

    let archive = Arc::new(File::open(index_path)?);
    let archive_len = archive.metadata()?.len();
    let cursor = Arc::new(AtomicUsize::new(0));
    let num_workers = (CONFIG.max_core_in_flight as usize).max(1);

    let mut handles = Vec::with_capacity(num_workers);
    for _ in 0..num_workers {
        let batch = Arc::clone(&batch);
        let archive = Arc::clone(&archive);
        let cursor = Arc::clone(&cursor);
        let output_files = Arc::clone(&output_files);
        handles.push(thread::spawn(move || -> Result<WorkerStats> {
            // Downcast each column once; per-row access is a cheap indexed read.
            let blob_offset_col = batch
                .column_by_name("blob_offset").unwrap()
                .as_any().downcast_ref::<UInt64Array>().unwrap();
            let blob_size_col = batch
                .column_by_name("blob_size").unwrap()
                .as_any().downcast_ref::<UInt64Array>().unwrap();
            let fdata_offset_col = batch
                .column_by_name("fdata_offset").unwrap()
                .as_any().downcast_ref::<UInt64Array>().unwrap();
            let compressed_col = batch
                .column_by_name("compressed").unwrap()
                .as_any().downcast_ref::<BooleanArray>().unwrap();
            let checksum_col = batch
                .column_by_name("checksum").unwrap()
                .as_any().downcast_ref::<FixedSizeBinaryArray>().unwrap();

            let mut st = WorkerStats::default();
            let mut read_buf: Vec<u8> = Vec::new(); // reused: holds the on-disk blob
            let mut out_buf: Vec<u8> = Vec::new(); // reused: holds decompressed bytes

            loop {
                let row = cursor.fetch_add(1, Ordering::Relaxed);
                if row >= total_rows {
                    break;
                }
                st.total_chunks += 1;

                let blob_offset = blob_offset_col.value(row);
                let blob_size = blob_size_col.value(row) as usize;
                let fdata_offset = fdata_offset_col.value(row);
                let compressed = compressed_col.value(row);

                // Bounds-check the attacker-controlled offset/size against the
                // real archive length BEFORE allocating, so a malformed index
                // yields a clean Err instead of a giant zero-fill allocation
                // that aborts the process (DoS).
                if blob_size > 0 {
                    let in_bounds = blob_offset
                        .checked_add(blob_size as u64)
                        .is_some_and(|end| end <= archive_len);
                    anyhow::ensure!(
                        in_bounds,
                        "blob for row {} out of bounds (offset={}, size={}, archive_len={})",
                        row,
                        blob_offset,
                        blob_size,
                        archive_len
                    );
                }
                // pread the blob into the reusable read buffer (positioned read).
                read_buf.resize(blob_size, 0);
                if blob_size > 0 {
                    archive.read_exact_at(&mut read_buf, blob_offset).map_err(|e| {
                        anyhow::anyhow!("failed to read blob for row {}: {}", row, e)
                    })?;
                }

                // Decompress into out_buf, or use the raw blob bytes (skip path).
                let out: &[u8] = if compressed {
                    match crate::codec::decompress_into(&read_buf, &mut out_buf) {
                        Ok(_) => &out_buf,
                        Err(e) => {
                            // Count the failed chunk as corrupt so it is surfaced
                            // (never silently dropped, leaving a hole in output).
                            log::error!("[decomp] row {} error={}", row, e);
                            st.corrupt_rows.push(row as u64);
                            continue;
                        }
                    }
                } else {
                    &read_buf
                };

                let len = out.len() as u64;
                st.total_written_bytes += len;

                // Per-chunk verify: checksum is over the UNCOMPRESSED bytes.
                let computed = blake3::hash(out);
                let verified = &computed.as_bytes()[..] == checksum_col.value(row);
                if verified {
                    st.verified_bytes += len;
                } else {
                    st.corrupt_bytes += len;
                    st.corrupt_rows.push(row as u64);
                    log::error!(
                        "[verify] MISMATCH row={} expected={} got={}",
                        row,
                        hex::encode(checksum_col.value(row)),
                        hex::encode(computed.as_bytes()),
                    );
                }

                // Only write bytes that verify — never present corrupt output as
                // if it were good. A mismatch is surfaced via corrupt_rows above.
                if verified {
                    if let Some(Some(file)) = output_files.get(row) {
                        file.write_all_at(out, fdata_offset).map_err(|e| {
                            anyhow::anyhow!("failed to write output for row {}: {}", row, e)
                        })?;
                    }
                }
            }
            Ok(st)
        }));
    }

    let mut total_chunks = 0u64;
    let mut total_written_bytes = 0u64;
    let mut verified_bytes = 0u64;
    let mut corrupt_bytes = 0u64;
    let mut corrupt_rows: HashSet<u64> = HashSet::new();
    for h in handles {
        let st = match h.join() {
            Ok(res) => res?,
            Err(_) => anyhow::bail!("decompression worker thread panicked"),
        };
        total_chunks += st.total_chunks;
        total_written_bytes += st.total_written_bytes;
        verified_bytes += st.verified_bytes;
        corrupt_bytes += st.corrupt_bytes;
        for r in st.corrupt_rows {
            corrupt_rows.insert(r);
        }
    }
    let corrupt_files = corrupt_rows.len();
    let verified_files = total_files.saturating_sub(corrupt_files);

    Ok(VerifyReport {
        total_files,
        verified_files,
        corrupt_files,
        total_bytes: total_written_bytes,
        verified_bytes,
        corrupt_bytes,
        chunks: total_chunks,
    })
}

/// Random-access read of a single file from the archive by its relative path.
///
/// The file's chunks are located with `crate::index::locate_file`. Each
/// chunk's blob is then read with a positioned read, decompressed when the
/// chunk is flagged as compressed (otherwise used as-is), blake3-verified
/// against the per-chunk checksum and copied into the result at the chunk's
/// `fdata_offset`. The result is sized to the largest
/// `fdata_offset + uncompressed_size` of any chunk.
///
/// All offsets and sizes come from the (untrusted) index, so every sum is
/// checked and the output buffer is reserved fallibly: a malformed index
/// produces an `Err`, not an arithmetic overflow, a slice panic or an
/// allocation abort.
///
/// # Errors
///
/// Returns an error if `target` is not present in the archive, if a chunk's
/// extent overflows or cannot be allocated, if a blob lies outside the archive,
/// on read or decompression failure, on a checksum mismatch, or if a chunk's
/// bytes do not fit inside the computed file length.
pub fn get_file(archive_path: &Path, target: &str) -> Result<Vec<u8>> {
    let chunks = crate::index::locate_file(archive_path, target)?;
    anyhow::ensure!(!chunks.is_empty(), "file not found in archive: {target}");

    let archive = File::open(archive_path)?;
    let archive_len = archive.metadata()?.len();

    // File length = furthest chunk end; both operands are index-supplied, so
    // the addition must be checked.
    let mut total: u64 = 0;
    for c in &chunks {
        let end = c.fdata_offset.checked_add(c.uncompressed_size).ok_or_else(|| {
            anyhow::anyhow!("chunk extent overflows for {target} chunk {}", c.chunk_seq)
        })?;
        total = total.max(end);
    }
    let total_len = usize::try_from(total).map_err(|_| {
        anyhow::anyhow!("file {target} is too large for this platform ({total} bytes)")
    })?;

    // Reserve fallibly so an absurd index-declared size is reported as an
    // error rather than aborting the process inside the allocator.
    let mut out: Vec<u8> = Vec::new();
    out.try_reserve_exact(total_len).map_err(|e| {
        anyhow::anyhow!("cannot allocate {total_len} bytes for {target}: {e}")
    })?;
    out.resize(total_len, 0);

    let mut read_buf: Vec<u8> = Vec::new();
    let mut dec_buf: Vec<u8> = Vec::new();
    for c in &chunks {
        // Bounds-check the attacker-controlled offset/size against the real
        // archive length BEFORE allocating, so a malformed index can't force a
        // giant zero-fill allocation that aborts the process (DoS).
        if c.blob_size > 0 {
            let in_bounds = c
                .blob_offset
                .checked_add(c.blob_size as u64)
                .is_some_and(|end| end <= archive_len);
            anyhow::ensure!(
                in_bounds,
                "blob for {target} out of bounds (offset={}, size={}, archive_len={})",
                c.blob_offset,
                c.blob_size,
                archive_len
            );
        }
        read_buf.resize(c.blob_size as usize, 0);
        if c.blob_size > 0 {
            archive.read_exact_at(&mut read_buf, c.blob_offset)?;
        }
        let bytes: &[u8] = if c.compressed {
            crate::codec::decompress_into(&read_buf, &mut dec_buf)?;
            &dec_buf
        } else {
            &read_buf
        };

        let computed = blake3::hash(bytes);
        anyhow::ensure!(
            computed.as_bytes()[..] == c.checksum[..],
            "checksum mismatch for {target} chunk {}",
            c.chunk_seq
        );

        // Destination range inside `out`, computed with checked arithmetic so
        // a hostile offset cannot wrap around and slip past the length test.
        let dest = usize::try_from(c.fdata_offset)
            .ok()
            .and_then(|start| start.checked_add(bytes.len()).map(|end| start..end))
            .filter(|range| range.end <= out.len());
        match dest {
            Some(range) => out[range].copy_from_slice(bytes),
            None => anyhow::bail!("chunk overruns file length for {target}"),
        }
    }
    Ok(out)
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Well-formed relative paths resolve below the extraction root.
    #[test]
    fn safe_output_path_accepts_normal_relative_paths() {
        let root = Path::new("/tmp/out");
        // (archive path, expected location relative to the root). The second
        // case checks that `.` components are dropped during normalization.
        let cases = [("a/b/c.txt", "a/b/c.txt"), ("./a/./b.txt", "a/b.txt")];
        for &(archive_path, expected) in &cases {
            assert_eq!(
                safe_output_path(root, archive_path).unwrap(),
                root.join(expected)
            );
        }
    }

    /// Traversal, absolute and empty paths are all refused.
    #[test]
    fn safe_output_path_rejects_zip_slip_and_absolute_paths() {
        let root = Path::new("/tmp/out");
        let rejected = [
            // `..` traversal (zip-slip), at several depths.
            "../evil",
            "a/../../evil",
            "a/b/../../../evil",
            // Absolute path.
            "/etc/passwd",
            // Empty path.
            "",
        ];
        for &archive_path in &rejected {
            assert!(safe_output_path(root, archive_path).is_err());
        }
    }

    /// Feeding junk bytes as an archive must produce an `Err`, never a panic
    /// or a process abort.
    #[test]
    fn decompress_corrupt_archive_errors_not_panics() {
        let scratch = tempfile::tempdir().unwrap();
        let bogus_archive = scratch.path().join("bad.znippy");
        std::fs::write(
            &bogus_archive,
            b"not a real znippy archive, just junk bytes",
        )
        .unwrap();
        let extract_root = scratch.path().join("out");

        let outcome = decompress_archive(&bogus_archive, true, &extract_root);
        assert!(outcome.is_err(), "corrupt archive should return Err");
    }
}