use bytes::{BufMut, Bytes, BytesMut};
use ordinary_config::{AssetsLimits, CompressionAlgorithm};
use parking_lot::Mutex;
use saferlmdb::{
self as lmdb, Database, DatabaseOptions, Environment, ReadTransaction, WriteTransaction, put,
};
use std::error::Error;
use std::sync::Arc;
use tracing::{Level, instrument};
/// Asset storage backed by a single LMDB database named `asset`, with a
/// running count of the bytes (keys plus values) it currently holds.
pub struct AssetStore {
/// Size limits; `put` reads `max_asset_size` and `max_store_size` from here.
limits: AssetsLimits,
/// Shared LMDB environment that every read and write transaction is opened on.
env: Arc<Environment>,
/// Handle to the `asset` database inside `env`.
asset_db: Arc<Database<'static>>,
/// When `true`, `put` includes the asset size and total store size in its log line.
log_size: bool,
/// Total bytes of all keys and values in `asset_db`: computed by a full scan
/// in `new`, then adjusted by `put` and `delete`.
store_size: Arc<Mutex<u64>>,
}
impl AssetStore {
/// Opens the `asset` database in `env` (creating it if it does not exist) and
/// builds a store around it.
///
/// The initial store size is obtained by walking every entry with a cursor
/// and summing the byte lengths of each key and value.
///
/// # Errors
///
/// Returns an error if the database cannot be opened, or if the read
/// transaction or the cursor cannot be created.
///
/// NOTE(review): an error while stepping the cursor is not reported; the walk
/// stops at the first non-`Ok` result, so a mid-scan failure would yield an
/// under-counted size.
pub fn new(
    limits: AssetsLimits,
    env: &Arc<Environment>,
    log_size: bool,
) -> Result<Self, Box<dyn Error>> {
    let db_options = DatabaseOptions::new(lmdb::db::Flags::CREATE);
    let asset_db = Arc::new(Database::open(env.clone(), Some("asset"), &db_options)?);

    let mut total_bytes: u64 = 0;
    let txn = ReadTransaction::new(env.clone())?;
    let access = txn.access();
    let mut cursor = txn.cursor(asset_db.clone())?;

    // Drive the scan from a single `Option`: `first` seeds it, `next` refills
    // it, and the first failed step (normally "no more entries") ends the loop.
    let mut entry = cursor.first::<[u8], [u8]>(&access).ok();
    while let Some((key, val)) = entry {
        total_bytes += key.len() as u64;
        total_bytes += val.len() as u64;
        entry = cursor.next::<[u8], [u8]>(&access).ok();
    }

    let store_size = Arc::new(Mutex::new(total_bytes));
    Ok(Self {
        limits,
        env: env.clone(),
        asset_db,
        log_size,
        store_size,
    })
}
/// Stores `asset` under `path`, replacing any existing entry with the same key.
///
/// The key is the bytes of `path`, followed by one byte identifying the
/// compression algorithm when `compression` is `Some`.
///
/// The store-size limit is checked against the size the store would have
/// *after* the write: the bytes of an entry being overwritten are subtracted
/// first, so replacing an existing asset is not rejected merely because the
/// old and new copies together would exceed the limit.
///
/// # Errors
///
/// * `"exceeds asset size limit"` if key plus asset is larger than
///   `limits.max_asset_size`.
/// * `"exceeds store size limit"` if the store would grow beyond
///   `limits.max_store_size`; nothing is written in that case.
/// * Any error from opening the write transaction, writing, or committing.
#[instrument(skip(self, asset, path, compression), err)]
pub fn put(
    &self,
    path: &str,
    asset: &[u8],
    compression: Option<&CompressionAlgorithm>,
) -> Result<(), Box<dyn Error>> {
    let mut key = BytesMut::with_capacity(path.len() + 1);
    key.put(path.as_bytes());
    if let Some(compression) = compression {
        key.put_u8(compression.as_u8());
    }

    let size = (key.len() + asset.len()) as u64;
    if size > self.limits.max_asset_size {
        return Err("exceeds asset size limit".into());
    }

    // Take the size lock before opening the write transaction and hold it
    // until the counter is updated, so the counter and the database change
    // together.
    let mut store_size = self.store_size.lock();

    let txn = WriteTransaction::new(self.env.clone())?;
    let new_store_size = {
        let mut access = txn.access();

        // Bytes currently occupied by an entry with this key, if any.
        // NOTE(review): any lookup error is treated as "no existing entry".
        let overwrite_size = match access.get::<[u8], [u8]>(&self.asset_db, key.as_ref()) {
            Ok(existing) => (key.len() + existing.len()) as u64,
            Err(_) => 0,
        };

        // Net size after the write. Saturating arithmetic keeps a drifted
        // counter from wrapping or panicking.
        let projected = (*store_size)
            .saturating_sub(overwrite_size)
            .saturating_add(size);
        if projected > self.limits.max_store_size {
            // Returning here drops the uncommitted transaction without writing.
            return Err("exceeds store size limit".into());
        }

        access.put(&self.asset_db, key.as_ref(), asset, &put::Flags::empty())?;
        projected
    };
    txn.commit()?;

    // Only touch the counter once the write is durable.
    *store_size = new_store_size;

    if self.log_size {
        tracing::info!(
            %path,
            compression = %match compression {
                Some(c) => c.as_str(),
                None => "none"
            },
            size.asset = %bytesize::ByteSize(size).display().si_short(),
            size.store = %bytesize::ByteSize(*store_size).display().si_short()
        );
    } else {
        tracing::info!(
            %path,
            compression = %match compression {
                Some(c) => c.as_str(),
                None => "none",
            }
        );
    }

    Ok(())
}
/// Looks up the asset stored under `path` for the given compression variant
/// and returns a copy of its bytes.
///
/// The lookup key is the bytes of `path`, followed by one byte identifying the
/// compression algorithm when `compression` is `Some`.
///
/// # Errors
///
/// Returns an error if the read transaction cannot be opened or the lookup
/// fails (presumably including the case where no entry exists for the key;
/// confirm against the LMDB wrapper). Errors are logged at `WARN` level.
#[instrument(skip(self, path, compression), err(level = Level::WARN))]
pub fn get(
    &self,
    path: &str,
    compression: Option<&CompressionAlgorithm>,
) -> Result<Bytes, Box<dyn Error>> {
    tracing::info!(
        path,
        compression = match compression {
            None => "none",
            Some(algo) => algo.as_str(),
        }
    );

    // Assemble the lookup key: path bytes plus an optional algorithm tag byte.
    let path_bytes = path.as_bytes();
    let mut lookup_key = BytesMut::with_capacity(path_bytes.len() + 1);
    lookup_key.put(path_bytes);
    if let Some(algo) = compression {
        lookup_key.put_u8(algo.as_u8());
    }

    let txn = ReadTransaction::new(self.env.clone())?;
    let access = txn.access();
    // The returned slice borrows from the transaction, so copy it out before
    // the transaction is dropped.
    let stored = access.get::<[u8], [u8]>(&self.asset_db, lookup_key.as_ref())?;
    let owned = Bytes::copy_from_slice(stored);
    Ok(owned)
}
/// Removes the asset stored under `path` for the given compression variant
/// and subtracts its key and value bytes from the tracked store size.
///
/// The key is the bytes of `path`, followed by one byte identifying the
/// compression algorithm when `compression` is `Some`.
///
/// # Errors
///
/// Returns an error if the write transaction cannot be opened, if the lookup
/// of the existing entry fails, or if the delete or commit fails. The tracked
/// store size is left unchanged whenever an error is returned.
#[instrument(skip(self, path), err)]
pub fn delete(
    &self,
    path: &str,
    compression: Option<&CompressionAlgorithm>,
) -> Result<(), Box<dyn Error>> {
    tracing::info!(
        path,
        compression = match compression {
            Some(c) => c.as_str(),
            None => "none",
        }
    );

    let mut key = BytesMut::with_capacity(path.len() + 1);
    key.put(path.as_bytes());
    if let Some(compression) = compression {
        key.put_u8(compression.as_u8());
    }

    // Acquire the size lock BEFORE opening the write transaction, matching the
    // order used by `put`. LMDB admits one write transaction at a time, so
    // taking these two in opposite orders could deadlock a concurrent `put`
    // and `delete`.
    let mut store_size = self.store_size.lock();

    let txn = WriteTransaction::new(self.env.clone())?;
    let freed = {
        let mut access = txn.access();
        let existing = access.get::<[u8], [u8]>(&self.asset_db, key.as_ref())?;
        let freed = (existing.len() + key.len()) as u64;
        access.del_key(&self.asset_db, key.as_ref())?;
        freed
    };
    txn.commit()?;

    // Adjust the counter only after the delete is committed, so a failed
    // delete or commit cannot leave it too low. Saturating subtraction guards
    // against underflow if the counter has drifted.
    let remaining = (*store_size).saturating_sub(freed);
    *store_size = remaining;

    Ok(())
}
}