1use std::path::PathBuf;
5
6use crate::object::{Action, ActionId, Blob, ChangeId, ContentHash, State, Tree};
7
8pub mod agent_registry;
9pub mod agent_task;
10pub mod codec;
11pub mod fs;
12pub mod liveness;
13#[cfg(any(test, feature = "memory-backend"))]
14pub mod memory;
15pub mod pack;
16pub mod shallow;
17pub mod source;
18pub mod store_compliance;
19
20pub use agent_registry::{
21 ActorChainNode, AgentEntry, AgentRegistry, AgentStatus, AgentUsageSummary, ContextQueryEntry,
22 ReserveOutcome, generate_agent_id,
23};
24pub use agent_task::{
25 AGENT_TASK_SCHEMA_VERSION, AgentTaskRecord, AgentTaskStatus, AgentTaskStore,
26 generate_agent_task_id, validate_task_id,
27};
28pub use fs::FsStore;
29pub use heddle_format::compression::{CompressionConfig, CompressionError, compress, decompress};
30pub use liveness::{Liveness, current_boot_id, is_owner_alive, process_alive};
31#[cfg(any(test, feature = "memory-backend"))]
32pub use memory::InMemoryStore;
33pub use pack::{PackBuilder, PackObjectId, PackReader, PackStats};
34pub use shallow::ShallowInfo;
35#[cfg(feature = "async-source")]
36pub use source::AsyncObjectSource;
37pub use source::ObjectSource;
38
39pub use crate::error::{HeddleError as StoreError, HeddleError, Result};
40
41impl From<CompressionError> for HeddleError {
42 fn from(e: CompressionError) -> Self {
43 HeddleError::Compression(e.to_string())
44 }
45}
46
47#[derive(Clone)]
59pub enum AnyStore {
60 Fs(FsStore),
61}
62
macro_rules! any_store_dispatch {
    // Usage: `any_store_dispatch!(self, method(arg, ...))`.
    //
    // Expands to a `match` over every `AnyStore` variant that calls `method` on the wrapped
    // backend with the given arguments. The call uses method-call syntax, so it resolves
    // exactly like `backend.method(..)` written by hand: an inherent method on the backend
    // takes priority over a trait method with the same name.
    ($store:ident, $method:ident ( $($arg:expr),* )) => {
        match $store {
            AnyStore::Fs(backend) => backend.$method($($arg),*),
        }
    };
}
75
// `ObjectStore` for `AnyStore`: every method forwards to the wrapped backend.
//
// Most methods use `any_store_dispatch!`, which expands to method-call syntax
// (`inner.method(..)`). Six methods spell the `match` out by hand with a fully-qualified
// `ObjectStore::method(inner, ..)` call instead:
//   `get_blob`, `get_blob_bytes`, `get_tree`, `get_tree_serialized`, `get_state`,
//   `put_tree_serialized`.
// Method-call syntax prefers an inherent method over a trait method of the same name, so
// the qualified form pins these six calls to the backend's `ObjectStore` implementation.
// NOTE(review): presumably `FsStore` has inherent methods with these names (possibly with
// different signatures) — confirm before folding any of them into the macro.
impl ObjectStore for AnyStore {
    // Fully-qualified: see the header note.
    fn get_blob(&self, hash: &ContentHash) -> Result<Option<Blob>> {
        match self {
            AnyStore::Fs(inner) => ObjectStore::get_blob(inner, hash),
        }
    }
    fn put_blob(&self, blob: &Blob) -> Result<ContentHash> {
        any_store_dispatch!(self, put_blob(blob))
    }
    // Fully-qualified: see the header note.
    fn get_blob_bytes(&self, hash: &ContentHash) -> Result<Option<bytes::Bytes>> {
        match self {
            AnyStore::Fs(inner) => ObjectStore::get_blob_bytes(inner, hash),
        }
    }
    fn blob_size(&self, hash: &ContentHash) -> Result<Option<u64>> {
        any_store_dispatch!(self, blob_size(hash))
    }
    fn loose_blob_path(&self, hash: &ContentHash) -> Option<PathBuf> {
        any_store_dispatch!(self, loose_blob_path(hash))
    }
    fn promote_to_loose_uncompressed(&self, hash: &ContentHash) -> Result<bool> {
        any_store_dispatch!(self, promote_to_loose_uncompressed(hash))
    }
    fn clear_recent_caches(&self) {
        any_store_dispatch!(self, clear_recent_caches())
    }
    fn put_blob_with_hash(&self, blob: &Blob, hash: ContentHash) -> Result<ContentHash> {
        any_store_dispatch!(self, put_blob_with_hash(blob, hash))
    }
    fn has_blob(&self, hash: &ContentHash) -> Result<bool> {
        any_store_dispatch!(self, has_blob(hash))
    }
    // Fully-qualified: see the header note.
    fn get_tree(&self, hash: &ContentHash) -> Result<Option<Tree>> {
        match self {
            AnyStore::Fs(inner) => ObjectStore::get_tree(inner, hash),
        }
    }
    // Fully-qualified: see the header note.
    fn get_tree_serialized(&self, hash: &ContentHash) -> Result<Option<Vec<u8>>> {
        match self {
            AnyStore::Fs(inner) => ObjectStore::get_tree_serialized(inner, hash),
        }
    }
    fn put_tree(&self, tree: &Tree) -> Result<ContentHash> {
        any_store_dispatch!(self, put_tree(tree))
    }
    fn has_tree(&self, hash: &ContentHash) -> Result<bool> {
        any_store_dispatch!(self, has_tree(hash))
    }
    // Fully-qualified: see the header note.
    fn get_state(&self, id: &ChangeId) -> Result<Option<State>> {
        match self {
            AnyStore::Fs(inner) => ObjectStore::get_state(inner, id),
        }
    }
    fn put_state(&self, state: &State) -> Result<()> {
        any_store_dispatch!(self, put_state(state))
    }
    fn has_state(&self, id: &ChangeId) -> Result<bool> {
        any_store_dispatch!(self, has_state(id))
    }
    fn list_states(&self) -> Result<Vec<ChangeId>> {
        any_store_dispatch!(self, list_states())
    }
    fn get_action(&self, id: &ActionId) -> Result<Option<Action>> {
        any_store_dispatch!(self, get_action(id))
    }
    fn put_action(&self, action: &mut Action) -> Result<ActionId> {
        any_store_dispatch!(self, put_action(action))
    }
    fn list_actions(&self) -> Result<Vec<ActionId>> {
        any_store_dispatch!(self, list_actions())
    }
    fn list_blobs(&self) -> Result<Vec<ContentHash>> {
        any_store_dispatch!(self, list_blobs())
    }
    fn list_trees(&self) -> Result<Vec<ContentHash>> {
        any_store_dispatch!(self, list_trees())
    }
    fn put_blob_bytes_with_hash(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
        any_store_dispatch!(self, put_blob_bytes_with_hash(data, hash))
    }
    // Fully-qualified: see the header note.
    fn put_tree_serialized(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
        match self {
            AnyStore::Fs(inner) => ObjectStore::put_tree_serialized(inner, data, hash),
        }
    }
    fn put_state_serialized(&self, data: &[u8], id: ChangeId) -> Result<()> {
        any_store_dispatch!(self, put_state_serialized(data, id))
    }
    fn put_action_serialized(&self, data: &[u8], id: ActionId) -> Result<()> {
        any_store_dispatch!(self, put_action_serialized(data, id))
    }
    fn get_pack_object(
        &self,
        id: &pack::PackObjectId,
    ) -> Result<Option<(pack::ObjectType, Vec<u8>)>> {
        any_store_dispatch!(self, get_pack_object(id))
    }
    fn put_blobs_packed(&self, blobs: Vec<(ContentHash, Vec<u8>)>) -> Result<()> {
        any_store_dispatch!(self, put_blobs_packed(blobs))
    }
    fn install_pack(&self, pack_data: &[u8], index_data: &[u8]) -> Result<Vec<pack::PackObjectId>> {
        any_store_dispatch!(self, install_pack(pack_data, index_data))
    }
    fn install_pack_streaming(
        &self,
        pack_path: &std::path::Path,
        index_path: &std::path::Path,
    ) -> Result<Vec<pack::PackObjectId>> {
        any_store_dispatch!(self, install_pack_streaming(pack_path, index_path))
    }
    fn pack_objects(&self, aggressive: bool) -> Result<(u64, u64)> {
        any_store_dispatch!(self, pack_objects(aggressive))
    }
    fn prune_loose_objects(&self) -> Result<(u64, u64)> {
        any_store_dispatch!(self, prune_loose_objects())
    }
    fn begin_snapshot_write_batch(&self) -> Result<()> {
        any_store_dispatch!(self, begin_snapshot_write_batch())
    }
    fn flush_snapshot_write_batch(&self) -> Result<()> {
        any_store_dispatch!(self, flush_snapshot_write_batch())
    }
    fn abort_snapshot_write_batch(&self) {
        any_store_dispatch!(self, abort_snapshot_write_batch())
    }
    fn has_redactions_for_blob(&self, blob: &ContentHash) -> Result<bool> {
        any_store_dispatch!(self, has_redactions_for_blob(blob))
    }
    fn get_redactions_bytes_for_blob(&self, blob: &ContentHash) -> Result<Option<Vec<u8>>> {
        any_store_dispatch!(self, get_redactions_bytes_for_blob(blob))
    }
    fn put_redactions_bytes_for_blob(&self, blob: &ContentHash, bytes: &[u8]) -> Result<()> {
        any_store_dispatch!(self, put_redactions_bytes_for_blob(blob, bytes))
    }
    fn list_blobs_with_redactions(&self) -> Result<Vec<ContentHash>> {
        any_store_dispatch!(self, list_blobs_with_redactions())
    }
    fn has_state_visibility_for_state(&self, state: &ChangeId) -> Result<bool> {
        any_store_dispatch!(self, has_state_visibility_for_state(state))
    }
    fn get_state_visibility_bytes_for_state(&self, state: &ChangeId) -> Result<Option<Vec<u8>>> {
        any_store_dispatch!(self, get_state_visibility_bytes_for_state(state))
    }
    fn put_state_visibility_bytes_for_state(&self, state: &ChangeId, bytes: &[u8]) -> Result<()> {
        any_store_dispatch!(self, put_state_visibility_bytes_for_state(state, bytes))
    }
    fn list_states_with_visibility(&self) -> Result<Vec<ChangeId>> {
        any_store_dispatch!(self, list_states_with_visibility())
    }
}
226
227pub trait ObjectStore: Send + Sync {
229 fn get_blob(&self, hash: &ContentHash) -> Result<Option<Blob>>;
230 fn put_blob(&self, blob: &Blob) -> Result<ContentHash>;
231
232 fn get_blob_bytes(&self, hash: &ContentHash) -> Result<Option<bytes::Bytes>> {
243 Ok(self
244 .get_blob(hash)?
245 .map(|blob| bytes::Bytes::from(blob.into_content())))
246 }
247
248 fn blob_size(&self, hash: &ContentHash) -> Result<Option<u64>> {
262 Ok(self.get_blob(hash)?.map(|blob| blob.content().len() as u64))
263 }
264
265 fn loose_blob_path(&self, _hash: &ContentHash) -> Option<PathBuf> {
277 None
278 }
279
280 fn promote_to_loose_uncompressed(&self, _hash: &ContentHash) -> Result<bool> {
317 Ok(false)
318 }
319
320 fn clear_recent_caches(&self) {}
329
330 fn put_blob_with_hash(&self, blob: &Blob, hash: ContentHash) -> Result<ContentHash> {
331 if blob.hash() != hash {
332 return Err(HeddleError::InvalidObject("blob hash mismatch".to_string()));
333 }
334 self.put_blob(blob)
335 }
336
337 fn has_blob(&self, hash: &ContentHash) -> Result<bool>;
338 fn get_tree(&self, hash: &ContentHash) -> Result<Option<Tree>>;
339 fn put_tree(&self, tree: &Tree) -> Result<ContentHash>;
340 fn has_tree(&self, hash: &ContentHash) -> Result<bool>;
341 fn get_state(&self, id: &ChangeId) -> Result<Option<State>>;
342 fn put_state(&self, state: &State) -> Result<()>;
343 fn has_state(&self, id: &ChangeId) -> Result<bool>;
344 fn list_states(&self) -> Result<Vec<ChangeId>>;
345 fn get_action(&self, id: &ActionId) -> Result<Option<Action>>;
346 fn put_action(&self, action: &mut Action) -> Result<ActionId>;
347 fn list_actions(&self) -> Result<Vec<ActionId>>;
348 fn list_blobs(&self) -> Result<Vec<ContentHash>>;
349 fn list_trees(&self) -> Result<Vec<ContentHash>>;
350
351 fn put_blob_bytes_with_hash(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
352 self.put_blob_with_hash(&Blob::from_slice(data), hash)
353 }
354
355 fn get_tree_serialized(&self, hash: &ContentHash) -> Result<Option<Vec<u8>>> {
363 Ok(self
364 .get_tree(hash)?
365 .map(|tree| rmp_serde::to_vec(&tree))
366 .transpose()?)
367 }
368
369 fn put_tree_serialized(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
370 let tree: Tree = rmp_serde::from_slice(data)?;
371 tree.validate()?;
372 if tree.hash() != hash {
373 return Err(HeddleError::Corruption {
374 expected: hash,
375 found: tree.hash(),
376 });
377 }
378 self.put_tree(&tree)
379 }
380
381 fn put_state_serialized(&self, data: &[u8], id: ChangeId) -> Result<()> {
382 let state: State = rmp_serde::from_slice(data)?;
383 if state.change_id != id {
384 return Err(HeddleError::InvalidObject(format!(
385 "state change_id mismatch: expected {}, found {}",
386 id, state.change_id
387 )));
388 }
389 self.put_state(&state)
390 }
391
392 fn put_action_serialized(&self, data: &[u8], id: ActionId) -> Result<()> {
393 let mut action: Action = rmp_serde::from_slice(data)?;
394 let found_id = action.compute_id();
395 if found_id != id {
396 return Err(HeddleError::InvalidObject(format!(
397 "action id mismatch: expected {}, found {}",
398 id, found_id
399 )));
400 }
401 let stored_id = self.put_action(&mut action)?;
402 if stored_id != id {
403 return Err(HeddleError::InvalidObject(format!(
404 "action id mismatch after write: expected {}, found {}",
405 id, stored_id
406 )));
407 }
408 Ok(())
409 }
410
411 fn get_pack_object(
412 &self,
413 id: &pack::PackObjectId,
414 ) -> Result<Option<(pack::ObjectType, Vec<u8>)>> {
415 match id {
416 pack::PackObjectId::Hash(hash) => {
417 if let Some(blob) = self.get_blob(hash)? {
418 return Ok(Some((pack::ObjectType::Blob, blob.content().to_vec())));
419 }
420 if let Some(tree) = self.get_tree(hash)? {
421 return Ok(Some((
422 pack::ObjectType::Tree,
423 rmp_serde::to_vec_named(&tree)?,
424 )));
425 }
426 if let Some(action) = self.get_action(&ActionId::from_hash(*hash))? {
427 return Ok(Some((
428 pack::ObjectType::Action,
429 rmp_serde::to_vec_named(&action)?,
430 )));
431 }
432 Ok(None)
433 }
434 pack::PackObjectId::ChangeId(change_id) => {
435 if let Some(state) = self.get_state(change_id)? {
436 Ok(Some((
437 pack::ObjectType::State,
438 rmp_serde::to_vec_named(&state)?,
439 )))
440 } else {
441 Ok(None)
442 }
443 }
444 }
445 }
446
447 fn put_blobs_packed(&self, blobs: Vec<(ContentHash, Vec<u8>)>) -> Result<()> {
457 for (hash, data) in blobs {
458 if !self.has_blob(&hash)? {
459 self.put_blob_bytes_with_hash(&data, hash)?;
460 }
461 }
462 Ok(())
463 }
464
465 fn install_pack(&self, pack_data: &[u8], index_data: &[u8]) -> Result<Vec<pack::PackObjectId>> {
466 let reader = pack::PackReader::from_slice(pack_data, index_data)?;
467 let ids = reader.list_ids();
468 for id in &ids {
469 let Some((obj_type, data)) = reader.get_object(id)? else {
470 continue;
471 };
472 match (id, obj_type) {
473 (pack::PackObjectId::Hash(hash), pack::ObjectType::Blob) => {
474 self.put_blob_bytes_with_hash(&data, *hash)?;
475 }
476 (pack::PackObjectId::Hash(hash), pack::ObjectType::Tree) => {
477 self.put_tree_serialized(&data, *hash)?;
478 }
479 (pack::PackObjectId::Hash(hash), pack::ObjectType::Action) => {
480 self.put_action_serialized(&data, ActionId::from_hash(*hash))?;
481 }
482 (pack::PackObjectId::ChangeId(change_id), pack::ObjectType::State) => {
483 self.put_state_serialized(&data, *change_id)?;
484 }
485 _ => {
486 return Err(HeddleError::InvalidObject(format!(
487 "unsupported native pack object: {:?} {:?}",
488 id, obj_type
489 )));
490 }
491 }
492 }
493 Ok(ids)
494 }
495
496 fn install_pack_streaming(
513 &self,
514 pack_path: &std::path::Path,
515 index_path: &std::path::Path,
516 ) -> Result<Vec<pack::PackObjectId>> {
517 let pack_data = std::fs::read(pack_path).map_err(StoreError::from)?;
518 let index_data = std::fs::read(index_path).map_err(StoreError::from)?;
519 let ids = self.install_pack(&pack_data, &index_data)?;
520 let _ = std::fs::remove_file(pack_path);
524 let _ = std::fs::remove_file(index_path);
525 Ok(ids)
526 }
527
528 fn pack_objects(&self, aggressive: bool) -> Result<(u64, u64)> {
529 let _ = aggressive;
530 Ok((0, 0))
531 }
532
533 fn prune_loose_objects(&self) -> Result<(u64, u64)> {
534 Ok((0, 0))
535 }
536
537 fn begin_snapshot_write_batch(&self) -> Result<()> {
538 Ok(())
539 }
540
541 fn flush_snapshot_write_batch(&self) -> Result<()> {
542 Ok(())
543 }
544
545 fn abort_snapshot_write_batch(&self) {}
546
547 fn has_redactions_for_blob(&self, _blob: &ContentHash) -> Result<bool> {
559 Ok(false)
560 }
561
562 fn get_redactions_bytes_for_blob(&self, _blob: &ContentHash) -> Result<Option<Vec<u8>>> {
570 Ok(None)
571 }
572
573 fn put_redactions_bytes_for_blob(&self, _blob: &ContentHash, _bytes: &[u8]) -> Result<()> {
582 Err(HeddleError::InvalidObject(
583 "this object store does not support persisting redactions".to_string(),
584 ))
585 }
586
587 fn list_blobs_with_redactions(&self) -> Result<Vec<ContentHash>> {
594 Ok(Vec::new())
595 }
596
597 fn has_state_visibility_for_state(&self, _state: &ChangeId) -> Result<bool> {
607 Ok(false)
608 }
609
610 fn get_state_visibility_bytes_for_state(&self, _state: &ChangeId) -> Result<Option<Vec<u8>>> {
616 Ok(None)
617 }
618
619 fn put_state_visibility_bytes_for_state(&self, _state: &ChangeId, _bytes: &[u8]) -> Result<()> {
624 Err(HeddleError::InvalidObject(
625 "this object store does not support persisting state visibility".to_string(),
626 ))
627 }
628
629 fn list_states_with_visibility(&self) -> Result<Vec<ChangeId>> {
633 Ok(Vec::new())
634 }
635}
636
// Tests for `AnyStore`'s `ObjectStore` forwarding.
#[cfg(test)]
mod any_store_tests {
    use tempfile::TempDir;

