archiver_core/retrieval/
merge.rs1use std::time::SystemTime;
2
3use crate::storage::traits::EventStream;
4use crate::types::{ArchiverSample, EventStreamDesc};
5
6pub struct MergedEventStream {
9 desc: EventStreamDesc,
10 streams: Vec<Box<dyn EventStream>>,
11 heads: Vec<Option<(usize, ArchiverSample)>>,
13 initialized: bool,
14}
15
16impl MergedEventStream {
17 pub fn new(desc: EventStreamDesc, streams: Vec<Box<dyn EventStream>>) -> Self {
18 let count = streams.len();
19 Self {
20 desc,
21 streams,
22 heads: vec![None; count],
23 initialized: false,
24 }
25 }
26
27 fn initialize(&mut self) -> anyhow::Result<()> {
28 for (i, stream) in self.streams.iter_mut().enumerate() {
29 if let Some(sample) = stream.next_event()? {
30 self.heads[i] = Some((i, sample));
31 }
32 }
33 self.initialized = true;
34 Ok(())
35 }
36}
37
38impl EventStream for MergedEventStream {
39 fn description(&self) -> &EventStreamDesc {
40 &self.desc
41 }
42
43 fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
44 if !self.initialized {
45 self.initialize()?;
46 }
47
48 let mut earliest_idx = None;
50 let mut earliest_ts = None;
51
52 for (i, head) in self.heads.iter().enumerate() {
53 if let Some((_, sample)) = head {
54 let ts = sample.timestamp;
55 if earliest_ts.is_none_or(|e| ts < e) {
56 earliest_ts = Some(ts);
57 earliest_idx = Some(i);
58 }
59 }
60 }
61
62 let idx = match earliest_idx {
63 Some(i) => i,
64 None => return Ok(None),
65 };
66
67 let (stream_idx, sample) = self.heads[idx]
69 .take()
70 .expect("earliest_idx points to a Some entry");
71 if let Some(next_sample) = self.streams[stream_idx].next_event()? {
72 self.heads[idx] = Some((stream_idx, next_sample));
73 }
74
75 Ok(Some(sample))
76 }
77}
78
79pub struct DedupTimestampStream {
83 inner: Box<dyn EventStream>,
84 last_ts: Option<SystemTime>,
85}
86
87impl DedupTimestampStream {
88 pub fn new(inner: Box<dyn EventStream>) -> Self {
89 Self {
90 inner,
91 last_ts: None,
92 }
93 }
94}
95
96impl EventStream for DedupTimestampStream {
97 fn description(&self) -> &EventStreamDesc {
98 self.inner.description()
99 }
100
101 fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
102 loop {
103 match self.inner.next_event()? {
104 Some(sample) => {
105 if Some(sample.timestamp) == self.last_ts {
106 continue;
107 }
108 self.last_ts = Some(sample.timestamp);
109 return Ok(Some(sample));
110 }
111 None => return Ok(None),
112 }
113 }
114 }
115}
116
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{ArchDbType, ArchiverValue};
    use std::time::{Duration, UNIX_EPOCH};

    /// In-memory `EventStream` that replays a fixed list of samples.
    struct VecStream {
        desc: EventStreamDesc,
        items: std::vec::IntoIter<ArchiverSample>,
    }

    impl VecStream {
        fn new(items: Vec<ArchiverSample>) -> Self {
            let desc = EventStreamDesc {
                pv_name: "T".to_string(),
                db_type: ArchDbType::ScalarDouble,
                year: 2024,
                element_count: Some(1),
                headers: Vec::new(),
            };
            Self {
                desc,
                items: items.into_iter(),
            }
        }
    }

    impl EventStream for VecStream {
        fn description(&self) -> &EventStreamDesc {
            &self.desc
        }

        fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
            Ok(self.items.next())
        }
    }

    /// Builds a scalar-double sample `secs` seconds after the Unix epoch.
    fn at(secs: u64, val: f64) -> ArchiverSample {
        let timestamp = UNIX_EPOCH + Duration::from_secs(secs);
        ArchiverSample::new(timestamp, ArchiverValue::ScalarDouble(val))
    }

    /// Merges two test streams, reusing the first stream's description.
    fn merge_pair(first: VecStream, second: VecStream) -> Box<dyn EventStream> {
        let desc = first.desc.clone();
        Box::new(MergedEventStream::new(
            desc,
            vec![Box::new(first), Box::new(second)],
        ))
    }

    /// Reads `stream` to the end and collects the scalar-double payloads in
    /// order; samples carrying any other value kind are ignored.
    fn drain(mut stream: Box<dyn EventStream>) -> Vec<f64> {
        std::iter::from_fn(|| stream.next_event().unwrap())
            .filter_map(|sample| match sample.value {
                ArchiverValue::ScalarDouble(v) => Some(v),
                _ => None,
            })
            .collect()
    }

    #[test]
    fn merge_interleaves_two_streams() {
        let odd = VecStream::new(vec![at(1, 1.0), at(3, 3.0), at(5, 5.0)]);
        let even = VecStream::new(vec![at(2, 2.0), at(4, 4.0), at(6, 6.0)]);

        let values = drain(merge_pair(odd, even));

        assert_eq!(values, vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0]);
    }

    #[test]
    fn dedup_drops_duplicate_timestamps() {
        let primary = VecStream::new(vec![at(1, 1.0), at(2, 2.0), at(3, 3.0)]);
        // Shares the t=2 timestamp with `primary`; the earlier input wins the tie.
        let overlap = VecStream::new(vec![at(2, 99.0)]);

        let dedup: Box<dyn EventStream> =
            Box::new(DedupTimestampStream::new(merge_pair(primary, overlap)));

        assert_eq!(drain(dedup), vec![1.0, 2.0, 3.0]);
    }
}