1use std::{
5 fs,
6 path::{Path, PathBuf},
7};
8
9use tracing::{debug, instrument, trace};
10
11use super::{
12 FsStore,
13 fs_io::{list_hashes_from_dir, read_file_bytes, read_file_header},
14 fs_paths::{
15 action_path, actions_dir, blobs_dir, hash_path, redaction_path, redactions_dir, state_path,
16 state_visibility_dir, state_visibility_path, states_dir, trees_dir,
17 },
18};
19use crate::{
20 object::{Action, ActionId, Blob, ChangeId, ContentHash, State, Tree},
21 store::{
22 HeddleError, ObjectStore, Result,
23 compression::{compress, decompress, header_uncompressed_size, is_compressed},
24 pack::{ObjectType, PackManager, PackObjectId},
25 },
26};
27
/// Number of leading bytes requested from a loose blob file when only its
/// header is needed. The peeked bytes are handed to `header_uncompressed_size`
/// and `is_compressed`.
// NOTE(review): presumably sized to cover the compression header's magic plus
// the stored uncompressed length — confirm against the compression module.
const BLOB_HEADER_PEEK: usize = 13;
36
37fn validate_loaded_tree(tree: Tree) -> Result<Tree> {
38 tree.validate()?;
39 Ok(tree)
40}
41
42fn validate_blob_bytes(data: &[u8], hash: ContentHash) -> Result<()> {
43 let mut hasher = ContentHash::typed_hasher("blob", data.len() as u64);
44 hasher.update(data);
45 let found = ContentHash::from_bytes(hasher.finalize().into());
46 if found != hash {
47 return Err(HeddleError::Corruption {
48 expected: hash,
49 found,
50 });
51 }
52
53 Ok(())
54}
55
56fn validate_tree_serialized(data: &[u8], hash: ContentHash) -> Result<Tree> {
57 let tree: Tree = rmp_serde::from_slice(data)?;
58 let tree = validate_loaded_tree(tree)?;
59 let found = tree.hash();
60 if found != hash {
61 return Err(HeddleError::Corruption {
62 expected: hash,
63 found,
64 });
65 }
66
67 Ok(tree)
68}
69
70fn validate_loaded_state(requested_id: &ChangeId, state: State) -> Result<State> {
71 if state.change_id != *requested_id {
72 return Err(HeddleError::InvalidObject(format!(
73 "state change_id mismatch: requested {}, found {}",
74 requested_id, state.change_id
75 )));
76 }
77
78 Ok(state)
79}
80
81fn validate_state_serialized(data: &[u8], id: ChangeId) -> Result<State> {
82 let state: State = rmp_serde::from_slice(data)?;
83 validate_loaded_state(&id, state)
84}
85
86fn validate_loaded_action(requested_id: &ActionId, action: Action) -> Result<Action> {
87 let found_id = action.compute_id();
88 if found_id != *requested_id {
89 return Err(HeddleError::InvalidObject(format!(
90 "action id mismatch: requested {}, found {}",
91 requested_id, found_id
92 )));
93 }
94
95 Ok(action)
96}
97
98fn validate_action_serialized(data: &[u8], id: ActionId) -> Result<Action> {
99 let action: Action = rmp_serde::from_slice(data)?;
100 validate_loaded_action(&id, action)
101}
102
103fn validate_and_list_pack(reader: &crate::store::pack::PackReader) -> Result<Vec<PackObjectId>> {
111 let ids = reader.list_ids();
112 for id in &ids {
113 let Some((obj_type, data)) = reader.get_object_bytes(id)? else {
114 continue;
115 };
116 validate_pack_entry(id, obj_type, data.as_ref())?;
117 }
118 Ok(ids)
119}
120
121fn validate_pack_entry(id: &PackObjectId, obj_type: ObjectType, data: &[u8]) -> Result<()> {
122 match (id, obj_type) {
123 (PackObjectId::Hash(hash), ObjectType::Blob) => validate_blob_bytes(data, *hash),
124 (PackObjectId::Hash(hash), ObjectType::Tree) => {
125 validate_tree_serialized(data, *hash).map(|_| ())
126 }
127 (PackObjectId::Hash(hash), ObjectType::Action) => {
128 validate_action_serialized(data, ActionId::from_hash(*hash)).map(|_| ())
129 }
130 (PackObjectId::ChangeId(change_id), ObjectType::State) => {
131 validate_state_serialized(data, *change_id).map(|_| ())
132 }
133 _ => Err(HeddleError::InvalidObject(format!(
134 "unsupported native pack object: {:?} {:?}",
135 id, obj_type
136 ))),
137 }
138}
139
140impl FsStore {
141 fn try_get_blob_once(&self, hash: &ContentHash) -> Result<Option<Blob>> {
144 let path = hash_path(&blobs_dir(&self.root), hash);
145 let loose_exists = path.exists();
146 let pack_has = if loose_exists {
147 false
148 } else if let Ok(manager) = self.pack_manager().read() {
149 manager.has_object(hash)
150 } else {
151 false
152 };
153 if (loose_exists || pack_has)
154 && let Ok(cache) = self.recent_blobs.read()
155 && let Some(blob) = cache.get(hash)
156 {
157 trace!("Found blob in recent object cache");
158 return Ok(Some(blob.clone()));
159 }
160
161 if let Ok(manager) = self.pack_manager().read()
162 && let Some((obj_type, data)) = manager.get_hashed_object(hash)?
163 && obj_type == ObjectType::Blob
164 {
165 trace!("Found blob in packfile");
166 return Ok(Some(Blob::new(data)));
174 }
175
176 match read_file_bytes(&path)? {
177 Some(data) => {
178 trace!(size = data.as_slice().len(), "Blob data read");
179 let content = if is_compressed(data.as_slice()) {
180 decompress(data.as_slice())?
181 } else {
182 data.into_vec()
183 };
184 let blob = Blob::new(content);
185 if blob.hash() != *hash {
192 return Err(HeddleError::Corruption {
193 expected: *hash,
194 found: blob.hash(),
195 });
196 }
197 Ok(Some(blob))
198 }
199 None => Ok(None),
200 }
201 }
202
203 fn loose_or_packed(
208 &self,
209 loose_path: &Path,
210 in_pack: impl FnOnce(&PackManager) -> bool,
211 ) -> Result<bool> {
212 if loose_path.exists() {
213 return Ok(true);
214 }
215 if let Ok(manager) = self.pack_manager().read() {
216 return Ok(in_pack(&manager));
217 }
218 Ok(false)
219 }
220
221 fn try_has_blob_once(&self, hash: &ContentHash) -> Result<bool> {
222 let path = hash_path(&blobs_dir(&self.root), hash);
223 self.loose_or_packed(&path, |m| m.has_object(hash))
224 }
225
226 fn try_get_blob_size_once(&self, hash: &ContentHash) -> Result<Option<u64>> {
241 if let Ok(cache) = self.recent_blobs.read()
242 && let Some(blob) = cache.get(hash)
243 {
244 return Ok(Some(blob.content().len() as u64));
245 }
246
247 let path = hash_path(&blobs_dir(&self.root), hash);
248 if let Some((header, file_len)) = read_file_header(&path, BLOB_HEADER_PEEK)? {
249 if let Some(size) = header_uncompressed_size(&header) {
250 return Ok(Some(size));
251 }
252 return Ok(Some(file_len));
255 }
256
257 if let Ok(manager) = self.pack_manager().read()
258 && let Some(size) = manager.get_hashed_object_size(hash)?
259 {
260 return Ok(Some(size));
261 }
262 Ok(None)
263 }
264
265 fn try_get_tree_once(&self, hash: &ContentHash) -> Result<Option<Tree>> {
266 if let Ok(cache) = self.recent_trees.read()
273 && let Some(tree) = cache.get(hash)
274 {
275 trace!("Found tree in recent object cache");
276 return Ok(Some(tree.clone()));
277 }
278
279 if let Ok(manager) = self.pack_manager().read()
280 && let Some((obj_type, data)) = manager.get_hashed_object(hash)?
281 && obj_type == ObjectType::Tree
282 {
283 trace!("Found tree in packfile");
284 let tree = validate_loaded_tree(rmp_serde::from_slice(&data)?)?;
285 if tree.hash() != *hash {
286 return Err(HeddleError::Corruption {
287 expected: *hash,
288 found: tree.hash(),
289 });
290 }
291 return Ok(Some(tree));
292 }
293
294 let path = hash_path(&trees_dir(&self.root), hash);
295 match read_file_bytes(&path)? {
296 Some(data) => {
297 trace!(size = data.as_slice().len(), "Tree data read");
298 let decoded = if is_compressed(data.as_slice()) {
299 decompress(data.as_slice())?
300 } else {
301 data.into_vec()
302 };
303 let tree = validate_loaded_tree(rmp_serde::from_slice(&decoded)?)?;
304 if tree.hash() != *hash {
305 return Err(HeddleError::Corruption {
306 expected: *hash,
307 found: tree.hash(),
308 });
309 }
310 if let Ok(mut cache) = self.recent_trees.write() {
311 cache.insert(*hash, tree.clone());
312 }
313 Ok(Some(tree))
314 }
315 None => Ok(None),
316 }
317 }
318
319 fn try_has_tree_once(&self, hash: &ContentHash) -> Result<bool> {
320 let path = hash_path(&trees_dir(&self.root), hash);
321 self.loose_or_packed(&path, |m| m.has_object(hash))
322 }
323
324 fn try_get_state_once(&self, id: &ChangeId) -> Result<Option<State>> {
325 let path = state_path(&self.root, id);
326 let loose_exists = path.exists();
327 let pack_has = if loose_exists {
328 false
329 } else if let Ok(manager) = self.pack_manager().read() {
330 manager.has_object_id(&PackObjectId::ChangeId(*id))
331 } else {
332 false
333 };
334 if (loose_exists || pack_has)
335 && let Ok(cache) = self.recent_states.read()
336 && let Some(state) = cache.get(id)
337 {
338 trace!("Found state in recent object cache");
339 return Ok(Some(state.clone()));
340 }
341
342 if loose_exists && let Some(data) = read_file_bytes(&path)? {
352 trace!(
353 size = data.as_slice().len(),
354 "State read from loose object (shadows any packed copy)"
355 );
356 let decoded = if is_compressed(data.as_slice()) {
357 decompress(data.as_slice())?
358 } else {
359 data.into_vec()
360 };
361 let state = validate_loaded_state(id, rmp_serde::from_slice(&decoded)?)?;
362 if let Ok(mut cache) = self.recent_states.write() {
363 cache.insert(*id, state.clone());
364 }
365 return Ok(Some(state));
366 }
367
368 if let Ok(manager) = self.pack_manager().read()
369 && let Some((obj_type, data)) = manager.get_object(&PackObjectId::ChangeId(*id))?
370 && obj_type == ObjectType::State
371 {
372 trace!("Found state in packfile");
373 let state = validate_loaded_state(id, rmp_serde::from_slice(&data)?)?;
374 if let Ok(mut cache) = self.recent_states.write() {
375 cache.insert(*id, state.clone());
376 }
377 return Ok(Some(state));
378 }
379
380 Ok(None)
381 }
382
383 fn try_has_state_once(&self, id: &ChangeId) -> Result<bool> {
384 let path = state_path(&self.root, id);
385 self.loose_or_packed(&path, |m| m.has_object_id(&PackObjectId::ChangeId(*id)))
386 }
387}
388
389impl ObjectStore for FsStore {
390 fn clear_recent_caches(&self) {
391 self.clear_recent_object_caches();
392 }
393
394 fn get_blob_bytes(&self, hash: &ContentHash) -> Result<Option<bytes::Bytes>> {
401 if let Ok(manager) = self.pack_manager().read()
402 && let Some((obj_type, data)) = manager.get_hashed_object_bytes(hash)?
403 && obj_type == crate::store::pack::ObjectType::Blob
404 {
405 return Ok(Some(data));
406 }
407 Ok(self
408 .get_blob(hash)?
409 .map(|blob| bytes::Bytes::from(blob.into_content())))
410 }
411
412 #[instrument(skip(self), fields(hash = %hash.short()))]
413 fn get_blob(&self, hash: &ContentHash) -> Result<Option<Blob>> {
414 if let Some(blob) = self.try_get_blob_once(hash)? {
415 return Ok(Some(blob));
416 }
417 if self.reload_packs_if_stale()?
422 && let Some(blob) = self.try_get_blob_once(hash)?
423 {
424 return Ok(Some(blob));
425 }
426 trace!("Blob not found");
427 Ok(None)
428 }
429
430 #[instrument(skip(self, blob), fields(size = blob.content().len()))]
431 fn put_blob(&self, blob: &Blob) -> Result<ContentHash> {
432 let hash = blob.hash();
433 let path = hash_path(&blobs_dir(&self.root), &hash);
434
435 if !path.exists() {
436 let content = blob.content();
437 let data = compress(content, &self.compression)?.unwrap_or_else(|| content.to_vec());
438 trace!(compressed_size = data.len(), "Writing blob");
439 self.write_loose_object_atomic(&path, &data)?;
440 } else {
441 trace!("Blob already exists, skipping write");
442 }
443 if let Ok(mut cache) = self.recent_blobs.write() {
444 cache.insert(hash, blob.clone());
445 }
446
447 Ok(hash)
448 }
449
450 #[instrument(skip(self, blob), fields(hash = %hash.short()))]
451 fn put_blob_with_hash(&self, blob: &Blob, hash: ContentHash) -> Result<ContentHash> {
452 if blob.hash() != hash {
453 return Err(HeddleError::Corruption {
454 expected: hash,
455 found: blob.hash(),
456 });
457 }
458
459 let path = hash_path(&blobs_dir(&self.root), &hash);
460
461 if !path.exists() {
462 let content = blob.content();
463 let data = compress(content, &self.compression)?.unwrap_or_else(|| content.to_vec());
464 trace!(
465 compressed_size = data.len(),
466 "Writing blob with precomputed hash"
467 );
468 self.write_loose_object_atomic(&path, &data)?;
469 }
470 if let Ok(mut cache) = self.recent_blobs.write() {
471 cache.insert(hash, blob.clone());
472 }
473
474 Ok(hash)
475 }
476
477 #[instrument(skip(self, data), fields(hash = %hash.short(), size = data.len()))]
478 fn put_blob_bytes_with_hash(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
479 validate_blob_bytes(data, hash)?;
480
481 let path = hash_path(&blobs_dir(&self.root), &hash);
482 if !path.exists() {
483 trace!(
484 size = data.len(),
485 "Writing raw blob bytes with precomputed hash"
486 );
487 self.write_loose_object_atomic(&path, data)?;
488 }
489 if let Ok(mut cache) = self.recent_blobs.write() {
490 cache.insert(hash, Blob::from_slice(data));
491 }
492
493 Ok(hash)
494 }
495
496 #[instrument(skip(self), fields(hash = %hash.short()))]
497 fn has_blob(&self, hash: &ContentHash) -> Result<bool> {
498 if self.try_has_blob_once(hash)? {
499 return Ok(true);
500 }
501 if self.reload_packs_if_stale()? {
502 return self.try_has_blob_once(hash);
503 }
504 Ok(false)
505 }
506
507 fn loose_blob_path(&self, hash: &ContentHash) -> Option<PathBuf> {
524 let path = hash_path(&blobs_dir(&self.root), hash);
525 if let Ok(verified) = self.verified_loose_blobs.read()
530 && verified.get(hash).is_some()
531 && path.exists()
532 {
533 return Some(path);
534 }
535
536 let (header, _) = read_file_header(&path, BLOB_HEADER_PEEK).ok().flatten()?;
549 if is_compressed(&header) {
550 return None;
551 }
552 let bytes = read_file_bytes(&path).ok().flatten()?;
553 let actual = ContentHash::compute_typed("blob", bytes.as_slice());
554 if actual != *hash {
555 return None;
559 }
560 if let Ok(mut verified) = self.verified_loose_blobs.write() {
561 verified.insert(*hash, ());
562 }
563 Some(path)
564 }
565
566 #[instrument(skip(self), fields(hash = %hash.short()))]
579 fn promote_to_loose_uncompressed(&self, hash: &ContentHash) -> Result<bool> {
580 let path = hash_path(&blobs_dir(&self.root), hash);
581
582 if let Some((header, _)) = read_file_header(&path, 9)?
584 && !is_compressed(&header)
585 {
586 trace!("Blob already loose+uncompressed; skipping promotion");
587 return Ok(false);
588 }
589
590 let blob = self.get_blob(hash)?.ok_or_else(|| {
594 HeddleError::NotFound(format!(
595 "blob {} not found in store; cannot promote to loose-uncompressed",
596 hash
597 ))
598 })?;
599
600 debug!(
613 size = blob.content().len(),
614 "Promoting blob to loose-uncompressed canonical store"
615 );
616 self.write_loose_object_cache(&path, blob.content())?;
617 if let Ok(mut verified) = self.verified_loose_blobs.write() {
618 verified.insert(*hash, ());
619 }
620 Ok(true)
621 }
622
623 #[instrument(skip(self), fields(hash = %hash.short()))]
624 fn blob_size(&self, hash: &ContentHash) -> Result<Option<u64>> {
625 if let Some(size) = self.try_get_blob_size_once(hash)? {
626 return Ok(Some(size));
627 }
628 if self.reload_packs_if_stale()?
632 && let Some(size) = self.try_get_blob_size_once(hash)?
633 {
634 return Ok(Some(size));
635 }
636 Ok(None)
637 }
638
639 #[instrument(skip(self), fields(hash = %hash.short()))]
640 fn get_tree(&self, hash: &ContentHash) -> Result<Option<Tree>> {
641 if let Some(tree) = self.try_get_tree_once(hash)? {
642 return Ok(Some(tree));
643 }
644 if self.reload_packs_if_stale()?
645 && let Some(tree) = self.try_get_tree_once(hash)?
646 {
647 return Ok(Some(tree));
648 }
649 trace!("Tree not found");
650 Ok(None)
651 }
652
653 #[instrument(skip(self, tree), fields(entry_count = tree.entries().len()))]
654 fn put_tree(&self, tree: &Tree) -> Result<ContentHash> {
655 let hash = tree.hash();
656 let path = hash_path(&trees_dir(&self.root), &hash);
657
658 if !path.exists() {
659 let serialized = rmp_serde::to_vec(tree)?;
660 let data = compress(&serialized, &self.compression)?.unwrap_or(serialized);
661 trace!(compressed_size = data.len(), "Writing tree");
662 self.write_loose_object_atomic(&path, &data)?;
663 } else {
664 trace!("Tree already exists, skipping write");
665 }
666 if let Ok(mut cache) = self.recent_trees.write() {
667 cache.insert(hash, tree.clone());
668 }
669
670 Ok(hash)
671 }
672
673 #[instrument(skip(self, data), fields(hash = %hash.short(), size = data.len()))]
674 fn put_tree_serialized(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
675 let tree = validate_tree_serialized(data, hash)?;
676
677 let path = hash_path(&trees_dir(&self.root), &hash);
678 if !path.exists() {
679 trace!(size = data.len(), "Writing raw serialized tree");
680 self.write_loose_object_atomic(&path, data)?;
681 }
682 if let Ok(mut cache) = self.recent_trees.write() {
683 cache.insert(hash, tree);
684 }
685
686 Ok(hash)
687 }
688
689 #[instrument(skip(self), fields(hash = %hash.short()))]
690 fn has_tree(&self, hash: &ContentHash) -> Result<bool> {
691 if self.try_has_tree_once(hash)? {
692 return Ok(true);
693 }
694 if self.reload_packs_if_stale()? {
695 return self.try_has_tree_once(hash);
696 }
697 Ok(false)
698 }
699
700 #[instrument(skip(self), fields(id = %id.short()))]
701 fn get_state(&self, id: &ChangeId) -> Result<Option<State>> {
702 if let Some(state) = self.try_get_state_once(id)? {
703 return Ok(Some(state));
704 }
705 if self.reload_packs_if_stale()?
706 && let Some(state) = self.try_get_state_once(id)?
707 {
708 return Ok(Some(state));
709 }
710 trace!("State not found");
711 Ok(None)
712 }
713
714 #[instrument(skip(self, state), fields(id = %state.change_id.short()))]
715 fn put_state(&self, state: &State) -> Result<()> {
716 let path = state_path(&self.root, &state.change_id);
717 let serialized = rmp_serde::to_vec(state)?;
718 let data = compress(&serialized, &self.compression)?.unwrap_or(serialized);
719 trace!(compressed_size = data.len(), "Writing state");
720 self.write_loose_object_atomic(&path, &data)?;
721 if let Ok(mut cache) = self.recent_states.write() {
722 cache.insert(state.change_id, state.clone());
723 }
724 Ok(())
725 }
726
727 #[instrument(skip(self, data), fields(id = %id.short(), size = data.len()))]
728 fn put_state_serialized(&self, data: &[u8], id: ChangeId) -> Result<()> {
729 let state = validate_state_serialized(data, id)?;
730 let path = state_path(&self.root, &id);
731 trace!(size = data.len(), "Writing raw serialized state");
732 self.write_loose_object_atomic(&path, data)?;
733 if let Ok(mut cache) = self.recent_states.write() {
734 cache.insert(id, state);
735 }
736 Ok(())
737 }
738
739 #[instrument(skip(self), fields(id = %id.short()))]
740 fn has_state(&self, id: &ChangeId) -> Result<bool> {
741 if self.try_has_state_once(id)? {
742 return Ok(true);
743 }
744 if self.reload_packs_if_stale()? {
745 return self.try_has_state_once(id);
746 }
747 Ok(false)
748 }
749
750 #[instrument(skip(self))]
751 fn list_states(&self) -> Result<Vec<ChangeId>> {
752 self.reload_packs_if_stale()?;
753
754 let dir = states_dir(&self.root);
755 if !dir.exists() {
756 return Ok(Vec::new());
757 }
758
759 let mut states = Vec::new();
760 for entry in fs::read_dir(&dir)? {
761 let entry = entry?;
762 let path = entry.path();
763 if let Some(name) = path.file_stem()
764 && let Some(name_str) = name.to_str()
765 && let Ok(id) = ChangeId::parse(name_str)
766 {
767 states.push(id);
768 }
769 }
770 if let Ok(manager) = self.pack_manager().read() {
771 for id in manager.list_all_ids()? {
772 if let PackObjectId::ChangeId(change_id) = id
773 && !states.contains(&change_id)
774 {
775 states.push(change_id);
776 }
777 }
778 }
779 debug!(count = states.len(), "Listed states");
780 Ok(states)
781 }
782
783 #[instrument(skip(self), fields(id = %id))]
784 fn get_action(&self, id: &ActionId) -> Result<Option<Action>> {
785 let path = action_path(&self.root, id);
786 if !path.exists()
787 && let Ok(manager) = self.pack_manager().read()
788 && let Some((obj_type, data)) = manager.get_hashed_object(id.as_hash())?
789 && obj_type == ObjectType::Action
790 {
791 trace!("Found action in packfile");
792 let action = validate_loaded_action(id, rmp_serde::from_slice(&data)?)?;
793 return Ok(Some(action));
794 }
795 match read_file_bytes(&path)? {
796 Some(data) => {
797 trace!(size = data.as_slice().len(), "Action data read");
798 let decoded = if is_compressed(data.as_slice()) {
799 decompress(data.as_slice())?
800 } else {
801 data.into_vec()
802 };
803 let action = validate_loaded_action(id, rmp_serde::from_slice(&decoded)?)?;
804 Ok(Some(action))
805 }
806 None => {
807 trace!("Action not found");
808 Ok(None)
809 }
810 }
811 }
812
813 #[instrument(skip(self, action))]
814 fn put_action(&self, action: &mut Action) -> Result<ActionId> {
815 let id = action.id();
816 let path = action_path(&self.root, &id);
817
818 if !path.exists() {
819 let serialized = rmp_serde::to_vec(action)?;
820 let data = compress(&serialized, &self.compression)?.unwrap_or(serialized);
821 trace!(id = %id, compressed_size = data.len(), "Writing action");
822 self.write_loose_object_atomic(&path, &data)?;
823 }
824
825 Ok(id)
826 }
827
828 #[instrument(skip(self))]
829 fn list_actions(&self) -> Result<Vec<ActionId>> {
830 let dir = actions_dir(&self.root);
831 let mut actions = Vec::new();
832 if dir.exists() {
833 for entry in fs::read_dir(&dir)? {
834 let entry = entry?;
835 let path = entry.path();
836 if let Some(name) = path.file_stem()
837 && let Some(name_str) = name.to_str()
838 && let Ok(hash) = ContentHash::from_hex(name_str)
839 {
840 actions.push(ActionId::from_hash(hash));
841 }
842 }
843 }
844 if let Ok(manager) = self.pack_manager().read() {
845 for id in manager.list_all_ids()? {
846 if let PackObjectId::Hash(hash) = id
847 && !actions.iter().any(|action_id| action_id.as_hash() == &hash)
848 && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
849 && obj_type == ObjectType::Action
850 {
851 actions.push(ActionId::from_hash(hash));
852 }
853 }
854 }
855 debug!(count = actions.len(), "Listed actions");
856 Ok(actions)
857 }
858
859 #[instrument(skip(self))]
860 fn list_blobs(&self) -> Result<Vec<ContentHash>> {
861 let dir = blobs_dir(&self.root);
862 let mut blobs = list_hashes_from_dir(&dir)?;
863 if let Ok(manager) = self.pack_manager().read() {
864 for id in manager.list_all_ids()? {
865 if let PackObjectId::Hash(hash) = id
866 && !blobs.contains(&hash)
867 && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
868 && obj_type == ObjectType::Blob
869 {
870 blobs.push(hash);
871 }
872 }
873 }
874 Ok(blobs)
875 }
876
877 #[instrument(skip(self))]
878 fn list_trees(&self) -> Result<Vec<ContentHash>> {
879 let dir = trees_dir(&self.root);
880 let mut trees = list_hashes_from_dir(&dir)?;
881 if let Ok(manager) = self.pack_manager().read() {
882 for id in manager.list_all_ids()? {
883 if let PackObjectId::Hash(hash) = id
884 && !trees.contains(&hash)
885 && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
886 && obj_type == ObjectType::Tree
887 {
888 trees.push(hash);
889 }
890 }
891 }
892 Ok(trees)
893 }
894
895 #[instrument(skip(self))]
896 fn pack_objects(&self, aggressive: bool) -> Result<(u64, u64)> {
897 self.pack_objects_impl(aggressive)
898 }
899
900 #[instrument(skip(self), fields(id = ?id))]
901 fn get_pack_object(&self, id: &PackObjectId) -> Result<Option<(ObjectType, Vec<u8>)>> {
902 if let Ok(manager) = self.pack_manager().read()
903 && let Some((obj_type, data)) = manager.get_object(id)?
904 {
905 return Ok(Some((obj_type, data)));
906 }
907
908 match id {
909 PackObjectId::Hash(hash) => {
910 if let Some(blob) = self.get_blob(hash)? {
911 return Ok(Some((ObjectType::Blob, blob.content().to_vec())));
912 }
913 if let Some(tree) = self.get_tree(hash)? {
914 return Ok(Some((ObjectType::Tree, rmp_serde::to_vec_named(&tree)?)));
915 }
916 if let Some(action) = self.get_action(&ActionId::from_hash(*hash))? {
917 return Ok(Some((
918 ObjectType::Action,
919 rmp_serde::to_vec_named(&action)?,
920 )));
921 }
922 Ok(None)
923 }
924 PackObjectId::ChangeId(change_id) => {
925 if let Some(state) = self.get_state(change_id)? {
926 Ok(Some((ObjectType::State, rmp_serde::to_vec_named(&state)?)))
927 } else {
928 Ok(None)
929 }
930 }
931 }
932 }
933
934 #[instrument(skip(self, pack_data, index_data))]
935 fn install_pack(&self, pack_data: &[u8], index_data: &[u8]) -> Result<Vec<PackObjectId>> {
936 let reader = crate::store::pack::PackReader::from_slice(pack_data, index_data)?;
937 let ids = validate_and_list_pack(&reader)?;
938 self.install_pack_files(pack_data, index_data)?;
939 self.clear_recent_object_caches();
940 Ok(ids)
941 }
942
943 #[instrument(skip(self, blobs), fields(count = blobs.len()))]
944 fn put_blobs_packed(&self, blobs: Vec<(crate::object::ContentHash, Vec<u8>)>) -> Result<()> {
945 self.put_blobs_packed_impl(blobs)
946 }
947
948 #[instrument(skip(self))]
949 fn install_pack_streaming(
950 &self,
951 pack_path: &std::path::Path,
952 index_path: &std::path::Path,
953 ) -> Result<Vec<PackObjectId>> {
954 let ids = {
960 let reader = crate::store::pack::PackReader::open(pack_path, index_path)?;
961 validate_and_list_pack(&reader)?
962 };
963 self.install_pack_files_streaming(pack_path, index_path)?;
964 Ok(ids)
965 }
966
967 #[instrument(skip(self))]
968 fn prune_loose_objects(&self) -> Result<(u64, u64)> {
969 self.prune_loose_objects_impl()
970 }
971
972 #[instrument(skip(self))]
973 fn begin_snapshot_write_batch(&self) -> Result<()> {
974 self.begin_snapshot_write_batch_impl()
975 }
976
977 #[instrument(skip(self))]
978 fn flush_snapshot_write_batch(&self) -> Result<()> {
979 self.flush_snapshot_write_batch_impl()
980 }
981
982 #[instrument(skip(self))]
983 fn abort_snapshot_write_batch(&self) {
984 self.abort_snapshot_write_batch_impl();
985 }
986
987 fn has_redactions_for_blob(&self, blob: &ContentHash) -> Result<bool> {
988 Ok(redaction_path(&self.root, blob).exists())
989 }
990
991 fn get_redactions_bytes_for_blob(&self, blob: &ContentHash) -> Result<Option<Vec<u8>>> {
992 let path = redaction_path(&self.root, blob);
993 match fs::read(&path) {
994 Ok(bytes) => Ok(Some(bytes)),
995 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
996 Err(err) => Err(HeddleError::Io(err)),
997 }
998 }
999
1000 fn put_redactions_bytes_for_blob(&self, blob: &ContentHash, bytes: &[u8]) -> Result<()> {
1001 let dir = redactions_dir(&self.root);
1002 if !dir.exists() {
1003 fs::create_dir_all(&dir)?;
1004 }
1005 let path = redaction_path(&self.root, blob);
1006 crate::fs_atomic::write_file_atomic(&path, bytes)?;
1007 Ok(())
1008 }
1009
1010 fn list_blobs_with_redactions(&self) -> Result<Vec<ContentHash>> {
1011 let dir = redactions_dir(&self.root);
1012 if !dir.exists() {
1013 return Ok(Vec::new());
1014 }
1015 let mut out = Vec::new();
1016 for entry in fs::read_dir(&dir)? {
1017 let entry = entry?;
1018 let path = entry.path();
1019 if path.extension().and_then(|e| e.to_str()) != Some("bin") {
1020 continue;
1021 }
1022 let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
1023 continue;
1024 };
1025 if let Ok(hash) = ContentHash::from_hex(stem) {
1026 out.push(hash);
1027 }
1028 }
1029 Ok(out)
1030 }
1031
1032 fn has_state_visibility_for_state(&self, state: &ChangeId) -> Result<bool> {
1033 Ok(state_visibility_path(&self.root, state).exists())
1034 }
1035
1036 fn get_state_visibility_bytes_for_state(&self, state: &ChangeId) -> Result<Option<Vec<u8>>> {
1037 let path = state_visibility_path(&self.root, state);
1038 match fs::read(&path) {
1039 Ok(bytes) => Ok(Some(bytes)),
1040 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
1041 Err(err) => Err(HeddleError::Io(err)),
1042 }
1043 }
1044
1045 fn put_state_visibility_bytes_for_state(&self, state: &ChangeId, bytes: &[u8]) -> Result<()> {
1046 let dir = state_visibility_dir(&self.root);
1047 if !dir.exists() {
1048 fs::create_dir_all(&dir)?;
1049 }
1050 let path = state_visibility_path(&self.root, state);
1051 crate::fs_atomic::write_file_atomic(&path, bytes)?;
1052 Ok(())
1053 }
1054
1055 fn list_states_with_visibility(&self) -> Result<Vec<ChangeId>> {
1056 let dir = state_visibility_dir(&self.root);
1057 if !dir.exists() {
1058 return Ok(Vec::new());
1059 }
1060 let mut out = Vec::new();
1061 for entry in fs::read_dir(&dir)? {
1062 let entry = entry?;
1063 let path = entry.path();
1064 if path.extension().and_then(|e| e.to_str()) != Some("bin") {
1065 continue;
1066 }
1067 let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
1068 continue;
1069 };
1070 if let Ok(state) = ChangeId::parse(stem) {
1071 out.push(state);
1072 }
1073 }
1074 Ok(out)
1075 }
1076}