Skip to main content

clawft_kernel/
artifact_store.rs

1//! Content-addressed artifact store using BLAKE3 hashes (K3-G1).
2//!
3//! Stores arbitrary binary artifacts (WASM modules, app manifests,
4//! config bundles) indexed by their BLAKE3 content hash. Deduplicates
5//! automatically: storing the same content twice returns the same hash
6//! and increments the reference count.
7//!
8//! This module requires the `ecc` feature (which pulls `blake3`).
9
10use std::path::PathBuf;
11use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
12
13use async_trait::async_trait;
14use chrono::{DateTime, Utc};
15use dashmap::DashMap;
16use serde::{Deserialize, Serialize};
17use tracing::info;
18
19use crate::error::KernelError;
20use crate::health::HealthStatus;
21use crate::service::{ServiceType, SystemService};
22
23// ---------------------------------------------------------------------------
24// ArtifactType
25// ---------------------------------------------------------------------------
26
27/// The kind of content stored in the artifact.
28#[non_exhaustive]
29#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
30pub enum ArtifactType {
31    /// WebAssembly module (.wasm).
32    WasmModule,
33    /// Application manifest (app.json).
34    AppManifest,
35    /// Configuration bundle.
36    ConfigBundle,
37    /// Untyped binary blob.
38    Generic,
39}
40
41impl std::fmt::Display for ArtifactType {
42    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43        match self {
44            Self::WasmModule => write!(f, "wasm-module"),
45            Self::AppManifest => write!(f, "app-manifest"),
46            Self::ConfigBundle => write!(f, "config-bundle"),
47            Self::Generic => write!(f, "generic"),
48        }
49    }
50}
51
52// ---------------------------------------------------------------------------
53// StoredArtifact (metadata)
54// ---------------------------------------------------------------------------
55
56/// Metadata about a stored artifact (the data lives in the backend).
57pub struct StoredArtifact {
58    /// BLAKE3 hex hash of the content.
59    pub hash: String,
60    /// Content size in bytes.
61    pub size: u64,
62    /// Content type classification.
63    pub content_type: ArtifactType,
64    /// When this artifact was first stored.
65    pub stored_at: DateTime<Utc>,
66    /// Number of active references (for GC).
67    pub reference_count: AtomicU32,
68}
69
70// ---------------------------------------------------------------------------
71// ArtifactBackend
72// ---------------------------------------------------------------------------
73
74/// Storage backend for artifact data.
75#[non_exhaustive]
76pub enum ArtifactBackend {
77    /// In-memory storage (for tests / embedded use).
78    Memory(DashMap<String, Vec<u8>>),
79    /// File-system storage with two-level directory sharding.
80    File {
81        /// Base directory for artifact files.
82        base_path: PathBuf,
83    },
84}
85
86// ---------------------------------------------------------------------------
87// ArtifactStore
88// ---------------------------------------------------------------------------
89
90/// Content-addressed artifact store using BLAKE3 hashes.
91///
92/// Thread-safe via `DashMap`. The store guarantees:
93/// - **Deduplication**: same content always produces the same hash.
94/// - **Integrity**: every load verifies the content hash.
95/// - **Reference counting**: tracks how many consumers reference each artifact.
96pub struct ArtifactStore {
97    /// Hash -> metadata index.
98    artifacts: DashMap<String, StoredArtifact>,
99    /// Data storage backend.
100    backend: ArtifactBackend,
101    /// Total stored bytes across all artifacts.
102    total_size: AtomicU64,
103}
104
105impl ArtifactStore {
106    /// Create a new in-memory artifact store.
107    pub fn new_memory() -> Self {
108        Self {
109            artifacts: DashMap::new(),
110            backend: ArtifactBackend::Memory(DashMap::new()),
111            total_size: AtomicU64::new(0),
112        }
113    }
114
115    /// Create a new file-backed artifact store.
116    pub fn new_file(base_path: PathBuf) -> Self {
117        Self {
118            artifacts: DashMap::new(),
119            backend: ArtifactBackend::File { base_path },
120            total_size: AtomicU64::new(0),
121        }
122    }
123
124    /// Store content and return the BLAKE3 hash.
125    ///
126    /// If the content is already stored, increments the reference count
127    /// and returns the existing hash (deduplication).
128    pub fn store(
129        &self,
130        content: &[u8],
131        content_type: ArtifactType,
132    ) -> Result<String, KernelError> {
133        let hash = blake3::hash(content).to_hex().to_string();
134
135        // Dedup: if hash exists, just bump reference count.
136        if let Some(existing) = self.artifacts.get(&hash) {
137            existing.reference_count.fetch_add(1, Ordering::Relaxed);
138            return Ok(hash);
139        }
140
141        // Write to backend.
142        match &self.backend {
143            ArtifactBackend::Memory(map) => {
144                map.insert(hash.clone(), content.to_vec());
145            }
146            ArtifactBackend::File { base_path } => {
147                let prefix = &hash[..2.min(hash.len())];
148                let dir = base_path.join(prefix);
149                std::fs::create_dir_all(&dir).map_err(|e| {
150                    KernelError::Service(format!("artifact dir create: {e}"))
151                })?;
152                std::fs::write(dir.join(&hash), content).map_err(|e| {
153                    KernelError::Service(format!("artifact write: {e}"))
154                })?;
155            }
156        }
157
158        // Record metadata.
159        let size = content.len() as u64;
160        self.artifacts.insert(
161            hash.clone(),
162            StoredArtifact {
163                hash: hash.clone(),
164                size,
165                content_type,
166                stored_at: Utc::now(),
167                reference_count: AtomicU32::new(1),
168            },
169        );
170        self.total_size.fetch_add(size, Ordering::Relaxed);
171
172        info!(hash = %hash, size, "artifact stored");
173        Ok(hash)
174    }
175
176    /// Load content by hash, verifying integrity on read.
177    pub fn load(&self, hash: &str) -> Result<Vec<u8>, KernelError> {
178        if !self.artifacts.contains_key(hash) {
179            return Err(KernelError::Service(format!(
180                "artifact not found: {hash}"
181            )));
182        }
183
184        let content = match &self.backend {
185            ArtifactBackend::Memory(map) => map
186                .get(hash)
187                .map(|v| v.value().clone()),
188            ArtifactBackend::File { base_path } => {
189                let prefix = &hash[..2.min(hash.len())];
190                let path = base_path.join(prefix).join(hash);
191                std::fs::read(&path).ok()
192            }
193        };
194
195        let content = content.ok_or_else(|| {
196            KernelError::Service(format!("artifact data missing: {hash}"))
197        })?;
198
199        // Verify integrity.
200        let actual_hash = blake3::hash(&content).to_hex().to_string();
201        if actual_hash != hash {
202            return Err(KernelError::Service(format!(
203                "artifact integrity error: expected {hash}, got {actual_hash}"
204            )));
205        }
206
207        Ok(content)
208    }
209
210    /// Check whether an artifact exists by hash.
211    pub fn contains(&self, hash: &str) -> bool {
212        self.artifacts.contains_key(hash)
213    }
214
215    /// Decrement the reference count for an artifact.
216    ///
217    /// Returns `true` if the reference count reached zero (eligible for GC).
218    pub fn release(&self, hash: &str) -> bool {
219        if let Some(entry) = self.artifacts.get(hash) {
220            let prev = entry.reference_count.fetch_sub(1, Ordering::Relaxed);
221            return prev <= 1;
222        }
223        false
224    }
225
226    /// Remove an artifact and its data from storage.
227    pub fn remove(&self, hash: &str) -> Result<(), KernelError> {
228        if let Some((_, meta)) = self.artifacts.remove(hash) {
229            self.total_size.fetch_sub(meta.size, Ordering::Relaxed);
230            match &self.backend {
231                ArtifactBackend::Memory(map) => {
232                    map.remove(hash);
233                }
234                ArtifactBackend::File { base_path } => {
235                    let prefix = &hash[..2.min(hash.len())];
236                    let path = base_path.join(prefix).join(hash);
237                    let _ = std::fs::remove_file(path);
238                }
239            }
240        }
241        Ok(())
242    }
243
244    /// Total number of stored artifacts.
245    pub fn count(&self) -> usize {
246        self.artifacts.len()
247    }
248
249    /// Total stored bytes.
250    pub fn total_bytes(&self) -> u64 {
251        self.total_size.load(Ordering::Relaxed)
252    }
253
254    /// Get metadata for an artifact (hash, size, type, stored_at, refcount).
255    pub fn metadata(&self, hash: &str) -> Option<(String, u64, ArtifactType, DateTime<Utc>, u32)> {
256        self.artifacts.get(hash).map(|a| {
257            (
258                a.hash.clone(),
259                a.size,
260                a.content_type.clone(),
261                a.stored_at,
262                a.reference_count.load(Ordering::Relaxed),
263            )
264        })
265    }
266}
267
268#[async_trait]
269impl SystemService for ArtifactStore {
270    fn name(&self) -> &str {
271        "artifact-store"
272    }
273
274    fn service_type(&self) -> ServiceType {
275        ServiceType::Core
276    }
277
278    async fn start(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
279        info!("artifact store started");
280        Ok(())
281    }
282
283    async fn stop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
284        info!(
285            count = self.count(),
286            total_bytes = self.total_bytes(),
287            "artifact store stopped"
288        );
289        Ok(())
290    }
291
292    async fn health_check(&self) -> HealthStatus {
293        HealthStatus::Healthy
294    }
295}
296
297// ── Tests ─────────────────────────────────────────────────────────────────
298
#[cfg(test)]
mod tests {
    use super::*;

