// twinleaf/data/buffer.rs

use crate::data::{ColumnData, CursorPosition, Sample};
use crate::tio::proto::identifiers::*;
use crate::tio::proto::meta::MetadataEpoch;
use crate::tio::proto::{BufferType, ColumnMetadata, DeviceRoute, SegmentMetadata, StreamMetadata};

use std::{
    collections::{hash_map::Entry, HashMap, VecDeque},
    sync::Arc,
};
10
/// Identifier of one run of a stream.
///
/// `Buffer` allocates these from a counter that starts at zero and is incremented each time a
/// new run is started.
pub type RunId = u64;
12
/// A contiguous run of values copied out of a single column.
///
/// The variant records the element type the values are stored as.
#[derive(Debug, Clone, PartialEq)]
pub enum ColumnBatch {
    /// 64-bit floating-point values.
    F64(Vec<f64>),
    /// Signed 64-bit integer values.
    I64(Vec<i64>),
    /// Unsigned 64-bit integer values.
    U64(Vec<u64>),
}

impl ColumnBatch {
    /// Returns the number of values in the batch, whatever their element type.
    pub fn len(&self) -> usize {
        match self {
            Self::F64(values) => values.len(),
            Self::I64(values) => values.len(),
            Self::U64(values) => values.len(),
        }
    }

