// ordinary-storage 0.6.0
//
// Storage for Ordinary
// Documentation
// Copyright (C) 2026 Ordinary Labs, LLC.
//
// SPDX-License-Identifier: AGPL-3.0-only

use anyhow::bail;
use bytes::{BufMut, Bytes, BytesMut};
use ordinary_config::{AssetsLimits, CompressionAlgorithm};
use parking_lot::Mutex;
use saferlmdb::{
    self as lmdb, Database, DatabaseOptions, Environment, ReadTransaction, WriteTransaction, put,
};
use std::fmt::{Display, Formatter};
use std::sync::Arc;
use tracing::{Level, instrument};

/// Wraps a floating-point value so that it is displayed as a percentage.
///
/// The wrapped number is printed as-is (it is not multiplied by 100) with
/// exactly two digits after the decimal point, followed by a `%` sign.
pub struct PercentageDisplay(pub f64);

impl Display for PercentageDisplay {
    /// Writes the wrapped value with two decimal places and a trailing `%`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let percent = self.0;
        f.write_fmt(format_args!("{percent:.2}%"))
    }
}

/// LMDB-backed store for assets, with per-asset and whole-store size limits.
///
/// Keys are the asset path bytes, optionally followed by a one-byte
/// compression tag; values are the raw asset bytes.
pub struct AssetStore {
    /// Size limits enforced on writes (`max_asset_size` and `max_store_size`).
    limits: AssetsLimits,

    /// LMDB environment used to open every read and write transaction.
    env: Arc<Environment>,

    /// stores static assets to be served by web server
    ///
    /// Opened as the named database `"asset"` inside `env`.
    asset_db: Arc<Database<'static>>,

    /// When `true`, `put` logs store and asset size details with each write.
    log_size: bool,

    /// Running total of key + value bytes held in `asset_db`; seeded by a full
    /// scan in `new` and adjusted by `put` and `delete`.
    store_size: Arc<Mutex<u64>>,
}

impl AssetStore {
    /// Opens (creating it if needed) the `"asset"` database in `env` and
    /// builds a store around it.
    ///
    /// Every entry currently in the database is visited once inside a read
    /// transaction so the running store size (key bytes + value bytes) starts
    /// out matching what is already persisted. The scan stops at the first
    /// cursor call that does not return an entry.
    ///
    /// # Errors
    ///
    /// Returns an error if the database cannot be opened, or if the read
    /// transaction or its cursor cannot be created.
    pub fn new(
        limits: AssetsLimits,
        env: &Arc<Environment>,
        log_size: bool,
    ) -> anyhow::Result<Self> {
        let opened = Database::open(
            env.clone(),
            Some("asset"),
            &DatabaseOptions::new(lmdb::db::Flags::CREATE),
        )?;
        let asset_db = Arc::new(opened);

        let txn = ReadTransaction::new(env.clone())?;
        let access = txn.access();
        let mut cursor = txn.cursor(asset_db.clone())?;

        // Walk the database front to back, accumulating the bytes of each entry.
        let mut total_bytes: u64 = 0;
        let mut entry = cursor.first::<[u8], [u8]>(&access);

        while let Ok((key, val)) = entry {
            total_bytes += key.len() as u64 + val.len() as u64;
            entry = cursor.next::<[u8], [u8]>(&access);
        }

        Ok(Self {
            limits,
            env: env.clone(),
            asset_db,
            log_size,
            store_size: Arc::new(Mutex::new(total_bytes)),
        })
    }

