// sync_engine/eviction/redis.rs
//
// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See LICENSE file in the project root for full license text.

//! Proactive Redis eviction manager.
//!
//! Evicts data keys proactively, before Redis's own LRU policy kicks in,
//! choosing victims by access pattern and memory pressure while protecting
//! infrastructure keys (merkle trees, indexes).
//!
//! Uses the same tan-curve scoring algorithm as L1 memory eviction.
//!
//! # Strategy
//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────┐
//! │  Layer 1: Proactive Tan-Curve (sync-engine managed)        │
//! │  └─ Score = f(last_access, access_count, size, pressure)   │
//! │  └─ Protected: merkle:*, idx:*, RediSearch indexes         │
//! │  └─ Evict lowest-scoring data keys first                   │
//! ├─────────────────────────────────────────────────────────────┤
//! │  Layer 2: Redis LRU (emergency fallback)                   │
//! │  └─ Only kicks in if proactive eviction miscalculates      │
//! │  └─ maxmemory-policy: allkeys-lru                          │
//! └─────────────────────────────────────────────────────────────┘
//! ```

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

use parking_lot::RwLock;
use tracing::{debug, info};

use super::tan_curve::{CacheEntry, TanCurvePolicy};

/// Locally cached snapshot of Redis memory usage.
///
/// Asking Redis for memory statistics costs a round-trip, so this snapshot is
/// kept locally. An estimate of the bytes written since the last refresh is
/// layered on top of it.
#[derive(Debug, Clone)]
pub struct RedisMemoryProfile {
    /// Bytes in use as last reported by Redis (`INFO MEMORY`).
    pub used_bytes: u64,
    /// Memory ceiling in bytes (`CONFIG GET maxmemory`). A value of `0` makes
    /// [`RedisMemoryProfile::pressure`] report `0.0`.
    pub max_bytes: u64,
    /// Moment of the most recent refresh from Redis.
    pub last_updated: Instant,
    /// Bytes believed to have been written since the most recent refresh.
    pub pending_writes_estimate: u64,
}

impl Default for RedisMemoryProfile {
    /// An empty profile that assumes a 100MB ceiling and is stamped as
    /// refreshed right now.
    fn default() -> Self {
        const DEFAULT_MAX_BYTES: u64 = 100 * 1024 * 1024; // 100MB
        Self {
            used_bytes: 0,
            max_bytes: DEFAULT_MAX_BYTES,
            last_updated: Instant::now(),
            pending_writes_estimate: 0,
        }
    }
}

impl RedisMemoryProfile {
    /// Estimated memory pressure: `(used_bytes + pending_writes_estimate) / max_bytes`.
    ///
    /// Usually falls in `0.0..=1.0` but exceeds `1.0` once the estimate passes
    /// the ceiling. Returns `0.0` when `max_bytes` is `0`.
    pub fn pressure(&self) -> f64 {
        match self.max_bytes {
            0 => 0.0,
            limit => {
                let estimated = self.used_bytes + self.pending_writes_estimate;
                estimated as f64 / limit as f64
            }
        }
    }

    /// Returns `true` when the snapshot is older than `max_staleness` or the
    /// pending-write estimate exceeds `max_drift_bytes`.
    pub fn needs_refresh(&self, max_staleness: Duration, max_drift_bytes: u64) -> bool {
        let too_old = self.last_updated.elapsed() > max_staleness;
        let drifted = self.pending_writes_estimate > max_drift_bytes;
        too_old || drifted
    }

    /// Adds `bytes` to the pending-write estimate (call after a batch write).
    pub fn add_pending_writes(&mut self, bytes: u64) {
        self.pending_writes_estimate += bytes;
    }

    /// Replaces the snapshot with fresh figures from Redis. This clears the
    /// pending-write estimate and restamps `last_updated`.
    pub fn refresh_from_info(&mut self, used_bytes: u64, max_bytes: u64) {
        *self = Self {
            used_bytes,
            max_bytes,
            last_updated: Instant::now(),
            pending_writes_estimate: 0,
        };
    }
}

