1use crate::sampling::{IntSample64, Sample64, StrSample64};
6use crate::streams::{
7 AddingDecoder, CompressStream, Decoder, DeltaStream, F64ToU64Stream, I64ToU64Stream,
8 InflateStream, Stream, StreamStageStats, U64ToF64Decoder, U64ToI64Decoder, ZigZagDecoder,
9 ZigZagStream,
10};
11use alloc::sync::Arc;
12use core::hash::Hash;
13use irox_bits::{
14 Bits, BitsError, BitsErrorKind, BitsWrapper, Error, MutBits, ReadFromBEBits,
15 SharedCountingBits, SharedROCounter, WriteToBEBits,
16};
17use irox_time::Time64;
18use irox_tools::buf::{Buffer, RoundBuffer};
19use irox_tools::codec::{GroupVarintCodeDecoder, GroupVarintCodeEncoder};
20use irox_tools::map::OrderedHashMap;
21use irox_tools::read::{MultiStreamReader, MultiStreamWriter, StreamWriter};
22use irox_tools::StrWrapper;
23use std::path::Path;
24
25macro_rules! new_bdc {
26 ($writer:ident) => {
27 Box::new(CompressStream::new(BitsWrapper::Owned(
28 $writer.new_stream(),
29 )))
30 };
31}
32
33pub struct EightByteStream<'a> {
36 pub(crate) fb1: Box<CompressStream<'a, StreamWriter>>,
37 pub(crate) fb2: Box<CompressStream<'a, StreamWriter>>,
38 pub(crate) fb3: Box<CompressStream<'a, StreamWriter>>,
39 pub(crate) fb4: Box<CompressStream<'a, StreamWriter>>,
40 pub(crate) fb5: Box<CompressStream<'a, StreamWriter>>,
41 pub(crate) fb6: Box<CompressStream<'a, StreamWriter>>,
42 pub(crate) fb7: Box<CompressStream<'a, StreamWriter>>,
43 pub(crate) fb8: Box<CompressStream<'a, StreamWriter>>,
44}
45impl EightByteStream<'_> {
46 pub fn new(writer: &Arc<MultiStreamWriter>) -> Self {
47 Self {
48 fb1: new_bdc!(writer),
49 fb2: new_bdc!(writer),
50 fb3: new_bdc!(writer),
51 fb4: new_bdc!(writer),
52 fb5: new_bdc!(writer),
53 fb6: new_bdc!(writer),
54 fb7: new_bdc!(writer),
55 fb8: new_bdc!(writer),
56 }
57 }
58
59 pub fn written(&self) -> u64 {
60 let mut out = 0u64;
61 out = out.wrapping_add(self.fb1.written());
62 out = out.wrapping_add(self.fb2.written());
63 out = out.wrapping_add(self.fb3.written());
64 out = out.wrapping_add(self.fb4.written());
65 out = out.wrapping_add(self.fb5.written());
66 out = out.wrapping_add(self.fb6.written());
67 out = out.wrapping_add(self.fb7.written());
68 out = out.wrapping_add(self.fb8.written());
69 out
70 }
71
72 pub fn written_stats(&self) -> [u64; 8] {
73 [
74 self.fb1.written(),
75 self.fb2.written(),
76 self.fb3.written(),
77 self.fb4.written(),
78 self.fb5.written(),
79 self.fb6.written(),
80 self.fb7.written(),
81 self.fb8.written(),
82 ]
83 }
84}
85impl Stream<u64> for EightByteStream<'_> {
86 fn write_value(&mut self, v: u64) -> Result<usize, Error> {
87 let [a, b, c, d, e, f, g, h] = v.to_be_bytes();
88 self.fb1.write_value(a as i8)?;
89 self.fb2.write_value(b as i8)?;
90 self.fb3.write_value(c as i8)?;
91 self.fb4.write_value(d as i8)?;
92 self.fb5.write_value(e as i8)?;
93 self.fb6.write_value(f as i8)?;
94 self.fb7.write_value(g as i8)?;
95 self.fb8.write_value(h as i8)?;
96 Ok(8)
97 }
98
99 fn flush(&mut self) -> Result<(), Error> {
100 self.fb1.flush()?;
101 self.fb2.flush()?;
102 self.fb3.flush()?;
103 self.fb4.flush()?;
104 self.fb5.flush()?;
105 self.fb6.flush()?;
106 self.fb7.flush()?;
107 self.fb8.flush()?;
108 Ok(())
109 }
110
111 fn written_stats(&self) -> String {
112 format!("{:?} = {}", self.written_stats(), self.written())
113 }
114}
115pub struct SPDPWriter<'a> {
118 writer: Arc<MultiStreamWriter>,
119 time_stream: EightByteStream<'a>,
120 float_stream: EightByteStream<'a>,
121 semi_last_value: f64,
122 last_value: f64,
123}
124
125pub fn rot54(value: u64) -> u64 {
135 let [_, b, c, d, e, f, g, h] = value.to_be_bytes();
136 irox_bits::FromBEBytes::from_be_bytes([0, h, g, f, e, d, c, b])
137}
138
139impl SPDPWriter<'_> {
140 pub fn new<T: AsRef<Path>>(path: T) -> Result<Self, Error> {
141 let writer = MultiStreamWriter::new(path)?;
142 let time_stream = EightByteStream::new(&writer);
143 let float_stream = EightByteStream::new(&writer);
144
145 Ok(SPDPWriter {
146 writer,
147 time_stream,
148 float_stream,
149 last_value: f64::default(),
150 semi_last_value: f64::default(),
151 })
152 }
153
154 pub fn write_sample(&mut self, sample: &Sample64) -> Result<(), Error> {
155 let Sample64 { time, value } = sample;
156 self.time_stream.write_value(time.as_u64())?;
157
158 let delta = value; self.semi_last_value = self.last_value;
160 self.last_value = *value;
161 self.float_stream.write_value(delta.to_bits())?;
162 Ok(())
163 }
164 pub fn flush(&mut self) -> Result<(), Error> {
165 self.time_stream.flush()?;
166 self.float_stream.flush()?;
167 Ok(())
168 }
169
170 pub fn len(&self) -> Result<u64, Error> {
171 self.writer.len()
172 }
173 pub fn is_empty(&self) -> Result<bool, Error> {
174 Ok(self.len()? == 0)
175 }
176}
177pub struct Rot54Stream {
180 writer: Box<dyn Stream<u64>>,
181}
182impl Rot54Stream {
183 pub fn new(writer: Box<dyn Stream<u64>>) -> Self {
184 Self { writer }
185 }
186}
187impl Stream<u64> for Rot54Stream {
188 fn write_value(&mut self, value: u64) -> Result<usize, Error> {
189 let v = rot54(value);
190 self.writer.write_value(v)
191 }
192
193 fn flush(&mut self) -> Result<(), Error> {
194 self.writer.flush()
195 }
196}
197
198pub struct FloatSplitter {
201 mantissa_writer: Box<dyn Stream<u64>>,
202 exponent_writer: Box<dyn Stream<u64>>,
203}
204impl FloatSplitter {
205 pub fn new(
206 mantissa_writer: Box<dyn Stream<u64>>,
207 exponent_writer: Box<dyn Stream<u64>>,
208 ) -> Self {
209 Self {
210 mantissa_writer,
211 exponent_writer,
212 }
213 }
214}
215impl Stream<f64> for FloatSplitter {
216 fn write_value(&mut self, value: f64) -> Result<usize, Error> {
217 let bits = value.to_bits();
218 let exponent = bits >> 52;
219 let mantissa = bits & 0xFFFFFFFFFFFFF;
220 self.mantissa_writer.write_value(mantissa)?;
221 self.exponent_writer.write_value(exponent)?;
222 Ok(8)
223 }
224
225 fn flush(&mut self) -> Result<(), Error> {
226 self.mantissa_writer.flush()?;
227 self.exponent_writer.flush()?;
228 Ok(())
229 }
230}
231
232pub struct GroupCodingStream<'a, T: Hash + Eq + Sized + Default + Clone + WriteToBEBits, B: MutBits>
235{
236 buf: RoundBuffer<4, T>,
237 inner: GroupVarintCodeEncoder<'a, T, B>,
238}
239impl<'a, T: Hash + Eq + Default + Sized + Default + Clone + WriteToBEBits, B: MutBits>
240 GroupCodingStream<'a, T, B>
241{
242 pub fn new(inner: BitsWrapper<'a, B>) -> Self {
243 Self {
244 buf: RoundBuffer::new(),
245 inner: GroupVarintCodeEncoder::new(inner),
246 }
247 }
248 pub fn counter(&self) -> SharedROCounter {
249 self.inner.counter()
250 }
251}
252impl<T: Hash + Eq + Sized + Default + Clone + WriteToBEBits, B: MutBits> Stream<T>
253 for GroupCodingStream<'_, T, B>
254{
255 fn write_value(&mut self, value: T) -> Result<usize, Error> {
256 let _ = self.buf.push_back(value);
257 if self.buf.is_full() {
258 let a = self.buf.pop_front().unwrap_or_default();
259 let b = self.buf.pop_front().unwrap_or_default();
260 let c = self.buf.pop_front().unwrap_or_default();
261 let d = self.buf.pop_front().unwrap_or_default();
262 self.inner.encode_4(&[a, b, c, d])
263 } else {
264 Ok(0)
265 }
266 }
267
268 fn flush(&mut self) -> Result<(), Error> {
269 self.inner.flush()
270 }
271}
272impl<T: Hash + Eq + Sized + Default + Clone + WriteToBEBits, B: MutBits> Drop
273 for GroupCodingStream<'_, T, B>
274{
275 fn drop(&mut self) {
276 let len = self.buf.len();
277 if len > 0 {
278 let needed = 4 - len;
279 for _ in 0..needed {
280 let _ = self.write_value(T::default());
281 }
282 }
283 let _ = self.inner.flush();
284 }
285}
286
287pub struct GroupDecodingStream<'a, T: Hash + Eq + Default, B: Bits> {
288 inner: GroupVarintCodeDecoder<'a, T, B>,
289 buf: RoundBuffer<4, T>,
290}
291impl<'a, T: Hash + Eq + Default + ReadFromBEBits + Clone, B: Bits> GroupDecodingStream<'a, T, B> {
292 pub fn new(inner: BitsWrapper<'a, B>) -> Self {
293 Self {
294 inner: GroupVarintCodeDecoder::new(inner),
295 buf: RoundBuffer::new(),
296 }
297 }
298}
299impl<T: Hash + Eq + Default + ReadFromBEBits + Clone, B: Bits> Decoder<T>
300 for GroupDecodingStream<'_, T, B>
301{
302 fn next(&mut self) -> Result<Option<T>, Error> {
303 if self.buf.is_empty() {
304 let Some(val) = self.inner.decode_4()? else {
305 return Ok(None);
306 };
307 for v in val {
308 let _ = self.buf.push_back(v.clone());
309 }
310 }
311 Ok(self.buf.pop_front())
312 }
313}
314
315pub struct CodedTimeSeriesWriter<'a> {
334 float_stream: Box<dyn Stream<f64>>,
335 time_stream: Box<dyn Stream<u64>>,
336 int_stream: Box<dyn Stream<u64>>,
337 str_stream: Box<dyn Stream<StrWrapper<'a>> + 'a>,
338 meta_stream: Box<dyn Stream<Arc<String>> + 'a>,
339 stats: StreamStageStats,
340}
341
342impl<'a> CodedTimeSeriesWriter<'a> {
343 pub fn new<T: AsRef<Path>>(path: T) -> Result<Self, Error> {
344 let mut stats = StreamStageStats::default();
345
346 let writer = MultiStreamWriter::new(path)?;
347
348 let meta_stream = {
349 let meta_stream = writer.new_stream();
350 let meta_stream = CompressStream::new(BitsWrapper::Owned(meta_stream));
351 Box::new(meta_stream)
352 };
353
354 let time_stream = {
355 let time_stream = writer.new_stream();
356 let time_stream = SharedCountingBits::new(BitsWrapper::Owned(time_stream));
357 stats.stage_counting("1.1.time_gz", time_stream.get_count());
358 let time_stream = CompressStream::new(BitsWrapper::Owned(time_stream));
359 let time_stream = SharedCountingBits::new(BitsWrapper::Owned(time_stream));
360 stats.stage_counting("1.2.time_vgb", time_stream.get_count());
361 let time_stream = GroupCodingStream::<u64, _>::new(BitsWrapper::Owned(time_stream));
362 stats.stage_counting("1.3.time_codes", time_stream.counter());
363 let time_stream = ZigZagStream::new(Box::new(time_stream));
364 let time_stream = DeltaStream::<i64>::new(Box::new(time_stream));
365 Box::new(time_stream)
366 };
367
368 let float_stream = {
369 let float_stream = writer.new_stream();
370 let float_stream = SharedCountingBits::new(BitsWrapper::Owned(float_stream));
371 stats.stage_counting("2.1.float_gz", float_stream.get_count());
372 let float_stream = CompressStream::new(BitsWrapper::Owned(float_stream));
373 let float_stream = SharedCountingBits::new(BitsWrapper::Owned(float_stream));
374 stats.stage_counting("2.2.float_vgb", float_stream.get_count());
375 let float_stream = GroupCodingStream::<u64, _>::new(BitsWrapper::Owned(float_stream));
376 stats.stage_counting("2.3.float_codes", float_stream.counter());
377 let float_stream = F64ToU64Stream::new(Box::new(float_stream));
378 Box::new(float_stream)
379 };
380
381 let int_stream = {
382 let int_stream = writer.new_stream();
383 let int_stream = SharedCountingBits::new(BitsWrapper::Owned(int_stream));
384 stats.stage_counting("3.1.int_gz", int_stream.get_count());
385 let int_stream = CompressStream::new(BitsWrapper::Owned(int_stream));
386 let int_stream = SharedCountingBits::new(BitsWrapper::Owned(int_stream));
387 stats.stage_counting("3.2.int_vgb", int_stream.get_count());
388 let int_stream = GroupCodingStream::<u64, _>::new(BitsWrapper::Owned(int_stream));
389 let int_stream = I64ToU64Stream::new(Box::new(int_stream));
390 let int_stream = DeltaStream::<i64>::new(Box::new(int_stream));
391 Box::new(int_stream)
392 };
393
394 let str_stream = {
395 let str_stream = writer.new_stream();
396 let str_stream = SharedCountingBits::new(BitsWrapper::Owned(str_stream));
397 stats.stage_counting("4.1.str_gz", str_stream.get_count());
398 let str_stream = CompressStream::new(BitsWrapper::Owned(str_stream));
399 let str_stream = GroupCodingStream::new(BitsWrapper::Owned(str_stream));
400 Box::new(str_stream)
401 };
402
403 Ok(Self {
404 float_stream,
405 time_stream,
406 meta_stream,
407 int_stream,
408 str_stream,
409 stats,
410 })
411 }
412
413 #[must_use]
414 pub fn float_stream(self) -> CodedTimeSeriesFloatWriter<'a> {
415 CodedTimeSeriesFloatWriter { writer: self }
416 }
417 #[must_use]
418 pub fn int_stream(self) -> CodedTimeSeriesIntWriter<'a> {
419 CodedTimeSeriesIntWriter { writer: self }
420 }
421
422 pub fn write_str(&mut self, time: Time64, value: StrWrapper<'a>) -> Result<(), Error> {
423 self.time_stream.write_value(time.as_u64())?;
424 self.str_stream.write_value(value)?;
425 Ok(())
426 }
427
428 pub fn flush(&mut self) -> Result<(), Error> {
429 self.time_stream.flush()?;
430 self.float_stream.flush()?;
431 self.int_stream.flush()?;
432 self.str_stream.flush()?;
433 self.meta_stream.flush()?;
434 Ok(())
435 }
436 pub fn written_stats(&self) -> Vec<String> {
437 let mut out = self.stats.stats();
438 out.push(self.meta_stream.written_stats());
439 out.push(self.time_stream.written_stats());
440 out.push(self.float_stream.written_stats());
441 out.push(self.int_stream.written_stats());
442 out.push(self.str_stream.written_stats());
443 out
444 }
445
446 pub fn metadata(&'a mut self, key: Arc<String>, value: Arc<String>) -> Result<(), Error> {
447 self.meta_stream.write_value(key)?;
448 self.meta_stream.write_value(value)?;
449 Ok(())
450 }
451}
452pub struct CodedTimeSeriesFloatWriter<'a> {
453 writer: CodedTimeSeriesWriter<'a>,
454}
455impl<'a> CodedTimeSeriesFloatWriter<'a> {
456 pub fn write_sample(&mut self, sample: &Sample64) -> Result<(), Error> {
457 let Sample64 { time, value } = sample;
458 self.writer.time_stream.write_value(time.as_u64())?;
459 self.writer.float_stream.write_value(*value)?;
460 Ok(())
461 }
462 pub fn metadata<K: AsRef<str> + 'a, V: AsRef<str> + 'a>(
463 &'a mut self,
464 key: &'a K,
465 value: &'a V,
466 ) -> Result<(), Error> {
467 let key = Arc::new(key.as_ref().to_string());
468 let value = Arc::new(value.as_ref().to_string());
469 self.writer.metadata(key, value)
470 }
471 pub fn flush(&mut self) -> Result<(), Error> {
472 self.writer.flush()
473 }
474 pub fn written_stats(&self) -> Vec<String> {
475 self.writer.written_stats()
476 }
477}
478pub struct CodedTimeSeriesIntWriter<'a> {
479 writer: CodedTimeSeriesWriter<'a>,
480}
481impl<'a> CodedTimeSeriesIntWriter<'a> {
482 pub fn write_sample(&mut self, time: Time64, value: u64) -> Result<(), Error> {
483 self.writer.time_stream.write_value(time.as_u64())?;
484 self.writer.int_stream.write_value(value)?;
485 Ok(())
486 }
487 pub fn metadata<K: AsRef<str>, V: AsRef<str>>(
488 &'a mut self,
489 key: &K,
490 value: &V,
491 ) -> Result<(), Error> {
492 let k = Arc::new(key.as_ref().to_string());
493 let v = Arc::new(value.as_ref().to_string());
494 self.writer.metadata(k, v)
495 }
496 pub fn flush(&mut self) -> Result<(), Error> {
497 self.writer.flush()
498 }
499 pub fn written_stats(&self) -> Vec<String> {
500 self.writer.written_stats()
501 }
502}
503pub struct CodedTimeSeriesStrWriter<'a> {
504 writer: CodedTimeSeriesWriter<'a>,
505}
506impl<'a> CodedTimeSeriesStrWriter<'a> {
507 pub fn write_sample(&mut self, time: Time64, value: StrWrapper<'a>) -> Result<(), Error> {
508 self.writer.time_stream.write_value(time.as_u64())?;
509 self.writer.str_stream.write_value(value)?;
510 Ok(())
511 }
512 pub fn metadata<K: AsRef<str> + 'a, V: AsRef<str> + 'a>(
513 &'a mut self,
514 key: &K,
515 value: &V,
516 ) -> Result<(), Error> {
517 let key = Arc::new(key.as_ref().to_string());
518 let value = Arc::new(value.as_ref().to_string());
519 self.writer.metadata(key, value)
520 }
521 pub fn flush(&mut self) -> Result<(), Error> {
522 self.writer.flush()
523 }
524 pub fn written_stats(&self) -> Vec<String> {
525 self.writer.written_stats()
526 }
527}
528
529pub enum TimeSeriesError {
530 BitsError(BitsError),
531 MissingMetadataStream,
532 MissingFloatStream,
533 MissingIntStream,
534 MissingStrStream,
535 MissingTimeStream,
536}
537impl TimeSeriesError {
538 pub fn name(&self) -> &'static str {
539 match self {
540 TimeSeriesError::BitsError(..) => "BitsError",
541 TimeSeriesError::MissingMetadataStream => "MissingMetadataStream",
542 TimeSeriesError::MissingFloatStream => "MissingFloatStream",
543 TimeSeriesError::MissingTimeStream => "MissingTimeStream",
544 TimeSeriesError::MissingIntStream => "MissingIntStream",
545 TimeSeriesError::MissingStrStream => "MissingStrStream",
546 }
547 }
548}
549impl From<BitsError> for TimeSeriesError {
550 fn from(e: BitsError) -> Self {
551 TimeSeriesError::BitsError(e)
552 }
553}
554impl From<TimeSeriesError> for BitsError {
555 fn from(e: TimeSeriesError) -> Self {
556 match e {
557 TimeSeriesError::BitsError(e) => e,
558 _ => BitsError::new(BitsErrorKind::InvalidData, e.name()),
559 }
560 }
561}
562pub struct CodedTimeSeriesReader<'a> {
563 metadata: OrderedHashMap<String, String>,
564 float_decoder: Box<dyn Decoder<f64>>,
565 time_decoder: Box<dyn Decoder<u64>>,
566 int_decoder: Box<dyn Decoder<u64>>,
567 str_decoder: Box<dyn Decoder<StrWrapper<'a>> + 'a>,
568}
569impl<'a> CodedTimeSeriesReader<'a> {
570 pub fn new<T: AsRef<Path>>(path: T) -> Result<Self, TimeSeriesError> {
571 let mut reader = MultiStreamReader::open(path)?;
572 let mut streams = reader.drain(..);
573 let Some(meta_stream) = streams.next() else {
574 return Err(TimeSeriesError::MissingMetadataStream);
575 };
576 let mut meta_stream = InflateStream::new(BitsWrapper::Owned(meta_stream));
577 let mut metadata = OrderedHashMap::<String, String>::new();
578 while meta_stream.has_more()? {
579 let key = String::read_from_be_bits(&mut meta_stream)?;
580 let value = String::read_from_be_bits(&mut meta_stream)?;
581 metadata.insert(key, value);
582 }
583
584 let Some(time_stream) = streams.next() else {
585 return Err(TimeSeriesError::MissingTimeStream);
586 };
587 let time_stream = InflateStream::new(BitsWrapper::Owned(time_stream));
588 let time_stream = GroupDecodingStream::<u64, _>::new(BitsWrapper::Owned(time_stream));
589 let time_stream = ZigZagDecoder::new(Box::new(time_stream));
590 let time_stream = AddingDecoder::new(Box::new(time_stream));
591
592 let Some(float_stream) = streams.next() else {
593 return Err(TimeSeriesError::MissingFloatStream);
594 };
595 let float_stream = InflateStream::new(BitsWrapper::Owned(float_stream));
596 let float_stream = GroupDecodingStream::<u64, _>::new(BitsWrapper::Owned(float_stream));
597 let float_stream = U64ToF64Decoder::new(Box::new(float_stream));
598
599 let Some(int_stream) = streams.next() else {
600 return Err(TimeSeriesError::MissingIntStream);
601 };
602 let int_stream = InflateStream::new(BitsWrapper::Owned(int_stream));
603 let int_stream = GroupDecodingStream::<u64, _>::new(BitsWrapper::Owned(int_stream));
604 let int_stream = U64ToI64Decoder::new(Box::new(int_stream));
605 let int_stream = AddingDecoder::new(Box::new(int_stream));
606
607 let Some(str_stream) = streams.next() else {
608 return Err(TimeSeriesError::MissingStrStream);
609 };
610 let str_stream = InflateStream::new(BitsWrapper::Owned(str_stream));
611 let str_stream =
612 GroupDecodingStream::<StrWrapper<'a>, _>::new(BitsWrapper::Owned(str_stream));
613
614 Ok(Self {
615 metadata,
616 float_decoder: Box::new(float_stream),
617 time_decoder: Box::new(time_stream),
618 int_decoder: Box::new(int_stream),
619 str_decoder: Box::new(str_stream),
620 })
621 }
622
623 pub fn float_reader(self) -> CodedTimeSeriesFloatReader<'a> {
624 CodedTimeSeriesFloatReader {
625 reader: self,
626 last_item: None,
627 }
628 }
629 pub fn int_reader(self) -> CodedTimeSeriesIntReader<'a> {
630 CodedTimeSeriesIntReader {
631 reader: self,
632 last_item: None,
633 }
634 }
635 pub fn str_reader(self) -> CodedTimeSeriesStrReader<'a> {
636 CodedTimeSeriesStrReader {
637 reader: self,
638 last_item: None,
639 }
640 }
641
642 pub fn metadata(&self) -> impl Iterator<Item = (&String, &String)> {
643 self.metadata.iter()
644 }
645}
646pub struct CodedTimeSeriesFloatReader<'a> {
647 reader: CodedTimeSeriesReader<'a>,
648 last_item: Option<Sample64>,
649}
650impl CodedTimeSeriesFloatReader<'_> {
651 pub fn peek(&mut self) -> Result<&mut Option<Sample64>, Error> {
652 if self.last_item.is_some() {
653 Ok(&mut self.last_item)
654 } else {
655 if let Some(v) = self.next() {
656 let v = v?;
657 self.last_item = Some(v);
658 }
659 Ok(&mut self.last_item)
660 }
661 }
662}
663impl Iterator for CodedTimeSeriesFloatReader<'_> {
664 type Item = Result<Sample64, Error>;
665
666 fn next(&mut self) -> Option<Result<Sample64, Error>> {
667 let r1 = self.reader.float_decoder.next();
668 let r2 = self.reader.time_decoder.next();
669 let float = match r1 {
670 Ok(v) => v,
671 Err(e) => return Some(Err(e)),
672 };
673 let time = match r2 {
674 Ok(v) => v,
675 Err(e) => return Some(Err(e)),
676 };
677 let float = float?;
678 let time = time?;
679 let samp = Sample64 {
680 value: float,
681 time: Time64::from_unix_raw(time),
682 };
683 self.last_item = Some(samp);
684 Some(Ok(samp))
685 }
686}
687pub struct CodedTimeSeriesIntReader<'a> {
688 reader: CodedTimeSeriesReader<'a>,
689 last_item: Option<IntSample64>,
690}
691impl CodedTimeSeriesIntReader<'_> {
692 pub fn peek(&mut self) -> Result<&mut Option<IntSample64>, Error> {
693 if self.last_item.is_some() {
694 Ok(&mut self.last_item)
695 } else {
696 if let Some(v) = self.next() {
697 let v = v?;
698 self.last_item = Some(v);
699 }
700 Ok(&mut self.last_item)
701 }
702 }
703}
704impl Iterator for CodedTimeSeriesIntReader<'_> {
705 type Item = Result<IntSample64, Error>;
706
707 fn next(&mut self) -> Option<Result<IntSample64, Error>> {
708 let r1 = self.reader.int_decoder.next();
709 let r2 = self.reader.time_decoder.next();
710 let val = match r1 {
711 Ok(v) => v,
712 Err(e) => return Some(Err(e)),
713 };
714 let time = match r2 {
715 Ok(v) => v,
716 Err(e) => return Some(Err(e)),
717 };
718 let val = val?;
719 let time = time?;
720 let samp = IntSample64 {
721 value: val,
722 time: Time64::from_unix_raw(time),
723 };
724 self.last_item = Some(samp);
725 Some(Ok(samp))
726 }
727}
728pub struct CodedTimeSeriesStrReader<'a> {
729 reader: CodedTimeSeriesReader<'a>,
730 last_item: Option<StrSample64<'a>>,
731}
732impl<'a> CodedTimeSeriesStrReader<'a> {
733 pub fn peek(&mut self) -> Result<&mut Option<StrSample64<'a>>, Error> {
734 if self.last_item.is_some() {
735 Ok(&mut self.last_item)
736 } else {
737 if let Some(v) = self.next() {
738 let v = v?;
739 self.last_item = Some(v);
740 }
741 Ok(&mut self.last_item)
742 }
743 }
744}
745impl<'a> Iterator for CodedTimeSeriesStrReader<'a> {
746 type Item = Result<StrSample64<'a>, Error>;
747
748 fn next(&mut self) -> Option<Result<StrSample64<'a>, Error>> {
749 let r1 = self.reader.str_decoder.next();
750 let r2 = self.reader.time_decoder.next();
751 let val = match r1 {
752 Ok(v) => v,
753 Err(e) => return Some(Err(e)),
754 };
755 let time = match r2 {
756 Ok(v) => v,
757 Err(e) => return Some(Err(e)),
758 };
759 let val = val?;
760 let time = time?;
761 let samp = StrSample64 {
762 value: val,
763 time: Time64::from_unix_raw(time),
764 };
765 self.last_item = Some(samp.clone());
766 Some(Ok(samp))
767 }
768}
769
770#[cfg(test)]
771mod tests {
772 use crate::tsdf::Sample64;
773 use crate::tsdf::{CodedTimeSeriesReader, CodedTimeSeriesWriter};
774 use irox_bits::Error;
775 use irox_time::Time64;
776 use irox_tools::buf::UnlimitedBuffer;
777 use irox_tools::random::{Random, PRNG};
778 use irox_units::units::duration::Duration;
779 use std::time::Instant;
780
781 #[test]
782 pub fn test() -> Result<(), Error> {
783 let mut data = UnlimitedBuffer::<Sample64>::new();
784 {
785 let file = CodedTimeSeriesWriter::new("test_file.tsd")?;
786 let mut file = file.float_stream();
787 let mut input = Time64::now();
789 let incr = Duration::from_millis(100);
790 let start = Instant::now();
791 let count = 20_000_000u64;
792 let center = 100f64;
793 let variance = 0.001f64;
794 let mut rand = Random::default();
795 {
796 for _i in 0..count {
799 let val = rand.next_in_range(0., 4096.); let val = center + val.round() * variance - variance / 2f64;
803 let samp = Sample64::new(input, val);
808 data.push_back(samp);
809 file.write_sample(&samp)?;
810 input += incr;
811 }
812 file.flush()?;
813 }
816 let written = std::fs::metadata("test_file.tsd")?.len();
817 let input_size = count * 16;
818 let end = start.elapsed();
819 let ratio = 1. - (written as f64 / input_size as f64);
821 let ratio = ratio * 100.;
822 let ubps = input_size as f64 / end.as_secs_f64() / 1e6;
823 println!(
824 "Turned {input_size} bytes into {written} = {ratio:02.}% reduction = {ubps:02.02}MB/s"
825 );
826 println!("{:#?}", file.written_stats());
827 drop(file);
828 }
829
830 let file = CodedTimeSeriesReader::new("test_file.tsd")?;
831 let mut file = file.float_reader();
832 let num_samps = data.len();
833 assert!(num_samps > 0);
834 let mut idx = 0;
835 loop {
836 let res = file.peek()?;
837 let Some(val) = res.take() else {
838 break;
839 };
840 let Some(v) = data.pop_front() else {
841 panic!("should not happen");
842 };
843 assert_eq!(val, v, "{idx}");
844 idx += 1;
845 }
846 assert_eq!(num_samps, idx);
847 Ok(())
848 }
849}