1use std::collections::HashMap;
28use std::sync::Arc;
29use std::time::{Duration, Instant};
30use parking_lot::RwLock;
31use tracing::{debug, info};
32
33use super::tan_curve::{CacheEntry, TanCurvePolicy};
34
#[derive(Debug, Clone)]
/// Snapshot of Redis memory usage plus a locally tracked estimate of the
/// bytes written since that snapshot was taken.
pub struct RedisMemoryProfile {
    /// Bytes in use as of the last refresh.
    pub used_bytes: u64,
    /// Memory limit in bytes as of the last refresh; when this is `0`,
    /// `pressure()` reports `0.0`.
    pub max_bytes: u64,
    /// When the profile was constructed or last refreshed.
    pub last_updated: Instant,
    /// Bytes recorded through `add_pending_writes` since the last refresh.
    pub pending_writes_estimate: u64,
}
47
48impl Default for RedisMemoryProfile {
49 fn default() -> Self {
50 Self {
51 used_bytes: 0,
52 max_bytes: 100 * 1024 * 1024, last_updated: Instant::now(),
54 pending_writes_estimate: 0,
55 }
56 }
57}
58
59impl RedisMemoryProfile {
60 pub fn pressure(&self) -> f64 {
62 if self.max_bytes == 0 {
63 return 0.0;
64 }
65 (self.used_bytes + self.pending_writes_estimate) as f64 / self.max_bytes as f64
66 }
67
68 pub fn needs_refresh(&self, max_staleness: Duration, max_drift_bytes: u64) -> bool {
70 self.last_updated.elapsed() > max_staleness
71 || self.pending_writes_estimate > max_drift_bytes
72 }
73
74 pub fn add_pending_writes(&mut self, bytes: u64) {
76 self.pending_writes_estimate += bytes;
77 }
78
79 pub fn refresh_from_info(&mut self, used_bytes: u64, max_bytes: u64) {
81 self.used_bytes = used_bytes;
82 self.max_bytes = max_bytes;
83 self.pending_writes_estimate = 0;
84 self.last_updated = Instant::now();
85 }
86}
87
#[derive(Debug, Clone)]
/// Per-key bookkeeping used when building eviction candidates.
pub struct RedisKeyMeta {
    /// Time of the most recent write or recorded access.
    pub last_access: Instant,
    /// Number of recorded accesses; starts at 1 when the metadata is created.
    pub access_count: u64,
    /// Size of the stored value in bytes, as supplied by the caller.
    pub size_bytes: usize,
    /// Marks the key as protected; forwarded to the policy as `is_dirty`
    /// by `to_cache_entry`.
    pub protected: bool,
}
100
101impl RedisKeyMeta {
102 pub fn new(size_bytes: usize, protected: bool) -> Self {
103 Self {
104 last_access: Instant::now(),
105 access_count: 1,
106 size_bytes,
107 protected,
108 }
109 }
110
111 pub fn touch(&mut self) {
112 self.last_access = Instant::now();
113 self.access_count += 1;
114 }
115
116 pub fn to_cache_entry(&self, id: String) -> CacheEntry {
118 CacheEntry {
119 id,
120 size_bytes: self.size_bytes,
121 created_at: self.last_access, last_access: self.last_access,
123 access_count: self.access_count,
124 is_dirty: self.protected, }
126 }
127}
128
#[derive(Debug, Clone)]
/// Tunables for `RedisEvictionManager`.
pub struct RedisEvictionConfig {
    /// Pressure at or above which eviction is considered necessary and
    /// candidates are selected.
    pub eviction_start_pressure: f64,
    /// NOTE(review): not read anywhere in the visible code; presumably the
    /// pressure eviction should bring usage back down to — confirm with callers.
    pub eviction_target_pressure: f64,
    /// Maximum age of the memory profile before a refresh is requested.
    pub max_profile_staleness: Duration,
    /// Maximum pending-write estimate (bytes) before a refresh is requested.
    pub max_profile_drift_bytes: u64,
    /// Passed to the policy's `select_victims` as its count argument
    /// (presumably the maximum number of victims per call — confirm).
    pub eviction_batch_size: usize,
    /// Key prefixes (matched after stripping the manager's own prefix) that
    /// mark a key as protected.
    pub protected_prefixes: Vec<String>,
}
145
146impl Default for RedisEvictionConfig {
147 fn default() -> Self {
148 Self {
149 eviction_start_pressure: 0.75, eviction_target_pressure: 0.60, max_profile_staleness: Duration::from_secs(5),
152 max_profile_drift_bytes: 1024 * 1024, eviction_batch_size: 100,
154 protected_prefixes: vec![
155 "merkle:".to_string(),
156 "idx:".to_string(),
157 ],
158 }
159 }
160}
161
/// Tracks Redis memory pressure and per-key access metadata, and asks a
/// `TanCurvePolicy` which keys to evict once pressure crosses the configured
/// threshold.
pub struct RedisEvictionManager {
    /// Thresholds, refresh limits, batch size and protected prefixes.
    config: RedisEvictionConfig,
    /// Policy consulted by `get_eviction_candidates` to pick victims.
    policy: TanCurvePolicy,
    /// Last known memory usage plus the pending-write estimate, behind a lock.
    memory_profile: Arc<RwLock<RedisMemoryProfile>>,
    /// Metadata for every tracked key, keyed by the key name as recorded.
    key_metadata: Arc<RwLock<HashMap<String, RedisKeyMeta>>>,
    /// Optional key prefix stripped before matching `protected_prefixes`.
    prefix: Option<String>,
}
173
174impl RedisEvictionManager {
175 pub fn new(config: RedisEvictionConfig, prefix: Option<String>) -> Self {
177 Self {
178 config,
179 policy: TanCurvePolicy::default(),
180 memory_profile: Arc::new(RwLock::new(RedisMemoryProfile::default())),
181 key_metadata: Arc::new(RwLock::new(HashMap::new())),
182 prefix,
183 }
184 }
185
186 pub fn pressure(&self) -> f64 {
188 self.memory_profile.read().pressure()
189 }
190
191 pub fn needs_profile_refresh(&self) -> bool {
193 let profile = self.memory_profile.read();
194 profile.needs_refresh(
195 self.config.max_profile_staleness,
196 self.config.max_profile_drift_bytes,
197 )
198 }
199
200 pub fn refresh_profile(&self, used_bytes: u64, max_bytes: u64) {
202 let mut profile = self.memory_profile.write();
203 profile.refresh_from_info(used_bytes, max_bytes);
204 debug!(
205 used_mb = used_bytes / 1024 / 1024,
206 max_mb = max_bytes / 1024 / 1024,
207 pressure = format!("{:.1}%", profile.pressure() * 100.0),
208 "Redis memory profile refreshed"
209 );
210 }
211
212 pub fn record_batch_write(&self, bytes: u64) {
214 self.memory_profile.write().add_pending_writes(bytes);
215 }
216
217 pub fn record_key_write(&self, key: &str, size_bytes: usize) {
219 let protected = self.is_protected(key);
220 let mut metadata = self.key_metadata.write();
221 metadata.insert(key.to_string(), RedisKeyMeta::new(size_bytes, protected));
222 }
223
224 pub fn record_key_access(&self, key: &str) {
226 let mut metadata = self.key_metadata.write();
227 if let Some(meta) = metadata.get_mut(key) {
228 meta.touch();
229 }
230 }
231
232 pub fn remove_key(&self, key: &str) {
234 self.key_metadata.write().remove(key);
235 }
236
237 pub fn needs_eviction(&self) -> bool {
239 self.pressure() >= self.config.eviction_start_pressure
240 }
241
242 pub fn get_eviction_candidates(&self) -> Vec<String> {
245 let pressure = self.pressure();
246 if pressure < self.config.eviction_start_pressure {
247 return vec![];
248 }
249
250 let metadata = self.key_metadata.read();
251
252 let entries: Vec<CacheEntry> = metadata
254 .iter()
255 .map(|(key, meta)| meta.to_cache_entry(key.clone()))
256 .collect();
257
258 if entries.is_empty() {
259 return vec![];
260 }
261
262 let victims = self.policy.select_victims(
264 &entries,
265 self.config.eviction_batch_size,
266 pressure
267 );
268
269 if !victims.is_empty() {
270 info!(
271 candidates = victims.len(),
272 pressure = format!("{:.1}%", pressure * 100.0),
273 "Selected Redis eviction candidates"
274 );
275 }
276
277 victims
278 }
279
280 fn is_protected(&self, key: &str) -> bool {
282 let key_without_prefix = if let Some(ref prefix) = self.prefix {
284 key.strip_prefix(prefix).unwrap_or(key)
285 } else {
286 key
287 };
288
289 self.config.protected_prefixes.iter().any(|p| key_without_prefix.starts_with(p))
291 }
292
293 pub fn stats(&self) -> RedisEvictionStats {
295 let profile = self.memory_profile.read();
296 let metadata = self.key_metadata.read();
297
298 let protected_count = metadata.values().filter(|m| m.protected).count();
299 let data_count = metadata.len() - protected_count;
300
301 RedisEvictionStats {
302 used_bytes: profile.used_bytes,
303 max_bytes: profile.max_bytes,
304 pending_writes_estimate: profile.pending_writes_estimate,
305 pressure: profile.pressure(),
306 tracked_keys: metadata.len(),
307 protected_keys: protected_count,
308 data_keys: data_count,
309 profile_age_secs: profile.last_updated.elapsed().as_secs_f64(),
310 }
311 }
312}
313
#[derive(Debug, Clone)]
/// Point-in-time summary produced by `RedisEvictionManager::stats`.
pub struct RedisEvictionStats {
    /// Bytes in use as of the last profile refresh.
    pub used_bytes: u64,
    /// Memory limit in bytes as of the last profile refresh.
    pub max_bytes: u64,
    /// Bytes of writes recorded since the last profile refresh.
    pub pending_writes_estimate: u64,
    /// Estimated pressure computed from the profile at the time of the call.
    pub pressure: f64,
    /// Total number of keys with tracked metadata.
    pub tracked_keys: usize,
    /// Number of tracked keys flagged as protected.
    pub protected_keys: usize,
    /// Number of tracked keys that are not protected.
    pub data_keys: usize,
    /// Seconds elapsed since the profile was last updated.
    pub profile_age_secs: f64,
}
326
#[cfg(test)]
mod tests {
    use super::*;