88/// Metadata for a tracked Redis key (for eviction scoring).
89#[derive(Debug, Clone)]
90pub struct RedisKeyMeta {
91    /// When the key was last accessed
92    pub last_access: Instant,
93    /// Number of times accessed
94    pub access_count: u64,
95    /// Approximate size in bytes
96    pub size_bytes: usize,
97    /// Whether this key is protected from eviction
98    pub protected: bool,
99}
100
101impl RedisKeyMeta {
102    pub fn new(size_bytes: usize, protected: bool) -> Self {
103        Self {
104            last_access: Instant::now(),
105            access_count: 1,
106            size_bytes,
107            protected,
108        }
109    }
110    
111    pub fn touch(&mut self) {
112        self.last_access = Instant::now();
113        self.access_count += 1;
114    }
115    
116    /// Convert to CacheEntry for scoring with TanCurvePolicy
117    pub fn to_cache_entry(&self, id: String) -> CacheEntry {
118        CacheEntry {
119            id,
120            size_bytes: self.size_bytes,
121            created_at: self.last_access, // Approximate
122            last_access: self.last_access,
123            access_count: self.access_count,
124            is_dirty: self.protected, // Protected keys act like "dirty" (don't evict)
125        }
126    }
127}
128
/// Tuning knobs for the proactive Redis eviction manager.
#[derive(Debug, Clone)]
pub struct RedisEvictionConfig {
    /// Pressure (fraction of `maxmemory`, 0.0-1.0) at which proactive
    /// eviction starts.
    pub eviction_start_pressure: f64,
    /// Pressure (0.0-1.0) that eviction is meant to bring memory back down to.
    /// NOTE(review): nothing in this module reads this value yet. Whether
    /// eviction is needed is decided by `eviction_start_pressure` alone.
    pub eviction_target_pressure: f64,
    /// Age after which the cached memory profile should be refreshed.
    pub max_profile_staleness: Duration,
    /// Volume of estimated, unconfirmed writes (bytes) after which the cached
    /// memory profile should be refreshed.
    pub max_profile_drift_bytes: u64,
    /// Maximum number of keys proposed for deletion per eviction round.
    pub eviction_batch_size: usize,
    /// Key prefixes that are never evicted. They are matched after the
    /// manager's optional global key prefix has been stripped.
    pub protected_prefixes: Vec<String>,
}

impl Default for RedisEvictionConfig {
    fn default() -> Self {
        let protected_prefixes = ["merkle:", "idx:"]
            .iter()
            .map(|&p| String::from(p))
            .collect();
        Self {
            eviction_start_pressure: 0.75,        // begin evicting at 75%
            eviction_target_pressure: 0.60,       // aim for 60%
            max_profile_staleness: Duration::from_secs(5),
            max_profile_drift_bytes: 1024 * 1024, // 1MB
            eviction_batch_size: 100,
            protected_prefixes,
        }
    }
}

