use crate::config::{project_id_from_path, StoreLayout};
use crate::error::{ChkpttError, Result};
use crate::index::{FileEntry, FileIndex};
use crate::ops::io_order::sort_scanned_refs_for_locality;
use crate::ops::lock::ProjectLock;
use crate::scanner::ScannedFile;
use crate::store::blob::read_path_bytes;
use crate::store::catalog::{BlobLocation, CatalogSnapshot, ManifestEntry, MetadataCatalog};
use crate::store::pack::{PackSet, PackWriter};
use crate::store::snapshot::{Snapshot, SnapshotStats};
use crate::store::tree::{EntryType, TreeEntry, TreeStore};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{mpsc, Arc, Mutex};

use crate::ops::progress::{emit, ProgressCallback, ProgressEvent};

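/// Options for [`save`]: an optional snapshot message, whether dependencies
/// are included in the scan, and a progress callback.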
#[derive(Default)]
pub struct SaveOptions {
    pub message: Option<String>,
    pub include_deps: bool,
    pub progress: ProgressCallback,
}

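/// Identifier and statistics for a snapshot produced by [`save`].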
#[derive(Debug)]
pub struct SaveResult {
    pub snapshot_id: String,
    pub stats: SnapshotStats,
}

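/// Decodes a 64-character hex digest into a 32-byte hash.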
fn hex_to_bytes(hex: &str) -> Result<[u8; 32]> {
    let mut bytes = [0u8; 32];
    if hex.len() != 64 || !hex.is_ascii() {
        return Err(ChkpttError::Other(format!(
            "Invalid hash length: {}",
            hex.len()
        )));
    }
    for i in 0..32 {
        bytes[i] = u8::from_str_radix(&hex[i * 2..i * 2 + 2], 16)
            .map_err(|_| ChkpttError::Other("Invalid hex".into()))?;
    }
    Ok(bytes)
}

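/// A file whose blob hash is already known, ready for tree construction.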
struct ProcessedFile {
    relative_path: String,
    blob_hash_bytes: [u8; 32],
    size: u64,
    mode: u32,
    entry_type: EntryType,
}

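/// Output of the read/hash/compress stage; `compressed` is `Some` only for the
/// first occurrence of a blob hash in this save, so each unique blob is
/// written at most once.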
struct PreparedFile {
    relative_path: String,
    blob_hash_hex: String,
    blob_hash_bytes: [u8; 32],
    compressed: Option<Vec<u8>>,
    size: u64,
    mode: u32,
    mtime_secs: i64,
    mtime_nanos: i64,
    inode: Option<u64>,
    entry_type: EntryType,
}

struct NewBlobRecord {
    blob_hash: [u8; 32],
    size: u64,
}

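/// Key identifying a hardlink group: files on the same device with the same
/// inode share their content.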
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct HardlinkKey {
    device: u64,
    inode: u64,
}

#[derive(Debug, Clone)]
struct HardlinkPrepared {
    blob_hash_hex: String,
    blob_hash_bytes: [u8; 32],
}

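// `ShardedSeenHashes` tracks which blob hashes have been seen during this
// save. It is split across `SEEN_HASH_SHARDS` mutex-guarded shards so worker
// threads rarely contend on the same lock. `PREPARED_FILE_PIPELINE_SLOTS`
// bounds the channel between workers and the consumer thread.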
const SEEN_HASH_SHARDS: usize = 64;
const PREPARED_FILE_PIPELINE_SLOTS: usize = 64;

struct ShardedSeenHashes {
    shards: Vec<Mutex<HashSet<[u8; 32]>>>,
}

impl ShardedSeenHashes {
    fn new(expected_entries: usize) -> Self {
        let shard_count = SEEN_HASH_SHARDS.max(1);
        let per_shard_capacity = expected_entries.div_ceil(shard_count).max(1);
        let mut shards = Vec::with_capacity(shard_count);
        for _ in 0..shard_count {
            shards.push(Mutex::new(HashSet::with_capacity(per_shard_capacity)));
        }
        Self { shards }
    }

    #[inline]
    fn insert(&self, hash: [u8; 32]) -> bool {
        let shard_index = ((hash[0] as usize) << 8 | hash[1] as usize) % self.shards.len();
        let mut shard = self.shards[shard_index].lock().unwrap();
        shard.insert(hash)
    }
}

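/// Creates a new snapshot of `workspace_root` in the project store.
///
/// The pipeline: scan the workspace, reuse index entries whose metadata is
/// unchanged, hash and compress the remainder on worker threads, append new
/// blobs to a pack, build the directory tree, and record the snapshot in the
/// metadata catalog.
///
/// A minimal usage sketch (the `chkptt` crate path and the workspace path are
/// illustrative assumptions, not part of this module):
///
/// ```ignore
/// use chkptt::ops::save::{save, SaveOptions};
///
/// // Hypothetical workspace path.
/// let result = save(
///     std::path::Path::new("/path/to/workspace"),
///     SaveOptions {
///         message: Some("checkpoint before refactor".to_string()),
///         ..Default::default()
///     },
/// )?;
/// println!("saved snapshot {}", result.snapshot_id);
/// ```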
pub fn save(workspace_root: &Path, options: SaveOptions) -> Result<SaveResult> {
    let project_id = project_id_from_path(workspace_root);

    let layout = StoreLayout::new(&project_id);
    layout.ensure_dirs()?;

    let _lock = ProjectLock::acquire(&layout.locks_dir())?;
    let catalog = MetadataCatalog::open(layout.catalog_path())?;

    let scanned_files =
        crate::scanner::scan_workspace_with_options(workspace_root, None, options.include_deps)?;
    emit(
        &options.progress,
        ProgressEvent::ScanComplete {
            file_count: scanned_files.len() as u64,
        },
    );

    let mut index = FileIndex::open(layout.index_path())?;
    let cached_entries = index.entries();

    let packs_dir = layout.packs_dir();
    let has_pack_objects = store_has_pack_objects(&packs_dir)?;
    let pack_set = has_pack_objects
        .then(|| PackSet::open_all(&packs_dir))
        .transpose()?;
    let mut pack_writer = PackWriter::new(&packs_dir)?;

    let mut processed_files = Vec::with_capacity(scanned_files.len());
    let mut files_to_prepare = Vec::new();
    let mut updated_entries = Vec::new();
    let mut blob_locations_to_record = Vec::new();
    let mut new_blob_records = Vec::new();
    let mut total_bytes: u64 = 0;
    let mut current_paths =
        (!cached_entries.is_empty()).then(|| HashSet::with_capacity(scanned_files.len()));

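    // Partition the scan: files whose cached index entry still matches are
    // reused as-is; everything else is queued for hashing and compression.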
    for scanned in &scanned_files {
        if let Some(paths) = current_paths.as_mut() {
            paths.insert(scanned.relative_path.as_str());
        }

        if let Some(processed) =
            cached_processed_file(scanned, cached_entries.get(&scanned.relative_path))
        {
            total_bytes += processed.size;
            processed_files.push(processed);
        } else {
            files_to_prepare.push(scanned);
        }
    }
    let removed_paths = current_paths
        .map(|paths| {
            cached_entries
                .keys()
                .filter(|path| !paths.contains(path.as_str()))
                .cloned()
                .collect::<Vec<_>>()
        })
        .unwrap_or_default();

    let mut new_objects: u64 = 0;

    let seen_hashes = Arc::new(ShardedSeenHashes::new(files_to_prepare.len()));

    sort_scanned_refs_for_locality(&mut files_to_prepare);

    let total_to_process = files_to_prepare.len() as u64;
    emit(
        &options.progress,
        ProgressEvent::ProcessStart {
            total: total_to_process,
        },
    );
    let progress_counter = AtomicU64::new(0);
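    // Stream prepared files out of the worker pool; this closure runs on the
    // current thread, which exclusively owns the pack writer and result vecs.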
    process_prepared_files_streaming(
        &files_to_prepare,
        &seen_hashes,
        &options.progress,
        &progress_counter,
        total_to_process,
        |prepared| {
            let PreparedFile {
                relative_path,
                blob_hash_hex: _,
                blob_hash_bytes,
                compressed,
                size,
                mode,
                mtime_secs,
                mtime_nanos,
                inode,
                entry_type,
            } = prepared;

            total_bytes += size;
            if let Some(compressed) = compressed {
                let exists_in_pack = pack_set
                    .as_ref()
                    .is_some_and(|ps| ps.contains_bytes(&blob_hash_bytes));
                if !exists_in_pack {
                    pack_writer.add_pre_compressed_bytes(blob_hash_bytes, compressed)?;
                    new_blob_records.push(NewBlobRecord {
                        blob_hash: blob_hash_bytes,
                        size,
                    });
                    new_objects += 1;
                }
            }

            updated_entries.push(FileEntry {
                path: relative_path.clone(),
                blob_hash: blob_hash_bytes,
                size,
                mtime_secs,
                mtime_nanos,
                inode,
                mode,
            });
            processed_files.push(ProcessedFile {
                relative_path,
                blob_hash_bytes,
                size,
                mode,
                entry_type,
            });
            Ok(())
        },
    )?;
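    // Seal the pack if anything was written, then record where each new blob
    // now lives so later reads can locate it.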
    let new_pack_hash = if !pack_writer.is_empty() {
        Some(pack_writer.finish()?)
    } else {
        None
    };
    if let Some(pack_hash) = new_pack_hash {
        for blob in &new_blob_records {
            blob_locations_to_record.push((
                blob.blob_hash,
                BlobLocation {
                    pack_hash: Some(pack_hash.clone()),
                    size: blob.size,
                },
            ));
        }
    }
    catalog.bulk_upsert_blob_locations(&blob_locations_to_record)?;
    emit(&options.progress, ProgressEvent::PackComplete);

    let latest_catalog_snapshot = catalog.latest_snapshot()?;

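    // With no new, changed, or removed files, derive the root tree from the
    // latest snapshot instead of re-encoding every directory.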
    let root_tree_hash =
        if new_objects == 0 && removed_paths.is_empty() && updated_entries.is_empty() {
            if let Some(ref snapshot) = latest_catalog_snapshot {
                root_tree_hash_for_snapshot(
                    &catalog,
                    snapshot,
                    &TreeStore::new(layout.trees_dir()),
                )?
            } else {
                let tree_store = TreeStore::new(layout.trees_dir());
                let hex = build_tree(&processed_files, &tree_store)?;
                hex_to_bytes(&hex)?
            }
        } else {
            let tree_store = TreeStore::new(layout.trees_dir());
            let hex = build_tree(&processed_files, &tree_store)?;
            hex_to_bytes(&hex)?
        };

    let parent_snapshot_id = latest_catalog_snapshot
        .as_ref()
        .map(|snapshot| snapshot.id.clone());

    let stats = SnapshotStats {
        total_files: scanned_files.len() as u64,
        total_bytes,
        new_objects,
    };

    let snapshot = Snapshot::new(
        options.message,
        root_tree_hash,
        parent_snapshot_id,
        stats.clone(),
    );

    let snapshot_id = snapshot.id.clone();
    let catalog_snapshot = CatalogSnapshot {
        id: snapshot.id.clone(),
        created_at: snapshot.created_at,
        message: snapshot.message.clone(),
        parent_snapshot_id: snapshot.parent_snapshot_id.clone(),
        manifest_snapshot_id: None,
        root_tree_hash: Some(root_tree_hash),
        stats: stats.clone(),
    };
    let no_manifest_changes =
        new_objects == 0 && removed_paths.is_empty() && updated_entries.is_empty();

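    // If the manifest is unchanged, store only snapshot metadata and point at
    // the snapshot that owns the shared manifest.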
    if no_manifest_changes {
        let manifest_snapshot_id = latest_catalog_snapshot
            .as_ref()
            .map(|snapshot| {
                snapshot
                    .manifest_snapshot_id
                    .as_deref()
                    .unwrap_or(snapshot.id.as_str())
            })
            .unwrap_or(snapshot.id.as_str());
        catalog.insert_snapshot_metadata_only(&catalog_snapshot, manifest_snapshot_id)?;
    } else {
        let mut manifest: Vec<ManifestEntry> = processed_files
            .iter()
            .map(|processed| ManifestEntry {
                path: processed.relative_path.clone(),
                blob_hash: processed.blob_hash_bytes,
                size: processed.size,
                mode: processed.mode,
            })
            .collect();
        manifest.sort_unstable_by(|left, right| left.path.cmp(&right.path));
        catalog.insert_snapshot(&catalog_snapshot, &manifest)?;
    }

    index.apply_changes(&removed_paths, &updated_entries)?;

    Ok(SaveResult { snapshot_id, stats })
}

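// Wrapper used by the tests below; blobs live only in packs, so `objects_dir`
// is intentionally ignored.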
#[cfg_attr(not(test), allow(dead_code))]
fn store_has_external_objects(objects_dir: &Path, packs_dir: &Path) -> Result<bool> {
    let _ = objects_dir;
    store_has_pack_objects(packs_dir)
}

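/// Returns true if the packs directory contains at least one `pack-*.dat`
/// file; a missing directory counts as an empty store.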
fn store_has_pack_objects(packs_dir: &Path) -> Result<bool> {
    let entries = match std::fs::read_dir(packs_dir) {
        Ok(entries) => entries,
        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(false),
        Err(error) => return Err(error.into()),
    };

    for entry in entries {
        let entry = entry?;
        if !entry.file_type()?.is_file() {
            continue;
        }

        let name = entry.file_name();
        let name = name.to_string_lossy();
        if name.starts_with("pack-") && name.ends_with(".dat") {
            return Ok(true);
        }
    }

    Ok(false)
}

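/// Resolves the root tree hash for an existing snapshot, preferring the hash
/// stored on the snapshot and otherwise rebuilding the tree from its manifest.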
fn root_tree_hash_for_snapshot(
    catalog: &MetadataCatalog,
    snapshot: &CatalogSnapshot,
    tree_store: &TreeStore,
) -> Result<[u8; 32]> {
    if let Some(root_tree_hash) = snapshot.root_tree_hash {
        return Ok(root_tree_hash);
    }

    let manifest = catalog.snapshot_manifest(&snapshot.id)?;
    if manifest.is_empty() && snapshot.stats.total_files > 0 {
        return Err(ChkpttError::StoreCorrupted(format!(
            "snapshot '{}' is missing both manifest entries and root_tree_hash",
            snapshot.id
        )));
    }
    let root_tree_hex = build_tree(
        &manifest
            .into_iter()
            .map(|entry| ProcessedFile {
                relative_path: entry.path,
                blob_hash_bytes: entry.blob_hash,
                size: entry.size,
                mode: entry.mode,
                entry_type: if entry.mode & 0o170000 == 0o120000 {
                    EntryType::Symlink
                } else {
                    EntryType::File
                },
            })
            .collect::<Vec<_>>(),
        tree_store,
    )?;
    hex_to_bytes(&root_tree_hex)
}

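/// Returns a `ProcessedFile` built from the index cache when the scanned
/// metadata (mtime, size, inode, mode) matches exactly; `None` forces a
/// re-read.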
fn cached_processed_file(
    scanned: &ScannedFile,
    cached: Option<&FileEntry>,
) -> Option<ProcessedFile> {
    if let Some(cached) = cached {
        if cached.mtime_secs == scanned.mtime_secs
            && cached.mtime_nanos == scanned.mtime_nanos
            && cached.size == scanned.size
            && cached.inode == scanned.inode
            && cached.mode == scanned.mode
        {
            return Some(ProcessedFile {
                relative_path: scanned.relative_path.clone(),
                blob_hash_bytes: cached.blob_hash,
                size: scanned.size,
                mode: scanned.mode,
                entry_type: if scanned.is_symlink {
                    EntryType::Symlink
                } else {
                    EntryType::File
                },
            });
        }
    }
    None
}

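/// Reads and hashes one file; the content is compressed only for the first
/// occurrence of its hash in this save, so duplicates carry no payload.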
fn prepare_file(
    scanned: &ScannedFile,
    seen_hashes: &ShardedSeenHashes,
    compressor: &mut zstd::bulk::Compressor<'_>,
) -> Result<PreparedFile> {
    let content = read_path_bytes(&scanned.absolute_path, scanned.is_symlink)?;

    let hash = blake3::hash(&content);
    let blob_hash_bytes: [u8; 32] = *hash.as_bytes();

    let is_new = seen_hashes.insert(blob_hash_bytes);

    let compressed = if is_new {
        Some(compress_with_worker_context(&content, compressor)?)
    } else {
        None
    };

    let blob_hash_hex = hash.to_hex().to_string();

    Ok(PreparedFile {
        relative_path: scanned.relative_path.clone(),
        blob_hash_hex,
        blob_hash_bytes,
        compressed,
        size: scanned.size,
        mode: scanned.mode,
        mtime_secs: scanned.mtime_secs,
        mtime_nanos: scanned.mtime_nanos,
        inode: scanned.inode,
        entry_type: if scanned.is_symlink {
            EntryType::Symlink
        } else {
            EntryType::File
        },
    })
}

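/// Hardlink identity of a scanned file. Symlinks and files missing device or
/// inode information never participate in hardlink deduplication.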
fn hardlink_key(scanned: &ScannedFile) -> Option<HardlinkKey> {
    if scanned.is_symlink {
        return None;
    }

    Some(HardlinkKey {
        device: scanned.device?,
        inode: scanned.inode?,
    })
}

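/// Variant of `prepare_file` that serves repeated hardlinks from a per-worker
/// cache, avoiding a second read and compression of identical content.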
fn prepare_file_with_hardlink_cache(
    scanned: &ScannedFile,
    seen_hashes: &ShardedSeenHashes,
    compressor: &mut zstd::bulk::Compressor<'_>,
    hardlinks: &mut HashMap<HardlinkKey, HardlinkPrepared>,
) -> Result<PreparedFile> {
    if let Some(key) = hardlink_key(scanned) {
        if let Some(cached) = hardlinks.get(&key) {
            return Ok(PreparedFile {
                relative_path: scanned.relative_path.clone(),
                blob_hash_hex: cached.blob_hash_hex.clone(),
                blob_hash_bytes: cached.blob_hash_bytes,
                compressed: None,
                size: scanned.size,
                mode: scanned.mode,
                mtime_secs: scanned.mtime_secs,
                mtime_nanos: scanned.mtime_nanos,
                inode: scanned.inode,
                entry_type: if scanned.is_symlink {
                    EntryType::Symlink
                } else {
                    EntryType::File
                },
            });
        }

        let prepared = prepare_file(scanned, seen_hashes, compressor)?;
        hardlinks.insert(
            key,
            HardlinkPrepared {
                blob_hash_hex: prepared.blob_hash_hex.clone(),
                blob_hash_bytes: prepared.blob_hash_bytes,
            },
        );
        return Ok(prepared);
    }

    prepare_file(scanned, seen_hashes, compressor)
}

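/// Splits the scanned files into near-equal chunks for the worker pool, but
/// never splits a run of entries sharing one hardlink key, since the hardlink
/// cache in each worker is local to its chunk.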
fn split_scanned_refs_preserving_hardlinks<'a>(
    scanned_files: &'a [&'a ScannedFile],
    worker_count: usize,
) -> Vec<&'a [&'a ScannedFile]> {
    if scanned_files.is_empty() {
        return Vec::new();
    }

    let target_chunk_size = scanned_files.len().div_ceil(worker_count.max(1));
    let mut chunks = Vec::with_capacity(worker_count.max(1));
    let mut start = 0usize;

    while start < scanned_files.len() {
        let mut end = (start + target_chunk_size).min(scanned_files.len());
        while end < scanned_files.len()
            && hardlink_key(scanned_files[end - 1]) == hardlink_key(scanned_files[end])
            && hardlink_key(scanned_files[end]).is_some()
        {
            end += 1;
        }
        chunks.push(&scanned_files[start..end]);
        start = end;
    }

    chunks
}

fn compress_with_worker_context(
    content: &[u8],
    compressor: &mut zstd::bulk::Compressor<'_>,
) -> Result<Vec<u8>> {
    Ok(compressor.compress(content)?)
}

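/// Fans file preparation out to worker threads and streams each
/// `PreparedFile` back over a bounded channel to the caller-supplied
/// `on_prepared` closure, which runs on the calling thread. Falls back to an
/// inline loop when only one worker is warranted.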
fn process_prepared_files_streaming<F>(
    scanned_files: &[&ScannedFile],
    seen_hashes: &Arc<ShardedSeenHashes>,
    progress: &ProgressCallback,
    progress_counter: &AtomicU64,
    total_to_process: u64,
    mut on_prepared: F,
) -> Result<()>
where
    F: FnMut(PreparedFile) -> Result<()>,
{
    if scanned_files.is_empty() {
        return Ok(());
    }

    let worker_count = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1)
        .min(scanned_files.len());

    if worker_count <= 1 {
        let mut compressor = zstd::bulk::Compressor::new(1)?;
        let mut hardlinks = HashMap::new();
        for scanned in scanned_files {
            let prepared = prepare_file_with_hardlink_cache(
                scanned,
                seen_hashes,
                &mut compressor,
                &mut hardlinks,
            )?;
            let completed = progress_counter.fetch_add(1, Ordering::Relaxed) + 1;
            emit(
                progress,
                ProgressEvent::ProcessFile {
                    completed,
                    total: total_to_process,
                },
            );
            on_prepared(prepared)?;
        }
        return Ok(());
    }

    let chunks = split_scanned_refs_preserving_hardlinks(scanned_files, worker_count);

    std::thread::scope(|scope| -> Result<()> {
        let (sender, receiver) = mpsc::sync_channel::<Result<PreparedFile>>(
            PREPARED_FILE_PIPELINE_SLOTS.max(worker_count),
        );

        let handles: Vec<_> = chunks
            .into_iter()
            .map(|chunk| {
                let sender = sender.clone();
                scope.spawn(move || {
                    let mut compressor = match zstd::bulk::Compressor::new(1) {
                        Ok(compressor) => compressor,
                        Err(error) => {
                            let _ = sender.send(Err(error.into()));
                            return;
                        }
                    };
                    let mut hardlinks = HashMap::new();

                    for scanned in chunk {
                        let result = prepare_file_with_hardlink_cache(
                            scanned,
                            seen_hashes,
                            &mut compressor,
                            &mut hardlinks,
                        );
                        let completed = progress_counter.fetch_add(1, Ordering::Relaxed) + 1;
                        emit(
                            progress,
                            ProgressEvent::ProcessFile {
                                completed,
                                total: total_to_process,
                            },
                        );
                        if sender.send(result).is_err() {
                            return;
                        }
                    }
                })
            })
            .collect();
        drop(sender);

        let mut consumer_result = Ok(());
        loop {
            match receiver.recv() {
                Ok(Ok(prepared)) => {
                    if let Err(error) = on_prepared(prepared) {
                        consumer_result = Err(error);
                        break;
                    }
                }
                Ok(Err(error)) => {
                    consumer_result = Err(error);
                    break;
                }
                Err(_) => break,
            }
        }
        drop(receiver);

        for handle in handles {
            handle.join().unwrap();
        }

        consumer_result
    })
}

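/// Encodes one tree object per directory, deepest directories first, so each
/// parent can embed the hashes of its children; returns the root tree's hex
/// hash.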
fn build_tree(processed_files: &[ProcessedFile], tree_store: &TreeStore) -> Result<String> {
    let mut dir_files: BTreeMap<String, Vec<&ProcessedFile>> = BTreeMap::new();
    let mut all_dirs: BTreeSet<String> = BTreeSet::new();
    let mut child_dirs: BTreeMap<String, Vec<String>> = BTreeMap::new();
    all_dirs.insert(String::new());

    for pf in processed_files {
        let parent = if let Some(pos) = pf.relative_path.rfind('/') {
            pf.relative_path[..pos].to_string()
        } else {
            String::new()
        };
        dir_files.entry(parent.clone()).or_default().push(pf);
        register_directory_hierarchy(&parent, &mut all_dirs, &mut child_dirs);
    }

    let mut dir_list: Vec<String> = all_dirs.into_iter().collect();
    dir_list.sort_unstable_by(|a, b| {
        let depth_a = if a.is_empty() {
            0
        } else {
            a.matches('/').count() + 1
        };
        let depth_b = if b.is_empty() {
            0
        } else {
            b.matches('/').count() + 1
        };
        depth_b.cmp(&depth_a).then_with(|| a.cmp(b))
    });

    let mut dir_hashes: BTreeMap<String, String> = BTreeMap::new();
    let mut pack_entries: Vec<(String, Vec<u8>)> = Vec::with_capacity(dir_list.len());
    let mut known_hashes: HashSet<[u8; 32]> = HashSet::with_capacity(dir_list.len());

    for dir in &dir_list {
        let file_count = dir_files.get(dir).map(|files| files.len()).unwrap_or(0);
        let child_count = child_dirs
            .get(dir)
            .map(|children| children.len())
            .unwrap_or(0);
        let mut entries: Vec<TreeEntry> = Vec::with_capacity(file_count + child_count);

        if let Some(files) = dir_files.get(dir) {
            for pf in files {
                let name = if let Some(pos) = pf.relative_path.rfind('/') {
                    pf.relative_path[pos + 1..].to_string()
                } else {
                    pf.relative_path.clone()
                };
                entries.push(TreeEntry {
                    name,
                    entry_type: pf.entry_type,
                    hash: pf.blob_hash_bytes,
                    size: pf.size,
                    mode: pf.mode,
                });
            }
        }

        if let Some(children) = child_dirs.get(dir) {
            for sub_dir in children {
                let sub_hash = dir_hashes.get(sub_dir).ok_or_else(|| {
                    ChkpttError::Other(format!("Missing tree hash for directory '{}'", sub_dir))
                })?;
                let sub_name = if let Some(pos) = sub_dir.rfind('/') {
                    sub_dir[pos + 1..].to_string()
                } else {
                    sub_dir.clone()
                };
                entries.push(TreeEntry {
                    name: sub_name,
                    entry_type: EntryType::Dir,
                    hash: hex_to_bytes(sub_hash)?,
                    size: 0,
                    mode: 0o040755,
                });
            }
        }

        entries.sort_unstable_by(|a, b| a.name.cmp(&b.name));
        let encoded = bitcode::encode(&entries);
        let hash = blake3::hash(&encoded);
        let hash_bytes = *hash.as_bytes();
        let hash_hex = hash.to_hex().to_string();

        dir_hashes.insert(dir.clone(), hash_hex.clone());
        if known_hashes.insert(hash_bytes) {
            pack_entries.push((hash_hex, encoded));
        }
    }

    tree_store.write_pack(&pack_entries)?;

    dir_hashes
        .get("")
        .cloned()
        .ok_or_else(|| ChkpttError::Other("Failed to build root tree".into()))
}

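/// Registers `dir` and all of its ancestors, recording each newly seen
/// directory under its parent so `build_tree` can link child trees.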
fn register_directory_hierarchy(
    dir: &str,
    all_dirs: &mut BTreeSet<String>,
    child_dirs: &mut BTreeMap<String, Vec<String>>,
) {
    if dir.is_empty() {
        return;
    }

    let mut parent = String::new();
    let mut current = String::with_capacity(dir.len());
    for segment in dir.split('/') {
        if !current.is_empty() {
            current.push('/');
        }
        current.push_str(segment);

        if all_dirs.insert(current.clone()) {
            child_dirs
                .entry(parent.clone())
                .or_default()
                .push(current.clone());
        }
        parent.clear();
        parent.push_str(&current);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use tempfile::TempDir;

    #[test]
    fn test_hex_to_bytes() {
        let hex = "a3b2c1d4e5f60718293a4b5c6d7e8f90a1b2c3d4e5f60718293a4b5c6d7e8f90";
        let bytes = hex_to_bytes(hex).unwrap();
        assert_eq!(bytes[0], 0xa3);
        assert_eq!(bytes[1], 0xb2);
        assert_eq!(bytes[31], 0x90);
    }

    #[test]
    fn test_hex_to_bytes_invalid_length() {
        assert!(hex_to_bytes("abc").is_err());
    }

    #[test]
    fn test_store_has_external_objects_false_for_empty_store() {
        let dir = TempDir::new().unwrap();
        let objects_dir = dir.path().join("objects");
        let packs_dir = dir.path().join("packs");
        std::fs::create_dir_all(&objects_dir).unwrap();
        std::fs::create_dir_all(&packs_dir).unwrap();

        assert!(!store_has_external_objects(&objects_dir, &packs_dir).unwrap());
    }

    #[test]
    fn test_store_has_external_objects_true_for_pack_objects() {
        let dir = TempDir::new().unwrap();
        let objects_dir = dir.path().join("objects");
        let packs_dir = dir.path().join("packs");
        std::fs::create_dir_all(&packs_dir).unwrap();
        std::fs::write(packs_dir.join("pack-demo.dat"), b"pack").unwrap();

        assert!(store_has_external_objects(&objects_dir, &packs_dir).unwrap());
    }

    #[test]
    fn test_compress_with_worker_context_roundtrip() {
        let content = b"compression-context-roundtrip-data";
        let mut compressor = zstd::bulk::Compressor::new(1).unwrap();

        let compressed = compress_with_worker_context(content, &mut compressor).unwrap();
        let decompressed = zstd::decode_all(&compressed[..]).unwrap();

        assert_eq!(decompressed, content);
    }

    #[test]
    fn test_sharded_seen_hashes_dedups_single_thread() {
        let seen = ShardedSeenHashes::new(8);
        let hash = [7u8; 32];

        assert!(seen.insert(hash));
        assert!(!seen.insert(hash));
    }

    #[test]
    fn test_sharded_seen_hashes_dedups_multi_thread() {
        let seen = Arc::new(ShardedSeenHashes::new(1024));
        let unique_inserts = Arc::new(AtomicUsize::new(0));
        let duplicate_hash = [9u8; 32];

        std::thread::scope(|scope| {
            for i in 0..8u8 {
                let seen = Arc::clone(&seen);
                let unique_inserts = Arc::clone(&unique_inserts);
                scope.spawn(move || {
                    if seen.insert(duplicate_hash) {
                        unique_inserts.fetch_add(1, Ordering::Relaxed);
                    }
                    let mut unique_hash = [0u8; 32];
                    unique_hash[0] = i;
                    if seen.insert(unique_hash) {
                        unique_inserts.fetch_add(1, Ordering::Relaxed);
                    }
                });
            }
        });

        assert_eq!(unique_inserts.load(Ordering::Relaxed), 9);
    }

    #[test]
    fn test_split_scanned_refs_preserves_hardlink_groups() {
        let f1 = scanned("a.txt", Some(1));
        let f2 = scanned("b.txt", Some(1));
        let f3 = scanned("c.txt", Some(2));
        let f4 = scanned("d.txt", Some(3));
        let f5 = scanned("e.txt", Some(3));
        let refs = vec![&f1, &f2, &f3, &f4, &f5];

        let chunks = split_scanned_refs_preserving_hardlinks(&refs, 2);
        let paths: Vec<Vec<&str>> = chunks
            .into_iter()
            .map(|chunk| {
                chunk
                    .iter()
                    .map(|file| file.relative_path.as_str())
                    .collect::<Vec<_>>()
            })
            .collect();

        assert_eq!(
            paths,
            vec![vec!["a.txt", "b.txt", "c.txt"], vec!["d.txt", "e.txt"]]
        );
    }

    #[cfg(unix)]
    #[test]
    fn test_prepare_file_with_hardlink_cache_reuses_existing_read() {
        let dir = TempDir::new().unwrap();
        let original = dir.path().join("original.txt");
        let alias = dir.path().join("alias.txt");
        fs::write(&original, "same-content").unwrap();
        fs::hard_link(&original, &alias).unwrap();

        let original_scanned = scanned_from_path("original.txt", &original);
        let alias_scanned = scanned_from_path("alias.txt", &alias);
        let seen_hashes = ShardedSeenHashes::new(2);
        let mut compressor = zstd::bulk::Compressor::new(1).unwrap();
        let mut hardlinks = HashMap::new();

        let first = prepare_file_with_hardlink_cache(
            &original_scanned,
            &seen_hashes,
            &mut compressor,
            &mut hardlinks,
        )
        .unwrap();
        let second = prepare_file_with_hardlink_cache(
            &alias_scanned,
            &seen_hashes,
            &mut compressor,
            &mut hardlinks,
        )
        .unwrap();

        assert!(first.compressed.is_some());
        assert!(second.compressed.is_none());
        assert_eq!(first.blob_hash_bytes, second.blob_hash_bytes);
        assert_eq!(first.blob_hash_hex, second.blob_hash_hex);
    }

    fn scanned(relative_path: &str, inode: Option<u64>) -> ScannedFile {
        ScannedFile {
            relative_path: relative_path.to_string(),
            absolute_path: std::path::PathBuf::from(relative_path),
            size: 1,
            mtime_secs: 1,
            mtime_nanos: 1,
            device: Some(1),
            inode,
            mode: 0o100644,
            is_symlink: false,
        }
    }

    #[cfg(unix)]
    fn scanned_from_path(relative_path: &str, path: &Path) -> ScannedFile {
        use std::os::unix::fs::MetadataExt;

        let metadata = fs::metadata(path).unwrap();
        ScannedFile {
            relative_path: relative_path.to_string(),
            absolute_path: path.to_path_buf(),
            size: metadata.len(),
            mtime_secs: metadata.mtime(),
            mtime_nanos: metadata.mtime_nsec(),
            device: Some(metadata.dev()),
            inode: Some(metadata.ino()),
            mode: metadata.mode(),
            is_symlink: metadata.file_type().is_symlink(),
        }
    }
}