1use crate::data::{ColumnData, CursorPosition, Sample};
2use crate::tio::proto::identifiers::*;
3use crate::tio::proto::meta::MetadataEpoch;
4use crate::tio::proto::{BufferType, ColumnMetadata, DeviceRoute, SegmentMetadata, StreamMetadata};
5
6use std::{
7 collections::{HashMap, VecDeque},
8 sync::Arc,
9};
10
/// Identifier of a run of buffered samples.
///
/// [`Buffer`] assigns these from an incrementing counter each time it starts a new run
/// for a stream.
pub type RunId = u64;
12
/// Values copied out of a single column, tagged by their element type.
#[derive(Debug, Clone)]
pub enum ColumnBatch {
    /// 64-bit floating point values.
    F64(Vec<f64>),
    /// Signed 64-bit integer values.
    I64(Vec<i64>),
    /// Unsigned 64-bit integer values.
    U64(Vec<u64>),
}

impl ColumnBatch {
    /// Returns how many values the batch holds, whatever their element type.
    pub fn len(&self) -> usize {
        match self {
            Self::F64(values) => values.len(),
            Self::I64(values) => values.len(),
            Self::U64(values) => values.len(),
        }
    }

    /// Returns `true` when the batch holds no values.
    pub fn is_empty(&self) -> bool {
        match self {
            Self::F64(values) => values.is_empty(),
            Self::I64(values) => values.is_empty(),
            Self::U64(values) => values.is_empty(),
        }
    }
}
33
/// Result of an aligned read: column data from one or more streams plus the metadata
/// needed to interpret it.
///
/// Every per-stream sample-number vector and every column batch has the same length as
/// `timestamps`; the read fails instead of returning a window where that does not hold.
#[derive(Debug, Clone)]
pub struct AlignedWindow {
    /// Sample numbers of the returned rows, per stream.
    pub sample_numbers: HashMap<StreamKey, Vec<SampleNumber>>,
    /// Timestamps of the returned rows, as produced by the alignment step.
    pub timestamps: Vec<f64>,
    /// Values of each requested column.
    pub columns: HashMap<ColumnKey, ColumnBatch>,
    /// Stream metadata of each stream's run buffer.
    pub stream_metadata: HashMap<StreamKey, Arc<StreamMetadata>>,
    /// Segment metadata most recently recorded in each stream's run buffer.
    pub segment_metadata: HashMap<StreamKey, Arc<SegmentMetadata>>,
    /// Metadata of each requested column.
    pub column_metadata: HashMap<ColumnKey, Arc<ColumnMetadata>>,
    /// Session id of each stream's run.
    pub session_ids: HashMap<StreamKey, SessionId>,
    /// Run id of each stream's run.
    pub run_ids: HashMap<StreamKey, RunId>,
}
45
/// Errors returned by the read methods of [`Buffer`].
#[derive(Debug, thiserror::Error)]
pub enum ReadError {
    /// The list of requested columns was empty.
    #[error("no columns requested")]
    NoColumnsRequested,
    /// A cursor-based read was given no cursor for one of the selected streams.
    #[error("no cursor for stream {stream_key:?}")]
    NoCursorForStream { stream_key: StreamKey },
    /// The buffer holds no run for one of the selected streams.
    #[error("no active run for stream {stream_key:?}")]
    NoActiveRun { stream_key: StreamKey },
    /// A stream could not supply as many samples as the read needed.
    #[error(
        "insufficient data for stream {stream_key:?}: requested {requested}, available {available}"
    )]
    InsufficientData {
        stream_key: StreamKey,
        requested: usize,
        available: usize,
    },
    /// A time-range read found no usable samples; also reported when the selected streams
    /// disagree on the number of samples or on their timestamps inside the range.
    #[error("no data in time range [{requested_start}, {requested_end}]")]
    NoDataInTimeRange {
        requested_start: f64,
        requested_end: f64,
    },
    /// The requested time range reaches outside the interval retained by all selected streams.
    #[error("requested range [{requested_start}, {requested_end}] exceeds retention window [{available_start}, {available_end}]")]
    RequestedRangeExceedsRetention {
        requested_start: f64,
        requested_end: f64,
        available_start: f64,
        available_end: f64,
    },
    /// The stream's run buffer has no data for the requested column.
    #[error("column {column_id:?} not found in stream {stream_key:?}")]
    ColumnNotFound {
        stream_key: StreamKey,
        column_id: ColumnId,
    },
    /// The selected streams do not share one effective sampling rate.
    #[error("sampling rate mismatch across streams {streams:?} at rates {rates:?}")]
    SamplingRateMismatch {
        streams: Vec<StreamKey>,
        rates: Vec<f64>,
    },
    /// The cursor refers to a run other than the stream's current run.
    #[error("cursor invalidated for stream {stream_key:?}: cursor at run {cursor_run:?}, current run is {current_run:?}")]
    CursorInvalidated {
        stream_key: StreamKey,
        cursor_run: RunId,
        current_run: RunId,
    },
    /// The cursor's sample number could not be located among the retained samples;
    /// `earliest_available` is the oldest sample number still buffered.
    #[error("cursor out of buffer for stream {stream_key:?}: at sample {cursor_sample:?}, earliest available is {earliest_available:?}")]
    CursorOutOfBuffer {
        stream_key: StreamKey,
        cursor_sample: SampleNumber,
        earliest_available: SampleNumber,
    },
}
97
/// Storage for the values of one column within a run: the column's metadata plus a
/// deque of values whose element type is selected by the variant.
#[derive(Debug)]
enum ColumnBuffer {
    /// Column stored as `f64`.
    F64 {
        metadata: Arc<ColumnMetadata>,
        data: VecDeque<f64>,
    },
    /// Column stored as `i64`.
    I64 {
        metadata: Arc<ColumnMetadata>,
        data: VecDeque<i64>,
    },
    /// Column stored as `u64`.
    U64 {
        metadata: Arc<ColumnMetadata>,
        data: VecDeque<u64>,
    },
}
113
114impl ColumnBuffer {
115 fn new(metadata: Arc<ColumnMetadata>, capacity: usize) -> Self {
116 let alloc = capacity.min(65_536);
117 match metadata.data_type.buffer_type() {
118 BufferType::Float => Self::F64 {
119 metadata,
120 data: VecDeque::with_capacity(alloc),
121 },
122 BufferType::Int => Self::I64 {
123 metadata,
124 data: VecDeque::with_capacity(alloc),
125 },
126 BufferType::UInt => Self::U64 {
127 metadata,
128 data: VecDeque::with_capacity(alloc),
129 },
130 }
131 }
132
133 fn metadata(&self) -> &Arc<ColumnMetadata> {
134 match self {
135 Self::F64 { metadata, .. }
136 | Self::I64 { metadata, .. }
137 | Self::U64 { metadata, .. } => metadata,
138 }
139 }
140
141 fn push(&mut self, value: ColumnData) {
142 match (self, value) {
143 (Self::F64 { data, .. }, ColumnData::Float(v)) => data.push_back(v),
144 (Self::F64 { data, .. }, ColumnData::Int(v)) => data.push_back(v as f64),
145 (Self::I64 { data, .. }, ColumnData::Int(v)) => data.push_back(v),
146 (Self::U64 { data, .. }, ColumnData::UInt(v)) => data.push_back(v),
147 _ => {}
148 }
149 }
150
151 fn pop_front(&mut self) {
152 match self {
153 Self::F64 { data, .. } => {
154 data.pop_front();
155 }
156 Self::I64 { data, .. } => {
157 data.pop_front();
158 }
159 Self::U64 { data, .. } => {
160 data.pop_front();
161 }
162 }
163 }
164
165 fn get_range(&self, start: usize, count: usize) -> ColumnBatch {
166 match self {
167 Self::F64 { data, .. } => {
168 ColumnBatch::F64(data.iter().skip(start).take(count).copied().collect())
169 }
170 Self::I64 { data, .. } => {
171 ColumnBatch::I64(data.iter().skip(start).take(count).copied().collect())
172 }
173 Self::U64 { data, .. } => {
174 ColumnBatch::U64(data.iter().skip(start).take(count).copied().collect())
175 }
176 }
177 }
178}
179
/// Samples buffered for one run of a stream, stored column-wise.
struct RunBuffer {
    /// Id of the run these samples belong to.
    run_id: RunId,
    /// Session id taken from the sample that created the buffer.
    session_id: SessionId,
    /// Stream metadata taken from the sample that created the buffer.
    stream_metadata: Arc<StreamMetadata>,
    /// Segment metadata of the most recently pushed sample.
    segment_metadata: Arc<SegmentMetadata>,
    /// Sample number of each buffered sample, oldest first.
    sample_numbers: VecDeque<SampleNumber>,
    /// End timestamp of each buffered sample, oldest first.
    timestamps: VecDeque<f64>,
    /// Per-column value storage, created when a column is first seen.
    columns: HashMap<ColumnId, ColumnBuffer>,
    /// Capacity hint used when allocating the deques and new column buffers.
    capacity: usize,
}
190
191impl RunBuffer {
192 fn new(run_id: RunId, sample: &Sample, capacity: usize) -> Self {
193 let alloc = capacity.min(65_536);
194 Self {
195 run_id,
196 session_id: sample.device.session_id,
197 stream_metadata: sample.stream.clone(),
198 segment_metadata: sample.segment.clone(),
199 sample_numbers: VecDeque::with_capacity(alloc),
200 timestamps: VecDeque::with_capacity(alloc),
201 columns: HashMap::new(),
202 capacity,
203 }
204 }
205
206 fn len(&self) -> usize {
207 self.sample_numbers.len()
208 }
209
210 fn push(&mut self, sample: &Sample) {
211 self.sample_numbers.push_back(sample.n);
212 self.timestamps.push_back(sample.timestamp_end());
213 self.segment_metadata = sample.segment.clone();
214
215 for col in &sample.columns {
216 self.columns
217 .entry(col.desc.index)
218 .or_insert_with(|| ColumnBuffer::new(col.desc.clone(), self.capacity))
219 .push(col.value.clone());
220 }
221 }
222
223 fn pop_front(&mut self) {
224 self.sample_numbers.pop_front();
225 self.timestamps.pop_front();
226 for col in self.columns.values_mut() {
227 col.pop_front();
228 }
229 }
230
231 fn sample_number_wraps(&self) -> bool {
232 match (self.sample_numbers.front(), self.sample_numbers.back()) {
233 (Some(first), Some(last)) => first > last,
234 _ => false,
235 }
236 }
237
238 fn find_start_after_sample(&self, sample_number: SampleNumber) -> Option<usize> {
239 if self.sample_numbers.is_empty() {
240 return None;
241 }
242
243 if !self.sample_number_wraps() {
244 let start = self
245 .sample_numbers
246 .partition_point(|&sn| sn <= sample_number);
247 if start == 0 || self.sample_numbers.get(start - 1).copied()? != sample_number {
248 return None;
249 }
250 return Some(start);
251 }
252
253 self.sample_numbers
254 .iter()
255 .rposition(|&sn| sn == sample_number)
256 .map(|idx| idx + 1)
257 }
258
259 fn timestamps_range(&self, start: usize, count: usize) -> Vec<f64> {
260 self.timestamps
261 .iter()
262 .skip(start)
263 .take(count)
264 .copied()
265 .collect()
266 }
267}
268
/// The run currently being recorded for a stream, together with its buffered samples.
pub struct ActiveRun {
    /// Id assigned to this run when it was started.
    pub run_id: RunId,
    /// Session id of the sample that started the run.
    pub session_id: SessionId,
    /// Segment id of the most recently processed sample.
    pub segment_id: SegmentId,
    /// Sampling rate divided by decimation, from the segment of the sample that started the run.
    pub effective_rate: f64,
    /// Time reference epoch from the segment of the sample that started the run.
    pub time_ref_epoch: MetadataEpoch,
    /// Sample number of the most recently processed sample.
    pub last_sample_number: SampleNumber,
    /// End timestamp of the most recently processed sample.
    pub last_timestamp: f64,
    /// Samples retained for this run.
    buffer: RunBuffer,
}
279
280impl ActiveRun {
281 fn new(run_id: RunId, sample: &Sample, capacity: usize) -> Self {
282 let segment = &sample.segment;
283 Self {
284 run_id,
285 session_id: sample.device.session_id,
286 segment_id: segment.segment_id,
287 effective_rate: segment.sampling_rate as f64 / segment.decimation as f64,
288 time_ref_epoch: segment.time_ref_epoch.clone(),
289 last_sample_number: sample.n,
290 last_timestamp: sample.timestamp_end(),
291 buffer: RunBuffer::new(run_id, sample, capacity),
292 }
293 }
294}
295
/// Keeps the most recent samples of each stream and serves aligned reads across streams.
pub struct Buffer {
    /// Maximum number of samples retained per stream.
    capacity: usize,
    /// Current run of each stream that has received at least one sample.
    active_runs: HashMap<StreamKey, ActiveRun>,
    /// Id that will be given to the next run that is started.
    next_run_id: RunId,
}
301
/// Strategy used to choose which buffered samples an aligned read returns.
enum AlignmentMode<'a> {
    /// The most recent samples, at most the given number.
    LastN(usize),
    /// Exactly `n` samples located after the supplied per-stream cursor positions.
    FromCursors {
        cursors: &'a HashMap<StreamKey, CursorPosition>,
        n: usize,
    },
    /// The span of timestamps that every selected stream still retains.
    CommonTail,
    /// Samples whose timestamps fall within `start` and `end` (inclusive); the bounds
    /// may be given in either order.
    TimeRange {
        start: f64,
        end: f64,
    },
}
314
315impl Buffer {
316 pub fn new(capacity: usize) -> Self {
317 Self {
318 capacity,
319 active_runs: HashMap::new(),
320 next_run_id: 0,
321 }
322 }
323
324 pub fn process_sample(&mut self, sample: Sample, stream_key: StreamKey) {
325 let needs_new_run = !sample.is_continuous() || !self.active_runs.contains_key(&stream_key);
326
327 if needs_new_run {
328 let new_run_id = self.next_run_id;
329 self.next_run_id += 1;
330 self.active_runs.insert(
331 stream_key.clone(),
332 ActiveRun::new(new_run_id, &sample, self.capacity),
333 );
334 }
335
336 let active = self.active_runs.get_mut(&stream_key).unwrap();
337 active.buffer.push(&sample);
338 active.last_sample_number = sample.n;
339 active.last_timestamp = sample.timestamp_end();
340 active.segment_id = sample.segment.segment_id;
341
342 if active.buffer.len() > self.capacity {
343 active.buffer.pop_front();
344 }
345 }
346
347 pub fn get_run(&self, stream_key: &StreamKey) -> Option<&ActiveRun> {
348 self.active_runs.get(stream_key)
349 }
350
351 pub fn read_aligned_window(
352 &self,
353 columns: &[ColumnKey],
354 n: usize,
355 ) -> Result<AlignedWindow, ReadError> {
356 let by_stream = self.prepare_stream_selection(columns)?;
357 let (slices, timestamps) =
358 self.compute_aligned_slices(&by_stream, AlignmentMode::LastN(n))?;
359 self.build_window_from_slices(&by_stream, &slices, timestamps)
360 }
361
362 pub fn read_from_cursor(
363 &self,
364 columns: &[ColumnKey],
365 cursors: &HashMap<StreamKey, CursorPosition>,
366 n: usize,
367 ) -> Result<AlignedWindow, ReadError> {
368 let by_stream = self.prepare_stream_selection(columns)?;
369 let (slices, timestamps) =
370 self.compute_aligned_slices(&by_stream, AlignmentMode::FromCursors { cursors, n })?;
371 self.build_window_from_slices(&by_stream, &slices, timestamps)
372 }
373
374 pub fn read_aligned_tail(&self, columns: &[ColumnKey]) -> Result<AlignedWindow, ReadError> {
375 let by_stream = self.prepare_stream_selection(columns)?;
376 let (slices, timestamps) =
377 self.compute_aligned_slices(&by_stream, AlignmentMode::CommonTail)?;
378 self.build_window_from_slices(&by_stream, &slices, timestamps)
379 }
380
381 pub fn read_aligned_time_range(
382 &self,
383 columns: &[ColumnKey],
384 start_time: f64,
385 end_time: f64,
386 ) -> Result<AlignedWindow, ReadError> {
387 let by_stream = self.prepare_stream_selection(columns)?;
388 let (slices, timestamps) = self.compute_aligned_slices(
389 &by_stream,
390 AlignmentMode::TimeRange {
391 start: start_time,
392 end: end_time,
393 },
394 )?;
395 self.build_window_from_slices(&by_stream, &slices, timestamps)
396 .map_err(|err| match err {
397 ReadError::InsufficientData { .. } => ReadError::NoDataInTimeRange {
398 requested_start: start_time.min(end_time),
399 requested_end: start_time.max(end_time),
400 },
401 other => other,
402 })
403 }
404
405 fn compute_aligned_slices(
406 &self,
407 by_stream: &HashMap<StreamKey, Vec<ColumnId>>,
408 mode: AlignmentMode<'_>,
409 ) -> Result<(HashMap<StreamKey, (usize, usize)>, Vec<f64>), ReadError> {
410 match mode {
411 AlignmentMode::LastN(n) => {
412 let ref_key = Self::reference_stream_key(by_stream);
413 let ref_buf = self.active_buffer(ref_key)?;
414 let available = ref_buf.len();
415 if available == 0 {
416 return Err(ReadError::InsufficientData {
417 stream_key: ref_key.clone(),
418 requested: n,
419 available: 0,
420 });
421 }
422 let count = n.min(available);
423 let start = available.saturating_sub(count);
424 let timestamps = ref_buf.timestamps_range(start, count);
425 let slices = by_stream
426 .keys()
427 .map(|k| (k.clone(), (start, count)))
428 .collect();
429 Ok((slices, timestamps))
430 }
431
432 AlignmentMode::FromCursors { cursors, n } => {
433 let mut start = 0;
434 let mut reference_key: Option<StreamKey> = None;
435
436 for stream_key in by_stream.keys() {
437 let active = self.active_run(stream_key)?;
438 let cursor = cursors
439 .get(stream_key)
440 .ok_or(ReadError::NoCursorForStream {
441 stream_key: stream_key.clone(),
442 })?;
443 if cursor.run_id != active.run_id {
444 return Err(ReadError::CursorInvalidated {
445 stream_key: stream_key.clone(),
446 cursor_run: cursor.run_id,
447 current_run: active.run_id,
448 });
449 }
450 let buf = &active.buffer;
451 if buf.sample_numbers.is_empty() {
452 return Err(ReadError::InsufficientData {
453 stream_key: stream_key.clone(),
454 requested: n,
455 available: 0,
456 });
457 }
458 let s = buf
459 .find_start_after_sample(cursor.last_sample_number)
460 .ok_or(ReadError::CursorOutOfBuffer {
461 stream_key: stream_key.clone(),
462 cursor_sample: cursor.last_sample_number,
463 earliest_available: *buf.sample_numbers.front().unwrap(),
464 })?;
465 if s + n > buf.len() {
466 return Err(ReadError::InsufficientData {
467 stream_key: stream_key.clone(),
468 requested: n,
469 available: buf.len().saturating_sub(s),
470 });
471 }
472 if reference_key.is_none() {
473 start = s;
474 reference_key = Some(stream_key.clone());
475 }
476 }
477
478 let ref_key = reference_key.unwrap();
479 let ref_buf = self.active_buffer(&ref_key)?;
480 let timestamps = ref_buf.timestamps_range(start, n);
481 let slices = by_stream.keys().map(|k| (k.clone(), (start, n))).collect();
482 Ok((slices, timestamps))
483 }
484
485 AlignmentMode::CommonTail => {
486 let mut global_start = f64::MIN;
487 let mut global_end = f64::MAX;
488
489 for stream_key in by_stream.keys() {
490 let buf = self.active_buffer(stream_key)?;
491 if buf.timestamps.is_empty() {
492 return Err(ReadError::InsufficientData {
493 stream_key: stream_key.clone(),
494 requested: 0,
495 available: 0,
496 });
497 }
498 let first = *buf.timestamps.front().unwrap();
499 let last = *buf.timestamps.back().unwrap();
500 global_start = global_start.max(first);
501 global_end = global_end.min(last);
502 }
503
504 if global_start >= global_end {
505 return Err(ReadError::InsufficientData {
506 stream_key: StreamKey::new(DeviceRoute::root(), 0),
507 requested: 0,
508 available: 0,
509 });
510 }
511
512 let ref_key = Self::reference_stream_key(by_stream);
513 let ref_buf = self.active_buffer(ref_key)?;
514 let start = ref_buf
515 .timestamps
516 .iter()
517 .position(|&t| t >= global_start)
518 .unwrap_or(0);
519 let end = ref_buf
520 .timestamps
521 .iter()
522 .rposition(|&t| t <= global_end)
523 .unwrap_or(ref_buf.len().saturating_sub(1));
524 let count = end.saturating_sub(start) + 1;
525 let timestamps = ref_buf.timestamps_range(start, count);
526 let slices = by_stream
527 .keys()
528 .map(|k| (k.clone(), (start, count)))
529 .collect();
530 Ok((slices, timestamps))
531 }
532
533 AlignmentMode::TimeRange {
534 start: start_time,
535 end: end_time,
536 } => {
537 let (requested_start, requested_end) = normalize_time_bounds(start_time, end_time);
538
539 let (available_start, available_end) = self
540 .aligned_retained_time_bounds(by_stream)?
541 .ok_or(ReadError::NoDataInTimeRange {
542 requested_start,
543 requested_end,
544 })?;
545
546 if requested_start < available_start || requested_end > available_end {
547 return Err(ReadError::RequestedRangeExceedsRetention {
548 requested_start,
549 requested_end,
550 available_start,
551 available_end,
552 });
553 }
554
555 let ref_key = Self::reference_stream_key(by_stream);
556 let ref_buf = self.active_buffer(ref_key)?;
557 let ref_start = ref_buf.timestamps.partition_point(|&t| t < requested_start);
558 let ref_end = ref_buf.timestamps.partition_point(|&t| t <= requested_end);
559 if ref_start >= ref_end {
560 return Err(ReadError::NoDataInTimeRange {
561 requested_start,
562 requested_end,
563 });
564 }
565
566 let ref_count = ref_end - ref_start;
567 let timestamps = ref_buf.timestamps_range(ref_start, ref_count);
568 if timestamps.is_empty() {
569 return Err(ReadError::NoDataInTimeRange {
570 requested_start,
571 requested_end,
572 });
573 }
574
575 let mut slices = HashMap::new();
576 slices.insert(ref_key.clone(), (ref_start, ref_count));
577
578 for stream_key in by_stream.keys() {
579 if stream_key == ref_key {
580 continue;
581 }
582 let buf = self.active_buffer(stream_key)?;
583 let s = buf.timestamps.partition_point(|&t| t < requested_start);
584 let e = buf.timestamps.partition_point(|&t| t <= requested_end);
585 if s >= e {
586 return Err(ReadError::NoDataInTimeRange {
587 requested_start,
588 requested_end,
589 });
590 }
591 let count = e - s;
592 if count != ref_count {
593 return Err(ReadError::NoDataInTimeRange {
594 requested_start,
595 requested_end,
596 });
597 }
598 let stream_timestamps = buf.timestamps.iter().skip(s).take(count).copied();
599 if !timestamps_match_iter(×tamps, stream_timestamps) {
600 return Err(ReadError::NoDataInTimeRange {
601 requested_start,
602 requested_end,
603 });
604 }
605 slices.insert(stream_key.clone(), (s, count));
606 }
607
608 Ok((slices, timestamps))
609 }
610 }
611 }
612
613 fn aligned_retained_time_bounds(
614 &self,
615 by_stream: &HashMap<StreamKey, Vec<ColumnId>>,
616 ) -> Result<Option<(f64, f64)>, ReadError> {
617 let mut global_start = f64::MIN;
618 let mut global_end = f64::MAX;
619
620 for stream_key in by_stream.keys() {
621 let buf = self.active_buffer(stream_key)?;
622 let (Some(&first), Some(&last)) = (buf.timestamps.front(), buf.timestamps.back())
623 else {
624 return Ok(None);
625 };
626
627 global_start = global_start.max(first);
628 global_end = global_end.min(last);
629 }
630
631 if global_start > global_end {
632 return Ok(None);
633 }
634
635 Ok(Some((global_start, global_end)))
636 }
637
638 fn build_window_from_slices(
639 &self,
640 by_stream: &HashMap<StreamKey, Vec<ColumnId>>,
641 slices: &HashMap<StreamKey, (usize, usize)>,
642 timestamps: Vec<f64>,
643 ) -> Result<AlignedWindow, ReadError> {
644 let expected_len = timestamps.len();
645 let mut sample_numbers = HashMap::new();
646 let mut columns = HashMap::new();
647 let mut stream_metadata = HashMap::new();
648 let mut segment_metadata = HashMap::new();
649 let mut column_metadata = HashMap::new();
650 let mut session_ids = HashMap::new();
651 let mut run_ids = HashMap::new();
652
653 for (stream_key, col_ids) in by_stream {
654 let buf = self.active_buffer(stream_key)?;
655 let (start, count) =
656 slices
657 .get(stream_key)
658 .copied()
659 .ok_or(ReadError::InsufficientData {
660 stream_key: stream_key.clone(),
661 requested: expected_len,
662 available: 0,
663 })?;
664
665 let stream_sample_numbers: Vec<_> = buf
666 .sample_numbers
667 .iter()
668 .skip(start)
669 .take(count)
670 .copied()
671 .collect();
672 if stream_sample_numbers.len() != expected_len {
673 return Err(ReadError::InsufficientData {
674 stream_key: stream_key.clone(),
675 requested: expected_len,
676 available: stream_sample_numbers.len(),
677 });
678 }
679 sample_numbers.insert(stream_key.clone(), stream_sample_numbers);
680
681 stream_metadata.insert(stream_key.clone(), buf.stream_metadata.clone());
682 segment_metadata.insert(stream_key.clone(), buf.segment_metadata.clone());
683 session_ids.insert(stream_key.clone(), buf.session_id);
684 run_ids.insert(stream_key.clone(), buf.run_id);
685
686 for &col_id in col_ids {
687 let col_buf = buf.columns.get(&col_id).ok_or(ReadError::ColumnNotFound {
688 stream_key: stream_key.clone(),
689 column_id: col_id,
690 })?;
691
692 let key = ColumnKey::new(stream_key.route.clone(), stream_key.stream_id, col_id);
693 let batch = col_buf.get_range(start, count);
694 if batch.len() != expected_len {
695 return Err(ReadError::InsufficientData {
696 stream_key: stream_key.clone(),
697 requested: expected_len,
698 available: batch.len(),
699 });
700 }
701 columns.insert(key.clone(), batch);
702 column_metadata.insert(key, col_buf.metadata().clone());
703 }
704 }
705
706 Ok(AlignedWindow {
707 sample_numbers,
708 timestamps,
709 columns,
710 stream_metadata,
711 segment_metadata,
712 column_metadata,
713 session_ids,
714 run_ids,
715 })
716 }
717
718 fn prepare_stream_selection(
719 &self,
720 columns: &[ColumnKey],
721 ) -> Result<HashMap<StreamKey, Vec<ColumnId>>, ReadError> {
722 if columns.is_empty() {
723 return Err(ReadError::NoColumnsRequested);
724 }
725 let by_stream = group_columns_by_stream(columns);
726 self.validate_rates(&by_stream)?;
727 Ok(by_stream)
728 }
729
730 fn validate_rates(
731 &self,
732 by_stream: &HashMap<StreamKey, Vec<ColumnId>>,
733 ) -> Result<(), ReadError> {
734 let mut rate: Option<f64> = None;
735 let mut rates = Vec::new();
736
737 for stream_key in by_stream.keys() {
738 let active = self
739 .active_runs
740 .get(stream_key)
741 .ok_or(ReadError::NoActiveRun {
742 stream_key: stream_key.clone(),
743 })?;
744
745 let r = active.effective_rate;
746 rates.push(r);
747
748 if let Some(first_rate) = rate {
749 if (r - first_rate).abs() > 0.001 {
750 return Err(ReadError::SamplingRateMismatch {
751 streams: by_stream.keys().cloned().collect(),
752 rates,
753 });
754 }
755 } else {
756 rate = Some(r);
757 }
758 }
759
760 Ok(())
761 }
762
763 fn active_run(&self, stream_key: &StreamKey) -> Result<&ActiveRun, ReadError> {
764 self.active_runs
765 .get(stream_key)
766 .ok_or(ReadError::NoActiveRun {
767 stream_key: stream_key.clone(),
768 })
769 }
770
771 fn active_buffer(&self, stream_key: &StreamKey) -> Result<&RunBuffer, ReadError> {
772 Ok(&self.active_run(stream_key)?.buffer)
773 }
774
775 fn reference_stream_key<'a>(by_stream: &'a HashMap<StreamKey, Vec<ColumnId>>) -> &'a StreamKey {
776 by_stream
777 .keys()
778 .min()
779 .expect("reference stream requires at least one stream")
780 }
781}
782
783fn group_columns_by_stream(columns: &[ColumnKey]) -> HashMap<StreamKey, Vec<ColumnId>> {
784 let mut by_stream: HashMap<StreamKey, Vec<ColumnId>> = HashMap::new();
785 for col in columns {
786 by_stream
787 .entry(col.stream_key())
788 .or_default()
789 .push(col.column_id);
790 }
791 by_stream
792}
793
/// Returns the two bounds as `(lower, upper)`.
///
/// The arguments are swapped whenever `start_time <= end_time` does not hold, which
/// includes the case where either value is NaN.
fn normalize_time_bounds(start_time: f64, end_time: f64) -> (f64, f64) {
    let already_ordered = start_time <= end_time;
    if already_ordered {
        return (start_time, end_time);
    }
    (end_time, start_time)
}
801
/// Returns `true` when `candidate` yields exactly as many timestamps as `reference`
/// holds and each pair differs by at most `1e-9`.
///
/// A candidate that is shorter or longer than the reference is a mismatch; two empty
/// sequences match. A NaN on either side never matches.
fn timestamps_match_iter<I>(reference: &[f64], candidate: I) -> bool
where
    I: Iterator<Item = f64>,
{
    let mut candidate = candidate;
    for &expected in reference {
        match candidate.next() {
            Some(actual) if (expected - actual).abs() <= 1e-9 => {}
            // Either the candidate ran out early or the values disagree.
            _ => return false,
        }
    }
    // Anything left over means the candidate is longer than the reference.
    candidate.next().is_none()
}