    /// Unique scratch directory under the system temp dir, deleted on drop so
    /// file-backend tests clean up even when an assertion fails part-way.
    struct TempDir(PathBuf);

    impl TempDir {
        fn new() -> Self {
            let dir = std::env::temp_dir()
                .join(format!("artifact_test_{}", uuid::Uuid::new_v4()));
            std::fs::create_dir_all(&dir).unwrap();
            Self(dir)
        }

        fn path(&self) -> &std::path::Path {
            &self.0
        }
    }

    impl Drop for TempDir {
        fn drop(&mut self) {
            // Best-effort cleanup; a leftover temp dir must not fail the test.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    #[test]
    fn store_and_load_roundtrip() {
        let store = ArtifactStore::new_memory();
        let content = b"hello weftos artifact";
        let hash = store.store(content, ArtifactType::Generic).unwrap();
        let loaded = store.load(&hash).unwrap();
        assert_eq!(loaded, content);
    }

    #[test]
    fn hash_verification_on_load() {
        let store = ArtifactStore::new_memory();
        let content = b"original content";
        let hash = store.store(content, ArtifactType::WasmModule).unwrap();

        // Tamper with stored data.
        if let ArtifactBackend::Memory(map) = &store.backend {
            map.insert(hash.clone(), b"tampered content".to_vec());
        }

        let result = store.load(&hash);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("integrity error"), "got: {err}");
    }

