coreason-runtime 0.1.0

Kinetic Plane execution engine for the CoReason Tripartite Cybernetic Manifold
Documentation
// Copyright (c) 2026 CoReason, Inc.
// All rights reserved.

//! Universal Blob Store.
//!
//! Replaces `coreason_runtime/execution_plane/blob_store.py`.
//!
//! S3-compliant object storage abstraction that decouples the runtime from
//! proprietary cloud SDKs. Allows identical operations on AWS S3, MinIO,
//! local filesystem, or in-memory storage.
//!
//! Zero Waste: All I/O operations delegated to `opendal` (Apache Foundation,
//! Apache-2.0). We only write the environment-variable-driven initialization
//! logic specific to CoReason's deployment topology.

use opendal::Operator;
use sha2::{Digest, Sha256};

/// Metadata for a stored blob.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BlobMetadata {
    pub key: String,
    pub content_hash: String,
    pub size_bytes: u64,
}

/// Universal blob store backed by Apache OpenDAL.
///
/// Zero Waste: Delegates all I/O to `opendal::Operator`.
/// Supports S3, filesystem, and in-memory backends via environment variables.
pub struct UniversalBlobStore {
    operator: Operator,
    scheme: String,
}

impl UniversalBlobStore {
    /// Create a new blob store from environment configuration.
    ///
    /// Reads `COREASON_STORAGE_SCHEME` to select backend:
    /// - `"memory"` (default): In-memory storage via opendal
    /// - `"s3"`: S3-compatible storage (AWS, MinIO)
    /// - `"fs"`: Local filesystem
    pub fn new() -> Result<Self, String> {
        let scheme =
            std::env::var("COREASON_STORAGE_SCHEME").unwrap_or_else(|_| "memory".to_string());

        let operator = match scheme.as_str() {
            "s3" => {
                let mut builder = opendal::services::S3::default();
                builder = builder.bucket(
                    &std::env::var("COREASON_S3_BUCKET")
                        .unwrap_or_else(|_| "coreason-mesh-state".to_string()),
                );
                if let Ok(endpoint) = std::env::var("S3_ENDPOINT_URL") {
                    builder = builder.endpoint(&endpoint);
                }
                if let Ok(region) = std::env::var("AWS_REGION") {
                    builder = builder.region(&region);
                }
                if let Ok(key_id) = std::env::var("AWS_ACCESS_KEY_ID") {
                    builder = builder.access_key_id(&key_id);
                }
                if let Ok(secret) = std::env::var("AWS_SECRET_ACCESS_KEY") {
                    builder = builder.secret_access_key(&secret);
                }
                Operator::new(builder)
                    .map_err(|e| format!("Failed to create S3 operator: {}", e))?
                    .finish()
            }
            "fs" => {
                let root = std::env::var("COREASON_STORAGE_PATH").unwrap_or_else(|_| {
                    std::env::temp_dir()
                        .join("coreason")
                        .to_string_lossy()
                        .to_string()
                });
                let builder = opendal::services::Fs::default().root(&root);
                Operator::new(builder)
                    .map_err(|e| format!("Failed to create Fs operator: {}", e))?
                    .finish()
            }
            _ => {
                // Default: in-memory storage
                let builder = opendal::services::Memory::default();
                Operator::new(builder)
                    .map_err(|e| format!("Failed to create Memory operator: {}", e))?
                    .finish()
            }
        };

        Ok(Self { operator, scheme })
    }

    /// Create a blob store with a specific opendal operator (for testing).
    pub fn from_operator(operator: Operator, scheme: &str) -> Self {
        Self {
            operator,
            scheme: scheme.to_string(),
        }
    }

    /// Write bytes to the store under the given key.
    ///
    /// Returns the storage URI.
    /// Zero Waste: Delegates to `opendal::Operator::blocking_write`.
    pub fn write_bytes(&self, key: &str, data: &[u8]) -> Result<String, String> {
        self.operator
            .blocking()
            .write(key, data.to_vec())
            .map_err(|e| format!("Failed to write object {}: {}", key, e))?;
        let bucket = std::env::var("COREASON_S3_BUCKET")
            .unwrap_or_else(|_| "coreason-mesh-state".to_string());
        Ok(format!("{}://{}/{}", self.scheme, bucket, key))
    }

