1use std::path::PathBuf;
5
6use crate::object::{Action, ActionId, Blob, ChangeId, ContentHash, State, Tree};
7
8pub mod agent_registry;
9pub mod atomic;
10pub mod codec;
11pub mod compression;
12pub mod fs;
13pub mod liveness;
14#[cfg(any(test, feature = "memory-backend"))]
15pub mod memory;
16pub mod pack;
17pub mod shallow;
18pub mod source;
19pub mod store_compliance;
20
21pub use agent_registry::{
22 ActorChainNode, AgentEntry, AgentRegistry, AgentStatus, AgentUsageSummary, ContextQueryEntry,
23 ReserveOutcome, generate_agent_id,
24};
25pub use compression::{CompressionConfig, CompressionError, compress, decompress};
26pub use fs::FsStore;
27pub use liveness::{Liveness, current_boot_id, is_owner_alive, process_alive};
28#[cfg(any(test, feature = "memory-backend"))]
29pub use memory::InMemoryStore;
30pub use pack::{PackBuilder, PackObjectId, PackReader, PackStats};
31pub use shallow::ShallowInfo;
32#[cfg(feature = "async-source")]
33pub use source::AsyncObjectSource;
34pub use source::ObjectSource;
35
36pub use crate::error::{HeddleError as StoreError, HeddleError, Result};
37
38impl From<CompressionError> for HeddleError {
39 fn from(e: CompressionError) -> Self {
40 HeddleError::Compression(e.to_string())
41 }
42}
43
/// Wrapper over the concrete object-store backends.
///
/// `AnyStore` implements [`ObjectStore`] by matching on the variant and forwarding each
/// call to the wrapped store. Currently the only backend is the filesystem one.
pub enum AnyStore {
    /// Filesystem-backed store.
    Fs(FsStore),
}
58
/// Forwards an [`ObjectStore`] method call to whichever backend an `AnyStore` wraps.
///
/// Usage: `any_store_dispatch!(self, method(arg1, arg2))`. A trailing comma after the
/// last argument is accepted.
///
/// The call is emitted in fully-qualified form, `ObjectStore::method(inner, ..)`, so it
/// always resolves to the backend's *trait* implementation. A plain `inner.method(..)`
/// would instead pick an inherent method of the same name on the backend type if one
/// exists, silently bypassing the `ObjectStore` impl that `AnyStore` is meant to forward to.
macro_rules! any_store_dispatch {
    ($self:ident, $method:ident ( $($arg:expr),* $(,)? )) => {
        match $self {
            AnyStore::Fs(inner) => ObjectStore::$method(inner, $($arg),*),
        }
    };
}
71
/// `AnyStore` is an [`ObjectStore`] that forwards every method to the wrapped backend.
///
/// Most methods go through `any_store_dispatch!`. `get_blob`, `get_blob_bytes`,
/// `get_tree` and `get_state` are instead written as explicit matches with
/// fully-qualified `ObjectStore::method(inner, ..)` calls. That form always selects the
/// backend's trait implementation.
/// NOTE(review): presumably these four are special-cased because `FsStore` also has
/// inherent methods with the same names, which a plain `inner.method(..)` call would pick
/// first. Confirm before converting them to the macro.
impl ObjectStore for AnyStore {
    fn get_blob(&self, hash: &ContentHash) -> Result<Option<Blob>> {
        // Fully-qualified call: see the impl-level note.
        match self {
            AnyStore::Fs(inner) => ObjectStore::get_blob(inner, hash),
        }
    }
    fn put_blob(&self, blob: &Blob) -> Result<ContentHash> {
        any_store_dispatch!(self, put_blob(blob))
    }
    fn get_blob_bytes(&self, hash: &ContentHash) -> Result<Option<bytes::Bytes>> {
        // Fully-qualified call: see the impl-level note.
        match self {
            AnyStore::Fs(inner) => ObjectStore::get_blob_bytes(inner, hash),
        }
    }
    fn blob_size(&self, hash: &ContentHash) -> Result<Option<u64>> {
        any_store_dispatch!(self, blob_size(hash))
    }
    fn loose_blob_path(&self, hash: &ContentHash) -> Option<PathBuf> {
        any_store_dispatch!(self, loose_blob_path(hash))
    }
    fn promote_to_loose_uncompressed(&self, hash: &ContentHash) -> Result<bool> {
        any_store_dispatch!(self, promote_to_loose_uncompressed(hash))
    }
    fn clear_recent_caches(&self) {
        any_store_dispatch!(self, clear_recent_caches())
    }
    fn put_blob_with_hash(&self, blob: &Blob, hash: ContentHash) -> Result<ContentHash> {
        any_store_dispatch!(self, put_blob_with_hash(blob, hash))
    }
    fn has_blob(&self, hash: &ContentHash) -> Result<bool> {
        any_store_dispatch!(self, has_blob(hash))
    }
    fn get_tree(&self, hash: &ContentHash) -> Result<Option<Tree>> {
        // Fully-qualified call: see the impl-level note.
        match self {
            AnyStore::Fs(inner) => ObjectStore::get_tree(inner, hash),
        }
    }
    fn put_tree(&self, tree: &Tree) -> Result<ContentHash> {
        any_store_dispatch!(self, put_tree(tree))
    }
    fn has_tree(&self, hash: &ContentHash) -> Result<bool> {
        any_store_dispatch!(self, has_tree(hash))
    }
    fn get_state(&self, id: &ChangeId) -> Result<Option<State>> {
        // Fully-qualified call: see the impl-level note.
        match self {
            AnyStore::Fs(inner) => ObjectStore::get_state(inner, id),
        }
    }
    fn put_state(&self, state: &State) -> Result<()> {
        any_store_dispatch!(self, put_state(state))
    }
    fn has_state(&self, id: &ChangeId) -> Result<bool> {
        any_store_dispatch!(self, has_state(id))
    }
    fn list_states(&self) -> Result<Vec<ChangeId>> {
        any_store_dispatch!(self, list_states())
    }
    fn get_action(&self, id: &ActionId) -> Result<Option<Action>> {
        any_store_dispatch!(self, get_action(id))
    }
    fn put_action(&self, action: &mut Action) -> Result<ActionId> {
        any_store_dispatch!(self, put_action(action))
    }
    fn list_actions(&self) -> Result<Vec<ActionId>> {
        any_store_dispatch!(self, list_actions())
    }
    fn list_blobs(&self) -> Result<Vec<ContentHash>> {
        any_store_dispatch!(self, list_blobs())
    }
    fn list_trees(&self) -> Result<Vec<ContentHash>> {
        any_store_dispatch!(self, list_trees())
    }
    fn put_blob_bytes_with_hash(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
        any_store_dispatch!(self, put_blob_bytes_with_hash(data, hash))
    }
    fn put_tree_serialized(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
        any_store_dispatch!(self, put_tree_serialized(data, hash))
    }
    fn put_state_serialized(&self, data: &[u8], id: ChangeId) -> Result<()> {
        any_store_dispatch!(self, put_state_serialized(data, id))
    }
    fn put_action_serialized(&self, data: &[u8], id: ActionId) -> Result<()> {
        any_store_dispatch!(self, put_action_serialized(data, id))
    }
    fn get_pack_object(
        &self,
        id: &pack::PackObjectId,
    ) -> Result<Option<(pack::ObjectType, Vec<u8>)>> {
        any_store_dispatch!(self, get_pack_object(id))
    }
    fn put_blobs_packed(&self, blobs: Vec<(ContentHash, Vec<u8>)>) -> Result<()> {
        any_store_dispatch!(self, put_blobs_packed(blobs))
    }
    fn install_pack(&self, pack_data: &[u8], index_data: &[u8]) -> Result<Vec<pack::PackObjectId>> {
        any_store_dispatch!(self, install_pack(pack_data, index_data))
    }
    fn install_pack_streaming(
        &self,
        pack_path: &std::path::Path,
        index_path: &std::path::Path,
    ) -> Result<Vec<pack::PackObjectId>> {
        any_store_dispatch!(self, install_pack_streaming(pack_path, index_path))
    }
    fn pack_objects(&self, aggressive: bool) -> Result<(u64, u64)> {
        any_store_dispatch!(self, pack_objects(aggressive))
    }
    fn prune_loose_objects(&self) -> Result<(u64, u64)> {
        any_store_dispatch!(self, prune_loose_objects())
    }
    fn begin_snapshot_write_batch(&self) -> Result<()> {
        any_store_dispatch!(self, begin_snapshot_write_batch())
    }
    fn flush_snapshot_write_batch(&self) -> Result<()> {
        any_store_dispatch!(self, flush_snapshot_write_batch())
    }
    fn abort_snapshot_write_batch(&self) {
        any_store_dispatch!(self, abort_snapshot_write_batch())
    }
    fn has_redactions_for_blob(&self, blob: &ContentHash) -> Result<bool> {
        any_store_dispatch!(self, has_redactions_for_blob(blob))
    }
    fn get_redactions_bytes_for_blob(&self, blob: &ContentHash) -> Result<Option<Vec<u8>>> {
        any_store_dispatch!(self, get_redactions_bytes_for_blob(blob))
    }
    fn put_redactions_bytes_for_blob(&self, blob: &ContentHash, bytes: &[u8]) -> Result<()> {
        any_store_dispatch!(self, put_redactions_bytes_for_blob(blob, bytes))
    }
    fn list_blobs_with_redactions(&self) -> Result<Vec<ContentHash>> {
        any_store_dispatch!(self, list_blobs_with_redactions())
    }
    fn has_state_visibility_for_state(&self, state: &ChangeId) -> Result<bool> {
        any_store_dispatch!(self, has_state_visibility_for_state(state))
    }
    fn get_state_visibility_bytes_for_state(&self, state: &ChangeId) -> Result<Option<Vec<u8>>> {
        any_store_dispatch!(self, get_state_visibility_bytes_for_state(state))
    }
    fn put_state_visibility_bytes_for_state(&self, state: &ChangeId, bytes: &[u8]) -> Result<()> {
        any_store_dispatch!(self, put_state_visibility_bytes_for_state(state, bytes))
    }
    fn list_states_with_visibility(&self) -> Result<Vec<ChangeId>> {
        any_store_dispatch!(self, list_states_with_visibility())
    }
}
215
216pub trait ObjectStore: Send + Sync {
218 fn get_blob(&self, hash: &ContentHash) -> Result<Option<Blob>>;
219 fn put_blob(&self, blob: &Blob) -> Result<ContentHash>;
220
221 fn get_blob_bytes(&self, hash: &ContentHash) -> Result<Option<bytes::Bytes>> {
232 Ok(self
233 .get_blob(hash)?
234 .map(|blob| bytes::Bytes::from(blob.into_content())))
235 }
236
237 fn blob_size(&self, hash: &ContentHash) -> Result<Option<u64>> {
251 Ok(self.get_blob(hash)?.map(|blob| blob.content().len() as u64))
252 }
253
254 fn loose_blob_path(&self, _hash: &ContentHash) -> Option<PathBuf> {
266 None
267 }
268
269 fn promote_to_loose_uncompressed(&self, _hash: &ContentHash) -> Result<bool> {
306 Ok(false)
307 }
308
309 fn clear_recent_caches(&self) {}
318
319 fn put_blob_with_hash(&self, blob: &Blob, hash: ContentHash) -> Result<ContentHash> {
320 if blob.hash() != hash {
321 return Err(HeddleError::InvalidObject("blob hash mismatch".to_string()));
322 }
323 self.put_blob(blob)
324 }
325
326 fn has_blob(&self, hash: &ContentHash) -> Result<bool>;
327 fn get_tree(&self, hash: &ContentHash) -> Result<Option<Tree>>;
328 fn put_tree(&self, tree: &Tree) -> Result<ContentHash>;
329 fn has_tree(&self, hash: &ContentHash) -> Result<bool>;
330 fn get_state(&self, id: &ChangeId) -> Result<Option<State>>;
331 fn put_state(&self, state: &State) -> Result<()>;
332 fn has_state(&self, id: &ChangeId) -> Result<bool>;
333 fn list_states(&self) -> Result<Vec<ChangeId>>;
334 fn get_action(&self, id: &ActionId) -> Result<Option<Action>>;
335 fn put_action(&self, action: &mut Action) -> Result<ActionId>;
336 fn list_actions(&self) -> Result<Vec<ActionId>>;
337 fn list_blobs(&self) -> Result<Vec<ContentHash>>;
338 fn list_trees(&self) -> Result<Vec<ContentHash>>;
339
340 fn put_blob_bytes_with_hash(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
341 self.put_blob_with_hash(&Blob::from_slice(data), hash)
342 }
343
344 fn put_tree_serialized(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
345 let tree: Tree = rmp_serde::from_slice(data)?;
346 tree.validate()?;
347 if tree.hash() != hash {
348 return Err(HeddleError::Corruption {
349 expected: hash,
350 found: tree.hash(),
351 });
352 }
353 self.put_tree(&tree)
354 }
355
356 fn put_state_serialized(&self, data: &[u8], id: ChangeId) -> Result<()> {
357 let state: State = rmp_serde::from_slice(data)?;
358 if state.change_id != id {
359 return Err(HeddleError::InvalidObject(format!(
360 "state change_id mismatch: expected {}, found {}",
361 id, state.change_id
362 )));
363 }
364 self.put_state(&state)
365 }
366
367 fn put_action_serialized(&self, data: &[u8], id: ActionId) -> Result<()> {
368 let mut action: Action = rmp_serde::from_slice(data)?;
369 let found_id = action.compute_id();
370 if found_id != id {
371 return Err(HeddleError::InvalidObject(format!(
372 "action id mismatch: expected {}, found {}",
373 id, found_id
374 )));
375 }
376 let stored_id = self.put_action(&mut action)?;
377 if stored_id != id {
378 return Err(HeddleError::InvalidObject(format!(
379 "action id mismatch after write: expected {}, found {}",
380 id, stored_id
381 )));
382 }
383 Ok(())
384 }
385
386 fn get_pack_object(
387 &self,
388 id: &pack::PackObjectId,
389 ) -> Result<Option<(pack::ObjectType, Vec<u8>)>> {
390 match id {
391 pack::PackObjectId::Hash(hash) => {
392 if let Some(blob) = self.get_blob(hash)? {
393 return Ok(Some((pack::ObjectType::Blob, blob.content().to_vec())));
394 }
395 if let Some(tree) = self.get_tree(hash)? {
396 return Ok(Some((
397 pack::ObjectType::Tree,
398 rmp_serde::to_vec_named(&tree)?,
399 )));
400 }
401 if let Some(action) = self.get_action(&ActionId::from_hash(*hash))? {
402 return Ok(Some((
403 pack::ObjectType::Action,
404 rmp_serde::to_vec_named(&action)?,
405 )));
406 }
407 Ok(None)
408 }
409 pack::PackObjectId::ChangeId(change_id) => {
410 if let Some(state) = self.get_state(change_id)? {
411 Ok(Some((
412 pack::ObjectType::State,
413 rmp_serde::to_vec_named(&state)?,
414 )))
415 } else {
416 Ok(None)
417 }
418 }
419 }
420 }
421
422 fn put_blobs_packed(&self, blobs: Vec<(ContentHash, Vec<u8>)>) -> Result<()> {
432 for (hash, data) in blobs {
433 if !self.has_blob(&hash)? {
434 self.put_blob_bytes_with_hash(&data, hash)?;
435 }
436 }
437 Ok(())
438 }
439
440 fn install_pack(&self, pack_data: &[u8], index_data: &[u8]) -> Result<Vec<pack::PackObjectId>> {
441 let reader = pack::PackReader::from_slice(pack_data, index_data)?;
442 let ids = reader.list_ids();
443 for id in &ids {
444 let Some((obj_type, data)) = reader.get_object(id)? else {
445 continue;
446 };
447 match (id, obj_type) {
448 (pack::PackObjectId::Hash(hash), pack::ObjectType::Blob) => {
449 self.put_blob_bytes_with_hash(&data, *hash)?;
450 }
451 (pack::PackObjectId::Hash(hash), pack::ObjectType::Tree) => {
452 self.put_tree_serialized(&data, *hash)?;
453 }
454 (pack::PackObjectId::Hash(hash), pack::ObjectType::Action) => {
455 self.put_action_serialized(&data, ActionId::from_hash(*hash))?;
456 }
457 (pack::PackObjectId::ChangeId(change_id), pack::ObjectType::State) => {
458 self.put_state_serialized(&data, *change_id)?;
459 }
460 _ => {
461 return Err(HeddleError::InvalidObject(format!(
462 "unsupported native pack object: {:?} {:?}",
463 id, obj_type
464 )));
465 }
466 }
467 }
468 Ok(ids)
469 }
470
471 fn install_pack_streaming(
488 &self,
489 pack_path: &std::path::Path,
490 index_path: &std::path::Path,
491 ) -> Result<Vec<pack::PackObjectId>> {
492 let pack_data = std::fs::read(pack_path).map_err(StoreError::from)?;
493 let index_data = std::fs::read(index_path).map_err(StoreError::from)?;
494 let ids = self.install_pack(&pack_data, &index_data)?;
495 let _ = std::fs::remove_file(pack_path);
499 let _ = std::fs::remove_file(index_path);
500 Ok(ids)
501 }
502
503 fn pack_objects(&self, aggressive: bool) -> Result<(u64, u64)> {
504 let _ = aggressive;
505 Ok((0, 0))
506 }
507
508 fn prune_loose_objects(&self) -> Result<(u64, u64)> {
509 Ok((0, 0))
510 }
511
512 fn begin_snapshot_write_batch(&self) -> Result<()> {
513 Ok(())
514 }
515
516 fn flush_snapshot_write_batch(&self) -> Result<()> {
517 Ok(())
518 }
519
520 fn abort_snapshot_write_batch(&self) {}
521
522 fn has_redactions_for_blob(&self, _blob: &ContentHash) -> Result<bool> {
534 Ok(false)
535 }
536
537 fn get_redactions_bytes_for_blob(&self, _blob: &ContentHash) -> Result<Option<Vec<u8>>> {
545 Ok(None)
546 }
547
548 fn put_redactions_bytes_for_blob(&self, _blob: &ContentHash, _bytes: &[u8]) -> Result<()> {
557 Err(HeddleError::InvalidObject(
558 "this object store does not support persisting redactions".to_string(),
559 ))
560 }
561
562 fn list_blobs_with_redactions(&self) -> Result<Vec<ContentHash>> {
569 Ok(Vec::new())
570 }
571
572 fn has_state_visibility_for_state(&self, _state: &ChangeId) -> Result<bool> {
582 Ok(false)
583 }
584
585 fn get_state_visibility_bytes_for_state(&self, _state: &ChangeId) -> Result<Option<Vec<u8>>> {
591 Ok(None)
592 }
593
594 fn put_state_visibility_bytes_for_state(&self, _state: &ChangeId, _bytes: &[u8]) -> Result<()> {
599 Err(HeddleError::InvalidObject(
600 "this object store does not support persisting state visibility".to_string(),
601 ))
602 }
603
604 fn list_states_with_visibility(&self) -> Result<Vec<ChangeId>> {
608 Ok(Vec::new())
609 }
610}
611
#[cfg(test)]
mod any_store_tests {
    use tempfile::TempDir;

