//! Save pipeline: scan the workspace, skip files whose index entries still
//! match, hash and LZ4-compress new content into packs, build the
//! content-addressed directory tree, and record the snapshot in the catalog.

use crate::config::{project_id_from_path, StoreLayout};
use crate::error::{ChkpttError, Result};
use crate::index::{FileEntry, FileIndex};
use crate::ops::io_order::sort_scanned_refs_for_locality;
use crate::ops::lock::ProjectLock;
use crate::scanner::ScannedFile;
use crate::store::blob::{hex_to_bytes, read_or_mmap, read_path_bytes};
use crate::store::catalog::{BlobLocation, CatalogSnapshot, ManifestEntry, MetadataCatalog};
use crate::store::pack::{PackFinishOptions, PackWriter};
use crate::store::snapshot::{Snapshot, SnapshotStats};
use crate::store::tree::{EntryType, TreeEntry, TreeStore};
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{mpsc, Arc, Mutex};

use crate::ops::progress::{emit, ProgressCallback, ProgressEvent};

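/// Options for [`save`]: an optional snapshot message, whether dependency
/// directories are included in the scan, an optional pack chunk size
/// forwarded to the pack writer, and a progress callback.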
#[derive(Default)]
pub struct SaveOptions {
    pub message: Option<String>,
    pub include_deps: bool,
    pub pack_chunk_bytes: Option<u64>,
    pub progress: ProgressCallback,
}

#[derive(Debug)]
pub struct SaveResult {
    pub snapshot_id: String,
    pub stats: SnapshotStats,
}

struct ProcessedFile {
    relative_path: String,
    blob_hash_bytes: [u8; 16],
    size: u64,
    mode: u32,
    entry_type: EntryType,
}

struct PreparedFile {
    relative_path: String,
    blob_hash_bytes: [u8; 16],
    compressed: Option<Vec<u8>>,
    size: u64,
    mode: u32,
    mtime_secs: i64,
    mtime_nanos: i64,
    inode: Option<u64>,
    entry_type: EntryType,
}

struct NewBlobRecord {
    blob_hash: [u8; 16],
    size: u64,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct HardlinkKey {
    device: u64,
    inode: u64,
}

#[derive(Debug, Clone)]
struct HardlinkPrepared {
    blob_hash_bytes: [u8; 16],
}

const SEEN_HASH_SHARDS: usize = 64;
const PREPARED_FILE_PIPELINE_SLOTS: usize = 64;
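
/// A concurrent "seen blob hashes" set, sharded over `SEEN_HASH_SHARDS`
/// mutex-protected `HashSet`s so worker threads rarely contend on the same
/// lock. The shard is chosen from the first two bytes of the hash, which
/// xxh3 distributes uniformly. `insert` returns `true` only for the first
/// thread to add a given hash, so exactly one worker compresses each
/// distinct blob per save.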
struct ShardedSeenHashes {
    shards: Vec<Mutex<HashSet<[u8; 16]>>>,
}

impl ShardedSeenHashes {
    fn new(expected_entries: usize) -> Self {
        let shard_count = SEEN_HASH_SHARDS.max(1);
        let per_shard_capacity = expected_entries.div_ceil(shard_count).max(1);
        let mut shards = Vec::with_capacity(shard_count);
        for _ in 0..shard_count {
            shards.push(Mutex::new(HashSet::with_capacity(per_shard_capacity)));
        }
        Self { shards }
    }

    #[inline]
    fn insert(&self, hash: [u8; 16]) -> bool {
        let shard_index = ((hash[0] as usize) << 8 | hash[1] as usize) % self.shards.len();
        let mut shard = self.shards[shard_index].lock().unwrap();
        shard.insert(hash)
    }
}

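/// Create a snapshot of `workspace_root` in the project store and return its
/// id and stats. Acquires a per-project lock, scans the workspace, reuses
/// index entries for unchanged files, packs new blobs, builds the directory
/// tree, and records the snapshot in the catalog.
///
/// A minimal usage sketch (assumptions: the crate is named `chkptt` and this
/// module is publicly reachable from the crate root):
///
/// ```no_run
/// use chkptt::ops::save::{save, SaveOptions};
/// use std::path::Path;
///
/// let result = save(
///     Path::new("/path/to/workspace"),
///     SaveOptions {
///         message: Some("before refactor".into()),
///         ..SaveOptions::default()
///     },
/// )
/// .expect("save failed");
/// println!("snapshot {} ({} new objects)", result.snapshot_id, result.stats.new_objects);
/// ```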
pub fn save(workspace_root: &Path, options: SaveOptions) -> Result<SaveResult> {
    let project_id = project_id_from_path(workspace_root);

    let layout = StoreLayout::new(&project_id);
    layout.ensure_dirs()?;

    // One save at a time per project; the lock guard is held until `save` returns.
    let _lock = ProjectLock::acquire(&layout.locks_dir())?;
    let catalog = MetadataCatalog::open(layout.catalog_path())?;

    let scanned_files =
        crate::scanner::scan_workspace_with_options(workspace_root, None, options.include_deps)?;
    emit(
        &options.progress,
        ProgressEvent::ScanComplete {
            file_count: scanned_files.len() as u64,
        },
    );

    let mut index = FileIndex::open(layout.index_path())?;
    let cached_entries = index.entries();

    let packs_dir = layout.packs_dir();
    let known_blob_hashes = catalog.all_blob_hashes()?;
    let mut pack_writer = PackWriter::new(&packs_dir)?;

    let mut processed_files = Vec::with_capacity(scanned_files.len());
    let mut files_to_prepare = Vec::new();
    let mut updated_entries = Vec::new();
    let mut blob_locations_to_record = Vec::new();
    let mut new_blob_records = Vec::new();
    let mut total_bytes: u64 = 0;
    let mut current_paths =
        (!cached_entries.is_empty()).then(|| HashSet::with_capacity(scanned_files.len()));

    // Partition the scan: files whose index entry still matches are reused
    // as-is; everything else must be read, hashed, and possibly compressed.
    for scanned in &scanned_files {
        if let Some(paths) = current_paths.as_mut() {
            paths.insert(scanned.relative_path.as_str());
        }

        if let Some(processed) =
            cached_processed_file(scanned, cached_entries.get(&scanned.relative_path))
        {
            total_bytes += processed.size;
            processed_files.push(processed);
        } else {
            files_to_prepare.push(scanned);
        }
    }
    // Index entries whose paths no longer appear in the scan were deleted.
    let removed_paths = current_paths
        .map(|paths| {
            cached_entries
                .keys()
                .filter(|path| !paths.contains(path.as_str()))
                .cloned()
                .collect::<Vec<_>>()
        })
        .unwrap_or_default();

    let mut new_objects: u64 = 0;

    let seen_hashes = Arc::new(ShardedSeenHashes::new(files_to_prepare.len()));

    sort_scanned_refs_for_locality(&mut files_to_prepare);

    let total_to_process = files_to_prepare.len() as u64;
    emit(
        &options.progress,
        ProgressEvent::ProcessStart {
            total: total_to_process,
        },
    );
    let progress_counter = AtomicU64::new(0);
    process_prepared_files_streaming(
        &files_to_prepare,
        &seen_hashes,
        &options.progress,
        &progress_counter,
        total_to_process,
        |prepared| {
            let PreparedFile {
                relative_path,
                blob_hash_bytes,
                compressed,
                size,
                mode,
                mtime_secs,
                mtime_nanos,
                inode,
                entry_type,
            } = prepared;

            total_bytes += size;
            // `compressed` is Some only for the first occurrence of a hash in
            // this save; skip blobs the store already has from earlier saves.
            if let Some(compressed) = compressed {
                let exists_in_store = known_blob_hashes.contains(&blob_hash_bytes);
                if !exists_in_store {
                    pack_writer.add_pre_compressed_bytes(blob_hash_bytes, compressed)?;
                    new_blob_records.push(NewBlobRecord {
                        blob_hash: blob_hash_bytes,
                        size,
                    });
                    new_objects += 1;
                }
            }

            updated_entries.push(FileEntry {
                path: relative_path.clone(),
                blob_hash: blob_hash_bytes,
                size,
                mtime_secs,
                mtime_nanos,
                inode,
                mode,
            });
            processed_files.push(ProcessedFile {
                relative_path,
                blob_hash_bytes,
                size,
                mode,
                entry_type,
            });
            Ok(())
        },
    )?;
    let new_pack_hash = if !pack_writer.is_empty() {
        Some(pack_writer.finish_with_options(PackFinishOptions {
            chunk_bytes: options.pack_chunk_bytes,
        })?)
    } else {
        None
    };
    if let Some(pack_hash) = new_pack_hash {
        for blob in &new_blob_records {
            blob_locations_to_record.push((
                blob.blob_hash,
                BlobLocation {
                    pack_hash: Some(pack_hash.clone()),
                    size: blob.size,
                },
            ));
        }
    }
    catalog.bulk_upsert_blob_locations(&blob_locations_to_record)?;
    emit(&options.progress, ProgressEvent::PackComplete);

    let latest_catalog_snapshot = catalog.latest_snapshot()?;
    let no_manifest_changes =
        new_objects == 0 && removed_paths.is_empty() && updated_entries.is_empty();

    // With no changes, reuse the previous snapshot's root tree rather than
    // re-encoding every directory.
    let root_tree_hash = if no_manifest_changes {
        if let Some(ref snapshot) = latest_catalog_snapshot {
            root_tree_hash_for_snapshot(&catalog, snapshot, &TreeStore::new(layout.trees_dir()))?
        } else {
            // No previous snapshot to reuse; build the tree from scratch.
            let tree_store = TreeStore::new(layout.trees_dir());
            let hex = build_tree(&processed_files, &tree_store)?;
            hex_to_bytes(&hex)?
        }
    } else {
        let tree_store = TreeStore::new(layout.trees_dir());
        let hex = build_tree(&processed_files, &tree_store)?;
        hex_to_bytes(&hex)?
    };

    let parent_snapshot_id = latest_catalog_snapshot
        .as_ref()
        .map(|snapshot| snapshot.id.clone());

    let stats = SnapshotStats {
        total_files: scanned_files.len() as u64,
        total_bytes,
        new_objects,
    };

    let snapshot = Snapshot::new(
        options.message,
        root_tree_hash,
        parent_snapshot_id,
        stats.clone(),
    );

    let snapshot_id = snapshot.id.clone();
    let catalog_snapshot = CatalogSnapshot {
        id: snapshot.id.clone(),
        created_at: snapshot.created_at,
        message: snapshot.message.clone(),
        parent_snapshot_id: snapshot.parent_snapshot_id.clone(),
        manifest_snapshot_id: None,
        root_tree_hash: Some(root_tree_hash),
        stats: stats.clone(),
    };

    if no_manifest_changes {
        // Point the new snapshot at the previous manifest instead of storing
        // an identical copy.
        let manifest_snapshot_id = latest_catalog_snapshot
            .as_ref()
            .map(|snapshot| {
                snapshot
                    .manifest_snapshot_id
                    .as_deref()
                    .unwrap_or(snapshot.id.as_str())
            })
            .unwrap_or(snapshot.id.as_str());
        catalog.insert_snapshot_metadata_only(&catalog_snapshot, manifest_snapshot_id)?;
    } else {
        let mut manifest: Vec<ManifestEntry> = processed_files
            .iter()
            .map(|processed| ManifestEntry {
                path: processed.relative_path.clone(),
                blob_hash: processed.blob_hash_bytes,
                size: processed.size,
                mode: processed.mode,
            })
            .collect();
        manifest.sort_unstable_by(|left, right| left.path.cmp(&right.path));
        catalog.insert_snapshot(&catalog_snapshot, &manifest)?;
    }

    index.apply_changes(&removed_paths, &updated_entries)?;

    Ok(SaveResult { snapshot_id, stats })
}

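/// Resolve the root tree hash for an existing snapshot: return the recorded
/// `root_tree_hash` when present, otherwise rebuild the tree from the
/// snapshot's stored manifest. An empty manifest on a non-empty snapshot
/// means the store is corrupted.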
fn root_tree_hash_for_snapshot(
    catalog: &MetadataCatalog,
    snapshot: &CatalogSnapshot,
    tree_store: &TreeStore,
) -> Result<[u8; 16]> {
    if let Some(root_tree_hash) = snapshot.root_tree_hash {
        return Ok(root_tree_hash);
    }

    let manifest = catalog.snapshot_manifest(&snapshot.id)?;
    if manifest.is_empty() && snapshot.stats.total_files > 0 {
        return Err(ChkpttError::StoreCorrupted(format!(
            "snapshot '{}' is missing both manifest entries and root_tree_hash",
            snapshot.id
        )));
    }
    let root_tree_hex = build_tree(
        &manifest
            .into_iter()
            .map(|entry| ProcessedFile {
                relative_path: entry.path,
                blob_hash_bytes: entry.blob_hash,
                size: entry.size,
                mode: entry.mode,
                // Mask the file-type bits (0o170000): mode 0o120000 marks a
                // symlink, everything else is treated as a regular file.
                entry_type: if entry.mode & 0o170000 == 0o120000 {
                    EntryType::Symlink
                } else {
                    EntryType::File
                },
            })
            .collect::<Vec<_>>(),
        tree_store,
    )?;
    hex_to_bytes(&root_tree_hex)
}

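/// Return a `ProcessedFile` straight from the index cache when every piece
/// of scanned metadata (mtime, size, inode, mode) still matches the cached
/// entry; otherwise return `None` so the file is re-read and re-hashed.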
fn cached_processed_file(
    scanned: &ScannedFile,
    cached: Option<&FileEntry>,
) -> Option<ProcessedFile> {
    if let Some(cached) = cached {
        if cached.mtime_secs == scanned.mtime_secs
            && cached.mtime_nanos == scanned.mtime_nanos
            && cached.size == scanned.size
            && cached.inode == scanned.inode
            && cached.mode == scanned.mode
        {
            return Some(ProcessedFile {
                relative_path: scanned.relative_path.clone(),
                blob_hash_bytes: cached.blob_hash,
                size: scanned.size,
                mode: scanned.mode,
                entry_type: if scanned.is_symlink {
                    EntryType::Symlink
                } else {
                    EntryType::File
                },
            });
        }
    }
    None
}

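/// Read, hash, and (if the hash is new to this save) compress a single file.
/// Symlinks and regular files take different read paths (`read_path_bytes`
/// vs. `read_or_mmap`), but both are hashed with xxh3-128. `compressed`
/// stays `None` for hashes another file already claimed, so duplicate
/// content is compressed only once.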
fn prepare_file(scanned: &ScannedFile, seen_hashes: &ShardedSeenHashes) -> Result<PreparedFile> {
    let (blob_hash_bytes, compressed) = if scanned.is_symlink {
        let content = read_path_bytes(&scanned.absolute_path, true)?;
        let blob_hash_bytes = xxhash_rust::xxh3::xxh3_128(&content).to_le_bytes();
        let is_new = seen_hashes.insert(blob_hash_bytes);
        let compressed = if is_new {
            Some(compress_with_worker_context(&content))
        } else {
            None
        };
        (blob_hash_bytes, compressed)
    } else {
        let content = read_or_mmap(&scanned.absolute_path)?;
        let blob_hash_bytes = xxhash_rust::xxh3::xxh3_128(content.as_ref()).to_le_bytes();
        let is_new = seen_hashes.insert(blob_hash_bytes);
        let compressed = if is_new {
            Some(compress_with_worker_context(content.as_ref()))
        } else {
            None
        };
        (blob_hash_bytes, compressed)
    };

    Ok(PreparedFile {
        relative_path: scanned.relative_path.clone(),
        blob_hash_bytes,
        compressed,
        size: scanned.size,
        mode: scanned.mode,
        mtime_secs: scanned.mtime_secs,
        mtime_nanos: scanned.mtime_nanos,
        inode: scanned.inode,
        entry_type: if scanned.is_symlink {
            EntryType::Symlink
        } else {
            EntryType::File
        },
    })
}

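/// Identity key for hardlink detection: `(device, inode)` for regular files
/// with both ids available, `None` for symlinks or incomplete metadata.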
fn hardlink_key(scanned: &ScannedFile) -> Option<HardlinkKey> {
    if scanned.is_symlink {
        return None;
    }

    Some(HardlinkKey {
        device: scanned.device?,
        inode: scanned.inode?,
    })
}

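/// [`prepare_file`], fronted by a per-worker hardlink cache: once one path
/// for a `(device, inode)` pair has been read and hashed, later links reuse
/// its blob hash without touching the file contents again.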
fn prepare_file_with_hardlink_cache(
    scanned: &ScannedFile,
    seen_hashes: &ShardedSeenHashes,
    hardlinks: &mut HashMap<HardlinkKey, HardlinkPrepared>,
) -> Result<PreparedFile> {
    if let Some(key) = hardlink_key(scanned) {
        if let Some(cached) = hardlinks.get(&key) {
            return Ok(PreparedFile {
                relative_path: scanned.relative_path.clone(),
                blob_hash_bytes: cached.blob_hash_bytes,
                compressed: None,
                size: scanned.size,
                mode: scanned.mode,
                mtime_secs: scanned.mtime_secs,
                mtime_nanos: scanned.mtime_nanos,
                inode: scanned.inode,
                entry_type: if scanned.is_symlink {
                    EntryType::Symlink
                } else {
                    EntryType::File
                },
            });
        }

        let prepared = prepare_file(scanned, seen_hashes)?;
        hardlinks.insert(
            key,
            HardlinkPrepared {
                blob_hash_bytes: prepared.blob_hash_bytes,
            },
        );
        return Ok(prepared);
    }

    prepare_file(scanned, seen_hashes)
}

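/// Split the scanned refs into one chunk per worker of roughly equal size,
/// extending a chunk past its target boundary rather than splitting an
/// adjacent run of files that share a hardlink key. This keeps each hardlink
/// group inside a single worker's cache.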
fn split_scanned_refs_preserving_hardlinks<'a>(
    scanned_files: &'a [&'a ScannedFile],
    worker_count: usize,
) -> Vec<&'a [&'a ScannedFile]> {
    if scanned_files.is_empty() {
        return Vec::new();
    }

    let target_chunk_size = scanned_files.len().div_ceil(worker_count.max(1));
    let mut chunks = Vec::with_capacity(worker_count.max(1));
    let mut start = 0usize;

    while start < scanned_files.len() {
        let mut end = (start + target_chunk_size).min(scanned_files.len());
        // Push the boundary forward while it would split a run of files that
        // share the same (device, inode) hardlink key.
        while end < scanned_files.len()
            && hardlink_key(scanned_files[end - 1]) == hardlink_key(scanned_files[end])
            && hardlink_key(scanned_files[end]).is_some()
        {
            end += 1;
        }
        chunks.push(&scanned_files[start..end]);
        start = end;
    }

    chunks
}

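/// LZ4 frame-compress `content` into a fresh `Vec`. Writing into an
/// in-memory buffer cannot fail, so the `unwrap`s are effectively infallible.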
fn compress_with_worker_context(content: &[u8]) -> Vec<u8> {
    use lz4_flex::frame::FrameEncoder;
    let mut encoder = FrameEncoder::new(Vec::new());
    std::io::Write::write_all(&mut encoder, content).unwrap();
    encoder.finish().unwrap()
}

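/// Prepare files on a pool of scoped worker threads and stream each
/// `PreparedFile` back over a bounded channel to `on_prepared`, which runs
/// on the calling thread and can therefore mutate un-synchronized state (the
/// pack writer, the pending index entries). Falls back to a simple
/// sequential loop when only one worker is warranted.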
fn process_prepared_files_streaming<F>(
    scanned_files: &[&ScannedFile],
    seen_hashes: &Arc<ShardedSeenHashes>,
    progress: &ProgressCallback,
    progress_counter: &AtomicU64,
    total_to_process: u64,
    mut on_prepared: F,
) -> Result<()>
where
    F: FnMut(PreparedFile) -> Result<()>,
{
    if scanned_files.is_empty() {
        return Ok(());
    }

    let worker_count = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1)
        .min(scanned_files.len());

    if worker_count <= 1 {
        let mut hardlinks = HashMap::new();
        for scanned in scanned_files {
            let prepared = prepare_file_with_hardlink_cache(scanned, seen_hashes, &mut hardlinks)?;
            let completed = progress_counter.fetch_add(1, Ordering::Relaxed) + 1;
            emit(
                progress,
                ProgressEvent::ProcessFile {
                    completed,
                    total: total_to_process,
                },
            );
            on_prepared(prepared)?;
        }
        return Ok(());
    }

    let chunks = split_scanned_refs_preserving_hardlinks(scanned_files, worker_count);

    std::thread::scope(|scope| -> Result<()> {
        // Bounded channel: workers block once the consumer falls this far
        // behind, capping the memory held by in-flight compressed buffers.
        let (sender, receiver) = mpsc::sync_channel::<Result<PreparedFile>>(
            PREPARED_FILE_PIPELINE_SLOTS.max(worker_count),
        );

        let handles: Vec<_> = chunks
            .into_iter()
            .map(|chunk| {
                let sender = sender.clone();
                scope.spawn(move || {
                    let mut hardlinks = HashMap::new();

                    for scanned in chunk {
                        let result =
                            prepare_file_with_hardlink_cache(scanned, seen_hashes, &mut hardlinks);
                        let completed = progress_counter.fetch_add(1, Ordering::Relaxed) + 1;
                        emit(
                            progress,
                            ProgressEvent::ProcessFile {
                                completed,
                                total: total_to_process,
                            },
                        );
                        // A send error means the consumer hung up; stop early.
                        if sender.send(result).is_err() {
                            return;
                        }
                    }
                })
            })
            .collect();
        // Drop the original sender so `recv` disconnects once all workers finish.
        drop(sender);

        let mut consumer_result = Ok(());
        loop {
            match receiver.recv() {
                Ok(Ok(prepared)) => {
                    if let Err(error) = on_prepared(prepared) {
                        consumer_result = Err(error);
                        break;
                    }
                }
                Ok(Err(error)) => {
                    consumer_result = Err(error);
                    break;
                }
                Err(_) => break,
            }
        }
        // Dropping the receiver unblocks any worker waiting on a full channel.
        drop(receiver);

        for handle in handles {
            handle.join().unwrap();
        }

        consumer_result
    })
}

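/// Build the content-addressed tree for `processed_files` and return the
/// root tree hash as hex. Files are grouped by parent directory, directories
/// are processed deepest-first so every child hash exists before its parent
/// is encoded, and all encoded trees are written to the store in one pack.
/// For example, `src/lib.rs` contributes a `lib.rs` entry to the `src` tree,
/// and `src` contributes a `Dir` entry to the root tree.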
fn build_tree(processed_files: &[ProcessedFile], tree_store: &TreeStore) -> Result<String> {
    let mut dir_files: BTreeMap<String, Vec<&ProcessedFile>> = BTreeMap::new();
    let mut all_dirs: BTreeSet<String> = BTreeSet::new();
    let mut child_dirs: BTreeMap<String, Vec<String>> = BTreeMap::new();
    // The root directory ("") is always present.
    all_dirs.insert(String::new());

    for pf in processed_files {
        let parent = match pf.relative_path.rfind('/') {
            Some(pos) => &pf.relative_path[..pos],
            None => "",
        };
        dir_files.entry(parent.to_string()).or_default().push(pf);
        register_directory_hierarchy(parent, &mut all_dirs, &mut child_dirs);
    }

    let mut dir_list: Vec<String> = all_dirs.into_iter().collect();
    // Deepest directories first, so every child tree hash is computed before
    // its parent directory is encoded.
    dir_list.sort_unstable_by(|a, b| {
        let depth_a = if a.is_empty() {
            0
        } else {
            a.matches('/').count() + 1
        };
        let depth_b = if b.is_empty() {
            0
        } else {
            b.matches('/').count() + 1
        };
        depth_b.cmp(&depth_a).then_with(|| a.cmp(b))
    });

    let mut dir_hashes: BTreeMap<String, String> = BTreeMap::new();
    let mut pack_entries: Vec<(String, Vec<u8>)> = Vec::with_capacity(dir_list.len());
    let mut known_hashes: HashSet<[u8; 16]> = HashSet::with_capacity(dir_list.len());

    for dir in &dir_list {
        let file_count = dir_files.get(dir).map(|files| files.len()).unwrap_or(0);
        let child_count = child_dirs
            .get(dir)
            .map(|children| children.len())
            .unwrap_or(0);
        let mut entries: Vec<TreeEntry> = Vec::with_capacity(file_count + child_count);

        if let Some(files) = dir_files.get(dir) {
            for pf in files {
                let name = match pf.relative_path.rfind('/') {
                    Some(pos) => &pf.relative_path[pos + 1..],
                    None => &pf.relative_path,
                };
                entries.push(TreeEntry {
                    name: name.to_string(),
                    entry_type: pf.entry_type,
                    hash: pf.blob_hash_bytes,
                    size: pf.size,
                    mode: pf.mode,
                });
            }
        }

        if let Some(children) = child_dirs.get(dir) {
            for sub_dir in children {
                let sub_hash = dir_hashes.get(sub_dir).ok_or_else(|| {
                    ChkpttError::Other(format!("Missing tree hash for directory '{}'", sub_dir))
                })?;
                let sub_name = match sub_dir.rfind('/') {
                    Some(pos) => &sub_dir[pos + 1..],
                    None => sub_dir.as_str(),
                };
                entries.push(TreeEntry {
                    name: sub_name.to_string(),
                    entry_type: EntryType::Dir,
                    hash: hex_to_bytes(sub_hash)?,
                    size: 0,
                    mode: 0o040755,
                });
            }
        }

        // Sorted entries give a canonical encoding, so identical directory
        // contents always hash to the same tree.
        entries.sort_unstable_by(|a, b| a.name.cmp(&b.name));
        let encoded = bitcode::encode(&entries);
        let hash_bytes = xxhash_rust::xxh3::xxh3_128(&encoded).to_le_bytes();
        let hash_hex = crate::store::blob::bytes_to_hex(&hash_bytes);

        dir_hashes.insert(dir.clone(), hash_hex.clone());
        // Identical subtrees encode identically; write each unique tree once.
        if known_hashes.insert(hash_bytes) {
            pack_entries.push((hash_hex, encoded));
        }
    }

    tree_store.write_pack(&pack_entries)?;

    dir_hashes
        .get("")
        .cloned()
        .ok_or_else(|| ChkpttError::Other("Failed to build root tree".into()))
}

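/// Insert `dir` and all of its ancestors into `all_dirs`, recording each
/// newly seen directory under its parent in `child_dirs`. For `"a/b/c"` this
/// registers `"a"`, `"a/b"`, and `"a/b/c"`, in that order.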
fn register_directory_hierarchy(
    dir: &str,
    all_dirs: &mut BTreeSet<String>,
    child_dirs: &mut BTreeMap<String, Vec<String>>,
) {
    if dir.is_empty() {
        return;
    }

    if all_dirs.contains(dir) {
        return;
    }

    let mut segments_end = Vec::new();
    for (i, ch) in dir.char_indices() {
        if ch == '/' {
            segments_end.push(i);
        }
    }
    segments_end.push(dir.len());

    let mut parent = String::new();
    for &end in &segments_end {
        let current = &dir[..end];
        if all_dirs.insert(current.to_string()) {
            child_dirs
                .entry(parent)
                .or_default()
                .push(current.to_string());
        }
        parent = current.to_string();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use tempfile::TempDir;

    #[test]
    fn test_hex_to_bytes() {
        let hex = "a3b2c1d4e5f60718293a4b5c6d7e8f90";
        let bytes = hex_to_bytes(hex).unwrap();
        assert_eq!(bytes[0], 0xa3);
        assert_eq!(bytes[1], 0xb2);
        assert_eq!(bytes[15], 0x90);
    }

    #[test]
    fn test_hex_to_bytes_invalid_length() {
        assert!(hex_to_bytes("abc").is_err());
    }

    #[test]
    fn test_compress_with_worker_context_roundtrip() {
        let content = b"compression-context-roundtrip-data";

        let compressed = compress_with_worker_context(content);
        let decompressed = {
            use lz4_flex::frame::FrameDecoder;
            let mut decoder = FrameDecoder::new(&compressed[..]);
            let mut buf = Vec::new();
            std::io::Read::read_to_end(&mut decoder, &mut buf).unwrap();
            buf
        };

        assert_eq!(decompressed, content);
    }

    #[test]
    fn test_sharded_seen_hashes_dedups_single_thread() {
        let seen = ShardedSeenHashes::new(8);
        let hash = [7u8; 16];

        assert!(seen.insert(hash));
        assert!(!seen.insert(hash));
    }

    #[test]
    fn test_sharded_seen_hashes_dedups_multi_thread() {
        let seen = Arc::new(ShardedSeenHashes::new(1024));
        let unique_inserts = Arc::new(AtomicUsize::new(0));
        let duplicate_hash = [9u8; 16];

        std::thread::scope(|scope| {
            for i in 0..8u8 {
                let seen = Arc::clone(&seen);
                let unique_inserts = Arc::clone(&unique_inserts);
                scope.spawn(move || {
                    if seen.insert(duplicate_hash) {
                        unique_inserts.fetch_add(1, Ordering::Relaxed);
                    }
                    let mut unique_hash = [0u8; 16];
                    unique_hash[0] = i;
                    if seen.insert(unique_hash) {
                        unique_inserts.fetch_add(1, Ordering::Relaxed);
                    }
                });
            }
        });

        assert_eq!(unique_inserts.load(Ordering::Relaxed), 9);
    }

    #[test]
    fn test_split_scanned_refs_preserves_hardlink_groups() {
        let f1 = scanned("a.txt", Some(1));
        let f2 = scanned("b.txt", Some(1));
        let f3 = scanned("c.txt", Some(2));
        let f4 = scanned("d.txt", Some(3));
        let f5 = scanned("e.txt", Some(3));
        let refs = vec![&f1, &f2, &f3, &f4, &f5];

        let chunks = split_scanned_refs_preserving_hardlinks(&refs, 2);
        let paths: Vec<Vec<&str>> = chunks
            .into_iter()
            .map(|chunk| {
                chunk
                    .iter()
                    .map(|file| file.relative_path.as_str())
                    .collect::<Vec<_>>()
            })
            .collect();

        assert_eq!(
            paths,
            vec![vec!["a.txt", "b.txt", "c.txt"], vec!["d.txt", "e.txt"]]
        );
    }

    #[cfg(unix)]
    #[test]
    fn test_prepare_file_with_hardlink_cache_reuses_existing_read() {
        let dir = TempDir::new().unwrap();
        let original = dir.path().join("original.txt");
        let alias = dir.path().join("alias.txt");
        fs::write(&original, "same-content").unwrap();
        fs::hard_link(&original, &alias).unwrap();

        let original_scanned = scanned_from_path("original.txt", &original);
        let alias_scanned = scanned_from_path("alias.txt", &alias);
        let seen_hashes = ShardedSeenHashes::new(2);
        let mut hardlinks = HashMap::new();

        let first =
            prepare_file_with_hardlink_cache(&original_scanned, &seen_hashes, &mut hardlinks)
                .unwrap();
        let second =
            prepare_file_with_hardlink_cache(&alias_scanned, &seen_hashes, &mut hardlinks).unwrap();

        assert!(first.compressed.is_some());
        assert!(second.compressed.is_none());
        assert_eq!(first.blob_hash_bytes, second.blob_hash_bytes);
    }

    fn scanned(relative_path: &str, inode: Option<u64>) -> ScannedFile {
        ScannedFile {
            relative_path: relative_path.to_string(),
            absolute_path: std::path::PathBuf::from(relative_path),
            size: 1,
            mtime_secs: 1,
            mtime_nanos: 1,
            device: Some(1),
            inode,
            mode: 0o100644,
            is_symlink: false,
        }
    }

    #[cfg(unix)]
    fn scanned_from_path(relative_path: &str, path: &Path) -> ScannedFile {
        use std::os::unix::fs::MetadataExt;

        let metadata = fs::metadata(path).unwrap();
        ScannedFile {
            relative_path: relative_path.to_string(),
            absolute_path: path.to_path_buf(),
            size: metadata.len(),
            mtime_secs: metadata.mtime(),
            mtime_nanos: metadata.mtime_nsec(),
            device: Some(metadata.dev()),
            inode: Some(metadata.ino()),
            mode: metadata.mode(),
            is_symlink: metadata.file_type().is_symlink(),
        }
    }
}
925}