Skip to main content

anvil_core/
storage.rs

1//! Storage subsystem. Thin wrapper over object_store for local/S3/GCS.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use object_store::{path::Path as ObjectPath, ObjectStore};
8
9use crate::Error;
10
11#[async_trait]
12pub trait StorageDisk: Send + Sync {
13    async fn put(&self, key: &str, data: Bytes) -> Result<(), Error>;
14    async fn get(&self, key: &str) -> Result<Bytes, Error>;
15    async fn delete(&self, key: &str) -> Result<(), Error>;
16    async fn exists(&self, key: &str) -> Result<bool, Error>;
17    fn public_url(&self, key: &str) -> Option<String>;
18}
19
20#[derive(Clone)]
21pub struct StorageManager {
22    disks: Arc<parking_lot::RwLock<indexmap::IndexMap<String, Arc<dyn StorageDisk>>>>,
23    default: String,
24}
25
26impl StorageManager {
27    pub fn new(default: impl Into<String>) -> Self {
28        Self {
29            disks: Arc::new(parking_lot::RwLock::new(indexmap::IndexMap::new())),
30            default: default.into(),
31        }
32    }
33
34    pub fn local_default() -> Self {
35        let mgr = Self::new("local");
36        let local = ObjectStoreDisk::local("storage/app").expect("local disk init");
37        mgr.register("local", Arc::new(local));
38        mgr
39    }
40
41    pub fn register(&self, name: impl Into<String>, disk: Arc<dyn StorageDisk>) {
42        self.disks.write().insert(name.into(), disk);
43    }
44
45    pub fn disk(&self, name: &str) -> Result<Arc<dyn StorageDisk>, Error> {
46        self.disks
47            .read()
48            .get(name)
49            .cloned()
50            .ok_or_else(|| Error::Storage(format!("disk '{name}' not registered")))
51    }
52
53    pub fn default(&self) -> Result<Arc<dyn StorageDisk>, Error> {
54        self.disk(&self.default)
55    }
56}
57
58pub struct ObjectStoreDisk {
59    store: Arc<dyn ObjectStore>,
60    base_url: Option<String>,
61}
62
63impl ObjectStoreDisk {
64    pub fn local(root: &str) -> Result<Self, Error> {
65        std::fs::create_dir_all(root).ok();
66        let store = object_store::local::LocalFileSystem::new_with_prefix(root)
67            .map_err(|e| Error::Storage(e.to_string()))?;
68        Ok(Self {
69            store: Arc::new(store),
70            base_url: None,
71        })
72    }
73}
74
75#[async_trait]
76impl StorageDisk for ObjectStoreDisk {
77    async fn put(&self, key: &str, data: Bytes) -> Result<(), Error> {
78        let path = ObjectPath::from(key);
79        self.store
80            .put(&path, data.into())
81            .await
82            .map_err(|e| Error::Storage(e.to_string()))?;
83        Ok(())
84    }
85
86    async fn get(&self, key: &str) -> Result<Bytes, Error> {
87        let path = ObjectPath::from(key);
88        let result = self
89            .store
90            .get(&path)
91            .await
92            .map_err(|e| Error::Storage(e.to_string()))?;
93        result
94            .bytes()
95            .await
96            .map_err(|e| Error::Storage(e.to_string()))
97    }
98
99    async fn delete(&self, key: &str) -> Result<(), Error> {
100        let path = ObjectPath::from(key);
101        self.store
102            .delete(&path)
103            .await
104            .map_err(|e| Error::Storage(e.to_string()))?;
105        Ok(())
106    }
107
108    async fn exists(&self, key: &str) -> Result<bool, Error> {
109        let path = ObjectPath::from(key);
110        match self.store.head(&path).await {
111            Ok(_) => Ok(true),
112            Err(object_store::Error::NotFound { .. }) => Ok(false),
113            Err(e) => Err(Error::Storage(e.to_string())),
114        }
115    }
116
117    fn public_url(&self, key: &str) -> Option<String> {
118        self.base_url.as_ref().map(|b| format!("{b}/{key}"))
119    }
120}