1use std::{
5 fs,
6 path::{Path, PathBuf},
7};
8
9use heddle_format::compression::{header_uncompressed_size, is_compressed};
10use tracing::{debug, instrument, trace};
11
12use super::{
13 FsStore,
14 fs_io::{list_hashes_from_dir, read_file_bytes, read_file_header},
15 fs_paths::{
16 action_path, actions_dir, blobs_dir, hash_path, redaction_path, redactions_dir, state_path,
17 state_visibility_dir, state_visibility_path, states_dir, trees_dir,
18 },
19};
20use crate::{
21 object::{Action, ActionId, Blob, ChangeId, ContentHash, State, Tree},
22 store::{
23 HeddleError, ObjectStore, Result, codec,
24 pack::{ObjectType, PackManager, PackObjectId},
25 },
26};
27
/// Number of leading bytes read from a loose blob file when only its header
/// is needed.
///
/// Passed to `read_file_header` before the header is inspected with
/// `is_compressed` / `header_uncompressed_size`.
const BLOB_HEADER_PEEK: usize = 13;
36
37fn validate_loaded_tree(tree: Tree) -> Result<Tree> {
38 tree.validate()?;
39 Ok(tree)
40}
41
42fn validate_blob_bytes(data: &[u8], hash: ContentHash) -> Result<()> {
43 let mut hasher = ContentHash::typed_hasher("blob", data.len() as u64);
44 hasher.update(data);
45 let found = ContentHash::from_bytes(hasher.finalize().into());
46 if found != hash {
47 return Err(HeddleError::Corruption {
48 expected: hash,
49 found,
50 });
51 }
52
53 Ok(())
54}
55
56fn validate_tree_serialized(data: &[u8], hash: ContentHash) -> Result<Tree> {
57 let tree = codec::decode_tree_serialized(data)?;
58 let tree = validate_loaded_tree(tree)?;
59 let found = tree.hash();
60 if found != hash {
61 return Err(HeddleError::Corruption {
62 expected: hash,
63 found,
64 });
65 }
66
67 Ok(tree)
68}
69
70fn validate_loaded_state(requested_id: &ChangeId, state: State) -> Result<State> {
71 if state.change_id != *requested_id {
72 return Err(HeddleError::InvalidObject(format!(
73 "state change_id mismatch: requested {}, found {}",
74 requested_id, state.change_id
75 )));
76 }
77
78 Ok(state)
79}
80
81fn validate_state_serialized(data: &[u8], id: ChangeId) -> Result<State> {
82 let state: State = rmp_serde::from_slice(data)?;
83 validate_loaded_state(&id, state)
84}
85
86fn validate_loaded_action(requested_id: &ActionId, action: Action) -> Result<Action> {
87 let found_id = action.compute_id();
88 if found_id != *requested_id {
89 return Err(HeddleError::InvalidObject(format!(
90 "action id mismatch: requested {}, found {}",
91 requested_id, found_id
92 )));
93 }
94
95 Ok(action)
96}
97
98fn validate_action_serialized(data: &[u8], id: ActionId) -> Result<Action> {
99 let action: Action = rmp_serde::from_slice(data)?;
100 validate_loaded_action(&id, action)
101}
102
103fn validate_and_list_pack(reader: &crate::store::pack::PackReader) -> Result<Vec<PackObjectId>> {
111 let ids = reader.list_ids();
112 for id in &ids {
113 let Some((obj_type, data)) = reader.get_object_bytes(id)? else {
114 continue;
115 };
116 validate_pack_entry(id, obj_type, data.as_ref())?;
117 }
118 Ok(ids)
119}
120
121fn state_entries_from_pack(
122 reader: &crate::store::pack::PackReader,
123 ids: &[PackObjectId],
124) -> Result<Vec<(ChangeId, Vec<u8>)>> {
125 let mut states = Vec::new();
126 for id in ids {
127 let PackObjectId::ChangeId(change_id) = id else {
128 continue;
129 };
130 let Some((obj_type, data)) = reader.get_object(id)? else {
131 continue;
132 };
133 if obj_type != ObjectType::State {
134 return Err(HeddleError::InvalidObject(format!(
135 "pack id {} is indexed as {:?}, expected State",
136 change_id.to_string_full(),
137 obj_type
138 )));
139 }
140 validate_state_serialized(&data, *change_id)?;
141 states.push((*change_id, data));
142 }
143 Ok(states)
144}
145
146fn validate_pack_entry(id: &PackObjectId, obj_type: ObjectType, data: &[u8]) -> Result<()> {
147 match (id, obj_type) {
148 (PackObjectId::Hash(hash), ObjectType::Blob) => validate_blob_bytes(data, *hash),
149 (PackObjectId::Hash(hash), ObjectType::Tree) => {
150 validate_tree_serialized(data, *hash).map(|_| ())
151 }
152 (PackObjectId::Hash(hash), ObjectType::Action) => {
153 validate_action_serialized(data, ActionId::from_hash(*hash)).map(|_| ())
154 }
155 (PackObjectId::ChangeId(change_id), ObjectType::State) => {
156 validate_state_serialized(data, *change_id).map(|_| ())
157 }
158 _ => Err(HeddleError::InvalidObject(format!(
159 "unsupported native pack object: {:?} {:?}",
160 id, obj_type
161 ))),
162 }
163}
164
165impl FsStore {
166 fn try_get_blob_once(&self, hash: &ContentHash) -> Result<Option<Blob>> {
169 let path = hash_path(&blobs_dir(&self.root), hash);
170 let loose_exists = path.exists();
171 let pack_has = if loose_exists {
172 false
173 } else if let Ok(manager) = self.pack_manager().read() {
174 manager.has_object(hash)
175 } else {
176 false
177 };
178 if (loose_exists || pack_has)
179 && let Ok(cache) = self.recent_blobs.read()
180 && let Some(blob) = cache.get(hash)
181 {
182 trace!("Found blob in recent object cache");
183 return Ok(Some(blob.clone()));
184 }
185
186 if let Ok(manager) = self.pack_manager().read()
187 && let Some((obj_type, data)) = manager.get_hashed_object(hash)?
188 && obj_type == ObjectType::Blob
189 {
190 trace!("Found blob in packfile");
191 return Ok(Some(Blob::new(data)));
199 }
200
201 match read_file_bytes(&path)? {
202 Some(data) => {
203 trace!(size = data.as_slice().len(), "Blob data read");
204 let content = codec::decode_blob_content(data.as_slice())?;
205 let blob = Blob::new(content);
206 if blob.hash() != *hash {
213 return Err(HeddleError::Corruption {
214 expected: *hash,
215 found: blob.hash(),
216 });
217 }
218 Ok(Some(blob))
219 }
220 None => Ok(None),
221 }
222 }
223
224 fn loose_or_packed(
229 &self,
230 loose_path: &Path,
231 in_pack: impl FnOnce(&PackManager) -> bool,
232 ) -> Result<bool> {
233 if loose_path.exists() {
234 return Ok(true);
235 }
236 if let Ok(manager) = self.pack_manager().read() {
237 return Ok(in_pack(&manager));
238 }
239 Ok(false)
240 }
241
242 fn try_has_blob_once(&self, hash: &ContentHash) -> Result<bool> {
243 let path = hash_path(&blobs_dir(&self.root), hash);
244 self.loose_or_packed(&path, |m| m.has_object(hash))
245 }
246
247 fn try_get_blob_size_once(&self, hash: &ContentHash) -> Result<Option<u64>> {
262 if let Ok(cache) = self.recent_blobs.read()
263 && let Some(blob) = cache.get(hash)
264 {
265 return Ok(Some(blob.content().len() as u64));
266 }
267
268 let path = hash_path(&blobs_dir(&self.root), hash);
269 if let Some((header, file_len)) = read_file_header(&path, BLOB_HEADER_PEEK)? {
270 if let Some(size) = header_uncompressed_size(&header) {
271 return Ok(Some(size));
272 }
273 return Ok(Some(file_len));
276 }
277
278 if let Ok(manager) = self.pack_manager().read()
279 && let Some(size) = manager.get_hashed_object_size(hash)?
280 {
281 return Ok(Some(size));
282 }
283 Ok(None)
284 }
285
286 fn try_get_tree_once(&self, hash: &ContentHash) -> Result<Option<Tree>> {
287 if let Ok(cache) = self.recent_trees.read()
291 && let Some(tree) = cache.get(hash)
292 {
293 trace!("Found tree in recent object cache");
294 return Ok(Some(tree.clone()));
295 }
296
297 let path = hash_path(&trees_dir(&self.root), hash);
301 if path.exists()
302 && let Some(data) = read_file_bytes(&path)?
303 {
304 trace!(size = data.as_slice().len(), "Tree data read");
305 let tree = validate_loaded_tree(codec::decode_tree(data.as_slice())?)?;
306 if tree.hash() != *hash {
307 return Err(HeddleError::Corruption {
308 expected: *hash,
309 found: tree.hash(),
310 });
311 }
312 if let Ok(mut cache) = self.recent_trees.write() {
313 cache.insert(*hash, tree.clone());
314 }
315 return Ok(Some(tree));
316 }
317
318 if let Ok(manager) = self.pack_manager().read()
319 && let Some((obj_type, data)) = manager.get_hashed_object(hash)?
320 && obj_type == ObjectType::Tree
321 {
322 trace!("Found tree in packfile");
323 let tree = validate_loaded_tree(codec::decode_tree_serialized(&data)?)?;
324 if tree.hash() != *hash {
325 return Err(HeddleError::Corruption {
326 expected: *hash,
327 found: tree.hash(),
328 });
329 }
330 return Ok(Some(tree));
331 }
332 Ok(None)
333 }
334
335 fn try_get_tree_serialized_once(&self, hash: &ContentHash) -> Result<Option<Vec<u8>>> {
336 let path = hash_path(&trees_dir(&self.root), hash);
337 if path.exists()
338 && let Some(data) = read_file_bytes(&path)?
339 {
340 return Ok(Some(codec::decode_tree_body(data.as_slice())?));
341 }
342
343 if let Ok(manager) = self.pack_manager().read()
344 && let Some((obj_type, data)) = manager.get_hashed_object(hash)?
345 && obj_type == ObjectType::Tree
346 {
347 return Ok(Some(data));
348 }
349
350 Ok(None)
351 }
352
353 fn try_has_tree_once(&self, hash: &ContentHash) -> Result<bool> {
354 let path = hash_path(&trees_dir(&self.root), hash);
355 self.loose_or_packed(&path, |m| m.has_object(hash))
356 }
357
358 fn try_get_state_once(&self, id: &ChangeId) -> Result<Option<State>> {
359 let path = state_path(&self.root, id);
360 let loose_exists = path.exists();
361 let pack_has = if loose_exists {
362 false
363 } else if let Ok(manager) = self.pack_manager().read() {
364 manager.has_object_id(&PackObjectId::ChangeId(*id))
365 } else {
366 false
367 };
368 if (loose_exists || pack_has)
369 && let Ok(cache) = self.recent_states.read()
370 && let Some(state) = cache.get(id)
371 {
372 trace!("Found state in recent object cache");
373 return Ok(Some(state.clone()));
374 }
375
376 if loose_exists && let Some(data) = read_file_bytes(&path)? {
386 trace!(
387 size = data.as_slice().len(),
388 "State read from loose object (shadows any packed copy)"
389 );
390 let state = validate_loaded_state(id, codec::decode_state(data.as_slice())?)?;
391 if let Ok(mut cache) = self.recent_states.write() {
392 cache.insert(*id, state.clone());
393 }
394 return Ok(Some(state));
395 }
396
397 if let Ok(manager) = self.pack_manager().read()
398 && let Some((obj_type, data)) = manager.get_object(&PackObjectId::ChangeId(*id))?
399 && obj_type == ObjectType::State
400 {
401 trace!("Found state in packfile");
402 let state = validate_loaded_state(id, rmp_serde::from_slice(&data)?)?;
403 if let Ok(mut cache) = self.recent_states.write() {
404 cache.insert(*id, state.clone());
405 }
406 return Ok(Some(state));
407 }
408
409 Ok(None)
410 }
411
412 fn try_has_state_once(&self, id: &ChangeId) -> Result<bool> {
413 let path = state_path(&self.root, id);
414 self.loose_or_packed(&path, |m| m.has_object_id(&PackObjectId::ChangeId(*id)))
415 }
416}
417
418impl ObjectStore for FsStore {
419 fn clear_recent_caches(&self) {
420 self.clear_recent_object_caches();
421 }
422
423 fn get_blob_bytes(&self, hash: &ContentHash) -> Result<Option<bytes::Bytes>> {
430 if let Ok(manager) = self.pack_manager().read()
431 && let Some((obj_type, data)) = manager.get_hashed_object_bytes(hash)?
432 && obj_type == crate::store::pack::ObjectType::Blob
433 {
434 return Ok(Some(data));
435 }
436 Ok(self
437 .get_blob(hash)?
438 .map(|blob| bytes::Bytes::from(blob.into_content())))
439 }
440
441 #[instrument(skip(self), fields(hash = %hash.short()))]
442 fn get_blob(&self, hash: &ContentHash) -> Result<Option<Blob>> {
443 if let Some(blob) = self.try_get_blob_once(hash)? {
444 return Ok(Some(blob));
445 }
446 if self.reload_packs_if_stale()?
451 && let Some(blob) = self.try_get_blob_once(hash)?
452 {
453 return Ok(Some(blob));
454 }
455 trace!("Blob not found");
456 Ok(None)
457 }
458
459 #[instrument(skip(self, blob), fields(size = blob.content().len()))]
460 fn put_blob(&self, blob: &Blob) -> Result<ContentHash> {
461 let hash = blob.hash();
462 let path = hash_path(&blobs_dir(&self.root), &hash);
463
464 if !path.exists() {
465 let data = codec::encode_blob_content(blob.content(), &self.compression)?;
466 trace!(compressed_size = data.len(), "Writing blob");
467 self.write_loose_object_atomic(&path, &data)?;
468 } else {
469 trace!("Blob already exists, skipping write");
470 }
471 if let Ok(mut cache) = self.recent_blobs.write() {
472 cache.insert(hash, blob.clone());
473 }
474
475 Ok(hash)
476 }
477
478 #[instrument(skip(self, blob), fields(hash = %hash.short()))]
479 fn put_blob_with_hash(&self, blob: &Blob, hash: ContentHash) -> Result<ContentHash> {
480 if blob.hash() != hash {
481 return Err(HeddleError::Corruption {
482 expected: hash,
483 found: blob.hash(),
484 });
485 }
486
487 let path = hash_path(&blobs_dir(&self.root), &hash);
488
489 if !path.exists() {
490 let data = codec::encode_blob_content(blob.content(), &self.compression)?;
491 trace!(
492 compressed_size = data.len(),
493 "Writing blob with precomputed hash"
494 );
495 self.write_loose_object_atomic(&path, &data)?;
496 }
497 if let Ok(mut cache) = self.recent_blobs.write() {
498 cache.insert(hash, blob.clone());
499 }
500
501 Ok(hash)
502 }
503
504 #[instrument(skip(self, data), fields(hash = %hash.short(), size = data.len()))]
505 fn put_blob_bytes_with_hash(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
506 validate_blob_bytes(data, hash)?;
507
508 let path = hash_path(&blobs_dir(&self.root), &hash);
509 if !path.exists() {
510 trace!(
511 size = data.len(),
512 "Writing raw blob bytes with precomputed hash"
513 );
514 self.write_loose_object_atomic(&path, data)?;
515 }
516 if let Ok(mut cache) = self.recent_blobs.write() {
517 cache.insert(hash, Blob::from_slice(data));
518 }
519
520 Ok(hash)
521 }
522
523 #[instrument(skip(self), fields(hash = %hash.short()))]
524 fn has_blob(&self, hash: &ContentHash) -> Result<bool> {
525 if self.try_has_blob_once(hash)? {
526 return Ok(true);
527 }
528 if self.reload_packs_if_stale()? {
529 return self.try_has_blob_once(hash);
530 }
531 Ok(false)
532 }
533
534 fn loose_blob_path(&self, hash: &ContentHash) -> Option<PathBuf> {
551 let path = hash_path(&blobs_dir(&self.root), hash);
552 if let Ok(verified) = self.verified_loose_blobs.read()
557 && verified.get(hash).is_some()
558 && path.exists()
559 {
560 return Some(path);
561 }
562
563 let (header, _) = read_file_header(&path, BLOB_HEADER_PEEK).ok().flatten()?;
576 if is_compressed(&header) {
577 return None;
578 }
579 let bytes = read_file_bytes(&path).ok().flatten()?;
580 let actual = ContentHash::compute_typed("blob", bytes.as_slice());
581 if actual != *hash {
582 return None;
586 }
587 if let Ok(mut verified) = self.verified_loose_blobs.write() {
588 verified.insert(*hash, ());
589 }
590 Some(path)
591 }
592
593 #[instrument(skip(self), fields(hash = %hash.short()))]
606 fn promote_to_loose_uncompressed(&self, hash: &ContentHash) -> Result<bool> {
607 let path = hash_path(&blobs_dir(&self.root), hash);
608
609 if let Some((header, _)) = read_file_header(&path, 9)?
611 && !is_compressed(&header)
612 {
613 trace!("Blob already loose+uncompressed; skipping promotion");
614 return Ok(false);
615 }
616
617 let blob = self.get_blob(hash)?.ok_or_else(|| {
621 HeddleError::NotFound(format!(
622 "blob {} not found in store; cannot promote to loose-uncompressed",
623 hash
624 ))
625 })?;
626
627 debug!(
640 size = blob.content().len(),
641 "Promoting blob to loose-uncompressed canonical store"
642 );
643 self.write_loose_object_cache(&path, blob.content())?;
644 if let Ok(mut verified) = self.verified_loose_blobs.write() {
645 verified.insert(*hash, ());
646 }
647 Ok(true)
648 }
649
650 #[instrument(skip(self), fields(hash = %hash.short()))]
651 fn blob_size(&self, hash: &ContentHash) -> Result<Option<u64>> {
652 if let Some(size) = self.try_get_blob_size_once(hash)? {
653 return Ok(Some(size));
654 }
655 if self.reload_packs_if_stale()?
659 && let Some(size) = self.try_get_blob_size_once(hash)?
660 {
661 return Ok(Some(size));
662 }
663 Ok(None)
664 }
665
666 #[instrument(skip(self), fields(hash = %hash.short()))]
667 fn get_tree(&self, hash: &ContentHash) -> Result<Option<Tree>> {
668 if let Some(tree) = self.try_get_tree_once(hash)? {
669 return Ok(Some(tree));
670 }
671 if self.reload_packs_if_stale()?
672 && let Some(tree) = self.try_get_tree_once(hash)?
673 {
674 return Ok(Some(tree));
675 }
676 trace!("Tree not found");
677 Ok(None)
678 }
679
680 #[instrument(skip(self), fields(hash = %hash.short()))]
681 fn get_tree_serialized(&self, hash: &ContentHash) -> Result<Option<Vec<u8>>> {
682 if let Some(data) = self.try_get_tree_serialized_once(hash)? {
683 return Ok(Some(data));
684 }
685 if self.reload_packs_if_stale()?
686 && let Some(data) = self.try_get_tree_serialized_once(hash)?
687 {
688 return Ok(Some(data));
689 }
690 Ok(None)
691 }
692
693 #[instrument(skip(self, tree), fields(entry_count = tree.entries().len()))]
694 fn put_tree(&self, tree: &Tree) -> Result<ContentHash> {
695 let hash = tree.hash();
696 let path = hash_path(&trees_dir(&self.root), &hash);
697
698 if !path.exists() {
699 let (_, data) = codec::encode_tree(tree, &self.compression)?;
700 trace!(compressed_size = data.len(), "Writing tree");
701 self.write_loose_object_atomic(&path, &data)?;
702 } else {
703 trace!("Tree already exists, skipping write");
704 }
705 if let Ok(mut cache) = self.recent_trees.write() {
706 cache.insert(hash, tree.clone());
707 }
708
709 Ok(hash)
710 }
711
712 #[instrument(skip(self, data), fields(hash = %hash.short(), size = data.len()))]
713 fn put_tree_serialized(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
714 let tree = validate_tree_serialized(data, hash)?;
715
716 let path = hash_path(&trees_dir(&self.root), &hash);
717 let should_write = match read_file_bytes(&path)? {
718 Some(existing) => codec::decode_tree_body(existing.as_slice())? != data,
719 None => true,
720 };
721 if should_write {
722 trace!(size = data.len(), "Writing raw serialized tree");
723 self.write_loose_object_atomic(&path, data)?;
724 }
725 if let Ok(mut cache) = self.recent_trees.write() {
726 cache.insert(hash, tree);
727 }
728
729 Ok(hash)
730 }
731
732 #[instrument(skip(self), fields(hash = %hash.short()))]
733 fn has_tree(&self, hash: &ContentHash) -> Result<bool> {
734 if self.try_has_tree_once(hash)? {
735 return Ok(true);
736 }
737 if self.reload_packs_if_stale()? {
738 return self.try_has_tree_once(hash);
739 }
740 Ok(false)
741 }
742
743 #[instrument(skip(self), fields(id = %id.short()))]
744 fn get_state(&self, id: &ChangeId) -> Result<Option<State>> {
745 if let Some(state) = self.try_get_state_once(id)? {
746 return Ok(Some(state));
747 }
748 if self.reload_packs_if_stale()?
749 && let Some(state) = self.try_get_state_once(id)?
750 {
751 return Ok(Some(state));
752 }
753 trace!("State not found");
754 Ok(None)
755 }
756
757 #[instrument(skip(self, state), fields(id = %state.change_id.short()))]
758 fn put_state(&self, state: &State) -> Result<()> {
759 let path = state_path(&self.root, &state.change_id);
760 let data = codec::encode_state(state, &self.compression)?;
761 trace!(compressed_size = data.len(), "Writing state");
762 self.write_loose_object_atomic(&path, &data)?;
763 if let Ok(mut cache) = self.recent_states.write() {
764 cache.insert(state.change_id, state.clone());
765 }
766 Ok(())
767 }
768
769 #[instrument(skip(self, data), fields(id = %id.short(), size = data.len()))]
770 fn put_state_serialized(&self, data: &[u8], id: ChangeId) -> Result<()> {
771 let state = validate_state_serialized(data, id)?;
772 let path = state_path(&self.root, &id);
773 trace!(size = data.len(), "Writing raw serialized state");
774 self.write_loose_object_atomic(&path, data)?;
775 if let Ok(mut cache) = self.recent_states.write() {
776 cache.insert(id, state);
777 }
778 Ok(())
779 }
780
781 #[instrument(skip(self), fields(id = %id.short()))]
782 fn has_state(&self, id: &ChangeId) -> Result<bool> {
783 if self.try_has_state_once(id)? {
784 return Ok(true);
785 }
786 if self.reload_packs_if_stale()? {
787 return self.try_has_state_once(id);
788 }
789 Ok(false)
790 }
791
792 #[instrument(skip(self))]
793 fn list_states(&self) -> Result<Vec<ChangeId>> {
794 self.reload_packs_if_stale()?;
795
796 let dir = states_dir(&self.root);
797 if !dir.exists() {
798 return Ok(Vec::new());
799 }
800
801 let mut states = Vec::new();
802 for entry in fs::read_dir(&dir)? {
803 let entry = entry?;
804 let path = entry.path();
805 if let Some(name) = path.file_stem()
806 && let Some(name_str) = name.to_str()
807 && let Ok(id) = ChangeId::parse(name_str)
808 {
809 states.push(id);
810 }
811 }
812 if let Ok(manager) = self.pack_manager().read() {
813 for id in manager.list_all_ids()? {
814 if let PackObjectId::ChangeId(change_id) = id
815 && !states.contains(&change_id)
816 {
817 states.push(change_id);
818 }
819 }
820 }
821 debug!(count = states.len(), "Listed states");
822 Ok(states)
823 }
824
825 #[instrument(skip(self), fields(id = %id))]
826 fn get_action(&self, id: &ActionId) -> Result<Option<Action>> {
827 let path = action_path(&self.root, id);
828 if !path.exists()
829 && let Ok(manager) = self.pack_manager().read()
830 && let Some((obj_type, data)) = manager.get_hashed_object(id.as_hash())?
831 && obj_type == ObjectType::Action
832 {
833 trace!("Found action in packfile");
834 let action = validate_loaded_action(id, rmp_serde::from_slice(&data)?)?;
835 return Ok(Some(action));
836 }
837 match read_file_bytes(&path)? {
838 Some(data) => {
839 trace!(size = data.as_slice().len(), "Action data read");
840 let action = validate_loaded_action(id, codec::decode_action(data.as_slice())?)?;
841 Ok(Some(action))
842 }
843 None => {
844 trace!("Action not found");
845 Ok(None)
846 }
847 }
848 }
849
850 #[instrument(skip(self, action))]
851 fn put_action(&self, action: &mut Action) -> Result<ActionId> {
852 let id = action.id();
853 let path = action_path(&self.root, &id);
854
855 if !path.exists() {
856 let (_, data) = codec::encode_action(action, &self.compression)?;
857 trace!(id = %id, compressed_size = data.len(), "Writing action");
858 self.write_loose_object_atomic(&path, &data)?;
859 }
860
861 Ok(id)
862 }
863
864 #[instrument(skip(self))]
865 fn list_actions(&self) -> Result<Vec<ActionId>> {
866 let dir = actions_dir(&self.root);
867 let mut actions = Vec::new();
868 if dir.exists() {
869 for entry in fs::read_dir(&dir)? {
870 let entry = entry?;
871 let path = entry.path();
872 if let Some(name) = path.file_stem()
873 && let Some(name_str) = name.to_str()
874 && let Ok(hash) = ContentHash::from_hex(name_str)
875 {
876 actions.push(ActionId::from_hash(hash));
877 }
878 }
879 }
880 if let Ok(manager) = self.pack_manager().read() {
881 for id in manager.list_all_ids()? {
882 if let PackObjectId::Hash(hash) = id
883 && !actions.iter().any(|action_id| action_id.as_hash() == &hash)
884 && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
885 && obj_type == ObjectType::Action
886 {
887 actions.push(ActionId::from_hash(hash));
888 }
889 }
890 }
891 debug!(count = actions.len(), "Listed actions");
892 Ok(actions)
893 }
894
895 #[instrument(skip(self))]
896 fn list_blobs(&self) -> Result<Vec<ContentHash>> {
897 let dir = blobs_dir(&self.root);
898 let mut blobs = list_hashes_from_dir(&dir)?;
899 if let Ok(manager) = self.pack_manager().read() {
900 for id in manager.list_all_ids()? {
901 if let PackObjectId::Hash(hash) = id
902 && !blobs.contains(&hash)
903 && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
904 && obj_type == ObjectType::Blob
905 {
906 blobs.push(hash);
907 }
908 }
909 }
910 Ok(blobs)
911 }
912
913 #[instrument(skip(self))]
914 fn list_trees(&self) -> Result<Vec<ContentHash>> {
915 let dir = trees_dir(&self.root);
916 let mut trees = list_hashes_from_dir(&dir)?;
917 if let Ok(manager) = self.pack_manager().read() {
918 for id in manager.list_all_ids()? {
919 if let PackObjectId::Hash(hash) = id
920 && !trees.contains(&hash)
921 && let Some((obj_type, _)) = manager.get_hashed_object(&hash)?
922 && obj_type == ObjectType::Tree
923 {
924 trees.push(hash);
925 }
926 }
927 }
928 Ok(trees)
929 }
930
931 #[instrument(skip(self))]
932 fn pack_objects(&self, aggressive: bool) -> Result<(u64, u64)> {
933 self.pack_objects_impl(aggressive)
934 }
935
936 #[instrument(skip(self), fields(id = ?id))]
937 fn get_pack_object(&self, id: &PackObjectId) -> Result<Option<(ObjectType, Vec<u8>)>> {
938 if let Ok(manager) = self.pack_manager().read()
939 && let Some((obj_type, data)) = manager.get_object(id)?
940 {
941 return Ok(Some((obj_type, data)));
942 }
943
944 match id {
945 PackObjectId::Hash(hash) => {
946 if let Some(blob) = self.get_blob(hash)? {
947 return Ok(Some((ObjectType::Blob, blob.content().to_vec())));
948 }
949 if let Some(tree) = self.get_tree(hash)? {
950 return Ok(Some((ObjectType::Tree, rmp_serde::to_vec_named(&tree)?)));
951 }
952 if let Some(action) = self.get_action(&ActionId::from_hash(*hash))? {
953 return Ok(Some((
954 ObjectType::Action,
955 rmp_serde::to_vec_named(&action)?,
956 )));
957 }
958 Ok(None)
959 }
960 PackObjectId::ChangeId(change_id) => {
961 if let Some(state) = self.get_state(change_id)? {
962 Ok(Some((ObjectType::State, rmp_serde::to_vec_named(&state)?)))
963 } else {
964 Ok(None)
965 }
966 }
967 }
968 }
969
970 #[instrument(skip(self, pack_data, index_data))]
971 fn install_pack(&self, pack_data: &[u8], index_data: &[u8]) -> Result<Vec<PackObjectId>> {
972 let reader = crate::store::pack::PackReader::from_slice(pack_data, index_data)?;
973 let ids = validate_and_list_pack(&reader)?;
974 let state_entries = state_entries_from_pack(&reader, &ids)?;
975 self.install_pack_files(pack_data, index_data)?;
976 for (id, data) in state_entries {
977 self.put_state_serialized(&data, id)?;
978 }
979 self.clear_recent_object_caches();
980 Ok(ids)
981 }
982
983 #[instrument(skip(self, blobs), fields(count = blobs.len()))]
984 fn put_blobs_packed(&self, blobs: Vec<(crate::object::ContentHash, Vec<u8>)>) -> Result<()> {
985 self.put_blobs_packed_impl(blobs)
986 }
987
988 #[instrument(skip(self))]
989 fn install_pack_streaming(
990 &self,
991 pack_path: &std::path::Path,
992 index_path: &std::path::Path,
993 ) -> Result<Vec<PackObjectId>> {
994 let ids = {
1000 let reader = crate::store::pack::PackReader::open(pack_path, index_path)?;
1001 validate_and_list_pack(&reader)?
1002 };
1003 let state_entries = {
1004 let reader = crate::store::pack::PackReader::open(pack_path, index_path)?;
1005 state_entries_from_pack(&reader, &ids)?
1006 };
1007 self.install_pack_files_streaming(pack_path, index_path)?;
1008 for (id, data) in state_entries {
1009 self.put_state_serialized(&data, id)?;
1010 }
1011 Ok(ids)
1012 }
1013
1014 #[instrument(skip(self))]
1015 fn prune_loose_objects(&self) -> Result<(u64, u64)> {
1016 self.prune_loose_objects_impl()
1017 }
1018
1019 #[instrument(skip(self))]
1020 fn begin_snapshot_write_batch(&self) -> Result<()> {
1021 self.begin_snapshot_write_batch_impl()
1022 }
1023
1024 #[instrument(skip(self))]
1025 fn flush_snapshot_write_batch(&self) -> Result<()> {
1026 self.flush_snapshot_write_batch_impl()
1027 }
1028
1029 #[instrument(skip(self))]
1030 fn abort_snapshot_write_batch(&self) {
1031 self.abort_snapshot_write_batch_impl();
1032 }
1033
1034 fn has_redactions_for_blob(&self, blob: &ContentHash) -> Result<bool> {
1035 Ok(redaction_path(&self.root, blob).exists())
1036 }
1037
1038 fn get_redactions_bytes_for_blob(&self, blob: &ContentHash) -> Result<Option<Vec<u8>>> {
1039 let path = redaction_path(&self.root, blob);
1040 match fs::read(&path) {
1041 Ok(bytes) => Ok(Some(bytes)),
1042 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
1043 Err(err) => Err(HeddleError::Io(err)),
1044 }
1045 }
1046
1047 fn put_redactions_bytes_for_blob(&self, blob: &ContentHash, bytes: &[u8]) -> Result<()> {
1048 let dir = redactions_dir(&self.root);
1049 if !dir.exists() {
1050 fs::create_dir_all(&dir)?;
1051 }
1052 let path = redaction_path(&self.root, blob);
1053 crate::fs_atomic::write_file_atomic(&path, bytes)?;
1054 Ok(())
1055 }
1056
1057 fn list_blobs_with_redactions(&self) -> Result<Vec<ContentHash>> {
1058 let dir = redactions_dir(&self.root);
1059 if !dir.exists() {
1060 return Ok(Vec::new());
1061 }
1062 let mut out = Vec::new();
1063 for entry in fs::read_dir(&dir)? {
1064 let entry = entry?;
1065 let path = entry.path();
1066 if path.extension().and_then(|e| e.to_str()) != Some("bin") {
1067 continue;
1068 }
1069 let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
1070 continue;
1071 };
1072 if let Ok(hash) = ContentHash::from_hex(stem) {
1073 out.push(hash);
1074 }
1075 }
1076 Ok(out)
1077 }
1078
1079 fn has_state_visibility_for_state(&self, state: &ChangeId) -> Result<bool> {
1080 Ok(state_visibility_path(&self.root, state).exists())
1081 }
1082
1083 fn get_state_visibility_bytes_for_state(&self, state: &ChangeId) -> Result<Option<Vec<u8>>> {
1084 let path = state_visibility_path(&self.root, state);
1085 match fs::read(&path) {
1086 Ok(bytes) => Ok(Some(bytes)),
1087 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
1088 Err(err) => Err(HeddleError::Io(err)),
1089 }
1090 }
1091
1092 fn put_state_visibility_bytes_for_state(&self, state: &ChangeId, bytes: &[u8]) -> Result<()> {
1093 let dir = state_visibility_dir(&self.root);
1094 if !dir.exists() {
1095 fs::create_dir_all(&dir)?;
1096 }
1097 let path = state_visibility_path(&self.root, state);
1098 crate::fs_atomic::write_file_atomic(&path, bytes)?;
1099 Ok(())
1100 }
1101
1102 fn list_states_with_visibility(&self) -> Result<Vec<ChangeId>> {
1103 let dir = state_visibility_dir(&self.root);
1104 if !dir.exists() {
1105 return Ok(Vec::new());
1106 }
1107 let mut out = Vec::new();
1108 for entry in fs::read_dir(&dir)? {
1109 let entry = entry?;
1110 let path = entry.path();
1111 if path.extension().and_then(|e| e.to_str()) != Some("bin") {
1112 continue;
1113 }
1114 let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
1115 continue;
1116 };
1117 if let Ok(state) = ChangeId::parse(stem) {
1118 out.push(state);
1119 }
1120 }
1121 Ok(out)
1122 }
1123}