    /// Pressure reflects used bytes plus the pending-write estimate.
    #[test]
    fn test_memory_profile_pressure() {
        let mut profile = RedisMemoryProfile {
            used_bytes: 50 * 1024 * 1024,
            max_bytes: 100 * 1024 * 1024,
            ..Default::default()
        };

        assert!((profile.pressure() - 0.5).abs() < 0.01);

        profile.add_pending_writes(25 * 1024 * 1024);
        assert!((profile.pressure() - 0.75).abs() < 0.01);
    }

    /// A fresh profile needs no refresh until pending writes exceed the drift limit.
    #[test]
    fn test_profile_needs_refresh() {
        let mut profile = RedisMemoryProfile::default();

        assert!(!profile.needs_refresh(Duration::from_secs(5), 1024 * 1024));

        profile.add_pending_writes(2 * 1024 * 1024);
        assert!(profile.needs_refresh(Duration::from_secs(5), 1024 * 1024));
    }

    /// Protected prefixes match with or without the manager's own prefix.
    #[test]
    fn test_protected_keys() {
        let config = RedisEvictionConfig::default();
        let manager = RedisEvictionManager::new(config, Some("sync:".to_string()));

        assert!(manager.is_protected("sync:merkle:hash:user.alice"));
        assert!(manager.is_protected("sync:merkle:children:user"));
        assert!(manager.is_protected("sync:idx:users"));
        assert!(!manager.is_protected("sync:user.alice"));
        assert!(!manager.is_protected("sync:config.app"));
        assert!(manager.is_protected("merkle:hash:test"));
    }

