use anyhow::bail;
use arrayvec::ArrayVec;
use async_compression::Level;
use async_compression::tokio::write::ZstdEncoder;
use bytes::{BufMut, Bytes, BytesMut};
use hashbrown::HashMap;
use ordinary_config::{CacheLimits, StoredCache as StoredCacheConfig, StoredCachePolicy};
use parking_lot::Mutex;
use saferlmdb::{
self as lmdb, Database, DatabaseOptions, Environment, ReadTransaction, WriteTransaction, put,
};
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet, BinaryHeap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::io::AsyncWriteExt;
use tracing::instrument;
pub enum CacheRead {
Template,
Action,
Integration,
}
impl CacheRead {
fn as_u8(&self) -> u8 {
match self {
CacheRead::Action => 1,
CacheRead::Template => 2,
CacheRead::Integration => 3,
}
}
fn from_u8(v: u8) -> Self {
match v {
1 => CacheRead::Action,
2 => CacheRead::Template,
3 => CacheRead::Integration,
_ => panic!("invalid u8 {v} for CacheRead"),
}
}
fn from_write(w: &CacheWrite) -> Self {
match w {
CacheWrite::Template(..) => CacheRead::Template,
CacheWrite::Action => CacheRead::Action,
CacheWrite::Integration => CacheRead::Integration,
}
}
}
pub enum CacheWrite<'a> {
Template(&'a str, &'a str, &'a Bytes),
Action,
Integration,
}
impl CacheWrite<'_> {
fn as_u8(&self) -> u8 {
match self {
CacheWrite::Action => 1,
CacheWrite::Template(..) => 2,
CacheWrite::Integration => 3,
}
}
}
#[derive(Debug)]
pub enum CacheCompression {
Gzip,
Zstd { level: u8 },
Brotli,
Deflate,
}
impl CacheCompression {
fn as_u8(&self) -> u8 {
match self {
CacheCompression::Gzip => 1,
CacheCompression::Zstd { level: _ } => 2,
CacheCompression::Brotli => 3,
CacheCompression::Deflate => 4,
}
}
#[must_use]
pub fn as_char(&self) -> char {
match self {
CacheCompression::Gzip => '1',
CacheCompression::Zstd { level: _ } => '2',
CacheCompression::Brotli => '3',
CacheCompression::Deflate => '4',
}
}
#[must_use]
pub fn as_str(&self) -> &'static str {
match self {
CacheCompression::Gzip => "gzip",
CacheCompression::Zstd { level: _ } => "zstd",
CacheCompression::Brotli => "br",
CacheCompression::Deflate => "deflate",
}
}
}
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
pub enum CacheDependency {
Content,
Model([u8; 16]),
}
#[derive(Debug, Hash, PartialEq, Eq, Default)]
pub struct FRsEvictionCandidate {
total_hits: (i64, i64),
last_hit: (i64, i64),
size: usize,
address: String,
index: usize,
}
impl PartialOrd for FRsEvictionCandidate {
#[inline]
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
#[inline]
fn lt(&self, other: &Self) -> bool {
let Some(th_delta) = other.total_hits.0.checked_sub(self.total_hits.0) else {
return false;
};
if th_delta.abs() > self.total_hits.1 {
return self.total_hits.0 >= other.total_hits.0;
}
let Some(lh_delta) = other.last_hit.0.checked_sub(self.last_hit.0) else {
return false;
};
if lh_delta.abs() > self.last_hit.1 {
return self.last_hit.0 >= other.last_hit.0;
}
self.size < other.size
}
#[inline]
fn le(&self, other: &Self) -> bool {
let Some(th_delta) = other.total_hits.0.checked_sub(self.total_hits.0) else {
return false;
};
if th_delta.abs() > self.total_hits.1 {
return self.total_hits.0 > other.total_hits.0;
}
let Some(lh_delta) = other.last_hit.0.checked_sub(self.last_hit.0) else {
return false;
};
if lh_delta.abs() > self.last_hit.1 {
return self.last_hit.0 > other.last_hit.0;
}
self.size <= other.size
}
#[inline]
fn gt(&self, other: &Self) -> bool {
let Some(th_delta) = other.total_hits.0.checked_sub(self.total_hits.0) else {
return false;
};
if th_delta.abs() > self.total_hits.1 {
return self.total_hits.0 <= other.total_hits.0;
}
let Some(lh_delta) = other.last_hit.0.checked_sub(self.last_hit.0) else {
return false;
};
if lh_delta.abs() > self.last_hit.1 {
return self.last_hit.0 <= other.last_hit.0;
}
self.size > other.size
}
#[inline]
fn ge(&self, other: &Self) -> bool {
let Some(th_delta) = other.total_hits.0.checked_sub(self.total_hits.0) else {
return false;
};
if th_delta.abs() > self.total_hits.1 {
return self.total_hits.0 < other.total_hits.0;
}
let Some(lh_delta) = other.last_hit.0.checked_sub(self.last_hit.0) else {
return false;
};
if lh_delta.abs() > self.last_hit.1 {
return self.last_hit.0 < other.last_hit.0;
}
self.size >= other.size
}
}
impl Ord for FRsEvictionCandidate {
#[inline]
fn cmp(&self, other: &Self) -> Ordering {
let Some(th_delta) = other.total_hits.0.checked_sub(self.total_hits.0) else {
return Ordering::Equal;
};
if th_delta.abs() > self.total_hits.1 {
return other.total_hits.0.cmp(&self.total_hits.0);
}
let Some(lh_delta) = other.last_hit.0.checked_sub(self.last_hit.0) else {
return Ordering::Equal;
};
if lh_delta.abs() > self.last_hit.1 {
return other.last_hit.0.cmp(&self.last_hit.0);
}
self.size.cmp(&other.size)
}
}
#[derive(Debug)]
struct AddressDetails {
compression: u8,
stored_at: u64,
last_hit: u64,
size: usize,
hit_distribution: VecDeque<u64>,
dependencies: Vec<CacheDependency>,
}
type AddressesMap =
Arc<Mutex<HashMap<(u8, u8), (usize, usize, BTreeMap<String, ArrayVec<AddressDetails, 5>>)>>>;
type EvictionQueue = Arc<Mutex<BinaryHeap<FRsEvictionCandidate>>>;
type DependencyMap = Arc<Mutex<HashMap<CacheDependency, BTreeSet<(u8, u8, String, u8)>>>>;
pub struct CacheStore {
pub limits: CacheLimits,
env: Arc<Environment>,
cache_db: Arc<Database<'static>>,
log_size: bool,
addresses_map: AddressesMap,
eviction_queue: EvictionQueue,
dependency_map: DependencyMap,
}
impl CacheStore {
#[allow(clippy::too_many_lines, clippy::missing_panics_doc)]
pub fn new(
limits: CacheLimits,
env: &Arc<Environment>,
log_size: bool,
) -> anyhow::Result<Self> {
let eviction_queue = BinaryHeap::new();
let mut addresses_map = HashMap::new();
let mut dependency_map = HashMap::new();
let cache_db = Arc::new(Database::open(
env.clone(),
Some("cache"),
&DatabaseOptions::new(lmdb::db::Flags::CREATE),
)?);
let txn = ReadTransaction::new(env.clone())?;
let access = txn.access();
let mut cursor = txn.cursor(cache_db.clone())?;
if let Ok((k, v)) = cursor.seek_range_k::<[u8], [u8]>(&access, &[0u8]) {
let mut key = k;
let mut value = v;
loop {
if key.len() == 3 && key[0] == 0 {
if let Ok(decompressed) = zstd::stream::decode_all(value)
&& !decompressed.is_empty()
{
let root = flexbuffers::Reader::get_root(decompressed.as_slice())?;
let addresses_vec = root.as_vector();
let mut inner_addresses_map = BTreeMap::new();
let mut total_size = 0;
let mut total_count = 0;
for address in &addresses_vec {
let address_vec = address.as_vector();
let address = address_vec.idx(0).as_str();
let mut variants = ArrayVec::<AddressDetails, 5>::new();
for variant in &address_vec.idx(1).as_vector() {
let variant_vec = variant.as_vector();
let compression = variant_vec.idx(0).as_u8();
tracing::debug!(
kind = key[1],
i = key[2],
address,
compression,
"restoring from sync"
);
let last_hit = variant_vec.idx(1).as_u64();
let mut hit_distribution = VecDeque::new();
for hit in &variant_vec.idx(2).as_vector() {
hit_distribution.push_back(hit.as_u64());
}
let mut lookup = BytesMut::new();
lookup.put(&key[1..3]);
lookup.put(address.as_bytes());
lookup.put_u8(compression);
if let Ok(val) =
access.get::<[u8], [u8]>(&cache_db, lookup.as_ref())
{
Self::process_details(
key[1],
key[2],
address,
&mut total_size,
&mut total_count,
&mut variants,
compression,
last_hit,
hit_distribution,
&lookup,
val,
false,
&mut dependency_map,
)?;
}
}
inner_addresses_map.insert(address.to_owned(), variants);
}
addresses_map.insert(
(key[1], key[2]),
(total_size, total_count, inner_addresses_map),
);
}
} else {
let (total_size, total_count, addresses) = addresses_map
.entry((key[0], key[1]))
.or_insert((0, 0, BTreeMap::new()));
let address = std::str::from_utf8(&key[2..key.len() - 1])?;
let compression = *key.last().expect("length is not greater than 1");
let variants = addresses
.entry(address.to_owned())
.or_insert(ArrayVec::new());
if !variants.iter().any(|v| v.compression == compression) {
tracing::debug!(
kind = key[0],
i = key[1],
address,
compression,
"restoring from cache"
);
Self::process_details(
key[0],
key[1],
address,
total_size,
total_count,
variants,
compression,
0,
VecDeque::new(),
key,
value,
true,
&mut dependency_map,
)?;
}
}
if let Ok((k, v)) = cursor.next::<[u8], [u8]>(&access) {
key = k;
value = v;
} else {
break;
}
}
}
Ok(Self {
limits,
env: env.clone(),
cache_db,
log_size,
addresses_map: Arc::new(Mutex::new(addresses_map)),
eviction_queue: Arc::new(Mutex::new(eviction_queue)),
dependency_map: Arc::new(Mutex::new(dependency_map)),
})
}
#[allow(clippy::too_many_arguments)]
fn process_details(
artifact_kind: u8,
idx: u8,
address: &str,
total_size: &mut usize,
total_count: &mut usize,
variants: &mut ArrayVec<AddressDetails, 5>,
compression: u8,
last_hit: u64,
hit_distribution: VecDeque<u64>,
lookup: &[u8],
val: &[u8],
last_hit_is_stored_at: bool,
dependency_map: &mut HashMap<CacheDependency, BTreeSet<(u8, u8, String, u8)>>,
) -> anyhow::Result<()> {
let root = flexbuffers::Reader::get_root(val)?;
let internal_vec = root.as_vector().idx(0).as_vector();
let mut dependencies = vec![];
for dep in &internal_vec.idx(1).as_vector() {
let dep_vec = dep.as_vector();
let kind = dep_vec.idx(0).as_u8();
if kind == 0 {
dependencies.push(CacheDependency::Content);
dependency_map
.entry(CacheDependency::Content)
.or_default()
.insert((artifact_kind, idx, address.to_owned(), compression));
} else if kind == 1 {
let uuid: [u8; 16] = dep_vec.idx(1).as_blob().0.try_into()?;
dependencies.push(CacheDependency::Model(uuid));
dependency_map
.entry(CacheDependency::Model(uuid))
.or_default()
.insert((artifact_kind, idx, address.to_owned(), compression));
}
}
let size = lookup.len() + val.len();
*total_size += size;
*total_count += 1;
variants.push(AddressDetails {
compression,
last_hit: if last_hit_is_stored_at {
internal_vec.idx(0).as_u64()
} else {
last_hit
},
hit_distribution,
size,
stored_at: internal_vec.idx(0).as_u64(),
dependencies,
});
Ok(())
}
#[allow(clippy::type_complexity)]
#[instrument(skip_all, err)]
pub async fn check<'a>(
&self,
cache_kind: &CacheRead,
compression: &'a ArrayVec<CacheCompression, 4>,
idx: u8,
addr: &str,
) -> anyhow::Result<Option<(Bytes, Option<&'a CacheCompression>)>> {
let mut key = BytesMut::new();
let addr_bytes = addr.as_bytes();
let base_key_len = addr_bytes.len() + 2;
key.put_u8(cache_kind.as_u8());
key.put_u8(idx);
key.put(addr_bytes);
let txn = ReadTransaction::new(self.env.clone())?;
let access = txn.access();
for compression in compression {
key.truncate(base_key_len);
key.put_u8(compression.as_u8());
if let Ok(result) = access.get::<[u8], [u8]>(&self.cache_db, key.as_ref()) {
let mut lock = self.addresses_map.lock();
if let Some((_total_size, _total_count, addresses)) =
lock.get_mut(&(cache_kind.as_u8(), idx))
&& let Some(details) = addresses.get_mut(addr)
&& let Some(details) = details
.iter_mut()
.find(|v| v.compression == compression.as_u8())
{
details.last_hit = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)?
.as_secs();
if let Some(back) = details.hit_distribution.back_mut() {
*back += 1;
}
}
drop(lock);
if self.log_size {
tracing::info!(size = %bytesize::ByteSize((key.len() + result.len()) as u64).display().si_short(), compressed = compression.as_str(), "hit");
} else {
tracing::info!(compressed = compression.as_str(), "hit");
}
return Ok(Some((Bytes::copy_from_slice(result), Some(compression))));
}
}
key.truncate(base_key_len);
key.put_u8(0);
if let Ok(result) = access.get::<[u8], [u8]>(&self.cache_db, key.as_ref()) {
let mut lock = self.addresses_map.lock();
if let Some((_total_size, _total_count, addresses)) =
lock.get_mut(&(cache_kind.as_u8(), idx))
&& let Some(details) = addresses.get_mut(addr)
&& let Some(details) = details.iter_mut().find(|v| v.compression == 0)
{
details.last_hit = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)?
.as_secs();
if let Some(back) = details.hit_distribution.back_mut() {
*back += 1;
}
}
drop(lock);
if self.log_size {
tracing::info!(size = %bytesize::ByteSize((key.len() + result.len()) as u64).display().si_short(), compressed = "false", "hit");
} else {
tracing::info!(compressed = "false", "hit");
}
Ok(Some((Bytes::copy_from_slice(result), None)))
} else {
tracing::info!("miss");
Ok(None)
}
}
#[allow(
clippy::too_many_lines,
clippy::too_many_arguments,
clippy::similar_names
)]
#[instrument(skip_all, err)]
pub async fn write(
&self,
cache_kind: CacheWrite<'_>,
compression: Option<&CacheCompression>,
idx: u8,
config: &StoredCacheConfig,
addr: &str,
dependencies: Vec<CacheDependency>,
) -> anyhow::Result<()> {
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)?
.as_secs();
let dependencies = if config.evict_on_dependency_change == Some(true) {
dependencies
} else {
vec![]
};
let mut builder = flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
let mut builder_vec = builder.start_vector();
let mut internal_vec = builder_vec.start_vector();
internal_vec.push(now);
let mut deps_vec = internal_vec.start_vector();
for dep in &dependencies {
let mut dep_vec = deps_vec.start_vector();
match dep {
CacheDependency::Content => {
dep_vec.push(0);
}
CacheDependency::Model(uuid) => {
dep_vec.push(1);
dep_vec.push(flexbuffers::Blob(uuid.as_ref()));
}
}
dep_vec.end_vector();
}
deps_vec.end_vector();
internal_vec.end_vector();
match cache_kind {
CacheWrite::Template(etag, last_modified, blob) => {
builder_vec.push(etag);
builder_vec.push(last_modified);
builder_vec.push(flexbuffers::Blob(blob.as_ref()));
}
_ => unimplemented!(),
}
builder_vec.end_vector();
let val = builder.view();
let mut base_key = BytesMut::new();
base_key.put_u8(cache_kind.as_u8());
base_key.put_u8(idx);
let mut key = base_key.clone();
key.put(addr.as_bytes());
let compression_byte = compression.map_or(0, CacheCompression::as_u8);
let compression_str = compression.map_or("false", CacheCompression::as_str);
key.put_u8(compression_byte);
let size = val.len() + key.len();
if let Some(max_size) = config.max_size
&& size > usize::try_from(max_size)?
{
tracing::warn!(
address = addr,
compressed = compression_str,
"exceeds 'max_size' for entire cache"
);
return Ok(());
}
let mut hit_distribution = VecDeque::new();
hit_distribution.push_back(1);
{
let txn = WriteTransaction::new(self.env.clone())?;
let mut lock = self.addresses_map.lock();
let (total_size, total_count, addresses) = lock
.entry((cache_kind.as_u8(), idx))
.or_insert((0, 0, BTreeMap::new()));
if let Some(max_size) = config.max_size
&& *total_size + size > usize::try_from(max_size)?
{
let mut evicted_size: usize = 0;
let mut lock = self.eviction_queue.lock();
let mut dep_lock = self.dependency_map.lock();
while evicted_size < size {
if let Some(FRsEvictionCandidate {
size,
address,
index,
..
}) = (*lock).pop()
&& let Some(variations) = addresses.get_mut(&address)
{
let variation = variations.remove(index);
for dep in variation.dependencies {
if let Some(dep) = dep_lock.get_mut(&dep) {
dep.remove(&(
cache_kind.as_u8(),
idx,
address.clone(),
u8::try_from(index)?,
));
}
}
base_key.truncate(2);
base_key.put(address.as_bytes());
base_key.put_u8(variation.compression);
let mut access = txn.access();
if access.del_key(&self.cache_db, base_key.as_ref()).is_ok() {
evicted_size += size;
*total_count -= 1;
*total_size -= variation.size;
}
}
}
drop(lock);
drop(dep_lock);
} else if let Some(max_count) = config.max_count
&& *total_count + 1 > max_count
{
let mut evicted = false;
let mut lock = self.eviction_queue.lock();
let mut dep_lock = self.dependency_map.lock();
while !evicted {
if let Some(FRsEvictionCandidate { address, index, .. }) = (*lock).pop()
&& let Some(variations) = addresses.get_mut(&address)
{
let variation = variations.remove(index);
for dep in variation.dependencies {
if let Some(dep) = dep_lock.get_mut(&dep) {
dep.remove(&(
cache_kind.as_u8(),
idx,
address.clone(),
u8::try_from(index)?,
));
}
}
base_key.put(address.as_bytes());
base_key.put_u8(variation.compression);
let mut access = txn.access();
if access.del_key(&self.cache_db, base_key.as_ref()).is_ok() {
evicted = true;
*total_count -= 1;
*total_size -= variation.size;
}
}
}
drop(lock);
drop(dep_lock);
}
*total_size += size;
*total_count += 1;
let details = addresses.entry(addr.to_string()).or_insert(ArrayVec::new());
let new_details = AddressDetails {
compression: compression_byte,
stored_at: now,
last_hit: now,
size,
hit_distribution,
dependencies: dependencies.clone(),
};
if let Some(existing_pos) = details
.iter()
.position(|v| v.compression == compression_byte)
{
details[existing_pos] = new_details;
} else {
details.push(new_details);
}
drop(lock);
let mut lock = self.dependency_map.lock();
for dependency in dependencies {
let existing_dep = lock.entry(dependency).or_insert(BTreeSet::new());
existing_dep.insert((cache_kind.as_u8(), idx, addr.to_string(), compression_byte));
}
drop(lock);
{
let mut access = txn.access();
access.put(&self.cache_db, key.as_ref(), val, &put::Flags::empty())?;
}
txn.commit()?;
}
if self.log_size {
tracing::info!(
size = %bytesize::ByteSize(size as u64).display().si_short(),
compressed = compression_str,
"stored"
);
} else {
tracing::info!(compressed = compression_str, "stored");
}
self.sync(&CacheRead::from_write(&cache_kind), idx).await?;
Ok(())
}
#[instrument(skip_all, err)]
pub async fn dependency_evict(&self, dependencies: Vec<CacheDependency>) -> anyhow::Result<()> {
let mut sync_list = BTreeSet::new();
{
let txn = WriteTransaction::new(self.env.clone())?;
let mut lock_dep_map = self.dependency_map.lock();
let mut lock_addr_map = self.addresses_map.lock();
{
let mut access = txn.access();
for dependency in dependencies {
if let Some(addrs) = lock_dep_map.get(&dependency) {
for (kind, service_idx, addr, compression_idx) in addrs {
if let Some((_, _, variants_map)) =
lock_addr_map.get_mut(&(*kind, *service_idx))
&& let Some(variants) = variants_map.get_mut(addr)
{
variants.remove((*compression_idx) as usize);
sync_list.insert((*kind, *service_idx));
let mut key = BytesMut::new();
key.put_u8(*kind);
key.put_u8(*service_idx);
key.put(addr.as_bytes());
key.put_u8(*compression_idx);
tracing::debug!(
kind,
i = service_idx,
address = addr,
compression = compression_idx,
"evicting for dependency"
);
access.del_key(&self.cache_db, key.as_ref())?;
}
}
}
lock_dep_map.remove(&dependency);
}
}
txn.commit()?;
}
for (kind, idx) in sync_list {
self.sync(&CacheRead::from_u8(kind), idx).await?;
}
Ok(())
}
#[instrument(skip_all, err)]
pub async fn artifact_evict(&self, kind: CacheRead, idx: u8) -> anyhow::Result<()> {
let mut key = BytesMut::new();
key.put_u8(kind.as_u8());
key.put_u8(idx);
{
let txn = WriteTransaction::new(self.env.clone())?;
let mut lock_dep_map = self.dependency_map.lock();
let mut lock_addr_map = self.addresses_map.lock();
{
let mut access = txn.access();
if let Some((total_size, total_count, addrs_map)) =
lock_addr_map.get_mut(&(kind.as_u8(), idx))
{
for (addr, variants) in addrs_map.iter() {
key.truncate(2);
let addr_bytes = addr.as_bytes();
let base_len = addr_bytes.len() + 2;
key.put(addr_bytes);
for variant in variants {
key.truncate(base_len);
key.put_u8(variant.compression);
tracing::debug!(
kind = kind.as_u8(),
i = idx,
address = addr,
compression = variant.compression,
"evicting for artifact"
);
access.del_key(&self.cache_db, key.as_ref())?;
for dep in &variant.dependencies {
if let Some(addrs) = lock_dep_map.get_mut(dep) {
addrs.remove(&(
kind.as_u8(),
idx,
addr.clone(),
variant.compression,
));
}
}
}
}
addrs_map.clear();
*total_size = 0;
*total_count = 0;
if let Err(err) = access.del_key(&self.cache_db, &[0, kind.as_u8(), idx]) {
tracing::warn!(%err);
}
}
}
txn.commit()?;
}
self.sync(&kind, idx).await?;
Ok(())
}
#[allow(
clippy::missing_panics_doc,
clippy::cast_precision_loss,
clippy::too_many_lines
)]
#[instrument(skip_all, err)]
pub async fn clean_cache(
&self,
cache_kind: &CacheRead,
config: &StoredCacheConfig,
idx: u8,
) -> anyhow::Result<()> {
if let StoredCachePolicy::Permanent = config.policy {
bail!("'Permanent' cache should never be cleaned up");
}
let mut key = BytesMut::new();
key.put_u8(cache_kind.as_u8());
key.put_u8(idx);
let now = SystemTime::now();
let min_stored_at = now
.checked_sub(Duration::from_secs(config.max_ttl.unwrap_or(600))) .expect("time to work")
.duration_since(SystemTime::UNIX_EPOCH)?
.as_secs();
let min_last_hit = now
.checked_sub(Duration::from_secs(config.hit_ttl.unwrap_or(300))) .expect("time to work")
.duration_since(SystemTime::UNIX_EPOCH)?
.as_secs();
let max_distribution = config.frequency_window.map(|frequency_window| {
let (clean_min, clean_max) = config.clean_interval.unwrap_or((60, 60 * 3));
let avg_clean_interval = (clean_min + clean_max) as f64 / 2.0;
frequency_window as f64 / avg_clean_interval
});
let mut addrs_to_remove = vec![];
let mut eviction_queue = BinaryHeap::new();
{
let txn = WriteTransaction::new(self.env.clone())?;
let mut lock = self.addresses_map.lock();
if let Some((total_size, total_count, addresses)) =
lock.get_mut(&(cache_kind.as_u8(), idx))
{
tracing::info!(count = total_count, size = total_size, "before");
for (address, details) in addresses {
for (i, variation) in details.iter_mut().enumerate() {
if variation.stored_at < min_stored_at || variation.last_hit < min_last_hit
{
addrs_to_remove.push((address.clone(), variation.compression, i));
} else if let Some(max_distribution) = max_distribution {
if variation.hit_distribution.len() as f64 > max_distribution {
variation.hit_distribution.pop_front();
}
let total_hits: u64 = variation.hit_distribution.iter().sum();
match config.policy {
StoredCachePolicy::FRs(th_eq_threshold, lh_equality_threshold) => {
eviction_queue.push(FRsEvictionCandidate {
total_hits: (
total_hits.cast_signed(),
th_eq_threshold.cast_signed(),
),
last_hit: (
variation.last_hit.cast_signed(),
lh_equality_threshold.cast_signed(),
),
size: variation.size,
address: address.clone(),
index: i,
});
}
StoredCachePolicy::Permanent => unreachable!(),
}
}
}
}
}
let mut dep_lock = self.dependency_map.lock();
if let Some((total_size, total_count, addresses)) =
lock.get_mut(&(cache_kind.as_u8(), idx))
{
{
let mut access = txn.access();
for (remove_addr, compression, i) in &addrs_to_remove {
key.truncate(2);
key.put(remove_addr.as_bytes());
key.put_u8(*compression);
access.del_key(&self.cache_db, key.as_ref())?;
if let Some(variations) = addresses.get_mut(remove_addr) {
let variation = variations.remove(*i);
for dep in variation.dependencies {
if let Some(dep) = dep_lock.get_mut(&dep) {
dep.remove(&(
cache_kind.as_u8(),
idx,
remove_addr.clone(),
*compression,
));
}
}
*total_size -= variation.size;
*total_count -= 1;
}
}
tracing::info!(count = total_count, size = total_size, "after");
}
}
let mut lock = self.eviction_queue.lock();
*lock = eviction_queue;
txn.commit()?;
}
self.sync(cache_kind, idx).await?;
tracing::info!("cleaned");
Ok(())
}
#[allow(clippy::similar_names)]
#[instrument(skip_all, err)]
pub async fn sync(&self, cache_kind: &CacheRead, idx: u8) -> anyhow::Result<()> {
let mut builder = flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
{
let lock = self.addresses_map.lock();
if let Some((_total_size, _total_count, addresses)) =
lock.get(&(cache_kind.as_u8(), idx))
{
let mut addresses_vec = builder.start_vector();
for (address, variants) in addresses {
let mut address_vec = addresses_vec.start_vector();
address_vec.push(address.as_str());
let mut variants_vec = address_vec.start_vector();
for variant in variants {
let mut variant_vec = variants_vec.start_vector();
variant_vec.push(variant.compression);
variant_vec.push(variant.last_hit);
let mut hit_distribution_vec = variant_vec.start_vector();
for hit_count in &variant.hit_distribution {
hit_distribution_vec.push(*hit_count);
}
hit_distribution_vec.end_vector();
variant_vec.end_vector();
}
variants_vec.end_vector();
address_vec.end_vector();
}
addresses_vec.end_vector();
}
}
let mut encoder = ZstdEncoder::with_quality(Vec::new(), Level::Precise(17));
encoder.write_all(builder.view()).await?;
encoder.shutdown().await?;
let compressed = encoder.into_inner();
{
let txn = WriteTransaction::new(self.env.clone())?;
{
let mut access = txn.access();
access.put(
&self.cache_db,
&[0, cache_kind.as_u8(), idx],
compressed.as_slice(),
&put::Flags::empty(),
)?;
}
txn.commit()?;
}
if self.log_size {
tracing::info!(size = %bytesize::ByteSize(compressed.len() as u64).display().si_short(), "synced");
} else {
tracing::info!("synced");
}
Ok(())
}
}