use arrayvec::ArrayVec;
use bytes::{BufMut, Bytes, BytesMut};
use hashbrown::HashMap;
use ordinary_config::{CacheLimits, StoredCache as StoredCacheConfig, StoredCachePolicy};
use parking_lot::Mutex;
use saferlmdb::{
    self as lmdb, Database, DatabaseOptions, Environment, ReadTransaction, WriteTransaction, put,
};
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet, BinaryHeap, VecDeque};
use std::error::Error;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tracing::instrument;

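/// Artifact kinds that can be read from the cache; `as_u8` provides the first
/// byte of the LMDB key.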
pub enum CacheRead {
    Template,
    Action,
    Integration,
}

impl CacheRead {
    fn as_u8(&self) -> u8 {
        match self {
            CacheRead::Action => 1,
            CacheRead::Template => 2,
            CacheRead::Integration => 3,
        }
    }
}

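/// Artifact kinds that can be written to the cache. `Template` carries the
/// ETag, the Last-Modified value, and the response body that get serialized
/// into the stored record; the other variants are not implemented yet.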
pub enum CacheWrite<'a> {
    Template(&'a str, &'a str, &'a Bytes),
    Action,
    Integration,
}

impl CacheWrite<'_> {
    fn as_u8(&self) -> u8 {
        match self {
            CacheWrite::Action => 1,
            CacheWrite::Template(..) => 2,
            CacheWrite::Integration => 3,
        }
    }
}

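/// Compression codec of a cached entry. The `u8`/`char` forms are used in LMDB
/// keys, while `as_str` returns the matching `Content-Encoding` token.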
#[derive(Debug)]
pub enum CacheCompression {
    Gzip,
    Zstd { level: u8 },
    Brotli,
    Deflate,
}

impl CacheCompression {
    fn as_u8(&self) -> u8 {
        match self {
            CacheCompression::Gzip => 1,
            CacheCompression::Zstd { level: _ } => 2,
            CacheCompression::Brotli => 3,
            CacheCompression::Deflate => 4,
        }
    }

    #[must_use]
    pub fn as_char(&self) -> char {
        match self {
            CacheCompression::Gzip => '1',
            CacheCompression::Zstd { level: _ } => '2',
            CacheCompression::Brotli => '3',
            CacheCompression::Deflate => '4',
        }
    }

    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match self {
            CacheCompression::Gzip => "gzip",
            CacheCompression::Zstd { level: _ } => "zstd",
            CacheCompression::Brotli => "br",
            CacheCompression::Deflate => "deflate",
        }
    }
}

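/// Upstream input a cached entry depends on; matching entries are removed by
/// `dependency_evict` when the dependency changes. `Model` carries a 16-byte
/// identifier.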
#[derive(Debug, Clone, Eq, Hash, PartialEq)]
pub enum CacheDependency {
    Content,
    Model([u8; 16]),
}

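/// Entry considered for eviction under the `FRs` policy. `total_hits` and
/// `last_hit` are `(value, equality threshold)` pairs: candidates whose values
/// differ by no more than the threshold are treated as tied on that criterion
/// and the comparison falls through to the next one.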
#[derive(Debug, Hash, PartialEq, Eq, Default)]
pub struct FRsEvictionCandidate {
    total_hits: (i64, i64),
    last_hit: (i64, i64),
    size: usize,
    address: String,
    index: usize,
}

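// The ordering is intentionally reversed on hit count and recency so that the
// max-heap `BinaryHeap` used as the eviction queue pops the least-hit,
// least-recently-hit candidates first; within the equality thresholds it falls
// back to size, with larger entries popped first.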
impl PartialOrd for FRsEvictionCandidate {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }

    #[inline]
    fn lt(&self, other: &Self) -> bool {
        let Some(th_delta) = other.total_hits.0.checked_sub(self.total_hits.0) else {
            return false;
        };

        if th_delta.abs() > self.total_hits.1 {
            return self.total_hits.0 >= other.total_hits.0;
        }

        let Some(lh_delta) = other.last_hit.0.checked_sub(self.last_hit.0) else {
            return false;
        };

        if lh_delta.abs() > self.last_hit.1 {
            return self.last_hit.0 >= other.last_hit.0;
        }

        self.size < other.size
    }
    #[inline]
    fn le(&self, other: &Self) -> bool {
        let Some(th_delta) = other.total_hits.0.checked_sub(self.total_hits.0) else {
            return false;
        };

        if th_delta.abs() > self.total_hits.1 {
            return self.total_hits.0 > other.total_hits.0;
        }

        let Some(lh_delta) = other.last_hit.0.checked_sub(self.last_hit.0) else {
            return false;
        };

        if lh_delta.abs() > self.last_hit.1 {
            return self.last_hit.0 > other.last_hit.0;
        }

        self.size <= other.size
    }
    #[inline]
    fn gt(&self, other: &Self) -> bool {
        let Some(th_delta) = other.total_hits.0.checked_sub(self.total_hits.0) else {
            return false;
        };

        if th_delta.abs() > self.total_hits.1 {
            return self.total_hits.0 <= other.total_hits.0;
        }

        let Some(lh_delta) = other.last_hit.0.checked_sub(self.last_hit.0) else {
            return false;
        };

        if lh_delta.abs() > self.last_hit.1 {
            return self.last_hit.0 <= other.last_hit.0;
        }

        self.size > other.size
    }
    #[inline]
    fn ge(&self, other: &Self) -> bool {
        let Some(th_delta) = other.total_hits.0.checked_sub(self.total_hits.0) else {
            return false;
        };

        if th_delta.abs() > self.total_hits.1 {
            return self.total_hits.0 < other.total_hits.0;
        }

        let Some(lh_delta) = other.last_hit.0.checked_sub(self.last_hit.0) else {
            return false;
        };

        if lh_delta.abs() > self.last_hit.1 {
            return self.last_hit.0 < other.last_hit.0;
        }

        self.size >= other.size
    }
}

impl Ord for FRsEvictionCandidate {
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        let Some(th_delta) = other.total_hits.0.checked_sub(self.total_hits.0) else {
            return Ordering::Equal;
        };

        if th_delta.abs() > self.total_hits.1 {
            return other.total_hits.0.cmp(&self.total_hits.0);
        }

        let Some(lh_delta) = other.last_hit.0.checked_sub(self.last_hit.0) else {
            return Ordering::Equal;
        };

        if lh_delta.abs() > self.last_hit.1 {
            return other.last_hit.0.cmp(&self.last_hit.0);
        }

        self.size.cmp(&other.size)
    }
}

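/// In-memory bookkeeping for one stored variant (address + compression) of a
/// cached entry. `stored_at` and `last_hit` are UNIX timestamps in seconds,
/// `size` is the combined key + value length in bytes, and `hit_distribution`
/// holds recent hit counters: `check` increments the newest bucket (at the
/// back) and `clean_cache` trims the front.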
#[derive(Debug)]
struct AddressDetails {
    compression: u8,

    stored_at: u64,

    last_hit: u64,

    size: usize,

    hit_distribution: VecDeque<u64>,

    dependencies: Vec<CacheDependency>,
}

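/// Per `(artifact kind, index)` bucket: total stored size in bytes, total entry
/// count, and a map from address to its stored variants (one per compression).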
type AddressesMap =
    Arc<Mutex<HashMap<(u8, u8), (usize, usize, BTreeMap<String, ArrayVec<AddressDetails, 5>>)>>>;

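/// Eviction candidates ranked by the reversed `FRsEvictionCandidate` ordering;
/// rebuilt by `clean_cache` and drained by `write` when limits are exceeded.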
type EvictionQueue = Arc<Mutex<BinaryHeap<FRsEvictionCandidate>>>;

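/// Reverse index from a dependency to the `(kind, index, address, compression)`
/// tuples of the entries that must be evicted when that dependency changes.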
type DependencyMap = Arc<Mutex<HashMap<CacheDependency, BTreeSet<(u8, u8, String, u8)>>>>;

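/// LMDB-backed cache with in-memory bookkeeping of sizes, hit statistics, and
/// dependency links used to enforce size/count limits and drive eviction.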
pub struct CacheStore {
    pub limits: CacheLimits,
    env: Arc<Environment>,

    cache_db: Arc<Database<'static>>,

    log_size: bool,

    addresses_map: AddressesMap,
    eviction_queue: EvictionQueue,

    dependency_map: DependencyMap,
}

impl CacheStore {
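    /// Opens (or creates) the `cache` database and rebuilds the in-memory
    /// address, hit-statistic, and dependency maps by scanning every key in
    /// LMDB; per-artifact sync records (keys `[0, kind, idx]`) restore hit
    /// statistics, and remaining raw entries are added without hit history.
    ///
    /// # Errors
    ///
    /// Returns any LMDB or deserialization error encountered while scanning.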
    #[allow(clippy::too_many_lines, clippy::missing_panics_doc)]
    pub fn new(
        limits: CacheLimits,
        env: &Arc<Environment>,
        log_size: bool,
    ) -> Result<Self, Box<dyn Error>> {
        let eviction_queue = BinaryHeap::new();

        let mut addresses_map = HashMap::new();
        let mut dependency_map = HashMap::new();

        let cache_db = Arc::new(Database::open(
            env.clone(),
            Some("cache"),
            &DatabaseOptions::new(lmdb::db::Flags::CREATE),
        )?);

        let txn = ReadTransaction::new(env.clone())?;
        let access = txn.access();

        let mut cursor = txn.cursor(cache_db.clone())?;

        if let Ok((k, v)) = cursor.seek_range_k::<[u8], [u8]>(&access, &[0u8]) {
            let mut key = k;
            let mut value = v;

            loop {
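                // Keys of the form `[0, kind, idx]` hold compressed sync records
                // written by `sync`; any other key is a raw cache entry laid out
                // as `[kind, idx, address bytes..., compression]`.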
                if key.len() == 3 && key[0] == 0 {
                    if let Ok(decompressed) = zstd::stream::decode_all(value)
                        && !decompressed.is_empty()
                    {
                        let root = flexbuffers::Reader::get_root(decompressed.as_slice())?;

                        let addresses_vec = root.as_vector();

                        let mut inner_addresses_map = BTreeMap::new();

                        let mut total_size = 0;
                        let mut total_count = 0;

                        for address in &addresses_vec {
                            let address_vec = address.as_vector();
                            let address = address_vec.idx(0).as_str();

                            let mut variants = ArrayVec::<AddressDetails, 5>::new();

                            for variant in &address_vec.idx(1).as_vector() {
                                let variant_vec = variant.as_vector();

                                let compression = variant_vec.idx(0).as_u8();

                                tracing::debug!(
                                    kind = key[1],
                                    i = key[2],
                                    address,
                                    compression,
                                    "restoring from sync"
                                );

                                let last_hit = variant_vec.idx(1).as_u64();

                                let mut hit_distribution = VecDeque::new();

                                for hit in &variant_vec.idx(2).as_vector() {
                                    hit_distribution.push_back(hit.as_u64());
                                }

                                let mut lookup = BytesMut::new();

                                lookup.put(&key[1..3]);
                                lookup.put(address.as_bytes());
                                lookup.put_u8(compression);

                                if let Ok(val) =
                                    access.get::<[u8], [u8]>(&cache_db, lookup.as_ref())
                                {
                                    Self::process_details(
                                        key[1],
                                        key[2],
                                        address,
                                        &mut total_size,
                                        &mut total_count,
                                        &mut variants,
                                        compression,
                                        last_hit,
                                        hit_distribution,
                                        &lookup,
                                        val,
                                        false,
                                        &mut dependency_map,
                                    )?;
                                }
                            }

                            inner_addresses_map.insert(address.to_owned(), variants);
                        }

                        addresses_map.insert(
                            (key[1], key[2]),
                            (total_size, total_count, inner_addresses_map),
                        );
                    }
                } else {
                    let (total_size, total_count, addresses) = addresses_map
                        .entry((key[0], key[1]))
                        .or_insert((0, 0, BTreeMap::new()));

                    let address = std::str::from_utf8(&key[2..key.len() - 1])?;
                    let compression = *key.last().expect("length is not greater than 1");

                    let variants = addresses
                        .entry(address.to_owned())
                        .or_insert(ArrayVec::new());

                    if !variants.iter().any(|v| v.compression == compression) {
                        tracing::debug!(
                            kind = key[0],
                            i = key[1],
                            address,
                            compression,
                            "restoring from cache"
                        );

                        Self::process_details(
                            key[0],
                            key[1],
                            address,
                            total_size,
                            total_count,
                            variants,
                            compression,
                            0,
                            VecDeque::new(),
                            key,
                            value,
                            true,
                            &mut dependency_map,
                        )?;
                    }
                }

                if let Ok((k, v)) = cursor.next::<[u8], [u8]>(&access) {
                    key = k;
                    value = v;
                } else {
                    break;
                }
            }
        }

        Ok(Self {
            limits,
            env: env.clone(),
            cache_db,
            log_size,
            addresses_map: Arc::new(Mutex::new(addresses_map)),
            eviction_queue: Arc::new(Mutex::new(eviction_queue)),
            dependency_map: Arc::new(Mutex::new(dependency_map)),
        })
    }

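    /// Deserializes one stored value, records its dependencies in
    /// `dependency_map`, and appends an `AddressDetails` entry to `variants`,
    /// updating the running size and count totals. When `last_hit_is_stored_at`
    /// is set (restore from a raw entry), the stored timestamp doubles as the
    /// last-hit time.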
    #[allow(clippy::too_many_arguments)]
    fn process_details(
        artifact_kind: u8,
        idx: u8,
        address: &str,
        total_size: &mut usize,
        total_count: &mut usize,
        variants: &mut ArrayVec<AddressDetails, 5>,
        compression: u8,
        last_hit: u64,
        hit_distribution: VecDeque<u64>,
        lookup: &[u8],
        val: &[u8],
        last_hit_is_stored_at: bool,
        dependency_map: &mut HashMap<CacheDependency, BTreeSet<(u8, u8, String, u8)>>,
    ) -> Result<(), Box<dyn Error>> {
        let root = flexbuffers::Reader::get_root(val)?;
        let internal_vec = root.as_vector().idx(0).as_vector();

        let mut dependencies = vec![];

        for dep in &internal_vec.idx(1).as_vector() {
            let dep_vec = dep.as_vector();

            let kind = dep_vec.idx(0).as_u8();

            if kind == 0 {
                dependencies.push(CacheDependency::Content);
                dependency_map
                    .entry(CacheDependency::Content)
                    .or_default()
                    .insert((artifact_kind, idx, address.to_owned(), compression));
            } else if kind == 1 {
                let uuid: [u8; 16] = dep_vec.idx(1).as_blob().0.try_into()?;
                dependencies.push(CacheDependency::Model(uuid));
                dependency_map
                    .entry(CacheDependency::Model(uuid))
                    .or_default()
                    .insert((artifact_kind, idx, address.to_owned(), compression));
            }
        }

        let size = lookup.len() + val.len();

        *total_size += size;
        *total_count += 1;

        variants.push(AddressDetails {
            compression,
            last_hit: if last_hit_is_stored_at {
                internal_vec.idx(0).as_u64()
            } else {
                last_hit
            },
            hit_distribution,

            size,
            stored_at: internal_vec.idx(0).as_u64(),
            dependencies,
        });
        Ok(())
    }

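    /// Looks up `addr` for the given artifact kind and index, trying each
    /// accepted compression in order before falling back to the uncompressed
    /// variant, and updates the in-memory hit statistics on a hit.
    ///
    /// # Errors
    ///
    /// Returns an error if the read transaction cannot be created or the
    /// system clock reports a time before the UNIX epoch.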
    #[allow(clippy::type_complexity)]
    #[instrument(skip(self, cache_kind, compression, idx, addr), err)]
    pub fn check<'a>(
        &self,
        cache_kind: &CacheRead,
        compression: &'a ArrayVec<CacheCompression, 4>,
        idx: u8,
        addr: &str,
    ) -> Result<Option<(Bytes, Option<&'a CacheCompression>)>, Box<dyn Error>> {
        let mut key = BytesMut::new();

        let addr_bytes = addr.as_bytes();
        let base_key_len = addr_bytes.len() + 2;

        key.put_u8(cache_kind.as_u8());
        key.put_u8(idx);
        key.put(addr_bytes);

        let txn = ReadTransaction::new(self.env.clone())?;
        let access = txn.access();

        for compression in compression {
            key.truncate(base_key_len);
            key.put_u8(compression.as_u8());

            if let Ok(result) = access.get::<[u8], [u8]>(&self.cache_db, key.as_ref()) {
                let mut lock = self.addresses_map.lock();

                if let Some((_total_size, _total_count, addresses)) =
                    lock.get_mut(&(cache_kind.as_u8(), idx))
                    && let Some(details) = addresses.get_mut(addr)
                    && let Some(details) = details
                        .iter_mut()
                        .find(|v| v.compression == compression.as_u8())
                {
                    details.last_hit = SystemTime::now()
                        .duration_since(SystemTime::UNIX_EPOCH)?
                        .as_secs();

                    if let Some(back) = details.hit_distribution.back_mut() {
                        *back += 1;
                    }
                }

                drop(lock);

                if self.log_size {
                    tracing::info!(size = %bytesize::ByteSize((key.len() + result.len()) as u64).display().si_short(), compressed = compression.as_str(), "hit");
                } else {
                    tracing::info!(compressed = compression.as_str(), "hit");
                }
                return Ok(Some((Bytes::copy_from_slice(result), Some(compression))));
            }
        }

        key.truncate(base_key_len);
        key.put_u8(0);

        if let Ok(result) = access.get::<[u8], [u8]>(&self.cache_db, key.as_ref()) {
            let mut lock = self.addresses_map.lock();

            if let Some((_total_size, _total_count, addresses)) =
                lock.get_mut(&(cache_kind.as_u8(), idx))
                && let Some(details) = addresses.get_mut(addr)
                && let Some(details) = details.iter_mut().find(|v| v.compression == 0)
            {
                details.last_hit = SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH)?
                    .as_secs();

                if let Some(back) = details.hit_distribution.back_mut() {
                    *back += 1;
                }
            }

            drop(lock);

            if self.log_size {
                tracing::info!(size = %bytesize::ByteSize((key.len() + result.len()) as u64).display().si_short(), compressed = "false", "hit");
            } else {
                tracing::info!(compressed = "false", "hit");
            }
            Ok(Some((Bytes::copy_from_slice(result), None)))
        } else {
            tracing::info!("miss");
            Ok(None)
        }
    }

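    /// Serializes the artifact (currently only `CacheWrite::Template`; the
    /// other variants are `unimplemented!`) together with its dependencies,
    /// enforces the per-entry and total `max_size` / `max_count` limits
    /// (evicting candidates queued by `clean_cache` when needed), stores the
    /// record in LMDB, and updates the in-memory maps.
    ///
    /// # Errors
    ///
    /// Returns an error on LMDB failures or when numeric conversions of the
    /// configured limits fail.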
    #[allow(
        clippy::too_many_lines,
        clippy::too_many_arguments,
        clippy::similar_names
    )]
    #[instrument(
        skip(self, cache_kind, compression, idx, config, addr, dependencies),
        err
    )]
    pub fn write(
        &self,
        cache_kind: CacheWrite,
        compression: Option<&CacheCompression>,
        idx: u8,
        config: &StoredCacheConfig,
        addr: &str,
        dependencies: Vec<CacheDependency>,
    ) -> Result<(), Box<dyn Error>> {
        let now = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)?
            .as_secs();

        let dependencies = if config.evict_on_dependency_change == Some(true) {
            dependencies
        } else {
            vec![]
        };

        let mut builder = flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
        let mut builder_vec = builder.start_vector();

        let mut internal_vec = builder_vec.start_vector();

        internal_vec.push(now);

        let mut deps_vec = internal_vec.start_vector();

        for dep in &dependencies {
            let mut dep_vec = deps_vec.start_vector();

            match dep {
                CacheDependency::Content => {
                    dep_vec.push(0);
                }
                CacheDependency::Model(uuid) => {
                    dep_vec.push(1);
                    dep_vec.push(flexbuffers::Blob(uuid.as_ref()));
                }
            }

            dep_vec.end_vector();
        }

        deps_vec.end_vector();
        internal_vec.end_vector();

        match cache_kind {
            CacheWrite::Template(etag, last_modified, blob) => {
                builder_vec.push(etag);

                builder_vec.push(last_modified);
                builder_vec.push(flexbuffers::Blob(blob.as_ref()));
            }
            _ => unimplemented!(),
        }

        builder_vec.end_vector();

        let val = builder.view();

        let mut base_key = BytesMut::new();

        base_key.put_u8(cache_kind.as_u8());
        base_key.put_u8(idx);

        let mut key = base_key.clone();
        key.put(addr.as_bytes());

        let compression_byte = compression.map_or(0, CacheCompression::as_u8);
        let compression_str = compression.map_or("false", CacheCompression::as_str);

        key.put_u8(compression_byte);

        let size = val.len() + key.len();

        if let Some(max_size) = config.max_size
            && size > usize::try_from(max_size)?
        {
            tracing::warn!(
                address = addr,
                compressed = compression_str,
                "exceeds 'max_size' for entire cache"
            );
            return Ok(());
        }

        let mut hit_distribution = VecDeque::new();
        hit_distribution.push_back(1);

        let txn = WriteTransaction::new(self.env.clone())?;

        let mut lock = self.addresses_map.lock();

        let (total_size, total_count, addresses) = lock
            .entry((cache_kind.as_u8(), idx))
            .or_insert((0, 0, BTreeMap::new()));

        if let Some(max_size) = config.max_size
            && *total_size + size > usize::try_from(max_size)?
        {
            let mut evicted_size: usize = 0;
            let mut lock = self.eviction_queue.lock();

            let mut dep_lock = self.dependency_map.lock();

            while evicted_size < size {
                // Stop once the eviction queue is exhausted; an empty queue
                // would otherwise spin here forever while holding the locks.
                let Some(FRsEvictionCandidate {
                    size: candidate_size,
                    address,
                    index,
                    ..
                }) = lock.pop()
                else {
                    break;
                };

                if let Some(variations) = addresses.get_mut(&address) {
                    let variation = variations.remove(index);

                    for dep in variation.dependencies {
                        if let Some(dep) = dep_lock.get_mut(&dep) {
                            // The dependency map is keyed by compression byte,
                            // not by the candidate's index.
                            dep.remove(&(
                                cache_kind.as_u8(),
                                idx,
                                address.clone(),
                                variation.compression,
                            ));
                        }
                    }

                    base_key.truncate(2);
                    base_key.put(address.as_bytes());
                    base_key.put_u8(variation.compression);

                    let mut access = txn.access();
                    if access.del_key(&self.cache_db, base_key.as_ref()).is_ok() {
                        evicted_size += candidate_size;

                        *total_count -= 1;
                        *total_size -= variation.size;
                    }
                }
            }

            drop(lock);
            drop(dep_lock);
        } else if let Some(max_count) = config.max_count
            && *total_count + 1 > max_count
        {
            let mut evicted = false;
            let mut lock = self.eviction_queue.lock();

            let mut dep_lock = self.dependency_map.lock();

            while !evicted {
                // Same guard as above: bail out once there is nothing left to pop.
                let Some(FRsEvictionCandidate { address, index, .. }) = lock.pop() else {
                    break;
                };

                if let Some(variations) = addresses.get_mut(&address) {
                    let variation = variations.remove(index);

                    for dep in variation.dependencies {
                        if let Some(dep) = dep_lock.get_mut(&dep) {
                            dep.remove(&(
                                cache_kind.as_u8(),
                                idx,
                                address.clone(),
                                variation.compression,
                            ));
                        }
                    }

                    base_key.truncate(2);
                    base_key.put(address.as_bytes());
                    base_key.put_u8(variation.compression);

                    let mut access = txn.access();
                    if access.del_key(&self.cache_db, base_key.as_ref()).is_ok() {
                        evicted = true;

                        *total_count -= 1;
                        *total_size -= variation.size;
                    }
                }
            }

            drop(lock);
            drop(dep_lock);
        }

        *total_size += size;
        *total_count += 1;

        let details = addresses.entry(addr.to_string()).or_insert(ArrayVec::new());

        let new_details = AddressDetails {
            compression: compression_byte,
            stored_at: now,
            last_hit: now,
            size,
            hit_distribution,
            dependencies: dependencies.clone(),
        };

        if let Some(existing_pos) = details
            .iter()
            .position(|v| v.compression == compression_byte)
        {
            details[existing_pos] = new_details;
        } else {
            details.push(new_details);
        }

        drop(lock);

        let mut lock = self.dependency_map.lock();

        for dependency in dependencies {
            let existing_dep = lock.entry(dependency).or_insert(BTreeSet::new());
            existing_dep.insert((cache_kind.as_u8(), idx, addr.to_string(), compression_byte));
        }

        drop(lock);

        {
            let mut access = txn.access();
            access.put(&self.cache_db, key.as_ref(), val, &put::Flags::empty())?;
        }

        txn.commit()?;

        if self.log_size {
            tracing::info!(
                size = %bytesize::ByteSize(size as u64).display().si_short(),
                compressed = compression_str,
                "stored"
            );
        } else {
            tracing::info!(compressed = compression_str, "stored");
        }

        Ok(())
    }

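    /// Removes every cached entry registered against any of the given
    /// dependencies, deleting it from LMDB and from the in-memory maps.
    ///
    /// # Errors
    ///
    /// Returns an error if the write transaction fails.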
    #[instrument(skip(self, dependencies), err)]
    pub fn dependency_evict(
        &self,
        dependencies: Vec<CacheDependency>,
    ) -> Result<(), Box<dyn Error>> {
        let mut lock_dep_map = self.dependency_map.lock();
        let mut lock_addr_map = self.addresses_map.lock();

        let txn = WriteTransaction::new(self.env.clone())?;

        {
            let mut access = txn.access();

            for dependency in dependencies {
                if let Some(addrs) = lock_dep_map.get(&dependency) {
                    for (kind, service_idx, addr, compression_idx) in addrs {
                        if let Some((_, _, variants_map)) =
                            lock_addr_map.get_mut(&(*kind, *service_idx))
                            && let Some(variants) = variants_map.get_mut(addr)
                        {
                            // The last tuple element is the compression byte, not a
                            // position, so locate the matching variant before removing.
                            if let Some(pos) = variants
                                .iter()
                                .position(|v| v.compression == *compression_idx)
                            {
                                variants.remove(pos);
                            }

                            let mut key = BytesMut::new();

                            key.put_u8(*kind);
                            key.put_u8(*service_idx);

                            key.put(addr.as_bytes());
                            key.put_u8(*compression_idx);

                            tracing::debug!(
                                kind,
                                i = service_idx,
                                address = addr,
                                compression = compression_idx,
                                "evicting for dependency"
                            );

                            access.del_key(&self.cache_db, key.as_ref())?;
                        }
                    }
                }

                lock_dep_map.remove(&dependency);
            }
        }

        txn.commit()?;

        drop(lock_dep_map);
        drop(lock_addr_map);

        Ok(())
    }

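    /// Evicts every entry stored for one `(kind, idx)` artifact, including its
    /// sync record, and clears the corresponding in-memory bookkeeping.
    ///
    /// # Errors
    ///
    /// Returns an error if the write transaction fails.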
    #[instrument(skip(self, kind, idx), err)]
    pub fn artifact_evict(&self, kind: CacheRead, idx: u8) -> Result<(), Box<dyn Error>> {
        let mut key = BytesMut::new();

        key.put_u8(kind.as_u8());
        key.put_u8(idx);

        let mut lock_dep_map = self.dependency_map.lock();
        let mut lock_addr_map = self.addresses_map.lock();

        let txn = WriteTransaction::new(self.env.clone())?;

        {
            let mut access = txn.access();

            if let Some((total_size, total_count, addrs_map)) =
                lock_addr_map.get_mut(&(kind.as_u8(), idx))
            {
                for (addr, variants) in addrs_map.iter() {
                    key.truncate(2);

                    let addr_bytes = addr.as_bytes();
                    let base_len = addr_bytes.len() + 2;

                    key.put(addr_bytes);

                    for variant in variants {
                        key.truncate(base_len);
                        key.put_u8(variant.compression);

                        tracing::debug!(
                            kind = kind.as_u8(),
                            i = idx,
                            address = addr,
                            compression = variant.compression,
                            "evicting for artifact"
                        );

                        access.del_key(&self.cache_db, key.as_ref())?;

                        for dep in &variant.dependencies {
                            if let Some(addrs) = lock_dep_map.get_mut(dep) {
                                addrs.remove(&(
                                    kind.as_u8(),
                                    idx,
                                    addr.clone(),
                                    variant.compression,
                                ));
                            }
                        }
                    }
                }

                addrs_map.clear();

                *total_size = 0;
                *total_count = 0;

                if let Err(err) = access.del_key(&self.cache_db, &[0, kind.as_u8(), idx]) {
                    tracing::warn!(%err);
                }
            }
        }

        txn.commit()?;

        drop(lock_dep_map);
        drop(lock_addr_map);

        Ok(())
    }

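    /// Drops entries older than `max_ttl` or not hit within `hit_ttl`, trims
    /// each variant's hit distribution to the configured frequency window, and
    /// rebuilds the eviction queue consumed by `write`.
    ///
    /// # Errors
    ///
    /// Returns an error when called for a `Permanent` cache or when the write
    /// transaction fails.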
    #[allow(clippy::missing_panics_doc, clippy::cast_precision_loss)]
    #[instrument(skip(self, cache_kind, config, idx), err)]
    pub fn clean_cache(
        &self,
        cache_kind: &CacheRead,
        config: &StoredCacheConfig,
        idx: u8,
    ) -> Result<(), Box<dyn Error>> {
        if let StoredCachePolicy::Permanent = config.policy {
            return Err("'Permanent' cache should never be cleaned up".into());
        }

        let mut key = BytesMut::new();

        key.put_u8(cache_kind.as_u8());
        key.put_u8(idx);

        let now = SystemTime::now();

        let min_stored_at = now
            .checked_sub(Duration::from_secs(config.max_ttl.unwrap_or(600)))
            .expect("time to work")
            .duration_since(SystemTime::UNIX_EPOCH)?
            .as_secs();

        let min_last_hit = now
            .checked_sub(Duration::from_secs(config.hit_ttl.unwrap_or(300)))
            .expect("time to work")
            .duration_since(SystemTime::UNIX_EPOCH)?
            .as_secs();

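        // `frequency_window` divided by the average clean interval bounds how
        // many hit buckets each variant's distribution may keep.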
        let max_distribution = config.frequency_window.map(|frequency_window| {
            let (clean_min, clean_max) = config.clean_interval.unwrap_or((60, 60 * 3));

            let avg_clean_interval = (clean_min + clean_max) as f64 / 2.0;
            frequency_window as f64 / avg_clean_interval
        });

        let mut addrs_to_remove = vec![];
        let mut eviction_queue = BinaryHeap::new();

        let mut lock = self.addresses_map.lock();

        if let Some((total_size, total_count, addresses)) = lock.get_mut(&(cache_kind.as_u8(), idx))
        {
            tracing::info!(count = total_count, size = total_size, "before");

            for (address, details) in addresses {
                for (i, variation) in details.iter_mut().enumerate() {
                    if variation.stored_at < min_stored_at || variation.last_hit < min_last_hit {
                        addrs_to_remove.push((address.clone(), variation.compression, i));
                    } else if let Some(max_distribution) = max_distribution {
                        if variation.hit_distribution.len() as f64 > max_distribution {
                            variation.hit_distribution.pop_front();
                        }

                        let total_hits: u64 = variation.hit_distribution.iter().sum();

                        match config.policy {
                            StoredCachePolicy::FRs(th_eq_threshold, lh_equality_threshold) => {
                                eviction_queue.push(FRsEvictionCandidate {
                                    total_hits: (
                                        total_hits.cast_signed(),
                                        th_eq_threshold.cast_signed(),
                                    ),
                                    last_hit: (
                                        variation.last_hit.cast_signed(),
                                        lh_equality_threshold.cast_signed(),
                                    ),
                                    size: variation.size,
                                    address: address.clone(),
                                    index: i,
                                });
                            }
                            StoredCachePolicy::Permanent => unreachable!(),
                        }
                    }
                }
            }
        }

        let txn = WriteTransaction::new(self.env.clone())?;

        let mut dep_lock = self.dependency_map.lock();

        if let Some((total_size, total_count, addresses)) = lock.get_mut(&(cache_kind.as_u8(), idx))
        {
            {
                let mut access = txn.access();

                for (remove_addr, compression, i) in &addrs_to_remove {
                    key.truncate(2);
                    key.put(remove_addr.as_bytes());
                    key.put_u8(*compression);

                    access.del_key(&self.cache_db, key.as_ref())?;

                    if let Some(variations) = addresses.get_mut(remove_addr) {
                        let variation = variations.remove(*i);

                        for dep in variation.dependencies {
                            if let Some(dep) = dep_lock.get_mut(&dep) {
                                dep.remove(&(
                                    cache_kind.as_u8(),
                                    idx,
                                    remove_addr.clone(),
                                    *compression,
                                ));
                            }
                        }

                        *total_size -= variation.size;
                        *total_count -= 1;
                    }
                }

                tracing::info!(count = total_count, size = total_size, "after");
            }
        }

        drop(dep_lock);
        drop(lock);

        txn.commit()?;

        let mut lock = self.eviction_queue.lock();
        *lock = eviction_queue;

        drop(lock);

        tracing::info!("cleaned");

        Ok(())
    }

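    /// Serializes the in-memory hit statistics for one `(kind, idx)` artifact
    /// into a zstd-compressed flexbuffer and stores it under the `[0, kind,
    /// idx]` key so they survive restarts (see `new`).
    ///
    /// # Errors
    ///
    /// Returns an error if the write transaction fails.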
    #[allow(clippy::similar_names)]
    #[instrument(skip(self, cache_kind, idx), err)]
    pub fn sync(&self, cache_kind: &CacheRead, idx: u8) -> Result<(), Box<dyn Error>> {
        let mut builder = flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);

        let lock = self.addresses_map.lock();

        if let Some((_total_size, _total_count, addresses)) = lock.get(&(cache_kind.as_u8(), idx)) {
            let mut addresses_vec = builder.start_vector();

            for (address, variants) in addresses {
                let mut address_vec = addresses_vec.start_vector();
                address_vec.push(address.as_str());

                let mut variants_vec = address_vec.start_vector();

                for variant in variants {
                    let mut variant_vec = variants_vec.start_vector();

                    variant_vec.push(variant.compression);
                    variant_vec.push(variant.last_hit);

                    let mut hit_distribution_vec = variant_vec.start_vector();

                    for hit_count in &variant.hit_distribution {
                        hit_distribution_vec.push(*hit_count);
                    }

                    hit_distribution_vec.end_vector();
                    variant_vec.end_vector();
                }

                variants_vec.end_vector();
                address_vec.end_vector();
            }

            addresses_vec.end_vector();
        }

        drop(lock);

        if let Ok(compressed) = zstd::stream::encode_all(std::io::Cursor::new(builder.view()), 17) {
            let txn = WriteTransaction::new(self.env.clone())?;

            {
                let mut access = txn.access();
                access.put(
                    &self.cache_db,
                    &[0, cache_kind.as_u8(), idx],
                    &compressed,
                    &put::Flags::empty(),
                )?;
            }

            txn.commit()?;

            if self.log_size {
                tracing::info!(size = %bytesize::ByteSize(compressed.len() as u64).display().si_short(), "synced");
            } else {
                tracing::info!("synced");
            }
        }

        Ok(())
    }
}