net-mesh-sdk 0.27.5

Ergonomic Rust SDK for the Net mesh network
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
//! Transport SDK surface — fairscheduler transport, blob transfer,
//! directory transfer.
//!
//! These are the on-demand, cross-peer *movement* primitives: pull a
//! content-addressed blob (or a whole directory tree) from a peer over
//! the substrate's reliable, fair-scheduled stream transport. They are
//! distinct from the two neighbouring surfaces:
//!
//! - `dataforts` exposes the *storage* + operator read side (the
//!   [`MeshBlobAdapter`] constructor, metrics, inventory).
//! - RedEX replication is a push/replication primitive; nRPC is a
//!   request/reply primitive. Transport is "fetch this exact content
//!   from that peer", multiplexed fairly against other traffic so a
//!   bulk pull can't starve interactive streams.
//!
//! The module re-exports the substrate primitives (the engine, the
//! `TransferControl` / `TransferHeader` wire types, the stream-id
//! helpers, and the directory types + [`store_dir`]) for advanced
//! callers, and adds thin ergonomic wrappers over a [`Mesh`] handle:
//! [`fetch_blob`] / [`fetch_blob_stream`] (and the holder-discovering
//! [`fetch_blob_discovered`]), [`fetch_dir`], and [`serve_blob_transfer`].
//!
//! # Serving is required to fetch
//!
//! The transfer engine is installed per node by [`serve_blob_transfer`].
//! A node must install it before it can *either* serve chunks to peers
//! *or* issue fetches — [`fetch_blob`] registers a pending transfer on
//! the local engine, so a node with no engine installed gets a
//! [`TransferError::Substrate`] error. Install once after the node is
//! built, against the same [`MeshBlobAdapter`] the node stores into.
//!
//! ```no_run
//! # use std::sync::Arc;
//! # async fn ex(mesh: &net_sdk::mesh::Mesh, adapter: Arc<net_sdk::dataforts::MeshBlobAdapter>, blob_ref: &net_sdk::transport::BlobRef, source: u64) -> Result<(), net_sdk::transport::TransferError> {
//! use net_sdk::transport;
//!
//! // Install the engine (idempotent; needed to serve AND to fetch).
//! transport::serve_blob_transfer(mesh, adapter);
//!
//! // Pull a blob from a known holder peer.
//! let bytes = transport::fetch_blob(mesh, source, blob_ref).await?;
//! # let _ = bytes;
//! # Ok(())
//! # }
//! ```

use std::path::Path;
use std::sync::Arc;

use bytes::{Bytes, BytesMut};
use futures::Stream;

use crate::mesh::Mesh;

// ── Re-exports ──────────────────────────────────────────────────────

// Blob-transfer engine + wire types, plus the stream-id convention
// helpers, for callers composing their own transfer-shaped operations
// on the fairscheduler transport.
pub use net::adapter::net::dataforts::blob::transfer::{
    BlobTransferEngine, TransferControl, TransferHeader,
};
pub use net::adapter::net::dataforts::blob::{
    is_transfer_stream_id, next_transfer_stream_id, transfer_stream_id, BlobError, BlobRef,
    MeshBlobAdapter, SUBPROTOCOL_BLOB_TRANSFER,
};

// Operator-introspection RPC over a node's transfer engine: the typed
// client + status shape behind `net transfer ls / status / cancel`, and
// the server-side error variants for matching. [`serve_blob_transfer_rpc`]
// (below) installs the matching handler.
pub use net::adapter::net::dataforts::blob::{
    BlobTransferClient, TransferClientError, TransferRpcError, TransferStatus,
};
// Returned by [`serve_blob_transfer_rpc`]; the caller holds it to keep the
// RPC registered (drop = stop answering).
pub use net::adapter::net::mesh_rpc::{ServeError, ServeHandle};

// Store-side helpers for building a content-addressed [`BlobRef`] from raw
// bytes without reaching into the substrate: [`chunk_payload`] splits a byte
// slice into the inline-or-chunked shape and [`ChunkedPayload::into_blob_ref`]
// finishes it into a [`BlobRef::Small`] / [`BlobRef::Manifest`] under an
// [`Encoding`]. These are the inverse of [`fetch_blob`] — what a publisher runs
// to learn the reference a peer will [`fetch_blob`] by. Re-exported so the
// `net transfer send-blob` CLI verb (and any SDK consumer staging content for
// fetch) doesn't reimplement chunk sizing / hashing.
pub use net::adapter::net::dataforts::{chunk_payload, ChunkedPayload, Encoding};

