rust_expect/util/
buffer.rs

//! Memory-efficient buffer implementations.
//!
//! This module provides buffer implementations optimized for different
//! use cases:
//!
//! - `RingBuffer`: Efficient circular buffer for streaming data
//! - `SpillBuffer`: Buffer that spills to disk when a threshold is exceeded
//! - `AtomicBufferSize`: Thread-safe buffer size tracker
//!
//! # Disk-Spilling Buffers
//!
//! For very large terminal sessions (e.g., log processing, long-running scripts),
//! disk-backed buffers can reduce heap pressure by moving data into
//! temporary files once it grows past a configurable threshold.
//!
//! ```rust,ignore
//! use rust_expect::util::buffer::{LargeBufferConfig, SpillBuffer};
//!
//! // Spill to disk once the buffer exceeds 64MB, capped at 1GB
//! let config = LargeBufferConfig::new(1024 * 1024 * 1024)
//!     .spill_threshold(64 * 1024 * 1024);
//! let mut buffer = SpillBuffer::with_config(config);
//!
//! // Use like a regular buffer
//! buffer.write(b"Hello, world!")?;
//! let data = buffer.read_all()?;
//! ```
26
27use std::io::{self, Read, Seek, SeekFrom, Write};
28use std::path::PathBuf;
29use std::sync::atomic::{AtomicUsize, Ordering};
30
/// Configuration for buffer behavior.
#[derive(Debug, Clone)]
pub struct LargeBufferConfig {
    /// Initial capacity in bytes.
    pub initial_capacity: usize,
    /// Maximum capacity in bytes.
    pub max_capacity: usize,
    /// Threshold at which to spill to disk (0 = never).
    pub spill_threshold: usize,
    /// Directory for temporary files.
    pub temp_dir: Option<PathBuf>,
}

impl Default for LargeBufferConfig {
    /// Defaults: 64 KiB initial capacity, 1 GiB hard cap, 64 MiB spill
    /// threshold, system temp directory (no explicit `temp_dir`).
    fn default() -> Self {
        const KIB: usize = 1024;
        const MIB: usize = 1024 * KIB;
        Self {
            initial_capacity: 64 * KIB,
            max_capacity: 1024 * MIB,
            spill_threshold: 64 * MIB,
            temp_dir: None,
        }
    }
}

impl LargeBufferConfig {
    /// Create a configuration capped at `max_capacity` bytes; all other
    /// fields start at their default values.
    #[must_use]
    pub fn new(max_capacity: usize) -> Self {
        let mut config = Self::default();
        config.max_capacity = max_capacity;
        config
    }

    /// Builder-style setter for the initial in-memory capacity.
    #[must_use]
    pub const fn initial_capacity(mut self, capacity: usize) -> Self {
        self.initial_capacity = capacity;
        self
    }

    /// Builder-style setter for the disk-spill threshold (0 disables spilling).
    #[must_use]
    pub const fn spill_threshold(mut self, threshold: usize) -> Self {
        self.spill_threshold = threshold;
        self
    }

    /// Builder-style setter for the temporary-file directory.
    #[must_use]
    pub fn temp_dir(mut self, dir: impl Into<PathBuf>) -> Self {
        self.temp_dir = Some(dir.into());
        self
    }
}
86
/// A circular ring buffer for streaming data.
///
/// When the buffer is full, the oldest data is overwritten by new writes.
/// A zero-capacity buffer is valid: it is always empty and silently
/// discards all writes.
#[derive(Debug)]
pub struct RingBuffer {
    /// Backing storage; always exactly `capacity` bytes long.
    data: Vec<u8>,
    /// Total number of bytes the buffer can hold.
    capacity: usize,
    /// Index where the next byte will be written.
    head: usize,
    /// Index of the oldest stored byte.
    tail: usize,
    /// Disambiguates `head == tail`: completely full vs. completely empty.
    full: bool,
}

