// reddb_server/storage/cache/spill.rs

1//! Graph Spill Manager
2//!
3//! Manages memory limits for large graphs by spilling cold data to disk.
4//! Provides transparent access to spilled data by reloading on demand.
5//!
6//! # Architecture
7//!
8//! ```text
9//! ┌─────────────────────────────────────────────────────────┐
10//! │                   SpillManager                          │
11//! ├─────────────────────────────────────────────────────────┤
12//! │  ┌──────────┐   ┌──────────┐   ┌──────────────────┐    │
13//! │  │MemTracker│   │AccessLog │   │  SpillDirectory  │    │
14//! │  │(current) │   │(LRU/LFU) │   │  (temp files)    │    │
15//! │  └────┬─────┘   └────┬─────┘   └────────┬─────────┘    │
16//! │       │              │                   │              │
17//! │  ┌────▼──────────────▼───────────────────▼─────────┐   │
18//! │  │              Spill Policy Engine                 │   │
19//! │  │  - Threshold detection (80% memory limit)        │   │
20//! │  │  - Cold segment identification (LRU + freq)      │   │
21//! │  │  - Async spill/reload operations                 │   │
22//! │  └──────────────────────────────────────────────────┘   │
23//! └─────────────────────────────────────────────────────────┘
24//! ```
25//!
26//! # Example
27//!
28//! ```ignore
29//! use storage::cache::spill::{SpillManager, SpillConfig};
30//!
31//! let config = SpillConfig::new()
32//!     .max_memory(512 * 1024 * 1024)  // 512MB
33//!     .spill_threshold(0.8)            // 80%
34//!     .spill_dir("/tmp/reddb-spill");
35//!
36//! let mut manager = SpillManager::new(config);
37//!
38//! // Register a graph segment
39//! manager.register_segment("hosts", 50_000_000);  // 50MB
40//!
41//! // Track access
42//! manager.access("hosts");
43//!
44//! // Check if spill needed
45//! if let Some(segments) = manager.needs_spill() {
46//!     for seg in segments {
47//!         let data = get_segment_data(&seg);
48//!         manager.spill(&seg, &data)?;
49//!     }
50//! }
51//!
52//! // Reload spilled data
53//! if let Some(data) = manager.reload("hosts")? {
54//!     restore_segment("hosts", &data);
55//! }
56//! ```
57
58use std::collections::{HashMap, HashSet, VecDeque};
59use std::fs::{self, File};
60use std::io::{self, BufReader, BufWriter, Read, Write};
61use std::path::{Path, PathBuf};
62use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
63use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
64use std::time::Instant;
65
/// Acquire a read guard, recovering the inner guard if the lock is poisoned.
///
/// Poisoning only means another thread panicked while holding the lock; the
/// protected bookkeeping data is still usable on a best-effort basis.
fn recover_read_guard<'a, T>(lock: &'a RwLock<T>) -> RwLockReadGuard<'a, T> {
    lock.read().unwrap_or_else(|poisoned| poisoned.into_inner())
}
72
/// Acquire a write guard, recovering the inner guard if the lock is poisoned.
///
/// Mirrors [`recover_read_guard`]: a panic in another thread does not make
/// this tracking data unusable, so the poison flag is deliberately ignored.
fn recover_write_guard<'a, T>(lock: &'a RwLock<T>) -> RwLockWriteGuard<'a, T> {
    lock.write().unwrap_or_else(|poisoned| poisoned.into_inner())
}
79
80fn spill_lock_error(context: &'static str) -> SpillError {
81    SpillError::Io(io::Error::other(format!("{context} lock poisoned")))
82}
83
84fn read_guard_or_err<'a, T>(
85    lock: &'a RwLock<T>,
86    context: &'static str,
87) -> Result<RwLockReadGuard<'a, T>, SpillError> {
88    lock.read().map_err(|_| spill_lock_error(context))
89}
90
91fn write_guard_or_err<'a, T>(
92    lock: &'a RwLock<T>,
93    context: &'static str,
94) -> Result<RwLockWriteGuard<'a, T>, SpillError> {
95    lock.write().map_err(|_| spill_lock_error(context))
96}
97
98// ============================================================================
99// Configuration
100// ============================================================================
101
/// Configuration for the spill manager.
///
/// Build with [`SpillConfig::new`] plus the chained builder setters, or rely
/// on [`Default`]. All fields are public, so direct struct updates also work.
#[derive(Debug, Clone)]
pub struct SpillConfig {
    /// Maximum memory limit in bytes
    pub max_memory: usize,
    /// Threshold (0.0-1.0) at which to start spilling
    pub spill_threshold: f64,
    /// Directory for spill files
    pub spill_dir: PathBuf,
    /// Target memory after spill (0.0-1.0)
    pub target_after_spill: f64,
    /// Minimum segment size to consider for spilling (bytes)
    pub min_spill_size: usize,
    /// Access weight decay factor (0.0-1.0), applied on each spill check
    pub access_decay: f64,
}
118
119impl SpillConfig {
120    /// Create a new config with reasonable defaults
121    pub fn new() -> Self {
122        Self {
123            max_memory: 512 * 1024 * 1024, // 512MB default
124            spill_threshold: 0.80,         // Spill at 80%
125            spill_dir: std::env::temp_dir().join("reddb-spill"),
126            target_after_spill: 0.60,    // Target 60% after spill
127            min_spill_size: 1024 * 1024, // 1MB minimum
128            access_decay: 0.95,          // 5% decay per check cycle
129        }
130    }
131
132    /// Set maximum memory
133    pub fn max_memory(mut self, bytes: usize) -> Self {
134        self.max_memory = bytes;
135        self
136    }
137
138    /// Set spill threshold
139    pub fn spill_threshold(mut self, threshold: f64) -> Self {
140        self.spill_threshold = threshold.clamp(0.1, 0.99);
141        self
142    }
143
144    /// Set spill directory
145    pub fn spill_dir<P: AsRef<Path>>(mut self, path: P) -> Self {
146        self.spill_dir = path.as_ref().to_path_buf();
147        self
148    }
149
150    /// Set target memory after spill
151    pub fn target_after_spill(mut self, target: f64) -> Self {
152        self.target_after_spill = target.clamp(0.1, 0.9);
153        self
154    }
155
156    /// Set minimum spill size
157    pub fn min_spill_size(mut self, size: usize) -> Self {
158        self.min_spill_size = size;
159        self
160    }
161}
162
impl Default for SpillConfig {
    /// Same defaults as [`SpillConfig::new`].
    fn default() -> Self {
        Self::new()
    }
}
168
169// ============================================================================
170// Segment Tracking
171// ============================================================================
172
/// Information about a tracked memory segment.
///
/// Counters are atomics and the mutable fields sit behind `RwLock`s so the
/// manager can update tracking through shared (`&self`) references.
#[derive(Debug)]
struct SegmentInfo {
    /// Segment name/identifier
    name: String,
    /// Current size in bytes
    size: AtomicUsize,
    /// Access count (weighted by recency; boosted on touch, decayed per check)
    access_score: AtomicU64,
    /// Raw access count
    access_count: AtomicU64,
    /// Last access time
    last_access: RwLock<Instant>,
    /// Whether segment is currently spilled
    is_spilled: RwLock<bool>,
    /// Spill file path (if spilled)
    spill_path: RwLock<Option<PathBuf>>,
}
191
192impl SegmentInfo {
193    fn new(name: String, size: usize) -> Self {
194        Self {
195            name,
196            size: AtomicUsize::new(size),
197            access_score: AtomicU64::new(100), // Initial score
198            access_count: AtomicU64::new(0),
199            last_access: RwLock::new(Instant::now()),
200            is_spilled: RwLock::new(false),
201            spill_path: RwLock::new(None),
202        }
203    }
204
205    fn touch(&self) {
206        self.access_count.fetch_add(1, Ordering::Relaxed);
207        // Boost score on access
208        self.access_score.fetch_add(10, Ordering::Relaxed);
209        *recover_write_guard(&self.last_access) = Instant::now();
210    }
211
212    fn decay_score(&self, factor: f64) {
213        let current = self.access_score.load(Ordering::Relaxed);
214        let new = (current as f64 * factor) as u64;
215        self.access_score.store(new.max(1), Ordering::Relaxed);
216    }
217
218    fn coldness_score(&self) -> u64 {
219        // Lower score = colder (more likely to spill)
220        // Invert access score and factor in size
221        let access = self.access_score.load(Ordering::Relaxed).max(1);
222        let size = self.size.load(Ordering::Relaxed) as u64;
223
224        // Larger segments with low access are coldest
225        size / access
226    }
227}
228
229// ============================================================================
230// Spill Statistics
231// ============================================================================
232
/// Statistics about spill operations.
#[derive(Debug, Clone, Default)]
pub struct SpillStats {
    /// Current memory usage in bytes
    pub current_memory: usize,
    /// Maximum memory limit
    pub max_memory: usize,
    /// Number of segments tracked
    pub segment_count: usize,
    /// Number of segments currently spilled
    pub spilled_count: usize,
    /// Total bytes spilled to disk
    pub bytes_spilled: u64,
    /// Total bytes reloaded from disk
    pub bytes_reloaded: u64,
    /// Number of spill operations
    pub spill_operations: u64,
    /// Number of reload operations
    pub reload_operations: u64,
    /// Total spill file size on disk
    pub disk_usage: u64,
}

