1use std::collections::{hash_map::DefaultHasher, HashMap, VecDeque};
2use std::hash::{Hash, Hasher};
3use std::path::PathBuf;
4use std::process;
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Arc;
7
8use tokio::sync::{broadcast, RwLock};
9
10use crate::compression::ContentEncoding;
11pub use crate::CacheStorageMode;
12
/// Process-wide counter mixed into generated file stems so that two bodies
/// stored for the same key still get distinct file names.
static BODY_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
14
/// Message broadcast to subscribers of a [`RefreshTrigger`].
#[derive(Clone, Debug)]
pub enum RefreshMessage {
    /// Sent by [`RefreshTrigger::trigger`]; carries no key filter.
    All,
    /// Sent by [`RefreshTrigger::trigger_by_key_match`] with the caller's
    /// pattern. NOTE(review): presumably interpreted with the same `*`
    /// wildcard rules as `matches_pattern` — confirm against the receivers.
    Pattern(String),
}
23
/// Cloneable handle around a broadcast channel of [`RefreshMessage`]s.
#[derive(Clone)]
pub struct RefreshTrigger {
    /// Sending half of the channel; receivers are created via `subscribe`.
    sender: broadcast::Sender<RefreshMessage>,
}
30
31impl RefreshTrigger {
32 pub fn new() -> Self {
33 let (sender, _) = broadcast::channel(16);
34 Self { sender }
35 }
36
37 pub fn trigger(&self) {
39 let _ = self.sender.send(RefreshMessage::All);
41 }
42
43 pub fn trigger_by_key_match(&self, pattern: &str) {
46 let _ = self
48 .sender
49 .send(RefreshMessage::Pattern(pattern.to_string()));
50 }
51
52 pub fn subscribe(&self) -> broadcast::Receiver<RefreshMessage> {
54 self.sender.subscribe()
55 }
56}
57
/// Reports whether `key` matches `pattern`, where `*` in the pattern stands
/// for any (possibly empty) run of characters.
///
/// A pattern without `*` matches only an identical key. Otherwise the text
/// before the first `*` must be a prefix of the key, the text after the last
/// `*` must be a suffix of what is left, and every piece in between must
/// occur, in order, in the part of the key not yet consumed.
fn matches_pattern(key: &str, pattern: &str) -> bool {
    if key == pattern {
        return true;
    }

    // Without a wildcard only an exact match counts, and that was ruled out above.
    if !pattern.contains('*') {
        return false;
    }

    let segments: Vec<&str> = pattern.split('*').collect();
    let last_index = segments.len() - 1;

    // The part of the key that has not been consumed by earlier segments.
    let mut remaining = key;

    for (index, segment) in segments.iter().copied().enumerate() {
        // Empty segments come from leading, trailing or doubled `*` and
        // place no constraint on the key.
        if segment.is_empty() {
            continue;
        }

        if index == 0 {
            match remaining.strip_prefix(segment) {
                Some(rest) => remaining = rest,
                None => return false,
            }
        } else if index == last_index {
            // The final segment must close the key; nothing follows it.
            return remaining.ends_with(segment);
        } else {
            // Leftmost occurrence keeps as much of the key as possible for later segments.
            match remaining.find(segment) {
                Some(offset) => remaining = &remaining[offset + segment.len()..],
                None => return false,
            }
        }
    }

    true
}
103
/// Keyed response cache with a separate, capacity-limited area for 404 responses.
///
/// Cloning is cheap for the maps: they sit behind `Arc`, so clones share the
/// same entries.
#[derive(Clone)]
pub struct CacheStore {
    /// Regular cached responses, by cache key.
    store: Arc<RwLock<HashMap<String, StoredCachedResponse>>>,
    /// Cached 404 responses, by cache key.
    store_404: Arc<RwLock<HashMap<String, StoredCachedResponse>>>,
    /// Keys of `store_404` in insertion order; the front is evicted first.
    keys_404: Arc<RwLock<VecDeque<String>>>,
    /// Maximum number of 404 entries; `0` makes `set_404` a no-op.
    cache_404_capacity: usize,
    /// Handle handed out by `refresh_trigger()`.
    refresh_trigger: RefreshTrigger,
    /// Where response bodies are kept (memory or files).
    body_store: CacheBodyStore,
}
115
/// A response as handed to and returned from [`CacheStore`], with the body in memory.
#[derive(Clone, Debug)]
pub struct CachedResponse {
    /// Raw body bytes.
    pub body: Vec<u8>,
    /// Header name/value pairs.
    pub headers: HashMap<String, String>,
    /// Numeric status code.
    pub status: u16,
    /// Encoding associated with `body`, if any.
    /// NOTE(review): presumably the encoding the bytes are already in — confirm with callers.
    pub content_encoding: Option<ContentEncoding>,
}
123
/// Internal form of a cached response: same metadata as [`CachedResponse`],
/// but the body may live in memory or in a file.
#[derive(Clone, Debug)]
struct StoredCachedResponse {
    /// Where the body bytes are kept.
    body: StoredBody,
    /// Header name/value pairs, copied from the original response.
    headers: HashMap<String, String>,
    /// Numeric status code, copied from the original response.
    status: u16,
    /// Content encoding, copied from the original response.
    content_encoding: Option<ContentEncoding>,
}
131
/// Location of a cached body.
#[derive(Clone, Debug)]
enum StoredBody {
    /// Bytes held directly in the cache entry.
    Memory(Vec<u8>),
    /// Path of the file that holds the bytes.
    File(PathBuf),
}
137
/// Selects which on-disk subdirectory a cached body belongs to.
#[derive(Clone, Copy, Debug)]
enum CacheBucket {
    /// Regular cached responses.
    Standard,
    /// Cached 404 responses.
    NotFound,
}

