use opendal::Operator;
use sha2::{Digest, Sha256};
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
pub key: String,
pub content_hash: String,
pub size_bytes: u64,
}
pub struct UniversalBlobStore {
operator: Operator,
scheme: String,
}
impl UniversalBlobStore {
pub fn new() -> Result<Self, String> {
let scheme =
std::env::var("COREASON_STORAGE_SCHEME").unwrap_or_else(|_| "memory".to_string());
let operator = match scheme.as_str() {
"s3" => {
let mut builder = opendal::services::S3::default();
builder = builder.bucket(
&std::env::var("COREASON_S3_BUCKET")
.unwrap_or_else(|_| "coreason-mesh-state".to_string()),
);
if let Ok(endpoint) = std::env::var("S3_ENDPOINT_URL") {
builder = builder.endpoint(&endpoint);
}
if let Ok(region) = std::env::var("AWS_REGION") {
builder = builder.region(®ion);
}
if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
builder = builder.access_key_id(&key_id);
}
if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
builder = builder.secret_access_key(&secret);
}
Operator::new(builder)
.map_err(|e| format!("Failed to create S3 operator: {}", e))?
.finish()
}
"fs" => {
let root = std::env::var("COREASON_STORAGE_PATH").unwrap_or_else(|_| {
std::env::temp_dir()
.join("coreason")
.to_string_lossy()
.to_string()
});
let builder = opendal::services::Fs::default().root(&root);
Operator::new(builder)
.map_err(|e| format!("Failed to create Fs operator: {}", e))?
.finish()
}
_ => {
let builder = opendal::services::Memory::default();
Operator::new(builder)
.map_err(|e| format!("Failed to create Memory operator: {}", e))?
.finish()
}
};
Ok(Self { operator, scheme })
}
pub fn from_operator(operator: Operator, scheme: &str) -> Self {
Self {
operator,
scheme: scheme.to_string(),
}
}
pub fn write_bytes(&self, key: &str, data: &[u8]) -> Result<String, String> {
self.operator
.blocking()
.write(key, data.to_vec())
.map_err(|e| format!("Failed to write object {}: {}", key, e))?;
let bucket = std::env::var("COREASON_S3_BUCKET")
.unwrap_or_else(|_| "coreason-mesh-state".to_string());
Ok(format!("{}://{}/{}", self.scheme, bucket, key))
}
pub fn read_bytes(&self, key: &str) -> Result<Vec<u8>, String> {
self.operator
.blocking()
.read(key)
.map(|buf| buf.to_vec())
.map_err(|e| format!("Failed to read object {}: {}", key, e))
}
pub fn stat(&self, key: &str) -> Result<BlobMetadata, String> {
let meta = self
.operator
.blocking()
.stat(key)
.map_err(|e| format!("Failed to stat object {}: {}", key, e))?;
let data = self.read_bytes(key)?;
let content_hash = Self::content_hash(&data);
Ok(BlobMetadata {
key: key.to_string(),
content_hash,
size_bytes: meta.content_length(),
})
}
pub fn delete(&self, key: &str) -> Result<(), String> {
self.operator
.blocking()
.delete(key)
.map_err(|e| format!("Failed to delete object {}: {}", key, e))
}
pub fn content_hash(data: &[u8]) -> String {
let mut hasher = Sha256::new();
hasher.update(data);
format!("{:x}", hasher.finalize())
}
pub fn scheme(&self) -> &str {
&self.scheme
}
}
impl Default for UniversalBlobStore {
fn default() -> Self {
Self::new().expect("Failed to create default blob store")
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_store() -> UniversalBlobStore {
let builder = opendal::services::Memory::default();
let op = Operator::new(builder).unwrap().finish();
UniversalBlobStore::from_operator(op, "memory")
}
#[test]
fn test_write_and_read() {
let store = test_store();
let data = b"hello coreason";
let uri = store.write_bytes("test/key.bin", data).unwrap();
assert!(uri.contains("test/key.bin"));
let read_back = store.read_bytes("test/key.bin").unwrap();
assert_eq!(read_back, data);
}
#[test]
fn test_read_not_found() {
let store = test_store();
let result = store.read_bytes("nonexistent");
assert!(result.is_err());
}
#[test]
fn test_stat() {
let store = test_store();
store.write_bytes("meta/test", b"some data").unwrap();
let meta = store.stat("meta/test").unwrap();
assert_eq!(meta.key, "meta/test");
assert_eq!(meta.size_bytes, 9);
assert!(!meta.content_hash.is_empty());
}
#[test]
fn test_delete() {
let store = test_store();
store.write_bytes("to_delete", b"temp").unwrap();
store.delete("to_delete").unwrap();
assert!(store.read_bytes("to_delete").is_err());
}
#[test]
fn test_content_hash_deterministic() {
let h1 = UniversalBlobStore::content_hash(b"test");
let h2 = UniversalBlobStore::content_hash(b"test");
assert_eq!(h1, h2);
assert_eq!(h1.len(), 64);
}
#[test]
fn test_content_hash_different_inputs() {
let h1 = UniversalBlobStore::content_hash(b"a");
let h2 = UniversalBlobStore::content_hash(b"b");
assert_ne!(h1, h2);
}
}