    use super::*;
    use crate::object::{Attribution, Operation, Principal};

    /// Creates and initializes an `FsStore` rooted at `<tempdir>/.heddle`, wrapped in
    /// `AnyStore::Fs`.
    ///
    /// The `TempDir` is returned so the caller keeps it alive for the whole test; dropping
    /// it deletes the directory.
    fn fs_any_store() -> (TempDir, AnyStore) {
        let temp = TempDir::new().unwrap();
        let store = FsStore::new(temp.path().join(".heddle"));
        store.init().unwrap();
        (temp, AnyStore::Fs(store))
    }

    /// Calls every `ObjectStore` method through `AnyStore::Fs`.
    ///
    /// Most calls check a round-trip result, so a forwarding method that is mis-wired, or
    /// that silently falls back to a trait default, is likely to fail here. For example,
    /// the default redaction and visibility writers return errors. `install_pack` and
    /// `install_pack_streaming` get empty or nonexistent inputs and their results are
    /// discarded; they are only exercised for dispatch.
    #[test]
    fn fs_variant_dispatches_every_object_store_method() {
        let (_temp, store) = fs_any_store();

        // Blob accessors.
        let blob = Blob::from("any-store dispatch blob");
        let blob_hash = store.put_blob(&blob).unwrap();
        assert_eq!(
            ObjectStore::get_blob(&store, &blob_hash)
                .unwrap()
                .unwrap()
                .content(),
            blob.content()
        );
        assert!(store.has_blob(&blob_hash).unwrap());
        assert_eq!(
            ObjectStore::get_blob_bytes(&store, &blob_hash)
                .unwrap()
                .unwrap()
                .as_ref(),
            blob.content()
        );
        assert_eq!(
            store.blob_size(&blob_hash).unwrap().unwrap(),
            blob.content().len() as u64
        );
        // The trait default returns `None`, so this only passes if the FS backend's
        // behavior is reached.
        assert!(store.loose_blob_path(&blob_hash).is_some());
        // Only success is checked; the returned bool is ignored.
        store.promote_to_loose_uncompressed(&blob_hash).unwrap();
        assert!(store.list_blobs().unwrap().contains(&blob_hash));

        // Hash-checked blob writers.
        let bytes_blob = Blob::from("put-with-hash blob");
        let bytes_hash = bytes_blob.hash();
        assert_eq!(
            store.put_blob_with_hash(&bytes_blob, bytes_hash).unwrap(),
            bytes_hash
        );
        let raw_blob = Blob::from("raw bytes blob");
        let raw_hash = raw_blob.hash();
        assert_eq!(
            store
                .put_blob_bytes_with_hash(raw_blob.content(), raw_hash)
                .unwrap(),
            raw_hash
        );

        // Tree accessors, plus the serialized write path (another empty tree).
        let tree = Tree::new();
        let tree_hash = store.put_tree(&tree).unwrap();
        assert!(ObjectStore::get_tree(&store, &tree_hash).unwrap().is_some());
        assert!(store.has_tree(&tree_hash).unwrap());
        assert!(store.list_trees().unwrap().contains(&tree_hash));
        let tree2 = Tree::new();
        let tree2_bytes = rmp_serde::to_vec_named(&tree2).unwrap();
        assert_eq!(
            store
                .put_tree_serialized(&tree2_bytes, tree2.hash())
                .unwrap(),
            tree2.hash()
        );

        // State accessors; `change_id` is reused below for the visibility checks.
        let attribution =
            Attribution::human(Principal::new("AnyStore Test", "anystore@example.com"));
        let state = State::new(tree_hash, vec![], attribution.clone());
        let change_id = state.change_id;
        store.put_state(&state).unwrap();
        assert!(
            ObjectStore::get_state(&store, &change_id)
                .unwrap()
                .is_some()
        );
        assert!(store.has_state(&change_id).unwrap());
        assert!(store.list_states().unwrap().contains(&change_id));
        let state2 = State::new(tree2.hash(), vec![], attribution.clone());
        let state2_bytes = rmp_serde::to_vec_named(&state2).unwrap();
        store
            .put_state_serialized(&state2_bytes, state2.change_id)
            .unwrap();

        // Action accessors. The serialized write re-stores the same action under the id
        // `put_action` returned.
        let mut action = Action::new(
            None,
            ChangeId::generate(),
            Operation::Snapshot,
            "any-store action",
            attribution,
        );
        let action_id = store.put_action(&mut action).unwrap();
        assert!(store.get_action(&action_id).unwrap().is_some());
        assert!(store.list_actions().unwrap().contains(&action_id));
        let action_bytes = rmp_serde::to_vec_named(&action).unwrap();
        store
            .put_action_serialized(&action_bytes, action_id)
            .unwrap();

        // Pack-related methods.
        let packed = Blob::from("packed-via-any-store");
        let packed_hash = packed.hash();
        store
            .put_blobs_packed(vec![(packed_hash, packed.into_content())])
            .unwrap();
        assert!(
            store
                .get_pack_object(&pack::PackObjectId::Hash(packed_hash))
                .unwrap()
                .is_some()
        );
        store.pack_objects(false).unwrap();
        store.prune_loose_objects().unwrap();
        // Invalid inputs: results are deliberately discarded; only dispatch is exercised.
        let _ = store.install_pack(&[], &[]);
        let _ = store.install_pack_streaming(
            std::path::Path::new("/nonexistent/pack"),
            std::path::Path::new("/nonexistent/idx"),
        );

        // Snapshot write batches: one batch ends with flush, a second ends with abort.
        store.begin_snapshot_write_batch().unwrap();
        store.flush_snapshot_write_batch().unwrap();
        store.begin_snapshot_write_batch().unwrap();
        store.abort_snapshot_write_batch();

        // Redactions: the trait default writer errors, so success proves forwarding.
        let redaction = b"any-store redaction bytes";
        store
            .put_redactions_bytes_for_blob(&blob_hash, redaction)
            .unwrap();
        assert!(store.has_redactions_for_blob(&blob_hash).unwrap());
        assert_eq!(
            store
                .get_redactions_bytes_for_blob(&blob_hash)
                .unwrap()
                .as_deref(),
            Some(redaction.as_slice())
        );
        assert!(
            store
                .list_blobs_with_redactions()
                .unwrap()
                .contains(&blob_hash)
        );

        // State visibility: likewise, the trait default writer errors.
        let state_visibility = b"any-store state visibility bytes";
        store
            .put_state_visibility_bytes_for_state(&change_id, state_visibility)
            .unwrap();
        assert!(store.has_state_visibility_for_state(&change_id).unwrap());
        assert_eq!(
            store
                .get_state_visibility_bytes_for_state(&change_id)
                .unwrap()
                .as_deref(),
            Some(state_visibility.as_slice())
        );
        assert!(
            store
                .list_states_with_visibility()
                .unwrap()
                .contains(&change_id)
        );

        store.clear_recent_caches();
    }
}