Skip to main content

ordinary_storage/stores/
asset.rs

1// Copyright (C) 2026 Ordinary Labs, LLC.
2//
3// SPDX-License-Identifier: AGPL-3.0-only
4
5use anyhow::bail;
6use bytes::{BufMut, Bytes, BytesMut};
7use ordinary_config::{AssetsLimits, CompressionAlgorithm};
8use parking_lot::Mutex;
9use saferlmdb::{
10    self as lmdb, Database, DatabaseOptions, Environment, ReadTransaction, WriteTransaction, put,
11};
12use std::fmt::{Display, Formatter};
13use std::sync::Arc;
14use tracing::{Level, instrument};
15
16pub struct PercentageDisplay(pub f64);
17
18impl Display for PercentageDisplay {
19    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
20        write!(f, "{:.2}%", self.0)
21    }
22}
23
24pub struct AssetStore {
25    limits: AssetsLimits,
26
27    env: Arc<Environment>,
28
29    /// stores static assets to be served by web server
30    asset_db: Arc<Database<'static>>,
31
32    log_size: bool,
33
34    store_size: Arc<Mutex<u64>>,
35}
36
37impl AssetStore {
38    pub fn new(
39        limits: AssetsLimits,
40        env: &Arc<Environment>,
41        log_size: bool,
42    ) -> anyhow::Result<Self> {
43        let asset_db = Arc::new(Database::open(
44            env.clone(),
45            Some("asset"),
46            &DatabaseOptions::new(lmdb::db::Flags::CREATE),
47        )?);
48
49        let mut store_size = 0;
50
51        let txn = ReadTransaction::new(env.clone())?;
52        let access = txn.access();
53
54        let mut asset_cursor = txn.cursor(asset_db.clone())?;
55
56        if let Ok((key, val)) = asset_cursor.first::<[u8], [u8]>(&access) {
57            store_size += key.len() as u64;
58            store_size += val.len() as u64;
59
60            while let Ok((key, val)) = asset_cursor.next::<[u8], [u8]>(&access) {
61                store_size += key.len() as u64;
62                store_size += val.len() as u64;
63            }
64        }
65
66        Ok(Self {
67            limits,
68            env: env.clone(),
69            asset_db,
70            log_size,
71            // todo: read in on start up
72            store_size: Arc::new(Mutex::new(store_size)),
73        })
74    }
75
76    #[allow(clippy::cast_precision_loss)]
77    #[instrument(skip_all, err)]
78    pub fn put(
79        &self,
80        path: &str,
81        asset: &[u8],
82        compression: Option<(&CompressionAlgorithm, usize)>,
83    ) -> anyhow::Result<()> {
84        let mut key = BytesMut::with_capacity(path.len() + 1);
85        key.put(path.as_bytes());
86
87        if let Some(compression) = compression {
88            key.put_u8(compression.0.as_u8());
89        }
90
91        let size = (key.len() + asset.len()) as u64;
92
93        if size > self.limits.max_asset_size {
94            bail!("exceeds asset size limit");
95        }
96
97        let mut store_size = self.store_size.lock();
98
99        if *store_size + size > self.limits.max_store_size {
100            bail!("exceeds store size limit");
101        }
102
103        let mut overwrite_size = 0;
104
105        let txn = WriteTransaction::new(self.env.clone())?;
106
107        {
108            let mut access = txn.access();
109            if let Ok(result) = access.get::<[u8], [u8]>(&self.asset_db, key.as_ref()) {
110                overwrite_size = (key.len() + result.len()) as u64;
111            }
112
113            access.put(&self.asset_db, key.as_ref(), asset, &put::Flags::empty())?;
114        }
115
116        txn.commit()?;
117
118        *store_size -= overwrite_size;
119        *store_size += size;
120
121        if self.log_size {
122            let (reduction, source_size) = if let Some((_, pre_compression_size)) = compression {
123                (
124                    Some(tracing::field::display(PercentageDisplay(
125                        ((pre_compression_size as f64 - size as f64) / pre_compression_size as f64)
126                            * 100.0,
127                    ))),
128                    pre_compression_size as u64,
129                )
130            } else {
131                (None, size)
132            };
133
134            tracing::info!(
135                %path,
136                store.size = %bytesize::ByteSize(*store_size).display().si_short(),
137                asset.compression = %match compression {
138                    Some(c) => c.0.as_str(),
139                    None => "none"
140                },
141                asset.size.source = %bytesize::ByteSize(source_size).display().si_short(),
142                asset.size.compressed = compression.is_some().then_some(display(bytesize::ByteSize(size).display().si_short())),
143                asset.size.reduction = reduction,
144            );
145        } else {
146            tracing::info!(
147                %path,
148                compression = %match compression {
149                    Some(c) => c.0.as_str(),
150                    None => "none",
151                }
152            );
153        }
154
155        drop(store_size);
156
157        Ok(())
158    }
159
160    #[instrument(skip_all, err(level = Level::WARN))]
161    pub fn get(
162        &self,
163        path: &str,
164        compression: Option<&CompressionAlgorithm>,
165    ) -> anyhow::Result<Bytes> {
166        tracing::info!(
167            path,
168            compression = match compression {
169                Some(c) => c.as_str(),
170                None => "none",
171            }
172        );
173
174        let mut key = BytesMut::with_capacity(path.len() + 1);
175        key.put(path.as_bytes());
176
177        if let Some(compression) = compression {
178            key.put_u8(compression.as_u8());
179        }
180
181        let txn = ReadTransaction::new(self.env.clone())?;
182
183        let access = txn.access();
184        let result = access.get(&self.asset_db, key.as_ref())?;
185
186        Ok(Bytes::copy_from_slice(result))
187    }
188
189    #[instrument(skip_all, err)]
190    pub fn delete(
191        &self,
192        path: &str,
193        compression: Option<&CompressionAlgorithm>,
194    ) -> anyhow::Result<()> {
195        tracing::info!(
196            path,
197            compression = match compression {
198                Some(c) => c.as_str(),
199                None => "none",
200            }
201        );
202
203        let mut key = BytesMut::with_capacity(path.len() + 1);
204        key.put(path.as_bytes());
205
206        if let Some(compression) = compression {
207            key.put_u8(compression.as_u8());
208        }
209
210        let txn = WriteTransaction::new(self.env.clone())?;
211
212        {
213            let mut access = txn.access();
214            let result = access.get::<[u8], [u8]>(&self.asset_db, key.as_ref())?;
215
216            let mut store_size = self.store_size.lock();
217            *store_size -= (result.len() + key.len()) as u64;
218
219            drop(store_size);
220
221            access.del_key(&self.asset_db, key.as_ref())?;
222        }
223
224        txn.commit()?;
225
226        Ok(())
227    }
228}