1use super::EvictionManager;
18use crate::key::CompactCacheKey;
19
20use async_trait::async_trait;
21use lru::LruCache;
22use parking_lot::RwLock;
23use pingora_error::{BError, ErrorType::*, OrErr, Result};
24use serde::de::SeqAccess;
25use serde::{Deserialize, Serialize};
26use std::collections::hash_map::DefaultHasher;
27use std::fs::File;
28use std::hash::{Hash, Hasher};
29use std::io::prelude::*;
30use std::path::Path;
31use std::sync::atomic::{AtomicUsize, Ordering};
32use std::time::SystemTime;
33
34#[derive(Debug, Deserialize, Serialize)]
35struct Node {
36 key: CompactCacheKey,
37 size: usize,
38}
39
40pub struct Manager {
44 lru: RwLock<LruCache<u64, Node>>,
45 limit: usize,
46 used: AtomicUsize,
47 items: AtomicUsize,
48 evicted_size: AtomicUsize,
49 evicted_items: AtomicUsize,
50}
51
52impl Manager {
53 pub fn new(limit: usize) -> Self {
55 Manager {
56 lru: RwLock::new(LruCache::unbounded()),
57 limit,
58 used: AtomicUsize::new(0),
59 items: AtomicUsize::new(0),
60 evicted_size: AtomicUsize::new(0),
61 evicted_items: AtomicUsize::new(0),
62 }
63 }
64
65 fn insert(&self, hash_key: u64, node: CompactCacheKey, size: usize, reverse: bool) {
66 use std::cmp::Ordering::*;
67 let node = Node { key: node, size };
68 let old = {
69 let mut lru = self.lru.write();
70 let old = lru.push(hash_key, node);
71 if reverse && old.is_none() {
72 lru.demote(&hash_key);
73 }
74 old
75 };
76 if let Some(old) = old {
77 match size.cmp(&old.1.size) {
79 Greater => self.used.fetch_add(size - old.1.size, Ordering::Relaxed),
80 Less => self.used.fetch_sub(old.1.size - size, Ordering::Relaxed),
81 Equal => 0, };
83 } else {
84 self.used.fetch_add(size, Ordering::Relaxed);
85 self.items.fetch_add(1, Ordering::Relaxed);
86 }
87 }
88
89 fn increase_weight(&self, key: u64, delta: usize) {
90 let mut lru = self.lru.write();
91 let Some(node) = lru.get_key_value_mut(&key) else {
92 return;
93 };
94 node.1.size += delta;
95 self.used.fetch_add(delta, Ordering::Relaxed);
96 }
97
98 fn evict(&self) -> Vec<CompactCacheKey> {
100 if self.used.load(Ordering::Relaxed) <= self.limit {
101 return vec![];
102 }
103 let mut to_evict = Vec::with_capacity(1); while self.used.load(Ordering::Relaxed) > self.limit {
105 if let Some((_, node)) = self.lru.write().pop_lru() {
106 self.used.fetch_sub(node.size, Ordering::Relaxed);
107 self.items.fetch_sub(1, Ordering::Relaxed);
108 self.evicted_size.fetch_add(node.size, Ordering::Relaxed);
109 self.evicted_items.fetch_add(1, Ordering::Relaxed);
110 to_evict.push(node.key);
111 } else {
112 return to_evict;
114 }
115 }
116 to_evict
117 }
118
119 fn serialize(&self) -> Result<Vec<u8>> {
122 use rmp_serde::encode::Serializer;
123 use serde::ser::SerializeSeq;
124 use serde::ser::Serializer as _;
125 let mut ser = Serializer::new(vec![]);
127 let lru = self.lru.read();
129 let mut seq = ser
130 .serialize_seq(Some(lru.len()))
131 .or_err(InternalError, "fail to serialize node")?;
132 for item in lru.iter() {
133 seq.serialize_element(item.1).unwrap(); }
135 seq.end().or_err(InternalError, "when serializing LRU")?;
136 Ok(ser.into_inner())
137 }
138
139 fn deserialize(&self, buf: &[u8]) -> Result<()> {
140 use rmp_serde::decode::Deserializer;
141 use serde::de::Deserializer as _;
142 let mut de = Deserializer::new(buf);
143 let visitor = InsertToManager { lru: self };
144 de.deserialize_seq(visitor)
145 .or_err(InternalError, "when deserializing LRU")?;
146 Ok(())
147 }
148}
149
150struct InsertToManager<'a> {
151 lru: &'a Manager,
152}
153
154impl<'de> serde::de::Visitor<'de> for InsertToManager<'_> {
155 type Value = ();
156
157 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
158 formatter.write_str("array of lru nodes")
159 }
160
161 fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
162 where
163 A: SeqAccess<'de>,
164 {
165 while let Some(node) = seq.next_element::<Node>()? {
166 let key = u64key(&node.key);
167 self.lru.insert(key, node.key, node.size, true); }
169 Ok(())
170 }
171}
172
173#[inline]
174fn u64key(key: &CompactCacheKey) -> u64 {
175 let mut hasher = DefaultHasher::new();
176 key.hash(&mut hasher);
177 hasher.finish()
178}
179
/// Name of the file, inside the directory given to `save`/`load`, that holds the serialized LRU.
const FILE_NAME: &str = "simple_lru.data";
181
182#[async_trait]
183impl EvictionManager for Manager {
184 fn total_size(&self) -> usize {
185 self.used.load(Ordering::Relaxed)
186 }
187 fn total_items(&self) -> usize {
188 self.items.load(Ordering::Relaxed)
189 }
190 fn evicted_size(&self) -> usize {
191 self.evicted_size.load(Ordering::Relaxed)
192 }
193 fn evicted_items(&self) -> usize {
194 self.evicted_items.load(Ordering::Relaxed)
195 }
196
197 fn admit(
198 &self,
199 item: CompactCacheKey,
200 size: usize,
201 _fresh_until: SystemTime,
202 ) -> Vec<CompactCacheKey> {
203 let key = u64key(&item);
204 self.insert(key, item, size, false);
205 self.evict()
206 }
207
208 fn increment_weight(&self, item: CompactCacheKey, delta: usize) -> Vec<CompactCacheKey> {
209 let key = u64key(&item);
210 self.increase_weight(key, delta);
211 self.evict()
212 }
213
214 fn remove(&self, item: &CompactCacheKey) {
215 let key = u64key(item);
216 let node = self.lru.write().pop(&key);
217 if let Some(n) = node {
218 self.used.fetch_sub(n.size, Ordering::Relaxed);
219 self.items.fetch_sub(1, Ordering::Relaxed);
220 }
221 }
222
223 fn access(&self, item: &CompactCacheKey, size: usize, _fresh_until: SystemTime) -> bool {
224 let key = u64key(item);
225 if self.lru.write().get(&key).is_none() {
226 self.insert(key, item.clone(), size, false);
227 false
228 } else {
229 true
230 }
231 }
232
233 fn peek(&self, item: &CompactCacheKey) -> bool {
234 let key = u64key(item);
235 self.lru.read().peek(&key).is_some()
236 }
237
238 async fn save(&self, dir_path: &str) -> Result<()> {
239 let data = self.serialize()?;
240 let dir_str = dir_path.to_owned();
241 tokio::task::spawn_blocking(move || {
242 let dir_path = Path::new(&dir_str);
243 std::fs::create_dir_all(dir_path)
244 .or_err_with(InternalError, || format!("fail to create {dir_str}"))?;
245 let file_path = dir_path.join(FILE_NAME);
246 let mut file = File::create(&file_path).or_err_with(InternalError, || {
247 format!("fail to create {}", file_path.display())
248 })?;
249 file.write_all(&data).or_err_with(InternalError, || {
250 format!("fail to write to {}", file_path.display())
251 })
252 })
253 .await
254 .or_err(InternalError, "async blocking IO failure")?
255 }
256
257 async fn load(&self, dir_path: &str) -> Result<()> {
258 let dir_path = dir_path.to_owned();
259 let data = tokio::task::spawn_blocking(move || {
260 let file_path = Path::new(&dir_path).join(FILE_NAME);
261 let mut file = File::open(file_path.clone()).or_err_with(InternalError, || {
262 format!("fail to open {}", file_path.display())
263 })?;
264 let mut buffer = Vec::with_capacity(8192);
265 file.read_to_end(&mut buffer)
266 .or_err(InternalError, "fail to read from {file_path}")?;
267 Ok::<Vec<u8>, BError>(buffer)
268 })
269 .await
270 .or_err(InternalError, "async blocking IO failure")??;
271 self.deserialize(&data)
272 }
273}
274
#[cfg(test)]
mod test {
    use super::*;
    use crate::CacheKey;

