1use anyhow::bail;
6use arrayvec::ArrayVec;
7use async_compression::Level;
8use async_compression::tokio::write::ZstdEncoder;
9use bytes::{BufMut, Bytes, BytesMut};
10use hashbrown::HashMap;
11use ordinary_config::{CacheLimits, StoredCache as StoredCacheConfig, StoredCachePolicy};
12use parking_lot::Mutex;
13use saferlmdb::{
14 self as lmdb, Database, DatabaseOptions, Environment, ReadTransaction, WriteTransaction, put,
15};
16use std::cmp::Ordering;
17use std::collections::{BTreeMap, BTreeSet, BinaryHeap, VecDeque};
18use std::sync::Arc;
19use std::time::{Duration, SystemTime};
20use tokio::io::AsyncWriteExt;
21use tracing::instrument;
22
23pub enum CacheRead {
24 Template,
25 Action,
26 Integration,
27}
28
29impl CacheRead {
30 fn as_u8(&self) -> u8 {
31 match self {
32 CacheRead::Action => 1,
33 CacheRead::Template => 2,
34 CacheRead::Integration => 3,
35 }
36 }
37
38 fn from_u8(v: u8) -> Self {
39 match v {
40 1 => CacheRead::Action,
41 2 => CacheRead::Template,
42 3 => CacheRead::Integration,
43 _ => panic!("invalid u8 {v} for CacheRead"),
44 }
45 }
46
47 fn from_write(w: &CacheWrite) -> Self {
48 match w {
49 CacheWrite::Template(..) => CacheRead::Template,
50 CacheWrite::Action => CacheRead::Action,
51 CacheWrite::Integration => CacheRead::Integration,
52 }
53 }
54}
55
56pub enum CacheWrite<'a> {
57 Template(&'a str, &'a str, &'a Bytes),
59 Action,
60 Integration,
61}
62
63impl CacheWrite<'_> {
64 fn as_u8(&self) -> u8 {
65 match self {
66 CacheWrite::Action => 1,
67 CacheWrite::Template(..) => 2,
68 CacheWrite::Integration => 3,
69 }
70 }
71}
72
73#[derive(Debug)]
74pub enum CacheCompression {
75 Gzip,
76 Zstd { level: u8 },
77 Brotli,
78 Deflate,
79}
80
81impl CacheCompression {
82 fn as_u8(&self) -> u8 {
83 match self {
84 CacheCompression::Gzip => 1,
85 CacheCompression::Zstd { level: _ } => 2,
86 CacheCompression::Brotli => 3,
87 CacheCompression::Deflate => 4,
88 }
89 }
90
91 #[must_use]
92 pub fn as_char(&self) -> char {
93 match self {
94 CacheCompression::Gzip => '1',
95 CacheCompression::Zstd { level: _ } => '2',
96 CacheCompression::Brotli => '3',
97 CacheCompression::Deflate => '4',
98 }
99 }
100
101 #[must_use]
102 pub fn as_str(&self) -> &'static str {
103 match self {
104 CacheCompression::Gzip => "gzip",
105 CacheCompression::Zstd { level: _ } => "zstd",
106 CacheCompression::Brotli => "br",
107 CacheCompression::Deflate => "deflate",
108 }
109 }
110}
111
112#[derive(Debug, Clone, Eq, Hash, PartialEq)]
113pub enum CacheDependency {
114 Content,
115 Model([u8; 16]),
116}
117
118#[derive(Debug, Hash, PartialEq, Eq, Default)]
121pub struct FRsEvictionCandidate {
122 total_hits: (i64, i64),
124 last_hit: (i64, i64),
126 size: usize,
127 address: String,
128 index: usize,
129}
130
131impl PartialOrd for FRsEvictionCandidate {
132 #[inline]
133 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
134 Some(self.cmp(other))
135 }
136
137 #[inline]
138 fn lt(&self, other: &Self) -> bool {
139 let Some(th_delta) = other.total_hits.0.checked_sub(self.total_hits.0) else {
140 return false;
141 };
142
143 if th_delta.abs() > self.total_hits.1 {
144 return self.total_hits.0 >= other.total_hits.0;
145 }
146
147 let Some(lh_delta) = other.last_hit.0.checked_sub(self.last_hit.0) else {
148 return false;
149 };
150
151 if lh_delta.abs() > self.last_hit.1 {
152 return self.last_hit.0 >= other.last_hit.0;
153 }
154
155 self.size < other.size
156 }
157 #[inline]
158 fn le(&self, other: &Self) -> bool {
159 let Some(th_delta) = other.total_hits.0.checked_sub(self.total_hits.0) else {
160 return false;
161 };
162
163 if th_delta.abs() > self.total_hits.1 {
164 return self.total_hits.0 > other.total_hits.0;
165 }
166
167 let Some(lh_delta) = other.last_hit.0.checked_sub(self.last_hit.0) else {
168 return false;
169 };
170
171 if lh_delta.abs() > self.last_hit.1 {
172 return self.last_hit.0 > other.last_hit.0;
173 }
174
175 self.size <= other.size
176 }
177 #[inline]
178 fn gt(&self, other: &Self) -> bool {
179 let Some(th_delta) = other.total_hits.0.checked_sub(self.total_hits.0) else {
180 return false;
181 };
182
183 if th_delta.abs() > self.total_hits.1 {
184 return self.total_hits.0 <= other.total_hits.0;
185 }
186
187 let Some(lh_delta) = other.last_hit.0.checked_sub(self.last_hit.0) else {
188 return false;
189 };
190
191 if lh_delta.abs() > self.last_hit.1 {
192 return self.last_hit.0 <= other.last_hit.0;
193 }
194
195 self.size > other.size
196 }
197 #[inline]
198 fn ge(&self, other: &Self) -> bool {
199 let Some(th_delta) = other.total_hits.0.checked_sub(self.total_hits.0) else {
200 return false;
201 };
202
203 if th_delta.abs() > self.total_hits.1 {
204 return self.total_hits.0 < other.total_hits.0;
205 }
206
207 let Some(lh_delta) = other.last_hit.0.checked_sub(self.last_hit.0) else {
208 return false;
209 };
210
211 if lh_delta.abs() > self.last_hit.1 {
212 return self.last_hit.0 < other.last_hit.0;
213 }
214
215 self.size >= other.size
216 }
217}
218
219impl Ord for FRsEvictionCandidate {
220 #[inline]
221 fn cmp(&self, other: &Self) -> Ordering {
222 let Some(th_delta) = other.total_hits.0.checked_sub(self.total_hits.0) else {
223 return Ordering::Equal;
224 };
225
226 if th_delta.abs() > self.total_hits.1 {
227 return other.total_hits.0.cmp(&self.total_hits.0);
228 }
229
230 let Some(lh_delta) = other.last_hit.0.checked_sub(self.last_hit.0) else {
231 return Ordering::Equal;
232 };
233
234 if lh_delta.abs() > self.last_hit.1 {
235 return other.last_hit.0.cmp(&self.last_hit.0);
236 }
237
238 self.size.cmp(&other.size)
239 }
240}
241
242#[derive(Debug)]
243struct AddressDetails {
244 compression: u8,
246
247 stored_at: u64,
249
250 last_hit: u64,
252
253 size: usize,
255
256 hit_distribution: VecDeque<u64>,
264
265 dependencies: Vec<CacheDependency>,
266}
267
268type AddressesMap =
270 Arc<Mutex<HashMap<(u8, u8), (usize, usize, BTreeMap<String, ArrayVec<AddressDetails, 5>>)>>>;
271
272type EvictionQueue = Arc<Mutex<BinaryHeap<FRsEvictionCandidate>>>;
274
275type DependencyMap = Arc<Mutex<HashMap<CacheDependency, BTreeSet<(u8, u8, String, u8)>>>>;
277
278pub struct CacheStore {
279 pub limits: CacheLimits,
280 env: Arc<Environment>,
281
282 cache_db: Arc<Database<'static>>,
284
285 log_size: bool,
286
287 addresses_map: AddressesMap,
288 eviction_queue: EvictionQueue,
289
290 dependency_map: DependencyMap,
291}
292
293impl CacheStore {
294 #[allow(clippy::too_many_lines, clippy::missing_panics_doc)]
295 pub fn new(
296 limits: CacheLimits,
297 env: &Arc<Environment>,
298 log_size: bool,
299 ) -> anyhow::Result<Self> {
300 let eviction_queue = BinaryHeap::new();
302
303 let mut addresses_map = HashMap::new();
304 let mut dependency_map = HashMap::new();
305
306 let cache_db = Arc::new(Database::open(
307 env.clone(),
308 Some("cache"),
309 &DatabaseOptions::new(lmdb::db::Flags::CREATE),
310 )?);
311
312 let txn = ReadTransaction::new(env.clone())?;
313 let access = txn.access();
314
315 let mut cursor = txn.cursor(cache_db.clone())?;
316
317 if let Ok((k, v)) = cursor.seek_range_k::<[u8], [u8]>(&access, &[0u8]) {
318 let mut key = k;
319 let mut value = v;
320
321 loop {
322 if key.len() == 3 && key[0] == 0 {
323 if let Ok(decompressed) = zstd::stream::decode_all(value)
324 && !decompressed.is_empty()
325 {
326 let root = flexbuffers::Reader::get_root(decompressed.as_slice())?;
327
328 let addresses_vec = root.as_vector();
329
330 let mut inner_addresses_map = BTreeMap::new();
331
332 let mut total_size = 0;
333 let mut total_count = 0;
334
335 for address in &addresses_vec {
336 let address_vec = address.as_vector();
337 let address = address_vec.idx(0).as_str();
338
339 let mut variants = ArrayVec::<AddressDetails, 5>::new();
340
341 for variant in &address_vec.idx(1).as_vector() {
342 let variant_vec = variant.as_vector();
343
344 let compression = variant_vec.idx(0).as_u8();
345
346 tracing::debug!(
347 kind = key[1],
348 i = key[2],
349 address,
350 compression,
351 "restoring from sync"
352 );
353
354 let last_hit = variant_vec.idx(1).as_u64();
355
356 let mut hit_distribution = VecDeque::new();
357
358 for hit in &variant_vec.idx(2).as_vector() {
359 hit_distribution.push_back(hit.as_u64());
360 }
361
362 let mut lookup = BytesMut::new();
363
364 lookup.put(&key[1..3]);
365 lookup.put(address.as_bytes());
366 lookup.put_u8(compression);
367
368 if let Ok(val) =
369 access.get::<[u8], [u8]>(&cache_db, lookup.as_ref())
370 {
371 Self::process_details(
372 key[1],
373 key[2],
374 address,
375 &mut total_size,
376 &mut total_count,
377 &mut variants,
378 compression,
379 last_hit,
380 hit_distribution,
381 &lookup,
382 val,
383 false,
384 &mut dependency_map,
385 )?;
386 }
387 }
388
389 inner_addresses_map.insert(address.to_owned(), variants);
390 }
391
392 addresses_map.insert(
393 (key[1], key[2]),
394 (total_size, total_count, inner_addresses_map),
395 );
396 }
397 } else {
398 let (total_size, total_count, addresses) = addresses_map
399 .entry((key[0], key[1]))
400 .or_insert((0, 0, BTreeMap::new()));
401
402 let address = std::str::from_utf8(&key[2..key.len() - 1])?;
403 let compression = *key.last().expect("length is not greater than 1");
404
405 let variants = addresses
406 .entry(address.to_owned())
407 .or_insert(ArrayVec::new());
408
409 if !variants.iter().any(|v| v.compression == compression) {
410 tracing::debug!(
411 kind = key[0],
412 i = key[1],
413 address,
414 compression,
415 "restoring from cache"
416 );
417
418 Self::process_details(
419 key[0],
420 key[1],
421 address,
422 total_size,
423 total_count,
424 variants,
425 compression,
426 0,
427 VecDeque::new(),
428 key,
429 value,
430 true,
431 &mut dependency_map,
432 )?;
433 }
434 }
435
436 if let Ok((k, v)) = cursor.next::<[u8], [u8]>(&access) {
437 key = k;
438 value = v;
439 } else {
440 break;
441 }
442 }
443 }
444
445 Ok(Self {
446 limits,
447 env: env.clone(),
448 cache_db,
449 log_size,
450 addresses_map: Arc::new(Mutex::new(addresses_map)),
451 eviction_queue: Arc::new(Mutex::new(eviction_queue)),
452 dependency_map: Arc::new(Mutex::new(dependency_map)),
453 })
454 }
455
456 #[allow(clippy::too_many_arguments)]
457 fn process_details(
458 artifact_kind: u8,
459 idx: u8,
460 address: &str,
461 total_size: &mut usize,
462 total_count: &mut usize,
463 variants: &mut ArrayVec<AddressDetails, 5>,
464 compression: u8,
465 last_hit: u64,
466 hit_distribution: VecDeque<u64>,
467 lookup: &[u8],
468 val: &[u8],
469 last_hit_is_stored_at: bool,
470 dependency_map: &mut HashMap<CacheDependency, BTreeSet<(u8, u8, String, u8)>>,
471 ) -> anyhow::Result<()> {
472 let root = flexbuffers::Reader::get_root(val)?;
473 let internal_vec = root.as_vector().idx(0).as_vector();
474
475 let mut dependencies = vec![];
476
477 for dep in &internal_vec.idx(1).as_vector() {
478 let dep_vec = dep.as_vector();
479
480 let kind = dep_vec.idx(0).as_u8();
481
482 if kind == 0 {
483 dependencies.push(CacheDependency::Content);
484 dependency_map
485 .entry(CacheDependency::Content)
486 .or_default()
487 .insert((artifact_kind, idx, address.to_owned(), compression));
488 } else if kind == 1 {
489 let uuid: [u8; 16] = dep_vec.idx(1).as_blob().0.try_into()?;
490 dependencies.push(CacheDependency::Model(uuid));
491 dependency_map
492 .entry(CacheDependency::Model(uuid))
493 .or_default()
494 .insert((artifact_kind, idx, address.to_owned(), compression));
495 }
496 }
497
498 let size = lookup.len() + val.len();
499
500 *total_size += size;
501 *total_count += 1;
502
503 variants.push(AddressDetails {
504 compression,
505 last_hit: if last_hit_is_stored_at {
506 internal_vec.idx(0).as_u64()
507 } else {
508 last_hit
509 },
510 hit_distribution,
511
512 size,
513 stored_at: internal_vec.idx(0).as_u64(),
514 dependencies,
515 });
516 Ok(())
517 }
518
519 #[allow(clippy::type_complexity)]
521 #[instrument(skip_all, err)]
522 pub async fn check<'a>(
523 &self,
524 cache_kind: &CacheRead,
525 compression: &'a ArrayVec<CacheCompression, 4>,
526 idx: u8,
527 addr: &str,
528 ) -> anyhow::Result<Option<(Bytes, Option<&'a CacheCompression>)>> {
529 let mut key = BytesMut::new();
530
531 let addr_bytes = addr.as_bytes();
532 let base_key_len = addr_bytes.len() + 2;
533
534 key.put_u8(cache_kind.as_u8());
535 key.put_u8(idx);
536 key.put(addr_bytes);
537
538 let txn = ReadTransaction::new(self.env.clone())?;
539 let access = txn.access();
540
541 for compression in compression {
542 key.truncate(base_key_len);
543 key.put_u8(compression.as_u8());
544
545 if let Ok(result) = access.get::<[u8], [u8]>(&self.cache_db, key.as_ref()) {
546 let mut lock = self.addresses_map.lock();
547
548 if let Some((_total_size, _total_count, addresses)) =
549 lock.get_mut(&(cache_kind.as_u8(), idx))
550 && let Some(details) = addresses.get_mut(addr)
551 && let Some(details) = details
552 .iter_mut()
553 .find(|v| v.compression == compression.as_u8())
554 {
555 details.last_hit = SystemTime::now()
556 .duration_since(SystemTime::UNIX_EPOCH)?
557 .as_secs();
558
559 if let Some(back) = details.hit_distribution.back_mut() {
560 *back += 1;
561 }
562 }
563
564 drop(lock);
565
566 if self.log_size {
567 tracing::info!(size = %bytesize::ByteSize((key.len() + result.len()) as u64).display().si_short(), compressed = compression.as_str(), "hit");
568 } else {
569 tracing::info!(compressed = compression.as_str(), "hit");
570 }
571 return Ok(Some((Bytes::copy_from_slice(result), Some(compression))));
572 }
573 }
574
575 key.truncate(base_key_len);
576 key.put_u8(0);
577
578 if let Ok(result) = access.get::<[u8], [u8]>(&self.cache_db, key.as_ref()) {
579 let mut lock = self.addresses_map.lock();
580
581 if let Some((_total_size, _total_count, addresses)) =
582 lock.get_mut(&(cache_kind.as_u8(), idx))
583 && let Some(details) = addresses.get_mut(addr)
584 && let Some(details) = details.iter_mut().find(|v| v.compression == 0)
585 {
586 details.last_hit = SystemTime::now()
587 .duration_since(SystemTime::UNIX_EPOCH)?
588 .as_secs();
589
590 if let Some(back) = details.hit_distribution.back_mut() {
591 *back += 1;
592 }
593 }
594
595 drop(lock);
596
597 if self.log_size {
598 tracing::info!(size = %bytesize::ByteSize((key.len() + result.len()) as u64).display().si_short(), compressed = "false", "hit");
599 } else {
600 tracing::info!(compressed = "false", "hit");
601 }
602 Ok(Some((Bytes::copy_from_slice(result), None)))
603 } else {
604 tracing::info!("miss");
605 Ok(None)
606 }
607 }
608
609 #[allow(
611 clippy::too_many_lines,
612 clippy::too_many_arguments,
613 clippy::similar_names
614 )]
615 #[instrument(skip_all, err)]
616 pub async fn write(
617 &self,
618 cache_kind: CacheWrite<'_>,
619 compression: Option<&CacheCompression>,
620 idx: u8,
621 config: &StoredCacheConfig,
622 addr: &str,
623 dependencies: Vec<CacheDependency>,
624 ) -> anyhow::Result<()> {
625 let now = SystemTime::now()
626 .duration_since(SystemTime::UNIX_EPOCH)?
627 .as_secs();
628
629 let dependencies = if config.evict_on_dependency_change == Some(true) {
630 dependencies
631 } else {
632 vec![]
633 };
634
635 let mut builder = flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
636 let mut builder_vec = builder.start_vector();
637
638 let mut internal_vec = builder_vec.start_vector();
639
640 internal_vec.push(now);
641
642 let mut deps_vec = internal_vec.start_vector();
643
644 for dep in &dependencies {
645 let mut dep_vec = deps_vec.start_vector();
646
647 match dep {
648 CacheDependency::Content => {
649 dep_vec.push(0);
650 }
651 CacheDependency::Model(uuid) => {
652 dep_vec.push(1);
653 dep_vec.push(flexbuffers::Blob(uuid.as_ref()));
654 }
655 }
656
657 dep_vec.end_vector();
658 }
659
660 deps_vec.end_vector();
661 internal_vec.end_vector();
662
663 match cache_kind {
664 CacheWrite::Template(etag, last_modified, blob) => {
665 builder_vec.push(etag);
666
667 builder_vec.push(last_modified);
668 builder_vec.push(flexbuffers::Blob(blob.as_ref()));
669 }
670 _ => unimplemented!(),
671 }
672
673 builder_vec.end_vector();
674
675 let val = builder.view();
676
677 let mut base_key = BytesMut::new();
678
679 base_key.put_u8(cache_kind.as_u8());
680 base_key.put_u8(idx);
681
682 let mut key = base_key.clone();
683 key.put(addr.as_bytes());
684
685 let compression_byte = compression.map_or(0, CacheCompression::as_u8);
686 let compression_str = compression.map_or("false", CacheCompression::as_str);
687
688 key.put_u8(compression_byte);
689
690 let size = val.len() + key.len();
691
692 if let Some(max_size) = config.max_size
693 && size > usize::try_from(max_size)?
694 {
695 tracing::warn!(
696 address = addr,
697 compressed = compression_str,
698 "exceeds 'max_size' for entire cache"
699 );
700 return Ok(());
701 }
702
703 let mut hit_distribution = VecDeque::new();
704 hit_distribution.push_back(1);
705
706 {
707 let txn = WriteTransaction::new(self.env.clone())?;
708 let mut lock = self.addresses_map.lock();
709
710 let (total_size, total_count, addresses) = lock
711 .entry((cache_kind.as_u8(), idx))
712 .or_insert((0, 0, BTreeMap::new()));
713
714 if let Some(max_size) = config.max_size
715 && *total_size + size > usize::try_from(max_size)?
716 {
717 let mut evicted_size: usize = 0;
718 let mut lock = self.eviction_queue.lock();
719
720 let mut dep_lock = self.dependency_map.lock();
721
722 while evicted_size < size {
723 if let Some(FRsEvictionCandidate {
724 size,
725 address,
726 index,
727 ..
728 }) = (*lock).pop()
729 && let Some(variations) = addresses.get_mut(&address)
730 {
731 let variation = variations.remove(index);
732
733 for dep in variation.dependencies {
734 if let Some(dep) = dep_lock.get_mut(&dep) {
735 dep.remove(&(
736 cache_kind.as_u8(),
737 idx,
738 address.clone(),
739 u8::try_from(index)?,
740 ));
741 }
742 }
743
744 base_key.truncate(2);
745 base_key.put(address.as_bytes());
746 base_key.put_u8(variation.compression);
747
748 let mut access = txn.access();
749 if access.del_key(&self.cache_db, base_key.as_ref()).is_ok() {
750 evicted_size += size;
751
752 *total_count -= 1;
753 *total_size -= variation.size;
754 }
755 }
756 }
757
758 drop(lock);
759 drop(dep_lock);
760 } else if let Some(max_count) = config.max_count
761 && *total_count + 1 > max_count
762 {
763 let mut evicted = false;
764 let mut lock = self.eviction_queue.lock();
765
766 let mut dep_lock = self.dependency_map.lock();
767
768 while !evicted {
769 if let Some(FRsEvictionCandidate { address, index, .. }) = (*lock).pop()
770 && let Some(variations) = addresses.get_mut(&address)
771 {
772 let variation = variations.remove(index);
773
774 for dep in variation.dependencies {
775 if let Some(dep) = dep_lock.get_mut(&dep) {
776 dep.remove(&(
777 cache_kind.as_u8(),
778 idx,
779 address.clone(),
780 u8::try_from(index)?,
781 ));
782 }
783 }
784
785 base_key.put(address.as_bytes());
786 base_key.put_u8(variation.compression);
787
788 let mut access = txn.access();
789 if access.del_key(&self.cache_db, base_key.as_ref()).is_ok() {
790 evicted = true;
791
792 *total_count -= 1;
793 *total_size -= variation.size;
794 }
795 }
796 }
797
798 drop(lock);
799 drop(dep_lock);
800 }
801
802 *total_size += size;
803 *total_count += 1;
804
805 let details = addresses.entry(addr.to_string()).or_insert(ArrayVec::new());
806
807 let new_details = AddressDetails {
808 compression: compression_byte,
809 stored_at: now,
810 last_hit: now,
811 size,
812 hit_distribution,
813 dependencies: dependencies.clone(),
814 };
815
816 if let Some(existing_pos) = details
817 .iter()
818 .position(|v| v.compression == compression_byte)
819 {
820 details[existing_pos] = new_details;
821 } else {
822 details.push(new_details);
823 }
824
825 drop(lock);
826
827 let mut lock = self.dependency_map.lock();
828
829 for dependency in dependencies {
830 let existing_dep = lock.entry(dependency).or_insert(BTreeSet::new());
831 existing_dep.insert((cache_kind.as_u8(), idx, addr.to_string(), compression_byte));
832 }
833
834 drop(lock);
835
836 {
837 let mut access = txn.access();
838 access.put(&self.cache_db, key.as_ref(), val, &put::Flags::empty())?;
839 }
840
841 txn.commit()?;
842 }
843
844 if self.log_size {
845 tracing::info!(
846 size = %bytesize::ByteSize(size as u64).display().si_short(),
847 compressed = compression_str,
848 "stored"
849 );
850 } else {
851 tracing::info!(compressed = compression_str, "stored");
852 }
853
854 self.sync(&CacheRead::from_write(&cache_kind), idx).await?;
855
856 Ok(())
857 }
858
859 #[instrument(skip_all, err)]
860 pub async fn dependency_evict(&self, dependencies: Vec<CacheDependency>) -> anyhow::Result<()> {
861 let mut sync_list = BTreeSet::new();
862
863 {
864 let txn = WriteTransaction::new(self.env.clone())?;
865
866 let mut lock_dep_map = self.dependency_map.lock();
867 let mut lock_addr_map = self.addresses_map.lock();
868
869 {
870 let mut access = txn.access();
871
872 for dependency in dependencies {
873 if let Some(addrs) = lock_dep_map.get(&dependency) {
874 for (kind, service_idx, addr, compression_idx) in addrs {
875 if let Some((_, _, variants_map)) =
876 lock_addr_map.get_mut(&(*kind, *service_idx))
877 && let Some(variants) = variants_map.get_mut(addr)
878 {
879 variants.remove((*compression_idx) as usize);
880
881 sync_list.insert((*kind, *service_idx));
882
883 let mut key = BytesMut::new();
884
885 key.put_u8(*kind);
886 key.put_u8(*service_idx);
887
888 key.put(addr.as_bytes());
889 key.put_u8(*compression_idx);
890
891 tracing::debug!(
892 kind,
893 i = service_idx,
894 address = addr,
895 compression = compression_idx,
896 "evicting for dependency"
897 );
898
899 access.del_key(&self.cache_db, key.as_ref())?;
900 }
901 }
902 }
903
904 lock_dep_map.remove(&dependency);
905 }
906 }
907
908 txn.commit()?;
909 }
910
911 for (kind, idx) in sync_list {
912 self.sync(&CacheRead::from_u8(kind), idx).await?;
913 }
914
915 Ok(())
916 }
917
918 #[instrument(skip_all, err)]
919 pub async fn artifact_evict(&self, kind: CacheRead, idx: u8) -> anyhow::Result<()> {
920 let mut key = BytesMut::new();
921
922 key.put_u8(kind.as_u8());
923 key.put_u8(idx);
924
925 {
926 let txn = WriteTransaction::new(self.env.clone())?;
927
928 let mut lock_dep_map = self.dependency_map.lock();
929 let mut lock_addr_map = self.addresses_map.lock();
930
931 {
932 let mut access = txn.access();
933
934 if let Some((total_size, total_count, addrs_map)) =
935 lock_addr_map.get_mut(&(kind.as_u8(), idx))
936 {
937 for (addr, variants) in addrs_map.iter() {
938 key.truncate(2);
939
940 let addr_bytes = addr.as_bytes();
941 let base_len = addr_bytes.len() + 2;
942
943 key.put(addr_bytes);
944
945 for variant in variants {
946 key.truncate(base_len);
947 key.put_u8(variant.compression);
948
949 tracing::debug!(
950 kind = kind.as_u8(),
951 i = idx,
952 address = addr,
953 compression = variant.compression,
954 "evicting for artifact"
955 );
956
957 access.del_key(&self.cache_db, key.as_ref())?;
958
959 for dep in &variant.dependencies {
960 if let Some(addrs) = lock_dep_map.get_mut(dep) {
961 addrs.remove(&(
962 kind.as_u8(),
963 idx,
964 addr.clone(),
965 variant.compression,
966 ));
967 }
968 }
969 }
970 }
971
972 addrs_map.clear();
973
974 *total_size = 0;
975 *total_count = 0;
976
977 if let Err(err) = access.del_key(&self.cache_db, &[0, kind.as_u8(), idx]) {
978 tracing::warn!(%err);
979 }
980 }
981 }
982
983 txn.commit()?;
984 }
985
986 self.sync(&kind, idx).await?;
987
988 Ok(())
989 }
990
991 #[allow(
993 clippy::missing_panics_doc,
994 clippy::cast_precision_loss,
995 clippy::too_many_lines
996 )]
997 #[instrument(skip_all, err)]
998 pub async fn clean_cache(
999 &self,
1000 cache_kind: &CacheRead,
1001 config: &StoredCacheConfig,
1002 idx: u8,
1003 ) -> anyhow::Result<()> {
1004 if let StoredCachePolicy::Permanent = config.policy {
1005 bail!("'Permanent' cache should never be cleaned up");
1006 }
1007
1008 let mut key = BytesMut::new();
1009
1010 key.put_u8(cache_kind.as_u8());
1011 key.put_u8(idx);
1012
1013 let now = SystemTime::now();
1014
1015 let min_stored_at = now
1016 .checked_sub(Duration::from_secs(config.max_ttl.unwrap_or(600))) .expect("time to work")
1018 .duration_since(SystemTime::UNIX_EPOCH)?
1019 .as_secs();
1020
1021 let min_last_hit = now
1022 .checked_sub(Duration::from_secs(config.hit_ttl.unwrap_or(300))) .expect("time to work")
1024 .duration_since(SystemTime::UNIX_EPOCH)?
1025 .as_secs();
1026
1027 let max_distribution = config.frequency_window.map(|frequency_window| {
1028 let (clean_min, clean_max) = config.clean_interval.unwrap_or((60, 60 * 3));
1030
1031 let avg_clean_interval = (clean_min + clean_max) as f64 / 2.0;
1032 frequency_window as f64 / avg_clean_interval
1033 });
1034
1035 let mut addrs_to_remove = vec![];
1036 let mut eviction_queue = BinaryHeap::new();
1037
1038 {
1039 let txn = WriteTransaction::new(self.env.clone())?;
1040 let mut lock = self.addresses_map.lock();
1041
1042 if let Some((total_size, total_count, addresses)) =
1043 lock.get_mut(&(cache_kind.as_u8(), idx))
1044 {
1045 tracing::info!(count = total_count, size = total_size, "before");
1046
1047 for (address, details) in addresses {
1048 for (i, variation) in details.iter_mut().enumerate() {
1049 if variation.stored_at < min_stored_at || variation.last_hit < min_last_hit
1050 {
1051 addrs_to_remove.push((address.clone(), variation.compression, i));
1052 } else if let Some(max_distribution) = max_distribution {
1053 if variation.hit_distribution.len() as f64 > max_distribution {
1054 variation.hit_distribution.pop_front();
1055 }
1056
1057 let total_hits: u64 = variation.hit_distribution.iter().sum();
1058
1059 match config.policy {
1060 StoredCachePolicy::FRs(th_eq_threshold, lh_equality_threshold) => {
1061 eviction_queue.push(FRsEvictionCandidate {
1062 total_hits: (
1063 total_hits.cast_signed(),
1064 th_eq_threshold.cast_signed(),
1065 ),
1066 last_hit: (
1067 variation.last_hit.cast_signed(),
1068 lh_equality_threshold.cast_signed(),
1069 ),
1070 size: variation.size,
1071 address: address.clone(),
1072 index: i,
1073 });
1074 }
1075 StoredCachePolicy::Permanent => unreachable!(),
1076 }
1077 }
1078 }
1079 }
1080 }
1081
1082 let mut dep_lock = self.dependency_map.lock();
1083
1084 if let Some((total_size, total_count, addresses)) =
1085 lock.get_mut(&(cache_kind.as_u8(), idx))
1086 {
1087 {
1088 let mut access = txn.access();
1089
1090 for (remove_addr, compression, i) in &addrs_to_remove {
1091 key.truncate(2);
1092 key.put(remove_addr.as_bytes());
1093 key.put_u8(*compression);
1094
1095 access.del_key(&self.cache_db, key.as_ref())?;
1096
1097 if let Some(variations) = addresses.get_mut(remove_addr) {
1098 let variation = variations.remove(*i);
1099
1100 for dep in variation.dependencies {
1101 if let Some(dep) = dep_lock.get_mut(&dep) {
1102 dep.remove(&(
1103 cache_kind.as_u8(),
1104 idx,
1105 remove_addr.clone(),
1106 *compression,
1107 ));
1108 }
1109 }
1110
1111 *total_size -= variation.size;
1112 *total_count -= 1;
1113 }
1114 }
1115
1116 tracing::info!(count = total_count, size = total_size, "after");
1117 }
1118 }
1119
1120 let mut lock = self.eviction_queue.lock();
1121 *lock = eviction_queue;
1122
1123 txn.commit()?;
1124 }
1125
1126 self.sync(cache_kind, idx).await?;
1127
1128 tracing::info!("cleaned");
1129
1130 Ok(())
1131 }
1132
1133 #[allow(clippy::similar_names)]
1135 #[instrument(skip_all, err)]
1136 pub async fn sync(&self, cache_kind: &CacheRead, idx: u8) -> anyhow::Result<()> {
1137 let mut builder = flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
1138
1139 {
1140 let lock = self.addresses_map.lock();
1141
1142 if let Some((_total_size, _total_count, addresses)) =
1143 lock.get(&(cache_kind.as_u8(), idx))
1144 {
1145 let mut addresses_vec = builder.start_vector();
1146
1147 for (address, variants) in addresses {
1148 let mut address_vec = addresses_vec.start_vector();
1149 address_vec.push(address.as_str());
1150
1151 let mut variants_vec = address_vec.start_vector();
1152
1153 for variant in variants {
1154 let mut variant_vec = variants_vec.start_vector();
1155
1156 variant_vec.push(variant.compression);
1157 variant_vec.push(variant.last_hit);
1158
1159 let mut hit_distribution_vec = variant_vec.start_vector();
1160
1161 for hit_count in &variant.hit_distribution {
1162 hit_distribution_vec.push(*hit_count);
1163 }
1164
1165 hit_distribution_vec.end_vector();
1166 variant_vec.end_vector();
1167 }
1168
1169 variants_vec.end_vector();
1170 address_vec.end_vector();
1171 }
1172
1173 addresses_vec.end_vector();
1174 }
1175 }
1176
1177 let mut encoder = ZstdEncoder::with_quality(Vec::new(), Level::Precise(17));
1178 encoder.write_all(builder.view()).await?;
1179 encoder.shutdown().await?;
1180 let compressed = encoder.into_inner();
1181
1182 {
1183 let txn = WriteTransaction::new(self.env.clone())?;
1184
1185 {
1186 let mut access = txn.access();
1187 access.put(
1188 &self.cache_db,
1189 &[0, cache_kind.as_u8(), idx],
1190 compressed.as_slice(),
1191 &put::Flags::empty(),
1192 )?;
1193 }
1194
1195 txn.commit()?;
1196 }
1197
1198 if self.log_size {
1199 tracing::info!(size = %bytesize::ByteSize(compressed.len() as u64).display().si_short(), "synced");
1200 } else {
1201 tracing::info!("synced");
1202 }
1203
1204 Ok(())
1205 }
1206}