// midi_toolkit/io/virtual_file.rs

1use std::{
2    collections::BTreeMap,
3    fs::{remove_file, File, OpenOptions},
4    io::{self, Read, Seek, SeekFrom, Write},
5    path::{Path, PathBuf},
6    process,
7    sync::{
8        atomic::{AtomicU64, Ordering},
9        Arc, Mutex,
10    },
11    time::{SystemTime, UNIX_EPOCH},
12};
13
14use super::VirtualFileError;
15
/// Process-wide counter mixed into temp-file names so concurrent calls to
/// `create_unique_temp_file` never race on the same candidate path.
static TEMP_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
17
/// One contiguous byte range inside the shared backing file.
#[derive(Clone, Copy, Debug)]
struct Extent {
    // Absolute byte offset into the backing file.
    offset: u64,
    // Length of this extent in bytes.
    len: u64,
}
23
/// Mutable bookkeeping guarded by `Inner::state`.
#[derive(Debug)]
struct State {
    // Write handle to the backing file; `None` once it has been closed
    // (see `remove_backing_file` / `Drop`).
    writer: Option<File>,
    // Next free byte offset in the backing file — the append position.
    next_offset: u64,
    // Id that will be handed to the next spawned stream.
    next_stream_id: u64,
    // Per-stream extents, in the exact order the chunks were appended.
    streams: BTreeMap<u64, Vec<Extent>>,
}
31
/// Shared core behind `InterleavedTempFile`, `VirtualStreamWriter`, and
/// `VirtualStreamReader`; cleaned up when the last `Arc` clone is dropped.
#[derive(Debug)]
struct Inner {
    path: PathBuf,
    // When true, `Drop` removes the backing file from disk.
    delete_on_drop: bool,
    state: Mutex<State>,
}
38
39impl Inner {
40    fn append(&self, stream_id: u64, bytes: &[u8]) -> Result<usize, VirtualFileError> {
41        if bytes.is_empty() {
42            return Ok(0);
43        }
44
45        let mut state = self.state.lock().unwrap();
46        if !state.streams.contains_key(&stream_id) {
47            return Err(VirtualFileError::UnknownStream { stream_id });
48        }
49
50        let offset = state.next_offset;
51        {
52            let writer = state.writer.as_mut().ok_or_else(|| {
53                io::Error::new(io::ErrorKind::BrokenPipe, "backing file already closed")
54            })?;
55            writer.seek(SeekFrom::Start(offset))?;
56            writer.write_all(bytes)?;
57        }
58
59        state.next_offset += bytes.len() as u64;
60        state
61            .streams
62            .get_mut(&stream_id)
63            .expect("stream presence was checked above")
64            .push(Extent {
65                offset,
66                len: bytes.len() as u64,
67            });
68
69        Ok(bytes.len())
70    }
71
72    fn flush(&self) -> io::Result<()> {
73        let mut state = self.state.lock().unwrap();
74        match state.writer.as_mut() {
75            Some(writer) => writer.flush(),
76            None => Ok(()),
77        }
78    }
79}
80
81impl Drop for Inner {
82    fn drop(&mut self) {
83        if !self.delete_on_drop {
84            return;
85        }
86
87        let writer = self.state.lock().unwrap().writer.take();
88        drop(writer);
89        let _ = remove_file(&self.path);
90    }
91}
92
93fn create_unique_temp_file() -> io::Result<(File, PathBuf)> {
94    let base = std::env::temp_dir();
95    let pid = process::id();
96    let nanos = SystemTime::now()
97        .duration_since(UNIX_EPOCH)
98        .unwrap_or_default()
99        .as_nanos();
100
101    for _ in 0..32 {
102        let counter = TEMP_FILE_COUNTER.fetch_add(1, Ordering::Relaxed);
103        let path = base.join(format!(
104            "midi-toolkit-interleaved-{pid}-{nanos}-{counter}.bin"
105        ));
106
107        let file = OpenOptions::new()
108            .read(true)
109            .write(true)
110            .create_new(true)
111            .open(&path);
112
113        match file {
114            Ok(file) => return Ok((file, path)),
115            Err(err) if err.kind() == io::ErrorKind::AlreadyExists => continue,
116            Err(err) => return Err(err),
117        }
118    }
119
120    Err(io::Error::new(
121        io::ErrorKind::AlreadyExists,
122        "failed to allocate a unique temporary file",
123    ))
124}
125
/// Cheaply cloneable handle to a single backing file that interleaves the
/// data of many independent append-only streams.
#[derive(Clone, Debug)]
pub struct InterleavedTempFile {
    inner: Arc<Inner>,
}
130
131impl InterleavedTempFile {
132    pub fn new_temp() -> Result<Self, VirtualFileError> {
133        let (file, path) = create_unique_temp_file()?;
134        Ok(Self::new_inner(file, path, true))
135    }
136
137    pub fn new_at_path(path: impl AsRef<Path>) -> Result<Self, VirtualFileError> {
138        let path = path.as_ref().to_path_buf();
139        let file = OpenOptions::new()
140            .read(true)
141            .write(true)
142            .create(true)
143            .truncate(true)
144            .open(&path)?;
145        Ok(Self::new_inner(file, path, false))
146    }
147
148    pub fn new_temp_at_path(path: impl AsRef<Path>) -> Result<Self, VirtualFileError> {
149        let path = path.as_ref().to_path_buf();
150        let file = OpenOptions::new()
151            .read(true)
152            .write(true)
153            .create_new(true)
154            .open(&path)?;
155        Ok(Self::new_inner(file, path, true))
156    }
157
158    fn new_inner(file: File, path: PathBuf, delete_on_drop: bool) -> Self {
159        Self {
160            inner: Arc::new(Inner {
161                path,
162                delete_on_drop,
163                state: Mutex::new(State {
164                    writer: Some(file),
165                    next_offset: 0,
166                    next_stream_id: 0,
167                    streams: BTreeMap::new(),
168                }),
169            }),
170        }
171    }
172
173    pub fn path(&self) -> &Path {
174        &self.inner.path
175    }
176
177    pub fn spawn_stream(&self) -> VirtualStreamWriter {
178        let mut state = self.inner.state.lock().unwrap();
179        let stream_id = state.next_stream_id;
180        state.next_stream_id += 1;
181        state.streams.insert(stream_id, Vec::new());
182
183        VirtualStreamWriter {
184            inner: Arc::clone(&self.inner),
185            stream_id,
186        }
187    }
188
189    pub fn stream_ids(&self) -> Vec<u64> {
190        self.inner
191            .state
192            .lock()
193            .unwrap()
194            .streams
195            .keys()
196            .copied()
197            .collect()
198    }
199
200    pub fn open_reader(&self, stream_id: u64) -> Result<VirtualStreamReader, VirtualFileError> {
201        let extents = self
202            .inner
203            .state
204            .lock()
205            .unwrap()
206            .streams
207            .get(&stream_id)
208            .cloned()
209            .ok_or(VirtualFileError::UnknownStream { stream_id })?;
210
211        let file = File::open(&self.inner.path)?;
212        Ok(VirtualStreamReader {
213            _inner: Arc::clone(&self.inner),
214            file,
215            extents,
216            extent_index: 0,
217            extent_offset: 0,
218        })
219    }
220
221    pub fn flush(&self) -> Result<(), VirtualFileError> {
222        Ok(self.inner.flush()?)
223    }
224
225    pub fn remove_backing_file(&self) -> Result<(), VirtualFileError> {
226        let writer = self.inner.state.lock().unwrap().writer.take();
227        drop(writer);
228
229        match remove_file(&self.inner.path) {
230            Ok(()) => Ok(()),
231            Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(()),
232            Err(err) => Err(err.into()),
233        }
234    }
235}
236
/// Append-only writer for a single stream inside the shared backing file.
#[derive(Debug)]
pub struct VirtualStreamWriter {
    inner: Arc<Inner>,
    // Stream this writer's extents are recorded under.
    stream_id: u64,
}
242
243impl VirtualStreamWriter {
244    pub fn stream_id(&self) -> u64 {
245        self.stream_id
246    }
247
248    pub fn flush_to_disk(&self) -> Result<(), VirtualFileError> {
249        Ok(self.inner.flush()?)
250    }
251}
252
253impl Write for VirtualStreamWriter {
254    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
255        self.inner
256            .append(self.stream_id, buf)
257            .map_err(|err| match err {
258                VirtualFileError::FilesystemError(err) => err,
259                other => io::Error::new(io::ErrorKind::NotFound, other),
260            })
261    }
262
263    fn flush(&mut self) -> io::Result<()> {
264        self.inner.flush()
265    }
266}
267
/// Sequential reader over one stream's extents, using its own file handle.
#[derive(Debug)]
pub struct VirtualStreamReader {
    // Held only to keep the backing file alive while readers exist
    // (delete-on-drop fires when the last Arc clone goes away).
    _inner: Arc<Inner>,
    // Private read handle, independent of the shared writer's cursor.
    file: File,
    // Snapshot of the stream's extents taken in `open_reader`.
    extents: Vec<Extent>,
    // Read cursor: index of the current extent…
    extent_index: usize,
    // …and byte offset within it.
    extent_offset: u64,
}
276
277impl Read for VirtualStreamReader {
278    fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
279        let mut total = 0;
280
281        while !buf.is_empty() && self.extent_index < self.extents.len() {
282            let extent = self.extents[self.extent_index];
283            if self.extent_offset >= extent.len {
284                self.extent_index += 1;
285                self.extent_offset = 0;
286                continue;
287            }
288
289            let remaining = (extent.len - self.extent_offset) as usize;
290            let to_read = remaining.min(buf.len());
291            self.file
292                .seek(SeekFrom::Start(extent.offset + self.extent_offset))?;
293
294            let read = self.file.read(&mut buf[..to_read])?;
295            if read == 0 {
296                break;
297            }
298
299            total += read;
300            self.extent_offset += read as u64;
301
302            let (_, rest) = buf.split_at_mut(read);
303            buf = rest;
304        }
305
306        Ok(total)
307    }
308}
309
#[cfg(test)]
mod tests {
    use super::{InterleavedTempFile, VirtualFileError};
    use std::{
        io::{Read, Write},
        thread,
        time::Instant,
    };

