1use super::EvictionManager;
18use crate::key::CompactCacheKey;
19
20use async_trait::async_trait;
21use log::{info, warn};
22use pingora_error::{BError, ErrorType::*, OrErr, Result};
23use pingora_lru::Lru;
24use rand::Rng;
25use serde::de::SeqAccess;
26use serde::{Deserialize, Serialize};
27use std::fs::{rename, File};
28use std::hash::{Hash, Hasher};
29use std::io::prelude::*;
30use std::path::Path;
31use std::time::SystemTime;
32
/// An LRU-based [`EvictionManager`] built on a [`pingora_lru::Lru`] that stores
/// [`CompactCacheKey`] entries.
///
/// `N` is passed straight through as the const parameter of the underlying [`Lru`].
/// Within this file it is used as the number of shards: shard indices range over `0..N`,
/// and each shard is serialized and saved independently.
pub struct Manager<const N: usize>(Lru<CompactCacheKey, N>);
41
/// Serialization helper representing one LRU entry as a `(cache key, size)` pair.
///
/// `Manager::serialize_shard` writes a sequence of these and
/// `Manager::deserialize_shard` reads them back.
#[derive(Debug, Serialize, Deserialize)]
struct SerdeHelperNode(CompactCacheKey, usize);
44
45impl<const N: usize> Manager<N> {
46 pub fn with_capacity(limit: usize, capacity: usize) -> Self {
50 Manager(Lru::with_capacity(limit, capacity))
51 }
52
53 pub fn serialize_shard(&self, shard: usize) -> Result<Vec<u8>> {
55 use rmp_serde::encode::Serializer;
56 use serde::ser::SerializeSeq;
57 use serde::ser::Serializer as _;
58
59 assert!(shard < N);
60
61 let mut nodes = Vec::with_capacity(self.0.shard_len(shard));
64 self.0.iter_for_each(shard, |(node, size)| {
65 nodes.push(SerdeHelperNode(node.clone(), size));
66 });
67 let mut ser = Serializer::new(vec![]);
68 let mut seq = ser
69 .serialize_seq(Some(self.0.shard_len(shard)))
70 .or_err(InternalError, "fail to serialize node")?;
71 for node in nodes {
72 seq.serialize_element(&node).unwrap(); }
74
75 seq.end().or_err(InternalError, "when serializing LRU")?;
76 Ok(ser.into_inner())
77 }
78
79 pub fn deserialize_shard(&self, buf: &[u8]) -> Result<()> {
83 use rmp_serde::decode::Deserializer;
84 use serde::de::Deserializer as _;
85
86 let mut de = Deserializer::new(buf);
87 let visitor = InsertToManager { lru: self };
88 de.deserialize_seq(visitor)
89 .or_err(InternalError, "when deserializing LRU")?;
90 Ok(())
91 }
92}
93
/// Serde visitor that inserts each decoded [`SerdeHelperNode`] into the borrowed
/// [`Manager`] as it is read, instead of collecting the whole sequence first.
struct InsertToManager<'a, const N: usize> {
    /// The manager whose LRU receives the decoded nodes.
    lru: &'a Manager<N>,
}
97
98impl<'de, const N: usize> serde::de::Visitor<'de> for InsertToManager<'_, N> {
99 type Value = ();
100
101 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
102 formatter.write_str("array of lru nodes")
103 }
104
105 fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
106 where
107 A: SeqAccess<'de>,
108 {
109 while let Some(node) = seq.next_element::<SerdeHelperNode>()? {
110 let key = u64key(&node.0);
111 self.lru.0.insert_tail(key, node.0, node.1); }
113 Ok(())
114 }
115}
116
117#[inline]
118fn u64key(key: &CompactCacheKey) -> u64 {
119 let mut hasher = ahash::AHasher::default();
121 key.hash(&mut hasher);
122 hasher.finish()
123}
124
/// Base name of the per-shard save files: shard `i` is stored as `lru.data.{i}` inside
/// the directory passed to `save`/`load`.
const FILE_NAME: &str = "lru.data";
126
/// Build an error-context string of the form `"<s> <path>"`.
#[inline]
fn err_str_path(s: &str, path: &Path) -> String {
    let shown = path.display();
    format!("{s} {shown}")
}
131
132#[async_trait]
133impl<const N: usize> EvictionManager for Manager<N> {
134 fn total_size(&self) -> usize {
135 self.0.weight()
136 }
137 fn total_items(&self) -> usize {
138 self.0.len()
139 }
140 fn evicted_size(&self) -> usize {
141 self.0.evicted_weight()
142 }
143 fn evicted_items(&self) -> usize {
144 self.0.evicted_len()
145 }
146
147 fn admit(
148 &self,
149 item: CompactCacheKey,
150 size: usize,
151 _fresh_until: SystemTime,
152 ) -> Vec<CompactCacheKey> {
153 let key = u64key(&item);
154 self.0.admit(key, item, size);
155 self.0
156 .evict_to_limit()
157 .into_iter()
158 .map(|(key, _weight)| key)
159 .collect()
160 }
161
162 fn increment_weight(&self, item: CompactCacheKey, delta: usize) -> Vec<CompactCacheKey> {
163 let key = u64key(&item);
164 self.0.increment_weight(key, delta);
165 self.0
166 .evict_to_limit()
167 .into_iter()
168 .map(|(key, _weight)| key)
169 .collect()
170 }
171
172 fn remove(&self, item: &CompactCacheKey) {
173 let key = u64key(item);
174 self.0.remove(key);
175 }
176
177 fn access(&self, item: &CompactCacheKey, size: usize, _fresh_until: SystemTime) -> bool {
178 let key = u64key(item);
179 if !self.0.promote(key) {
180 self.0.admit(key, item.clone(), size);
181 false
182 } else {
183 true
184 }
185 }
186
187 fn peek(&self, item: &CompactCacheKey) -> bool {
188 let key = u64key(item);
189 self.0.peek(key)
190 }
191
192 async fn save(&self, dir_path: &str) -> Result<()> {
193 let dir_path_str = dir_path.to_owned();
194
195 tokio::task::spawn_blocking(move || {
196 let dir_path = Path::new(&dir_path_str);
197 std::fs::create_dir_all(dir_path)
198 .or_err_with(InternalError, || err_str_path("fail to create", dir_path))
199 })
200 .await
201 .or_err(InternalError, "async blocking IO failure")??;
202
203 for i in 0..N {
204 let data = self.serialize_shard(i)?;
205 let dir_path = dir_path.to_owned();
206 tokio::task::spawn_blocking(move || {
207 let dir_path = Path::new(&dir_path);
208 let final_path = dir_path.join(format!("{}.{i}", FILE_NAME));
209 let random_suffix: u32 = rand::thread_rng().gen();
211 let temp_path =
212 dir_path.join(format!("{}.{i}.{:08x}.tmp", FILE_NAME, random_suffix));
213 let mut file = File::create(&temp_path)
214 .or_err_with(InternalError, || err_str_path("fail to create", &temp_path))?;
215 file.write_all(&data).or_err_with(InternalError, || {
216 err_str_path("fail to write to", &temp_path)
217 })?;
218 file.flush().or_err_with(InternalError, || {
219 err_str_path("fail to flush temp file", &temp_path)
220 })?;
221 rename(&temp_path, &final_path).or_err_with(InternalError, || {
222 format!(
223 "Failed to rename file from {} to {}",
224 temp_path.display(),
225 final_path.display(),
226 )
227 })
228 })
229 .await
230 .or_err(InternalError, "async blocking IO failure")??;
231 }
232 Ok(())
233 }
234
235 async fn load(&self, dir_path: &str) -> Result<()> {
236 let mut loaded_shards = 0;
238 for i in 0..N {
239 let dir_path = dir_path.to_owned();
240
241 let data = tokio::task::spawn_blocking(move || {
242 let file_path = Path::new(&dir_path).join(format!("{}.{i}", FILE_NAME));
243 let mut file = File::open(&file_path)
244 .or_err_with(InternalError, || err_str_path("fail to open", &file_path))?;
245 let mut buffer = Vec::with_capacity(8192);
246 file.read_to_end(&mut buffer)
247 .or_err_with(InternalError, || {
248 err_str_path("fail to read from", &file_path)
249 })?;
250 Ok::<Vec<u8>, BError>(buffer)
251 })
252 .await
253 .or_err(InternalError, "async blocking IO failure")??;
254
255 if let Err(e) = self.deserialize_shard(&data) {
256 warn!("Failed to deserialize shard {}: {}. Skipping shard.", i, e);
257 continue; }
259 loaded_shards += 1;
260 }
261
262 if loaded_shards < N {
264 warn!(
265 "Only loaded {}/{} shards. Cache may be incomplete.",
266 loaded_shards, N
267 )
268 } else {
269 info!("Successfully loaded {}/{} shards.", loaded_shards, N)
270 }
271
272 Ok(())
273 }
274}
275
#[cfg(test)]
mod test {
    use super::*;
    use crate::CacheKey;

