Skip to main content

ordinary_storage/stores/
cache.rs

1// Copyright (C) 2026 Ordinary Labs, LLC.
2//
3// SPDX-License-Identifier: AGPL-3.0-only
4
5use anyhow::bail;
6use arrayvec::ArrayVec;
7use async_compression::Level;
8use async_compression::tokio::write::ZstdEncoder;
9use bytes::{BufMut, Bytes, BytesMut};
10use hashbrown::HashMap;
11use ordinary_config::{CacheLimits, StoredCache as StoredCacheConfig, StoredCachePolicy};
12use parking_lot::Mutex;
13use saferlmdb::{
14    self as lmdb, Database, DatabaseOptions, Environment, ReadTransaction, WriteTransaction, put,
15};
16use std::cmp::Ordering;
17use std::collections::{BTreeMap, BTreeSet, BinaryHeap, VecDeque};
18use std::sync::Arc;
19use std::time::{Duration, SystemTime};
20use tokio::io::AsyncWriteExt;
21use tracing::instrument;
22
/// Kind of cached artifact being looked up.
///
/// Each variant maps to a one-byte tag (see `CacheRead::as_u8`); `CacheStore::check`
/// writes that tag as the first byte of the lookup key.
pub enum CacheRead {
    Template,
    Action,
    Integration,
}
28
29impl CacheRead {
30    fn as_u8(&self) -> u8 {
31        match self {
32            CacheRead::Action => 1,
33            CacheRead::Template => 2,
34            CacheRead::Integration => 3,
35        }
36    }
37
38    fn from_u8(v: u8) -> Self {
39        match v {
40            1 => CacheRead::Action,
41            2 => CacheRead::Template,
42            3 => CacheRead::Integration,
43            _ => panic!("invalid u8 {v} for CacheRead"),
44        }
45    }
46
47    fn from_write(w: &CacheWrite) -> Self {
48        match w {
49            CacheWrite::Template(..) => CacheRead::Template,
50            CacheWrite::Action => CacheRead::Action,
51            CacheWrite::Integration => CacheRead::Integration,
52        }
53    }
54}
55
/// Kind of artifact being stored, together with the payload to cache.
///
/// Only `Template` carries data; `Action` and `Integration` are payload-free here.
pub enum CacheWrite<'a> {
    /// `(etag, last_modified, blob)`
    Template(&'a str, &'a str, &'a Bytes),
    Action,
    Integration,
}
62
63impl CacheWrite<'_> {
64    fn as_u8(&self) -> u8 {
65        match self {
66            CacheWrite::Action => 1,
67            CacheWrite::Template(..) => 2,
68            CacheWrite::Integration => 3,
69        }
70    }
71}
72
/// Compression applied to a cached value.
#[derive(Debug)]
pub enum CacheCompression {
    Gzip,
    Zstd { level: u8 },
    Brotli,
    Deflate,
}

impl CacheCompression {
    /// One-byte tag appended to cache keys (`1..=4`); the `Zstd` level is not encoded.
    fn as_u8(&self) -> u8 {
        match self {
            Self::Gzip => 1,
            Self::Zstd { .. } => 2,
            Self::Brotli => 3,
            Self::Deflate => 4,
        }
    }

    /// The tag from `as_u8` rendered as its ASCII digit (`'1'..='4'`).
    #[must_use]
    pub fn as_char(&self) -> char {
        // tags are single digits, so offsetting from ASCII `0` yields the digit character
        char::from(b'0' + self.as_u8())
    }

    /// Short textual name of the algorithm (`"gzip"`, `"zstd"`, `"br"`, `"deflate"`).
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Gzip => "gzip",
            Self::Zstd { .. } => "zstd",
            Self::Brotli => "br",
            Self::Deflate => "deflate",
        }
    }
}
111
/// Something a cached entry was derived from; entries are tracked per dependency so
/// they can be evicted together (see `CacheStore::dependency_evict`).
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
pub enum CacheDependency {
    /// serialized with kind tag `0`
    Content,
    /// 16-byte identifier, serialized as a blob with kind tag `1`
    /// (presumably a model UUID, going by the local variable name `uuid` — confirm)
    Model([u8; 16]),
}
117
// todo: consider including time-to-render in the weights as well.
/// Eviction candidate ranked by hit count, then recency, then size.
///
/// `(total_hits, last_hit, size, addr, idx)`
///
/// The ordering is built for a max-heap: the candidate that compares *greatest* is the
/// one with the fewest hits, then the oldest last hit, then the largest size. A
/// criterion only decides the comparison when the two values differ by more than that
/// criterion's `eq_threshold`; otherwise it counts as a tie and the next one is used.
///
/// NOTE(review): only `self`'s thresholds are consulted, and `cmp` can return `Equal`
/// for values the derived `Eq` considers different, so this is not a strict total
/// order — confirm that is acceptable for the heap that holds these.
#[derive(Debug, Hash, PartialEq, Eq, Default)]
pub struct FRsEvictionCandidate {
    /// `(hits, eq_threshold)`
    total_hits: (i64, i64),
    /// `(timestamp, eq_threshold)`
    last_hit: (i64, i64),
    size: usize,
    address: String,
    index: usize,
}

