// chkpt_core/ops/save.rs
1use crate::config::{project_id_from_path, StoreLayout};
2use crate::error::{ChkpttError, Result};
3use crate::index::{FileEntry, FileIndex};
4use crate::ops::io_order::sort_scanned_refs_for_locality;
5use crate::ops::lock::ProjectLock;
6use crate::scanner::ScannedFile;
7use crate::store::blob::{hex_to_bytes, read_or_mmap, read_path_bytes};
8use crate::store::catalog::{BlobLocation, CatalogSnapshot, ManifestEntry, MetadataCatalog};
9use crate::store::pack::PackWriter;
10use crate::store::snapshot::{Snapshot, SnapshotStats};
11use crate::store::tree::{EntryType, TreeEntry, TreeStore};
12use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
13use std::path::Path;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::{mpsc, Arc, Mutex};
16
17use crate::ops::progress::{emit, ProgressCallback, ProgressEvent};
18
/// Caller-supplied options for [`save`].
#[derive(Default)]
pub struct SaveOptions {
    /// Optional human-readable message stored with the snapshot.
    pub message: Option<String>,
    /// Forwarded to the workspace scanner; presumably opts dependency
    /// directories into the scan — confirm against `scan_workspace_with_options`.
    pub include_deps: bool,
    /// Progress callback invoked via `emit` at scan/process/pack milestones.
    pub progress: ProgressCallback,
}
25
/// Outcome of a successful [`save`]: the created snapshot's id and stats.
#[derive(Debug)]
pub struct SaveResult {
    /// Identifier of the snapshot that was created.
    pub snapshot_id: String,
    /// Aggregate statistics (total files, total bytes, new objects).
    pub stats: SnapshotStats,
}
31
/// Represents a file with its blob hash after processing.
///
/// This is the minimal view needed to build trees and manifest entries.
struct ProcessedFile {
    /// Workspace-relative path, '/'-separated (see the `rfind('/')` splits in `build_tree`).
    relative_path: String,
    /// 128-bit xxh3 content hash, little-endian byte order.
    blob_hash_bytes: [u8; 16],
    /// File size in bytes.
    size: u64,
    /// Unix mode bits.
    mode: u32,
    /// File vs. symlink classification for the tree entry.
    entry_type: EntryType,
}
40
/// A scanned file after read + hash (+ optional compression), ready for the
/// save pipeline: blob write, index update, and tree build.
struct PreparedFile {
    /// Workspace-relative path.
    relative_path: String,
    /// 128-bit xxh3 content hash, little-endian byte order.
    blob_hash_bytes: [u8; 16],
    /// None if this hash was already seen (duplicate content — compression skipped)
    compressed: Option<Vec<u8>>,
    /// File size in bytes (uncompressed).
    size: u64,
    /// Unix mode bits.
    mode: u32,
    /// Modification time, seconds part — part of the change-detection key.
    mtime_secs: i64,
    /// Modification time, nanoseconds part — part of the change-detection key.
    mtime_nanos: i64,
    /// Inode number when available — part of the change-detection key.
    inode: Option<u64>,
    /// File vs. symlink classification for the tree entry.
    entry_type: EntryType,
}
53
/// A blob newly written to the current pack; its catalog location is recorded
/// once the pack is finalized and the pack hash is known.
struct NewBlobRecord {
    /// Content hash of the blob.
    blob_hash: [u8; 16],
    /// Uncompressed size in bytes.
    size: u64,
}
58
/// On-disk identity of a file: (device, inode). Two paths with the same key
/// are hardlinks to the same underlying content.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct HardlinkKey {
    /// Device id the file lives on.
    device: u64,
    /// Inode number on that device.
    inode: u64,
}
64
/// Cached result for an already-prepared hardlink group: later paths with the
/// same [`HardlinkKey`] reuse this hash instead of re-reading the file.
#[derive(Debug, Clone)]
struct HardlinkPrepared {
    /// Content hash computed when the first path in the group was read.
    blob_hash_bytes: [u8; 16],
}
69
/// Number of independently locked shards in [`ShardedSeenHashes`].
const SEEN_HASH_SHARDS: usize = 64;
/// Bounded-channel capacity for the prepare pipeline (see
/// `process_prepared_files_streaming`).
const PREPARED_FILE_PIPELINE_SLOTS: usize = 64;

/// Set of blob hashes already seen during this save, split across
/// independently locked shards to reduce contention between worker threads.
struct ShardedSeenHashes {
    shards: Vec<Mutex<HashSet<[u8; 16]>>>,
}

impl ShardedSeenHashes {
    /// Create the shard set, sizing each shard so `expected_entries` spread
    /// across all shards without rehashing.
    fn new(expected_entries: usize) -> Self {
        let shard_count = SEEN_HASH_SHARDS.max(1);
        let per_shard_capacity = expected_entries.div_ceil(shard_count).max(1);
        let mut shards = Vec::with_capacity(shard_count);
        for _ in 0..shard_count {
            shards.push(Mutex::new(HashSet::with_capacity(per_shard_capacity)));
        }
        Self { shards }
    }