    // All single-shard tests use a size limit of 4 so that three items of sizes
    // 1 + 2 + 1 exactly fill the LRU.

    /// Admitting past the limit evicts the least recently used items first.
    #[test]
    fn test_admission() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        // need to evict 2 items to make room for key4
        assert_eq!(v.len(), 2);
        assert_eq!(v[0], key1);
        assert_eq!(v[1], key2);
    }

    /// Accessing an existing item promotes it so that it is not the next to be evicted.
    #[test]
    fn test_access() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now
        // key1 is already tracked, so access promotes it and reports a hit
        assert!(lru.access(&key1, 1, until));

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        // key1 was promoted, so key2 is evicted instead
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }

    /// Removing an item frees its weight.
    #[test]
    fn test_remove() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now
        // remove key1
        lru.remove(&key1);

        // key2 is now the least recently used item
        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }

    /// Accessing unknown items admits them without evicting anything.
    #[test]
    fn test_access_add() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let until = SystemTime::now(); // unused value as a placeholder

        let key1 = CacheKey::new("", "a", "1").to_compact();
        lru.access(&key1, 1, until);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        lru.access(&key2, 2, until);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        lru.access(&key3, 2, until);

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        // need to evict 2 items to get back under the limit
        assert_eq!(v.len(), 2);
        assert_eq!(v[0], key1);
        assert_eq!(v[1], key2);
    }

    /// Re-admitting an existing item updates its size.
    #[test]
    fn test_admit_update() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now
        // update key2 to reduce its size by 1
        let v = lru.admit(key2, 1, until);
        assert_eq!(v.len(), 0);

        // lru is not full anymore
        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4.clone(), 1, until);
        assert_eq!(v.len(), 0);

        // make key4 larger
        let v = lru.admit(key4, 2, until);
        // need to evict now
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key1);
    }

    /// Items added through `access` are visible to `peek`.
    #[test]
    fn test_peek() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let until = SystemTime::now(); // unused value as a placeholder

        let key1 = CacheKey::new("", "a", "1").to_compact();
        lru.access(&key1, 1, until);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        lru.access(&key2, 2, until);
        assert!(lru.peek(&key1));
        assert!(lru.peek(&key2));
    }

    /// A serialized shard restores the same eviction order in a fresh manager.
    #[test]
    fn test_serde() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now
        // key1 is already tracked, so access promotes it and reports a hit
        assert!(lru.access(&key1, 1, until));

        // load the serialized shard into a fresh manager
        let ser = lru.serialize_shard(0).unwrap();
        let lru2 = Manager::<1>::with_capacity(4, 10);
        lru2.deserialize_shard(&ser).unwrap();

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru2.admit(key4, 2, until);
        // the promotion of key1 survived the round trip, so key2 is evicted
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }

    /// Saving to disk and loading into a fresh manager reproduces every shard.
    #[tokio::test]
    async fn test_save_to_disk() {
        let until = SystemTime::now(); // unused value as a placeholder
        let lru = Manager::<2>::with_capacity(10, 10);

        lru.admit(CacheKey::new("", "a", "1").to_compact(), 1, until);
        lru.admit(CacheKey::new("", "b", "1").to_compact(), 2, until);
        lru.admit(CacheKey::new("", "c", "1").to_compact(), 1, until);
        lru.admit(CacheKey::new("", "d", "1").to_compact(), 1, until);
        lru.admit(CacheKey::new("", "e", "1").to_compact(), 2, until);
        lru.admit(CacheKey::new("", "f", "1").to_compact(), 1, until);

        // Use the platform temp dir plus the process id so the test is portable and
        // concurrent runs do not share files.
        let dir = std::env::temp_dir().join(format!("test_lru_save_{}", std::process::id()));
        let dir_str = dir.to_str().expect("temp dir path should be valid UTF-8");

        lru.save(dir_str).await.unwrap();

        let lru2 = Manager::<2>::with_capacity(4, 10);
        lru2.load(dir_str).await.unwrap();

        let ser0 = lru.serialize_shard(0).unwrap();
        let ser1 = lru.serialize_shard(1).unwrap();
        let loaded0 = lru2.serialize_shard(0).unwrap();
        let loaded1 = lru2.serialize_shard(1).unwrap();

        // Clean up before asserting so a failure does not leave files behind.
        let _ = std::fs::remove_dir_all(&dir);

        assert_eq!(ser0, loaded0);
        assert_eq!(ser1, loaded1);
    }
}