znippy-compress 0.7.2

Compression logic for Znippy, a parallel chunked compression system.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
//! No-barrier slice pipeline (TODO_NOW.md "THE FINAL SOLUTION 2026").
//!
//! River model: one reader pours files into shared 200 MB slots; small files
//! coalesce bow-to-stern (1 file = 1 slice), big files are cut into slice-size
//! logs spilling across slots. All workers pull slices off ONE global queue and
//! never wait for a slot to finish. Each worker compresses (if needed), stamps
//! BLAKE3 over the ORIGINAL bytes, pwrites the payload at a reserved offset, and
//! releases the slot. The finalizer builds the arrow-ipc index from metadata
//! only — no payload ever crosses a channel.
//!
//! This is the directory-compression entry point (`compress_dir`).

use anyhow::{Result, anyhow};
use crossbeam_channel::bounded;
use std::fs::File;
use std::io::{self, BufReader, Read};
use std::os::unix::fs::FileExt;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
use walkdir::WalkDir;

use znippy_common::common_config::CONFIG;
use znippy_common::index::{
    FileExtMeta, MULTI_INDEX_MAGIC, ManifestEntry, build_arrow_metadata_for_config,
    build_metadata_batch, compose_index_schema, should_skip_compression, write_manifest_bytes,
};
use znippy_common::meta::{BlobMeta, ChunkMeta};
use znippy_common::slotpool::Magazine;
use znippy_common::CompressionReport;

/// Capacity of one pool slot in bytes (200 MiB). Sized so that the single
/// reader thread can stay ahead of the compression workers.
const SLOT_SIZE: usize = 200 << 20;
/// Read-ahead depth: how many slots the reader may fill before the workers
/// have drained any of them.
const NUM_SLOTS: usize = 8;

/// One unit of work handed from a worker ("barrel") to the writer thread.
///
/// The payload arrives in one of two forms:
/// * compress path — `buf` owns the compressed bytes; the writer returns the
///   buffer to the free list after the positional write;
/// * skip path — `buf` is `None`, and `ptr`/`on_disk_len` point at the raw
///   bytes still held in pool slot `slot_id`. The writer writes them without
///   copying, then frees the slot when `release_slot` is set.
struct WriteJob {
    // ---- payload -------------------------------------------------------
    /// Owned output buffer (compress path); `None` on the skip path.
    buf: Option<Vec<u8>>,
    /// Start of the raw bytes inside the slot (skip path); null otherwise.
    ptr: *const u8,
    /// Number of payload bytes to write to disk.
    on_disk_len: usize,

    // ---- slot bookkeeping ----------------------------------------------
    /// Slot that backs `ptr` (only meaningful on the skip path).
    slot_id: u32,
    /// Whether the writer must release `slot_id` after writing.
    release_slot: bool,

    // ---- index metadata ------------------------------------------------
    /// Position of the source file in the walked file list.
    file_index: u64,
    /// Offset of this chunk within the original file.
    fdata_offset: u64,
    /// Sequence number of this chunk within its file.
    chunk_seq: u32,
    /// BLAKE3 digest of the chunk's original (uncompressed) bytes.
    checksum: [u8; 32],
    /// Whether the payload is compressed.
    compressed: bool,
    /// Size of the chunk before compression.
    uncompressed_size: u64,
}

// SAFETY: `ptr` (skip path only) addresses slot memory that stays live until
// the writer calls `release_one` after its pwrite. The reader's reclaim-all
// barrier guarantees that no job outlives its slot.
unsafe impl Send for WriteJob {}

