vibesql_executor/memory/
spill.rs

//! Spill file management for external operators
//!
//! This module provides temporary file handling for operators that need to
//! spill data to disk when memory is exhausted.
//!
//! # Design
//!
//! - Files are automatically deleted when the `SpillFile` handle is dropped
//! - File names combine the process id and a per-process counter to avoid collisions
//! - Files are created lazily (only when first written to)
//! - Supports both sequential writes and random reads for merge operations
12
13use std::{
14    fs::{self, File, OpenOptions},
15    io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write},
16    path::{Path, PathBuf},
17    sync::atomic::{AtomicU64, Ordering},
18};
19
/// Global counter for unique file naming.
///
/// Combined with the process id when a [`SpillFile`] is constructed, so every
/// spill file created by this process gets a distinct name.
static FILE_COUNTER: AtomicU64 = AtomicU64::new(0);

/// A handle to a temporary spill file
///
/// The file is automatically deleted when this handle is dropped.
/// Files are created lazily: constructing the handle only ensures the spill
/// directory exists, and the file itself is not created until the first write.
///
/// The handle is either writing (a buffered writer is open) or reading
/// (after [`SpillFile::prepare_for_read`], which the read methods also call
/// implicitly when needed). Writing again after reading appends to the existing
/// contents. The next read then starts over from the beginning of the file.
///
/// # Example
///
/// ```text
/// use vibesql_executor::memory::SpillFile;
/// use std::path::Path;
///
/// let mut file = SpillFile::new(Path::new("/tmp/vibesql"))?;
///
/// // Write sorted run
/// file.write_all(&serialized_data)?;
/// file.flush()?;
///
/// // Read back
/// file.seek(SeekFrom::Start(0))?;
/// let data = file.read_to_vec()?;
///
/// // File is automatically deleted when `file` is dropped
/// ```
#[derive(Debug)]
pub struct SpillFile {
    /// Path to the spill file
    path: PathBuf,

    /// Buffered writer (created lazily on the first write)
    writer: Option<BufWriter<File>>,

    /// Buffered reader (created by `prepare_for_read`)
    reader: Option<BufReader<File>>,

    /// Number of bytes passed to `write_all` so far
    bytes_written: usize,

    /// Whether the file has been created on disk
    created: bool,
}

impl SpillFile {
    /// Capacity of the buffers used for both reading and writing (64 KiB).
    const IO_BUFFER_SIZE: usize = 64 * 1024;

    /// Create a new spill file in the specified directory
    ///
    /// `temp_dir` is created if it does not exist. The file itself is not
    /// created until the first write.
    ///
    /// # Errors
    ///
    /// Returns any error from creating `temp_dir`.
    pub fn new(temp_dir: &Path) -> io::Result<Self> {
        Self::with_unique_name(temp_dir, None)
    }

    /// Create a spill file with a specific name suffix
    ///
    /// Useful for debugging: it creates files like "vibesql_spill_12345_0_sort_run.tmp".
    /// The suffix is inserted into the file name verbatim.
    ///
    /// # Errors
    ///
    /// Returns any error from creating `temp_dir`.
    pub fn with_suffix(temp_dir: &Path, suffix: &str) -> io::Result<Self> {
        Self::with_unique_name(temp_dir, Some(suffix))
    }

    /// Shared constructor: ensures `temp_dir` exists and picks a unique file name.
    fn with_unique_name(temp_dir: &Path, suffix: Option<&str>) -> io::Result<Self> {
        fs::create_dir_all(temp_dir)?;

        // The pid separates concurrently running processes. The counter
        // separates files within this process. Relaxed ordering is enough
        // because only the uniqueness of the returned value matters.
        let id = FILE_COUNTER.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        let filename = match suffix {
            Some(suffix) => format!("vibesql_spill_{}_{}_{}.tmp", pid, id, suffix),
            None => format!("vibesql_spill_{}_{}.tmp", pid, id),
        };

        Ok(Self {
            path: temp_dir.join(filename),
            writer: None,
            reader: None,
            bytes_written: 0,
            created: false,
        })
    }

    /// Get the path to the spill file
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Get the number of bytes written to this file
    pub fn bytes_written(&self) -> usize {
        self.bytes_written
    }

    /// Check if the file has been created on disk
    pub fn is_created(&self) -> bool {
        self.created
    }

    /// Ensure a writer is open and return it
    ///
    /// On the first call this creates (and truncates) the file. On later calls
    /// after `prepare_for_read` dropped the writer, the existing file is
    /// reopened *without* truncation and positioned at its end. This keeps
    /// previously spilled data intact and `bytes_written` accurate.
    fn ensure_writer(&mut self) -> io::Result<&mut BufWriter<File>> {
        if self.writer.is_none() {
            let file = if self.created {
                let mut file = OpenOptions::new().read(true).write(true).open(&self.path)?;
                file.seek(SeekFrom::End(0))?;
                // An existing reader may hold buffered bytes that predate these
                // new writes. Drop it so the next read goes through
                // `prepare_for_read`, which flushes this writer first.
                self.reader = None;
                file
            } else {
                OpenOptions::new()
                    .read(true)
                    .write(true)
                    .create(true)
                    .truncate(true)
                    .open(&self.path)?
            };
            self.writer = Some(BufWriter::with_capacity(Self::IO_BUFFER_SIZE, file));
            self.created = true;
        }
        Ok(self.writer.as_mut().expect("writer was initialized above"))
    }

