Skip to main content

snapdir_stores/
stream.rs

//! Object/manifest-level, content-addressed streaming over a [`Store`].
//!
//! [`StreamStore`] is the foundation for store-to-store sync: it exposes the
//! raw, content-addressed blob and manifest primitives an orchestrator needs to
//! copy a snapshot directly from one store to another — through memory, with no
//! local filesystem staging.
//!
//! Where [`Store`] works at the *snapshot* level (read a whole tree out with
//! [`get_manifest`](Store::get_manifest) + [`fetch_files`](Store::fetch_files),
//! write one in with [`push`](Store::push)), [`StreamStore`] works at the
//! *object* level: check whether a single content object is already present
//! ([`has_object`](StreamStore::has_object)), read one raw blob by its
//! content-address ([`get_object`](StreamStore::get_object)), write one raw blob
//! ([`put_object`](StreamStore::put_object)), and write the manifest object
//! itself ([`put_manifest`](StreamStore::put_manifest)). A later orchestrator
//! can then walk a source manifest, `get_object` each referenced blob from the
//! source, `put_object` it into the destination (skipping any the destination
//! already `has_object`), and finally `put_manifest` — never touching the local
//! disk.
//!
//! Every object read and every write is BLAKE3-verified against the address it
//! is filed under (the same integrity discipline as [`Store`]): a blob whose
//! bytes do not hash to its checksum is rejected with [`StoreError::Integrity`]
//! rather than returned or stored, so corruption can never silently propagate
//! across a store-to-store copy.
//!
//! The sharded object/manifest keys and the manifest byte-format are reused
//! verbatim from each backend's existing [`Store`] implementation, so a
//! `StreamStore` round-trip is byte-for-byte interchangeable with a `push` /
//! `fetch_files` round-trip (and with the Bash oracle's layout).
//!
//! Like [`Store`], the trait is **synchronous**: the network backends drive
//! their async SDK calls on an internal runtime via `block_on`, exactly as their
//! [`Store`] methods do. It is **not** implemented for the external-store shim
//! ([`ExternalStore`](crate::shim::ExternalStore)), which is shell- and
//! local-path-based and cannot stream raw object blobs.

38use snapdir_core::manifest::Manifest;
39use snapdir_core::store::{Store, StoreError};
40
41/// Raw, content-addressed object/manifest streaming on top of a [`Store`].
42///
43/// See the [module docs](crate::stream) for the store-to-store sync motivation
44/// and the verification invariants. The [`Store`] supertrait means every
45/// implementor also offers [`get_manifest`](Store::get_manifest),
46/// [`fetch_files`](Store::fetch_files), and [`push`](Store::push).
47pub trait StreamStore: Store {
48    /// Returns `true` if an object with this content-address already exists in
49    /// the store.
50    ///
51    /// This is the existence check a store-to-store orchestrator uses to skip
52    /// re-copying blobs the destination already holds. It does not read or
53    /// verify the object body.
54    ///
55    /// # Errors
56    ///
57    /// [`StoreError::Io`] / [`StoreError::Backend`] on transport failure.
58    fn has_object(&self, checksum: &str) -> Result<bool, StoreError>;
59
60    /// Reads the raw object blob filed under `checksum`, verifying its bytes
61    /// hash (BLAKE3) back to `checksum` before returning them.
62    ///
63    /// # Errors
64    ///
65    /// - [`StoreError::ObjectNotFound`] if no object is stored at `checksum`.
66    /// - [`StoreError::Integrity`] if the stored bytes do not hash to
67    ///   `checksum` (the blob is corrupt or tampered).
68    /// - [`StoreError::Io`] / [`StoreError::Backend`] on transport failure.
69    fn get_object(&self, checksum: &str) -> Result<Vec<u8>, StoreError>;
70
71    /// Writes a raw object blob at its content-address, verifying `bytes` hash
72    /// (BLAKE3) to `checksum` *before* storing anything.
73    ///
74    /// A mismatch stores nothing and returns an error, so a corrupt blob can
75    /// never land at a content-address it does not belong to.
76    ///
77    /// # Errors
78    ///
79    /// - [`StoreError::Integrity`] if `bytes` do not hash to `checksum`.
80    /// - [`StoreError::Io`] / [`StoreError::Backend`] on transport failure.
81    fn put_object(&self, checksum: &str, bytes: Vec<u8>) -> Result<(), StoreError>;
82
83    /// Writes the manifest object for `id`, verifying the manifest's bytes hash
84    /// back to `id` before storing it.
85    ///
86    /// This is the final step of a store-to-store copy: it is written only after
87    /// every referenced object has landed, so a manifest is never observable
88    /// before the content it references (mirroring [`push`](Store::push)).
89    ///
90    /// # Errors
91    ///
92    /// - [`StoreError::Integrity`] if the manifest does not hash to `id`.
93    /// - [`StoreError::Io`] / [`StoreError::Backend`] on transport failure.
94    fn put_manifest(&self, id: &str, manifest: &Manifest) -> Result<(), StoreError>;
95
96    /// Returns the subset of `checksums` NOT present in the store, PRESERVING
97    /// INPUT ORDER.
98    ///
99    /// This is the diff primitive behind the `snapdir objects-needed` wire
100    /// plumbing (SNAPPACK acceleration, see [`crate::pack`]): a sender offers
101    /// a snapshot's full object list and the receiver answers with exactly the
102    /// objects it still needs, so only those ride the pack stream.
103    ///
104    /// Semantics:
105    ///
106    /// - **Fail closed:** every checksum is validated against
107    ///   `^[0-9a-f]{64}$` ([`crate::pack::is_hex64`]) BEFORE the first
108    ///   existence probe; any invalid entry is a hard error and nothing is
109    ///   returned (a malformed request must never be partially answered).
110    /// - **Order-preserving, no dedup:** the returned complement keeps the
111    ///   input order, and deduplication is the CALLER's job — an absent
112    ///   checksum supplied twice is reported twice.
113    /// - The default implementation loops [`has_object`](Self::has_object)
114    ///   (one existence probe per checksum). Follow-up (not this gate): the
115    ///   S3/GCS backends should override this with their batched listing APIs
116    ///   to cut round trips — any override must preserve the exact contract
117    ///   above.
118    ///
119    /// # Errors
120    ///
121    /// - [`StoreError::Backend`] if any checksum is not 64 lowercase hex
122    ///   characters (fail closed, before any probe).
123    /// - Whatever [`has_object`](Self::has_object) surfaces on transport
124    ///   failure.
125    fn objects_needed(&self, checksums: &[String]) -> Result<Vec<String>, StoreError> {
126        // Validate EVERYTHING up front so an invalid entry can never be
127        // answered partially (fail closed).
128        for checksum in checksums {
129            if !crate::pack::is_hex64(checksum) {
130                return Err(StoreError::Backend {
131                    message: format!(
132                        "invalid object checksum {checksum:?}: expected 64 lowercase hex characters"
133                    ),
134                    source: None,
135                });
136            }
137        }
138        let mut needed = Vec::new();
139        for checksum in checksums {
140            if !self.has_object(checksum)? {
141                needed.push(checksum.clone());
142            }
143        }
144        Ok(needed)
145    }
146}