1use std::{
2 collections::BTreeMap,
3 fs::{remove_file, File, OpenOptions},
4 io::{self, Read, Seek, SeekFrom, Write},
5 path::{Path, PathBuf},
6 process,
7 sync::{
8 atomic::{AtomicU64, Ordering},
9 Arc, Mutex,
10 },
11 time::{SystemTime, UNIX_EPOCH},
12};
13
14use super::VirtualFileError;
15
// Process-wide counter folded into generated temp-file names; combined with
// pid + timestamp it makes name collisions within one process impossible.
static TEMP_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
17
/// A contiguous byte range inside the shared backing file that belongs to
/// exactly one logical stream.
#[derive(Clone, Copy, Debug)]
struct Extent {
    /// Absolute offset of the range within the backing file.
    offset: u64,
    /// Length of the range in bytes.
    len: u64,
}
23
/// Mutable bookkeeping shared by all handles, guarded by `Inner::state`.
#[derive(Debug)]
struct State {
    /// Write handle to the backing file; `None` once the writer has been
    /// closed (e.g. by `remove_backing_file` or during drop).
    writer: Option<File>,
    /// Offset in the backing file where the next append will land.
    next_offset: u64,
    /// Id that will be assigned to the next spawned stream.
    next_stream_id: u64,
    /// Per-stream extent lists, in the order the extents were written.
    streams: BTreeMap<u64, Vec<Extent>>,
}
31
/// Shared core behind `InterleavedTempFile` and its writers/readers.
#[derive(Debug)]
struct Inner {
    /// Location of the backing file on disk.
    path: PathBuf,
    /// Whether the backing file should be removed when the last handle drops.
    delete_on_drop: bool,
    /// All mutable state, serialized behind a single mutex.
    state: Mutex<State>,
}
38
39impl Inner {
40 fn append(&self, stream_id: u64, bytes: &[u8]) -> Result<usize, VirtualFileError> {
41 if bytes.is_empty() {
42 return Ok(0);
43 }
44
45 let mut state = self.state.lock().unwrap();
46 if !state.streams.contains_key(&stream_id) {
47 return Err(VirtualFileError::UnknownStream { stream_id });
48 }
49
50 let offset = state.next_offset;
51 {
52 let writer = state.writer.as_mut().ok_or_else(|| {
53 io::Error::new(io::ErrorKind::BrokenPipe, "backing file already closed")
54 })?;
55 writer.seek(SeekFrom::Start(offset))?;
56 writer.write_all(bytes)?;
57 }
58
59 state.next_offset += bytes.len() as u64;
60 state
61 .streams
62 .get_mut(&stream_id)
63 .expect("stream presence was checked above")
64 .push(Extent {
65 offset,
66 len: bytes.len() as u64,
67 });
68
69 Ok(bytes.len())
70 }
71
72 fn flush(&self) -> io::Result<()> {
73 let mut state = self.state.lock().unwrap();
74 match state.writer.as_mut() {
75 Some(writer) => writer.flush(),
76 None => Ok(()),
77 }
78 }
79}
80
81impl Drop for Inner {
82 fn drop(&mut self) {
83 if !self.delete_on_drop {
84 return;
85 }
86
87 let writer = self.state.lock().unwrap().writer.take();
88 drop(writer);
89 let _ = remove_file(&self.path);
90 }
91}
92
93fn create_unique_temp_file() -> io::Result<(File, PathBuf)> {
94 let base = std::env::temp_dir();
95 let pid = process::id();
96 let nanos = SystemTime::now()
97 .duration_since(UNIX_EPOCH)
98 .unwrap_or_default()
99 .as_nanos();
100
101 for _ in 0..32 {
102 let counter = TEMP_FILE_COUNTER.fetch_add(1, Ordering::Relaxed);
103 let path = base.join(format!(
104 "midi-toolkit-interleaved-{pid}-{nanos}-{counter}.bin"
105 ));
106
107 let file = OpenOptions::new()
108 .read(true)
109 .write(true)
110 .create_new(true)
111 .open(&path);
112
113 match file {
114 Ok(file) => return Ok((file, path)),
115 Err(err) if err.kind() == io::ErrorKind::AlreadyExists => continue,
116 Err(err) => return Err(err),
117 }
118 }
119
120 Err(io::Error::new(
121 io::ErrorKind::AlreadyExists,
122 "failed to allocate a unique temporary file",
123 ))
124}
125
/// A single on-disk file that interleaves any number of independent
/// append-only streams. Cloning is cheap and shares the same backing file.
#[derive(Clone, Debug)]
pub struct InterleavedTempFile {
    inner: Arc<Inner>,
}
130
131impl InterleavedTempFile {
132 pub fn new_temp() -> Result<Self, VirtualFileError> {
133 let (file, path) = create_unique_temp_file()?;
134 Ok(Self::new_inner(file, path, true))
135 }
136
137 pub fn new_at_path(path: impl AsRef<Path>) -> Result<Self, VirtualFileError> {
138 let path = path.as_ref().to_path_buf();
139 let file = OpenOptions::new()
140 .read(true)
141 .write(true)
142 .create(true)
143 .truncate(true)
144 .open(&path)?;
145 Ok(Self::new_inner(file, path, false))
146 }
147
148 pub fn new_temp_at_path(path: impl AsRef<Path>) -> Result<Self, VirtualFileError> {
149 let path = path.as_ref().to_path_buf();
150 let file = OpenOptions::new()
151 .read(true)
152 .write(true)
153 .create_new(true)
154 .open(&path)?;
155 Ok(Self::new_inner(file, path, true))
156 }
157
158 fn new_inner(file: File, path: PathBuf, delete_on_drop: bool) -> Self {
159 Self {
160 inner: Arc::new(Inner {
161 path,
162 delete_on_drop,
163 state: Mutex::new(State {
164 writer: Some(file),
165 next_offset: 0,
166 next_stream_id: 0,
167 streams: BTreeMap::new(),
168 }),
169 }),
170 }
171 }
172
173 pub fn path(&self) -> &Path {
174 &self.inner.path
175 }
176
177 pub fn spawn_stream(&self) -> VirtualStreamWriter {
178 let mut state = self.inner.state.lock().unwrap();
179 let stream_id = state.next_stream_id;
180 state.next_stream_id += 1;
181 state.streams.insert(stream_id, Vec::new());
182
183 VirtualStreamWriter {
184 inner: Arc::clone(&self.inner),
185 stream_id,
186 }
187 }
188
189 pub fn stream_ids(&self) -> Vec<u64> {
190 self.inner
191 .state
192 .lock()
193 .unwrap()
194 .streams
195 .keys()
196 .copied()
197 .collect()
198 }
199
200 pub fn open_reader(&self, stream_id: u64) -> Result<VirtualStreamReader, VirtualFileError> {
201 let extents = self
202 .inner
203 .state
204 .lock()
205 .unwrap()
206 .streams
207 .get(&stream_id)
208 .cloned()
209 .ok_or(VirtualFileError::UnknownStream { stream_id })?;
210
211 let file = File::open(&self.inner.path)?;
212 Ok(VirtualStreamReader {
213 _inner: Arc::clone(&self.inner),
214 file,
215 extents,
216 extent_index: 0,
217 extent_offset: 0,
218 })
219 }
220
221 pub fn flush(&self) -> Result<(), VirtualFileError> {
222 Ok(self.inner.flush()?)
223 }
224
225 pub fn remove_backing_file(&self) -> Result<(), VirtualFileError> {
226 let writer = self.inner.state.lock().unwrap().writer.take();
227 drop(writer);
228
229 match remove_file(&self.inner.path) {
230 Ok(()) => Ok(()),
231 Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(()),
232 Err(err) => Err(err.into()),
233 }
234 }
235}
236
/// Append-only writer for one logical stream of an `InterleavedTempFile`.
#[derive(Debug)]
pub struct VirtualStreamWriter {
    /// Shared backing-file state.
    inner: Arc<Inner>,
    /// Id of the stream this writer appends to.
    stream_id: u64,
}
242
243impl VirtualStreamWriter {
244 pub fn stream_id(&self) -> u64 {
245 self.stream_id
246 }
247
248 pub fn flush_to_disk(&self) -> Result<(), VirtualFileError> {
249 Ok(self.inner.flush()?)
250 }
251}
252
253impl Write for VirtualStreamWriter {
254 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
255 self.inner
256 .append(self.stream_id, buf)
257 .map_err(|err| match err {
258 VirtualFileError::FilesystemError(err) => err,
259 other => io::Error::new(io::ErrorKind::NotFound, other),
260 })
261 }
262
263 fn flush(&mut self) -> io::Result<()> {
264 self.inner.flush()
265 }
266}
267
/// Sequential reader over one logical stream's extents.
#[derive(Debug)]
pub struct VirtualStreamReader {
    // Held (but never read) so `Inner::drop` — and with it a delete-on-drop
    // of the backing file — cannot run while this reader is alive.
    _inner: Arc<Inner>,
    /// Independent read handle onto the backing file.
    file: File,
    /// Snapshot of the stream's extents taken when the reader was opened.
    extents: Vec<Extent>,
    /// Index of the extent currently being read.
    extent_index: usize,
    /// Bytes already consumed from the current extent.
    extent_offset: u64,
}
276
277impl Read for VirtualStreamReader {
278 fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
279 let mut total = 0;
280
281 while !buf.is_empty() && self.extent_index < self.extents.len() {
282 let extent = self.extents[self.extent_index];
283 if self.extent_offset >= extent.len {
284 self.extent_index += 1;
285 self.extent_offset = 0;
286 continue;
287 }
288
289 let remaining = (extent.len - self.extent_offset) as usize;
290 let to_read = remaining.min(buf.len());
291 self.file
292 .seek(SeekFrom::Start(extent.offset + self.extent_offset))?;
293
294 let read = self.file.read(&mut buf[..to_read])?;
295 if read == 0 {
296 break;
297 }
298
299 total += read;
300 self.extent_offset += read as u64;
301
302 let (_, rest) = buf.split_at_mut(read);
303 buf = rest;
304 }
305
306 Ok(total)
307 }
308}
309
#[cfg(test)]
mod tests {
    use super::{InterleavedTempFile, VirtualFileError};
    use std::{
        io::{Read, Write},
        thread,
        time::Instant,
    };

