1use std::collections::HashMap;
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use parking_lot::RwLock;
28use tracing::{debug, info};
29
30use super::tan_curve::{CacheEntry, TanCurvePolicy};
31
32#[derive(Debug, Clone)]
34pub struct RedisMemoryProfile {
35 pub used_bytes: u64,
37 pub max_bytes: u64,
39 pub last_updated: Instant,
41 pub pending_writes_estimate: u64,
43}
44
45impl Default for RedisMemoryProfile {
46 fn default() -> Self {
47 Self {
48 used_bytes: 0,
49 max_bytes: 100 * 1024 * 1024, last_updated: Instant::now(),
51 pending_writes_estimate: 0,
52 }
53 }
54}
55
56impl RedisMemoryProfile {
57 pub fn pressure(&self) -> f64 {
59 if self.max_bytes == 0 {
60 return 0.0;
61 }
62 (self.used_bytes + self.pending_writes_estimate) as f64 / self.max_bytes as f64
63 }
64
65 pub fn needs_refresh(&self, max_staleness: Duration, max_drift_bytes: u64) -> bool {
67 self.last_updated.elapsed() > max_staleness
68 || self.pending_writes_estimate > max_drift_bytes
69 }
70
71 pub fn add_pending_writes(&mut self, bytes: u64) {
73 self.pending_writes_estimate += bytes;
74 }
75
76 pub fn refresh_from_info(&mut self, used_bytes: u64, max_bytes: u64) {
78 self.used_bytes = used_bytes;
79 self.max_bytes = max_bytes;
80 self.pending_writes_estimate = 0;
81 self.last_updated = Instant::now();
82 }
83}
84
85#[derive(Debug, Clone)]
87pub struct RedisKeyMeta {
88 pub last_access: Instant,
90 pub access_count: u64,
92 pub size_bytes: usize,
94 pub protected: bool,
96}
97
98impl RedisKeyMeta {
99 pub fn new(size_bytes: usize, protected: bool) -> Self {
100 Self {
101 last_access: Instant::now(),
102 access_count: 1,
103 size_bytes,
104 protected,
105 }
106 }
107
108 pub fn touch(&mut self) {
109 self.last_access = Instant::now();
110 self.access_count += 1;
111 }
112
113 pub fn to_cache_entry(&self, id: String) -> CacheEntry {
115 CacheEntry {
116 id,
117 size_bytes: self.size_bytes,
118 created_at: self.last_access, last_access: self.last_access,
120 access_count: self.access_count,
121 is_dirty: self.protected, }
123 }
124}
125
126#[derive(Debug, Clone)]
128pub struct RedisEvictionConfig {
129 pub eviction_start_pressure: f64,
131 pub eviction_target_pressure: f64,
133 pub max_profile_staleness: Duration,
135 pub max_profile_drift_bytes: u64,
137 pub eviction_batch_size: usize,
139 pub protected_prefixes: Vec<String>,
141}
142
143impl Default for RedisEvictionConfig {
144 fn default() -> Self {
145 Self {
146 eviction_start_pressure: 0.75, eviction_target_pressure: 0.60, max_profile_staleness: Duration::from_secs(5),
149 max_profile_drift_bytes: 1024 * 1024, eviction_batch_size: 100,
151 protected_prefixes: vec![
152 "merkle:".to_string(),
153 "idx:".to_string(),
154 ],
155 }
156 }
157}
158
159pub struct RedisEvictionManager {
164 config: RedisEvictionConfig,
165 policy: TanCurvePolicy,
166 memory_profile: Arc<RwLock<RedisMemoryProfile>>,
167 key_metadata: Arc<RwLock<HashMap<String, RedisKeyMeta>>>,
168 prefix: Option<String>,
169}
170
171impl RedisEvictionManager {
172 pub fn new(config: RedisEvictionConfig, prefix: Option<String>) -> Self {
174 Self {
175 config,
176 policy: TanCurvePolicy::default(),
177 memory_profile: Arc::new(RwLock::new(RedisMemoryProfile::default())),
178 key_metadata: Arc::new(RwLock::new(HashMap::new())),
179 prefix,
180 }
181 }
182
183 pub fn pressure(&self) -> f64 {
185 self.memory_profile.read().pressure()
186 }
187
188 pub fn needs_profile_refresh(&self) -> bool {
190 let profile = self.memory_profile.read();
191 profile.needs_refresh(
192 self.config.max_profile_staleness,
193 self.config.max_profile_drift_bytes,
194 )
195 }
196
197 pub fn refresh_profile(&self, used_bytes: u64, max_bytes: u64) {
199 let mut profile = self.memory_profile.write();
200 profile.refresh_from_info(used_bytes, max_bytes);
201 debug!(
202 used_mb = used_bytes / 1024 / 1024,
203 max_mb = max_bytes / 1024 / 1024,
204 pressure = format!("{:.1}%", profile.pressure() * 100.0),
205 "Redis memory profile refreshed"
206 );
207 }
208
209 pub fn record_batch_write(&self, bytes: u64) {
211 self.memory_profile.write().add_pending_writes(bytes);
212 }
213
214 pub fn record_key_write(&self, key: &str, size_bytes: usize) {
216 let protected = self.is_protected(key);
217 let mut metadata = self.key_metadata.write();
218 metadata.insert(key.to_string(), RedisKeyMeta::new(size_bytes, protected));
219 }
220
221 pub fn record_key_access(&self, key: &str) {
223 let mut metadata = self.key_metadata.write();
224 if let Some(meta) = metadata.get_mut(key) {
225 meta.touch();
226 }
227 }
228
229 pub fn remove_key(&self, key: &str) {
231 self.key_metadata.write().remove(key);
232 }
233
234 pub fn needs_eviction(&self) -> bool {
236 self.pressure() >= self.config.eviction_start_pressure
237 }
238
239 pub fn get_eviction_candidates(&self) -> Vec<String> {
242 let pressure = self.pressure();
243 if pressure < self.config.eviction_start_pressure {
244 return vec![];
245 }
246
247 let metadata = self.key_metadata.read();
248
249 let entries: Vec<CacheEntry> = metadata
251 .iter()
252 .map(|(key, meta)| meta.to_cache_entry(key.clone()))
253 .collect();
254
255 if entries.is_empty() {
256 return vec![];
257 }
258
259 let victims = self.policy.select_victims(
261 &entries,
262 self.config.eviction_batch_size,
263 pressure
264 );
265
266 if !victims.is_empty() {
267 info!(
268 candidates = victims.len(),
269 pressure = format!("{:.1}%", pressure * 100.0),
270 "Selected Redis eviction candidates"
271 );
272 }
273
274 victims
275 }
276
277 fn is_protected(&self, key: &str) -> bool {
279 let key_without_prefix = if let Some(ref prefix) = self.prefix {
281 key.strip_prefix(prefix).unwrap_or(key)
282 } else {
283 key
284 };
285
286 self.config.protected_prefixes.iter().any(|p| key_without_prefix.starts_with(p))
288 }
289
290 pub fn stats(&self) -> RedisEvictionStats {
292 let profile = self.memory_profile.read();
293 let metadata = self.key_metadata.read();
294
295 let protected_count = metadata.values().filter(|m| m.protected).count();
296 let data_count = metadata.len() - protected_count;
297
298 RedisEvictionStats {
299 used_bytes: profile.used_bytes,
300 max_bytes: profile.max_bytes,
301 pending_writes_estimate: profile.pending_writes_estimate,
302 pressure: profile.pressure(),
303 tracked_keys: metadata.len(),
304 protected_keys: protected_count,
305 data_keys: data_count,
306 profile_age_secs: profile.last_updated.elapsed().as_secs_f64(),
307 }
308 }
309}
310
311#[derive(Debug, Clone)]
313pub struct RedisEvictionStats {
314 pub used_bytes: u64,
315 pub max_bytes: u64,
316 pub pending_writes_estimate: u64,
317 pub pressure: f64,
318 pub tracked_keys: usize,
319 pub protected_keys: usize,
320 pub data_keys: usize,
321 pub profile_age_secs: f64,
322}
323
324#[cfg(test)]
325mod tests {
326 use super::*;
327
328 #[test]
329 fn test_memory_profile_pressure() {
330 let mut profile = RedisMemoryProfile {
331 used_bytes: 50 * 1024 * 1024, max_bytes: 100 * 1024 * 1024, ..Default::default()
334 };
335
336 assert!((profile.pressure() - 0.5).abs() < 0.01);
337
338 profile.add_pending_writes(25 * 1024 * 1024); assert!((profile.pressure() - 0.75).abs() < 0.01);
340 }
341
342 #[test]
343 fn test_profile_needs_refresh() {
344 let mut profile = RedisMemoryProfile::default();
345
346 assert!(!profile.needs_refresh(Duration::from_secs(5), 1024 * 1024));
348
349 profile.add_pending_writes(2 * 1024 * 1024);
351 assert!(profile.needs_refresh(Duration::from_secs(5), 1024 * 1024));
352 }
353
354 #[test]
355 fn test_protected_keys() {
356 let config = RedisEvictionConfig::default();
357 let manager = RedisEvictionManager::new(config, Some("sync:".to_string()));
358
359 assert!(manager.is_protected("sync:merkle:hash:user.alice")); assert!(manager.is_protected("sync:merkle:children:user")); assert!(manager.is_protected("sync:idx:users")); assert!(!manager.is_protected("sync:user.alice")); assert!(!manager.is_protected("sync:config.app")); assert!(manager.is_protected("merkle:hash:test")); }
373
374 #[test]
375 fn test_no_eviction_under_threshold() {
376 let config = RedisEvictionConfig::default();
377 let manager = RedisEvictionManager::new(config, None);
378
379 manager.refresh_profile(30 * 1024 * 1024, 100 * 1024 * 1024); manager.record_key_write("data:key", 10_000);
382
383 let candidates = manager.get_eviction_candidates();
384 assert!(candidates.is_empty(), "Should not evict under threshold");
385 }
386
387 #[test]
388 fn test_eviction_uses_tan_curve_policy() {
389 let config = RedisEvictionConfig {
390 eviction_start_pressure: 0.5,
391 eviction_batch_size: 10,
392 ..Default::default()
393 };
394 let manager = RedisEvictionManager::new(config, None);
395
396 manager.refresh_profile(80 * 1024 * 1024, 100 * 1024 * 1024); manager.record_key_write("data:old", 10_000);
401 manager.record_key_write("data:new", 10_000);
402
403 {
405 let mut meta = manager.key_metadata.write();
406 if let Some(m) = meta.get_mut("data:old") {
407 m.last_access = Instant::now() - Duration::from_secs(3600);
408 }
409 }
410
411 let candidates = manager.get_eviction_candidates();
413
414 assert!(!candidates.is_empty());
416 if candidates.len() >= 2 {
418 assert_eq!(candidates[0], "data:old");
419 }
420 }
421}