vibesql_executor/memory/
spill.rs1use std::{
14 fs::{self, File, OpenOptions},
15 io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write},
16 path::{Path, PathBuf},
17 sync::atomic::{AtomicU64, Ordering},
18};
19
/// Process-wide counter that gives every spill file created by this process a distinct name.
static FILE_COUNTER: AtomicU64 = AtomicU64::new(0);

/// A lazily created temporary file that is removed from disk when the value is dropped.
///
/// Constructing a `SpillFile` only reserves a path; the file itself is created on the first
/// write. Call [`SpillFile::prepare_for_read`] (or any of the read methods, which do so
/// implicitly) to switch from writing to reading.
pub struct SpillFile {
    /// Location of the backing file on disk.
    path: PathBuf,

    /// Buffered writer; `None` until the first write and again once read mode is entered.
    writer: Option<BufWriter<File>>,

    /// Buffered reader; `None` until the file has been prepared for reading.
    reader: Option<BufReader<File>>,

    /// Total number of bytes accepted by [`SpillFile::write_all`].
    bytes_written: usize,

    /// Whether the backing file has been created on disk.
    created: bool,
}

impl SpillFile {
    /// Capacity, in bytes, of the buffered reader and writer wrapped around the file.
    const BUFFER_CAPACITY: usize = 64 * 1024;

    /// Reserves a uniquely named spill file inside `temp_dir`.
    ///
    /// The directory is created if it is missing; the file itself is not created until the
    /// first write.
    ///
    /// # Errors
    ///
    /// Returns the error from creating `temp_dir`.
    pub fn new(temp_dir: &Path) -> io::Result<Self> {
        Self::with_optional_suffix(temp_dir, None)
    }

    /// Same as [`SpillFile::new`], but appends `suffix` to the generated file name.
    ///
    /// # Errors
    ///
    /// Returns the error from creating `temp_dir`.
    pub fn with_suffix(temp_dir: &Path, suffix: &str) -> io::Result<Self> {
        Self::with_optional_suffix(temp_dir, Some(suffix))
    }

    /// Shared constructor: ensures the directory exists and builds the unique file name from
    /// the process id, the global counter and the optional suffix.
    fn with_optional_suffix(temp_dir: &Path, suffix: Option<&str>) -> io::Result<Self> {
        fs::create_dir_all(temp_dir)?;

        let id = FILE_COUNTER.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        let filename = match suffix {
            Some(suffix) => format!("vibesql_spill_{}_{}_{}.tmp", pid, id, suffix),
            None => format!("vibesql_spill_{}_{}.tmp", pid, id),
        };

        Ok(Self {
            path: temp_dir.join(filename),
            writer: None,
            reader: None,
            bytes_written: 0,
            created: false,
        })
    }

    /// Path of the backing file (which may not exist yet).
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Total number of bytes passed to [`SpillFile::write_all`] so far.
    pub fn bytes_written(&self) -> usize {
        self.bytes_written
    }

    /// Whether the backing file has been created on disk.
    pub fn is_created(&self) -> bool {
        self.created
    }

    /// Returns the buffered writer, opening (and creating) the file on first use.
    ///
    /// NOTE(review): the file is opened with `truncate(true)` every time the writer is
    /// (re)created, so writing again after `prepare_for_read` discards what was on disk while
    /// `bytes_written` keeps accumulating — confirm callers never do that.
    fn ensure_writer(&mut self) -> io::Result<&mut BufWriter<File>> {
        if self.writer.is_none() {
            let file = OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(true)
                .open(&self.path)?;
            self.writer = Some(BufWriter::with_capacity(Self::BUFFER_CAPACITY, file));
            self.created = true;
        }
        Ok(self.writer.as_mut().expect("writer is initialised by the branch above"))
    }

    /// Appends `data` to the file through the buffered writer, creating the file if needed.
    ///
    /// # Errors
    ///
    /// Returns the error from opening the file or from the underlying write.
    pub fn write_all(&mut self, data: &[u8]) -> io::Result<()> {
        let writer = self.ensure_writer()?;
        writer.write_all(data)?;
        self.bytes_written += data.len();
        Ok(())
    }

    /// Flushes buffered writes to the file; does nothing when no writer is open.
    pub fn flush(&mut self) -> io::Result<()> {
        if let Some(writer) = self.writer.as_mut() {
            writer.flush()?;
        }
        Ok(())
    }

