stackforge_core/flow/
spill.rs1use std::io::Write;
8use std::path::Path;
9use std::sync::atomic::{AtomicUsize, Ordering};
10
11use memmap2::Mmap;
12use tempfile::NamedTempFile;
13
14#[derive(Debug)]
20pub enum ReassemblyStorage {
21 InMemory(Vec<u8>),
23 OnDisk { file: NamedTempFile, len: usize },
25}
26
27impl ReassemblyStorage {
28 pub fn new() -> Self {
30 Self::InMemory(Vec::new())
31 }
32
33 pub fn len(&self) -> usize {
35 match self {
36 Self::InMemory(v) => v.len(),
37 Self::OnDisk { len, .. } => *len,
38 }
39 }
40
41 pub fn is_empty(&self) -> bool {
43 self.len() == 0
44 }
45
46 pub fn extend_from_slice(&mut self, data: &[u8]) {
51 match self {
52 Self::InMemory(v) => v.extend_from_slice(data),
53 Self::OnDisk { file, len } => {
54 file.write_all(data)
55 .expect("failed to append to spill file");
56 file.flush().expect("failed to flush spill file");
57 *len += data.len();
58 },
59 }
60 }
61
62 pub fn spill_to_disk(&mut self, spill_dir: Option<&Path>) -> std::io::Result<usize> {
66 let old = std::mem::replace(self, Self::InMemory(Vec::new()));
67 match old {
68 Self::InMemory(data) => {
69 if data.is_empty() {
70 *self = Self::InMemory(data);
71 return Ok(0);
72 }
73 let freed = data.len();
74 let mut tmpfile = match spill_dir {
75 Some(dir) => NamedTempFile::new_in(dir)?,
76 None => NamedTempFile::new()?,
77 };
78 tmpfile.write_all(&data)?;
79 tmpfile.flush()?;
80 *self = Self::OnDisk {
81 file: tmpfile,
82 len: freed,
83 };
84 Ok(freed)
85 },
86 already_on_disk @ Self::OnDisk { .. } => {
87 *self = already_on_disk;
88 Ok(0)
89 },
90 }
91 }
92
93 pub fn read_all(&self) -> std::io::Result<Vec<u8>> {
95 match self {
96 Self::InMemory(v) => Ok(v.clone()),
97 Self::OnDisk { file, len } => {
98 if *len == 0 {
99 return Ok(Vec::new());
100 }
101 let mmap = unsafe { Mmap::map(file.as_file())? };
104 Ok(mmap[..*len].to_vec())
105 },
106 }
107 }
108
109 pub fn as_slice(&self) -> Option<&[u8]> {
111 match self {
112 Self::InMemory(v) => Some(v),
113 Self::OnDisk { .. } => None,
114 }
115 }
116
117 pub fn is_spilled(&self) -> bool {
119 matches!(self, Self::OnDisk { .. })
120 }
121
122 pub fn in_memory_bytes(&self) -> usize {
124 match self {
125 Self::InMemory(v) => v.len(),
126 Self::OnDisk { .. } => 0,
127 }
128 }
129
130 pub fn truncate(&mut self, max_len: usize) {
135 match self {
136 Self::InMemory(v) => v.truncate(max_len),
137 Self::OnDisk { len, .. } => {
138 if *len > max_len {
139 *len = max_len;
140 }
141 },
142 }
143 }
144
145 pub fn drain(&mut self) -> std::io::Result<Vec<u8>> {
147 let data = self.read_all()?;
148 *self = Self::InMemory(Vec::new());
149 Ok(data)
150 }
151}
152
153impl Default for ReassemblyStorage {
154 fn default() -> Self {
155 Self::new()
156 }
157}
158
/// Running byte counter with an optional budget.
///
/// The counter is an atomic, so it can be updated through a shared reference.
#[derive(Debug)]
pub struct MemoryTracker {
    /// Bytes currently accounted for.
    current: AtomicUsize,
    /// Upper bound for `current`; `None` means no limit is enforced.
    budget: Option<usize>,
}

impl MemoryTracker {
    /// Creates a tracker with zero usage and the given optional budget.
    pub fn new(budget: Option<usize>) -> Self {
        Self {
            current: AtomicUsize::new(0),
            budget,
        }
    }

