// chkpt_core/ops/save.rs

use crate::config::{project_id_from_path, StoreLayout};
use crate::error::{ChkpttError, Result};
use crate::index::{FileEntry, FileIndex};
use crate::ops::io_order::sort_scanned_refs_for_locality;
use crate::ops::lock::ProjectLock;
use crate::ops::progress::{emit, ProgressCallback, ProgressEvent};
use crate::scanner::ScannedFile;
use crate::store::blob::read_path_bytes;
use crate::store::catalog::{BlobLocation, CatalogSnapshot, ManifestEntry, MetadataCatalog};
use crate::store::pack::{PackSet, PackWriter};
use crate::store::snapshot::{Snapshot, SnapshotStats};
use crate::store::tree::{EntryType, TreeEntry, TreeStore};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{mpsc, Arc, Mutex};

/// Options controlling a save operation.
#[derive(Default)]
pub struct SaveOptions {
    pub message: Option<String>,
    pub include_deps: bool,
    pub progress: ProgressCallback,
}

/// Outcome of a successful save: the new snapshot id plus its stats.
#[derive(Debug)]
pub struct SaveResult {
    pub snapshot_id: String,
    pub stats: SnapshotStats,
}

/// Convert a 64-char hex string to a [u8; 32] array.
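///
/// A minimal doc-test sketch (`ignore`d because this helper is private to the
/// module):
///
/// ```ignore
/// assert_eq!(hex_to_bytes(&"ab".repeat(32)).unwrap(), [0xab; 32]);
/// assert!(hex_to_bytes("too-short").is_err());
/// ```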
fn hex_to_bytes(hex: &str) -> Result<[u8; 32]> {
    let mut bytes = [0u8; 32];
    if hex.len() != 64 {
        return Err(ChkpttError::Other(format!(
            "Invalid hash length: {}",
            hex.len()
        )));
    }
    for i in 0..32 {
        bytes[i] = u8::from_str_radix(&hex[i * 2..i * 2 + 2], 16)
            .map_err(|_| ChkpttError::Other("Invalid hex".into()))?;
    }
    Ok(bytes)
}

/// Represents a file with its blob hash after processing.
struct ProcessedFile {
    relative_path: String,
    blob_hash_bytes: [u8; 32],
    size: u64,
    mode: u32,
    entry_type: EntryType,
}

/// A file that has been read, hashed, and (if new) compressed, but not yet
/// written to a pack.
struct PreparedFile {
    relative_path: String,
    blob_hash_hex: String,
    blob_hash_bytes: [u8; 32],
    /// None if this hash was already seen (duplicate content; compression skipped)
    compressed: Option<Vec<u8>>,
    size: u64,
    mode: u32,
    mtime_secs: i64,
    mtime_nanos: i64,
    inode: Option<u64>,
    entry_type: EntryType,
}

/// A blob written to the current pack; its location is recorded once the pack
/// is finished and its hash is known.
struct NewBlobRecord {
    blob_hash: [u8; 32],
    size: u64,
}

/// On-disk identity of a file: two paths with the same (device, inode) pair
/// are hard links to the same content.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct HardlinkKey {
    device: u64,
    inode: u64,
}

/// Cached hash of a hard-linked file so aliases skip the read entirely.
#[derive(Debug, Clone)]
struct HardlinkPrepared {
    blob_hash_hex: String,
    blob_hash_bytes: [u8; 32],
}

/// Number of mutex-guarded shards in the dedup hash set.
const SEEN_HASH_SHARDS: usize = 64;
/// Bounded channel capacity between prepare workers and the pack-writing consumer.
const PREPARED_FILE_PIPELINE_SLOTS: usize = 64;
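
/// Set of blob hashes already seen during this save, sharded across multiple
/// mutexes so parallel prepare workers rarely contend on the same lock.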
struct ShardedSeenHashes {
    shards: Vec<Mutex<HashSet<[u8; 32]>>>,
}

impl ShardedSeenHashes {
    fn new(expected_entries: usize) -> Self {
        let shard_count = SEEN_HASH_SHARDS.max(1);
        let per_shard_capacity = expected_entries.div_ceil(shard_count).max(1);
        let mut shards = Vec::with_capacity(shard_count);
        for _ in 0..shard_count {
            shards.push(Mutex::new(HashSet::with_capacity(per_shard_capacity)));
        }
        Self { shards }
    }

    /// Returns true if the hash was newly inserted (seen for the first time).
    #[inline]
    fn insert(&self, hash: [u8; 32]) -> bool {
        // BLAKE3 output is uniform, so the leading bytes pick shards evenly.
        // (With 64 shards, only the low bits of hash[1] affect the index.)
        let shard_index = ((hash[0] as usize) << 8 | hash[1] as usize) % self.shards.len();
        let mut shard = self.shards[shard_index].lock().unwrap();
        shard.insert(hash)
    }
}

/// Save a checkpoint of the workspace.
///
/// This is the main orchestrating function that ties together scanning,
/// hashing, blob storage, tree building, and snapshot creation.
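///
/// A minimal invocation sketch (hypothetical path and message; `ignore`d
/// because it writes to the real store):
///
/// ```ignore
/// let result = save(
///     Path::new("/path/to/workspace"),
///     SaveOptions {
///         message: Some("before refactor".to_string()),
///         ..Default::default()
///     },
/// )?;
/// println!("{} ({} files)", result.snapshot_id, result.stats.total_files);
/// ```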
pub fn save(workspace_root: &Path, options: SaveOptions) -> Result<SaveResult> {
    // 1. Compute project_id from workspace path
    let project_id = project_id_from_path(workspace_root);

    // 2. Create StoreLayout, ensure directories exist
    let layout = StoreLayout::new(&project_id);
    layout.ensure_dirs()?;

    // 3. Acquire project lock
    let _lock = ProjectLock::acquire(&layout.locks_dir())?;
    let catalog = MetadataCatalog::open(layout.catalog_path())?;

    // 4. Scan workspace (respect .chkptignore)
    let scanned_files =
        crate::scanner::scan_workspace_with_options(workspace_root, None, options.include_deps)?;
    emit(
        &options.progress,
        ProgressEvent::ScanComplete {
            file_count: scanned_files.len() as u64,
        },
    );

    // 5. Open/create FileIndex
    let mut index = FileIndex::open(layout.index_path())?;
    let cached_entries = index.entries();

    // 6. Create blob store
    let packs_dir = layout.packs_dir();
    let has_pack_objects = store_has_pack_objects(&packs_dir)?;
    let pack_set = has_pack_objects
        .then(|| PackSet::open_all(&packs_dir))
        .transpose()?;
    let mut pack_writer = PackWriter::new(&packs_dir)?;

    // 7. Process each scanned file: check index, hash, store blob
    let mut processed_files = Vec::with_capacity(scanned_files.len());
    let mut files_to_prepare = Vec::new();
    let mut updated_entries = Vec::new();
    let mut blob_locations_to_record = Vec::new();
    let mut new_blob_records = Vec::new();
    let mut total_bytes: u64 = 0;
    // Track live paths only when a previous index exists, so removals can be
    // diffed against the cached entries afterwards.
    let mut current_paths =
        (!cached_entries.is_empty()).then(|| HashSet::with_capacity(scanned_files.len()));

    for scanned in &scanned_files {
        if let Some(paths) = current_paths.as_mut() {
            paths.insert(scanned.relative_path.as_str());
        }

        if let Some(processed) =
            cached_processed_file(scanned, cached_entries.get(&scanned.relative_path))
        {
            total_bytes += processed.size;
            processed_files.push(processed);
        } else {
            files_to_prepare.push(scanned);
        }
    }
    let removed_paths = current_paths
        .map(|paths| {
            cached_entries
                .keys()
                .filter(|path| !paths.contains(path.as_str()))
                .cloned()
                .collect::<Vec<_>>()
        })
        .unwrap_or_default();

    let mut new_objects: u64 = 0;

    // Sharded dedup set reduces lock contention during parallel hash+compress.
    let seen_hashes = Arc::new(ShardedSeenHashes::new(files_to_prepare.len()));

    // Prioritize on-disk locality to reduce random I/O during read+hash+compress.
    sort_scanned_refs_for_locality(&mut files_to_prepare);

    // Streaming parallel: workers prepare chunks and feed a bounded channel so
    // blobs are packed as they are produced instead of accumulating in memory.
    let total_to_process = files_to_prepare.len() as u64;
    emit(
        &options.progress,
        ProgressEvent::ProcessStart {
            total: total_to_process,
        },
    );
    let progress_counter = AtomicU64::new(0);
    process_prepared_files_streaming(
        &files_to_prepare,
        &seen_hashes,
        &options.progress,
        &progress_counter,
        total_to_process,
        |prepared| {
            let PreparedFile {
                relative_path,
                blob_hash_hex: _,
                blob_hash_bytes,
                compressed,
                size,
                mode,
                mtime_secs,
                mtime_nanos,
                inode,
                entry_type,
            } = prepared;

            total_bytes += size;
            if let Some(compressed) = compressed {
                // Skip blobs that already live in an existing pack.
                let exists_in_pack = pack_set
                    .as_ref()
                    .is_some_and(|ps| ps.contains_bytes(&blob_hash_bytes));
                if !exists_in_pack {
                    pack_writer.add_pre_compressed_bytes(blob_hash_bytes, compressed)?;
                    new_blob_records.push(NewBlobRecord {
                        blob_hash: blob_hash_bytes,
                        size,
                    });
                    new_objects += 1;
                }
            }

            updated_entries.push(FileEntry {
                path: relative_path.clone(),
                blob_hash: blob_hash_bytes,
                size,
                mtime_secs,
                mtime_nanos,
                inode,
                mode,
            });
            processed_files.push(ProcessedFile {
                relative_path,
                blob_hash_bytes,
                size,
                mode,
                entry_type,
            });
            Ok(())
        },
    )?;
    let new_pack_hash = if !pack_writer.is_empty() {
        Some(pack_writer.finish()?)
    } else {
        None
    };
    if let Some(pack_hash) = new_pack_hash {
        for blob in &new_blob_records {
            blob_locations_to_record.push((
                blob.blob_hash,
                BlobLocation {
                    pack_hash: Some(pack_hash.clone()),
                    size: blob.size,
                },
            ));
        }
    }
    catalog.bulk_upsert_blob_locations(&blob_locations_to_record)?;
    emit(&options.progress, ProgressEvent::PackComplete);

    // A save is a manifest no-op when nothing was added, removed, or modified;
    // compute this once and reuse it for both tree reuse and snapshot storage.
    let no_manifest_changes =
        new_objects == 0 && removed_paths.is_empty() && updated_entries.is_empty();

    // 8. Build tree bottom-up (skip if nothing changed)
    let latest_catalog_snapshot = catalog.latest_snapshot()?;

    let root_tree_hash = if no_manifest_changes {
        if let Some(ref snapshot) = latest_catalog_snapshot {
            // Nothing changed; reuse the previous tree hash (skip the build entirely)
            root_tree_hash_for_snapshot(&catalog, snapshot, &TreeStore::new(layout.trees_dir()))?
        } else {
            // No parent snapshot to reuse (first save): build the tree once.
            let tree_store = TreeStore::new(layout.trees_dir());
            let hex = build_tree(&processed_files, &tree_store)?;
            hex_to_bytes(&hex)?
        }
    } else {
        let tree_store = TreeStore::new(layout.trees_dir());
        let hex = build_tree(&processed_files, &tree_store)?;
        hex_to_bytes(&hex)?
    };

    // 9. Find latest snapshot for parent_snapshot_id (already fetched above)
    let parent_snapshot_id = latest_catalog_snapshot
        .as_ref()
        .map(|snapshot| snapshot.id.clone());

    // 10. Create Snapshot
    let stats = SnapshotStats {
        total_files: scanned_files.len() as u64,
        total_bytes,
        new_objects,
    };

    let snapshot = Snapshot::new(
        options.message,
        root_tree_hash,
        parent_snapshot_id,
        stats.clone(),
    );

    let snapshot_id = snapshot.id.clone();
    let catalog_snapshot = CatalogSnapshot {
        id: snapshot.id.clone(),
        created_at: snapshot.created_at,
        message: snapshot.message.clone(),
        parent_snapshot_id: snapshot.parent_snapshot_id.clone(),
        manifest_snapshot_id: None,
        root_tree_hash: Some(root_tree_hash),
        stats: stats.clone(),
    };

    // 11. Save snapshot
    if no_manifest_changes {
        // Point at the snapshot that actually owns the manifest rows, so
        // chains of no-op saves keep resolving to a single stored manifest.
        let manifest_snapshot_id = latest_catalog_snapshot
            .as_ref()
            .map(|snapshot| {
                snapshot
                    .manifest_snapshot_id
                    .as_deref()
                    .unwrap_or(snapshot.id.as_str())
            })
            .unwrap_or(snapshot.id.as_str());
        catalog.insert_snapshot_metadata_only(&catalog_snapshot, manifest_snapshot_id)?;
    } else {
        let mut manifest: Vec<ManifestEntry> = processed_files
            .iter()
            .map(|processed| ManifestEntry {
                path: processed.relative_path.clone(),
                blob_hash: processed.blob_hash_bytes,
                size: processed.size,
                mode: processed.mode,
            })
            .collect();
        manifest.sort_unstable_by(|left, right| left.path.cmp(&right.path));
        catalog.insert_snapshot(&catalog_snapshot, &manifest)?;
    }

    // 12. Update only changed index entries and remove stale paths.
    index.apply_changes(&removed_paths, &updated_entries)?;

    // 13. Lock released automatically via drop

    // 14. Return SaveResult
    Ok(SaveResult { snapshot_id, stats })
}

#[cfg_attr(not(test), allow(dead_code))]
fn store_has_external_objects(objects_dir: &Path, packs_dir: &Path) -> Result<bool> {
    // objects_dir is accepted for signature compatibility but unused; only
    // pack files are checked.
    let _ = objects_dir;
    store_has_pack_objects(packs_dir)
}

fn store_has_pack_objects(packs_dir: &Path) -> Result<bool> {
    let entries = match std::fs::read_dir(packs_dir) {
        Ok(entries) => entries,
        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(false),
        Err(error) => return Err(error.into()),
    };

    for entry in entries {
        let entry = entry?;
        if !entry.file_type()?.is_file() {
            continue;
        }

        let name = entry.file_name();
        let name = name.to_string_lossy();
        if name.starts_with("pack-") && name.ends_with(".dat") {
            return Ok(true);
        }
    }

    Ok(false)
}

fn root_tree_hash_for_snapshot(
    catalog: &MetadataCatalog,
    snapshot: &CatalogSnapshot,
    tree_store: &TreeStore,
) -> Result<[u8; 32]> {
    if let Some(root_tree_hash) = snapshot.root_tree_hash {
        return Ok(root_tree_hash);
    }

    let manifest = catalog.snapshot_manifest(&snapshot.id)?;
    if manifest.is_empty() && snapshot.stats.total_files > 0 {
        return Err(ChkpttError::StoreCorrupted(format!(
            "snapshot '{}' is missing both manifest entries and root_tree_hash",
            snapshot.id
        )));
    }
    let root_tree_hex = build_tree(
        &manifest
            .into_iter()
            .map(|entry| ProcessedFile {
                relative_path: entry.path,
                blob_hash_bytes: entry.blob_hash,
                size: entry.size,
                mode: entry.mode,
                // POSIX file-type bits: mode & S_IFMT (0o170000) == S_IFLNK (0o120000)
                entry_type: if entry.mode & 0o170000 == 0o120000 {
                    EntryType::Symlink
                } else {
                    EntryType::File
                },
            })
            .collect::<Vec<_>>(),
        tree_store,
    )?;
    hex_to_bytes(&root_tree_hex)
}

/// Reuse the cached blob hash when scanned metadata (mtime, size, inode, mode)
/// exactly matches the index entry, so unchanged files skip read and hash.
fn cached_processed_file(
    scanned: &ScannedFile,
    cached: Option<&FileEntry>,
) -> Option<ProcessedFile> {
    if let Some(cached) = cached {
        if cached.mtime_secs == scanned.mtime_secs
            && cached.mtime_nanos == scanned.mtime_nanos
            && cached.size == scanned.size
            && cached.inode == scanned.inode
            && cached.mode == scanned.mode
        {
            return Some(ProcessedFile {
                relative_path: scanned.relative_path.clone(),
                blob_hash_bytes: cached.blob_hash,
                size: scanned.size,
                mode: scanned.mode,
                entry_type: if scanned.is_symlink {
                    EntryType::Symlink
                } else {
                    EntryType::File
                },
            });
        }
    }
    None
}

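/// Read one file, hash it with BLAKE3, and zstd-compress it only when this
/// save has not seen the hash before (duplicate content carries no payload).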
fn prepare_file(
    scanned: &ScannedFile,
    seen_hashes: &ShardedSeenHashes,
    compressor: &mut zstd::bulk::Compressor<'_>,
) -> Result<PreparedFile> {
    let content = read_path_bytes(&scanned.absolute_path, scanned.is_symlink)?;

    // Hash (BLAKE3 is ~6 GB/s, negligible cost)
    let hash = blake3::hash(&content);
    let blob_hash_bytes: [u8; 32] = *hash.as_bytes();

    // Only compress if this hash hasn't been seen yet (use raw bytes, no hex conversion)
    let is_new = seen_hashes.insert(blob_hash_bytes);

    let compressed = if is_new {
        Some(compress_with_worker_context(&content, compressor)?)
    } else {
        None
    };

    // The hex form is kept alongside the raw bytes for the hardlink cache.
    let blob_hash_hex = hash.to_hex().to_string();

    Ok(PreparedFile {
        relative_path: scanned.relative_path.clone(),
        blob_hash_hex,
        blob_hash_bytes,
        compressed,
        size: scanned.size,
        mode: scanned.mode,
        mtime_secs: scanned.mtime_secs,
        mtime_nanos: scanned.mtime_nanos,
        inode: scanned.inode,
        entry_type: if scanned.is_symlink {
            EntryType::Symlink
        } else {
            EntryType::File
        },
    })
}

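/// Identity key for hardlink detection. Symlinks never participate, and files
/// missing device or inode metadata fall back to normal preparation.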
fn hardlink_key(scanned: &ScannedFile) -> Option<HardlinkKey> {
    if scanned.is_symlink {
        return None;
    }

    Some(HardlinkKey {
        device: scanned.device?,
        inode: scanned.inode?,
    })
}

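/// Like [`prepare_file`], but consults a per-worker hardlink cache first so a
/// file hard-linked at several paths is read, hashed, and compressed only once.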
fn prepare_file_with_hardlink_cache(
    scanned: &ScannedFile,
    seen_hashes: &ShardedSeenHashes,
    compressor: &mut zstd::bulk::Compressor<'_>,
    hardlinks: &mut HashMap<HardlinkKey, HardlinkPrepared>,
) -> Result<PreparedFile> {
    if let Some(key) = hardlink_key(scanned) {
        if let Some(cached) = hardlinks.get(&key) {
            return Ok(PreparedFile {
                relative_path: scanned.relative_path.clone(),
                blob_hash_hex: cached.blob_hash_hex.clone(),
                blob_hash_bytes: cached.blob_hash_bytes,
                compressed: None,
                size: scanned.size,
                mode: scanned.mode,
                mtime_secs: scanned.mtime_secs,
                mtime_nanos: scanned.mtime_nanos,
                inode: scanned.inode,
                entry_type: if scanned.is_symlink {
                    EntryType::Symlink
                } else {
                    EntryType::File
                },
            });
        }

        let prepared = prepare_file(scanned, seen_hashes, compressor)?;
        hardlinks.insert(
            key,
            HardlinkPrepared {
                blob_hash_hex: prepared.blob_hash_hex.clone(),
                blob_hash_bytes: prepared.blob_hash_bytes,
            },
        );
        return Ok(prepared);
    }

    prepare_file(scanned, seen_hashes, compressor)
}

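/// Split the locality-sorted file list into near-equal chunks, one per worker,
/// pushing a chunk boundary forward so files that share a hardlink key never
/// straddle it (each worker's hardlink cache is private).
///
/// A small behavioral sketch (hypothetical inodes):
///
/// ```ignore
/// // inodes [1, 2, 2, 3], two workers, target chunk size 2:
/// // a cut after index 1 would separate the two inode-2 files, so the
/// // boundary advances and the chunks become [1, 2, 2] and [3].
/// ```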
fn split_scanned_refs_preserving_hardlinks<'a>(
    scanned_files: &'a [&'a ScannedFile],
    worker_count: usize,
) -> Vec<&'a [&'a ScannedFile]> {
    if scanned_files.is_empty() {
        return Vec::new();
    }

    let target_chunk_size = scanned_files.len().div_ceil(worker_count.max(1));
    let mut chunks = Vec::with_capacity(worker_count.max(1));
    let mut start = 0usize;

    while start < scanned_files.len() {
        let mut end = (start + target_chunk_size).min(scanned_files.len());
        // Equal hardlink keys are adjacent after the locality sort; push the
        // boundary forward until the current group fits wholly in the chunk.
        while end < scanned_files.len()
            && hardlink_key(scanned_files[end - 1]) == hardlink_key(scanned_files[end])
            && hardlink_key(scanned_files[end]).is_some()
        {
            end += 1;
        }
        chunks.push(&scanned_files[start..end]);
        start = end;
    }

    chunks
}

fn compress_with_worker_context(
    content: &[u8],
    compressor: &mut zstd::bulk::Compressor<'_>,
) -> Result<Vec<u8>> {
    Ok(compressor.compress(content)?)
}

/// Prepare files with a bounded producer/consumer pipeline so compressed blobs
/// can be consumed immediately instead of accumulating for the full save.
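///
/// Shape of the pipeline (bounded by `PREPARED_FILE_PIPELINE_SLOTS`):
///
/// ```text
/// worker 0 (read + hash + compress) --\
/// worker 1 (read + hash + compress) ----> sync_channel ----> consumer (on_prepared)
/// worker N (read + hash + compress) --/
/// ```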
fn process_prepared_files_streaming<F>(
    scanned_files: &[&ScannedFile],
    seen_hashes: &Arc<ShardedSeenHashes>,
    progress: &ProgressCallback,
    progress_counter: &AtomicU64,
    total_to_process: u64,
    mut on_prepared: F,
) -> Result<()>
where
    F: FnMut(PreparedFile) -> Result<()>,
{
    if scanned_files.is_empty() {
        return Ok(());
    }

    let worker_count = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1)
        .min(scanned_files.len());

    if worker_count <= 1 {
        // Single-threaded fast path: no channel, no worker threads.
        let mut compressor = zstd::bulk::Compressor::new(1)?;
        let mut hardlinks = HashMap::new();
        for scanned in scanned_files {
            let prepared = prepare_file_with_hardlink_cache(
                scanned,
                seen_hashes,
                &mut compressor,
                &mut hardlinks,
            )?;
            let completed = progress_counter.fetch_add(1, Ordering::Relaxed) + 1;
            emit(
                progress,
                ProgressEvent::ProcessFile {
                    completed,
                    total: total_to_process,
                },
            );
            on_prepared(prepared)?;
        }
        return Ok(());
    }

    let chunks = split_scanned_refs_preserving_hardlinks(scanned_files, worker_count);

    std::thread::scope(|scope| -> Result<()> {
        // The bounded channel caps how many compressed blobs sit in flight,
        // limiting memory while the consumer writes the pack.
        let (sender, receiver) = mpsc::sync_channel::<Result<PreparedFile>>(
            PREPARED_FILE_PIPELINE_SLOTS.max(worker_count),
        );

        let handles: Vec<_> = chunks
            .into_iter()
            .map(|chunk| {
                let sender = sender.clone();
                scope.spawn(move || {
                    let mut compressor = match zstd::bulk::Compressor::new(1) {
                        Ok(compressor) => compressor,
                        Err(error) => {
                            let _ = sender.send(Err(error.into()));
                            return;
                        }
                    };
                    let mut hardlinks = HashMap::new();

                    for scanned in chunk {
                        let result = prepare_file_with_hardlink_cache(
                            scanned,
                            seen_hashes,
                            &mut compressor,
                            &mut hardlinks,
                        );
                        let completed = progress_counter.fetch_add(1, Ordering::Relaxed) + 1;
                        emit(
                            progress,
                            ProgressEvent::ProcessFile {
                                completed,
                                total: total_to_process,
                            },
                        );
                        // A send error means the consumer hung up (it hit an
                        // error); stop producing.
                        if sender.send(result).is_err() {
                            return;
                        }
                    }
                })
            })
            .collect();
        // Drop the original sender so recv() disconnects once all workers finish.
        drop(sender);

        let mut consumer_result = Ok(());
        loop {
            match receiver.recv() {
                Ok(Ok(prepared)) => {
                    if let Err(error) = on_prepared(prepared) {
                        consumer_result = Err(error);
                        break;
                    }
                }
                Ok(Err(error)) => {
                    consumer_result = Err(error);
                    break;
                }
                Err(_) => break,
            }
        }
        // Dropping the receiver makes pending sends fail, unblocking workers.
        drop(receiver);

        for handle in handles {
            handle.join().unwrap();
        }

        consumer_result
    })
}

/// Build the tree structure bottom-up from processed files.
///
/// Groups files by their parent directory, creates tree entries for each file,
/// builds subtrees deepest-first so every child hash exists before its parent,
/// and returns the root tree hash as hex.
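///
/// A small sketch of the grouping (hypothetical paths):
///
/// ```ignore
/// // ["src/main.rs", "src/lib.rs", "README.md"] produces two trees:
/// //   "src" -> entries [lib.rs, main.rs]
/// //   ""    -> entries [README.md, src (Dir, hash of the "src" tree)]
/// // and the function returns the hash of the root ("") tree as hex.
/// ```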
fn build_tree(processed_files: &[ProcessedFile], tree_store: &TreeStore) -> Result<String> {
    // Group files by parent directory
    let mut dir_files: BTreeMap<String, Vec<&ProcessedFile>> = BTreeMap::new();
    let mut all_dirs: BTreeSet<String> = BTreeSet::new();
    let mut child_dirs: BTreeMap<String, Vec<String>> = BTreeMap::new();
    all_dirs.insert(String::new()); // root always exists

    for pf in processed_files {
        let parent = if let Some(pos) = pf.relative_path.rfind('/') {
            pf.relative_path[..pos].to_string()
        } else {
            String::new()
        };
        dir_files.entry(parent.clone()).or_default().push(pf);
        register_directory_hierarchy(&parent, &mut all_dirs, &mut child_dirs);
    }

    // Sort directories bottom-up (deepest first)
    let mut dir_list: Vec<String> = all_dirs.into_iter().collect();
    dir_list.sort_unstable_by(|a, b| {
        let depth_a = if a.is_empty() {
            0
        } else {
            a.matches('/').count() + 1
        };
        let depth_b = if b.is_empty() {
            0
        } else {
            b.matches('/').count() + 1
        };
        depth_b.cmp(&depth_a).then_with(|| a.cmp(b))
    });

    // Phase 1: Compute all tree hashes and encoded data in memory
    let mut dir_hashes: BTreeMap<String, String> = BTreeMap::new();
    let mut pack_entries: Vec<(String, Vec<u8>)> = Vec::with_capacity(dir_list.len());
    let mut known_hashes: HashSet<[u8; 32]> = HashSet::with_capacity(dir_list.len());

    for dir in &dir_list {
        let file_count = dir_files.get(dir).map(|files| files.len()).unwrap_or(0);
        let child_count = child_dirs
            .get(dir)
            .map(|children| children.len())
            .unwrap_or(0);
        let mut entries: Vec<TreeEntry> = Vec::with_capacity(file_count + child_count);

        if let Some(files) = dir_files.get(dir) {
            for pf in files {
                let name = if let Some(pos) = pf.relative_path.rfind('/') {
                    pf.relative_path[pos + 1..].to_string()
                } else {
                    pf.relative_path.clone()
                };
                entries.push(TreeEntry {
                    name,
                    entry_type: pf.entry_type,
                    hash: pf.blob_hash_bytes,
                    size: pf.size,
                    mode: pf.mode,
                });
            }
        }

        if let Some(children) = child_dirs.get(dir) {
            for sub_dir in children {
                // Deepest-first order guarantees the child hash already exists.
                let sub_hash = dir_hashes.get(sub_dir).ok_or_else(|| {
                    ChkpttError::Other(format!("Missing tree hash for directory '{}'", sub_dir))
                })?;
                let sub_name = if let Some(pos) = sub_dir.rfind('/') {
                    sub_dir[pos + 1..].to_string()
                } else {
                    sub_dir.clone()
                };
                entries.push(TreeEntry {
                    name: sub_name,
                    entry_type: EntryType::Dir,
                    hash: hex_to_bytes(sub_hash)?,
                    size: 0,
                    mode: 0o040755,
                });
            }
        }

        entries.sort_unstable_by(|a, b| a.name.cmp(&b.name));
        let encoded = bitcode::encode(&entries);
        let hash = blake3::hash(&encoded);
        let hash_bytes = *hash.as_bytes();
        let hash_hex = hash.to_hex().to_string();

        dir_hashes.insert(dir.clone(), hash_hex.clone());
        // Identical subtrees hash identically; store each encoding only once.
        if known_hashes.insert(hash_bytes) {
            pack_entries.push((hash_hex, encoded));
        }
    }

    // Phase 2: Write all trees to a single pack file (1 write instead of 37K+)
    tree_store.write_pack(&pack_entries)?;

    dir_hashes
        .get("")
        .cloned()
        .ok_or_else(|| ChkpttError::Other("Failed to build root tree".into()))
}

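/// Walk every path prefix of `dir`, registering each directory the first time
/// it is seen and recording it as a child of its immediate parent (the root
/// directory is the empty string).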
fn register_directory_hierarchy(
    dir: &str,
    all_dirs: &mut BTreeSet<String>,
    child_dirs: &mut BTreeMap<String, Vec<String>>,
) {
    if dir.is_empty() {
        return;
    }

    let mut parent = String::new();
    let mut current = String::with_capacity(dir.len());
    for segment in dir.split('/') {
        if !current.is_empty() {
            current.push('/');
        }
        current.push_str(segment);

        if all_dirs.insert(current.clone()) {
            child_dirs
                .entry(parent.clone())
                .or_default()
                .push(current.clone());
        }
        parent.clear();
        parent.push_str(&current);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use tempfile::TempDir;

    #[test]
    fn test_hex_to_bytes() {
        let hex = "a3b2c1d4e5f60718293a4b5c6d7e8f90a1b2c3d4e5f60718293a4b5c6d7e8f90";
        let bytes = hex_to_bytes(hex).unwrap();
        assert_eq!(bytes[0], 0xa3);
        assert_eq!(bytes[1], 0xb2);
        assert_eq!(bytes[31], 0x90);
    }

    #[test]
    fn test_hex_to_bytes_invalid_length() {
        assert!(hex_to_bytes("abc").is_err());
    }

    #[test]
    fn test_store_has_external_objects_false_for_empty_store() {
        let dir = TempDir::new().unwrap();
        let objects_dir = dir.path().join("objects");
        let packs_dir = dir.path().join("packs");
        std::fs::create_dir_all(&objects_dir).unwrap();
        std::fs::create_dir_all(&packs_dir).unwrap();

        assert!(!store_has_external_objects(&objects_dir, &packs_dir).unwrap());
    }

    #[test]
    fn test_store_has_external_objects_true_for_pack_objects() {
        let dir = TempDir::new().unwrap();
        let objects_dir = dir.path().join("objects");
        let packs_dir = dir.path().join("packs");
        std::fs::create_dir_all(&packs_dir).unwrap();
        std::fs::write(packs_dir.join("pack-demo.dat"), b"pack").unwrap();

        assert!(store_has_external_objects(&objects_dir, &packs_dir).unwrap());
    }

    #[test]
    fn test_compress_with_worker_context_roundtrip() {
        let content = b"compression-context-roundtrip-data";
        let mut compressor = zstd::bulk::Compressor::new(1).unwrap();

        let compressed = compress_with_worker_context(content, &mut compressor).unwrap();
        let decompressed = zstd::decode_all(&compressed[..]).unwrap();

        assert_eq!(decompressed, content);
    }

    #[test]
    fn test_sharded_seen_hashes_dedups_single_thread() {
        let seen = ShardedSeenHashes::new(8);
        let hash = [7u8; 32];

        assert!(seen.insert(hash));
        assert!(!seen.insert(hash));
    }

    #[test]
    fn test_sharded_seen_hashes_dedups_multi_thread() {
        let seen = Arc::new(ShardedSeenHashes::new(1024));
        let unique_inserts = Arc::new(AtomicUsize::new(0));
        let duplicate_hash = [9u8; 32];

        std::thread::scope(|scope| {
            for i in 0..8u8 {
                let seen = Arc::clone(&seen);
                let unique_inserts = Arc::clone(&unique_inserts);
                scope.spawn(move || {
                    // Exactly one thread wins the shared duplicate insert...
                    if seen.insert(duplicate_hash) {
                        unique_inserts.fetch_add(1, Ordering::Relaxed);
                    }
                    // ...and every thread wins its own unique insert.
                    let mut unique_hash = [0u8; 32];
                    unique_hash[0] = i;
                    if seen.insert(unique_hash) {
                        unique_inserts.fetch_add(1, Ordering::Relaxed);
                    }
                });
            }
        });

        assert_eq!(unique_inserts.load(Ordering::Relaxed), 9);
    }

    #[test]
    fn test_split_scanned_refs_preserves_hardlink_groups() {
        let f1 = scanned("a.txt", Some(1));
        let f2 = scanned("b.txt", Some(1));
        let f3 = scanned("c.txt", Some(2));
        let f4 = scanned("d.txt", Some(3));
        let f5 = scanned("e.txt", Some(3));
        let refs = vec![&f1, &f2, &f3, &f4, &f5];

        let chunks = split_scanned_refs_preserving_hardlinks(&refs, 2);
        let paths: Vec<Vec<&str>> = chunks
            .into_iter()
            .map(|chunk| {
                chunk
                    .iter()
                    .map(|file| file.relative_path.as_str())
                    .collect::<Vec<_>>()
            })
            .collect();

        assert_eq!(
            paths,
            vec![vec!["a.txt", "b.txt", "c.txt"], vec!["d.txt", "e.txt"]]
        );
    }

    #[cfg(unix)]
    #[test]
    fn test_prepare_file_with_hardlink_cache_reuses_existing_read() {
        let dir = TempDir::new().unwrap();
        let original = dir.path().join("original.txt");
        let alias = dir.path().join("alias.txt");
        fs::write(&original, "same-content").unwrap();
        fs::hard_link(&original, &alias).unwrap();

        let original_scanned = scanned_from_path("original.txt", &original);
        let alias_scanned = scanned_from_path("alias.txt", &alias);
        let seen_hashes = ShardedSeenHashes::new(2);
        let mut compressor = zstd::bulk::Compressor::new(1).unwrap();
        let mut hardlinks = HashMap::new();

        let first = prepare_file_with_hardlink_cache(
            &original_scanned,
            &seen_hashes,
            &mut compressor,
            &mut hardlinks,
        )
        .unwrap();
        let second = prepare_file_with_hardlink_cache(
            &alias_scanned,
            &seen_hashes,
            &mut compressor,
            &mut hardlinks,
        )
        .unwrap();

        assert!(first.compressed.is_some());
        assert!(second.compressed.is_none());
        assert_eq!(first.blob_hash_bytes, second.blob_hash_bytes);
        assert_eq!(first.blob_hash_hex, second.blob_hash_hex);
    }

    fn scanned(relative_path: &str, inode: Option<u64>) -> ScannedFile {
        ScannedFile {
            relative_path: relative_path.to_string(),
            absolute_path: std::path::PathBuf::from(relative_path),
            size: 1,
            mtime_secs: 1,
            mtime_nanos: 1,
            device: Some(1),
            inode,
            mode: 0o100644,
            is_symlink: false,
        }
    }

    #[cfg(unix)]
    fn scanned_from_path(relative_path: &str, path: &Path) -> ScannedFile {
        use std::os::unix::fs::MetadataExt;

        let metadata = fs::metadata(path).unwrap();
        ScannedFile {
            relative_path: relative_path.to_string(),
            absolute_path: path.to_path_buf(),
            size: metadata.len(),
            mtime_secs: metadata.mtime(),
            mtime_nanos: metadata.mtime_nsec(),
            device: Some(metadata.dev()),
            inode: Some(metadata.ino()),
            mode: metadata.mode(),
            is_symlink: metadata.file_type().is_symlink(),
        }
    }
}