1use std::sync::Arc;
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use object_store::{path::Path as ObjectPath, ObjectStore};
8
9use crate::Error;
10
11#[async_trait]
12pub trait StorageDisk: Send + Sync {
13 async fn put(&self, key: &str, data: Bytes) -> Result<(), Error>;
14 async fn get(&self, key: &str) -> Result<Bytes, Error>;
15 async fn delete(&self, key: &str) -> Result<(), Error>;
16 async fn exists(&self, key: &str) -> Result<bool, Error>;
17 fn public_url(&self, key: &str) -> Option<String>;
18}
19
20#[derive(Clone)]
21pub struct StorageManager {
22 disks: Arc<parking_lot::RwLock<indexmap::IndexMap<String, Arc<dyn StorageDisk>>>>,
23 default: String,
24}
25
26impl StorageManager {
27 pub fn new(default: impl Into<String>) -> Self {
28 Self {
29 disks: Arc::new(parking_lot::RwLock::new(indexmap::IndexMap::new())),
30 default: default.into(),
31 }
32 }
33
34 pub fn local_default() -> Self {
35 let mgr = Self::new("local");
36 let local = ObjectStoreDisk::local("storage/app").expect("local disk init");
37 mgr.register("local", Arc::new(local));
38 mgr
39 }
40
41 pub fn register(&self, name: impl Into<String>, disk: Arc<dyn StorageDisk>) {
42 self.disks.write().insert(name.into(), disk);
43 }
44
45 pub fn disk(&self, name: &str) -> Result<Arc<dyn StorageDisk>, Error> {
46 self.disks
47 .read()
48 .get(name)
49 .cloned()
50 .ok_or_else(|| Error::Storage(format!("disk '{name}' not registered")))
51 }
52
53 pub fn default(&self) -> Result<Arc<dyn StorageDisk>, Error> {
54 self.disk(&self.default)
55 }
56}
57
58pub struct ObjectStoreDisk {
59 store: Arc<dyn ObjectStore>,
60 base_url: Option<String>,
61}
62
63impl ObjectStoreDisk {
64 pub fn local(root: &str) -> Result<Self, Error> {
65 std::fs::create_dir_all(root).ok();
66 let store = object_store::local::LocalFileSystem::new_with_prefix(root)
67 .map_err(|e| Error::Storage(e.to_string()))?;
68 Ok(Self {
69 store: Arc::new(store),
70 base_url: None,
71 })
72 }
73}
74
75#[async_trait]
76impl StorageDisk for ObjectStoreDisk {
77 async fn put(&self, key: &str, data: Bytes) -> Result<(), Error> {
78 let path = ObjectPath::from(key);
79 self.store
80 .put(&path, data.into())
81 .await
82 .map_err(|e| Error::Storage(e.to_string()))?;
83 Ok(())
84 }
85
86 async fn get(&self, key: &str) -> Result<Bytes, Error> {
87 let path = ObjectPath::from(key);
88 let result = self
89 .store
90 .get(&path)
91 .await
92 .map_err(|e| Error::Storage(e.to_string()))?;
93 result
94 .bytes()
95 .await
96 .map_err(|e| Error::Storage(e.to_string()))
97 }
98
99 async fn delete(&self, key: &str) -> Result<(), Error> {
100 let path = ObjectPath::from(key);
101 self.store
102 .delete(&path)
103 .await
104 .map_err(|e| Error::Storage(e.to_string()))?;
105 Ok(())
106 }
107
108 async fn exists(&self, key: &str) -> Result<bool, Error> {
109 let path = ObjectPath::from(key);
110 match self.store.head(&path).await {
111 Ok(_) => Ok(true),
112 Err(object_store::Error::NotFound { .. }) => Ok(false),
113 Err(e) => Err(Error::Storage(e.to_string())),
114 }
115 }
116
117 fn public_url(&self, key: &str) -> Option<String> {
118 self.base_url.as_ref().map(|b| format!("{b}/{key}"))
119 }
120}