use bytes::Bytes;
use super::adapter::BlobAdapter;
use super::blob_ref::BlobRef;
use super::error::BlobError;
/// Result of classifying a raw event payload.
///
/// Produced by [`classify_payload`]: the payload either carries its content
/// directly or decodes to a [`BlobRef`] pointing at content stored elsewhere.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EventPayload<'a> {
/// The payload did not decode as a blob reference; the slice is the
/// caller's original buffer, borrowed for `'a` (no copy is made).
Inline(&'a [u8]),
/// The payload decoded to a blob reference; the content itself has to be
/// fetched through a `BlobAdapter` (see [`resolve_payload`]).
Blob(BlobRef),
}
/// Decides whether `bytes` is an inline payload or an encoded [`BlobRef`].
///
/// The decision is delegated entirely to [`BlobRef::decode`]:
///
/// * `Ok(Some(blob))` becomes [`EventPayload::Blob`],
/// * `Ok(None)` becomes [`EventPayload::Inline`] borrowing `bytes` as-is.
///
/// # Errors
///
/// Any error reported by [`BlobRef::decode`] is propagated (converted into
/// [`BlobError`] by `?`).
pub fn classify_payload(bytes: &[u8]) -> Result<EventPayload<'_>, BlobError> {
    let decoded = BlobRef::decode(bytes)?;
    Ok(decoded.map_or(EventPayload::Inline(bytes), EventPayload::Blob))
}
/// Turns a raw event payload into the bytes it stands for.
///
/// * Inline payloads are copied into a fresh [`Bytes`] and returned.
/// * Blob payloads are fetched through `adapter` and verified before being
///   returned:
///   1. if `adapter.accepted_schemes()` is non-empty, the scheme of the
///      blob's URI (text before the first `:`) must be one of them;
///   2. the content is fetched with `adapter.fetch`;
///   3. non-chunked blobs are checked with `BlobRef::verify`, chunked blobs
///      with the per-chunk check in `verify_manifest_chunks`.
///
/// # Errors
///
/// * whatever [`classify_payload`] reports for an undecodable payload,
/// * [`BlobError::UnsupportedScheme`] (carrying the full URI) when the scheme
///   is not accepted by the adapter,
/// * any error returned by `adapter.fetch`,
/// * any verification error from `BlobRef::verify` or
///   `verify_manifest_chunks`.
pub async fn resolve_payload<A: BlobAdapter + ?Sized>(
    bytes: &[u8],
    adapter: &A,
) -> Result<Bytes, BlobError> {
    // Inline content needs no adapter round-trip: copy it out and finish.
    let blob = match classify_payload(bytes)? {
        EventPayload::Inline(raw) => return Ok(Bytes::copy_from_slice(raw)),
        EventPayload::Blob(blob) => blob,
    };

    // An empty accepted-scheme list means "no restriction"; otherwise the
    // URI's scheme has to be listed before anything is fetched.
    let accepted = adapter.accepted_schemes();
    if !accepted.is_empty() && !accepted.contains(&uri_scheme(blob.uri())) {
        return Err(BlobError::UnsupportedScheme(blob.uri().to_owned()));
    }

    let fetched = adapter.fetch(&blob).await?;

    // Fetched bytes are only handed back after they pass verification.
    if blob.is_chunked() {
        verify_manifest_chunks(&blob, &fetched)?;
    } else {
        blob.verify(&fetched)?;
    }
    Ok(fetched)
}
/// Checks a flat `fetched` buffer against the chunk list of a manifest blob.
///
/// The buffer is walked front to back: each chunk claims the next
/// `chunk.size` bytes, and the BLAKE3 hash of that region must equal
/// `chunk.hash`. After the last chunk the buffer must be fully consumed.
///
/// Variant handling:
///
/// * `BlobRef::Manifest` — verified as described above.
/// * `BlobRef::Small` — returns `Ok(())` without inspecting `fetched`; this
///   function performs no verification for that variant.
/// * `BlobRef::Tree` — always rejected with [`BlobError::Backend`]; a flat
///   buffer cannot be verified against a tree reference here.
///
/// # Errors
///
/// * [`BlobError::Backend`] when a chunk would extend past the end of
///   `fetched` (or the running offset overflows), when bytes are left over
///   after the last chunk, or when `blob` is a `Tree`.
/// * [`BlobError::HashMismatch`] for the first chunk whose hash differs.
fn verify_manifest_chunks(
    blob: &super::blob_ref::BlobRef,
    fetched: &[u8],
) -> Result<(), BlobError> {
    use super::blob_ref::BlobRef;

    let chunks = match blob {
        BlobRef::Small { .. } => return Ok(()),
        BlobRef::Tree { .. } => {
            return Err(BlobError::Backend(
                "resolve_payload: BlobRef::Tree cannot be resolved through a flat \
                 fetch — route through a tree-walking fetch_range so per-chunk \
                 verification runs along the descent"
                    .to_owned(),
            ));
        }
        BlobRef::Manifest { chunks, .. } => chunks,
    };

    // `offset` is the start of the region the next chunk must cover.
    let mut offset: usize = 0;
    for chunk in chunks.iter() {
        // Overflow of the running offset and overrunning the buffer are
        // reported identically: the manifest layout does not fit.
        let end = offset
            .checked_add(chunk.size as usize)
            .filter(|&end| end <= fetched.len())
            .ok_or_else(|| {
                BlobError::Backend(format!(
                    "manifest chunk layout overruns fetched buffer: \
                     offset {offset} + size {} > len {}",
                    chunk.size,
                    fetched.len()
                ))
            })?;

        let actual = <[u8; 32]>::from(blake3::hash(&fetched[offset..end]));
        if actual != chunk.hash {
            return Err(BlobError::HashMismatch {
                expected: chunk.hash,
                actual,
            });
        }
        offset = end;
    }

    // Trailing bytes not claimed by any chunk are also a layout error.
    if offset == fetched.len() {
        Ok(())
    } else {
        Err(BlobError::Backend(format!(
            "manifest reassembled length {} != sum of chunk sizes {}",
            fetched.len(),
            offset
        )))
    }
}
/// Returns the part of `uri` that precedes the first `:`.
///
/// Yields the empty string when `uri` contains no `:` at all (and also when
/// the `:` is the very first character). The result borrows from `uri`; no
/// normalisation such as lower-casing is applied.
fn uri_scheme(uri: &str) -> &str {
    uri.split_once(':').map_or("", |(scheme, _rest)| scheme)
}
/// Stores `bytes` through `adapter` and returns the *encoded* blob reference.
///
/// This is [`publish_blob_ref`] followed by `BlobRef::encode`; the returned
/// vector is what [`resolve_payload`] accepts to get the content back.
///
/// # Errors
///
/// Propagates any error from [`publish_blob_ref`].
pub async fn publish_blob<A: BlobAdapter + ?Sized>(
    adapter: &A,
    uri: impl Into<String>,
    bytes: &[u8],
) -> Result<Vec<u8>, BlobError> {
    publish_blob_ref(adapter, uri, bytes)
        .await
        .map(|blob_ref| blob_ref.encode())
}
/// Stores `bytes` through `adapter` and returns the structured [`BlobRef`].
///
/// The reference is built with `BlobRef::small` from the given `uri`, the
/// BLAKE3 hash of `bytes`, and the byte length. It is handed back only after
/// `adapter.store` has succeeded.
///
/// # Errors
///
/// Propagates any error returned by `adapter.store`.
pub async fn publish_blob_ref<A: BlobAdapter + ?Sized>(
    adapter: &A,
    uri: impl Into<String>,
    bytes: &[u8],
) -> Result<BlobRef, BlobError> {
    let digest = <[u8; 32]>::from(blake3::hash(bytes));
    let length = bytes.len() as u64;
    let blob_ref = BlobRef::small(uri, digest, length);
    adapter.store(&blob_ref, bytes).await?;
    Ok(blob_ref)
}
#[cfg(test)]
mod tests {
    use super::super::blob_ref::{ChunkRef, Encoding, BLOB_CHUNK_SIZE_BYTES};
    use super::super::fs::FileSystemAdapter;
    use super::super::noop::NoopAdapter;
    use super::*;
    use std::ops::Range;
    use std::path::PathBuf;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Unique scratch directory for one test.
    ///
    /// The directory is removed (best effort) when the guard is dropped, so
    /// it is cleaned up even when an assertion in the test panics. Declare
    /// the guard *before* the adapter that uses it: locals drop in reverse
    /// declaration order, so the adapter is gone before the directory is
    /// deleted.
    struct TempRoot {
        path: PathBuf,
    }

