use anyhow::bail;
use bytes::Bytes;
use ordinary_config::ArtifactLimits;
use parking_lot::Mutex;
use saferlmdb::{
self as lmdb, Database, DatabaseOptions, Environment, ReadTransaction, WriteTransaction, put,
};
use std::sync::Arc;
use tracing::Level;
use tracing::instrument;
#[derive(Debug)]
pub enum ArtifactKind {
Template,
Action,
}
pub struct ArtifactStore {
pub limits: ArtifactLimits,
env: Arc<Environment>,
artifact_db: Arc<Database<'static>>,
log_size: bool,
store_size: Arc<Mutex<u64>>,
}
impl ArtifactStore {
pub fn new(
limits: ArtifactLimits,
env: &Arc<Environment>,
log_size: bool,
) -> anyhow::Result<Self> {
let artifact_db = Arc::new(Database::open(
env.clone(),
Some("artifact"),
&DatabaseOptions::new(lmdb::db::Flags::CREATE),
)?);
let mut store_size = 0;
let txn = ReadTransaction::new(env.clone())?;
let access = txn.access();
let mut artifact_cursor = txn.cursor(artifact_db.clone())?;
if let Ok((key, val)) = artifact_cursor.first::<[u8], [u8]>(&access) {
store_size += key.len() as u64;
store_size += val.len() as u64;
while let Ok((key, val)) = artifact_cursor.next::<[u8], [u8]>(&access) {
store_size += key.len() as u64;
store_size += val.len() as u64;
}
}
Ok(Self {
limits,
env: env.clone(),
artifact_db,
log_size,
store_size: Arc::new(Mutex::new(store_size)),
})
}
#[instrument(skip_all, err)]
pub fn put(&self, idx: u8, kind: ArtifactKind, content: &[u8]) -> anyhow::Result<()> {
let key = match kind {
ArtifactKind::Action => [0, idx],
ArtifactKind::Template => [1, idx],
};
let compressed = zstd::stream::encode_all(std::io::Cursor::new(content), 17)?;
let size = (key.len() + compressed.len()) as u64;
if size > self.limits.max_artifact_size {
bail!("exceeds max artifact size");
}
let mut store_size = self.store_size.lock();
if *store_size + size > self.limits.max_store_size {
bail!("exceeds store size limit");
}
let mut overwrite_size = 0;
let txn = WriteTransaction::new(self.env.clone())?;
{
let mut access = txn.access();
if let Ok(result) = access.get::<[u8], [u8]>(&self.artifact_db, key.as_ref()) {
overwrite_size = (key.len() + result.len()) as u64;
}
access.put(&self.artifact_db, &key, &compressed, &put::Flags::empty())?;
}
txn.commit()?;
*store_size -= overwrite_size;
*store_size += size;
if self.log_size {
tracing::info!(
size.source = %bytesize::ByteSize((key.len() + content.len()) as u64).display().si_short(),
size.compressed = %bytesize::ByteSize((key.len() + compressed.len()) as u64).display().si_short(),
size.stored = %bytesize::ByteSize(*store_size).display().si_short(),
"size"
);
}
drop(store_size);
Ok(())
}
#[instrument(skip_all, err(level = Level::WARN))]
pub fn get(&self, idx: u8, kind: ArtifactKind) -> anyhow::Result<Bytes> {
let key = match kind {
ArtifactKind::Action => [0, idx],
ArtifactKind::Template => [1, idx],
};
let txn = ReadTransaction::new(self.env.clone())?;
let access = txn.access();
let result = access.get::<[u8], [u8]>(&self.artifact_db, &key)?;
let decompressed = zstd::stream::decode_all(result)?;
Ok(Bytes::copy_from_slice(&decompressed))
}
#[instrument(skip_all, err)]
pub fn delete(&self, idx: u8, kind: ArtifactKind) -> anyhow::Result<()> {
let key = match kind {
ArtifactKind::Action => [0, idx],
ArtifactKind::Template => [1, idx],
};
let txn = WriteTransaction::new(self.env.clone())?;
{
let mut access = txn.access();
let result = access.get::<[u8], [u8]>(&self.artifact_db, key.as_ref())?;
let mut store_size = self.store_size.lock();
*store_size -= (result.len() + key.len()) as u64;
drop(store_size);
access.del_key(&self.artifact_db, &key)?;
}
txn.commit()?;
Ok(())
}
}