sync_engine/eviction/
redis.rs

1//! Proactive Redis eviction manager.
2//!
3//! Provides intelligent eviction before Redis LRU kicks in, protecting
4//! infrastructure keys (merkle trees, indexes) while evicting data keys
5//! based on access patterns and memory pressure.
6//!
7//! Uses the same tan-curve scoring algorithm as L1 memory eviction.
8//!
9//! # Strategy
10//!
11//! ```text
12//! ┌─────────────────────────────────────────────────────────────┐
13//! │  Layer 1: Proactive Tan-Curve (sync-engine managed)        │
14//! │  └─ Score = f(last_access, access_count, size, pressure)   │
15//! │  └─ Protected: merkle:*, idx:*, RediSearch indexes         │
16//! │  └─ Evict lowest-scoring data keys first                   │
17//! ├─────────────────────────────────────────────────────────────┤
18//! │  Layer 2: Redis LRU (emergency fallback)                   │
19//! │  └─ Only kicks in if proactive eviction miscalculates      │
20//! │  └─ maxmemory-policy: allkeys-lru                          │
21//! └─────────────────────────────────────────────────────────────┘
22//! ```
23
24use std::collections::HashMap;
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use parking_lot::RwLock;
28use tracing::{debug, info};
29
30use super::tan_curve::{CacheEntry, TanCurvePolicy};
31
/// Cached snapshot of Redis memory usage, kept locally to avoid a round-trip
/// on every pressure check.
///
/// Between refreshes, callers account for their own writes through
/// [`add_pending_writes`](Self::add_pending_writes); `pressure()` is computed
/// from the last refreshed figure plus that running estimate.
#[derive(Debug, Clone)]
pub struct RedisMemoryProfile {
    /// Used memory in bytes (from INFO MEMORY)
    pub used_bytes: u64,
    /// Max memory in bytes (from CONFIG GET maxmemory)
    pub max_bytes: u64,
    /// When we last refreshed from Redis
    pub last_updated: Instant,
    /// Estimated bytes written since last refresh
    pub pending_writes_estimate: u64,
}

impl Default for RedisMemoryProfile {
    /// Empty profile: nothing used, nothing pending, a 100MB ceiling, and a
    /// `last_updated` stamp of "now".
    fn default() -> Self {
        Self {
            used_bytes: 0,
            max_bytes: 100 * 1024 * 1024, // 100MB default
            last_updated: Instant::now(),
            pending_writes_estimate: 0,
        }
    }
}

impl RedisMemoryProfile {
    /// Current memory pressure (0.0 to 1.0+).
    ///
    /// Computed as `(used_bytes + pending_writes_estimate) / max_bytes`.
    /// Returns `0.0` when `max_bytes` is zero so the division is never
    /// attempted (NOTE(review): Redis reports `maxmemory 0` when no limit is
    /// configured — confirm that "no pressure" is the intended reading).
    pub fn pressure(&self) -> f64 {
        if self.max_bytes == 0 {
            return 0.0;
        }
        // Saturate rather than overflow: a wrapped sum would report a tiny
        // pressure exactly when the estimate is at its largest.
        let projected_bytes = self.used_bytes.saturating_add(self.pending_writes_estimate);
        projected_bytes as f64 / self.max_bytes as f64
    }

    /// Whether we need to refresh from Redis.
    ///
    /// True when the snapshot is older than `max_staleness`, or when more than
    /// `max_drift_bytes` of writes have been recorded since the last refresh.
    pub fn needs_refresh(&self, max_staleness: Duration, max_drift_bytes: u64) -> bool {
        self.last_updated.elapsed() > max_staleness
            || self.pending_writes_estimate > max_drift_bytes
    }

    /// Add pending writes estimate (call after batch write).
    ///
    /// The running total saturates at `u64::MAX` instead of overflowing.
    pub fn add_pending_writes(&mut self, bytes: u64) {
        self.pending_writes_estimate = self.pending_writes_estimate.saturating_add(bytes);
    }