    impl TempRoot {
        /// Builds `<tmp>/net-blob-<tag>-<pid>-<nanos>`; nothing is created on
        /// disk here.
        fn new(tag: &str) -> Self {
            let nanos = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_nanos();
            let path = std::env::temp_dir().join(format!(
                "net-blob-{tag}-{}-{nanos}",
                std::process::id()
            ));
            Self { path }
        }
    }

    impl Drop for TempRoot {
        fn drop(&mut self) {
            // The directory may never have been created; ignore failures.
            let _ = std::fs::remove_dir_all(&self.path);
        }
    }

    /// In-memory adapter that serves one fixed buffer for every blob ref and
    /// accepts (and discards) every store. Lets a test dictate exactly which
    /// bytes `resolve_payload` receives from `fetch`.
    #[derive(Debug)]
    struct FixedBytesAdapter {
        id: &'static str,
        bytes: Vec<u8>,
    }

    impl FixedBytesAdapter {
        fn new(id: &'static str, bytes: Vec<u8>) -> Self {
            Self { id, bytes }
        }
    }

    #[async_trait::async_trait]
    impl BlobAdapter for FixedBytesAdapter {
        fn adapter_id(&self) -> &str {
            self.id
        }

        async fn store(&self, _: &BlobRef, _: &[u8]) -> Result<(), BlobError> {
            Ok(())
        }