/// Read from `r` until `buf` is full or the reader reports EOF.
///
/// Returns the number of bytes stored in `buf`. A value smaller than
/// `buf.len()` means EOF was reached first.
///
/// Like [`Read::read_exact`], a read that fails with
/// [`io::ErrorKind::Interrupted`] is retried rather than reported. Without
/// the retry, a signal arriving mid-read would truncate the file being
/// archived.
///
/// # Errors
/// Any other I/O error from `r` is returned unchanged. Bytes read before the
/// error remain in `buf` but are not reported.
fn read_fully<R: Read>(r: &mut R, buf: &mut [u8]) -> io::Result<usize> {
    let mut filled = 0;
    while filled < buf.len() {
        match r.read(&mut buf[filled..]) {
            Ok(0) => break, // EOF
            Ok(k) => filled += k,
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(filled)
}

/// Compress every regular file under `input_dir` into `<output>.znippy`.
/// The extension of `output` is replaced with `znippy`.
///
/// Three kinds of threads cooperate without a per-slot barrier:
/// * one reader packs file bytes into pool slots and queues slices;
/// * `CONFIG.max_core_in_flight` workers ("barrels") hash each slice with
///   BLAKE3 over the ORIGINAL bytes and compress it, unless the file is skipped;
/// * one writer appends payloads with positional writes and records one
///   [`BlobMeta`] per chunk.
///
/// After all threads are joined, this thread (the finalizer) writes:
/// * the arrow-ipc index;
/// * one manifest entry;
/// * `MULTI_INDEX_MAGIC`;
/// * the manifest offset as a little-endian `u64`.
///
/// It then syncs the file.
///
/// * `no_skip` — compress every file, even those `should_skip_compression`
///   flags for raw storage.
/// * `plugin` — optional registry. Each matching file is read once more for
///   metadata extraction, and the registry's schema fields extend the index
///   schema.
/// * `repo` — stored in the manifest entry (empty when `None`).
///
/// # Errors
/// Fails if any of the following happens:
/// * the output cannot be created or written;
/// * creating a compression context or compressing a slice fails;
/// * a pipeline thread panics;
/// * the index or manifest cannot be built.
///
/// Workers and the writer keep draining their queues after a failure. Every
/// pool slot is therefore still released, and the reader cannot block forever
/// waiting for one. The first error is reported after all threads are joined.
pub fn compress_dir(
    input_dir: &PathBuf,
    output: &PathBuf,
    no_skip: bool,
    plugin: Option<&znippy_common::plugin::PluginRegistry>,
    repo: Option<&str>,
) -> Result<CompressionReport> {
    // Collect entries whose file type is a regular file; directories are only
    // counted. Entries the walker fails to read are silently dropped.
    let mut total_dirs = 0u64;
    let all_files: Arc<Vec<PathBuf>> = Arc::new(
        WalkDir::new(input_dir)
            .into_iter()
            .filter_map(|e| e.ok())
            .filter_map(|e| {
                if e.file_type().is_dir() {
                    total_dirs += 1;
                    None
                } else if e.file_type().is_file() {
                    Some(e.into_path())
                } else {
                    None
                }
            })
            .collect(),
    );
    let total_files = all_files.len() as u64;

    // Plugin pre-pass — per-file metadata extraction, OUTSIDE the slice pipeline.
    // `ext_meta[i]` lines up with `all_files[i]` (and thus with `file_index`).
    let ext_meta: Vec<FileExtMeta> = if let Some(reg) = plugin {
        let type_id = reg.type_id().unwrap_or(0);
        all_files
            .iter()
            .map(|path| {
                let rel = path.strip_prefix(input_dir).unwrap_or(path).to_string_lossy();
                if !reg.matches(&rel) {
                    return None;
                }
                let data = std::fs::read(path).ok()?;
                reg.extract(&rel, &data).map(|row| (type_id, row))
            })
            .collect()
    } else {
        vec![None; all_files.len()]
    };
    let ext_fields: Vec<znippy_common::arrow::datatypes::Field> =
        plugin.map(|r| r.schema_fields()).unwrap_or_default();

    let output_path = output.with_extension("znippy");
    let file = Arc::new(File::create(&output_path)?);
    let out_cursor = Arc::new(AtomicU64::new(0)); // blob region starts at 0

    let num_workers = CONFIG.max_core_in_flight.max(1);
    let pool = Magazine::new(NUM_SLOTS, SLOT_SIZE, num_workers);
    let slice_size = pool.slice_size();
    let returner = pool.returner();

    // The current (reader→barrels) and the write belt (barrels→writer).
    let (tx_slice, rx_slice) = bounded(NUM_SLOTS * 4);
    let (tx_write, rx_write) = bounded::<WriteJob>(num_workers * 4);
    // Recycled compressed-output buffers. The writer returns each one after its
    // pwrite, so the compress path never allocates per round.
    let (free_bufs_tx, free_bufs_rx) = bounded::<Vec<u8>>(num_workers * 4);
    for _ in 0..num_workers * 4 {
        free_bufs_tx.send(Vec::new()).ok();
    }

    // ── READER: the single source ───────────────────────────────────────────
    let reader_thread = {
        let all_files = Arc::clone(&all_files);
        thread::spawn(move || -> (u64, u64, u64, u64) {
            let mut uncompressed_files = 0u64;
            let mut uncompressed_bytes = 0u64;
            let mut compressed_files = 0u64;
            let mut compressed_bytes = 0u64;

            let mut cur = None;

            for (file_index, path) in all_files.iter().enumerate() {
                let skip = !no_skip && should_skip_compression(path);
                let file_size = path.metadata().map(|m| m.len()).unwrap_or(0);
                if skip {
                    uncompressed_files += 1;
                    uncompressed_bytes += file_size;
                } else {
                    compressed_files += 1;
                    compressed_bytes += file_size;
                }

                let f = match File::open(path) {
                    Ok(f) => f,
                    Err(e) => {
                        log::warn!("[reader] open {} failed: {}", path.display(), e);
                        continue;
                    }
                };
                let mut rdr = BufReader::new(f);
                let mut fdata_offset = 0u64;
                let mut chunk_seq = 0u32;

                // Empty file → one zero-length slice so it appears in the index.
                if file_size == 0 {
                    ensure_room(&pool, &tx_slice, &mut cur, 0);
                    let fill = cur.as_mut().unwrap();
                    fill.commit_slice(0, skip, file_index as u64, 0, 0);
                    continue;
                }

                let mut remaining = file_size;
                // A small file (≤ slice_size) is NEVER split: read it whole as one slice.
                // A big file is cut into slice_size logs (which may spill across slots).
                let small = file_size <= slice_size as u64;
                while remaining > 0 {
                    let want = if small {
                        file_size as usize
                    } else {
                        slice_size.min(remaining as usize)
                    };
                    ensure_room(&pool, &tx_slice, &mut cur, want);
                    let fill = cur.as_mut().unwrap();
                    let buf = fill.writable(want);
                    let got = match read_fully(&mut rdr, buf) {
                        Ok(g) => g,
                        Err(e) => {
                            log::warn!("[reader] read {} failed: {}", path.display(), e);
                            break;
                        }
                    };
                    if got == 0 {
                        break;
                    }
                    fill.commit_slice(got, skip, file_index as u64, fdata_offset, chunk_seq);
                    fdata_offset += got as u64;
                    chunk_seq += 1;
                    remaining = remaining.saturating_sub(got as u64);
                    if got < want {
                        break; // short read = EOF
                    }
                }
            }

            // Publish the last partial slot.
            if let Some(fill) = cur.take() {
                for s in fill.publish() {
                    tx_slice.send(s).ok();
                }
            }

            // Wait for the pipeline to fully drain. Reclaiming every slot proves all
            // slices were processed AND released, so the pool memory is safe to drop.
            // This relies on barrels and the writer releasing every slot, even after
            // a failure.
            for _ in 0..pool.num_slots() {
                if pool.claim().is_none() {
                    break;
                }
            }

            // Dropping tx_slice here lets the workers exit once the queue empties.
            drop(tx_slice);
            drop(pool);

            (uncompressed_files, uncompressed_bytes, compressed_files, compressed_bytes)
        })
    };

    // ── BARRELS: compress (or skip) and toss to the writer — never wait on I/O ─
    let mut workers = Vec::with_capacity(num_workers);
    for _ in 0..num_workers {
        let rx_slice = rx_slice.clone();
        let tx_write = tx_write.clone();
        let free_bufs_rx = free_bufs_rx.clone();
        let returner = returner.clone();
        let level = CONFIG.compression_level;
        workers.push(thread::spawn(move || -> Result<()> {
            // After its first failure this worker stops compressing but keeps
            // draining the queue and releasing each slice's slot. Otherwise the
            // reader would block forever in `claim()` / its drain barrier.
            let mut first_err: Option<anyhow::Error> = None;
            let mut cctx = match znippy_common::codec::CompressCtx::new(level) {
                Ok(ctx) => Some(ctx),
                Err(e) => {
                    first_err = Some(e.into());
                    None
                }
            };
            while let Ok(slice) = rx_slice.recv() {
                let ctx = match cctx.as_mut() {
                    Some(ctx) if first_err.is_none() => ctx,
                    _ => {
                        returner.release_one(slice.slot_id);
                        continue;
                    }
                };
                // SAFETY: the slot behind `slice` stays live until `release_one` is
                // called for it. On the compress path that happens below. On the
                // skip path the writer does it, or this worker if the writer is gone.
                let src = unsafe { slice.as_slice() };
                let checksum = *blake3::hash(src).as_bytes(); // ORIGINAL bytes, pre-compression
                if slice.skip {
                    // Hand the slot bytes to the writer; it frees the slot after pwrite.
                    let job = WriteJob {
                        buf: None,
                        ptr: src.as_ptr(),
                        on_disk_len: src.len(),
                        slot_id: slice.slot_id,
                        release_slot: true,
                        file_index: slice.file_index,
                        fdata_offset: slice.fdata_offset,
                        chunk_seq: slice.chunk_seq,
                        checksum,
                        compressed: false,
                        uncompressed_size: src.len() as u64,
                    };
                    if tx_write.send(job).is_err() {
                        // The writer is gone, so nobody else will free this slot.
                        returner.release_one(slice.slot_id);
                    }
                } else {
                    let mut buf = free_bufs_rx.recv().unwrap_or_default();
                    let result = ctx.compress_into(src, &mut buf);
                    let uncompressed_size = src.len() as u64;
                    returner.release_one(slice.slot_id); // input consumed → free slot now
                    let n = match result {
                        Ok(n) => n,
                        Err(e) => {
                            first_err = Some(e.into());
                            continue;
                        }
                    };
                    tx_write
                        .send(WriteJob {
                            buf: Some(buf),
                            ptr: std::ptr::null(),
                            on_disk_len: n,
                            slot_id: 0,
                            release_slot: false,
                            file_index: slice.file_index,
                            fdata_offset: slice.fdata_offset,
                            chunk_seq: slice.chunk_seq,
                            checksum,
                            compressed: true,
                            uncompressed_size,
                        })
                        .ok();
                }
            }
            first_err.map_or(Ok(()), Err)
        }));
    }

    // ── WRITER: one worker that only fires bytes at the disk ────────────────────
    let writer_thread = {
        let file = Arc::clone(&file);
        let out_cursor = Arc::clone(&out_cursor);
        let returner = returner.clone();
        thread::spawn(move || -> Result<Vec<BlobMeta>> {
            let mut all_blobs: Vec<BlobMeta> = Vec::new();
            // After a failed write, stop writing but keep draining the belt.
            // Buffers are still recycled and skip-path slots still released, so
            // neither the barrels nor the reader block on resources that would
            // never come back. The first error is returned once the belt closes.
            let mut first_err: Option<io::Error> = None;
            while let Ok(job) = rx_write.recv() {
                let off = out_cursor.fetch_add(job.on_disk_len as u64, Ordering::Relaxed);
                let healthy = first_err.is_none();
                let written = match job.buf {
                    Some(mut buf) => {
                        let res = if healthy {
                            file.write_all_at(&buf[..job.on_disk_len], off)
                        } else {
                            Ok(())
                        };
                        buf.clear();
                        free_bufs_tx.send(buf).ok(); // recycle for a barrel
                        res
                    }
                    None => {
                        let res = if healthy {
                            // SAFETY: the slot stays live until we release it just below.
                            let bytes =
                                unsafe { std::slice::from_raw_parts(job.ptr, job.on_disk_len) };
                            file.write_all_at(bytes, off)
                        } else {
                            Ok(())
                        };
                        if job.release_slot {
                            returner.release_one(job.slot_id);
                        }
                        res
                    }
                };
                if let Err(e) = written {
                    first_err = Some(e);
                }
                if first_err.is_none() {
                    all_blobs.push(BlobMeta {
                        chunk_meta: ChunkMeta {
                            fdata_offset: job.fdata_offset,
                            file_index: job.file_index,
                            chunk_seq: job.chunk_seq,
                            checksum: job.checksum,
                            compressed: job.compressed,
                            uncompressed_size: job.uncompressed_size,
                            compressed_size: job.on_disk_len as u64,
                        },
                        blob_offset: off,
                        blob_size: job.on_disk_len as u64,
                    });
                }
            }
            match first_err {
                Some(e) => Err(e.into()),
                None => Ok(all_blobs),
            }
        })
    };

    // Drop main's handles so the belts close when reader/barrels/writer finish.
    // (tx_slice moved into the reader; free_bufs_tx moved into the writer.)
    drop(tx_write);
    drop(rx_slice);
    drop(free_bufs_rx);

    // ── FINALIZER (main): join everything, then build the index from metadata ───
    // Join every thread before propagating any failure, so that no pipeline
    // thread is still running (and writing to `file`) after this function returns.
    let reader_result = reader_thread.join().map_err(|_| anyhow!("reader panicked"));
    let worker_results: Vec<Result<()>> = workers
        .into_iter()
        .map(|w| w.join().map_err(|_| anyhow!("barrel panicked")).and_then(|r| r))
        .collect();
    let writer_result = writer_thread
        .join()
        .map_err(|_| anyhow!("writer panicked"))
        .and_then(|r| r);

    let (uncompressed_files, uncompressed_bytes, compressed_files, compressed_bytes) =
        reader_result?;
    for result in worker_results {
        result?;
    }
    let mut all_blobs = writer_result?;

    // Stable index order (out-of-order completion is fine on disk; index is sorted).
    all_blobs.sort_by_key(|b| (b.chunk_meta.file_index, b.chunk_meta.chunk_seq));

    let total_chunks = all_blobs.len() as u64;
    let index_offset = out_cursor.load(Ordering::Relaxed); // end of blob region
    let blob_bytes = index_offset;

    let input_dir_for_paths = input_dir.clone();
    let all_files_for_paths = Arc::clone(&all_files);
    let path_resolver = |file_index: u64| {
        let idx = file_index as usize;
        all_files_for_paths[idx]
            .strip_prefix(&input_dir_for_paths)
            .unwrap_or(&all_files_for_paths[idx])
            .to_string_lossy()
            .to_string()
    };

    use arrow::ipc::writer::StreamWriter;
    let row_count = all_blobs.len() as u64;
    let batch = build_metadata_batch(&all_blobs, path_resolver, &ext_meta, &ext_fields)
        .map_err(|e| anyhow!("build index batch: {e}"))?;
    let meta_map = build_arrow_metadata_for_config(&CONFIG);
    let composed = compose_index_schema(&ext_fields);
    let schema_with_meta =
        arrow::datatypes::Schema::new_with_metadata(composed.fields().to_vec(), meta_map);
    let mut sub_bytes: Vec<u8> = Vec::new();
    let mut sw = StreamWriter::try_new(&mut sub_bytes, &schema_with_meta)
        .map_err(|e| anyhow!("index writer: {e}"))?;
    sw.write(&batch).map_err(|e| anyhow!("index write: {e}"))?;
    sw.finish().map_err(|e| anyhow!("index finish: {e}"))?;

    let sub_len = sub_bytes.len() as u64;
    file.write_all_at(&sub_bytes, index_offset)?;

    let manifest_offset = index_offset + sub_len;
    let pkg_type_val: i8 = plugin.and_then(|r| r.type_id()).unwrap_or(0);
    let manifest_entries = vec![ManifestEntry {
        pkg_type: pkg_type_val,
        repo: repo.unwrap_or("").to_string(),
        module_name: String::new(),
        index_offset,
        index_len: sub_len,
        row_count,
    }];
    let manifest_bytes = write_manifest_bytes(&manifest_entries)
        .map_err(|e| anyhow!("manifest: {e}"))?;
    file.write_all_at(&manifest_bytes, manifest_offset)?;

    // Trailer: MULTI_INDEX_MAGIC followed by the manifest offset (LE u64).
    let after_manifest = manifest_offset + manifest_bytes.len() as u64;
    file.write_all_at(&MULTI_INDEX_MAGIC, after_manifest)?;
    file.write_all_at(
        &manifest_offset.to_le_bytes(),
        after_manifest + MULTI_INDEX_MAGIC.len() as u64,
    )?;
    file.sync_all()?;

    let total_bytes_out =
        after_manifest + MULTI_INDEX_MAGIC.len() as u64 + 8;

    log::info!(
        "[slot_packer] {} chunks, {} blob bytes, manifest at {}",
        total_chunks, blob_bytes, manifest_offset
    );

    Ok(CompressionReport {
        total_files,
        compressed_files,
        uncompressed_files,
        chunks: total_chunks,
        total_dirs,
        total_bytes_in: compressed_bytes + uncompressed_bytes,
        total_bytes_out,
        compressed_bytes,
        uncompressed_bytes,
        // Input bytes of the compressed (non-skipped) files per payload byte
        // written, as a percentage. It is 0.0 only when no payload was written,
        // so the guard is on the denominator, not on the skipped-file total.
        compression_ratio: if blob_bytes > 0 {
            (compressed_bytes as f32 / blob_bytes as f32) * 100.0
        } else {
            0.0
        },
    })
}

/// Make `cur` a claimed slot (`Clip`) with at least `need` bytes free.
///
/// A clip without enough room is published: each resulting round is queued
/// on `tx_slice`, and send failures are ignored. A fresh slot is then claimed.
/// If `pool.claim()` yields nothing (pool shut down), `cur` is left as `None`.
/// Callers keep `need` ≤ SLOT_SIZE, so a freshly claimed slot always fits it.
fn ensure_room<'p>(
    pool: &'p Magazine,
    tx_slice: &crossbeam_channel::Sender<znippy_common::slotpool::Round>,
    cur: &mut Option<znippy_common::slotpool::Clip<'p>>,
    need: usize,
) {
    loop {
        match cur.take() {
            // Nothing claimed yet (or the previous clip was just published).
            None => match pool.claim() {
                Some(fresh) => *cur = Some(fresh),
                None => return, // pool shut down
            },
            // Enough room: put the clip back and stop.
            Some(clip) if clip.remaining() >= need => {
                *cur = Some(clip);
                return;
            }
            // Too full: ship what it holds and go round for a fresh slot.
            Some(full) => {
                for round in full.publish() {
                    tx_slice.send(round).ok();
                }
            }
        }
    }
}