    /// Stores `asset` under `path`, replacing any entry already held for the
    /// same key.
    ///
    /// The key is the bytes of `path`, followed by the algorithm's one-byte
    /// tag when `compression` is `Some`. The second element of `compression`
    /// is the size of the asset before compression and is only used for
    /// logging.
    ///
    /// The size of an entry is its key length plus its value length. The store
    /// limit is checked against the size the store would have *after* the
    /// write, so replacing an existing entry only counts the net change.
    ///
    /// # Errors
    ///
    /// Returns an error if the entry exceeds `max_asset_size`, if the write
    /// would push the store past `max_store_size`, or if the LMDB transaction,
    /// write or commit fails. The size counter is only updated after a
    /// successful commit.
    // Sizes are converted to `f64` solely to format the reduction percentage
    // in the log line, where precision loss is irrelevant.
    #[allow(clippy::cast_precision_loss)]
    #[instrument(skip_all, err)]
    pub fn put(
        &self,
        path: &str,
        asset: &[u8],
        compression: Option<(&CompressionAlgorithm, usize)>,
    ) -> anyhow::Result<()> {
        let mut key = BytesMut::with_capacity(path.len() + 1);
        key.put(path.as_bytes());

        if let Some(compression) = compression {
            key.put_u8(compression.0.as_u8());
        }

        let size = (key.len() + asset.len()) as u64;

        if size > self.limits.max_asset_size {
            bail!("exceeds asset size limit");
        }

        // Lock order: the size counter is always taken before the LMDB write
        // transaction. The guard is held until the function returns so the
        // counter and the database change together.
        let mut store_size = self.store_size.lock();

        let txn = WriteTransaction::new(self.env.clone())?;

        let new_store_size = {
            let mut access = txn.access();

            // Bytes that this write will replace (0 when the key is new).
            let overwrite_size = access
                .get::<[u8], [u8]>(&self.asset_db, key.as_ref())
                .map_or(0, |existing| (key.len() + existing.len()) as u64);

            let new_store_size = (*store_size)
                .saturating_sub(overwrite_size)
                .saturating_add(size);

            if new_store_size > self.limits.max_store_size {
                // Returning here drops the transaction without committing.
                bail!("exceeds store size limit");
            }

            access.put(&self.asset_db, key.as_ref(), asset, &put::Flags::empty())?;

            new_store_size
        };

        txn.commit()?;

        *store_size = new_store_size;

        if self.log_size {
            let (reduction, source_size) = if let Some((_, pre_compression_size)) = compression {
                (
                    Some(tracing::field::display(PercentageDisplay(
                        ((pre_compression_size as f64 - size as f64) / pre_compression_size as f64)
                            * 100.0,
                    ))),
                    pre_compression_size as u64,
                )
            } else {
                (None, size)
            };

            tracing::info!(
                %path,
                store.size = %bytesize::ByteSize(*store_size).display().si_short(),
                asset.compression = %match compression {
                    Some(c) => c.0.as_str(),
                    None => "none"
                },
                asset.size.source = %bytesize::ByteSize(source_size).display().si_short(),
                asset.size.compressed = compression.is_some().then_some(display(bytesize::ByteSize(size).display().si_short())),
                asset.size.reduction = reduction,
            );
        } else {
            tracing::info!(
                %path,
                compression = %match compression {
                    Some(c) => c.0.as_str(),
                    None => "none",
                }
            );
        }

        drop(store_size);

        Ok(())
    }

    /// Looks up the asset stored for `path` and returns a copy of its bytes.
    ///
    /// The lookup key is the bytes of `path`, followed by the algorithm's
    /// one-byte tag when `compression` is `Some`, so each compression variant
    /// of a path is a separate entry.
    ///
    /// # Errors
    ///
    /// Returns an error (reported at `WARN` level by the instrumentation) if
    /// the read transaction cannot be opened or the lookup fails.
    #[instrument(skip_all, err(level = Level::WARN))]
    pub fn get(
        &self,
        path: &str,
        compression: Option<&CompressionAlgorithm>,
    ) -> anyhow::Result<Bytes> {
        let compression_name = match compression {
            Some(algorithm) => algorithm.as_str(),
            None => "none",
        };
        tracing::info!(path, compression = compression_name);

        // Key layout: path bytes, then the optional one-byte compression tag.
        let mut key = BytesMut::with_capacity(path.len() + 1);
        key.put_slice(path.as_bytes());
        if let Some(algorithm) = compression {
            key.put_u8(algorithm.as_u8());
        }

        let txn = ReadTransaction::new(self.env.clone())?;
        let access = txn.access();

        // The value borrows from the transaction, so copy it out before the
        // transaction is dropped.
        let stored = access.get::<[u8], [u8]>(&self.asset_db, &key[..])?;
        let copied = Bytes::copy_from_slice(stored);

        Ok(copied)
    }

    /// Removes the asset stored for `path` and subtracts its size from the
    /// running store total.
    ///
    /// The key is the bytes of `path`, followed by the algorithm's one-byte
    /// tag when `compression` is `Some`.
    ///
    /// # Errors
    ///
    /// Returns an error if the write transaction cannot be opened, if the
    /// lookup of the existing entry fails, or if the delete or commit fails.
    /// The size counter is only adjusted after a successful commit.
    #[instrument(skip_all, err)]
    pub fn delete(
        &self,
        path: &str,
        compression: Option<&CompressionAlgorithm>,
    ) -> anyhow::Result<()> {
        tracing::info!(
            path,
            compression = match compression {
                Some(c) => c.as_str(),
                None => "none",
            }
        );

        let mut key = BytesMut::with_capacity(path.len() + 1);
        key.put(path.as_bytes());

        if let Some(compression) = compression {
            key.put_u8(compression.as_u8());
        }

        // Take the size counter before opening the write transaction. `put`
        // acquires the two in this same order; taking them in the opposite
        // order here could deadlock against a concurrent `put`.
        let mut store_size = self.store_size.lock();

        let txn = WriteTransaction::new(self.env.clone())?;

        let removed_size = {
            let mut access = txn.access();

            // Measure the entry first; this also fails early when it is absent.
            let existing = access.get::<[u8], [u8]>(&self.asset_db, key.as_ref())?;
            let removed_size = (existing.len() + key.len()) as u64;

            access.del_key(&self.asset_db, key.as_ref())?;

            removed_size
        };

        txn.commit()?;

        // Only now is the entry really gone, so only now adjust the total.
        *store_size = (*store_size).saturating_sub(removed_size);

        Ok(())
    }
}