    /// Write data to the spill file, creating it on first use
    pub fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
        let writer = self.ensure_writer()?;
        writer.write_all(data)?;
        self.bytes_written += data.len();
        Ok(())
    }

    /// Flush buffered data to disk (no-op if nothing has been written)
    pub fn flush(&mut self) -> io::Result<()> {
        if let Some(writer) = self.writer.as_mut() {
            writer.flush()?;
        }
        Ok(())
    }

    /// Prepare the file for reading
    ///
    /// This flushes and closes any pending writer, then (re)opens a reader
    /// positioned at the start of the file. If the file was never created,
    /// no reader is opened and subsequent reads see no data.
    pub fn prepare_for_read(&mut self) -> io::Result<()> {
        if let Some(mut writer) = self.writer.take() {
            // Flush explicitly: BufWriter's Drop would swallow flush errors.
            writer.flush()?;
        }

        if self.created {
            let file = File::open(&self.path)?;
            self.reader = Some(BufReader::with_capacity(Self::IO_BUFFER_SIZE, file));
        }

        Ok(())
    }

    /// Seek to a position in the file
    ///
    /// Seeks the reader if one is open, otherwise the writer. Returns `Ok(0)`
    /// if the file has not been created yet.
    pub fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        if let Some(reader) = self.reader.as_mut() {
            reader.seek(pos)
        } else if let Some(writer) = self.writer.as_mut() {
            writer.seek(pos)
        } else {
            Ok(0)
        }
    }

    /// Read data from the spill file, switching to read mode if needed
    ///
    /// Returns `Ok(0)` if the file has not been created.
    pub fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.reader.is_none() {
            self.prepare_for_read()?;
        }
        if let Some(reader) = self.reader.as_mut() {
            reader.read(buf)
        } else {
            Ok(0)
        }
    }

    /// Read exactly `buf.len()` bytes, switching to read mode if needed
    ///
    /// # Errors
    ///
    /// Returns `UnexpectedEof` if the file was never created or holds too few bytes.
    pub fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
        if self.reader.is_none() {
            self.prepare_for_read()?;
        }
        if let Some(reader) = self.reader.as_mut() {
            reader.read_exact(buf)
        } else {
            Err(io::Error::new(io::ErrorKind::UnexpectedEof, "spill file not created"))
        }
    }

    /// Read the entire file, from the start, into a Vec
    ///
    /// Switches to read mode first if needed (like `read`/`read_exact`), so
    /// pending writes are flushed and included. Leaves the read position at
    /// end of file. Returns an empty Vec if the file was never created.
    pub fn read_to_vec(&mut self) -> io::Result<Vec<u8>> {
        if self.reader.is_none() {
            self.prepare_for_read()?;
        }
        let mut data = Vec::with_capacity(self.bytes_written);
        if let Some(reader) = self.reader.as_mut() {
            reader.seek(SeekFrom::Start(0))?;
            reader.read_to_end(&mut data)?;
        }
        Ok(data)
    }

    /// Close all handles and remove the file (called automatically on drop)
    ///
    /// Removal errors are deliberately ignored: this runs from `Drop`, which
    /// has no way to report them.
    fn delete(&mut self) {
        // Close handles before removing, so an open handle cannot block
        // removal on platforms that restrict deleting open files.
        self.writer = None;
        self.reader = None;

        if self.created {
            let _ = fs::remove_file(&self.path);
        }
    }
}

