grafeo_core/execution/spill/
file.rs1use std::fs::File;
4use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
5use std::path::{Path, PathBuf};
6
/// Capacity, in bytes (64 KiB), given to the `BufWriter`/`BufReader` wrapped
/// around a spill file handle.
const BUFFER_SIZE: usize = 64 << 10;
9
10pub struct SpillFile {
18 path: PathBuf,
20 writer: Option<BufWriter<File>>,
22 bytes_written: u64,
24}
25
26impl SpillFile {
27 pub fn new(path: PathBuf) -> std::io::Result<Self> {
33 let file = File::create(&path)?;
34 let writer = BufWriter::with_capacity(BUFFER_SIZE, file);
35
36 Ok(Self {
37 path,
38 writer: Some(writer),
39 bytes_written: 0,
40 })
41 }
42
43 #[must_use]
45 pub fn path(&self) -> &Path {
46 &self.path
47 }
48
49 #[must_use]
51 pub fn bytes_written(&self) -> u64 {
52 self.bytes_written
53 }
54
55 pub fn write_all(&mut self, data: &[u8]) -> std::io::Result<()> {
61 let writer = self
62 .writer
63 .as_mut()
64 .ok_or_else(|| std::io::Error::other("Write phase ended"))?;
65
66 writer.write_all(data)?;
67 self.bytes_written += data.len() as u64;
68 Ok(())
69 }
70
71 pub fn write_u64_le(&mut self, value: u64) -> std::io::Result<()> {
77 self.write_all(&value.to_le_bytes())
78 }
79
80 pub fn write_i64_le(&mut self, value: i64) -> std::io::Result<()> {
86 self.write_all(&value.to_le_bytes())
87 }
88
89 pub fn write_bytes(&mut self, data: &[u8]) -> std::io::Result<()> {
97 self.write_u64_le(data.len() as u64)?;
98 self.write_all(data)
99 }
100
101 pub fn finish_write(&mut self) -> std::io::Result<()> {
109 if let Some(mut writer) = self.writer.take() {
110 writer.flush()?;
111 }
112 Ok(())
113 }
114
115 #[must_use]
117 pub fn is_writable(&self) -> bool {
118 self.writer.is_some()
119 }
120
121 pub fn reader(&self) -> std::io::Result<SpillFileReader> {
129 let file = File::open(&self.path)?;
130 let reader = BufReader::with_capacity(BUFFER_SIZE, file);
131 Ok(SpillFileReader { reader })
132 }
133
134 pub fn delete(mut self) -> std::io::Result<()> {
142 self.writer = None;
144 std::fs::remove_file(&self.path)
145 }
146}
147
148impl std::fmt::Debug for SpillFile {
149 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 f.debug_struct("SpillFile")
151 .field("path", &self.path)
152 .field("bytes_written", &self.bytes_written)
153 .field("is_writable", &self.is_writable())
154 .finish()
155 }
156}
157
/// Buffered, seekable read handle over a spill file.
///
/// All multi-byte integers and floats are decoded as little-endian.
pub struct SpillFileReader {
    /// Buffered handle to the underlying file.
    reader: BufReader<File>,
}

impl SpillFileReader {
    /// Reads exactly `N` bytes into a stack array.
    fn read_array<const N: usize>(&mut self) -> std::io::Result<[u8; N]> {
        let mut buf = [0u8; N];
        self.reader.read_exact(&mut buf)?;
        Ok(buf)
    }

