use std::path::Path;
use std::sync::Arc;
use bytes::{Bytes, BytesMut};
use futures::Stream;
use crate::mesh::Mesh;
pub use net::adapter::net::dataforts::blob::transfer::{
BlobTransferEngine, TransferControl, TransferHeader,
};
pub use net::adapter::net::dataforts::blob::{
is_transfer_stream_id, next_transfer_stream_id, transfer_stream_id, BlobError, BlobRef,
MeshBlobAdapter, SUBPROTOCOL_BLOB_TRANSFER,
};
pub use net::adapter::net::dataforts::blob::{
BlobTransferClient, TransferClientError, TransferRpcError, TransferStatus,
};
pub use net::adapter::net::mesh_rpc::{ServeError, ServeHandle};
pub use net::adapter::net::dataforts::{chunk_payload, ChunkedPayload, Encoding};
pub use net::adapter::net::dataforts::store_blob_reader;
pub use net::adapter::net::dataforts::{
store_dir, DirEntry, DirManifest, DirStats, EntryKind, DEFAULT_FETCH_CONCURRENCY,
DIR_MANIFEST_VERSION,
};
pub use net::adapter::net::dataforts::DirError;
/// Error type returned by the blob and directory fetch helpers in this module.
///
/// Substrate errors (`BlobError`, `DirError`) are folded into these variants
/// by the `From` impls in this file.
#[derive(Debug)]
pub enum TransferError {
/// The requested item was not found; carries the message taken from
/// `BlobError::NotFound`.
NotFound(String),
/// Produced by `fetch_blob_discovered` when the discovered-peer chunk fetch
/// reports `BlobError::NotFound`; carries that error's message.
AllPeersFailed(String),
/// A hash check failed. Both values are 32-byte hashes and are rendered as
/// hex by the `Display` impl.
HashMismatch {
/// The hash that was expected.
expected: [u8; 32],
/// The hash that was actually observed.
actual: [u8; 32],
},
/// Any other failure, stored as the source error's `to_string()` text.
/// Also used by the fetch helpers to reject `BlobRef::Tree`.
Substrate(String),
}
/// Renders every variant as a one-line message prefixed with `transfer:`.
///
/// For hash mismatches both 32-byte hashes are printed as hex strings
/// produced by `hex32`.
impl std::fmt::Display for TransferError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::HashMismatch { expected, actual } => {
                // Encode both digests first so the format call stays flat.
                let want = hex32(expected);
                let got = hex32(actual);
                write!(f, "transfer: hash mismatch (expected {}, got {})", want, got)
            }
            Self::Substrate(m) => write!(f, "transfer: {m}"),
            Self::AllPeersFailed(m) => write!(f, "transfer: all peers failed: {m}"),
            Self::NotFound(m) => write!(f, "transfer: not found: {m}"),
        }
    }
}
// Empty impl: every `std::error::Error` method keeps its default, and the
// message text comes from the `Display` impl.
impl std::error::Error for TransferError {}
impl From<BlobError> for TransferError {
fn from(e: BlobError) -> Self {
match e {
BlobError::NotFound(m) => Self::NotFound(m),
BlobError::HashMismatch { expected, actual } => Self::HashMismatch { expected, actual },
other => Self::Substrate(other.to_string()),
}
}
}
/// Converts a substrate `DirError` into the wrapper's [`TransferError`].
///
/// A wrapped blob error goes through the `BlobError` conversion so it keeps
/// its specific variant; anything else becomes [`TransferError::Substrate`]
/// holding the error's `to_string()` text.
impl From<DirError> for TransferError {
    fn from(err: DirError) -> Self {
        match err {
            DirError::Blob(inner) => Self::from(inner),
            unmapped => Self::Substrate(unmapped.to_string()),
        }
    }
}
/// Registers `adapter` with the mesh node's blob-transfer service.
///
/// This forwards to the node's own `serve_blob_transfer`; whatever that call
/// returns is dropped here rather than handed back to the caller.
pub fn serve_blob_transfer(mesh: &Mesh, adapter: Arc<MeshBlobAdapter>) {
    let node = mesh.node();
    node.serve_blob_transfer(adapter);
}
/// Starts blob-transfer serving on the node and exposes it as an RPC service.
///
/// The engine returned by the node's `serve_blob_transfer` is wrapped in a
/// `TransferRpcHandler`, which is then registered under `TRANSFER_SERVICE`
/// through `Mesh::serve_rpc`.
///
/// # Errors
///
/// Returns whatever `ServeError` the `serve_rpc` registration reports.
pub fn serve_blob_transfer_rpc(
    mesh: &Mesh,
    adapter: Arc<MeshBlobAdapter>,
) -> Result<ServeHandle, ServeError> {
    // Build the handler first; the engine is only needed to construct it.
    let handler = {
        let engine = mesh.node().serve_blob_transfer(adapter);
        net::adapter::net::dataforts::blob::TransferRpcHandler::new(engine)
    };
    mesh.serve_rpc(
        net::adapter::net::dataforts::blob::TRANSFER_SERVICE,
        Arc::new(handler),
    )
}
/// Fetches the full contents of `blob_ref` from a single peer.
///
/// * `BlobRef::Small` — one `transfer_fetch_chunk` call for the blob's hash.
/// * `BlobRef::Manifest` — every chunk is fetched one at a time, in manifest
///   order, and the results are concatenated into one buffer.
/// * `BlobRef::Tree` — not supported here; returns
///   [`TransferError::Substrate`].
///
/// `source` is passed through unchanged to `transfer_fetch_chunk`.
///
/// # Errors
///
/// The first failing chunk fetch aborts the whole call; its error is
/// converted into a [`TransferError`] by `?`.
pub async fn fetch_blob(
    mesh: &Mesh,
    source: u64,
    blob_ref: &BlobRef,
) -> Result<Bytes, TransferError> {
    // Upper bound on the up-front reservation for manifest blobs.
    //
    // `total_size` is read from the manifest and is only a capacity hint
    // here. Handing it straight to `with_capacity` would let a corrupt or
    // hostile manifest demand an enormous allocation (panicking or aborting)
    // before a single chunk has been fetched. The buffer still grows on
    // demand, so larger blobs remain fully supported.
    const MAX_PREALLOC_BYTES: usize = 64 * 1024 * 1024;

    let node = mesh.node();
    match blob_ref {
        BlobRef::Small { hash, .. } => Ok(node.transfer_fetch_chunk(source, *hash).await?),
        BlobRef::Manifest {
            chunks, total_size, ..
        } => {
            // The cast may wrap on narrow targets; that is harmless because
            // the value is clamped and used purely as a hint.
            let capacity_hint = (*total_size as usize).min(MAX_PREALLOC_BYTES);
            let mut buf = BytesMut::with_capacity(capacity_hint);
            for chunk in chunks {
                let bytes = node.transfer_fetch_chunk(source, chunk.hash).await?;
                buf.extend_from_slice(&bytes);
            }
            Ok(buf.freeze())
        }
        BlobRef::Tree { .. } => Err(TransferError::Substrate(
            "BlobRef::Tree not supported by the transport wrapper".into(),
        )),
    }
}
/// Fetches the full contents of `blob_ref` without a caller-chosen source.
///
/// Each chunk is requested through the node's
/// `transfer_fetch_chunk_discovered`, which takes only the chunk hash.
///
/// * `BlobRef::Small` — one discovered fetch for the blob's hash.
/// * `BlobRef::Manifest` — every chunk is fetched one at a time, in manifest
///   order, and the results are concatenated into one buffer.
/// * `BlobRef::Tree` — not supported here; returns
///   [`TransferError::Substrate`].
///
/// # Errors
///
/// A `BlobError::NotFound` from the discovered fetch is reported as
/// [`TransferError::AllPeersFailed`]; every other error goes through the
/// regular `BlobError` conversion. The first failing chunk aborts the call.
pub async fn fetch_blob_discovered(
    mesh: &Mesh,
    blob_ref: &BlobRef,
) -> Result<Bytes, TransferError> {
    // Upper bound on the up-front reservation for manifest blobs.
    //
    // `total_size` is read from the manifest and is only a capacity hint
    // here. Handing it straight to `with_capacity` would let a corrupt or
    // hostile manifest demand an enormous allocation (panicking or aborting)
    // before a single chunk has been fetched. The buffer still grows on
    // demand, so larger blobs remain fully supported.
    const MAX_PREALLOC_BYTES: usize = 64 * 1024 * 1024;

    let node = mesh.node();
    // Single place where "not found" is re-labelled for the discovered path.
    let discovered = |hash: [u8; 32]| async move {
        node.transfer_fetch_chunk_discovered(hash)
            .await
            .map_err(|e| match e {
                BlobError::NotFound(m) => TransferError::AllPeersFailed(m),
                other => other.into(),
            })
    };
    match blob_ref {
        BlobRef::Small { hash, .. } => discovered(*hash).await,
        BlobRef::Manifest {
            chunks, total_size, ..
        } => {
            // The cast may wrap on narrow targets; that is harmless because
            // the value is clamped and used purely as a hint.
            let capacity_hint = (*total_size as usize).min(MAX_PREALLOC_BYTES);
            let mut buf = BytesMut::with_capacity(capacity_hint);
            for chunk in chunks {
                let bytes = discovered(chunk.hash).await?;
                buf.extend_from_slice(&bytes);
            }
            Ok(buf.freeze())
        }
        BlobRef::Tree { .. } => Err(TransferError::Substrate(
            "BlobRef::Tree not supported by the transport wrapper".into(),
        )),
    }
}
/// Streams the contents of `blob_ref` from `source`, one chunk per item.
///
/// The chunk hashes are copied out of `blob_ref` up front. Each chunk is then
/// fetched with `transfer_fetch_chunk` only when the stream is polled for it,
/// in manifest order. A `BlobRef::Small` yields a single item; a
/// `BlobRef::Tree` yields a single [`TransferError::Substrate`] error.
///
/// The stream ends after the first `Err` item: the remaining hashes are
/// discarded and no further fetches are issued.
pub fn fetch_blob_stream(
    mesh: &Mesh,
    source: u64,
    blob_ref: &BlobRef,
) -> impl Stream<Item = Result<Bytes, TransferError>> {
    let node = mesh.node().clone();

    // Work list: a hash to fetch, or an error to surface as-is.
    let mut plan: Vec<Result<[u8; 32], TransferError>> = Vec::new();
    match blob_ref {
        BlobRef::Small { hash, .. } => plan.push(Ok(*hash)),
        BlobRef::Manifest { chunks, .. } => {
            plan.extend(
                chunks
                    .iter()
                    .map(|chunk| Ok::<_, TransferError>(chunk.hash)),
            );
        }
        BlobRef::Tree { .. } => plan.push(Err(TransferError::Substrate(
            "BlobRef::Tree not supported by the transport wrapper".into(),
        ))),
    }

    futures::stream::unfold(plan.into_iter(), move |mut pending| {
        let node = node.clone();
        async move {
            // An exhausted work list ends the stream.
            let step = pending.next()?;
            let outcome = match step {
                Err(unsupported) => Err(unsupported),
                Ok(hash) => node
                    .transfer_fetch_chunk(source, hash)
                    .await
                    .map_err(TransferError::from),
            };
            // After a failure, drop the rest of the plan so the next poll
            // terminates the stream instead of fetching more chunks.
            if outcome.is_err() {
                pending = Vec::new().into_iter();
            }
            Some((outcome, pending))
        }
    })
}
/// Fetches a directory described by `manifest_ref` from `source` into `dest`.
///
/// Thin wrapper over the substrate's `dataforts::fetch_dir`: all arguments
/// are passed through unchanged, with `mesh.node()` supplied as the node.
///
/// # Errors
///
/// Any error from the substrate call is converted into a [`TransferError`].
pub async fn fetch_dir(
    mesh: &Mesh,
    source: u64,
    manifest_ref: &BlobRef,
    dest: &Path,
    concurrency: usize,
) -> Result<DirStats, TransferError> {
    let node = mesh.node();
    let stats =
        net::adapter::net::dataforts::fetch_dir(node, source, manifest_ref, dest, concurrency)
            .await?;
    Ok(stats)
}
/// Encodes a 32-byte hash as a 64-character lowercase hexadecimal string.
///
/// Every byte becomes exactly two digits, high nibble first.
fn hex32(hash: &[u8; 32]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut encoded = String::with_capacity(hash.len() * 2);
    for &byte in hash {
        // Both indices are in 0..16, so the table lookups cannot go out of range.
        encoded.push(char::from(DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    encoded
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Each `BlobError` variant lands on the expected `TransferError` variant.
    #[test]
    fn blob_error_maps_to_stable_shape() {
        let not_found = TransferError::from(BlobError::NotFound("x".into()));
        assert!(matches!(not_found, TransferError::NotFound(_)));

        let mismatch = TransferError::from(BlobError::HashMismatch {
            expected: [1u8; 32],
            actual: [2u8; 32],
        });
        assert!(matches!(mismatch, TransferError::HashMismatch { .. }));

        // Variants without a dedicated mapping fall into `Substrate`.
        let backend = TransferError::from(BlobError::Backend("boom".into()));
        assert!(matches!(backend, TransferError::Substrate(_)));

        let cancelled = TransferError::from(BlobError::Cancelled);
        assert!(matches!(cancelled, TransferError::Substrate(_)));
    }

    /// A wrapped blob error keeps its specific variant; other directory
    /// errors fall into `Substrate`.
    #[test]
    fn dir_error_routes_blob_failures_through_blob_mapping() {
        let wrapped = TransferError::from(DirError::Blob(BlobError::NotFound("x".into())));
        assert!(matches!(wrapped, TransferError::NotFound(_)));

        let unsafe_path = TransferError::from(DirError::UnsafePath("../escape".into()));
        assert!(matches!(unsafe_path, TransferError::Substrate(_)));
    }

    /// The rendered message contains the hex form of both hashes.
    #[test]
    fn hash_mismatch_display_renders_both_hashes() {
        let rendered = TransferError::HashMismatch {
            expected: [0xABu8; 32],
            actual: [0xCDu8; 32],
        }
        .to_string();

        for digest in ["ab".repeat(32), "cd".repeat(32)] {
            assert!(rendered.contains(&digest));
        }
    }
}