1extern crate alloc;
10
11use crate::buf::{Buffer, FixedBuf, RoundU8Buffer};
12use crate::codec::{encode_integer, DecodeVByte};
13use crate::IntegerValue;
14use alloc::collections::BTreeMap;
15use alloc::sync::Arc;
16use core::sync::atomic::{AtomicU32, AtomicU8, Ordering};
17use irox_bits::{
18 Bits, BitsErrorKind, BufBits, Error, MutBits, Seek, SeekFrom, SeekRead, SeekWrite,
19};
20use std::fs::{File, OpenOptions};
21use std::io::Write;
22use std::path::Path;
23use std::sync::Mutex;
24
/// Size in bytes of one on-disk block: the stream data plus a 4-byte trailer.
pub const DEFAULT_BLOCK_SIZE: usize = 16 * 1024;
/// Number of bytes in a block that carry stream data. The remaining 4 bytes
/// of the block hold the big-endian index of the same stream's next block.
pub const DATA_SIZE: usize = DEFAULT_BLOCK_SIZE - 4;
/// Magic bytes stored at the very start of the file.
pub const HEADER: &[u8] = b"IRXMSB";
28
/// Expands to an `Err` holding a `BrokenPipe` error. Used wherever acquiring
/// one of the mutexes fails because the lock has been poisoned.
macro_rules! broken_pipe {
    () => {
        Err(Error::new(BitsErrorKind::BrokenPipe, "Error: Lock poisoned"))
    };
}
37
/// Shared writer that multiplexes several independent byte streams into one
/// file made of fixed-size blocks.
///
/// Every field is reference counted, so clones share the same file and the
/// same bookkeeping.
#[derive(Clone)]
pub struct MultiStreamWriter {
    /// The backing file; all block and header writes go through this lock.
    inner: Arc<Mutex<File>>,
    /// Index that will be handed to the next stream created.
    num_streams: Arc<AtomicU8>,
    /// Index of the next block to allocate.
    current_block: Arc<AtomicU32>,
    /// Stream index -> index of the first block written by that stream.
    stream_first_blocks: Arc<Mutex<BTreeMap<u8, u32>>>,
    /// Stream index -> index of the most recent block written by that stream.
    stream_latest_blocks: Arc<Mutex<BTreeMap<u8, u32>>>,
}
48impl MultiStreamWriter {
49 pub fn new<P: AsRef<Path>>(path: P) -> Result<Arc<MultiStreamWriter>, Error> {
53 let inner = OpenOptions::new()
54 .create(true)
55 .truncate(true)
56 .write(true)
57 .append(false)
58 .open(path.as_ref())?;
59
60 Ok(Arc::new(MultiStreamWriter {
61 inner: Arc::new(Mutex::new(inner)),
62 num_streams: Arc::new(AtomicU8::new(1)),
63 current_block: Arc::new(AtomicU32::new(1)),
64 stream_first_blocks: Arc::new(Mutex::new(Default::default())),
65 stream_latest_blocks: Arc::new(Mutex::new(Default::default())),
66 }))
67 }
68 pub fn new_stream(self: &Arc<Self>) -> StreamWriter {
71 let idx = self.num_streams.fetch_add(1, Ordering::AcqRel);
72 StreamWriter::new(self.clone(), idx)
73 }
74
75 pub(crate) fn write_block(&self, stream_idx: u8, block: &[u8; DATA_SIZE]) -> Result<(), Error> {
76 let block_idx = self.current_block.fetch_add(1, Ordering::AcqRel);
77 {
78 let Ok(mut lock) = self.stream_first_blocks.lock() else {
79 return broken_pipe!();
80 };
81 let stream_first_entry = lock.entry(stream_idx).or_insert(block_idx);
82 if *stream_first_entry == block_idx {
83 let mut header = [0u8; DEFAULT_BLOCK_SIZE];
84 let mut hdr = header.as_mut_slice();
85 hdr.write_all(HEADER)?;
86 for (k, v) in lock.iter() {
87 hdr.write_u8(*k)?;
88 hdr.write_all(&encode_integer(IntegerValue::U32(*v)))?;
89 }
90 drop(lock);
91 let Ok(mut lock) = self.inner.lock() else {
92 return broken_pipe!();
93 };
94 lock.seek_write_all(&header, 0)?;
95 }
96 }
97 let offset = block_idx as u64 * DEFAULT_BLOCK_SIZE as u64;
98 let Ok(mut lock) = self.inner.lock() else {
99 return broken_pipe!();
100 };
101 lock.seek_write_all(block, offset)?;
102 let Ok(mut l2) = self.stream_latest_blocks.lock() else {
103 return broken_pipe!();
104 };
105 lock.seek_write_all(&[0, 0, 0, 0], block.len() as u64 + offset)?;
106 let last_block_idx = l2.entry(stream_idx).or_insert(block_idx);
107 if *last_block_idx != block_idx {
108 let offset = *last_block_idx as u64 * DEFAULT_BLOCK_SIZE as u64 + DATA_SIZE as u64;
109 let byts = block_idx.to_be_bytes();
111 lock.seek_write_all(&byts, offset)?;
112 *last_block_idx = block_idx;
113 }
114
115 Ok(())
116 }
117
118 pub fn len(&self) -> Result<u64, Error> {
119 if let Ok(lock) = self.inner.lock() {
120 return Ok(lock.metadata()?.len());
121 }
122 broken_pipe!()
123 }
124 pub fn is_empty(&self) -> Result<bool, Error> {
125 Ok(self.len()? == 0)
126 }
127}
128
129pub struct StreamWriter {
132 parent: Arc<MultiStreamWriter>,
133 buf: FixedBuf<DATA_SIZE, u8>,
134 stream_idx: u8,
135}
136impl StreamWriter {
137 pub(crate) fn new(parent: Arc<MultiStreamWriter>, stream_idx: u8) -> StreamWriter {
138 let mut buf = FixedBuf::default();
139 let _ = buf.write_be_u16(0);
140 StreamWriter {
141 parent,
142 buf,
143 stream_idx,
144 }
145 }
146}
147
148impl MutBits for StreamWriter {
149 fn write_u8(&mut self, val: u8) -> Result<(), Error> {
150 if self.buf.is_full() {
151 let v = &self.buf.into_buf_default();
152 self.parent.write_block(self.stream_idx, v)?;
153 self.buf.write_be_u16(0x0)?;
154 }
155 self.buf.write_u8(val)
156 }
157}
158impl Drop for StreamWriter {
159 fn drop(&mut self) {
160 if !self.buf.is_empty() {
161 let len = (self.buf.len() as u16).saturating_sub(2);
162 let v = &mut self.buf.into_buf_default();
163 let _ = v.as_mut_slice().write_be_u16(len);
164 let _ = self.parent.write_block(self.stream_idx, v);
165 }
166 }
167}
168
/// Shared reader state for a multi-stream file; individual streams are read
/// through [`StreamReader`] handles that point back to it.
#[derive(Clone)]
pub struct MultiStreamReader {
    /// The backing file; every block read goes through this lock.
    inner: Arc<Mutex<File>>,
    /// Stream index -> index of the next block to read for that stream.
    /// A value of 0 means the stream has no further blocks.
    stream_next_block: Arc<Mutex<BTreeMap<u8, u32>>>,
}
176impl MultiStreamReader {
177 pub fn open<P: AsRef<Path>>(path: P) -> Result<Vec<StreamReader>, Error> {
179 let mut inner = OpenOptions::new().read(true).open(path.as_ref())?;
180 let mut header_buf = [0u8; DEFAULT_BLOCK_SIZE];
181 inner.seek_read_all(&mut header_buf, 0)?;
182 let (magic, mut data) = header_buf.split_at(HEADER.len());
183 if magic != HEADER {
184 return Err(BitsErrorKind::InvalidData.into());
185 }
186
187 let mut stream_next_block = BTreeMap::<u8, u32>::new();
188 let mut expected_stream_idx = 1;
189 while let Some(read_idx) = data.next_u8()? {
190 if read_idx == 0 {
191 break;
193 }
194 if read_idx != expected_stream_idx {
195 return Err(BitsErrorKind::InvalidData.into());
196 }
197 let start_block = data.decode_vbyte()? as u32;
198 stream_next_block.insert(read_idx, start_block);
199 expected_stream_idx += 1;
200 }
201 let stream_ids = stream_next_block.keys().copied().collect::<Vec<_>>();
202 let parent = Arc::new(MultiStreamReader {
203 inner: Arc::new(Mutex::new(inner)),
204 stream_next_block: Arc::new(Mutex::new(stream_next_block)),
205 });
206 let mut out = Vec::<StreamReader>::new();
207 for k in stream_ids {
208 out.push(StreamReader::new(parent.clone(), k));
209 }
210
211 Ok(out)
212 }
213 pub(crate) fn read_next_block(
214 &self,
215 stream_idx: u8,
216 buf: &mut RoundU8Buffer<DATA_SIZE>,
217 ) -> Result<(), Error> {
218 let block_idx = {
219 let Ok(lock) = self.stream_next_block.lock() else {
220 return broken_pipe!();
221 };
222 let Some(v) = lock.get(&stream_idx) else {
223 return Ok(());
224 };
225 *v
226 };
227 if block_idx == 0 {
228 return Ok(());
229 }
230 let next_idx = {
231 let Ok(mut lock) = self.inner.lock() else {
232 return broken_pipe!();
233 };
234 let offset = block_idx as u64 * DEFAULT_BLOCK_SIZE as u64;
235 buf.as_ref_mut(|_, buf| {
236 lock.seek_read_all(buf, offset)?;
237 Ok(buf.len())
238 })?;
239 lock.seek(SeekFrom::Start(offset + buf.len() as u64))?;
240 lock.read_be_u32()?
241 };
242 let Ok(mut lock) = self.stream_next_block.lock() else {
243 return broken_pipe!();
244 };
245 lock.insert(stream_idx, next_idx);
246 Ok(())
247 }
248}
249pub struct StreamReader {
250 parent: Arc<MultiStreamReader>,
251 stream_idx: u8,
252 buf: RoundU8Buffer<DATA_SIZE>,
253 stream_counter: u64,
254}
255impl StreamReader {
256 pub fn new(parent: Arc<MultiStreamReader>, stream_idx: u8) -> StreamReader {
257 StreamReader {
258 stream_idx,
259 parent,
260 buf: RoundU8Buffer::default(),
261 stream_counter: 0,
262 }
263 }
264 pub fn stream_position(&self) -> u64 {
265 self.stream_counter
266 }
267
268 fn try_fill_buffer(&mut self) -> Result<usize, Error> {
269 if self.buf.is_empty() {
270 self.parent
271 .read_next_block(self.stream_idx, &mut self.buf)?;
272 if self.buf.is_empty() {
273 return Ok(0);
274 }
275 let lim = self.buf.read_be_u16()?;
276 if lim > 0 {
277 self.buf.limit(lim as usize)?;
278 }
279 return Ok(self.buf.len());
280 }
281 Ok(0)
282 }
283
284 pub fn has_more(&mut self) -> Result<bool, Error> {
285 self.try_fill_buffer()?;
286 Ok(!self.buf.is_empty())
287 }
288}
289impl Bits for StreamReader {
290 fn next_u8(&mut self) -> Result<Option<u8>, Error> {
291 if self.buf.is_empty() && self.try_fill_buffer()? == 0 {
292 return Ok(None);
293 }
294
295 self.stream_counter += 1;
296 Ok(self.buf.pop_front())
297 }
298}
299impl BufBits for StreamReader {
300 fn fill_buf(&mut self) -> Result<&[u8], Error> {
301 if self.buf.is_empty() {
302 let added = self.try_fill_buffer()?;
303 if added == 0 {
304 return Ok(&[]);
305 }
306 }
307 let (a, b) = self.buf.as_ref_used();
308 if a.is_empty() {
309 Ok(b)
310 } else {
311 Ok(a)
312 }
313 }
314
315 fn consume(&mut self, amt: usize) {
316 self.buf.consume(amt)
317 }
318}
319
#[cfg(all(test, feature = "std"))]
mod test {
    use crate::read::{MultiStreamReader, MultiStreamWriter, StreamReader, DATA_SIZE};
    use alloc::sync::Arc;
    use irox_bits::{Bits, Error, MutBits};
    use std::thread::JoinHandle;
    use std::time::Instant;