// Streaming store: chunk an `AsyncRead` into content-addressed chunks and
// assemble the same [`BlobRef`] `chunk_payload(...).into_blob_ref(...)`
// yields, but without buffering the whole payload — peak memory is one
// chunk. `adapter = Some` persists each chunk as it is read (the
// `send-blob --store` path); `None` computes the reference only (the dry
// path). The streaming inverse of [`fetch_blob_stream`].
pub use net::adapter::net::dataforts::store_blob_reader;

// Directory transfer: the substrate `store_dir` is usable as-is (it
// takes a `&MeshBlobAdapter`); the fetch side is wrapped below because
// the substrate function needs the internal node handle. The manifest
// types are re-exported so applications can introspect a tree before
// (or instead of) reconstructing it — the hook a future directory-sync
// composition layer builds on.
pub use net::adapter::net::dataforts::{
    store_dir, DirEntry, DirManifest, DirStats, EntryKind, DEFAULT_FETCH_CONCURRENCY,
    DIR_MANIFEST_VERSION,
};
// The substrate's directory-error type, surfaced for the `From`
// conversion into [`TransferError`] and for callers that re-export it.
pub use net::adapter::net::dataforts::DirError;

// ── Error surface ───────────────────────────────────────────────────

/// Stable SDK-facing transfer error. Translates the substrate's
/// [`BlobError`] / [`DirError`] into a small, durable shape so the SDK
/// contract doesn't churn when the substrate's internal error enums
/// grow variants.
#[derive(Debug)]
pub enum TransferError {
    /// The content was not available — a known holder didn't have it, or
    /// (for [`fetch_blob_discovered`]) the search ran out of candidates
    /// before any peer served it (see [`Self::AllPeersFailed`]).
    NotFound(String),
    /// Holder discovery exhausted every connected peer without one
    /// serving the content. Distinct from [`Self::NotFound`] so a caller
    /// can tell "this specific holder lacks it" from "nobody I'm
    /// connected to has it".
    AllPeersFailed(String),
    /// Fetched bytes did not hash to the expected content address — the
    /// substrate verifies every fetch, so this is a hard integrity
    /// failure, never silently accepted.
    HashMismatch {
        /// Hash recorded on the [`BlobRef`].
        expected: [u8; 32],
        /// Hash computed over the fetched bytes.
        actual: [u8; 32],
    },
    /// Any other substrate-level failure (engine not installed, transport
    /// error, manifest decode, unsafe path, cancellation, …). The string
    /// is the underlying error's `Display`.
    Substrate(String),
}

impl std::fmt::Display for TransferError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::NotFound(m) => write!(f, "transfer: not found: {m}"),
            Self::AllPeersFailed(m) => write!(f, "transfer: all peers failed: {m}"),
            Self::HashMismatch { expected, actual } => write!(
                f,
                "transfer: hash mismatch (expected {}, got {})",
                hex32(expected),
                hex32(actual)
            ),
            Self::Substrate(m) => write!(f, "transfer: {m}"),
        }
    }
}

impl std::error::Error for TransferError {}

impl From<BlobError> for TransferError {
    fn from(e: BlobError) -> Self {
        match e {
            BlobError::NotFound(m) => Self::NotFound(m),
            BlobError::HashMismatch { expected, actual } => Self::HashMismatch { expected, actual },
            other => Self::Substrate(other.to_string()),
        }
    }
}

impl From<DirError> for TransferError {
    fn from(e: DirError) -> Self {
        // Map the blob-carrying directory error through the BlobError
        // mapping so a directory fetch reports the same NotFound /
        // HashMismatch shape as a bare blob fetch; everything else
        // (unsafe path, manifest decode, io) is opaque substrate detail.
        match e {
            DirError::Blob(b) => b.into(),
            other => Self::Substrate(other.to_string()),
        }
    }
}

// ── Serving ─────────────────────────────────────────────────────────

/// Install the blob-transfer engine on `mesh` over `adapter`. Required
/// before the node can serve chunk fetches to peers **or** issue its own
/// fetches ([`fetch_blob`] registers state on the local engine).
/// Idempotent — re-installing replaces the engine.
pub fn serve_blob_transfer(mesh: &Mesh, adapter: Arc<MeshBlobAdapter>) {
    mesh.node().serve_blob_transfer(adapter);
}