    // Compile-time checks that types satisfy the given auto-trait bounds.
    fn assert_send<T: Send>() {}
    fn assert_send_sync<T: Send + Sync>() {}

    // Deterministic chunk generator: every (stream, chunk) pair yields
    // distinct, reproducible bytes so readers can verify exact contents.
    fn make_chunk(stream_index: usize, chunk_index: usize, chunk_len: usize) -> Vec<u8> {
        (0..chunk_len)
            .map(|byte_index| ((stream_index * 31 + chunk_index * 17 + byte_index) & 0xFF) as u8)
            .collect()
    }

    // Shared driver for the stress tests: one writer thread per stream,
    // then one reader thread per stream verifying byte-for-byte equality,
    // with throughput printed for both phases.
    fn run_stress_case(stream_count: usize, chunks_per_stream: usize, chunk_len: usize) {
        let storage = InterleavedTempFile::new_temp().unwrap();
        let total_bytes = stream_count * chunks_per_stream * chunk_len;

        // Write phase: all streams append concurrently to the shared file.
        let write_started = Instant::now();
        let mut handles = Vec::new();
        for stream_index in 0..stream_count {
            let mut writer = storage.spawn_stream();
            handles.push(thread::spawn(move || {
                let stream_id = writer.stream_id();
                for chunk_index in 0..chunks_per_stream {
                    let chunk = make_chunk(stream_index, chunk_index, chunk_len);
                    writer.write_all(&chunk).unwrap();
                }
                writer.flush().unwrap();
                stream_id
            }));
        }

        let mut stream_ids = Vec::new();
        for handle in handles {
            stream_ids.push(handle.join().unwrap());
        }
        let write_elapsed = write_started.elapsed();

        // Read phase: each stream is read back on its own thread and
        // compared against the regenerated expected chunks.
        let read_started = Instant::now();
        let mut read_handles = Vec::new();
        for (stream_index, stream_id) in stream_ids.into_iter().enumerate() {
            let storage = storage.clone();
            read_handles.push(thread::spawn(move || {
                let mut reader = storage.open_reader(stream_id).unwrap();
                for chunk_index in 0..chunks_per_stream {
                    let expected = make_chunk(stream_index, chunk_index, chunk_len);
                    let mut actual = vec![0; chunk_len];
                    reader.read_exact(&mut actual).unwrap();
                    assert_eq!(actual, expected);
                }

                // The reader must report a clean end of stream afterwards.
                let mut tail = [0_u8; 1];
                assert_eq!(reader.read(&mut tail).unwrap(), 0);
            }));
        }

        for handle in read_handles {
            handle.join().unwrap();
        }
        let read_elapsed = read_started.elapsed();

        let total_mib = total_bytes as f64 / (1024.0 * 1024.0);
        println!(
            "stress_case streams={stream_count} chunks_per_stream={chunks_per_stream} chunk_len={chunk_len} total_bytes={total_bytes} write={write_elapsed:?} ({:.2} MiB/s) read={read_elapsed:?} ({:.2} MiB/s)",
            total_mib / write_elapsed.as_secs_f64(),
            total_mib / read_elapsed.as_secs_f64(),
        );
    }

