1use std::path::PathBuf;
5
6use crate::object::{Action, ActionId, Blob, ChangeId, ContentHash, State, Tree};
7
8pub mod agent_registry;
9pub mod atomic;
10pub mod codec;
11pub mod compression;
12pub mod fs;
13pub mod liveness;
14#[cfg(any(test, feature = "memory-backend"))]
15pub mod memory;
16pub mod pack;
17pub mod shallow;
18pub mod source;
19pub mod store_compliance;
20
21pub use agent_registry::{
22 ActorChainNode, AgentEntry, AgentRegistry, AgentStatus, AgentUsageSummary, ContextQueryEntry,
23 ReserveOutcome, generate_agent_id,
24};
25pub use compression::{CompressionConfig, CompressionError, compress, decompress};
26pub use fs::FsStore;
27pub use liveness::{Liveness, current_boot_id, is_owner_alive, process_alive};
28#[cfg(any(test, feature = "memory-backend"))]
29pub use memory::InMemoryStore;
30pub use pack::{PackBuilder, PackObjectId, PackReader, PackStats};
31pub use shallow::ShallowInfo;
32#[cfg(feature = "async-source")]
33pub use source::AsyncObjectSource;
34pub use source::ObjectSource;
35
36pub use crate::error::{HeddleError as StoreError, HeddleError, Result};
37
38impl From<CompressionError> for HeddleError {
39 fn from(e: CompressionError) -> Self {
40 HeddleError::Compression(e.to_string())
41 }
42}
43
44#[derive(Clone)]
56pub enum AnyStore {
57 Fs(FsStore),
58}
59
// Forwards one method call to whichever backend an `AnyStore` currently holds.
//
// Usage: `any_store_dispatch!(self, method(arg1, arg2))` expands to a `match`
// over the store's variants that invokes `method` on the wrapped backend with
// ordinary method-call syntax, passing the arguments through unchanged.
macro_rules! any_store_dispatch {
    ($store:ident, $call:ident ( $($param:expr),* )) => {
        match $store {
            AnyStore::Fs(backend) => backend.$call($($param),*),
        }
    };
}
72
73impl ObjectStore for AnyStore {
74 fn get_blob(&self, hash: &ContentHash) -> Result<Option<Blob>> {
75 match self {
76 AnyStore::Fs(inner) => ObjectStore::get_blob(inner, hash),
77 }
78 }
79 fn put_blob(&self, blob: &Blob) -> Result<ContentHash> {
80 any_store_dispatch!(self, put_blob(blob))
81 }
82 fn get_blob_bytes(&self, hash: &ContentHash) -> Result<Option<bytes::Bytes>> {
83 match self {
84 AnyStore::Fs(inner) => ObjectStore::get_blob_bytes(inner, hash),
85 }
86 }
87 fn blob_size(&self, hash: &ContentHash) -> Result<Option<u64>> {
88 any_store_dispatch!(self, blob_size(hash))
89 }
90 fn loose_blob_path(&self, hash: &ContentHash) -> Option<PathBuf> {
91 any_store_dispatch!(self, loose_blob_path(hash))
92 }
93 fn promote_to_loose_uncompressed(&self, hash: &ContentHash) -> Result<bool> {
94 any_store_dispatch!(self, promote_to_loose_uncompressed(hash))
95 }
96 fn clear_recent_caches(&self) {
97 any_store_dispatch!(self, clear_recent_caches())
98 }
99 fn put_blob_with_hash(&self, blob: &Blob, hash: ContentHash) -> Result<ContentHash> {
100 any_store_dispatch!(self, put_blob_with_hash(blob, hash))
101 }
102 fn has_blob(&self, hash: &ContentHash) -> Result<bool> {
103 any_store_dispatch!(self, has_blob(hash))
104 }
105 fn get_tree(&self, hash: &ContentHash) -> Result<Option<Tree>> {
106 match self {
107 AnyStore::Fs(inner) => ObjectStore::get_tree(inner, hash),
108 }
109 }
110 fn put_tree(&self, tree: &Tree) -> Result<ContentHash> {
111 any_store_dispatch!(self, put_tree(tree))
112 }
113 fn has_tree(&self, hash: &ContentHash) -> Result<bool> {
114 any_store_dispatch!(self, has_tree(hash))
115 }
116 fn get_state(&self, id: &ChangeId) -> Result<Option<State>> {
117 match self {
118 AnyStore::Fs(inner) => ObjectStore::get_state(inner, id),
119 }
120 }
121 fn put_state(&self, state: &State) -> Result<()> {
122 any_store_dispatch!(self, put_state(state))
123 }
124 fn has_state(&self, id: &ChangeId) -> Result<bool> {
125 any_store_dispatch!(self, has_state(id))
126 }
127 fn list_states(&self) -> Result<Vec<ChangeId>> {
128 any_store_dispatch!(self, list_states())
129 }
130 fn get_action(&self, id: &ActionId) -> Result<Option<Action>> {
131 any_store_dispatch!(self, get_action(id))
132 }
133 fn put_action(&self, action: &mut Action) -> Result<ActionId> {
134 any_store_dispatch!(self, put_action(action))
135 }
136 fn list_actions(&self) -> Result<Vec<ActionId>> {
137 any_store_dispatch!(self, list_actions())
138 }
139 fn list_blobs(&self) -> Result<Vec<ContentHash>> {
140 any_store_dispatch!(self, list_blobs())
141 }
142 fn list_trees(&self) -> Result<Vec<ContentHash>> {
143 any_store_dispatch!(self, list_trees())
144 }
145 fn put_blob_bytes_with_hash(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
146 any_store_dispatch!(self, put_blob_bytes_with_hash(data, hash))
147 }
148 fn put_tree_serialized(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
149 any_store_dispatch!(self, put_tree_serialized(data, hash))
150 }
151 fn put_state_serialized(&self, data: &[u8], id: ChangeId) -> Result<()> {
152 any_store_dispatch!(self, put_state_serialized(data, id))
153 }
154 fn put_action_serialized(&self, data: &[u8], id: ActionId) -> Result<()> {
155 any_store_dispatch!(self, put_action_serialized(data, id))
156 }
157 fn get_pack_object(
158 &self,
159 id: &pack::PackObjectId,
160 ) -> Result<Option<(pack::ObjectType, Vec<u8>)>> {
161 any_store_dispatch!(self, get_pack_object(id))
162 }
163 fn put_blobs_packed(&self, blobs: Vec<(ContentHash, Vec<u8>)>) -> Result<()> {
164 any_store_dispatch!(self, put_blobs_packed(blobs))
165 }
166 fn install_pack(&self, pack_data: &[u8], index_data: &[u8]) -> Result<Vec<pack::PackObjectId>> {
167 any_store_dispatch!(self, install_pack(pack_data, index_data))
168 }
169 fn install_pack_streaming(
170 &self,
171 pack_path: &std::path::Path,
172 index_path: &std::path::Path,
173 ) -> Result<Vec<pack::PackObjectId>> {
174 any_store_dispatch!(self, install_pack_streaming(pack_path, index_path))
175 }
176 fn pack_objects(&self, aggressive: bool) -> Result<(u64, u64)> {
177 any_store_dispatch!(self, pack_objects(aggressive))
178 }
179 fn prune_loose_objects(&self) -> Result<(u64, u64)> {
180 any_store_dispatch!(self, prune_loose_objects())
181 }
182 fn begin_snapshot_write_batch(&self) -> Result<()> {
183 any_store_dispatch!(self, begin_snapshot_write_batch())
184 }
185 fn flush_snapshot_write_batch(&self) -> Result<()> {
186 any_store_dispatch!(self, flush_snapshot_write_batch())
187 }
188 fn abort_snapshot_write_batch(&self) {
189 any_store_dispatch!(self, abort_snapshot_write_batch())
190 }
191 fn has_redactions_for_blob(&self, blob: &ContentHash) -> Result<bool> {
192 any_store_dispatch!(self, has_redactions_for_blob(blob))
193 }
194 fn get_redactions_bytes_for_blob(&self, blob: &ContentHash) -> Result<Option<Vec<u8>>> {
195 any_store_dispatch!(self, get_redactions_bytes_for_blob(blob))
196 }
197 fn put_redactions_bytes_for_blob(&self, blob: &ContentHash, bytes: &[u8]) -> Result<()> {
198 any_store_dispatch!(self, put_redactions_bytes_for_blob(blob, bytes))
199 }
200 fn list_blobs_with_redactions(&self) -> Result<Vec<ContentHash>> {
201 any_store_dispatch!(self, list_blobs_with_redactions())
202 }
203 fn has_state_visibility_for_state(&self, state: &ChangeId) -> Result<bool> {
204 any_store_dispatch!(self, has_state_visibility_for_state(state))
205 }
206 fn get_state_visibility_bytes_for_state(&self, state: &ChangeId) -> Result<Option<Vec<u8>>> {
207 any_store_dispatch!(self, get_state_visibility_bytes_for_state(state))
208 }
209 fn put_state_visibility_bytes_for_state(&self, state: &ChangeId, bytes: &[u8]) -> Result<()> {
210 any_store_dispatch!(self, put_state_visibility_bytes_for_state(state, bytes))
211 }
212 fn list_states_with_visibility(&self) -> Result<Vec<ChangeId>> {
213 any_store_dispatch!(self, list_states_with_visibility())
214 }
215}
216
217pub trait ObjectStore: Send + Sync {
219 fn get_blob(&self, hash: &ContentHash) -> Result<Option<Blob>>;
220 fn put_blob(&self, blob: &Blob) -> Result<ContentHash>;
221
222 fn get_blob_bytes(&self, hash: &ContentHash) -> Result<Option<bytes::Bytes>> {
233 Ok(self
234 .get_blob(hash)?
235 .map(|blob| bytes::Bytes::from(blob.into_content())))
236 }
237
238 fn blob_size(&self, hash: &ContentHash) -> Result<Option<u64>> {
252 Ok(self.get_blob(hash)?.map(|blob| blob.content().len() as u64))
253 }
254
255 fn loose_blob_path(&self, _hash: &ContentHash) -> Option<PathBuf> {
267 None
268 }
269
270 fn promote_to_loose_uncompressed(&self, _hash: &ContentHash) -> Result<bool> {
307 Ok(false)
308 }
309
310 fn clear_recent_caches(&self) {}
319
320 fn put_blob_with_hash(&self, blob: &Blob, hash: ContentHash) -> Result<ContentHash> {
321 if blob.hash() != hash {
322 return Err(HeddleError::InvalidObject("blob hash mismatch".to_string()));
323 }
324 self.put_blob(blob)
325 }
326
327 fn has_blob(&self, hash: &ContentHash) -> Result<bool>;
328 fn get_tree(&self, hash: &ContentHash) -> Result<Option<Tree>>;
329 fn put_tree(&self, tree: &Tree) -> Result<ContentHash>;
330 fn has_tree(&self, hash: &ContentHash) -> Result<bool>;
331 fn get_state(&self, id: &ChangeId) -> Result<Option<State>>;
332 fn put_state(&self, state: &State) -> Result<()>;
333 fn has_state(&self, id: &ChangeId) -> Result<bool>;
334 fn list_states(&self) -> Result<Vec<ChangeId>>;
335 fn get_action(&self, id: &ActionId) -> Result<Option<Action>>;
336 fn put_action(&self, action: &mut Action) -> Result<ActionId>;
337 fn list_actions(&self) -> Result<Vec<ActionId>>;
338 fn list_blobs(&self) -> Result<Vec<ContentHash>>;
339 fn list_trees(&self) -> Result<Vec<ContentHash>>;
340
341 fn put_blob_bytes_with_hash(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
342 self.put_blob_with_hash(&Blob::from_slice(data), hash)
343 }
344
345 fn put_tree_serialized(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
346 let tree: Tree = rmp_serde::from_slice(data)?;
347 tree.validate()?;
348 if tree.hash() != hash {
349 return Err(HeddleError::Corruption {
350 expected: hash,
351 found: tree.hash(),
352 });
353 }
354 self.put_tree(&tree)
355 }
356
357 fn put_state_serialized(&self, data: &[u8], id: ChangeId) -> Result<()> {
358 let state: State = rmp_serde::from_slice(data)?;
359 if state.change_id != id {
360 return Err(HeddleError::InvalidObject(format!(
361 "state change_id mismatch: expected {}, found {}",
362 id, state.change_id
363 )));
364 }
365 self.put_state(&state)
366 }
367
368 fn put_action_serialized(&self, data: &[u8], id: ActionId) -> Result<()> {
369 let mut action: Action = rmp_serde::from_slice(data)?;
370 let found_id = action.compute_id();
371 if found_id != id {
372 return Err(HeddleError::InvalidObject(format!(
373 "action id mismatch: expected {}, found {}",
374 id, found_id
375 )));
376 }
377 let stored_id = self.put_action(&mut action)?;
378 if stored_id != id {
379 return Err(HeddleError::InvalidObject(format!(
380 "action id mismatch after write: expected {}, found {}",
381 id, stored_id
382 )));
383 }
384 Ok(())
385 }
386
387 fn get_pack_object(
388 &self,
389 id: &pack::PackObjectId,
390 ) -> Result<Option<(pack::ObjectType, Vec<u8>)>> {
391 match id {
392 pack::PackObjectId::Hash(hash) => {
393 if let Some(blob) = self.get_blob(hash)? {
394 return Ok(Some((pack::ObjectType::Blob, blob.content().to_vec())));
395 }
396 if let Some(tree) = self.get_tree(hash)? {
397 return Ok(Some((
398 pack::ObjectType::Tree,
399 rmp_serde::to_vec_named(&tree)?,
400 )));
401 }
402 if let Some(action) = self.get_action(&ActionId::from_hash(*hash))? {
403 return Ok(Some((
404 pack::ObjectType::Action,
405 rmp_serde::to_vec_named(&action)?,
406 )));
407 }
408 Ok(None)
409 }
410 pack::PackObjectId::ChangeId(change_id) => {
411 if let Some(state) = self.get_state(change_id)? {
412 Ok(Some((
413 pack::ObjectType::State,
414 rmp_serde::to_vec_named(&state)?,
415 )))
416 } else {
417 Ok(None)
418 }
419 }
420 }
421 }
422
423 fn put_blobs_packed(&self, blobs: Vec<(ContentHash, Vec<u8>)>) -> Result<()> {
433 for (hash, data) in blobs {
434 if !self.has_blob(&hash)? {
435 self.put_blob_bytes_with_hash(&data, hash)?;
436 }
437 }
438 Ok(())
439 }
440
441 fn install_pack(&self, pack_data: &[u8], index_data: &[u8]) -> Result<Vec<pack::PackObjectId>> {
442 let reader = pack::PackReader::from_slice(pack_data, index_data)?;
443 let ids = reader.list_ids();
444 for id in &ids {
445 let Some((obj_type, data)) = reader.get_object(id)? else {
446 continue;
447 };
448 match (id, obj_type) {
449 (pack::PackObjectId::Hash(hash), pack::ObjectType::Blob) => {
450 self.put_blob_bytes_with_hash(&data, *hash)?;
451 }
452 (pack::PackObjectId::Hash(hash), pack::ObjectType::Tree) => {
453 self.put_tree_serialized(&data, *hash)?;
454 }
455 (pack::PackObjectId::Hash(hash), pack::ObjectType::Action) => {
456 self.put_action_serialized(&data, ActionId::from_hash(*hash))?;
457 }
458 (pack::PackObjectId::ChangeId(change_id), pack::ObjectType::State) => {
459 self.put_state_serialized(&data, *change_id)?;
460 }
461 _ => {
462 return Err(HeddleError::InvalidObject(format!(
463 "unsupported native pack object: {:?} {:?}",
464 id, obj_type
465 )));
466 }
467 }
468 }
469 Ok(ids)
470 }
471
472 fn install_pack_streaming(
489 &self,
490 pack_path: &std::path::Path,
491 index_path: &std::path::Path,
492 ) -> Result<Vec<pack::PackObjectId>> {
493 let pack_data = std::fs::read(pack_path).map_err(StoreError::from)?;
494 let index_data = std::fs::read(index_path).map_err(StoreError::from)?;
495 let ids = self.install_pack(&pack_data, &index_data)?;
496 let _ = std::fs::remove_file(pack_path);
500 let _ = std::fs::remove_file(index_path);
501 Ok(ids)
502 }
503
504 fn pack_objects(&self, aggressive: bool) -> Result<(u64, u64)> {
505 let _ = aggressive;
506 Ok((0, 0))
507 }
508
509 fn prune_loose_objects(&self) -> Result<(u64, u64)> {
510 Ok((0, 0))
511 }
512
513 fn begin_snapshot_write_batch(&self) -> Result<()> {
514 Ok(())
515 }
516
517 fn flush_snapshot_write_batch(&self) -> Result<()> {
518 Ok(())
519 }
520
521 fn abort_snapshot_write_batch(&self) {}
522
523 fn has_redactions_for_blob(&self, _blob: &ContentHash) -> Result<bool> {
535 Ok(false)
536 }
537
538 fn get_redactions_bytes_for_blob(&self, _blob: &ContentHash) -> Result<Option<Vec<u8>>> {
546 Ok(None)
547 }
548
549 fn put_redactions_bytes_for_blob(&self, _blob: &ContentHash, _bytes: &[u8]) -> Result<()> {
558 Err(HeddleError::InvalidObject(
559 "this object store does not support persisting redactions".to_string(),
560 ))
561 }
562
563 fn list_blobs_with_redactions(&self) -> Result<Vec<ContentHash>> {
570 Ok(Vec::new())
571 }
572
573 fn has_state_visibility_for_state(&self, _state: &ChangeId) -> Result<bool> {
583 Ok(false)
584 }
585
586 fn get_state_visibility_bytes_for_state(&self, _state: &ChangeId) -> Result<Option<Vec<u8>>> {
592 Ok(None)
593 }
594
595 fn put_state_visibility_bytes_for_state(&self, _state: &ChangeId, _bytes: &[u8]) -> Result<()> {
600 Err(HeddleError::InvalidObject(
601 "this object store does not support persisting state visibility".to_string(),
602 ))
603 }
604
605 fn list_states_with_visibility(&self) -> Result<Vec<ChangeId>> {
609 Ok(Vec::new())
610 }
611}
612
#[cfg(test)]
mod any_store_tests {
    use tempfile::TempDir;

