1use std::io::{self, Read, Seek, SeekFrom, Write};
28use std::path::PathBuf;
29use std::sync::atomic::{AtomicUsize, Ordering};
30
/// Tuning knobs for [`SpillBuffer`]: in-memory sizing limits and the
/// threshold at which buffered data moves to a temporary file.
#[derive(Debug, Clone)]
pub struct LargeBufferConfig {
    /// Capacity pre-allocated for the in-memory backing `Vec`.
    pub initial_capacity: usize,
    /// Hard upper bound on total buffered bytes (memory or disk).
    pub max_capacity: usize,
    /// Size beyond which the buffer spills to disk; `0` disables spilling.
    pub spill_threshold: usize,
    /// Directory for spill files; `None` means the system temp dir.
    pub temp_dir: Option<PathBuf>,
}

impl Default for LargeBufferConfig {
    fn default() -> Self {
        Self {
            initial_capacity: 64 << 10,   // 64 KiB
            max_capacity: 1 << 30,        // 1 GiB
            spill_threshold: 64 << 20,    // 64 MiB
            temp_dir: None,
        }
    }
}

impl LargeBufferConfig {
    /// Creates a config with the given hard capacity limit and
    /// defaults for every other field.
    #[must_use]
    pub fn new(max_capacity: usize) -> Self {
        let mut cfg = Self::default();
        cfg.max_capacity = max_capacity;
        cfg
    }

    /// Builder-style setter for the initial in-memory allocation.
    #[must_use]
    pub const fn initial_capacity(mut self, capacity: usize) -> Self {
        self.initial_capacity = capacity;
        self
    }

    /// Builder-style setter for the spill threshold (`0` disables).
    #[must_use]
    pub const fn spill_threshold(mut self, threshold: usize) -> Self {
        self.spill_threshold = threshold;
        self
    }

    /// Builder-style setter for the spill-file directory.
    #[must_use]
    pub fn temp_dir(mut self, dir: impl Into<PathBuf>) -> Self {
        self.temp_dir = Some(dir.into());
        self
    }
}
86
/// Fixed-capacity circular byte buffer.
///
/// Once full, new writes overwrite the oldest bytes. `head` is the
/// next write index and `tail` the index of the oldest byte; `full`
/// disambiguates the `head == tail` case (empty vs. completely full).
#[derive(Debug)]
pub struct RingBuffer {
    data: Vec<u8>,
    capacity: usize,
    head: usize,
    tail: usize,
    full: bool,
}

impl RingBuffer {
    /// Creates a ring buffer holding at most `capacity` bytes.
    #[must_use]
    pub fn new(capacity: usize) -> Self {
        Self {
            data: vec![0u8; capacity],
            capacity,
            head: 0,
            tail: 0,
            full: false,
        }
    }

    /// Number of bytes currently stored.
    #[must_use]
    pub const fn len(&self) -> usize {
        if self.full {
            self.capacity
        } else if self.head >= self.tail {
            self.head - self.tail
        } else {
            self.capacity - self.tail + self.head
        }
    }

    /// `true` when no bytes are stored.
    #[must_use]
    pub const fn is_empty(&self) -> bool {
        !self.full && self.head == self.tail
    }

    /// `true` when the buffer holds exactly `capacity` bytes.
    #[must_use]
    pub const fn is_full(&self) -> bool {
        self.full
    }

    /// Maximum number of bytes the buffer can hold.
    #[must_use]
    pub const fn capacity(&self) -> usize {
        self.capacity
    }

    /// Appends `data`, overwriting the oldest bytes once full.
    pub fn write(&mut self, data: &[u8]) {
        // Fix: a zero-capacity buffer can store nothing; without this
        // guard the loop below would panic (index out of bounds on
        // `self.data[0]`, and `% 0` in the wrap arithmetic).
        if self.capacity == 0 || data.is_empty() {
            return;
        }

        // Only the final `capacity` bytes of an oversized write can
        // survive; skip the bytes that would be overwritten anyway.
        // The externally observable state (len / read order) is
        // identical to writing every byte.
        let data = if data.len() > self.capacity {
            &data[data.len() - self.capacity..]
        } else {
            data
        };

        for &byte in data {
            self.data[self.head] = byte;
            self.head = (self.head + 1) % self.capacity;

            // When already full, the oldest byte is consumed, so the
            // read position advances in lock-step with the write.
            if self.full {
                self.tail = (self.tail + 1) % self.capacity;
            }

            if self.head == self.tail {
                self.full = true;
            }
        }
    }

