//! Checkpoint save operation (`chkpt_core/ops/save.rs`).
1use crate::config::{project_id_from_path, StoreLayout};
2use crate::error::{ChkpttError, Result};
3use crate::index::{FileEntry, FileIndex};
4use crate::ops::io_order::sort_scanned_refs_for_locality;
5use crate::ops::lock::ProjectLock;
6use crate::scanner::ScannedFile;
7use crate::store::blob::{hex_to_bytes, read_or_mmap, read_path_bytes};
8use crate::store::catalog::{BlobLocation, CatalogSnapshot, ManifestEntry, MetadataCatalog};
9use crate::store::pack::{PackFinishOptions, PackWriter};
10use crate::store::snapshot::{Snapshot, SnapshotStats};
11use crate::store::tree::{EntryType, TreeEntry, TreeStore};
12use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
13use std::path::Path;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::{mpsc, Arc, Mutex};
16
17use crate::ops::progress::{emit, ProgressCallback, ProgressEvent};
18
/// Caller-supplied options controlling a checkpoint save.
#[derive(Default)]
pub struct SaveOptions {
    /// Optional human-readable message stored with the snapshot.
    pub message: Option<String>,
    /// When true, dependency files are included in the workspace scan
    /// (forwarded to `scan_workspace_with_options`).
    pub include_deps: bool,
    /// Optional pack chunk-size override forwarded via `PackFinishOptions`.
    pub pack_chunk_bytes: Option<u64>,
    /// Callback that receives `ProgressEvent`s during the save.
    pub progress: ProgressCallback,
}
26
/// Outcome of a successful [`save`]: the new snapshot id plus its stats.
#[derive(Debug)]
pub struct SaveResult {
    /// Identifier of the snapshot that was created.
    pub snapshot_id: String,
    /// Aggregate statistics (total files, total bytes, new objects).
    pub stats: SnapshotStats,
}
32
/// Represents a file with its blob hash after processing.
struct ProcessedFile {
    // Workspace-relative path with '/' separators (used for tree grouping).
    relative_path: String,
    // 16-byte xxh3-128 hash of the file content (little-endian bytes).
    blob_hash_bytes: [u8; 16],
    // Content size in bytes.
    size: u64,
    // Unix mode bits as scanned.
    mode: u32,
    // File vs. symlink, as recorded in tree entries.
    entry_type: EntryType,
}
41
/// A file that has been read, hashed, and (possibly) compressed, ready to
/// be consumed by the save pipeline's single-threaded consumer.
struct PreparedFile {
    relative_path: String,
    blob_hash_bytes: [u8; 16],
    /// None if this hash was already seen (duplicate content — compression skipped)
    compressed: Option<Vec<u8>>,
    size: u64,
    mode: u32,
    // mtime split into seconds + nanos, mirrored into the FileIndex entry.
    mtime_secs: i64,
    mtime_nanos: i64,
    // Inode number when available (used for change detection in the index).
    inode: Option<u64>,
    entry_type: EntryType,
}
54
/// A blob newly added to the pack during this save; its catalog location
/// is recorded once the pack hash is known.
struct NewBlobRecord {
    blob_hash: [u8; 16],
    size: u64,
}
59
/// Identity of a file's underlying inode: (device, inode) uniquely
/// identifies hardlinked aliases of the same content.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct HardlinkKey {
    device: u64,
    inode: u64,
}
65
/// Cached result for a hardlink group: the blob hash computed when the
/// first alias was prepared, reused for subsequent aliases.
#[derive(Debug, Clone)]
struct HardlinkPrepared {
    blob_hash_bytes: [u8; 16],
}
70
/// Number of independent mutex-guarded shards in `ShardedSeenHashes`.
const SEEN_HASH_SHARDS: usize = 64;
/// Bound on in-flight `PreparedFile`s in the producer/consumer pipeline.
const PREPARED_FILE_PIPELINE_SLOTS: usize = 64;

/// Concurrent "seen this blob hash before?" set, sharded to reduce lock
/// contention while worker threads hash and compress files in parallel.
struct ShardedSeenHashes {
    shards: Vec<Mutex<HashSet<[u8; 16]>>>,
}

impl ShardedSeenHashes {
    /// Create a sharded set sized so the shards together hold
    /// `expected_entries` without rehashing.
    fn new(expected_entries: usize) -> Self {
        let shard_count = SEEN_HASH_SHARDS.max(1);
        let per_shard_capacity = expected_entries.div_ceil(shard_count).max(1);
        let mut shards = Vec::with_capacity(shard_count);
        for _ in 0..shard_count {
            shards.push(Mutex::new(HashSet::with_capacity(per_shard_capacity)));
        }
        Self { shards }
    }

