1extern crate alloc;
9use crate::cfg_feature_miniz;
10use alloc::boxed::Box;
11use alloc::collections::BTreeMap;
12use alloc::format;
13use alloc::string::{String, ToString};
14use alloc::vec::Vec;
15use core::fmt::UpperHex;
16use core::ops::{Add, BitXor, DerefMut, Sub};
17use irox_bits::{BitsWrapper, Error, MutBits, SharedROCounter, WriteToBEBits};
18use irox_tools::codec::{EncodeVByteTo, ZagZig, ZigZag};
19use irox_tools::{ToSigned, ToUnsigned};
20use irox_types::{AnyUnsignedInteger, NumberSigned};
21
22pub trait Stream<T> {
23 fn write_value(&mut self, value: T) -> Result<usize, Error>;
24 fn flush(&mut self) -> Result<(), Error> {
25 Ok(())
26 }
27 fn written_stats(&self) -> String {
28 String::new()
29 }
30}
/// A pull-based source of values of type `T`; the read-side counterpart of [`Stream`].
pub trait Decoder<T> {
    /// Produces the next value.
    ///
    /// The decoders in this file pass an inner `Ok(None)` straight through without
    /// transforming it, so `Ok(None)` presumably means "no more values".
    fn next(&mut self) -> Result<Option<T>, Error>;
}
34
35pub trait Streamable: Sized + Default + Copy + WriteToBEBits {}
36impl<T> Streamable for T where T: Sized + Default + Copy + WriteToBEBits {}
37pub trait StreamableVByte:
38 Sized + Default + Copy + Sub<Output: EncodeVByteTo + UpperHex> + EncodeVByteTo + WriteToBEBits + Sub
39{
40}
41impl<T> StreamableVByte for T where
42 T: Sized
43 + Default
44 + Copy
45 + Sub<Output: EncodeVByteTo + UpperHex>
46 + EncodeVByteTo
47 + WriteToBEBits
48 + Sub
49{
50}
/// A single transformation step applied to a value of type `T`.
///
/// The receiver is borrowed mutably for the trait's `'a` lifetime, so an
/// operation may keep state between calls.
pub trait ValueOperation<'a, T> {
    /// Transforms `value` and returns the transformed result.
    fn encode(&'a mut self, value: &T) -> Result<T, Error>;
}
54pub struct CompositeStream<'a, T: Streamable, B: MutBits> {
55 writer: &'a mut B,
56 operations: Vec<Box<dyn ValueOperation<'a, T>>>,
57}
58impl<'a, T: Streamable, B: MutBits> CompositeStream<'a, T, B> {
59 pub fn new(writer: &'a mut B) -> CompositeStream<'a, T, B> {
60 Self {
61 writer,
62 operations: Vec::new(),
63 }
64 }
65 pub fn and_then<V: ValueOperation<'a, T> + 'static>(&mut self, value: Box<V>) {
66 self.operations.push(value);
67 }
68 pub fn write_value(&'a mut self, value: T) -> Result<usize, Error> {
69 let mut v = value;
70 for op in &mut self.operations {
71 v = op.encode(&v)?;
72 }
73 WriteToBEBits::write_be_to(&value, self.writer)
74 }
75}
76
77pub struct DeltaOperation<T> {
78 last_value: T,
79}
80impl<'a, T: Sub<T, Output = T> + Copy> ValueOperation<'a, T> for DeltaOperation<T> {
81 fn encode(&'a mut self, value: &T) -> Result<T, Error> {
82 let out = *value - self.last_value;
83 self.last_value = out;
84 Ok(out)
85 }
86}
/// Placeholder for a variable-byte [`ValueOperation`].
///
/// Not implemented yet: `encode` unconditionally hits `todo!()` and panics.
pub struct VByteOperation;
impl<'a, T: Sub<T, Output = T> + Copy> ValueOperation<'a, T> for VByteOperation {
    /// Unimplemented; panics via `todo!()` when called.
    fn encode(&'a mut self, _value: &T) -> Result<T, Error> {
        todo!()
    }
}
93
94pub struct DeltaStream<T: Streamable> {
98 last_value: T,
99 writer: Box<dyn Stream<T>>,
100}
101
102impl<T: Streamable> DeltaStream<T> {
103 pub fn new(writer: Box<dyn Stream<T>>) -> Self {
106 DeltaStream {
107 last_value: Default::default(),
108 writer,
109 }
110 }
111}
112impl<S: Streamable + ToSigned<Output = T>, T: Streamable + NumberSigned + Sub<Output = T>> Stream<S>
113 for DeltaStream<T>
114{
115 fn write_value(&mut self, value: S) -> Result<usize, Error> {
118 let value = ToSigned::to_signed(value);
119 let delta = value - self.last_value;
120 self.last_value = value;
121 self.writer.write_value(delta)
122 }
123
124 fn flush(&mut self) -> Result<(), Error> {
125 self.writer.flush()
126 }
127 fn written_stats(&self) -> String {
128 self.writer.written_stats()
129 }
130}
131pub struct AddingDecoder<T: Streamable> {
132 last_value: T,
133 reader: Box<dyn Decoder<T>>,
134}
135impl<T: Streamable> AddingDecoder<T> {
136 pub fn new(reader: Box<dyn Decoder<T>>) -> Self {
137 AddingDecoder {
138 last_value: Default::default(),
139 reader,
140 }
141 }
142}
143impl<T: Streamable + ToUnsigned<Output = R> + Add<Output = T>, R: Streamable> Decoder<R>
144 for AddingDecoder<T>
145{
146 fn next(&mut self) -> Result<Option<R>, Error> {
147 let a = self.reader.next()?;
148 let Some(a) = a else {
149 return Ok(None);
150 };
151 let next = a + self.last_value;
152 self.last_value = next;
153 let next = ToUnsigned::to_unsigned(next);
154 Ok(Some(next))
155 }
156}
157
158pub struct ZigZagStream<T> {
159 writer: Box<dyn Stream<T>>,
160}
161impl<T: Streamable> ZigZagStream<T> {
162 pub fn new(writer: Box<dyn Stream<T>>) -> Self {
163 Self { writer }
164 }
165}
166impl<D: Streamable, S: Streamable + ZigZag<Output = D>> Stream<S> for ZigZagStream<D> {
167 fn write_value(&mut self, value: S) -> Result<usize, Error> {
168 self.writer.write_value(ZigZag::zigzag(value))
169 }
170}
171
172pub struct ZigZagDecoder<T> {
173 reader: Box<dyn Decoder<T>>,
174}
175impl<T: Streamable> ZigZagDecoder<T> {
176 pub fn new(reader: Box<dyn Decoder<T>>) -> Self {
177 Self { reader }
178 }
179}
180impl<D: Streamable, S: Streamable + ZagZig<Output = D>> Decoder<D> for ZigZagDecoder<S> {
181 fn next(&mut self) -> Result<Option<D>, Error> {
182 let a = self.reader.next()?;
183 let Some(a) = a else {
184 return Ok(None);
185 };
186 let a = ZagZig::zagzig(a);
187 Ok(Some(a))
188 }
189}
190
191pub struct I64ToU64Stream {
192 writer: Box<dyn Stream<u64>>,
193}
194impl I64ToU64Stream {
195 pub fn new(writer: Box<dyn Stream<u64>>) -> Self {
196 Self { writer }
197 }
198}
199impl Stream<i64> for I64ToU64Stream {
200 fn write_value(&mut self, value: i64) -> Result<usize, Error> {
201 self.writer.write_value(value.as_u64())
202 }
203}
204pub struct U64ToI64Decoder {
205 reader: Box<dyn Decoder<u64>>,
206}
207impl U64ToI64Decoder {
208 pub fn new(reader: Box<dyn Decoder<u64>>) -> Self {
209 Self { reader }
210 }
211}
212impl Decoder<i64> for U64ToI64Decoder {
213 fn next(&mut self) -> Result<Option<i64>, Error> {
214 let Some(val) = self.reader.next()? else {
215 return Ok(None);
216 };
217 Ok(Some(val as i64))
218 }
219}
220
221pub struct F64ToU64Stream {
224 writer: Box<dyn Stream<u64>>,
225}
226impl F64ToU64Stream {
227 pub fn new(writer: Box<dyn Stream<u64>>) -> Self {
228 Self { writer }
229 }
230}
231impl Stream<f64> for F64ToU64Stream {
232 fn write_value(&mut self, value: f64) -> Result<usize, Error> {
233 self.writer.write_value(value.to_bits())
234 }
235}
236pub struct U64ToF64Decoder {
237 reader: Box<dyn Decoder<u64>>,
238}
239impl U64ToF64Decoder {
240 pub fn new(reader: Box<dyn Decoder<u64>>) -> Self {
241 Self { reader }
242 }
243}
244impl Decoder<f64> for U64ToF64Decoder {
245 fn next(&mut self) -> Result<Option<f64>, Error> {
246 let Some(val) = self.reader.next()? else {
247 return Ok(None);
248 };
249 Ok(Some(f64::from_bits(val)))
250 }
251}
252
253pub struct XorDeltaStream<T> {
254 last_value: T,
255 writer: Box<dyn Stream<T>>,
256}
257impl<T: Sized + Default> XorDeltaStream<T> {
258 pub fn new(writer: Box<dyn Stream<T>>) -> Self {
259 Self {
260 writer,
261 last_value: Default::default(),
262 }
263 }
264}
265impl<T: Streamable + BitXor<Output = T> + Copy> Stream<T> for XorDeltaStream<T> {
266 fn write_value(&mut self, value: T) -> Result<usize, Error> {
267 let out = BitXor::bitxor(self.last_value, value);
268 self.last_value = value;
269 self.writer.write_value(out)
270 }
271 fn flush(&mut self) -> Result<(), Error> {
272 self.writer.flush()
273 }
274
275 fn written_stats(&self) -> String {
276 self.writer.written_stats()
277 }
278}
279impl Stream<f64> for XorDeltaStream<u64> {
280 fn write_value(&mut self, value: f64) -> Result<usize, Error> {
281 let value = value.to_bits();
282 self.write_value(value)
283 }
284
285 fn flush(&mut self) -> Result<(), Error> {
286 self.writer.flush()
287 }
288
289 fn written_stats(&self) -> String {
290 self.writer.written_stats()
291 }
292}
293
294pub struct VByteIntStream<'a, B: MutBits> {
295 writer: BitsWrapper<'a, B>,
296}
297impl<'a, B: MutBits> VByteIntStream<'a, B> {
298 pub fn new(writer: BitsWrapper<'a, B>) -> Self {
299 Self { writer }
300 }
301}
302impl<B: MutBits, T: StreamableVByte + WriteToBEBits> Stream<T> for VByteIntStream<'_, B> {
303 fn write_value(&mut self, value: T) -> Result<usize, Error> {
304 EncodeVByteTo::encode_vbyte_to(&value, self.writer.deref_mut())
305 }
306}
/// Expands to the five integer-writing methods (`write_u8`, `write_be_u16`,
/// `write_be_u32`, `write_be_u64`, `write_be_u128`), each of which forwards its
/// argument to `self.write_value` and discards the success value.
///
/// Intended to be invoked inside an `impl MutBits for ...` block whose type has
/// a `write_value` method accepting each of those integer types.
macro_rules! impl_mutbits_for_stream {
    () => {
        fn write_u8(&mut self, val: u8) -> Result<(), Error> {
            self.write_value(val).map(|_| ())
        }

        fn write_be_u16(&mut self, val: u16) -> Result<(), Error> {
            self.write_value(val).map(|_| ())
        }

        fn write_be_u32(&mut self, val: u32) -> Result<(), Error> {
            self.write_value(val).map(|_| ())
        }

        fn write_be_u64(&mut self, val: u64) -> Result<(), Error> {
            self.write_value(val).map(|_| ())
        }

        fn write_be_u128(&mut self, val: u128) -> Result<(), Error> {
            self.write_value(val).map(|_| ())
        }
    };
}
// Lets a `VByteIntStream` be used as a `MutBits` sink: the macro expands to the
// five integer-writing methods, each of which forwards to `self.write_value`.
impl<B: MutBits> MutBits for VByteIntStream<'_, B> {
    impl_mutbits_for_stream!();
}
338
339pub struct VByteDeltaIntStream<'a, T, B: MutBits> {
344 last_value: T,
345 writer: VByteIntStream<'a, B>,
346}
347
348impl<'a, T: Streamable, B: MutBits> VByteDeltaIntStream<'a, T, B> {
349 pub fn new(writer: BitsWrapper<'a, B>) -> VByteDeltaIntStream<'a, T, B> {
351 VByteDeltaIntStream {
352 last_value: Default::default(),
353 writer: VByteIntStream::new(writer),
354 }
355 }
356}
357impl<
358 T: Streamable + Sub<Output = T> + EncodeVByteTo + UpperHex + Sub<T> + NumberSigned,
359 B: MutBits,
360 > Stream<T> for VByteDeltaIntStream<'_, T, B>
361{
362 fn write_value(&mut self, value: T) -> Result<usize, Error> {
363 let delta = value - self.last_value;
364 self.last_value = value;
365 self.writer.write_value(delta)
366 }
367}
368
369cfg_feature_miniz! {
370 use miniz_oxide::deflate::core::{compress_to_output, CompressorOxide, TDEFLFlush, TDEFLStatus};
371 use miniz_oxide::deflate::CompressionLevel;
372 use miniz_oxide::DataFormat;
373 use alloc::collections::VecDeque;
374 use irox_bits::{Bits, ErrorKind};
375 use irox_tools::buf::{Buffer, RoundU8Buffer};
376
377 pub struct CompressStream<'a, T: MutBits> {
378 writer: BitsWrapper<'a, T>,
379 inbuf: VecDeque<u8>,
380 compressor: CompressorOxide,
381 written: u64,
382 }
383 impl<'a, T: MutBits> CompressStream<'a, T> {
384 pub fn new(writer: BitsWrapper<'a, T>) -> Self {
385 let mut compressor = CompressorOxide::default();
386 compressor.set_format_and_level(DataFormat::Raw, CompressionLevel::DefaultCompression as u8);
387 Self {
388 writer,
389 inbuf: VecDeque::with_capacity(32768),
390 compressor,
391 written: 0,
392 }
393 }
394
395 pub fn write_value<V: WriteToBEBits+Copy>(
396 &mut self, value: V) -> Result<(), Error> {
397 WriteToBEBits::write_be_to(&value, &mut self.inbuf)?;
399 if self.inbuf.len() < 32768 {
400 return Ok(())
401 }
402 let (a,b) = self.inbuf.as_slices();
403 let v = if a.is_empty() {
404 b
405 } else {
406 a
407 };
408
409 let (status, size) = compress_to_output(&mut self.compressor, v, TDEFLFlush::None, |out| {
410 self.written = self.written.wrapping_add(out.len() as u64);
411 self.writer.write_all_bytes(out).is_ok()
412 });
413 if status != TDEFLStatus::Okay {
414 return Err(ErrorKind::BrokenPipe.into());
415 }
416 self.inbuf.drain(0..size);
417 Ok(())
418 }
419
420 pub fn flush(&mut self) -> Result<(), Error> {
421 loop {
422 let v = self.inbuf.make_contiguous();
423 let (status, size) = compress_to_output(&mut self.compressor, v, TDEFLFlush::Finish, |out| {
424 self.written = self.written.wrapping_add(out.len() as u64);
425 self.writer.write_all_bytes(out).is_ok()
426 });
427 self.inbuf.drain(0..size);
428 return match status {
429 TDEFLStatus::BadParam => {
430 Err(ErrorKind::InvalidInput.into())
431 }
432 TDEFLStatus::PutBufFailed => {
433 Err(ErrorKind::BrokenPipe.into())
434 }
435 TDEFLStatus::Okay => {
436 continue;
437 }
438 TDEFLStatus::Done => {
439 break;
440 }
441 }
442 }
443 Ok(())
444 }
445 pub fn written(&self) -> u64 {
446 self.written
447 }
448 }
// Best-effort finalization: dropping the stream attempts a final `flush()`.
impl<B: MutBits> Drop for CompressStream<'_, B> {
    fn drop(&mut self) {
        // Errors cannot be reported from `drop`, so the result is deliberately discarded.
        let _ = self.flush();
    }
}
// Lets a `CompressStream` be used as a `MutBits` sink: the macro expands to the
// five integer-writing methods, each of which forwards to `self.write_value`.
impl<B: MutBits> MutBits for CompressStream<'_, B> {
    impl_mutbits_for_stream!();
}
458
459 impl<B: MutBits, T: Sized + Default + WriteToBEBits> Stream<T> for CompressStream<'_, B> {
460 fn write_value(&mut self, value: T) -> Result<usize, Error> {
461 WriteToBEBits::write_be_to(&value, self)
462 }
463
464 fn flush(&mut self) -> Result<(), Error> {
465 Self::flush(self)
466 }
467
468 fn written_stats(&self) -> String {
469 format!("{}", self.written)
470 }
471
472 }
473
474 pub struct DeltaCompressStream<'a, T: Streamable+Copy, B: MutBits> {
479 last_value: T,
480 compressor: CompressStream<'a, B>
481 }
482 impl<'a, T: Streamable+Copy, B: MutBits> DeltaCompressStream<'a, T, B> {
483 pub fn new(writer: BitsWrapper<'a, B>) -> DeltaCompressStream<'a, T, B> {
485 DeltaCompressStream {
486 last_value: Default::default(),
487 compressor: CompressStream::new(writer),
488 }
489 }
490
491 pub fn write_value(&mut self, value: T) -> Result<(), Error> {
494
495 let delta = value; self.last_value = value;
497 self.compressor.write_value(delta)?;
500 Ok(())
501 }
502
503 pub fn flush(&mut self) -> Result<(), Error> {
504 self.compressor.flush()
505 }
506
507 pub fn written(&self) -> u64 {
508 self.compressor.written()
509 }
510
511 }
512 impl<T: Streamable+Copy, B: MutBits> Drop for DeltaCompressStream<'_, T, B> {
513 fn drop(&mut self) {
515 let _ = self.flush();
516 }
517 }
518
519 pub struct InflateStream<'a, B: irox_bits::BufBits> {
520 reader: BitsWrapper<'a, B>,
521 out_buffer: RoundU8Buffer<4096>,
522 inflater: miniz_oxide::inflate::stream::InflateState,
523 }
524 impl<'a, B: irox_bits::BufBits> InflateStream<'a, B> {
525 pub fn new(reader: BitsWrapper<'a, B>) -> Self {
526 let inflater = miniz_oxide::inflate::stream::InflateState::new(DataFormat::Raw);
527 let out_buffer = RoundU8Buffer::<4096>::default();
528 Self {
529 reader,
530 out_buffer,
531 inflater
532 }
533 }
534 pub fn has_more(&mut self) -> Result<bool, Error> {
535 if self.out_buffer.is_empty() {
536 self.try_fill_buf()?;
537 }
538 Ok(!self.out_buffer.is_empty())
539 }
540 fn try_fill_buf(&mut self) -> Result<(), Error> {
541 if !self.out_buffer.is_empty() {
542 return Ok(());
543 }
544 self.out_buffer.clear();
545
546 let outbuf = self.out_buffer.as_ref_mut_available();
547 let inbuf = self.reader.fill_buf()?;
548 let res = miniz_oxide::inflate::stream::inflate(&mut self.inflater, inbuf, outbuf, miniz_oxide::MZFlush::None);
549 self.reader.consume(res.bytes_consumed);
550 self.out_buffer.mark_some_used(res.bytes_written)?;
551 Ok(())
552 }
553 }
554 impl<B: irox_bits::BufBits> Bits for InflateStream<'_, B> {
555 fn next_u8(&mut self) -> Result<Option<u8>, Error> {
556 if let Some(v) = self.out_buffer.pop_front() {
557 return Ok(Some(v));
558 }
559 self.try_fill_buf()?;
560
561 Ok(self.out_buffer.pop_front())
562 }
563
564 }
565}
566#[derive(Default)]
567pub struct StreamStageStats {
568 stats: BTreeMap<String, Box<dyn Fn() -> String>>,
569}
570impl StreamStageStats {
571 pub fn stage(&mut self, name: &str, value: Box<dyn Fn() -> String>) {
572 self.stats.insert(name.to_string(), value);
573 }
574 pub fn stage_counting(&mut self, name: &str, value: SharedROCounter) {
575 self.stage(name, Box::new(move || value.get_count().to_string()))
576 }
577 pub fn stats(&self) -> Vec<String> {
578 self.stats
579 .iter()
580 .map(|(k, v)| format!("{k}: {}", v()))
581 .collect::<Vec<String>>()
582 }
583}
584
// Throughput/ratio smoke tests for `DeltaCompressStream`. They only run with the
// `miniz` and `std` features enabled and print their measurements; neither test
// asserts on the compressed size or the timing.
#[cfg(all(test, feature = "miniz", feature = "std"))]
mod test {
    use crate::streams::{BitsWrapper, DeltaCompressStream};
    use irox_bits::Error;
    use irox_time::Time64;
    use irox_units::units::duration::Duration;
    use std::time::Instant;

