1use super::EvictionManager;
18use crate::key::CompactCacheKey;
19
20use async_trait::async_trait;
21use lru::LruCache;
22use parking_lot::RwLock;
23use pingora_error::{BError, ErrorType::*, OrErr, Result};
24use rand::Rng;
25use serde::de::SeqAccess;
26use serde::{Deserialize, Serialize};
27use std::collections::hash_map::DefaultHasher;
28use std::fs::File;
29use std::hash::{Hash, Hasher};
30use std::io::prelude::*;
31use std::path::Path;
32use std::sync::atomic::{AtomicUsize, Ordering};
33use std::time::SystemTime;
34
35#[derive(Debug, Deserialize, Serialize)]
36struct Node {
37 key: CompactCacheKey,
38 size: usize,
39}
40
41pub struct Manager {
45 lru: RwLock<LruCache<u64, Node>>,
46 limit: usize,
47 items_watermark: Option<usize>,
48 used: AtomicUsize,
49 items: AtomicUsize,
50 evicted_size: AtomicUsize,
51 evicted_items: AtomicUsize,
52}
53
54impl Manager {
55 pub fn new(limit: usize) -> Self {
57 Manager {
58 lru: RwLock::new(LruCache::unbounded()),
59 limit,
60 items_watermark: None,
61 used: AtomicUsize::new(0),
62 items: AtomicUsize::new(0),
63 evicted_size: AtomicUsize::new(0),
64 evicted_items: AtomicUsize::new(0),
65 }
66 }
67
68 pub fn new_with_watermark(limit: usize, items_watermark: Option<usize>) -> Self {
70 Manager {
71 lru: RwLock::new(LruCache::unbounded()),
72 limit,
73 items_watermark,
74 used: AtomicUsize::new(0),
75 items: AtomicUsize::new(0),
76 evicted_size: AtomicUsize::new(0),
77 evicted_items: AtomicUsize::new(0),
78 }
79 }
80
81 fn insert(&self, hash_key: u64, node: CompactCacheKey, size: usize, reverse: bool) {
82 use std::cmp::Ordering::*;
83 let node = Node { key: node, size };
84 let old = {
85 let mut lru = self.lru.write();
86 let old = lru.push(hash_key, node);
87 if reverse && old.is_none() {
88 lru.demote(&hash_key);
89 }
90 old
91 };
92 if let Some(old) = old {
93 match size.cmp(&old.1.size) {
95 Greater => self.used.fetch_add(size - old.1.size, Ordering::Relaxed),
96 Less => self.used.fetch_sub(old.1.size - size, Ordering::Relaxed),
97 Equal => 0, };
99 } else {
100 self.used.fetch_add(size, Ordering::Relaxed);
101 self.items.fetch_add(1, Ordering::Relaxed);
102 }
103 }
104
105 fn increase_weight(&self, key: u64, delta: usize) {
106 let mut lru = self.lru.write();
107 let Some(node) = lru.get_key_value_mut(&key) else {
108 return;
109 };
110 node.1.size += delta;
111 self.used.fetch_add(delta, Ordering::Relaxed);
112 }
113
114 #[inline]
115 fn over_limits(&self) -> bool {
116 self.used.load(Ordering::Relaxed) > self.limit
117 || self
118 .items_watermark
119 .is_some_and(|w| self.items.load(Ordering::Relaxed) > w)
120 }
121
122 fn evict(&self) -> Vec<CompactCacheKey> {
124 if self.used.load(Ordering::Relaxed) <= self.limit
125 && self
126 .items_watermark
127 .is_none_or(|w| self.items.load(Ordering::Relaxed) <= w)
128 {
129 return vec![];
130 }
131
132 let mut to_evict = Vec::with_capacity(1); while self.over_limits() {
135 if let Some((_, node)) = self.lru.write().pop_lru() {
136 self.used.fetch_sub(node.size, Ordering::Relaxed);
137 self.items.fetch_sub(1, Ordering::Relaxed);
138 self.evicted_size.fetch_add(node.size, Ordering::Relaxed);
139 self.evicted_items.fetch_add(1, Ordering::Relaxed);
140 to_evict.push(node.key);
141 } else {
142 return to_evict;
144 }
145 }
146 to_evict
147 }
148
149 fn serialize(&self) -> Result<Vec<u8>> {
152 use rmp_serde::encode::Serializer;
153 use serde::ser::SerializeSeq;
154 use serde::ser::Serializer as _;
155 let mut ser = Serializer::new(vec![]);
157 let lru = self.lru.read();
159 let mut seq = ser
160 .serialize_seq(Some(lru.len()))
161 .or_err(InternalError, "fail to serialize node")?;
162 for item in lru.iter() {
163 seq.serialize_element(item.1).unwrap(); }
165 seq.end().or_err(InternalError, "when serializing LRU")?;
166 Ok(ser.into_inner())
167 }
168
169 fn deserialize(&self, buf: &[u8]) -> Result<()> {
170 use rmp_serde::decode::Deserializer;
171 use serde::de::Deserializer as _;
172 let mut de = Deserializer::new(buf);
173 let visitor = InsertToManager { lru: self };
174 de.deserialize_seq(visitor)
175 .or_err(InternalError, "when deserializing LRU")?;
176 Ok(())
177 }
178}
179
180struct InsertToManager<'a> {
181 lru: &'a Manager,
182}
183
184impl<'de> serde::de::Visitor<'de> for InsertToManager<'_> {
185 type Value = ();
186
187 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
188 formatter.write_str("array of lru nodes")
189 }
190
191 fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
192 where
193 A: SeqAccess<'de>,
194 {
195 while let Some(node) = seq.next_element::<Node>()? {
196 let key = u64key(&node.key);
197 self.lru.insert(key, node.key, node.size, true); }
199 Ok(())
200 }
201}
202
203#[inline]
204fn u64key(key: &CompactCacheKey) -> u64 {
205 let mut hasher = DefaultHasher::new();
206 key.hash(&mut hasher);
207 hasher.finish()
208}
209
/// Name of the file, inside the directory given to `save`/`load`, that holds the
/// serialized LRU.
const FILE_NAME: &'static str = "simple_lru.data";
211
212#[async_trait]
213impl EvictionManager for Manager {
214 fn total_size(&self) -> usize {
215 self.used.load(Ordering::Relaxed)
216 }
217 fn total_items(&self) -> usize {
218 self.items.load(Ordering::Relaxed)
219 }
220 fn evicted_size(&self) -> usize {
221 self.evicted_size.load(Ordering::Relaxed)
222 }
223 fn evicted_items(&self) -> usize {
224 self.evicted_items.load(Ordering::Relaxed)
225 }
226
227 fn admit(
228 &self,
229 item: CompactCacheKey,
230 size: usize,
231 _fresh_until: SystemTime,
232 ) -> Vec<CompactCacheKey> {
233 let key = u64key(&item);
234 self.insert(key, item, size, false);
235 self.evict()
236 }
237
238 fn increment_weight(
239 &self,
240 item: &CompactCacheKey,
241 delta: usize,
242 _max_weight: Option<usize>,
243 ) -> Vec<CompactCacheKey> {
244 let key = u64key(item);
245 self.increase_weight(key, delta);
246 self.evict()
247 }
248
249 fn remove(&self, item: &CompactCacheKey) {
250 let key = u64key(item);
251 let node = self.lru.write().pop(&key);
252 if let Some(n) = node {
253 self.used.fetch_sub(n.size, Ordering::Relaxed);
254 self.items.fetch_sub(1, Ordering::Relaxed);
255 }
256 }
257
258 fn access(&self, item: &CompactCacheKey, size: usize, _fresh_until: SystemTime) -> bool {
259 let key = u64key(item);
260 if self.lru.write().get(&key).is_none() {
261 self.insert(key, item.clone(), size, false);
262 false
263 } else {
264 true
265 }
266 }
267
268 fn peek(&self, item: &CompactCacheKey) -> bool {
269 let key = u64key(item);
270 self.lru.read().peek(&key).is_some()
271 }
272
273 async fn save(&self, dir_path: &str) -> Result<()> {
274 let data = self.serialize()?;
275 let dir_str = dir_path.to_owned();
276 tokio::task::spawn_blocking(move || {
277 let dir_path = Path::new(&dir_str);
278 std::fs::create_dir_all(dir_path)
279 .or_err_with(InternalError, || format!("fail to create {dir_str}"))?;
280
281 let final_file_path = dir_path.join(FILE_NAME);
282 let random_suffix: u32 = rand::thread_rng().gen();
284 let temp_file_path = dir_path.join(format!("{}.{:08x}.tmp", FILE_NAME, random_suffix));
285 let mut file = File::create(&temp_file_path).or_err_with(InternalError, || {
286 format!("fail to create temporary file {}", temp_file_path.display())
287 })?;
288 file.write_all(&data).or_err_with(InternalError, || {
289 format!("fail to write to {}", temp_file_path.display())
290 })?;
291 file.flush().or_err_with(InternalError, || {
292 format!("fail to flush temp file {}", temp_file_path.display())
293 })?;
294 std::fs::rename(&temp_file_path, &final_file_path).or_err_with(InternalError, || {
295 format!(
296 "fail to rename temporary file {} to {}",
297 temp_file_path.display(),
298 final_file_path.display()
299 )
300 })
301 })
302 .await
303 .or_err(InternalError, "async blocking IO failure")?
304 }
305
306 async fn load(&self, dir_path: &str) -> Result<()> {
307 let dir_path = dir_path.to_owned();
308 let data = tokio::task::spawn_blocking(move || {
309 let file_path = Path::new(&dir_path).join(FILE_NAME);
310 let mut file = File::open(file_path.clone()).or_err_with(InternalError, || {
311 format!("fail to open {}", file_path.display())
312 })?;
313 let mut buffer = Vec::with_capacity(8192);
314 file.read_to_end(&mut buffer)
315 .or_err(InternalError, "fail to read from {file_path}")?;
316 Ok::<Vec<u8>, BError>(buffer)
317 })
318 .await
319 .or_err(InternalError, "async blocking IO failure")??;
320 self.deserialize(&data)
321 }
322}
323
#[cfg(test)]
mod test {
    use super::*;
    use crate::CacheKey;

