greentic_flow/cache/
memory.rs1use std::collections::{HashMap, VecDeque};
2use std::sync::{Arc, Mutex};
3
4use wasmtime::component::Component;
5
6use crate::cache::keys::ArtifactKey;
7
8#[derive(Clone, Debug)]
9pub struct MemoryCache {
10 max_bytes: u64,
11 lfu_protect_hits: u64,
12 state: Arc<Mutex<MemoryState>>,
13}
14
15#[derive(Debug, Default)]
16struct MemoryState {
17 entries: HashMap<ArtifactKey, CacheEntry>,
18 lru: VecDeque<ArtifactKey>,
19 total_bytes: u64,
20 hits: u64,
21 misses: u64,
22 evictions: u64,
23}
24
25struct CacheEntry {
26 component: Arc<Component>,
27 bytes_estimate: u64,
28 hit_count: u64,
29 pinned: bool,
30}
31
32impl std::fmt::Debug for CacheEntry {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 f.debug_struct("CacheEntry")
35 .field("bytes_estimate", &self.bytes_estimate)
36 .field("hit_count", &self.hit_count)
37 .field("pinned", &self.pinned)
38 .finish()
39 }
40}
41
42#[derive(Clone, Debug, Default)]
43pub struct MemoryStats {
44 pub hits: u64,
45 pub misses: u64,
46 pub evictions: u64,
47 pub entries: u64,
48 pub total_bytes: u64,
49}
50
51impl MemoryCache {
52 pub fn new(max_bytes: u64, lfu_protect_hits: u64) -> Self {
53 Self {
54 max_bytes,
55 lfu_protect_hits,
56 state: Arc::new(Mutex::new(MemoryState::default())),
57 }
58 }
59
60 pub fn get(&self, key: &ArtifactKey) -> Option<Arc<Component>> {
61 let mut state = self.state.lock().ok()?;
62 if state.entries.contains_key(key) {
63 state.hits = state.hits.saturating_add(1);
64 let component = {
65 let entry = state.entries.get_mut(key)?;
66 entry.hit_count = entry.hit_count.saturating_add(1);
67 Arc::clone(&entry.component)
68 };
69 touch_lru(&mut state.lru, key);
70 return Some(component);
71 }
72 state.misses = state.misses.saturating_add(1);
73 None
74 }
75
76 pub fn insert(
77 &self,
78 key: ArtifactKey,
79 value: Arc<Component>,
80 bytes_estimate: usize,
81 pinned: bool,
82 ) {
83 let mut state = match self.state.lock() {
84 Ok(state) => state,
85 Err(_) => return,
86 };
87 let bytes_estimate = bytes_estimate as u64;
88 if let Some(existing) = state.entries.remove(&key) {
89 state.total_bytes = state.total_bytes.saturating_sub(existing.bytes_estimate);
90 remove_lru(&mut state.lru, &key);
91 }
92 state.entries.insert(
93 key.clone(),
94 CacheEntry {
95 component: value,
96 bytes_estimate,
97 hit_count: 0,
98 pinned,
99 },
100 );
101 state.total_bytes = state.total_bytes.saturating_add(bytes_estimate);
102 state.lru.push_front(key.clone());
103 self.evict_if_needed(&mut state);
104 }
105
106 pub fn stats(&self) -> MemoryStats {
107 let state = match self.state.lock() {
108 Ok(state) => state,
109 Err(_) => return MemoryStats::default(),
110 };
111 MemoryStats {
112 hits: state.hits,
113 misses: state.misses,
114 evictions: state.evictions,
115 entries: state.entries.len() as u64,
116 total_bytes: state.total_bytes,
117 }
118 }
119
120 fn evict_if_needed(&self, state: &mut MemoryState) {
121 if self.max_bytes == 0 {
122 return;
123 }
124 let mut attempts = state.lru.len();
125 while state.total_bytes > self.max_bytes && attempts > 0 {
126 attempts -= 1;
127 let Some(candidate) = state.lru.pop_back() else {
128 break;
129 };
130 if should_skip_candidate(state, &candidate, true, true, self.lfu_protect_hits) {
131 state.lru.push_front(candidate);
132 continue;
133 }
134 if let Some(entry) = state.entries.remove(&candidate) {
135 state.total_bytes = state.total_bytes.saturating_sub(entry.bytes_estimate);
136 state.evictions = state.evictions.saturating_add(1);
137 }
138 }
139 if state.total_bytes <= self.max_bytes {
140 return;
141 }
142 let mut attempts = state.lru.len();
143 while state.total_bytes > self.max_bytes && attempts > 0 {
144 attempts -= 1;
145 let Some(candidate) = state.lru.pop_back() else {
146 break;
147 };
148 if should_skip_candidate(state, &candidate, false, false, self.lfu_protect_hits) {
149 state.lru.push_front(candidate);
150 continue;
151 }
152 if let Some(entry) = state.entries.remove(&candidate) {
153 state.total_bytes = state.total_bytes.saturating_sub(entry.bytes_estimate);
154 state.evictions = state.evictions.saturating_add(1);
155 }
156 }
157 }
158}
159
160fn should_skip_candidate(
161 state: &MemoryState,
162 key: &ArtifactKey,
163 protect_pinned: bool,
164 protect_lfu: bool,
165 lfu_threshold: u64,
166) -> bool {
167 let Some(entry) = state.entries.get(key) else {
168 return false;
169 };
170 if protect_pinned && entry.pinned {
171 return true;
172 }
173 if protect_lfu && lfu_threshold > 0 && entry.hit_count >= lfu_threshold {
174 return true;
175 }
176 false
177}
178
179fn touch_lru(lru: &mut VecDeque<ArtifactKey>, key: &ArtifactKey) {
180 if let Some(pos) = lru.iter().position(|item| item == key) {
181 lru.remove(pos);
182 lru.push_front(key.clone());
183 }
184}
185
186fn remove_lru(lru: &mut VecDeque<ArtifactKey>, key: &ArtifactKey) {
187 if let Some(pos) = lru.iter().position(|item| item == key) {
188 lru.remove(pos);
189 }
190}