use anyhow::bail;
use bytes::{BufMut, Bytes, BytesMut};
use ordinary_config::{AssetsLimits, CompressionAlgorithm};
use parking_lot::Mutex;
use saferlmdb::{
self as lmdb, Database, DatabaseOptions, Environment, ReadTransaction, WriteTransaction, put,
};
use std::fmt::{Display, Formatter};
use std::sync::Arc;
use tracing::{Level, instrument};
pub struct PercentageDisplay(pub f64);
impl Display for PercentageDisplay {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:.2}%", self.0)
}
}
pub struct AssetStore {
limits: AssetsLimits,
env: Arc<Environment>,
asset_db: Arc<Database<'static>>,
log_size: bool,
store_size: Arc<Mutex<u64>>,
}
impl AssetStore {
pub fn new(
limits: AssetsLimits,
env: &Arc<Environment>,
log_size: bool,
) -> anyhow::Result<Self> {
let asset_db = Arc::new(Database::open(
env.clone(),
Some("asset"),
&DatabaseOptions::new(lmdb::db::Flags::CREATE),
)?);
let mut store_size = 0;
let txn = ReadTransaction::new(env.clone())?;
let access = txn.access();
let mut asset_cursor = txn.cursor(asset_db.clone())?;
if let Ok((key, val)) = asset_cursor.first::<[u8], [u8]>(&access) {
store_size += key.len() as u64;
store_size += val.len() as u64;
while let Ok((key, val)) = asset_cursor.next::<[u8], [u8]>(&access) {
store_size += key.len() as u64;
store_size += val.len() as u64;
}
}
Ok(Self {
limits,
env: env.clone(),
asset_db,
log_size,
store_size: Arc::new(Mutex::new(store_size)),
})
}
#[allow(clippy::cast_precision_loss)]
#[instrument(skip_all, err)]
pub fn put(
&self,
path: &str,
asset: &[u8],
compression: Option<(&CompressionAlgorithm, usize)>,
) -> anyhow::Result<()> {
let mut key = BytesMut::with_capacity(path.len() + 1);
key.put(path.as_bytes());
if let Some(compression) = compression {
key.put_u8(compression.0.as_u8());
}
let size = (key.len() + asset.len()) as u64;
if size > self.limits.max_asset_size {
bail!("exceeds asset size limit");
}
let mut store_size = self.store_size.lock();
if *store_size + size > self.limits.max_store_size {
bail!("exceeds store size limit");
}
let mut overwrite_size = 0;
let txn = WriteTransaction::new(self.env.clone())?;
{
let mut access = txn.access();
if let Ok(result) = access.get::<[u8], [u8]>(&self.asset_db, key.as_ref()) {
overwrite_size = (key.len() + result.len()) as u64;
}
access.put(&self.asset_db, key.as_ref(), asset, &put::Flags::empty())?;
}
txn.commit()?;
*store_size -= overwrite_size;
*store_size += size;
if self.log_size {
let (reduction, source_size) = if let Some((_, pre_compression_size)) = compression {
(
Some(tracing::field::display(PercentageDisplay(
((pre_compression_size as f64 - size as f64) / pre_compression_size as f64)
* 100.0,
))),
pre_compression_size as u64,
)
} else {
(None, size)
};
tracing::info!(
%path,
store.size = %bytesize::ByteSize(*store_size).display().si_short(),
asset.compression = %match compression {
Some(c) => c.0.as_str(),
None => "none"
},
asset.size.source = %bytesize::ByteSize(source_size).display().si_short(),
asset.size.compressed = compression.is_some().then_some(display(bytesize::ByteSize(size).display().si_short())),
asset.size.reduction = reduction,
);
} else {
tracing::info!(
%path,
compression = %match compression {
Some(c) => c.0.as_str(),
None => "none",
}
);
}
drop(store_size);
Ok(())
}
#[instrument(skip_all, err(level = Level::WARN))]
pub fn get(
&self,
path: &str,
compression: Option<&CompressionAlgorithm>,
) -> anyhow::Result<Bytes> {
tracing::info!(
path,
compression = match compression {
Some(c) => c.as_str(),
None => "none",
}
);
let mut key = BytesMut::with_capacity(path.len() + 1);
key.put(path.as_bytes());
if let Some(compression) = compression {
key.put_u8(compression.as_u8());
}
let txn = ReadTransaction::new(self.env.clone())?;
let access = txn.access();
let result = access.get(&self.asset_db, key.as_ref())?;
Ok(Bytes::copy_from_slice(result))
}
#[instrument(skip_all, err)]
pub fn delete(
&self,
path: &str,
compression: Option<&CompressionAlgorithm>,
) -> anyhow::Result<()> {
tracing::info!(
path,
compression = match compression {
Some(c) => c.as_str(),
None => "none",
}
);
let mut key = BytesMut::with_capacity(path.len() + 1);
key.put(path.as_bytes());
if let Some(compression) = compression {
key.put_u8(compression.as_u8());
}
let txn = WriteTransaction::new(self.env.clone())?;
{
let mut access = txn.access();
let result = access.get::<[u8], [u8]>(&self.asset_db, key.as_ref())?;
let mut store_size = self.store_size.lock();
*store_size -= (result.len() + key.len()) as u64;
drop(store_size);
access.del_key(&self.asset_db, key.as_ref())?;
}
txn.commit()?;
Ok(())
}
}