impl RingBuffer {
    /// Create a new ring buffer with the given capacity.
    #[must_use]
    pub fn new(capacity: usize) -> Self {
        Self {
            data: vec![0u8; capacity],
            capacity,
            head: 0,
            tail: 0,
            full: false,
        }
    }

    /// Get the current length of data in the buffer.
    #[must_use]
    pub const fn len(&self) -> usize {
        if self.full {
            self.capacity
        } else if self.head >= self.tail {
            self.head - self.tail
        } else {
            self.capacity - self.tail + self.head
        }
    }

    /// Check if the buffer is empty.
    #[must_use]
    pub const fn is_empty(&self) -> bool {
        !self.full && self.head == self.tail
    }

    /// Check if the buffer is full.
    #[must_use]
    pub const fn is_full(&self) -> bool {
        self.full
    }

    /// Get the capacity of the buffer.
    #[must_use]
    pub const fn capacity(&self) -> usize {
        self.capacity
    }

    /// Write data to the buffer.
    ///
    /// If the buffer is full, oldest data is overwritten. Writing to a
    /// zero-capacity buffer is a no-op (previously this panicked with a
    /// division by zero in `% self.capacity`).
    pub fn write(&mut self, data: &[u8]) {
        // Guard: the modulo arithmetic below would panic for capacity 0.
        if self.capacity == 0 || data.is_empty() {
            return;
        }

        // A chunk at least as large as the whole buffer replaces the
        // entire contents: only its last `capacity` bytes can survive.
        if data.len() >= self.capacity {
            self.data
                .copy_from_slice(&data[data.len() - self.capacity..]);
            self.head = 0;
            self.tail = 0;
            self.full = true;
            return;
        }

        let prev_len = self.len();

        // Copy in at most two contiguous segments (wrap at the end of the
        // backing array) instead of byte-by-byte with a modulo per byte.
        let first = (self.capacity - self.head).min(data.len());
        self.data[self.head..self.head + first].copy_from_slice(&data[..first]);
        let rest = data.len() - first;
        if rest > 0 {
            self.data[..rest].copy_from_slice(&data[first..]);
        }

        self.head = (self.head + data.len()) % self.capacity;
        if prev_len + data.len() >= self.capacity {
            // We filled up exactly, or overwrote old bytes: the oldest
            // surviving byte now sits right at the write position.
            self.full = true;
            self.tail = self.head;
        }
    }

    /// Read all data from the buffer, oldest byte first.
    ///
    /// This does not consume the data.
    #[must_use]
    pub fn read_all(&self) -> Vec<u8> {
        let len = self.len();
        let mut result = Vec::with_capacity(len);

        if len == 0 {
            return result;
        }

        if self.head > self.tail {
            // Contiguous region.
            result.extend_from_slice(&self.data[self.tail..self.head]);
        } else {
            // Wrapped: older segment at the end, newer segment at the start.
            result.extend_from_slice(&self.data[self.tail..]);
            result.extend_from_slice(&self.data[..self.head]);
        }

        result
    }

    /// Read as a string (lossy UTF-8 conversion).
    #[must_use]
    pub fn as_string(&self) -> String {
        String::from_utf8_lossy(&self.read_all()).into_owned()
    }

    /// Clear the buffer. The backing storage is retained (not zeroed).
    pub const fn clear(&mut self) {
        self.head = 0;
        self.tail = 0;
        self.full = false;
    }