    /// Insert `hash`, returning `true` iff it was not present before.
    ///
    /// Shard selection XORs the first two hash bytes so both contribute.
    /// (The previous `(hash[0] << 8 | hash[1]) % len` made `hash[0]` a
    /// no-op whenever `len` divides 256 — as it does for the default 64
    /// shards, since 256 % 64 == 0.)
    #[inline]
    fn insert(&self, hash: [u8; 16]) -> bool {
        let shard_index = (usize::from(hash[0]) ^ usize::from(hash[1])) % self.shards.len();
        let mut shard = self.shards[shard_index].lock().unwrap();
        shard.insert(hash)
    }
}
95
/// Save a checkpoint of the workspace.
///
/// This is the main orchestrating function that ties together scanning,
/// hashing, blob storage, tree building, and snapshot creation.
///
/// High-level flow:
/// 1. derive the project id / store layout and take the project lock;
/// 2. scan the workspace and diff against the cached `FileIndex` so only
///    changed files are re-read, hashed, and compressed (in parallel);
/// 3. append new blobs to a pack and record their locations in the catalog;
/// 4. build (or reuse) the content tree and persist the snapshot;
/// 5. apply path additions/removals back to the index.
///
/// # Errors
/// Propagates failures from locking, scanning, file I/O, pack writing,
/// tree building, and the metadata catalog.
pub fn save(workspace_root: &Path, options: SaveOptions) -> Result<SaveResult> {
    // 1. Compute project_id from workspace path
    let project_id = project_id_from_path(workspace_root);

    // 2. Create StoreLayout, ensure directories exist
    let layout = StoreLayout::new(&project_id);
    layout.ensure_dirs()?;

    // 3. Acquire project lock (held until `_lock` drops at end of scope)
    let _lock = ProjectLock::acquire(&layout.locks_dir())?;
    let catalog = MetadataCatalog::open(layout.catalog_path())?;

    // 4. Scan workspace (respect .chkptignore)
    let scanned_files =
        crate::scanner::scan_workspace_with_options(workspace_root, None, options.include_deps)?;
    emit(
        &options.progress,
        ProgressEvent::ScanComplete {
            file_count: scanned_files.len() as u64,
        },
    );

    // 5. Open/create FileIndex
    let mut index = FileIndex::open(layout.index_path())?;
    let cached_entries = index.entries();

    // 6. Create blob store
    let packs_dir = layout.packs_dir();
    // Load all known blob hashes from the catalog's blob_index table.
    // This relies on blob_index being authoritative (kept in sync by
    // bulk_upsert_blob_locations after each pack write). If the catalog
    // and pack store ever diverge (e.g. manual .dat deletion), a store
    // repair operation would be needed.
    let known_blob_hashes = catalog.all_blob_hashes()?;
    let mut pack_writer = PackWriter::new(&packs_dir)?;

    // 7. Process each scanned file: check index, hash, store blob
    let mut processed_files = Vec::with_capacity(scanned_files.len());
    let mut files_to_prepare = Vec::new();
    let mut updated_entries = Vec::new();
    let mut blob_locations_to_record = Vec::new();
    let mut new_blob_records = Vec::new();
    let mut total_bytes: u64 = 0;
    // Track live paths only when a previous index exists, so deletions can
    // be detected; on a first save there is nothing to diff against.
    let mut current_paths =
        (!cached_entries.is_empty()).then(|| HashSet::with_capacity(scanned_files.len()));

    // Partition into cache hits (metadata unchanged → reuse cached hash)
    // and files that must be re-read and hashed.
    for scanned in &scanned_files {
        if let Some(paths) = current_paths.as_mut() {
            paths.insert(scanned.relative_path.as_str());
        }

        if let Some(processed) =
            cached_processed_file(scanned, cached_entries.get(&scanned.relative_path))
        {
            total_bytes += processed.size;
            processed_files.push(processed);
        } else {
            files_to_prepare.push(scanned);
        }
    }
    // Paths present in the old index but absent from this scan were deleted.
    let removed_paths = current_paths
        .map(|paths| {
            cached_entries
                .keys()
                .filter(|path| !paths.contains(path.as_str()))
                .cloned()
                .collect::<Vec<_>>()
        })
        .unwrap_or_default();

    let mut new_objects: u64 = 0;

    // Sharded dedup set reduces lock contention during parallel hash+compress.
    let seen_hashes = Arc::new(ShardedSeenHashes::new(files_to_prepare.len()));

    // Prioritize on-disk locality to reduce random I/O during read+hash+compress.
    sort_scanned_refs_for_locality(&mut files_to_prepare);

    // Batch parallel: each thread processes a chunk independently, no channel overhead
    let total_to_process = files_to_prepare.len() as u64;
    emit(
        &options.progress,
        ProgressEvent::ProcessStart {
            total: total_to_process,
        },
    );
    let progress_counter = AtomicU64::new(0);
    // The closure below is the single-threaded consumer: it appends new
    // blobs to the pack and accumulates index/manifest records while the
    // worker threads keep reading and compressing ahead of it.
    process_prepared_files_streaming(
        &files_to_prepare,
        &seen_hashes,
        &options.progress,
        &progress_counter,
        total_to_process,
        |prepared| {
            let PreparedFile {
                relative_path,
                blob_hash_bytes,
                compressed,
                size,
                mode,
                mtime_secs,
                mtime_nanos,
                inode,
                entry_type,
            } = prepared;

            total_bytes += size;
            // `compressed` is Some only for the first occurrence of a hash
            // in this save; skip the pack write when the store already has it.
            if let Some(compressed) = compressed {
                let exists_in_store = known_blob_hashes.contains(&blob_hash_bytes);
                if !exists_in_store {
                    pack_writer.add_pre_compressed_bytes(blob_hash_bytes, compressed)?;
                    new_blob_records.push(NewBlobRecord {
                        blob_hash: blob_hash_bytes,
                        size,
                    });
                    new_objects += 1;
                }
            }

            updated_entries.push(FileEntry {
                path: relative_path.clone(),
                blob_hash: blob_hash_bytes,
                size,
                mtime_secs,
                mtime_nanos,
                inode,
                mode,
            });
            processed_files.push(ProcessedFile {
                relative_path,
                blob_hash_bytes,
                size,
                mode,
                entry_type,
            });
            Ok(())
        },
    )?;
    // Finalize the pack only if it received at least one blob.
    let new_pack_hash = if !pack_writer.is_empty() {
        Some(pack_writer.finish_with_options(PackFinishOptions {
            chunk_bytes: options.pack_chunk_bytes,
        })?)
    } else {
        None
    };
    // Now that the pack hash is known, record where every new blob lives.
    if let Some(pack_hash) = new_pack_hash {
        for blob in &new_blob_records {
            blob_locations_to_record.push((
                blob.blob_hash,
                BlobLocation {
                    pack_hash: Some(pack_hash.clone()),
                    size: blob.size,
                },
            ));
        }
    }
    catalog.bulk_upsert_blob_locations(&blob_locations_to_record)?;
    emit(&options.progress, ProgressEvent::PackComplete);

    // 8. Build tree bottom-up (skip if nothing changed)
    let latest_catalog_snapshot = catalog.latest_snapshot()?;

    let root_tree_hash = if new_objects == 0
        && removed_paths.is_empty()
        && updated_entries.is_empty()
    {
        if let Some(ref snapshot) = latest_catalog_snapshot {
            // Nothing changed — reuse previous tree hash (skip build entirely)
            root_tree_hash_for_snapshot(&catalog, snapshot, &TreeStore::new(layout.trees_dir()))?
        } else {
            // First-ever save of an all-cached workspace: no prior snapshot
            // to reuse, so build the tree from the processed files.
            let tree_store = TreeStore::new(layout.trees_dir());
            let hex = build_tree(&processed_files, &tree_store)?;
            hex_to_bytes(&hex)?
        }
    } else {
        let tree_store = TreeStore::new(layout.trees_dir());
        let hex = build_tree(&processed_files, &tree_store)?;
        hex_to_bytes(&hex)?
    };

    // 9. Find latest snapshot for parent_snapshot_id (already fetched above)
    let parent_snapshot_id = latest_catalog_snapshot
        .as_ref()
        .map(|snapshot| snapshot.id.clone());

    // 10. Create Snapshot
    let stats = SnapshotStats {
        total_files: scanned_files.len() as u64,
        total_bytes,
        new_objects,
    };

    let snapshot = Snapshot::new(
        options.message,
        root_tree_hash,
        parent_snapshot_id,
        stats.clone(),
    );

    let snapshot_id = snapshot.id.clone();
    let catalog_snapshot = CatalogSnapshot {
        id: snapshot.id.clone(),
        created_at: snapshot.created_at,
        message: snapshot.message.clone(),
        parent_snapshot_id: snapshot.parent_snapshot_id.clone(),
        manifest_snapshot_id: None,
        root_tree_hash: Some(root_tree_hash),
        stats: stats.clone(),
    };
    let no_manifest_changes =
        new_objects == 0 && removed_paths.is_empty() && updated_entries.is_empty();

    // 11. Save snapshot
    if no_manifest_changes {
        // Reuse the previous snapshot's manifest: point this snapshot at
        // the manifest owner (which may itself be a reference).
        let manifest_snapshot_id = latest_catalog_snapshot
            .as_ref()
            .map(|snapshot| {
                snapshot
                    .manifest_snapshot_id
                    .as_deref()
                    .unwrap_or(snapshot.id.as_str())
            })
            .unwrap_or(snapshot.id.as_str());
        catalog.insert_snapshot_metadata_only(&catalog_snapshot, manifest_snapshot_id)?;
    } else {
        let mut manifest: Vec<ManifestEntry> = processed_files
            .iter()
            .map(|processed| ManifestEntry {
                path: processed.relative_path.clone(),
                blob_hash: processed.blob_hash_bytes,
                size: processed.size,
                mode: processed.mode,
            })
            .collect();
        manifest.sort_unstable_by(|left, right| left.path.cmp(&right.path));
        catalog.insert_snapshot(&catalog_snapshot, &manifest)?;
    }

    // 12. Update only changed index entries and remove stale paths.
    index.apply_changes(&removed_paths, &updated_entries)?;

    // 13. Lock released automatically via drop

    // 14. Return SaveResult
    Ok(SaveResult { snapshot_id, stats })
}
346
347fn root_tree_hash_for_snapshot(
348    catalog: &MetadataCatalog,
349    snapshot: &CatalogSnapshot,
350    tree_store: &TreeStore,
351) -> Result<[u8; 16]> {
352    if let Some(root_tree_hash) = snapshot.root_tree_hash {
353        return Ok(root_tree_hash);
354    }
355
356    let manifest = catalog.snapshot_manifest(&snapshot.id)?;
357    if manifest.is_empty() && snapshot.stats.total_files > 0 {
358        return Err(ChkpttError::StoreCorrupted(format!(
359            "snapshot '{}' is missing both manifest entries and root_tree_hash",
360            snapshot.id
361        )));
362    }
363    let root_tree_hex = build_tree(
364        &manifest
365            .into_iter()
366            .map(|entry| ProcessedFile {
367                relative_path: entry.path,
368                blob_hash_bytes: entry.blob_hash,
369                size: entry.size,
370                mode: entry.mode,
371                entry_type: if entry.mode & 0o170000 == 0o120000 {
372                    EntryType::Symlink
373                } else {
374                    EntryType::File
375                },
376            })
377            .collect::<Vec<_>>(),
378        tree_store,
379    )?;
380    hex_to_bytes(&root_tree_hex)
381}
382
383fn cached_processed_file(
384    scanned: &ScannedFile,
385    cached: Option<&FileEntry>,
386) -> Option<ProcessedFile> {
387    if let Some(cached) = cached {
388        if cached.mtime_secs == scanned.mtime_secs
389            && cached.mtime_nanos == scanned.mtime_nanos
390            && cached.size == scanned.size
391            && cached.inode == scanned.inode
392            && cached.mode == scanned.mode
393        {
394            return Some(ProcessedFile {
395                relative_path: scanned.relative_path.clone(),
396                blob_hash_bytes: cached.blob_hash,
397                size: scanned.size,
398                mode: scanned.mode,
399                entry_type: if scanned.is_symlink {
400                    EntryType::Symlink
401                } else {
402                    EntryType::File
403                },
404            });
405        }
406    }
407    None
408}
409
410fn prepare_file(scanned: &ScannedFile, seen_hashes: &ShardedSeenHashes) -> Result<PreparedFile> {
411    let (blob_hash_bytes, compressed) = if scanned.is_symlink {
412        let content = read_path_bytes(&scanned.absolute_path, true)?;
413        let blob_hash_bytes = xxhash_rust::xxh3::xxh3_128(&content).to_le_bytes();
414        let is_new = seen_hashes.insert(blob_hash_bytes);
415        let compressed = if is_new {
416            Some(compress_with_worker_context(&content))
417        } else {
418            None
419        };
420        (blob_hash_bytes, compressed)
421    } else {
422        let content = read_or_mmap(&scanned.absolute_path)?;
423        let blob_hash_bytes = xxhash_rust::xxh3::xxh3_128(content.as_ref()).to_le_bytes();
424        let is_new = seen_hashes.insert(blob_hash_bytes);
425        let compressed = if is_new {
426            Some(compress_with_worker_context(content.as_ref()))
427        } else {
428            None
429        };
430        (blob_hash_bytes, compressed)
431    };
432
433    Ok(PreparedFile {
434        relative_path: scanned.relative_path.clone(),
435        blob_hash_bytes,
436        compressed,
437        size: scanned.size,
438        mode: scanned.mode,
439        mtime_secs: scanned.mtime_secs,
440        mtime_nanos: scanned.mtime_nanos,
441        inode: scanned.inode,
442        entry_type: if scanned.is_symlink {
443            EntryType::Symlink
444        } else {
445            EntryType::File
446        },
447    })
448}
449
450fn hardlink_key(scanned: &ScannedFile) -> Option<HardlinkKey> {
451    if scanned.is_symlink {
452        return None;
453    }
454
455    Some(HardlinkKey {
456        device: scanned.device?,
457        inode: scanned.inode?,
458    })
459}
460
461fn prepare_file_with_hardlink_cache(
462    scanned: &ScannedFile,
463    seen_hashes: &ShardedSeenHashes,
464    hardlinks: &mut HashMap<HardlinkKey, HardlinkPrepared>,
465) -> Result<PreparedFile> {
466    if let Some(key) = hardlink_key(scanned) {
467        if let Some(cached) = hardlinks.get(&key) {
468            return Ok(PreparedFile {
469                relative_path: scanned.relative_path.clone(),
470                blob_hash_bytes: cached.blob_hash_bytes,
471                compressed: None,
472                size: scanned.size,
473                mode: scanned.mode,
474                mtime_secs: scanned.mtime_secs,
475                mtime_nanos: scanned.mtime_nanos,
476                inode: scanned.inode,
477                entry_type: if scanned.is_symlink {
478                    EntryType::Symlink
479                } else {
480                    EntryType::File
481                },
482            });
483        }
484
485        let prepared = prepare_file(scanned, seen_hashes)?;
486        hardlinks.insert(
487            key,
488            HardlinkPrepared {
489                blob_hash_bytes: prepared.blob_hash_bytes,
490            },
491        );
492        return Ok(prepared);
493    }
494
495    prepare_file(scanned, seen_hashes)
496}
497
498fn split_scanned_refs_preserving_hardlinks<'a>(
499    scanned_files: &'a [&'a ScannedFile],
500    worker_count: usize,
501) -> Vec<&'a [&'a ScannedFile]> {
502    if scanned_files.is_empty() {
503        return Vec::new();
504    }
505
506    let target_chunk_size = scanned_files.len().div_ceil(worker_count.max(1));
507    let mut chunks = Vec::with_capacity(worker_count.max(1));
508    let mut start = 0usize;
509
510    while start < scanned_files.len() {
511        let mut end = (start + target_chunk_size).min(scanned_files.len());
512        while end < scanned_files.len()
513            && hardlink_key(scanned_files[end - 1]) == hardlink_key(scanned_files[end])
514            && hardlink_key(scanned_files[end]).is_some()
515        {
516            end += 1;
517        }
518        chunks.push(&scanned_files[start..end]);
519        start = end;
520    }
521
522    chunks
523}
524
525fn compress_with_worker_context(content: &[u8]) -> Vec<u8> {
526    use lz4_flex::frame::FrameEncoder;
527    let mut encoder = FrameEncoder::new(Vec::new());
528    std::io::Write::write_all(&mut encoder, content).unwrap();
529    encoder.finish().unwrap()
530}
531
/// Prepare files with a bounded producer/consumer pipeline so compressed blobs
/// can be consumed immediately instead of accumulating for the full save.
///
/// `on_prepared` runs on the calling thread; worker threads read, hash, and
/// compress files and send `Result<PreparedFile>` over a bounded channel.
/// The first error seen by the consumer (from a worker or from
/// `on_prepared` itself) aborts the pipeline and is returned after all
/// workers are joined. Results arrive in arbitrary cross-worker order.
fn process_prepared_files_streaming<F>(
    scanned_files: &[&ScannedFile],
    seen_hashes: &Arc<ShardedSeenHashes>,
    progress: &ProgressCallback,
    progress_counter: &AtomicU64,
    total_to_process: u64,
    mut on_prepared: F,
) -> Result<()>
where
    F: FnMut(PreparedFile) -> Result<()>,
{
    if scanned_files.is_empty() {
        return Ok(());
    }

    // Never spawn more workers than there are files to process.
    let worker_count = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1)
        .min(scanned_files.len());

    // Single-threaded fast path: no channel, no thread spawn.
    if worker_count <= 1 {
        let mut hardlinks = HashMap::new();
        for scanned in scanned_files {
            let prepared = prepare_file_with_hardlink_cache(scanned, seen_hashes, &mut hardlinks)?;
            let completed = progress_counter.fetch_add(1, Ordering::Relaxed) + 1;
            emit(
                progress,
                ProgressEvent::ProcessFile {
                    completed,
                    total: total_to_process,
                },
            );
            on_prepared(prepared)?;
        }
        return Ok(());
    }

    // Chunk boundaries never split a hardlink group, so each worker's
    // private hardlink cache sees every alias of a group.
    let chunks = split_scanned_refs_preserving_hardlinks(scanned_files, worker_count);

    std::thread::scope(|scope| -> Result<()> {
        // Bounded channel backpressures producers so compressed blobs don't
        // pile up faster than the consumer can write them to the pack.
        let (sender, receiver) = mpsc::sync_channel::<Result<PreparedFile>>(
            PREPARED_FILE_PIPELINE_SLOTS.max(worker_count),
        );

        let handles: Vec<_> = chunks
            .into_iter()
            .map(|chunk| {
                let sender = sender.clone();
                scope.spawn(move || {
                    // Per-worker cache; valid because hardlink groups are
                    // never split across chunks.
                    let mut hardlinks = HashMap::new();

                    for scanned in chunk {
                        let result =
                            prepare_file_with_hardlink_cache(scanned, seen_hashes, &mut hardlinks);
                        let completed = progress_counter.fetch_add(1, Ordering::Relaxed) + 1;
                        emit(
                            progress,
                            ProgressEvent::ProcessFile {
                                completed,
                                total: total_to_process,
                            },
                        );
                        // A send error means the consumer hung up (it hit an
                        // error and dropped the receiver) — stop producing.
                        if sender.send(result).is_err() {
                            return;
                        }
                    }
                })
            })
            .collect();
        // Drop the original sender so the channel closes once all workers finish.
        drop(sender);

        let mut consumer_result = Ok(());
        loop {
            match receiver.recv() {
                Ok(Ok(prepared)) => {
                    if let Err(error) = on_prepared(prepared) {
                        consumer_result = Err(error);
                        break;
                    }
                }
                Ok(Err(error)) => {
                    consumer_result = Err(error);
                    break;
                }
                // All senders dropped: every worker is done.
                Err(_) => break,
            }
        }
        // Dropping the receiver makes any remaining sends fail, unblocking workers.
        drop(receiver);

        // Join all workers; unwrap propagates a worker panic to the caller.
        for handle in handles {
            handle.join().unwrap();
        }

        consumer_result
    })
}
630
/// Build tree structure bottom-up from processed files.
///
/// Groups files by their directory path, creates tree entries for each file,
/// recursively builds subtrees for subdirectories, and returns the root tree hash.
///
/// Trees are encoded with `bitcode`, hashed with xxh3-128, deduplicated by
/// hash, and written to the tree store in a single pack write. The returned
/// string is the hex form of the root directory's tree hash.
fn build_tree(processed_files: &[ProcessedFile], tree_store: &TreeStore) -> Result<String> {
    // Group files by parent directory
    let mut dir_files: BTreeMap<String, Vec<&ProcessedFile>> = BTreeMap::new();
    let mut all_dirs: BTreeSet<String> = BTreeSet::new();
    let mut child_dirs: BTreeMap<String, Vec<String>> = BTreeMap::new();
    all_dirs.insert(String::new()); // root always exists

    for pf in processed_files {
        // Parent is everything before the last '/'; "" means the root dir.
        let parent = match pf.relative_path.rfind('/') {
            Some(pos) => &pf.relative_path[..pos],
            None => "",
        };
        dir_files.entry(parent.to_string()).or_default().push(pf);
        register_directory_hierarchy(parent, &mut all_dirs, &mut child_dirs);
    }

    // Sort directories bottom-up (deepest first) so every child's hash is
    // computed before its parent needs it.
    let mut dir_list: Vec<String> = all_dirs.into_iter().collect();
    dir_list.sort_unstable_by(|a, b| {
        let depth_a = if a.is_empty() {
            0
        } else {
            a.matches('/').count() + 1
        };
        let depth_b = if b.is_empty() {
            0
        } else {
            b.matches('/').count() + 1
        };
        depth_b.cmp(&depth_a).then_with(|| a.cmp(b))
    });

    // Phase 1: Compute all tree hashes and encoded data in memory
    let mut dir_hashes: BTreeMap<String, String> = BTreeMap::new();
    let mut pack_entries: Vec<(String, Vec<u8>)> = Vec::with_capacity(dir_list.len());
    // Identical subtrees encode to identical bytes; write each only once.
    let mut known_hashes: HashSet<[u8; 16]> = HashSet::with_capacity(dir_list.len());

    for dir in &dir_list {
        let file_count = dir_files.get(dir).map(|files| files.len()).unwrap_or(0);
        let child_count = child_dirs
            .get(dir)
            .map(|children| children.len())
            .unwrap_or(0);
        let mut entries: Vec<TreeEntry> = Vec::with_capacity(file_count + child_count);

        // File entries: name is the final path component.
        if let Some(files) = dir_files.get(dir) {
            for pf in files {
                let name = match pf.relative_path.rfind('/') {
                    Some(pos) => &pf.relative_path[pos + 1..],
                    None => &pf.relative_path,
                };
                entries.push(TreeEntry {
                    name: name.to_string(),
                    entry_type: pf.entry_type,
                    hash: pf.blob_hash_bytes,
                    size: pf.size,
                    mode: pf.mode,
                });
            }
        }

        // Subdirectory entries: hash must already exist (bottom-up order).
        if let Some(children) = child_dirs.get(dir) {
            for sub_dir in children {
                let sub_hash = dir_hashes.get(sub_dir).ok_or_else(|| {
                    ChkpttError::Other(format!("Missing tree hash for directory '{}'", sub_dir))
                })?;
                let sub_name = match sub_dir.rfind('/') {
                    Some(pos) => &sub_dir[pos + 1..],
                    None => sub_dir.as_str(),
                };
                entries.push(TreeEntry {
                    name: sub_name.to_string(),
                    entry_type: EntryType::Dir,
                    hash: hex_to_bytes(sub_hash)?,
                    size: 0,
                    // Directories get a fixed conventional mode.
                    mode: 0o040755,
                });
            }
        }

        // Sort by name so the encoding — and therefore the hash — is canonical.
        entries.sort_unstable_by(|a, b| a.name.cmp(&b.name));
        let encoded = bitcode::encode(&entries);
        let hash_bytes = xxhash_rust::xxh3::xxh3_128(&encoded).to_le_bytes();
        let hash_hex = crate::store::blob::bytes_to_hex(&hash_bytes);

        dir_hashes.insert(dir.clone(), hash_hex.clone());
        if known_hashes.insert(hash_bytes) {
            pack_entries.push((hash_hex, encoded));
        }
    }

    // Phase 2: Write all trees to a single pack file (1 write instead of 37K+)
    tree_store.write_pack(&pack_entries)?;

    dir_hashes
        .get("")
        .cloned()
        .ok_or_else(|| ChkpttError::Other("Failed to build root tree".into()))
}
734
/// Record `dir` and every ancestor in `all_dirs`, adding a parent→child
/// edge to `child_dirs` the first time each directory is seen.
///
/// The root ("") is never inserted into `all_dirs` here but does appear as
/// the parent of top-level directories in `child_dirs`.
fn register_directory_hierarchy(
    dir: &str,
    all_dirs: &mut BTreeSet<String>,
    child_dirs: &mut BTreeMap<String, Vec<String>>,
) {
    // Fast path: root, or a directory whose full chain is already recorded
    // (every insert below also records all ancestors).
    if dir.is_empty() || all_dirs.contains(dir) {
        return;
    }

    // Walk prefixes ending at each '/' plus the whole path: "a", "a/b", …
    let boundaries = dir
        .match_indices('/')
        .map(|(position, _)| position)
        .chain(std::iter::once(dir.len()));

    let mut parent = "";
    for end in boundaries {
        let current = &dir[..end];
        if all_dirs.insert(current.to_string()) {
            child_dirs
                .entry(parent.to_string())
                .or_default()
                .push(current.to_string());
        }
        parent = current;
    }
}
769
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use tempfile::TempDir;

    // hex_to_bytes decodes a 32-char hex string into 16 bytes.
    #[test]
    fn test_hex_to_bytes() {
        let hex = "a3b2c1d4e5f60718293a4b5c6d7e8f90";
        let bytes = hex_to_bytes(hex).unwrap();
        assert_eq!(bytes[0], 0xa3);
        assert_eq!(bytes[1], 0xb2);
        assert_eq!(bytes[15], 0x90);
    }

    // Inputs that are not exactly 32 hex chars must be rejected.
    #[test]
    fn test_hex_to_bytes_invalid_length() {
        assert!(hex_to_bytes("abc").is_err());
    }

    // LZ4 frame compression must round-trip losslessly through FrameDecoder.
    #[test]
    fn test_compress_with_worker_context_roundtrip() {
        let content = b"compression-context-roundtrip-data";

        let compressed = compress_with_worker_context(content);
        let decompressed = {
            use lz4_flex::frame::FrameDecoder;
            let mut decoder = FrameDecoder::new(&compressed[..]);
            let mut buf = Vec::new();
            std::io::Read::read_to_end(&mut decoder, &mut buf).unwrap();
            buf
        };

        assert_eq!(decompressed, content);
    }

    // insert() returns true only on the first occurrence of a hash.
    #[test]
    fn test_sharded_seen_hashes_dedups_single_thread() {
        let seen = ShardedSeenHashes::new(8);
        let hash = [7u8; 16];

        assert!(seen.insert(hash));
        assert!(!seen.insert(hash));
    }

    // Across 8 threads: the shared duplicate hash wins exactly once, and
    // each thread's unique hash wins once — 9 successful inserts in total.
    #[test]
    fn test_sharded_seen_hashes_dedups_multi_thread() {
        let seen = Arc::new(ShardedSeenHashes::new(1024));
        let unique_inserts = Arc::new(AtomicUsize::new(0));
        let duplicate_hash = [9u8; 16];

        std::thread::scope(|scope| {
            for i in 0..8u8 {
                let seen = Arc::clone(&seen);
                let unique_inserts = Arc::clone(&unique_inserts);
                scope.spawn(move || {
                    if seen.insert(duplicate_hash) {
                        unique_inserts.fetch_add(1, Ordering::Relaxed);
                    }
                    let mut unique_hash = [0u8; 16];
                    unique_hash[0] = i;
                    if seen.insert(unique_hash) {
                        unique_inserts.fetch_add(1, Ordering::Relaxed);
                    }
                });
            }
        });

        assert_eq!(unique_inserts.load(Ordering::Relaxed), 9);
    }

    // With 5 files and 2 workers, the target chunk is 3 — but the boundary
    // between d.txt/e.txt (same inode 3) must not be split, so the second
    // chunk keeps both.
    #[test]
    fn test_split_scanned_refs_preserves_hardlink_groups() {
        let f1 = scanned("a.txt", Some(1));
        let f2 = scanned("b.txt", Some(1));
        let f3 = scanned("c.txt", Some(2));
        let f4 = scanned("d.txt", Some(3));
        let f5 = scanned("e.txt", Some(3));
        let refs = vec![&f1, &f2, &f3, &f4, &f5];

        let chunks = split_scanned_refs_preserving_hardlinks(&refs, 2);
        let paths: Vec<Vec<&str>> = chunks
            .into_iter()
            .map(|chunk| {
                chunk
                    .iter()
                    .map(|file| file.relative_path.as_str())
                    .collect::<Vec<_>>()
            })
            .collect();

        assert_eq!(
            paths,
            vec![vec!["a.txt", "b.txt", "c.txt"], vec!["d.txt", "e.txt"]]
        );
    }

    // The second hardlink alias must reuse the first alias's hash and skip
    // compression entirely (compressed is None on the cache hit).
    #[cfg(unix)]
    #[test]
    fn test_prepare_file_with_hardlink_cache_reuses_existing_read() {
        let dir = TempDir::new().unwrap();
        let original = dir.path().join("original.txt");
        let alias = dir.path().join("alias.txt");
        fs::write(&original, "same-content").unwrap();
        fs::hard_link(&original, &alias).unwrap();

        let original_scanned = scanned_from_path("original.txt", &original);
        let alias_scanned = scanned_from_path("alias.txt", &alias);
        let seen_hashes = ShardedSeenHashes::new(2);
        let mut hardlinks = HashMap::new();

        let first =
            prepare_file_with_hardlink_cache(&original_scanned, &seen_hashes, &mut hardlinks)
                .unwrap();
        let second =
            prepare_file_with_hardlink_cache(&alias_scanned, &seen_hashes, &mut hardlinks).unwrap();

        assert!(first.compressed.is_some());
        assert!(second.compressed.is_none());
        assert_eq!(first.blob_hash_bytes, second.blob_hash_bytes);
    }

    // Test fixture: a synthetic ScannedFile with a fixed device and a
    // caller-chosen inode (for hardlink-grouping tests).
    fn scanned(relative_path: &str, inode: Option<u64>) -> ScannedFile {
        ScannedFile {
            relative_path: relative_path.to_string(),
            absolute_path: std::path::PathBuf::from(relative_path),
            size: 1,
            mtime_secs: 1,
            mtime_nanos: 1,
            device: Some(1),
            inode,
            mode: 0o100644,
            is_symlink: false,
        }
    }

    // Test fixture: build a ScannedFile from real filesystem metadata
    // (Unix-only; relies on MetadataExt for dev/ino/mode).
    #[cfg(unix)]
    fn scanned_from_path(relative_path: &str, path: &Path) -> ScannedFile {
        use std::os::unix::fs::MetadataExt;

        let metadata = fs::metadata(path).unwrap();
        ScannedFile {
            relative_path: relative_path.to_string(),
            absolute_path: path.to_path_buf(),
            size: metadata.len(),
            mtime_secs: metadata.mtime(),
            mtime_nanos: metadata.mtime_nsec(),
            device: Some(metadata.dev()),
            inode: Some(metadata.ino()),
            mode: metadata.mode(),
            is_symlink: metadata.file_type().is_symlink(),
        }
    }
}