    /// Refresh from Redis INFO output.
    ///
    /// Replaces both memory figures, clears the pending-write estimate (the
    /// fresh `used_bytes` supersedes it) and restarts the staleness clock.
    pub fn refresh_from_info(&mut self, used_bytes: u64, max_bytes: u64) {
        self.used_bytes = used_bytes;
        self.max_bytes = max_bytes;
        self.pending_writes_estimate = 0;
        self.last_updated = Instant::now();
    }
}
84
85/// Metadata for a tracked Redis key (for eviction scoring).
86#[derive(Debug, Clone)]
87pub struct RedisKeyMeta {
88    /// When the key was last accessed
89    pub last_access: Instant,
90    /// Number of times accessed
91    pub access_count: u64,
92    /// Approximate size in bytes
93    pub size_bytes: usize,
94    /// Whether this key is protected from eviction
95    pub protected: bool,
96}
97
98impl RedisKeyMeta {
99    pub fn new(size_bytes: usize, protected: bool) -> Self {
100        Self {
101            last_access: Instant::now(),
102            access_count: 1,
103            size_bytes,
104            protected,
105        }
106    }
107    
108    pub fn touch(&mut self) {
109        self.last_access = Instant::now();
110        self.access_count += 1;
111    }
112    
113    /// Convert to CacheEntry for scoring with TanCurvePolicy
114    pub fn to_cache_entry(&self, id: String) -> CacheEntry {
115        CacheEntry {
116            id,
117            size_bytes: self.size_bytes,
118            created_at: self.last_access, // Approximate
119            last_access: self.last_access,
120            access_count: self.access_count,
121            is_dirty: self.protected, // Protected keys act like "dirty" (don't evict)
122        }
123    }
124}
125
/// Tunables for proactive Redis eviction.
#[derive(Debug, Clone)]
pub struct RedisEvictionConfig {
    /// Pressure threshold to start proactive eviction (0.0-1.0)
    pub eviction_start_pressure: f64,
    /// Target pressure after eviction (0.0-1.0)
    pub eviction_target_pressure: f64,
    /// Max staleness before refreshing memory profile
    pub max_profile_staleness: Duration,
    /// Max drift in bytes before refreshing memory profile
    pub max_profile_drift_bytes: u64,
    /// Batch size for eviction (keys to delete per round)
    pub eviction_batch_size: usize,
    /// Key prefixes to protect from eviction
    pub protected_prefixes: Vec<String>,
}

