1use std::path::PathBuf;
5
6use crate::object::{Action, ActionId, Blob, ChangeId, ContentHash, State, Tree};
7
8pub mod agent_registry;
9pub mod atomic;
10pub mod compression;
11pub mod fs;
12pub mod liveness;
13#[cfg(any(test, feature = "memory-backend"))]
14pub mod memory;
15pub mod pack;
16pub mod shallow;
17pub mod store_compliance;
18
19#[cfg(feature = "s3")]
20mod s3;
21
22pub use agent_registry::{
23 ActorChainNode, AgentEntry, AgentRegistry, AgentStatus, AgentUsageSummary, ContextQueryEntry,
24 ReserveOutcome, generate_agent_id,
25};
26pub use compression::{CompressionConfig, CompressionError, compress, decompress};
27pub use fs::FsStore;
28pub use liveness::{Liveness, current_boot_id, is_owner_alive, process_alive};
29#[cfg(any(test, feature = "memory-backend"))]
30pub use memory::InMemoryStore;
31pub use pack::{PackBuilder, PackObjectId, PackReader, PackStats};
32#[cfg(feature = "s3")]
33pub use s3::{S3Store, S3StoreBuilder};
34pub use shallow::ShallowInfo;
35
36pub use crate::error::{HeddleError as StoreError, HeddleError, Result};
37
38impl From<CompressionError> for HeddleError {
39 fn from(e: CompressionError) -> Self {
40 HeddleError::Compression(e.to_string())
41 }
42}
43
/// Closed set of concrete object store backends behind a single type.
///
/// [`ObjectStore`] is implemented for this enum by forwarding every call to
/// whichever backend the value wraps.
pub enum AnyStore {
    /// Backend provided by [`FsStore`].
    Fs(FsStore),
    /// Backend provided by `S3Store`; this variant only exists when the crate
    /// is built with the `s3` feature.
    #[cfg(feature = "s3")]
    S3(S3Store),
}
61
// Forwards one method call to the backend wrapped by an `AnyStore`.
//
// Usage: `any_store_dispatch!(self, method(arg1, arg2))`. The first token is
// the identifier holding the `AnyStore` (normally `self`); the rest is the
// method name followed by its comma-separated arguments. The expansion is a
// `match` with one arm per variant, so adding a backend only requires a new
// arm here rather than touching every forwarding method.
macro_rules! any_store_dispatch {
    ($store:ident, $call:ident ( $($param:expr),* )) => {
        match $store {
            AnyStore::Fs(backend) => backend.$call($($param),*),
            // The S3 arm must be gated exactly like the variant it matches.
            #[cfg(feature = "s3")]
            AnyStore::S3(backend) => backend.$call($($param),*),
        }
    };
}
76
77impl ObjectStore for AnyStore {
78 fn get_blob(&self, hash: &ContentHash) -> Result<Option<Blob>> {
79 any_store_dispatch!(self, get_blob(hash))
80 }
81 fn put_blob(&self, blob: &Blob) -> Result<ContentHash> {
82 any_store_dispatch!(self, put_blob(blob))
83 }
84 fn get_blob_bytes(&self, hash: &ContentHash) -> Result<Option<bytes::Bytes>> {
85 any_store_dispatch!(self, get_blob_bytes(hash))
86 }
87 fn blob_size(&self, hash: &ContentHash) -> Result<Option<u64>> {
88 any_store_dispatch!(self, blob_size(hash))
89 }
90 fn loose_blob_path(&self, hash: &ContentHash) -> Option<PathBuf> {
91 any_store_dispatch!(self, loose_blob_path(hash))
92 }
93 fn promote_to_loose_uncompressed(&self, hash: &ContentHash) -> Result<bool> {
94 any_store_dispatch!(self, promote_to_loose_uncompressed(hash))
95 }
96 fn clear_recent_caches(&self) {
97 any_store_dispatch!(self, clear_recent_caches())
98 }
99 fn put_blob_with_hash(&self, blob: &Blob, hash: ContentHash) -> Result<ContentHash> {
100 any_store_dispatch!(self, put_blob_with_hash(blob, hash))
101 }
102 fn has_blob(&self, hash: &ContentHash) -> Result<bool> {
103 any_store_dispatch!(self, has_blob(hash))
104 }
105 fn get_tree(&self, hash: &ContentHash) -> Result<Option<Tree>> {
106 any_store_dispatch!(self, get_tree(hash))
107 }
108 fn put_tree(&self, tree: &Tree) -> Result<ContentHash> {
109 any_store_dispatch!(self, put_tree(tree))
110 }
111 fn has_tree(&self, hash: &ContentHash) -> Result<bool> {
112 any_store_dispatch!(self, has_tree(hash))
113 }
114 fn get_state(&self, id: &ChangeId) -> Result<Option<State>> {
115 any_store_dispatch!(self, get_state(id))
116 }
117 fn put_state(&self, state: &State) -> Result<()> {
118 any_store_dispatch!(self, put_state(state))
119 }
120 fn has_state(&self, id: &ChangeId) -> Result<bool> {
121 any_store_dispatch!(self, has_state(id))
122 }
123 fn list_states(&self) -> Result<Vec<ChangeId>> {
124 any_store_dispatch!(self, list_states())
125 }
126 fn get_action(&self, id: &ActionId) -> Result<Option<Action>> {
127 any_store_dispatch!(self, get_action(id))
128 }
129 fn put_action(&self, action: &mut Action) -> Result<ActionId> {
130 any_store_dispatch!(self, put_action(action))
131 }
132 fn list_actions(&self) -> Result<Vec<ActionId>> {
133 any_store_dispatch!(self, list_actions())
134 }
135 fn list_blobs(&self) -> Result<Vec<ContentHash>> {
136 any_store_dispatch!(self, list_blobs())
137 }
138 fn list_trees(&self) -> Result<Vec<ContentHash>> {
139 any_store_dispatch!(self, list_trees())
140 }
141 fn put_blob_bytes_with_hash(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
142 any_store_dispatch!(self, put_blob_bytes_with_hash(data, hash))
143 }
144 fn put_tree_serialized(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
145 any_store_dispatch!(self, put_tree_serialized(data, hash))
146 }
147 fn put_state_serialized(&self, data: &[u8], id: ChangeId) -> Result<()> {
148 any_store_dispatch!(self, put_state_serialized(data, id))
149 }
150 fn put_action_serialized(&self, data: &[u8], id: ActionId) -> Result<()> {
151 any_store_dispatch!(self, put_action_serialized(data, id))
152 }
153 fn get_pack_object(
154 &self,
155 id: &pack::PackObjectId,
156 ) -> Result<Option<(pack::ObjectType, Vec<u8>)>> {
157 any_store_dispatch!(self, get_pack_object(id))
158 }
159 fn put_blobs_packed(&self, blobs: Vec<(ContentHash, Vec<u8>)>) -> Result<()> {
160 any_store_dispatch!(self, put_blobs_packed(blobs))
161 }
162 fn install_pack(&self, pack_data: &[u8], index_data: &[u8]) -> Result<Vec<pack::PackObjectId>> {
163 any_store_dispatch!(self, install_pack(pack_data, index_data))
164 }
165 fn install_pack_streaming(
166 &self,
167 pack_path: &std::path::Path,
168 index_path: &std::path::Path,
169 ) -> Result<Vec<pack::PackObjectId>> {
170 any_store_dispatch!(self, install_pack_streaming(pack_path, index_path))
171 }
172 fn pack_objects(&self, aggressive: bool) -> Result<(u64, u64)> {
173 any_store_dispatch!(self, pack_objects(aggressive))
174 }
175 fn prune_loose_objects(&self) -> Result<(u64, u64)> {
176 any_store_dispatch!(self, prune_loose_objects())
177 }
178 fn begin_snapshot_write_batch(&self) -> Result<()> {
179 any_store_dispatch!(self, begin_snapshot_write_batch())
180 }
181 fn flush_snapshot_write_batch(&self) -> Result<()> {
182 any_store_dispatch!(self, flush_snapshot_write_batch())
183 }
184 fn abort_snapshot_write_batch(&self) {
185 any_store_dispatch!(self, abort_snapshot_write_batch())
186 }
187 fn has_redactions_for_blob(&self, blob: &ContentHash) -> Result<bool> {
188 any_store_dispatch!(self, has_redactions_for_blob(blob))
189 }
190 fn get_redactions_bytes_for_blob(&self, blob: &ContentHash) -> Result<Option<Vec<u8>>> {
191 any_store_dispatch!(self, get_redactions_bytes_for_blob(blob))
192 }
193 fn put_redactions_bytes_for_blob(&self, blob: &ContentHash, bytes: &[u8]) -> Result<()> {
194 any_store_dispatch!(self, put_redactions_bytes_for_blob(blob, bytes))
195 }
196 fn list_blobs_with_redactions(&self) -> Result<Vec<ContentHash>> {
197 any_store_dispatch!(self, list_blobs_with_redactions())
198 }
199 fn has_state_visibility_for_state(&self, state: &ChangeId) -> Result<bool> {
200 any_store_dispatch!(self, has_state_visibility_for_state(state))
201 }
202 fn get_state_visibility_bytes_for_state(&self, state: &ChangeId) -> Result<Option<Vec<u8>>> {
203 any_store_dispatch!(self, get_state_visibility_bytes_for_state(state))
204 }
205 fn put_state_visibility_bytes_for_state(&self, state: &ChangeId, bytes: &[u8]) -> Result<()> {
206 any_store_dispatch!(self, put_state_visibility_bytes_for_state(state, bytes))
207 }
208 fn list_states_with_visibility(&self) -> Result<Vec<ChangeId>> {
209 any_store_dispatch!(self, list_states_with_visibility())
210 }
211}
212
213pub trait ObjectStore: Send + Sync {
215 fn get_blob(&self, hash: &ContentHash) -> Result<Option<Blob>>;
216 fn put_blob(&self, blob: &Blob) -> Result<ContentHash>;
217
218 fn get_blob_bytes(&self, hash: &ContentHash) -> Result<Option<bytes::Bytes>> {
229 Ok(self
230 .get_blob(hash)?
231 .map(|blob| bytes::Bytes::from(blob.into_content())))
232 }
233
234 fn blob_size(&self, hash: &ContentHash) -> Result<Option<u64>> {
248 Ok(self.get_blob(hash)?.map(|blob| blob.content().len() as u64))
249 }
250
251 fn loose_blob_path(&self, _hash: &ContentHash) -> Option<PathBuf> {
263 None
264 }
265
266 fn promote_to_loose_uncompressed(&self, _hash: &ContentHash) -> Result<bool> {
304 Ok(false)
305 }
306
307 fn clear_recent_caches(&self) {}
316
317 fn put_blob_with_hash(&self, blob: &Blob, hash: ContentHash) -> Result<ContentHash> {
318 if blob.hash() != hash {
319 return Err(HeddleError::InvalidObject("blob hash mismatch".to_string()));
320 }
321 self.put_blob(blob)
322 }
323
324 fn has_blob(&self, hash: &ContentHash) -> Result<bool>;
325 fn get_tree(&self, hash: &ContentHash) -> Result<Option<Tree>>;
326 fn put_tree(&self, tree: &Tree) -> Result<ContentHash>;
327 fn has_tree(&self, hash: &ContentHash) -> Result<bool>;
328 fn get_state(&self, id: &ChangeId) -> Result<Option<State>>;
329 fn put_state(&self, state: &State) -> Result<()>;
330 fn has_state(&self, id: &ChangeId) -> Result<bool>;
331 fn list_states(&self) -> Result<Vec<ChangeId>>;
332 fn get_action(&self, id: &ActionId) -> Result<Option<Action>>;
333 fn put_action(&self, action: &mut Action) -> Result<ActionId>;
334 fn list_actions(&self) -> Result<Vec<ActionId>>;
335 fn list_blobs(&self) -> Result<Vec<ContentHash>>;
336 fn list_trees(&self) -> Result<Vec<ContentHash>>;
337
338 fn put_blob_bytes_with_hash(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
339 self.put_blob_with_hash(&Blob::from_slice(data), hash)
340 }
341
342 fn put_tree_serialized(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
343 let tree: Tree = rmp_serde::from_slice(data)?;
344 tree.validate()?;
345 if tree.hash() != hash {
346 return Err(HeddleError::Corruption {
347 expected: hash,
348 found: tree.hash(),
349 });
350 }
351 self.put_tree(&tree)
352 }
353
354 fn put_state_serialized(&self, data: &[u8], id: ChangeId) -> Result<()> {
355 let state: State = rmp_serde::from_slice(data)?;
356 if state.change_id != id {
357 return Err(HeddleError::InvalidObject(format!(
358 "state change_id mismatch: expected {}, found {}",
359 id, state.change_id
360 )));
361 }
362 self.put_state(&state)
363 }
364
365 fn put_action_serialized(&self, data: &[u8], id: ActionId) -> Result<()> {
366 let mut action: Action = rmp_serde::from_slice(data)?;
367 let found_id = action.compute_id();
368 if found_id != id {
369 return Err(HeddleError::InvalidObject(format!(
370 "action id mismatch: expected {}, found {}",
371 id, found_id
372 )));
373 }
374 let stored_id = self.put_action(&mut action)?;
375 if stored_id != id {
376 return Err(HeddleError::InvalidObject(format!(
377 "action id mismatch after write: expected {}, found {}",
378 id, stored_id
379 )));
380 }
381 Ok(())
382 }
383
384 fn get_pack_object(
385 &self,
386 id: &pack::PackObjectId,
387 ) -> Result<Option<(pack::ObjectType, Vec<u8>)>> {
388 match id {
389 pack::PackObjectId::Hash(hash) => {
390 if let Some(blob) = self.get_blob(hash)? {
391 return Ok(Some((pack::ObjectType::Blob, blob.content().to_vec())));
392 }
393 if let Some(tree) = self.get_tree(hash)? {
394 return Ok(Some((
395 pack::ObjectType::Tree,
396 rmp_serde::to_vec_named(&tree)?,
397 )));
398 }
399 if let Some(action) = self.get_action(&ActionId::from_hash(*hash))? {
400 return Ok(Some((
401 pack::ObjectType::Action,
402 rmp_serde::to_vec_named(&action)?,
403 )));
404 }
405 Ok(None)
406 }
407 pack::PackObjectId::ChangeId(change_id) => {
408 if let Some(state) = self.get_state(change_id)? {
409 Ok(Some((
410 pack::ObjectType::State,
411 rmp_serde::to_vec_named(&state)?,
412 )))
413 } else {
414 Ok(None)
415 }
416 }
417 }
418 }
419
420 fn put_blobs_packed(&self, blobs: Vec<(ContentHash, Vec<u8>)>) -> Result<()> {
430 for (hash, data) in blobs {
431 if !self.has_blob(&hash)? {
432 self.put_blob_bytes_with_hash(&data, hash)?;
433 }
434 }
435 Ok(())
436 }
437
438 fn install_pack(&self, pack_data: &[u8], index_data: &[u8]) -> Result<Vec<pack::PackObjectId>> {
439 let reader = pack::PackReader::from_slice(pack_data, index_data)?;
440 let ids = reader.list_ids();
441 for id in &ids {
442 let Some((obj_type, data)) = reader.get_object(id)? else {
443 continue;
444 };
445 match (id, obj_type) {
446 (pack::PackObjectId::Hash(hash), pack::ObjectType::Blob) => {
447 self.put_blob_bytes_with_hash(&data, *hash)?;
448 }
449 (pack::PackObjectId::Hash(hash), pack::ObjectType::Tree) => {
450 self.put_tree_serialized(&data, *hash)?;
451 }
452 (pack::PackObjectId::Hash(hash), pack::ObjectType::Action) => {
453 self.put_action_serialized(&data, ActionId::from_hash(*hash))?;
454 }
455 (pack::PackObjectId::ChangeId(change_id), pack::ObjectType::State) => {
456 self.put_state_serialized(&data, *change_id)?;
457 }
458 _ => {
459 return Err(HeddleError::InvalidObject(format!(
460 "unsupported native pack object: {:?} {:?}",
461 id, obj_type
462 )));
463 }
464 }
465 }
466 Ok(ids)
467 }
468
469 fn install_pack_streaming(
486 &self,
487 pack_path: &std::path::Path,
488 index_path: &std::path::Path,
489 ) -> Result<Vec<pack::PackObjectId>> {
490 let pack_data = std::fs::read(pack_path).map_err(StoreError::from)?;
491 let index_data = std::fs::read(index_path).map_err(StoreError::from)?;
492 let ids = self.install_pack(&pack_data, &index_data)?;
493 let _ = std::fs::remove_file(pack_path);
497 let _ = std::fs::remove_file(index_path);
498 Ok(ids)
499 }
500
501 fn pack_objects(&self, aggressive: bool) -> Result<(u64, u64)> {
502 let _ = aggressive;
503 Ok((0, 0))
504 }
505
506 fn prune_loose_objects(&self) -> Result<(u64, u64)> {
507 Ok((0, 0))
508 }
509
510 fn begin_snapshot_write_batch(&self) -> Result<()> {
511 Ok(())
512 }
513
514 fn flush_snapshot_write_batch(&self) -> Result<()> {
515 Ok(())
516 }
517
518 fn abort_snapshot_write_batch(&self) {}
519
520 fn has_redactions_for_blob(&self, _blob: &ContentHash) -> Result<bool> {
532 Ok(false)
533 }
534
535 fn get_redactions_bytes_for_blob(&self, _blob: &ContentHash) -> Result<Option<Vec<u8>>> {
543 Ok(None)
544 }
545
546 fn put_redactions_bytes_for_blob(&self, _blob: &ContentHash, _bytes: &[u8]) -> Result<()> {
555 Err(HeddleError::InvalidObject(
556 "this object store does not support persisting redactions".to_string(),
557 ))
558 }
559
560 fn list_blobs_with_redactions(&self) -> Result<Vec<ContentHash>> {
567 Ok(Vec::new())
568 }
569
570 fn has_state_visibility_for_state(&self, _state: &ChangeId) -> Result<bool> {
580 Ok(false)
581 }
582
583 fn get_state_visibility_bytes_for_state(&self, _state: &ChangeId) -> Result<Option<Vec<u8>>> {
589 Ok(None)
590 }
591
592 fn put_state_visibility_bytes_for_state(&self, _state: &ChangeId, _bytes: &[u8]) -> Result<()> {
597 Err(HeddleError::InvalidObject(
598 "this object store does not support persisting state visibility".to_string(),
599 ))
600 }
601
602 fn list_states_with_visibility(&self) -> Result<Vec<ChangeId>> {
606 Ok(Vec::new())
607 }
608}
609
#[cfg(test)]
mod any_store_tests {
    use tempfile::TempDir;

