// Source: stackforge_core/flow/spill.rs
//! Disk spill manager for memory-budgeted flow extraction.
//!
//! Provides `ReassemblyStorage`, a byte buffer that can be spilled to a
//! temporary file (and read back via `mmap`) when the RAM budget is exceeded,
//! and `MemoryTracker` for global memory accounting.

use std::io::{Seek, SeekFrom, Write};
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};

use memmap2::Mmap;
use tempfile::NamedTempFile;

14/// A handle to reassembly data that may be in memory or on disk.
15///
16/// When in memory, behaves like a `Vec<u8>`. When spilled, data lives in a
17/// temporary file and is read back via `mmap` on demand. The temp file is
18/// auto-deleted when this value is dropped.
19#[derive(Debug)]
20pub enum ReassemblyStorage {
21    /// Data held in memory.
22    InMemory(Vec<u8>),
23    /// Data flushed to a memory-mapped temporary file.
24    OnDisk { file: NamedTempFile, len: usize },
25}
26
27impl ReassemblyStorage {
28    /// Create new empty in-memory storage.
29    pub fn new() -> Self {
30        Self::InMemory(Vec::new())
31    }
32
33    /// Current byte length of stored data.
34    pub fn len(&self) -> usize {
35        match self {
36            Self::InMemory(v) => v.len(),
37            Self::OnDisk { len, .. } => *len,
38        }
39    }
40
41    /// Whether storage is empty.
42    pub fn is_empty(&self) -> bool {
43        self.len() == 0
44    }
45
46    /// Append data. Works for both in-memory and on-disk storage.
47    ///
48    /// When in-memory, appends to the `Vec`. When on-disk, appends to the
49    /// temporary file and updates the tracked length.
50    pub fn extend_from_slice(&mut self, data: &[u8]) {
51        match self {
52            Self::InMemory(v) => v.extend_from_slice(data),
53            Self::OnDisk { file, len } => {
54                file.write_all(data)
55                    .expect("failed to append to spill file");
56                file.flush().expect("failed to flush spill file");
57                *len += data.len();
58            },
59        }
60    }
61
62    /// Spill in-memory data to disk, returning the number of bytes freed.
63    ///
64    /// If already on disk or empty, returns 0.
65    pub fn spill_to_disk(&mut self, spill_dir: Option<&Path>) -> std::io::Result<usize> {
66        let old = std::mem::replace(self, Self::InMemory(Vec::new()));
67        match old {
68            Self::InMemory(data) => {
69                if data.is_empty() {
70                    *self = Self::InMemory(data);
71                    return Ok(0);
72                }
73                let freed = data.len();
74                let mut tmpfile = match spill_dir {
75                    Some(dir) => NamedTempFile::new_in(dir)?,
76                    None => NamedTempFile::new()?,
77                };
78                tmpfile.write_all(&data)?;
79                tmpfile.flush()?;
80                *self = Self::OnDisk {
81                    file: tmpfile,
82                    len: freed,
83                };
84                Ok(freed)
85            },
86            already_on_disk @ Self::OnDisk { .. } => {
87                *self = already_on_disk;
88                Ok(0)
89            },
90        }
91    }
92
93    /// Read all data back. Works for both in-memory and on-disk storage.
94    pub fn read_all(&self) -> std::io::Result<Vec<u8>> {
95        match self {
96            Self::InMemory(v) => Ok(v.clone()),
97            Self::OnDisk { file, len } => {
98                if *len == 0 {
99                    return Ok(Vec::new());
100                }
101                // Safety: the file is exclusively ours (NamedTempFile), and we
102                // only read from it after flushing. The mmap is short-lived.
103                let mmap = unsafe { Mmap::map(file.as_file())? };
104                Ok(mmap[..*len].to_vec())
105            },
106        }
107    }
108
109    /// Get a reference to in-memory data. Returns `None` if spilled.
110    pub fn as_slice(&self) -> Option<&[u8]> {
111        match self {
112            Self::InMemory(v) => Some(v),
113            Self::OnDisk { .. } => None,
114        }
115    }
116
117    /// Whether data is currently on disk.
118    pub fn is_spilled(&self) -> bool {
119        matches!(self, Self::OnDisk { .. })
120    }
121
122    /// Bytes currently held in memory (0 if spilled).
123    pub fn in_memory_bytes(&self) -> usize {
124        match self {
125            Self::InMemory(v) => v.len(),
126            Self::OnDisk { .. } => 0,
127        }
128    }
129
130    /// Truncate stored data to at most `max_len` bytes.
131    ///
132    /// For in-memory storage, truncates the `Vec`. For on-disk storage,
133    /// only updates the tracked length (the file is not physically truncated).
134    pub fn truncate(&mut self, max_len: usize) {
135        match self {
136            Self::InMemory(v) => v.truncate(max_len),
137            Self::OnDisk { len, .. } => {
138                if *len > max_len {
139                    *len = max_len;
140                }
141            },
142        }
143    }
144
145    /// Drain and return data, resetting to empty. Reads from disk if spilled.
146    pub fn drain(&mut self) -> std::io::Result<Vec<u8>> {
147        let data = self.read_all()?;
148        *self = Self::InMemory(Vec::new());
149        Ok(data)
150    }
151}
152
153impl Default for ReassemblyStorage {
154    fn default() -> Self {
155        Self::new()
156    }
157}
158
/// Global memory tracker for the flow extraction engine.
///
/// Usage is kept in a single [`AtomicUsize`], so `add`/`subtract` can be
/// called concurrently through a shared reference without a lock.
#[derive(Debug)]
pub struct MemoryTracker {
    /// Current estimated memory usage in bytes.
    current: AtomicUsize,
    /// Budget limit in bytes (`None` = unlimited).
    budget: Option<usize>,
}