    /// Streams the integers `0..4_000_000` as `u64` values and prints the size
    /// reduction and throughput.
    #[test]
    pub fn test1() -> Result<(), Error> {
        let mut buf = Vec::with_capacity(32768);
        // Uncompressed byte count: 8 bytes are added per value written.
        let mut input = 0;
        let start = Instant::now();
        let written = {
            let wrapper = BitsWrapper::Borrowed(&mut buf);
            let mut vbout = DeltaCompressStream::<u64, _>::new(wrapper);

            for i in 0..4_000_000 {
                input += 8;
                vbout.write_value(i)?;
            }
            // Flush explicitly so an error is reported here; the flush repeated
            // by `Drop` discards its result.
            vbout.flush()?;
            // Drop the stream to release its borrow of `buf` before reading the length.
            drop(vbout);
            buf.len()
        };
        let end = start.elapsed();
        // Fraction of the input size that was saved, then scaled to a percentage.
        let ratio = 1. - (written as f64 / input as f64);
        let ratio = ratio * 100.;
        // Uncompressed input throughput in megabytes (1e6 bytes) per second.
        let ubps = input as f64 / end.as_secs_f64() / 1e6;
        println!("Turned {input} bytes into {written} = {ratio:02.}% reduction = {ubps:02.02}MB/s");
        Ok(())
    }

