1use super::EvictionManager;
18use crate::key::CompactCacheKey;
19
20use async_trait::async_trait;
21use lru::LruCache;
22use parking_lot::RwLock;
23use pingora_error::{BError, ErrorType::*, OrErr, Result};
24use rand::Rng;
25use serde::de::SeqAccess;
26use serde::{Deserialize, Serialize};
27use std::collections::hash_map::DefaultHasher;
28use std::fs::File;
29use std::hash::{Hash, Hasher};
30use std::io::prelude::*;
31use std::path::Path;
32use std::sync::atomic::{AtomicUsize, Ordering};
33use std::time::SystemTime;
34
35#[derive(Debug, Deserialize, Serialize)]
36struct Node {
37 key: CompactCacheKey,
38 size: usize,
39}
40
41pub struct Manager {
45 lru: RwLock<LruCache<u64, Node>>,
46 limit: usize,
47 used: AtomicUsize,
48 items: AtomicUsize,
49 evicted_size: AtomicUsize,
50 evicted_items: AtomicUsize,
51}
52
53impl Manager {
54 pub fn new(limit: usize) -> Self {
56 Manager {
57 lru: RwLock::new(LruCache::unbounded()),
58 limit,
59 used: AtomicUsize::new(0),
60 items: AtomicUsize::new(0),
61 evicted_size: AtomicUsize::new(0),
62 evicted_items: AtomicUsize::new(0),
63 }
64 }
65
66 fn insert(&self, hash_key: u64, node: CompactCacheKey, size: usize, reverse: bool) {
67 use std::cmp::Ordering::*;
68 let node = Node { key: node, size };
69 let old = {
70 let mut lru = self.lru.write();
71 let old = lru.push(hash_key, node);
72 if reverse && old.is_none() {
73 lru.demote(&hash_key);
74 }
75 old
76 };
77 if let Some(old) = old {
78 match size.cmp(&old.1.size) {
80 Greater => self.used.fetch_add(size - old.1.size, Ordering::Relaxed),
81 Less => self.used.fetch_sub(old.1.size - size, Ordering::Relaxed),
82 Equal => 0, };
84 } else {
85 self.used.fetch_add(size, Ordering::Relaxed);
86 self.items.fetch_add(1, Ordering::Relaxed);
87 }
88 }
89
90 fn increase_weight(&self, key: u64, delta: usize) {
91 let mut lru = self.lru.write();
92 let Some(node) = lru.get_key_value_mut(&key) else {
93 return;
94 };
95 node.1.size += delta;
96 self.used.fetch_add(delta, Ordering::Relaxed);
97 }
98
99 fn evict(&self) -> Vec<CompactCacheKey> {
101 if self.used.load(Ordering::Relaxed) <= self.limit {
102 return vec![];
103 }
104 let mut to_evict = Vec::with_capacity(1); while self.used.load(Ordering::Relaxed) > self.limit {
106 if let Some((_, node)) = self.lru.write().pop_lru() {
107 self.used.fetch_sub(node.size, Ordering::Relaxed);
108 self.items.fetch_sub(1, Ordering::Relaxed);
109 self.evicted_size.fetch_add(node.size, Ordering::Relaxed);
110 self.evicted_items.fetch_add(1, Ordering::Relaxed);
111 to_evict.push(node.key);
112 } else {
113 return to_evict;
115 }
116 }
117 to_evict
118 }
119
120 fn serialize(&self) -> Result<Vec<u8>> {
123 use rmp_serde::encode::Serializer;
124 use serde::ser::SerializeSeq;
125 use serde::ser::Serializer as _;
126 let mut ser = Serializer::new(vec![]);
128 let lru = self.lru.read();
130 let mut seq = ser
131 .serialize_seq(Some(lru.len()))
132 .or_err(InternalError, "fail to serialize node")?;
133 for item in lru.iter() {
134 seq.serialize_element(item.1).unwrap(); }
136 seq.end().or_err(InternalError, "when serializing LRU")?;
137 Ok(ser.into_inner())
138 }
139
140 fn deserialize(&self, buf: &[u8]) -> Result<()> {
141 use rmp_serde::decode::Deserializer;
142 use serde::de::Deserializer as _;
143 let mut de = Deserializer::new(buf);
144 let visitor = InsertToManager { lru: self };
145 de.deserialize_seq(visitor)
146 .or_err(InternalError, "when deserializing LRU")?;
147 Ok(())
148 }
149}
150
151struct InsertToManager<'a> {
152 lru: &'a Manager,
153}
154
155impl<'de> serde::de::Visitor<'de> for InsertToManager<'_> {
156 type Value = ();
157
158 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
159 formatter.write_str("array of lru nodes")
160 }
161
162 fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
163 where
164 A: SeqAccess<'de>,
165 {
166 while let Some(node) = seq.next_element::<Node>()? {
167 let key = u64key(&node.key);
168 self.lru.insert(key, node.key, node.size, true); }
170 Ok(())
171 }
172}
173
174#[inline]
175fn u64key(key: &CompactCacheKey) -> u64 {
176 let mut hasher = DefaultHasher::new();
177 key.hash(&mut hasher);
178 hasher.finish()
179}
180
181const FILE_NAME: &str = "simple_lru.data";
182
183#[async_trait]
184impl EvictionManager for Manager {
185 fn total_size(&self) -> usize {
186 self.used.load(Ordering::Relaxed)
187 }
188 fn total_items(&self) -> usize {
189 self.items.load(Ordering::Relaxed)
190 }
191 fn evicted_size(&self) -> usize {
192 self.evicted_size.load(Ordering::Relaxed)
193 }
194 fn evicted_items(&self) -> usize {
195 self.evicted_items.load(Ordering::Relaxed)
196 }
197
198 fn admit(
199 &self,
200 item: CompactCacheKey,
201 size: usize,
202 _fresh_until: SystemTime,
203 ) -> Vec<CompactCacheKey> {
204 let key = u64key(&item);
205 self.insert(key, item, size, false);
206 self.evict()
207 }
208
209 fn increment_weight(&self, item: CompactCacheKey, delta: usize) -> Vec<CompactCacheKey> {
210 let key = u64key(&item);
211 self.increase_weight(key, delta);
212 self.evict()
213 }
214
215 fn remove(&self, item: &CompactCacheKey) {
216 let key = u64key(item);
217 let node = self.lru.write().pop(&key);
218 if let Some(n) = node {
219 self.used.fetch_sub(n.size, Ordering::Relaxed);
220 self.items.fetch_sub(1, Ordering::Relaxed);
221 }
222 }
223
224 fn access(&self, item: &CompactCacheKey, size: usize, _fresh_until: SystemTime) -> bool {
225 let key = u64key(item);
226 if self.lru.write().get(&key).is_none() {
227 self.insert(key, item.clone(), size, false);
228 false
229 } else {
230 true
231 }
232 }
233
234 fn peek(&self, item: &CompactCacheKey) -> bool {
235 let key = u64key(item);
236 self.lru.read().peek(&key).is_some()
237 }
238
239 async fn save(&self, dir_path: &str) -> Result<()> {
240 let data = self.serialize()?;
241 let dir_str = dir_path.to_owned();
242 tokio::task::spawn_blocking(move || {
243 let dir_path = Path::new(&dir_str);
244 std::fs::create_dir_all(dir_path)
245 .or_err_with(InternalError, || format!("fail to create {dir_str}"))?;
246
247 let final_file_path = dir_path.join(FILE_NAME);
248 let random_suffix: u32 = rand::thread_rng().gen();
250 let temp_file_path = dir_path.join(format!("{}.{:08x}.tmp", FILE_NAME, random_suffix));
251 let mut file = File::create(&temp_file_path).or_err_with(InternalError, || {
252 format!("fail to create temporary file {}", temp_file_path.display())
253 })?;
254 file.write_all(&data).or_err_with(InternalError, || {
255 format!("fail to write to {}", temp_file_path.display())
256 })?;
257 file.flush().or_err_with(InternalError, || {
258 format!("fail to flush temp file {}", temp_file_path.display())
259 })?;
260 std::fs::rename(&temp_file_path, &final_file_path).or_err_with(InternalError, || {
261 format!(
262 "fail to rename temporary file {} to {}",
263 temp_file_path.display(),
264 final_file_path.display()
265 )
266 })
267 })
268 .await
269 .or_err(InternalError, "async blocking IO failure")?
270 }
271
272 async fn load(&self, dir_path: &str) -> Result<()> {
273 let dir_path = dir_path.to_owned();
274 let data = tokio::task::spawn_blocking(move || {
275 let file_path = Path::new(&dir_path).join(FILE_NAME);
276 let mut file = File::open(file_path.clone()).or_err_with(InternalError, || {
277 format!("fail to open {}", file_path.display())
278 })?;
279 let mut buffer = Vec::with_capacity(8192);
280 file.read_to_end(&mut buffer)
281 .or_err(InternalError, "fail to read from {file_path}")?;
282 Ok::<Vec<u8>, BError>(buffer)
283 })
284 .await
285 .or_err(InternalError, "async blocking IO failure")??;
286 self.deserialize(&data)
287 }
288}
289
290#[cfg(test)]
291mod test {
292 use super::*;
293 use crate::CacheKey;
294
295 #[test]
296 fn test_admission() {
297 let lru = Manager::new(4);
298 let key1 = CacheKey::new("", "a", "1").to_compact();
299 let until = SystemTime::now(); let v = lru.admit(key1.clone(), 1, until);
301 assert_eq!(v.len(), 0);
302 let key2 = CacheKey::new("", "b", "1").to_compact();
303 let v = lru.admit(key2.clone(), 2, until);
304 assert_eq!(v.len(), 0);
305 let key3 = CacheKey::new("", "c", "1").to_compact();
306 let v = lru.admit(key3, 1, until);
307 assert_eq!(v.len(), 0);
308
309 let key4 = CacheKey::new("", "d", "1").to_compact();
312 let v = lru.admit(key4, 2, until);
313 assert_eq!(v.len(), 2);
315 assert_eq!(v[0], key1);
316 assert_eq!(v[1], key2);
317 }
318
319 #[test]
320 fn test_access() {
321 let lru = Manager::new(4);
322 let key1 = CacheKey::new("", "a", "1").to_compact();
323 let until = SystemTime::now(); let v = lru.admit(key1.clone(), 1, until);
325 assert_eq!(v.len(), 0);
326 let key2 = CacheKey::new("", "b", "1").to_compact();
327 let v = lru.admit(key2.clone(), 2, until);
328 assert_eq!(v.len(), 0);
329 let key3 = CacheKey::new("", "c", "1").to_compact();
330 let v = lru.admit(key3, 1, until);
331 assert_eq!(v.len(), 0);
332
333 lru.access(&key1, 1, until);
336 assert_eq!(v.len(), 0);
337
338 let key4 = CacheKey::new("", "d", "1").to_compact();
339 let v = lru.admit(key4, 2, until);
340 assert_eq!(v.len(), 1);
341 assert_eq!(v[0], key2);
342 }
343
344 #[test]
345 fn test_remove() {
346 let lru = Manager::new(4);
347 let key1 = CacheKey::new("", "a", "1").to_compact();
348 let until = SystemTime::now(); let v = lru.admit(key1.clone(), 1, until);
350 assert_eq!(v.len(), 0);
351 let key2 = CacheKey::new("", "b", "1").to_compact();
352 let v = lru.admit(key2.clone(), 2, until);
353 assert_eq!(v.len(), 0);
354 let key3 = CacheKey::new("", "c", "1").to_compact();
355 let v = lru.admit(key3, 1, until);
356 assert_eq!(v.len(), 0);
357
358 lru.remove(&key1);
361
362 let key4 = CacheKey::new("", "d", "1").to_compact();
364 let v = lru.admit(key4, 2, until);
365 assert_eq!(v.len(), 1);
366 assert_eq!(v[0], key2);
367 }
368
369 #[test]
370 fn test_access_add() {
371 let lru = Manager::new(4);
372 let until = SystemTime::now(); let key1 = CacheKey::new("", "a", "1").to_compact();
375 lru.access(&key1, 1, until);
376 let key2 = CacheKey::new("", "b", "1").to_compact();
377 lru.access(&key2, 2, until);
378 let key3 = CacheKey::new("", "c", "1").to_compact();
379 lru.access(&key3, 2, until);
380
381 let key4 = CacheKey::new("", "d", "1").to_compact();
382 let v = lru.admit(key4, 2, until);
383 assert_eq!(v.len(), 2);
385 assert_eq!(v[0], key1);
386 assert_eq!(v[1], key2);
387 }
388
389 #[test]
390 fn test_admit_update() {
391 let lru = Manager::new(4);
392 let key1 = CacheKey::new("", "a", "1").to_compact();
393 let until = SystemTime::now(); let v = lru.admit(key1.clone(), 1, until);
395 assert_eq!(v.len(), 0);
396 let key2 = CacheKey::new("", "b", "1").to_compact();
397 let v = lru.admit(key2.clone(), 2, until);
398 assert_eq!(v.len(), 0);
399 let key3 = CacheKey::new("", "c", "1").to_compact();
400 let v = lru.admit(key3, 1, until);
401 assert_eq!(v.len(), 0);
402
403 let v = lru.admit(key2, 1, until);
406 assert_eq!(v.len(), 0);
407
408 let key4 = CacheKey::new("", "d", "1").to_compact();
410 let v = lru.admit(key4.clone(), 1, until);
411 assert_eq!(v.len(), 0);
412
413 let v = lru.admit(key4, 2, until);
415 assert_eq!(v.len(), 1);
417 assert_eq!(v[0], key1);
418 }
419
420 #[test]
421 fn test_serde() {
422 let lru = Manager::new(4);
423 let key1 = CacheKey::new("", "a", "1").to_compact();
424 let until = SystemTime::now(); let v = lru.admit(key1.clone(), 1, until);
426 assert_eq!(v.len(), 0);
427 let key2 = CacheKey::new("", "b", "1").to_compact();
428 let v = lru.admit(key2.clone(), 2, until);
429 assert_eq!(v.len(), 0);
430 let key3 = CacheKey::new("", "c", "1").to_compact();
431 let v = lru.admit(key3, 1, until);
432 assert_eq!(v.len(), 0);
433
434 lru.access(&key1, 1, until);
437 assert_eq!(v.len(), 0);
438
439 let ser = lru.serialize().unwrap();
441 let lru2 = Manager::new(4);
442 lru2.deserialize(&ser).unwrap();
443
444 let key4 = CacheKey::new("", "d", "1").to_compact();
445 let v = lru2.admit(key4, 2, until);
446 assert_eq!(v.len(), 1);
447 assert_eq!(v[0], key2);
448 }
449
450 #[tokio::test]
451 async fn test_save_to_disk() {
452 let lru = Manager::new(4);
453 let key1 = CacheKey::new("", "a", "1").to_compact();
454 let until = SystemTime::now(); let v = lru.admit(key1.clone(), 1, until);
456 assert_eq!(v.len(), 0);
457 let key2 = CacheKey::new("", "b", "1").to_compact();
458 let v = lru.admit(key2.clone(), 2, until);
459 assert_eq!(v.len(), 0);
460 let key3 = CacheKey::new("", "c", "1").to_compact();
461 let v = lru.admit(key3, 1, until);
462 assert_eq!(v.len(), 0);
463
464 lru.access(&key1, 1, until);
467 assert_eq!(v.len(), 0);
468
469 lru.save("/tmp/test_simple_lru_save").await.unwrap();
471 let lru2 = Manager::new(4);
472 lru2.load("/tmp/test_simple_lru_save").await.unwrap();
473
474 let key4 = CacheKey::new("", "d", "1").to_compact();
475 let v = lru2.admit(key4, 2, until);
476 assert_eq!(v.len(), 1);
477 assert_eq!(v[0], key2);
478 }
479}