Skip to main content

rusty_sponge/
buffer.rs

1//! Hybrid in-memory + tempfile-spill buffer for soaking up stdin.
2//!
3//! Per AD-002: small inputs stay in a `Vec<u8>`; once the buffer would
4//! exceed the spill threshold, the in-memory bytes are flushed into a
5//! sibling tempfile and the remainder of stdin streams through a
6//! `BufWriter<NamedTempFile>`.
7//!
8//! The spill tempfile is placed in the *target's parent directory* (not
9//! `$TMPDIR`) so that the eventual atomic rename in [`crate::atomic`]
10//! works without crossing a filesystem boundary (HINT-002).
11
12use std::io::{self, BufWriter, Read, Seek, SeekFrom, Write};
13use std::path::Path;
14
15use tempfile::NamedTempFile;
16
/// Size, in bytes, of each `read` issued by the drain and copy-out loops (64 KiB).
///
/// Chosen to line up with typical filesystem readahead and to page cleanly on
/// every supported target (HINT-005).
const READ_CHUNK_SIZE: usize = 1 << 16;
20
21/// The buffered input. Transitions from in-memory to spilled as data accumulates.
22pub enum Buffer {
23    /// In-memory accumulation (small inputs).
24    InMemory(Vec<u8>),
25    /// Spilled-to-tempfile accumulation (large inputs).
26    Spilled {
27        writer: BufWriter<NamedTempFile>,
28        /// Tracked size in bytes (since BufWriter writes are not all flushed yet).
29        len: u64,
30    },
31}
32
33impl Buffer {
34    /// Construct an empty in-memory buffer with no preallocation.
35    pub fn new() -> Self {
36        Buffer::InMemory(Vec::new())
37    }
38
39    /// Return the current logical length of the buffered bytes.
40    pub fn len(&self) -> u64 {
41        match self {
42            Buffer::InMemory(v) => v.len() as u64,
43            Buffer::Spilled { len, .. } => *len,
44        }
45    }
46
47    /// True iff no bytes have been appended to this buffer.
48    pub fn is_empty(&self) -> bool {
49        self.len() == 0
50    }
51
52    /// Drain the entire reader into this buffer, transitioning to the
53    /// spilled variant when the accumulated size would exceed `threshold`.
54    ///
55    /// `spill_dir` is the directory in which the spill tempfile is created
56    /// when the transition triggers. Caller MUST pass the *target file's
57    /// parent directory* (HINT-002).
58    ///
59    /// On the binary path (feature `cli`), the loop polls the process-wide
60    /// cancellation flag between chunks; if a signal was delivered, the
61    /// drain returns `io::ErrorKind::Interrupted` so the in-progress
62    /// tempfile is dropped via the normal `Drop` chain before exit.
63    pub fn drain_reader<R: Read>(
64        &mut self,
65        mut reader: R,
66        threshold: usize,
67        spill_dir: &Path,
68    ) -> io::Result<()> {
69        let mut chunk = vec![0u8; READ_CHUNK_SIZE];
70        loop {
71            #[cfg(feature = "cli")]
72            if crate::signal::is_cancelled() {
73                return Err(io::Error::new(
74                    io::ErrorKind::Interrupted,
75                    "rusty-sponge: cancelled by signal",
76                ));
77            }
78            let n = reader.read(&mut chunk)?;
79            if n == 0 {
80                break;
81            }
82            self.append(&chunk[..n], threshold, spill_dir)?;
83        }
84        Ok(())
85    }
86
87    /// Append a slice of bytes, transitioning to spilled storage if doing so
88    /// would push the buffer past `threshold`.
89    pub fn append(&mut self, bytes: &[u8], threshold: usize, spill_dir: &Path) -> io::Result<()> {
90        let threshold_u64 = threshold as u64;
91        let projected_len = self.len() + bytes.len() as u64;
92
93        // If we are in-memory and the projected size crosses the threshold,
94        // spill before writing the new bytes.
95        if matches!(self, Buffer::InMemory(_)) && projected_len > threshold_u64 {
96            self.transition_to_spilled(spill_dir)?;
97        }
98
99        match self {
100            Buffer::InMemory(v) => v.extend_from_slice(bytes),
101            Buffer::Spilled { writer, len } => {
102                writer.write_all(bytes)?;
103                *len += bytes.len() as u64;
104            }
105        }
106        Ok(())
107    }
108
109    /// Promote an `InMemory` buffer to a `Spilled` one, flushing the existing
110    /// bytes into the new tempfile. No-op if already spilled.
111    pub fn transition_to_spilled(&mut self, spill_dir: &Path) -> io::Result<()> {
112        if let Buffer::InMemory(bytes) = std::mem::replace(self, Buffer::InMemory(Vec::new())) {
113            let tempfile = tempfile::Builder::new()
114                .prefix(".rusty-sponge-spill-")
115                .tempfile_in(spill_dir)?;
116            let mut writer = BufWriter::with_capacity(READ_CHUNK_SIZE, tempfile);
117            writer.write_all(&bytes)?;
118            let len = bytes.len() as u64;
119            *self = Buffer::Spilled { writer, len };
120        }
121        Ok(())
122    }
123
124    /// Consume the buffer and write its bytes to `out`. Implementations:
125    /// - `InMemory`: a single `write_all` of the Vec.
126    /// - `Spilled`: flush BufWriter, rewind the NamedTempFile to start,
127    ///   copy through to `out` in 64 KiB chunks.
128    pub fn write_to<W: Write>(self, mut out: W) -> io::Result<()> {
129        match self {
130            Buffer::InMemory(v) => out.write_all(&v),
131            Buffer::Spilled { writer, .. } => {
132                let mut tempfile = writer
133                    .into_inner()
134                    .map_err(|e| io::Error::other(format!("BufWriter flush failed: {e}")))?;
135                tempfile.as_file_mut().seek(SeekFrom::Start(0))?;
136                let mut chunk = vec![0u8; READ_CHUNK_SIZE];
137                let mut reader = tempfile.as_file();
138                loop {
139                    let n = reader.read(&mut chunk)?;
140                    if n == 0 {
141                        break;
142                    }
143                    out.write_all(&chunk[..n])?;
144                }
145                Ok(())
146            }
147        }
148    }
149}
150
151impl Default for Buffer {
152    fn default() -> Self {
153        Self::new()
154    }
155}
156
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn empty_buffer_has_len_zero() {
        let buf = Buffer::new();
        assert_eq!(buf.len(), 0);
    }

    #[test]
    fn default_is_empty_and_in_memory() {
        let buf = Buffer::default();
        assert!(buf.is_empty());
        assert!(matches!(buf, Buffer::InMemory(_)));
    }

    #[test]
    fn drain_small_input_stays_in_memory() {
        let tmpdir = tempfile::tempdir().unwrap();
        let mut buf = Buffer::new();
        let input = Cursor::new(b"hello world\n");
        buf.drain_reader(input, 1024 * 1024, tmpdir.path()).unwrap();
        assert!(matches!(buf, Buffer::InMemory(_)));
        assert_eq!(buf.len(), 12);
    }

    #[test]
    fn drain_large_input_transitions_to_spilled() {
        let tmpdir = tempfile::tempdir().unwrap();
        let mut buf = Buffer::new();
        // 256 KiB input with 64 KiB threshold → must spill
        let big = vec![0xAAu8; 256 * 1024];
        buf.drain_reader(Cursor::new(&big), 64 * 1024, tmpdir.path())
            .unwrap();
        assert!(matches!(buf, Buffer::Spilled { .. }));
        assert_eq!(buf.len(), 256 * 1024);
    }

    #[test]
    fn append_exactly_at_threshold_stays_in_memory() {
        let tmpdir = tempfile::tempdir().unwrap();
        let mut buf = Buffer::new();
        // Landing exactly on the threshold must not spill...
        buf.append(&[1u8; 16], 16, tmpdir.path()).unwrap();
        assert!(matches!(buf, Buffer::InMemory(_)));
        // ...but one byte past it must.
        buf.append(&[2u8], 16, tmpdir.path()).unwrap();
        assert!(matches!(buf, Buffer::Spilled { .. }));
        assert_eq!(buf.len(), 17);
    }

    #[test]
    fn spill_tempfile_lives_in_spill_dir_and_is_removed_after_write() {
        let tmpdir = tempfile::tempdir().unwrap();
        let mut buf = Buffer::new();
        buf.append(b"0123456789", 4, tmpdir.path()).unwrap();
        assert!(matches!(buf, Buffer::Spilled { .. }));

        let names: Vec<String> = std::fs::read_dir(tmpdir.path())
            .unwrap()
            .map(|entry| entry.unwrap().file_name().into_string().unwrap())
            .collect();
        assert_eq!(names.len(), 1);
        assert!(names[0].starts_with(".rusty-sponge-spill-"));

        let mut out = Vec::new();
        buf.write_to(&mut out).unwrap();
        assert_eq!(out, b"0123456789");
        assert_eq!(std::fs::read_dir(tmpdir.path()).unwrap().count(), 0);
    }

    #[test]
    fn transition_is_noop_when_already_spilled() {
        let tmpdir = tempfile::tempdir().unwrap();
        let mut buf = Buffer::new();
        buf.append(b"abcdef", 2, tmpdir.path()).unwrap();
        assert!(matches!(buf, Buffer::Spilled { .. }));
        buf.transition_to_spilled(tmpdir.path()).unwrap();
        assert_eq!(buf.len(), 6);
        let mut out = Vec::new();
        buf.write_to(&mut out).unwrap();
        assert_eq!(out, b"abcdef");
    }

    #[test]
    fn write_to_roundtrips_in_memory() {
        let tmpdir = tempfile::tempdir().unwrap();
        let mut buf = Buffer::new();
        buf.drain_reader(Cursor::new(b"abc\n"), 1024 * 1024, tmpdir.path())
            .unwrap();
        let mut out = Vec::new();
        buf.write_to(&mut out).unwrap();
        assert_eq!(out, b"abc\n");
    }

    #[test]
    fn write_to_roundtrips_spilled() {
        let tmpdir = tempfile::tempdir().unwrap();
        let mut buf = Buffer::new();
        let big = (0u8..=255u8).cycle().take(256 * 1024).collect::<Vec<_>>();
        buf.drain_reader(Cursor::new(&big), 1024, tmpdir.path())
            .unwrap();
        assert!(matches!(buf, Buffer::Spilled { .. }));
        let mut out = Vec::new();
        buf.write_to(&mut out).unwrap();
        assert_eq!(out, big);
    }

    #[test]
    fn binary_bytes_pass_through_unchanged() {
        let tmpdir = tempfile::tempdir().unwrap();
        let mut buf = Buffer::new();
        let bytes: &[u8] = &[0x00, 0xFE, 0xFF, 0xC3, 0x28, 0xA0, 0xA1];
        buf.drain_reader(Cursor::new(bytes), 1024 * 1024, tmpdir.path())
            .unwrap();
        let mut out = Vec::new();
        buf.write_to(&mut out).unwrap();
        assert_eq!(out, bytes);
    }

    #[test]
    fn empty_input_writes_zero_bytes() {
        let tmpdir = tempfile::tempdir().unwrap();
        let mut buf = Buffer::new();
        buf.drain_reader(Cursor::new(&[][..]), 1024 * 1024, tmpdir.path())
            .unwrap();
        let mut out = Vec::new();
        buf.write_to(&mut out).unwrap();
        assert_eq!(out, Vec::<u8>::new());
    }
}