Skip to main content

grafeo_core/execution/spill/
file.rs

1//! Spill file read/write abstraction.
2
3use std::fs::File;
4use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
5use std::path::{Path, PathBuf};
6
/// Capacity, in bytes, of the `BufWriter` / `BufReader` wrapped around each
/// spill file handle (64 KiB).
const BUFFER_SIZE: usize = 1 << 16;
9
/// On-disk scratch file that an operator can spill state into.
///
/// A `SpillFile` moves through two phases:
/// - **write phase**: bytes are appended through an internal buffered writer,
///   and every successful write is added to a running byte count;
/// - **read phase**: once writing is finished, any number of independent
///   readers can be opened, each with its own file handle and position.
pub struct SpillFile {
    /// Location of the file on disk.
    path: PathBuf,
    /// Number of bytes successfully written so far.
    bytes_written: u64,
    /// Buffered writer: `Some` while the write phase is open, `None` afterwards.
    writer: Option<BufWriter<File>>,
}
25
26impl SpillFile {
27    /// Creates a new spill file at the given path.
28    ///
29    /// # Errors
30    ///
31    /// Returns an error if the file cannot be created.
32    pub fn new(path: PathBuf) -> std::io::Result<Self> {
33        let file = File::create(&path)?;
34        let writer = BufWriter::with_capacity(BUFFER_SIZE, file);
35
36        Ok(Self {
37            path,
38            writer: Some(writer),
39            bytes_written: 0,
40        })
41    }
42
43    /// Returns the path to this spill file.
44    #[must_use]
45    pub fn path(&self) -> &Path {
46        &self.path
47    }
48
49    /// Returns the number of bytes written to this file.
50    #[must_use]
51    pub fn bytes_written(&self) -> u64 {
52        self.bytes_written
53    }
54
55    /// Writes raw bytes to the file.
56    ///
57    /// # Errors
58    ///
59    /// Returns an error if the write fails.
60    pub fn write_all(&mut self, data: &[u8]) -> std::io::Result<()> {
61        let writer = self
62            .writer
63            .as_mut()
64            .ok_or_else(|| std::io::Error::other("Write phase ended"))?;
65
66        writer.write_all(data)?;
67        self.bytes_written += data.len() as u64;
68        Ok(())
69    }
70
71    /// Writes a u64 in little-endian format.
72    ///
73    /// # Errors
74    ///
75    /// Returns an error if the write fails.
76    pub fn write_u64_le(&mut self, value: u64) -> std::io::Result<()> {
77        self.write_all(&value.to_le_bytes())
78    }
79
80    /// Writes an i64 in little-endian format.
81    ///
82    /// # Errors
83    ///
84    /// Returns an error if the write fails.
85    pub fn write_i64_le(&mut self, value: i64) -> std::io::Result<()> {
86        self.write_all(&value.to_le_bytes())
87    }
88
89    /// Writes a length-prefixed byte slice.
90    ///
91    /// Format: [length: u64][data: bytes]
92    ///
93    /// # Errors
94    ///
95    /// Returns an error if the write fails.
96    pub fn write_bytes(&mut self, data: &[u8]) -> std::io::Result<()> {
97        self.write_u64_le(data.len() as u64)?;
98        self.write_all(data)
99    }
100
101    /// Finishes writing and flushes buffers.
102    ///
103    /// After this call, the file is ready for reading.
104    ///
105    /// # Errors
106    ///
107    /// Returns an error if the flush fails.
108    pub fn finish_write(&mut self) -> std::io::Result<()> {
109        if let Some(mut writer) = self.writer.take() {
110            writer.flush()?;
111        }
112        Ok(())
113    }
114
115    /// Returns whether this file is still in write mode.
116    #[must_use]
117    pub fn is_writable(&self) -> bool {
118        self.writer.is_some()
119    }
120
121    /// Creates a reader for this file.
122    ///
123    /// Can be called multiple times to create multiple readers.
124    ///
125    /// # Errors
126    ///
127    /// Returns an error if the file cannot be opened for reading.
128    pub fn reader(&self) -> std::io::Result<SpillFileReader> {
129        let file = File::open(&self.path)?;
130        let reader = BufReader::with_capacity(BUFFER_SIZE, file);
131        Ok(SpillFileReader { reader })
132    }
133
134    /// Deletes this spill file.
135    ///
136    /// Consumes the SpillFile handle.
137    ///
138    /// # Errors
139    ///
140    /// Returns an error if the file cannot be deleted.
141    pub fn delete(mut self) -> std::io::Result<()> {
142        // Close the writer first
143        self.writer = None;
144        std::fs::remove_file(&self.path)
145    }
146}
147
148impl std::fmt::Debug for SpillFile {
149    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150        f.debug_struct("SpillFile")
151            .field("path", &self.path)
152            .field("bytes_written", &self.bytes_written)
153            .field("is_writable", &self.is_writable())
154            .finish()
155    }
156}
157
/// Reader for a spill file.
///
/// Provides buffered reading of spill file contents. Each reader owns its own
/// file handle, so its position is independent of any other reader.
pub struct SpillFileReader {
    /// Buffered reader.
    reader: BufReader<File>,
}