impl SpillStats {
    /// Fraction of the memory budget in use (0.0 when no budget is set).
    pub fn utilization(&self) -> f64 {
        match self.max_memory {
            0 => 0.0,
            max => self.current_memory as f64 / max as f64,
        }
    }

    /// True once utilization has reached `threshold`.
    pub fn at_threshold(&self, threshold: f64) -> bool {
        self.utilization() >= threshold
    }
}
271
272// ============================================================================
273// Spill Manager
274// ============================================================================
275
/// Error types for spill operations.
#[derive(Debug)]
pub enum SpillError {
    /// IO error during spill/reload
    Io(io::Error),
    /// Segment not found in the tracked set
    SegmentNotFound(String),
    /// Segment not spilled (reload has a path recorded but no spilled flag)
    NotSpilled(String),
    /// Segment already spilled (double spill)
    AlreadySpilled(String),
    /// Spill directory creation failed
    DirectoryCreation(io::Error),
    /// Invalid magic, unknown version, or checksum mismatch on reload
    ChecksumMismatch,
    /// Segment name contains path-traversal characters
    InvalidName(String),
}
294
295impl std::fmt::Display for SpillError {
296    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297        match self {
298            Self::Io(e) => write!(f, "IO error: {}", e),
299            Self::SegmentNotFound(s) => write!(f, "Segment not found: {}", s),
300            Self::NotSpilled(s) => write!(f, "Segment not spilled: {}", s),
301            Self::AlreadySpilled(s) => write!(f, "Segment already spilled: {}", s),
302            Self::DirectoryCreation(e) => write!(f, "Failed to create spill dir: {}", e),
303            Self::ChecksumMismatch => write!(f, "Checksum mismatch on reload"),
304            Self::InvalidName(s) => write!(f, "Invalid spill segment name: {}", s),
305        }
306    }
307}
308
309/// Reject names that could escape the spill directory when used as a filename component.
310fn sanitize_spill_name(name: &str) -> Result<(), SpillError> {
311    if name.is_empty() || name.contains('/') || name.contains('\\') || name.contains("..") {
312        return Err(SpillError::InvalidName(name.to_string()));
313    }
314    Ok(())
315}
316
// Marker impl: Display + Debug above satisfy the std::error::Error contract.
impl std::error::Error for SpillError {}
318
impl From<io::Error> for SpillError {
    /// Let `?` convert raw IO errors into `SpillError::Io`.
    fn from(e: io::Error) -> Self {
        Self::Io(e)
    }
}
324
/// Manages memory limits by spilling cold data to disk.
///
/// All public methods take `&self`; interior mutability is provided by the
/// `RwLock`s and atomics below, so a `SpillManager` can be shared across
/// threads behind an `Arc`.
pub struct SpillManager {
    /// Configuration (immutable after construction)
    config: SpillConfig,
    /// Tracked segments, keyed by name
    segments: RwLock<HashMap<String, SegmentInfo>>,
    /// Current total resident memory usage in bytes
    current_memory: AtomicUsize,
    /// Statistics snapshot, refreshed by `update_stats`
    stats: RwLock<SpillStats>,
    /// Access history for LRU ordering (bounded; see `access`)
    access_history: RwLock<VecDeque<String>>,
    /// Names of currently spilled segments
    spilled_segments: RwLock<HashSet<String>>,
}
340
341impl SpillManager {
342    /// Create a new spill manager
343    pub fn new(config: SpillConfig) -> Self {
344        let max_memory = config.max_memory;
345
346        Self {
347            config,
348            segments: RwLock::new(HashMap::new()),
349            current_memory: AtomicUsize::new(0),
350            stats: RwLock::new(SpillStats {
351                max_memory,
352                ..Default::default()
353            }),
354            access_history: RwLock::new(VecDeque::with_capacity(1000)),
355            spilled_segments: RwLock::new(HashSet::new()),
356        }
357    }
358
359    /// Ensure spill directory exists
360    fn ensure_spill_dir(&self) -> Result<(), SpillError> {
361        if !self.config.spill_dir.exists() {
362            fs::create_dir_all(&self.config.spill_dir).map_err(SpillError::DirectoryCreation)?;
363        }
364        Ok(())
365    }
366
367    /// Register a memory segment for tracking
368    pub fn register_segment(&self, name: &str, size: usize) {
369        let info = SegmentInfo::new(name.to_string(), size);
370
371        {
372            let mut segments = recover_write_guard(&self.segments);
373            // If replacing, subtract old size
374            if let Some(old) = segments.get(name) {
375                let old_size = old.size.load(Ordering::Relaxed);
376                self.current_memory.fetch_sub(old_size, Ordering::Relaxed);
377            }
378
379            segments.insert(name.to_string(), info);
380            self.current_memory.fetch_add(size, Ordering::Relaxed);
381        }
382
383        self.update_stats();
384    }
385
386    /// Unregister a segment
387    pub fn unregister_segment(&self, name: &str) {
388        {
389            let mut segments = recover_write_guard(&self.segments);
390            if let Some(info) = segments.remove(name) {
391                let size = info.size.load(Ordering::Relaxed);
392                self.current_memory.fetch_sub(size, Ordering::Relaxed);
393
394                // Clean up spill file if exists
395                let path = recover_read_guard(&info.spill_path);
396                if let Some(p) = path.as_ref() {
397                    let _ = fs::remove_file(p);
398                }
399            }
400        }
401
402        recover_write_guard(&self.spilled_segments).remove(name);
403
404        self.update_stats();
405    }
406
407    /// Update segment size
408    pub fn update_size(&self, name: &str, new_size: usize) {
409        {
410            let segments = recover_read_guard(&self.segments);
411            if let Some(info) = segments.get(name) {
412                let old_size = info.size.swap(new_size, Ordering::Relaxed);
413                if new_size > old_size {
414                    self.current_memory
415                        .fetch_add(new_size - old_size, Ordering::Relaxed);
416                } else {
417                    self.current_memory
418                        .fetch_sub(old_size - new_size, Ordering::Relaxed);
419                }
420            }
421        }
422        self.update_stats();
423    }
424
425    /// Record access to a segment
426    pub fn access(&self, name: &str) {
427        let segments = recover_read_guard(&self.segments);
428        if let Some(info) = segments.get(name) {
429            info.touch();
430        }
431
432        // Update access history
433        let mut history = recover_write_guard(&self.access_history);
434        history.push_back(name.to_string());
435        // Keep limited history
436        while history.len() > 10000 {
437            history.pop_front();
438        }
439    }
440
441    /// Check if spilling is needed, return segments to spill
442    pub fn needs_spill(&self) -> Option<Vec<String>> {
443        let current = self.current_memory.load(Ordering::Relaxed);
444        let threshold = (self.config.max_memory as f64 * self.config.spill_threshold) as usize;
445
446        if current < threshold {
447            return None;
448        }
449
450        // Apply decay to all segments
451        self.decay_all_scores();
452
453        // Calculate how much we need to free
454        let target = (self.config.max_memory as f64 * self.config.target_after_spill) as usize;
455        let to_free = current.saturating_sub(target);
456
457        if to_free == 0 {
458            return None;
459        }
460
461        // Find coldest segments to spill
462        let mut candidates: Vec<(String, u64, usize)> = Vec::new();
463
464        let segments = recover_read_guard(&self.segments);
465        for (name, info) in segments.iter() {
466            // Skip already spilled segments
467            if *recover_read_guard(&info.is_spilled) {
468                continue;
469            }
470
471            let size = info.size.load(Ordering::Relaxed);
472            if size < self.config.min_spill_size {
473                continue;
474            }
475
476            let coldness = info.coldness_score();
477            candidates.push((name.clone(), coldness, size));
478        }
479
480        // Sort by coldness (descending - higher = colder)
481        candidates.sort_by_key(|b| std::cmp::Reverse(b.1));
482
483        // Select segments until we've freed enough
484        let mut freed = 0usize;
485        let mut to_spill = Vec::new();
486
487        for (name, _, size) in candidates {
488            if freed >= to_free {
489                break;
490            }
491            to_spill.push(name);
492            freed += size;
493        }
494
495        if to_spill.is_empty() {
496            None
497        } else {
498            Some(to_spill)
499        }
500    }
501
502    /// Spill a segment to disk
503    pub fn spill(&self, name: &str, data: &[u8]) -> Result<PathBuf, SpillError> {
504        self.ensure_spill_dir()?;
505
506        let segments = read_guard_or_err(&self.segments, "spill manager segments")?;
507
508        let info = segments
509            .get(name)
510            .ok_or_else(|| SpillError::SegmentNotFound(name.to_string()))?;
511
512        // Check if already spilled
513        if *read_guard_or_err(&info.is_spilled, "spill manager segment flag")? {
514            return Err(SpillError::AlreadySpilled(name.to_string()));
515        }
516
517        // Validate name before using it as a filename component
518        sanitize_spill_name(name)?;
519
520        // Generate spill file path
521        let filename = format!("{}-{}.spill", name, std::process::id());
522        let path = self.config.spill_dir.join(&filename);
523
524        // Paranoia check: path must stay within spill dir
525        if !path.starts_with(&self.config.spill_dir) {
526            return Err(SpillError::InvalidName(name.to_string()));
527        }
528
529        // Write data with checksum
530        let file = File::create(&path)?;
531        let mut writer = BufWriter::new(file);
532
533        // Header: magic(4) + version(1) + checksum(4) + size(8)
534        writer.write_all(b"SPIL")?; // Magic
535        writer.write_all(&[2u8])?; // Version 2 — crc32 checksum
536
537        let checksum = crate::storage::engine::crc32::crc32(data);
538        writer.write_all(&checksum.to_le_bytes())?;
539        writer.write_all(&(data.len() as u64).to_le_bytes())?;
540
541        // Write data
542        writer.write_all(data)?;
543        writer.flush()?;
544
545        // Update segment state
546        drop(segments);
547
548        let segments = read_guard_or_err(&self.segments, "spill manager segments")?;
549        if let Some(info) = segments.get(name) {
550            *write_guard_or_err(&info.is_spilled, "spill manager segment flag")? = true;
551            *write_guard_or_err(&info.spill_path, "spill manager segment spill path")? =
552                Some(path.clone());
553        }
554
555        // Update memory tracking
556        self.current_memory.fetch_sub(data.len(), Ordering::Relaxed);
557
558        // Track in spilled set
559        write_guard_or_err(&self.spilled_segments, "spill manager spilled set")?
560            .insert(name.to_string());
561
562        // Update stats
563        let mut stats = write_guard_or_err(&self.stats, "spill manager stats")?;
564        stats.spill_operations += 1;
565        stats.bytes_spilled += data.len() as u64;
566        stats.spilled_count += 1;
567        stats.disk_usage += data.len() as u64;
568        drop(stats);
569
570        self.update_stats();
571
572        Ok(path)
573    }
574
575    /// Reload a spilled segment from disk
576    pub fn reload(&self, name: &str) -> Result<Option<Vec<u8>>, SpillError> {
577        let segments = read_guard_or_err(&self.segments, "spill manager segments")?;
578
579        let info = segments
580            .get(name)
581            .ok_or_else(|| SpillError::SegmentNotFound(name.to_string()))?;
582
583        // Check if actually spilled
584        if !*read_guard_or_err(&info.is_spilled, "spill manager segment flag")? {
585            return Ok(None);
586        }
587
588        let path = info
589            .spill_path
590            .read()
591            .map_err(|_| spill_lock_error("spill manager segment spill path"))?
592            .clone()
593            .ok_or_else(|| SpillError::NotSpilled(name.to_string()))?;
594
595        // Read and validate
596        let file = File::open(&path)?;
597        let mut reader = BufReader::new(file);
598
599        // Read header
600        let mut magic = [0u8; 4];
601        reader.read_exact(&mut magic)?;
602        if &magic != b"SPIL" {
603            return Err(SpillError::ChecksumMismatch);
604        }
605
606        let mut version = [0u8; 1];
607        reader.read_exact(&mut version)?;
608
609        let mut checksum_bytes = [0u8; 4];
610        reader.read_exact(&mut checksum_bytes)?;
611        let expected_checksum = u32::from_le_bytes(checksum_bytes);
612
613        let mut size_bytes = [0u8; 8];
614        reader.read_exact(&mut size_bytes)?;
615        let size = u64::from_le_bytes(size_bytes) as usize;
616
617        // Read data
618        let mut data = vec![0u8; size];
619        reader.read_exact(&mut data)?;
620
621        // Validate checksum — v1 used wrapping-add fold, v2 uses crc32
622        let actual_checksum = match version[0] {
623            1 => data.iter().fold(0u32, |acc, &b| acc.wrapping_add(b as u32)),
624            2 => crate::storage::engine::crc32::crc32(&data),
625            _ => return Err(SpillError::ChecksumMismatch),
626        };
627        if actual_checksum != expected_checksum {
628            return Err(SpillError::ChecksumMismatch);
629        }
630
631        // Update segment state
632        drop(segments);
633
634        let segments = read_guard_or_err(&self.segments, "spill manager segments")?;
635        if let Some(info) = segments.get(name) {
636            *write_guard_or_err(&info.is_spilled, "spill manager segment flag")? = false;
637            *write_guard_or_err(&info.spill_path, "spill manager segment spill path")? = None;
638        }
639
640        // Update memory tracking
641        self.current_memory.fetch_add(data.len(), Ordering::Relaxed);
642
643        // Remove from spilled set
644        write_guard_or_err(&self.spilled_segments, "spill manager spilled set")?.remove(name);
645
646        // Delete spill file
647        let _ = fs::remove_file(&path);
648
649        // Update stats
650        let mut stats = write_guard_or_err(&self.stats, "spill manager stats")?;
651        stats.reload_operations += 1;
652        stats.bytes_reloaded += data.len() as u64;
653        stats.spilled_count = stats.spilled_count.saturating_sub(1);
654        stats.disk_usage = stats.disk_usage.saturating_sub(data.len() as u64);
655        drop(stats);
656
657        self.update_stats();
658
659        Ok(Some(data))
660    }
661
662    /// Check if a segment is spilled
663    pub fn is_spilled(&self, name: &str) -> bool {
664        recover_read_guard(&self.spilled_segments).contains(name)
665    }
666
667    /// Get current statistics
668    pub fn stats(&self) -> SpillStats {
669        recover_read_guard(&self.stats).clone()
670    }
671
672    /// Get current memory usage
673    pub fn memory_usage(&self) -> usize {
674        self.current_memory.load(Ordering::Relaxed)
675    }
676
677    /// Get memory utilization (0.0-1.0)
678    pub fn utilization(&self) -> f64 {
679        let current = self.current_memory.load(Ordering::Relaxed);
680        if self.config.max_memory == 0 {
681            0.0
682        } else {
683            current as f64 / self.config.max_memory as f64
684        }
685    }
686
687    /// List all tracked segments
688    pub fn list_segments(&self) -> Vec<(String, usize, bool)> {
689        let segments = recover_read_guard(&self.segments);
690        segments
691            .iter()
692            .map(|(name, info)| {
693                (
694                    name.clone(),
695                    info.size.load(Ordering::Relaxed),
696                    *recover_read_guard(&info.is_spilled),
697                )
698            })
699            .collect()
700    }
701
702    /// Clean up all spill files
703    pub fn cleanup(&self) -> io::Result<()> {
704        if self.config.spill_dir.exists() {
705            for entry in fs::read_dir(&self.config.spill_dir)? {
706                let entry = entry?;
707                let path = entry.path();
708                if path.extension().map(|e| e == "spill").unwrap_or(false) {
709                    let _ = fs::remove_file(path);
710                }
711            }
712        }
713
714        // Clear spilled state
715        let segments = recover_read_guard(&self.segments);
716        for info in segments.values() {
717            *recover_write_guard(&info.is_spilled) = false;
718            *recover_write_guard(&info.spill_path) = None;
719        }
720
721        recover_write_guard(&self.spilled_segments).clear();
722
723        Ok(())
724    }
725
726    /// Decay all segment scores (called periodically)
727    fn decay_all_scores(&self) {
728        let segments = recover_read_guard(&self.segments);
729        for info in segments.values() {
730            info.decay_score(self.config.access_decay);
731        }
732    }
733
734    /// Update stats from current state
735    fn update_stats(&self) {
736        let mut stats = recover_write_guard(&self.stats);
737        stats.current_memory = self.current_memory.load(Ordering::Relaxed);
738
739        let segments = recover_read_guard(&self.segments);
740        stats.segment_count = segments.len();
741        drop(segments);
742
743        let spilled = recover_read_guard(&self.spilled_segments);
744        stats.spilled_count = spilled.len();
745    }
746}
747
impl Default for SpillManager {
    /// Manager with [`SpillConfig::default`] settings and no segments.
    fn default() -> Self {
        Self::new(SpillConfig::default())
    }
}
753
impl Drop for SpillManager {
    /// Best-effort removal of spill files when the manager goes away;
    /// errors are deliberately ignored because `Drop` cannot report them.
    fn drop(&mut self) {
        // Clean up spill files on drop
        let _ = self.cleanup();
    }
}
760
761// ============================================================================
762// Memory-Limited Graph Wrapper
763// ============================================================================
764
/// A graph wrapper that automatically spills to disk when memory limit is reached.
///
/// Owns a dedicated [`SpillManager`] tracking a single segment named at
/// construction; callers report size changes via `check_memory`.
pub struct SpillableGraph<G> {
    /// The underlying graph
    pub graph: G,
    /// Spill manager dedicated to this graph
    pub spill_manager: SpillManager,
    /// Segment name this graph is tracked under
    segment_name: String,
}
774
775impl<G> SpillableGraph<G> {
776    /// Create a new spillable graph wrapper
777    pub fn new(graph: G, segment_name: &str, config: SpillConfig) -> Self {
778        Self {
779            graph,
780            spill_manager: SpillManager::new(config),
781            segment_name: segment_name.to_string(),
782        }
783    }
784
785    /// Get the segment name
786    pub fn segment_name(&self) -> &str {
787        &self.segment_name
788    }
789
790    /// Check memory and spill if needed
791    pub fn check_memory(&mut self, current_size: usize) -> bool {
792        self.spill_manager
793            .update_size(&self.segment_name, current_size);
794        self.spill_manager.needs_spill().is_some()
795    }
796
797    /// Get spill manager stats
798    pub fn stats(&self) -> SpillStats {
799        self.spill_manager.stats()
800    }
801}
802
803// ============================================================================
804// Tests
805// ============================================================================
806
807#[cfg(test)]
808mod tests {
809    use super::*;
810    use std::env;
811
812    fn test_config() -> SpillConfig {
813        use std::sync::atomic::{AtomicU64, Ordering};
814        static COUNTER: AtomicU64 = AtomicU64::new(0);
815        let id = COUNTER.fetch_add(1, Ordering::Relaxed);
816        SpillConfig::new()
817            .max_memory(1024 * 1024) // 1MB for testing
818            .spill_threshold(0.5)
819            .target_after_spill(0.3)
820            .min_spill_size(100)
821            .spill_dir(env::temp_dir().join(format!(
822                "reddb-spill-test-{}-{}",
823                std::process::id(),
824                id
825            )))
826    }
827
828    #[test]
829    fn test_register_segment() {
830        let manager = SpillManager::new(test_config());
831
832        manager.register_segment("seg1", 100_000);
833        manager.register_segment("seg2", 200_000);
834
835        assert_eq!(manager.memory_usage(), 300_000);
836
837        let stats = manager.stats();
838        assert_eq!(stats.segment_count, 2);
839    }
840
841    #[test]
842    fn test_update_size() {
843        let manager = SpillManager::new(test_config());
844
845        manager.register_segment("seg1", 100_000);
846        assert_eq!(manager.memory_usage(), 100_000);
847
848        manager.update_size("seg1", 150_000);
849        assert_eq!(manager.memory_usage(), 150_000);
850
851        manager.update_size("seg1", 50_000);
852        assert_eq!(manager.memory_usage(), 50_000);
853    }
854
855    #[test]
856    fn test_needs_spill() {
857        let manager = SpillManager::new(test_config());
858
859        // Below threshold
860        manager.register_segment("seg1", 400_000); // 40%
861        assert!(manager.needs_spill().is_none());
862
863        // Above threshold (50%)
864        manager.register_segment("seg2", 200_000); // Now 60%
865
866        // Access seg1 more to make seg2 colder
867        for _ in 0..100 {
868            manager.access("seg1");
869        }
870
871        let to_spill = manager.needs_spill();
872        assert!(to_spill.is_some());
873        let segments = to_spill.unwrap();
874        assert!(segments.contains(&"seg2".to_string()));
875    }
876
877    #[test]
878    fn test_spill_and_reload() {
879        let manager = SpillManager::new(test_config());
880
881        manager.register_segment("test_seg", 1000);
882
883        let data = vec![1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10];
884        let path = manager.spill("test_seg", &data).unwrap();
885
886        assert!(path.exists());
887        assert!(manager.is_spilled("test_seg"));
888
889        let reloaded = manager.reload("test_seg").unwrap();
890        assert!(reloaded.is_some());
891        assert_eq!(reloaded.unwrap(), data);
892        assert!(!manager.is_spilled("test_seg"));
893    }
894
895    #[test]
896    fn test_checksum_validation() {
897        let manager = SpillManager::new(test_config());
898
899        // Use unique segment name to avoid conflicts
900        manager.register_segment("checksum_test_seg", 100);
901
902        let data = b"test data for checksum validation";
903        manager.spill("checksum_test_seg", data).unwrap();
904
905        // Should reload successfully
906        let reloaded = manager.reload("checksum_test_seg").unwrap();
907        assert!(reloaded.is_some());
908        assert_eq!(&reloaded.unwrap()[..], data);
909    }
910
911    #[test]
912    fn test_list_segments() {
913        let manager = SpillManager::new(test_config());
914
915        manager.register_segment("alpha", 1000);
916        manager.register_segment("beta", 2000);
917        manager.register_segment("gamma", 3000);
918
919        let segments = manager.list_segments();
920        assert_eq!(segments.len(), 3);
921
922        let names: Vec<_> = segments.iter().map(|(n, _, _)| n.as_str()).collect();
923        assert!(names.contains(&"alpha"));
924        assert!(names.contains(&"beta"));
925        assert!(names.contains(&"gamma"));
926    }
927
928    #[test]
929    fn test_unregister_segment() {
930        let manager = SpillManager::new(test_config());
931
932        manager.register_segment("seg1", 100_000);
933        manager.register_segment("seg2", 200_000);
934
935        assert_eq!(manager.memory_usage(), 300_000);
936
937        manager.unregister_segment("seg1");
938
939        assert_eq!(manager.memory_usage(), 200_000);
940        assert_eq!(manager.stats().segment_count, 1);
941    }
942
943    #[test]
944    fn test_cleanup() {
945        let manager = SpillManager::new(test_config());
946
947        manager.register_segment("seg1", 100);
948        manager.spill("seg1", b"test data").unwrap();
949
950        assert!(manager.is_spilled("seg1"));
951
952        manager.cleanup().unwrap();
953
954        assert!(!manager.is_spilled("seg1"));
955    }
956
957    #[test]
958    fn test_utilization() {
959        let config = SpillConfig::new().max_memory(1000);
960        let manager = SpillManager::new(config);
961
962        manager.register_segment("seg", 500);
963
964        let util = manager.utilization();
965        assert!((util - 0.5).abs() < 0.001);
966    }
967
    // Offset of the data section in the spill file format:
    // 4 (magic) + 1 (version) + 4 (checksum) + 8 (size) = 17 bytes of header.
    // Used by the corruption tests below to mutate payload bytes only.
    const HEADER_LEN: usize = 17;
970
971    #[test]
972    fn test_v2_round_trip() {
973        let manager = SpillManager::new(test_config());
974        manager.register_segment("rt_seg", 100);
975        let data: Vec<u8> = (0u8..=127).collect();
976        manager.spill("rt_seg", &data).unwrap();
977        let out = manager.reload("rt_seg").unwrap().unwrap();
978        assert_eq!(out, data);
979    }
980
981    #[test]
982    fn test_single_byte_mutation_detected() {
983        let manager = SpillManager::new(test_config());
984        manager.register_segment("mut_seg", 100);
985        let data = b"hello world mutation test data!!";
986        let path = manager.spill("mut_seg", data).unwrap();
987
988        // Flip a byte in the data section
989        let mut raw = std::fs::read(&path).unwrap();
990        raw[HEADER_LEN] ^= 0xFF;
991        std::fs::write(&path, &raw).unwrap();
992
993        let result = manager.reload("mut_seg");
994        assert!(
995            matches!(result, Err(SpillError::ChecksumMismatch)),
996            "expected ChecksumMismatch, got {:?}",
997            result
998        );
999    }
1000
1001    #[test]
1002    fn test_byte_permutation_detected() {
1003        // The old fold checksum is commutative — swapping two bytes leaves the
1004        // sum unchanged. CRC-32 is not commutative, so v2 catches this.
1005        let manager = SpillManager::new(test_config());
1006        manager.register_segment("perm_seg", 100);
1007        let data = b"abcdefghij"; // all distinct bytes
1008        let path = manager.spill("perm_seg", data).unwrap();
1009
1010        // Swap two bytes in the data section
1011        let mut raw = std::fs::read(&path).unwrap();
1012        raw.swap(HEADER_LEN, HEADER_LEN + 1);
1013        std::fs::write(&path, &raw).unwrap();
1014
1015        let result = manager.reload("perm_seg");
1016        assert!(
1017            matches!(result, Err(SpillError::ChecksumMismatch)),
1018            "expected ChecksumMismatch, got {:?}",
1019            result
1020        );
1021    }
1022
1023    #[test]
1024    fn test_path_traversal_rejected() {
1025        let manager = SpillManager::new(test_config());
1026        for bad_name in &["../foo", "/etc/passwd", "a/b"] {
1027            manager.register_segment(bad_name, 100);
1028            let result = manager.spill(bad_name, b"data");
1029            assert!(
1030                matches!(result, Err(SpillError::InvalidName(_))),
1031                "expected InvalidName for {:?}, got {:?}",
1032                bad_name,
1033                result
1034            );
1035        }
1036    }
1037}