    /// Below the start threshold no candidates are returned.
    #[test]
    fn test_no_eviction_under_threshold() {
        let config = RedisEvictionConfig::default();
        let manager = RedisEvictionManager::new(config, None);

        manager.refresh_profile(30 * 1024 * 1024, 100 * 1024 * 1024);
        manager.record_key_write("data:key", 10_000);

        let candidates = manager.get_eviction_candidates();
        assert!(candidates.is_empty(), "Should not evict under threshold");
    }

    /// Above the threshold the policy is consulted and prefers the stale key.
    #[test]
    fn test_eviction_uses_tan_curve_policy() {
        let config = RedisEvictionConfig {
            eviction_start_pressure: 0.5,
            eviction_batch_size: 10,
            ..Default::default()
        };
        let manager = RedisEvictionManager::new(config, None);

        manager.refresh_profile(80 * 1024 * 1024, 100 * 1024 * 1024);
        manager.record_key_write("data:old", 10_000);
        manager.record_key_write("data:new", 10_000);

        // `Instant - Duration` panics when the result cannot be represented
        // (for example on a host that booted less than an hour ago on some
        // platforms), so backdate with `checked_sub` and skip the ordering
        // check when backdating is not possible.
        let backdated = Instant::now().checked_sub(Duration::from_secs(3600));
        if let Some(old_access) = backdated {
            let mut meta = manager.key_metadata.write();
            if let Some(m) = meta.get_mut("data:old") {
                m.last_access = old_access;
            }
        }

        let candidates = manager.get_eviction_candidates();

        assert!(!candidates.is_empty());
        if backdated.is_some() && candidates.len() >= 2 {
            assert_eq!(candidates[0], "data:old");
        }
    }
}