grafeo_core/execution/spill/
file.rs1use std::fs::File;
4use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
5use std::path::{Path, PathBuf};
6
/// Capacity, in bytes, of the `BufWriter`/`BufReader` wrapped around every spill file (64 KiB).
const BUFFER_SIZE: usize = 1 << 16;
/// An on-disk scratch file that is first written sequentially and then read back.
///
/// Writes go through a buffered writer until [`SpillFile::finish_write`] ends the write
/// phase; readers are obtained with [`SpillFile::reader`].
pub struct SpillFile {
    /// Location of the file on disk.
    path: PathBuf,
    /// Number of bytes accepted by [`SpillFile::write_all`] so far.
    bytes_written: u64,
    /// Buffered writer; `None` once the write phase has ended.
    writer: Option<BufWriter<File>>,
}
25
26impl SpillFile {
27 pub fn new(path: PathBuf) -> std::io::Result<Self> {
33 let file = File::create(&path)?;
34 let writer = BufWriter::with_capacity(BUFFER_SIZE, file);
35
36 Ok(Self {
37 path,
38 writer: Some(writer),
39 bytes_written: 0,
40 })
41 }
42
43 #[must_use]
45 pub fn path(&self) -> &Path {
46 &self.path
47 }
48
49 #[must_use]
51 pub fn bytes_written(&self) -> u64 {
52 self.bytes_written
53 }
54
55 pub fn write_all(&mut self, data: &[u8]) -> std::io::Result<()> {
61 let writer = self
62 .writer
63 .as_mut()
64 .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::Other, "Write phase ended"))?;
65
66 writer.write_all(data)?;
67 self.bytes_written += data.len() as u64;
68 Ok(())
69 }
70
71 pub fn write_u64_le(&mut self, value: u64) -> std::io::Result<()> {
77 self.write_all(&value.to_le_bytes())
78 }
79
80 pub fn write_i64_le(&mut self, value: i64) -> std::io::Result<()> {
86 self.write_all(&value.to_le_bytes())
87 }
88
89 pub fn write_bytes(&mut self, data: &[u8]) -> std::io::Result<()> {
97 self.write_u64_le(data.len() as u64)?;
98 self.write_all(data)
99 }
100
101 pub fn finish_write(&mut self) -> std::io::Result<()> {
109 if let Some(mut writer) = self.writer.take() {
110 writer.flush()?;
111 }
112 Ok(())
113 }
114
115 #[must_use]
117 pub fn is_writable(&self) -> bool {
118 self.writer.is_some()
119 }
120
121 pub fn reader(&self) -> std::io::Result<SpillFileReader> {
129 let file = File::open(&self.path)?;
130 let reader = BufReader::with_capacity(BUFFER_SIZE, file);
131 Ok(SpillFileReader { reader })
132 }
133
134 pub fn delete(mut self) -> std::io::Result<()> {
142 self.writer = None;
144 std::fs::remove_file(&self.path)
145 }
146}
147
148impl std::fmt::Debug for SpillFile {
149 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 f.debug_struct("SpillFile")
151 .field("path", &self.path)
152 .field("bytes_written", &self.bytes_written)
153 .field("is_writable", &self.is_writable())
154 .finish()
155 }
156}
157
/// Sequential, seekable reader over a spill file.
///
/// Each reader wraps its own buffered file handle, so its position is independent of any
/// other reader's.
pub struct SpillFileReader {
    /// Buffered handle positioned at the next byte to decode.
    reader: BufReader<File>,
}

impl SpillFileReader {
    /// Reads exactly `buf.len()` bytes.
    ///
    /// # Errors
    ///
    /// Returns [`std::io::ErrorKind::UnexpectedEof`] if the file ends first, or any other
    /// error from the underlying reader.
    pub fn read_exact(&mut self, buf: &mut [u8]) -> std::io::Result<()> {
        self.reader.read_exact(buf)
    }

    /// Reads exactly `N` bytes into a fixed-size array (shared by the scalar readers).
    fn read_array<const N: usize>(&mut self) -> std::io::Result<[u8; N]> {
        let mut buf = [0u8; N];
        self.read_exact(&mut buf)?;
        Ok(buf)
    }