    #[test]
    fn duplicate_store_returns_same_hash() {
        let store = ArtifactStore::new_memory();
        let content = b"dedup test";
        let h1 = store.store(content, ArtifactType::Generic).unwrap();
        let h2 = store.store(content, ArtifactType::Generic).unwrap();
        assert_eq!(h1, h2);
        assert_eq!(store.count(), 1);
    }

    #[test]
    fn duplicate_store_increments_refcount() {
        let store = ArtifactStore::new_memory();
        let content = b"refcount test";
        let hash = store.store(content, ArtifactType::Generic).unwrap();
        let _ = store.store(content, ArtifactType::Generic).unwrap();
        let (_, _, _, _, refcount) = store.metadata(&hash).unwrap();
        assert_eq!(refcount, 2);
    }

    #[test]
    fn reference_counting_release() {
        let store = ArtifactStore::new_memory();
        let content = b"rc data";
        let hash = store.store(content, ArtifactType::Generic).unwrap();
        let _ = store.store(content, ArtifactType::Generic).unwrap();

        assert!(!store.release(&hash)); // 2 -> 1, not zero yet
        assert!(store.release(&hash)); // 1 -> 0, eligible for GC
    }

    #[test]
    fn release_unknown_hash_returns_false() {
        let store = ArtifactStore::new_memory();
        assert!(!store.release("unknown"));
    }

