Skip to main content

graphos_core/execution/spill/
file.rs

1//! Spill file read/write abstraction.
2
3use std::fs::File;
4use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
5use std::path::{Path, PathBuf};
6
/// Capacity in bytes (64 KiB) of the buffers wrapped around spill-file
/// handles, used for both the writer and every reader.
const BUFFER_SIZE: usize = 1 << 16;
9
10/// Handle for a single spill file.
11///
12/// SpillFile manages a temporary file used for spilling operator state to disk.
13/// It supports:
14/// - Buffered writing for efficiency
15/// - Multiple readers for concurrent access
16/// - Automatic byte counting
17pub struct SpillFile {
18    /// Path to the spill file.
19    path: PathBuf,
20    /// Buffered writer (Some during write phase, None after finish).
21    writer: Option<BufWriter<File>>,
22    /// Total bytes written to this file.
23    bytes_written: u64,
24}
25
26impl SpillFile {
27    /// Creates a new spill file at the given path.
28    ///
29    /// # Errors
30    ///
31    /// Returns an error if the file cannot be created.
32    pub fn new(path: PathBuf) -> std::io::Result<Self> {
33        let file = File::create(&path)?;
34        let writer = BufWriter::with_capacity(BUFFER_SIZE, file);
35
36        Ok(Self {
37            path,
38            writer: Some(writer),
39            bytes_written: 0,
40        })
41    }
42
43    /// Returns the path to this spill file.
44    #[must_use]
45    pub fn path(&self) -> &Path {
46        &self.path
47    }
48
49    /// Returns the number of bytes written to this file.
50    #[must_use]
51    pub fn bytes_written(&self) -> u64 {
52        self.bytes_written
53    }
54
55    /// Writes raw bytes to the file.
56    ///
57    /// # Errors
58    ///
59    /// Returns an error if the write fails.
60    pub fn write_all(&mut self, data: &[u8]) -> std::io::Result<()> {
61        let writer = self
62            .writer
63            .as_mut()
64            .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::Other, "Write phase ended"))?;
65
66        writer.write_all(data)?;
67        self.bytes_written += data.len() as u64;
68        Ok(())
69    }
70
71    /// Writes a u64 in little-endian format.
72    ///
73    /// # Errors
74    ///
75    /// Returns an error if the write fails.
76    pub fn write_u64_le(&mut self, value: u64) -> std::io::Result<()> {
77        self.write_all(&value.to_le_bytes())
78    }
79
80    /// Writes an i64 in little-endian format.
81    ///
82    /// # Errors
83    ///
84    /// Returns an error if the write fails.
85    pub fn write_i64_le(&mut self, value: i64) -> std::io::Result<()> {
86        self.write_all(&value.to_le_bytes())
87    }
88
89    /// Writes a length-prefixed byte slice.
90    ///
91    /// Format: [length: u64][data: bytes]
92    ///
93    /// # Errors
94    ///
95    /// Returns an error if the write fails.
96    pub fn write_bytes(&mut self, data: &[u8]) -> std::io::Result<()> {
97        self.write_u64_le(data.len() as u64)?;
98        self.write_all(data)
99    }
100
101    /// Finishes writing and flushes buffers.
102    ///
103    /// After this call, the file is ready for reading.
104    ///
105    /// # Errors
106    ///
107    /// Returns an error if the flush fails.
108    pub fn finish_write(&mut self) -> std::io::Result<()> {
109        if let Some(mut writer) = self.writer.take() {
110            writer.flush()?;
111        }
112        Ok(())
113    }
114
115    /// Returns whether this file is still in write mode.
116    #[must_use]
117    pub fn is_writable(&self) -> bool {
118        self.writer.is_some()
119    }
120
121    /// Creates a reader for this file.
122    ///
123    /// Can be called multiple times to create multiple readers.
124    ///
125    /// # Errors
126    ///
127    /// Returns an error if the file cannot be opened for reading.
128    pub fn reader(&self) -> std::io::Result<SpillFileReader> {
129        let file = File::open(&self.path)?;
130        let reader = BufReader::with_capacity(BUFFER_SIZE, file);
131        Ok(SpillFileReader { reader })
132    }
133
134    /// Deletes this spill file.
135    ///
136    /// Consumes the SpillFile handle.
137    ///
138    /// # Errors
139    ///
140    /// Returns an error if the file cannot be deleted.
141    pub fn delete(mut self) -> std::io::Result<()> {
142        // Close the writer first
143        self.writer = None;
144        std::fs::remove_file(&self.path)
145    }
146}
147
148impl std::fmt::Debug for SpillFile {
149    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150        f.debug_struct("SpillFile")
151            .field("path", &self.path)
152            .field("bytes_written", &self.bytes_written)
153            .field("is_writable", &self.is_writable())
154            .finish()
155    }
156}
157
/// Reader for a spill file.
///
/// Provides buffered, sequential decoding of the little-endian and
/// length-prefixed values written by `SpillFile`, plus random access via
/// [`seek`](Self::seek).
pub struct SpillFileReader {
    /// Buffered reader over the spill file.
    reader: BufReader<File>,
}