    // Compile-time probes: these only build if T has the required auto traits.
    fn assert_send<T: Send>() {}
    fn assert_send_sync<T: Send + Sync>() {}

    // Deterministic chunk content so readers can recompute the expected bytes
    // from (stream_index, chunk_index) alone.
    fn make_chunk(stream_index: usize, chunk_index: usize, chunk_len: usize) -> Vec<u8> {
        (0..chunk_len)
            .map(|byte_index| ((stream_index * 31 + chunk_index * 17 + byte_index) & 0xFF) as u8)
            .collect()
    }

    // Writes `chunks_per_stream` chunks from `stream_count` concurrent writer
    // threads, then verifies every stream from concurrent reader threads, and
    // prints rough write/read throughput.
    fn run_stress_case(stream_count: usize, chunks_per_stream: usize, chunk_len: usize) {
        let storage = InterleavedTempFile::new_temp().unwrap();
        let total_bytes = stream_count * chunks_per_stream * chunk_len;

        let write_started = Instant::now();
        let mut handles = Vec::new();
        for stream_index in 0..stream_count {
            let mut writer = storage.spawn_stream();
            handles.push(thread::spawn(move || {
                let stream_id = writer.stream_id();
                for chunk_index in 0..chunks_per_stream {
                    let chunk = make_chunk(stream_index, chunk_index, chunk_len);
                    writer.write_all(&chunk).unwrap();
                }
                writer.flush().unwrap();
                stream_id
            }));
        }

        let mut stream_ids = Vec::new();
        for handle in handles {
            stream_ids.push(handle.join().unwrap());
        }
        let write_elapsed = write_started.elapsed();

        let read_started = Instant::now();
        let mut read_handles = Vec::new();
        for (stream_index, stream_id) in stream_ids.into_iter().enumerate() {
            let storage = storage.clone();
            read_handles.push(thread::spawn(move || {
                let mut reader = storage.open_reader(stream_id).unwrap();
                for chunk_index in 0..chunks_per_stream {
                    let expected = make_chunk(stream_index, chunk_index, chunk_len);
                    let mut actual = vec![0; chunk_len];
                    reader.read_exact(&mut actual).unwrap();
                    assert_eq!(actual, expected);
                }

                // After the last chunk the reader must report EOF.
                let mut tail = [0_u8; 1];
                assert_eq!(reader.read(&mut tail).unwrap(), 0);
            }));
        }

        for handle in read_handles {
            handle.join().unwrap();
        }
        let read_elapsed = read_started.elapsed();

        let total_mib = total_bytes as f64 / (1024.0 * 1024.0);
        println!(
            "stress_case streams={stream_count} chunks_per_stream={chunks_per_stream} chunk_len={chunk_len} total_bytes={total_bytes} write={write_elapsed:?} ({:.2} MiB/s) read={read_elapsed:?} ({:.2} MiB/s)",
            total_mib / write_elapsed.as_secs_f64(),
            total_mib / read_elapsed.as_secs_f64(),
        );
    }

