1use super::EvictionManager;
18use crate::key::CompactCacheKey;
19
20use async_trait::async_trait;
21use pingora_error::{BError, ErrorType::*, OrErr, Result};
22use pingora_lru::Lru;
23use serde::de::SeqAccess;
24use serde::{Deserialize, Serialize};
25use std::fs::File;
26use std::hash::{Hash, Hasher};
27use std::io::prelude::*;
28use std::path::Path;
29use std::time::SystemTime;
30
31pub struct Manager<const N: usize>(Lru<CompactCacheKey, N>);
39
40#[derive(Debug, Serialize, Deserialize)]
41struct SerdeHelperNode(CompactCacheKey, usize);
42
43impl<const N: usize> Manager<N> {
44 pub fn with_capacity(limit: usize, capacity: usize) -> Self {
48 Manager(Lru::with_capacity(limit, capacity))
49 }
50
51 pub fn serialize_shard(&self, shard: usize) -> Result<Vec<u8>> {
53 use rmp_serde::encode::Serializer;
54 use serde::ser::SerializeSeq;
55 use serde::ser::Serializer as _;
56
57 assert!(shard < N);
58
59 let mut nodes = Vec::with_capacity(self.0.shard_len(shard));
62 self.0.iter_for_each(shard, |(node, size)| {
63 nodes.push(SerdeHelperNode(node.clone(), size));
64 });
65 let mut ser = Serializer::new(vec![]);
66 let mut seq = ser
67 .serialize_seq(Some(self.0.shard_len(shard)))
68 .or_err(InternalError, "fail to serialize node")?;
69 for node in nodes {
70 seq.serialize_element(&node).unwrap(); }
72
73 seq.end().or_err(InternalError, "when serializing LRU")?;
74 Ok(ser.into_inner())
75 }
76
77 pub fn deserialize_shard(&self, buf: &[u8]) -> Result<()> {
81 use rmp_serde::decode::Deserializer;
82 use serde::de::Deserializer as _;
83
84 let mut de = Deserializer::new(buf);
85 let visitor = InsertToManager { lru: self };
86 de.deserialize_seq(visitor)
87 .or_err(InternalError, "when deserializing LRU")?;
88 Ok(())
89 }
90}
91
92struct InsertToManager<'a, const N: usize> {
93 lru: &'a Manager<N>,
94}
95
96impl<'de, const N: usize> serde::de::Visitor<'de> for InsertToManager<'_, N> {
97 type Value = ();
98
99 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
100 formatter.write_str("array of lru nodes")
101 }
102
103 fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
104 where
105 A: SeqAccess<'de>,
106 {
107 while let Some(node) = seq.next_element::<SerdeHelperNode>()? {
108 let key = u64key(&node.0);
109 self.lru.0.insert_tail(key, node.0, node.1); }
111 Ok(())
112 }
113}
114
115#[inline]
116fn u64key(key: &CompactCacheKey) -> u64 {
117 let mut hasher = ahash::AHasher::default();
119 key.hash(&mut hasher);
120 hasher.finish()
121}
122
123const FILE_NAME: &str = "lru.data";
124
125#[inline]
126fn err_str_path(s: &str, path: &Path) -> String {
127 format!("{s} {}", path.display())
128}
129
130#[async_trait]
131impl<const N: usize> EvictionManager for Manager<N> {
132 fn total_size(&self) -> usize {
133 self.0.weight()
134 }
135 fn total_items(&self) -> usize {
136 self.0.len()
137 }
138 fn evicted_size(&self) -> usize {
139 self.0.evicted_weight()
140 }
141 fn evicted_items(&self) -> usize {
142 self.0.evicted_len()
143 }
144
145 fn admit(
146 &self,
147 item: CompactCacheKey,
148 size: usize,
149 _fresh_until: SystemTime,
150 ) -> Vec<CompactCacheKey> {
151 let key = u64key(&item);
152 self.0.admit(key, item, size);
153 self.0
154 .evict_to_limit()
155 .into_iter()
156 .map(|(key, _weight)| key)
157 .collect()
158 }
159
160 fn increment_weight(&self, item: CompactCacheKey, delta: usize) -> Vec<CompactCacheKey> {
161 let key = u64key(&item);
162 self.0.increment_weight(key, delta);
163 self.0
164 .evict_to_limit()
165 .into_iter()
166 .map(|(key, _weight)| key)
167 .collect()
168 }
169
170 fn remove(&self, item: &CompactCacheKey) {
171 let key = u64key(item);
172 self.0.remove(key);
173 }
174
175 fn access(&self, item: &CompactCacheKey, size: usize, _fresh_until: SystemTime) -> bool {
176 let key = u64key(item);
177 if !self.0.promote(key) {
178 self.0.admit(key, item.clone(), size);
179 false
180 } else {
181 true
182 }
183 }
184
185 fn peek(&self, item: &CompactCacheKey) -> bool {
186 let key = u64key(item);
187 self.0.peek(key)
188 }
189
190 async fn save(&self, dir_path: &str) -> Result<()> {
191 let dir_path_str = dir_path.to_owned();
192
193 tokio::task::spawn_blocking(move || {
194 let dir_path = Path::new(&dir_path_str);
195 std::fs::create_dir_all(dir_path)
196 .or_err_with(InternalError, || err_str_path("fail to create", dir_path))
197 })
198 .await
199 .or_err(InternalError, "async blocking IO failure")??;
200
201 for i in 0..N {
202 let data = self.serialize_shard(i)?;
203 let dir_path = dir_path.to_owned();
204 tokio::task::spawn_blocking(move || {
205 let file_path = Path::new(&dir_path).join(format!("{}.{i}", FILE_NAME));
206 let mut file = File::create(&file_path)
207 .or_err_with(InternalError, || err_str_path("fail to create", &file_path))?;
208 file.write_all(&data).or_err_with(InternalError, || {
209 err_str_path("fail to write to", &file_path)
210 })
211 })
212 .await
213 .or_err(InternalError, "async blocking IO failure")??;
214 }
215 Ok(())
216 }
217
218 async fn load(&self, dir_path: &str) -> Result<()> {
219 for i in 0..N {
221 let dir_path = dir_path.to_owned();
222
223 let data = tokio::task::spawn_blocking(move || {
224 let file_path = Path::new(&dir_path).join(format!("{}.{i}", FILE_NAME));
225 let mut file = File::open(&file_path)
226 .or_err_with(InternalError, || err_str_path("fail to open", &file_path))?;
227 let mut buffer = Vec::with_capacity(8192);
228 file.read_to_end(&mut buffer)
229 .or_err_with(InternalError, || {
230 err_str_path("fail to read from", &file_path)
231 })?;
232 Ok::<Vec<u8>, BError>(buffer)
233 })
234 .await
235 .or_err(InternalError, "async blocking IO failure")??;
236 self.deserialize_shard(&data)?;
237 }
238
239 Ok(())
240 }
241}
242
243#[cfg(test)]
244mod test {
245 use super::*;
246 use crate::CacheKey;
247
248 #[test]
251 fn test_admission() {
252 let lru = Manager::<1>::with_capacity(4, 10);
253 let key1 = CacheKey::new("", "a", "1").to_compact();
254 let until = SystemTime::now(); let v = lru.admit(key1.clone(), 1, until);
256 assert_eq!(v.len(), 0);
257 let key2 = CacheKey::new("", "b", "1").to_compact();
258 let v = lru.admit(key2.clone(), 2, until);
259 assert_eq!(v.len(), 0);
260 let key3 = CacheKey::new("", "c", "1").to_compact();
261 let v = lru.admit(key3, 1, until);
262 assert_eq!(v.len(), 0);
263
264 let key4 = CacheKey::new("", "d", "1").to_compact();
267 let v = lru.admit(key4, 2, until);
268 assert_eq!(v.len(), 2);
270 assert_eq!(v[0], key1);
271 assert_eq!(v[1], key2);
272 }
273
274 #[test]
275 fn test_access() {
276 let lru = Manager::<1>::with_capacity(4, 10);
277 let key1 = CacheKey::new("", "a", "1").to_compact();
278 let until = SystemTime::now(); let v = lru.admit(key1.clone(), 1, until);
280 assert_eq!(v.len(), 0);
281 let key2 = CacheKey::new("", "b", "1").to_compact();
282 let v = lru.admit(key2.clone(), 2, until);
283 assert_eq!(v.len(), 0);
284 let key3 = CacheKey::new("", "c", "1").to_compact();
285 let v = lru.admit(key3, 1, until);
286 assert_eq!(v.len(), 0);
287
288 lru.access(&key1, 1, until);
291 assert_eq!(v.len(), 0);
292
293 let key4 = CacheKey::new("", "d", "1").to_compact();
294 let v = lru.admit(key4, 2, until);
295 assert_eq!(v.len(), 1);
296 assert_eq!(v[0], key2);
297 }
298
299 #[test]
300 fn test_remove() {
301 let lru = Manager::<1>::with_capacity(4, 10);
302 let key1 = CacheKey::new("", "a", "1").to_compact();
303 let until = SystemTime::now(); let v = lru.admit(key1.clone(), 1, until);
305 assert_eq!(v.len(), 0);
306 let key2 = CacheKey::new("", "b", "1").to_compact();
307 let v = lru.admit(key2.clone(), 2, until);
308 assert_eq!(v.len(), 0);
309 let key3 = CacheKey::new("", "c", "1").to_compact();
310 let v = lru.admit(key3, 1, until);
311 assert_eq!(v.len(), 0);
312
313 lru.remove(&key1);
316
317 let key4 = CacheKey::new("", "d", "1").to_compact();
319 let v = lru.admit(key4, 2, until);
320 assert_eq!(v.len(), 1);
321 assert_eq!(v[0], key2);
322 }
323
324 #[test]
325 fn test_access_add() {
326 let lru = Manager::<1>::with_capacity(4, 10);
327 let until = SystemTime::now(); let key1 = CacheKey::new("", "a", "1").to_compact();
330 lru.access(&key1, 1, until);
331 let key2 = CacheKey::new("", "b", "1").to_compact();
332 lru.access(&key2, 2, until);
333 let key3 = CacheKey::new("", "c", "1").to_compact();
334 lru.access(&key3, 2, until);
335
336 let key4 = CacheKey::new("", "d", "1").to_compact();
337 let v = lru.admit(key4, 2, until);
338 assert_eq!(v.len(), 2);
340 assert_eq!(v[0], key1);
341 assert_eq!(v[1], key2);
342 }
343
344 #[test]
345 fn test_admit_update() {
346 let lru = Manager::<1>::with_capacity(4, 10);
347 let key1 = CacheKey::new("", "a", "1").to_compact();
348 let until = SystemTime::now(); let v = lru.admit(key1.clone(), 1, until);
350 assert_eq!(v.len(), 0);
351 let key2 = CacheKey::new("", "b", "1").to_compact();
352 let v = lru.admit(key2.clone(), 2, until);
353 assert_eq!(v.len(), 0);
354 let key3 = CacheKey::new("", "c", "1").to_compact();
355 let v = lru.admit(key3, 1, until);
356 assert_eq!(v.len(), 0);
357
358 let v = lru.admit(key2, 1, until);
361 assert_eq!(v.len(), 0);
362
363 let key4 = CacheKey::new("", "d", "1").to_compact();
365 let v = lru.admit(key4.clone(), 1, until);
366 assert_eq!(v.len(), 0);
367
368 let v = lru.admit(key4, 2, until);
370 assert_eq!(v.len(), 1);
372 assert_eq!(v[0], key1);
373 }
374
375 #[test]
376 fn test_peek() {
377 let lru = Manager::<1>::with_capacity(4, 10);
378 let until = SystemTime::now(); let key1 = CacheKey::new("", "a", "1").to_compact();
381 lru.access(&key1, 1, until);
382 let key2 = CacheKey::new("", "b", "1").to_compact();
383 lru.access(&key2, 2, until);
384 assert!(lru.peek(&key1));
385 assert!(lru.peek(&key2));
386 }
387
388 #[test]
389 fn test_serde() {
390 let lru = Manager::<1>::with_capacity(4, 10);
391 let key1 = CacheKey::new("", "a", "1").to_compact();
392 let until = SystemTime::now(); let v = lru.admit(key1.clone(), 1, until);
394 assert_eq!(v.len(), 0);
395 let key2 = CacheKey::new("", "b", "1").to_compact();
396 let v = lru.admit(key2.clone(), 2, until);
397 assert_eq!(v.len(), 0);
398 let key3 = CacheKey::new("", "c", "1").to_compact();
399 let v = lru.admit(key3, 1, until);
400 assert_eq!(v.len(), 0);
401
402 lru.access(&key1, 1, until);
405 assert_eq!(v.len(), 0);
406
407 let ser = lru.serialize_shard(0).unwrap();
409 let lru2 = Manager::<1>::with_capacity(4, 10);
410 lru2.deserialize_shard(&ser).unwrap();
411
412 let key4 = CacheKey::new("", "d", "1").to_compact();
413 let v = lru2.admit(key4, 2, until);
414 assert_eq!(v.len(), 1);
415 assert_eq!(v[0], key2);
416 }
417
418 #[tokio::test]
419 async fn test_save_to_disk() {
420 let until = SystemTime::now(); let lru = Manager::<2>::with_capacity(10, 10);
422
423 lru.admit(CacheKey::new("", "a", "1").to_compact(), 1, until);
424 lru.admit(CacheKey::new("", "b", "1").to_compact(), 2, until);
425 lru.admit(CacheKey::new("", "c", "1").to_compact(), 1, until);
426 lru.admit(CacheKey::new("", "d", "1").to_compact(), 1, until);
427 lru.admit(CacheKey::new("", "e", "1").to_compact(), 2, until);
428 lru.admit(CacheKey::new("", "f", "1").to_compact(), 1, until);
429
430 lru.save("/tmp/test_lru_save").await.unwrap();
432 let lru2 = Manager::<2>::with_capacity(4, 10);
433 lru2.load("/tmp/test_lru_save").await.unwrap();
434
435 let ser0 = lru.serialize_shard(0).unwrap();
436 let ser1 = lru.serialize_shard(1).unwrap();
437
438 assert_eq!(ser0, lru2.serialize_shard(0).unwrap());
439 assert_eq!(ser1, lru2.serialize_shard(1).unwrap());
440 }
441}