impl SpillFileReader {
    /// Upper bound on the buffer that [`Self::read_bytes`] allocates before any
    /// payload bytes have actually been read. Larger payloads grow the buffer
    /// as data arrives.
    const MAX_PREALLOC: usize = 64 * 1024;

    /// Reads exactly `buf.len()` bytes from the file.
    ///
    /// # Errors
    ///
    /// Returns an error (kind `UnexpectedEof`) if the file ends before `buf` is
    /// filled, or any underlying I/O error.
    pub fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
        self.reader.read_exact(buf)
    }

    /// Reads a u64 in little-endian format (8 bytes).
    ///
    /// # Errors
    ///
    /// Returns an error if the read fails.
    pub fn read_u64_le(&mut self) -> std::io::Result<u64> {
        let mut buf = [0u8; 8];
        self.read_exact(&mut buf)?;
        Ok(u64::from_le_bytes(buf))
    }

    /// Reads an i64 in little-endian format (8 bytes).
    ///
    /// # Errors
    ///
    /// Returns an error if the read fails.
    pub fn read_i64_le(&mut self) -> std::io::Result<i64> {
        let mut buf = [0u8; 8];
        self.read_exact(&mut buf)?;
        Ok(i64::from_le_bytes(buf))
    }

    /// Reads an f64 in little-endian format (8 bytes).
    ///
    /// # Errors
    ///
    /// Returns an error if the read fails.
    pub fn read_f64_le(&mut self) -> std::io::Result<f64> {
        let mut buf = [0u8; 8];
        self.read_exact(&mut buf)?;
        Ok(f64::from_le_bytes(buf))
    }

    /// Reads a u8 byte.
    ///
    /// # Errors
    ///
    /// Returns an error if the read fails.
    pub fn read_u8(&mut self) -> std::io::Result<u8> {
        let mut buf = [0u8; 1];
        self.read_exact(&mut buf)?;
        Ok(buf[0])
    }

    /// Reads a length-prefixed byte slice.
    ///
    /// Format: [length: u64 little-endian][data: bytes]
    ///
    /// # Errors
    ///
    /// - `InvalidData` if the length prefix does not fit in `usize`.
    /// - `UnexpectedEof` if the file ends before `length` payload bytes are read.
    /// - Any underlying I/O error.
    pub fn read_bytes(&mut self) -> std::io::Result<Vec<u8>> {
        let raw_len = self.read_u64_le()?;
        let len = usize::try_from(raw_len).map_err(|_| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("byte-prefix length {raw_len} exceeds addressable range"),
            )
        })?;

        // The prefix comes from disk and may be corrupt. A single up-front
        // allocation of `len` bytes could panic ("capacity overflow") or abort
        // the process on OOM. Instead, read at most `len` bytes and let the
        // buffer grow only as data actually arrives.
        let mut buf = Vec::with_capacity(len.min(Self::MAX_PREALLOC));
        self.reader.by_ref().take(raw_len).read_to_end(&mut buf)?;
        if buf.len() != len {
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                format!(
                    "byte-prefix length {len} but only {} bytes available",
                    buf.len()
                ),
            ));
        }
        Ok(buf)
    }

    /// Seeks to a position in the file and returns the new offset from the start.
    ///
    /// # Errors
    ///
    /// Returns an error if the seek fails.
    pub fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        self.reader.seek(pos)
    }

    /// Seeks to the beginning of the file.
    ///
    /// # Errors
    ///
    /// Returns an error if the seek fails.
    pub fn rewind(&mut self) -> std::io::Result<()> {
        self.reader.seek(SeekFrom::Start(0))?;
        Ok(())
    }

    /// Returns the current logical position in the file, accounting for buffered bytes.
    ///
    /// Takes `&mut self` because `Seek::stream_position` requires it.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation fails.
    pub fn position(&mut self) -> std::io::Result<u64> {
        self.reader.stream_position()
    }
}

