use bytes::Bytes;
use super::blob_ref::{chunk_payload, BlobRef, ChunkedPayload, Encoding};
use super::error::BlobError;
use super::mesh::MeshBlobAdapter;
use crate::adapter::net::channel::{ChannelPublisher, PublishReport};
use crate::adapter::net::mesh::MeshNode;
/// How far a blob must be persisted before `publish_with_blob` publishes
/// its reference.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlobDurability {
    /// Store the blob and publish; `publish_with_blob` performs no
    /// `sync_blob` call for this level.
    BestEffort,
    /// Store the blob, then call `sync_blob` on the adapter before
    /// publishing. This is the variant returned by `Default`.
    DurableOnLocal,
    /// Not implemented yet: `publish_with_blob` returns a
    /// `BlobError::Backend` for this variant.
    ///
    /// NOTE(review): the `u8` is presumably the number of replicas/nodes
    /// to wait for — confirm against the storage plan.
    ReplicatedTo(u8),
}
impl Default for BlobDurability {
fn default() -> Self {
Self::DurableOnLocal
}
}
/// Value returned by a successful `publish_with_blob` call.
#[derive(Debug)]
pub struct PublishWithBlobReceipt {
    /// Reference built for the stored payload; its encoded form is what was
    /// published on the channel.
    pub blob_ref: BlobRef,
    /// Report returned by the mesh publish call.
    pub publish_report: PublishReport,
}
/// Stores `bytes` as a blob through `adapter`, then publishes the encoded
/// [`BlobRef`] on `publisher` via `mesh`.
///
/// Steps, in order:
/// 1. Check that `durability` is a supported level.
/// 2. Chunk the payload with `chunk_payload` and build either a small
///    (inline) reference or a manifest reference with
///    `Encoding::Replicated`.
/// 3. Store the payload through `adapter`.
/// 4. For [`BlobDurability::DurableOnLocal`], call `sync_blob` on the
///    adapter; [`BlobDurability::BestEffort`] skips this step.
/// 5. Publish the encoded reference and return it together with the
///    publish report.
///
/// # Errors
///
/// * `BlobError::Backend` if `durability` is
///   [`BlobDurability::ReplicatedTo`], which is not implemented yet. This is
///   reported before anything is stored.
/// * Any error from `chunk_payload`, `BlobRef::manifest`, the adapter's
///   `store`, or the adapter's `sync_blob` is propagated unchanged.
/// * `BlobError::Backend` with a `mesh publish: ` prefix if publishing
///   fails. The blob has already been stored at that point.
pub async fn publish_with_blob(
    mesh: &MeshNode,
    adapter: &MeshBlobAdapter,
    publisher: &ChannelPublisher,
    uri_hint: impl Into<String>,
    bytes: Bytes,
    durability: BlobDurability,
) -> Result<PublishWithBlobReceipt, BlobError> {
    use crate::adapter::net::dataforts::blob::adapter::BlobAdapter;

    // Resolve the durability level first: an unsupported level must fail
    // before the adapter is touched, otherwise the call would leave behind a
    // stored blob that the caller never receives a reference to.
    let sync_locally = match durability {
        BlobDurability::BestEffort => false,
        BlobDurability::DurableOnLocal => true,
        BlobDurability::ReplicatedTo(n) => {
            return Err(BlobError::Backend(format!(
                "ReplicatedTo({}) durability is not yet implemented \
                 in v0.2 PR-3; track DATAFORTS_BLOB_STORAGE_PLAN.md \
                 § PR-5 for the cross-node wait wiring",
                n
            )));
        }
    };

    let uri = uri_hint.into();
    let chunked = chunk_payload(&bytes)?;
    // Exactly one arm runs, so `uri` can be moved into it without cloning.
    let blob_ref = match chunked {
        ChunkedPayload::Inline { hash, .. } => BlobRef::small(uri, hash, bytes.len() as u64),
        ChunkedPayload::Chunked { ref chunks, .. } => {
            let chunk_refs = chunks.iter().map(|(r, _)| *r).collect();
            BlobRef::manifest(uri, Encoding::Replicated, chunk_refs)?
        }
    };

    adapter.store(&blob_ref, &bytes).await?;
    if sync_locally {
        adapter.sync_blob(&blob_ref).await?;
    }

    // Only the encoded reference travels over the channel, not the payload.
    let payload = Bytes::from(blob_ref.encode());
    let publish_report = mesh
        .publish(publisher, payload)
        .await
        .map_err(|e| BlobError::Backend(format!("mesh publish: {}", e)))?;

    Ok(PublishWithBlobReceipt {
        blob_ref,
        publish_report,
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapter::net::dataforts::blob::adapter::BlobAdapter;
    use crate::adapter::net::redex::Redex;
    use std::sync::Arc;

    /// Builds an adapter over a fresh `Redex::new()` with no persistent
    /// directory configured.
    fn make_adapter() -> MeshBlobAdapter {
        let redex = Arc::new(Redex::new());
        MeshBlobAdapter::new("mesh-pub-test", redex)
    }

    /// Builds an adapter with persistence enabled, rooted at a directory path
    /// under the system temp dir that is unique per process and per call.
    ///
    /// Returns the adapter together with that root path so the caller can
    /// remove it with [`cleanup`] when done.
    fn make_persistent_adapter() -> (MeshBlobAdapter, std::path::PathBuf) {
        use std::sync::atomic::{AtomicU64, Ordering};
        // Per-process counter so tests in the same binary never share a root.
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let n = COUNTER.fetch_add(1, Ordering::Relaxed);
        let root = std::env::temp_dir().join(format!("net-pwb-test-{}-{}", std::process::id(), n));
        let redex = Arc::new(Redex::new().with_persistent_dir(&root));
        let adapter = MeshBlobAdapter::new("mesh-pub-persistent", redex).with_persistent(true);
        (adapter, root)
    }

    /// Drops `adapter`, then removes `root` on a best-effort basis so test
    /// runs do not accumulate directories under the temp dir.
    ///
    /// The removal error is ignored on purpose: the directory may never have
    /// been created, and a cleanup failure must not fail the test.
    fn cleanup(adapter: MeshBlobAdapter, root: &std::path::Path) {
        drop(adapter);
        let _ = std::fs::remove_dir_all(root);
    }

    /// A small payload chunks as `Inline` and round-trips through
    /// `store` / `fetch`.
    #[tokio::test]
    async fn best_effort_stores_blob_and_returns_blob_ref() {
        let adapter = make_adapter();
        let payload = Bytes::from_static(b"hello publish_with_blob");
        let chunked = chunk_payload(&payload).unwrap();
        let blob_ref = match chunked {
            ChunkedPayload::Inline { hash, .. } => {
                BlobRef::small("mesh://x", hash, payload.len() as u64)
            }
            _ => panic!("expected Inline for small payload"),
        };
        adapter.store(&blob_ref, &payload).await.unwrap();
        let fetched = adapter.fetch(&blob_ref).await.unwrap();
        assert_eq!(fetched, payload);
    }

    /// `store` followed by `sync_blob` succeeds on a persistent adapter and
    /// the payload is still fetchable afterwards.
    #[tokio::test]
    async fn durable_on_local_syncs_chunk_files() {
        let (adapter, root) = make_persistent_adapter();
        let payload = Bytes::from_static(b"durable on local");
        let hash: [u8; 32] = blake3::hash(&payload).into();
        let blob_ref = BlobRef::small("mesh://durable", hash, payload.len() as u64);
        adapter.store(&blob_ref, &payload).await.unwrap();
        adapter.sync_blob(&blob_ref).await.unwrap();
        let fetched = adapter.fetch(&blob_ref).await.unwrap();
        assert_eq!(fetched, payload);
        cleanup(adapter, &root);
    }

    /// Syncing a reference that was never stored reports `NotFound`.
    #[tokio::test]
    async fn sync_blob_before_store_returns_not_found() {
        let adapter = make_adapter();
        let blob_ref = BlobRef::small("mesh://ghost", [0xFF; 32], 0);
        let err = adapter.sync_blob(&blob_ref).await.unwrap_err();
        assert!(matches!(err, BlobError::NotFound(_)));
    }

    /// A payload larger than one chunk is stored as a manifest, synced, and
    /// fetched back byte-for-byte.
    #[tokio::test]
    async fn every_chunk_is_locally_fetchable_after_store_and_sync() {
        use super::super::blob_ref::{ChunkRef, BLOB_CHUNK_SIZE_BYTES};
        let (adapter, root) = make_persistent_adapter();
        let payload: Vec<u8> = (0..(BLOB_CHUNK_SIZE_BYTES as usize + 4096))
            .map(|i| (i % 251) as u8)
            .collect();
        let chunked = chunk_payload(&payload).unwrap();
        let chunk_refs: Vec<ChunkRef> = match chunked {
            ChunkedPayload::Chunked { chunks, .. } => {
                chunks.into_iter().map(|(r, _)| r).collect()
            }
            _ => panic!("expected Chunked"),
        };
        assert!(chunk_refs.len() >= 2, "test fixture must produce ≥2 chunks");
        let blob_ref = BlobRef::manifest("mesh://pwb", Encoding::Replicated, chunk_refs)
            .expect("manifest");
        adapter.store(&blob_ref, &payload).await.unwrap();
        adapter.sync_blob(&blob_ref).await.unwrap();
        let fetched = adapter.fetch(&blob_ref).await.unwrap();
        assert_eq!(fetched.as_ref(), payload.as_slice());
        cleanup(adapter, &root);
    }

    /// `sync_blob` succeeds for a manifest built from a multi-chunk payload.
    #[tokio::test]
    async fn sync_blob_walks_every_chunk_of_a_manifest() {
        use super::super::blob_ref::{ChunkRef, BLOB_CHUNK_SIZE_BYTES};
        let (adapter, root) = make_persistent_adapter();
        let payload: Vec<u8> = (0..(BLOB_CHUNK_SIZE_BYTES as usize + 100))
            .map(|i| (i % 251) as u8)
            .collect();
        let chunked = chunk_payload(&payload).unwrap();
        let chunk_refs: Vec<ChunkRef> = match chunked {
            ChunkedPayload::Chunked { chunks, .. } => {
                chunks.into_iter().map(|(r, _)| r).collect()
            }
            _ => panic!("expected Chunked"),
        };
        let blob_ref =
            BlobRef::manifest("mesh://multi", Encoding::Replicated, chunk_refs).unwrap();
        adapter.store(&blob_ref, &payload).await.unwrap();
        adapter.sync_blob(&blob_ref).await.unwrap();
        cleanup(adapter, &root);
    }

    /// Pins the shape of the `ReplicatedTo` variant.
    ///
    /// NOTE(review): despite its name this does not exercise the
    /// not-implemented error path; doing so requires calling
    /// `publish_with_blob`, which needs a `MeshNode` and a
    /// `ChannelPublisher` that this module does not construct.
    #[test]
    fn replicated_to_returns_not_yet_implemented_error() {
        let d = BlobDurability::ReplicatedTo(3);
        assert!(matches!(d, BlobDurability::ReplicatedTo(3)));
    }

    /// The default durability level is `DurableOnLocal`.
    #[test]
    fn default_durability_is_durable_on_local() {
        assert_eq!(BlobDurability::default(), BlobDurability::DurableOnLocal);
    }
}