Skip to main content

stackforge_core/flow/
spill.rs

1//! Disk spill manager for memory-budgeted flow extraction.
2//!
3//! Provides `ReassemblyStorage` for data that can be transparently spilled to
4//! mmap'd temp files when RAM budget is exceeded, and `MemoryTracker` for
5//! global memory accounting.
6
7use std::io::Write;
8use std::path::Path;
9use std::sync::atomic::{AtomicUsize, Ordering};
10
11use memmap2::Mmap;
12use tempfile::NamedTempFile;
13
14/// A handle to reassembly data that may be in memory or on disk.
15///
16/// When in memory, behaves like a `Vec<u8>`. When spilled, data lives in a
17/// temporary file and is read back via `mmap` on demand. The temp file is
18/// auto-deleted when this value is dropped.
19#[derive(Debug)]
20pub enum ReassemblyStorage {
21    /// Data held in memory.
22    InMemory(Vec<u8>),
23    /// Data flushed to a memory-mapped temporary file.
24    OnDisk { file: NamedTempFile, len: usize },
25}
26
27impl ReassemblyStorage {
28    /// Create new empty in-memory storage.
29    pub fn new() -> Self {
30        Self::InMemory(Vec::new())
31    }
32
33    /// Current byte length of stored data.
34    pub fn len(&self) -> usize {
35        match self {
36            Self::InMemory(v) => v.len(),
37            Self::OnDisk { len, .. } => *len,
38        }
39    }
40
41    /// Whether storage is empty.
42    pub fn is_empty(&self) -> bool {
43        self.len() == 0
44    }
45
46    /// Append data. Works for both in-memory and on-disk storage.
47    ///
48    /// When in-memory, appends to the `Vec`. When on-disk, appends to the
49    /// temporary file and updates the tracked length.
50    pub fn extend_from_slice(&mut self, data: &[u8]) {
51        match self {
52            Self::InMemory(v) => v.extend_from_slice(data),
53            Self::OnDisk { file, len } => {
54                file.write_all(data)
55                    .expect("failed to append to spill file");
56                file.flush().expect("failed to flush spill file");
57                *len += data.len();
58            },
59        }
60    }
61
62    /// Spill in-memory data to disk, returning the number of bytes freed.
63    ///
64    /// If already on disk or empty, returns 0.
65    pub fn spill_to_disk(&mut self, spill_dir: Option<&Path>) -> std::io::Result<usize> {
66        let old = std::mem::replace(self, Self::InMemory(Vec::new()));
67        match old {
68            Self::InMemory(data) => {
69                if data.is_empty() {
70                    *self = Self::InMemory(data);
71                    return Ok(0);
72                }
73                let freed = data.len();
74                let mut tmpfile = match spill_dir {
75                    Some(dir) => NamedTempFile::new_in(dir)?,
76                    None => NamedTempFile::new()?,
77                };
78                tmpfile.write_all(&data)?;
79                tmpfile.flush()?;
80                *self = Self::OnDisk {
81                    file: tmpfile,
82                    len: freed,
83                };
84                Ok(freed)
85            },
86            already_on_disk @ Self::OnDisk { .. } => {
87                *self = already_on_disk;
88                Ok(0)
89            },
90        }
91    }
92
93    /// Read all data back. Works for both in-memory and on-disk storage.
94    pub fn read_all(&self) -> std::io::Result<Vec<u8>> {
95        match self {
96            Self::InMemory(v) => Ok(v.clone()),
97            Self::OnDisk { file, len } => {
98                if *len == 0 {
99                    return Ok(Vec::new());
100                }
101                // Safety: the file is exclusively ours (NamedTempFile), and we
102                // only read from it after flushing. The mmap is short-lived.
103                let mmap = unsafe { Mmap::map(file.as_file())? };
104                Ok(mmap[..*len].to_vec())
105            },
106        }
107    }
108
109    /// Get a reference to in-memory data. Returns `None` if spilled.
110    pub fn as_slice(&self) -> Option<&[u8]> {
111        match self {
112            Self::InMemory(v) => Some(v),
113            Self::OnDisk { .. } => None,
114        }
115    }
116
117    /// Whether data is currently on disk.
118    pub fn is_spilled(&self) -> bool {
119        matches!(self, Self::OnDisk { .. })
120    }
121
122    /// Bytes currently held in memory (0 if spilled).
123    pub fn in_memory_bytes(&self) -> usize {
124        match self {
125            Self::InMemory(v) => v.len(),
126            Self::OnDisk { .. } => 0,
127        }
128    }
129
130    /// Drain and return data, resetting to empty. Reads from disk if spilled.
131    pub fn drain(&mut self) -> std::io::Result<Vec<u8>> {
132        let data = self.read_all()?;
133        *self = Self::InMemory(Vec::new());
134        Ok(data)
135    }
136}
137
138impl Default for ReassemblyStorage {
139    fn default() -> Self {
140        Self::new()
141    }
142}
143
/// Global memory tracker for the flow extraction engine.
///
/// Uses atomic operations for thread-safe accounting without locks. The
/// counter is an estimate maintained by callers through [`MemoryTracker::add`]
/// and [`MemoryTracker::subtract`]; nothing is measured automatically.
#[derive(Debug)]
pub struct MemoryTracker {
    /// Current estimated memory usage in bytes.
    current: AtomicUsize,
    /// Budget limit (None = unlimited).
    budget: Option<usize>,
}

impl MemoryTracker {
    /// Create a new tracker with an optional budget, starting at zero usage.
    pub fn new(budget: Option<usize>) -> Self {
        Self {
            current: AtomicUsize::new(0),
            budget,
        }
    }