impl MemoryTracker {
    /// Create a new tracker with zero recorded usage and an optional budget.
    pub fn new(budget: Option<usize>) -> Self {
        Self {
            current: AtomicUsize::new(0),
            budget,
        }
    }

    /// Record `bytes` of newly allocated memory.
    pub fn add(&self, bytes: usize) {
        // Relaxed ordering: the counter is a standalone estimate and does not
        // guard access to any other data in this type.
        self.current.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Record `bytes` of freed memory.
    ///
    /// Saturates at zero: if more bytes are released than were recorded (an
    /// accounting mismatch in the caller), usage clamps to 0 instead of
    /// wrapping around to a huge value that would report "over budget" forever.
    pub fn subtract(&self, bytes: usize) {
        // The closure always returns `Some`, so `fetch_update` cannot fail and
        // its `Result` (the previous value) can be ignored.
        let _ = self
            .current
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
                Some(cur.saturating_sub(bytes))
            });
    }

    /// Current estimated memory usage in bytes.
    pub fn current_usage(&self) -> usize {
        self.current.load(Ordering::Relaxed)
    }

    /// Whether current usage strictly exceeds the budget (never true without one).
    pub fn is_over_budget(&self) -> bool {
        match self.budget {
            Some(limit) => self.current_usage() > limit,
            None => false,
        }
    }

    /// Whether a budget has been set.
    pub fn has_budget(&self) -> bool {
        self.budget.is_some()
    }

    /// The configured budget, if any.
    pub fn budget(&self) -> Option<usize> {
        self.budget
    }
}

/// Regression test for the saturating `subtract` above.
#[cfg(test)]
mod memory_tracker_regression_tests {
    use super::*;