    /// Admitting past the limit evicts from the least recently used end.
    #[test]
    fn test_admission() {
        let lru = Manager::new(4);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused by this manager
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // Order (least recent first): key1, key2, key3; total size 4.
        let key4 = CacheKey::new("", "d", "1").to_compact();
        // Size becomes 6: key1 (1) and key2 (2) must go to get back to <= 4.
        let v = lru.admit(key4, 2, until);
        assert_eq!(v.len(), 2);
        assert_eq!(v[0], key1);
        assert_eq!(v[1], key2);
    }

    /// Accessing a tracked key promotes it and reports a hit.
    #[test]
    fn test_access() {
        let lru = Manager::new(4);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused by this manager
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // Order: key1, key2, key3. Accessing key1 makes it key2, key3, key1.
        assert!(lru.access(&key1, 1, until));

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }

    /// Removing a key frees its size without counting as an eviction.
    #[test]
    fn test_remove() {
        let lru = Manager::new(4);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused by this manager
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // Order: key1, key2, key3. After the removal: key2, key3; total size 3.
        lru.remove(&key1);

        let key4 = CacheKey::new("", "d", "1").to_compact();
        // Size becomes 5: evicting key2 (2) is enough.
        let v = lru.admit(key4, 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }

    /// Accessing an untracked key inserts it and reports a miss.
    #[test]
    fn test_access_add() {
        let lru = Manager::new(4);
        let until = SystemTime::now(); // unused by this manager

        let key1 = CacheKey::new("", "a", "1").to_compact();
        assert!(!lru.access(&key1, 1, until));
        let key2 = CacheKey::new("", "b", "1").to_compact();
        assert!(!lru.access(&key2, 2, until));
        let key3 = CacheKey::new("", "c", "1").to_compact();
        assert!(!lru.access(&key3, 2, until));

        // `access` never evicts, so the total size is now 5 with all three tracked.
        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        // Size becomes 7: key1 (1) and key2 (2) must go to get back to <= 4.
        assert_eq!(v.len(), 2);
        assert_eq!(v[0], key1);
        assert_eq!(v[1], key2);
    }

    /// Re-admitting a tracked key updates its size and promotes it.
    #[test]
    fn test_admit_update() {
        let lru = Manager::new(4);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused by this manager
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // Order: key1, key2, key3. Shrinking key2 to 1 makes it key1, key3, key2; size 3.
        let v = lru.admit(key2, 1, until);
        assert_eq!(v.len(), 0);

        let key4 = CacheKey::new("", "d", "1").to_compact();
        // Size is exactly 4 now, still within the limit.
        let v = lru.admit(key4.clone(), 1, until);
        assert_eq!(v.len(), 0);

        // Growing key4 to 2 pushes the size to 5: key1 is evicted.
        let v = lru.admit(key4, 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key1);
    }

    /// A serialize/deserialize round trip keeps the recency order.
    #[test]
    fn test_serde() {
        let lru = Manager::new(4);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused by this manager
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // Order: key1, key2, key3. Accessing key1 makes it key2, key3, key1.
        assert!(lru.access(&key1, 1, until));

        let ser = lru.serialize().unwrap();
        let lru2 = Manager::new(4);
        lru2.deserialize(&ser).unwrap();

        // The restored manager must evict key2 first, just like the original would.
        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru2.admit(key4, 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }

    /// A save/load round trip through the filesystem keeps the recency order.
    #[tokio::test]
    async fn test_save_to_disk() {
        let lru = Manager::new(4);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused by this manager
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // Order: key1, key2, key3. Accessing key1 makes it key2, key3, key1.
        assert!(lru.access(&key1, 1, until));

        lru.save("/tmp/test_simple_lru_save").await.unwrap();
        let lru2 = Manager::new(4);
        lru2.load("/tmp/test_simple_lru_save").await.unwrap();

        // The restored manager must evict key2 first, just like the original would.
        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru2.admit(key4, 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }
}