    use super::*;
    use crate::object::{Attribution, Operation, Principal};

    /// Creates an initialised `FsStore` inside a fresh temporary directory
    /// and wraps it in `AnyStore::Fs`.
    ///
    /// The `TempDir` guard is handed back so the caller keeps the directory
    /// alive for as long as the store is in use.
    fn fs_any_store() -> (TempDir, AnyStore) {
        let dir = TempDir::new().unwrap();
        let root = dir.path().join(".heddle");
        let fs_store = FsStore::new(root);
        fs_store.init().unwrap();
        (dir, AnyStore::Fs(fs_store))
    }

    /// Calls every `ObjectStore` method once through `AnyStore::Fs`, so a
    /// method missing from the forwarding impl (and silently served by the
    /// trait default) shows up here.
    #[test]
    fn fs_variant_dispatches_every_object_store_method() {
        let (_dir, store) = fs_any_store();

        // --- blobs ---
        let blob = Blob::from("any-store dispatch blob");
        let blob_hash = store.put_blob(&blob).unwrap();

        let fetched_blob = store.get_blob(&blob_hash).unwrap().unwrap();
        assert_eq!(fetched_blob.content(), blob.content());
        assert!(store.has_blob(&blob_hash).unwrap());

        let fetched_bytes = store.get_blob_bytes(&blob_hash).unwrap().unwrap();
        assert_eq!(fetched_bytes.as_ref(), blob.content());

        let reported_size = store.blob_size(&blob_hash).unwrap().unwrap();
        assert_eq!(reported_size, blob.content().len() as u64);

        assert!(store.loose_blob_path(&blob_hash).is_some());
        store.promote_to_loose_uncompressed(&blob_hash).unwrap();
        assert!(store.list_blobs().unwrap().contains(&blob_hash));

        let hashed_blob = Blob::from("put-with-hash blob");
        let hashed_blob_hash = hashed_blob.hash();
        let stored_hash = store
            .put_blob_with_hash(&hashed_blob, hashed_blob_hash)
            .unwrap();
        assert_eq!(stored_hash, hashed_blob_hash);

        let raw_blob = Blob::from("raw bytes blob");
        let raw_hash = raw_blob.hash();
        let stored_raw_hash = store
            .put_blob_bytes_with_hash(raw_blob.content(), raw_hash)
            .unwrap();
        assert_eq!(stored_raw_hash, raw_hash);

        // --- trees ---
        let tree = Tree::new();
        let tree_hash = store.put_tree(&tree).unwrap();
        assert!(store.get_tree(&tree_hash).unwrap().is_some());
        assert!(store.has_tree(&tree_hash).unwrap());
        assert!(store.list_trees().unwrap().contains(&tree_hash));

        let second_tree = Tree::new();
        let second_tree_bytes = rmp_serde::to_vec_named(&second_tree).unwrap();
        let second_tree_hash = store
            .put_tree_serialized(&second_tree_bytes, second_tree.hash())
            .unwrap();
        assert_eq!(second_tree_hash, second_tree.hash());

        // --- states ---
        let attribution =
            Attribution::human(Principal::new("AnyStore Test", "anystore@example.com"));
        let state = State::new(tree_hash, vec![], attribution.clone());
        let change_id = state.change_id;
        store.put_state(&state).unwrap();
        assert!(store.get_state(&change_id).unwrap().is_some());
        assert!(store.has_state(&change_id).unwrap());
        assert!(store.list_states().unwrap().contains(&change_id));

        let second_state = State::new(second_tree.hash(), vec![], attribution.clone());
        let second_state_bytes = rmp_serde::to_vec_named(&second_state).unwrap();
        store
            .put_state_serialized(&second_state_bytes, second_state.change_id)
            .unwrap();

        // --- actions ---
        let mut action = Action::new(
            None,
            ChangeId::generate(),
            Operation::Snapshot,
            "any-store action",
            attribution,
        );
        let action_id = store.put_action(&mut action).unwrap();
        assert!(store.get_action(&action_id).unwrap().is_some());
        assert!(store.list_actions().unwrap().contains(&action_id));

        let action_bytes = rmp_serde::to_vec_named(&action).unwrap();
        store
            .put_action_serialized(&action_bytes, action_id)
            .unwrap();

        // --- packs and maintenance ---
        let packed_blob = Blob::from("packed-via-any-store");
        let packed_hash = packed_blob.hash();
        let batch = vec![(packed_hash, packed_blob.into_content())];
        store.put_blobs_packed(batch).unwrap();

        let packed_object = store
            .get_pack_object(&pack::PackObjectId::Hash(packed_hash))
            .unwrap();
        assert!(packed_object.is_some());

        store.pack_objects(false).unwrap();
        store.prune_loose_objects().unwrap();

        // These two are fed empty / nonexistent input; the outcome is
        // deliberately ignored because only reaching the backend matters.
        let _ = store.install_pack(&[], &[]);
        let _ = store.install_pack_streaming(
            std::path::Path::new("/nonexistent/pack"),
            std::path::Path::new("/nonexistent/idx"),
        );

        // --- snapshot write batches: one flushed, one aborted ---
        store.begin_snapshot_write_batch().unwrap();
        store.flush_snapshot_write_batch().unwrap();
        store.begin_snapshot_write_batch().unwrap();
        store.abort_snapshot_write_batch();

        // --- blob redactions ---
        let redaction = b"any-store redaction bytes";
        store
            .put_redactions_bytes_for_blob(&blob_hash, redaction)
            .unwrap();
        assert!(store.has_redactions_for_blob(&blob_hash).unwrap());

        let stored_redaction = store.get_redactions_bytes_for_blob(&blob_hash).unwrap();
        assert_eq!(stored_redaction.as_deref(), Some(redaction.as_slice()));

        let redacted_blobs = store.list_blobs_with_redactions().unwrap();
        assert!(redacted_blobs.contains(&blob_hash));

        // --- state visibility ---
        let state_visibility = b"any-store state visibility bytes";
        store
            .put_state_visibility_bytes_for_state(&change_id, state_visibility)
            .unwrap();
        assert!(store.has_state_visibility_for_state(&change_id).unwrap());

        let stored_visibility = store
            .get_state_visibility_bytes_for_state(&change_id)
            .unwrap();
        assert_eq!(
            stored_visibility.as_deref(),
            Some(state_visibility.as_slice())
        );

        let states_with_visibility = store.list_states_with_visibility().unwrap();
        assert!(states_with_visibility.contains(&change_id));

        // --- caches ---
        store.clear_recent_caches();
    }
}