    #[test]
    fn subtract_saturates_at_zero() {
        let tracker = MemoryTracker::new(Some(100));
        tracker.add(10);
        tracker.subtract(25);
        assert_eq!(tracker.current_usage(), 0);
        assert!(!tracker.is_over_budget());
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_reassembly_storage_in_memory() {
        let mut s = ReassemblyStorage::new();
        assert!(s.is_empty());
        assert_eq!(s.len(), 0);
        assert!(!s.is_spilled());

        s.extend_from_slice(b"hello");
        assert_eq!(s.len(), 5);
        assert_eq!(s.as_slice(), Some(b"hello".as_slice()));
        assert_eq!(s.in_memory_bytes(), 5);

        let data = s.read_all().unwrap();
        assert_eq!(data, b"hello");
    }

    #[test]
    fn test_reassembly_storage_default_is_empty_in_memory() {
        let s = ReassemblyStorage::default();
        assert!(s.is_empty());
        assert!(!s.is_spilled());
        assert_eq!(s.as_slice(), Some(b"".as_slice()));
    }

    #[test]
    fn test_reassembly_storage_spill_and_read() {
        let mut s = ReassemblyStorage::new();
        s.extend_from_slice(b"test data for spill");

        let freed = s.spill_to_disk(None).unwrap();
        assert_eq!(freed, 19);
        assert!(s.is_spilled());
        assert_eq!(s.len(), 19);
        assert_eq!(s.in_memory_bytes(), 0);
        assert!(s.as_slice().is_none());

        let data = s.read_all().unwrap();
        assert_eq!(data, b"test data for spill");
    }

    #[test]
    fn test_reassembly_storage_spill_to_explicit_dir() {
        let dir = std::env::temp_dir();
        let mut s = ReassemblyStorage::new();
        s.extend_from_slice(b"explicit dir");

        let freed = s.spill_to_disk(Some(dir.as_path())).unwrap();
        assert_eq!(freed, 12);
        assert!(s.is_spilled());
        assert_eq!(s.read_all().unwrap(), b"explicit dir");
    }

    #[test]
    fn test_reassembly_storage_spill_empty() {
        let mut s = ReassemblyStorage::new();
        let freed = s.spill_to_disk(None).unwrap();
        assert_eq!(freed, 0);
        assert!(!s.is_spilled());
    }

    #[test]
    fn test_reassembly_storage_double_spill() {
        let mut s = ReassemblyStorage::new();
        s.extend_from_slice(b"data");
        s.spill_to_disk(None).unwrap();

        // Second spill should be a no-op that leaves the data intact.
        let freed = s.spill_to_disk(None).unwrap();
        assert_eq!(freed, 0);
        assert!(s.is_spilled());
        assert_eq!(s.read_all().unwrap(), b"data");
    }

    #[test]
    fn test_reassembly_storage_truncate_in_memory() {
        let mut s = ReassemblyStorage::new();
        s.extend_from_slice(b"hello world");
        s.truncate(5);
        assert_eq!(s.as_slice(), Some(b"hello".as_slice()));

        // Truncating to a larger length is a no-op.
        s.truncate(100);
        assert_eq!(s.len(), 5);
    }

    #[test]
    fn test_reassembly_storage_truncate_spilled() {
        let mut s = ReassemblyStorage::new();
        s.extend_from_slice(b"hello world");
        s.spill_to_disk(None).unwrap();

        s.truncate(5);
        assert!(s.is_spilled());
        assert_eq!(s.len(), 5);
        assert_eq!(s.read_all().unwrap(), b"hello");

        // Truncating to a larger length is a no-op.
        s.truncate(100);
        assert_eq!(s.len(), 5);
    }

    #[test]
    fn test_reassembly_storage_drain() {
        let mut s = ReassemblyStorage::new();
        s.extend_from_slice(b"drain me");
        let data = s.drain().unwrap();
        assert_eq!(data, b"drain me");
        assert!(s.is_empty());
    }

    #[test]
    fn test_reassembly_storage_extend_after_spill() {
        let mut s = ReassemblyStorage::new();
        s.extend_from_slice(b"hello ");
        s.spill_to_disk(None).unwrap();
        assert!(s.is_spilled());

        // Appending after spill should work (writes to file)
        s.extend_from_slice(b"world");
        assert_eq!(s.len(), 11);
        assert!(s.is_spilled());

        let data = s.read_all().unwrap();
        assert_eq!(data, b"hello world");
    }

    #[test]
    fn test_reassembly_storage_drain_spilled() {
        let mut s = ReassemblyStorage::new();
        s.extend_from_slice(b"spilled drain");
        s.spill_to_disk(None).unwrap();
        let data = s.drain().unwrap();
        assert_eq!(data, b"spilled drain");
        assert!(s.is_empty());
        assert!(!s.is_spilled());
    }

    #[test]
    fn test_memory_tracker_no_budget() {
        let tracker = MemoryTracker::new(None);
        assert!(!tracker.has_budget());
        assert_eq!(tracker.budget(), None);
        assert_eq!(tracker.current_usage(), 0);
        tracker.add(1_000_000);
        assert!(!tracker.is_over_budget());
    }

    #[test]
    fn test_memory_tracker_with_budget() {
        let tracker = MemoryTracker::new(Some(1000));
        assert!(tracker.has_budget());
        assert_eq!(tracker.budget(), Some(1000));

        tracker.add(500);
        assert_eq!(tracker.current_usage(), 500);
        assert!(!tracker.is_over_budget());

        tracker.add(600);
        assert_eq!(tracker.current_usage(), 1100);
        assert!(tracker.is_over_budget());

        tracker.subtract(200);
        assert_eq!(tracker.current_usage(), 900);
        assert!(!tracker.is_over_budget());
    }
}
330}