    /// Reads a `u64` stored as 8 little-endian bytes.
    ///
    /// # Errors
    ///
    /// Same as [`Self::read_exact`].
    pub fn read_u64_le(&mut self) -> std::io::Result<u64> {
        self.read_array::<8>().map(u64::from_le_bytes)
    }

    /// Reads an `i64` stored as 8 little-endian bytes.
    ///
    /// # Errors
    ///
    /// Same as [`Self::read_exact`].
    pub fn read_i64_le(&mut self) -> std::io::Result<i64> {
        self.read_array::<8>().map(i64::from_le_bytes)
    }

    /// Reads an `f64` stored as its 8-byte little-endian IEEE-754 representation.
    ///
    /// # Errors
    ///
    /// Same as [`Self::read_exact`].
    pub fn read_f64_le(&mut self) -> std::io::Result<f64> {
        self.read_array::<8>().map(f64::from_le_bytes)
    }

    /// Reads a single byte.
    ///
    /// # Errors
    ///
    /// Same as [`Self::read_exact`].
    pub fn read_u8(&mut self) -> std::io::Result<u8> {
        let [byte] = self.read_array::<1>()?;
        Ok(byte)
    }

    /// Reads a length-prefixed byte string.
    ///
    /// The string is stored as a `u64` little-endian length followed by that many bytes.
    ///
    /// The on-disk length is not trusted when allocating. At most a bounded amount is
    /// reserved up front, and the buffer then grows only as bytes actually arrive. A corrupt
    /// prefix therefore produces an error instead of a huge allocation or a panic.
    ///
    /// # Errors
    ///
    /// - [`std::io::ErrorKind::InvalidData`] if the length does not fit in `usize` (possible
    ///   on 32-bit targets).
    /// - [`std::io::ErrorKind::UnexpectedEof`] if the file ends before the length prefix or
    ///   the full payload has been read.
    /// - Any other error from the underlying reader.
    pub fn read_bytes(&mut self) -> std::io::Result<Vec<u8>> {
        // Cap on the capacity reserved from the (untrusted) length prefix before any payload
        // bytes have been seen.
        const PREALLOC_LIMIT: usize = 64 * 1024;

        let len = self.read_u64_le()?;
        // Reject rather than truncate: a silently truncated length would desync every
        // subsequent read from this stream.
        let expected = usize::try_from(len).map_err(|_| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "spill record length exceeds addressable memory",
            )
        })?;

        let mut buf = Vec::with_capacity(expected.min(PREALLOC_LIMIT));
        Read::take(&mut self.reader, len).read_to_end(&mut buf)?;
        if buf.len() != expected {
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "spill record truncated before its declared length",
            ));
        }
        Ok(buf)
    }

    /// Moves the read position, returning the new offset from the start of the file.
    ///
    /// # Errors
    ///
    /// Returns any error raised by the underlying seek.
    pub fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        self.reader.seek(pos)
    }

    /// Moves the read position back to the start of the file.
    ///
    /// # Errors
    ///
    /// Returns any error raised by the underlying seek.
    pub fn rewind(&mut self) -> std::io::Result<()> {
        self.reader.seek(SeekFrom::Start(0))?;
        Ok(())
    }

    /// Returns the current read offset from the start of the file.
    ///
    /// # Errors
    ///
    /// Returns any error raised while querying the stream position.
    pub fn position(&mut self) -> std::io::Result<u64> {
        self.reader.stream_position()
    }
}
262
263impl std::fmt::Debug for SpillFileReader {
264 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
265 f.debug_struct("SpillFileReader").finish()
266 }
267}
268
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn test_spill_file_write_read() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test.spill");

        let mut file = SpillFile::new(file_path).unwrap();
        file.write_all(b"hello ").unwrap();
        file.write_all(b"world").unwrap();
        assert_eq!(file.bytes_written(), 11);
        file.finish_write().unwrap();

        let mut reader = file.reader().unwrap();
        let mut buf = [0u8; 11];
        reader.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"hello world");
    }

    #[test]
    fn test_spill_file_integers() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test.spill");

        let mut file = SpillFile::new(file_path).unwrap();
        file.write_u64_le(u64::MAX).unwrap();
        file.write_i64_le(i64::MIN).unwrap();
        file.finish_write().unwrap();

        let mut reader = file.reader().unwrap();
        assert_eq!(reader.read_u64_le().unwrap(), u64::MAX);
        assert_eq!(reader.read_i64_le().unwrap(), i64::MIN);
    }

    #[test]
    fn test_reader_f64_u8_and_position() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test.spill");

        let mut file = SpillFile::new(file_path).unwrap();
        file.write_all(&1.5f64.to_le_bytes()).unwrap();
        file.write_all(&[0xAB]).unwrap();
        file.finish_write().unwrap();

        let mut reader = file.reader().unwrap();
        assert_eq!(reader.position().unwrap(), 0);
        assert_eq!(reader.read_f64_le().unwrap(), 1.5);
        assert_eq!(reader.read_u8().unwrap(), 0xAB);
        assert_eq!(reader.position().unwrap(), 9);
        assert!(reader.read_u8().is_err());
    }

    #[test]
    fn test_spill_file_bytes_prefixed() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test.spill");

        let mut file = SpillFile::new(file_path).unwrap();
        file.write_bytes(b"short").unwrap();
        file.write_bytes(b"longer string here").unwrap();
        file.finish_write().unwrap();

        let mut reader = file.reader().unwrap();
        assert_eq!(reader.read_bytes().unwrap(), b"short");
        assert_eq!(reader.read_bytes().unwrap(), b"longer string here");
    }

    #[test]
    fn test_spill_file_empty_bytes_record() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test.spill");

        let mut file = SpillFile::new(file_path).unwrap();
        file.write_bytes(b"").unwrap();
        file.write_u64_le(99).unwrap();
        // Only the 8-byte length prefix plus the trailing u64.
        assert_eq!(file.bytes_written(), 16);
        file.finish_write().unwrap();

        let mut reader = file.reader().unwrap();
        assert!(reader.read_bytes().unwrap().is_empty());
        assert_eq!(reader.read_u64_le().unwrap(), 99);
    }

    #[test]
    fn test_read_bytes_truncated_record() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test.spill");

        let mut file = SpillFile::new(file_path).unwrap();
        // Declares 10 payload bytes but only 3 follow.
        file.write_u64_le(10).unwrap();
        file.write_all(b"abc").unwrap();
        file.finish_write().unwrap();

        let mut reader = file.reader().unwrap();
        let err = reader.read_bytes().unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
    }

    #[test]
    fn test_write_after_finish_fails() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test.spill");

        let mut file = SpillFile::new(file_path).unwrap();
        file.write_all(b"abc").unwrap();
        assert!(file.is_writable());
        file.finish_write().unwrap();
        assert!(!file.is_writable());

        assert!(file.write_all(b"more").is_err());
        assert_eq!(file.bytes_written(), 3);

        // Ending the write phase twice is harmless.
        file.finish_write().unwrap();
    }

    #[test]
    fn test_spill_file_multiple_readers() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test.spill");

        let mut file = SpillFile::new(file_path).unwrap();
        file.write_u64_le(42).unwrap();
        file.write_u64_le(100).unwrap();
        file.finish_write().unwrap();

        let mut reader1 = file.reader().unwrap();
        let mut reader2 = file.reader().unwrap();

        assert_eq!(reader1.read_u64_le().unwrap(), 42);

        assert_eq!(reader2.read_u64_le().unwrap(), 42);
        assert_eq!(reader2.read_u64_le().unwrap(), 100);

        assert_eq!(reader1.read_u64_le().unwrap(), 100);
    }

    #[test]
    fn test_spill_file_delete() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test.spill");

        let mut file = SpillFile::new(file_path.clone()).unwrap();
        file.write_all(b"data").unwrap();
        file.finish_write().unwrap();

        assert!(file_path.exists());
        file.delete().unwrap();
        assert!(!file_path.exists());
    }

    #[test]
    fn test_reader_seek() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("test.spill");

        let mut file = SpillFile::new(file_path).unwrap();
        file.write_u64_le(1).unwrap();
        file.write_u64_le(2).unwrap();
        file.write_u64_le(3).unwrap();
        file.finish_write().unwrap();

        let mut reader = file.reader().unwrap();

        reader.seek(SeekFrom::Start(8)).unwrap();
        assert_eq!(reader.read_u64_le().unwrap(), 2);

        reader.rewind().unwrap();
        assert_eq!(reader.read_u64_le().unwrap(), 1);
    }
}