    /// Get the last (newest) N bytes from the buffer.
    ///
    /// Asking for more bytes than are stored returns everything.
    #[must_use]
    pub fn tail_bytes(&self, n: usize) -> Vec<u8> {
        let mut all = self.read_all();
        let len = all.len();
        if n < len {
            all.drain(..len - n);
        }
        all
    }
}
207
208/// Storage backend for large buffers.
209enum Storage {
210    /// In-memory storage.
211    Memory(Vec<u8>),
212    /// File-backed storage.
213    File {
214        file: std::fs::File,
215        path: PathBuf,
216        size: usize,
217    },
218}
219
220/// A buffer that can spill to disk for very large data sets.
221///
222/// Starts in memory and automatically spills to disk when the
223/// spill threshold is exceeded.
224pub struct SpillBuffer {
225    storage: Storage,
226    config: LargeBufferConfig,
227    write_pos: usize,
228    spilled: bool,
229}
230
231impl SpillBuffer {
232    /// Create a new spill buffer with default configuration.
233    #[must_use]
234    pub fn new() -> Self {
235        Self::with_config(LargeBufferConfig::default())
236    }
237
238    /// Create a new spill buffer with custom configuration.
239    #[must_use]
240    pub fn with_config(config: LargeBufferConfig) -> Self {
241        Self {
242            storage: Storage::Memory(Vec::with_capacity(config.initial_capacity)),
243            config,
244            write_pos: 0,
245            spilled: false,
246        }
247    }
248
249    /// Check if the buffer has spilled to disk.
250    #[must_use]
251    pub const fn is_spilled(&self) -> bool {
252        self.spilled
253    }
254
255    /// Get the current size of the buffer.
256    #[must_use]
257    pub const fn len(&self) -> usize {
258        self.write_pos
259    }
260
261    /// Check if the buffer is empty.
262    #[must_use]
263    pub const fn is_empty(&self) -> bool {
264        self.write_pos == 0
265    }
266
267    /// Write data to the buffer.
268    ///
269    /// # Errors
270    ///
271    /// Returns an error if disk I/O fails during spill.
272    pub fn write(&mut self, data: &[u8]) -> io::Result<()> {
273        let new_size = self.write_pos + data.len();
274
275        // Check if we need to spill
276        if !self.spilled
277            && self.config.spill_threshold > 0
278            && new_size > self.config.spill_threshold
279        {
280            self.spill_to_disk()?;
281        }
282
283        // Check capacity
284        if new_size > self.config.max_capacity {
285            return Err(io::Error::new(
286                io::ErrorKind::StorageFull,
287                "Buffer exceeded maximum capacity",
288            ));
289        }
290
291        match &mut self.storage {
292            Storage::Memory(vec) => {
293                vec.extend_from_slice(data);
294                self.write_pos = vec.len();
295            }
296            Storage::File { file, size, .. } => {
297                file.seek(SeekFrom::End(0))?;
298                file.write_all(data)?;
299                *size += data.len();
300                self.write_pos = *size;
301            }
302        }
303
304        Ok(())
305    }
306
307    /// Spill the buffer contents to disk.
308    fn spill_to_disk(&mut self) -> io::Result<()> {
309        if self.spilled {
310            return Ok(());
311        }
312
313        let temp_dir = self
314            .config
315            .temp_dir
316            .as_ref()
317            .map_or_else(std::env::temp_dir, std::clone::Clone::clone);
318
319        let path = temp_dir.join(format!("rust_expect_buffer_{}", std::process::id()));
320
321        let mut file = std::fs::OpenOptions::new()
322            .read(true)
323            .write(true)
324            .create(true)
325            .truncate(true)
326            .open(&path)?;
327
328        // Write existing memory contents to file
329        if let Storage::Memory(vec) = &self.storage {
330            file.write_all(vec)?;
331        }
332
333        let size = self.write_pos;
334        self.storage = Storage::File { file, path, size };
335        self.spilled = true;
336
337        Ok(())
338    }
339
340    /// Read all data from the buffer.
341    ///
342    /// # Errors
343    ///
344    /// Returns an error if disk I/O fails.
345    pub fn read_all(&mut self) -> io::Result<Vec<u8>> {
346        match &mut self.storage {
347            Storage::Memory(vec) => Ok(vec.clone()),
348            Storage::File { file, size, .. } => {
349                file.seek(SeekFrom::Start(0))?;
350                let mut data = vec![0u8; *size];
351                file.read_exact(&mut data)?;
352                Ok(data)
353            }
354        }
355    }
356
357    /// Read as a string (lossy UTF-8 conversion).
358    ///
359    /// # Errors
360    ///
361    /// Returns an error if disk I/O fails.
362    pub fn as_string(&mut self) -> io::Result<String> {
363        Ok(String::from_utf8_lossy(&self.read_all()?).into_owned())
364    }
365
366    /// Clear the buffer.
367    ///
368    /// If spilled to disk, the file is truncated.
369    ///
370    /// # Errors
371    ///
372    /// Returns an error if disk I/O fails.
373    pub fn clear(&mut self) -> io::Result<()> {
374        match &mut self.storage {
375            Storage::Memory(vec) => {
376                vec.clear();
377            }
378            Storage::File { file, size, .. } => {
379                file.set_len(0)?;
380                *size = 0;
381            }
382        }
383        self.write_pos = 0;
384        Ok(())
385    }
386}
387
388impl Default for SpillBuffer {
389    fn default() -> Self {
390        Self::new()
391    }
392}
393
394impl Drop for SpillBuffer {
395    fn drop(&mut self) {
396        // Clean up temporary file if it exists
397        if let Storage::File { path, .. } = &self.storage {
398            let _ = std::fs::remove_file(path);
399        }
400    }
401}
402
403impl std::fmt::Debug for SpillBuffer {
404    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
405        f.debug_struct("SpillBuffer")
406            .field("len", &self.write_pos)
407            .field("spilled", &self.spilled)
408            .field("max_capacity", &self.config.max_capacity)
409            .finish()
410    }
411}
412
/// Thread-safe atomic buffer size tracker.
///
/// All operations use `Ordering::Relaxed`: the counter is a standalone
/// statistic and does not synchronize access to any other data.
#[derive(Debug, Default)]
pub struct AtomicBufferSize {
    size: AtomicUsize,
}