impl PartialOrd for FRsEvictionCandidate {
    // `lt`/`le`/`gt`/`ge` are intentionally not overridden: the provided methods are
    // derived from `partial_cmp`, which keeps every operator consistent with `Ord::cmp`.
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for FRsEvictionCandidate {
    /// Compares by hits, then last hit, then size (see the type-level docs).
    ///
    /// Returns `Ordering::Equal` when a difference cannot be represented as an `i64`.
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        /// `true` when the magnitude of `delta` is strictly greater than `threshold`.
        fn beyond(delta: i64, threshold: i64) -> bool {
            match delta.checked_abs() {
                Some(magnitude) => magnitude > threshold,
                // only `i64::MIN` has no `i64` absolute value; its magnitude is above
                // every possible threshold
                None => true,
            }
        }

        let Some(th_delta) = other.total_hits.0.checked_sub(self.total_hits.0) else {
            return Ordering::Equal;
        };

        if beyond(th_delta, self.total_hits.1) {
            // reversed on purpose: fewer hits ranks greater
            return other.total_hits.0.cmp(&self.total_hits.0);
        }

        let Some(lh_delta) = other.last_hit.0.checked_sub(self.last_hit.0) else {
            return Ordering::Equal;
        };

        if beyond(lh_delta, self.last_hit.1) {
            // reversed on purpose: an older last hit ranks greater
            return other.last_hit.0.cmp(&self.last_hit.0);
        }

        self.size.cmp(&other.size)
    }
}
241
/// In-memory bookkeeping for one stored variant (one compression) of a cached address.
#[derive(Debug)]
struct AddressDetails {
    /// what compression was used when stored
    /// (`0` = uncompressed, otherwise the `CacheCompression::as_u8` tag)
    compression: u8,

    /// unix timestamp of when the record was first stored
    stored_at: u64,

    /// unix timestamp of last time this record was hit
    last_hit: u64,

    /// total record size in bytes (key length + value length)
    size: usize,

    /// distribution of hits within the sync window,
    /// chunked by the min clean interval.
    ///
    /// always increment `.last()` on hit, and always unshift
    /// `.first()` on clean.
    ///
    /// number of elements in the deque are `frequency_window/avg(clean_interval.0, clean_interval.1)`
    hit_distribution: VecDeque<u64>,

    /// dependencies this record is registered under in the dependency map
    dependencies: Vec<CacheDependency>,
}
267
/// `(cache_kind, idx) -> (total_size, total_count, { [addr] -> details[] })`
///
/// `details[]` holds one `AddressDetails` per stored compression variant of the address.
type AddressesMap =
    Arc<Mutex<HashMap<(u8, u8), (usize, usize, BTreeMap<String, ArrayVec<AddressDetails, 5>>)>>>;

/// Max-heap
type EvictionQueue = Arc<Mutex<BinaryHeap<FRsEvictionCandidate>>>;

/// `dependency -> { (cache_kind, service_idx, addr, compression) }`
///
/// The last tuple element is the compression byte of the stored variant (`0` =
/// uncompressed), i.e. the same byte that terminates the record's key.
type DependencyMap = Arc<Mutex<HashMap<CacheDependency, BTreeSet<(u8, u8, String, u8)>>>>;
277
/// LMDB-backed cache with in-memory bookkeeping for sizes, hits and dependencies.
pub struct CacheStore {
    pub limits: CacheLimits,
    /// LMDB environment every transaction is opened against
    env: Arc<Environment>,

    /// stores cached results
    cache_db: Arc<Database<'static>>,

    /// when `true`, "hit"/"stored" log events also carry the record size
    log_size: bool,

    /// per `(kind, idx)` totals and per-address variant details
    addresses_map: AddressesMap,
    /// candidates popped when a write needs to make room
    eviction_queue: EvictionQueue,

