use std::ops::Range;
use std::pin::Pin;
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::Stream;
use super::blob_ref::{BlobRef, Encoding};
use super::error::BlobError;
pub type BlobByteStream = Pin<Box<dyn Stream<Item = Result<Bytes, BlobError>> + Send>>;
#[derive(Clone, Debug, Default)]
pub struct BlobStat {
pub size: u64,
pub replicas_observed: u32,
pub replica_target: Option<u8>,
pub last_seen_unix_ms: Option<u64>,
pub encoding: Option<Encoding>,
}
#[async_trait]
pub trait BlobAdapter: Send + Sync + 'static {
fn adapter_id(&self) -> &str;
fn accepted_schemes(&self) -> &[&str] {
&[]
}
async fn store(&self, blob_ref: &BlobRef, bytes: &[u8]) -> Result<(), BlobError>;
async fn fetch(&self, blob_ref: &BlobRef) -> Result<Bytes, BlobError>;
async fn fetch_range(&self, blob_ref: &BlobRef, range: Range<u64>) -> Result<Bytes, BlobError>;
async fn exists(&self, blob_ref: &BlobRef) -> Result<bool, BlobError>;
async fn fetch_stream(&self, blob_ref: &BlobRef) -> Result<BlobByteStream, BlobError> {
let bytes = self.fetch(blob_ref).await?;
let stream = futures::stream::once(async move { Ok(bytes) });
Ok(Box::pin(stream))
}
async fn store_stream(
&self,
blob_ref: &BlobRef,
mut stream: BlobByteStream,
size_hint: Option<u64>,
) -> Result<(), BlobError> {
use futures::StreamExt;
const MAX_STREAM_BYTES: u64 = 16 * 1024 * 1024 * 1024;
let effective_cap = MAX_STREAM_BYTES.min(usize::MAX as u64);
let mut buf: Vec<u8> = match size_hint {
Some(n) if (n as usize) <= 16 * 1024 * 1024 => Vec::with_capacity(n as usize),
_ => Vec::new(),
};
while let Some(chunk) = stream.next().await {
let bytes = chunk?;
if (buf.len() as u64).saturating_add(bytes.len() as u64) > effective_cap {
return Err(BlobError::Backend(format!(
"store_stream: accumulated {} bytes exceeds {} cap",
buf.len(),
effective_cap
)));
}
buf.extend_from_slice(&bytes);
}
self.store(blob_ref, &buf).await
}
async fn delete(&self, _blob_ref: &BlobRef) -> Result<(), BlobError> {
Ok(())
}
async fn prefetch(&self, _blob_ref: &BlobRef) -> Result<(), BlobError> {
Ok(())
}
async fn stat(&self, blob_ref: &BlobRef) -> Result<BlobStat, BlobError> {
Ok(BlobStat {
size: blob_ref.size(),
encoding: blob_ref.encoding(),
..Default::default()
})
}
async fn list(&self, _opts: &BlobListOptions) -> Result<Vec<BlobInventoryEntry>, BlobError> {
Ok(Vec::new())
}
fn supports_list(&self) -> bool {
false
}
}
#[derive(Clone, Debug, Default)]
pub struct BlobListOptions {
pub prefix_hex: Option<String>,
pub limit: usize,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BlobInventoryEntry {
pub adapter_id: String,
pub hash_hex: String,
pub refcount: u32,
pub pinned: bool,
pub first_seen_unix_ms: u64,
pub last_seen_unix_ms: u64,
pub size_bytes: Option<u64>,
pub replicas_observed: Option<u32>,
pub replica_target: Option<u32>,
}