Skip to main content

stewball/stores/
cache.rs

1use arrayvec::ArrayVec;
2use bytes::{BufMut, Bytes, BytesMut};
3use hashbrown::HashMap;
4use ordinary_config::{CacheLimits, StoredCache as StoredCacheConfig, StoredCachePolicy};
5use parking_lot::Mutex;
6use saferlmdb::{
7    self as lmdb, Database, DatabaseOptions, Environment, ReadTransaction, WriteTransaction, put,
8};
9use std::cmp::Ordering;
10use std::collections::{BTreeMap, BTreeSet, BinaryHeap, VecDeque};
11use std::error::Error;
12use std::sync::Arc;
13use std::time::{Duration, SystemTime};
14use tracing::instrument;
15
/// Kind of cached artifact being looked up or evicted.
///
/// Each kind maps to a one-byte tag (see [`CacheRead::as_u8`]) that is used as
/// the first byte of the keys built by `CacheStore`.
pub enum CacheRead {
    Template,
    Action,
    Integration,
}

impl CacheRead {
    /// One-byte tag for this kind: `Action` = 1, `Template` = 2, `Integration` = 3.
    ///
    /// The values match `CacheWrite::as_u8`, so reads and writes address the
    /// same keys.
    fn as_u8(&self) -> u8 {
        match *self {
            Self::Action => 1,
            Self::Template => 2,
            Self::Integration => 3,
        }
    }
}
31
32pub enum CacheWrite<'a> {
33    /// `(etag, last_modified, blob)`
34    Template(&'a str, &'a str, &'a Bytes),
35    Action,
36    Integration,
37}
38
39impl CacheWrite<'_> {
40    fn as_u8(&self) -> u8 {
41        match self {
42            CacheWrite::Action => 1,
43            CacheWrite::Template(..) => 2,
44            CacheWrite::Integration => 3,
45        }
46    }
47}
48
/// Compression applied to a cached record.
#[derive(Debug)]
pub enum CacheCompression {
    Gzip,
    Zstd { level: u8 },
    Brotli,
    Deflate,
}

impl CacheCompression {
    /// One-byte tag appended to cache keys: `Gzip` = 1, `Zstd` = 2, `Brotli` = 3,
    /// `Deflate` = 4. The zstd `level` is not part of the tag.
    fn as_u8(&self) -> u8 {
        match *self {
            Self::Gzip => 1,
            Self::Zstd { .. } => 2,
            Self::Brotli => 3,
            Self::Deflate => 4,
        }
    }

    /// The tag from `as_u8` rendered as an ASCII digit (`'1'`..=`'4'`).
    #[must_use]
    pub fn as_char(&self) -> char {
        // Tags are 1..=4, so adding them to ASCII '0' stays within '1'..='4'.
        char::from(b'0' + self.as_u8())
    }

    /// Short textual name of the algorithm: `"gzip"`, `"zstd"`, `"br"` or `"deflate"`.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Gzip => "gzip",
            Self::Zstd { .. } => "zstd",
            Self::Brotli => "br",
            Self::Deflate => "deflate",
        }
    }
}
87
/// Something a cached record was derived from; used as the key of the
/// dependency map so records can be evicted when the dependency changes.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CacheDependency {
    /// Serialised with kind tag `0` and no payload.
    Content,
    /// Serialised with kind tag `1` followed by this 16-byte blob
    /// (the decoding code names it `uuid`).
    Model([u8; 16]),
}
93
// todo: consider including time-to-render in the weights as well.
/// Entry of the eviction queue: `(total_hits, last_hit, size, addr, idx)`.
///
/// # Ordering
///
/// The ordering is "how evictable is this entry": the *greatest* candidate is
/// the one a max-heap pops first.
///
/// 1. If the hit counts differ by more than `total_hits.1`, the candidate with
///    **fewer** hits is greater.
/// 2. Otherwise, if the last-hit timestamps differ by more than `last_hit.1`,
///    the candidate hit **longer ago** is greater.
/// 3. Otherwise the **larger** candidate is greater.
///
/// Only `self`'s thresholds are consulted, so two candidates carrying different
/// thresholds may compare asymmetrically. Because of the thresholds the
/// relation is not a strict total order, and `cmp() == Equal` does not imply
/// `==` (which is derived field-wise).
#[derive(Debug, Hash, PartialEq, Eq, Default)]
pub struct FRsEvictionCandidate {
    /// `(hits, eq_threshold)`
    total_hits: (i64, i64),
    /// `(timestamp, eq_threshold)`
    last_hit: (i64, i64),
    size: usize,
    address: String,
    index: usize,
}