impl Drop for SpillFile {
    fn drop(&mut self) {
        self.delete();
    }
}
222
223/// A collection of spill files for managing multiple sorted runs
224///
225/// Used by external sort to manage multiple sorted runs that need
226/// to be merged together.
227pub struct SpillFileSet {
228    /// Directory for spill files
229    temp_dir: PathBuf,
230
231    /// Active spill files
232    files: Vec<SpillFile>,
233
234    /// Total bytes spilled across all files
235    total_bytes: usize,
236}
237
238impl SpillFileSet {
239    /// Create a new spill file set
240    pub fn new(temp_dir: PathBuf) -> Self {
241        Self { temp_dir, files: Vec::new(), total_bytes: 0 }
242    }
243
244    /// Create a new spill file in this set
245    pub fn create_file(&mut self) -> io::Result<&mut SpillFile> {
246        let file = SpillFile::new(&self.temp_dir)?;
247        self.files.push(file);
248        Ok(self.files.last_mut().unwrap())
249    }
250
251    /// Create a new spill file with a suffix
252    pub fn create_file_with_suffix(&mut self, suffix: &str) -> io::Result<&mut SpillFile> {
253        let file = SpillFile::with_suffix(&self.temp_dir, suffix)?;
254        self.files.push(file);
255        Ok(self.files.last_mut().unwrap())
256    }
257
258    /// Get the number of spill files
259    pub fn len(&self) -> usize {
260        self.files.len()
261    }
262
263    /// Check if the set is empty
264    pub fn is_empty(&self) -> bool {
265        self.files.is_empty()
266    }
267
268    /// Get total bytes spilled
269    pub fn total_bytes(&self) -> usize {
270        self.files.iter().map(|f| f.bytes_written()).sum()
271    }
272
273    /// Get all files for reading
274    pub fn files(&self) -> &[SpillFile] {
275        &self.files
276    }
277
278    /// Get mutable access to all files
279    pub fn files_mut(&mut self) -> &mut [SpillFile] {
280        &mut self.files
281    }
282
283    /// Take ownership of all files (consumes the set)
284    pub fn into_files(self) -> Vec<SpillFile> {
285        self.files
286    }
287
288    /// Prepare all files for reading
289    pub fn prepare_all_for_read(&mut self) -> io::Result<()> {
290        for file in &mut self.files {
291            file.prepare_for_read()?;
292        }
293        Ok(())
294    }
295
296    /// Clear all files (deletes them from disk)
297    pub fn clear(&mut self) {
298        self.files.clear();
299        self.total_bytes = 0;
300    }
301}
302
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;

    /// Build a fresh temporary directory together with a spill file inside it.
    ///
    /// The directory handle must be kept alive (bind it to `_dir`, not `_`)
    /// for as long as the spill file is in use.
    fn fresh_spill() -> (TempDir, SpillFile) {
        let dir = TempDir::new().unwrap();
        let spill = SpillFile::new(dir.path()).unwrap();
        (dir, spill)
    }

    #[test]
    fn test_spill_file_create_and_write() {
        let (_dir, mut file) = fresh_spill();

        // Nothing exists on disk before the first write.
        assert!(!file.is_created());
        assert_eq!(file.bytes_written(), 0);

        let payload = b"hello world";
        file.write_all(payload).unwrap();
        file.flush().unwrap();

        assert!(file.is_created());
        assert_eq!(file.bytes_written(), payload.len());
        assert!(file.path().exists());
    }

    #[test]
    fn test_spill_file_read_back() {
        let (_dir, mut file) = fresh_spill();
        let expected = b"test data 12345";

        file.write_all(expected).unwrap();
        file.flush().unwrap();
        file.prepare_for_read().unwrap();

        assert_eq!(file.read_to_vec().unwrap(), expected);
    }

    #[test]
    fn test_spill_file_auto_delete() {
        let dir = TempDir::new().unwrap();

        let spilled_path = {
            let mut file = SpillFile::new(dir.path()).unwrap();
            file.write_all(b"data").unwrap();
            file.flush().unwrap();
            let p = file.path().to_path_buf();
            assert!(p.exists());
            p
        };

        // The handle went out of scope above, so the file must be gone.
        assert!(!spilled_path.exists());
    }

    #[test]
    fn test_spill_file_with_suffix() {
        let dir = TempDir::new().unwrap();
        let file = SpillFile::with_suffix(dir.path(), "sort_run").unwrap();
        let name = file.path().to_string_lossy();
        assert!(name.contains("sort_run"));
    }

    #[test]
    fn test_spill_file_set() {
        let dir = TempDir::new().unwrap();
        let mut set = SpillFileSet::new(dir.path().to_path_buf());
        assert!(set.is_empty());

        // Two runs of 4 and 5 bytes respectively.
        set.create_file().unwrap().write_all(b"run1").unwrap();
        set.create_file().unwrap().write_all(b"run22").unwrap();

        assert_eq!(set.len(), 2);
        assert_eq!(set.total_bytes(), 9);

        // Clearing drops (and thereby deletes) every file.
        set.clear();
        assert!(set.is_empty());
    }

    #[test]
    fn test_spill_file_sequential_read() {
        let (_dir, mut file) = fresh_spill();
        let chunks: [&[u8]; 3] = [b"chunk1", b"chunk2", b"chunk3"];

        for chunk in chunks.iter() {
            file.write_all(chunk).unwrap();
        }
        file.flush().unwrap();
        file.prepare_for_read().unwrap();

        let mut buf = [0u8; 6];
        for expected in chunks.iter() {
            file.read_exact(&mut buf).unwrap();
            assert_eq!(&buf[..], *expected);
        }
    }

    #[test]
    fn test_spill_file_seek() {
        let (_dir, mut file) = fresh_spill();

        file.write_all(b"0123456789").unwrap();
        file.flush().unwrap();
        file.prepare_for_read().unwrap();

        // Skip the first five digits, then read the rest.
        file.seek(SeekFrom::Start(5)).unwrap();
        let mut tail = [0u8; 5];
        file.read_exact(&mut tail).unwrap();
        assert_eq!(&tail, b"56789");
    }
}