162/// Proactive Redis eviction manager.
163/// 
164/// Reuses `TanCurvePolicy` from the memory eviction module for consistent
165/// scoring across L1 (memory) and L2 (Redis) caches.
166pub struct RedisEvictionManager {
167    config: RedisEvictionConfig,
168    policy: TanCurvePolicy,
169    memory_profile: Arc<RwLock<RedisMemoryProfile>>,
170    key_metadata: Arc<RwLock<HashMap<String, RedisKeyMeta>>>,
171    prefix: Option<String>,
172}
173
174impl RedisEvictionManager {
175    /// Create a new eviction manager.
176    pub fn new(config: RedisEvictionConfig, prefix: Option<String>) -> Self {
177        Self {
178            config,
179            policy: TanCurvePolicy::default(),
180            memory_profile: Arc::new(RwLock::new(RedisMemoryProfile::default())),
181            key_metadata: Arc::new(RwLock::new(HashMap::new())),
182            prefix,
183        }
184    }
185    
186    /// Get current memory pressure (0.0 to 1.0+).
187    pub fn pressure(&self) -> f64 {
188        self.memory_profile.read().pressure()
189    }
190    
191    /// Check if we need to refresh the memory profile.
192    pub fn needs_profile_refresh(&self) -> bool {
193        let profile = self.memory_profile.read();
194        profile.needs_refresh(
195            self.config.max_profile_staleness,
196            self.config.max_profile_drift_bytes,
197        )
198    }
199    
200    /// Update memory profile from Redis INFO output.
201    pub fn refresh_profile(&self, used_bytes: u64, max_bytes: u64) {
202        let mut profile = self.memory_profile.write();
203        profile.refresh_from_info(used_bytes, max_bytes);
204        debug!(
205            used_mb = used_bytes / 1024 / 1024,
206            max_mb = max_bytes / 1024 / 1024,
207            pressure = format!("{:.1}%", profile.pressure() * 100.0),
208            "Redis memory profile refreshed"
209        );
210    }
211    
212    /// Record a batch write (updates pending estimate).
213    pub fn record_batch_write(&self, bytes: u64) {
214        self.memory_profile.write().add_pending_writes(bytes);
215    }
216    
217    /// Record a key write (for eviction scoring).
218    pub fn record_key_write(&self, key: &str, size_bytes: usize) {
219        let protected = self.is_protected(key);
220        let mut metadata = self.key_metadata.write();
221        metadata.insert(key.to_string(), RedisKeyMeta::new(size_bytes, protected));
222    }
223    
224    /// Record a key access (touch for LRU scoring).
225    pub fn record_key_access(&self, key: &str) {
226        let mut metadata = self.key_metadata.write();
227        if let Some(meta) = metadata.get_mut(key) {
228            meta.touch();
229        }
230    }
231    
232    /// Remove key from tracking (after eviction or deletion).
233    pub fn remove_key(&self, key: &str) {
234        self.key_metadata.write().remove(key);
235    }
236    
237    /// Check if eviction is needed.
238    pub fn needs_eviction(&self) -> bool {
239        self.pressure() >= self.config.eviction_start_pressure
240    }
241    
242    /// Get keys to evict, sorted by eviction score (lowest first).
243    /// Returns up to `eviction_batch_size` keys.
244    pub fn get_eviction_candidates(&self) -> Vec<String> {
245        let pressure = self.pressure();
246        if pressure < self.config.eviction_start_pressure {
247            return vec![];
248        }
249        
250        let metadata = self.key_metadata.read();
251        
252        // Convert to CacheEntry for scoring with TanCurvePolicy
253        let entries: Vec<CacheEntry> = metadata
254            .iter()
255            .map(|(key, meta)| meta.to_cache_entry(key.clone()))
256            .collect();
257        
258        if entries.is_empty() {
259            return vec![];
260        }
261        
262        // Use TanCurvePolicy for consistent scoring
263        let victims = self.policy.select_victims(
264            &entries, 
265            self.config.eviction_batch_size, 
266            pressure
267        );
268        
269        if !victims.is_empty() {
270            info!(
271                candidates = victims.len(),
272                pressure = format!("{:.1}%", pressure * 100.0),
273                "Selected Redis eviction candidates"
274            );
275        }
276        
277        victims
278    }
279    
280    /// Check if a key is protected from eviction.
281    fn is_protected(&self, key: &str) -> bool {
282        // Strip prefix if present
283        let key_without_prefix = if let Some(ref prefix) = self.prefix {
284            key.strip_prefix(prefix).unwrap_or(key)
285        } else {
286            key
287        };
288        
289        // Check against protected prefixes
290        self.config.protected_prefixes.iter().any(|p| key_without_prefix.starts_with(p))
291    }
292    
293    /// Get statistics for monitoring.
294    pub fn stats(&self) -> RedisEvictionStats {
295        let profile = self.memory_profile.read();
296        let metadata = self.key_metadata.read();
297        
298        let protected_count = metadata.values().filter(|m| m.protected).count();
299        let data_count = metadata.len() - protected_count;
300        
301        RedisEvictionStats {
302            used_bytes: profile.used_bytes,
303            max_bytes: profile.max_bytes,
304            pending_writes_estimate: profile.pending_writes_estimate,
305            pressure: profile.pressure(),
306            tracked_keys: metadata.len(),
307            protected_keys: protected_count,
308            data_keys: data_count,
309            profile_age_secs: profile.last_updated.elapsed().as_secs_f64(),
310        }
311    }
312}
313
/// Point-in-time snapshot of Redis eviction state, for monitoring.
#[derive(Debug, Clone)]
pub struct RedisEvictionStats {
    /// Bytes in use at the last profile refresh.
    pub used_bytes: u64,
    /// Memory ceiling in bytes at the last profile refresh.
    pub max_bytes: u64,
    /// Estimated bytes written since the last profile refresh.
    pub pending_writes_estimate: u64,
    /// Estimated pressure: `(used_bytes + pending_writes_estimate) / max_bytes`
    /// (`0.0` when `max_bytes` is `0`).
    pub pressure: f64,
    /// Number of keys with eviction metadata.
    pub tracked_keys: usize,
    /// Tracked keys that are protected from eviction.
    pub protected_keys: usize,
    /// Tracked keys that are eligible for eviction.
    pub data_keys: usize,
    /// Seconds since the memory profile was last refreshed.
    pub profile_age_secs: f64,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_memory_profile_pressure() {
        let mut profile = RedisMemoryProfile {
            used_bytes: 50 * 1024 * 1024, // 50MB
            max_bytes: 100 * 1024 * 1024, // 100MB
            ..Default::default()
        };

        assert!((profile.pressure() - 0.5).abs() < 0.01);

        profile.add_pending_writes(25 * 1024 * 1024); // +25MB
        assert!((profile.pressure() - 0.75).abs() < 0.01);
    }

    #[test]
    fn test_zero_max_bytes_reports_no_pressure() {
        let profile = RedisMemoryProfile {
            used_bytes: 50 * 1024 * 1024,
            max_bytes: 0,
            ..Default::default()
        };
        assert_eq!(profile.pressure(), 0.0);
    }

    #[test]
    fn test_profile_needs_refresh() {
        let mut profile = RedisMemoryProfile::default();

        // Fresh profile doesn't need refresh
        assert!(!profile.needs_refresh(Duration::from_secs(5), 1024 * 1024));

        // After drift, needs refresh
        profile.add_pending_writes(2 * 1024 * 1024);
        assert!(profile.needs_refresh(Duration::from_secs(5), 1024 * 1024));
    }