    #[test]
    fn load_nonexistent_fails() {
        let store = ArtifactStore::new_memory();
        let result = store.load("0000000000000000000000000000000000000000000000000000000000000000");
        assert!(result.is_err());
    }

    #[test]
    fn total_bytes_tracked() {
        let store = ArtifactStore::new_memory();
        store.store(b"aaaa", ArtifactType::Generic).unwrap();
        store.store(b"bbb", ArtifactType::Generic).unwrap();
        assert_eq!(store.total_bytes(), 7);
    }

    #[test]
    fn metadata_reports_type_size_and_refcount() {
        let store = ArtifactStore::new_memory();
        let hash = store.store(b"abc", ArtifactType::WasmModule).unwrap();
        let (meta_hash, size, kind, _, refcount) = store.metadata(&hash).unwrap();
        assert_eq!(meta_hash, hash);
        assert_eq!(size, 3);
        assert_eq!(kind, ArtifactType::WasmModule);
        assert_eq!(refcount, 1);
        assert!(store.metadata("unknown").is_none());
    }

    #[test]
    fn remove_artifact() {
        let store = ArtifactStore::new_memory();
        let hash = store.store(b"remove me", ArtifactType::Generic).unwrap();
        assert!(store.contains(&hash));
        store.remove(&hash).unwrap();
        assert!(!store.contains(&hash));
        assert_eq!(store.count(), 0);
    }

    #[test]
    fn remove_updates_total_bytes_and_load_fails() {
        let store = ArtifactStore::new_memory();
        let hash = store.store(b"remove me", ArtifactType::Generic).unwrap();
        assert_eq!(store.total_bytes(), 9);
        store.remove(&hash).unwrap();
        assert_eq!(store.total_bytes(), 0);
        assert!(store.load(&hash).is_err());
    }

    #[test]
    fn remove_unknown_hash_is_ok() {
        let store = ArtifactStore::new_memory();
        assert!(store.remove("unknown").is_ok());
        assert_eq!(store.count(), 0);
    }

    #[test]
    fn file_backend_roundtrip() {
        let tmp = TempDir::new();
        let store = ArtifactStore::new_file(tmp.path().to_path_buf());
        let content = b"file backend content";
        let hash = store.store(content, ArtifactType::AppManifest).unwrap();

        // Verify two-level directory structure.
        let file_path = tmp.path().join(&hash[..2]).join(&hash);
        assert!(file_path.exists(), "expected file at {file_path:?}");

        let loaded = store.load(&hash).unwrap();
        assert_eq!(loaded, content);
    }

    #[test]
    fn file_backend_remove_deletes_data() {
        let tmp = TempDir::new();
        let store = ArtifactStore::new_file(tmp.path().to_path_buf());
        let hash = store
            .store(b"delete me from disk", ArtifactType::ConfigBundle)
            .unwrap();
        let file_path = tmp.path().join(&hash[..2]).join(&hash);
        assert!(file_path.exists(), "expected file at {file_path:?}");

        store.remove(&hash).unwrap();
        assert!(!file_path.exists(), "file still present at {file_path:?}");
        assert!(!store.contains(&hash));
        assert_eq!(store.total_bytes(), 0);
    }

    #[test]
    fn different_content_different_hash() {
        let store = ArtifactStore::new_memory();
        let h1 = store.store(b"alpha", ArtifactType::Generic).unwrap();
        let h2 = store.store(b"beta", ArtifactType::Generic).unwrap();
        assert_ne!(h1, h2);
        assert_eq!(store.count(), 2);
    }

    #[tokio::test]
    async fn system_service_impl() {
        let store = ArtifactStore::new_memory();
        assert_eq!(store.name(), "artifact-store");
        assert_eq!(store.service_type(), ServiceType::Core);
        store.start().await.unwrap();
        let health = store.health_check().await;
        assert_eq!(health, HealthStatus::Healthy);
        store.stop().await.unwrap();
    }
}