ordinary_storage/stores/
asset.rs1use anyhow::bail;
6use bytes::{BufMut, Bytes, BytesMut};
7use ordinary_config::{AssetsLimits, CompressionAlgorithm};
8use parking_lot::Mutex;
9use saferlmdb::{
10 self as lmdb, Database, DatabaseOptions, Environment, ReadTransaction, WriteTransaction, put,
11};
12use std::fmt::{Display, Formatter};
13use std::sync::Arc;
14use tracing::{Level, instrument};
15
/// Wraps a percentage value so that it renders with two decimal places and a
/// trailing `%` sign (for example `12.50%`).
pub struct PercentageDisplay(pub f64);

impl Display for PercentageDisplay {
    /// Writes the wrapped value using the `{:.2}` format followed by `%`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self(percent) = self;
        f.write_fmt(format_args!("{:.2}%", percent))
    }
}
23
24pub struct AssetStore {
25 limits: AssetsLimits,
26
27 env: Arc<Environment>,
28
29 asset_db: Arc<Database<'static>>,
31
32 log_size: bool,
33
34 store_size: Arc<Mutex<u64>>,
35}
36
37impl AssetStore {
38 pub fn new(
39 limits: AssetsLimits,
40 env: &Arc<Environment>,
41 log_size: bool,
42 ) -> anyhow::Result<Self> {
43 let asset_db = Arc::new(Database::open(
44 env.clone(),
45 Some("asset"),
46 &DatabaseOptions::new(lmdb::db::Flags::CREATE),
47 )?);
48
49 let mut store_size = 0;
50
51 let txn = ReadTransaction::new(env.clone())?;
52 let access = txn.access();
53
54 let mut asset_cursor = txn.cursor(asset_db.clone())?;
55
56 if let Ok((key, val)) = asset_cursor.first::<[u8], [u8]>(&access) {
57 store_size += key.len() as u64;
58 store_size += val.len() as u64;
59
60 while let Ok((key, val)) = asset_cursor.next::<[u8], [u8]>(&access) {
61 store_size += key.len() as u64;
62 store_size += val.len() as u64;
63 }
64 }
65
66 Ok(Self {
67 limits,
68 env: env.clone(),
69 asset_db,
70 log_size,
71 store_size: Arc::new(Mutex::new(store_size)),
73 })
74 }
75
76 #[allow(clippy::cast_precision_loss)]
77 #[instrument(skip_all, err)]
78 pub fn put(
79 &self,
80 path: &str,
81 asset: &[u8],
82 compression: Option<(&CompressionAlgorithm, usize)>,
83 ) -> anyhow::Result<()> {
84 let mut key = BytesMut::with_capacity(path.len() + 1);
85 key.put(path.as_bytes());
86
87 if let Some(compression) = compression {
88 key.put_u8(compression.0.as_u8());
89 }
90
91 let size = (key.len() + asset.len()) as u64;
92
93 if size > self.limits.max_asset_size {
94 bail!("exceeds asset size limit");
95 }
96
97 let mut store_size = self.store_size.lock();
98
99 if *store_size + size > self.limits.max_store_size {
100 bail!("exceeds store size limit");
101 }
102
103 let mut overwrite_size = 0;
104
105 let txn = WriteTransaction::new(self.env.clone())?;
106
107 {
108 let mut access = txn.access();
109 if let Ok(result) = access.get::<[u8], [u8]>(&self.asset_db, key.as_ref()) {
110 overwrite_size = (key.len() + result.len()) as u64;
111 }
112
113 access.put(&self.asset_db, key.as_ref(), asset, &put::Flags::empty())?;
114 }
115
116 txn.commit()?;
117
118 *store_size -= overwrite_size;
119 *store_size += size;
120
121 if self.log_size {
122 let (reduction, source_size) = if let Some((_, pre_compression_size)) = compression {
123 (
124 Some(tracing::field::display(PercentageDisplay(
125 ((pre_compression_size as f64 - size as f64) / pre_compression_size as f64)
126 * 100.0,
127 ))),
128 pre_compression_size as u64,
129 )
130 } else {
131 (None, size)
132 };
133
134 tracing::info!(
135 %path,
136 store.size = %bytesize::ByteSize(*store_size).display().si_short(),
137 asset.compression = %match compression {
138 Some(c) => c.0.as_str(),
139 None => "none"
140 },
141 asset.size.source = %bytesize::ByteSize(source_size).display().si_short(),
142 asset.size.compressed = compression.is_some().then_some(display(bytesize::ByteSize(size).display().si_short())),
143 asset.size.reduction = reduction,
144 );
145 } else {
146 tracing::info!(
147 %path,
148 compression = %match compression {
149 Some(c) => c.0.as_str(),
150 None => "none",
151 }
152 );
153 }
154
155 drop(store_size);
156
157 Ok(())
158 }
159
160 #[instrument(skip_all, err(level = Level::WARN))]
161 pub fn get(
162 &self,
163 path: &str,
164 compression: Option<&CompressionAlgorithm>,
165 ) -> anyhow::Result<Bytes> {
166 tracing::info!(
167 path,
168 compression = match compression {
169 Some(c) => c.as_str(),
170 None => "none",
171 }
172 );
173
174 let mut key = BytesMut::with_capacity(path.len() + 1);
175 key.put(path.as_bytes());
176
177 if let Some(compression) = compression {
178 key.put_u8(compression.as_u8());
179 }
180
181 let txn = ReadTransaction::new(self.env.clone())?;
182
183 let access = txn.access();
184 let result = access.get(&self.asset_db, key.as_ref())?;
185
186 Ok(Bytes::copy_from_slice(result))
187 }
188
189 #[instrument(skip_all, err)]
190 pub fn delete(
191 &self,
192 path: &str,
193 compression: Option<&CompressionAlgorithm>,
194 ) -> anyhow::Result<()> {
195 tracing::info!(
196 path,
197 compression = match compression {
198 Some(c) => c.as_str(),
199 None => "none",
200 }
201 );
202
203 let mut key = BytesMut::with_capacity(path.len() + 1);
204 key.put(path.as_bytes());
205
206 if let Some(compression) = compression {
207 key.put_u8(compression.as_u8());
208 }
209
210 let txn = WriteTransaction::new(self.env.clone())?;
211
212 {
213 let mut access = txn.access();
214 let result = access.get::<[u8], [u8]>(&self.asset_db, key.as_ref())?;
215
216 let mut store_size = self.store_size.lock();
217 *store_size -= (result.len() + key.len()) as u64;
218
219 drop(store_size);
220
221 access.del_key(&self.asset_db, key.as_ref())?;
222 }
223
224 txn.commit()?;
225
226 Ok(())
227 }
228}