Skip to main content

net/adapter/net/dataforts/blob/
adapter.rs

1//! `BlobAdapter` async trait — interface every backend (S3, IPFS,
2//! filesystem, custom) implements to plug into the substrate's
3//! blob path.
4
5use std::ops::Range;
6use std::pin::Pin;
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use futures::stream::Stream;
11
12use super::blob_ref::{BlobRef, Encoding};
13use super::error::BlobError;
14
15/// Stream of byte chunks the substrate consumes from `fetch_stream`.
16/// Errors mid-stream surface as `Err(BlobError)`; the consumer
17/// stops on the first error and discards any prior chunks (no
18/// partial-blob verification).
19pub type BlobByteStream = Pin<Box<dyn Stream<Item = Result<Bytes, BlobError>> + Send>>;
20
21/// Operational snapshot returned by [`BlobAdapter::stat`]. Lives at
22/// the trait surface (every adapter must answer) but most fields
23/// are optional — adapters that can't cheaply observe (S3 / IPFS)
24/// fill in only what they know.
25#[derive(Clone, Debug, Default)]
26pub struct BlobStat {
27    /// Total payload size in bytes. Always known when [`BlobAdapter::stat`]
28    /// returns `Ok` — adapters that can't determine the size return
29    /// [`BlobError::NotFound`] instead.
30    pub size: u64,
31    /// Number of distinct nodes currently advertising this blob via
32    /// `causal:<hex>` capability tags. `0` for adapters that don't
33    /// participate in the substrate-side advertisement layer (FS,
34    /// S3 adapters); `Some(n)` for `MeshBlobAdapter`. Best-effort —
35    /// the count reflects the local node's view of the capability
36    /// index at the time of the call.
37    pub replicas_observed: u32,
38    /// Operator-configured replication factor for this adapter, if
39    /// any. `None` for adapters whose durability isn't governed by
40    /// the substrate (S3, IPFS — they rely on the backend's own
41    /// replication semantics); `Some(n)` for `MeshBlobAdapter`.
42    pub replica_target: Option<u8>,
43    /// Last wall-clock time (unix milliseconds) the blob was
44    /// touched (heartbeat advertisement, fetch, store). `None`
45    /// when the adapter doesn't track per-blob last-seen.
46    pub last_seen_unix_ms: Option<u64>,
47    /// Encoding of the stored content. `Some(Replicated)` for the
48    /// v0.2 path; `Some(ReedSolomon { k, m })` reserved for v0.3.
49    /// `None` for adapters that don't model encoding (FS, Noop).
50    pub encoding: Option<Encoding>,
51}
52
53/// Storage backend wrapped by the substrate's blob layer. Each
54/// adapter takes a [`BlobRef`]'s URI and serves the bytes it
55/// resolves to — the substrate handles hash verification on top.
56///
57/// `adapter_id` is the registry key (see
58/// [`super::registry::BlobAdapterRegistry`]). Distinct identities
59/// per adapter so a host can register an S3 backend at
60/// `"s3-primary"` and a fallback at `"s3-replica"` without
61/// collision.
62///
63/// The trait is `async` (async-trait crate, mirroring the rest of
64/// the cortex / net surface) so adapters can do real I/O without
65/// blocking the runtime thread. Sync backends wrap with
66/// `tokio::task::block_in_place` or `spawn_blocking`.
67#[async_trait]
68pub trait BlobAdapter: Send + Sync + 'static {
69    /// Stable identifier for this adapter instance. The registry
70    /// rejects re-registrations with the same id.
71    fn adapter_id(&self) -> &str;
72
73    /// URI schemes this adapter accepts on inbound `BlobRef`s.
74    /// The substrate's blob-dispatch layer routes by channel-
75    /// configured `blob_adapter_id`; before invoking the adapter
76    /// it checks the inbound URI's scheme against this list and
77    /// rejects with [`BlobError::UnsupportedScheme`] when the URI
78    /// scheme isn't accepted. Default returns an empty slice,
79    /// which means "accept anything" — adapters in trusted /
80    /// single-tenant deployments may leave this as-is, but
81    /// adapters that have authority over a privileged backend
82    /// (FS adapter, host-side keys, etc.) should override and
83    /// list the schemes they actually serve so a publisher with
84    /// append rights cannot dictate arbitrary URIs the adapter
85    /// then resolves.
86    fn accepted_schemes(&self) -> &[&str] {
87        &[]
88    }
89
90    /// Persist `bytes` at the URI carried in `blob_ref`. Most
91    /// adapters will derive the URI from `blob_ref.hash` (content-
92    /// addressing) and ignore the inbound URI; some (e.g.
93    /// `FileSystemAdapter`) honor it directly. The hash on
94    /// `blob_ref` is the source of truth — the substrate computes
95    /// it before calling this method.
96    async fn store(&self, blob_ref: &BlobRef, bytes: &[u8]) -> Result<(), BlobError>;
97
98    /// Fetch the full content at `blob_ref.uri`. The substrate
99    /// runs [`BlobRef::verify`] on the returned bytes; on a
100    /// mismatch the call as a whole fails with
101    /// [`BlobError::HashMismatch`].
102    ///
103    /// Returns [`Bytes`] (not `Vec<u8>`) per dataforts perf #184
104    /// so adapters with a refcount-shareable backing buffer
105    /// (`Bytes::from(vec)`, mmap region) can hand it back without
106    /// a final copy, and downstream consumers that want sub-slices
107    /// can take cheap views into the same allocation. Callers
108    /// that genuinely need an owned `Vec<u8>` can call
109    /// `.to_vec()` — they pay the copy only when they need it.
110    async fn fetch(&self, blob_ref: &BlobRef) -> Result<Bytes, BlobError>;
111
112    /// Fetch a byte range. `range.start <= range.end` and both
113    /// bounded by `blob_ref.size`; out-of-range queries surface as
114    /// [`BlobError::Backend`] from the adapter. The substrate does
115    /// NOT verify partial fetches against the full-content hash;
116    /// callers using range fetch are accepting that trade-off.
117    ///
118    /// Returns [`Bytes`] for the same reason as [`Self::fetch`].
119    async fn fetch_range(&self, blob_ref: &BlobRef, range: Range<u64>) -> Result<Bytes, BlobError>;
120
121    /// Probe for existence without fetching. Adapters that cannot
122    /// answer cheaply may emulate by `fetch` + drop; the trait
123    /// makes no efficiency promise.
124    async fn exists(&self, blob_ref: &BlobRef) -> Result<bool, BlobError>;
125
126    /// Stream the blob content as a sequence of byte chunks.
127    /// Default impl routes through [`Self::fetch`] and emits the
128    /// whole payload as a single chunk — fine for adapters that
129    /// hold blobs in RAM or pull them in one shot anyway (S3
130    /// GetObject with no Range, IPFS). Adapters with real
131    /// streaming backends (chunked HTTP, mmap'd local files,
132    /// range-fetched S3) should override to yield progressively.
133    ///
134    /// Substrate-side hash verification consumes the stream as it
135    /// arrives: hash the chunks incrementally, accumulate into a
136    /// buffer (or pipe through to the application), and reject
137    /// on completion if the BLAKE3 doesn't match.
138    ///
139    /// Multi-GB blobs that don't fit in a single buffer must use
140    /// this surface; the all-in-memory [`Self::fetch`] is
141    /// preserved for short payloads and ergonomic callers.
142    async fn fetch_stream(&self, blob_ref: &BlobRef) -> Result<BlobByteStream, BlobError> {
143        let bytes = self.fetch(blob_ref).await?;
144        let stream = futures::stream::once(async move { Ok(bytes) });
145        Ok(Box::pin(stream))
146    }
147
148    /// Store from a stream of byte chunks. Default impl drains the
149    /// stream into a `Vec<u8>` and forwards to [`Self::store`];
150    /// adapters with real streaming write paths (S3 multipart
151    /// upload, chunked filesystem write) should override.
152    ///
153    /// The implementation MUST verify the produced bytes hash to
154    /// `blob_ref.hash` before considering the store durable. The
155    /// substrate's `store` contract requires this; streaming
156    /// impls compute the hash incrementally as chunks arrive.
157    ///
158    /// `size_hint` is the caller's expected total size; adapters
159    /// may use it for pre-allocation but must not require it to
160    /// match the actual stream length.
161    async fn store_stream(
162        &self,
163        blob_ref: &BlobRef,
164        mut stream: BlobByteStream,
165        size_hint: Option<u64>,
166    ) -> Result<(), BlobError> {
167        use futures::StreamExt;
168        // Hard cap on the materialized buffer. Pre-fix the default
169        // impl trusted `size_hint` only for pre-alloc but had no
170        // accumulation bound — a producer that lied about size_hint
171        // (or omitted it) could stream until OOM. The cap matches
172        // the substrate's BLOB_REF_MAX_SIZE (16 GiB) so legitimate
173        // sender-stream loads still pass while runaway streams
174        // fail loudly.
175        const MAX_STREAM_BYTES: u64 = 16 * 1024 * 1024 * 1024;
176        // Clamp the cap to `usize::MAX` on 32-bit targets where the
177        // 16 GiB constant exceeds the usize range — without the
178        // clamp, `Vec`'s allocator would OOM-panic past `usize::MAX`
179        // bytes before the `> MAX_STREAM_BYTES` comparison ever
180        // returned the typed error. Mirror of `mesh.rs:1188-1197`'s
181        // u64→usize range guard.
182        let effective_cap = MAX_STREAM_BYTES.min(usize::MAX as u64);
183        let mut buf: Vec<u8> = match size_hint {
184            Some(n) if (n as usize) <= 16 * 1024 * 1024 => Vec::with_capacity(n as usize),
185            _ => Vec::new(),
186        };
187        while let Some(chunk) = stream.next().await {
188            let bytes = chunk?;
189            if (buf.len() as u64).saturating_add(bytes.len() as u64) > effective_cap {
190                return Err(BlobError::Backend(format!(
191                    "store_stream: accumulated {} bytes exceeds {} cap",
192                    buf.len(),
193                    effective_cap
194                )));
195            }
196            buf.extend_from_slice(&bytes);
197        }
198        self.store(blob_ref, &buf).await
199    }
200
201    /// Best-effort delete. The substrate calls this on the GC
202    /// sweep path (v0.2 [`MeshBlobAdapter`](super::MeshBlobAdapter)); external-storage
203    /// adapters (S3 / IPFS) typically defer durability decisions
204    /// to the backend's own lifecycle policies and may treat this
205    /// as a no-op.
206    ///
207    /// Default impl: returns `Ok(())` without touching the backend
208    /// (no-op delete). Override for adapters that own the blob
209    /// lifecycle.
210    ///
211    /// Manifest-variant semantics — `delete` is **surface-only**:
212    /// a [`BlobRef::Manifest`] delete removes the manifest entry
213    /// (if any) but does NOT recursively remove its chunks. Chunks
214    /// are independently reference-counted at the substrate layer
215    /// and delete on their own GC cycle. See
216    /// `DATAFORTS_BLOB_STORAGE_PLAN.md` § Q4 for the rationale.
217    async fn delete(&self, _blob_ref: &BlobRef) -> Result<(), BlobError> {
218        Ok(())
219    }
220
221    /// Hint to the adapter that `blob_ref`'s bytes will likely be
222    /// fetched soon — kick off any background pre-population
223    /// (cross-node replication, prefetch from cold storage,
224    /// warm-cache load) without blocking on completion. The
225    /// returned `Ok(())` means "the prefetch was initiated", not
226    /// "the bytes are now local".
227    ///
228    /// Default impl: no-op success. Override on adapters with a
229    /// meaningful pre-population path —
230    /// [`MeshBlobAdapter`](super::MeshBlobAdapter) opens each
231    /// constituent chunk channel against the local
232    /// [`Redex`](crate::adapter::net::redex::Redex) handle so the
233    /// per-chunk replication runtime spawns and begins syncing
234    /// from peers carrying the chunk's `causal:<hex>` tag. This is
235    /// the wiring greedy uses when its G-1 admit verdict fires
236    /// (PR-5i — actual fetch is best-effort, parallel to the
237    /// admission decision; greedy doesn't block on the prefetch).
238    ///
239    /// Errors surface back to the caller as
240    /// [`BlobError::Backend`] but are advisory — the calling
241    /// runtime typically counts and drops rather than retrying.
242    async fn prefetch(&self, _blob_ref: &BlobRef) -> Result<(), BlobError> {
243        Ok(())
244    }
245
246    /// Return an operational snapshot of the blob. Used by the
247    /// `net blob stat` CLI + the metrics exporters; surfaces size,
248    /// replica counts (where the adapter knows), encoding, etc.
249    ///
250    /// Default impl returns the `size` carried on the
251    /// [`BlobRef`] with every other field at default — adapters
252    /// that participate in the substrate's advertisement layer
253    /// (e.g. [`MeshBlobAdapter`](super::MeshBlobAdapter)) should override to fill in
254    /// `replicas_observed`, `replica_target`, `encoding`, and
255    /// `last_seen_unix_ms`. The size field comes from the
256    /// [`BlobRef`] itself, so adapters that don't track per-blob
257    /// metadata still answer this method correctly.
258    async fn stat(&self, blob_ref: &BlobRef) -> Result<BlobStat, BlobError> {
259        Ok(BlobStat {
260            size: blob_ref.size(),
261            encoding: blob_ref.encoding(),
262            ..Default::default()
263        })
264    }
265
266    /// Enumerate blob chunks the adapter has observed. Powers
267    /// the operator-facing "Blob & Artifact Explorer" surface
268    /// (`DECK_PLAN.md` § Deferred work § Blob & Artifact
269    /// Explorer) — adapters that can cheaply enumerate (Mesh,
270    /// fs) override; adapters with prohibitive enumeration
271    /// cost (S3 with millions of keys, IPFS) leave the default
272    /// "empty" so consumers don't accidentally rack up backend
273    /// charges.
274    ///
275    /// The default returns an empty vec rather than an error
276    /// because "this adapter doesn't enumerate" is a normal
277    /// answer, not a failure — the BLOBS tab simply shows no
278    /// rows for that adapter.
279    ///
280    /// Distinct from "this adapter holds nothing": consumers
281    /// that need to tell the two apart consult
282    /// [`Self::supports_list`] first. A `false` answer means
283    /// the default opt-out is in effect; a `true` answer means
284    /// the result of `list` is authoritative (`Ok([])` truly
285    /// means empty).
286    ///
287    /// Granularity is **chunk-level**, not logical-blob-level.
288    /// `MeshBlobAdapter` tracks blobs in a refcount table keyed
289    /// by content hash: a `BlobRef::Small` corresponds to one
290    /// entry, a `BlobRef::Manifest` to N entries (one per
291    /// chunk). Reconstructing logical `BlobRef`s would need a
292    /// per-store BlobRef index the substrate doesn't carry
293    /// today; that's tracked as a follow-on in
294    /// `DECK_PLAN.md` § Deferred work § Blob & Artifact
295    /// Explorer.
296    ///
297    /// `opts.prefix_hex` filters by a hex prefix of the
298    /// content hash (e.g. `Some("abcd")` returns only chunks
299    /// whose hash starts with `0xab 0xcd`). `opts.limit` caps
300    /// the result count — adapters may return fewer when
301    /// fewer match. Order is unspecified at the trait level
302    /// (`MeshBlobAdapter` sorts by `last_seen_unix_ms` desc).
303    async fn list(&self, _opts: &BlobListOptions) -> Result<Vec<BlobInventoryEntry>, BlobError> {
304        Ok(Vec::new())
305    }
306
307    /// Whether [`Self::list`] returns an authoritative enumeration.
308    ///
309    /// Defaults to `false`, matching the default `list` impl that
310    /// returns an empty vec to avoid accidental enumeration cost
311    /// on adapters with prohibitive scans (S3 with millions of
312    /// keys, IPFS). Adapters that genuinely enumerate (Mesh, fs)
313    /// override to `true` so consumers (the Deck BLOBS tab,
314    /// scripted exporters) can tell the two cases apart instead
315    /// of conflating "no rows" with "opt-out".
316    fn supports_list(&self) -> bool {
317        false
318    }
319}
320
/// Filter / paging knobs accepted by [`BlobAdapter::list`].
///
/// This is deliberately a struct rather than positional arguments.
/// New filters (date range, encoding, refcount band) can then be added
/// without touching the trait signature.
#[derive(Debug, Clone, Default)]
pub struct BlobListOptions {
    /// Hex prefix (lowercase) compared against each entry's content
    /// hash; `None` lets every entry through. Adapters without native
    /// prefix filtering scan everything and filter in memory.
    pub prefix_hex: Option<String>,
    /// Upper bound on how many entries come back. The `Default` value
    /// of `0` means "the caller imposes no cap". SDK consumers normally
    /// pass a concrete bound (typically 1000–10000) to limit memory use.
    pub limit: usize,
}
337
/// One row of the operator-facing blob inventory: a content hash the
/// adapter has observed, plus the refcount-table metadata that goes
/// with it. Chunk-level granularity per the note on
/// [`BlobAdapter::list`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BlobInventoryEntry {
    /// `adapter_id()` of the adapter that produced this entry.
    /// Multi-adapter deployments surface this so the operator can tell
    /// which backend holds the chunk; single-adapter callers can ignore
    /// it.
    pub adapter_id: String,
    /// 64-character lowercase hex of the blob's BLAKE3 content hash.
    /// The canonical id at this granularity.
    pub hash_hex: String,
    /// Refcount the adapter tracks. `0` means quiescent and on the GC
    /// retention clock; non-zero means at least one source is holding a
    /// live reference.
    pub refcount: u32,
    /// `true` when the operator has explicitly pinned the entry against
    /// GC (operators sometimes pin known-good chunks during a debug
    /// session).
    pub pinned: bool,
    /// First wall-clock unix-ms at which the adapter observed this hash
    /// (the retention floor measures from here).
    pub first_seen_unix_ms: u64,
    /// Most recent wall-clock unix-ms at which the adapter observed
    /// this hash (any incr / decr / store).
    pub last_seen_unix_ms: u64,
    /// Payload size in bytes. `Some(n)` whenever the local adapter has
    /// observed a store. `None` for hashes that only entered the table
    /// via `incr` from a remote source (the chunk isn't local yet — the
    /// size is the peer's to advertise), and for adapters that don't
    /// track per-hash size cheaply.
    pub size_bytes: Option<u64>,
    /// Distinct nodes observed advertising this hash via the
    /// substrate's `causal:<hex>` capability tag. `None` for adapters
    /// that don't participate in the advertisement layer.
    ///
    /// Analogue of [`BlobStat::replicas_observed`], but not the same
    /// type. That field is a plain `u32` that reports `0` where this one
    /// reports `None`, so convert explicitly when moving between the
    /// two.
    pub replicas_observed: Option<u32>,
    /// Operator-configured replication factor for this adapter. `None`
    /// for adapters whose durability isn't governed by the substrate
    /// (S3, IPFS, FS).
    ///
    /// Analogue of [`BlobStat::replica_target`], widened here to `u32`
    /// (`BlobStat` carries an `Option<u8>`).
    pub replica_target: Option<u32>,
}
379    /// adapter. `None` for adapters whose durability isn't
380    /// governed by the substrate (S3, IPFS, FS); mirrors
381    /// [`BlobStat::replica_target`].
382    pub replica_target: Option<u32>,
383}