/// Like [`serve_blob_transfer`] but also registers the `blob.transfers`
/// operator-introspection RPC (list / status / cancel) over the same
/// engine, so a remote operator (`net transfer ls / status / cancel`) can
/// query and cancel this node's in-flight, requester-side transfers.
/// Returns the [`ServeHandle`]; **hold it** for as long as the RPC should
/// answer (dropping it stops the RPC; the engine itself stays installed).
pub fn serve_blob_transfer_rpc(
    mesh: &Mesh,
    adapter: Arc<MeshBlobAdapter>,
) -> Result<ServeHandle, ServeError> {
    // Install the engine (and hold its handle so the RPC reports on the
    // exact registry doing the fetches), then serve the handler through
    // `Mesh::serve_rpc` — which auto-registers the `blob.transfers` request
    // channel + reply prefix in the SDK's ChannelConfigRegistry, so a node
    // with a restrictive registry (the MeshBuilder default) still admits
    // the RPC's channel membership. Serving via `MeshNode::serve_rpc`
    // directly would skip that registration and fail with UnknownChannel.
    let engine = mesh.node().serve_blob_transfer(adapter);
    mesh.serve_rpc(
        net::adapter::net::dataforts::blob::TRANSFER_SERVICE,
        Arc::new(net::adapter::net::dataforts::blob::TransferRpcHandler::new(
            engine,
        )),
    )
}

// ── Blob fetch ──────────────────────────────────────────────────────

/// Fetch a whole blob (every chunk of a [`BlobRef`]) from the known
/// holder `source`, returning the reassembled, BLAKE3-verified bytes.
///
/// A [`BlobRef::Small`] is one chunk; a [`BlobRef::Manifest`] is its
/// ordered chunk list, concatenated in manifest order. [`BlobRef::Tree`]
/// is not supported by this wrapper (use the substrate tree walk).
///
/// Each chunk is fetched over the reliable, fair-scheduled stream
/// transport; verification is enforced by the substrate, so a hash
/// disagreement surfaces as [`TransferError::HashMismatch`] rather than
/// returning suspect bytes.
pub async fn fetch_blob(
    mesh: &Mesh,
    source: u64,
    blob_ref: &BlobRef,
) -> Result<Bytes, TransferError> {
    let node = mesh.node();
    match blob_ref {
        BlobRef::Small { hash, .. } => Ok(node.transfer_fetch_chunk(source, *hash).await?),
        BlobRef::Manifest {
            chunks, total_size, ..
        } => {
            let mut buf = BytesMut::with_capacity(*total_size as usize);
            for chunk in chunks {
                let bytes = node.transfer_fetch_chunk(source, chunk.hash).await?;
                buf.extend_from_slice(&bytes);
            }
            Ok(buf.freeze())
        }
        BlobRef::Tree { .. } => Err(TransferError::Substrate(
            "BlobRef::Tree not supported by the transport wrapper".into(),
        )),
    }
}

/// Like [`fetch_blob`], but discovers the holder among connected peers
/// instead of the caller naming one. Probes peers in turn; the first to
/// serve the verified bytes wins. Returns [`TransferError::AllPeersFailed`]
/// if no connected peer has the content.
///
/// Per-chunk discovery is more expensive than a known-source fetch
/// (each chunk re-probes), so prefer [`fetch_blob`] when the holder is
/// known (e.g. directory transfer pulls from a single source).
pub async fn fetch_blob_discovered(
    mesh: &Mesh,
    blob_ref: &BlobRef,
) -> Result<Bytes, TransferError> {
    let node = mesh.node();
    let discovered = |hash: [u8; 32]| async move {
        node.transfer_fetch_chunk_discovered(hash)
            .await
            .map_err(|e| match e {
                // Discovery returns NotFound when no peer served it;
                // re-tag as AllPeersFailed so the caller can distinguish.
                BlobError::NotFound(m) => TransferError::AllPeersFailed(m),
                other => other.into(),
            })
    };
    match blob_ref {
        BlobRef::Small { hash, .. } => discovered(*hash).await,
        BlobRef::Manifest {
            chunks, total_size, ..
        } => {
            let mut buf = BytesMut::with_capacity(*total_size as usize);
            for chunk in chunks {
                let bytes = discovered(chunk.hash).await?;
                buf.extend_from_slice(&bytes);
            }
            Ok(buf.freeze())
        }
        BlobRef::Tree { .. } => Err(TransferError::Substrate(
            "BlobRef::Tree not supported by the transport wrapper".into(),
        )),
    }
}

