irox_stats/
tsdf.rs

1// SPDX-License-Identifier: MIT
2// Copyright 2025 IROX Contributors
3//
4
5use crate::sampling::{IntSample64, Sample64, StrSample64};
6use crate::streams::{
7    AddingDecoder, CompressStream, Decoder, DeltaStream, F64ToU64Stream, I64ToU64Stream,
8    InflateStream, Stream, StreamStageStats, U64ToF64Decoder, U64ToI64Decoder, ZigZagDecoder,
9    ZigZagStream,
10};
11use alloc::sync::Arc;
12use core::hash::Hash;
13use irox_bits::{
14    Bits, BitsError, BitsErrorKind, BitsWrapper, Error, MutBits, ReadFromBEBits,
15    SharedCountingBits, SharedROCounter, WriteToBEBits,
16};
17use irox_time::Time64;
18use irox_tools::buf::{Buffer, RoundBuffer};
19use irox_tools::codec::{GroupVarintCodeDecoder, GroupVarintCodeEncoder};
20use irox_tools::map::OrderedHashMap;
21use irox_tools::read::{MultiStreamReader, MultiStreamWriter, StreamWriter};
22use irox_tools::StrWrapper;
23use std::path::Path;
24
/// Builds a boxed [`CompressStream`] that writes into a freshly allocated stream obtained from
/// `$writer.new_stream()` (in this file `$writer` is an `&Arc<MultiStreamWriter>`).
macro_rules! new_bdc {
    ($writer:ident) => {{
        let stream = $writer.new_stream();
        Box::new(CompressStream::new(BitsWrapper::Owned(stream)))
    }};
}
32
33///
34/// Breaks a [`u64`] into the 8 component bytes, and then compresses them individually.
35pub struct EightByteStream<'a> {
36    pub(crate) fb1: Box<CompressStream<'a, StreamWriter>>,
37    pub(crate) fb2: Box<CompressStream<'a, StreamWriter>>,
38    pub(crate) fb3: Box<CompressStream<'a, StreamWriter>>,
39    pub(crate) fb4: Box<CompressStream<'a, StreamWriter>>,
40    pub(crate) fb5: Box<CompressStream<'a, StreamWriter>>,
41    pub(crate) fb6: Box<CompressStream<'a, StreamWriter>>,
42    pub(crate) fb7: Box<CompressStream<'a, StreamWriter>>,
43    pub(crate) fb8: Box<CompressStream<'a, StreamWriter>>,
44}
45impl EightByteStream<'_> {
46    pub fn new(writer: &Arc<MultiStreamWriter>) -> Self {
47        Self {
48            fb1: new_bdc!(writer),
49            fb2: new_bdc!(writer),
50            fb3: new_bdc!(writer),
51            fb4: new_bdc!(writer),
52            fb5: new_bdc!(writer),
53            fb6: new_bdc!(writer),
54            fb7: new_bdc!(writer),
55            fb8: new_bdc!(writer),
56        }
57    }
58
59    pub fn written(&self) -> u64 {
60        let mut out = 0u64;
61        out = out.wrapping_add(self.fb1.written());
62        out = out.wrapping_add(self.fb2.written());
63        out = out.wrapping_add(self.fb3.written());
64        out = out.wrapping_add(self.fb4.written());
65        out = out.wrapping_add(self.fb5.written());
66        out = out.wrapping_add(self.fb6.written());
67        out = out.wrapping_add(self.fb7.written());
68        out = out.wrapping_add(self.fb8.written());
69        out
70    }
71
72    pub fn written_stats(&self) -> [u64; 8] {
73        [
74            self.fb1.written(),
75            self.fb2.written(),
76            self.fb3.written(),
77            self.fb4.written(),
78            self.fb5.written(),
79            self.fb6.written(),
80            self.fb7.written(),
81            self.fb8.written(),
82        ]
83    }
84}
85impl Stream<u64> for EightByteStream<'_> {
86    fn write_value(&mut self, v: u64) -> Result<usize, Error> {
87        let [a, b, c, d, e, f, g, h] = v.to_be_bytes();
88        self.fb1.write_value(a as i8)?;
89        self.fb2.write_value(b as i8)?;
90        self.fb3.write_value(c as i8)?;
91        self.fb4.write_value(d as i8)?;
92        self.fb5.write_value(e as i8)?;
93        self.fb6.write_value(f as i8)?;
94        self.fb7.write_value(g as i8)?;
95        self.fb8.write_value(h as i8)?;
96        Ok(8)
97    }
98
99    fn flush(&mut self) -> Result<(), Error> {
100        self.fb1.flush()?;
101        self.fb2.flush()?;
102        self.fb3.flush()?;
103        self.fb4.flush()?;
104        self.fb5.flush()?;
105        self.fb6.flush()?;
106        self.fb7.flush()?;
107        self.fb8.flush()?;
108        Ok(())
109    }
110
111    fn written_stats(&self) -> String {
112        format!("{:?} = {}", self.written_stats(), self.written())
113    }
114}
///
/// Time Series Data File using the SPDP encoding scheme.
///
/// Each sample's time (as a [`u64`]) and value (as raw [`f64`] bits) are written to two
/// separate [`EightByteStream`]s, which compress every byte position of the values
/// independently. All streams share the same underlying [`MultiStreamWriter`].
pub struct SPDPWriter<'a> {
    // Shared file writer; also used to report the total file length.
    writer: Arc<MultiStreamWriter>,
    // Byte-lane streams for the sample times.
    time_stream: EightByteStream<'a>,
    // Byte-lane streams for the raw bits of the sample values.
    float_stream: EightByteStream<'a>,
    // Second-previous and previous sample values; updated by `write_sample` but not
    // currently used by the encoding.
    semi_last_value: f64,
    last_value: f64,
}
124
///
/// The rot54 operation is intended to exploit the ordered entropy of a float mantissa.  Most of
/// the zero values from the mantissa will be in the LSBs.  Rot54 therefore drops the
/// most-significant byte and byte-reverses the remaining 7 bytes into the low 56 bits:
///
/// Input:  `0x00_0ABBCCDDEEFFGG` (a 52-bit mantissa, top byte zero)
/// Rot54:  `0x00GGFFEEDDCCBB0A`
///
/// The operation is **lossy** for inputs whose top byte is non-zero: that byte is discarded.
/// For full [`f64::to_bits`] values, the discarded byte holds the sign bit and the upper 7
/// exponent bits.  For example, `0.25f64` is `0x3FD0000000000000`; `rot54` yields `0xD0`
/// (the `0x3F` byte is dropped).  Mask the value to its mantissa first if the exponent
/// matters.
#[must_use]
pub fn rot54(value: u64) -> u64 {
    // Shifting left by one byte discards the top byte and leaves a zero low byte; reversing the
    // byte order then produces [0, h, g, f, e, d, c, b] in big-endian terms.
    (value << 8).swap_bytes()
}
138
139impl SPDPWriter<'_> {
140    pub fn new<T: AsRef<Path>>(path: T) -> Result<Self, Error> {
141        let writer = MultiStreamWriter::new(path)?;
142        let time_stream = EightByteStream::new(&writer);
143        let float_stream = EightByteStream::new(&writer);
144
145        Ok(SPDPWriter {
146            writer,
147            time_stream,
148            float_stream,
149            last_value: f64::default(),
150            semi_last_value: f64::default(),
151        })
152    }
153
154    pub fn write_sample(&mut self, sample: &Sample64) -> Result<(), Error> {
155        let Sample64 { time, value } = sample;
156        self.time_stream.write_value(time.as_u64())?;
157
158        let delta = value; // - self.semi_last_value;
159        self.semi_last_value = self.last_value;
160        self.last_value = *value;
161        self.float_stream.write_value(delta.to_bits())?;
162        Ok(())
163    }
164    pub fn flush(&mut self) -> Result<(), Error> {
165        self.time_stream.flush()?;
166        self.float_stream.flush()?;
167        Ok(())
168    }
169
170    pub fn len(&self) -> Result<u64, Error> {
171        self.writer.len()
172    }
173    pub fn is_empty(&self) -> Result<bool, Error> {
174        Ok(self.len()? == 0)
175    }
176}
177///
178/// Basic stream to [`rot54`] the input sample.
179pub struct Rot54Stream {
180    writer: Box<dyn Stream<u64>>,
181}
182impl Rot54Stream {
183    pub fn new(writer: Box<dyn Stream<u64>>) -> Self {
184        Self { writer }
185    }
186}
187impl Stream<u64> for Rot54Stream {
188    fn write_value(&mut self, value: u64) -> Result<usize, Error> {
189        let v = rot54(value);
190        self.writer.write_value(v)
191    }
192
193    fn flush(&mut self) -> Result<(), Error> {
194        self.writer.flush()
195    }
196}
197
198///
199/// Basic stream to split a [`f64`] into it's exponent and mantissa
200pub struct FloatSplitter {
201    mantissa_writer: Box<dyn Stream<u64>>,
202    exponent_writer: Box<dyn Stream<u64>>,
203}
204impl FloatSplitter {
205    pub fn new(
206        mantissa_writer: Box<dyn Stream<u64>>,
207        exponent_writer: Box<dyn Stream<u64>>,
208    ) -> Self {
209        Self {
210            mantissa_writer,
211            exponent_writer,
212        }
213    }
214}
215impl Stream<f64> for FloatSplitter {
216    fn write_value(&mut self, value: f64) -> Result<usize, Error> {
217        let bits = value.to_bits();
218        let exponent = bits >> 52;
219        let mantissa = bits & 0xFFFFFFFFFFFFF;
220        self.mantissa_writer.write_value(mantissa)?;
221        self.exponent_writer.write_value(exponent)?;
222        Ok(8)
223    }
224
225    fn flush(&mut self) -> Result<(), Error> {
226        self.mantissa_writer.flush()?;
227        self.exponent_writer.flush()?;
228        Ok(())
229    }
230}
231
232///
233/// Stream to collect values into groups of 4 and then run them through a [`GroupVarintCodeEncoder`]
234pub struct GroupCodingStream<'a, T: Hash + Eq + Sized + Default + Clone + WriteToBEBits, B: MutBits>
235{
236    buf: RoundBuffer<4, T>,
237    inner: GroupVarintCodeEncoder<'a, T, B>,
238}
239impl<'a, T: Hash + Eq + Default + Sized + Default + Clone + WriteToBEBits, B: MutBits>
240    GroupCodingStream<'a, T, B>
241{
242    pub fn new(inner: BitsWrapper<'a, B>) -> Self {
243        Self {
244            buf: RoundBuffer::new(),
245            inner: GroupVarintCodeEncoder::new(inner),
246        }
247    }
248    pub fn counter(&self) -> SharedROCounter {
249        self.inner.counter()
250    }
251}
252impl<T: Hash + Eq + Sized + Default + Clone + WriteToBEBits, B: MutBits> Stream<T>
253    for GroupCodingStream<'_, T, B>
254{
255    fn write_value(&mut self, value: T) -> Result<usize, Error> {
256        let _ = self.buf.push_back(value);
257        if self.buf.is_full() {
258            let a = self.buf.pop_front().unwrap_or_default();
259            let b = self.buf.pop_front().unwrap_or_default();
260            let c = self.buf.pop_front().unwrap_or_default();
261            let d = self.buf.pop_front().unwrap_or_default();
262            self.inner.encode_4(&[a, b, c, d])
263        } else {
264            Ok(0)
265        }
266    }
267
268    fn flush(&mut self) -> Result<(), Error> {
269        self.inner.flush()
270    }
271}
272impl<T: Hash + Eq + Sized + Default + Clone + WriteToBEBits, B: MutBits> Drop
273    for GroupCodingStream<'_, T, B>
274{
275    fn drop(&mut self) {
276        let len = self.buf.len();
277        if len > 0 {
278            let needed = 4 - len;
279            for _ in 0..needed {
280                let _ = self.write_value(T::default());
281            }
282        }
283        let _ = self.inner.flush();
284    }
285}
286
287pub struct GroupDecodingStream<'a, T: Hash + Eq + Default, B: Bits> {
288    inner: GroupVarintCodeDecoder<'a, T, B>,
289    buf: RoundBuffer<4, T>,
290}
291impl<'a, T: Hash + Eq + Default + ReadFromBEBits + Clone, B: Bits> GroupDecodingStream<'a, T, B> {
292    pub fn new(inner: BitsWrapper<'a, B>) -> Self {
293        Self {
294            inner: GroupVarintCodeDecoder::new(inner),
295            buf: RoundBuffer::new(),
296        }
297    }
298}
299impl<T: Hash + Eq + Default + ReadFromBEBits + Clone, B: Bits> Decoder<T>
300    for GroupDecodingStream<'_, T, B>
301{
302    fn next(&mut self) -> Result<Option<T>, Error> {
303        if self.buf.is_empty() {
304            let Some(val) = self.inner.decode_4()? else {
305                return Ok(None);
306            };
307            for v in val {
308                let _ = self.buf.push_back(v.clone());
309            }
310        }
311        Ok(self.buf.pop_front())
312    }
313}
314
///
/// Coded Time Series Sample File: a set of independently compressed streams backed by a
/// [`MultiStreamWriter`]. The streams are created in this order, which is the order
/// [`CodedTimeSeriesReader::new`] reads them back in:
///
/// 1. Metadata stream: key/value strings, compressed with [`CompressStream`].
/// 2. Time stream:
///    0. Convert the time value into a [`u64`] (external)
///    1. Run it through a [`DeltaStream`] to encode the first value, and then output the `N-1` differences
///    2. ZigZag-encode the differences with [`ZigZagStream`]
///    3. Group into blocks of 4 using [`GroupCodingStream`] and encode into Varint-GB using [`GroupVarintCodeEncoder`]
///    4. Compress the resultant byte stream with [`CompressStream`].
/// 3. Float (data) stream:
///    1. Convert [`f64`] to [`u64`] bit-for-bit ([`F64ToU64Stream`])
///    2. Group into blocks of 4 and encode into Varint-GB, as above
///    3. Compress the resultant byte stream.
/// 4. Int stream: [`DeltaStream`], then [`I64ToU64Stream`], then group/Varint-GB encoding and compression.
/// 5. String stream: group/Varint-GB encoding and compression.
///
/// Each sample written through one of the typed writers appends one entry to the time stream and
/// one entry to the matching value stream.
///
/// It is assumed that the data stream samples come from something approximating a A2D sensor with a fixed number of detection bits
/// and as such, most of the data will be fairly similar, even if very noisy when it jumps around.
///
/// It is assumed that the time series will be periodically sampled and monotonically increasing.
pub struct CodedTimeSeriesWriter<'a> {
    float_stream: Box<dyn Stream<f64>>,
    time_stream: Box<dyn Stream<u64>>,
    int_stream: Box<dyn Stream<u64>>,
    str_stream: Box<dyn Stream<StrWrapper<'a>> + 'a>,
    meta_stream: Box<dyn Stream<Arc<String>> + 'a>,
    // Byte counters registered for the intermediate stages of each pipeline.
    stats: StreamStageStats,
}
341
342impl<'a> CodedTimeSeriesWriter<'a> {
343    pub fn new<T: AsRef<Path>>(path: T) -> Result<Self, Error> {
344        let mut stats = StreamStageStats::default();
345
346        let writer = MultiStreamWriter::new(path)?;
347
348        let meta_stream = {
349            let meta_stream = writer.new_stream();
350            let meta_stream = CompressStream::new(BitsWrapper::Owned(meta_stream));
351            Box::new(meta_stream)
352        };
353
354        let time_stream = {
355            let time_stream = writer.new_stream();
356            let time_stream = SharedCountingBits::new(BitsWrapper::Owned(time_stream));
357            stats.stage_counting("1.1.time_gz", time_stream.get_count());
358            let time_stream = CompressStream::new(BitsWrapper::Owned(time_stream));
359            let time_stream = SharedCountingBits::new(BitsWrapper::Owned(time_stream));
360            stats.stage_counting("1.2.time_vgb", time_stream.get_count());
361            let time_stream = GroupCodingStream::<u64, _>::new(BitsWrapper::Owned(time_stream));
362            stats.stage_counting("1.3.time_codes", time_stream.counter());
363            let time_stream = ZigZagStream::new(Box::new(time_stream));
364            let time_stream = DeltaStream::<i64>::new(Box::new(time_stream));
365            Box::new(time_stream)
366        };
367
368        let float_stream = {
369            let float_stream = writer.new_stream();
370            let float_stream = SharedCountingBits::new(BitsWrapper::Owned(float_stream));
371            stats.stage_counting("2.1.float_gz", float_stream.get_count());
372            let float_stream = CompressStream::new(BitsWrapper::Owned(float_stream));
373            let float_stream = SharedCountingBits::new(BitsWrapper::Owned(float_stream));
374            stats.stage_counting("2.2.float_vgb", float_stream.get_count());
375            let float_stream = GroupCodingStream::<u64, _>::new(BitsWrapper::Owned(float_stream));
376            stats.stage_counting("2.3.float_codes", float_stream.counter());
377            let float_stream = F64ToU64Stream::new(Box::new(float_stream));
378            Box::new(float_stream)
379        };
380
381        let int_stream = {
382            let int_stream = writer.new_stream();
383            let int_stream = SharedCountingBits::new(BitsWrapper::Owned(int_stream));
384            stats.stage_counting("3.1.int_gz", int_stream.get_count());
385            let int_stream = CompressStream::new(BitsWrapper::Owned(int_stream));
386            let int_stream = SharedCountingBits::new(BitsWrapper::Owned(int_stream));
387            stats.stage_counting("3.2.int_vgb", int_stream.get_count());
388            let int_stream = GroupCodingStream::<u64, _>::new(BitsWrapper::Owned(int_stream));
389            let int_stream = I64ToU64Stream::new(Box::new(int_stream));
390            let int_stream = DeltaStream::<i64>::new(Box::new(int_stream));
391            Box::new(int_stream)
392        };
393
394        let str_stream = {
395            let str_stream = writer.new_stream();
396            let str_stream = SharedCountingBits::new(BitsWrapper::Owned(str_stream));
397            stats.stage_counting("4.1.str_gz", str_stream.get_count());
398            let str_stream = CompressStream::new(BitsWrapper::Owned(str_stream));
399            let str_stream = GroupCodingStream::new(BitsWrapper::Owned(str_stream));
400            Box::new(str_stream)
401        };
402
403        Ok(Self {
404            float_stream,
405            time_stream,
406            meta_stream,
407            int_stream,
408            str_stream,
409            stats,
410        })
411    }
412
413    #[must_use]
414    pub fn float_stream(self) -> CodedTimeSeriesFloatWriter<'a> {
415        CodedTimeSeriesFloatWriter { writer: self }
416    }
417    #[must_use]
418    pub fn int_stream(self) -> CodedTimeSeriesIntWriter<'a> {
419        CodedTimeSeriesIntWriter { writer: self }
420    }
421
422    pub fn write_str(&mut self, time: Time64, value: StrWrapper<'a>) -> Result<(), Error> {
423        self.time_stream.write_value(time.as_u64())?;
424        self.str_stream.write_value(value)?;
425        Ok(())
426    }
427
428    pub fn flush(&mut self) -> Result<(), Error> {
429        self.time_stream.flush()?;
430        self.float_stream.flush()?;
431        self.int_stream.flush()?;
432        self.str_stream.flush()?;
433        self.meta_stream.flush()?;
434        Ok(())
435    }
436    pub fn written_stats(&self) -> Vec<String> {
437        let mut out = self.stats.stats();
438        out.push(self.meta_stream.written_stats());
439        out.push(self.time_stream.written_stats());
440        out.push(self.float_stream.written_stats());
441        out.push(self.int_stream.written_stats());
442        out.push(self.str_stream.written_stats());
443        out
444    }
445
446    pub fn metadata(&'a mut self, key: Arc<String>, value: Arc<String>) -> Result<(), Error> {
447        self.meta_stream.write_value(key)?;
448        self.meta_stream.write_value(value)?;
449        Ok(())
450    }
451}
452pub struct CodedTimeSeriesFloatWriter<'a> {
453    writer: CodedTimeSeriesWriter<'a>,
454}
455impl<'a> CodedTimeSeriesFloatWriter<'a> {
456    pub fn write_sample(&mut self, sample: &Sample64) -> Result<(), Error> {
457        let Sample64 { time, value } = sample;
458        self.writer.time_stream.write_value(time.as_u64())?;
459        self.writer.float_stream.write_value(*value)?;
460        Ok(())
461    }
462    pub fn metadata<K: AsRef<str> + 'a, V: AsRef<str> + 'a>(
463        &'a mut self,
464        key: &'a K,
465        value: &'a V,
466    ) -> Result<(), Error> {
467        let key = Arc::new(key.as_ref().to_string());
468        let value = Arc::new(value.as_ref().to_string());
469        self.writer.metadata(key, value)
470    }
471    pub fn flush(&mut self) -> Result<(), Error> {
472        self.writer.flush()
473    }
474    pub fn written_stats(&self) -> Vec<String> {
475        self.writer.written_stats()
476    }
477}
478pub struct CodedTimeSeriesIntWriter<'a> {
479    writer: CodedTimeSeriesWriter<'a>,
480}
481impl<'a> CodedTimeSeriesIntWriter<'a> {
482    pub fn write_sample(&mut self, time: Time64, value: u64) -> Result<(), Error> {
483        self.writer.time_stream.write_value(time.as_u64())?;
484        self.writer.int_stream.write_value(value)?;
485        Ok(())
486    }
487    pub fn metadata<K: AsRef<str>, V: AsRef<str>>(
488        &'a mut self,
489        key: &K,
490        value: &V,
491    ) -> Result<(), Error> {
492        let k = Arc::new(key.as_ref().to_string());
493        let v = Arc::new(value.as_ref().to_string());
494        self.writer.metadata(k, v)
495    }
496    pub fn flush(&mut self) -> Result<(), Error> {
497        self.writer.flush()
498    }
499    pub fn written_stats(&self) -> Vec<String> {
500        self.writer.written_stats()
501    }
502}
503pub struct CodedTimeSeriesStrWriter<'a> {
504    writer: CodedTimeSeriesWriter<'a>,
505}
506impl<'a> CodedTimeSeriesStrWriter<'a> {
507    pub fn write_sample(&mut self, time: Time64, value: StrWrapper<'a>) -> Result<(), Error> {
508        self.writer.time_stream.write_value(time.as_u64())?;
509        self.writer.str_stream.write_value(value)?;
510        Ok(())
511    }
512    pub fn metadata<K: AsRef<str> + 'a, V: AsRef<str> + 'a>(
513        &'a mut self,
514        key: &K,
515        value: &V,
516    ) -> Result<(), Error> {
517        let key = Arc::new(key.as_ref().to_string());
518        let value = Arc::new(value.as_ref().to_string());
519        self.writer.metadata(key, value)
520    }
521    pub fn flush(&mut self) -> Result<(), Error> {
522        self.writer.flush()
523    }
524    pub fn written_stats(&self) -> Vec<String> {
525        self.writer.written_stats()
526    }
527}
528
529pub enum TimeSeriesError {
530    BitsError(BitsError),
531    MissingMetadataStream,
532    MissingFloatStream,
533    MissingIntStream,
534    MissingStrStream,
535    MissingTimeStream,
536}
537impl TimeSeriesError {
538    pub fn name(&self) -> &'static str {
539        match self {
540            TimeSeriesError::BitsError(..) => "BitsError",
541            TimeSeriesError::MissingMetadataStream => "MissingMetadataStream",
542            TimeSeriesError::MissingFloatStream => "MissingFloatStream",
543            TimeSeriesError::MissingTimeStream => "MissingTimeStream",
544            TimeSeriesError::MissingIntStream => "MissingIntStream",
545            TimeSeriesError::MissingStrStream => "MissingStrStream",
546        }
547    }
548}
549impl From<BitsError> for TimeSeriesError {
550    fn from(e: BitsError) -> Self {
551        TimeSeriesError::BitsError(e)
552    }
553}
554impl From<TimeSeriesError> for BitsError {
555    fn from(e: TimeSeriesError) -> Self {
556        match e {
557            TimeSeriesError::BitsError(e) => e,
558            _ => BitsError::new(BitsErrorKind::InvalidData, e.name()),
559        }
560    }
561}
562pub struct CodedTimeSeriesReader<'a> {
563    metadata: OrderedHashMap<String, String>,
564    float_decoder: Box<dyn Decoder<f64>>,
565    time_decoder: Box<dyn Decoder<u64>>,
566    int_decoder: Box<dyn Decoder<u64>>,
567    str_decoder: Box<dyn Decoder<StrWrapper<'a>> + 'a>,
568}
569impl<'a> CodedTimeSeriesReader<'a> {
570    pub fn new<T: AsRef<Path>>(path: T) -> Result<Self, TimeSeriesError> {
571        let mut reader = MultiStreamReader::open(path)?;
572        let mut streams = reader.drain(..);
573        let Some(meta_stream) = streams.next() else {
574            return Err(TimeSeriesError::MissingMetadataStream);
575        };
576        let mut meta_stream = InflateStream::new(BitsWrapper::Owned(meta_stream));
577        let mut metadata = OrderedHashMap::<String, String>::new();
578        while meta_stream.has_more()? {
579            let key = String::read_from_be_bits(&mut meta_stream)?;
580            let value = String::read_from_be_bits(&mut meta_stream)?;
581            metadata.insert(key, value);
582        }
583
584        let Some(time_stream) = streams.next() else {
585            return Err(TimeSeriesError::MissingTimeStream);
586        };
587        let time_stream = InflateStream::new(BitsWrapper::Owned(time_stream));
588        let time_stream = GroupDecodingStream::<u64, _>::new(BitsWrapper::Owned(time_stream));
589        let time_stream = ZigZagDecoder::new(Box::new(time_stream));
590        let time_stream = AddingDecoder::new(Box::new(time_stream));
591
592        let Some(float_stream) = streams.next() else {
593            return Err(TimeSeriesError::MissingFloatStream);
594        };
595        let float_stream = InflateStream::new(BitsWrapper::Owned(float_stream));
596        let float_stream = GroupDecodingStream::<u64, _>::new(BitsWrapper::Owned(float_stream));
597        let float_stream = U64ToF64Decoder::new(Box::new(float_stream));
598
599        let Some(int_stream) = streams.next() else {
600            return Err(TimeSeriesError::MissingIntStream);
601        };
602        let int_stream = InflateStream::new(BitsWrapper::Owned(int_stream));
603        let int_stream = GroupDecodingStream::<u64, _>::new(BitsWrapper::Owned(int_stream));
604        let int_stream = U64ToI64Decoder::new(Box::new(int_stream));
605        let int_stream = AddingDecoder::new(Box::new(int_stream));
606
607        let Some(str_stream) = streams.next() else {
608            return Err(TimeSeriesError::MissingStrStream);
609        };
610        let str_stream = InflateStream::new(BitsWrapper::Owned(str_stream));
611        let str_stream =
612            GroupDecodingStream::<StrWrapper<'a>, _>::new(BitsWrapper::Owned(str_stream));
613
614        Ok(Self {
615            metadata,
616            float_decoder: Box::new(float_stream),
617            time_decoder: Box::new(time_stream),
618            int_decoder: Box::new(int_stream),
619            str_decoder: Box::new(str_stream),
620        })
621    }
622
623    pub fn float_reader(self) -> CodedTimeSeriesFloatReader<'a> {
624        CodedTimeSeriesFloatReader {
625            reader: self,
626            last_item: None,
627        }
628    }
629    pub fn int_reader(self) -> CodedTimeSeriesIntReader<'a> {
630        CodedTimeSeriesIntReader {
631            reader: self,
632            last_item: None,
633        }
634    }
635    pub fn str_reader(self) -> CodedTimeSeriesStrReader<'a> {
636        CodedTimeSeriesStrReader {
637            reader: self,
638            last_item: None,
639        }
640    }
641
642    pub fn metadata(&self) -> impl Iterator<Item = (&String, &String)> {
643        self.metadata.iter()
644    }
645}
646pub struct CodedTimeSeriesFloatReader<'a> {
647    reader: CodedTimeSeriesReader<'a>,
648    last_item: Option<Sample64>,
649}
650impl CodedTimeSeriesFloatReader<'_> {
651    pub fn peek(&mut self) -> Result<&mut Option<Sample64>, Error> {
652        if self.last_item.is_some() {
653            Ok(&mut self.last_item)
654        } else {
655            if let Some(v) = self.next() {
656                let v = v?;
657                self.last_item = Some(v);
658            }
659            Ok(&mut self.last_item)
660        }
661    }
662}
663impl Iterator for CodedTimeSeriesFloatReader<'_> {
664    type Item = Result<Sample64, Error>;
665
666    fn next(&mut self) -> Option<Result<Sample64, Error>> {
667        let r1 = self.reader.float_decoder.next();
668        let r2 = self.reader.time_decoder.next();
669        let float = match r1 {
670            Ok(v) => v,
671            Err(e) => return Some(Err(e)),
672        };
673        let time = match r2 {
674            Ok(v) => v,
675            Err(e) => return Some(Err(e)),
676        };
677        let float = float?;
678        let time = time?;
679        let samp = Sample64 {
680            value: float,
681            time: Time64::from_unix_raw(time),
682        };
683        self.last_item = Some(samp);
684        Some(Ok(samp))
685    }
686}
687pub struct CodedTimeSeriesIntReader<'a> {
688    reader: CodedTimeSeriesReader<'a>,
689    last_item: Option<IntSample64>,
690}
691impl CodedTimeSeriesIntReader<'_> {
692    pub fn peek(&mut self) -> Result<&mut Option<IntSample64>, Error> {
693        if self.last_item.is_some() {
694            Ok(&mut self.last_item)
695        } else {
696            if let Some(v) = self.next() {
697                let v = v?;
698                self.last_item = Some(v);
699            }
700            Ok(&mut self.last_item)
701        }
702    }
703}
704impl Iterator for CodedTimeSeriesIntReader<'_> {
705    type Item = Result<IntSample64, Error>;
706
707    fn next(&mut self) -> Option<Result<IntSample64, Error>> {
708        let r1 = self.reader.int_decoder.next();
709        let r2 = self.reader.time_decoder.next();
710        let val = match r1 {
711            Ok(v) => v,
712            Err(e) => return Some(Err(e)),
713        };
714        let time = match r2 {
715            Ok(v) => v,
716            Err(e) => return Some(Err(e)),
717        };
718        let val = val?;
719        let time = time?;
720        let samp = IntSample64 {
721            value: val,
722            time: Time64::from_unix_raw(time),
723        };
724        self.last_item = Some(samp);
725        Some(Ok(samp))
726    }
727}
728pub struct CodedTimeSeriesStrReader<'a> {
729    reader: CodedTimeSeriesReader<'a>,
730    last_item: Option<StrSample64<'a>>,
731}
732impl<'a> CodedTimeSeriesStrReader<'a> {
733    pub fn peek(&mut self) -> Result<&mut Option<StrSample64<'a>>, Error> {
734        if self.last_item.is_some() {
735            Ok(&mut self.last_item)
736        } else {
737            if let Some(v) = self.next() {
738                let v = v?;
739                self.last_item = Some(v);
740            }
741            Ok(&mut self.last_item)
742        }
743    }
744}
745impl<'a> Iterator for CodedTimeSeriesStrReader<'a> {
746    type Item = Result<StrSample64<'a>, Error>;
747
748    fn next(&mut self) -> Option<Result<StrSample64<'a>, Error>> {
749        let r1 = self.reader.str_decoder.next();
750        let r2 = self.reader.time_decoder.next();
751        let val = match r1 {
752            Ok(v) => v,
753            Err(e) => return Some(Err(e)),
754        };
755        let time = match r2 {
756            Ok(v) => v,
757            Err(e) => return Some(Err(e)),
758        };
759        let val = val?;
760        let time = time?;
761        let samp = StrSample64 {
762            value: val,
763            time: Time64::from_unix_raw(time),
764        };
765        self.last_item = Some(samp.clone());
766        Some(Ok(samp))
767    }
768}
769
#[cfg(test)]
mod tests {
    use crate::tsdf::Sample64;
    use crate::tsdf::{CodedTimeSeriesReader, CodedTimeSeriesWriter};
    use irox_bits::Error;
    use irox_time::Time64;
    use irox_tools::buf::UnlimitedBuffer;
    use irox_tools::random::{Random, PRNG};
    use irox_units::units::duration::Duration;
    use std::time::Instant;

