Skip to main content

stackforge_core/flow/
spill.rs

1//! Disk spill manager for memory-budgeted flow extraction.
2//!
3//! Provides `ReassemblyStorage` for data that can be transparently spilled to
4//! mmap'd temp files when RAM budget is exceeded, and `MemoryTracker` for
5//! global memory accounting.
6
7use std::io::Write;
8use std::path::Path;
9use std::sync::atomic::{AtomicUsize, Ordering};
10
11use memmap2::Mmap;
12use tempfile::NamedTempFile;
13
14/// A handle to reassembly data that may be in memory or on disk.
15///
16/// When in memory, behaves like a `Vec<u8>`. When spilled, data lives in a
17/// temporary file and is read back via `mmap` on demand. The temp file is
18/// auto-deleted when this value is dropped.
19#[derive(Debug)]
20pub enum ReassemblyStorage {
21    /// Data held in memory.
22    InMemory(Vec<u8>),
23    /// Data flushed to a memory-mapped temporary file.
24    OnDisk { file: NamedTempFile, len: usize },
25}
26
27impl ReassemblyStorage {
28    /// Create new empty in-memory storage.
29    pub fn new() -> Self {
30        Self::InMemory(Vec::new())
31    }
32
33    /// Current byte length of stored data.
34    pub fn len(&self) -> usize {
35        match self {
36            Self::InMemory(v) => v.len(),
37            Self::OnDisk { len, .. } => *len,
38        }
39    }
40
41    /// Whether storage is empty.
42    pub fn is_empty(&self) -> bool {
43        self.len() == 0
44    }
45
46    /// Append data. Only valid when `InMemory`.
47    ///
48    /// # Panics
49    /// Panics if storage has been spilled to disk. Callers must ensure data
50    /// is not appended after a spill (in practice, spilling only happens
51    /// for completed/idle flows).
52    pub fn extend_from_slice(&mut self, data: &[u8]) {
53        match self {
54            Self::InMemory(v) => v.extend_from_slice(data),
55            Self::OnDisk { .. } => {
56                panic!("cannot extend spilled storage; data already on disk");
57            },
58        }
59    }
60
61    /// Spill in-memory data to disk, returning the number of bytes freed.
62    ///
63    /// If already on disk or empty, returns 0.
64    pub fn spill_to_disk(&mut self, spill_dir: Option<&Path>) -> std::io::Result<usize> {
65        let old = std::mem::replace(self, Self::InMemory(Vec::new()));
66        match old {
67            Self::InMemory(data) => {
68                if data.is_empty() {
69                    *self = Self::InMemory(data);
70                    return Ok(0);
71                }
72                let freed = data.len();
73                let mut tmpfile = match spill_dir {
74                    Some(dir) => NamedTempFile::new_in(dir)?,
75                    None => NamedTempFile::new()?,
76                };
77                tmpfile.write_all(&data)?;
78                tmpfile.flush()?;
79                *self = Self::OnDisk {
80                    file: tmpfile,
81                    len: freed,
82                };
83                Ok(freed)
84            },
85            already_on_disk @ Self::OnDisk { .. } => {
86                *self = already_on_disk;
87                Ok(0)
88            },
89        }
90    }
91
92    /// Read all data back. Works for both in-memory and on-disk storage.
93    pub fn read_all(&self) -> std::io::Result<Vec<u8>> {
94        match self {
95            Self::InMemory(v) => Ok(v.clone()),
96            Self::OnDisk { file, len } => {
97                if *len == 0 {
98                    return Ok(Vec::new());
99                }
100                // Safety: the file is exclusively ours (NamedTempFile), and we
101                // only read from it after flushing. The mmap is short-lived.
102                let mmap = unsafe { Mmap::map(file.as_file())? };
103                Ok(mmap[..*len].to_vec())
104            },
105        }
106    }
107
108    /// Get a reference to in-memory data. Returns `None` if spilled.
109    pub fn as_slice(&self) -> Option<&[u8]> {
110        match self {
111            Self::InMemory(v) => Some(v),
112            Self::OnDisk { .. } => None,
113        }
114    }
115
116    /// Whether data is currently on disk.
117    pub fn is_spilled(&self) -> bool {
118        matches!(self, Self::OnDisk { .. })
119    }
120
121    /// Bytes currently held in memory (0 if spilled).
122    pub fn in_memory_bytes(&self) -> usize {
123        match self {
124            Self::InMemory(v) => v.len(),
125            Self::OnDisk { .. } => 0,
126        }
127    }
128
129    /// Drain and return data, resetting to empty. Reads from disk if spilled.
130    pub fn drain(&mut self) -> std::io::Result<Vec<u8>> {
131        let data = self.read_all()?;
132        *self = Self::InMemory(Vec::new());
133        Ok(data)
134    }
135}
136
137impl Default for ReassemblyStorage {
138    fn default() -> Self {
139        Self::new()
140    }
141}
142
/// Global memory tracker for the flow extraction engine.
///
/// Keeps a running byte estimate in an atomic counter, so it can be updated
/// through a shared reference without locks. The counter saturates at `0`
/// and `usize::MAX` instead of wrapping around.
#[derive(Debug)]
pub struct MemoryTracker {
    /// Current estimated memory usage in bytes.
    current: AtomicUsize,
    /// Budget limit (None = unlimited).
    budget: Option<usize>,
}