impl PartialOrd for FRsEvictionCandidate {
    /// Delegates to [`Ord::cmp`]; `<`, `<=`, `>` and `>=` use the provided
    /// defaults so every operator agrees with `cmp`.
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for FRsEvictionCandidate {
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        /// `true` when `a` and `b` are further apart than `threshold`.
        ///
        /// `abs_diff` cannot overflow, unlike `(b - a).abs()`. A negative
        /// threshold is exceeded by any distance, including zero.
        fn exceeds(a: i64, b: i64, threshold: i64) -> bool {
            u64::try_from(threshold).map_or(true, |limit| a.abs_diff(b) > limit)
        }

        // Reversed on purpose: more hits => less evictable => smaller.
        if exceeds(self.total_hits.0, other.total_hits.0, self.total_hits.1) {
            return other.total_hits.0.cmp(&self.total_hits.0);
        }

        // Reversed on purpose: more recent hit => less evictable => smaller.
        if exceeds(self.last_hit.0, other.last_hit.0, self.last_hit.1) {
            return other.last_hit.0.cmp(&self.last_hit.0);
        }

        self.size.cmp(&other.size)
    }
}
217
/// In-memory bookkeeping for one stored variant (one compression) of an address.
#[derive(Debug)]
struct AddressDetails {
    /// what compression was used when stored: `0` for uncompressed, otherwise
    /// the `CacheCompression::as_u8` tag
    compression: u8,

    /// unix timestamp (seconds) of when the record was first stored
    stored_at: u64,

    /// unix timestamp (seconds) of last time this record was hit
    last_hit: u64,

    /// total record size in bytes (key length + value length)
    size: usize,

    /// distribution of hits within the sync window,
    /// chunked by the min clean interval.
    ///
    /// always increment the back element on hit, and always shift the
    /// front element off on clean.
    ///
    /// number of elements in the `VecDeque` are `frequency_window/avg(clean_interval.0, clean_interval.1)`
    hit_distribution: VecDeque<u64>,

    /// dependencies recorded for this variant; mirrored in the dependency map
    dependencies: Vec<CacheDependency>,
}
243
/// `(cache_kind, idx) -> (total_size, total_count, { addr -> variants })`
///
/// Each address holds at most five variants, one per compression byte.
type AddressesMap =
    Arc<Mutex<HashMap<(u8, u8), (usize, usize, BTreeMap<String, ArrayVec<AddressDetails, 5>>)>>>;

/// Max-heap: `pop()` yields the greatest `FRsEvictionCandidate` first.
type EvictionQueue = Arc<Mutex<BinaryHeap<FRsEvictionCandidate>>>;

/// `dependency -> { (cache_kind, service_idx, addr, compression) }`
///
/// The last tuple element is the compression byte of the variant, as inserted
/// by `process_details` and `write`.
type DependencyMap = Arc<Mutex<HashMap<CacheDependency, BTreeSet<(u8, u8, String, u8)>>>>;
253
/// LMDB-backed cache together with the in-memory indexes that track what is
/// stored, what may be evicted and which records depend on what.
pub struct CacheStore {
    pub limits: CacheLimits,
    /// environment the `cache` database lives in
    env: Arc<Environment>,

    /// stores cached results
    cache_db: Arc<Database<'static>>,

    /// when `true`, "hit" and "stored" log lines include the record size
    log_size: bool,

    /// per `(cache_kind, idx)` totals and per-address variant details
    addresses_map: AddressesMap,
    /// candidates popped by `write` when a cache exceeds its limits
    eviction_queue: EvictionQueue,

