1use std::{
5 fs,
6 path::{Path, PathBuf},
7};
8
9use tracing::{debug, instrument, trace};
10
11use super::{
12 FsStore,
13 fs_io::{list_hashes_from_dir, read_file_bytes, read_file_header},
14 fs_paths::{
15 action_path, actions_dir, blobs_dir, hash_path, redaction_path, redactions_dir, state_path,
16 state_visibility_dir, state_visibility_path, states_dir, trees_dir,
17 },
18};
19use crate::{
20 object::{Action, ActionId, Blob, ChangeId, ContentHash, State, Tree},
21 store::{
22 HeddleError, ObjectStore, Result, codec,
23 compression::{header_uncompressed_size, is_compressed},
24 pack::{ObjectType, PackManager, PackObjectId},
25 },
26};
27
/// Number of leading bytes read from a loose blob file when peeking at its header.
///
/// The peeked bytes are handed to `is_compressed` and `header_uncompressed_size`; presumably
/// this is sized to cover the full compression header — confirm against the `compression`
/// module before changing it.
const BLOB_HEADER_PEEK: usize = 13;
36
37fn validate_loaded_tree(tree: Tree) -> Result<Tree> {
38 tree.validate()?;
39 Ok(tree)
40}
41
42fn validate_blob_bytes(data: &[u8], hash: ContentHash) -> Result<()> {
43 let mut hasher = ContentHash::typed_hasher("blob", data.len() as u64);
44 hasher.update(data);
45 let found = ContentHash::from_bytes(hasher.finalize().into());
46 if found != hash {
47 return Err(HeddleError::Corruption {
48 expected: hash,
49 found,
50 });
51 }
52
53 Ok(())
54}
55
56fn validate_tree_serialized(data: &[u8], hash: ContentHash) -> Result<Tree> {
57 let tree: Tree = rmp_serde::from_slice(data)?;
58 let tree = validate_loaded_tree(tree)?;
59 let found = tree.hash();
60 if found != hash {
61 return Err(HeddleError::Corruption {
62 expected: hash,
63 found,
64 });
65 }
66
67 Ok(tree)
68}
69
70fn validate_loaded_state(requested_id: &ChangeId, state: State) -> Result<State> {
71 if state.change_id != *requested_id {
72 return Err(HeddleError::InvalidObject(format!(
73 "state change_id mismatch: requested {}, found {}",
74 requested_id, state.change_id
75 )));
76 }
77
78 Ok(state)
79}
80
81fn validate_state_serialized(data: &[u8], id: ChangeId) -> Result<State> {
82 let state: State = rmp_serde::from_slice(data)?;
83 validate_loaded_state(&id, state)
84}
85
86fn validate_loaded_action(requested_id: &ActionId, action: Action) -> Result<Action> {
87 let found_id = action.compute_id();
88 if found_id != *requested_id {
89 return Err(HeddleError::InvalidObject(format!(
90 "action id mismatch: requested {}, found {}",
91 requested_id, found_id
92 )));
93 }
94
95 Ok(action)
96}
97
98fn validate_action_serialized(data: &[u8], id: ActionId) -> Result<Action> {
99 let action: Action = rmp_serde::from_slice(data)?;
100 validate_loaded_action(&id, action)
101}
102
103fn validate_and_list_pack(reader: &crate::store::pack::PackReader) -> Result<Vec<PackObjectId>> {
111 let ids = reader.list_ids();
112 for id in &ids {
113 let Some((obj_type, data)) = reader.get_object_bytes(id)? else {
114 continue;
115 };
116 validate_pack_entry(id, obj_type, data.as_ref())?;
117 }
118 Ok(ids)
119}
120
121fn state_entries_from_pack(
122 reader: &crate::store::pack::PackReader,
123 ids: &[PackObjectId],
124) -> Result<Vec<(ChangeId, Vec<u8>)>> {
125 let mut states = Vec::new();
126 for id in ids {
127 let PackObjectId::ChangeId(change_id) = id else {
128 continue;
129 };
130 let Some((obj_type, data)) = reader.get_object(id)? else {
131 continue;
132 };
133 if obj_type != ObjectType::State {
134 return Err(HeddleError::InvalidObject(format!(
135 "pack id {} is indexed as {:?}, expected State",
136 change_id.to_string_full(),
137 obj_type
138 )));
139 }
140 validate_state_serialized(&data, *change_id)?;
141 states.push((*change_id, data));
142 }
143 Ok(states)
144}
145
146fn validate_pack_entry(id: &PackObjectId, obj_type: ObjectType, data: &[u8]) -> Result<()> {
147 match (id, obj_type) {
148 (PackObjectId::Hash(hash), ObjectType::Blob) => validate_blob_bytes(data, *hash),
149 (PackObjectId::Hash(hash), ObjectType::Tree) => {
150 validate_tree_serialized(data, *hash).map(|_| ())
151 }
152 (PackObjectId::Hash(hash), ObjectType::Action) => {
153 validate_action_serialized(data, ActionId::from_hash(*hash)).map(|_| ())
154 }
155 (PackObjectId::ChangeId(change_id), ObjectType::State) => {
156 validate_state_serialized(data, *change_id).map(|_| ())
157 }
158 _ => Err(HeddleError::InvalidObject(format!(
159 "unsupported native pack object: {:?} {:?}",
160 id, obj_type
161 ))),
162 }
163}
164
165impl FsStore {
166 fn try_get_blob_once(&self, hash: &ContentHash) -> Result<Option<Blob>> {
169 let path = hash_path(&blobs_dir(&self.root), hash);
170 let loose_exists = path.exists();
171 let pack_has = if loose_exists {
172 false
173 } else if let Ok(manager) = self.pack_manager().read() {
174 manager.has_object(hash)
175 } else {
176 false
177 };
178 if (loose_exists || pack_has)
179 && let Ok(cache) = self.recent_blobs.read()
180 && let Some(blob) = cache.get(hash)
181 {
182 trace!("Found blob in recent object cache");
183 return Ok(Some(blob.clone()));
184 }
185
186 if let Ok(manager) = self.pack_manager().read()
187 && let Some((obj_type, data)) = manager.get_hashed_object(hash)?
188 && obj_type == ObjectType::Blob
189 {
190 trace!("Found blob in packfile");
191 return Ok(Some(Blob::new(data)));
199 }
200
201 match read_file_bytes(&path)? {
202 Some(data) => {
203 trace!(size = data.as_slice().len(), "Blob data read");
204 let content = codec::decode_blob_content(data.as_slice())?;
205 let blob = Blob::new(content);
206 if blob.hash() != *hash {
213 return Err(HeddleError::Corruption {
214 expected: *hash,
215 found: blob.hash(),
216 });
217 }
218 Ok(Some(blob))
219 }
220 None => Ok(None),
221 }
222 }
223
224 fn loose_or_packed(
229 &self,
230 loose_path: &Path,
231 in_pack: impl FnOnce(&PackManager) -> bool,
232 ) -> Result<bool> {
233 if loose_path.exists() {
234 return Ok(true);
235 }
236 if let Ok(manager) = self.pack_manager().read() {
237 return Ok(in_pack(&manager));
238 }
239 Ok(false)
240 }
241
242 fn try_has_blob_once(&self, hash: &ContentHash) -> Result<bool> {
243 let path = hash_path(&blobs_dir(&self.root), hash);
244 self.loose_or_packed(&path, |m| m.has_object(hash))
245 }
246
247 fn try_get_blob_size_once(&self, hash: &ContentHash) -> Result<Option<u64>> {
262 if let Ok(cache) = self.recent_blobs.read()
263 && let Some(blob) = cache.get(hash)
264 {
265 return Ok(Some(blob.content().len() as u64));
266 }
267
268 let path = hash_path(&blobs_dir(&self.root), hash);
269 if let Some((header, file_len)) = read_file_header(&path, BLOB_HEADER_PEEK)? {
270 if let Some(size) = header_uncompressed_size(&header) {
271 return Ok(Some(size));
272 }
273 return Ok(Some(file_len));
276 }
277
278 if let Ok(manager) = self.pack_manager().read()
279 && let Some(size) = manager.get_hashed_object_size(hash)?
280 {
281 return Ok(Some(size));
282 }
283 Ok(None)
284 }
285
286 fn try_get_tree_once(&self, hash: &ContentHash) -> Result<Option<Tree>> {
287 if let Ok(cache) = self.recent_trees.read()
294 && let Some(tree) = cache.get(hash)
295 {
296 trace!("Found tree in recent object cache");
297 return Ok(Some(tree.clone()));
298 }
299
300 if let Ok(manager) = self.pack_manager().read()
301 && let Some((obj_type, data)) = manager.get_hashed_object(hash)?
302 && obj_type == ObjectType::Tree
303 {
304 trace!("Found tree in packfile");
305 let tree = validate_loaded_tree(rmp_serde::from_slice(&data)?)?;
306 if tree.hash() != *hash {
307 return Err(HeddleError::Corruption {
308 expected: *hash,
309 found: tree.hash(),
310 });
311 }
312 return Ok(Some(tree));
313 }
314
315 let path = hash_path(&trees_dir(&self.root), hash);
316 match read_file_bytes(&path)? {
317 Some(data) => {
318 trace!(size = data.as_slice().len(), "Tree data read");
319 let tree = validate_loaded_tree(codec::decode_tree(data.as_slice())?)?;
320 if tree.hash() != *hash {
321 return Err(HeddleError::Corruption {
322 expected: *hash,
323 found: tree.hash(),
324 });
325 }
326 if let Ok(mut cache) = self.recent_trees.write() {
327 cache.insert(*hash, tree.clone());
328 }
329 Ok(Some(tree))
330 }
331 None => Ok(None),
332 }
333 }
334
335 fn try_has_tree_once(&self, hash: &ContentHash) -> Result<bool> {
336 let path = hash_path(&trees_dir(&self.root), hash);
337 self.loose_or_packed(&path, |m| m.has_object(hash))
338 }
339
340 fn try_get_state_once(&self, id: &ChangeId) -> Result<Option<State>> {
341 let path = state_path(&self.root, id);
342 let loose_exists = path.exists();
343 let pack_has = if loose_exists {
344 false
345 } else if let Ok(manager) = self.pack_manager().read() {
346 manager.has_object_id(&PackObjectId::ChangeId(*id))
347 } else {
348 false
349 };
350 if (loose_exists || pack_has)
351 && let Ok(cache) = self.recent_states.read()
352 && let Some(state) = cache.get(id)
353 {
354 trace!("Found state in recent object cache");
355 return Ok(Some(state.clone()));
356 }
357
358 if loose_exists && let Some(data) = read_file_bytes(&path)? {
368 trace!(
369 size = data.as_slice().len(),
370 "State read from loose object (shadows any packed copy)"
371 );
372 let state = validate_loaded_state(id, codec::decode_state(data.as_slice())?)?;
373 if let Ok(mut cache) = self.recent_states.write() {
374 cache.insert(*id, state.clone());
375 }
376 return Ok(Some(state));
377 }
378
379 if let Ok(manager) = self.pack_manager().read()
380 && let Some((obj_type, data)) = manager.get_object(&PackObjectId::ChangeId(*id))?
381 && obj_type == ObjectType::State
382 {
383 trace!("Found state in packfile");
384 let state = validate_loaded_state(id, rmp_serde::from_slice(&data)?)?;
385 if let Ok(mut cache) = self.recent_states.write() {
386 cache.insert(*id, state.clone());
387 }
388 return Ok(Some(state));
389 }
390
391 Ok(None)
392 }
393
394 fn try_has_state_once(&self, id: &ChangeId) -> Result<bool> {
395 let path = state_path(&self.root, id);
396 self.loose_or_packed(&path, |m| m.has_object_id(&PackObjectId::ChangeId(*id)))
397 }
398}
399
400impl ObjectStore for FsStore {
401 fn clear_recent_caches(&self) {
402 self.clear_recent_object_caches();
403 }
404
405 fn get_blob_bytes(&self, hash: &ContentHash) -> Result<Option<bytes::Bytes>> {
412 if let Ok(manager) = self.pack_manager().read()
413 && let Some((obj_type, data)) = manager.get_hashed_object_bytes(hash)?
414 && obj_type == crate::store::pack::ObjectType::Blob
415 {
416 return Ok(Some(data));
417 }
418 Ok(self
419 .get_blob(hash)?
420 .map(|blob| bytes::Bytes::from(blob.into_content())))
421 }
422
423 #[instrument(skip(self), fields(hash = %hash.short()))]
424 fn get_blob(&self, hash: &ContentHash) -> Result<Option<Blob>> {
425 if let Some(blob) = self.try_get_blob_once(hash)? {
426 return Ok(Some(blob));
427 }
428 if self.reload_packs_if_stale()?
433 && let Some(blob) = self.try_get_blob_once(hash)?
434 {
435 return Ok(Some(blob));
436 }
437 trace!("Blob not found");
438 Ok(None)
439 }
440
441 #[instrument(skip(self, blob), fields(size = blob.content().len()))]
442 fn put_blob(&self, blob: &Blob) -> Result<ContentHash> {
443 let hash = blob.hash();
444 let path = hash_path(&blobs_dir(&self.root), &hash);
445
446 if !path.exists() {
447 let data = codec::encode_blob_content(blob.content(), &self.compression)?;
448 trace!(compressed_size = data.len(), "Writing blob");
449 self.write_loose_object_atomic(&path, &data)?;
450 } else {
451 trace!("Blob already exists, skipping write");
452 }
453 if let Ok(mut cache) = self.recent_blobs.write() {
454 cache.insert(hash, blob.clone());
455 }
456
457 Ok(hash)
458 }
459
460 #[instrument(skip(self, blob), fields(hash = %hash.short()))]
461 fn put_blob_with_hash(&self, blob: &Blob, hash: ContentHash) -> Result<ContentHash> {
462 if blob.hash() != hash {
463 return Err(HeddleError::Corruption {
464 expected: hash,
465 found: blob.hash(),
466 });
467 }
468
469 let path = hash_path(&blobs_dir(&self.root), &hash);
470
471 if !path.exists() {
472 let data = codec::encode_blob_content(blob.content(), &self.compression)?;
473 trace!(
474 compressed_size = data.len(),
475 "Writing blob with precomputed hash"
476 );
477 self.write_loose_object_atomic(&path, &data)?;
478 }
479 if let Ok(mut cache) = self.recent_blobs.write() {
480 cache.insert(hash, blob.clone());
481 }
482
483 Ok(hash)
484 }
485
486 #[instrument(skip(self, data), fields(hash = %hash.short(), size = data.len()))]
487 fn put_blob_bytes_with_hash(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
488 validate_blob_bytes(data, hash)?;
489
490 let path = hash_path(&blobs_dir(&self.root), &hash);
491 if !path.exists() {
492 trace!(
493 size = data.len(),
494 "Writing raw blob bytes with precomputed hash"
495 );
496 self.write_loose_object_atomic(&path, data)?;
497 }
498 if let Ok(mut cache) = self.recent_blobs.write() {
499 cache.insert(hash, Blob::from_slice(data));
500 }
501
502 Ok(hash)
503 }
504
505 #[instrument(skip(self), fields(hash = %hash.short()))]
506 fn has_blob(&self, hash: &ContentHash) -> Result<bool> {
507 if self.try_has_blob_once(hash)? {
508 return Ok(true);
509 }
510 if self.reload_packs_if_stale()? {
511 return self.try_has_blob_once(hash);
512 }
513 Ok(false)
514 }
515
516 fn loose_blob_path(&self, hash: &ContentHash) -> Option<PathBuf> {
533 let path = hash_path(&blobs_dir(&self.root), hash);
534 if let Ok(verified) = self.verified_loose_blobs.read()
539 && verified.get(hash).is_some()
540 && path.exists()
541 {
542 return Some(path);
543 }
544
545 let (header, _) = read_file_header(&path, BLOB_HEADER_PEEK).ok().flatten()?;
558 if is_compressed(&header) {
559 return None;
560 }
561 let bytes = read_file_bytes(&path).ok().flatten()?;
562 let actual = ContentHash::compute_typed("blob", bytes.as_slice());
563 if actual != *hash {
564 return None;
568 }
569 if let Ok(mut verified) = self.verified_loose_blobs.write() {
570 verified.insert(*hash, ());
571 }
572 Some(path)
573 }
574
575 #[instrument(skip(self), fields(hash = %hash.short()))]
588 fn promote_to_loose_uncompressed(&self, hash: &ContentHash) -> Result<bool> {
589 let path = hash_path(&blobs_dir(&self.root), hash);
590
591 if let Some((header, _)) = read_file_header(&path, 9)?
593 && !is_compressed(&header)
594 {
595 trace!("Blob already loose+uncompressed; skipping promotion");
596 return Ok(false);
597 }
598
599 let blob = self.get_blob(hash)?.ok_or_else(|| {
603 HeddleError::NotFound(format!(
604 "blob {} not found in store; cannot promote to loose-uncompressed",
605 hash
606 ))
607 })?;
608
609 debug!(
622 size = blob.content().len(),
623 "Promoting blob to loose-uncompressed canonical store"
624 );
625 self.write_loose_object_cache(&path, blob.content())?;
626 if let Ok(mut verified) = self.verified_loose_blobs.write() {
627 verified.insert(*hash, ());
628 }
629 Ok(true)
630 }
631
632 #[instrument(skip(self), fields(hash = %hash.short()))]
633 fn blob_size(&self, hash: &ContentHash) -> Result<Option<u64>> {
634 if let Some(size) = self.try_get_blob_size_once(hash)? {
635 return Ok(Some(size));
636 }
637 if self.reload_packs_if_stale()?
641 && let Some(size) = self.try_get_blob_size_once(hash)?
642 {
643 return Ok(Some(size));
644 }
645 Ok(None)
646 }
647
648 #[instrument(skip(self), fields(hash = %hash.short()))]
649 fn get_tree(&self, hash: &ContentHash) -> Result<Option<Tree>> {
650 if let Some(tree) = self.try_get_tree_once(hash)? {
651 return Ok(Some(tree));
652 }
653 if self.reload_packs_if_stale()?
654 && let Some(tree) = self.try_get_tree_once(hash)?
655 {
656 return Ok(Some(tree));
657 }
658 trace!("Tree not found");
659 Ok(None)
660 }
661
662 #[instrument(skip(self, tree), fields(entry_count = tree.entries().len()))]
663 fn put_tree(&self, tree: &Tree) -> Result<ContentHash> {
664 let hash = tree.hash();
665 let path = hash_path(&trees_dir(&self.root), &hash);
666
667 if !path.exists() {
668 let (_, data) = codec::encode_tree(tree, &self.compression)?;
669 trace!(compressed_size = data.len(), "Writing tree");
670 self.write_loose_object_atomic(&path, &data)?;
671 } else {
672 trace!("Tree already exists, skipping write");
673 }
674 if let Ok(mut cache) = self.recent_trees.write() {
675 cache.insert(hash, tree.clone());
676 }
677
678 Ok(hash)
679 }
680
681 #[instrument(skip(self, data), fields(hash = %hash.short(), size = data.len()))]
682 fn put_tree_serialized(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
683 let tree = validate_tree_serialized(data, hash)?;
684
685 let path = hash_path(&trees_dir(&self.root), &hash);
686 if !path.exists() {
687 trace!(size = data.len(), "Writing raw serialized tree");
688 self.write_loose_object_atomic(&path, data)?;
689 }
690 if let Ok(mut cache) = self.recent_trees.write() {
691 cache.insert(hash, tree);
692 }
693
694 Ok(hash)
695 }
696
697 #[instrument(skip(self), fields(hash = %hash.short()))]
698 fn has_tree(&self, hash: &ContentHash) -> Result<bool> {
699 if self.try_has_tree_once(hash)? {
700 return Ok(true);
701 }
702 if self.reload_packs_if_stale()? {
703 return self.try_has_tree_once(hash);
704 }
705 Ok(false)
706 }
707
708 #[instrument(skip(self), fields(id = %id.short()))]
709 fn get_state(&self, id: &ChangeId) -> Result<Option<State>> {
710 if let Some(state) = self.try_get_state_once(id)? {
711 return Ok(Some(state));
712 }
713 if self.reload_packs_if_stale()?
714 && let Some(state) = self.try_get_state_once(id)?
715 {
716 return Ok(Some(state));
717 }
718 trace!("State not found");
719 Ok(None)
720 }
721
722 #[instrument(skip(self, state), fields(id = %state.change_id.short()))]
723 fn put_state(&self, state: &State) -> Result<()> {
724 let path = state_path(&self.root, &state.change_id);
725 let data = codec::encode_state(state, &self.compression)?;
726 trace!(compressed_size = data.len(), "Writing state");
727 self.write_loose_object_atomic(&path, &data)?;
728 if let Ok(mut cache) = self.recent_states.write() {
729 cache.insert(state.change_id, state.clone());
730 }
731 Ok(())
732 }
733
734 #[instrument(skip(self, data), fields(id = %id.short(), size = data.len()))]
735 fn put_state_serialized(&self, data: &[u8], id: ChangeId) -> Result<()> {
736 let state = validate_state_serialized(data, id)?;
737 let path = state_path(&self.root, &id);
738 trace!(size = data.len(), "Writing raw serialized state");
739 self.write_loose_object_atomic(&path, data)?;
740 if let Ok(mut cache) = self.recent_states.write() {
741 cache.insert(id, state);
742 }
743 Ok(())
744 }
745
746 #[instrument(skip(self), fields(id = %id.short()))]
747 fn has_state(&self, id: &ChangeId) -> Result<bool> {
748 if self.try_has_state_once(id)? {
749 return Ok(true);
750 }
751 if self.reload_packs_if_stale()? {
752 return self.try_has_state_once(id);
753 }
754 Ok(false)
755 }
756
757 #[instrument(skip(self))]
758 fn list_states(&self) -> Result<Vec<ChangeId>> {
759 self.reload_packs_if_stale()?;
760
761 let dir = states_dir(&self.root);
762 if !dir.exists() {
763 return Ok(Vec::new());
764 }
765
766 let mut states = Vec::new();
767 for entry in fs::read_dir(&dir)? {
768 let entry = entry?;
769 let path = entry.path();
770 if let Some(name) = path.file_stem()
771 && let Some(name_str) = name.to_str()
772 && let Ok(id) = ChangeId::parse(name_str)
773 {
774 states.push(id);
775 }
776 }
777 if let Ok(manager) = self.pack_manager().read() {
778 for id in manager.list_all_ids()? {
779 if let PackObjectId::ChangeId(change_id) = id
780 && !states.contains(&change_id)
781 {
782 states.push(change_id);
783 }
784 }
785 }
786 debug!(count = states.len(), "Listed states");
787 Ok(states)
788 }
789
790 #[instrument(skip(self), fields(id = %id))]
791 fn get_action(&self, id: &ActionId) -> Result<Option<Action>> {
792 let path = action_path(&self.root, id);
793 if !path.exists()
794 && let Ok(manager) = self.pack_manager().read()
795 && let Some((obj_type, data)) = manager.get_hashed_object(id.as_hash())?
796 && obj_type == ObjectType::Action
797 {
798 trace!("Found action in packfile");
799 let action = validate_loaded_action(id, rmp_serde::from_slice(&data)?)?;
800 return Ok(Some(action));
801 }
802 match read_file_bytes(&path)? {
803 Some(data) => {
804 trace!(size = data.as_slice().len(), "Action data read");
805 let action = validate_loaded_action(id, codec::decode_action(data.as_slice())?)?;
806 Ok(Some(action))
807 }
808 None => {
809 trace!("Action not found");
810 Ok(None)
811 }
812 }
813 }
814
815 #[instrument(skip(self, action))]
816 fn put_action(&self, action: &mut Action) -> Result<ActionId> {
817 let id = action.id();
818 let path = action_path(&self.root, &id);
819
820 if !path.exists() {
821 let (_, data) = codec::encode_action(action, &self.compression)?;
822 trace!(id = %id, compressed_size = data.len(), "Writing action");
823 self.write_loose_object_atomic(&path, &data)?;
824 }
825
826 Ok(id)
827 }
828
829 #[instrument(skip(self))]
830 fn list_actions(&self) -> Result<Vec<ActionId>> {
831 let dir = actions_dir(&self.root);
832 let mut actions = Vec::new();
833 if dir.exists() {
834 for entry in fs::read_dir(&dir)? {
835 let entry = entry?;
836 let path = entry.path();
837 if let Some(name) = path.file_stem()
838 && let Some(name_str) = name.to_str()
839 && let Ok(hash) = ContentHash::from_hex(name_str)
840 {
841 actions.push(ActionId::from_hash(hash));
842 }
843 }
844 }
845 if let Ok(manager) = self.pack_manager().read() {
846 for id in manager.list_all_ids()? {
847 if let PackObjectId::Hash(hash) = id
848 && !actions.iter().any(|action_id| action_id.as_hash() == &hash)
849 && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
850 && obj_type == ObjectType::Action
851 {
852 actions.push(ActionId::from_hash(hash));
853 }
854 }
855 }
856 debug!(count = actions.len(), "Listed actions");
857 Ok(actions)
858 }
859
860 #[instrument(skip(self))]
861 fn list_blobs(&self) -> Result<Vec<ContentHash>> {
862 let dir = blobs_dir(&self.root);
863 let mut blobs = list_hashes_from_dir(&dir)?;
864 if let Ok(manager) = self.pack_manager().read() {
865 for id in manager.list_all_ids()? {
866 if let PackObjectId::Hash(hash) = id
867 && !blobs.contains(&hash)
868 && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
869 && obj_type == ObjectType::Blob
870 {
871 blobs.push(hash);
872 }
873 }
874 }
875 Ok(blobs)
876 }
877
878 #[instrument(skip(self))]
879 fn list_trees(&self) -> Result<Vec<ContentHash>> {
880 let dir = trees_dir(&self.root);
881 let mut trees = list_hashes_from_dir(&dir)?;
882 if let Ok(manager) = self.pack_manager().read() {
883 for id in manager.list_all_ids()? {
884 if let PackObjectId::Hash(hash) = id
885 && !trees.contains(&hash)
886 && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
887 && obj_type == ObjectType::Tree
888 {
889 trees.push(hash);
890 }
891 }
892 }
893 Ok(trees)
894 }
895
896 #[instrument(skip(self))]
897 fn pack_objects(&self, aggressive: bool) -> Result<(u64, u64)> {
898 self.pack_objects_impl(aggressive)
899 }
900
901 #[instrument(skip(self), fields(id = ?id))]
902 fn get_pack_object(&self, id: &PackObjectId) -> Result<Option<(ObjectType, Vec<u8>)>> {
903 if let Ok(manager) = self.pack_manager().read()
904 && let Some((obj_type, data)) = manager.get_object(id)?
905 {
906 return Ok(Some((obj_type, data)));
907 }
908
909 match id {
910 PackObjectId::Hash(hash) => {
911 if let Some(blob) = self.get_blob(hash)? {
912 return Ok(Some((ObjectType::Blob, blob.content().to_vec())));
913 }
914 if let Some(tree) = self.get_tree(hash)? {
915 return Ok(Some((ObjectType::Tree, rmp_serde::to_vec_named(&tree)?)));
916 }
917 if let Some(action) = self.get_action(&ActionId::from_hash(*hash))? {
918 return Ok(Some((
919 ObjectType::Action,
920 rmp_serde::to_vec_named(&action)?,
921 )));
922 }
923 Ok(None)
924 }
925 PackObjectId::ChangeId(change_id) => {
926 if let Some(state) = self.get_state(change_id)? {
927 Ok(Some((ObjectType::State, rmp_serde::to_vec_named(&state)?)))
928 } else {
929 Ok(None)
930 }
931 }
932 }
933 }
934
935 #[instrument(skip(self, pack_data, index_data))]
936 fn install_pack(&self, pack_data: &[u8], index_data: &[u8]) -> Result<Vec<PackObjectId>> {
937 let reader = crate::store::pack::PackReader::from_slice(pack_data, index_data)?;
938 let ids = validate_and_list_pack(&reader)?;
939 let state_entries = state_entries_from_pack(&reader, &ids)?;
940 self.install_pack_files(pack_data, index_data)?;
941 for (id, data) in state_entries {
942 self.put_state_serialized(&data, id)?;
943 }
944 self.clear_recent_object_caches();
945 Ok(ids)
946 }
947
948 #[instrument(skip(self, blobs), fields(count = blobs.len()))]
949 fn put_blobs_packed(&self, blobs: Vec<(crate::object::ContentHash, Vec<u8>)>) -> Result<()> {
950 self.put_blobs_packed_impl(blobs)
951 }
952
953 #[instrument(skip(self))]
954 fn install_pack_streaming(
955 &self,
956 pack_path: &std::path::Path,
957 index_path: &std::path::Path,
958 ) -> Result<Vec<PackObjectId>> {
959 let ids = {
965 let reader = crate::store::pack::PackReader::open(pack_path, index_path)?;
966 validate_and_list_pack(&reader)?
967 };
968 let state_entries = {
969 let reader = crate::store::pack::PackReader::open(pack_path, index_path)?;
970 state_entries_from_pack(&reader, &ids)?
971 };
972 self.install_pack_files_streaming(pack_path, index_path)?;
973 for (id, data) in state_entries {
974 self.put_state_serialized(&data, id)?;
975 }
976 Ok(ids)
977 }
978
979 #[instrument(skip(self))]
980 fn prune_loose_objects(&self) -> Result<(u64, u64)> {
981 self.prune_loose_objects_impl()
982 }
983
984 #[instrument(skip(self))]
985 fn begin_snapshot_write_batch(&self) -> Result<()> {
986 self.begin_snapshot_write_batch_impl()
987 }
988
989 #[instrument(skip(self))]
990 fn flush_snapshot_write_batch(&self) -> Result<()> {
991 self.flush_snapshot_write_batch_impl()
992 }
993
994 #[instrument(skip(self))]
995 fn abort_snapshot_write_batch(&self) {
996 self.abort_snapshot_write_batch_impl();
997 }
998
999 fn has_redactions_for_blob(&self, blob: &ContentHash) -> Result<bool> {
1000 Ok(redaction_path(&self.root, blob).exists())
1001 }
1002
1003 fn get_redactions_bytes_for_blob(&self, blob: &ContentHash) -> Result<Option<Vec<u8>>> {
1004 let path = redaction_path(&self.root, blob);
1005 match fs::read(&path) {
1006 Ok(bytes) => Ok(Some(bytes)),
1007 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
1008 Err(err) => Err(HeddleError::Io(err)),
1009 }
1010 }
1011
1012 fn put_redactions_bytes_for_blob(&self, blob: &ContentHash, bytes: &[u8]) -> Result<()> {
1013 let dir = redactions_dir(&self.root);
1014 if !dir.exists() {
1015 fs::create_dir_all(&dir)?;
1016 }
1017 let path = redaction_path(&self.root, blob);
1018 crate::fs_atomic::write_file_atomic(&path, bytes)?;
1019 Ok(())
1020 }
1021
1022 fn list_blobs_with_redactions(&self) -> Result<Vec<ContentHash>> {
1023 let dir = redactions_dir(&self.root);
1024 if !dir.exists() {
1025 return Ok(Vec::new());
1026 }
1027 let mut out = Vec::new();
1028 for entry in fs::read_dir(&dir)? {
1029 let entry = entry?;
1030 let path = entry.path();
1031 if path.extension().and_then(|e| e.to_str()) != Some("bin") {
1032 continue;
1033 }
1034 let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
1035 continue;
1036 };
1037 if let Ok(hash) = ContentHash::from_hex(stem) {
1038 out.push(hash);
1039 }
1040 }
1041 Ok(out)
1042 }
1043
1044 fn has_state_visibility_for_state(&self, state: &ChangeId) -> Result<bool> {
1045 Ok(state_visibility_path(&self.root, state).exists())
1046 }
1047
1048 fn get_state_visibility_bytes_for_state(&self, state: &ChangeId) -> Result<Option<Vec<u8>>> {
1049 let path = state_visibility_path(&self.root, state);
1050 match fs::read(&path) {
1051 Ok(bytes) => Ok(Some(bytes)),
1052 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
1053 Err(err) => Err(HeddleError::Io(err)),
1054 }
1055 }
1056
1057 fn put_state_visibility_bytes_for_state(&self, state: &ChangeId, bytes: &[u8]) -> Result<()> {
1058 let dir = state_visibility_dir(&self.root);
1059 if !dir.exists() {
1060 fs::create_dir_all(&dir)?;
1061 }
1062 let path = state_visibility_path(&self.root, state);
1063 crate::fs_atomic::write_file_atomic(&path, bytes)?;
1064 Ok(())
1065 }
1066
1067 fn list_states_with_visibility(&self) -> Result<Vec<ChangeId>> {
1068 let dir = state_visibility_dir(&self.root);
1069 if !dir.exists() {
1070 return Ok(Vec::new());
1071 }
1072 let mut out = Vec::new();
1073 for entry in fs::read_dir(&dir)? {
1074 let entry = entry?;
1075 let path = entry.path();
1076 if path.extension().and_then(|e| e.to_str()) != Some("bin") {
1077 continue;
1078 }
1079 let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
1080 continue;
1081 };
1082 if let Ok(state) = ChangeId::parse(stem) {
1083 out.push(state);
1084 }
1085 }
1086 Ok(out)
1087 }
1088}