1use std::path::PathBuf;
11use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
12
13use async_trait::async_trait;
14use chrono::{DateTime, Utc};
15use dashmap::DashMap;
16use serde::{Deserialize, Serialize};
17use tracing::info;
18
19use crate::error::KernelError;
20use crate::health::HealthStatus;
21use crate::service::{ServiceType, SystemService};
22
23#[non_exhaustive]
29#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
30pub enum ArtifactType {
31 WasmModule,
33 AppManifest,
35 ConfigBundle,
37 Generic,
39}
40
41impl std::fmt::Display for ArtifactType {
42 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43 match self {
44 Self::WasmModule => write!(f, "wasm-module"),
45 Self::AppManifest => write!(f, "app-manifest"),
46 Self::ConfigBundle => write!(f, "config-bundle"),
47 Self::Generic => write!(f, "generic"),
48 }
49 }
50}
51
52pub struct StoredArtifact {
58 pub hash: String,
60 pub size: u64,
62 pub content_type: ArtifactType,
64 pub stored_at: DateTime<Utc>,
66 pub reference_count: AtomicU32,
68}
69
70#[non_exhaustive]
76pub enum ArtifactBackend {
77 Memory(DashMap<String, Vec<u8>>),
79 File {
81 base_path: PathBuf,
83 },
84}
85
86pub struct ArtifactStore {
97 artifacts: DashMap<String, StoredArtifact>,
99 backend: ArtifactBackend,
101 total_size: AtomicU64,
103}
104
105impl ArtifactStore {
106 pub fn new_memory() -> Self {
108 Self {
109 artifacts: DashMap::new(),
110 backend: ArtifactBackend::Memory(DashMap::new()),
111 total_size: AtomicU64::new(0),
112 }
113 }
114
115 pub fn new_file(base_path: PathBuf) -> Self {
117 Self {
118 artifacts: DashMap::new(),
119 backend: ArtifactBackend::File { base_path },
120 total_size: AtomicU64::new(0),
121 }
122 }
123
124 pub fn store(
129 &self,
130 content: &[u8],
131 content_type: ArtifactType,
132 ) -> Result<String, KernelError> {
133 let hash = blake3::hash(content).to_hex().to_string();
134
135 if let Some(existing) = self.artifacts.get(&hash) {
137 existing.reference_count.fetch_add(1, Ordering::Relaxed);
138 return Ok(hash);
139 }
140
141 match &self.backend {
143 ArtifactBackend::Memory(map) => {
144 map.insert(hash.clone(), content.to_vec());
145 }
146 ArtifactBackend::File { base_path } => {
147 let prefix = &hash[..2.min(hash.len())];
148 let dir = base_path.join(prefix);
149 std::fs::create_dir_all(&dir).map_err(|e| {
150 KernelError::Service(format!("artifact dir create: {e}"))
151 })?;
152 std::fs::write(dir.join(&hash), content).map_err(|e| {
153 KernelError::Service(format!("artifact write: {e}"))
154 })?;
155 }
156 }
157
158 let size = content.len() as u64;
160 self.artifacts.insert(
161 hash.clone(),
162 StoredArtifact {
163 hash: hash.clone(),
164 size,
165 content_type,
166 stored_at: Utc::now(),
167 reference_count: AtomicU32::new(1),
168 },
169 );
170 self.total_size.fetch_add(size, Ordering::Relaxed);
171
172 info!(hash = %hash, size, "artifact stored");
173 Ok(hash)
174 }
175
176 pub fn load(&self, hash: &str) -> Result<Vec<u8>, KernelError> {
178 if !self.artifacts.contains_key(hash) {
179 return Err(KernelError::Service(format!(
180 "artifact not found: {hash}"
181 )));
182 }
183
184 let content = match &self.backend {
185 ArtifactBackend::Memory(map) => map
186 .get(hash)
187 .map(|v| v.value().clone()),
188 ArtifactBackend::File { base_path } => {
189 let prefix = &hash[..2.min(hash.len())];
190 let path = base_path.join(prefix).join(hash);
191 std::fs::read(&path).ok()
192 }
193 };
194
195 let content = content.ok_or_else(|| {
196 KernelError::Service(format!("artifact data missing: {hash}"))
197 })?;
198
199 let actual_hash = blake3::hash(&content).to_hex().to_string();
201 if actual_hash != hash {
202 return Err(KernelError::Service(format!(
203 "artifact integrity error: expected {hash}, got {actual_hash}"
204 )));
205 }
206
207 Ok(content)
208 }
209
210 pub fn contains(&self, hash: &str) -> bool {
212 self.artifacts.contains_key(hash)
213 }
214
215 pub fn release(&self, hash: &str) -> bool {
219 if let Some(entry) = self.artifacts.get(hash) {
220 let prev = entry.reference_count.fetch_sub(1, Ordering::Relaxed);
221 return prev <= 1;
222 }
223 false
224 }
225
226 pub fn remove(&self, hash: &str) -> Result<(), KernelError> {
228 if let Some((_, meta)) = self.artifacts.remove(hash) {
229 self.total_size.fetch_sub(meta.size, Ordering::Relaxed);
230 match &self.backend {
231 ArtifactBackend::Memory(map) => {
232 map.remove(hash);
233 }
234 ArtifactBackend::File { base_path } => {
235 let prefix = &hash[..2.min(hash.len())];
236 let path = base_path.join(prefix).join(hash);
237 let _ = std::fs::remove_file(path);
238 }
239 }
240 }
241 Ok(())
242 }
243
244 pub fn count(&self) -> usize {
246 self.artifacts.len()
247 }
248
249 pub fn total_bytes(&self) -> u64 {
251 self.total_size.load(Ordering::Relaxed)
252 }
253
254 pub fn metadata(&self, hash: &str) -> Option<(String, u64, ArtifactType, DateTime<Utc>, u32)> {
256 self.artifacts.get(hash).map(|a| {
257 (
258 a.hash.clone(),
259 a.size,
260 a.content_type.clone(),
261 a.stored_at,
262 a.reference_count.load(Ordering::Relaxed),
263 )
264 })
265 }
266}
267
268#[async_trait]
269impl SystemService for ArtifactStore {
270 fn name(&self) -> &str {
271 "artifact-store"
272 }
273
274 fn service_type(&self) -> ServiceType {
275 ServiceType::Core
276 }
277
278 async fn start(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
279 info!("artifact store started");
280 Ok(())
281 }
282
283 async fn stop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
284 info!(
285 count = self.count(),
286 total_bytes = self.total_bytes(),
287 "artifact store stopped"
288 );
289 Ok(())
290 }
291
292 async fn health_check(&self) -> HealthStatus {
293 HealthStatus::Healthy
294 }
295}
296
#[cfg(test)]
mod tests {
    use super::*;