    /// Fills `buf` completely from the current position.
    ///
    /// # Errors
    ///
    /// Returns the underlying reader's error; a premature end of file is
    /// reported as [`std::io::ErrorKind::UnexpectedEof`].
    pub fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
        self.reader.read_exact(buf)
    }

    /// Reads 8 bytes and decodes them as a little-endian `u64`.
    ///
    /// # Errors
    ///
    /// Same as [`read_exact`](Self::read_exact).
    pub fn read_u64_le(&mut self) -> std::io::Result<u64> {
        self.read_array::<8>().map(u64::from_le_bytes)
    }

    /// Reads 8 bytes and decodes them as a little-endian `i64`.
    ///
    /// # Errors
    ///
    /// Same as [`read_exact`](Self::read_exact).
    pub fn read_i64_le(&mut self) -> std::io::Result<i64> {
        self.read_array::<8>().map(i64::from_le_bytes)
    }

    /// Reads 8 bytes and decodes them as a little-endian `f64`.
    ///
    /// # Errors
    ///
    /// Same as [`read_exact`](Self::read_exact).
    pub fn read_f64_le(&mut self) -> std::io::Result<f64> {
        self.read_array::<8>().map(f64::from_le_bytes)
    }

    /// Reads a single byte.
    ///
    /// # Errors
    ///
    /// Same as [`read_exact`](Self::read_exact).
    pub fn read_u8(&mut self) -> std::io::Result<u8> {
        self.read_array::<1>().map(|[byte]| byte)
    }

    /// Reads a length-prefixed record: a little-endian `u64` length followed
    /// by that many payload bytes.
    ///
    /// The length comes from the file, so it is not trusted for allocation:
    /// at most `MAX_PREALLOC` bytes are reserved up front and the buffer
    /// grows only as payload bytes actually arrive. A corrupt prefix
    /// therefore yields an error instead of a capacity-overflow panic or a
    /// huge allocation.
    ///
    /// # Errors
    ///
    /// * [`std::io::ErrorKind::InvalidData`] if the length does not fit in
    ///   `usize`.
    /// * [`std::io::ErrorKind::UnexpectedEof`] if the file ends before the
    ///   announced number of bytes was read.
    /// * Any error of the underlying reader.
    pub fn read_bytes(&mut self) -> std::io::Result<Vec<u8>> {
        /// Upper bound on the capacity reserved before any payload is read.
        const MAX_PREALLOC: usize = 64 * 1024;

        let raw_len = self.read_u64_le()?;
        let len = usize::try_from(raw_len).map_err(|_| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("byte-prefix length {raw_len} exceeds addressable range"),
            )
        })?;

        let mut buf = Vec::with_capacity(len.min(MAX_PREALLOC));
        // `take` caps the read at the announced length, so bytes belonging to
        // the next record stay in the reader.
        let got = self.reader.by_ref().take(raw_len).read_to_end(&mut buf)?;
        if got != len {
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                format!("byte-prefix announced {len} bytes but only {got} were available"),
            ));
        }
        Ok(buf)
    }

    /// Moves the read cursor to `pos` and returns the new offset from the
    /// start of the file.
    ///
    /// # Errors
    ///
    /// Returns the underlying seek error.
    pub fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        self.reader.seek(pos)
    }

    /// Moves the read cursor back to the start of the file.
    ///
    /// # Errors
    ///
    /// Returns the underlying seek error.
    pub fn rewind(&mut self) -> std::io::Result<()> {
        self.reader.rewind()
    }

    /// Current offset of the read cursor from the start of the file.
    ///
    /// # Errors
    ///
    /// Returns the underlying seek error.
    pub fn position(&mut self) -> std::io::Result<u64> {
        self.reader.stream_position()
    }
}
268
269impl std::fmt::Debug for SpillFileReader {
270 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
271 f.debug_struct("SpillFileReader").finish()
272 }
273}
274
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates a fresh temporary directory and a spill-file path inside it.
    ///
    /// The `TempDir` guard is returned so that the directory stays alive for
    /// the whole test body.
    fn scratch() -> (TempDir, PathBuf) {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.spill");
        (dir, path)
    }

    /// Raw writes are counted and come back byte-for-byte.
    #[test]
    fn test_spill_file_write_read() {
        let (_guard, path) = scratch();
        let mut spill = SpillFile::new(path).unwrap();

        for chunk in [&b"hello "[..], &b"world"[..]] {
            spill.write_all(chunk).unwrap();
        }
        assert_eq!(spill.bytes_written(), 11);
        spill.finish_write().unwrap();

        let mut input = spill.reader().unwrap();
        let mut contents = [0u8; 11];
        input.read_exact(&mut contents).unwrap();
        assert_eq!(&contents, b"hello world");
    }

    /// Extreme integer values survive the little-endian round trip.
    #[test]
    fn test_spill_file_integers() {
        let (_guard, path) = scratch();
        let mut spill = SpillFile::new(path).unwrap();

        spill.write_u64_le(u64::MAX).unwrap();
        spill.write_i64_le(i64::MIN).unwrap();
        spill.finish_write().unwrap();

        let mut input = spill.reader().unwrap();
        assert_eq!(input.read_u64_le().unwrap(), u64::MAX);
        assert_eq!(input.read_i64_le().unwrap(), i64::MIN);
    }

    /// Length-prefixed records of different sizes are read back in order.
    #[test]
    fn test_spill_file_bytes_prefixed() {
        let (_guard, path) = scratch();
        let mut spill = SpillFile::new(path).unwrap();

        spill.write_bytes(b"short").unwrap();
        spill.write_bytes(b"longer string here").unwrap();
        spill.finish_write().unwrap();

        let mut input = spill.reader().unwrap();
        assert_eq!(input.read_bytes().unwrap(), b"short");
        assert_eq!(input.read_bytes().unwrap(), b"longer string here");
    }

    /// Two readers over the same file keep independent cursors.
    #[test]
    fn test_spill_file_multiple_readers() {
        let (_guard, path) = scratch();
        let mut spill = SpillFile::new(path).unwrap();

        spill.write_u64_le(42).unwrap();
        spill.write_u64_le(100).unwrap();
        spill.finish_write().unwrap();

        let mut first = spill.reader().unwrap();
        let mut second = spill.reader().unwrap();

        // Interleave the reads so that a shared cursor would be detected.
        assert_eq!(first.read_u64_le().unwrap(), 42);

        assert_eq!(second.read_u64_le().unwrap(), 42);
        assert_eq!(second.read_u64_le().unwrap(), 100);

        assert_eq!(first.read_u64_le().unwrap(), 100);
    }

    /// `delete` removes the backing file from disk.
    #[test]
    fn test_spill_file_delete() {
        let (_guard, path) = scratch();
        let mut spill = SpillFile::new(path.clone()).unwrap();

        spill.write_all(b"data").unwrap();
        spill.finish_write().unwrap();

        assert!(path.exists());
        spill.delete().unwrap();
        assert!(!path.exists());
    }

    /// Absolute seeks and `rewind` reposition the read cursor.
    #[test]
    fn test_reader_seek() {
        let (_guard, path) = scratch();
        let mut spill = SpillFile::new(path).unwrap();

        spill.write_u64_le(1).unwrap();
        spill.write_u64_le(2).unwrap();
        spill.write_u64_le(3).unwrap();
        spill.finish_write().unwrap();

        let mut input = spill.reader().unwrap();

        // Skip the first 8-byte value.
        input.seek(SeekFrom::Start(8)).unwrap();
        assert_eq!(input.read_u64_le().unwrap(), 2);

        input.rewind().unwrap();
        assert_eq!(input.read_u64_le().unwrap(), 1);
    }
}