impl CacheBucket {
    /// Name of the subdirectory, below the cache root, used for this bucket.
    fn directory_name(self) -> &'static str {
        const STANDARD_DIR: &str = "responses";
        const NOT_FOUND_DIR: &str = "responses-404";

        match self {
            CacheBucket::Standard => STANDARD_DIR,
            CacheBucket::NotFound => NOT_FOUND_DIR,
        }
    }
}
152
/// Stores and retrieves response bodies according to the configured storage mode.
#[derive(Clone, Debug)]
struct CacheBodyStore {
    /// Whether bodies are kept in memory or written to files.
    mode: CacheStorageMode,
    /// Root directory for body files; `Some` in filesystem mode, `None` in memory mode.
    root_dir: Option<PathBuf>,
}
158
159impl CacheBodyStore {
160 fn new(mode: CacheStorageMode, root_dir: Option<PathBuf>) -> Self {
161 let root_dir = match mode {
162 CacheStorageMode::Memory => None,
163 CacheStorageMode::Filesystem => {
164 let root_dir = root_dir.unwrap_or_else(default_cache_directory);
165 cleanup_orphaned_cache_files(&root_dir);
166 Some(root_dir)
167 }
168 };
169
170 Self { mode, root_dir }
171 }
172
173 async fn store(&self, key: &str, body: Vec<u8>, bucket: CacheBucket) -> StoredBody {
174 match self.mode {
175 CacheStorageMode::Memory => StoredBody::Memory(body),
176 CacheStorageMode::Filesystem => match self.write_body(key, &body, bucket).await {
177 Ok(path) => StoredBody::File(path),
178 Err(error) => {
179 tracing::warn!(
180 "Failed to persist cache body for '{}' to filesystem storage: {}",
181 key,
182 error
183 );
184 StoredBody::Memory(body)
185 }
186 },
187 }
188 }
189
190 async fn load(&self, body: &StoredBody) -> Option<Vec<u8>> {
191 match body {
192 StoredBody::Memory(bytes) => Some(bytes.clone()),
193 StoredBody::File(path) => match tokio::fs::read(path).await {
194 Ok(bytes) => Some(bytes),
195 Err(error) => {
196 tracing::warn!(
197 "Failed to read cached response body from '{}': {}",
198 path.display(),
199 error
200 );
201 None
202 }
203 },
204 }
205 }
206
207 async fn remove(&self, body: StoredBody) {
208 if let StoredBody::File(path) = body {
209 if let Err(error) = tokio::fs::remove_file(&path).await {
210 if error.kind() != std::io::ErrorKind::NotFound {
211 tracing::warn!(
212 "Failed to delete cached response body '{}': {}",
213 path.display(),
214 error
215 );
216 }
217 }
218 }
219 }
220
221 async fn write_body(
222 &self,
223 key: &str,
224 body: &[u8],
225 bucket: CacheBucket,
226 ) -> std::io::Result<PathBuf> {
227 let root_dir = self
228 .root_dir
229 .as_ref()
230 .expect("filesystem cache storage requires a root directory");
231 let bucket_dir = root_dir.join(bucket.directory_name());
232 tokio::fs::create_dir_all(&bucket_dir).await?;
233
234 let stem = cache_file_stem(key);
235 let tmp_path = bucket_dir.join(format!("{}.tmp", stem));
236 let final_path = bucket_dir.join(format!("{}.bin", stem));
237
238 tokio::fs::write(&tmp_path, body).await?;
239 tokio::fs::rename(&tmp_path, &final_path).await?;
240
241 Ok(final_path)
242 }
243}
244
245impl StoredCachedResponse {
246 async fn materialize(self, body_store: &CacheBodyStore) -> Option<CachedResponse> {
247 let body = body_store.load(&self.body).await?;
248
249 Some(CachedResponse {
250 body,
251 headers: self.headers,
252 status: self.status,
253 content_encoding: self.content_encoding,
254 })
255 }
256}
257
/// Directory used for body files when the caller does not supply one:
/// `phantom-frame-cache` inside the system temporary directory.
fn default_cache_directory() -> PathBuf {
    let mut directory = std::env::temp_dir();
    directory.push("phantom-frame-cache");
    directory
}
261
262fn cleanup_orphaned_cache_files(root_dir: &std::path::Path) {
263 for bucket in [CacheBucket::Standard, CacheBucket::NotFound] {
264 let bucket_dir = root_dir.join(bucket.directory_name());
265 cleanup_bucket_directory(&bucket_dir);
266 }
267}
268
269fn cleanup_bucket_directory(bucket_dir: &std::path::Path) {
270 let entries = match std::fs::read_dir(bucket_dir) {
271 Ok(entries) => entries,
272 Err(error) if error.kind() == std::io::ErrorKind::NotFound => return,
273 Err(error) => {
274 tracing::warn!(
275 "Failed to inspect cache directory '{}' during startup cleanup: {}",
276 bucket_dir.display(),
277 error
278 );
279 return;
280 }
281 };
282
283 for entry in entries {
284 let entry = match entry {
285 Ok(entry) => entry,
286 Err(error) => {
287 tracing::warn!(
288 "Failed to enumerate cache directory '{}' during startup cleanup: {}",
289 bucket_dir.display(),
290 error
291 );
292 continue;
293 }
294 };
295
296 let path = entry.path();
297 let file_type = match entry.file_type() {
298 Ok(file_type) => file_type,
299 Err(error) => {
300 tracing::warn!(
301 "Failed to inspect cache entry '{}' during startup cleanup: {}",
302 path.display(),
303 error
304 );
305 continue;
306 }
307 };
308
309 let cleanup_result = if file_type.is_dir() {
310 std::fs::remove_dir_all(&path)
311 } else {
312 std::fs::remove_file(&path)
313 };
314
315 if let Err(error) = cleanup_result {
316 tracing::warn!(
317 "Failed to remove orphaned cache entry '{}' during startup cleanup: {}",
318 path.display(),
319 error
320 );
321 }
322 }
323}
324
325fn cache_file_stem(key: &str) -> String {
326 let mut hasher = DefaultHasher::new();
327 key.hash(&mut hasher);
328
329 let hash = hasher.finish();
330 let counter = BODY_FILE_COUNTER.fetch_add(1, Ordering::Relaxed);
331
332 format!("{:016x}-{:x}-{:016x}", hash, process::id(), counter)
333}
334
335fn into_stored_response(body: StoredBody, response: CachedResponse) -> StoredCachedResponse {
336 StoredCachedResponse {
337 body,
338 headers: response.headers,
339 status: response.status,
340 content_encoding: response.content_encoding,
341 }
342}
343
344impl CacheStore {
345 pub fn new(refresh_trigger: RefreshTrigger, cache_404_capacity: usize) -> Self {
346 Self::with_storage(
347 refresh_trigger,
348 cache_404_capacity,
349 CacheStorageMode::Memory,
350 None,
351 )
352 }
353
354 pub fn with_storage(
355 refresh_trigger: RefreshTrigger,
356 cache_404_capacity: usize,
357 storage_mode: CacheStorageMode,
358 cache_directory: Option<PathBuf>,
359 ) -> Self {
360 Self {
361 store: Arc::new(RwLock::new(HashMap::new())),
362 store_404: Arc::new(RwLock::new(HashMap::new())),
363 keys_404: Arc::new(RwLock::new(VecDeque::new())),
364 cache_404_capacity,
365 refresh_trigger,
366 body_store: CacheBodyStore::new(storage_mode, cache_directory),
367 }
368 }
369
370 pub async fn get(&self, key: &str) -> Option<CachedResponse> {
371 let cached = {
372 let store = self.store.read().await;
373 store.get(key).cloned()
374 }?;
375
376 cached.materialize(&self.body_store).await
377 }
378
379 pub async fn get_404(&self, key: &str) -> Option<CachedResponse> {
381 let cached = {
382 let store = self.store_404.read().await;
383 store.get(key).cloned()
384 }?;
385
386 cached.materialize(&self.body_store).await
387 }
388
389 pub async fn set(&self, key: String, response: CachedResponse) {
390 let body = self
391 .body_store
392 .store(&key, response.body.clone(), CacheBucket::Standard)
393 .await;
394 let stored = into_stored_response(body, response);
395
396 let replaced = {
397 let mut store = self.store.write().await;
398 store.insert(key, stored)
399 };
400
401 if let Some(old) = replaced {
402 self.body_store.remove(old.body).await;
403 }
404 }
405
406 pub async fn set_404(&self, key: String, response: CachedResponse) {
408 if self.cache_404_capacity == 0 {
409 return;
411 }
412
413 let body = self
414 .body_store
415 .store(&key, response.body.clone(), CacheBucket::NotFound)
416 .await;
417 let stored = into_stored_response(body, response);
418
419 let removed_bodies = {
420 let mut store = self.store_404.write().await;
421 let mut keys = self.keys_404.write().await;
422 let mut removed = Vec::new();
423
424 if store.contains_key(&key) {
425 if let Some(pos) = keys.iter().position(|existing_key| existing_key == &key) {
426 keys.remove(pos);
427 }
428 }
429
430 if let Some(old) = store.insert(key.clone(), stored) {
431 removed.push(old.body);
432 }
433 keys.push_back(key);
434
435 while keys.len() > self.cache_404_capacity {
436 if let Some(old_key) = keys.pop_front() {
437 if let Some(old) = store.remove(&old_key) {
438 removed.push(old.body);
439 }
440 }
441 }
442
443 removed
444 };
445
446 for body in removed_bodies {
447 self.body_store.remove(body).await;
448 }
449 }
450
451 pub async fn clear(&self) {
452 let removed_bodies = {
453 let mut removed = Vec::new();
454
455 let mut store = self.store.write().await;
456 removed.extend(store.drain().map(|(_, response)| response.body));
457
458 let mut store404 = self.store_404.write().await;
459 removed.extend(store404.drain().map(|(_, response)| response.body));
460
461 let mut keys = self.keys_404.write().await;
462 keys.clear();
463
464 removed
465 };
466
467 for body in removed_bodies {
468 self.body_store.remove(body).await;
469 }
470 }
471
472 pub async fn clear_by_pattern(&self, pattern: &str) {
474 let removed_bodies = {
475 let mut removed = Vec::new();
476
477 let mut store = self.store.write().await;
478 let keys_to_remove: Vec<String> = store
479 .keys()
480 .filter(|key| matches_pattern(key, pattern))
481 .cloned()
482 .collect();
483 for key in keys_to_remove {
484 if let Some(old) = store.remove(&key) {
485 removed.push(old.body);
486 }
487 }
488
489 let mut store404 = self.store_404.write().await;
490 let keys_to_remove_404: Vec<String> = store404
491 .keys()
492 .filter(|key| matches_pattern(key, pattern))
493 .cloned()
494 .collect();
495 for key in &keys_to_remove_404 {
496 if let Some(old) = store404.remove(key) {
497 removed.push(old.body);
498 }
499 }
500
501 let mut keys = self.keys_404.write().await;
502 keys.retain(|key| !matches_pattern(key, pattern));
503
504 removed
505 };
506
507 for body in removed_bodies {
508 self.body_store.remove(body).await;
509 }
510 }
511
512 pub fn refresh_trigger(&self) -> &RefreshTrigger {
513 &self.refresh_trigger
514 }
515
516 pub async fn size(&self) -> usize {
518 let store = self.store.read().await;
519 store.len()
520 }
521
522 pub async fn size_404(&self) -> usize {
524 let store = self.store_404.read().await;
525 store.len()
526 }
527}
528
529impl Default for RefreshTrigger {
530 fn default() -> Self {
531 Self::new()
532 }
533}
534
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a path under the system temp dir that differs on every call
    /// (test name, process id and a counter value), so tests do not share
    /// directories.
    fn unique_test_directory(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!(
            "phantom-frame-test-{}-{:x}-{:016x}",
            name,
            process::id(),
            BODY_FILE_COUNTER.fetch_add(1, Ordering::Relaxed)
        ))
    }

    /// Builds a header-less, unencoded response with the given body and status.
    fn response(body: Vec<u8>, status: u16) -> CachedResponse {
        CachedResponse {
            body,
            headers: HashMap::new(),
            status,
            content_encoding: None,
        }
    }

    /// Returns the path of a file-backed body, failing the test for an in-memory one.
    fn expect_file_path(body: &StoredBody) -> PathBuf {
        match body {
            StoredBody::File(path) => path.clone(),
            StoredBody::Memory(_) => panic!("expected filesystem-backed cache body"),
        }
    }

    #[test]
    fn test_matches_pattern_exact() {
        assert!(matches_pattern("GET:/api/users", "GET:/api/users"));
        assert!(!matches_pattern("GET:/api/users", "GET:/api/posts"));
    }

    #[test]
    fn test_matches_pattern_wildcard() {
        // Trailing wildcard.
        assert!(matches_pattern("GET:/api/users", "GET:/api/*"));
        assert!(matches_pattern("GET:/api/users/123", "GET:/api/*"));
        assert!(!matches_pattern("GET:/v2/users", "GET:/api/*"));

        // Leading wildcard.
        assert!(matches_pattern("GET:/api/users", "*/users"));
        assert!(matches_pattern("POST:/v2/users", "*/users"));
        assert!(!matches_pattern("GET:/api/posts", "*/users"));

        // Wildcard in the middle.
        assert!(matches_pattern("GET:/api/v1/users", "GET:/api/*/users"));
        assert!(matches_pattern("GET:/api/v2/users", "GET:/api/*/users"));
        assert!(!matches_pattern("GET:/api/v1/posts", "GET:/api/*/users"));

        // Several wildcards.
        assert!(matches_pattern("GET:/api/v1/users/123", "GET:*/users/*"));
        assert!(matches_pattern("POST:/v2/admin/users/456", "*/users/*"));
    }

    #[test]
    fn test_matches_pattern_wildcard_only() {
        assert!(matches_pattern("GET:/api/users", "*"));
        assert!(matches_pattern("POST:/anything", "*"));
    }

    #[tokio::test]
    async fn test_404_cache_set_get_and_eviction() {
        let store = CacheStore::new(RefreshTrigger::new(), 2);

        store
            .set_404("GET:/notfound1".to_string(), response(vec![1], 404))
            .await;
        store
            .set_404("GET:/notfound2".to_string(), response(vec![2], 404))
            .await;

        assert_eq!(store.size_404().await, 2);
        assert_eq!(store.get_404("GET:/notfound1").await.unwrap().body, vec![1]);

        // A third entry exceeds the capacity of 2 and evicts the oldest key.
        store
            .set_404("GET:/notfound3".to_string(), response(vec![3], 404))
            .await;
        assert_eq!(store.size_404().await, 2);
        assert!(store.get_404("GET:/notfound1").await.is_none());
        assert_eq!(store.get_404("GET:/notfound2").await.unwrap().body, vec![2]);
        assert_eq!(store.get_404("GET:/notfound3").await.unwrap().body, vec![3]);
    }

    #[tokio::test]
    async fn test_clear_by_pattern_removes_404_entries() {
        let store = CacheStore::new(RefreshTrigger::new(), 10);

        store
            .set_404("GET:/api/notfound".to_string(), response(vec![1], 404))
            .await;
        store
            .set_404("GET:/api/another".to_string(), response(vec![1], 404))
            .await;
        assert_eq!(store.size_404().await, 2);

        store.clear_by_pattern("GET:/api/*").await;
        assert_eq!(store.size_404().await, 0);
    }

    #[tokio::test]
    async fn test_filesystem_cache_round_trip() {
        let cache_dir = unique_test_directory("round-trip");
        let store = CacheStore::with_storage(
            RefreshTrigger::new(),
            10,
            CacheStorageMode::Filesystem,
            Some(cache_dir.clone()),
        );

        let response = CachedResponse {
            body: vec![1, 2, 3, 4],
            headers: HashMap::from([("content-type".to_string(), "text/plain".to_string())]),
            status: 200,
            content_encoding: None,
        };

        store
            .set("GET:/asset.js".to_string(), response.clone())
            .await;

        let stored_path = {
            let store_guard = store.store.read().await;
            expect_file_path(&store_guard.get("GET:/asset.js").unwrap().body)
        };

        assert!(tokio::fs::metadata(&stored_path).await.is_ok());

        let cached = store.get("GET:/asset.js").await.unwrap();
        assert_eq!(cached.body, response.body);

        store.clear().await;
        assert!(tokio::fs::metadata(&stored_path).await.is_err());

        // Best-effort removal of the per-test directory.
        let _ = tokio::fs::remove_dir_all(&cache_dir).await;
    }

    #[tokio::test]
    async fn test_filesystem_404_eviction_removes_body_file() {
        let cache_dir = unique_test_directory("eviction");
        let store = CacheStore::with_storage(
            RefreshTrigger::new(),
            2,
            CacheStorageMode::Filesystem,
            Some(cache_dir.clone()),
        );

        for index in 1u8..=2 {
            store
                .set_404(format!("GET:/missing{}", index), response(vec![index], 404))
                .await;
        }

        let evicted_path = {
            let store_guard = store.store_404.read().await;
            expect_file_path(&store_guard.get("GET:/missing1").unwrap().body)
        };

        store
            .set_404("GET:/missing3".to_string(), response(vec![3], 404))
            .await;

        assert!(store.get_404("GET:/missing1").await.is_none());
        assert!(tokio::fs::metadata(&evicted_path).await.is_err());

        store.clear().await;
        // Best-effort removal of the per-test directory.
        let _ = tokio::fs::remove_dir_all(&cache_dir).await;
    }

    #[tokio::test]
    async fn test_filesystem_clear_by_pattern_removes_matching_files() {
        let cache_dir = unique_test_directory("pattern-clear");
        let store = CacheStore::with_storage(
            RefreshTrigger::new(),
            10,
            CacheStorageMode::Filesystem,
            Some(cache_dir.clone()),
        );

        store
            .set("GET:/api/one".to_string(), response(vec![1], 200))
            .await;
        store
            .set("GET:/other/two".to_string(), response(vec![2], 200))
            .await;

        let (removed_path, kept_path) = {
            let store_guard = store.store.read().await;
            let removed = expect_file_path(&store_guard.get("GET:/api/one").unwrap().body);
            let kept = expect_file_path(&store_guard.get("GET:/other/two").unwrap().body);
            (removed, kept)
        };

        store.clear_by_pattern("GET:/api/*").await;

        assert!(store.get("GET:/api/one").await.is_none());
        assert!(store.get("GET:/other/two").await.is_some());
        assert!(tokio::fs::metadata(&removed_path).await.is_err());
        assert!(tokio::fs::metadata(&kept_path).await.is_ok());

        store.clear().await;
        // Best-effort removal of the per-test directory.
        let _ = tokio::fs::remove_dir_all(&cache_dir).await;
    }

    #[test]
    fn test_filesystem_startup_cleanup_removes_orphaned_cache_files() {
        let cache_dir = unique_test_directory("startup-cleanup");
        let standard_dir = cache_dir.join(CacheBucket::Standard.directory_name());
        let not_found_dir = cache_dir.join(CacheBucket::NotFound.directory_name());
        let unrelated_file = cache_dir.join("keep.txt");

        std::fs::create_dir_all(&standard_dir).unwrap();
        std::fs::create_dir_all(&not_found_dir).unwrap();
        std::fs::write(standard_dir.join("stale.bin"), b"stale").unwrap();
        std::fs::write(standard_dir.join("stale.tmp"), b"stale tmp").unwrap();
        std::fs::write(not_found_dir.join("stale.bin"), b"stale 404").unwrap();
        std::fs::write(&unrelated_file, b"keep me").unwrap();

        // Constructing the store in filesystem mode runs the startup cleanup.
        let _store = CacheStore::with_storage(
            RefreshTrigger::new(),
            10,
            CacheStorageMode::Filesystem,
            Some(cache_dir.clone()),
        );

        let standard_entries = std::fs::read_dir(&standard_dir)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        let not_found_entries = std::fs::read_dir(&not_found_dir)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert!(standard_entries.is_empty());
        assert!(not_found_entries.is_empty());
        // Files outside the bucket directories are left alone.
        assert_eq!(std::fs::read(&unrelated_file).unwrap(), b"keep me");

        std::fs::remove_dir_all(&cache_dir).unwrap();
    }
}