stackforge_core/flow/
spill.rs1use std::io::Write;
8use std::path::Path;
9use std::sync::atomic::{AtomicUsize, Ordering};
10
11use memmap2::Mmap;
12use tempfile::NamedTempFile;
13
14#[derive(Debug)]
20pub enum ReassemblyStorage {
21 InMemory(Vec<u8>),
23 OnDisk { file: NamedTempFile, len: usize },
25}
26
27impl ReassemblyStorage {
28 pub fn new() -> Self {
30 Self::InMemory(Vec::new())
31 }
32
33 pub fn len(&self) -> usize {
35 match self {
36 Self::InMemory(v) => v.len(),
37 Self::OnDisk { len, .. } => *len,
38 }
39 }
40
41 pub fn is_empty(&self) -> bool {
43 self.len() == 0
44 }
45
46 pub fn extend_from_slice(&mut self, data: &[u8]) {
51 match self {
52 Self::InMemory(v) => v.extend_from_slice(data),
53 Self::OnDisk { file, len } => {
54 file.write_all(data)
55 .expect("failed to append to spill file");
56 file.flush().expect("failed to flush spill file");
57 *len += data.len();
58 },
59 }
60 }
61
62 pub fn spill_to_disk(&mut self, spill_dir: Option<&Path>) -> std::io::Result<usize> {
66 let old = std::mem::replace(self, Self::InMemory(Vec::new()));
67 match old {
68 Self::InMemory(data) => {
69 if data.is_empty() {
70 *self = Self::InMemory(data);
71 return Ok(0);
72 }
73 let freed = data.len();
74 let mut tmpfile = match spill_dir {
75 Some(dir) => NamedTempFile::new_in(dir)?,
76 None => NamedTempFile::new()?,
77 };
78 tmpfile.write_all(&data)?;
79 tmpfile.flush()?;
80 *self = Self::OnDisk {
81 file: tmpfile,
82 len: freed,
83 };
84 Ok(freed)
85 },
86 already_on_disk @ Self::OnDisk { .. } => {
87 *self = already_on_disk;
88 Ok(0)
89 },
90 }
91 }
92
93 pub fn read_all(&self) -> std::io::Result<Vec<u8>> {
95 match self {
96 Self::InMemory(v) => Ok(v.clone()),
97 Self::OnDisk { file, len } => {
98 if *len == 0 {
99 return Ok(Vec::new());
100 }
101 let mmap = unsafe { Mmap::map(file.as_file())? };
104 Ok(mmap[..*len].to_vec())
105 },
106 }
107 }
108
109 pub fn as_slice(&self) -> Option<&[u8]> {
111 match self {
112 Self::InMemory(v) => Some(v),
113 Self::OnDisk { .. } => None,
114 }
115 }
116
117 pub fn is_spilled(&self) -> bool {
119 matches!(self, Self::OnDisk { .. })
120 }
121
122 pub fn in_memory_bytes(&self) -> usize {
124 match self {
125 Self::InMemory(v) => v.len(),
126 Self::OnDisk { .. } => 0,
127 }
128 }
129
130 pub fn drain(&mut self) -> std::io::Result<Vec<u8>> {
132 let data = self.read_all()?;
133 *self = Self::InMemory(Vec::new());
134 Ok(data)
135 }
136}
137
138impl Default for ReassemblyStorage {
139 fn default() -> Self {
140 Self::new()
141 }
142}
143
/// Running byte counter compared against an optional budget.
///
/// The counter is atomic, so it can be updated through a shared reference.
#[derive(Debug)]
pub struct MemoryTracker {
    /// Bytes currently accounted for.
    current: AtomicUsize,
    /// Upper bound in bytes; `None` means no limit is enforced.
    budget: Option<usize>,
}

impl MemoryTracker {
    /// Creates a tracker with zero usage and the given optional `budget`.
    pub fn new(budget: Option<usize>) -> Self {
        Self {
            current: AtomicUsize::new(0),
            budget,
        }
    }

    /// Adds `bytes` to the tracked usage, saturating at `usize::MAX`.
    pub fn add(&self, bytes: usize) {
        // A plain `fetch_add` wraps on overflow, which would make a huge
        // usage look tiny; saturate instead. The closure always returns
        // `Some`, so the update cannot fail.
        let _ = self
            .current
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
                Some(cur.saturating_add(bytes))
            });
    }

    /// Removes `bytes` from the tracked usage, saturating at zero.
    pub fn subtract(&self, bytes: usize) {
        // A plain `fetch_sub` wraps below zero, leaving the counter near
        // `usize::MAX` and the tracker permanently over budget; clamp at zero.
        let _ = self
            .current
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur| {
                Some(cur.saturating_sub(bytes))
            });
    }

    /// Returns the number of bytes currently tracked.
    pub fn current_usage(&self) -> usize {
        self.current.load(Ordering::Relaxed)
    }

    /// Returns `true` when a budget is set and usage is strictly above it.
    pub fn is_over_budget(&self) -> bool {
        match self.budget {
            Some(b) => self.current_usage() > b,
            None => false,
        }
    }

    /// Returns `true` when a budget was configured.
    pub fn has_budget(&self) -> bool {
        self.budget.is_some()
    }

    /// Returns the configured budget, if any.
    pub fn budget(&self) -> Option<usize> {
        self.budget
    }
}
197
#[cfg(test)]
mod tests {
    use super::*;