    #[test]
    fn manager_is_send_and_sync() {
        assert_send_sync::<InterleavedTempFile>();
    }

    #[test]
    fn stream_writer_is_send() {
        assert_send::<super::VirtualStreamWriter>();
    }

    // Three threads write to distinct streams of the same backing file; each
    // stream must read back exactly its own concatenated writes.
    #[test]
    fn writes_can_be_interleaved_and_read_back_independently() {
        let storage = InterleavedTempFile::new_temp().unwrap();

        let mut handles = Vec::new();
        for stream_index in 0..3 {
            let mut writer = storage.spawn_stream();
            handles.push(thread::spawn(move || {
                let stream_id = writer.stream_id();
                let parts = [
                    format!("stream-{stream_index}:alpha|"),
                    format!("stream-{stream_index}:beta|"),
                    format!("stream-{stream_index}:gamma"),
                ];

                let mut expected = String::new();
                for part in parts {
                    writer.write_all(part.as_bytes()).unwrap();
                    expected.push_str(&part);
                }

                writer.flush().unwrap();
                (stream_id, expected)
            }));
        }

        let mut expected_streams = Vec::new();
        for handle in handles {
            expected_streams.push(handle.join().unwrap());
        }
        expected_streams.sort_unstable_by_key(|(stream_id, _)| *stream_id);

        storage.flush().unwrap();
        assert_eq!(storage.stream_ids(), vec![0, 1, 2]);

        for (stream_id, expected) in expected_streams {
            let mut reader = storage.open_reader(stream_id).unwrap();
            let mut actual = String::new();
            reader.read_to_string(&mut actual).unwrap();
            assert_eq!(actual, expected);
        }
    }

