Skip to main content

objects/store/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Backend-neutral object storage abstractions and concrete implementations.
3
4use std::{path::PathBuf, sync::Arc};
5
6use crate::object::{Action, ActionId, Blob, ChangeId, ContentHash, State, Tree};
7
8pub mod agent_registry;
9pub mod atomic;
10pub mod compression;
11pub mod fs;
12pub mod liveness;
13#[cfg(any(test, feature = "memory-backend"))]
14pub mod memory;
15pub mod pack;
16pub mod shallow;
17pub mod store_compliance;
18
19#[cfg(feature = "s3")]
20mod s3;
21
22pub use agent_registry::{
23    ActorChainNode, AgentEntry, AgentRegistry, AgentStatus, AgentUsageSummary, ContextQueryEntry,
24    ReserveOutcome, generate_agent_id,
25};
26pub use compression::{CompressionConfig, CompressionError, compress, decompress};
27pub use fs::FsStore;
28pub use liveness::{Liveness, current_boot_id, is_owner_alive, process_alive};
29#[cfg(any(test, feature = "memory-backend"))]
30pub use memory::InMemoryStore;
31pub use pack::{PackBuilder, PackObjectId, PackReader, PackStats};
32#[cfg(feature = "s3")]
33pub use s3::{S3Store, S3StoreBuilder};
34pub use shallow::ShallowInfo;
35
36pub use crate::error::{HeddleError as StoreError, HeddleError, Result};
37
38impl From<CompressionError> for HeddleError {
39    fn from(e: CompressionError) -> Self {
40        HeddleError::Compression(e.to_string())
41    }
42}
43
/// Cloneable handle to a type-erased [`ObjectStore`].
///
/// The backend sits behind an `Arc`, so cloning a `SharedStore` clones the
/// pointer and every clone refers to the same underlying store. The type
/// implements [`ObjectStore`] itself, so it can be passed anywhere a
/// concrete backend is expected.
#[derive(Clone)]
pub struct SharedStore(pub Arc<dyn ObjectStore>);
46
47impl ObjectStore for SharedStore {
48    fn get_blob(&self, hash: &ContentHash) -> Result<Option<Blob>> {
49        self.0.get_blob(hash)
50    }
51    fn put_blob(&self, blob: &Blob) -> Result<ContentHash> {
52        self.0.put_blob(blob)
53    }
54    fn put_blob_with_hash(&self, blob: &Blob, hash: ContentHash) -> Result<ContentHash> {
55        self.0.put_blob_with_hash(blob, hash)
56    }
57    fn has_blob(&self, hash: &ContentHash) -> Result<bool> {
58        self.0.has_blob(hash)
59    }
60    fn blob_size(&self, hash: &ContentHash) -> Result<Option<u64>> {
61        self.0.blob_size(hash)
62    }
63    fn loose_blob_path(&self, hash: &ContentHash) -> Option<PathBuf> {
64        self.0.loose_blob_path(hash)
65    }
66    fn promote_to_loose_uncompressed(&self, hash: &ContentHash) -> Result<bool> {
67        self.0.promote_to_loose_uncompressed(hash)
68    }
69    fn get_tree(&self, hash: &ContentHash) -> Result<Option<Tree>> {
70        self.0.get_tree(hash)
71    }
72    fn put_tree(&self, tree: &Tree) -> Result<ContentHash> {
73        self.0.put_tree(tree)
74    }
75    fn has_tree(&self, hash: &ContentHash) -> Result<bool> {
76        self.0.has_tree(hash)
77    }
78    fn get_state(&self, id: &ChangeId) -> Result<Option<State>> {
79        self.0.get_state(id)
80    }
81    fn put_state(&self, state: &State) -> Result<()> {
82        self.0.put_state(state)
83    }
84    fn has_state(&self, id: &ChangeId) -> Result<bool> {
85        self.0.has_state(id)
86    }
87    fn list_states(&self) -> Result<Vec<ChangeId>> {
88        self.0.list_states()
89    }
90    fn get_action(&self, id: &ActionId) -> Result<Option<Action>> {
91        self.0.get_action(id)
92    }
93    fn put_action(&self, action: &mut Action) -> Result<ActionId> {
94        self.0.put_action(action)
95    }
96    fn list_actions(&self) -> Result<Vec<ActionId>> {
97        self.0.list_actions()
98    }
99    fn list_blobs(&self) -> Result<Vec<ContentHash>> {
100        self.0.list_blobs()
101    }
102    fn list_trees(&self) -> Result<Vec<ContentHash>> {
103        self.0.list_trees()
104    }
105    fn get_pack_object(
106        &self,
107        id: &pack::PackObjectId,
108    ) -> Result<Option<(pack::ObjectType, Vec<u8>)>> {
109        self.0.get_pack_object(id)
110    }
111    fn install_pack(&self, pack_data: &[u8], index_data: &[u8]) -> Result<Vec<pack::PackObjectId>> {
112        self.0.install_pack(pack_data, index_data)
113    }
114    fn install_pack_streaming(
115        &self,
116        pack_path: &std::path::Path,
117        index_path: &std::path::Path,
118    ) -> Result<()> {
119        self.0.install_pack_streaming(pack_path, index_path)
120    }
121    fn put_blobs_packed(&self, blobs: Vec<(ContentHash, Vec<u8>)>) -> Result<()> {
122        self.0.put_blobs_packed(blobs)
123    }
124    fn begin_snapshot_write_batch(&self) -> Result<()> {
125        self.0.begin_snapshot_write_batch()
126    }
127    fn flush_snapshot_write_batch(&self) -> Result<()> {
128        self.0.flush_snapshot_write_batch()
129    }
130    fn abort_snapshot_write_batch(&self) {
131        self.0.abort_snapshot_write_batch()
132    }
133    fn has_redactions_for_blob(&self, blob: &ContentHash) -> Result<bool> {
134        self.0.has_redactions_for_blob(blob)
135    }
136    fn get_redactions_bytes_for_blob(&self, blob: &ContentHash) -> Result<Option<Vec<u8>>> {
137        self.0.get_redactions_bytes_for_blob(blob)
138    }
139    fn put_redactions_bytes_for_blob(&self, blob: &ContentHash, bytes: &[u8]) -> Result<()> {
140        self.0.put_redactions_bytes_for_blob(blob, bytes)
141    }
142    fn list_blobs_with_redactions(&self) -> Result<Vec<ContentHash>> {
143        self.0.list_blobs_with_redactions()
144    }
145}
146
147/// Trait for object storage backends.
148pub trait ObjectStore: Send + Sync {
149    fn get_blob(&self, hash: &ContentHash) -> Result<Option<Blob>>;
150    fn put_blob(&self, blob: &Blob) -> Result<ContentHash>;
151
152    /// Return the *uncompressed* byte length of the blob identified by
153    /// `hash`, or `Ok(None)` when the blob is not in the store.
154    ///
155    /// The contract is "size without paying for content": backends are
156    /// expected to honour this with a header read or index lookup
157    /// rather than a full decompression. This is the hot path for
158    /// directory listings (`ls -l` over a thread mount) where loading
159    /// every blob just to learn its size would dominate.
160    ///
161    /// The default implementation falls back to `get_blob` so backends
162    /// without a cheap size accessor still satisfy the contract; native
163    /// stores (`FsStore`, `InMemoryStore`) override this with a
164    /// header- or hashmap-only path.
165    fn blob_size(&self, hash: &ContentHash) -> Result<Option<u64>> {
166        Ok(self.get_blob(hash)?.map(|blob| blob.content().len() as u64))
167    }
168
169    /// Filesystem path of the loose blob whose on-disk bytes are
170    /// byte-identical to the blob's *uncompressed* content, suitable
171    /// for `hard_link`/`clonefile` materialization without going
172    /// through `get_blob`.
173    ///
174    /// Returns `None` when the blob is missing, is only available via
175    /// a packfile, is stored compressed (the on-disk bytes wouldn't
176    /// match what a worktree consumer needs to read), or the backend
177    /// doesn't expose stable filesystem paths (e.g. `InMemoryStore`,
178    /// `S3Store`). The default impl returns `None` so non-`FsStore`
179    /// backends silently fall through to the bytes path.
180    fn loose_blob_path(&self, _hash: &ContentHash) -> Option<PathBuf> {
181        None
182    }
183
184    /// Ensure the blob identified by `hash` is materialized as an
185    /// uncompressed loose file at the canonical loose path so that
186    /// `loose_blob_path` returns `Some(path)` on a subsequent call.
187    ///
188    /// This is the "warm canonical store" path that lets the
189    /// hardlink-first materializer keep its 5–10× wall-clock and
190    /// storage-allocation wins after `pack_objects + prune_loose_objects`
191    /// has moved everything into a packfile. Without this, the lazy
192    /// hardlink path silently degrades to `fs::write(decompressed)` on
193    /// every materialize, because `loose_blob_path` returns `None` for
194    /// pack-only and compressed-loose blobs.
195    ///
196    /// Cost-amortization: the first promotion of a blob pays
197    /// `decompress + atomic write`. Every subsequent materialize of
198    /// the same blob — into the same worktree on `goto`, or into a
199    /// sibling worktree on `delegate` — is a single `link(2)`. Net
200    /// win for any N > 1 materializations; break-even at N == 1.
201    ///
202    /// Pack invariants are preserved: this method does not remove the
203    /// pack-resident copy. The blob lives in both pack and loose-
204    /// uncompressed until the next `prune_loose_objects` cycle, at
205    /// which point the loose mirror is discarded and a future
206    /// materialize re-promotes on demand.
207    ///
208    /// Idempotent: a blob that's already loose-and-uncompressed is a
209    /// no-op fast path. A blob that's loose-but-compressed is
210    /// rewritten in place (atomically) with the uncompressed bytes.
211    /// A blob that's pack-resident is decompressed out of the pack
212    /// and written loose without touching the pack.
213    ///
214    /// Returns `Ok(true)` when the call did real work (a write
215    /// happened), `Ok(false)` when it was a no-op (blob was already
216    /// loose+uncompressed), and `Err` when the blob isn't in the
217    /// store at all. The default impl returns `Ok(false)` for
218    /// backends that don't expose loose paths (`InMemoryStore`,
219    /// `S3Store`), since the hardlink path is fundamentally
220    /// inapplicable there.
221    fn promote_to_loose_uncompressed(&self, _hash: &ContentHash) -> Result<bool> {
222        Ok(false)
223    }
224
225    fn put_blob_with_hash(&self, blob: &Blob, hash: ContentHash) -> Result<ContentHash> {
226        if blob.hash() != hash {
227            return Err(HeddleError::InvalidObject("blob hash mismatch".to_string()));
228        }
229        self.put_blob(blob)
230    }
231
232    fn has_blob(&self, hash: &ContentHash) -> Result<bool>;
233    fn get_tree(&self, hash: &ContentHash) -> Result<Option<Tree>>;
234    fn put_tree(&self, tree: &Tree) -> Result<ContentHash>;
235    fn has_tree(&self, hash: &ContentHash) -> Result<bool>;
236    fn get_state(&self, id: &ChangeId) -> Result<Option<State>>;
237    fn put_state(&self, state: &State) -> Result<()>;
238    fn has_state(&self, id: &ChangeId) -> Result<bool>;
239    fn list_states(&self) -> Result<Vec<ChangeId>>;
240    fn get_action(&self, id: &ActionId) -> Result<Option<Action>>;
241    fn put_action(&self, action: &mut Action) -> Result<ActionId>;
242    fn list_actions(&self) -> Result<Vec<ActionId>>;
243    fn list_blobs(&self) -> Result<Vec<ContentHash>>;
244    fn list_trees(&self) -> Result<Vec<ContentHash>>;
245
246    fn put_blob_bytes_with_hash(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
247        self.put_blob_with_hash(&Blob::from_slice(data), hash)
248    }
249
250    fn put_tree_serialized(&self, data: &[u8], hash: ContentHash) -> Result<ContentHash> {
251        let tree: Tree = rmp_serde::from_slice(data)?;
252        tree.validate()?;
253        if tree.hash() != hash {
254            return Err(HeddleError::Corruption {
255                expected: hash,
256                found: tree.hash(),
257            });
258        }
259        self.put_tree(&tree)
260    }
261
262    fn put_state_serialized(&self, data: &[u8], id: ChangeId) -> Result<()> {
263        let state: State = rmp_serde::from_slice(data)?;
264        if state.change_id != id {
265            return Err(HeddleError::InvalidObject(format!(
266                "state change_id mismatch: expected {}, found {}",
267                id, state.change_id
268            )));
269        }
270        self.put_state(&state)
271    }
272
273    fn put_action_serialized(&self, data: &[u8], id: ActionId) -> Result<()> {
274        let mut action: Action = rmp_serde::from_slice(data)?;
275        let found_id = action.compute_id();
276        if found_id != id {
277            return Err(HeddleError::InvalidObject(format!(
278                "action id mismatch: expected {}, found {}",
279                id, found_id
280            )));
281        }
282        let stored_id = self.put_action(&mut action)?;
283        if stored_id != id {
284            return Err(HeddleError::InvalidObject(format!(
285                "action id mismatch after write: expected {}, found {}",
286                id, stored_id
287            )));
288        }
289        Ok(())
290    }
291
292    fn get_pack_object(
293        &self,
294        id: &pack::PackObjectId,
295    ) -> Result<Option<(pack::ObjectType, Vec<u8>)>> {
296        match id {
297            pack::PackObjectId::Hash(hash) => {
298                if let Some(blob) = self.get_blob(hash)? {
299                    return Ok(Some((pack::ObjectType::Blob, blob.content().to_vec())));
300                }
301                if let Some(tree) = self.get_tree(hash)? {
302                    return Ok(Some((
303                        pack::ObjectType::Tree,
304                        rmp_serde::to_vec_named(&tree)?,
305                    )));
306                }
307                if let Some(action) = self.get_action(&ActionId::from_hash(*hash))? {
308                    return Ok(Some((
309                        pack::ObjectType::Action,
310                        rmp_serde::to_vec_named(&action)?,
311                    )));
312                }
313                Ok(None)
314            }
315            pack::PackObjectId::ChangeId(change_id) => {
316                if let Some(state) = self.get_state(change_id)? {
317                    Ok(Some((
318                        pack::ObjectType::State,
319                        rmp_serde::to_vec_named(&state)?,
320                    )))
321                } else {
322                    Ok(None)
323                }
324            }
325        }
326    }
327
328    /// Bulk-write a batch of blobs as a single durable unit. The default
329    /// implementation falls back to per-blob writes; backends that
330    /// support packfiles (i.e. `FsStore`) override this to install one
331    /// packfile + index — two fsyncs total instead of N. Used by the
332    /// snapshot hot path so writing 1000 small files takes ~one fsync,
333    /// not 1000.
334    ///
335    /// Blobs already present in the store are skipped on the way in
336    /// (the caller would otherwise duplicate them in the pack).
337    fn put_blobs_packed(&self, blobs: Vec<(ContentHash, Vec<u8>)>) -> Result<()> {
338        for (hash, data) in blobs {
339            if !self.has_blob(&hash)? {
340                self.put_blob_bytes_with_hash(&data, hash)?;
341            }
342        }
343        Ok(())
344    }
345
346    fn install_pack(&self, pack_data: &[u8], index_data: &[u8]) -> Result<Vec<pack::PackObjectId>> {
347        let reader = pack::PackReader::from_bytes(pack_data.to_vec(), index_data.to_vec())?;
348        let ids = reader.list_ids();
349        for id in &ids {
350            let Some((obj_type, data)) = reader.get_object(id)? else {
351                continue;
352            };
353            match (id, obj_type) {
354                (pack::PackObjectId::Hash(hash), pack::ObjectType::Blob) => {
355                    self.put_blob_bytes_with_hash(&data, *hash)?;
356                }
357                (pack::PackObjectId::Hash(hash), pack::ObjectType::Tree) => {
358                    self.put_tree_serialized(&data, *hash)?;
359                }
360                (pack::PackObjectId::Hash(hash), pack::ObjectType::Action) => {
361                    self.put_action_serialized(&data, ActionId::from_hash(*hash))?;
362                }
363                (pack::PackObjectId::ChangeId(change_id), pack::ObjectType::State) => {
364                    self.put_state_serialized(&data, *change_id)?;
365                }
366                _ => {
367                    return Err(HeddleError::InvalidObject(format!(
368                        "unsupported native pack object: {:?} {:?}",
369                        id, obj_type
370                    )));
371                }
372            }
373        }
374        Ok(ids)
375    }
376
377    /// Install a pack and its index from on-disk files
378    /// (typically produced by `StreamingPackBuilder`). The default
379    /// impl reads both files fully and delegates to `install_pack`,
380    /// so any backend that doesn't override this still works (at the
381    /// cost of giving back the bounded-memory promise). Real fs-
382    /// backed stores override this to `rename(2)` both files into the
383    /// pack directory without ever loading them.
384    ///
385    /// On success, the source files at `pack_path`/`index_path` may
386    /// have been moved or removed depending on the backend; callers
387    /// shouldn't continue to rely on them.
388    ///
389    /// Returns nothing — callers that need the list of installed ids
390    /// can read the freshly-installed pack via the store. Most
391    /// callers (including `Importer`) already track inserted ids
392    /// out-of-band via the sha map and don't need a return value.
393    fn install_pack_streaming(
394        &self,
395        pack_path: &std::path::Path,
396        index_path: &std::path::Path,
397    ) -> Result<()> {
398        let pack_data = std::fs::read(pack_path).map_err(StoreError::from)?;
399        let index_data = std::fs::read(index_path).map_err(StoreError::from)?;
400        self.install_pack(&pack_data, &index_data)?;
401        // Default impl: clean up the staged files. Override
402        // implementations that move/rename should not call super and
403        // should manage the file lifecycle themselves.
404        let _ = std::fs::remove_file(pack_path);
405        let _ = std::fs::remove_file(index_path);
406        Ok(())
407    }
408
409    fn pack_objects(&self, aggressive: bool) -> Result<(u64, u64)> {
410        let _ = aggressive;
411        Ok((0, 0))
412    }
413
414    fn prune_loose_objects(&self) -> Result<(u64, u64)> {
415        Ok((0, 0))
416    }
417
418    fn begin_snapshot_write_batch(&self) -> Result<()> {
419        Ok(())
420    }
421
422    fn flush_snapshot_write_batch(&self) -> Result<()> {
423        Ok(())
424    }
425
426    fn abort_snapshot_write_batch(&self) {}
427
428    /// Whether the store holds any redaction record for the given blob.
429    ///
430    /// Redactions live in a sidecar (`<heddle_dir>/redactions/`) that is
431    /// structurally outside the content-addressed object graph so GC
432    /// can't reach them. The wire layer needs a cheap probe to decide
433    /// whether to ship a redaction for a blob in the closure, so this
434    /// is a separate method rather than a `get_*` + null check.
435    ///
436    /// Default impl returns `Ok(false)` — stores that don't model
437    /// redactions silently report "no redactions," which is the
438    /// correct behaviour for purely in-memory or remote-shim stores.
439    fn has_redactions_for_blob(&self, _blob: &ContentHash) -> Result<bool> {
440        Ok(false)
441    }
442
443    /// Return the raw rmp-encoded `RedactionsBlob` bytes for the given
444    /// blob, or `Ok(None)` if no redaction record exists. The bytes
445    /// are byte-identical to what was written by `put_redactions_bytes_for_blob`
446    /// (or by `Repository::put_redaction`); this is the wire-transfer
447    /// payload, not a re-serialized view.
448    ///
449    /// Default impl returns `Ok(None)`.
450    fn get_redactions_bytes_for_blob(&self, _blob: &ContentHash) -> Result<Option<Vec<u8>>> {
451        Ok(None)
452    }
453
454    /// Persist the rmp-encoded `RedactionsBlob` bytes for the given
455    /// blob. Receiver-side replay calls this after signature
456    /// verification so the bytes land in the same sidecar that the
457    /// sender's `Repository::put_redaction` writes to.
458    ///
459    /// Default impl returns an "unsupported" error — stores that don't
460    /// model redactions (e.g. read-only shims) refuse rather than
461    /// silently dropping the record.
462    fn put_redactions_bytes_for_blob(&self, _blob: &ContentHash, _bytes: &[u8]) -> Result<()> {
463        Err(HeddleError::InvalidObject(
464            "this object store does not support persisting redactions".to_string(),
465        ))
466    }
467
468    /// List every blob that has at least one redaction record. Used by
469    /// the GC pin guard and by sync to enumerate redactions for the
470    /// state closure. Order is unspecified; callers that need stable
471    /// ordering should sort.
472    ///
473    /// Default impl returns `Ok(vec![])`.
474    fn list_blobs_with_redactions(&self) -> Result<Vec<ContentHash>> {
475        Ok(Vec::new())
476    }
477}