impl Default for RedisEvictionConfig {
    /// Defaults: start evicting at 75% pressure, aim for 60%, refresh the
    /// memory profile after 5 seconds or 1MB of drift, evict 100 keys per
    /// round, and protect the `merkle:` and `idx:` prefixes.
    fn default() -> Self {
        let protected_prefixes: Vec<String> = ["merkle:", "idx:"]
            .iter()
            .map(|prefix| String::from(*prefix))
            .collect();

        Self {
            protected_prefixes,
            eviction_batch_size: 100,
            max_profile_drift_bytes: 1024 * 1024, // 1MB
            max_profile_staleness: Duration::from_secs(5),
            eviction_target_pressure: 0.60, // Evict down to 60%
            eviction_start_pressure: 0.75,  // Start evicting at 75%
        }
    }
}
158
159/// Proactive Redis eviction manager.
160/// 
161/// Reuses `TanCurvePolicy` from the memory eviction module for consistent
162/// scoring across L1 (memory) and L2 (Redis) caches.
163pub struct RedisEvictionManager {
164    config: RedisEvictionConfig,
165    policy: TanCurvePolicy,
166    memory_profile: Arc<RwLock<RedisMemoryProfile>>,
167    key_metadata: Arc<RwLock<HashMap<String, RedisKeyMeta>>>,
168    prefix: Option<String>,
169}
170
171impl RedisEvictionManager {
172    /// Create a new eviction manager.
173    pub fn new(config: RedisEvictionConfig, prefix: Option<String>) -> Self {
174        Self {
175            config,
176            policy: TanCurvePolicy::default(),
177            memory_profile: Arc::new(RwLock::new(RedisMemoryProfile::default())),
178            key_metadata: Arc::new(RwLock::new(HashMap::new())),
179            prefix,
180        }
181    }
182    
183    /// Get current memory pressure (0.0 to 1.0+).
184    pub fn pressure(&self) -> f64 {
185        self.memory_profile.read().pressure()
186    }
187    
188    /// Check if we need to refresh the memory profile.
189    pub fn needs_profile_refresh(&self) -> bool {
190        let profile = self.memory_profile.read();
191        profile.needs_refresh(
192            self.config.max_profile_staleness,
193            self.config.max_profile_drift_bytes,
194        )
195    }
196    
197    /// Update memory profile from Redis INFO output.
198    pub fn refresh_profile(&self, used_bytes: u64, max_bytes: u64) {
199        let mut profile = self.memory_profile.write();
200        profile.refresh_from_info(used_bytes, max_bytes);
201        debug!(
202            used_mb = used_bytes / 1024 / 1024,
203            max_mb = max_bytes / 1024 / 1024,
204            pressure = format!("{:.1}%", profile.pressure() * 100.0),
205            "Redis memory profile refreshed"
206        );
207    }
208    
209    /// Record a batch write (updates pending estimate).
210    pub fn record_batch_write(&self, bytes: u64) {
211        self.memory_profile.write().add_pending_writes(bytes);
212    }
213    
214    /// Record a key write (for eviction scoring).
215    pub fn record_key_write(&self, key: &str, size_bytes: usize) {
216        let protected = self.is_protected(key);
217        let mut metadata = self.key_metadata.write();
218        metadata.insert(key.to_string(), RedisKeyMeta::new(size_bytes, protected));
219    }
220    
221    /// Record a key access (touch for LRU scoring).
222    pub fn record_key_access(&self, key: &str) {
223        let mut metadata = self.key_metadata.write();
224        if let Some(meta) = metadata.get_mut(key) {
225            meta.touch();
226        }
227    }
228    
229    /// Remove key from tracking (after eviction or deletion).
230    pub fn remove_key(&self, key: &str) {
231        self.key_metadata.write().remove(key);
232    }
233    
234    /// Check if eviction is needed.
235    pub fn needs_eviction(&self) -> bool {
236        self.pressure() >= self.config.eviction_start_pressure
237    }
238    
239    /// Get keys to evict, sorted by eviction score (lowest first).
240    /// Returns up to `eviction_batch_size` keys.
241    pub fn get_eviction_candidates(&self) -> Vec<String> {
242        let pressure = self.pressure();
243        if pressure < self.config.eviction_start_pressure {
244            return vec![];
245        }
246        
247        let metadata = self.key_metadata.read();
248        
249        // Convert to CacheEntry for scoring with TanCurvePolicy
250        let entries: Vec<CacheEntry> = metadata
251            .iter()
252            .map(|(key, meta)| meta.to_cache_entry(key.clone()))
253            .collect();
254        
255        if entries.is_empty() {
256            return vec![];
257        }
258        
259        // Use TanCurvePolicy for consistent scoring
260        let victims = self.policy.select_victims(
261            &entries, 
262            self.config.eviction_batch_size, 
263            pressure
264        );
265        
266        if !victims.is_empty() {
267            info!(
268                candidates = victims.len(),
269                pressure = format!("{:.1}%", pressure * 100.0),
270                "Selected Redis eviction candidates"
271            );
272        }
273        
274        victims
275    }
276    
277    /// Check if a key is protected from eviction.
278    fn is_protected(&self, key: &str) -> bool {
279        // Strip prefix if present
280        let key_without_prefix = if let Some(ref prefix) = self.prefix {
281            key.strip_prefix(prefix).unwrap_or(key)
282        } else {
283            key
284        };
285        
286        // Check against protected prefixes
287        self.config.protected_prefixes.iter().any(|p| key_without_prefix.starts_with(p))
288    }
289    
290    /// Get statistics for monitoring.
291    pub fn stats(&self) -> RedisEvictionStats {
292        let profile = self.memory_profile.read();
293        let metadata = self.key_metadata.read();
294        
295        let protected_count = metadata.values().filter(|m| m.protected).count();
296        let data_count = metadata.len() - protected_count;
297        
298        RedisEvictionStats {
299            used_bytes: profile.used_bytes,
300            max_bytes: profile.max_bytes,
301            pending_writes_estimate: profile.pending_writes_estimate,
302            pressure: profile.pressure(),
303            tracked_keys: metadata.len(),
304            protected_keys: protected_count,
305            data_keys: data_count,
306            profile_age_secs: profile.last_updated.elapsed().as_secs_f64(),
307        }
308    }
309}
310
/// Point-in-time snapshot of Redis eviction state, intended for monitoring.
#[derive(Clone, Debug)]
pub struct RedisEvictionStats {
    /// Used memory in bytes, as of the last profile refresh
    pub used_bytes: u64,
    /// Max memory in bytes, as of the last profile refresh
    pub max_bytes: u64,
    /// Estimated bytes written since the last profile refresh
    pub pending_writes_estimate: u64,
    /// Memory pressure (0.0 to 1.0+) at the time of the snapshot
    pub pressure: f64,
    /// Total number of tracked keys
    pub tracked_keys: usize,
    /// Tracked keys flagged as protected from eviction
    pub protected_keys: usize,
    /// Tracked keys that are not protected
    pub data_keys: usize,
    /// Seconds elapsed since the memory profile was last refreshed
    pub profile_age_secs: f64,
}
323
#[cfg(test)]
mod tests {
    use super::*;