    /// Approximate number of blocks each stream writes and reads.
    const NUM_BLOCKS: usize = 100_000;
    /// Number of concurrent streams used by both tests.
    const NUM_STREAMS: u64 = 6;
    /// Bytes written to (and read back from) every stream.
    const BYTES_PER_STREAM: usize = NUM_BLOCKS * DATA_SIZE - 100;

    /// Spawns a thread that fills a new stream with `BYTES_PER_STREAM` copies
    /// of `value`. The stream is dropped, and so flushed, when the thread
    /// finishes.
    fn spawn_writer_stream(ms: &Arc<MultiStreamWriter>, value: u8) -> JoinHandle<()> {
        let mut stream = ms.new_stream();
        std::thread::spawn(move || {
            for _ in 0..BYTES_PER_STREAM {
                stream.write_u8(value).unwrap();
            }
        })
    }

    /// Writes six streams concurrently and prints the achieved throughput.
    #[test]
    #[ignore]
    pub fn test_write() -> Result<(), Error> {
        // `new` already returns an `Arc`, so no further wrapping is needed.
        let ms = MultiStreamWriter::new("./test_multistream.ms")?;
        let start = Instant::now();
        let handles = vec![
            spawn_writer_stream(&ms, 0xA),
            spawn_writer_stream(&ms, 0x9),
            spawn_writer_stream(&ms, 0xF),
            spawn_writer_stream(&ms, 0x5),
            spawn_writer_stream(&ms, 0x3),
            spawn_writer_stream(&ms, 0x2),
        ];

        for handle in handles {
            handle.join().unwrap();
        }

        let end = start.elapsed();
        let len = BYTES_PER_STREAM as u64 * NUM_STREAMS;
        let bs = len as f64 / end.as_secs_f64();
        let mbs = bs / 1e6;
        let lmb = len as f64 / 1e6;
        println!("Wrote {lmb} MB in {end:?} = {mbs:02.02} MB/s");
        Ok(())
    }

