Skip to main content

archiver_core/retrieval/
merge.rs

1use std::time::SystemTime;
2
3use crate::storage::traits::EventStream;
4use crate::types::{ArchiverSample, EventStreamDesc};
5
6/// Merges multiple EventStreams in timestamp order.
7/// Assumes each input stream is already sorted by time.
8pub struct MergedEventStream {
9    desc: EventStreamDesc,
10    streams: Vec<Box<dyn EventStream>>,
11    /// Buffer of (stream_index, sample) — one lookahead per stream.
12    heads: Vec<Option<(usize, ArchiverSample)>>,
13    initialized: bool,
14}
15
16impl MergedEventStream {
17    pub fn new(desc: EventStreamDesc, streams: Vec<Box<dyn EventStream>>) -> Self {
18        let count = streams.len();
19        Self {
20            desc,
21            streams,
22            heads: vec![None; count],
23            initialized: false,
24        }
25    }
26
27    fn initialize(&mut self) -> anyhow::Result<()> {
28        for (i, stream) in self.streams.iter_mut().enumerate() {
29            if let Some(sample) = stream.next_event()? {
30                self.heads[i] = Some((i, sample));
31            }
32        }
33        self.initialized = true;
34        Ok(())
35    }
36}
37
38impl EventStream for MergedEventStream {
39    fn description(&self) -> &EventStreamDesc {
40        &self.desc
41    }
42
43    fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
44        if !self.initialized {
45            self.initialize()?;
46        }
47
48        // Find the stream with the earliest timestamp.
49        let mut earliest_idx = None;
50        let mut earliest_ts = None;
51
52        for (i, head) in self.heads.iter().enumerate() {
53            if let Some((_, sample)) = head {
54                let ts = sample.timestamp;
55                if earliest_ts.is_none_or(|e| ts < e) {
56                    earliest_ts = Some(ts);
57                    earliest_idx = Some(i);
58                }
59            }
60        }
61
62        let idx = match earliest_idx {
63            Some(i) => i,
64            None => return Ok(None),
65        };
66
67        // Take the sample and advance that stream.
68        let (stream_idx, sample) = self.heads[idx]
69            .take()
70            .expect("earliest_idx points to a Some entry");
71        if let Some(next_sample) = self.streams[stream_idx].next_event()? {
72            self.heads[idx] = Some((stream_idx, next_sample));
73        }
74
75        Ok(Some(sample))
76    }
77}
78
79/// Wraps another `EventStream` and drops samples whose timestamp equals the
80/// previously emitted one. Used by failover retrieval to deduplicate when the
81/// local archiver and a peer both have identical events for the same PV.
82pub struct DedupTimestampStream {
83    inner: Box<dyn EventStream>,
84    last_ts: Option<SystemTime>,
85}
86
87impl DedupTimestampStream {
88    pub fn new(inner: Box<dyn EventStream>) -> Self {
89        Self {
90            inner,
91            last_ts: None,
92        }
93    }
94}
95
96impl EventStream for DedupTimestampStream {
97    fn description(&self) -> &EventStreamDesc {
98        self.inner.description()
99    }
100
101    fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
102        loop {
103            match self.inner.next_event()? {
104                Some(sample) => {
105                    if Some(sample.timestamp) == self.last_ts {
106                        continue;
107                    }
108                    self.last_ts = Some(sample.timestamp);
109                    return Ok(Some(sample));
110                }
111                None => return Ok(None),
112            }
113        }
114    }
115}
116
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{ArchDbType, ArchiverValue};
    use std::time::{Duration, UNIX_EPOCH};

    /// Description shared by every test stream.
    fn test_desc() -> EventStreamDesc {
        EventStreamDesc {
            pv_name: "T".to_string(),
            db_type: ArchDbType::ScalarDouble,
            year: 2024,
            element_count: Some(1),
            headers: Vec::new(),
        }
    }

    /// In-memory `EventStream` that yields a fixed list of samples in order.
    struct VecStream {
        desc: EventStreamDesc,
        items: std::vec::IntoIter<ArchiverSample>,
    }

    impl VecStream {
        fn new(items: Vec<ArchiverSample>) -> Self {
            Self {
                desc: test_desc(),
                items: items.into_iter(),
            }
        }
    }

    impl EventStream for VecStream {
        fn description(&self) -> &EventStreamDesc {
            &self.desc
        }
        fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
            Ok(self.items.next())
        }
    }

    /// Builds a scalar-double sample at `secs` seconds after the Unix epoch.
    fn s(secs: u64, val: f64) -> ArchiverSample {
        ArchiverSample::new(
            UNIX_EPOCH + Duration::from_secs(secs),
            ArchiverValue::ScalarDouble(val),
        )
    }

    /// Collects every value from `stream`.
    ///
    /// Panics on any non-`ScalarDouble` sample rather than silently skipping
    /// it, so an unexpected value type fails the test instead of being hidden.
    fn drain(mut stream: Box<dyn EventStream>) -> Vec<f64> {
        let mut out = Vec::new();
        while let Some(sample) = stream.next_event().unwrap() {
            let ArchiverValue::ScalarDouble(v) = sample.value else {
                panic!("test streams only contain ScalarDouble samples");
            };
            out.push(v);
        }
        out
    }

    /// Boxes a `MergedEventStream` over one `VecStream` per entry of `inputs`,
    /// preserving input order.
    fn merge(inputs: Vec<Vec<ArchiverSample>>) -> Box<dyn EventStream> {
        let streams: Vec<Box<dyn EventStream>> = inputs
            .into_iter()
            .map(|items| Box::new(VecStream::new(items)) as Box<dyn EventStream>)
            .collect();
        Box::new(MergedEventStream::new(test_desc(), streams))
    }

    #[test]
    fn merge_interleaves_two_streams() {
        let a = VecStream::new(vec![s(1, 1.0), s(3, 3.0), s(5, 5.0)]);
        let b = VecStream::new(vec![s(2, 2.0), s(4, 4.0), s(6, 6.0)]);
        let merged: Box<dyn EventStream> = Box::new(MergedEventStream::new(
            a.desc.clone(),
            vec![Box::new(a), Box::new(b)],
        ));
        assert_eq!(drain(merged), vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0]);
    }

    #[test]
    fn merge_of_no_streams_is_empty() {
        assert_eq!(drain(merge(Vec::new())), Vec::<f64>::new());
    }

    #[test]
    fn merge_skips_empty_streams() {
        let out = drain(merge(vec![Vec::new(), vec![s(1, 1.0), s(2, 2.0)], Vec::new()]));
        assert_eq!(out, vec![1.0, 2.0]);
    }

    #[test]
    fn merge_single_stream_passes_through() {
        let out = drain(merge(vec![vec![s(1, 1.0), s(2, 2.0), s(3, 3.0)]]));
        assert_eq!(out, vec![1.0, 2.0, 3.0]);
    }

    #[test]
    fn merge_breaks_ties_by_stream_order() {
        let out = drain(merge(vec![vec![s(1, 10.0)], vec![s(1, 20.0)]]));
        assert_eq!(out, vec![10.0, 20.0]);
        let out = drain(merge(vec![vec![s(1, 20.0)], vec![s(1, 10.0)]]));
        assert_eq!(out, vec![20.0, 10.0]);
    }

    #[test]
    fn dedup_drops_duplicate_timestamps() {
        // A and B both have a sample at t=2; merge produces 1,2,2,3 → dedup yields 1,2,3.
        let a = VecStream::new(vec![s(1, 1.0), s(2, 2.0), s(3, 3.0)]);
        let b = VecStream::new(vec![s(2, 99.0)]); // duplicate at t=2
        let merged = Box::new(MergedEventStream::new(
            a.desc.clone(),
            vec![Box::new(a), Box::new(b)],
        ));
        let dedup: Box<dyn EventStream> = Box::new(DedupTimestampStream::new(merged));
        let out = drain(dedup);
        // First t=2 sample wins (value 2.0); duplicate dropped.
        assert_eq!(out, vec![1.0, 2.0, 3.0]);
    }

    #[test]
    fn dedup_collapses_runs_of_equal_timestamps() {
        let inner: Box<dyn EventStream> = Box::new(VecStream::new(vec![
            s(1, 1.0),
            s(2, 2.0),
            s(2, 2.5),
            s(2, 2.7),
            s(3, 3.0),
        ]));
        let dedup: Box<dyn EventStream> = Box::new(DedupTimestampStream::new(inner));
        assert_eq!(drain(dedup), vec![1.0, 2.0, 3.0]);
    }
}