impl AtomicBufferSize {
    /// Create a tracker starting at zero.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            size: AtomicUsize::new(0),
        }
    }

    /// Current tracked size.
    #[must_use]
    pub fn get(&self) -> usize {
        self.size.load(Ordering::Relaxed)
    }

    /// Increase the tracked size by `n`.
    pub fn add(&self, n: usize) {
        self.size.fetch_add(n, Ordering::Relaxed);
    }

    /// Decrease the tracked size by `n`.
    pub fn sub(&self, n: usize) {
        self.size.fetch_sub(n, Ordering::Relaxed);
    }

    /// Overwrite the tracked size with `n`.
    pub fn set(&self, n: usize) {
        self.size.store(n, Ordering::Relaxed);
    }

    /// Reset the tracked size to zero.
    pub fn reset(&self) {
        self.set(0);
    }
}
454
455/// Allocate a page-aligned buffer for zero-copy I/O.
456///
457/// # Safety
458///
459/// This allocates raw memory. The returned buffer should be deallocated
460/// properly when no longer needed.
461#[cfg(unix)]
462#[must_use]
463pub fn allocate_page_aligned(size: usize) -> Vec<u8> {
464    // Round up to page size
465    let page_size = page_size();
466    let aligned_size = (size + page_size - 1) & !(page_size - 1);
467
468    // For now, use regular allocation which may or may not be page-aligned
469    // A more advanced implementation would use mmap or posix_memalign
470    vec![0u8; aligned_size]
471}
472
473/// Get the system page size.
474#[cfg(unix)]
475#[must_use]
476#[allow(unsafe_code)]
477pub fn page_size() -> usize {
478    // SAFETY: sysconf is safe to call
479    let size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
480    if size <= 0 {
481        4096 // Default fallback
482    } else {
483        size as usize
484    }
485}
486
/// Get the system page size.
///
/// Returns a fixed conventional value; querying the OS is not attempted
/// here (presumably to avoid a winapi dependency — TODO confirm).
#[cfg(windows)]
#[must_use]
pub fn page_size() -> usize {
    const DEFAULT_PAGE_SIZE: usize = 4096;
    DEFAULT_PAGE_SIZE
}
493
/// Allocate a page-aligned buffer for zero-copy I/O.
///
/// On Windows this is a plain zeroed allocation of exactly `size` bytes;
/// no page rounding or alignment is performed.
#[cfg(windows)]
#[must_use]
pub fn allocate_page_aligned(size: usize) -> Vec<u8> {
    let mut buf = Vec::new();
    buf.resize(size, 0u8);
    buf
}
500
#[cfg(test)]
mod tests {
    use super::*;

