stackforge_core/flow/
spill.rs1use std::io::Write;
8use std::path::Path;
9use std::sync::atomic::{AtomicUsize, Ordering};
10
11use memmap2::Mmap;
12use tempfile::NamedTempFile;
13
14#[derive(Debug)]
20pub enum ReassemblyStorage {
21 InMemory(Vec<u8>),
23 OnDisk { file: NamedTempFile, len: usize },
25}
26
27impl ReassemblyStorage {
28 pub fn new() -> Self {
30 Self::InMemory(Vec::new())
31 }
32
33 pub fn len(&self) -> usize {
35 match self {
36 Self::InMemory(v) => v.len(),
37 Self::OnDisk { len, .. } => *len,
38 }
39 }
40
41 pub fn is_empty(&self) -> bool {
43 self.len() == 0
44 }
45
46 pub fn extend_from_slice(&mut self, data: &[u8]) {
53 match self {
54 Self::InMemory(v) => v.extend_from_slice(data),
55 Self::OnDisk { .. } => {
56 panic!("cannot extend spilled storage; data already on disk");
57 },
58 }
59 }
60
61 pub fn spill_to_disk(&mut self, spill_dir: Option<&Path>) -> std::io::Result<usize> {
65 let old = std::mem::replace(self, Self::InMemory(Vec::new()));
66 match old {
67 Self::InMemory(data) => {
68 if data.is_empty() {
69 *self = Self::InMemory(data);
70 return Ok(0);
71 }
72 let freed = data.len();
73 let mut tmpfile = match spill_dir {
74 Some(dir) => NamedTempFile::new_in(dir)?,
75 None => NamedTempFile::new()?,
76 };
77 tmpfile.write_all(&data)?;
78 tmpfile.flush()?;
79 *self = Self::OnDisk {
80 file: tmpfile,
81 len: freed,
82 };
83 Ok(freed)
84 },
85 already_on_disk @ Self::OnDisk { .. } => {
86 *self = already_on_disk;
87 Ok(0)
88 },
89 }
90 }
91
92 pub fn read_all(&self) -> std::io::Result<Vec<u8>> {
94 match self {
95 Self::InMemory(v) => Ok(v.clone()),
96 Self::OnDisk { file, len } => {
97 if *len == 0 {
98 return Ok(Vec::new());
99 }
100 let mmap = unsafe { Mmap::map(file.as_file())? };
103 Ok(mmap[..*len].to_vec())
104 },
105 }
106 }
107
108 pub fn as_slice(&self) -> Option<&[u8]> {
110 match self {
111 Self::InMemory(v) => Some(v),
112 Self::OnDisk { .. } => None,
113 }
114 }
115
116 pub fn is_spilled(&self) -> bool {
118 matches!(self, Self::OnDisk { .. })
119 }
120
121 pub fn in_memory_bytes(&self) -> usize {
123 match self {
124 Self::InMemory(v) => v.len(),
125 Self::OnDisk { .. } => 0,
126 }
127 }
128
129 pub fn drain(&mut self) -> std::io::Result<Vec<u8>> {
131 let data = self.read_all()?;
132 *self = Self::InMemory(Vec::new());
133 Ok(data)
134 }
135}
136
137impl Default for ReassemblyStorage {
138 fn default() -> Self {
139 Self::new()
140 }
141}
142
143#[derive(Debug)]
147pub struct MemoryTracker {
148 current: AtomicUsize,
150 budget: Option<usize>,
152}
153
154impl MemoryTracker {
155 pub fn new(budget: Option<usize>) -> Self {
157 Self {
158 current: AtomicUsize::new(0),
159 budget,
160 }
161 }
162
163 pub fn add(&self, bytes: usize) {
165 self.current.fetch_add(bytes, Ordering::Relaxed);
166 }
167
168 pub fn subtract(&self, bytes: usize) {
170 self.current.fetch_sub(bytes, Ordering::Relaxed);
171 }
172
173 pub fn current_usage(&self) -> usize {
175 self.current.load(Ordering::Relaxed)
176 }
177
178 pub fn is_over_budget(&self) -> bool {
180 match self.budget {
181 Some(b) => self.current_usage() > b,
182 None => false,
183 }
184 }
185
186 pub fn has_budget(&self) -> bool {
188 self.budget.is_some()
189 }
190
191 pub fn budget(&self) -> Option<usize> {
193 self.budget
194 }
195}
196
197#[cfg(test)]
198mod tests {
199 use super::*;
200
201 #[test]
202 fn test_reassembly_storage_in_memory() {
203 let mut s = ReassemblyStorage::new();
204 assert!(s.is_empty());
205 assert_eq!(s.len(), 0);
206 assert!(!s.is_spilled());
207
208 s.extend_from_slice(b"hello");
209 assert_eq!(s.len(), 5);
210 assert_eq!(s.as_slice(), Some(b"hello".as_slice()));
211 assert_eq!(s.in_memory_bytes(), 5);
212
213 let data = s.read_all().unwrap();
214 assert_eq!(data, b"hello");
215 }
216
217 #[test]
218 fn test_reassembly_storage_spill_and_read() {
219 let mut s = ReassemblyStorage::new();
220 s.extend_from_slice(b"test data for spill");
221
222 let freed = s.spill_to_disk(None).unwrap();
223 assert_eq!(freed, 19);
224 assert!(s.is_spilled());
225 assert_eq!(s.len(), 19);
226 assert_eq!(s.in_memory_bytes(), 0);
227 assert!(s.as_slice().is_none());
228
229 let data = s.read_all().unwrap();
230 assert_eq!(data, b"test data for spill");
231 }
232
233 #[test]
234 fn test_reassembly_storage_spill_empty() {
235 let mut s = ReassemblyStorage::new();
236 let freed = s.spill_to_disk(None).unwrap();
237 assert_eq!(freed, 0);
238 assert!(!s.is_spilled());
239 }
240
241 #[test]
242 fn test_reassembly_storage_double_spill() {
243 let mut s = ReassemblyStorage::new();
244 s.extend_from_slice(b"data");
245 s.spill_to_disk(None).unwrap();
246
247 let freed = s.spill_to_disk(None).unwrap();
249 assert_eq!(freed, 0);
250 }
251
252 #[test]
253 fn test_reassembly_storage_drain() {
254 let mut s = ReassemblyStorage::new();
255 s.extend_from_slice(b"drain me");
256 let data = s.drain().unwrap();
257 assert_eq!(data, b"drain me");
258 assert!(s.is_empty());
259 }
260
261 #[test]
262 fn test_reassembly_storage_drain_spilled() {
263 let mut s = ReassemblyStorage::new();
264 s.extend_from_slice(b"spilled drain");
265 s.spill_to_disk(None).unwrap();
266 let data = s.drain().unwrap();
267 assert_eq!(data, b"spilled drain");
268 assert!(s.is_empty());
269 assert!(!s.is_spilled());
270 }
271
272 #[test]
273 fn test_memory_tracker_no_budget() {
274 let tracker = MemoryTracker::new(None);
275 assert!(!tracker.has_budget());
276 tracker.add(1_000_000);
277 assert!(!tracker.is_over_budget());
278 }
279
280 #[test]
281 fn test_memory_tracker_with_budget() {
282 let tracker = MemoryTracker::new(Some(1000));
283 assert!(tracker.has_budget());
284 assert_eq!(tracker.budget(), Some(1000));
285
286 tracker.add(500);
287 assert_eq!(tracker.current_usage(), 500);
288 assert!(!tracker.is_over_budget());
289
290 tracker.add(600);
291 assert_eq!(tracker.current_usage(), 1100);
292 assert!(tracker.is_over_budget());
293
294 tracker.subtract(200);
295 assert_eq!(tracker.current_usage(), 900);
296 assert!(!tracker.is_over_budget());
297 }
298}