    /// Pressure is (used + pending) / max.
    #[test]
    fn test_memory_profile_pressure() {
        let mut profile = RedisMemoryProfile {
            used_bytes: 50 * 1024 * 1024,  // 50MB
            max_bytes: 100 * 1024 * 1024,  // 100MB
            ..Default::default()
        };

        assert!((profile.pressure() - 0.5).abs() < 0.01);

        profile.add_pending_writes(25 * 1024 * 1024); // +25MB
        assert!((profile.pressure() - 0.75).abs() < 0.01);
    }

    /// A fresh profile is current; exceeding the drift budget forces a refresh.
    #[test]
    fn test_profile_needs_refresh() {
        let mut profile = RedisMemoryProfile::default();

        // Fresh profile doesn't need refresh
        assert!(!profile.needs_refresh(Duration::from_secs(5), 1024 * 1024));

        // After drift, needs refresh
        profile.add_pending_writes(2 * 1024 * 1024);
        assert!(profile.needs_refresh(Duration::from_secs(5), 1024 * 1024));
    }

    /// Protected-prefix matching happens after the global prefix is stripped.
    #[test]
    fn test_protected_keys() {
        let config = RedisEvictionConfig::default();
        let manager = RedisEvictionManager::new(config, Some("sync:".to_string()));

        // Merkle keys are protected (with global prefix stripped first)
        assert!(manager.is_protected("sync:merkle:hash:user.alice")); // → "merkle:hash:user.alice"
        assert!(manager.is_protected("sync:merkle:children:user"));   // → "merkle:children:user"

        // Index keys are protected
        assert!(manager.is_protected("sync:idx:users"));              // → "idx:users"

        // Data keys are NOT protected (they're the eviction candidates!)
        assert!(!manager.is_protected("sync:user.alice"));            // → "user.alice"
        assert!(!manager.is_protected("sync:config.app"));            // → "config.app"

        // Keys without global prefix still work (defensive)
        assert!(manager.is_protected("merkle:hash:test"));            // → "merkle:hash:test"
    }

    /// Below the start threshold no candidates are returned, even with tracked keys.
    #[test]
    fn test_no_eviction_under_threshold() {
        let config = RedisEvictionConfig::default();
        let manager = RedisEvictionManager::new(config, None);

        // Low pressure
        manager.refresh_profile(30 * 1024 * 1024, 100 * 1024 * 1024); // 30%
        manager.record_key_write("data:key", 10_000);

        let candidates = manager.get_eviction_candidates();
        assert!(candidates.is_empty(), "Should not evict under threshold");
    }

    /// Above the threshold, the less recently used of two equal keys comes first.
    #[test]
    fn test_eviction_uses_tan_curve_policy() {
        // `Instant - Duration` panics when the result would fall before the
        // monotonic clock's origin (e.g. a host that started less than an hour
        // ago), so back-date with `checked_sub` and skip if that is impossible.
        let an_hour_ago = match Instant::now().checked_sub(Duration::from_secs(3600)) {
            Some(instant) => instant,
            None => {
                eprintln!("skipping: monotonic clock is less than one hour past its origin");
                return;
            }
        };

        let config = RedisEvictionConfig {
            eviction_start_pressure: 0.5,
            eviction_batch_size: 10,
            ..Default::default()
        };
        let manager = RedisEvictionManager::new(config, None);

        // Set up high pressure
        manager.refresh_profile(80 * 1024 * 1024, 100 * 1024 * 1024); // 80%

        // Add data keys (not protected)
        manager.record_key_write("data:old", 10_000);
        manager.record_key_write("data:new", 10_000);

        // Make "old" key old by manipulating metadata
        {
            let mut meta = manager.key_metadata.write();
            if let Some(m) = meta.get_mut("data:old") {
                m.last_access = an_hour_ago;
            }
        }

        // Get candidates - should prefer older key
        let candidates = manager.get_eviction_candidates();

        // Both should be candidates (not protected)
        assert!(!candidates.is_empty());
        // Older key should be first
        if candidates.len() >= 2 {
            assert_eq!(candidates[0], "data:old");
        }
    }
}
421}