    /// Switches to read mode: flushes and closes the writer, then opens a fresh reader
    /// positioned at the start of the file.
    ///
    /// When the file was never created no reader is opened and the call succeeds.
    ///
    /// # Errors
    ///
    /// Returns the error from flushing pending writes or from opening the file.
    pub fn prepare_for_read(&mut self) -> io::Result<()> {
        if let Some(mut writer) = self.writer.take() {
            writer.flush()?;
        }

        if self.created {
            let file = File::open(&self.path)?;
            self.reader = Some(BufReader::with_capacity(Self::BUFFER_CAPACITY, file));
        }

        Ok(())
    }

    /// Seeks the reader if one is open, otherwise the writer.
    ///
    /// With neither handle open nothing happens and `Ok(0)` is returned.
    pub fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        if let Some(reader) = self.reader.as_mut() {
            reader.seek(pos)
        } else if let Some(writer) = self.writer.as_mut() {
            writer.seek(pos)
        } else {
            Ok(0)
        }
    }

    /// Reads into `buf`, entering read mode first if necessary.
    ///
    /// Returns `Ok(0)` when the file was never created.
    pub fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.reader.is_none() {
            self.prepare_for_read()?;
        }
        if let Some(reader) = self.reader.as_mut() {
            reader.read(buf)
        } else {
            Ok(0)
        }
    }

    /// Fills `buf` completely, entering read mode first if necessary.
    ///
    /// # Errors
    ///
    /// Returns `UnexpectedEof` when the file was never created, and otherwise whatever the
    /// underlying `read_exact` reports.
    pub fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
        if self.reader.is_none() {
            self.prepare_for_read()?;
        }
        if let Some(reader) = self.reader.as_mut() {
            reader.read_exact(buf)
        } else {
            Err(io::Error::new(io::ErrorKind::UnexpectedEof, "spill file not created"))
        }
    }

    /// Reads the whole file, from its first byte, into a new vector.
    ///
    /// Read mode is entered first if necessary, matching `read` and `read_exact`; pending
    /// writes are therefore flushed and included. Returns an empty vector when the file was
    /// never created.
    ///
    /// # Errors
    ///
    /// Returns the error from switching to read mode, seeking, or reading.
    pub fn read_to_vec(&mut self) -> io::Result<Vec<u8>> {
        // Without this, a file still in write mode would have its *writer* rewound to offset 0
        // and the caller would get an empty vector back.
        if self.reader.is_none() {
            self.prepare_for_read()?;
        }

        let mut data = Vec::with_capacity(self.bytes_written);
        if let Some(reader) = self.reader.as_mut() {
            reader.seek(SeekFrom::Start(0))?;
            reader.read_to_end(&mut data)?;
        }
        Ok(data)
    }

    /// Closes both handles and removes the backing file if it was ever created.
    fn delete(&mut self) {
        // Close the handles before unlinking the file.
        self.writer = None;
        self.reader = None;

        if self.created {
            // Best-effort cleanup: a failure to remove the file is deliberately ignored.
            let _ = fs::remove_file(&self.path);
        }
    }
}