    #[test]
    fn manager_is_send_and_sync() {
        assert_send_sync::<InterleavedTempFile>();
    }

    #[test]
    fn stream_writer_is_send() {
        assert_send::<super::VirtualStreamWriter>();
    }

    #[test]
    fn writes_can_be_interleaved_and_read_back_independently() {
        let storage = InterleavedTempFile::new_temp().unwrap();

        // Three threads append their parts concurrently, interleaving
        // writes in the shared backing file.
        let mut handles = Vec::new();
        for stream_index in 0..3 {
            let mut writer = storage.spawn_stream();
            handles.push(thread::spawn(move || {
                let stream_id = writer.stream_id();
                let parts = [
                    format!("stream-{stream_index}:alpha|"),
                    format!("stream-{stream_index}:beta|"),
                    format!("stream-{stream_index}:gamma"),
                ];

                let mut expected = String::new();
                for part in parts {
                    writer.write_all(part.as_bytes()).unwrap();
                    expected.push_str(&part);
                }

                writer.flush().unwrap();
                (stream_id, expected)
            }));
        }

        let mut expected_streams = Vec::new();
        for handle in handles {
            expected_streams.push(handle.join().unwrap());
        }
        expected_streams.sort_unstable_by_key(|(stream_id, _)| *stream_id);

        storage.flush().unwrap();
        assert_eq!(storage.stream_ids(), vec![0, 1, 2]);

        // Despite the interleaving on disk, each stream reads back as if it
        // had been written contiguously.
        for (stream_id, expected) in expected_streams {
            let mut reader = storage.open_reader(stream_id).unwrap();
            let mut actual = String::new();
            reader.read_to_string(&mut actual).unwrap();
            assert_eq!(actual, expected);
        }
    }