    /// Record `hash`; returns `true` iff it had not been seen before.
    #[inline]
    fn insert(&self, hash: [u8; 16]) -> bool {
        // Mix both leading bytes so each contributes to shard selection.
        // The previous `((hash[0] << 8) | hash[1]) % len` reduced to
        // `hash[1] % len`: with 64 shards the shifted byte vanished because
        // 256 is a multiple of the shard count.
        let shard_index = (usize::from(hash[0]) ^ usize::from(hash[1])) % self.shards.len();
        let mut shard = self.shards[shard_index].lock().unwrap();
        shard.insert(hash)
    }
}
94
/// Save a checkpoint of the workspace.
///
/// This is the main orchestrating function that ties together scanning,
/// hashing, blob storage, tree building, and snapshot creation. The numbered
/// comments below mark the pipeline stages; the project lock is held for the
/// whole operation and released on drop.
///
/// Returns the new snapshot id and its aggregate stats, or the first error
/// from scanning, I/O, or catalog access.
pub fn save(workspace_root: &Path, options: SaveOptions) -> Result<SaveResult> {
    // 1. Compute project_id from workspace path
    let project_id = project_id_from_path(workspace_root);

    // 2. Create StoreLayout, ensure directories exist
    let layout = StoreLayout::new(&project_id);
    layout.ensure_dirs()?;

    // 3. Acquire project lock
    let _lock = ProjectLock::acquire(&layout.locks_dir())?;
    let catalog = MetadataCatalog::open(layout.catalog_path())?;

    // 4. Scan workspace (respect .chkptignore)
    let scanned_files =
        crate::scanner::scan_workspace_with_options(workspace_root, None, options.include_deps)?;
    emit(
        &options.progress,
        ProgressEvent::ScanComplete {
            file_count: scanned_files.len() as u64,
        },
    );

    // 5. Open/create FileIndex
    let mut index = FileIndex::open(layout.index_path())?;
    let cached_entries = index.entries();

    // 6. Create blob store
    let packs_dir = layout.packs_dir();
    // Load all known blob hashes from the catalog's blob_index table.
    // This relies on blob_index being authoritative (kept in sync by
    // bulk_upsert_blob_locations after each pack write). If the catalog
    // and pack store ever diverge (e.g. manual .dat deletion), a store
    // repair operation would be needed.
    let known_blob_hashes = catalog.all_blob_hashes()?;
    let mut pack_writer = PackWriter::new(&packs_dir)?;

    // 7. Process each scanned file: check index, hash, store blob
    let mut processed_files = Vec::with_capacity(scanned_files.len());
    let mut files_to_prepare = Vec::new();
    let mut updated_entries = Vec::new();
    let mut blob_locations_to_record = Vec::new();
    let mut new_blob_records = Vec::new();
    let mut total_bytes: u64 = 0;
    // Track the set of scanned paths only when the index already has entries;
    // it is needed solely to detect deletions against the previous index.
    let mut current_paths =
        (!cached_entries.is_empty()).then(|| HashSet::with_capacity(scanned_files.len()));

    for scanned in &scanned_files {
        if let Some(paths) = current_paths.as_mut() {
            paths.insert(scanned.relative_path.as_str());
        }

        // Unchanged files (metadata match against the index) skip the
        // read+hash+compress pipeline entirely.
        if let Some(processed) =
            cached_processed_file(scanned, cached_entries.get(&scanned.relative_path))
        {
            total_bytes += processed.size;
            processed_files.push(processed);
        } else {
            files_to_prepare.push(scanned);
        }
    }
    // Paths present in the previous index but absent from this scan were deleted.
    let removed_paths = current_paths
        .map(|paths| {
            cached_entries
                .keys()
                .filter(|path| !paths.contains(path.as_str()))
                .cloned()
                .collect::<Vec<_>>()
        })
        .unwrap_or_default();

    let mut new_objects: u64 = 0;

    // Sharded dedup set reduces lock contention during parallel hash+compress.
    let seen_hashes = Arc::new(ShardedSeenHashes::new(files_to_prepare.len()));

    // Prioritize on-disk locality to reduce random I/O during read+hash+compress.
    sort_scanned_refs_for_locality(&mut files_to_prepare);

    // Parallel prepare: workers read+hash+compress chunks and stream results
    // back over a bounded channel; this closure consumes them as they arrive.
    let total_to_process = files_to_prepare.len() as u64;
    emit(
        &options.progress,
        ProgressEvent::ProcessStart {
            total: total_to_process,
        },
    );
    let progress_counter = AtomicU64::new(0);
    process_prepared_files_streaming(
        &files_to_prepare,
        &seen_hashes,
        &options.progress,
        &progress_counter,
        total_to_process,
        |prepared| {
            let PreparedFile {
                relative_path,
                blob_hash_bytes,
                compressed,
                size,
                mode,
                mtime_secs,
                mtime_nanos,
                inode,
                entry_type,
            } = prepared;

            total_bytes += size;
            // `compressed` is Some only for the first occurrence of this
            // content hash in this save (see ShardedSeenHashes).
            if let Some(compressed) = compressed {
                // Skip blobs the store already holds from earlier snapshots.
                let exists_in_store = known_blob_hashes.contains(&blob_hash_bytes);
                if !exists_in_store {
                    pack_writer.add_pre_compressed_bytes(blob_hash_bytes, compressed)?;
                    new_blob_records.push(NewBlobRecord {
                        blob_hash: blob_hash_bytes,
                        size,
                    });
                    new_objects += 1;
                }
            }

            updated_entries.push(FileEntry {
                path: relative_path.clone(),
                blob_hash: blob_hash_bytes,
                size,
                mtime_secs,
                mtime_nanos,
                inode,
                mode,
            });
            processed_files.push(ProcessedFile {
                relative_path,
                blob_hash_bytes,
                size,
                mode,
                entry_type,
            });
            Ok(())
        },
    )?;
    // Finalize the pack (if any blobs were written) so its hash is known,
    // then record every new blob's location under that pack in the catalog.
    let new_pack_hash = if !pack_writer.is_empty() {
        Some(pack_writer.finish()?)
    } else {
        None
    };
    if let Some(pack_hash) = new_pack_hash {
        for blob in &new_blob_records {
            blob_locations_to_record.push((
                blob.blob_hash,
                BlobLocation {
                    pack_hash: Some(pack_hash.clone()),
                    size: blob.size,
                },
            ));
        }
    }
    catalog.bulk_upsert_blob_locations(&blob_locations_to_record)?;
    emit(&options.progress, ProgressEvent::PackComplete);

    // 8. Build tree bottom-up (skip if nothing changed)
    let latest_catalog_snapshot = catalog.latest_snapshot()?;

    let root_tree_hash = if new_objects == 0
        && removed_paths.is_empty()
        && updated_entries.is_empty()
    {
        if let Some(ref snapshot) = latest_catalog_snapshot {
            // Nothing changed — reuse previous tree hash (skip build entirely)
            root_tree_hash_for_snapshot(&catalog, snapshot, &TreeStore::new(layout.trees_dir()))?
        } else {
            // First snapshot of an unchanged-looking workspace: still must build.
            let tree_store = TreeStore::new(layout.trees_dir());
            let hex = build_tree(&processed_files, &tree_store)?;
            hex_to_bytes(&hex)?
        }
    } else {
        let tree_store = TreeStore::new(layout.trees_dir());
        let hex = build_tree(&processed_files, &tree_store)?;
        hex_to_bytes(&hex)?
    };

    // 9. Find latest snapshot for parent_snapshot_id (already fetched above)
    let parent_snapshot_id = latest_catalog_snapshot
        .as_ref()
        .map(|snapshot| snapshot.id.clone());

    // 10. Create Snapshot
    let stats = SnapshotStats {
        total_files: scanned_files.len() as u64,
        total_bytes,
        new_objects,
    };

    let snapshot = Snapshot::new(
        options.message,
        root_tree_hash,
        parent_snapshot_id,
        stats.clone(),
    );

    let snapshot_id = snapshot.id.clone();
    let catalog_snapshot = CatalogSnapshot {
        id: snapshot.id.clone(),
        created_at: snapshot.created_at,
        message: snapshot.message.clone(),
        parent_snapshot_id: snapshot.parent_snapshot_id.clone(),
        manifest_snapshot_id: None,
        root_tree_hash: Some(root_tree_hash),
        stats: stats.clone(),
    };
    let no_manifest_changes =
        new_objects == 0 && removed_paths.is_empty() && updated_entries.is_empty();

    // 11. Save snapshot
    if no_manifest_changes {
        // Reuse the previous snapshot's manifest (or its own manifest source)
        // instead of writing an identical copy.
        let manifest_snapshot_id = latest_catalog_snapshot
            .as_ref()
            .map(|snapshot| {
                snapshot
                    .manifest_snapshot_id
                    .as_deref()
                    .unwrap_or(snapshot.id.as_str())
            })
            .unwrap_or(snapshot.id.as_str());
        catalog.insert_snapshot_metadata_only(&catalog_snapshot, manifest_snapshot_id)?;
    } else {
        let mut manifest: Vec<ManifestEntry> = processed_files
            .iter()
            .map(|processed| ManifestEntry {
                path: processed.relative_path.clone(),
                blob_hash: processed.blob_hash_bytes,
                size: processed.size,
                mode: processed.mode,
            })
            .collect();
        manifest.sort_unstable_by(|left, right| left.path.cmp(&right.path));
        catalog.insert_snapshot(&catalog_snapshot, &manifest)?;
    }

    // 12. Update only changed index entries and remove stale paths.
    index.apply_changes(&removed_paths, &updated_entries)?;

    // 13. Lock released automatically via drop

    // 14. Return SaveResult
    Ok(SaveResult { snapshot_id, stats })
}
343
344#[cfg_attr(not(test), allow(dead_code))]
345fn store_has_external_objects(objects_dir: &Path, packs_dir: &Path) -> Result<bool> {
346    let _ = objects_dir;
347    Ok(store_has_pack_objects(packs_dir)?)
348}
349
350fn store_has_pack_objects(packs_dir: &Path) -> Result<bool> {
351    let entries = match std::fs::read_dir(packs_dir) {
352        Ok(entries) => entries,
353        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(false),
354        Err(error) => return Err(error.into()),
355    };
356
357    for entry in entries {
358        let entry = entry?;
359        if !entry.file_type()?.is_file() {
360            continue;
361        }
362
363        let name = entry.file_name();
364        let name = name.to_string_lossy();
365        if name.starts_with("pack-") && name.ends_with(".dat") {
366            return Ok(true);
367        }
368    }
369
370    Ok(false)
371}
372
/// Resolve the root tree hash for an existing snapshot.
///
/// Prefers the `root_tree_hash` stored on the snapshot. For snapshots without
/// one (presumably written by an older format — confirm against catalog
/// history), the tree is rebuilt from the snapshot's manifest entries.
///
/// Returns `ChkpttError::StoreCorrupted` when a snapshot that claims files
/// has neither a root tree hash nor manifest entries to rebuild one from.
fn root_tree_hash_for_snapshot(
    catalog: &MetadataCatalog,
    snapshot: &CatalogSnapshot,
    tree_store: &TreeStore,
) -> Result<[u8; 16]> {
    if let Some(root_tree_hash) = snapshot.root_tree_hash {
        return Ok(root_tree_hash);
    }

    let manifest = catalog.snapshot_manifest(&snapshot.id)?;
    if manifest.is_empty() && snapshot.stats.total_files > 0 {
        return Err(ChkpttError::StoreCorrupted(format!(
            "snapshot '{}' is missing both manifest entries and root_tree_hash",
            snapshot.id
        )));
    }
    // Rebuild the tree from manifest entries. The mode's file-type bits
    // (mask 0o170000) distinguish symlinks (S_IFLNK = 0o120000) from files.
    let root_tree_hex = build_tree(
        &manifest
            .into_iter()
            .map(|entry| ProcessedFile {
                relative_path: entry.path,
                blob_hash_bytes: entry.blob_hash,
                size: entry.size,
                mode: entry.mode,
                entry_type: if entry.mode & 0o170000 == 0o120000 {
                    EntryType::Symlink
                } else {
                    EntryType::File
                },
            })
            .collect::<Vec<_>>(),
        tree_store,
    )?;
    hex_to_bytes(&root_tree_hex)
}
408
409fn cached_processed_file(
410    scanned: &ScannedFile,
411    cached: Option<&FileEntry>,
412) -> Option<ProcessedFile> {
413    if let Some(cached) = cached {
414        if cached.mtime_secs == scanned.mtime_secs
415            && cached.mtime_nanos == scanned.mtime_nanos
416            && cached.size == scanned.size
417            && cached.inode == scanned.inode
418            && cached.mode == scanned.mode
419        {
420            return Some(ProcessedFile {
421                relative_path: scanned.relative_path.clone(),
422                blob_hash_bytes: cached.blob_hash,
423                size: scanned.size,
424                mode: scanned.mode,
425                entry_type: if scanned.is_symlink {
426                    EntryType::Symlink
427                } else {
428                    EntryType::File
429                },
430            });
431        }
432    }
433    None
434}
435
436fn prepare_file(scanned: &ScannedFile, seen_hashes: &ShardedSeenHashes) -> Result<PreparedFile> {
437    let (blob_hash_bytes, compressed) = if scanned.is_symlink {
438        let content = read_path_bytes(&scanned.absolute_path, true)?;
439        let blob_hash_bytes = xxhash_rust::xxh3::xxh3_128(&content).to_le_bytes();
440        let is_new = seen_hashes.insert(blob_hash_bytes);
441        let compressed = if is_new {
442            Some(compress_with_worker_context(&content))
443        } else {
444            None
445        };
446        (blob_hash_bytes, compressed)
447    } else {
448        let content = read_or_mmap(&scanned.absolute_path)?;
449        let blob_hash_bytes = xxhash_rust::xxh3::xxh3_128(content.as_ref()).to_le_bytes();
450        let is_new = seen_hashes.insert(blob_hash_bytes);
451        let compressed = if is_new {
452            Some(compress_with_worker_context(content.as_ref()))
453        } else {
454            None
455        };
456        (blob_hash_bytes, compressed)
457    };
458
459    Ok(PreparedFile {
460        relative_path: scanned.relative_path.clone(),
461        blob_hash_bytes,
462        compressed,
463        size: scanned.size,
464        mode: scanned.mode,
465        mtime_secs: scanned.mtime_secs,
466        mtime_nanos: scanned.mtime_nanos,
467        inode: scanned.inode,
468        entry_type: if scanned.is_symlink {
469            EntryType::Symlink
470        } else {
471            EntryType::File
472        },
473    })
474}
475
476fn hardlink_key(scanned: &ScannedFile) -> Option<HardlinkKey> {
477    if scanned.is_symlink {
478        return None;
479    }
480
481    Some(HardlinkKey {
482        device: scanned.device?,
483        inode: scanned.inode?,
484    })
485}
486
487fn prepare_file_with_hardlink_cache(
488    scanned: &ScannedFile,
489    seen_hashes: &ShardedSeenHashes,
490    hardlinks: &mut HashMap<HardlinkKey, HardlinkPrepared>,
491) -> Result<PreparedFile> {
492    if let Some(key) = hardlink_key(scanned) {
493        if let Some(cached) = hardlinks.get(&key) {
494            return Ok(PreparedFile {
495                relative_path: scanned.relative_path.clone(),
496                blob_hash_bytes: cached.blob_hash_bytes,
497                compressed: None,
498                size: scanned.size,
499                mode: scanned.mode,
500                mtime_secs: scanned.mtime_secs,
501                mtime_nanos: scanned.mtime_nanos,
502                inode: scanned.inode,
503                entry_type: if scanned.is_symlink {
504                    EntryType::Symlink
505                } else {
506                    EntryType::File
507                },
508            });
509        }
510
511        let prepared = prepare_file(scanned, seen_hashes)?;
512        hardlinks.insert(
513            key,
514            HardlinkPrepared {
515                blob_hash_bytes: prepared.blob_hash_bytes,
516            },
517        );
518        return Ok(prepared);
519    }
520
521    prepare_file(scanned, seen_hashes)
522}
523
524fn split_scanned_refs_preserving_hardlinks<'a>(
525    scanned_files: &'a [&'a ScannedFile],
526    worker_count: usize,
527) -> Vec<&'a [&'a ScannedFile]> {
528    if scanned_files.is_empty() {
529        return Vec::new();
530    }
531
532    let target_chunk_size = scanned_files.len().div_ceil(worker_count.max(1));
533    let mut chunks = Vec::with_capacity(worker_count.max(1));
534    let mut start = 0usize;
535
536    while start < scanned_files.len() {
537        let mut end = (start + target_chunk_size).min(scanned_files.len());
538        while end < scanned_files.len()
539            && hardlink_key(scanned_files[end - 1]) == hardlink_key(scanned_files[end])
540            && hardlink_key(scanned_files[end]).is_some()
541        {
542            end += 1;
543        }
544        chunks.push(&scanned_files[start..end]);
545        start = end;
546    }
547
548    chunks
549}
550
551fn compress_with_worker_context(content: &[u8]) -> Vec<u8> {
552    use lz4_flex::frame::FrameEncoder;
553    let mut encoder = FrameEncoder::new(Vec::new());
554    std::io::Write::write_all(&mut encoder, content).unwrap();
555    encoder.finish().unwrap()
556}
557
/// Prepare files with a bounded producer/consumer pipeline so compressed blobs
/// can be consumed immediately instead of accumulating for the full save.
///
/// Worker threads read + hash + compress disjoint chunks of `scanned_files`
/// and stream `PreparedFile`s over a bounded channel; the calling thread
/// drains the channel and invokes `on_prepared` for each result. Chunk
/// boundaries never split a hardlink group (see
/// `split_scanned_refs_preserving_hardlinks`), keeping each worker's local
/// hardlink cache effective.
///
/// On the first error — from a worker or from `on_prepared` — the receiver is
/// dropped, pending `send`s fail, workers exit early, and that error is
/// returned after all workers are joined.
fn process_prepared_files_streaming<F>(
    scanned_files: &[&ScannedFile],
    seen_hashes: &Arc<ShardedSeenHashes>,
    progress: &ProgressCallback,
    progress_counter: &AtomicU64,
    total_to_process: u64,
    mut on_prepared: F,
) -> Result<()>
where
    F: FnMut(PreparedFile) -> Result<()>,
{
    if scanned_files.is_empty() {
        return Ok(());
    }

    // One worker per available core, but never more workers than files.
    let worker_count = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1)
        .min(scanned_files.len());

    // Single-threaded fallback: no channel, process inline in order.
    if worker_count <= 1 {
        let mut hardlinks = HashMap::new();
        for scanned in scanned_files {
            let prepared = prepare_file_with_hardlink_cache(scanned, seen_hashes, &mut hardlinks)?;
            let completed = progress_counter.fetch_add(1, Ordering::Relaxed) + 1;
            emit(
                progress,
                ProgressEvent::ProcessFile {
                    completed,
                    total: total_to_process,
                },
            );
            on_prepared(prepared)?;
        }
        return Ok(());
    }

    let chunks = split_scanned_refs_preserving_hardlinks(scanned_files, worker_count);

    std::thread::scope(|scope| -> Result<()> {
        // Bounded channel caps how many compressed blobs sit in memory at once.
        let (sender, receiver) = mpsc::sync_channel::<Result<PreparedFile>>(
            PREPARED_FILE_PIPELINE_SLOTS.max(worker_count),
        );

        let handles: Vec<_> = chunks
            .into_iter()
            .map(|chunk| {
                let sender = sender.clone();
                scope.spawn(move || {
                    // Per-worker cache; chunking guarantees whole hardlink
                    // groups land in one chunk.
                    let mut hardlinks = HashMap::new();

                    for scanned in chunk {
                        let result =
                            prepare_file_with_hardlink_cache(scanned, seen_hashes, &mut hardlinks);
                        let completed = progress_counter.fetch_add(1, Ordering::Relaxed) + 1;
                        emit(
                            progress,
                            ProgressEvent::ProcessFile {
                                completed,
                                total: total_to_process,
                            },
                        );
                        // A closed channel means the consumer bailed; stop early.
                        if sender.send(result).is_err() {
                            return;
                        }
                    }
                })
            })
            .collect();
        // Drop the original sender so the channel closes once all workers finish.
        drop(sender);

        let mut consumer_result = Ok(());
        loop {
            match receiver.recv() {
                Ok(Ok(prepared)) => {
                    if let Err(error) = on_prepared(prepared) {
                        consumer_result = Err(error);
                        break;
                    }
                }
                Ok(Err(error)) => {
                    consumer_result = Err(error);
                    break;
                }
                // All senders dropped: every worker is done.
                Err(_) => break,
            }
        }
        // Dropping the receiver makes in-flight sends fail, unblocking workers.
        drop(receiver);

        for handle in handles {
            handle.join().unwrap();
        }

        consumer_result
    })
}
656
/// Build tree structure bottom-up from processed files.
///
/// Groups files by their directory path, creates tree entries for each file,
/// recursively builds subtrees for subdirectories, and returns the root tree hash.
///
/// Directories are processed deepest-first so each parent can embed its
/// children's already-computed hashes. Tree hashes are xxh3-128 over the
/// bitcode-encoded, name-sorted entry list, making them deterministic for
/// identical content. All encoded trees are written in a single pack at the
/// end rather than one file per directory.
fn build_tree(processed_files: &[ProcessedFile], tree_store: &TreeStore) -> Result<String> {
    // Group files by parent directory
    let mut dir_files: BTreeMap<String, Vec<&ProcessedFile>> = BTreeMap::new();
    let mut all_dirs: BTreeSet<String> = BTreeSet::new();
    let mut child_dirs: BTreeMap<String, Vec<String>> = BTreeMap::new();
    all_dirs.insert(String::new()); // root always exists

    for pf in processed_files {
        // Parent is everything before the last '/'; "" denotes the root.
        let parent = match pf.relative_path.rfind('/') {
            Some(pos) => &pf.relative_path[..pos],
            None => "",
        };
        dir_files.entry(parent.to_string()).or_default().push(pf);
        register_directory_hierarchy(parent, &mut all_dirs, &mut child_dirs);
    }

    // Sort directories bottom-up (deepest first)
    let mut dir_list: Vec<String> = all_dirs.into_iter().collect();
    dir_list.sort_unstable_by(|a, b| {
        // Depth = number of path segments; the root ("") has depth 0.
        let depth_a = if a.is_empty() {
            0
        } else {
            a.matches('/').count() + 1
        };
        let depth_b = if b.is_empty() {
            0
        } else {
            b.matches('/').count() + 1
        };
        // Deeper first; ties broken lexicographically for determinism.
        depth_b.cmp(&depth_a).then_with(|| a.cmp(b))
    });

    // Phase 1: Compute all tree hashes and encoded data in memory
    let mut dir_hashes: BTreeMap<String, String> = BTreeMap::new();
    let mut pack_entries: Vec<(String, Vec<u8>)> = Vec::with_capacity(dir_list.len());
    let mut known_hashes: HashSet<[u8; 16]> = HashSet::with_capacity(dir_list.len());

    for dir in &dir_list {
        let file_count = dir_files.get(dir).map(|files| files.len()).unwrap_or(0);
        let child_count = child_dirs
            .get(dir)
            .map(|children| children.len())
            .unwrap_or(0);
        let mut entries: Vec<TreeEntry> = Vec::with_capacity(file_count + child_count);

        if let Some(files) = dir_files.get(dir) {
            for pf in files {
                let name = match pf.relative_path.rfind('/') {
                    Some(pos) => &pf.relative_path[pos + 1..],
                    None => &pf.relative_path,
                };
                entries.push(TreeEntry {
                    name: name.to_string(),
                    entry_type: pf.entry_type,
                    hash: pf.blob_hash_bytes,
                    size: pf.size,
                    mode: pf.mode,
                });
            }
        }

        if let Some(children) = child_dirs.get(dir) {
            for sub_dir in children {
                // Children were hashed first thanks to deepest-first ordering;
                // a miss here indicates a bug in the ordering or registration.
                let sub_hash = dir_hashes.get(sub_dir).ok_or_else(|| {
                    ChkpttError::Other(format!("Missing tree hash for directory '{}'", sub_dir))
                })?;
                let sub_name = match sub_dir.rfind('/') {
                    Some(pos) => &sub_dir[pos + 1..],
                    None => sub_dir.as_str(),
                };
                entries.push(TreeEntry {
                    name: sub_name.to_string(),
                    entry_type: EntryType::Dir,
                    hash: hex_to_bytes(sub_hash)?,
                    size: 0,
                    mode: 0o040755,
                });
            }
        }

        // Name-sorted entries make the encoding (and thus the hash) canonical.
        entries.sort_unstable_by(|a, b| a.name.cmp(&b.name));
        let encoded = bitcode::encode(&entries);
        let hash_bytes = xxhash_rust::xxh3::xxh3_128(&encoded).to_le_bytes();
        let hash_hex = crate::store::blob::bytes_to_hex(&hash_bytes);

        dir_hashes.insert(dir.clone(), hash_hex.clone());
        // Identical subtrees encode identically; write each tree blob once.
        if known_hashes.insert(hash_bytes) {
            pack_entries.push((hash_hex, encoded));
        }
    }

    // Phase 2: Write all trees to a single pack file (1 write instead of 37K+)
    tree_store.write_pack(&pack_entries)?;

    dir_hashes
        .get("")
        .cloned()
        .ok_or_else(|| ChkpttError::Other("Failed to build root tree".into()))
}
760
/// Register `dir` and every ancestor in `all_dirs`, recording each newly seen
/// directory as a child of its parent in `child_dirs`.
fn register_directory_hierarchy(
    dir: &str,
    all_dirs: &mut BTreeSet<String>,
    child_dirs: &mut BTreeMap<String, Vec<String>>,
) {
    // The root ("") is pre-registered by the caller; nothing to do.
    if dir.is_empty() {
        return;
    }

    // Fast path: if the full dir is already known, all ancestors are too.
    if all_dirs.contains(dir) {
        return;
    }

    // Walk prefixes of `dir` from shallowest to deepest ("a", "a/b", "a/b/c"):
    // each '/' position ends one prefix, and the full string ends the last.
    let prefix_ends = dir
        .match_indices('/')
        .map(|(idx, _)| idx)
        .chain(std::iter::once(dir.len()));

    let mut parent = String::new();
    for end in prefix_ends {
        let current = &dir[..end];
        if all_dirs.insert(current.to_string()) {
            child_dirs
                .entry(parent)
                .or_default()
                .push(current.to_string());
        }
        parent = current.to_string();
    }
}
795
// Unit tests for the save helpers: hashing, store probing, compression
// round-trips, sharded dedup, and hardlink-aware chunking/preparation.
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use tempfile::TempDir;

    // hex_to_bytes decodes 32 hex chars into 16 bytes, preserving order.
    #[test]
    fn test_hex_to_bytes() {
        let hex = "a3b2c1d4e5f60718293a4b5c6d7e8f90";
        let bytes = hex_to_bytes(hex).unwrap();
        assert_eq!(bytes[0], 0xa3);
        assert_eq!(bytes[1], 0xb2);
        assert_eq!(bytes[15], 0x90);
    }

    #[test]
    fn test_hex_to_bytes_invalid_length() {
        assert!(hex_to_bytes("abc").is_err());
    }

    #[test]
    fn test_store_has_external_objects_false_for_empty_store() {
        let dir = TempDir::new().unwrap();
        let objects_dir = dir.path().join("objects");
        let packs_dir = dir.path().join("packs");
        std::fs::create_dir_all(&objects_dir).unwrap();
        std::fs::create_dir_all(&packs_dir).unwrap();

        assert!(!store_has_external_objects(&objects_dir, &packs_dir).unwrap());
    }

    // Any `pack-*.dat` regular file in packs_dir counts as an external object.
    #[test]
    fn test_store_has_external_objects_true_for_pack_objects() {
        let dir = TempDir::new().unwrap();
        let objects_dir = dir.path().join("objects");
        let packs_dir = dir.path().join("packs");
        std::fs::create_dir_all(&packs_dir).unwrap();
        std::fs::write(packs_dir.join("pack-demo.dat"), b"pack").unwrap();

        assert!(store_has_external_objects(&objects_dir, &packs_dir).unwrap());
    }

    // Compressed output must decode back to the original bytes (LZ4 frame).
    #[test]
    fn test_compress_with_worker_context_roundtrip() {
        let content = b"compression-context-roundtrip-data";

        let compressed = compress_with_worker_context(content);
        let decompressed = {
            use lz4_flex::frame::FrameDecoder;
            let mut decoder = FrameDecoder::new(&compressed[..]);
            let mut buf = Vec::new();
            std::io::Read::read_to_end(&mut decoder, &mut buf).unwrap();
            buf
        };

        assert_eq!(decompressed, content);
    }

    // insert() returns true only on the first occurrence of a hash.
    #[test]
    fn test_sharded_seen_hashes_dedups_single_thread() {
        let seen = ShardedSeenHashes::new(8);
        let hash = [7u8; 16];

        assert!(seen.insert(hash));
        assert!(!seen.insert(hash));
    }

    // Across 8 threads: the shared duplicate hash wins exactly once, plus one
    // unique hash per thread => 9 unique inserts total.
    #[test]
    fn test_sharded_seen_hashes_dedups_multi_thread() {
        let seen = Arc::new(ShardedSeenHashes::new(1024));
        let unique_inserts = Arc::new(AtomicUsize::new(0));
        let duplicate_hash = [9u8; 16];

        std::thread::scope(|scope| {
            for i in 0..8u8 {
                let seen = Arc::clone(&seen);
                let unique_inserts = Arc::clone(&unique_inserts);
                scope.spawn(move || {
                    if seen.insert(duplicate_hash) {
                        unique_inserts.fetch_add(1, Ordering::Relaxed);
                    }
                    let mut unique_hash = [0u8; 16];
                    unique_hash[0] = i;
                    if seen.insert(unique_hash) {
                        unique_inserts.fetch_add(1, Ordering::Relaxed);
                    }
                });
            }
        });

        assert_eq!(unique_inserts.load(Ordering::Relaxed), 9);
    }

    // The chunk boundary must be pushed past files sharing an inode so a
    // hardlink group never spans two chunks.
    #[test]
    fn test_split_scanned_refs_preserves_hardlink_groups() {
        let f1 = scanned("a.txt", Some(1));
        let f2 = scanned("b.txt", Some(1));
        let f3 = scanned("c.txt", Some(2));
        let f4 = scanned("d.txt", Some(3));
        let f5 = scanned("e.txt", Some(3));
        let refs = vec![&f1, &f2, &f3, &f4, &f5];

        let chunks = split_scanned_refs_preserving_hardlinks(&refs, 2);
        let paths: Vec<Vec<&str>> = chunks
            .into_iter()
            .map(|chunk| {
                chunk
                    .iter()
                    .map(|file| file.relative_path.as_str())
                    .collect::<Vec<_>>()
            })
            .collect();

        assert_eq!(
            paths,
            vec![vec!["a.txt", "b.txt", "c.txt"], vec!["d.txt", "e.txt"]]
        );
    }

    // The second path of a hardlink pair must reuse the cached hash and skip
    // compression (compressed: None).
    #[cfg(unix)]
    #[test]
    fn test_prepare_file_with_hardlink_cache_reuses_existing_read() {
        let dir = TempDir::new().unwrap();
        let original = dir.path().join("original.txt");
        let alias = dir.path().join("alias.txt");
        fs::write(&original, "same-content").unwrap();
        fs::hard_link(&original, &alias).unwrap();

        let original_scanned = scanned_from_path("original.txt", &original);
        let alias_scanned = scanned_from_path("alias.txt", &alias);
        let seen_hashes = ShardedSeenHashes::new(2);
        let mut hardlinks = HashMap::new();

        let first =
            prepare_file_with_hardlink_cache(&original_scanned, &seen_hashes, &mut hardlinks)
                .unwrap();
        let second =
            prepare_file_with_hardlink_cache(&alias_scanned, &seen_hashes, &mut hardlinks).unwrap();

        assert!(first.compressed.is_some());
        assert!(second.compressed.is_none());
        assert_eq!(first.blob_hash_bytes, second.blob_hash_bytes);
    }

    /// Build a synthetic ScannedFile with fixed metadata and the given inode.
    fn scanned(relative_path: &str, inode: Option<u64>) -> ScannedFile {
        ScannedFile {
            relative_path: relative_path.to_string(),
            absolute_path: std::path::PathBuf::from(relative_path),
            size: 1,
            mtime_secs: 1,
            mtime_nanos: 1,
            device: Some(1),
            inode,
            mode: 0o100644,
            is_symlink: false,
        }
    }

    /// Build a ScannedFile from real on-disk metadata (unix-only fields).
    #[cfg(unix)]
    fn scanned_from_path(relative_path: &str, path: &Path) -> ScannedFile {
        use std::os::unix::fs::MetadataExt;

        let metadata = fs::metadata(path).unwrap();
        ScannedFile {
            relative_path: relative_path.to_string(),
            absolute_path: path.to_path_buf(),
            size: metadata.len(),
            mtime_secs: metadata.mtime(),
            mtime_nanos: metadata.mtime_nsec(),
            device: Some(metadata.dev()),
            inode: Some(metadata.ino()),
            mode: metadata.mode(),
            is_symlink: metadata.file_type().is_symlink(),
        }
    }
}
973}