impl SpillFileReader {
    /// Reads exactly `buf.len()` bytes from the file.
    ///
    /// # Errors
    ///
    /// Returns an error of kind
    /// [`UnexpectedEof`](std::io::ErrorKind::UnexpectedEof) if the file ends
    /// first, or any other I/O error from the underlying read.
    pub fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
        self.reader.read_exact(buf)
    }

    /// Reads exactly `N` bytes into a fixed-size array; shared by the
    /// fixed-width decoders below.
    fn read_array<const N: usize>(&mut self) -> std::io::Result<[u8; N]> {
        let mut buf = [0u8; N];
        self.read_exact(&mut buf)?;
        Ok(buf)
    }

    /// Reads a u64 in little-endian format.
    ///
    /// # Errors
    ///
    /// Returns an error if the read fails.
    pub fn read_u64_le(&mut self) -> std::io::Result<u64> {
        self.read_array::<8>().map(u64::from_le_bytes)
    }

    /// Reads an i64 in little-endian format.
    ///
    /// # Errors
    ///
    /// Returns an error if the read fails.
    pub fn read_i64_le(&mut self) -> std::io::Result<i64> {
        self.read_array::<8>().map(i64::from_le_bytes)
    }

    /// Reads an f64 in little-endian format.
    ///
    /// # Errors
    ///
    /// Returns an error if the read fails.
    pub fn read_f64_le(&mut self) -> std::io::Result<f64> {
        self.read_array::<8>().map(f64::from_le_bytes)
    }

    /// Reads a single byte.
    ///
    /// # Errors
    ///
    /// Returns an error if the read fails.
    pub fn read_u8(&mut self) -> std::io::Result<u8> {
        let [byte] = self.read_array::<1>()?;
        Ok(byte)
    }

    /// Reads a length-prefixed byte slice.
    ///
    /// Format: `[length: u64 LE][data: length bytes]`
    ///
    /// The buffer grows as payload bytes actually arrive rather than being
    /// sized from the prefix up front. A corrupt or misaligned length
    /// therefore yields an error instead of an enormous allocation or a
    /// capacity-overflow panic.
    ///
    /// # Errors
    ///
    /// Returns an error of kind
    /// [`UnexpectedEof`](std::io::ErrorKind::UnexpectedEof) if the file ends
    /// before the prefix or the full payload has been read, or any other I/O
    /// error from the underlying read.
    pub fn read_bytes(&mut self) -> std::io::Result<Vec<u8>> {
        // Upper bound on the initial allocation; larger payloads grow on demand.
        const MAX_PREALLOC: u64 = 64 * 1024;

        let len = self.read_u64_le()?;
        // Lossless cast: the value is capped at MAX_PREALLOC.
        let mut buf = Vec::with_capacity(len.min(MAX_PREALLOC) as usize);
        let read = self.reader.by_ref().take(len).read_to_end(&mut buf)?;
        if read as u64 != len {
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "spill file ended inside a length-prefixed record",
            ));
        }
        Ok(buf)
    }

    /// Seeks to a position in the file and returns the new offset from the start.
    ///
    /// # Errors
    ///
    /// Returns an error if the seek fails.
    pub fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        self.reader.seek(pos)
    }

    /// Seeks to the beginning of the file.
    ///
    /// # Errors
    ///
    /// Returns an error if the seek fails.
    pub fn rewind(&mut self) -> std::io::Result<()> {
        self.reader.seek(SeekFrom::Start(0))?;
        Ok(())
    }

    /// Returns the current logical position in the file.
    ///
    /// # Errors
    ///
    /// Returns an error if the operation fails.
    pub fn position(&mut self) -> std::io::Result<u64> {
        self.reader.stream_position()
    }
}