    // A fresh storage is empty and memory-backed, and returns what was appended.
    #[test]
    fn test_reassembly_storage_in_memory() {
        let mut storage = ReassemblyStorage::new();
        assert!(storage.is_empty());
        assert_eq!(storage.len(), 0);
        assert!(!storage.is_spilled());

        storage.extend_from_slice(b"hello");
        assert_eq!(storage.len(), 5);
        assert_eq!(storage.as_slice(), Some(b"hello".as_slice()));
        assert_eq!(storage.in_memory_bytes(), 5);

        let bytes = storage.read_all().unwrap();
        assert_eq!(bytes, b"hello");
    }

    // Spilling reports the moved byte count and the bytes stay readable.
    #[test]
    fn test_reassembly_storage_spill_and_read() {
        let mut storage = ReassemblyStorage::new();
        storage.extend_from_slice(b"test data for spill");

        let released = storage.spill_to_disk(None).unwrap();
        assert_eq!(released, 19);
        assert!(storage.is_spilled());
        assert_eq!(storage.len(), 19);
        assert_eq!(storage.in_memory_bytes(), 0);
        assert!(storage.as_slice().is_none());

        let bytes = storage.read_all().unwrap();
        assert_eq!(bytes, b"test data for spill");
    }

    // Spilling an empty storage does nothing and keeps it in memory.
    #[test]
    fn test_reassembly_storage_spill_empty() {
        let mut storage = ReassemblyStorage::new();
        let released = storage.spill_to_disk(None).unwrap();
        assert_eq!(released, 0);
        assert!(!storage.is_spilled());
    }

    // A second spill frees nothing further.
    #[test]
    fn test_reassembly_storage_double_spill() {
        let mut storage = ReassemblyStorage::new();
        storage.extend_from_slice(b"data");
        storage.spill_to_disk(None).unwrap();

        let released = storage.spill_to_disk(None).unwrap();
        assert_eq!(released, 0);
    }

    // Draining memory-backed storage returns the bytes and empties it.
    #[test]
    fn test_reassembly_storage_drain() {
        let mut storage = ReassemblyStorage::new();
        storage.extend_from_slice(b"drain me");
        let bytes = storage.drain().unwrap();
        assert_eq!(bytes, b"drain me");
        assert!(storage.is_empty());
    }

    // Appending after a spill extends the on-disk contents.
    #[test]
    fn test_reassembly_storage_extend_after_spill() {
        let mut storage = ReassemblyStorage::new();
        storage.extend_from_slice(b"hello ");
        storage.spill_to_disk(None).unwrap();
        assert!(storage.is_spilled());

        storage.extend_from_slice(b"world");
        assert_eq!(storage.len(), 11);
        assert!(storage.is_spilled());

        let bytes = storage.read_all().unwrap();
        assert_eq!(bytes, b"hello world");
    }

    // Draining spilled storage returns the bytes and reverts to memory.
    #[test]
    fn test_reassembly_storage_drain_spilled() {
        let mut storage = ReassemblyStorage::new();
        storage.extend_from_slice(b"spilled drain");
        storage.spill_to_disk(None).unwrap();
        let bytes = storage.drain().unwrap();
        assert_eq!(bytes, b"spilled drain");
        assert!(storage.is_empty());
        assert!(!storage.is_spilled());
    }

    // Without a budget the tracker never reports being over budget.
    #[test]
    fn test_memory_tracker_no_budget() {
        let unlimited = MemoryTracker::new(None);
        assert!(!unlimited.has_budget());
        unlimited.add(1_000_000);
        assert!(!unlimited.is_over_budget());
    }

    // With a budget, usage above the limit is flagged and clears again.
    #[test]
    fn test_memory_tracker_with_budget() {
        let limited = MemoryTracker::new(Some(1000));
        assert!(limited.has_budget());
        assert_eq!(limited.budget(), Some(1000));

        limited.add(500);
        assert_eq!(limited.current_usage(), 500);
        assert!(!limited.is_over_budget());

        limited.add(600);
        assert_eq!(limited.current_usage(), 1100);
        assert!(limited.is_over_budget());

        limited.subtract(200);
        assert_eq!(limited.current_usage(), 900);
        assert!(!limited.is_over_budget());
    }
}