impl std::fmt::Debug for SpillFileReader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SpillFileReader").finish()
    }
}
274
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates a fresh spill file named `test.spill` inside `dir`.
    fn spill_in(dir: &TempDir) -> SpillFile {
        SpillFile::new(dir.path().join("test.spill")).unwrap()
    }

    #[test]
    fn test_spill_file_write_read() {
        let dir = TempDir::new().unwrap();

        // Write phase: two chunks, counted together.
        let mut spill = spill_in(&dir);
        for chunk in [&b"hello "[..], &b"world"[..]] {
            spill.write_all(chunk).unwrap();
        }
        assert_eq!(spill.bytes_written(), 11);
        spill.finish_write().unwrap();

        // Read phase: the chunks come back contiguous.
        let mut rd = spill.reader().unwrap();
        let mut out = [0u8; 11];
        rd.read_exact(&mut out).unwrap();
        assert_eq!(&out, b"hello world");
    }

    #[test]
    fn test_spill_file_integers() {
        let dir = TempDir::new().unwrap();

        let mut spill = spill_in(&dir);
        spill.write_u64_le(u64::MAX).unwrap();
        spill.write_i64_le(i64::MIN).unwrap();
        spill.finish_write().unwrap();

        let mut rd = spill.reader().unwrap();
        assert_eq!(rd.read_u64_le().unwrap(), u64::MAX);
        assert_eq!(rd.read_i64_le().unwrap(), i64::MIN);
    }

    #[test]
    fn test_spill_file_bytes_prefixed() {
        let dir = TempDir::new().unwrap();
        let payloads: [&[u8]; 2] = [b"short", b"longer string here"];

        let mut spill = spill_in(&dir);
        for p in payloads {
            spill.write_bytes(p).unwrap();
        }
        spill.finish_write().unwrap();

        let mut rd = spill.reader().unwrap();
        for p in payloads {
            assert_eq!(rd.read_bytes().unwrap(), p);
        }
    }

    #[test]
    fn test_spill_file_multiple_readers() {
        let dir = TempDir::new().unwrap();

        let mut spill = spill_in(&dir);
        for v in [42, 100] {
            spill.write_u64_le(v).unwrap();
        }
        spill.finish_write().unwrap();

        // Two readers over the same file.
        let mut first = spill.reader().unwrap();
        let mut second = spill.reader().unwrap();

        // Advance the first reader by one value.
        assert_eq!(first.read_u64_le().unwrap(), 42);

        // The second reader is unaffected and starts from the beginning.
        assert_eq!(second.read_u64_le().unwrap(), 42);
        assert_eq!(second.read_u64_le().unwrap(), 100);

        // The first reader picks up where it left off.
        assert_eq!(first.read_u64_le().unwrap(), 100);
    }

    #[test]
    fn test_spill_file_delete() {
        let dir = TempDir::new().unwrap();

        let mut spill = spill_in(&dir);
        let on_disk = spill.path().to_path_buf();
        spill.write_all(b"data").unwrap();
        spill.finish_write().unwrap();

        assert!(on_disk.exists());
        spill.delete().unwrap();
        assert!(!on_disk.exists());
    }

    #[test]
    fn test_reader_seek() {
        let dir = TempDir::new().unwrap();

        let mut spill = spill_in(&dir);
        for v in 1..=3u64 {
            spill.write_u64_le(v).unwrap();
        }
        spill.finish_write().unwrap();

        let mut rd = spill.reader().unwrap();

        // Jump straight to the second 8-byte value.
        rd.seek(SeekFrom::Start(8)).unwrap();
        assert_eq!(rd.read_u64_le().unwrap(), 2);

        // Back to the start: the first value is read again.
        rd.rewind().unwrap();
        assert_eq!(rd.read_u64_le().unwrap(), 1);
    }
}