impl std::fmt::Debug for SpillFileReader {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SpillFileReader").finish()
    }
}
268
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn test_spill_file_write_read() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test.spill");

        // Write phase
        let mut file = SpillFile::new(file_path).unwrap();
        file.write_all(b"hello ").unwrap();
        file.write_all(b"world").unwrap();
        assert_eq!(file.bytes_written(), 11);
        file.finish_write().unwrap();

        // Read phase
        let mut reader = file.reader().unwrap();
        let mut buf = [0u8; 11];
        reader.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"hello world");
    }

    #[test]
    fn test_spill_file_integers() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test.spill");

        let mut file = SpillFile::new(file_path).unwrap();
        file.write_u64_le(u64::MAX).unwrap();
        file.write_i64_le(i64::MIN).unwrap();
        file.finish_write().unwrap();

        let mut reader = file.reader().unwrap();
        assert_eq!(reader.read_u64_le().unwrap(), u64::MAX);
        assert_eq!(reader.read_i64_le().unwrap(), i64::MIN);
    }

    #[test]
    fn test_spill_file_bytes_prefixed() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test.spill");

        let mut file = SpillFile::new(file_path).unwrap();
        file.write_bytes(b"short").unwrap();
        file.write_bytes(b"longer string here").unwrap();
        file.finish_write().unwrap();

        let mut reader = file.reader().unwrap();
        assert_eq!(reader.read_bytes().unwrap(), b"short");
        assert_eq!(reader.read_bytes().unwrap(), b"longer string here");
    }

    #[test]
    fn test_spill_file_multiple_readers() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test.spill");

        let mut file = SpillFile::new(file_path).unwrap();
        file.write_u64_le(42).unwrap();
        file.write_u64_le(100).unwrap();
        file.finish_write().unwrap();

        // Create multiple readers
        let mut reader1 = file.reader().unwrap();
        let mut reader2 = file.reader().unwrap();

        // Read from reader1
        assert_eq!(reader1.read_u64_le().unwrap(), 42);

        // reader2 still at beginning
        assert_eq!(reader2.read_u64_le().unwrap(), 42);
        assert_eq!(reader2.read_u64_le().unwrap(), 100);

        // reader1 continues
        assert_eq!(reader1.read_u64_le().unwrap(), 100);
    }

    #[test]
    fn test_spill_file_delete() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test.spill");

        let mut file = SpillFile::new(file_path.clone()).unwrap();
        file.write_all(b"data").unwrap();
        file.finish_write().unwrap();

        assert!(file_path.exists());
        file.delete().unwrap();
        assert!(!file_path.exists());
    }

    #[test]
    fn test_reader_seek() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test.spill");

        let mut file = SpillFile::new(file_path).unwrap();
        file.write_u64_le(1).unwrap();
        file.write_u64_le(2).unwrap();
        file.write_u64_le(3).unwrap();
        file.finish_write().unwrap();

        let mut reader = file.reader().unwrap();

        // Read second value directly
        reader.seek(SeekFrom::Start(8)).unwrap();
        assert_eq!(reader.read_u64_le().unwrap(), 2);

        // Rewind and read from beginning
        reader.rewind().unwrap();
        assert_eq!(reader.read_u64_le().unwrap(), 1);
    }

    #[test]
    fn test_write_after_finish_is_rejected() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test.spill");

        let mut file = SpillFile::new(file_path).unwrap();
        assert!(file.is_writable());
        file.write_all(b"abc").unwrap();
        file.finish_write().unwrap();
        assert!(!file.is_writable());

        // Writes after the write phase fail and are not counted.
        assert!(file.write_all(b"more").is_err());
        assert_eq!(file.bytes_written(), 3);

        // Finishing twice is a no-op.
        file.finish_write().unwrap();
    }

    #[test]
    fn test_reader_scalars_and_position() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test.spill");

        let mut file = SpillFile::new(file_path).unwrap();
        file.write_all(&[0xAB]).unwrap();
        file.write_all(&2.5f64.to_le_bytes()).unwrap();
        file.write_i64_le(-7).unwrap();
        file.finish_write().unwrap();

        let mut reader = file.reader().unwrap();
        assert_eq!(reader.position().unwrap(), 0);
        assert_eq!(reader.read_u8().unwrap(), 0xAB);
        assert_eq!(reader.position().unwrap(), 1);
        // Compare bit patterns so the float round trip is checked exactly.
        assert_eq!(reader.read_f64_le().unwrap().to_bits(), 2.5f64.to_bits());
        assert_eq!(reader.position().unwrap(), 9);
        assert_eq!(reader.read_i64_le().unwrap(), -7);

        // Reading past the end is an error, not a zero value.
        assert!(reader.read_u8().is_err());
    }

    #[test]
    fn test_read_bytes_empty_and_truncated() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test.spill");

        let mut file = SpillFile::new(file_path).unwrap();
        file.write_bytes(b"").unwrap();
        assert_eq!(file.bytes_written(), 8);
        // A prefix that promises more payload than the file holds.
        file.write_u64_le(10).unwrap();
        file.write_all(b"abc").unwrap();
        file.finish_write().unwrap();

        let mut reader = file.reader().unwrap();
        assert!(reader.read_bytes().unwrap().is_empty());
        let err = reader.read_bytes().unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
    }
}