    #[test]
    fn fragmented_stream_reads_preserve_exact_write_order() {
        let storage = InterleavedTempFile::new_temp().unwrap();
        let mut writer = storage.spawn_stream();
        let stream_id = writer.stream_id();

        // Several small appends produce multiple extents for one stream.
        let chunks = [
            b"header".as_slice(),
            b"|event-1|".as_slice(),
            b"event-2|".as_slice(),
            b"tail".as_slice(),
        ];
        let mut expected = Vec::new();
        for chunk in chunks {
            writer.write_all(chunk).unwrap();
            expected.extend_from_slice(chunk);
        }
        writer.flush().unwrap();

        let mut reader = storage.open_reader(stream_id).unwrap();
        let mut actual = Vec::new();
        reader.read_to_end(&mut actual).unwrap();
        assert_eq!(actual, expected);
    }

    #[test]
    fn unknown_stream_returns_error() {
        let storage = InterleavedTempFile::new_temp().unwrap();
        let err = storage.open_reader(99).unwrap_err();
        assert!(matches!(
            err,
            VirtualFileError::UnknownStream { stream_id: 99 }
        ));
    }

    #[test]
    fn streams_can_be_read_in_parallel_after_writes_finish() {
        let storage = InterleavedTempFile::new_temp().unwrap();
        let mut streams = Vec::new();

        // Write sequentially first (chunk size 257 is deliberately odd so
        // extents do not align with typical buffer sizes).
        for stream_index in 0..6 {
            let mut writer = storage.spawn_stream();
            let stream_id = writer.stream_id();
            let mut expected = Vec::new();

            for chunk_index in 0..32 {
                let chunk = make_chunk(stream_index, chunk_index, 257);
                writer.write_all(&chunk).unwrap();
                expected.extend_from_slice(&chunk);
            }
            writer.flush().unwrap();
            streams.push((stream_id, expected));
        }

        // Then verify that readers are independent and thread-safe.
        let mut handles = Vec::new();
        for (stream_id, expected) in streams {
            let storage = storage.clone();
            handles.push(thread::spawn(move || {
                let mut reader = storage.open_reader(stream_id).unwrap();
                let mut actual = Vec::new();
                reader.read_to_end(&mut actual).unwrap();
                assert_eq!(actual, expected);
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }
    }

    #[test]
    #[ignore = "stress test: many tiny appends maximize lock and extent pressure"]
    fn stress_many_small_chunks() {
        run_stress_case(16, 20_000, 64);
    }

    #[test]
    #[ignore = "stress test: larger chunks emphasize raw IO throughput"]
    fn stress_large_chunks() {
        run_stress_case(8, 512, 16 * 1024);
    }
}