        async fn fetch(&self, _: &BlobRef) -> Result<Bytes, BlobError> {
            Ok(Bytes::from(self.bytes.clone()))
        }

        async fn fetch_range(&self, _: &BlobRef, range: Range<u64>) -> Result<Bytes, BlobError> {
            // Clamp to the buffer so an out-of-range request yields a short
            // (possibly empty) slice instead of panicking.
            let len = self.bytes.len();
            let end = usize::try_from(range.end).map_or(len, |e| e.min(len));
            let start = usize::try_from(range.start).map_or(end, |s| s.min(end));
            Ok(Bytes::copy_from_slice(&self.bytes[start..end]))
        }

        async fn exists(&self, _: &BlobRef) -> Result<bool, BlobError> {
            Ok(true)
        }
    }

    /// `BlobRef::small` for `payload` under a fixed `file://` URI.
    fn fixture_blob_ref(payload: &[u8]) -> BlobRef {
        BlobRef::small(
            "file:///dispatch",
            blake3::hash(payload).into(),
            payload.len() as u64,
        )
    }

    /// Chunk descriptor (BLAKE3 hash + length) for `bytes`.
    fn chunk_ref(bytes: &[u8]) -> ChunkRef {
        ChunkRef {
            hash: blake3::hash(bytes).into(),
            size: bytes.len() as u32,
        }
    }

    /// Replicated manifest whose two chunks describe `first` and `second`.
    fn two_chunk_manifest(uri: &'static str, first: &[u8], second: &[u8]) -> BlobRef {
        BlobRef::manifest(
            uri,
            Encoding::Replicated,
            vec![chunk_ref(first), chunk_ref(second)],
        )
        .expect("manifest construct")
    }

    #[test]
    fn classify_inline_when_first_byte_is_not_discriminator() {
        let bytes = b"plain payload";
        match classify_payload(bytes).unwrap() {
            EventPayload::Inline(b) => assert_eq!(b, bytes),
            other => panic!("expected Inline, got {:?}", other),
        }
    }