/// Stream a blob from `source` chunk-by-chunk: each yielded item is one
/// verified chunk's bytes in manifest order, so a large blob is consumed
/// incrementally without buffering the whole payload in memory.
///
/// A [`BlobRef::Small`] yields a single item; a [`BlobRef::Manifest`]
/// yields one item per chunk. [`BlobRef::Tree`] yields a single
/// [`TransferError::Substrate`] error item. The first error terminates
/// the stream (no further chunks are fetched).
pub fn fetch_blob_stream(
    mesh: &Mesh,
    source: u64,
    blob_ref: &BlobRef,
) -> impl Stream<Item = Result<Bytes, TransferError>> {
    let node = mesh.node().clone();
    // Resolve the ordered per-chunk hash list (or a terminal error for
    // the unsupported Tree case) eagerly, then fetch lazily as the
    // consumer polls. `unfold` carries the remaining-chunks iterator as
    // its state so a fetch only happens on demand, and so an error can
    // be surfaced AND end the stream (a `take_while` would drop the
    // error item; `then` alone would keep fetching after it).
    let items: Vec<Result<[u8; 32], TransferError>> = match blob_ref {
        BlobRef::Small { hash, .. } => vec![Ok(*hash)],
        BlobRef::Manifest { chunks, .. } => chunks.iter().map(|c| Ok(c.hash)).collect(),
        BlobRef::Tree { .. } => vec![Err(TransferError::Substrate(
            "BlobRef::Tree not supported by the transport wrapper".into(),
        ))],
    };
    futures::stream::unfold(items.into_iter(), move |mut remaining| {
        let node = node.clone();
        async move {
            let next = remaining.next()?; // exhausted → end the stream
            let out = match next {
                Ok(hash) => node
                    .transfer_fetch_chunk(source, hash)
                    .await
                    .map_err(TransferError::from),
                Err(e) => Err(e),
            };
            // First error terminates the stream after it's surfaced:
            // once a chunk fails the blob can't be completed, so drop the
            // rest and fetch nothing further.
            let rest = if out.is_err() {
                Vec::<Result<[u8; 32], TransferError>>::new().into_iter()
            } else {
                remaining
            };
            Some((out, rest))
        }
    })
}

// ── Directory fetch ─────────────────────────────────────────────────

/// Pull the directory whose manifest is `manifest_ref` from `source` and
/// reconstruct it under `dest`, returning what was written. `concurrency`
/// bounds how many leaf files race for the transport at once
/// ([`DEFAULT_FETCH_CONCURRENCY`] when `0`).
///
/// Manifest paths are validated to stay within `dest` (a hostile sender
/// cannot escape the destination root). [`store_dir`] is the matching
/// store side and is usable directly (it takes a [`MeshBlobAdapter`]).
pub async fn fetch_dir(
    mesh: &Mesh,
    source: u64,
    manifest_ref: &BlobRef,
    dest: &Path,
    concurrency: usize,
) -> Result<DirStats, TransferError> {
    net::adapter::net::dataforts::fetch_dir(mesh.node(), source, manifest_ref, dest, concurrency)
        .await
        .map_err(TransferError::from)
}

/// Lowercase-hex render of a 32-byte hash for error messages.
fn hex32(hash: &[u8; 32]) -> String {
    use std::fmt::Write as _;
    let mut s = String::with_capacity(64);
    for b in hash {
        let _ = write!(s, "{b:02x}");
    }
    s
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn blob_error_maps_to_stable_shape() {
        // NotFound and HashMismatch get dedicated variants; everything
        // else collapses to Substrate so the SDK shape is durable.
        assert!(matches!(
            TransferError::from(BlobError::NotFound("x".into())),
            TransferError::NotFound(_)
        ));
        assert!(matches!(
            TransferError::from(BlobError::HashMismatch {
                expected: [1u8; 32],
                actual: [2u8; 32],
            }),
            TransferError::HashMismatch { .. }
        ));
        assert!(matches!(
            TransferError::from(BlobError::Backend("boom".into())),
            TransferError::Substrate(_)
        ));
        assert!(matches!(
            TransferError::from(BlobError::Cancelled),
            TransferError::Substrate(_)
        ));
    }

    #[test]
    fn dir_error_routes_blob_failures_through_blob_mapping() {
        // A blob-carrying directory error reports the same NotFound shape
        // as a bare blob fetch; other directory errors stay opaque.
        assert!(matches!(
            TransferError::from(DirError::Blob(BlobError::NotFound("x".into()))),
            TransferError::NotFound(_)
        ));
        assert!(matches!(
            TransferError::from(DirError::UnsafePath("../escape".into())),
            TransferError::Substrate(_)
        ));
    }

    #[test]
    fn hash_mismatch_display_renders_both_hashes() {
        let e = TransferError::HashMismatch {
            expected: [0xABu8; 32],
            actual: [0xCDu8; 32],
        };
        let s = e.to_string();
        assert!(s.contains(&"ab".repeat(32)));
        assert!(s.contains(&"cd".repeat(32)));
    }
}