    // A single stream written in several small pieces must read back as one
    // contiguous byte sequence in write order.
    #[test]
    fn fragmented_stream_reads_preserve_exact_write_order() {
        let storage = InterleavedTempFile::new_temp().unwrap();
        let mut writer = storage.spawn_stream();
        let stream_id = writer.stream_id();

        let chunks = [
            b"header".as_slice(),
            b"|event-1|".as_slice(),
            b"event-2|".as_slice(),
            b"tail".as_slice(),
        ];
        let mut expected = Vec::new();
        for chunk in chunks {
            writer.write_all(chunk).unwrap();
            expected.extend_from_slice(chunk);
        }
        writer.flush().unwrap();

        let mut reader = storage.open_reader(stream_id).unwrap();
        let mut actual = Vec::new();
        reader.read_to_end(&mut actual).unwrap();
        assert_eq!(actual, expected);
    }

    #[test]
    fn unknown_stream_returns_error() {
        let storage = InterleavedTempFile::new_temp().unwrap();
        let err = storage.open_reader(99).unwrap_err();
        assert!(matches!(
            err,
            VirtualFileError::UnknownStream { stream_id: 99 }
        ));
    }

    // Sequential writes followed by fully parallel read-back of every stream.
    #[test]
    fn streams_can_be_read_in_parallel_after_writes_finish() {
        let storage = InterleavedTempFile::new_temp().unwrap();
        let mut streams = Vec::new();

        for stream_index in 0..6 {
            let mut writer = storage.spawn_stream();
            let stream_id = writer.stream_id();
            let mut expected = Vec::new();

            for chunk_index in 0..32 {
                let chunk = make_chunk(stream_index, chunk_index, 257);
                writer.write_all(&chunk).unwrap();
                expected.extend_from_slice(&chunk);
            }
            writer.flush().unwrap();
            streams.push((stream_id, expected));
        }

        let mut handles = Vec::new();
        for (stream_id, expected) in streams {
            let storage = storage.clone();
            handles.push(thread::spawn(move || {
                let mut reader = storage.open_reader(stream_id).unwrap();
                let mut actual = Vec::new();
                reader.read_to_end(&mut actual).unwrap();
                assert_eq!(actual, expected);
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }
    }

    #[test]
    #[ignore = "stress test: many tiny appends maximize lock and extent pressure"]
    fn stress_many_small_chunks() {
        run_stress_case(16, 20_000, 64);
    }

    #[test]
    #[ignore = "stress test: larger chunks emphasize raw IO throughput"]
    fn stress_large_chunks() {
        run_stress_case(8, 512, 16 * 1024);
    }
}