    #[test]
    fn classify_blob_when_first_byte_is_discriminator() {
        let payload = b"out of band";
        let blob = fixture_blob_ref(payload);
        let encoded = blob.encode();
        match classify_payload(&encoded).unwrap() {
            EventPayload::Blob(decoded) => assert_eq!(decoded, blob),
            other => panic!("expected Blob, got {:?}", other),
        }
    }

    #[test]
    fn classify_empty_payload_is_inline() {
        let bytes: &[u8] = &[];
        match classify_payload(bytes).unwrap() {
            EventPayload::Inline(b) => assert!(b.is_empty()),
            other => panic!("expected empty Inline, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn resolve_passes_inline_through() {
        let adapter = NoopAdapter::default();
        let bytes = b"inline goes straight through";
        let resolved = resolve_payload(bytes, &adapter).await.unwrap();
        assert_eq!(resolved.as_ref(), bytes);
    }

    #[tokio::test]
    async fn resolve_fetches_and_verifies_blob() {
        let root = TempRoot::new("resolve");
        let adapter = FileSystemAdapter::new("resolve-test", &root.path);
        let payload = b"this content lives out of band";
        let blob = fixture_blob_ref(payload);
        adapter.store(&blob, payload).await.unwrap();
        let encoded = blob.encode();
        let resolved = resolve_payload(&encoded, &adapter).await.unwrap();
        assert_eq!(resolved.as_ref(), payload);
    }

    #[tokio::test]
    async fn publish_blob_round_trips_through_resolve_payload() {
        let root = TempRoot::new("publish");
        let adapter = FileSystemAdapter::new("publish-test", &root.path);
        let payload = b"write side equivalent of resolve_payload";
        let encoded = publish_blob(&adapter, "file:///published", payload)
            .await
            .unwrap();
        assert_eq!(
            &encoded[..4],
            &crate::adapter::net::dataforts::blob::BLOB_REF_MAGIC,
        );
        let resolved = resolve_payload(&encoded, &adapter).await.unwrap();
        assert_eq!(resolved.as_ref(), payload);
    }

    #[tokio::test]
    async fn publish_blob_ref_returns_structured_ref() {
        let root = TempRoot::new("publish-ref");
        let adapter = FileSystemAdapter::new("publish-ref", &root.path);
        let payload = b"explicit ref shape";
        let blob = publish_blob_ref(&adapter, "file:///structured", payload)
            .await
            .unwrap();
        let expected: [u8; 32] = blake3::hash(payload).into();
        assert_eq!(blob.small_hash(), Some(&expected));
        assert_eq!(blob.size(), payload.len() as u64);
        assert_eq!(blob.uri(), "file:///structured");
        let fetched = adapter.fetch(&blob).await.unwrap();
        assert_eq!(fetched.as_ref(), payload);
        blob.verify(&fetched).unwrap();
    }

    // The name is kept for continuity: "without top-level verify" refers to
    // `BlobRef::verify` not being used for chunked refs. The per-chunk check
    // in `verify_manifest_chunks` still runs on this path.
    #[tokio::test]
    async fn resolve_passes_chunked_manifest_through_without_top_level_verify() {
        let chunk_1 = vec![0x5A_u8; BLOB_CHUNK_SIZE_BYTES as usize];
        let chunk_2 = vec![0x5A_u8; 16];
        let payload = [chunk_1.as_slice(), chunk_2.as_slice()].concat();
        let blob = two_chunk_manifest("mesh://chunked-resolve", &chunk_1, &chunk_2);
        let encoded = blob.encode();
        let adapter = FixedBytesAdapter::new("stub-manifest", payload.clone());
        let resolved = resolve_payload(&encoded, &adapter)
            .await
            .expect("resolve must accept Manifest without top-level verify");
        assert_eq!(resolved.as_ref(), payload.as_slice());
    }

    #[tokio::test]
    async fn resolve_rejects_chunked_manifest_with_tampered_chunk_bytes() {
        let legit_chunk_1 = vec![0xAA_u8; BLOB_CHUNK_SIZE_BYTES as usize];
        let legit_chunk_2 = vec![0xBB_u8; 16];
        let blob = two_chunk_manifest("mesh://tampered", &legit_chunk_1, &legit_chunk_2);
        let encoded = blob.encode();
        // Same length as the legitimate content, but the second chunk's bytes
        // are swapped out.
        let mut tampered = legit_chunk_1.clone();
        tampered.extend(vec![0xCC_u8; 16]);
        let adapter = FixedBytesAdapter::new("tampering", tampered);
        let err = resolve_payload(&encoded, &adapter)
            .await
            .expect_err("tampered chunk must fail verification");
        assert!(
            matches!(err, BlobError::HashMismatch { .. }),
            "expected HashMismatch on tampered chunk 2, got {:?}",
            err
        );
    }

    #[tokio::test]
    async fn resolve_rejects_manifest_when_fetched_buffer_is_short() {
        let legit_chunk_1 = vec![0xAA_u8; BLOB_CHUNK_SIZE_BYTES as usize];
        let legit_chunk_2 = vec![0xBB_u8; 16];
        let blob = two_chunk_manifest("mesh://short", &legit_chunk_1, &legit_chunk_2);
        let encoded = blob.encode();
        // Serve only the first chunk: the second chunk's region is missing.
        let adapter = FixedBytesAdapter::new("short", legit_chunk_1);
        let err = resolve_payload(&encoded, &adapter)
            .await
            .expect_err("short buffer must reject");
        let msg = err.to_string();
        assert!(
            msg.contains("overruns") || msg.contains("!="),
            "expected layout-overrun / length-mismatch error; got: {msg}",
        );
    }

    #[tokio::test]
    async fn resolve_accepts_chunked_manifest_with_matching_chunk_bytes() {
        let chunk_1 = vec![0x11_u8; BLOB_CHUNK_SIZE_BYTES as usize];
        let chunk_2 = vec![0x22_u8; 32];
        let mut full = chunk_1.clone();
        full.extend(&chunk_2);
        let blob = two_chunk_manifest("mesh://legit", &chunk_1, &chunk_2);
        let encoded = blob.encode();
        let adapter = FixedBytesAdapter::new("legit", full.clone());
        let resolved = resolve_payload(&encoded, &adapter)
            .await
            .expect("legitimate manifest must verify");
        assert_eq!(resolved.as_ref(), full.as_slice());
    }

    #[tokio::test]
    async fn resolve_rejects_uri_with_unaccepted_scheme() {
        let root = TempRoot::new("scheme");
        let adapter = FileSystemAdapter::new("scheme-test", &root.path);
        let payload = b"unused";
        let blob = BlobRef::small(
            "s3://attacker/key",
            blake3::hash(payload).into(),
            payload.len() as u64,
        );
        let encoded = blob.encode();
        let err = resolve_payload(&encoded, &adapter).await.unwrap_err();
        assert!(matches!(err, BlobError::UnsupportedScheme(_)));
    }

    #[tokio::test]
    async fn resolve_rejects_tree_blob_to_force_tree_walking_fetch_range() {
        let tree = BlobRef::tree("test://tree", Encoding::Replicated, [0xAB; 32], 64, 1)
            .expect("tree ref");
        let encoded = tree.encode();
        let adapter = FixedBytesAdapter::new("unverified-tree", vec![0u8; 64]);
        let err = resolve_payload(&encoded, &adapter).await.unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("BlobRef::Tree cannot be resolved through a flat fetch"),
            "expected Tree-rejected error; got: {msg}",
        );
    }

    #[tokio::test]
    async fn resolve_rejects_blob_with_corrupted_content() {
        let advertised = b"the truth";
        let actual: &[u8] = b"a different lie";
        let blob = BlobRef::small(
            "test://tamper",
            blake3::hash(advertised).into(),
            advertised.len() as u64,
        );
        // The adapter hands back bytes that do not match the advertised hash.
        let adapter = FixedBytesAdapter::new("tamper", actual.to_vec());
        let encoded = blob.encode();
        let err = resolve_payload(&encoded, &adapter).await.unwrap_err();
        assert!(matches!(err, BlobError::HashMismatch { .. }));
    }
}