    /// Returns `true` when the batch holds no values.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
33
34#[derive(Debug, Clone)]
35pub struct AlignedWindow {
36    pub sample_numbers: HashMap<StreamKey, Vec<SampleNumber>>,
37    pub timestamps: Vec<f64>,
38    pub columns: HashMap<ColumnKey, ColumnBatch>,
39    pub stream_metadata: HashMap<StreamKey, Arc<StreamMetadata>>,
40    pub segment_metadata: HashMap<StreamKey, Arc<SegmentMetadata>>,
41    pub column_metadata: HashMap<ColumnKey, Arc<ColumnMetadata>>,
42    pub session_ids: HashMap<StreamKey, SessionId>,
43    pub run_ids: HashMap<StreamKey, RunId>,
44}
45
46#[derive(Debug, thiserror::Error)]
47pub enum ReadError {
48    #[error("no columns requested")]
49    NoColumnsRequested,
50    #[error("no cursor for stream {stream_key:?}")]
51    NoCursorForStream { stream_key: StreamKey },
52    #[error("no active run for stream {stream_key:?}")]
53    NoActiveRun { stream_key: StreamKey },
54    #[error(
55        "insufficient data for stream {stream_key:?}: requested {requested}, available {available}"
56    )]
57    InsufficientData {
58        stream_key: StreamKey,
59        requested: usize,
60        available: usize,
61    },
62    #[error("no data in time range [{requested_start}, {requested_end}]")]
63    NoDataInTimeRange {
64        requested_start: f64,
65        requested_end: f64,
66    },
67    #[error("requested range [{requested_start}, {requested_end}] exceeds retention window [{available_start}, {available_end}]")]
68    RequestedRangeExceedsRetention {
69        requested_start: f64,
70        requested_end: f64,
71        available_start: f64,
72        available_end: f64,
73    },
74    #[error("column {column_id:?} not found in stream {stream_key:?}")]
75    ColumnNotFound {
76        stream_key: StreamKey,
77        column_id: ColumnId,
78    },
79    #[error("sampling rate mismatch across streams {streams:?} at rates {rates:?}")]
80    SamplingRateMismatch {
81        streams: Vec<StreamKey>,
82        rates: Vec<f64>,
83    },
84    #[error("cursor invalidated for stream {stream_key:?}: cursor at run {cursor_run:?}, current run is {current_run:?}")]
85    CursorInvalidated {
86        stream_key: StreamKey,
87        cursor_run: RunId,
88        current_run: RunId,
89    },
90    #[error("cursor out of buffer for stream {stream_key:?}: at sample {cursor_sample:?}, earliest available is {earliest_available:?}")]
91    CursorOutOfBuffer {
92        stream_key: StreamKey,
93        cursor_sample: SampleNumber,
94        earliest_available: SampleNumber,
95    },
96}
97
98#[derive(Debug)]
99enum ColumnBuffer {
100    F64 {
101        metadata: Arc<ColumnMetadata>,
102        data: VecDeque<f64>,
103    },
104    I64 {
105        metadata: Arc<ColumnMetadata>,
106        data: VecDeque<i64>,
107    },
108    U64 {
109        metadata: Arc<ColumnMetadata>,
110        data: VecDeque<u64>,
111    },
112}
113
114impl ColumnBuffer {
115    fn new(metadata: Arc<ColumnMetadata>, capacity: usize) -> Self {
116        let alloc = capacity.min(65_536);
117        match metadata.data_type.buffer_type() {
118            BufferType::Float => Self::F64 {
119                metadata,
120                data: VecDeque::with_capacity(alloc),
121            },
122            BufferType::Int => Self::I64 {
123                metadata,
124                data: VecDeque::with_capacity(alloc),
125            },
126            BufferType::UInt => Self::U64 {
127                metadata,
128                data: VecDeque::with_capacity(alloc),
129            },
130        }
131    }
132
133    fn metadata(&self) -> &Arc<ColumnMetadata> {
134        match self {
135            Self::F64 { metadata, .. }
136            | Self::I64 { metadata, .. }
137            | Self::U64 { metadata, .. } => metadata,
138        }
139    }
140
141    fn push(&mut self, value: ColumnData) {
142        match (self, value) {
143            (Self::F64 { data, .. }, ColumnData::Float(v)) => data.push_back(v),
144            (Self::F64 { data, .. }, ColumnData::Int(v)) => data.push_back(v as f64),
145            (Self::I64 { data, .. }, ColumnData::Int(v)) => data.push_back(v),
146            (Self::U64 { data, .. }, ColumnData::UInt(v)) => data.push_back(v),
147            _ => {}
148        }
149    }
150
151    fn pop_front(&mut self) {
152        match self {
153            Self::F64 { data, .. } => {
154                data.pop_front();
155            }
156            Self::I64 { data, .. } => {
157                data.pop_front();
158            }
159            Self::U64 { data, .. } => {
160                data.pop_front();
161            }
162        }
163    }
164
165    fn get_range(&self, start: usize, count: usize) -> ColumnBatch {
166        match self {
167            Self::F64 { data, .. } => {
168                ColumnBatch::F64(data.iter().skip(start).take(count).copied().collect())
169            }
170            Self::I64 { data, .. } => {
171                ColumnBatch::I64(data.iter().skip(start).take(count).copied().collect())
172            }
173            Self::U64 { data, .. } => {
174                ColumnBatch::U64(data.iter().skip(start).take(count).copied().collect())
175            }
176        }
177    }
178}
179
180struct RunBuffer {
181    run_id: RunId,
182    session_id: SessionId,
183    stream_metadata: Arc<StreamMetadata>,
184    segment_metadata: Arc<SegmentMetadata>,
185    sample_numbers: VecDeque<SampleNumber>,
186    timestamps: VecDeque<f64>,
187    columns: HashMap<ColumnId, ColumnBuffer>,
188    capacity: usize,
189}
190
191impl RunBuffer {
192    fn new(run_id: RunId, sample: &Sample, capacity: usize) -> Self {
193        let alloc = capacity.min(65_536);
194        Self {
195            run_id,
196            session_id: sample.device.session_id,
197            stream_metadata: sample.stream.clone(),
198            segment_metadata: sample.segment.clone(),
199            sample_numbers: VecDeque::with_capacity(alloc),
200            timestamps: VecDeque::with_capacity(alloc),
201            columns: HashMap::new(),
202            capacity,
203        }
204    }
205
206    fn len(&self) -> usize {
207        self.sample_numbers.len()
208    }
209
210    fn push(&mut self, sample: &Sample) {
211        self.sample_numbers.push_back(sample.n);
212        self.timestamps.push_back(sample.timestamp_end());
213        self.segment_metadata = sample.segment.clone();
214
215        for col in &sample.columns {
216            self.columns
217                .entry(col.desc.index)
218                .or_insert_with(|| ColumnBuffer::new(col.desc.clone(), self.capacity))
219                .push(col.value.clone());
220        }
221    }
222
223    fn pop_front(&mut self) {
224        self.sample_numbers.pop_front();
225        self.timestamps.pop_front();
226        for col in self.columns.values_mut() {
227            col.pop_front();
228        }
229    }
230
231    fn sample_number_wraps(&self) -> bool {
232        match (self.sample_numbers.front(), self.sample_numbers.back()) {
233            (Some(first), Some(last)) => first > last,
234            _ => false,
235        }
236    }
237
238    fn find_start_after_sample(&self, sample_number: SampleNumber) -> Option<usize> {
239        if self.sample_numbers.is_empty() {
240            return None;
241        }
242
243        if !self.sample_number_wraps() {
244            let start = self
245                .sample_numbers
246                .partition_point(|&sn| sn <= sample_number);
247            if start == 0 || self.sample_numbers.get(start - 1).copied()? != sample_number {
248                return None;
249            }
250            return Some(start);
251        }
252
253        self.sample_numbers
254            .iter()
255            .rposition(|&sn| sn == sample_number)
256            .map(|idx| idx + 1)
257    }
258
259    fn timestamps_range(&self, start: usize, count: usize) -> Vec<f64> {
260        self.timestamps
261            .iter()
262            .skip(start)
263            .take(count)
264            .copied()
265            .collect()
266    }
267}
268
269pub struct ActiveRun {
270    pub run_id: RunId,
271    pub session_id: SessionId,
272    pub segment_id: SegmentId,
273    pub effective_rate: f64,
274    pub time_ref_epoch: MetadataEpoch,
275    pub last_sample_number: SampleNumber,
276    pub last_timestamp: f64,
277    buffer: RunBuffer,
278}
279
280impl ActiveRun {
281    fn new(run_id: RunId, sample: &Sample, capacity: usize) -> Self {
282        let segment = &sample.segment;
283        Self {
284            run_id,
285            session_id: sample.device.session_id,
286            segment_id: segment.segment_id,
287            effective_rate: segment.sampling_rate as f64 / segment.decimation as f64,
288            time_ref_epoch: segment.time_ref_epoch.clone(),
289            last_sample_number: sample.n,
290            last_timestamp: sample.timestamp_end(),
291            buffer: RunBuffer::new(run_id, sample, capacity),
292        }
293    }
294}
295
296pub struct Buffer {
297    capacity: usize,
298    active_runs: HashMap<StreamKey, ActiveRun>,
299    next_run_id: RunId,
300}
301
302enum AlignmentMode<'a> {
303    LastN(usize),
304    FromCursors {
305        cursors: &'a HashMap<StreamKey, CursorPosition>,
306        n: usize,
307    },
308    CommonTail,
309    TimeRange {
310        start: f64,
311        end: f64,
312    },
313}
314
315impl Buffer {
316    pub fn new(capacity: usize) -> Self {
317        Self {
318            capacity,
319            active_runs: HashMap::new(),
320            next_run_id: 0,
321        }
322    }
323
324    pub fn process_sample(&mut self, sample: Sample, stream_key: StreamKey) {
325        let needs_new_run = !sample.is_continuous() || !self.active_runs.contains_key(&stream_key);
326
327        if needs_new_run {
328            let new_run_id = self.next_run_id;
329            self.next_run_id += 1;
330            self.active_runs.insert(
331                stream_key.clone(),
332                ActiveRun::new(new_run_id, &sample, self.capacity),
333            );
334        }
335
336        let active = self.active_runs.get_mut(&stream_key).unwrap();
337        active.buffer.push(&sample);
338        active.last_sample_number = sample.n;
339        active.last_timestamp = sample.timestamp_end();
340        active.segment_id = sample.segment.segment_id;
341
342        if active.buffer.len() > self.capacity {
343            active.buffer.pop_front();
344        }
345    }
346
347    pub fn get_run(&self, stream_key: &StreamKey) -> Option<&ActiveRun> {
348        self.active_runs.get(stream_key)
349    }
350
351    pub fn read_aligned_window(
352        &self,
353        columns: &[ColumnKey],
354        n: usize,
355    ) -> Result<AlignedWindow, ReadError> {
356        let by_stream = self.prepare_stream_selection(columns)?;
357        let (slices, timestamps) =
358            self.compute_aligned_slices(&by_stream, AlignmentMode::LastN(n))?;
359        self.build_window_from_slices(&by_stream, &slices, timestamps)
360    }
361
362    pub fn read_from_cursor(
363        &self,
364        columns: &[ColumnKey],
365        cursors: &HashMap<StreamKey, CursorPosition>,
366        n: usize,
367    ) -> Result<AlignedWindow, ReadError> {
368        let by_stream = self.prepare_stream_selection(columns)?;
369        let (slices, timestamps) =
370            self.compute_aligned_slices(&by_stream, AlignmentMode::FromCursors { cursors, n })?;
371        self.build_window_from_slices(&by_stream, &slices, timestamps)
372    }
373
374    pub fn read_aligned_tail(&self, columns: &[ColumnKey]) -> Result<AlignedWindow, ReadError> {
375        let by_stream = self.prepare_stream_selection(columns)?;
376        let (slices, timestamps) =
377            self.compute_aligned_slices(&by_stream, AlignmentMode::CommonTail)?;
378        self.build_window_from_slices(&by_stream, &slices, timestamps)
379    }
380
381    pub fn read_aligned_time_range(
382        &self,
383        columns: &[ColumnKey],
384        start_time: f64,
385        end_time: f64,
386    ) -> Result<AlignedWindow, ReadError> {
387        let by_stream = self.prepare_stream_selection(columns)?;
388        let (slices, timestamps) = self.compute_aligned_slices(
389            &by_stream,
390            AlignmentMode::TimeRange {
391                start: start_time,
392                end: end_time,
393            },
394        )?;
395        self.build_window_from_slices(&by_stream, &slices, timestamps)
396            .map_err(|err| match err {
397                ReadError::InsufficientData { .. } => ReadError::NoDataInTimeRange {
398                    requested_start: start_time.min(end_time),
399                    requested_end: start_time.max(end_time),
400                },
401                other => other,
402            })
403    }
404
405    fn compute_aligned_slices(
406        &self,
407        by_stream: &HashMap<StreamKey, Vec<ColumnId>>,
408        mode: AlignmentMode<'_>,
409    ) -> Result<(HashMap<StreamKey, (usize, usize)>, Vec<f64>), ReadError> {
410        match mode {
411            AlignmentMode::LastN(n) => {
412                let ref_key = Self::reference_stream_key(by_stream);
413                let ref_buf = self.active_buffer(ref_key)?;
414                let available = ref_buf.len();
415                if available == 0 {
416                    return Err(ReadError::InsufficientData {
417                        stream_key: ref_key.clone(),
418                        requested: n,
419                        available: 0,
420                    });
421                }
422                let count = n.min(available);
423                let start = available.saturating_sub(count);
424                let timestamps = ref_buf.timestamps_range(start, count);
425                let slices = by_stream
426                    .keys()
427                    .map(|k| (k.clone(), (start, count)))
428                    .collect();
429                Ok((slices, timestamps))
430            }
431
432            AlignmentMode::FromCursors { cursors, n } => {
433                let mut start = 0;
434                let mut reference_key: Option<StreamKey> = None;
435
436                for stream_key in by_stream.keys() {
437                    let active = self.active_run(stream_key)?;
438                    let cursor = cursors
439                        .get(stream_key)
440                        .ok_or(ReadError::NoCursorForStream {
441                            stream_key: stream_key.clone(),
442                        })?;
443                    if cursor.run_id != active.run_id {
444                        return Err(ReadError::CursorInvalidated {
445                            stream_key: stream_key.clone(),
446                            cursor_run: cursor.run_id,
447                            current_run: active.run_id,
448                        });
449                    }
450                    let buf = &active.buffer;
451                    if buf.sample_numbers.is_empty() {
452                        return Err(ReadError::InsufficientData {
453                            stream_key: stream_key.clone(),
454                            requested: n,
455                            available: 0,
456                        });
457                    }
458                    let s = buf
459                        .find_start_after_sample(cursor.last_sample_number)
460                        .ok_or(ReadError::CursorOutOfBuffer {
461                            stream_key: stream_key.clone(),
462                            cursor_sample: cursor.last_sample_number,
463                            earliest_available: *buf.sample_numbers.front().unwrap(),
464                        })?;
465                    if s + n > buf.len() {
466                        return Err(ReadError::InsufficientData {
467                            stream_key: stream_key.clone(),
468                            requested: n,
469                            available: buf.len().saturating_sub(s),
470                        });
471                    }
472                    if reference_key.is_none() {
473                        start = s;
474                        reference_key = Some(stream_key.clone());
475                    }
476                }
477
478                let ref_key = reference_key.unwrap();
479                let ref_buf = self.active_buffer(&ref_key)?;
480                let timestamps = ref_buf.timestamps_range(start, n);
481                let slices = by_stream.keys().map(|k| (k.clone(), (start, n))).collect();
482                Ok((slices, timestamps))
483            }
484
485            AlignmentMode::CommonTail => {
486                let mut global_start = f64::MIN;
487                let mut global_end = f64::MAX;
488
489                for stream_key in by_stream.keys() {
490                    let buf = self.active_buffer(stream_key)?;
491                    if buf.timestamps.is_empty() {
492                        return Err(ReadError::InsufficientData {
493                            stream_key: stream_key.clone(),
494                            requested: 0,
495                            available: 0,
496                        });
497                    }
498                    let first = *buf.timestamps.front().unwrap();
499                    let last = *buf.timestamps.back().unwrap();
500                    global_start = global_start.max(first);
501                    global_end = global_end.min(last);
502                }
503
504                if global_start >= global_end {
505                    return Err(ReadError::InsufficientData {
506                        stream_key: StreamKey::new(DeviceRoute::root(), 0),
507                        requested: 0,
508                        available: 0,
509                    });
510                }
511
512                let ref_key = Self::reference_stream_key(by_stream);
513                let ref_buf = self.active_buffer(ref_key)?;
514                let start = ref_buf
515                    .timestamps
516                    .iter()
517                    .position(|&t| t >= global_start)
518                    .unwrap_or(0);
519                let end = ref_buf
520                    .timestamps
521                    .iter()
522                    .rposition(|&t| t <= global_end)
523                    .unwrap_or(ref_buf.len().saturating_sub(1));
524                let count = end.saturating_sub(start) + 1;
525                let timestamps = ref_buf.timestamps_range(start, count);
526                let slices = by_stream
527                    .keys()
528                    .map(|k| (k.clone(), (start, count)))
529                    .collect();
530                Ok((slices, timestamps))
531            }
532
533            AlignmentMode::TimeRange {
534                start: start_time,
535                end: end_time,
536            } => {
537                let (requested_start, requested_end) = normalize_time_bounds(start_time, end_time);
538
539                let (available_start, available_end) = self
540                    .aligned_retained_time_bounds(by_stream)?
541                    .ok_or(ReadError::NoDataInTimeRange {
542                        requested_start,
543                        requested_end,
544                    })?;
545
546                if requested_start < available_start || requested_end > available_end {
547                    return Err(ReadError::RequestedRangeExceedsRetention {
548                        requested_start,
549                        requested_end,
550                        available_start,
551                        available_end,
552                    });
553                }
554
555                let ref_key = Self::reference_stream_key(by_stream);
556                let ref_buf = self.active_buffer(ref_key)?;
557                let ref_start = ref_buf.timestamps.partition_point(|&t| t < requested_start);
558                let ref_end = ref_buf.timestamps.partition_point(|&t| t <= requested_end);
559                if ref_start >= ref_end {
560                    return Err(ReadError::NoDataInTimeRange {
561                        requested_start,
562                        requested_end,
563                    });
564                }
565
566                let ref_count = ref_end - ref_start;
567                let timestamps = ref_buf.timestamps_range(ref_start, ref_count);
568                if timestamps.is_empty() {
569                    return Err(ReadError::NoDataInTimeRange {
570                        requested_start,
571                        requested_end,
572                    });
573                }
574
575                let mut slices = HashMap::new();
576                slices.insert(ref_key.clone(), (ref_start, ref_count));
577
578                for stream_key in by_stream.keys() {
579                    if stream_key == ref_key {
580                        continue;
581                    }
582                    let buf = self.active_buffer(stream_key)?;
583                    let s = buf.timestamps.partition_point(|&t| t < requested_start);
584                    let e = buf.timestamps.partition_point(|&t| t <= requested_end);
585                    if s >= e {
586                        return Err(ReadError::NoDataInTimeRange {
587                            requested_start,
588                            requested_end,
589                        });
590                    }
591                    let count = e - s;
592                    if count != ref_count {
593                        return Err(ReadError::NoDataInTimeRange {
594                            requested_start,
595                            requested_end,
596                        });
597                    }
598                    let stream_timestamps = buf.timestamps.iter().skip(s).take(count).copied();
599                    if !timestamps_match_iter(&timestamps, stream_timestamps) {
600                        return Err(ReadError::NoDataInTimeRange {
601                            requested_start,
602                            requested_end,
603                        });
604                    }
605                    slices.insert(stream_key.clone(), (s, count));
606                }
607
608                Ok((slices, timestamps))
609            }
610        }
611    }
612
613    fn aligned_retained_time_bounds(
614        &self,
615        by_stream: &HashMap<StreamKey, Vec<ColumnId>>,
616    ) -> Result<Option<(f64, f64)>, ReadError> {
617        let mut global_start = f64::MIN;
618        let mut global_end = f64::MAX;
619
620        for stream_key in by_stream.keys() {
621            let buf = self.active_buffer(stream_key)?;
622            let (Some(&first), Some(&last)) = (buf.timestamps.front(), buf.timestamps.back())
623            else {
624                return Ok(None);
625            };
626
627            global_start = global_start.max(first);
628            global_end = global_end.min(last);
629        }
630
631        if global_start > global_end {
632            return Ok(None);
633        }
634
635        Ok(Some((global_start, global_end)))
636    }
637
638    fn build_window_from_slices(
639        &self,
640        by_stream: &HashMap<StreamKey, Vec<ColumnId>>,
641        slices: &HashMap<StreamKey, (usize, usize)>,
642        timestamps: Vec<f64>,
643    ) -> Result<AlignedWindow, ReadError> {
644        let expected_len = timestamps.len();
645        let mut sample_numbers = HashMap::new();
646        let mut columns = HashMap::new();
647        let mut stream_metadata = HashMap::new();
648        let mut segment_metadata = HashMap::new();
649        let mut column_metadata = HashMap::new();
650        let mut session_ids = HashMap::new();
651        let mut run_ids = HashMap::new();
652
653        for (stream_key, col_ids) in by_stream {
654            let buf = self.active_buffer(stream_key)?;
655            let (start, count) =
656                slices
657                    .get(stream_key)
658                    .copied()
659                    .ok_or(ReadError::InsufficientData {
660                        stream_key: stream_key.clone(),
661                        requested: expected_len,
662                        available: 0,
663                    })?;
664
665            let stream_sample_numbers: Vec<_> = buf
666                .sample_numbers
667                .iter()
668                .skip(start)
669                .take(count)
670                .copied()
671                .collect();
672            if stream_sample_numbers.len() != expected_len {
673                return Err(ReadError::InsufficientData {
674                    stream_key: stream_key.clone(),
675                    requested: expected_len,
676                    available: stream_sample_numbers.len(),
677                });
678            }
679            sample_numbers.insert(stream_key.clone(), stream_sample_numbers);
680
681            stream_metadata.insert(stream_key.clone(), buf.stream_metadata.clone());
682            segment_metadata.insert(stream_key.clone(), buf.segment_metadata.clone());
683            session_ids.insert(stream_key.clone(), buf.session_id);
684            run_ids.insert(stream_key.clone(), buf.run_id);
685
686            for &col_id in col_ids {
687                let col_buf = buf.columns.get(&col_id).ok_or(ReadError::ColumnNotFound {
688                    stream_key: stream_key.clone(),
689                    column_id: col_id,
690                })?;
691
692                let key = ColumnKey::new(stream_key.route.clone(), stream_key.stream_id, col_id);
693                let batch = col_buf.get_range(start, count);
694                if batch.len() != expected_len {
695                    return Err(ReadError::InsufficientData {
696                        stream_key: stream_key.clone(),
697                        requested: expected_len,
698                        available: batch.len(),
699                    });
700                }
701                columns.insert(key.clone(), batch);
702                column_metadata.insert(key, col_buf.metadata().clone());
703            }
704        }
705
706        Ok(AlignedWindow {
707            sample_numbers,
708            timestamps,
709            columns,
710            stream_metadata,
711            segment_metadata,
712            column_metadata,
713            session_ids,
714            run_ids,
715        })
716    }
717
718    fn prepare_stream_selection(
719        &self,
720        columns: &[ColumnKey],
721    ) -> Result<HashMap<StreamKey, Vec<ColumnId>>, ReadError> {
722        if columns.is_empty() {
723            return Err(ReadError::NoColumnsRequested);
724        }
725        let by_stream = group_columns_by_stream(columns);
726        self.validate_rates(&by_stream)?;
727        Ok(by_stream)
728    }
729
730    fn validate_rates(
731        &self,
732        by_stream: &HashMap<StreamKey, Vec<ColumnId>>,
733    ) -> Result<(), ReadError> {
734        let mut rate: Option<f64> = None;
735        let mut rates = Vec::new();
736
737        for stream_key in by_stream.keys() {
738            let active = self
739                .active_runs
740                .get(stream_key)
741                .ok_or(ReadError::NoActiveRun {
742                    stream_key: stream_key.clone(),
743                })?;
744
745            let r = active.effective_rate;
746            rates.push(r);
747
748            if let Some(first_rate) = rate {
749                if (r - first_rate).abs() > 0.001 {
750                    return Err(ReadError::SamplingRateMismatch {
751                        streams: by_stream.keys().cloned().collect(),
752                        rates,
753                    });
754                }
755            } else {
756                rate = Some(r);
757            }
758        }
759
760        Ok(())
761    }
762
763    fn active_run(&self, stream_key: &StreamKey) -> Result<&ActiveRun, ReadError> {
764        self.active_runs
765            .get(stream_key)
766            .ok_or(ReadError::NoActiveRun {
767                stream_key: stream_key.clone(),
768            })
769    }
770
771    fn active_buffer(&self, stream_key: &StreamKey) -> Result<&RunBuffer, ReadError> {
772        Ok(&self.active_run(stream_key)?.buffer)
773    }
774
775    fn reference_stream_key<'a>(by_stream: &'a HashMap<StreamKey, Vec<ColumnId>>) -> &'a StreamKey {
776        by_stream
777            .keys()
778            .min()
779            .expect("reference stream requires at least one stream")
780    }
781}
782
783fn group_columns_by_stream(columns: &[ColumnKey]) -> HashMap<StreamKey, Vec<ColumnId>> {
784    let mut by_stream: HashMap<StreamKey, Vec<ColumnId>> = HashMap::new();
785    for col in columns {
786        by_stream
787            .entry(col.stream_key())
788            .or_default()
789            .push(col.column_id);
790    }
791    by_stream
792}
793
/// Returns the two times as `(earlier, later)`, swapping them when given in reverse order.
fn normalize_time_bounds(start_time: f64, end_time: f64) -> (f64, f64) {
    if start_time <= end_time {
        (start_time, end_time)
    } else {
        (end_time, start_time)
    }
}
801
/// Returns `true` when `candidate` yields exactly `reference.len()` timestamps and each one is
/// within `1e-9` of the reference timestamp at the same position.
///
/// A candidate that is shorter or longer than `reference` does not match.
fn timestamps_match_iter<I>(reference: &[f64], candidate: I) -> bool
where
    I: Iterator<Item = f64>,
{
    const TOLERANCE: f64 = 1e-9;

    let mut candidate = candidate;
    for &expected in reference {
        match candidate.next() {
            Some(actual) if (expected - actual).abs() <= TOLERANCE => {}
            // Either the candidate ran out early or the pair differs by too much.
            _ => return false,
        }
    }
    // Anything left over means the candidate is longer than the reference.
    candidate.next().is_none()
}