    /// reverse index from a dependency to the records derived from it
    dependency_map: DependencyMap,
}
292
293impl CacheStore {
    /// Opens the `"cache"` database in `env` and rebuilds the in-memory bookkeeping
    /// from what is already stored.
    ///
    /// Every record is visited once with a cursor, starting at the first key `>= [0]`:
    ///
    /// * a 3-byte key whose first byte is `0` (`[0, kind, idx]`) is treated as a sync
    ///   record: its value is zstd-decoded and read as a flexbuffer list of
    ///   `[address, [[compression, last_hit, [hits..]], ..]]`; each listed variant that
    ///   still has a stored value is restored with those hit statistics.
    /// * any other key is read as `[kind, idx, address.., compression]`; if that
    ///   variant was not already restored it is added with `last_hit` set to its
    ///   stored-at time and an empty hit distribution.
    ///
    /// The eviction queue always starts out empty.
    ///
    /// # Errors
    ///
    /// Fails when the database or read transaction/cursor cannot be opened, when a
    /// flexbuffer root cannot be read, or when an address is not valid UTF-8.
    ///
    /// # Panics
    ///
    /// NOTE(review): the non-sync branch indexes `key[0]`, `key[1]` and slices
    /// `key[2..len - 1]`, so a stored key shorter than 3 bytes would panic — confirm
    /// such keys can never be present.
    #[allow(clippy::too_many_lines, clippy::missing_panics_doc)]
    pub fn new(
        limits: CacheLimits,
        env: &Arc<Environment>,
        log_size: bool,
    ) -> anyhow::Result<Self> {
        // todo: get from `cache_db` on startup
        let eviction_queue = BinaryHeap::new();

        let mut addresses_map = HashMap::new();
        let mut dependency_map = HashMap::new();

        let cache_db = Arc::new(Database::open(
            env.clone(),
            Some("cache"),
            &DatabaseOptions::new(lmdb::db::Flags::CREATE),
        )?);

        let txn = ReadTransaction::new(env.clone())?;
        let access = txn.access();

        let mut cursor = txn.cursor(cache_db.clone())?;

        // an empty database (or any seek error) simply leaves the maps empty
        if let Ok((k, v)) = cursor.seek_range_k::<[u8], [u8]>(&access, &[0u8]) {
            let mut key = k;
            let mut value = v;

            loop {
                if key.len() == 3 && key[0] == 0 {
                    // sync record for `(key[1], key[2])`; undecodable or empty values are skipped
                    if let Ok(decompressed) = zstd::stream::decode_all(value)
                        && !decompressed.is_empty()
                    {
                        let root = flexbuffers::Reader::get_root(decompressed.as_slice())?;

                        let addresses_vec = root.as_vector();

                        let mut inner_addresses_map = BTreeMap::new();

                        let mut total_size = 0;
                        let mut total_count = 0;

                        for address in &addresses_vec {
                            let address_vec = address.as_vector();
                            let address = address_vec.idx(0).as_str();

                            let mut variants = ArrayVec::<AddressDetails, 5>::new();

                            for variant in &address_vec.idx(1).as_vector() {
                                let variant_vec = variant.as_vector();

                                let compression = variant_vec.idx(0).as_u8();

                                tracing::debug!(
                                    kind = key[1],
                                    i = key[2],
                                    address,
                                    compression,
                                    "restoring from sync"
                                );

                                let last_hit = variant_vec.idx(1).as_u64();

                                let mut hit_distribution = VecDeque::new();

                                for hit in &variant_vec.idx(2).as_vector() {
                                    hit_distribution.push_back(hit.as_u64());
                                }

                                // rebuild the record key: `[kind, idx, address.., compression]`
                                let mut lookup = BytesMut::new();

                                lookup.put(&key[1..3]);
                                lookup.put(address.as_bytes());
                                lookup.put_u8(compression);

                                // variants listed in the sync record but no longer stored are dropped
                                if let Ok(val) =
                                    access.get::<[u8], [u8]>(&cache_db, lookup.as_ref())
                                {
                                    Self::process_details(
                                        key[1],
                                        key[2],
                                        address,
                                        &mut total_size,
                                        &mut total_count,
                                        &mut variants,
                                        compression,
                                        last_hit,
                                        hit_distribution,
                                        &lookup,
                                        val,
                                        false,
                                        &mut dependency_map,
                                    )?;
                                }
                            }

                            inner_addresses_map.insert(address.to_owned(), variants);
                        }

                        addresses_map.insert(
                            (key[1], key[2]),
                            (total_size, total_count, inner_addresses_map),
                        );
                    }
                } else {
                    // regular record: `[kind, idx, address.., compression]`
                    let (total_size, total_count, addresses) = addresses_map
                        .entry((key[0], key[1]))
                        .or_insert((0, 0, BTreeMap::new()));

                    let address = std::str::from_utf8(&key[2..key.len() - 1])?;
                    let compression = *key.last().expect("length is not greater than 1");

                    let variants = addresses
                        .entry(address.to_owned())
                        .or_insert(ArrayVec::new());

                    // skip variants already restored (with hit statistics) from a sync record
                    if !variants.iter().any(|v| v.compression == compression) {
                        tracing::debug!(
                            kind = key[0],
                            i = key[1],
                            address,
                            compression,
                            "restoring from cache"
                        );

                        Self::process_details(
                            key[0],
                            key[1],
                            address,
                            total_size,
                            total_count,
                            variants,
                            compression,
                            0,
                            VecDeque::new(),
                            key,
                            value,
                            true,
                            &mut dependency_map,
                        )?;
                    }
                }

                // advance; any cursor error (including end of data) ends the scan
                if let Ok((k, v)) = cursor.next::<[u8], [u8]>(&access) {
                    key = k;
                    value = v;
                } else {
                    break;
                }
            }
        }

        Ok(Self {
            limits,
            env: env.clone(),
            cache_db,
            log_size,
            addresses_map: Arc::new(Mutex::new(addresses_map)),
            eviction_queue: Arc::new(Mutex::new(eviction_queue)),
            dependency_map: Arc::new(Mutex::new(dependency_map)),
        })
    }
455
456    #[allow(clippy::too_many_arguments)]
457    fn process_details(
458        artifact_kind: u8,
459        idx: u8,
460        address: &str,
461        total_size: &mut usize,
462        total_count: &mut usize,
463        variants: &mut ArrayVec<AddressDetails, 5>,
464        compression: u8,
465        last_hit: u64,
466        hit_distribution: VecDeque<u64>,
467        lookup: &[u8],
468        val: &[u8],
469        last_hit_is_stored_at: bool,
470        dependency_map: &mut HashMap<CacheDependency, BTreeSet<(u8, u8, String, u8)>>,
471    ) -> anyhow::Result<()> {
472        let root = flexbuffers::Reader::get_root(val)?;
473        let internal_vec = root.as_vector().idx(0).as_vector();
474
475        let mut dependencies = vec![];
476
477        for dep in &internal_vec.idx(1).as_vector() {
478            let dep_vec = dep.as_vector();
479
480            let kind = dep_vec.idx(0).as_u8();
481
482            if kind == 0 {
483                dependencies.push(CacheDependency::Content);
484                dependency_map
485                    .entry(CacheDependency::Content)
486                    .or_default()
487                    .insert((artifact_kind, idx, address.to_owned(), compression));
488            } else if kind == 1 {
489                let uuid: [u8; 16] = dep_vec.idx(1).as_blob().0.try_into()?;
490                dependencies.push(CacheDependency::Model(uuid));
491                dependency_map
492                    .entry(CacheDependency::Model(uuid))
493                    .or_default()
494                    .insert((artifact_kind, idx, address.to_owned(), compression));
495            }
496        }
497
498        let size = lookup.len() + val.len();
499
500        *total_size += size;
501        *total_count += 1;
502
503        variants.push(AddressDetails {
504            compression,
505            last_hit: if last_hit_is_stored_at {
506                internal_vec.idx(0).as_u64()
507            } else {
508                last_hit
509            },
510            hit_distribution,
511
512            size,
513            stored_at: internal_vec.idx(0).as_u64(),
514            dependencies,
515        });
516        Ok(())
517    }
518
519    /// Check the `cache_db` for a hit; returns `Err()` if not.
520    #[allow(clippy::type_complexity)]
521    #[instrument(skip_all, err)]
522    pub async fn check<'a>(
523        &self,
524        cache_kind: &CacheRead,
525        compression: &'a ArrayVec<CacheCompression, 4>,
526        idx: u8,
527        addr: &str,
528    ) -> anyhow::Result<Option<(Bytes, Option<&'a CacheCompression>)>> {
529        let mut key = BytesMut::new();
530
531        let addr_bytes = addr.as_bytes();
532        let base_key_len = addr_bytes.len() + 2;
533
534        key.put_u8(cache_kind.as_u8());
535        key.put_u8(idx);
536        key.put(addr_bytes);
537
538        let txn = ReadTransaction::new(self.env.clone())?;
539        let access = txn.access();
540
541        for compression in compression {
542            key.truncate(base_key_len);
543            key.put_u8(compression.as_u8());
544
545            if let Ok(result) = access.get::<[u8], [u8]>(&self.cache_db, key.as_ref()) {
546                let mut lock = self.addresses_map.lock();
547
548                if let Some((_total_size, _total_count, addresses)) =
549                    lock.get_mut(&(cache_kind.as_u8(), idx))
550                    && let Some(details) = addresses.get_mut(addr)
551                    && let Some(details) = details
552                        .iter_mut()
553                        .find(|v| v.compression == compression.as_u8())
554                {
555                    details.last_hit = SystemTime::now()
556                        .duration_since(SystemTime::UNIX_EPOCH)?
557                        .as_secs();
558
559                    if let Some(back) = details.hit_distribution.back_mut() {
560                        *back += 1;
561                    }
562                }
563
564                drop(lock);
565
566                if self.log_size {
567                    tracing::info!(size = %bytesize::ByteSize((key.len() + result.len()) as u64).display().si_short(), compressed = compression.as_str(), "hit");
568                } else {
569                    tracing::info!(compressed = compression.as_str(), "hit");
570                }
571                return Ok(Some((Bytes::copy_from_slice(result), Some(compression))));
572            }
573        }
574
575        key.truncate(base_key_len);
576        key.put_u8(0);
577
578        if let Ok(result) = access.get::<[u8], [u8]>(&self.cache_db, key.as_ref()) {
579            let mut lock = self.addresses_map.lock();
580
581            if let Some((_total_size, _total_count, addresses)) =
582                lock.get_mut(&(cache_kind.as_u8(), idx))
583                && let Some(details) = addresses.get_mut(addr)
584                && let Some(details) = details.iter_mut().find(|v| v.compression == 0)
585            {
586                details.last_hit = SystemTime::now()
587                    .duration_since(SystemTime::UNIX_EPOCH)?
588                    .as_secs();
589
590                if let Some(back) = details.hit_distribution.back_mut() {
591                    *back += 1;
592                }
593            }
594
595            drop(lock);
596
597            if self.log_size {
598                tracing::info!(size = %bytesize::ByteSize((key.len() + result.len()) as u64).display().si_short(), compressed = "false", "hit");
599            } else {
600                tracing::info!(compressed = "false", "hit");
601            }
602            Ok(Some((Bytes::copy_from_slice(result), None)))
603        } else {
604            tracing::info!("miss");
605            Ok(None)
606        }
607    }
608
609    /// Caches item for kind and index at specified addr.
610    #[allow(
611        clippy::too_many_lines,
612        clippy::too_many_arguments,
613        clippy::similar_names
614    )]
615    #[instrument(skip_all, err)]
616    pub async fn write(
617        &self,
618        cache_kind: CacheWrite<'_>,
619        compression: Option<&CacheCompression>,
620        idx: u8,
621        config: &StoredCacheConfig,
622        addr: &str,
623        dependencies: Vec<CacheDependency>,
624    ) -> anyhow::Result<()> {
625        let now = SystemTime::now()
626            .duration_since(SystemTime::UNIX_EPOCH)?
627            .as_secs();
628
629        let dependencies = if config.evict_on_dependency_change == Some(true) {
630            dependencies
631        } else {
632            vec![]
633        };
634
635        let mut builder = flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
636        let mut builder_vec = builder.start_vector();
637
638        let mut internal_vec = builder_vec.start_vector();
639
640        internal_vec.push(now);
641
642        let mut deps_vec = internal_vec.start_vector();
643
644        for dep in &dependencies {
645            let mut dep_vec = deps_vec.start_vector();
646
647            match dep {
648                CacheDependency::Content => {
649                    dep_vec.push(0);
650                }
651                CacheDependency::Model(uuid) => {
652                    dep_vec.push(1);
653                    dep_vec.push(flexbuffers::Blob(uuid.as_ref()));
654                }
655            }
656
657            dep_vec.end_vector();
658        }
659
660        deps_vec.end_vector();
661        internal_vec.end_vector();
662
663        match cache_kind {
664            CacheWrite::Template(etag, last_modified, blob) => {
665                builder_vec.push(etag);
666
667                builder_vec.push(last_modified);
668                builder_vec.push(flexbuffers::Blob(blob.as_ref()));
669            }
670            _ => unimplemented!(),
671        }
672
673        builder_vec.end_vector();
674
675        let val = builder.view();
676
677        let mut base_key = BytesMut::new();
678
679        base_key.put_u8(cache_kind.as_u8());
680        base_key.put_u8(idx);
681
682        let mut key = base_key.clone();
683        key.put(addr.as_bytes());
684
685        let compression_byte = compression.map_or(0, CacheCompression::as_u8);
686        let compression_str = compression.map_or("false", CacheCompression::as_str);
687
688        key.put_u8(compression_byte);
689
690        let size = val.len() + key.len();
691
692        if let Some(max_size) = config.max_size
693            && size > usize::try_from(max_size)?
694        {
695            tracing::warn!(
696                address = addr,
697                compressed = compression_str,
698                "exceeds 'max_size' for entire cache"
699            );
700            return Ok(());
701        }
702
703        let mut hit_distribution = VecDeque::new();
704        hit_distribution.push_back(1);
705
706        {
707            let txn = WriteTransaction::new(self.env.clone())?;
708            let mut lock = self.addresses_map.lock();
709
710            let (total_size, total_count, addresses) = lock
711                .entry((cache_kind.as_u8(), idx))
712                .or_insert((0, 0, BTreeMap::new()));
713
714            if let Some(max_size) = config.max_size
715                && *total_size + size > usize::try_from(max_size)?
716            {
717                let mut evicted_size: usize = 0;
718                let mut lock = self.eviction_queue.lock();
719
720                let mut dep_lock = self.dependency_map.lock();
721
722                while evicted_size < size {
723                    if let Some(FRsEvictionCandidate {
724                        size,
725                        address,
726                        index,
727                        ..
728                    }) = (*lock).pop()
729                        && let Some(variations) = addresses.get_mut(&address)
730                    {
731                        let variation = variations.remove(index);
732
733                        for dep in variation.dependencies {
734                            if let Some(dep) = dep_lock.get_mut(&dep) {
735                                dep.remove(&(
736                                    cache_kind.as_u8(),
737                                    idx,
738                                    address.clone(),
739                                    u8::try_from(index)?,
740                                ));
741                            }
742                        }
743
744                        base_key.truncate(2);
745                        base_key.put(address.as_bytes());
746                        base_key.put_u8(variation.compression);
747
748                        let mut access = txn.access();
749                        if access.del_key(&self.cache_db, base_key.as_ref()).is_ok() {
750                            evicted_size += size;
751
752                            *total_count -= 1;
753                            *total_size -= variation.size;
754                        }
755                    }
756                }
757
758                drop(lock);
759                drop(dep_lock);
760            } else if let Some(max_count) = config.max_count
761                && *total_count + 1 > max_count
762            {
763                let mut evicted = false;
764                let mut lock = self.eviction_queue.lock();
765
766                let mut dep_lock = self.dependency_map.lock();
767
768                while !evicted {
769                    if let Some(FRsEvictionCandidate { address, index, .. }) = (*lock).pop()
770                        && let Some(variations) = addresses.get_mut(&address)
771                    {
772                        let variation = variations.remove(index);
773
774                        for dep in variation.dependencies {
775                            if let Some(dep) = dep_lock.get_mut(&dep) {
776                                dep.remove(&(
777                                    cache_kind.as_u8(),
778                                    idx,
779                                    address.clone(),
780                                    u8::try_from(index)?,
781                                ));
782                            }
783                        }
784
785                        base_key.put(address.as_bytes());
786                        base_key.put_u8(variation.compression);
787
788                        let mut access = txn.access();
789                        if access.del_key(&self.cache_db, base_key.as_ref()).is_ok() {
790                            evicted = true;
791
792                            *total_count -= 1;
793                            *total_size -= variation.size;
794                        }
795                    }
796                }
797
798                drop(lock);
799                drop(dep_lock);
800            }
801
802            *total_size += size;
803            *total_count += 1;
804
805            let details = addresses.entry(addr.to_string()).or_insert(ArrayVec::new());
806
807            let new_details = AddressDetails {
808                compression: compression_byte,
809                stored_at: now,
810                last_hit: now,
811                size,
812                hit_distribution,
813                dependencies: dependencies.clone(),
814            };
815
816            if let Some(existing_pos) = details
817                .iter()
818                .position(|v| v.compression == compression_byte)
819            {
820                details[existing_pos] = new_details;
821            } else {
822                details.push(new_details);
823            }
824
825            drop(lock);
826
827            let mut lock = self.dependency_map.lock();
828
829            for dependency in dependencies {
830                let existing_dep = lock.entry(dependency).or_insert(BTreeSet::new());
831                existing_dep.insert((cache_kind.as_u8(), idx, addr.to_string(), compression_byte));
832            }
833
834            drop(lock);
835
836            {
837                let mut access = txn.access();
838                access.put(&self.cache_db, key.as_ref(), val, &put::Flags::empty())?;
839            }
840
841            txn.commit()?;
842        }
843
844        if self.log_size {
845            tracing::info!(
846                size = %bytesize::ByteSize(size as u64).display().si_short(),
847                compressed = compression_str,
848                "stored"
849            );
850        } else {
851            tracing::info!(compressed = compression_str, "stored");
852        }
853
854        self.sync(&CacheRead::from_write(&cache_kind), idx).await?;
855
856        Ok(())
857    }
858
859    #[instrument(skip_all, err)]
860    pub async fn dependency_evict(&self, dependencies: Vec<CacheDependency>) -> anyhow::Result<()> {
861        let mut sync_list = BTreeSet::new();
862
863        {
864            let txn = WriteTransaction::new(self.env.clone())?;
865
866            let mut lock_dep_map = self.dependency_map.lock();
867            let mut lock_addr_map = self.addresses_map.lock();
868
869            {
870                let mut access = txn.access();
871
872                for dependency in dependencies {
873                    if let Some(addrs) = lock_dep_map.get(&dependency) {
874                        for (kind, service_idx, addr, compression_idx) in addrs {
875                            if let Some((_, _, variants_map)) =
876                                lock_addr_map.get_mut(&(*kind, *service_idx))
877                                && let Some(variants) = variants_map.get_mut(addr)
878                            {
879                                variants.remove((*compression_idx) as usize);
880
881                                sync_list.insert((*kind, *service_idx));
882
883                                let mut key = BytesMut::new();
884
885                                key.put_u8(*kind);
886                                key.put_u8(*service_idx);
887
888                                key.put(addr.as_bytes());
889                                key.put_u8(*compression_idx);
890
891                                tracing::debug!(
892                                    kind,
893                                    i = service_idx,
894                                    address = addr,
895                                    compression = compression_idx,
896                                    "evicting for dependency"
897                                );
898
899                                access.del_key(&self.cache_db, key.as_ref())?;
900                            }
901                        }
902                    }
903
904                    lock_dep_map.remove(&dependency);
905                }
906            }
907
908            txn.commit()?;
909        }
910
911        for (kind, idx) in sync_list {
912            self.sync(&CacheRead::from_u8(kind), idx).await?;
913        }
914
915        Ok(())
916    }
917
918    #[instrument(skip_all, err)]
919    pub async fn artifact_evict(&self, kind: CacheRead, idx: u8) -> anyhow::Result<()> {
920        let mut key = BytesMut::new();
921
922        key.put_u8(kind.as_u8());
923        key.put_u8(idx);
924
925        {
926            let txn = WriteTransaction::new(self.env.clone())?;
927
928            let mut lock_dep_map = self.dependency_map.lock();
929            let mut lock_addr_map = self.addresses_map.lock();
930
931            {
932                let mut access = txn.access();
933
934                if let Some((total_size, total_count, addrs_map)) =
935                    lock_addr_map.get_mut(&(kind.as_u8(), idx))
936                {
937                    for (addr, variants) in addrs_map.iter() {
938                        key.truncate(2);
939
940                        let addr_bytes = addr.as_bytes();
941                        let base_len = addr_bytes.len() + 2;
942
943                        key.put(addr_bytes);
944
945                        for variant in variants {
946                            key.truncate(base_len);
947                            key.put_u8(variant.compression);
948
949                            tracing::debug!(
950                                kind = kind.as_u8(),
951                                i = idx,
952                                address = addr,
953                                compression = variant.compression,
954                                "evicting for artifact"
955                            );
956
957                            access.del_key(&self.cache_db, key.as_ref())?;
958
959                            for dep in &variant.dependencies {
960                                if let Some(addrs) = lock_dep_map.get_mut(dep) {
961                                    addrs.remove(&(
962                                        kind.as_u8(),
963                                        idx,
964                                        addr.clone(),
965                                        variant.compression,
966                                    ));
967                                }
968                            }
969                        }
970                    }
971
972                    addrs_map.clear();
973
974                    *total_size = 0;
975                    *total_count = 0;
976
977                    if let Err(err) = access.del_key(&self.cache_db, &[0, kind.as_u8(), idx]) {
978                        tracing::warn!(%err);
979                    }
980                }
981            }
982
983            txn.commit()?;
984        }
985
986        self.sync(&kind, idx).await?;
987
988        Ok(())
989    }
990
991    /// Cleans caches
992    #[allow(
993        clippy::missing_panics_doc,
994        clippy::cast_precision_loss,
995        clippy::too_many_lines
996    )]
997    #[instrument(skip_all, err)]
998    pub async fn clean_cache(
999        &self,
1000        cache_kind: &CacheRead,
1001        config: &StoredCacheConfig,
1002        idx: u8,
1003    ) -> anyhow::Result<()> {
1004        if let StoredCachePolicy::Permanent = config.policy {
1005            bail!("'Permanent' cache should never be cleaned up");
1006        }
1007
1008        let mut key = BytesMut::new();
1009
1010        key.put_u8(cache_kind.as_u8());
1011        key.put_u8(idx);
1012
1013        let now = SystemTime::now();
1014
1015        let min_stored_at = now
1016            .checked_sub(Duration::from_secs(config.max_ttl.unwrap_or(600))) // todo: default should come from Ordinary API
1017            .expect("time to work")
1018            .duration_since(SystemTime::UNIX_EPOCH)?
1019            .as_secs();
1020
1021        let min_last_hit = now
1022            .checked_sub(Duration::from_secs(config.hit_ttl.unwrap_or(300))) // todo: default should come from Ordinary API
1023            .expect("time to work")
1024            .duration_since(SystemTime::UNIX_EPOCH)?
1025            .as_secs();
1026
1027        let max_distribution = config.frequency_window.map(|frequency_window| {
1028            // todo: pass this default and a range check from Ordinary API
1029            let (clean_min, clean_max) = config.clean_interval.unwrap_or((60, 60 * 3));
1030
1031            let avg_clean_interval = (clean_min + clean_max) as f64 / 2.0;
1032            frequency_window as f64 / avg_clean_interval
1033        });
1034
1035        let mut addrs_to_remove = vec![];
1036        let mut eviction_queue = BinaryHeap::new();
1037
1038        {
1039            let txn = WriteTransaction::new(self.env.clone())?;
1040            let mut lock = self.addresses_map.lock();
1041
1042            if let Some((total_size, total_count, addresses)) =
1043                lock.get_mut(&(cache_kind.as_u8(), idx))
1044            {
1045                tracing::info!(count = total_count, size = total_size, "before");
1046
1047                for (address, details) in addresses {
1048                    for (i, variation) in details.iter_mut().enumerate() {
1049                        if variation.stored_at < min_stored_at || variation.last_hit < min_last_hit
1050                        {
1051                            addrs_to_remove.push((address.clone(), variation.compression, i));
1052                        } else if let Some(max_distribution) = max_distribution {
1053                            if variation.hit_distribution.len() as f64 > max_distribution {
1054                                variation.hit_distribution.pop_front();
1055                            }
1056
1057                            let total_hits: u64 = variation.hit_distribution.iter().sum();
1058
1059                            match config.policy {
1060                                StoredCachePolicy::FRs(th_eq_threshold, lh_equality_threshold) => {
1061                                    eviction_queue.push(FRsEvictionCandidate {
1062                                        total_hits: (
1063                                            total_hits.cast_signed(),
1064                                            th_eq_threshold.cast_signed(),
1065                                        ),
1066                                        last_hit: (
1067                                            variation.last_hit.cast_signed(),
1068                                            lh_equality_threshold.cast_signed(),
1069                                        ),
1070                                        size: variation.size,
1071                                        address: address.clone(),
1072                                        index: i,
1073                                    });
1074                                }
1075                                StoredCachePolicy::Permanent => unreachable!(),
1076                            }
1077                        }
1078                    }
1079                }
1080            }
1081
1082            let mut dep_lock = self.dependency_map.lock();
1083
1084            if let Some((total_size, total_count, addresses)) =
1085                lock.get_mut(&(cache_kind.as_u8(), idx))
1086            {
1087                {
1088                    let mut access = txn.access();
1089
1090                    for (remove_addr, compression, i) in &addrs_to_remove {
1091                        key.truncate(2);
1092                        key.put(remove_addr.as_bytes());
1093                        key.put_u8(*compression);
1094
1095                        access.del_key(&self.cache_db, key.as_ref())?;
1096
1097                        if let Some(variations) = addresses.get_mut(remove_addr) {
1098                            let variation = variations.remove(*i);
1099
1100                            for dep in variation.dependencies {
1101                                if let Some(dep) = dep_lock.get_mut(&dep) {
1102                                    dep.remove(&(
1103                                        cache_kind.as_u8(),
1104                                        idx,
1105                                        remove_addr.clone(),
1106                                        *compression,
1107                                    ));
1108                                }
1109                            }
1110
1111                            *total_size -= variation.size;
1112                            *total_count -= 1;
1113                        }
1114                    }
1115
1116                    tracing::info!(count = total_count, size = total_size, "after");
1117                }
1118            }
1119
1120            let mut lock = self.eviction_queue.lock();
1121            *lock = eviction_queue;
1122
1123            txn.commit()?;
1124        }
1125
1126        self.sync(cache_kind, idx).await?;
1127
1128        tracing::info!("cleaned");
1129
1130        Ok(())
1131    }
1132
1133    /// Syncs in-memory variables to disk
1134    #[allow(clippy::similar_names)]
1135    #[instrument(skip_all, err)]
1136    pub async fn sync(&self, cache_kind: &CacheRead, idx: u8) -> anyhow::Result<()> {
1137        let mut builder = flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
1138
1139        {
1140            let lock = self.addresses_map.lock();
1141
1142            if let Some((_total_size, _total_count, addresses)) =
1143                lock.get(&(cache_kind.as_u8(), idx))
1144            {
1145                let mut addresses_vec = builder.start_vector();
1146
1147                for (address, variants) in addresses {
1148                    let mut address_vec = addresses_vec.start_vector();
1149                    address_vec.push(address.as_str());
1150
1151                    let mut variants_vec = address_vec.start_vector();
1152
1153                    for variant in variants {
1154                        let mut variant_vec = variants_vec.start_vector();
1155
1156                        variant_vec.push(variant.compression);
1157                        variant_vec.push(variant.last_hit);
1158
1159                        let mut hit_distribution_vec = variant_vec.start_vector();
1160
1161                        for hit_count in &variant.hit_distribution {
1162                            hit_distribution_vec.push(*hit_count);
1163                        }
1164
1165                        hit_distribution_vec.end_vector();
1166                        variant_vec.end_vector();
1167                    }
1168
1169                    variants_vec.end_vector();
1170                    address_vec.end_vector();
1171                }
1172
1173                addresses_vec.end_vector();
1174            }
1175        }
1176
1177        let mut encoder = ZstdEncoder::with_quality(Vec::new(), Level::Precise(17));
1178        encoder.write_all(builder.view()).await?;
1179        encoder.shutdown().await?;
1180        let compressed = encoder.into_inner();
1181
1182        {
1183            let txn = WriteTransaction::new(self.env.clone())?;
1184
1185            {
1186                let mut access = txn.access();
1187                access.put(
1188                    &self.cache_db,
1189                    &[0, cache_kind.as_u8(), idx],
1190                    compressed.as_slice(),
1191                    &put::Flags::empty(),
1192                )?;
1193            }
1194
1195            txn.commit()?;
1196        }
1197
1198        if self.log_size {
1199            tracing::info!(size = %bytesize::ByteSize(compressed.len() as u64).display().si_short(), "synced");
1200        } else {
1201            tracing::info!("synced");
1202        }
1203
1204        Ok(())
1205    }
1206}