    /// reverse index from a dependency to the records recorded against it
    dependency_map: DependencyMap,
}
268
269impl CacheStore {
270    #[allow(clippy::too_many_lines, clippy::missing_panics_doc)]
271    pub fn new(
272        limits: CacheLimits,
273        env: &Arc<Environment>,
274        log_size: bool,
275    ) -> Result<Self, Box<dyn Error>> {
276        // todo: get from `cache_db` on startup
277        let eviction_queue = BinaryHeap::new();
278
279        let mut addresses_map = HashMap::new();
280        let mut dependency_map = HashMap::new();
281
282        let cache_db = Arc::new(Database::open(
283            env.clone(),
284            Some("cache"),
285            &DatabaseOptions::new(lmdb::db::Flags::CREATE),
286        )?);
287
288        let txn = ReadTransaction::new(env.clone())?;
289        let access = txn.access();
290
291        let mut cursor = txn.cursor(cache_db.clone())?;
292
293        if let Ok((k, v)) = cursor.seek_range_k::<[u8], [u8]>(&access, &[0u8]) {
294            let mut key = k;
295            let mut value = v;
296
297            loop {
298                if key.len() == 3 && key[0] == 0 {
299                    if let Ok(decompressed) = zstd::stream::decode_all(value)
300                        && !decompressed.is_empty()
301                    {
302                        let root = flexbuffers::Reader::get_root(decompressed.as_slice())?;
303
304                        let addresses_vec = root.as_vector();
305
306                        let mut inner_addresses_map = BTreeMap::new();
307
308                        let mut total_size = 0;
309                        let mut total_count = 0;
310
311                        for address in &addresses_vec {
312                            let address_vec = address.as_vector();
313                            let address = address_vec.idx(0).as_str();
314
315                            let mut variants = ArrayVec::<AddressDetails, 5>::new();
316
317                            for variant in &address_vec.idx(1).as_vector() {
318                                let variant_vec = variant.as_vector();
319
320                                let compression = variant_vec.idx(0).as_u8();
321
322                                tracing::debug!(
323                                    kind = key[1],
324                                    i = key[2],
325                                    address,
326                                    compression,
327                                    "restoring from sync"
328                                );
329
330                                let last_hit = variant_vec.idx(1).as_u64();
331
332                                let mut hit_distribution = VecDeque::new();
333
334                                for hit in &variant_vec.idx(2).as_vector() {
335                                    hit_distribution.push_back(hit.as_u64());
336                                }
337
338                                let mut lookup = BytesMut::new();
339
340                                lookup.put(&key[1..3]);
341                                lookup.put(address.as_bytes());
342                                lookup.put_u8(compression);
343
344                                if let Ok(val) =
345                                    access.get::<[u8], [u8]>(&cache_db, lookup.as_ref())
346                                {
347                                    Self::process_details(
348                                        key[1],
349                                        key[2],
350                                        address,
351                                        &mut total_size,
352                                        &mut total_count,
353                                        &mut variants,
354                                        compression,
355                                        last_hit,
356                                        hit_distribution,
357                                        &lookup,
358                                        val,
359                                        false,
360                                        &mut dependency_map,
361                                    )?;
362                                }
363                            }
364
365                            inner_addresses_map.insert(address.to_owned(), variants);
366                        }
367
368                        addresses_map.insert(
369                            (key[1], key[2]),
370                            (total_size, total_count, inner_addresses_map),
371                        );
372                    }
373                } else {
374                    let (total_size, total_count, addresses) = addresses_map
375                        .entry((key[0], key[1]))
376                        .or_insert((0, 0, BTreeMap::new()));
377
378                    let address = std::str::from_utf8(&key[2..key.len() - 1])?;
379                    let compression = *key.last().expect("length is not greater than 1");
380
381                    let variants = addresses
382                        .entry(address.to_owned())
383                        .or_insert(ArrayVec::new());
384
385                    if !variants.iter().any(|v| v.compression == compression) {
386                        tracing::debug!(
387                            kind = key[0],
388                            i = key[1],
389                            address,
390                            compression,
391                            "restoring from cache"
392                        );
393
394                        Self::process_details(
395                            key[0],
396                            key[1],
397                            address,
398                            total_size,
399                            total_count,
400                            variants,
401                            compression,
402                            0,
403                            VecDeque::new(),
404                            key,
405                            value,
406                            true,
407                            &mut dependency_map,
408                        )?;
409                    }
410                }
411
412                if let Ok((k, v)) = cursor.next::<[u8], [u8]>(&access) {
413                    key = k;
414                    value = v;
415                } else {
416                    break;
417                }
418            }
419        }
420
421        Ok(Self {
422            limits,
423            env: env.clone(),
424            cache_db,
425            log_size,
426            addresses_map: Arc::new(Mutex::new(addresses_map)),
427            eviction_queue: Arc::new(Mutex::new(eviction_queue)),
428            dependency_map: Arc::new(Mutex::new(dependency_map)),
429        })
430    }
431
432    #[allow(clippy::too_many_arguments)]
433    fn process_details(
434        artifact_kind: u8,
435        idx: u8,
436        address: &str,
437        total_size: &mut usize,
438        total_count: &mut usize,
439        variants: &mut ArrayVec<AddressDetails, 5>,
440        compression: u8,
441        last_hit: u64,
442        hit_distribution: VecDeque<u64>,
443        lookup: &[u8],
444        val: &[u8],
445        last_hit_is_stored_at: bool,
446        dependency_map: &mut HashMap<CacheDependency, BTreeSet<(u8, u8, String, u8)>>,
447    ) -> Result<(), Box<dyn Error>> {
448        let root = flexbuffers::Reader::get_root(val)?;
449        let internal_vec = root.as_vector().idx(0).as_vector();
450
451        let mut dependencies = vec![];
452
453        for dep in &internal_vec.idx(1).as_vector() {
454            let dep_vec = dep.as_vector();
455
456            let kind = dep_vec.idx(0).as_u8();
457
458            if kind == 0 {
459                dependencies.push(CacheDependency::Content);
460                dependency_map
461                    .entry(CacheDependency::Content)
462                    .or_default()
463                    .insert((artifact_kind, idx, address.to_owned(), compression));
464            } else if kind == 1 {
465                let uuid: [u8; 16] = dep_vec.idx(1).as_blob().0.try_into()?;
466                dependencies.push(CacheDependency::Model(uuid));
467                dependency_map
468                    .entry(CacheDependency::Model(uuid))
469                    .or_default()
470                    .insert((artifact_kind, idx, address.to_owned(), compression));
471            }
472        }
473
474        let size = lookup.len() + val.len();
475
476        *total_size += size;
477        *total_count += 1;
478
479        variants.push(AddressDetails {
480            compression,
481            last_hit: if last_hit_is_stored_at {
482                internal_vec.idx(0).as_u64()
483            } else {
484                last_hit
485            },
486            hit_distribution,
487
488            size,
489            stored_at: internal_vec.idx(0).as_u64(),
490            dependencies,
491        });
492        Ok(())
493    }
494
495    /// Check the `cache_db` for a hit; returns `Err()` if not.
496    #[allow(clippy::type_complexity)]
497    #[instrument(skip(self, cache_kind, compression, idx, addr), err)]
498    pub fn check<'a>(
499        &self,
500        cache_kind: &CacheRead,
501        compression: &'a ArrayVec<CacheCompression, 4>,
502        idx: u8,
503        addr: &str,
504    ) -> Result<Option<(Bytes, Option<&'a CacheCompression>)>, Box<dyn Error>> {
505        let mut key = BytesMut::new();
506
507        let addr_bytes = addr.as_bytes();
508        let base_key_len = addr_bytes.len() + 2;
509
510        key.put_u8(cache_kind.as_u8());
511        key.put_u8(idx);
512        key.put(addr_bytes);
513
514        let txn = ReadTransaction::new(self.env.clone())?;
515        let access = txn.access();
516
517        for compression in compression {
518            key.truncate(base_key_len);
519            key.put_u8(compression.as_u8());
520
521            if let Ok(result) = access.get::<[u8], [u8]>(&self.cache_db, key.as_ref()) {
522                let mut lock = self.addresses_map.lock();
523
524                if let Some((_total_size, _total_count, addresses)) =
525                    lock.get_mut(&(cache_kind.as_u8(), idx))
526                    && let Some(details) = addresses.get_mut(addr)
527                    && let Some(details) = details
528                        .iter_mut()
529                        .find(|v| v.compression == compression.as_u8())
530                {
531                    details.last_hit = SystemTime::now()
532                        .duration_since(SystemTime::UNIX_EPOCH)?
533                        .as_secs();
534
535                    if let Some(back) = details.hit_distribution.back_mut() {
536                        *back += 1;
537                    }
538                }
539
540                drop(lock);
541
542                if self.log_size {
543                    tracing::info!(size = %bytesize::ByteSize((key.len() + result.len()) as u64).display().si_short(), compressed = compression.as_str(), "hit");
544                } else {
545                    tracing::info!(compressed = compression.as_str(), "hit");
546                }
547                return Ok(Some((Bytes::copy_from_slice(result), Some(compression))));
548            }
549        }
550
551        key.truncate(base_key_len);
552        key.put_u8(0);
553
554        if let Ok(result) = access.get::<[u8], [u8]>(&self.cache_db, key.as_ref()) {
555            let mut lock = self.addresses_map.lock();
556
557            if let Some((_total_size, _total_count, addresses)) =
558                lock.get_mut(&(cache_kind.as_u8(), idx))
559                && let Some(details) = addresses.get_mut(addr)
560                && let Some(details) = details.iter_mut().find(|v| v.compression == 0)
561            {
562                details.last_hit = SystemTime::now()
563                    .duration_since(SystemTime::UNIX_EPOCH)?
564                    .as_secs();
565
566                if let Some(back) = details.hit_distribution.back_mut() {
567                    *back += 1;
568                }
569            }
570
571            drop(lock);
572
573            if self.log_size {
574                tracing::info!(size = %bytesize::ByteSize((key.len() + result.len()) as u64).display().si_short(), compressed = "false", "hit");
575            } else {
576                tracing::info!(compressed = "false", "hit");
577            }
578            Ok(Some((Bytes::copy_from_slice(result), None)))
579        } else {
580            tracing::info!("miss");
581            Ok(None)
582        }
583    }
584
585    /// Caches item for kind and index at specified addr.
586    #[allow(
587        clippy::too_many_lines,
588        clippy::too_many_arguments,
589        clippy::similar_names
590    )]
591    #[instrument(
592        skip(self, cache_kind, compression, idx, config, addr, dependencies),
593        err
594    )]
595    pub fn write(
596        &self,
597        cache_kind: CacheWrite,
598        compression: Option<&CacheCompression>,
599        idx: u8,
600        config: &StoredCacheConfig,
601        addr: &str,
602        dependencies: Vec<CacheDependency>,
603    ) -> Result<(), Box<dyn Error>> {
604        let now = SystemTime::now()
605            .duration_since(SystemTime::UNIX_EPOCH)?
606            .as_secs();
607
608        let dependencies = if config.evict_on_dependency_change == Some(true) {
609            dependencies
610        } else {
611            vec![]
612        };
613
614        let mut builder = flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
615        let mut builder_vec = builder.start_vector();
616
617        let mut internal_vec = builder_vec.start_vector();
618
619        internal_vec.push(now);
620
621        let mut deps_vec = internal_vec.start_vector();
622
623        for dep in &dependencies {
624            let mut dep_vec = deps_vec.start_vector();
625
626            match dep {
627                CacheDependency::Content => {
628                    dep_vec.push(0);
629                }
630                CacheDependency::Model(uuid) => {
631                    dep_vec.push(1);
632                    dep_vec.push(flexbuffers::Blob(uuid.as_ref()));
633                }
634            }
635
636            dep_vec.end_vector();
637        }
638
639        deps_vec.end_vector();
640        internal_vec.end_vector();
641
642        match cache_kind {
643            CacheWrite::Template(etag, last_modified, blob) => {
644                builder_vec.push(etag);
645
646                builder_vec.push(last_modified);
647                builder_vec.push(flexbuffers::Blob(blob.as_ref()));
648            }
649            _ => unimplemented!(),
650        }
651
652        builder_vec.end_vector();
653
654        let val = builder.view();
655
656        let mut base_key = BytesMut::new();
657
658        base_key.put_u8(cache_kind.as_u8());
659        base_key.put_u8(idx);
660
661        let mut key = base_key.clone();
662        key.put(addr.as_bytes());
663
664        let compression_byte = compression.map_or(0, CacheCompression::as_u8);
665        let compression_str = compression.map_or("false", CacheCompression::as_str);
666
667        key.put_u8(compression_byte);
668
669        let size = val.len() + key.len();
670
671        if let Some(max_size) = config.max_size
672            && size > usize::try_from(max_size)?
673        {
674            tracing::warn!(
675                address = addr,
676                compressed = compression_str,
677                "exceeds 'max_size' for entire cache"
678            );
679            return Ok(());
680        }
681
682        let mut hit_distribution = VecDeque::new();
683        hit_distribution.push_back(1);
684
685        let txn = WriteTransaction::new(self.env.clone())?;
686
687        let mut lock = self.addresses_map.lock();
688
689        let (total_size, total_count, addresses) = lock
690            .entry((cache_kind.as_u8(), idx))
691            .or_insert((0, 0, BTreeMap::new()));
692
693        if let Some(max_size) = config.max_size
694            && *total_size + size > usize::try_from(max_size)?
695        {
696            let mut evicted_size: usize = 0;
697            let mut lock = self.eviction_queue.lock();
698
699            let mut dep_lock = self.dependency_map.lock();
700
701            while evicted_size < size {
702                if let Some(FRsEvictionCandidate {
703                    size,
704                    address,
705                    index,
706                    ..
707                }) = (*lock).pop()
708                    && let Some(variations) = addresses.get_mut(&address)
709                {
710                    let variation = variations.remove(index);
711
712                    for dep in variation.dependencies {
713                        if let Some(dep) = dep_lock.get_mut(&dep) {
714                            dep.remove(&(
715                                cache_kind.as_u8(),
716                                idx,
717                                address.clone(),
718                                u8::try_from(index)?,
719                            ));
720                        }
721                    }
722
723                    base_key.truncate(2);
724                    base_key.put(address.as_bytes());
725                    base_key.put_u8(variation.compression);
726
727                    let mut access = txn.access();
728                    if access.del_key(&self.cache_db, base_key.as_ref()).is_ok() {
729                        evicted_size += size;
730
731                        *total_count -= 1;
732                        *total_size -= variation.size;
733                    }
734                }
735            }
736
737            drop(lock);
738            drop(dep_lock);
739        } else if let Some(max_count) = config.max_count
740            && *total_count + 1 > max_count
741        {
742            let mut evicted = false;
743            let mut lock = self.eviction_queue.lock();
744
745            let mut dep_lock = self.dependency_map.lock();
746
747            while !evicted {
748                if let Some(FRsEvictionCandidate { address, index, .. }) = (*lock).pop()
749                    && let Some(variations) = addresses.get_mut(&address)
750                {
751                    let variation = variations.remove(index);
752
753                    for dep in variation.dependencies {
754                        if let Some(dep) = dep_lock.get_mut(&dep) {
755                            dep.remove(&(
756                                cache_kind.as_u8(),
757                                idx,
758                                address.clone(),
759                                u8::try_from(index)?,
760                            ));
761                        }
762                    }
763
764                    base_key.put(address.as_bytes());
765                    base_key.put_u8(variation.compression);
766
767                    let mut access = txn.access();
768                    if access.del_key(&self.cache_db, base_key.as_ref()).is_ok() {
769                        evicted = true;
770
771                        *total_count -= 1;
772                        *total_size -= variation.size;
773                    }
774                }
775            }
776
777            drop(lock);
778            drop(dep_lock);
779        }
780
781        *total_size += size;
782        *total_count += 1;
783
784        let details = addresses.entry(addr.to_string()).or_insert(ArrayVec::new());
785
786        let new_details = AddressDetails {
787            compression: compression_byte,
788            stored_at: now,
789            last_hit: now,
790            size,
791            hit_distribution,
792            dependencies: dependencies.clone(),
793        };
794
795        if let Some(existing_pos) = details
796            .iter()
797            .position(|v| v.compression == compression_byte)
798        {
799            details[existing_pos] = new_details;
800        } else {
801            details.push(new_details);
802        }
803
804        drop(lock);
805
806        let mut lock = self.dependency_map.lock();
807
808        for dependency in dependencies {
809            let existing_dep = lock.entry(dependency).or_insert(BTreeSet::new());
810            existing_dep.insert((cache_kind.as_u8(), idx, addr.to_string(), compression_byte));
811        }
812
813        drop(lock);
814
815        {
816            let mut access = txn.access();
817            access.put(&self.cache_db, key.as_ref(), val, &put::Flags::empty())?;
818        }
819
820        txn.commit()?;
821
822        if self.log_size {
823            tracing::info!(
824                size = %bytesize::ByteSize(size as u64).display().si_short(),
825                compressed = compression_str,
826                "stored"
827            );
828        } else {
829            tracing::info!(compressed = compression_str, "stored");
830        }
831
832        Ok(())
833    }
834
835    #[instrument(skip(self, dependencies), err)]
836    pub fn dependency_evict(
837        &self,
838        dependencies: Vec<CacheDependency>,
839    ) -> Result<(), Box<dyn Error>> {
840        let mut lock_dep_map = self.dependency_map.lock();
841        let mut lock_addr_map = self.addresses_map.lock();
842
843        let txn = WriteTransaction::new(self.env.clone())?;
844
845        {
846            let mut access = txn.access();
847
848            for dependency in dependencies {
849                if let Some(addrs) = lock_dep_map.get(&dependency) {
850                    for (kind, service_idx, addr, compression_idx) in addrs {
851                        if let Some((_, _, variants_map)) =
852                            lock_addr_map.get_mut(&(*kind, *service_idx))
853                            && let Some(variants) = variants_map.get_mut(addr)
854                        {
855                            variants.remove((*compression_idx) as usize);
856
857                            let mut key = BytesMut::new();
858
859                            key.put_u8(*kind);
860                            key.put_u8(*service_idx);
861
862                            key.put(addr.as_bytes());
863                            key.put_u8(*compression_idx);
864
865                            tracing::debug!(
866                                kind,
867                                i = service_idx,
868                                address = addr,
869                                compression = compression_idx,
870                                "evicting for dependency"
871                            );
872
873                            access.del_key(&self.cache_db, key.as_ref())?;
874                        }
875                    }
876                }
877
878                lock_dep_map.remove(&dependency);
879            }
880        }
881
882        txn.commit()?;
883
884        drop(lock_dep_map);
885        drop(lock_addr_map);
886
887        Ok(())
888    }
889
890    #[instrument(skip(self, kind, idx), err)]
891    pub fn artifact_evict(&self, kind: CacheRead, idx: u8) -> Result<(), Box<dyn Error>> {
892        let mut key = BytesMut::new();
893
894        key.put_u8(kind.as_u8());
895        key.put_u8(idx);
896
897        let mut lock_dep_map = self.dependency_map.lock();
898        let mut lock_addr_map = self.addresses_map.lock();
899
900        let txn = WriteTransaction::new(self.env.clone())?;
901
902        {
903            let mut access = txn.access();
904
905            if let Some((total_size, total_count, addrs_map)) =
906                lock_addr_map.get_mut(&(kind.as_u8(), idx))
907            {
908                for (addr, variants) in addrs_map.iter() {
909                    key.truncate(2);
910
911                    let addr_bytes = addr.as_bytes();
912                    let base_len = addr_bytes.len() + 2;
913
914                    key.put(addr_bytes);
915
916                    for variant in variants {
917                        key.truncate(base_len);
918                        key.put_u8(variant.compression);
919
920                        tracing::debug!(
921                            kind = kind.as_u8(),
922                            i = idx,
923                            address = addr,
924                            compression = variant.compression,
925                            "evicting for artifact"
926                        );
927
928                        access.del_key(&self.cache_db, key.as_ref())?;
929
930                        for dep in &variant.dependencies {
931                            if let Some(addrs) = lock_dep_map.get_mut(dep) {
932                                addrs.remove(&(
933                                    kind.as_u8(),
934                                    idx,
935                                    addr.clone(),
936                                    variant.compression,
937                                ));
938                            }
939                        }
940                    }
941                }
942
943                addrs_map.clear();
944
945                *total_size = 0;
946                *total_count = 0;
947
948                if let Err(err) = access.del_key(&self.cache_db, &[0, kind.as_u8(), idx]) {
949                    tracing::warn!(%err);
950                }
951            }
952        }
953
954        txn.commit()?;
955
956        drop(lock_dep_map);
957        drop(lock_addr_map);
958
959        Ok(())
960    }
961
962    /// Cleans caches
963    #[allow(clippy::missing_panics_doc, clippy::cast_precision_loss)]
964    #[instrument(skip(self, cache_kind, config, idx), err)]
965    pub fn clean_cache(
966        &self,
967        cache_kind: &CacheRead,
968        config: &StoredCacheConfig,
969        idx: u8,
970    ) -> Result<(), Box<dyn Error>> {
971        if let StoredCachePolicy::Permanent = config.policy {
972            return Err("'Permanent' cache should never be cleaned up".into());
973        }
974
975        let mut key = BytesMut::new();
976
977        key.put_u8(cache_kind.as_u8());
978        key.put_u8(idx);
979
980        let now = SystemTime::now();
981
982        let min_stored_at = now
983            .checked_sub(Duration::from_secs(config.max_ttl.unwrap_or(600))) // todo: default should come from admin
984            .expect("time to work")
985            .duration_since(SystemTime::UNIX_EPOCH)?
986            .as_secs();
987
988        let min_last_hit = now
989            .checked_sub(Duration::from_secs(config.hit_ttl.unwrap_or(300))) // todo: default should come from admin
990            .expect("time to work")
991            .duration_since(SystemTime::UNIX_EPOCH)?
992            .as_secs();
993
994        let max_distribution = config.frequency_window.map(|frequency_window| {
995            // todo: pass this default and a range check from admin
996            let (clean_min, clean_max) = config.clean_interval.unwrap_or((60, 60 * 3));
997
998            let avg_clean_interval = (clean_min + clean_max) as f64 / 2.0;
999            frequency_window as f64 / avg_clean_interval
1000        });
1001
1002        let mut addrs_to_remove = vec![];
1003        let mut eviction_queue = BinaryHeap::new();
1004
1005        let mut lock = self.addresses_map.lock();
1006
1007        if let Some((total_size, total_count, addresses)) = lock.get_mut(&(cache_kind.as_u8(), idx))
1008        {
1009            tracing::info!(count = total_count, size = total_size, "before");
1010
1011            for (address, details) in addresses {
1012                for (i, variation) in details.iter_mut().enumerate() {
1013                    if variation.stored_at < min_stored_at || variation.last_hit < min_last_hit {
1014                        addrs_to_remove.push((address.clone(), variation.compression, i));
1015                    } else if let Some(max_distribution) = max_distribution {
1016                        if variation.hit_distribution.len() as f64 > max_distribution {
1017                            variation.hit_distribution.pop_front();
1018                        }
1019
1020                        let total_hits: u64 = variation.hit_distribution.iter().sum();
1021
1022                        match config.policy {
1023                            StoredCachePolicy::FRs(th_eq_threshold, lh_equality_threshold) => {
1024                                eviction_queue.push(FRsEvictionCandidate {
1025                                    total_hits: (
1026                                        total_hits.cast_signed(),
1027                                        th_eq_threshold.cast_signed(),
1028                                    ),
1029                                    last_hit: (
1030                                        variation.last_hit.cast_signed(),
1031                                        lh_equality_threshold.cast_signed(),
1032                                    ),
1033                                    size: variation.size,
1034                                    address: address.clone(),
1035                                    index: i,
1036                                });
1037                            }
1038                            StoredCachePolicy::Permanent => unreachable!(),
1039                        }
1040                    }
1041                }
1042            }
1043        }
1044
1045        let txn = WriteTransaction::new(self.env.clone())?;
1046
1047        let mut dep_lock = self.dependency_map.lock();
1048
1049        if let Some((total_size, total_count, addresses)) = lock.get_mut(&(cache_kind.as_u8(), idx))
1050        {
1051            {
1052                let mut access = txn.access();
1053
1054                for (remove_addr, compression, i) in &addrs_to_remove {
1055                    key.truncate(2);
1056                    key.put(remove_addr.as_bytes());
1057                    key.put_u8(*compression);
1058
1059                    access.del_key(&self.cache_db, key.as_ref())?;
1060
1061                    if let Some(variations) = addresses.get_mut(remove_addr) {
1062                        let variation = variations.remove(*i);
1063
1064                        for dep in variation.dependencies {
1065                            if let Some(dep) = dep_lock.get_mut(&dep) {
1066                                dep.remove(&(
1067                                    cache_kind.as_u8(),
1068                                    idx,
1069                                    remove_addr.clone(),
1070                                    *compression,
1071                                ));
1072                            }
1073                        }
1074
1075                        *total_size -= variation.size;
1076                        *total_count -= 1;
1077                    }
1078                }
1079
1080                tracing::info!(count = total_count, size = total_size, "after");
1081            }
1082        }
1083
1084        drop(dep_lock);
1085        drop(lock);
1086
1087        txn.commit()?;
1088
1089        let mut lock = self.eviction_queue.lock();
1090        *lock = eviction_queue;
1091
1092        drop(lock);
1093
1094        tracing::info!("cleaned");
1095
1096        Ok(())
1097    }
1098
1099    /// Syncs in-memory variables to disk
1100    #[allow(clippy::similar_names)]
1101    #[instrument(skip(self, cache_kind, idx), err)]
1102    pub fn sync(&self, cache_kind: &CacheRead, idx: u8) -> Result<(), Box<dyn Error>> {
1103        let mut builder = flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
1104
1105        let lock = self.addresses_map.lock();
1106
1107        if let Some((_total_size, _total_count, addresses)) = lock.get(&(cache_kind.as_u8(), idx)) {
1108            let mut addresses_vec = builder.start_vector();
1109
1110            for (address, variants) in addresses {
1111                let mut address_vec = addresses_vec.start_vector();
1112                address_vec.push(address.as_str());
1113
1114                let mut variants_vec = address_vec.start_vector();
1115
1116                for variant in variants {
1117                    let mut variant_vec = variants_vec.start_vector();
1118
1119                    variant_vec.push(variant.compression);
1120                    variant_vec.push(variant.last_hit);
1121
1122                    let mut hit_distribution_vec = variant_vec.start_vector();
1123
1124                    for hit_count in &variant.hit_distribution {
1125                        hit_distribution_vec.push(*hit_count);
1126                    }
1127
1128                    hit_distribution_vec.end_vector();
1129                    variant_vec.end_vector();
1130                }
1131
1132                variants_vec.end_vector();
1133                address_vec.end_vector();
1134            }
1135
1136            addresses_vec.end_vector();
1137        }
1138
1139        drop(lock);
1140
1141        if let Ok(compressed) = zstd::stream::encode_all(std::io::Cursor::new(builder.view()), 17) {
1142            let txn = WriteTransaction::new(self.env.clone())?;
1143
1144            {
1145                let mut access = txn.access();
1146                access.put(
1147                    &self.cache_db,
1148                    &[0, cache_kind.as_u8(), idx],
1149                    &compressed,
1150                    &put::Flags::empty(),
1151                )?;
1152            }
1153
1154            txn.commit()?;
1155
1156            if self.log_size {
1157                tracing::info!(size = %bytesize::ByteSize(compressed.len() as u64).display().si_short(), "synced");
1158            } else {
1159                tracing::info!("synced");
1160            }
1161        }
1162
1163        Ok(())
1164    }
1165}