impl MemoryTracker {
    /// Create a new tracker with an optional budget, starting at zero usage.
    pub fn new(budget: Option<usize>) -> Self {
        Self {
            current: AtomicUsize::new(0),
            budget,
        }
    }

    /// Record newly allocated bytes.
    ///
    /// The counter saturates at `usize::MAX` rather than wrapping to a small
    /// value, which would under-report usage.
    pub fn add(&self, bytes: usize) {
        // The closure always returns `Some`, so `fetch_update` cannot fail.
        let _ = self
            .current
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
                Some(cur.saturating_add(bytes))
            });
    }

    /// Record freed bytes.
    ///
    /// The counter saturates at zero. A plain `fetch_sub` would wrap on
    /// underflow, so one over-subtraction would make usage appear close to
    /// `usize::MAX` and leave the tracker permanently over budget.
    pub fn subtract(&self, bytes: usize) {
        // The closure always returns `Some`, so `fetch_update` cannot fail.
        let _ = self
            .current
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
                Some(cur.saturating_sub(bytes))
            });
    }

    /// Current estimated memory usage.
    pub fn current_usage(&self) -> usize {
        self.current.load(Ordering::Relaxed)
    }

    /// Whether current usage exceeds the budget.
    ///
    /// Usage exactly equal to the budget is not over budget. Always `false`
    /// when no budget is configured.
    pub fn is_over_budget(&self) -> bool {
        match self.budget {
            Some(b) => self.current_usage() > b,
            None => false,
        }
    }

    /// Whether a budget has been set.
    pub fn has_budget(&self) -> bool {
        self.budget.is_some()
    }

    /// The configured budget, if any.
    pub fn budget(&self) -> Option<usize> {
        self.budget
    }
}
196
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh storage is empty and in memory; appended bytes show up through
    /// every accessor.
    #[test]
    fn test_reassembly_storage_in_memory() {
        let mut storage = ReassemblyStorage::new();
        assert_eq!(storage.len(), 0);
        assert!(storage.is_empty());
        assert!(!storage.is_spilled());

        storage.extend_from_slice(b"hello");
        assert_eq!(storage.in_memory_bytes(), 5);
        assert_eq!(storage.len(), 5);
        assert_eq!(storage.as_slice(), Some(&b"hello"[..]));

        let bytes = storage.read_all().unwrap();
        assert_eq!(bytes, b"hello");
    }

    /// Spilling moves the bytes to disk and they can still be read back.
    #[test]
    fn test_reassembly_storage_spill_and_read() {
        let payload = b"test data for spill";
        let mut storage = ReassemblyStorage::new();
        storage.extend_from_slice(payload);

        let freed = storage.spill_to_disk(None).unwrap();
        assert_eq!(freed, 19);
        assert!(storage.is_spilled());
        assert!(storage.as_slice().is_none());
        assert_eq!(storage.in_memory_bytes(), 0);
        assert_eq!(storage.len(), 19);

        let bytes = storage.read_all().unwrap();
        assert_eq!(bytes, payload);
    }

    /// Spilling empty storage frees nothing and keeps it in memory.
    #[test]
    fn test_reassembly_storage_spill_empty() {
        let mut storage = ReassemblyStorage::new();
        assert_eq!(storage.spill_to_disk(None).unwrap(), 0);
        assert!(!storage.is_spilled());
    }

    /// A second spill of already-spilled storage is a no-op.
    #[test]
    fn test_reassembly_storage_double_spill() {
        let mut storage = ReassemblyStorage::new();
        storage.extend_from_slice(b"data");
        storage.spill_to_disk(None).unwrap();

        let freed_again = storage.spill_to_disk(None).unwrap();
        assert_eq!(freed_again, 0);
    }

    /// Draining in-memory storage returns the bytes and leaves it empty.
    #[test]
    fn test_reassembly_storage_drain() {
        let mut storage = ReassemblyStorage::new();
        storage.extend_from_slice(b"drain me");

        let drained = storage.drain().unwrap();
        assert_eq!(drained, b"drain me");
        assert!(storage.is_empty());
    }

    /// Draining spilled storage reads from disk and resets to in-memory.
    #[test]
    fn test_reassembly_storage_drain_spilled() {
        let mut storage = ReassemblyStorage::new();
        storage.extend_from_slice(b"spilled drain");
        storage.spill_to_disk(None).unwrap();

        let drained = storage.drain().unwrap();
        assert_eq!(drained, b"spilled drain");
        assert!(!storage.is_spilled());
        assert!(storage.is_empty());
    }

    /// Without a budget the tracker is never over budget.
    #[test]
    fn test_memory_tracker_no_budget() {
        let unlimited = MemoryTracker::new(None);
        assert!(!unlimited.has_budget());

        unlimited.add(1_000_000);
        assert!(!unlimited.is_over_budget());
    }

    /// With a budget, usage is tracked and compared against the limit.
    #[test]
    fn test_memory_tracker_with_budget() {
        let limited = MemoryTracker::new(Some(1000));
        assert_eq!(limited.budget(), Some(1000));
        assert!(limited.has_budget());

        limited.add(500);
        assert!(!limited.is_over_budget());
        assert_eq!(limited.current_usage(), 500);

        limited.add(600);
        assert!(limited.is_over_budget());
        assert_eq!(limited.current_usage(), 1100);

        limited.subtract(200);
        assert!(!limited.is_over_budget());
        assert_eq!(limited.current_usage(), 900);
    }
}