    use super::*;
    use crate::object::{Attribution, Operation, Principal};

    /// Creates an initialised [`FsStore`] under a fresh temporary directory and wraps it
    /// in [`AnyStore::Fs`].
    ///
    /// The [`TempDir`] is returned alongside the store so the caller keeps it alive for
    /// the duration of the test.
    fn fs_any_store() -> (TempDir, AnyStore) {
        let temp = TempDir::new().unwrap();
        let store = FsStore::new(temp.path().join(".heddle"));
        store.init().unwrap();
        (temp, AnyStore::Fs(store))
    }

    /// Smoke test for the `AnyStore::Fs` dispatch arms.
    ///
    /// It calls every `ObjectStore` method through `AnyStore::Fs` and checks round-trips
    /// where a result is observable, so a broken dispatch arm shows up here.
    #[test]
    fn fs_variant_dispatches_every_object_store_method() {
        let (_temp, store) = fs_any_store();

        // Blob methods.
        let blob = Blob::from("any-store dispatch blob");
        let blob_hash = store.put_blob(&blob).unwrap();
        assert_eq!(
            ObjectStore::get_blob(&store, &blob_hash)
                .unwrap()
                .unwrap()
                .content(),
            blob.content()
        );
        assert!(store.has_blob(&blob_hash).unwrap());
        assert_eq!(
            ObjectStore::get_blob_bytes(&store, &blob_hash)
                .unwrap()
                .unwrap()
                .as_ref(),
            blob.content()
        );
        assert_eq!(
            store.blob_size(&blob_hash).unwrap().unwrap(),
            blob.content().len() as u64
        );
        assert!(store.loose_blob_path(&blob_hash).is_some());
        store.promote_to_loose_uncompressed(&blob_hash).unwrap();
        assert!(store.list_blobs().unwrap().contains(&blob_hash));

        // Hash-checked blob writes.
        let bytes_blob = Blob::from("put-with-hash blob");
        let bytes_hash = bytes_blob.hash();
        assert_eq!(
            store.put_blob_with_hash(&bytes_blob, bytes_hash).unwrap(),
            bytes_hash
        );
        let raw_blob = Blob::from("raw bytes blob");
        let raw_hash = raw_blob.hash();
        assert_eq!(
            store
                .put_blob_bytes_with_hash(raw_blob.content(), raw_hash)
                .unwrap(),
            raw_hash
        );

        // Tree methods, including the serialized-write path.
        let tree = Tree::new();
        let tree_hash = store.put_tree(&tree).unwrap();
        assert!(ObjectStore::get_tree(&store, &tree_hash).unwrap().is_some());
        assert!(store.has_tree(&tree_hash).unwrap());
        assert!(store.list_trees().unwrap().contains(&tree_hash));
        let tree2 = Tree::new();
        let tree2_bytes = rmp_serde::to_vec_named(&tree2).unwrap();
        assert_eq!(
            store
                .put_tree_serialized(&tree2_bytes, tree2.hash())
                .unwrap(),
            tree2.hash()
        );

        // State methods, including the serialized-write path.
        let attribution =
            Attribution::human(Principal::new("AnyStore Test", "anystore@example.com"));
        let state = State::new(tree_hash, vec![], attribution.clone());
        let change_id = state.change_id;
        store.put_state(&state).unwrap();
        assert!(ObjectStore::get_state(&store, &change_id).unwrap().is_some());
        assert!(store.has_state(&change_id).unwrap());
        assert!(store.list_states().unwrap().contains(&change_id));
        let state2 = State::new(tree2.hash(), vec![], attribution.clone());
        let state2_bytes = rmp_serde::to_vec_named(&state2).unwrap();
        store
            .put_state_serialized(&state2_bytes, state2.change_id)
            .unwrap();

        // Action methods. `action` is serialized only after `put_action` (which takes it
        // by `&mut`), so the bytes reflect whatever `put_action` left in it.
        let mut action = Action::new(
            None,
            ChangeId::generate(),
            Operation::Snapshot,
            "any-store action",
            attribution,
        );
        let action_id = store.put_action(&mut action).unwrap();
        assert!(store.get_action(&action_id).unwrap().is_some());
        assert!(store.list_actions().unwrap().contains(&action_id));
        let action_bytes = rmp_serde::to_vec_named(&action).unwrap();
        store
            .put_action_serialized(&action_bytes, action_id)
            .unwrap();

        // Pack-related methods.
        let packed = Blob::from("packed-via-any-store");
        let packed_hash = packed.hash();
        store
            .put_blobs_packed(vec![(packed_hash, packed.into_content())])
            .unwrap();
        assert!(
            store
                .get_pack_object(&pack::PackObjectId::Hash(packed_hash))
                .unwrap()
                .is_some()
        );
        store.pack_objects(false).unwrap();
        store.prune_loose_objects().unwrap();
        // Only dispatch is exercised here. The inputs (an empty pack, presumably missing
        // files) are not meaningful, so the results are deliberately discarded.
        let _ = store.install_pack(&[], &[]);
        let _ = store.install_pack_streaming(
            std::path::Path::new("/nonexistent/pack"),
            std::path::Path::new("/nonexistent/idx"),
        );

        // Snapshot write batch: one flushed batch, then one aborted batch.
        store.begin_snapshot_write_batch().unwrap();
        store.flush_snapshot_write_batch().unwrap();
        store.begin_snapshot_write_batch().unwrap();
        store.abort_snapshot_write_batch();

        // Redaction data round-trip for `blob_hash`.
        let redaction = b"any-store redaction bytes";
        store
            .put_redactions_bytes_for_blob(&blob_hash, redaction)
            .unwrap();
        assert!(store.has_redactions_for_blob(&blob_hash).unwrap());
        assert_eq!(
            store
                .get_redactions_bytes_for_blob(&blob_hash)
                .unwrap()
                .as_deref(),
            Some(redaction.as_slice())
        );
        assert!(
            store
                .list_blobs_with_redactions()
                .unwrap()
                .contains(&blob_hash)
        );

        // State-visibility data round-trip for `change_id`.
        let state_visibility = b"any-store state visibility bytes";
        store
            .put_state_visibility_bytes_for_state(&change_id, state_visibility)
            .unwrap();
        assert!(store.has_state_visibility_for_state(&change_id).unwrap());
        assert_eq!(
            store
                .get_state_visibility_bytes_for_state(&change_id)
                .unwrap()
                .as_deref(),
            Some(state_visibility.as_slice())
        );
        assert!(
            store
                .list_states_with_visibility()
                .unwrap()
                .contains(&change_id)
        );

        // No observable result to check; this only confirms the call dispatches.
        store.clear_recent_caches();
    }
}