    #[test]
    fn test_refresh_clears_pending_writes() {
        let mut profile = RedisMemoryProfile::default();
        profile.add_pending_writes(2 * 1024 * 1024);

        profile.refresh_from_info(10 * 1024 * 1024, 100 * 1024 * 1024);

        assert_eq!(profile.pending_writes_estimate, 0);
        assert!(!profile.needs_refresh(Duration::from_secs(5), 1024 * 1024));
        assert!((profile.pressure() - 0.1).abs() < 0.01);
    }

    #[test]
    fn test_protected_keys() {
        let config = RedisEvictionConfig::default();
        let manager = RedisEvictionManager::new(config, Some("sync:".to_string()));

        // Merkle keys are protected (with global prefix stripped first)
        assert!(manager.is_protected("sync:merkle:hash:user.alice")); // → "merkle:hash:user.alice"
        assert!(manager.is_protected("sync:merkle:children:user")); // → "merkle:children:user"

        // Index keys are protected
        assert!(manager.is_protected("sync:idx:users")); // → "idx:users"

        // Data keys are NOT protected (they're the eviction candidates!)
        assert!(!manager.is_protected("sync:user.alice")); // → "user.alice"
        assert!(!manager.is_protected("sync:config.app")); // → "config.app"

        // Keys without global prefix still work (defensive)
        assert!(manager.is_protected("merkle:hash:test")); // → "merkle:hash:test"
    }

    #[test]
    fn test_needs_eviction_threshold() {
        let manager = RedisEvictionManager::new(RedisEvictionConfig::default(), None);

        manager.refresh_profile(70 * 1024 * 1024, 100 * 1024 * 1024); // 70% < 75%
        assert!(!manager.needs_eviction());

        manager.record_batch_write(10 * 1024 * 1024); // +10MB pending → 80%
        assert!(manager.needs_eviction());
    }

    #[test]
    fn test_key_access_updates_tracked_keys_only() {
        let manager = RedisEvictionManager::new(RedisEvictionConfig::default(), None);
        manager.record_key_write("data:key", 100);
        manager.record_key_access("data:key");
        manager.record_key_access("data:key");
        manager.record_key_access("data:missing"); // untracked: must be ignored

        let meta = manager.key_metadata.read();
        assert_eq!(meta.len(), 1);
        assert_eq!(meta.get("data:key").map(|m| m.access_count), Some(3));
    }

    #[test]
    fn test_stats_split_protected_and_data_keys() {
        let manager =
            RedisEvictionManager::new(RedisEvictionConfig::default(), Some("sync:".to_string()));
        manager.record_key_write("sync:merkle:hash:a", 64);
        manager.record_key_write("sync:idx:users", 64);
        manager.record_key_write("sync:user.alice", 64);
        manager.remove_key("sync:user.alice");
        manager.record_key_write("sync:user.bob", 64);

        let stats = manager.stats();
        assert_eq!(stats.tracked_keys, 3);
        assert_eq!(stats.protected_keys, 2);
        assert_eq!(stats.data_keys, 1);
    }

    #[test]
    fn test_no_eviction_under_threshold() {
        let config = RedisEvictionConfig::default();
        let manager = RedisEvictionManager::new(config, None);

        // Low pressure
        manager.refresh_profile(30 * 1024 * 1024, 100 * 1024 * 1024); // 30%
        manager.record_key_write("data:key", 10_000);

        let candidates = manager.get_eviction_candidates();
        assert!(candidates.is_empty(), "Should not evict under threshold");
    }

    #[test]
    fn test_eviction_uses_tan_curve_policy() {
        let config = RedisEvictionConfig {
            eviction_start_pressure: 0.5,
            eviction_batch_size: 10,
            ..Default::default()
        };
        let manager = RedisEvictionManager::new(config, None);

        // Set up high pressure
        manager.refresh_profile(80 * 1024 * 1024, 100 * 1024 * 1024); // 80%

        // Add data keys (not protected)
        manager.record_key_write("data:old", 10_000);
        manager.record_key_write("data:new", 10_000);

        // Make "old" key old by manipulating metadata
        {
            let mut meta = manager.key_metadata.write();
            if let Some(m) = meta.get_mut("data:old") {
                m.last_access = Instant::now() - Duration::from_secs(3600);
            }
        }

        // Get candidates - should prefer older key
        let candidates = manager.get_eviction_candidates();

        // Both should be candidates (not protected)
        assert!(!candidates.is_empty());
        // Older key should be first
        if candidates.len() >= 2 {
            assert_eq!(candidates[0], "data:old");
        }
    }
}
424}