use super::adapter::BlobAdapter;
use super::blob_ref::BlobRef;
use super::error::BlobError;
pub async fn run_conformance_suite<A: BlobAdapter + ?Sized>(
adapter: &A,
) -> Result<(), &'static str> {
use std::sync::atomic::{AtomicU64, Ordering};
static SUITE_NONCE: AtomicU64 = AtomicU64::new(1);
let run_nonce = SUITE_NONCE.fetch_add(1, Ordering::Relaxed);
let payload: Vec<u8> = format!(
"conformance-fixture-payload-{}-{}",
std::process::id(),
run_nonce
)
.into_bytes();
let hash: [u8; 32] = blake3::hash(&payload).into();
let blob = BlobRef::small(
format!("conformance://{}/payload", adapter.adapter_id()),
hash,
payload.len() as u64,
);
if adapter
.exists(&blob)
.await
.map_err(|_| "exists pre-store")?
{
return Err("exists returned true before store");
}
adapter
.store(&blob, &payload)
.await
.map_err(|_| "store failed")?;
if !adapter
.exists(&blob)
.await
.map_err(|_| "exists post-store")?
{
return Err("exists returned false after store");
}
let fetched = adapter
.fetch(&blob)
.await
.map_err(|_| "fetch after store failed")?;
if fetched != payload {
return Err("fetched bytes != stored bytes");
}
blob.verify(&fetched)
.map_err(|_| "BlobRef::verify on fetched bytes failed")?;
let slice = adapter
.fetch_range(&blob, 5..10)
.await
.map_err(|_| "fetch_range mid failed")?;
if slice.as_ref() != &payload[5..10] {
return Err("fetch_range mid returned wrong slice");
}
let empty = adapter
.fetch_range(&blob, 3..3)
.await
.map_err(|_| "fetch_range empty failed")?;
if !empty.is_empty() {
return Err("fetch_range empty returned non-empty result");
}
adapter
.store(&blob, &payload)
.await
.map_err(|_| "idempotent re-store failed")?;
let refetched = adapter
.fetch(&blob)
.await
.map_err(|_| "fetch after re-store failed")?;
if refetched != payload {
return Err("idempotent re-store changed content");
}
let tampered: Vec<u8> = format!("tampered-{}", run_nonce).into_bytes();
let tampered_hash: [u8; 32] = blake3::hash(&tampered).into();
if !tampered.is_empty() && Some(&tampered_hash) != blob.small_hash() {
match adapter.store(&blob, &tampered).await {
Err(BlobError::HashMismatch { .. }) => {}
Ok(()) => {
return Err("store accepted bytes that don't hash to the claimed BlobRef.hash")
}
Err(_) => return Err("store rejected mismatched bytes with the wrong error variant"),
}
}
let past_end = blob.size() + 1;
if adapter
.fetch_range(&blob, blob.size()..past_end)
.await
.is_ok()
{
return Err("fetch_range past end returned Ok");
}
let second_payload: Vec<u8> =
format!("second-payload-{}-{}", std::process::id(), run_nonce).into_bytes();
let second_hash: [u8; 32] = blake3::hash(&second_payload).into();
let second_blob = BlobRef::small(
format!("conformance://{}/second", adapter.adapter_id()),
second_hash,
second_payload.len() as u64,
);
adapter
.store(&second_blob, &second_payload)
.await
.map_err(|_| "second store failed")?;
let first_again = adapter
.fetch(&blob)
.await
.map_err(|_| "fetch first after second store failed")?;
if first_again != payload {
return Err("second blob's store corrupted first blob's content");
}
{
use futures::StreamExt;
let mut stream = adapter
.fetch_stream(&blob)
.await
.map_err(|_| "fetch_stream failed")?;
let mut buf: Vec<u8> = Vec::new();
while let Some(chunk) = stream.next().await {
let chunk = chunk.map_err(|_| "fetch_stream chunk error")?;
buf.extend_from_slice(&chunk);
}
if buf != payload {
return Err("fetch_stream bytes != stored bytes");
}
blob.verify(&buf)
.map_err(|_| "BlobRef::verify on fetch_stream bytes failed")?;
}
let mut ghost_hash = [0u8; 32];
let nonce_bytes = run_nonce.to_le_bytes();
ghost_hash[..8].copy_from_slice(&nonce_bytes);
ghost_hash[8..16].copy_from_slice(&nonce_bytes);
ghost_hash[16..24].copy_from_slice(&(std::process::id() as u64).to_le_bytes());
ghost_hash[24..32].copy_from_slice(b"NO-GHOST");
let ghost = BlobRef::small(
format!("conformance://{}/ghost-{}", adapter.adapter_id(), run_nonce),
ghost_hash,
0,
);
match adapter.fetch(&ghost).await {
Err(BlobError::NotFound(_)) => {}
Ok(_) => return Err("fetch of missing blob returned Ok"),
Err(_) => return Err("fetch of missing blob returned wrong error variant"),
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::super::fs::FileSystemAdapter;
use super::super::noop::NoopAdapter;
use super::*;
#[tokio::test]
async fn fs_adapter_passes_full_conformance_suite() {
let root = std::env::temp_dir().join(format!(
"net-blob-conformance-{}-{}",
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos()
));
let adapter = FileSystemAdapter::new("conformance-fs", &root);
run_conformance_suite(&adapter).await.unwrap();
let _ = std::fs::remove_dir_all(&root);
}
#[tokio::test]
async fn noop_adapter_fails_at_store_visibility() {
let adapter = NoopAdapter::new("conformance-noop");
let err = run_conformance_suite(&adapter).await.unwrap_err();
assert_eq!(err, "exists returned false after store");
}
}