    /// Returns all stored bytes in write order (oldest first).
    #[must_use]
    pub fn read_all(&self) -> Vec<u8> {
        let len = self.len();
        let mut result = Vec::with_capacity(len);

        if len == 0 {
            return result;
        }

        if self.head > self.tail {
            // Contiguous region.
            result.extend_from_slice(&self.data[self.tail..self.head]);
        } else {
            // Wrapped: tail..end, then start..head.
            result.extend_from_slice(&self.data[self.tail..]);
            result.extend_from_slice(&self.data[..self.head]);
        }

        result
    }

    /// Contents as a string; invalid UTF-8 is replaced (lossy).
    #[must_use]
    pub fn as_string(&self) -> String {
        String::from_utf8_lossy(&self.read_all()).into_owned()
    }

    /// Discards all contents (the allocation is kept).
    pub const fn clear(&mut self) {
        self.head = 0;
        self.tail = 0;
        self.full = false;
    }

    /// Returns the most recent `n` bytes (all bytes when `n >= len`).
    #[must_use]
    pub fn tail_bytes(&self, n: usize) -> Vec<u8> {
        let len = self.len();
        if n >= len {
            return self.read_all();
        }

        let all = self.read_all();
        all[len - n..].to_vec()
    }
}
207
/// Backing storage for [`SpillBuffer`]: starts in memory, switches to
/// a temp file once the spill threshold is crossed.
enum Storage {
    /// In-memory backing store (before any spill).
    Memory(Vec<u8>),
    /// On-disk backing store (after spilling).
    File {
        /// Open handle used for both appends and read-back.
        file: std::fs::File,
        /// Location of the temp file, kept so it can be removed on drop.
        path: PathBuf,
        /// Number of bytes written to the file so far.
        size: usize,
    },
}
219
/// Byte buffer that lives in memory until it grows past a configured
/// threshold, then transparently spills its contents to a temp file.
pub struct SpillBuffer {
    // Current backing store (memory first, file after spilling).
    storage: Storage,
    // Sizing limits and temp-dir choice.
    config: LargeBufferConfig,
    // Total bytes written so far (the logical length).
    write_pos: usize,
    // Set once spill_to_disk() has run; never reset, even by clear().
    spilled: bool,
}
230
231impl SpillBuffer {
232 #[must_use]
234 pub fn new() -> Self {
235 Self::with_config(LargeBufferConfig::default())
236 }
237
238 #[must_use]
240 pub fn with_config(config: LargeBufferConfig) -> Self {
241 Self {
242 storage: Storage::Memory(Vec::with_capacity(config.initial_capacity)),
243 config,
244 write_pos: 0,
245 spilled: false,
246 }
247 }
248
249 #[must_use]
251 pub const fn is_spilled(&self) -> bool {
252 self.spilled
253 }
254
255 #[must_use]
257 pub const fn len(&self) -> usize {
258 self.write_pos
259 }
260
261 #[must_use]
263 pub const fn is_empty(&self) -> bool {
264 self.write_pos == 0
265 }
266
267 pub fn write(&mut self, data: &[u8]) -> io::Result<()> {
273 let new_size = self.write_pos + data.len();
274
275 if !self.spilled
277 && self.config.spill_threshold > 0
278 && new_size > self.config.spill_threshold
279 {
280 self.spill_to_disk()?;
281 }
282
283 if new_size > self.config.max_capacity {
285 return Err(io::Error::new(
286 io::ErrorKind::StorageFull,
287 "Buffer exceeded maximum capacity",
288 ));
289 }
290
291 match &mut self.storage {
292 Storage::Memory(vec) => {
293 vec.extend_from_slice(data);
294 self.write_pos = vec.len();
295 }
296 Storage::File { file, size, .. } => {
297 file.seek(SeekFrom::End(0))?;
298 file.write_all(data)?;
299 *size += data.len();
300 self.write_pos = *size;
301 }
302 }
303
304 Ok(())
305 }
306
307 fn spill_to_disk(&mut self) -> io::Result<()> {
309 if self.spilled {
310 return Ok(());
311 }
312
313 let temp_dir = self
314 .config
315 .temp_dir
316 .as_ref()
317 .map_or_else(std::env::temp_dir, std::clone::Clone::clone);
318
319 let path = temp_dir.join(format!("rust_expect_buffer_{}", std::process::id()));
320
321 let mut file = std::fs::OpenOptions::new()
322 .read(true)
323 .write(true)
324 .create(true)
325 .truncate(true)
326 .open(&path)?;
327
328 if let Storage::Memory(vec) = &self.storage {
330 file.write_all(vec)?;
331 }
332
333 let size = self.write_pos;
334 self.storage = Storage::File { file, path, size };
335 self.spilled = true;
336
337 Ok(())
338 }
339
340 pub fn read_all(&mut self) -> io::Result<Vec<u8>> {
346 match &mut self.storage {
347 Storage::Memory(vec) => Ok(vec.clone()),
348 Storage::File { file, size, .. } => {
349 file.seek(SeekFrom::Start(0))?;
350 let mut data = vec![0u8; *size];
351 file.read_exact(&mut data)?;
352 Ok(data)
353 }
354 }
355 }
356
357 pub fn as_string(&mut self) -> io::Result<String> {
363 Ok(String::from_utf8_lossy(&self.read_all()?).into_owned())
364 }
365
366 pub fn clear(&mut self) -> io::Result<()> {
374 match &mut self.storage {
375 Storage::Memory(vec) => {
376 vec.clear();
377 }
378 Storage::File { file, size, .. } => {
379 file.set_len(0)?;
380 *size = 0;
381 }
382 }
383 self.write_pos = 0;
384 Ok(())
385 }
386}
387
388impl Default for SpillBuffer {
389 fn default() -> Self {
390 Self::new()
391 }
392}
393
394impl Drop for SpillBuffer {
395 fn drop(&mut self) {
396 if let Storage::File { path, .. } = &self.storage {
398 let _ = std::fs::remove_file(path);
399 }
400 }
401}
402
403impl std::fmt::Debug for SpillBuffer {
404 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
405 f.debug_struct("SpillBuffer")
406 .field("len", &self.write_pos)
407 .field("spilled", &self.spilled)
408 .field("max_capacity", &self.config.max_capacity)
409 .finish()
410 }
411}
412
/// Thread-safe byte counter.
///
/// All operations use `Ordering::Relaxed`: each load/store is atomic,
/// but no cross-thread ordering is established by this counter.
#[derive(Debug, Default)]
pub struct AtomicBufferSize {
    size: AtomicUsize,
}

