Skip to main content

objects/store/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Backend-neutral object storage abstractions and concrete implementations.
3
4use std::{path::PathBuf, sync::Arc};
5
6use crate::object::{Action, ActionId, Blob, ChangeId, ContentHash, State, Tree};
7
8pub mod agent_registry;
9pub mod atomic;
10pub mod compression;
11pub mod fs;
12pub mod liveness;
13#[cfg(any(test, feature = "memory-backend"))]
14pub mod memory;
15pub mod pack;
16pub mod shallow;
17pub mod store_compliance;
18
19#[cfg(feature = "s3")]
20mod s3;
21
22pub use agent_registry::{
23    ActorChainNode, AgentEntry, AgentRegistry, AgentStatus, AgentUsageSummary, ContextQueryEntry,
24    ReserveOutcome, generate_agent_id,
25};
26pub use compression::{CompressionConfig, CompressionError, compress, decompress};
27pub use fs::FsStore;
28pub use liveness::{Liveness, current_boot_id, is_owner_alive, process_alive};
29#[cfg(any(test, feature = "memory-backend"))]
30pub use memory::InMemoryStore;
31pub use pack::{PackBuilder, PackObjectId, PackReader, PackStats};
32#[cfg(feature = "s3")]
33pub use s3::{S3Store, S3StoreBuilder};
34pub use shallow::ShallowInfo;
35
36pub use crate::error::{HeddleError as StoreError, HeddleError, Result};
37
38impl From<CompressionError> for HeddleError {
39    fn from(e: CompressionError) -> Self {
40        HeddleError::Compression(e.to_string())
41    }
42}
43
44#[derive(Clone)]
45pub struct SharedStore(pub Arc<dyn ObjectStore>);
46
47impl ObjectStore for SharedStore {
48    fn get_blob(&self, hash: &ContentHash) -> Result<Option<Blob>> {
49        self.0.get_blob(hash)
50    }
51    fn put_blob(&self, blob: &Blob) -> Result<ContentHash> {
52        self.0.put_blob(blob)
53    }
54    fn put_blob_with_hash(&self, blob: &Blob, hash: ContentHash) -> Result<ContentHash> {
55        self.0.put_blob_with_hash(blob, hash)
56    }
57    fn has_blob(&self, hash: &ContentHash) -> Result<bool> {
58        self.0.has_blob(hash)
59    }
60    fn blob_size(&self, hash: &ContentHash) -> Result<Option<u64>> {
61        self.0.blob_size(hash)
62    }
63    fn loose_blob_path(&self, hash: &ContentHash) -> Option<PathBuf> {
64        self.0.loose_blob_path(hash)
65    }
66    fn promote_to_loose_uncompressed(&self, hash: &ContentHash) -> Result<bool> {
67        self.0.promote_to_loose_uncompressed(hash)
68    }
69    fn get_tree(&self, hash: &ContentHash) -> Result<Option<Tree>> {
70        self.0.get_tree(hash)
71    }
72    fn put_tree(&self, tree: &Tree) -> Result<ContentHash> {
73        self.0.put_tree(tree)
74    }
75    fn has_tree(&self, hash: &ContentHash) -> Result<bool> {
76        self.0.has_tree(hash)
77    }
78    fn get_state(&self, id: &ChangeId) -> Result<Option<State>> {
79        self.0.get_state(id)
80    }
81    fn put_state(&self, state: &State) -> Result<()> {
82        self.0.put_state(state)
83    }
84    fn has_state(&self, id: &ChangeId) -> Result<bool> {
85        self.0.has_state(id)
86    }
87    fn list_states(&self) -> Result<Vec<ChangeId>> {
88        self.0.list_states()
89    }
90    fn get_action(&self, id: &ActionId) -> Result<Option<Action>> {
91        self.0.get_action(id)
92    }
93    fn put_action(&self, action: &mut Action) -> Result<ActionId> {
94        self.0.put_action(action)
95    }
96    fn list_actions(&self) -> Result<Vec<ActionId>> {
97        self.0.list_actions()
98    }
99    fn list_blobs(&self) -> Result<Vec<ContentHash>> {
100        self.0.list_blobs()
101    }
102    fn list_trees(&self) -> Result<Vec<ContentHash>> {
103        self.0.list_trees()
104    }
105    fn get_pack_object(
106        &self,
107        id: &pack::PackObjectId,
108    ) -> Result<Option<(pack::ObjectType, Vec<u8>)>> {
109        self.0.get_pack_object(id)
110    }
111    fn install_pack(&self, pack_data: &[u8], index_data: &[u8]) -> Result<Vec<pack::PackObjectId>> {
112        self.0.install_pack(pack_data, index_data)
113    }
114    fn install_pack_streaming(
115        &self,
116        pack_path: &std::path::Path,
117        index_path: &std::path::Path,
118    ) -> Result<()> {
119        self.0.install_pack_streaming(pack_path, index_path)
120    }
121    fn put_blobs_packed(&self, blobs: Vec<(ContentHash, Vec<u8>)>) -> Result<()> {
122        self.0.put_blobs_packed(blobs)
123    }
124    fn begin_snapshot_write_batch(&self) -> Result<()> {
125        self.0.begin_snapshot_write_batch()
126    }
127    fn flush_snapshot_write_batch(&self) -> Result<()> {
128        self.0.flush_snapshot_write_batch()
129    }
130    fn abort_snapshot_write_batch(&self) {
131        self.0.abort_snapshot_write_batch()
132    }
133}
134
135/// Trait for object storage backends.
136pub trait ObjectStore: Send + Sync {
137    fn get_blob(&self, hash: &ContentHash) -> Result<Option<Blob>>;
138    fn put_blob(&self, blob: &Blob) -> Result<ContentHash>;
139
140    /// Return the *uncompressed* byte length of the blob identified by
141    /// `hash`, or `Ok(None)` when the blob is not in the store.
142    ///
143    /// The contract is "size without paying for content": backends are
144    /// expected to honour this with a header read or index lookup
145    /// rather than a full decompression. This is the hot path for
146    /// directory listings (`ls -l` over a thread mount) where loading
147    /// every blob just to learn its size would dominate.
148    ///
149    /// The default implementation falls back to `get_blob` so backends
150    /// without a cheap size accessor still satisfy the contract; native
151    /// stores (`FsStore`, `InMemoryStore`) override this with a
152    /// header- or hashmap-only path.
153    fn blob_size(&self, hash: &ContentHash) -> Result<Option<u64>> {
154        Ok(self.get_blob(hash)?.map(|blob| blob.content().len() as u64))
155    }
156
157    /// Filesystem path of the loose blob whose on-disk bytes are
158    /// byte-identical to the blob's *uncompressed* content, suitable
159    /// for `hard_link`/`clonefile` materialization without going
160    /// through `get_blob`.
161    ///
162    /// Returns `None` when the blob is missing, is only available via
163    /// a packfile, is stored compressed (the on-disk bytes wouldn't
164    /// match what a worktree consumer needs to read), or the backend
165    /// doesn't expose stable filesystem paths (e.g. `InMemoryStore`,
166    /// `S3Store`). The default impl returns `None` so non-`FsStore`
167    /// backends silently fall through to the bytes path.
168    fn loose_blob_path(&self, _hash: &ContentHash) -> Option<PathBuf> {
169        None
170    }
171
172    /// Ensure the blob identified by `hash` is materialized as an
173    /// uncompressed loose file at the canonical loose path so that
174    /// `loose_blob_path` returns `Some(path)` on a subsequent call.
175    ///
176    /// This is the "warm canonical store" path that lets the
177    /// hardlink-first materializer keep its 5–10× wall-clock and
178    /// storage-allocation wins after `pack_objects + prune_loose_objects`
179    /// has moved everything into a packfile. Without this, the lazy
180    /// hardlink path silently degrades to `fs::write(decompressed)` on
181    /// every materialize, because `loose_blob_path` returns `None` for
182    /// pack-only and compressed-loose blobs.
183    ///
184    /// Cost-amortization: the first promotion of a blob pays
185    /// `decompress + atomic write`. Every subsequent materialize of
186    /// the same blob — into the same worktree on `goto`, or into a
187    /// sibling worktree on `delegate` — is a single `link(2)`. Net
188    /// win for any N > 1 materializations; break-even at N == 1.
189    ///
190    /// Pack invariants are preserved: this method does not remove the
191    /// pack-resident copy. The blob lives in both pack and loose-
192    /// uncompressed until the next `prune_loose_objects` cycle, at
193    /// which point the loose mirror is discarded and a future
194    /// materialize re-promotes on demand.
195    ///
196    /// Idempotent: a blob that's already loose-and-uncompressed is a
197    /// no-op fast path. A blob that's loose-but-compressed is
198    /// rewritten in place (atomically) with the uncompressed bytes.
199    /// A blob that's pack-resident is decompressed out of the pack
200    /// and written loose without touching the pack.
201    ///
202    /// Returns `Ok(true)` when the call did real work (a write
203    /// happened), `Ok(false)` when it was a no-op (blob was already
204    /// loose+uncompressed), and `Err` when the blob isn't in the
205    /// store at all. The default impl returns `Ok(false)` for
206    /// backends that don't expose loose paths (`InMemoryStore`,
207    /// `S3Store`), since the hardlink path is fundamentally
208    /// inapplicable there.
209    fn promote_to_loose_uncompressed(&self, _hash: &ContentHash) -> Result<bool> {
210        Ok(false)
211    }
212
213    fn put_blob_with_hash(&self, blob: &Blob, hash: ContentHash) -> Result<ContentHash> {
214        if blob.hash() != hash {
215            return Err(HeddleError::InvalidObject("blob hash mismatch".to_string()));
216        }
217        self.put_blob(blob)
218    }
219
220    fn has_blob(&self, hash: &ContentHash) -> Result<bool>;
221    fn get_tree(&self, hash: &ContentHash) -> Result<Option<Tree>>;
222    fn put_tree(&self, tree: &Tree) -> Result<ContentHash>;
223    fn has_tree(&self, hash: &ContentHash) -> Result<bool>;
224    fn get_state(&self, id: &ChangeId) -> Result<Option<State>>;
225    fn put_state(&self, state: &State) -> Result<()>;
226    fn has_state(&self, id: &ChangeId) -> Result<bool>;
227    fn list_states(&self) -> Result<Vec<ChangeId>>;
228    fn get_action(&self, id: &ActionId) -> Result<Option<Action>>;
229    fn put_action(&self, action: &mut Action) -> Result<ActionId>;
230    fn list_actions(&self) -> Result<Vec<ActionId>>;
231    fn list_blobs(&self) -> Result<Vec<ContentHash>>;
232    fn list_trees(&self) -> Result<Vec<ContentHash>>;
233
234    fn put_blob_bytes_with_hash(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
235        self.put_blob_with_hash(&Blob::from_slice(data), hash)
236    }
237
238    fn put_tree_serialized(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
239        let tree: Tree = rmp_serde::from_slice(data)?;
240        tree.validate()?;
241        if tree.hash() != hash {
242            return Err(HeddleError::Corruption {
243                expected: hash,
244                found: tree.hash(),
245            });
246        }
247        self.put_tree(&tree)
248    }
249
250    fn put_state_serialized(&self, data: &[u8], id: ChangeId) -> Result<()> {
251        let state: State = rmp_serde::from_slice(data)?;
252        if state.change_id != id {
253            return Err(HeddleError::InvalidObject(format!(
254                "state change_id mismatch: expected {}, found {}",
255                id, state.change_id
256            )));
257        }
258        self.put_state(&state)
259    }
260
261    fn put_action_serialized(&self, data: &[u8], id: ActionId) -> Result<()> {
262        let mut action: Action = rmp_serde::from_slice(data)?;
263        let found_id = action.compute_id();
264        if found_id != id {
265            return Err(HeddleError::InvalidObject(format!(
266                "action id mismatch: expected {}, found {}",
267                id, found_id
268            )));
269        }
270        let stored_id = self.put_action(&mut action)?;
271        if stored_id != id {
272            return Err(HeddleError::InvalidObject(format!(
273                "action id mismatch after write: expected {}, found {}",
274                id, stored_id
275            )));
276        }
277        Ok(())
278    }
279
280    fn get_pack_object(
281        &self,
282        id: &pack::PackObjectId,
283    ) -> Result<Option<(pack::ObjectType, Vec<u8>)>> {
284        match id {
285            pack::PackObjectId::Hash(hash) => {
286                if let Some(blob) = self.get_blob(hash)? {
287                    return Ok(Some((pack::ObjectType::Blob, blob.content().to_vec())));
288                }
289                if let Some(tree) = self.get_tree(hash)? {
290                    return Ok(Some((
291                        pack::ObjectType::Tree,
292                        rmp_serde::to_vec_named(&tree)?,
293                    )));
294                }
295                if let Some(action) = self.get_action(&ActionId::from_hash(*hash))? {
296                    return Ok(Some((
297                        pack::ObjectType::Action,
298                        rmp_serde::to_vec_named(&action)?,
299                    )));
300                }
301                Ok(None)
302            }
303            pack::PackObjectId::ChangeId(change_id) => {
304                if let Some(state) = self.get_state(change_id)? {
305                    Ok(Some((
306                        pack::ObjectType::State,
307                        rmp_serde::to_vec_named(&state)?,
308                    )))
309                } else {
310                    Ok(None)
311                }
312            }
313        }
314    }
315
316    /// Bulk-write a batch of blobs as a single durable unit. The default
317    /// implementation falls back to per-blob writes; backends that
318    /// support packfiles (i.e. `FsStore`) override this to install one
319    /// packfile + index — two fsyncs total instead of N. Used by the
320    /// snapshot hot path so writing 1000 small files takes ~one fsync,
321    /// not 1000.
322    ///
323    /// Blobs already present in the store are skipped on the way in
324    /// (the caller would otherwise duplicate them in the pack).
325    fn put_blobs_packed(&self, blobs: Vec<(ContentHash, Vec<u8>)>) -> Result<()> {
326        for (hash, data) in blobs {
327            if !self.has_blob(&hash)? {
328                self.put_blob_bytes_with_hash(&data, hash)?;
329            }
330        }
331        Ok(())
332    }
333
334    fn install_pack(&self, pack_data: &[u8], index_data: &[u8]) -> Result<Vec<pack::PackObjectId>> {
335        let reader = pack::PackReader::from_bytes(pack_data.to_vec(), index_data.to_vec())?;
336        let ids = reader.list_ids();
337        for id in &ids {
338            let Some((obj_type, data)) = reader.get_object(id)? else {
339                continue;
340            };
341            match (id, obj_type) {
342                (pack::PackObjectId::Hash(hash), pack::ObjectType::Blob) => {
343                    self.put_blob_bytes_with_hash(&data, *hash)?;
344                }
345                (pack::PackObjectId::Hash(hash), pack::ObjectType::Tree) => {
346                    self.put_tree_serialized(&data, *hash)?;
347                }
348                (pack::PackObjectId::Hash(hash), pack::ObjectType::Action) => {
349                    self.put_action_serialized(&data, ActionId::from_hash(*hash))?;
350                }
351                (pack::PackObjectId::ChangeId(change_id), pack::ObjectType::State) => {
352                    self.put_state_serialized(&data, *change_id)?;
353                }
354                _ => {
355                    return Err(HeddleError::InvalidObject(format!(
356                        "unsupported native pack object: {:?} {:?}",
357                        id, obj_type
358                    )));
359                }
360            }
361        }
362        Ok(ids)
363    }
364
365    /// Install a pack and its index from on-disk files
366    /// (typically produced by `StreamingPackBuilder`). The default
367    /// impl reads both files fully and delegates to `install_pack`,
368    /// so any backend that doesn't override this still works (at the
369    /// cost of giving back the bounded-memory promise). Real fs-
370    /// backed stores override this to `rename(2)` both files into the
371    /// pack directory without ever loading them.
372    ///
373    /// On success, the source files at `pack_path`/`index_path` may
374    /// have been moved or removed depending on the backend; callers
375    /// shouldn't continue to rely on them.
376    ///
377    /// Returns nothing — callers that need the list of installed ids
378    /// can read the freshly-installed pack via the store. Most
379    /// callers (including `Importer`) already track inserted ids
380    /// out-of-band via the sha map and don't need a return value.
381    fn install_pack_streaming(
382        &self,
383        pack_path: &std::path::Path,
384        index_path: &std::path::Path,
385    ) -> Result<()> {
386        let pack_data = std::fs::read(pack_path).map_err(StoreError::from)?;
387        let index_data = std::fs::read(index_path).map_err(StoreError::from)?;
388        self.install_pack(&pack_data, &index_data)?;
389        // Default impl: clean up the staged files. Override
390        // implementations that move/rename should not call super and
391        // should manage the file lifecycle themselves.
392        let _ = std::fs::remove_file(pack_path);
393        let _ = std::fs::remove_file(index_path);
394        Ok(())
395    }
396
397    fn pack_objects(&self, aggressive: bool) -> Result<(u64, u64)> {
398        let _ = aggressive;
399        Ok((0, 0))
400    }
401
402    fn prune_loose_objects(&self) -> Result<(u64, u64)> {
403        Ok((0, 0))
404    }
405
406    fn begin_snapshot_write_batch(&self) -> Result<()> {
407        Ok(())
408    }
409
410    fn flush_snapshot_write_batch(&self) -> Result<()> {
411        Ok(())
412    }
413
414    fn abort_snapshot_write_batch(&self) {}
415}