net/adapter/net/dataforts/blob/adapter.rs
1//! `BlobAdapter` async trait — interface every backend (S3, IPFS,
2//! filesystem, custom) implements to plug into the substrate's
3//! blob path.
4
5use std::ops::Range;
6use std::pin::Pin;
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use futures::stream::Stream;
11
12use super::blob_ref::{BlobRef, Encoding};
13use super::error::BlobError;
14
15/// Stream of byte chunks the substrate consumes from `fetch_stream`.
16/// Errors mid-stream surface as `Err(BlobError)`; the consumer
17/// stops on the first error and discards any prior chunks (no
18/// partial-blob verification).
19pub type BlobByteStream = Pin<Box<dyn Stream<Item = Result<Bytes, BlobError>> + Send>>;
20
21/// Operational snapshot returned by [`BlobAdapter::stat`]. Lives at
22/// the trait surface (every adapter must answer) but most fields
23/// are optional — adapters that can't cheaply observe (S3 / IPFS)
24/// fill in only what they know.
25#[derive(Clone, Debug, Default)]
26pub struct BlobStat {
27 /// Total payload size in bytes. Always known when [`BlobAdapter::stat`]
28 /// returns `Ok` — adapters that can't determine the size return
29 /// [`BlobError::NotFound`] instead.
30 pub size: u64,
31 /// Number of distinct nodes currently advertising this blob via
32 /// `causal:<hex>` capability tags. `0` for adapters that don't
33 /// participate in the substrate-side advertisement layer (FS,
34 /// S3 adapters); `Some(n)` for `MeshBlobAdapter`. Best-effort —
35 /// the count reflects the local node's view of the capability
36 /// index at the time of the call.
37 pub replicas_observed: u32,
38 /// Operator-configured replication factor for this adapter, if
39 /// any. `None` for adapters whose durability isn't governed by
40 /// the substrate (S3, IPFS — they rely on the backend's own
41 /// replication semantics); `Some(n)` for `MeshBlobAdapter`.
42 pub replica_target: Option<u8>,
43 /// Last wall-clock time (unix milliseconds) the blob was
44 /// touched (heartbeat advertisement, fetch, store). `None`
45 /// when the adapter doesn't track per-blob last-seen.
46 pub last_seen_unix_ms: Option<u64>,
47 /// Encoding of the stored content. `Some(Replicated)` for the
48 /// v0.2 path; `Some(ReedSolomon { k, m })` reserved for v0.3.
49 /// `None` for adapters that don't model encoding (FS, Noop).
50 pub encoding: Option<Encoding>,
51}
52
53/// Storage backend wrapped by the substrate's blob layer. Each
54/// adapter takes a [`BlobRef`]'s URI and serves the bytes it
55/// resolves to — the substrate handles hash verification on top.
56///
57/// `adapter_id` is the registry key (see
58/// [`super::registry::BlobAdapterRegistry`]). Distinct identities
59/// per adapter so a host can register an S3 backend at
60/// `"s3-primary"` and a fallback at `"s3-replica"` without
61/// collision.
62///
63/// The trait is `async` (async-trait crate, mirroring the rest of
64/// the cortex / net surface) so adapters can do real I/O without
65/// blocking the runtime thread. Sync backends wrap with
66/// `tokio::task::block_in_place` or `spawn_blocking`.
67#[async_trait]
68pub trait BlobAdapter: Send + Sync + 'static {
69 /// Stable identifier for this adapter instance. The registry
70 /// rejects re-registrations with the same id.
71 fn adapter_id(&self) -> &str;
72
73 /// URI schemes this adapter accepts on inbound `BlobRef`s.
74 /// The substrate's blob-dispatch layer routes by channel-
75 /// configured `blob_adapter_id`; before invoking the adapter
76 /// it checks the inbound URI's scheme against this list and
77 /// rejects with [`BlobError::UnsupportedScheme`] when the URI
78 /// scheme isn't accepted. Default returns an empty slice,
79 /// which means "accept anything" — adapters in trusted /
80 /// single-tenant deployments may leave this as-is, but
81 /// adapters that have authority over a privileged backend
82 /// (FS adapter, host-side keys, etc.) should override and
83 /// list the schemes they actually serve so a publisher with
84 /// append rights cannot dictate arbitrary URIs the adapter
85 /// then resolves.
86 fn accepted_schemes(&self) -> &[&str] {
87 &[]
88 }
89
90 /// Persist `bytes` at the URI carried in `blob_ref`. Most
91 /// adapters will derive the URI from `blob_ref.hash` (content-
92 /// addressing) and ignore the inbound URI; some (e.g.
93 /// `FileSystemAdapter`) honor it directly. The hash on
94 /// `blob_ref` is the source of truth — the substrate computes
95 /// it before calling this method.
96 async fn store(&self, blob_ref: &BlobRef, bytes: &[u8]) -> Result<(), BlobError>;
97
98 /// Fetch the full content at `blob_ref.uri`. The substrate
99 /// runs [`BlobRef::verify`] on the returned bytes; on a
100 /// mismatch the call as a whole fails with
101 /// [`BlobError::HashMismatch`].
102 ///
103 /// Returns [`Bytes`] (not `Vec<u8>`) per dataforts perf #184
104 /// so adapters with a refcount-shareable backing buffer
105 /// (`Bytes::from(vec)`, mmap region) can hand it back without
106 /// a final copy, and downstream consumers that want sub-slices
107 /// can take cheap views into the same allocation. Callers
108 /// that genuinely need an owned `Vec<u8>` can call
109 /// `.to_vec()` — they pay the copy only when they need it.
110 async fn fetch(&self, blob_ref: &BlobRef) -> Result<Bytes, BlobError>;
111
112 /// Fetch a byte range. `range.start <= range.end` and both
113 /// bounded by `blob_ref.size`; out-of-range queries surface as
114 /// [`BlobError::Backend`] from the adapter. The substrate does
115 /// NOT verify partial fetches against the full-content hash;
116 /// callers using range fetch are accepting that trade-off.
117 ///
118 /// Returns [`Bytes`] for the same reason as [`Self::fetch`].
119 async fn fetch_range(&self, blob_ref: &BlobRef, range: Range<u64>) -> Result<Bytes, BlobError>;
120
121 /// Probe for existence without fetching. Adapters that cannot
122 /// answer cheaply may emulate by `fetch` + drop; the trait
123 /// makes no efficiency promise.
124 async fn exists(&self, blob_ref: &BlobRef) -> Result<bool, BlobError>;
125
126 /// Stream the blob content as a sequence of byte chunks.
127 /// Default impl routes through [`Self::fetch`] and emits the
128 /// whole payload as a single chunk — fine for adapters that
129 /// hold blobs in RAM or pull them in one shot anyway (S3
130 /// GetObject with no Range, IPFS). Adapters with real
131 /// streaming backends (chunked HTTP, mmap'd local files,
132 /// range-fetched S3) should override to yield progressively.
133 ///
134 /// Substrate-side hash verification consumes the stream as it
135 /// arrives: hash the chunks incrementally, accumulate into a
136 /// buffer (or pipe through to the application), and reject
137 /// on completion if the BLAKE3 doesn't match.
138 ///
139 /// Multi-GB blobs that don't fit in a single buffer must use
140 /// this surface; the all-in-memory [`Self::fetch`] is
141 /// preserved for short payloads and ergonomic callers.
142 async fn fetch_stream(&self, blob_ref: &BlobRef) -> Result<BlobByteStream, BlobError> {
143 let bytes = self.fetch(blob_ref).await?;
144 let stream = futures::stream::once(async move { Ok(bytes) });
145 Ok(Box::pin(stream))
146 }
147
148 /// Store from a stream of byte chunks. Default impl drains the
149 /// stream into a `Vec<u8>` and forwards to [`Self::store`];
150 /// adapters with real streaming write paths (S3 multipart
151 /// upload, chunked filesystem write) should override.
152 ///
153 /// The implementation MUST verify the produced bytes hash to
154 /// `blob_ref.hash` before considering the store durable. The
155 /// substrate's `store` contract requires this; streaming
156 /// impls compute the hash incrementally as chunks arrive.
157 ///
158 /// `size_hint` is the caller's expected total size; adapters
159 /// may use it for pre-allocation but must not require it to
160 /// match the actual stream length.
161 async fn store_stream(
162 &self,
163 blob_ref: &BlobRef,
164 mut stream: BlobByteStream,
165 size_hint: Option<u64>,
166 ) -> Result<(), BlobError> {
167 use futures::StreamExt;
168 // Hard cap on the materialized buffer. Pre-fix the default
169 // impl trusted `size_hint` only for pre-alloc but had no
170 // accumulation bound — a producer that lied about size_hint
171 // (or omitted it) could stream until OOM. The cap matches
172 // the substrate's BLOB_REF_MAX_SIZE (16 GiB) so legitimate
173 // sender-stream loads still pass while runaway streams
174 // fail loudly.
175 const MAX_STREAM_BYTES: u64 = 16 * 1024 * 1024 * 1024;
176 // Clamp the cap to `usize::MAX` on 32-bit targets where the
177 // 16 GiB constant exceeds the usize range — without the
178 // clamp, `Vec`'s allocator would OOM-panic past `usize::MAX`
179 // bytes before the `> MAX_STREAM_BYTES` comparison ever
180 // returned the typed error. Mirror of `mesh.rs:1188-1197`'s
181 // u64→usize range guard.
182 let effective_cap = MAX_STREAM_BYTES.min(usize::MAX as u64);
183 let mut buf: Vec<u8> = match size_hint {
184 Some(n) if (n as usize) <= 16 * 1024 * 1024 => Vec::with_capacity(n as usize),
185 _ => Vec::new(),
186 };
187 while let Some(chunk) = stream.next().await {
188 let bytes = chunk?;
189 if (buf.len() as u64).saturating_add(bytes.len() as u64) > effective_cap {
190 return Err(BlobError::Backend(format!(
191 "store_stream: accumulated {} bytes exceeds {} cap",
192 buf.len(),
193 effective_cap
194 )));
195 }
196 buf.extend_from_slice(&bytes);
197 }
198 self.store(blob_ref, &buf).await
199 }
200
201 /// Best-effort delete. The substrate calls this on the GC
202 /// sweep path (v0.2 [`MeshBlobAdapter`](super::MeshBlobAdapter)); external-storage
203 /// adapters (S3 / IPFS) typically defer durability decisions
204 /// to the backend's own lifecycle policies and may treat this
205 /// as a no-op.
206 ///
207 /// Default impl: returns `Ok(())` without touching the backend
208 /// (no-op delete). Override for adapters that own the blob
209 /// lifecycle.
210 ///
211 /// Manifest-variant semantics — `delete` is **surface-only**:
212 /// a [`BlobRef::Manifest`] delete removes the manifest entry
213 /// (if any) but does NOT recursively remove its chunks. Chunks
214 /// are independently reference-counted at the substrate layer
215 /// and delete on their own GC cycle. See
216 /// `DATAFORTS_BLOB_STORAGE_PLAN.md` § Q4 for the rationale.
217 async fn delete(&self, _blob_ref: &BlobRef) -> Result<(), BlobError> {
218 Ok(())
219 }
220
221 /// Hint to the adapter that `blob_ref`'s bytes will likely be
222 /// fetched soon — kick off any background pre-population
223 /// (cross-node replication, prefetch from cold storage,
224 /// warm-cache load) without blocking on completion. The
225 /// returned `Ok(())` means "the prefetch was initiated", not
226 /// "the bytes are now local".
227 ///
228 /// Default impl: no-op success. Override on adapters with a
229 /// meaningful pre-population path —
230 /// [`MeshBlobAdapter`](super::MeshBlobAdapter) opens each
231 /// constituent chunk channel against the local
232 /// [`Redex`](crate::adapter::net::redex::Redex) handle so the
233 /// per-chunk replication runtime spawns and begins syncing
234 /// from peers carrying the chunk's `causal:<hex>` tag. This is
235 /// the wiring greedy uses when its G-1 admit verdict fires
236 /// (PR-5i — actual fetch is best-effort, parallel to the
237 /// admission decision; greedy doesn't block on the prefetch).
238 ///
239 /// Errors surface back to the caller as
240 /// [`BlobError::Backend`] but are advisory — the calling
241 /// runtime typically counts and drops rather than retrying.
242 async fn prefetch(&self, _blob_ref: &BlobRef) -> Result<(), BlobError> {
243 Ok(())
244 }
245
246 /// Return an operational snapshot of the blob. Used by the
247 /// `net blob stat` CLI + the metrics exporters; surfaces size,
248 /// replica counts (where the adapter knows), encoding, etc.
249 ///
250 /// Default impl returns the `size` carried on the
251 /// [`BlobRef`] with every other field at default — adapters
252 /// that participate in the substrate's advertisement layer
253 /// (e.g. [`MeshBlobAdapter`](super::MeshBlobAdapter)) should override to fill in
254 /// `replicas_observed`, `replica_target`, `encoding`, and
255 /// `last_seen_unix_ms`. The size field comes from the
256 /// [`BlobRef`] itself, so adapters that don't track per-blob
257 /// metadata still answer this method correctly.
258 async fn stat(&self, blob_ref: &BlobRef) -> Result<BlobStat, BlobError> {
259 Ok(BlobStat {
260 size: blob_ref.size(),
261 encoding: blob_ref.encoding(),
262 ..Default::default()
263 })
264 }
265
266 /// Enumerate blob chunks the adapter has observed. Powers
267 /// the operator-facing "Blob & Artifact Explorer" surface
268 /// (`DECK_PLAN.md` § Deferred work § Blob & Artifact
269 /// Explorer) — adapters that can cheaply enumerate (Mesh,
270 /// fs) override; adapters with prohibitive enumeration
271 /// cost (S3 with millions of keys, IPFS) leave the default
272 /// "empty" so consumers don't accidentally rack up backend
273 /// charges.
274 ///
275 /// The default returns an empty vec rather than an error
276 /// because "this adapter doesn't enumerate" is a normal
277 /// answer, not a failure — the BLOBS tab simply shows no
278 /// rows for that adapter.
279 ///
280 /// Distinct from "this adapter holds nothing": consumers
281 /// that need to tell the two apart consult
282 /// [`Self::supports_list`] first. A `false` answer means
283 /// the default opt-out is in effect; a `true` answer means
284 /// the result of `list` is authoritative (`Ok([])` truly
285 /// means empty).
286 ///
287 /// Granularity is **chunk-level**, not logical-blob-level.
288 /// `MeshBlobAdapter` tracks blobs in a refcount table keyed
289 /// by content hash: a `BlobRef::Small` corresponds to one
290 /// entry, a `BlobRef::Manifest` to N entries (one per
291 /// chunk). Reconstructing logical `BlobRef`s would need a
292 /// per-store BlobRef index the substrate doesn't carry
293 /// today; that's tracked as a follow-on in
294 /// `DECK_PLAN.md` § Deferred work § Blob & Artifact
295 /// Explorer.
296 ///
297 /// `opts.prefix_hex` filters by a hex prefix of the
298 /// content hash (e.g. `Some("abcd")` returns only chunks
299 /// whose hash starts with `0xab 0xcd`). `opts.limit` caps
300 /// the result count — adapters may return fewer when
301 /// fewer match. Order is unspecified at the trait level
302 /// (`MeshBlobAdapter` sorts by `last_seen_unix_ms` desc).
303 async fn list(&self, _opts: &BlobListOptions) -> Result<Vec<BlobInventoryEntry>, BlobError> {
304 Ok(Vec::new())
305 }
306
307 /// Whether [`Self::list`] returns an authoritative enumeration.
308 ///
309 /// Defaults to `false`, matching the default `list` impl that
310 /// returns an empty vec to avoid accidental enumeration cost
311 /// on adapters with prohibitive scans (S3 with millions of
312 /// keys, IPFS). Adapters that genuinely enumerate (Mesh, fs)
313 /// override to `true` so consumers (the Deck BLOBS tab,
314 /// scripted exporters) can tell the two cases apart instead
315 /// of conflating "no rows" with "opt-out".
316 fn supports_list(&self) -> bool {
317 false
318 }
319}
320
/// Options for [`BlobAdapter::list`]. Built to grow — additional
/// filters (date range, encoding, refcount band) land here
/// without changing the trait signature.
///
/// Derives `PartialEq`/`Eq` (like [`BlobInventoryEntry`]) so
/// callers can compare option sets, e.g. to detect an unchanged
/// query or to assert on the options a consumer built.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct BlobListOptions {
    /// Lowercase hex prefix matched against the content hash.
    /// `None` matches every entry. Adapters that can't filter
    /// on the prefix scan all and filter in-memory.
    pub prefix_hex: Option<String>,
    /// Cap on the returned set. `0` (the value produced by
    /// `BlobListOptions::default()`) is interpreted as "no
    /// caller cap". Consumers reading via the SDK pass a concrete
    /// value (typically 1000–10000) to bound memory.
    pub limit: usize,
}
337
/// One row of the operator-facing blob inventory: a content
/// hash the adapter has observed, plus the refcount-table
/// metadata that goes with it. Chunk-level granularity per the
/// note on [`BlobAdapter::list`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BlobInventoryEntry {
    /// `adapter_id()` of the adapter that produced this entry.
    /// Multi-adapter deployments surface this so the operator
    /// can tell which backend holds the chunk; single-adapter
    /// callers can ignore.
    pub adapter_id: String,
    /// 64-character lowercase hex of the blob's BLAKE3 content
    /// hash. The canonical id at this granularity.
    pub hash_hex: String,
    /// Refcount the adapter tracks. `0` means quiescent and on
    /// the GC retention clock; non-zero means at least one
    /// source is holding a live reference.
    pub refcount: u32,
    /// `true` when the operator has explicitly pinned the
    /// entry against GC (operators sometimes pin known-good
    /// chunks during a debug session).
    pub pinned: bool,
    /// First wall-clock unix-ms the adapter observed this
    /// hash (the retention floor measures from here).
    pub first_seen_unix_ms: u64,
    /// Most recent wall-clock unix-ms the adapter observed
    /// this hash (any incr / decr / store).
    pub last_seen_unix_ms: u64,
    /// Payload size in bytes. `Some(n)` whenever the local
    /// adapter has observed a store. `None` for hashes that only
    /// entered the table via `incr` from a remote source (the chunk
    /// isn't local yet, so advertising the size is the peer's job),
    /// and for adapters that don't track per-hash size cheaply.
    pub size_bytes: Option<u64>,
    /// Distinct nodes observed advertising this hash via the
    /// substrate's `causal:<hex>` capability tag. `None` for
    /// adapters that don't participate in the advertisement
    /// layer.
    ///
    /// Counterpart of [`BlobStat::replicas_observed`], but with a
    /// different type. That field is a plain `u32` that reports `0`
    /// for non-participating adapters; this one uses `None` so "not
    /// tracked" stays distinct from "zero replicas observed".
    pub replicas_observed: Option<u32>,
    /// Operator-configured replication factor for this
    /// adapter. `None` for adapters whose durability isn't
    /// governed by the substrate (S3, IPFS, FS).
    ///
    /// Counterpart of [`BlobStat::replica_target`], which is an
    /// `Option<u8>`; the value is carried as `u32` here.
    pub replica_target: Option<u32>,
}