    /// Build the compact cache key used throughout these tests for primary key `name`.
    fn compact(name: &str) -> CompactCacheKey {
        CacheKey::new("", name, "1").to_compact()
    }

    /// Create a manager with size limit 4 and admit "a" (1), "b" (2) and "c" (1).
    ///
    /// That fills the manager exactly, so none of the admissions evicts anything.
    /// Returns the manager together with the keys of "a" and "b".
    fn filled_manager(until: SystemTime) -> (Manager, CompactCacheKey, CompactCacheKey) {
        let lru = Manager::new(4);
        let key1 = compact("a");
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = compact("b");
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let v = lru.admit(compact("c"), 1, until);
        assert_eq!(v.len(), 0);
        (lru, key1, key2)
    }

    #[test]
    fn test_admission() {
        let until = SystemTime::now(); // unused value as a placeholder
        let (lru, key1, key2) = filled_manager(until);

        // The manager is full (4): admitting 2 more must evict "a" (1) and "b" (2).
        let v = lru.admit(compact("d"), 2, until);
        assert_eq!(v.len(), 2);
        assert_eq!(v[0], key1);
        assert_eq!(v[1], key2);
    }

    #[test]
    fn test_access() {
        let until = SystemTime::now(); // unused value as a placeholder
        let (lru, key1, key2) = filled_manager(until);

        // Make "a" the most recently used; it is already tracked.
        assert!(lru.access(&key1, 1, until));

        // "b" is now the least recently used, and evicting it (2) is enough.
        let v = lru.admit(compact("d"), 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }

    #[test]
    fn test_remove() {
        let until = SystemTime::now(); // unused value as a placeholder
        let (lru, key1, key2) = filled_manager(until);

        // Removing "a" brings the used size down to 3.
        lru.remove(&key1);

        // Admitting 2 more goes over the limit again: "b" is evicted.
        let v = lru.admit(compact("d"), 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }

    #[test]
    fn test_access_add() {
        let lru = Manager::new(4);
        let until = SystemTime::now(); // unused value as a placeholder

        // Accessing untracked keys inserts them (and reports them as not found)
        // without running eviction, so the manager ends up over its limit (5 > 4).
        let key1 = compact("a");
        assert!(!lru.access(&key1, 1, until));
        let key2 = compact("b");
        assert!(!lru.access(&key2, 2, until));
        let key3 = compact("c");
        assert!(!lru.access(&key3, 2, until));

        // The next admission evicts "a" (1) and "b" (2) to get back to 4.
        let v = lru.admit(compact("d"), 2, until);
        assert_eq!(v.len(), 2);
        assert_eq!(v[0], key1);
        assert_eq!(v[1], key2);
    }

    #[test]
    fn test_admit_update() {
        let until = SystemTime::now(); // unused value as a placeholder
        let (lru, key1, key2) = filled_manager(until);

        // Re-admitting "b" with a smaller size (2 -> 1) frees one unit.
        let v = lru.admit(key2, 1, until);
        assert_eq!(v.len(), 0);

        // So "d" (1) fits without any eviction.
        let key4 = compact("d");
        let v = lru.admit(key4.clone(), 1, until);
        assert_eq!(v.len(), 0);

        // Growing "d" (1 -> 2) goes over the limit: "a" is evicted.
        let v = lru.admit(key4, 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key1);
    }

    #[test]
    fn test_serde() {
        let until = SystemTime::now(); // unused value as a placeholder
        let (lru, key1, key2) = filled_manager(until);

        // Make "a" the most recently used; it is already tracked.
        assert!(lru.access(&key1, 1, until));

        // Round-trip the state into a fresh manager.
        let ser = lru.serialize().unwrap();
        let lru2 = Manager::new(4);
        lru2.deserialize(&ser).unwrap();

        // The restored manager must keep the usage order: "b" is evicted, not "a".
        let v = lru2.admit(compact("d"), 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }

    #[tokio::test]
    async fn test_save_to_disk() {
        let until = SystemTime::now(); // unused value as a placeholder
        let (lru, key1, key2) = filled_manager(until);

        // Make "a" the most recently used; it is already tracked.
        assert!(lru.access(&key1, 1, until));

        // Use the platform's temporary directory rather than a hard-coded path.
        let dir = std::env::temp_dir().join("test_simple_lru_save");
        let dir = dir.to_str().expect("temp dir path should be valid UTF-8");

        // Round-trip the state through the file system into a fresh manager.
        lru.save(dir).await.unwrap();
        let lru2 = Manager::new(4);
        lru2.load(dir).await.unwrap();

        // The restored manager must keep the usage order: "b" is evicted, not "a".
        let v = lru2.admit(compact("d"), 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }

    #[test]
    fn test_watermark_eviction() {
        // The size limit is far out of reach, so only the item watermark can trigger
        // eviction here.
        const SIZE_LIMIT: usize = usize::MAX / 2;
        let lru = Manager::new_with_watermark(SIZE_LIMIT, Some(4));
        let until = SystemTime::now();

        // Admit 6 items of size 1 against a watermark of 4.
        for name in ["a", "b", "c", "d", "e", "f"] {
            let _ = lru.admit(compact(name), 1, until);
        }

        // The two oldest items were evicted to stay at the watermark.
        assert_eq!(lru.total_items(), 4);
        assert_eq!(lru.evicted_items(), 2);
        assert_eq!(lru.evicted_size(), 2);
        assert!(lru.total_size() <= SIZE_LIMIT);
    }
}