1use super::EvictionManager;
18use crate::key::CompactCacheKey;
19
20use async_trait::async_trait;
21use log::{info, warn};
22use pingora_error::{BError, ErrorType::*, OrErr, Result};
23use pingora_lru::Lru;
24use rand::Rng;
25use serde::de::SeqAccess;
26use serde::{Deserialize, Serialize};
27use std::fs::{rename, File};
28use std::hash::{Hash, Hasher};
29use std::io::prelude::*;
30use std::path::Path;
31use std::time::SystemTime;
32
/// LRU-based eviction manager for cache assets.
///
/// A thin newtype over [`Lru<CompactCacheKey, N>`]. The const parameter `N` is handed
/// straight to the underlying LRU and is the number of shards this manager iterates
/// over when saving to and loading from disk (one file per shard).
pub struct Manager<const N: usize>(Lru<CompactCacheKey, N>);
41
/// On-disk representation of a single LRU entry: the cache key and the `usize` value
/// stored alongside it (the size/weight handed out by `Lru::iter_for_each`).
#[derive(Debug, Serialize, Deserialize)]
struct SerdeHelperNode(CompactCacheKey, usize);
44
45impl<const N: usize> Manager<N> {
46 pub fn with_capacity(limit: usize, capacity: usize) -> Self {
50 Manager(Lru::with_capacity(limit, capacity))
51 }
52
53 pub fn with_capacity_and_watermark(
58 limit: usize,
59 capacity: usize,
60 watermark: Option<usize>,
61 ) -> Self {
62 Manager(Lru::with_capacity_and_watermark(limit, capacity, watermark))
63 }
64
65 pub fn shards(&self) -> usize {
67 self.0.shards()
68 }
69
70 pub fn shard_weight(&self, shard: usize) -> usize {
72 self.0.shard_weight(shard)
73 }
74
75 pub fn shard_len(&self, shard: usize) -> usize {
77 self.0.shard_len(shard)
78 }
79
80 pub fn get_shard_for_key(&self, key: &CompactCacheKey) -> usize {
85 (u64key(key) % N as u64) as usize
86 }
87
88 pub fn serialize_shard(&self, shard: usize) -> Result<Vec<u8>> {
90 use rmp_serde::encode::Serializer;
91 use serde::ser::SerializeSeq;
92 use serde::ser::Serializer as _;
93
94 assert!(shard < N);
95
96 let mut nodes = Vec::with_capacity(self.0.shard_len(shard));
99 self.0.iter_for_each(shard, |(node, size)| {
100 nodes.push(SerdeHelperNode(node.clone(), size));
101 });
102 let mut ser = Serializer::new(vec![]);
103 let mut seq = ser
104 .serialize_seq(Some(self.0.shard_len(shard)))
105 .or_err(InternalError, "fail to serialize node")?;
106 for node in nodes {
107 seq.serialize_element(&node).unwrap(); }
109
110 seq.end().or_err(InternalError, "when serializing LRU")?;
111 Ok(ser.into_inner())
112 }
113
114 pub fn deserialize_shard(&self, buf: &[u8]) -> Result<()> {
118 use rmp_serde::decode::Deserializer;
119 use serde::de::Deserializer as _;
120
121 let mut de = Deserializer::new(buf);
122 let visitor = InsertToManager { lru: self };
123 de.deserialize_seq(visitor)
124 .or_err(InternalError, "when deserializing LRU")?;
125 Ok(())
126 }
127
128 pub fn peek_weight(&self, item: &CompactCacheKey) -> Option<usize> {
130 let key = u64key(item);
131 self.0.peek_weight(key)
132 }
133}
134
/// Serde visitor state: borrows the [`Manager`] that deserialized nodes are inserted into.
struct InsertToManager<'a, const N: usize> {
    // Target manager; each decoded node is pushed into its underlying LRU.
    lru: &'a Manager<N>,
}
138
139impl<'de, const N: usize> serde::de::Visitor<'de> for InsertToManager<'_, N> {
140 type Value = ();
141
142 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
143 formatter.write_str("array of lru nodes")
144 }
145
146 fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
147 where
148 A: SeqAccess<'de>,
149 {
150 while let Some(node) = seq.next_element::<SerdeHelperNode>()? {
151 let key = u64key(&node.0);
152 self.lru.0.insert_tail(key, node.0, node.1); }
154 Ok(())
155 }
156}
157
158#[inline]
159fn u64key(key: &CompactCacheKey) -> u64 {
160 let mut hasher = ahash::AHasher::default();
162 key.hash(&mut hasher);
163 hasher.finish()
164}
165
/// Base name of the persisted LRU files. Shard `i` is stored as `lru.data.{i}`, and
/// in-progress writes use `lru.data.{i}.{random hex}.tmp` in the same directory.
const FILE_NAME: &str = "lru.data";
167
/// Build an error message of the form `"<s> <path>"`, with the path rendered through
/// [`Path::display`].
#[inline]
fn err_str_path(s: &str, path: &Path) -> String {
    let location = path.display();
    format!("{s} {location}")
}
172
173#[async_trait]
174impl<const N: usize> EvictionManager for Manager<N> {
175 fn total_size(&self) -> usize {
176 self.0.weight()
177 }
178 fn total_items(&self) -> usize {
179 self.0.len()
180 }
181 fn evicted_size(&self) -> usize {
182 self.0.evicted_weight()
183 }
184 fn evicted_items(&self) -> usize {
185 self.0.evicted_len()
186 }
187
188 fn admit(
189 &self,
190 item: CompactCacheKey,
191 size: usize,
192 _fresh_until: SystemTime,
193 ) -> Vec<CompactCacheKey> {
194 let key = u64key(&item);
195 self.0.admit(key, item, size);
196 self.0
197 .evict_to_limit()
198 .into_iter()
199 .map(|(key, _weight)| key)
200 .collect()
201 }
202
203 fn increment_weight(
204 &self,
205 item: &CompactCacheKey,
206 delta: usize,
207 max_weight: Option<usize>,
208 ) -> Vec<CompactCacheKey> {
209 let key = u64key(item);
210 self.0.increment_weight(key, delta, max_weight);
211 self.0
212 .evict_to_limit()
213 .into_iter()
214 .map(|(key, _weight)| key)
215 .collect()
216 }
217
218 fn remove(&self, item: &CompactCacheKey) {
219 let key = u64key(item);
220 self.0.remove(key);
221 }
222
223 fn access(&self, item: &CompactCacheKey, size: usize, _fresh_until: SystemTime) -> bool {
224 let key = u64key(item);
225 if !self.0.promote(key) {
226 self.0.admit(key, item.clone(), size);
227 false
228 } else {
229 true
230 }
231 }
232
233 fn peek(&self, item: &CompactCacheKey) -> bool {
234 let key = u64key(item);
235 self.0.peek(key)
236 }
237
238 async fn save(&self, dir_path: &str) -> Result<()> {
239 let dir_path_str = dir_path.to_owned();
240
241 tokio::task::spawn_blocking(move || {
242 let dir_path = Path::new(&dir_path_str);
243 std::fs::create_dir_all(dir_path)
244 .or_err_with(InternalError, || err_str_path("fail to create", dir_path))
245 })
246 .await
247 .or_err(InternalError, "async blocking IO failure")??;
248
249 for i in 0..N {
250 let data = self.serialize_shard(i)?;
251 let dir_path = dir_path.to_owned();
252 tokio::task::spawn_blocking(move || {
253 let dir_path = Path::new(&dir_path);
254 let final_path = dir_path.join(format!("{}.{i}", FILE_NAME));
255 let random_suffix: u32 = rand::thread_rng().gen();
257 let temp_path =
258 dir_path.join(format!("{}.{i}.{:08x}.tmp", FILE_NAME, random_suffix));
259 let mut file = File::create(&temp_path)
260 .or_err_with(InternalError, || err_str_path("fail to create", &temp_path))?;
261 file.write_all(&data).or_err_with(InternalError, || {
262 err_str_path("fail to write to", &temp_path)
263 })?;
264 file.flush().or_err_with(InternalError, || {
265 err_str_path("fail to flush temp file", &temp_path)
266 })?;
267 rename(&temp_path, &final_path).or_err_with(InternalError, || {
268 format!(
269 "Failed to rename file from {} to {}",
270 temp_path.display(),
271 final_path.display(),
272 )
273 })
274 })
275 .await
276 .or_err(InternalError, "async blocking IO failure")??;
277 }
278 Ok(())
279 }
280
281 async fn load(&self, dir_path: &str) -> Result<()> {
282 let mut loaded_shards = 0;
284 for i in 0..N {
285 let dir_path = dir_path.to_owned();
286
287 let data = tokio::task::spawn_blocking(move || {
288 let file_path = Path::new(&dir_path).join(format!("{}.{i}", FILE_NAME));
289 let mut file = File::open(&file_path)
290 .or_err_with(InternalError, || err_str_path("fail to open", &file_path))?;
291 let mut buffer = Vec::with_capacity(8192);
292 file.read_to_end(&mut buffer)
293 .or_err_with(InternalError, || {
294 err_str_path("fail to read from", &file_path)
295 })?;
296 Ok::<Vec<u8>, BError>(buffer)
297 })
298 .await
299 .or_err(InternalError, "async blocking IO failure")??;
300
301 if let Err(e) = self.deserialize_shard(&data) {
302 warn!("Failed to deserialize shard {}: {}. Skipping shard.", i, e);
303 continue; }
305 loaded_shards += 1;
306 }
307
308 if loaded_shards < N {
310 warn!(
311 "Only loaded {}/{} shards. Cache may be incomplete.",
312 loaded_shards, N
313 )
314 } else {
315 info!("Successfully loaded {}/{} shards.", loaded_shards, N)
316 }
317
318 cleanup_temp_files(dir_path);
319
320 Ok(())
321 }
322}
323
324fn cleanup_temp_files(dir_path: &str) {
325 let dir_path = Path::new(dir_path).to_owned();
326
327 tokio::task::spawn_blocking({
328 move || {
329 if !dir_path.exists() {
330 return;
331 }
332
333 let entries = match std::fs::read_dir(&dir_path) {
334 Ok(entries) => entries,
335 Err(e) => {
336 warn!("Failed to read directory {}: {e}", dir_path.display());
337 return;
338 }
339 };
340
341 let mut cleaned_count = 0;
342 let mut error_count = 0;
343
344 for entry in entries {
345 let entry = match entry {
346 Ok(entry) => entry,
347 Err(e) => {
348 warn!(
349 "Failed to read directory entry in {}: {e}",
350 dir_path.display()
351 );
352 error_count += 1;
353 continue;
354 }
355 };
356
357 let file_name = entry.file_name();
358 let file_name_str = file_name.to_string_lossy();
359
360 if file_name_str.starts_with(FILE_NAME) && file_name_str.ends_with(".tmp") {
361 match std::fs::remove_file(entry.path()) {
362 Ok(()) => {
363 info!("Cleaned up orphaned temp file: {}", entry.path().display());
364 cleaned_count += 1;
365 }
366 Err(e) => {
367 warn!("Failed to remove temp file {}: {e}", entry.path().display());
368 error_count += 1;
369 }
370 }
371 }
372 }
373
374 if cleaned_count > 0 || error_count > 0 {
375 info!(
376 "Temp file cleanup completed. Removed: {cleaned_count}, Errors: {error_count}"
377 );
378 }
379 }
380 });
381}
382
// Unit tests for the LRU eviction manager. Most tests use a single shard
// (`Manager::<1>`) with a limit of 4 so that eviction order is fully determined by
// the sequence of calls.
#[cfg(test)]
mod test {
    use super::*;
    use crate::CacheKey;

