irox_tools/read/multi_stream.rs

// SPDX-License-Identifier: MIT
// Copyright 2025 IROX Contributors
//

//!
//! A Multi-Stream File is a file that allows multiple interlaced data streams within it, similar
//! to databases, pagefiles, and the like.  Each stream is essentially a linked list of pages: the
//! last 4 bytes of each page hold the index of the next page in the stream, and zero marks the end.
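//!
//! Block 0 is a header page: the `IRXMSB` magic followed by a table of `(stream index, first
//! page index)` entries.  Every later page is [`DATA_SIZE`] payload bytes followed by the 4-byte
//! big-endian index of the next page; the first two payload bytes of a page are a big-endian
//! "used length" prefix, which is zero when the page is completely full.
//!
//! A minimal round-trip sketch (not a verbatim test; it assumes this module is reachable as
//! `irox_tools::read` and uses the `irox_bits` traits for the byte-level I/O):
//! ```no_run
//! use irox_bits::{Bits, Error, MutBits};
//! use irox_tools::read::{MultiStreamReader, MultiStreamWriter};
//!
//! fn round_trip() -> Result<(), Error> {
//!     // Write two interleaved streams into one file.
//!     let writer = MultiStreamWriter::new("./example.ms")?;
//!     let mut a = writer.new_stream();
//!     let mut b = writer.new_stream();
//!     a.write_u8(0xAA)?;
//!     b.write_u8(0xBB)?;
//!     // Dropping a stream flushes its final, partially-filled page.
//!     drop(a);
//!     drop(b);
//!
//!     // Readers come back in the order the streams were created.
//!     let mut streams = MultiStreamReader::open("./example.ms")?;
//!     assert_eq!(streams.remove(0).next_u8()?, Some(0xAA));
//!     Ok(())
//! }
//! ```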
extern crate alloc;

use crate::buf::{Buffer, FixedBuf, RoundU8Buffer};
use crate::codec::{encode_integer, DecodeVByte};
use crate::IntegerValue;
use alloc::collections::BTreeMap;
use alloc::sync::Arc;
use core::sync::atomic::{AtomicU32, AtomicU8, Ordering};
use irox_bits::{
    Bits, BitsErrorKind, BufBits, Error, MutBits, Seek, SeekFrom, SeekRead, SeekWrite,
};
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::path::Path;
use std::sync::Mutex;

/// Total on-disk size of each page, including the trailing 4-byte next-page pointer.
pub const DEFAULT_BLOCK_SIZE: usize = 16 * 1024; // 16K
/// Payload bytes per page; the final 4 bytes of each page hold the next-page index.
pub const DATA_SIZE: usize = DEFAULT_BLOCK_SIZE - 4;
/// Magic bytes at the start of the header page.
pub const HEADER: &[u8] = b"IRXMSB";

macro_rules! broken_pipe {
    () => {
        Err(Error::new(
            BitsErrorKind::BrokenPipe,
            "Error: Lock poisoned",
        ))
    };
}

///
/// Writer for a Multi-Stream File.
#[derive(Clone)]
pub struct MultiStreamWriter {
    inner: Arc<Mutex<File>>,
    /// Next stream index to hand out.
    num_streams: Arc<AtomicU8>,
    /// Next free page index (page 0 is reserved for the header).
    current_block: Arc<AtomicU32>,
    /// First page written for each stream, as recorded in the header page.
    stream_first_blocks: Arc<Mutex<BTreeMap<u8, u32>>>,
    /// Most recently written page for each stream.
    stream_latest_blocks: Arc<Mutex<BTreeMap<u8, u32>>>,
}
impl MultiStreamWriter {
    ///
    /// Creates a new writer against the provided path.  If the file already exists, it will be
    /// truncated and any existing data removed.  If it doesn't exist, it will be created.
    pub fn new<P: AsRef<Path>>(path: P) -> Result<Arc<MultiStreamWriter>, Error> {
        let inner = OpenOptions::new()
            .create(true)
            .truncate(true)
            .write(true)
            .append(false)
            .open(path.as_ref())?;

        Ok(Arc::new(MultiStreamWriter {
            inner: Arc::new(Mutex::new(inner)),
            num_streams: Arc::new(AtomicU8::new(1)),
            current_block: Arc::new(AtomicU32::new(1)),
            stream_first_blocks: Arc::new(Mutex::new(Default::default())),
            stream_latest_blocks: Arc::new(Mutex::new(Default::default())),
        }))
    }
    ///
    /// Creates a new buffered data stream within this writer.
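    ///
    /// Streams are handed out with increasing indices in creation order, and each returned
    /// [`StreamWriter`] can be moved to its own thread.  A sketch mirroring the unit tests at
    /// the bottom of this module (the path and counts are illustrative, and the
    /// `irox_tools::read` re-export path is assumed):
    /// ```no_run
    /// use irox_bits::{Error, MutBits};
    /// use irox_tools::read::MultiStreamWriter;
    ///
    /// fn fan_out() -> Result<(), Error> {
    ///     let writer = MultiStreamWriter::new("./threads.ms")?;
    ///     let handles: Vec<_> = (1u8..=4)
    ///         .map(|val| {
    ///             let mut stream = writer.new_stream();
    ///             std::thread::spawn(move || {
    ///                 for _ in 0..1024 {
    ///                     stream.write_u8(val).unwrap();
    ///                 }
    ///                 // the stream flushes its last page when it is dropped here
    ///             })
    ///         })
    ///         .collect();
    ///     for handle in handles {
    ///         let _ = handle.join();
    ///     }
    ///     Ok(())
    /// }
    /// ```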
    pub fn new_stream(self: &Arc<Self>) -> StreamWriter {
        let idx = self.num_streams.fetch_add(1, Ordering::AcqRel);
        StreamWriter::new(self.clone(), idx)
    }

    pub(crate) fn write_block(&self, stream_idx: u8, block: &[u8; DATA_SIZE]) -> Result<(), Error> {
        // Claim the next free page index for this block.
        let block_idx = self.current_block.fetch_add(1, Ordering::AcqRel);
        {
            let Ok(mut lock) = self.stream_first_blocks.lock() else {
                return broken_pipe!();
            };
            let stream_first_entry = lock.entry(stream_idx).or_insert(block_idx);
            if *stream_first_entry == block_idx {
                // First page for this stream: rewrite the header page with the updated
                // stream table (stream index byte + vbyte-encoded first page index).
                let mut header = [0u8; DEFAULT_BLOCK_SIZE];
                let mut hdr = header.as_mut_slice();
                hdr.write_all(HEADER)?;
                for (k, v) in lock.iter() {
                    hdr.write_u8(*k)?;
                    hdr.write_all(&encode_integer(IntegerValue::U32(*v)))?;
                }
                drop(lock);
                let Ok(mut lock) = self.inner.lock() else {
                    return broken_pipe!();
                };
                lock.seek_write_all(&header, 0)?;
            }
        }
        let offset = block_idx as u64 * DEFAULT_BLOCK_SIZE as u64;
        let Ok(mut lock) = self.inner.lock() else {
            return broken_pipe!();
        };
        lock.seek_write_all(block, offset)?;
        let Ok(mut l2) = self.stream_latest_blocks.lock() else {
            return broken_pipe!();
        };
        // Zero the next-page pointer of this (currently last) page of the stream.
        lock.seek_write_all(&[0, 0, 0, 0], block.len() as u64 + offset)?;
        let last_block_idx = l2.entry(stream_idx).or_insert(block_idx);
        if *last_block_idx != block_idx {
            // Link the previous last page of this stream to the newly written page.
            let offset = *last_block_idx as u64 * DEFAULT_BLOCK_SIZE as u64 + DATA_SIZE as u64;
            // println!("Updating {last_block_idx} at offset {offset:0X} to be new block {block_idx}");
            let byts = block_idx.to_be_bytes();
            lock.seek_write_all(&byts, offset)?;
            *last_block_idx = block_idx;
        }

        Ok(())
    }

    /// Returns the current length of the underlying file, in bytes.
    pub fn len(&self) -> Result<u64, Error> {
        if let Ok(lock) = self.inner.lock() {
            return Ok(lock.metadata()?.len());
        }
        broken_pipe!()
    }
    pub fn is_empty(&self) -> Result<bool, Error> {
        Ok(self.len()? == 0)
    }
}

///
/// A buffered writer for a single data stream.  Data is accumulated into [`DATA_SIZE`] pages and
/// handed to the parent [`MultiStreamWriter`]; the final, partially-filled page is flushed when
/// the writer is dropped.
pub struct StreamWriter {
    parent: Arc<MultiStreamWriter>,
    buf: FixedBuf<DATA_SIZE, u8>,
    stream_idx: u8,
}
impl StreamWriter {
    pub(crate) fn new(parent: Arc<MultiStreamWriter>, stream_idx: u8) -> StreamWriter {
        let mut buf = FixedBuf::default();
        // Reserve the 2-byte used-length prefix at the start of the page.
        let _ = buf.write_be_u16(0);
        StreamWriter {
            parent,
            buf,
            stream_idx,
        }
    }
}

impl MutBits for StreamWriter {
    fn write_u8(&mut self, val: u8) -> Result<(), Error> {
        if self.buf.is_full() {
            // Page is full: hand it to the parent writer, then reserve the 2-byte
            // length prefix at the start of the next page.
            let v = &self.buf.into_buf_default();
            self.parent.write_block(self.stream_idx, v)?;
            self.buf.write_be_u16(0x0)?;
        }
        self.buf.write_u8(val)
    }
}
impl Drop for StreamWriter {
    fn drop(&mut self) {
        if !self.buf.is_empty() {
            // Flush the final, partially-filled page.  Its used length (excluding the
            // 2-byte prefix itself) is recorded in the first two bytes of the page.
            let len = (self.buf.len() as u16).saturating_sub(2);
            let v = &mut self.buf.into_buf_default();
            let _ = v.as_mut_slice().write_be_u16(len);
            let _ = self.parent.write_block(self.stream_idx, v);
        }
    }
}

///
/// Reader for a multi-stream file.
#[derive(Clone)]
pub struct MultiStreamReader {
    inner: Arc<Mutex<File>>,
    /// Next page index to read for each stream (0 = no further pages).
    stream_next_block: Arc<Mutex<BTreeMap<u8, u32>>>,
}
impl MultiStreamReader {
    /// Opens a multi-stream file and returns readers for every stream contained therein.
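    ///
    /// Readers are returned in stream-creation order.  A reading sketch (the path is illustrative
    /// and the `irox_tools::read` re-export path is assumed):
    /// ```no_run
    /// use irox_bits::{Bits, Error};
    /// use irox_tools::read::MultiStreamReader;
    ///
    /// fn read_all() -> Result<(), Error> {
    ///     let streams = MultiStreamReader::open("./example.ms")?;
    ///     for mut stream in streams {
    ///         let mut total = 0u64;
    ///         while let Some(_byte) = stream.next_u8()? {
    ///             total += 1;
    ///         }
    ///         println!("stream ended after {total} bytes");
    ///     }
    ///     Ok(())
    /// }
    /// ```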
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Vec<StreamReader>, Error> {
        let mut inner = OpenOptions::new().read(true).open(path.as_ref())?;
        let mut header_buf = [0u8; DEFAULT_BLOCK_SIZE];
        inner.seek_read_all(&mut header_buf, 0)?;
        let (magic, mut data) = header_buf.split_at(HEADER.len());
        if magic != HEADER {
            return Err(BitsErrorKind::InvalidData.into());
        }

        // Parse the stream table: a stream index byte followed by the vbyte-encoded
        // index of that stream's first page.
        let mut stream_next_block = BTreeMap::<u8, u32>::new();
        let mut expected_stream_idx = 1;
        while let Some(read_idx) = data.next_u8()? {
            if read_idx == 0 {
                // a stream index of 0 marks the end of the stream table
                break;
            }
            if read_idx != expected_stream_idx {
                return Err(BitsErrorKind::InvalidData.into());
            }
            let start_block = data.decode_vbyte()? as u32;
            stream_next_block.insert(read_idx, start_block);
            expected_stream_idx += 1;
        }
        let stream_ids = stream_next_block.keys().copied().collect::<Vec<_>>();
        let parent = Arc::new(MultiStreamReader {
            inner: Arc::new(Mutex::new(inner)),
            stream_next_block: Arc::new(Mutex::new(stream_next_block)),
        });
        let mut out = Vec::<StreamReader>::new();
        for k in stream_ids {
            out.push(StreamReader::new(parent.clone(), k));
        }

        Ok(out)
    }
    pub(crate) fn read_next_block(
        &self,
        stream_idx: u8,
        buf: &mut RoundU8Buffer<DATA_SIZE>,
    ) -> Result<(), Error> {
        // Look up the next page index for this stream; no entry or index 0 means the
        // stream has no further data.
        let block_idx = {
            let Ok(lock) = self.stream_next_block.lock() else {
                return broken_pipe!();
            };
            let Some(v) = lock.get(&stream_idx) else {
                return Ok(());
            };
            *v
        };
        if block_idx == 0 {
            return Ok(());
        }
        let next_idx = {
            let Ok(mut lock) = self.inner.lock() else {
                return broken_pipe!();
            };
            let offset = block_idx as u64 * DEFAULT_BLOCK_SIZE as u64;
            buf.as_ref_mut(|_, buf| {
                lock.seek_read_all(buf, offset)?;
                Ok(buf.len())
            })?;
            // The 4 bytes following the payload are the index of the stream's next page.
            lock.seek(SeekFrom::Start(offset + buf.len() as u64))?;
            lock.read_be_u32()?
        };
        let Ok(mut lock) = self.stream_next_block.lock() else {
            return broken_pipe!();
        };
        lock.insert(stream_idx, next_idx);
        Ok(())
    }
}
///
/// A buffered reader for a single data stream within a multi-stream file.
pub struct StreamReader {
    parent: Arc<MultiStreamReader>,
    stream_idx: u8,
    buf: RoundU8Buffer<DATA_SIZE>,
    stream_counter: u64,
}
impl StreamReader {
    pub fn new(parent: Arc<MultiStreamReader>, stream_idx: u8) -> StreamReader {
        StreamReader {
            stream_idx,
            parent,
            buf: RoundU8Buffer::default(),
            stream_counter: 0,
        }
    }
    /// Returns the number of bytes read from this stream so far.
    pub fn stream_position(&self) -> u64 {
        self.stream_counter
    }

    fn try_fill_buffer(&mut self) -> Result<usize, Error> {
        if self.buf.is_empty() {
            self.parent
                .read_next_block(self.stream_idx, &mut self.buf)?;
            if self.buf.is_empty() {
                return Ok(0);
            }
            // The first two bytes of each page are the used-length prefix; a nonzero
            // value marks a partially-filled (final) page.
            let lim = self.buf.read_be_u16()?;
            if lim > 0 {
                self.buf.limit(lim as usize)?;
            }
            return Ok(self.buf.len());
        }
        Ok(0)
    }

    pub fn has_more(&mut self) -> Result<bool, Error> {
        self.try_fill_buffer()?;
        Ok(!self.buf.is_empty())
    }
}
impl Bits for StreamReader {
    fn next_u8(&mut self) -> Result<Option<u8>, Error> {
        if self.buf.is_empty() && self.try_fill_buffer()? == 0 {
            return Ok(None);
        }

        self.stream_counter += 1;
        Ok(self.buf.pop_front())
    }
}
impl BufBits for StreamReader {
    fn fill_buf(&mut self) -> Result<&[u8], Error> {
        if self.buf.is_empty() {
            let added = self.try_fill_buffer()?;
            if added == 0 {
                return Ok(&[]);
            }
        }
        // The ring buffer may expose its used contents as two runs; return the first
        // non-empty one.
        let (a, b) = self.buf.as_ref_used();
        if a.is_empty() {
            Ok(b)
        } else {
            Ok(a)
        }
    }

    fn consume(&mut self, amt: usize) {
        self.buf.consume(amt)
    }
}

#[cfg(all(test, feature = "std"))]
mod test {
    use crate::read::{MultiStreamReader, MultiStreamWriter, StreamReader, DATA_SIZE};
    use alloc::sync::Arc;
    use irox_bits::{Bits, Error, MutBits};
    use std::thread::JoinHandle;
    use std::time::Instant;

    const NUM_BLOCKS: usize = 100_000;

    fn spawn_writer_stream(ms: &Arc<MultiStreamWriter>, value: u8) -> JoinHandle<()> {
        let mut stream = ms.new_stream();
        std::thread::spawn(move || {
            let num_blocks = NUM_BLOCKS;
            let count = num_blocks * DATA_SIZE - 100;
            for _ in 0..count {
                stream.write_u8(value).unwrap();
            }
        })
    }

    #[test]
    #[ignore]
    pub fn test_write() -> Result<(), Error> {
        let ms = MultiStreamWriter::new("./test_multistream.ms")?;
        let start = Instant::now();
        let mut handles = vec![
            spawn_writer_stream(&ms, 0xA),
            spawn_writer_stream(&ms, 0x9),
            spawn_writer_stream(&ms, 0xF),
            spawn_writer_stream(&ms, 0x5),
            spawn_writer_stream(&ms, 0x3),
            spawn_writer_stream(&ms, 0x2),
        ];

        handles.drain(..).for_each(|h| h.join().unwrap());

        let end = start.elapsed();
        let len = NUM_BLOCKS as u64 * DATA_SIZE as u64 * 6 - 600;
        let bs = len as f64 / end.as_secs_f64();
        let mbs = bs / 1e6;
        let lmb = len as f64 / 1e6;
        println!("Wrote {lmb} MB in {end:?} = {mbs:02.02} MB/s");
        Ok(())
    }

    fn spawn_reader_stream(mut stream: StreamReader, value: u8) -> JoinHandle<()> {
        std::thread::spawn(move || {
            let num_blocks = NUM_BLOCKS;
            let count = num_blocks * DATA_SIZE - 100;
            for _ in 0..count {
                let len = stream.stream_position();
                assert_eq!(value, stream.read_u8().unwrap(), "at position {len}");
            }
        })
    }

    #[test]
    #[ignore]
    pub fn test_read() -> Result<(), Error> {
        let mut streams = MultiStreamReader::open("./test_multistream.ms")?;
        assert_eq!(streams.len(), 6);

        let start = Instant::now();

        let mut drain = streams.drain(..);
        let mut handles = vec![
            spawn_reader_stream(drain.next().unwrap(), 0xA),
            spawn_reader_stream(drain.next().unwrap(), 0x9),
            spawn_reader_stream(drain.next().unwrap(), 0xF),
            spawn_reader_stream(drain.next().unwrap(), 0x5),
            spawn_reader_stream(drain.next().unwrap(), 0x3),
            spawn_reader_stream(drain.next().unwrap(), 0x2),
        ];

        handles.drain(..).for_each(|h| h.join().unwrap());

        let end = start.elapsed();
        let len = NUM_BLOCKS as u64 * DATA_SIZE as u64 * 6 - 600;
        let bs = len as f64 / end.as_secs_f64();
        let mbs = bs / 1e6;
        let lmb = len as f64 / 1e6;
        println!("Read {lmb} MB in {end:?} = {mbs:02.02} MB/s");
        Ok(())
    }
406}