    /// Spawns a thread that reads `BYTES_PER_STREAM` bytes from `stream` and
    /// asserts that each one equals `value`.
    fn spawn_reader_stream(mut stream: StreamReader, value: u8) -> JoinHandle<()> {
        std::thread::spawn(move || {
            for _ in 0..BYTES_PER_STREAM {
                let len = stream.stream_position();
                assert_eq!(value, stream.read_u8().unwrap(), "at position {len}");
            }
        })
    }

    /// Reads back the file produced by `test_write`, one thread per stream,
    /// and prints the achieved throughput.
    #[test]
    #[ignore]
    pub fn test_read() -> Result<(), Error> {
        let streams = MultiStreamReader::open("./test_multistream.ms")?;
        assert_eq!(streams.len(), 6);

        let start = Instant::now();

        let mut remaining = streams.into_iter();
        let handles = vec![
            spawn_reader_stream(remaining.next().unwrap(), 0xA),
            spawn_reader_stream(remaining.next().unwrap(), 0x9),
            spawn_reader_stream(remaining.next().unwrap(), 0xF),
            spawn_reader_stream(remaining.next().unwrap(), 0x5),
            spawn_reader_stream(remaining.next().unwrap(), 0x3),
            spawn_reader_stream(remaining.next().unwrap(), 0x2),
        ];

        for handle in handles {
            handle.join().unwrap();
        }

        let end = start.elapsed();
        let len = BYTES_PER_STREAM as u64 * NUM_STREAMS;
        let bs = len as f64 / end.as_secs_f64();
        let mbs = bs / 1e6;
        let lmb = len as f64 / 1e6;
        println!("Read {lmb} MB in {end:?} = {mbs:02.02} MB/s");
        Ok(())
    }
}