impl Drop for SpillFile {
    /// Removes the backing file when the `SpillFile` goes out of scope.
    fn drop(&mut self) {
        self.delete();
    }
}
222
223pub struct SpillFileSet {
228 temp_dir: PathBuf,
230
231 files: Vec<SpillFile>,
233
234 total_bytes: usize,
236}
237
238impl SpillFileSet {
239 pub fn new(temp_dir: PathBuf) -> Self {
241 Self { temp_dir, files: Vec::new(), total_bytes: 0 }
242 }
243
244 pub fn create_file(&mut self) -> io::Result<&mut SpillFile> {
246 let file = SpillFile::new(&self.temp_dir)?;
247 self.files.push(file);
248 Ok(self.files.last_mut().unwrap())
249 }
250
251 pub fn create_file_with_suffix(&mut self, suffix: &str) -> io::Result<&mut SpillFile> {
253 let file = SpillFile::with_suffix(&self.temp_dir, suffix)?;
254 self.files.push(file);
255 Ok(self.files.last_mut().unwrap())
256 }
257
258 pub fn len(&self) -> usize {
260 self.files.len()
261 }
262
263 pub fn is_empty(&self) -> bool {
265 self.files.is_empty()
266 }
267
268 pub fn total_bytes(&self) -> usize {
270 self.files.iter().map(|f| f.bytes_written()).sum()
271 }
272
273 pub fn files(&self) -> &[SpillFile] {
275 &self.files
276 }
277
278 pub fn files_mut(&mut self) -> &mut [SpillFile] {
280 &mut self.files
281 }
282
283 pub fn into_files(self) -> Vec<SpillFile> {
285 self.files
286 }
287
288 pub fn prepare_all_for_read(&mut self) -> io::Result<()> {
290 for file in &mut self.files {
291 file.prepare_for_read()?;
292 }
293 Ok(())
294 }
295
296 pub fn clear(&mut self) {
298 self.files.clear();
299 self.total_bytes = 0;
300 }
301}
302
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;

    /// The file is created lazily by the first write, which also updates the byte counter.
    #[test]
    fn test_spill_file_create_and_write() {
        let scratch = TempDir::new().unwrap();
        let mut spill = SpillFile::new(scratch.path()).unwrap();

        // Nothing exists on disk before the first write.
        assert!(!spill.is_created());
        assert_eq!(spill.bytes_written(), 0);

        spill.write_all(b"hello world").unwrap();
        spill.flush().unwrap();

        assert!(spill.is_created());
        assert_eq!(spill.bytes_written(), 11);
        assert!(spill.path().exists());
    }

    /// Data written to the file comes back unchanged through `read_to_vec`.
    #[test]
    fn test_spill_file_read_back() {
        let scratch = TempDir::new().unwrap();
        let mut spill = SpillFile::new(scratch.path()).unwrap();

        let payload = b"test data 12345";
        spill.write_all(payload).unwrap();
        spill.flush().unwrap();

        spill.prepare_for_read().unwrap();
        let contents = spill.read_to_vec().unwrap();

        assert_eq!(contents, payload);
    }

    /// Dropping the `SpillFile` removes the backing file.
    #[test]
    fn test_spill_file_auto_delete() {
        let scratch = TempDir::new().unwrap();

        let mut spill = SpillFile::new(scratch.path()).unwrap();
        spill.write_all(b"data").unwrap();
        spill.flush().unwrap();

        let on_disk = spill.path().to_path_buf();
        assert!(on_disk.exists());

        drop(spill);
        assert!(!on_disk.exists());
    }

    /// The suffix passed to `with_suffix` shows up in the generated path.
    #[test]
    fn test_spill_file_with_suffix() {
        let scratch = TempDir::new().unwrap();
        let spill = SpillFile::with_suffix(scratch.path(), "sort_run").unwrap();

        let rendered = spill.path().to_string_lossy();
        assert!(rendered.contains("sort_run"));
    }

    /// A set tracks its files, sums their sizes and can be emptied.
    #[test]
    fn test_spill_file_set() {
        let scratch = TempDir::new().unwrap();
        let mut runs = SpillFileSet::new(scratch.path().to_path_buf());

        assert!(runs.is_empty());

        // Two runs of 4 and 5 bytes.
        runs.create_file().unwrap().write_all(b"run1").unwrap();
        runs.create_file().unwrap().write_all(b"run22").unwrap();

        assert_eq!(runs.len(), 2);
        assert_eq!(runs.total_bytes(), 9);

        runs.clear();
        assert!(runs.is_empty());
    }

    /// Consecutive `read_exact` calls walk through the file in write order.
    #[test]
    fn test_spill_file_sequential_read() {
        let scratch = TempDir::new().unwrap();
        let mut spill = SpillFile::new(scratch.path()).unwrap();

        let chunks: [&[u8; 6]; 3] = [b"chunk1", b"chunk2", b"chunk3"];
        for chunk in chunks.iter() {
            spill.write_all(*chunk).unwrap();
        }
        spill.flush().unwrap();

        spill.prepare_for_read().unwrap();

        for chunk in chunks.iter() {
            let mut window = [0u8; 6];
            spill.read_exact(&mut window).unwrap();
            assert_eq!(&window, *chunk);
        }
    }

    /// Seeking in read mode repositions the next read.
    #[test]
    fn test_spill_file_seek() {
        let scratch = TempDir::new().unwrap();
        let mut spill = SpillFile::new(scratch.path()).unwrap();

        spill.write_all(b"0123456789").unwrap();
        spill.flush().unwrap();
        spill.prepare_for_read().unwrap();

        // Skip the first half of the ten bytes.
        spill.seek(SeekFrom::Start(5)).unwrap();

        let mut tail = [0u8; 5];
        spill.read_exact(&mut tail).unwrap();
        assert_eq!(&tail, b"56789");
    }
}