    // Admitting past the limit evicts the oldest entries first.
    #[test]
    fn test_admission() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        // `until` is only a placeholder; the manager ignores the freshness argument.
        let until = SystemTime::now();
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // Admitted sizes now sum to the limit (1 + 2 + 1 = 4).
        let key4 = CacheKey::new("", "d", "1").to_compact();
        // Admitting size 2 more evicts key1 and then key2, in admission order.
        let v = lru.admit(key4, 2, until);
        assert_eq!(v.len(), 2);
        assert_eq!(v[0], key1);
        assert_eq!(v[1], key2);
    }

    // Accessing an entry protects it from being the next eviction victim.
    #[test]
    fn test_access() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        // `until` is only a placeholder; the manager ignores the freshness argument.
        let until = SystemTime::now();
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // Touch key1 so that key2 becomes the first candidate for eviction.
        lru.access(&key1, 1, until);
        // NOTE(review): `v` is still the result of admitting key3; this assertion
        // does not check anything about the `access` call above.
        assert_eq!(v.len(), 0);

        let key4 = CacheKey::new("", "d", "1").to_compact();
        // Only key2 is evicted; key1 survives because it was just accessed.
        let v = lru.admit(key4, 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }

    // Removing an entry means a later admission evicts the next entry instead.
    #[test]
    fn test_remove() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        // `until` is only a placeholder; the manager ignores the freshness argument.
        let until = SystemTime::now();
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // Drop key1 explicitly; key2 is now the oldest remaining entry.
        lru.remove(&key1);

        let key4 = CacheKey::new("", "d", "1").to_compact();
        // key2 is the one evicted to make room for key4.
        let v = lru.admit(key4, 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }

    // `access` on unknown keys admits them, without evicting by itself.
    #[test]
    fn test_access_add() {
        let lru = Manager::<1>::with_capacity(4, 10);
        // `until` is only a placeholder; the manager ignores the freshness argument.
        let until = SystemTime::now();
        let key1 = CacheKey::new("", "a", "1").to_compact();
        lru.access(&key1, 1, until);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        lru.access(&key2, 2, until);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        lru.access(&key3, 2, until);

        let key4 = CacheKey::new("", "d", "1").to_compact();
        // The next `admit` evicts key1 and key2 to get back under the limit.
        let v = lru.admit(key4, 2, until);
        assert_eq!(v.len(), 2);
        assert_eq!(v[0], key1);
        assert_eq!(v[1], key2);
    }

    // Re-admitting an existing key with a different size.
    #[test]
    fn test_admit_update() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        // `until` is only a placeholder; the manager ignores the freshness argument.
        let until = SystemTime::now();
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // Re-admit key2 with size 1 instead of 2; nothing is evicted.
        // Presumably this shrinks the tracked total to 3 — confirm against pingora_lru.
        let v = lru.admit(key2, 1, until);
        assert_eq!(v.len(), 0);

        let key4 = CacheKey::new("", "d", "1").to_compact();
        // A new entry of size 1 still fits without eviction.
        let v = lru.admit(key4.clone(), 1, until);
        assert_eq!(v.len(), 0);

        // Re-admitting key4 with size 2 forces exactly one eviction.
        let v = lru.admit(key4, 2, until);
        assert_eq!(v.len(), 1);
        // key1 is the entry evicted.
        assert_eq!(v[0], key1);
    }

    // `peek` reports entries that were added through `access`.
    #[test]
    fn test_peek() {
        let lru = Manager::<1>::with_capacity(4, 10);
        // `until` is only a placeholder; the manager ignores the freshness argument.
        let until = SystemTime::now();
        let key1 = CacheKey::new("", "a", "1").to_compact();
        lru.access(&key1, 1, until);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        lru.access(&key2, 2, until);
        assert!(lru.peek(&key1));
        assert!(lru.peek(&key2));
    }

    // A shard serialized from one manager and loaded into another keeps its LRU order.
    #[test]
    fn test_serde() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        // `until` is only a placeholder; the manager ignores the freshness argument.
        let until = SystemTime::now();
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // Touch key1 so that key2 becomes the first candidate for eviction.
        lru.access(&key1, 1, until);
        // NOTE(review): `v` is still the result of admitting key3; this assertion
        // does not check anything about the `access` call above.
        assert_eq!(v.len(), 0);

        let ser = lru.serialize_shard(0).unwrap();
        // Load the serialized shard into a fresh manager.
        let lru2 = Manager::<1>::with_capacity(4, 10);
        lru2.deserialize_shard(&ser).unwrap();

        let key4 = CacheKey::new("", "d", "1").to_compact();
        // The restored manager evicts key2, the same victim as in `test_access`.
        let v = lru2.admit(key4, 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }

    // Round-trip through `save` and `load` with two shards.
    #[tokio::test]
    async fn test_save_to_disk() {
        // `until` is only a placeholder; the manager ignores the freshness argument.
        let until = SystemTime::now();
        let lru = Manager::<2>::with_capacity(10, 10);

        lru.admit(CacheKey::new("", "a", "1").to_compact(), 1, until);
        lru.admit(CacheKey::new("", "b", "1").to_compact(), 2, until);
        lru.admit(CacheKey::new("", "c", "1").to_compact(), 1, until);
        lru.admit(CacheKey::new("", "d", "1").to_compact(), 1, until);
        lru.admit(CacheKey::new("", "e", "1").to_compact(), 2, until);
        lru.admit(CacheKey::new("", "f", "1").to_compact(), 1, until);

        // NOTE(review): fixed path under /tmp; concurrent runs of this test share it.
        lru.save("/tmp/test_lru_save").await.unwrap();
        // The second manager is built with a smaller limit (4) than the saved one (10).
        let lru2 = Manager::<2>::with_capacity(4, 10);
        lru2.load("/tmp/test_lru_save").await.unwrap();

        let ser0 = lru.serialize_shard(0).unwrap();
        let ser1 = lru.serialize_shard(1).unwrap();

        // Both shards must serialize identically after the round trip.
        assert_eq!(ser0, lru2.serialize_shard(0).unwrap());
        assert_eq!(ser1, lru2.serialize_shard(1).unwrap());
    }

    // `cleanup_temp_files` removes only `lru.data*.tmp` files.
    #[tokio::test]
    async fn test_temp_file_cleanup() {
        let test_dir = "/tmp/test_lru_cleanup";
        let dir_path = Path::new(test_dir);

        std::fs::create_dir_all(dir_path).unwrap();

        let temp_files = [
            // Match both the `lru.data` prefix and the `.tmp` suffix: removed.
            "lru.data.0.12345678.tmp",
            "lru.data.1.abcdef00.tmp",
            // Wrong prefix: kept.
            "other_file.tmp",
            // No `.tmp` suffix: kept.
            "lru.data.2",
        ];

        for file in temp_files {
            let file_path = dir_path.join(file);
            std::fs::write(&file_path, b"test").unwrap();
        }

        cleanup_temp_files(test_dir);

        // The cleanup runs in a spawned blocking task that is not awaited, so give it
        // time to finish before checking the directory.
        tokio::time::sleep(core::time::Duration::from_secs(1)).await;

        // Matching temp files are gone.
        assert!(!dir_path.join("lru.data.0.12345678.tmp").exists());
        assert!(!dir_path.join("lru.data.1.abcdef00.tmp").exists());
        // Non-matching files are left alone.
        assert!(dir_path.join("other_file.tmp").exists());
        assert!(dir_path.join("lru.data.2").exists());
        // Remove the scratch directory.
        std::fs::remove_dir_all(dir_path).unwrap();
    }
}