    /// Read bytes from the store by key.
    ///
    /// Zero Waste: Delegates to `opendal::Operator::blocking_read`.
    pub fn read_bytes(&self, key: &str) -> Result<Vec<u8>, String> {
        self.operator
            .blocking()
            .read(key)
            .map(|buf| buf.to_vec())
            .map_err(|e| format!("Failed to read object {}: {}", key, e))
    }

    /// Get metadata for a stored object.
    ///
    /// Zero Waste: Delegates to `opendal::Operator::blocking_stat`.
    pub fn stat(&self, key: &str) -> Result<BlobMetadata, String> {
        let meta = self
            .operator
            .blocking()
            .stat(key)
            .map_err(|e| format!("Failed to stat object {}: {}", key, e))?;

        // Read content for hash computation
        let data = self.read_bytes(key)?;
        let content_hash = Self::content_hash(&data);

        Ok(BlobMetadata {
            key: key.to_string(),
            content_hash,
            size_bytes: meta.content_length(),
        })
    }

    /// Delete an object from the store.
    ///
    /// Zero Waste: Delegates to `opendal::Operator::blocking_delete`.
    pub fn delete(&self, key: &str) -> Result<(), String> {
        self.operator
            .blocking()
            .delete(key)
            .map_err(|e| format!("Failed to delete object {}: {}", key, e))
    }

    /// Compute SHA-256 content hash for data.
    ///
    /// Zero Waste: Delegates to `sha2::Sha256`.
    pub fn content_hash(data: &[u8]) -> String {
        let mut hasher = Sha256::new();
        hasher.update(data);
        format!("{:x}", hasher.finalize())
    }

    /// Get the storage scheme name.
    pub fn scheme(&self) -> &str {
        &self.scheme
    }
}

impl Default for UniversalBlobStore {
    fn default() -> Self {
        Self::new().expect("Failed to create default blob store")
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    fn test_store() -> UniversalBlobStore {
        let builder = opendal::services::Memory::default();
        let op = Operator::new(builder).unwrap().finish();
        UniversalBlobStore::from_operator(op, "memory")
    }

    #[test]
    fn test_write_and_read() {
        let store = test_store();
        let data = b"hello coreason";
        let uri = store.write_bytes("test/key.bin", data).unwrap();
        assert!(uri.contains("test/key.bin"));

        let read_back = store.read_bytes("test/key.bin").unwrap();
        assert_eq!(read_back, data);
    }

    #[test]
    fn test_read_not_found() {
        let store = test_store();
        let result = store.read_bytes("nonexistent");
        assert!(result.is_err());
    }

    #[test]
    fn test_stat() {
        let store = test_store();
        store.write_bytes("meta/test", b"some data").unwrap();

        let meta = store.stat("meta/test").unwrap();
        assert_eq!(meta.key, "meta/test");
        assert_eq!(meta.size_bytes, 9);
        assert!(!meta.content_hash.is_empty());
    }

    #[test]
    fn test_delete() {
        let store = test_store();
        store.write_bytes("to_delete", b"temp").unwrap();
        store.delete("to_delete").unwrap();
        assert!(store.read_bytes("to_delete").is_err());
    }

    #[test]
    fn test_content_hash_deterministic() {
        let h1 = UniversalBlobStore::content_hash(b"test");
        let h2 = UniversalBlobStore::content_hash(b"test");
        assert_eq!(h1, h2);
        assert_eq!(h1.len(), 64);
    }

    #[test]
    fn test_content_hash_different_inputs() {
        let h1 = UniversalBlobStore::content_hash(b"a");
        let h2 = UniversalBlobStore::content_hash(b"b");
        assert_ne!(h1, h2);
    }
}