1use std::{path::PathBuf, sync::Arc};
5
6use crate::object::{Action, ActionId, Blob, ChangeId, ContentHash, State, Tree};
7
8pub mod agent_registry;
9pub mod atomic;
10pub mod compression;
11pub mod fs;
12pub mod liveness;
13#[cfg(any(test, feature = "memory-backend"))]
14pub mod memory;
15pub mod pack;
16pub mod shallow;
17pub mod store_compliance;
18
19#[cfg(feature = "s3")]
20mod s3;
21
22pub use agent_registry::{
23 ActorChainNode, AgentEntry, AgentRegistry, AgentStatus, AgentUsageSummary, ContextQueryEntry,
24 ReserveOutcome, generate_agent_id,
25};
26pub use compression::{CompressionConfig, CompressionError, compress, decompress};
27pub use fs::FsStore;
28pub use liveness::{Liveness, current_boot_id, is_owner_alive, process_alive};
29#[cfg(any(test, feature = "memory-backend"))]
30pub use memory::InMemoryStore;
31pub use pack::{PackBuilder, PackObjectId, PackReader, PackStats};
32#[cfg(feature = "s3")]
33pub use s3::{S3Store, S3StoreBuilder};
34pub use shallow::ShallowInfo;
35
36pub use crate::error::{HeddleError as StoreError, HeddleError, Result};
37
38impl From<CompressionError> for HeddleError {
39 fn from(e: CompressionError) -> Self {
40 HeddleError::Compression(e.to_string())
41 }
42}
43
44#[derive(Clone)]
45pub struct SharedStore(pub Arc<dyn ObjectStore>);
46
47impl ObjectStore for SharedStore {
48 fn get_blob(&self, hash: &ContentHash) -> Result<Option<Blob>> {
49 self.0.get_blob(hash)
50 }
51 fn put_blob(&self, blob: &Blob) -> Result<ContentHash> {
52 self.0.put_blob(blob)
53 }
54 fn put_blob_with_hash(&self, blob: &Blob, hash: ContentHash) -> Result<ContentHash> {
55 self.0.put_blob_with_hash(blob, hash)
56 }
57 fn has_blob(&self, hash: &ContentHash) -> Result<bool> {
58 self.0.has_blob(hash)
59 }
60 fn blob_size(&self, hash: &ContentHash) -> Result<Option<u64>> {
61 self.0.blob_size(hash)
62 }
63 fn loose_blob_path(&self, hash: &ContentHash) -> Option<PathBuf> {
64 self.0.loose_blob_path(hash)
65 }
66 fn promote_to_loose_uncompressed(&self, hash: &ContentHash) -> Result<bool> {
67 self.0.promote_to_loose_uncompressed(hash)
68 }
69 fn get_tree(&self, hash: &ContentHash) -> Result<Option<Tree>> {
70 self.0.get_tree(hash)
71 }
72 fn put_tree(&self, tree: &Tree) -> Result<ContentHash> {
73 self.0.put_tree(tree)
74 }
75 fn has_tree(&self, hash: &ContentHash) -> Result<bool> {
76 self.0.has_tree(hash)
77 }
78 fn get_state(&self, id: &ChangeId) -> Result<Option<State>> {
79 self.0.get_state(id)
80 }
81 fn put_state(&self, state: &State) -> Result<()> {
82 self.0.put_state(state)
83 }
84 fn has_state(&self, id: &ChangeId) -> Result<bool> {
85 self.0.has_state(id)
86 }
87 fn list_states(&self) -> Result<Vec<ChangeId>> {
88 self.0.list_states()
89 }
90 fn get_action(&self, id: &ActionId) -> Result<Option<Action>> {
91 self.0.get_action(id)
92 }
93 fn put_action(&self, action: &mut Action) -> Result<ActionId> {
94 self.0.put_action(action)
95 }
96 fn list_actions(&self) -> Result<Vec<ActionId>> {
97 self.0.list_actions()
98 }
99 fn list_blobs(&self) -> Result<Vec<ContentHash>> {
100 self.0.list_blobs()
101 }
102 fn list_trees(&self) -> Result<Vec<ContentHash>> {
103 self.0.list_trees()
104 }
105 fn get_pack_object(
106 &self,
107 id: &pack::PackObjectId,
108 ) -> Result<Option<(pack::ObjectType, Vec<u8>)>> {
109 self.0.get_pack_object(id)
110 }
111 fn install_pack(&self, pack_data: &[u8], index_data: &[u8]) -> Result<Vec<pack::PackObjectId>> {
112 self.0.install_pack(pack_data, index_data)
113 }
114 fn install_pack_streaming(
115 &self,
116 pack_path: &std::path::Path,
117 index_path: &std::path::Path,
118 ) -> Result<()> {
119 self.0.install_pack_streaming(pack_path, index_path)
120 }
121 fn put_blobs_packed(&self, blobs: Vec<(ContentHash, Vec<u8>)>) -> Result<()> {
122 self.0.put_blobs_packed(blobs)
123 }
124 fn begin_snapshot_write_batch(&self) -> Result<()> {
125 self.0.begin_snapshot_write_batch()
126 }
127 fn flush_snapshot_write_batch(&self) -> Result<()> {
128 self.0.flush_snapshot_write_batch()
129 }
130 fn abort_snapshot_write_batch(&self) {
131 self.0.abort_snapshot_write_batch()
132 }
133}
134
135pub trait ObjectStore: Send + Sync {
137 fn get_blob(&self, hash: &ContentHash) -> Result<Option<Blob>>;
138 fn put_blob(&self, blob: &Blob) -> Result<ContentHash>;
139
140 fn blob_size(&self, hash: &ContentHash) -> Result<Option<u64>> {
154 Ok(self.get_blob(hash)?.map(|blob| blob.content().len() as u64))
155 }
156
157 fn loose_blob_path(&self, _hash: &ContentHash) -> Option<PathBuf> {
169 None
170 }
171
172 fn promote_to_loose_uncompressed(&self, _hash: &ContentHash) -> Result<bool> {
210 Ok(false)
211 }
212
213 fn put_blob_with_hash(&self, blob: &Blob, hash: ContentHash) -> Result<ContentHash> {
214 if blob.hash() != hash {
215 return Err(HeddleError::InvalidObject("blob hash mismatch".to_string()));
216 }
217 self.put_blob(blob)
218 }
219
220 fn has_blob(&self, hash: &ContentHash) -> Result<bool>;
221 fn get_tree(&self, hash: &ContentHash) -> Result<Option<Tree>>;
222 fn put_tree(&self, tree: &Tree) -> Result<ContentHash>;
223 fn has_tree(&self, hash: &ContentHash) -> Result<bool>;
224 fn get_state(&self, id: &ChangeId) -> Result<Option<State>>;
225 fn put_state(&self, state: &State) -> Result<()>;
226 fn has_state(&self, id: &ChangeId) -> Result<bool>;
227 fn list_states(&self) -> Result<Vec<ChangeId>>;
228 fn get_action(&self, id: &ActionId) -> Result<Option<Action>>;
229 fn put_action(&self, action: &mut Action) -> Result<ActionId>;
230 fn list_actions(&self) -> Result<Vec<ActionId>>;
231 fn list_blobs(&self) -> Result<Vec<ContentHash>>;
232 fn list_trees(&self) -> Result<Vec<ContentHash>>;
233
234 fn put_blob_bytes_with_hash(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
235 self.put_blob_with_hash(&Blob::from_slice(data), hash)
236 }
237
238 fn put_tree_serialized(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
239 let tree: Tree = rmp_serde::from_slice(data)?;
240 tree.validate()?;
241 if tree.hash() != hash {
242 return Err(HeddleError::Corruption {
243 expected: hash,
244 found: tree.hash(),
245 });
246 }
247 self.put_tree(&tree)
248 }
249
250 fn put_state_serialized(&self, data: &[u8], id: ChangeId) -> Result<()> {
251 let state: State = rmp_serde::from_slice(data)?;
252 if state.change_id != id {
253 return Err(HeddleError::InvalidObject(format!(
254 "state change_id mismatch: expected {}, found {}",
255 id, state.change_id
256 )));
257 }
258 self.put_state(&state)
259 }
260
261 fn put_action_serialized(&self, data: &[u8], id: ActionId) -> Result<()> {
262 let mut action: Action = rmp_serde::from_slice(data)?;
263 let found_id = action.compute_id();
264 if found_id != id {
265 return Err(HeddleError::InvalidObject(format!(
266 "action id mismatch: expected {}, found {}",
267 id, found_id
268 )));
269 }
270 let stored_id = self.put_action(&mut action)?;
271 if stored_id != id {
272 return Err(HeddleError::InvalidObject(format!(
273 "action id mismatch after write: expected {}, found {}",
274 id, stored_id
275 )));
276 }
277 Ok(())
278 }
279
280 fn get_pack_object(
281 &self,
282 id: &pack::PackObjectId,
283 ) -> Result<Option<(pack::ObjectType, Vec<u8>)>> {
284 match id {
285 pack::PackObjectId::Hash(hash) => {
286 if let Some(blob) = self.get_blob(hash)? {
287 return Ok(Some((pack::ObjectType::Blob, blob.content().to_vec())));
288 }
289 if let Some(tree) = self.get_tree(hash)? {
290 return Ok(Some((
291 pack::ObjectType::Tree,
292 rmp_serde::to_vec_named(&tree)?,
293 )));
294 }
295 if let Some(action) = self.get_action(&ActionId::from_hash(*hash))? {
296 return Ok(Some((
297 pack::ObjectType::Action,
298 rmp_serde::to_vec_named(&action)?,
299 )));
300 }
301 Ok(None)
302 }
303 pack::PackObjectId::ChangeId(change_id) => {
304 if let Some(state) = self.get_state(change_id)? {
305 Ok(Some((
306 pack::ObjectType::State,
307 rmp_serde::to_vec_named(&state)?,
308 )))
309 } else {
310 Ok(None)
311 }
312 }
313 }
314 }
315
316 fn put_blobs_packed(&self, blobs: Vec<(ContentHash, Vec<u8>)>) -> Result<()> {
326 for (hash, data) in blobs {
327 if !self.has_blob(&hash)? {
328 self.put_blob_bytes_with_hash(&data, hash)?;
329 }
330 }
331 Ok(())
332 }
333
334 fn install_pack(&self, pack_data: &[u8], index_data: &[u8]) -> Result<Vec<pack::PackObjectId>> {
335 let reader = pack::PackReader::from_bytes(pack_data.to_vec(), index_data.to_vec())?;
336 let ids = reader.list_ids();
337 for id in &ids {
338 let Some((obj_type, data)) = reader.get_object(id)? else {
339 continue;
340 };
341 match (id, obj_type) {
342 (pack::PackObjectId::Hash(hash), pack::ObjectType::Blob) => {
343 self.put_blob_bytes_with_hash(&data, *hash)?;
344 }
345 (pack::PackObjectId::Hash(hash), pack::ObjectType::Tree) => {
346 self.put_tree_serialized(&data, *hash)?;
347 }
348 (pack::PackObjectId::Hash(hash), pack::ObjectType::Action) => {
349 self.put_action_serialized(&data, ActionId::from_hash(*hash))?;
350 }
351 (pack::PackObjectId::ChangeId(change_id), pack::ObjectType::State) => {
352 self.put_state_serialized(&data, *change_id)?;
353 }
354 _ => {
355 return Err(HeddleError::InvalidObject(format!(
356 "unsupported native pack object: {:?} {:?}",
357 id, obj_type
358 )));
359 }
360 }
361 }
362 Ok(ids)
363 }
364
365 fn install_pack_streaming(
382 &self,
383 pack_path: &std::path::Path,
384 index_path: &std::path::Path,
385 ) -> Result<()> {
386 let pack_data = std::fs::read(pack_path).map_err(StoreError::from)?;
387 let index_data = std::fs::read(index_path).map_err(StoreError::from)?;
388 self.install_pack(&pack_data, &index_data)?;
389 let _ = std::fs::remove_file(pack_path);
393 let _ = std::fs::remove_file(index_path);
394 Ok(())
395 }
396
397 fn pack_objects(&self, aggressive: bool) -> Result<(u64, u64)> {
398 let _ = aggressive;
399 Ok((0, 0))
400 }
401
402 fn prune_loose_objects(&self) -> Result<(u64, u64)> {
403 Ok((0, 0))
404 }
405
406 fn begin_snapshot_write_batch(&self) -> Result<()> {
407 Ok(())
408 }
409
410 fn flush_snapshot_write_batch(&self) -> Result<()> {
411 Ok(())
412 }
413
414 fn abort_snapshot_write_batch(&self) {}
415}