irox_stats/
streams.rs

1// SPDX-License-Identifier: MIT
2// Copyright 2025 IROX Contributors
3//
4
5//!
6//! Streaming data encoders and decoders
7
8extern crate alloc;
9use crate::cfg_feature_miniz;
10use alloc::boxed::Box;
11use alloc::collections::BTreeMap;
12use alloc::format;
13use alloc::string::{String, ToString};
14use alloc::vec::Vec;
15use core::fmt::UpperHex;
16use core::ops::{Add, BitXor, DerefMut, Sub};
17use irox_bits::{BitsWrapper, Error, MutBits, SharedROCounter, WriteToBEBits};
18use irox_tools::codec::{EncodeVByteTo, ZagZig, ZigZag};
19use irox_tools::{ToSigned, ToUnsigned};
20use irox_types::{AnyUnsignedInteger, NumberSigned};
21
22pub trait Stream<T> {
23    fn write_value(&mut self, value: T) -> Result<usize, Error>;
24    fn flush(&mut self) -> Result<(), Error> {
25        Ok(())
26    }
27    fn written_stats(&self) -> String {
28        String::new()
29    }
30}
31pub trait Decoder<T> {
32    fn next(&mut self) -> Result<Option<T>, Error>;
33}
34
35pub trait Streamable: Sized + Default + Copy + WriteToBEBits {}
36impl<T> Streamable for T where T: Sized + Default + Copy + WriteToBEBits {}
37pub trait StreamableVByte:
38    Sized + Default + Copy + Sub<Output: EncodeVByteTo + UpperHex> + EncodeVByteTo + WriteToBEBits + Sub
39{
40}
41impl<T> StreamableVByte for T where
42    T: Sized
43        + Default
44        + Copy
45        + Sub<Output: EncodeVByteTo + UpperHex>
46        + EncodeVByteTo
47        + WriteToBEBits
48        + Sub
49{
50}
51pub trait ValueOperation<'a, T> {
52    fn encode(&'a mut self, value: &T) -> Result<T, Error>;
53}
54pub struct CompositeStream<'a, T: Streamable, B: MutBits> {
55    writer: &'a mut B,
56    operations: Vec<Box<dyn ValueOperation<'a, T>>>,
57}
58impl<'a, T: Streamable, B: MutBits> CompositeStream<'a, T, B> {
59    pub fn new(writer: &'a mut B) -> CompositeStream<'a, T, B> {
60        Self {
61            writer,
62            operations: Vec::new(),
63        }
64    }
65    pub fn and_then<V: ValueOperation<'a, T> + 'static>(&mut self, value: Box<V>) {
66        self.operations.push(value);
67    }
68    pub fn write_value(&'a mut self, value: T) -> Result<usize, Error> {
69        let mut v = value;
70        for op in &mut self.operations {
71            v = op.encode(&v)?;
72        }
73        WriteToBEBits::write_be_to(&value, self.writer)
74    }
75}
76
77pub struct DeltaOperation<T> {
78    last_value: T,
79}
80impl<'a, T: Sub<T, Output = T> + Copy> ValueOperation<'a, T> for DeltaOperation<T> {
81    fn encode(&'a mut self, value: &T) -> Result<T, Error> {
82        let out = *value - self.last_value;
83        self.last_value = out;
84        Ok(out)
85    }
86}
87pub struct VByteOperation;
88impl<'a, T: Sub<T, Output = T> + Copy> ValueOperation<'a, T> for VByteOperation {
89    fn encode(&'a mut self, _value: &T) -> Result<T, Error> {
90        todo!()
91    }
92}
93
94///
95/// A stream impl that writes the difference between the last value and the current
96/// value to the provided [`MutBits`] writer.  The previous value is initialized to 0.
97pub struct DeltaStream<T: Streamable> {
98    last_value: T,
99    writer: Box<dyn Stream<T>>,
100}
101
102impl<T: Streamable> DeltaStream<T> {
103    ///
104    /// Create a new stream impl
105    pub fn new(writer: Box<dyn Stream<T>>) -> Self {
106        DeltaStream {
107            last_value: Default::default(),
108            writer,
109        }
110    }
111}
112impl<S: Streamable + ToSigned<Output = T>, T: Streamable + NumberSigned + Sub<Output = T>> Stream<S>
113    for DeltaStream<T>
114{
115    ///
116    /// Deltifies the value against the previous value and writes it out.
117    fn write_value(&mut self, value: S) -> Result<usize, Error> {
118        let value = ToSigned::to_signed(value);
119        let delta = value - self.last_value;
120        self.last_value = value;
121        self.writer.write_value(delta)
122    }
123
124    fn flush(&mut self) -> Result<(), Error> {
125        self.writer.flush()
126    }
127    fn written_stats(&self) -> String {
128        self.writer.written_stats()
129    }
130}
131pub struct AddingDecoder<T: Streamable> {
132    last_value: T,
133    reader: Box<dyn Decoder<T>>,
134}
135impl<T: Streamable> AddingDecoder<T> {
136    pub fn new(reader: Box<dyn Decoder<T>>) -> Self {
137        AddingDecoder {
138            last_value: Default::default(),
139            reader,
140        }
141    }
142}
143impl<T: Streamable + ToUnsigned<Output = R> + Add<Output = T>, R: Streamable> Decoder<R>
144    for AddingDecoder<T>
145{
146    fn next(&mut self) -> Result<Option<R>, Error> {
147        let a = self.reader.next()?;
148        let Some(a) = a else {
149            return Ok(None);
150        };
151        let next = a + self.last_value;
152        self.last_value = next;
153        let next = ToUnsigned::to_unsigned(next);
154        Ok(Some(next))
155    }
156}
157
158pub struct ZigZagStream<T> {
159    writer: Box<dyn Stream<T>>,
160}
161impl<T: Streamable> ZigZagStream<T> {
162    pub fn new(writer: Box<dyn Stream<T>>) -> Self {
163        Self { writer }
164    }
165}
166impl<D: Streamable, S: Streamable + ZigZag<Output = D>> Stream<S> for ZigZagStream<D> {
167    fn write_value(&mut self, value: S) -> Result<usize, Error> {
168        self.writer.write_value(ZigZag::zigzag(value))
169    }
170}
171
172pub struct ZigZagDecoder<T> {
173    reader: Box<dyn Decoder<T>>,
174}
175impl<T: Streamable> ZigZagDecoder<T> {
176    pub fn new(reader: Box<dyn Decoder<T>>) -> Self {
177        Self { reader }
178    }
179}
180impl<D: Streamable, S: Streamable + ZagZig<Output = D>> Decoder<D> for ZigZagDecoder<S> {
181    fn next(&mut self) -> Result<Option<D>, Error> {
182        let a = self.reader.next()?;
183        let Some(a) = a else {
184            return Ok(None);
185        };
186        let a = ZagZig::zagzig(a);
187        Ok(Some(a))
188    }
189}
190
191pub struct I64ToU64Stream {
192    writer: Box<dyn Stream<u64>>,
193}
194impl I64ToU64Stream {
195    pub fn new(writer: Box<dyn Stream<u64>>) -> Self {
196        Self { writer }
197    }
198}
199impl Stream<i64> for I64ToU64Stream {
200    fn write_value(&mut self, value: i64) -> Result<usize, Error> {
201        self.writer.write_value(value.as_u64())
202    }
203}
204pub struct U64ToI64Decoder {
205    reader: Box<dyn Decoder<u64>>,
206}
207impl U64ToI64Decoder {
208    pub fn new(reader: Box<dyn Decoder<u64>>) -> Self {
209        Self { reader }
210    }
211}
212impl Decoder<i64> for U64ToI64Decoder {
213    fn next(&mut self) -> Result<Option<i64>, Error> {
214        let Some(val) = self.reader.next()? else {
215            return Ok(None);
216        };
217        Ok(Some(val as i64))
218    }
219}
220
221///
222/// Basic stream to convert from a [`f64`] to a [`u64`]
223pub struct F64ToU64Stream {
224    writer: Box<dyn Stream<u64>>,
225}
226impl F64ToU64Stream {
227    pub fn new(writer: Box<dyn Stream<u64>>) -> Self {
228        Self { writer }
229    }
230}
231impl Stream<f64> for F64ToU64Stream {
232    fn write_value(&mut self, value: f64) -> Result<usize, Error> {
233        self.writer.write_value(value.to_bits())
234    }
235}
236pub struct U64ToF64Decoder {
237    reader: Box<dyn Decoder<u64>>,
238}
239impl U64ToF64Decoder {
240    pub fn new(reader: Box<dyn Decoder<u64>>) -> Self {
241        Self { reader }
242    }
243}
244impl Decoder<f64> for U64ToF64Decoder {
245    fn next(&mut self) -> Result<Option<f64>, Error> {
246        let Some(val) = self.reader.next()? else {
247            return Ok(None);
248        };
249        Ok(Some(f64::from_bits(val)))
250    }
251}
252
253pub struct XorDeltaStream<T> {
254    last_value: T,
255    writer: Box<dyn Stream<T>>,
256}
257impl<T: Sized + Default> XorDeltaStream<T> {
258    pub fn new(writer: Box<dyn Stream<T>>) -> Self {
259        Self {
260            writer,
261            last_value: Default::default(),
262        }
263    }
264}
265impl<T: Streamable + BitXor<Output = T> + Copy> Stream<T> for XorDeltaStream<T> {
266    fn write_value(&mut self, value: T) -> Result<usize, Error> {
267        let out = BitXor::bitxor(self.last_value, value);
268        self.last_value = value;
269        self.writer.write_value(out)
270    }
271    fn flush(&mut self) -> Result<(), Error> {
272        self.writer.flush()
273    }
274
275    fn written_stats(&self) -> String {
276        self.writer.written_stats()
277    }
278}
279impl Stream<f64> for XorDeltaStream<u64> {
280    fn write_value(&mut self, value: f64) -> Result<usize, Error> {
281        let value = value.to_bits();
282        self.write_value(value)
283    }
284
285    fn flush(&mut self) -> Result<(), Error> {
286        self.writer.flush()
287    }
288
289    fn written_stats(&self) -> String {
290        self.writer.written_stats()
291    }
292}
293
294pub struct VByteIntStream<'a, B: MutBits> {
295    writer: BitsWrapper<'a, B>,
296}
297impl<'a, B: MutBits> VByteIntStream<'a, B> {
298    pub fn new(writer: BitsWrapper<'a, B>) -> Self {
299        Self { writer }
300    }
301}
302impl<B: MutBits, T: StreamableVByte + WriteToBEBits> Stream<T> for VByteIntStream<'_, B> {
303    fn write_value(&mut self, value: T) -> Result<usize, Error> {
304        EncodeVByteTo::encode_vbyte_to(&value, self.writer.deref_mut())
305    }
306}
/// Generates the primitive writers `write_u8` and `write_be_u16` …
/// `write_be_u128` for a `MutBits` impl. Each writer forwards its value to the
/// implementing type's `write_value` and discards the success value.
///
/// Expand inside an `impl MutBits for …` block.  The type must provide a
/// `write_value` that accepts `u8`, `u16`, `u32`, `u64` and `u128` and
/// returns a `Result` whose error converts into `Error`.
macro_rules! impl_mutbits_for_stream {
    () => {
        impl_mutbits_for_stream!(@forward
            write_u8: u8,
            write_be_u16: u16,
            write_be_u32: u32,
            write_be_u64: u64,
            write_be_u128: u128,
        );
    };
    (@forward $($method:ident: $prim:ty),* $(,)?) => {
        $(
            fn $method(&mut self, val: $prim) -> Result<(), Error> {
                self.write_value(val)?;
                Ok(())
            }
        )*
    };
}
335impl<B: MutBits> MutBits for VByteIntStream<'_, B> {
336    impl_mutbits_for_stream!();
337}
338
339///
340/// A stream impl that writes the varint-encoded difference between the last
341/// value and the current value to the provided [`MutBits`] writer.  The previous
342/// value is initialized to 0.
343pub struct VByteDeltaIntStream<'a, T, B: MutBits> {
344    last_value: T,
345    writer: VByteIntStream<'a, B>,
346}
347
348impl<'a, T: Streamable, B: MutBits> VByteDeltaIntStream<'a, T, B> {
349    /// Creates a new stream
350    pub fn new(writer: BitsWrapper<'a, B>) -> VByteDeltaIntStream<'a, T, B> {
351        VByteDeltaIntStream {
352            last_value: Default::default(),
353            writer: VByteIntStream::new(writer),
354        }
355    }
356}
357impl<
358        T: Streamable + Sub<Output = T> + EncodeVByteTo + UpperHex + Sub<T> + NumberSigned,
359        B: MutBits,
360    > Stream<T> for VByteDeltaIntStream<'_, T, B>
361{
362    fn write_value(&mut self, value: T) -> Result<usize, Error> {
363        let delta = value - self.last_value;
364        self.last_value = value;
365        self.writer.write_value(delta)
366    }
367}
368
369cfg_feature_miniz! {
370    use miniz_oxide::deflate::core::{compress_to_output, CompressorOxide, TDEFLFlush, TDEFLStatus};
371    use miniz_oxide::deflate::CompressionLevel;
372    use miniz_oxide::DataFormat;
373    use alloc::collections::VecDeque;
374    use irox_bits::{Bits, ErrorKind};
375    use irox_tools::buf::{Buffer, RoundU8Buffer};
376
377    pub struct CompressStream<'a, T: MutBits> {
378        writer: BitsWrapper<'a, T>,
379        inbuf: VecDeque<u8>,
380        compressor: CompressorOxide,
381        written: u64,
382    }
383    impl<'a, T: MutBits> CompressStream<'a, T> {
384        pub fn new(writer: BitsWrapper<'a, T>) -> Self {
385            let mut compressor = CompressorOxide::default();
386            compressor.set_format_and_level(DataFormat::Raw, CompressionLevel::DefaultCompression as u8);
387            Self {
388                writer,
389                inbuf: VecDeque::with_capacity(32768),
390                compressor,
391                written: 0,
392            }
393        }
394
395        pub fn write_value<V: WriteToBEBits+Copy>(
396                &mut self, value: V) -> Result<(), Error> {
397            // println!("writing {value:08X}");
398            WriteToBEBits::write_be_to(&value, &mut self.inbuf)?;
399            if self.inbuf.len() < 32768 {
400                return Ok(())
401            }
402            let (a,b) = self.inbuf.as_slices();
403            let v = if a.is_empty() {
404                b
405            } else  {
406                a
407            };
408
409            let (status, size) = compress_to_output(&mut self.compressor, v, TDEFLFlush::None, |out| {
410                self.written = self.written.wrapping_add(out.len() as u64);
411                self.writer.write_all_bytes(out).is_ok()
412            });
413            if status != TDEFLStatus::Okay {
414                return Err(ErrorKind::BrokenPipe.into());
415            }
416            self.inbuf.drain(0..size);
417            Ok(())
418        }
419
420         pub fn flush(&mut self) -> Result<(), Error> {
421            loop {
422                let v = self.inbuf.make_contiguous();
423                let (status, size) = compress_to_output(&mut self.compressor, v, TDEFLFlush::Finish, |out| {
424                    self.written = self.written.wrapping_add(out.len() as u64);
425                    self.writer.write_all_bytes(out).is_ok()
426                });
427                self.inbuf.drain(0..size);
428                return match status {
429                    TDEFLStatus::BadParam => {
430                        Err(ErrorKind::InvalidInput.into())
431                    }
432                    TDEFLStatus::PutBufFailed => {
433                        Err(ErrorKind::BrokenPipe.into())
434                    }
435                    TDEFLStatus::Okay => {
436                        continue;
437                    }
438                    TDEFLStatus::Done => {
439                        break;
440                    }
441                }
442            }
443            Ok(())
444        }
445        pub fn written(&self) -> u64 {
446            self.written
447        }
448    }
449    impl<B: MutBits> Drop for CompressStream<'_, B> {
450        /// Make sure the buffer is fully flushed on drop
451        fn drop(&mut self) {
452            let _ = self.flush();
453        }
454    }
455    impl<B: MutBits> MutBits for CompressStream<'_, B> {
456        impl_mutbits_for_stream!();
457    }
458
459    impl<B: MutBits, T: Sized + Default + WriteToBEBits> Stream<T> for CompressStream<'_, B> {
460        fn write_value(&mut self, value: T) -> Result<usize, Error> {
461            WriteToBEBits::write_be_to(&value, self)
462        }
463
464        fn flush(&mut self) -> Result<(), Error> {
465            Self::flush(self)
466        }
467
468        fn written_stats(&self) -> String {
469            format!("{}", self.written)
470        }
471
472    }
473
474    ///
475    /// A stream impl that writes the deflated, varint-encoded difference between
476    /// the last value and the current value to the provided [`MutBits`] writer.
477    /// The previous value is initialized to 0.
478    pub struct DeltaCompressStream<'a, T: Streamable+Copy, B: MutBits> {
479        last_value: T,
480        compressor: CompressStream<'a, B>
481    }
482    impl<'a, T: Streamable+Copy, B: MutBits> DeltaCompressStream<'a, T, B> {
483        /// Create a new stream
484        pub fn new(writer: BitsWrapper<'a, B>) -> DeltaCompressStream<'a, T, B> {
485            DeltaCompressStream {
486                last_value: Default::default(),
487                compressor: CompressStream::new(writer),
488            }
489        }
490
491        ///
492        /// Encodes & writes the value out.
493        pub fn write_value(&mut self, value: T) -> Result<(), Error> {
494
495            let delta = value; //value.wrapping_sub(self.last_value);
496            self.last_value = value;
497            // println!("Delta: {delta:08X}");
498            // EncodeVByteTo::encode_vbyte_to(&delta, &mut self.compressor)?;
499            self.compressor.write_value(delta)?;
500            Ok(())
501        }
502
503        pub fn flush(&mut self) -> Result<(), Error> {
504            self.compressor.flush()
505        }
506
507        pub fn written(&self) -> u64 {
508            self.compressor.written()
509        }
510
511    }
512    impl<T: Streamable+Copy, B: MutBits> Drop for DeltaCompressStream<'_, T, B> {
513        /// Make sure the buffer is fully flushed on drop
514        fn drop(&mut self) {
515            let _ = self.flush();
516        }
517    }
518
519    pub struct InflateStream<'a, B: irox_bits::BufBits> {
520        reader: BitsWrapper<'a, B>,
521        out_buffer: RoundU8Buffer<4096>,
522        inflater: miniz_oxide::inflate::stream::InflateState,
523    }
524    impl<'a, B: irox_bits::BufBits> InflateStream<'a, B> {
525        pub fn new(reader: BitsWrapper<'a, B>) -> Self {
526            let inflater = miniz_oxide::inflate::stream::InflateState::new(DataFormat::Raw);
527            let out_buffer = RoundU8Buffer::<4096>::default();
528            Self {
529                reader,
530                out_buffer,
531                inflater
532            }
533        }
534        pub fn has_more(&mut self) -> Result<bool, Error> {
535            if self.out_buffer.is_empty() {
536                self.try_fill_buf()?;
537            }
538            Ok(!self.out_buffer.is_empty())
539        }
540        fn try_fill_buf(&mut self) -> Result<(), Error> {
541            if !self.out_buffer.is_empty() {
542                return Ok(());
543            }
544            self.out_buffer.clear();
545
546            let outbuf = self.out_buffer.as_ref_mut_available();
547            let inbuf = self.reader.fill_buf()?;
548            let res = miniz_oxide::inflate::stream::inflate(&mut self.inflater, inbuf, outbuf, miniz_oxide::MZFlush::None);
549            self.reader.consume(res.bytes_consumed);
550            self.out_buffer.mark_some_used(res.bytes_written)?;
551            Ok(())
552        }
553    }
554    impl<B: irox_bits::BufBits> Bits for InflateStream<'_, B> {
555        fn next_u8(&mut self) -> Result<Option<u8>, Error> {
556            if let Some(v) = self.out_buffer.pop_front() {
557                return Ok(Some(v));
558            }
559            self.try_fill_buf()?;
560
561            Ok(self.out_buffer.pop_front())
562        }
563
564    }
565}
566#[derive(Default)]
567pub struct StreamStageStats {
568    stats: BTreeMap<String, Box<dyn Fn() -> String>>,
569}
570impl StreamStageStats {
571    pub fn stage(&mut self, name: &str, value: Box<dyn Fn() -> String>) {
572        self.stats.insert(name.to_string(), value);
573    }
574    pub fn stage_counting(&mut self, name: &str, value: SharedROCounter) {
575        self.stage(name, Box::new(move || value.get_count().to_string()))
576    }
577    pub fn stats(&self) -> Vec<String> {
578        self.stats
579            .iter()
580            .map(|(k, v)| format!("{k}: {}", v()))
581            .collect::<Vec<String>>()
582    }
583}
584
#[cfg(all(test, feature = "miniz", feature = "std"))]
mod test {
    use crate::streams::{BitsWrapper, DeltaCompressStream};
    use irox_bits::Error;
    use irox_time::Time64;
    use irox_units::units::duration::Duration;
    use std::time::Instant;