    /// Round-trips simulated 12-bit A2D float samples through [`CodedTimeSeriesWriter`] and
    /// [`CodedTimeSeriesReader`], checking every sample comes back equal and in order. Also
    /// prints the achieved size reduction and throughput.
    #[test]
    pub fn test() -> Result<(), Error> {
        // Use the OS temp dir so the test doesn't leave files in the working directory.
        let path = std::env::temp_dir().join("irox_stats_tsdf_test_file.tsd");
        let mut data = UnlimitedBuffer::<Sample64>::new();
        {
            let file = CodedTimeSeriesWriter::new(&path)?;
            let mut file = file.float_stream();
            let mut input = Time64::now();
            let incr = Duration::from_millis(100);
            let start = Instant::now();
            // A multiple of 4, so the group coder never pads the final group on drop.
            let count = 20_000_000u64;
            let center = 100f64;
            let variance = 0.001f64;
            let mut rand = Random::default();
            for _i in 0..count {
                // Simulated 12-bit A2D reading.
                let val = rand.next_in_range(0., 4096.);
                let val = center + val.round() * variance - variance / 2f64;
                let samp = Sample64::new(input, val);
                data.push_back(samp);
                file.write_sample(&samp)?;
                input += incr;
            }
            file.flush()?;
            let written = std::fs::metadata(&path)?.len();
            let input_size = count * 16;
            let end = start.elapsed();
            let ratio = 1. - (written as f64 / input_size as f64);
            let ratio = ratio * 100.;
            let ubps = input_size as f64 / end.as_secs_f64() / 1e6;
            println!(
                "Turned {input_size} bytes into {written} = {ratio:02.}% reduction = {ubps:02.02}MB/s"
            );
            println!("{:#?}", file.written_stats());
            // Dropping the writer finalizes the streams before the file is read back.
            drop(file);
        }

        let file = CodedTimeSeriesReader::new(&path)?;
        let mut file = file.float_reader();
        let num_samps = data.len();
        assert!(num_samps > 0);
        let mut idx = 0;
        loop {
            let res = file.peek()?;
            let Some(val) = res.take() else {
                break;
            };
            let Some(v) = data.pop_front() else {
                panic!("should not happen");
            };
            assert_eq!(val, v, "{idx}");
            idx += 1;
        }
        assert_eq!(num_samps, idx);
        drop(file);
        std::fs::remove_file(&path)?;
        Ok(())
    }
}