    /// Bytes stored in the memory backend are returned unchanged.
    #[test]
    fn store_and_load_roundtrip() {
        let store = ArtifactStore::new_memory();
        let payload = b"hello weftos artifact";
        let digest = store.store(payload, ArtifactType::Generic).unwrap();
        let fetched = store.load(&digest).unwrap();
        assert_eq!(fetched, payload);
    }

    /// Swapping the backend bytes behind the index is detected on load.
    #[test]
    fn hash_verification_on_load() {
        let store = ArtifactStore::new_memory();
        let payload = b"original content";
        let digest = store.store(payload, ArtifactType::WasmModule).unwrap();

        // Overwrite the stored bytes directly, bypassing `store`.
        if let ArtifactBackend::Memory(bytes) = &store.backend {
            bytes.insert(digest.clone(), b"tampered content".to_vec());
        }

        let outcome = store.load(&digest);
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("integrity error"), "got: {message}");
    }

    /// Storing identical content twice yields one entry and one hash.
    #[test]
    fn duplicate_store_returns_same_hash() {
        let store = ArtifactStore::new_memory();
        let payload = b"dedup test";
        let first = store.store(payload, ArtifactType::Generic).unwrap();
        let second = store.store(payload, ArtifactType::Generic).unwrap();
        assert_eq!(first, second);
        assert_eq!(store.count(), 1);
    }

    /// The second store of identical content bumps the reference count.
    #[test]
    fn duplicate_store_increments_refcount() {
        let store = ArtifactStore::new_memory();
        let payload = b"refcount test";
        let digest = store.store(payload, ArtifactType::Generic).unwrap();
        let _ = store.store(payload, ArtifactType::Generic).unwrap();
        let (_, _, _, _, references) = store.metadata(&digest).unwrap();
        assert_eq!(references, 2);
    }

    /// `release` reports `true` only once the last reference is dropped.
    #[test]
    fn reference_counting_release() {
        let store = ArtifactStore::new_memory();
        let payload = b"rc data";
        let digest = store.store(payload, ArtifactType::Generic).unwrap();
        let _ = store.store(payload, ArtifactType::Generic).unwrap();

        // Two references: the first release leaves one behind.
        assert!(!store.release(&digest));
        // The second release drops the last reference.
        assert!(store.release(&digest));
    }

    /// Loading a hash that was never stored is an error.
    #[test]
    fn load_nonexistent_fails() {
        let store = ArtifactStore::new_memory();
        let outcome =
            store.load("0000000000000000000000000000000000000000000000000000000000000000");
        assert!(outcome.is_err());
    }

    /// `total_bytes` is the sum of the sizes of distinct artifacts.
    #[test]
    fn total_bytes_tracked() {
        let store = ArtifactStore::new_memory();
        store.store(b"aaaa", ArtifactType::Generic).unwrap();
        store.store(b"bbb", ArtifactType::Generic).unwrap();
        assert_eq!(store.total_bytes(), 7);
    }

    /// `remove` drops the artifact from the index.
    #[test]
    fn remove_artifact() {
        let store = ArtifactStore::new_memory();
        let digest = store.store(b"remove me", ArtifactType::Generic).unwrap();
        assert!(store.contains(&digest));
        store.remove(&digest).unwrap();
        assert!(!store.contains(&digest));
        assert_eq!(store.count(), 0);
    }

    /// The file backend writes to `<base>/<2-char prefix>/<hash>` and reads back.
    #[test]
    fn file_backend_roundtrip() {
        let root = std::env::temp_dir().join(format!("artifact_test_{}", uuid::Uuid::new_v4()));
        std::fs::create_dir_all(&root).unwrap();

        let store = ArtifactStore::new_file(root.clone());
        let payload = b"file backend content";
        let digest = store.store(payload, ArtifactType::AppManifest).unwrap();

        // The artifact must live in its two-character shard directory.
        let shard = &digest[..2];
        let file_path = root.join(shard).join(&digest);
        assert!(file_path.exists(), "expected file at {file_path:?}");

        let fetched = store.load(&digest).unwrap();
        assert_eq!(fetched, payload);

        // Best-effort cleanup of the temporary directory.
        let _ = std::fs::remove_dir_all(&root);
    }

    /// Distinct content produces distinct hashes and separate entries.
    #[test]
    fn different_content_different_hash() {
        let store = ArtifactStore::new_memory();
        let alpha = store.store(b"alpha", ArtifactType::Generic).unwrap();
        let beta = store.store(b"beta", ArtifactType::Generic).unwrap();
        assert_ne!(alpha, beta);
        assert_eq!(store.count(), 2);
    }

    /// The `SystemService` lifecycle methods succeed and report expected values.
    #[tokio::test]
    async fn system_service_impl() {
        let store = ArtifactStore::new_memory();
        assert_eq!(store.name(), "artifact-store");
        assert_eq!(store.service_type(), ServiceType::Core);
        store.start().await.unwrap();
        let status = store.health_check().await;
        assert_eq!(status, HealthStatus::Healthy);
        store.stop().await.unwrap();
    }
}