    // Round-trip on a partially filled ring: len and string view agree.
    #[test]
    fn ring_buffer_basic() {
        let mut buf = RingBuffer::new(10);
        assert!(buf.is_empty());
        assert_eq!(buf.capacity(), 10);

        buf.write(b"hello");
        assert_eq!(buf.len(), 5);
        assert_eq!(buf.as_string(), "hello");
    }

    // Writing past capacity overwrites the oldest bytes; the buffer must
    // report full and retain exactly the newest `capacity` bytes.
    #[test]
    fn ring_buffer_wrap() {
        let mut buf = RingBuffer::new(10);
        buf.write(b"12345678"); // 8 bytes
        assert_eq!(buf.len(), 8);

        buf.write(b"ABCD"); // 4 more bytes, should wrap
        assert_eq!(buf.len(), 10); // Full
        assert!(buf.is_full());

        // Should contain last 10 bytes
        let content = buf.as_string();
        assert_eq!(content.len(), 10);
        assert!(content.ends_with("ABCD"));
    }

    // tail_bytes(n) returns the newest n bytes; requesting more than is
    // stored returns the entire contents.
    #[test]
    fn ring_buffer_tail_bytes() {
        let mut buf = RingBuffer::new(20);
        buf.write(b"hello world");

        let tail = buf.tail_bytes(5);
        assert_eq!(tail, b"world");

        let tail = buf.tail_bytes(100);
        assert_eq!(tail, b"hello world");
    }

    // clear() resets the buffer to the empty state.
    #[test]
    fn ring_buffer_clear() {
        let mut buf = RingBuffer::new(10);
        buf.write(b"hello");
        buf.clear();

        assert!(buf.is_empty());
        assert_eq!(buf.len(), 0);
    }

    // With spill_threshold == 0 the buffer must stay in memory.
    #[test]
    fn spill_buffer_memory() {
        let config = LargeBufferConfig::new(1024 * 1024).spill_threshold(0); // Never spill

        let mut buf = SpillBuffer::with_config(config);
        buf.write(b"hello world").unwrap();

        assert!(!buf.is_spilled());
        assert_eq!(buf.len(), 11);
        assert_eq!(buf.as_string().unwrap(), "hello world");
    }

    // Crossing the spill threshold moves data to disk without losing the
    // bytes written before the transition.
    #[test]
    fn spill_buffer_spill() {
        let config = LargeBufferConfig::new(1024 * 1024).spill_threshold(10); // Spill after 10 bytes

        let mut buf = SpillBuffer::with_config(config);
        buf.write(b"hello").unwrap();
        assert!(!buf.is_spilled());

        buf.write(b"world!!!").unwrap();
        assert!(buf.is_spilled());
        assert_eq!(buf.as_string().unwrap(), "helloworld!!!");
    }

    // Exercises every operation of the atomic size tracker.
    #[test]
    fn atomic_buffer_size() {
        let size = AtomicBufferSize::new();
        assert_eq!(size.get(), 0);

        size.add(100);
        assert_eq!(size.get(), 100);

        size.sub(30);
        assert_eq!(size.get(), 70);

        size.set(500);
        assert_eq!(size.get(), 500);

        size.reset();
        assert_eq!(size.get(), 0);
    }

    // Allocation length is at least the requested size; page size is at
    // least the conventional 4 KiB minimum.
    #[test]
    fn page_aligned_allocation() {
        let buf = allocate_page_aligned(1000);
        assert!(buf.len() >= 1000);

        let page = page_size();
        assert!(page >= 4096);
    }
}