    use super::*;
    use crate::object::{Attribution, Operation, Principal};

    /// Creates an initialised `FsStore` under a fresh temp dir and wraps it
    /// in `AnyStore::Fs`. The `TempDir` is handed back so the caller keeps
    /// the directory alive for as long as the store is in use.
    fn fs_any_store() -> (TempDir, AnyStore) {
        let temp = TempDir::new().unwrap();
        let root = temp.path().join(".heddle");
        let fs_store = FsStore::new(root);
        fs_store.init().unwrap();
        (temp, AnyStore::Fs(fs_store))
    }

    /// Calls every `ObjectStore` method once through `AnyStore::Fs`, so each
    /// forwarding method in the `AnyStore` impl is exercised.
    #[test]
    fn fs_variant_dispatches_every_object_store_method() {
        let (_temp, store) = fs_any_store();

        // Blobs: write one, then read it back every way the trait offers.
        let blob = Blob::from("any-store dispatch blob");
        let blob_hash = store.put_blob(&blob).unwrap();

        let fetched_blob = ObjectStore::get_blob(&store, &blob_hash)
            .unwrap()
            .unwrap();
        assert_eq!(fetched_blob.content(), blob.content());
        assert!(store.has_blob(&blob_hash).unwrap());

        let fetched_bytes = ObjectStore::get_blob_bytes(&store, &blob_hash)
            .unwrap()
            .unwrap();
        assert_eq!(fetched_bytes.as_ref(), blob.content());

        let size = store.blob_size(&blob_hash).unwrap().unwrap();
        assert_eq!(size, blob.content().len() as u64);

        assert!(store.loose_blob_path(&blob_hash).is_some());
        store.promote_to_loose_uncompressed(&blob_hash).unwrap();
        assert!(store.list_blobs().unwrap().contains(&blob_hash));

        let bytes_blob = Blob::from("put-with-hash blob");
        let bytes_hash = bytes_blob.hash();
        let stored_with_hash = store.put_blob_with_hash(&bytes_blob, bytes_hash).unwrap();
        assert_eq!(stored_with_hash, bytes_hash);

        let raw_blob = Blob::from("raw bytes blob");
        let raw_hash = raw_blob.hash();
        let stored_raw = store
            .put_blob_bytes_with_hash(raw_blob.content(), raw_hash)
            .unwrap();
        assert_eq!(stored_raw, raw_hash);

        // Trees: direct put plus the serialized entry point.
        let tree = Tree::new();
        let tree_hash = store.put_tree(&tree).unwrap();
        let fetched_tree = ObjectStore::get_tree(&store, &tree_hash).unwrap();
        assert!(fetched_tree.is_some());
        assert!(store.has_tree(&tree_hash).unwrap());
        assert!(store.list_trees().unwrap().contains(&tree_hash));

        let tree2 = Tree::new();
        let tree2_bytes = rmp_serde::to_vec_named(&tree2).unwrap();
        let stored_tree2 = store
            .put_tree_serialized(&tree2_bytes, tree2.hash())
            .unwrap();
        assert_eq!(stored_tree2, tree2.hash());

        // States: direct put plus the serialized entry point.
        let attribution =
            Attribution::human(Principal::new("AnyStore Test", "anystore@example.com"));
        let state = State::new(tree_hash, vec![], attribution.clone());
        let change_id = state.change_id;
        store.put_state(&state).unwrap();
        let fetched_state = ObjectStore::get_state(&store, &change_id).unwrap();
        assert!(fetched_state.is_some());
        assert!(store.has_state(&change_id).unwrap());
        assert!(store.list_states().unwrap().contains(&change_id));

        let state2 = State::new(tree2.hash(), vec![], attribution.clone());
        let state2_bytes = rmp_serde::to_vec_named(&state2).unwrap();
        store
            .put_state_serialized(&state2_bytes, state2.change_id)
            .unwrap();

        // Actions: the serialized form is taken after `put_action`, which
        // receives the action mutably.
        let mut action = Action::new(
            None,
            ChangeId::generate(),
            Operation::Snapshot,
            "any-store action",
            attribution,
        );
        let action_id = store.put_action(&mut action).unwrap();
        assert!(store.get_action(&action_id).unwrap().is_some());
        assert!(store.list_actions().unwrap().contains(&action_id));
        let action_bytes = rmp_serde::to_vec_named(&action).unwrap();
        store
            .put_action_serialized(&action_bytes, action_id)
            .unwrap();

        // Packs and maintenance.
        let packed = Blob::from("packed-via-any-store");
        let packed_hash = packed.hash();
        let batch = vec![(packed_hash, packed.into_content())];
        store.put_blobs_packed(batch).unwrap();
        let packed_lookup = store
            .get_pack_object(&pack::PackObjectId::Hash(packed_hash))
            .unwrap();
        assert!(packed_lookup.is_some());
        store.pack_objects(false).unwrap();
        store.prune_loose_objects().unwrap();
        // These two only need to be dispatched; with empty data and missing
        // files their results are not asserted.
        let _ = store.install_pack(&[], &[]);
        let missing_pack = std::path::Path::new("/nonexistent/pack");
        let missing_index = std::path::Path::new("/nonexistent/idx");
        let _ = store.install_pack_streaming(missing_pack, missing_index);

        // Snapshot write batches: one flushed, one aborted.
        store.begin_snapshot_write_batch().unwrap();
        store.flush_snapshot_write_batch().unwrap();
        store.begin_snapshot_write_batch().unwrap();
        store.abort_snapshot_write_batch();

        // Redactions round-trip.
        let redaction = b"any-store redaction bytes";
        store
            .put_redactions_bytes_for_blob(&blob_hash, redaction)
            .unwrap();
        assert!(store.has_redactions_for_blob(&blob_hash).unwrap());
        let stored_redaction = store.get_redactions_bytes_for_blob(&blob_hash).unwrap();
        assert_eq!(stored_redaction.as_deref(), Some(redaction.as_slice()));
        let redacted_blobs = store.list_blobs_with_redactions().unwrap();
        assert!(redacted_blobs.contains(&blob_hash));

        // State visibility round-trip.
        let state_visibility = b"any-store state visibility bytes";
        store
            .put_state_visibility_bytes_for_state(&change_id, state_visibility)
            .unwrap();
        assert!(store.has_state_visibility_for_state(&change_id).unwrap());
        let stored_visibility = store
            .get_state_visibility_bytes_for_state(&change_id)
            .unwrap();
        assert_eq!(
            stored_visibility.as_deref(),
            Some(state_visibility.as_slice())
        );
        let visible_states = store.list_states_with_visibility().unwrap();
        assert!(visible_states.contains(&change_id));

        store.clear_recent_caches();
    }
}