    /// Streams 2,000,000 timestamps, each 100 ms after the previous one, as
    /// `u64` values and prints the size reduction and throughput.
    #[test]
    pub fn test_nanos() -> Result<(), Error> {
        let mut buf = Vec::with_capacity(32768);
        let mut input = Time64::now();
        let incr = Duration::from_millis(100);
        let start = Instant::now();
        let count = 2_000_000;
        let written = {
            let wrapper = BitsWrapper::Borrowed(&mut buf);
            let mut vbout = DeltaCompressStream::new(wrapper);

            for _ in 0..count {
                input += incr;
                vbout.write_value(input.as_u64())?;
            }
            // Flush explicitly so an error is reported here; the flush repeated
            // by `Drop` discards its result.
            vbout.flush()?;
            // Drop the stream to release its borrow of `buf` before reading the length.
            drop(vbout);
            buf.len()
        };
        // Convert the value count into an uncompressed byte count (8 bytes per u64).
        let count = count * 8;
        let end = start.elapsed();
        // Fraction of the input size that was saved, then scaled to a percentage.
        let ratio = 1. - (written as f64 / count as f64);
        let ratio = ratio * 100.;
        // Uncompressed input throughput in megabytes (1e6 bytes) per second.
        let ubps = count as f64 / end.as_secs_f64() / 1e6;
        println!("Turned {count} bytes into {written} = {ratio:02.}% reduction = {ubps:02.02}MB/s");

        Ok(())
    }
}