    ///
    /// Writes 4M sequential `u64` values (8 bytes each, 32 MB raw) through a
    /// [`DeltaCompressStream`], checks that the output actually shrank, and
    /// reports the compression ratio and throughput.
    #[test]
    pub fn test1() -> Result<(), Error> {
        let mut buf = Vec::with_capacity(32768);
        let mut input = 0;
        let start = Instant::now();
        let written = {
            let wrapper = BitsWrapper::Borrowed(&mut buf);
            let mut vbout = DeltaCompressStream::<u64, _>::new(wrapper);

            for i in 0..4_000_000 {
                input += 8;
                vbout.write_value(i)?;
            }
            vbout.flush()?;
            drop(vbout);
            buf.len()
        };
        let end = start.elapsed();
        assert!(
            written < input as usize,
            "compressed output is not smaller than the raw input"
        );
        let ratio = 1. - (written as f64 / input as f64);
        let ratio = ratio * 100.;
        let ubps = input as f64 / end.as_secs_f64() / 1e6;
        println!("Turned {input} bytes into {written} = {ratio:02.}% reduction = {ubps:02.02}MB/s");
        Ok(())
    }

    ///
    /// Writes 2M `Time64` timestamps spaced 100 ms apart (8 bytes each, 16 MB
    /// raw) through a [`DeltaCompressStream`], checks that the output shrank,
    /// and reports the compression ratio and throughput.
    #[test]
    pub fn test_nanos() -> Result<(), Error> {
        let mut buf = Vec::with_capacity(32768);
        let mut input = Time64::now();
        let incr = Duration::from_millis(100);
        let start = Instant::now();
        let count = 2_000_000;
        let written = {
            let wrapper = BitsWrapper::Borrowed(&mut buf);
            let mut vbout = DeltaCompressStream::new(wrapper);

            for _ in 0..count {
                input += incr;
                vbout.write_value(input.as_u64())?;
            }
            vbout.flush()?;
            drop(vbout);
            buf.len()
        };
        // Convert the value count into raw input bytes (8 bytes per u64).
        let count = count * 8;
        let end = start.elapsed();
        assert!(
            written < count as usize,
            "compressed output is not smaller than the raw input"
        );
        let ratio = 1. - (written as f64 / count as f64);
        let ratio = ratio * 100.;
        let ubps = count as f64 / end.as_secs_f64() / 1e6;
        println!("Turned {count} bytes into {written} = {ratio:02.}% reduction = {ubps:02.02}MB/s");

        Ok(())
    }
}