impl AtomicBufferSize {
    /// New counter starting at zero.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            size: AtomicUsize::new(0),
        }
    }

    /// Current value of the counter.
    #[must_use]
    pub fn get(&self) -> usize {
        self.size.load(Ordering::Relaxed)
    }

    /// Increases the counter by `n`.
    pub fn add(&self, n: usize) {
        self.size.fetch_add(n, Ordering::Relaxed);
    }

    /// Decreases the counter by `n`.
    pub fn sub(&self, n: usize) {
        self.size.fetch_sub(n, Ordering::Relaxed);
    }

    /// Overwrites the counter with `n`.
    pub fn set(&self, n: usize) {
        self.size.store(n, Ordering::Relaxed);
    }

    /// Sets the counter back to zero.
    pub fn reset(&self) {
        self.set(0);
    }
}
454
/// Allocates a zeroed buffer whose LENGTH is rounded up to a multiple
/// of the system page size.
///
/// NOTE(review): despite the name, the returned `Vec<u8>`'s pointer is
/// NOT guaranteed to be page-aligned — `vec!` only guarantees `u8`
/// alignment. Only the size is page-granular; confirm callers do not
/// rely on pointer alignment.
#[cfg(unix)]
#[must_use]
pub fn allocate_page_aligned(size: usize) -> Vec<u8> {
    let page_size = page_size();
    // Round up to the next page multiple; valid because page sizes
    // are powers of two, so `!(page_size - 1)` is a clean mask.
    let aligned_size = (size + page_size - 1) & !(page_size - 1);

    vec![0u8; aligned_size]
}
472
/// Returns the system memory page size in bytes, as reported by
/// `sysconf(_SC_PAGESIZE)`, falling back to 4096 on error.
#[cfg(unix)]
#[must_use]
#[allow(unsafe_code)]
pub fn page_size() -> usize {
    // SAFETY: `sysconf` is a plain FFI call with a valid constant
    // argument; it reads no caller-provided memory and signals
    // failure via a non-positive return value, handled below.
    let size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
    if size <= 0 {
        4096 // conservative common default when sysconf fails
    } else {
        size as usize // checked > 0 above, so the cast is lossless
    }
}
486
/// Returns the assumed memory page size on Windows.
///
/// NOTE(review): hard-coded to 4096 rather than queried from the OS;
/// adequate for buffer sizing, but verify if the exact page size ever
/// matters on this platform.
#[cfg(windows)]
#[must_use]
pub fn page_size() -> usize {
    4096
}
493
/// Allocates a zeroed buffer of exactly `size` bytes on Windows.
///
/// Unlike the unix variant, no page rounding is performed here; the
/// pointer is likewise not page-aligned.
#[cfg(windows)]
#[must_use]
pub fn allocate_page_aligned(size: usize) -> Vec<u8> {
    vec![0u8; size]
}
500
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn ring_buffer_basic() {
        let mut buf = RingBuffer::new(10);
        assert!(buf.is_empty());
        assert_eq!(buf.capacity(), 10);

        buf.write(b"hello");
        assert_eq!(buf.len(), 5);
        assert_eq!(buf.as_string(), "hello");
    }

    #[test]
    fn ring_buffer_wrap() {
        let mut buf = RingBuffer::new(10);
        buf.write(b"12345678");
        assert_eq!(buf.len(), 8);

        buf.write(b"ABCD");
        assert_eq!(buf.len(), 10);
        assert!(buf.is_full());

        // The wrap behavior is deterministic: the two oldest bytes
        // ("12") are overwritten, so pin the exact content instead of
        // only checking the suffix.
        assert_eq!(buf.as_string(), "345678ABCD");
    }

    #[test]
    fn ring_buffer_tail_bytes() {
        let mut buf = RingBuffer::new(20);
        buf.write(b"hello world");

        let tail = buf.tail_bytes(5);
        assert_eq!(tail, b"world");

        // Requesting more than is stored returns everything.
        let tail = buf.tail_bytes(100);
        assert_eq!(tail, b"hello world");
    }

    #[test]
    fn ring_buffer_clear() {
        let mut buf = RingBuffer::new(10);
        buf.write(b"hello");
        buf.clear();

        assert!(buf.is_empty());
        assert_eq!(buf.len(), 0);
    }

    #[test]
    fn spill_buffer_memory() {
        // Threshold 0 disables spilling entirely.
        let config = LargeBufferConfig::new(1024 * 1024).spill_threshold(0);
        let mut buf = SpillBuffer::with_config(config);
        buf.write(b"hello world").unwrap();

        assert!(!buf.is_spilled());
        assert_eq!(buf.len(), 11);
        assert_eq!(buf.as_string().unwrap(), "hello world");
    }

    #[test]
    fn spill_buffer_spill() {
        // Tiny threshold so the second write forces a spill.
        let config = LargeBufferConfig::new(1024 * 1024).spill_threshold(10);
        let mut buf = SpillBuffer::with_config(config);
        buf.write(b"hello").unwrap();
        assert!(!buf.is_spilled());

        buf.write(b"world!!!").unwrap();
        assert!(buf.is_spilled());
        assert_eq!(buf.as_string().unwrap(), "helloworld!!!");
    }

    #[test]
    fn atomic_buffer_size() {
        let size = AtomicBufferSize::new();
        assert_eq!(size.get(), 0);

        size.add(100);
        assert_eq!(size.get(), 100);

        size.sub(30);
        assert_eq!(size.get(), 70);

        size.set(500);
        assert_eq!(size.get(), 500);

        size.reset();
        assert_eq!(size.get(), 0);
    }

    #[test]
    fn page_aligned_allocation() {
        let buf = allocate_page_aligned(1000);
        assert!(buf.len() >= 1000);

        let page = page_size();
        assert!(page >= 4096);
    }
}