    /// Records `bytes` additional bytes of usage.
    pub fn add(&self, bytes: usize) {
        // Relaxed is enough here: only the counter value itself is shared,
        // no other memory is published through it.
        self.current.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Records that `bytes` bytes were released.
    ///
    /// Usage saturates at zero. Subtracting more than is currently recorded
    /// does not wrap around to a huge value, which would otherwise make the
    /// tracker report over budget from then on.
    pub fn subtract(&self, bytes: usize) {
        // The closure always returns `Some`, so the update cannot fail; the
        // returned previous value is not needed.
        let _ = self
            .current
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
                Some(used.saturating_sub(bytes))
            });
    }

    /// Returns the number of bytes currently recorded.
    pub fn current_usage(&self) -> usize {
        self.current.load(Ordering::Relaxed)
    }

    /// Returns `true` when a budget is set and usage is strictly above it.
    pub fn is_over_budget(&self) -> bool {
        match self.budget {
            Some(b) => self.current_usage() > b,
            None => false,
        }
    }

    /// Returns `true` when a budget was configured.
    pub fn has_budget(&self) -> bool {
        self.budget.is_some()
    }

    /// Returns the configured budget, if any.
    pub fn budget(&self) -> Option<usize> {
        self.budget
    }
}
212
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh buffer is empty and in memory, and returns what was appended.
    #[test]
    fn test_reassembly_storage_in_memory() {
        let mut storage = ReassemblyStorage::new();
        assert_eq!(storage.len(), 0);
        assert!(storage.is_empty());
        assert!(!storage.is_spilled());

        storage.extend_from_slice(b"hello");
        assert_eq!(storage.len(), 5);
        assert_eq!(storage.in_memory_bytes(), 5);
        assert_eq!(storage.as_slice(), Some(&b"hello"[..]));

        assert_eq!(storage.read_all().unwrap(), b"hello");
    }

    /// Spilling reports the freed byte count and the data reads back intact.
    #[test]
    fn test_reassembly_storage_spill_and_read() {
        let payload = b"test data for spill";
        let mut storage = ReassemblyStorage::new();
        storage.extend_from_slice(payload);

        let released = storage.spill_to_disk(None).unwrap();
        assert_eq!(released, 19);
        assert!(storage.is_spilled());
        assert_eq!(storage.len(), 19);
        assert_eq!(storage.in_memory_bytes(), 0);
        assert!(storage.as_slice().is_none());

        assert_eq!(storage.read_all().unwrap(), payload);
    }

    /// Spilling an empty buffer frees nothing and keeps it in memory.
    #[test]
    fn test_reassembly_storage_spill_empty() {
        let mut storage = ReassemblyStorage::new();
        assert_eq!(storage.spill_to_disk(None).unwrap(), 0);
        assert!(!storage.is_spilled());
    }

    /// A second spill of an already spilled buffer frees nothing.
    #[test]
    fn test_reassembly_storage_double_spill() {
        let mut storage = ReassemblyStorage::new();
        storage.extend_from_slice(b"data");
        storage.spill_to_disk(None).unwrap();

        let released_again = storage.spill_to_disk(None).unwrap();
        assert_eq!(released_again, 0);
    }

    /// Draining an in-memory buffer returns its bytes and empties it.
    #[test]
    fn test_reassembly_storage_drain() {
        let mut storage = ReassemblyStorage::new();
        storage.extend_from_slice(b"drain me");

        let drained = storage.drain().unwrap();
        assert_eq!(drained, b"drain me");
        assert!(storage.is_empty());
    }

    /// Appending after a spill keeps the buffer on disk and extends it.
    #[test]
    fn test_reassembly_storage_extend_after_spill() {
        let mut storage = ReassemblyStorage::new();
        storage.extend_from_slice(b"hello ");
        storage.spill_to_disk(None).unwrap();
        assert!(storage.is_spilled());

        storage.extend_from_slice(b"world");
        assert_eq!(storage.len(), 11);
        assert!(storage.is_spilled());

        assert_eq!(storage.read_all().unwrap(), b"hello world");
    }

    /// Draining a spilled buffer returns its bytes and resets it to memory.
    #[test]
    fn test_reassembly_storage_drain_spilled() {
        let mut storage = ReassemblyStorage::new();
        storage.extend_from_slice(b"spilled drain");
        storage.spill_to_disk(None).unwrap();

        let drained = storage.drain().unwrap();
        assert_eq!(drained, b"spilled drain");
        assert!(storage.is_empty());
        assert!(!storage.is_spilled());
    }

    /// Without a budget the tracker never reports being over budget.
    #[test]
    fn test_memory_tracker_no_budget() {
        let unlimited = MemoryTracker::new(None);
        assert!(!unlimited.has_budget());

        unlimited.add(1_000_000);
        assert!(!unlimited.is_over_budget());
    }

    /// With a budget, usage is compared against it after every change.
    #[test]
    fn test_memory_tracker_with_budget() {
        let limited = MemoryTracker::new(Some(1000));
        assert!(limited.has_budget());
        assert_eq!(limited.budget(), Some(1000));

        limited.add(500);
        assert_eq!(limited.current_usage(), 500);
        assert!(!limited.is_over_budget());

        limited.add(600);
        assert_eq!(limited.current_usage(), 1100);
        assert!(limited.is_over_budget());

        limited.subtract(200);
        assert_eq!(limited.current_usage(), 900);
        assert!(!limited.is_over_budget());
    }
}