    /// Record newly allocated bytes.
    pub fn add(&self, bytes: usize) {
        self.current.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Record freed bytes.
    ///
    /// The counter saturates at zero. Subtracting more than is currently
    /// recorded (for example after a double free report) clamps usage to 0
    /// instead of wrapping around to a huge value, which would otherwise make
    /// [`MemoryTracker::is_over_budget`] report `true` indefinitely.
    pub fn subtract(&self, bytes: usize) {
        // The closure always returns `Some`, so `fetch_update` cannot fail;
        // the returned previous value is not needed.
        let _ = self
            .current
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
                Some(cur.saturating_sub(bytes))
            });
    }

    /// Current estimated memory usage.
    pub fn current_usage(&self) -> usize {
        self.current.load(Ordering::Relaxed)
    }

    /// Whether current usage exceeds the budget.
    ///
    /// Usage exactly equal to the budget is not over budget. Always `false`
    /// when no budget is configured.
    pub fn is_over_budget(&self) -> bool {
        match self.budget {
            Some(b) => self.current_usage() > b,
            None => false,
        }
    }

    /// Whether a budget has been set.
    pub fn has_budget(&self) -> bool {
        self.budget.is_some()
    }

    /// The configured budget, if any.
    pub fn budget(&self) -> Option<usize> {
        self.budget
    }
}
197
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh storage is empty and in memory; appended bytes are visible
    /// through every accessor.
    #[test]
    fn test_reassembly_storage_in_memory() {
        let mut storage = ReassemblyStorage::new();
        assert!(storage.is_empty());
        assert_eq!(storage.len(), 0);
        assert!(!storage.is_spilled());

        storage.extend_from_slice(b"hello");
        assert_eq!(storage.len(), 5);
        assert_eq!(storage.as_slice(), Some(&b"hello"[..]));
        assert_eq!(storage.in_memory_bytes(), 5);

        let contents = storage.read_all().unwrap();
        assert_eq!(contents, b"hello");
    }

    /// Spilling moves the bytes to disk and they read back unchanged.
    #[test]
    fn test_reassembly_storage_spill_and_read() {
        let payload = b"test data for spill";
        let mut storage = ReassemblyStorage::new();
        storage.extend_from_slice(payload);

        let freed = storage.spill_to_disk(None).unwrap();
        assert_eq!(freed, 19);
        assert!(storage.is_spilled());
        assert_eq!(storage.len(), 19);
        assert_eq!(storage.in_memory_bytes(), 0);
        assert!(storage.as_slice().is_none());

        let contents = storage.read_all().unwrap();
        assert_eq!(contents, payload);
    }

    /// Spilling empty storage frees nothing and stays in memory.
    #[test]
    fn test_reassembly_storage_spill_empty() {
        let mut storage = ReassemblyStorage::new();
        let freed = storage.spill_to_disk(None).unwrap();
        assert_eq!(freed, 0);
        assert!(!storage.is_spilled());
    }

    /// A second spill on already-spilled storage is a no-op.
    #[test]
    fn test_reassembly_storage_double_spill() {
        let mut storage = ReassemblyStorage::new();
        storage.extend_from_slice(b"data");
        storage.spill_to_disk(None).unwrap();

        let freed_again = storage.spill_to_disk(None).unwrap();
        assert_eq!(freed_again, 0);
    }

    /// Draining in-memory storage returns the bytes and leaves it empty.
    #[test]
    fn test_reassembly_storage_drain() {
        let mut storage = ReassemblyStorage::new();
        storage.extend_from_slice(b"drain me");
        let drained = storage.drain().unwrap();
        assert_eq!(drained, b"drain me");
        assert!(storage.is_empty());
    }

    /// Appending after a spill writes to the file and keeps the data spilled.
    #[test]
    fn test_reassembly_storage_extend_after_spill() {
        let mut storage = ReassemblyStorage::new();
        storage.extend_from_slice(b"hello ");
        storage.spill_to_disk(None).unwrap();
        assert!(storage.is_spilled());

        storage.extend_from_slice(b"world");
        assert_eq!(storage.len(), 11);
        assert!(storage.is_spilled());

        let contents = storage.read_all().unwrap();
        assert_eq!(contents, b"hello world");
    }

    /// Draining spilled storage reads from disk and resets to empty in-memory.
    #[test]
    fn test_reassembly_storage_drain_spilled() {
        let payload = b"spilled drain";
        let mut storage = ReassemblyStorage::new();
        storage.extend_from_slice(payload);
        storage.spill_to_disk(None).unwrap();

        let drained = storage.drain().unwrap();
        assert_eq!(drained, payload);
        assert!(storage.is_empty());
        assert!(!storage.is_spilled());
    }

    /// Without a budget the tracker never reports being over budget.
    #[test]
    fn test_memory_tracker_no_budget() {
        let unlimited = MemoryTracker::new(None);
        assert!(!unlimited.has_budget());
        unlimited.add(1_000_000);
        assert!(!unlimited.is_over_budget());
    }

    /// With a budget, usage is tracked and compared against the limit.
    #[test]
    fn test_memory_tracker_with_budget() {
        let limited = MemoryTracker::new(Some(1000));
        assert!(limited.has_budget());
        assert_eq!(limited.budget(), Some(1000));

        limited.add(500);
        assert_eq!(limited.current_usage(), 500);
        assert!(!limited.is_over_budget());

        limited.add(600);
        assert_eq!(limited.current_usage(), 1100);
        assert!(limited.is_over_budget());

        limited.subtract(200);
        assert_eq!(limited.current_usage(), 900);
        assert!(!limited.is_over_budget());
    }
}