// archiver_core/storage/plainpb/reader.rs

1use std::fs::File;
2use std::io::{BufRead, BufReader, Seek, SeekFrom};
3use std::path::Path;
4use std::time::SystemTime;
5
6use archiver_proto::epics_event::{self, PayloadInfo};
7use prost::Message;
8use tracing::warn;
9
10use crate::storage::plainpb::codec;
11use crate::storage::plainpb::search::binary_search_pb_file;
12use crate::storage::traits::EventStream;
13use crate::types::{ArchDbType, ArchiverSample, ArchiverValue, EventStreamDesc};
14
15/// Reads a PlainPB file, yielding one ArchiverSample per line.
16/// Uses binary-safe line reading (read_until) since PB data may contain non-UTF8 bytes.
17pub struct PbFileReader {
18    desc: EventStreamDesc,
19    reader: BufReader<File>,
20}
21
22impl PbFileReader {
23    /// Open a PB file. Reads and parses the PayloadInfo header from the first line.
24    pub fn open(path: &Path) -> anyhow::Result<Self> {
25        let file = File::open(path)?;
26        let mut reader = BufReader::new(file);
27
28        // Read first line (header).
29        let mut header_line = Vec::new();
30        reader.read_until(codec::NEWLINE, &mut header_line)?;
31        // Strip trailing newline.
32        if header_line.last() == Some(&codec::NEWLINE) {
33            header_line.pop();
34        }
35
36        let header_bytes = codec::unescape(&header_line);
37        let payload_info = PayloadInfo::decode(header_bytes.as_slice())?;
38        let desc = EventStreamDesc::from_payload_info(&payload_info);
39
40        Ok(Self { desc, reader })
41    }
42
43    /// Open a PB file and seek to the first sample >= start_time using binary search.
44    /// Falls back to reading from the beginning if binary search finds nothing or fails.
45    pub fn open_seeked(path: &Path, start_time: SystemTime) -> anyhow::Result<Self> {
46        // First, run binary search on a separate file handle.
47        let offset = binary_search_pb_file(path, start_time).ok().flatten();
48
49        // Open the file normally (reads header).
50        let file = File::open(path)?;
51        let mut reader = BufReader::new(file);
52
53        // Read header.
54        let mut header_line = Vec::new();
55        reader.read_until(codec::NEWLINE, &mut header_line)?;
56        if header_line.last() == Some(&codec::NEWLINE) {
57            header_line.pop();
58        }
59        let header_bytes = codec::unescape(&header_line);
60        let payload_info = PayloadInfo::decode(header_bytes.as_slice())?;
61        let desc = EventStreamDesc::from_payload_info(&payload_info);
62
63        // Seek to the binary search result if found.
64        if let Some(off) = offset {
65            reader.seek(SeekFrom::Start(off))?;
66        }
67
68        Ok(Self { desc, reader })
69    }
70}
71
72impl EventStream for PbFileReader {
73    fn description(&self) -> &EventStreamDesc {
74        &self.desc
75    }
76
77    fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
78        loop {
79            let mut line_buf = Vec::new();
80            let bytes_read = self.reader.read_until(codec::NEWLINE, &mut line_buf)?;
81            if bytes_read == 0 {
82                return Ok(None);
83            }
84
85            // Java parity: a trailing line without a terminating newline
86            // is a torn write — drop it rather than feeding partial bytes
87            // to `decode_sample` (PVNames.java fix 8a902a80).
88            let had_newline = line_buf.last() == Some(&codec::NEWLINE);
89            if had_newline {
90                line_buf.pop();
91            } else if !line_buf.is_empty() {
92                warn!(
93                    "PB stream: dropping {} truncated trailing bytes (no newline at EOF)",
94                    line_buf.len()
95                );
96                return Ok(None);
97            }
98
99            if line_buf.is_empty() {
100                continue;
101            }
102
103            let raw_bytes = codec::unescape(&line_buf);
104            // Java parity (53ebdc99): a single corrupt sample line must
105            // not abort the iterator. Skip + log so the rest of the file
106            // is still readable.
107            match decode_sample(self.desc.db_type, self.desc.year, &raw_bytes) {
108                Ok(sample) => return Ok(Some(sample)),
109                Err(e) => {
110                    warn!(
111                        "PB stream: skipping undecodable sample ({} bytes): {e}",
112                        raw_bytes.len()
113                    );
114                    continue;
115                }
116            }
117        }
118    }
119}
120
121/// In-memory PB stream reader. Reads the same line-escaped PB format as
122/// `PbFileReader` but from a `Vec<u8>` (e.g. an HTTP response body from a
123/// failover peer). Yields `ArchiverSample`s through `EventStream`.
124pub struct PbBytesReader {
125    desc: EventStreamDesc,
126    reader: BufReader<std::io::Cursor<Vec<u8>>>,
127}
128
129impl PbBytesReader {
130    /// Decode header + samples from a complete in-memory PB body.
131    pub fn from_bytes(bytes: Vec<u8>) -> anyhow::Result<Self> {
132        let mut reader = BufReader::new(std::io::Cursor::new(bytes));
133
134        let mut header_line = Vec::new();
135        reader.read_until(codec::NEWLINE, &mut header_line)?;
136        if header_line.last() == Some(&codec::NEWLINE) {
137            header_line.pop();
138        }
139        let header_bytes = codec::unescape(&header_line);
140        let payload_info = PayloadInfo::decode(header_bytes.as_slice())?;
141        let desc = EventStreamDesc::from_payload_info(&payload_info);
142
143        Ok(Self { desc, reader })
144    }
145}
146
147impl EventStream for PbBytesReader {
148    fn description(&self) -> &EventStreamDesc {
149        &self.desc
150    }
151
152    fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
153        loop {
154            let mut line_buf = Vec::new();
155            let bytes_read = self.reader.read_until(codec::NEWLINE, &mut line_buf)?;
156            if bytes_read == 0 {
157                return Ok(None);
158            }
159            let had_newline = line_buf.last() == Some(&codec::NEWLINE);
160            if had_newline {
161                line_buf.pop();
162            } else if !line_buf.is_empty() {
163                warn!(
164                    "PB bytes-stream: dropping {} truncated trailing bytes",
165                    line_buf.len()
166                );
167                return Ok(None);
168            }
169            if line_buf.is_empty() {
170                continue;
171            }
172            let raw_bytes = codec::unescape(&line_buf);
173            match decode_sample(self.desc.db_type, self.desc.year, &raw_bytes) {
174                Ok(sample) => return Ok(Some(sample)),
175                Err(e) => {
176                    warn!("PB bytes-stream: skipping undecodable sample: {e}");
177                    continue;
178                }
179            }
180        }
181    }
182}
183
184/// Decode a protobuf sample from raw bytes based on the DBR type.
185pub fn decode_sample(
186    dbr_type: ArchDbType,
187    year: i32,
188    data: &[u8],
189) -> anyhow::Result<ArchiverSample> {
190    match dbr_type {
191        ArchDbType::ScalarString => {
192            let msg = epics_event::ScalarString::decode(data)?;
193            sample_from_parts(
194                year,
195                msg.secondsintoyear,
196                msg.nano,
197                ArchiverValue::ScalarString(msg.val),
198                msg.severity,
199                msg.status,
200                msg.repeatcount,
201                &msg.fieldvalues,
202                msg.fieldactualchange,
203            )
204        }
205        ArchDbType::ScalarByte => {
206            let msg = epics_event::ScalarByte::decode(data)?;
207            sample_from_parts(
208                year,
209                msg.secondsintoyear,
210                msg.nano,
211                ArchiverValue::ScalarByte(msg.val),
212                msg.severity,
213                msg.status,
214                msg.repeatcount,
215                &msg.fieldvalues,
216                msg.fieldactualchange,
217            )
218        }
219        ArchDbType::ScalarShort => {
220            let msg = epics_event::ScalarShort::decode(data)?;
221            sample_from_parts(
222                year,
223                msg.secondsintoyear,
224                msg.nano,
225                ArchiverValue::ScalarShort(msg.val),
226                msg.severity,
227                msg.status,
228                msg.repeatcount,
229                &msg.fieldvalues,
230                msg.fieldactualchange,
231            )
232        }
233        ArchDbType::ScalarInt => {
234            let msg = epics_event::ScalarInt::decode(data)?;
235            sample_from_parts(
236                year,
237                msg.secondsintoyear,
238                msg.nano,
239                ArchiverValue::ScalarInt(msg.val),
240                msg.severity,
241                msg.status,
242                msg.repeatcount,
243                &msg.fieldvalues,
244                msg.fieldactualchange,
245            )
246        }
247        ArchDbType::ScalarEnum => {
248            let msg = epics_event::ScalarEnum::decode(data)?;
249            sample_from_parts(
250                year,
251                msg.secondsintoyear,
252                msg.nano,
253                ArchiverValue::ScalarEnum(msg.val),
254                msg.severity,
255                msg.status,
256                msg.repeatcount,
257                &msg.fieldvalues,
258                msg.fieldactualchange,
259            )
260        }
261        ArchDbType::ScalarFloat => {
262            let msg = epics_event::ScalarFloat::decode(data)?;
263            sample_from_parts(
264                year,
265                msg.secondsintoyear,
266                msg.nano,
267                ArchiverValue::ScalarFloat(msg.val),
268                msg.severity,
269                msg.status,
270                msg.repeatcount,
271                &msg.fieldvalues,
272                msg.fieldactualchange,
273            )
274        }
275        ArchDbType::ScalarDouble => {
276            let msg = epics_event::ScalarDouble::decode(data)?;
277            sample_from_parts(
278                year,
279                msg.secondsintoyear,
280                msg.nano,
281                ArchiverValue::ScalarDouble(msg.val),
282                msg.severity,
283                msg.status,
284                msg.repeatcount,
285                &msg.fieldvalues,
286                msg.fieldactualchange,
287            )
288        }
289        ArchDbType::WaveformString => {
290            let msg = epics_event::VectorString::decode(data)?;
291            sample_from_parts(
292                year,
293                msg.secondsintoyear,
294                msg.nano,
295                ArchiverValue::VectorString(msg.val),
296                msg.severity,
297                msg.status,
298                msg.repeatcount,
299                &msg.fieldvalues,
300                msg.fieldactualchange,
301            )
302        }
303        ArchDbType::WaveformByte => {
304            let msg = epics_event::VectorChar::decode(data)?;
305            sample_from_parts(
306                year,
307                msg.secondsintoyear,
308                msg.nano,
309                ArchiverValue::VectorChar(msg.val),
310                msg.severity,
311                msg.status,
312                msg.repeatcount,
313                &msg.fieldvalues,
314                msg.fieldactualchange,
315            )
316        }
317        ArchDbType::WaveformShort => {
318            let msg = epics_event::VectorShort::decode(data)?;
319            sample_from_parts(
320                year,
321                msg.secondsintoyear,
322                msg.nano,
323                ArchiverValue::VectorShort(msg.val),
324                msg.severity,
325                msg.status,
326                msg.repeatcount,
327                &msg.fieldvalues,
328                msg.fieldactualchange,
329            )
330        }
331        ArchDbType::WaveformInt => {
332            let msg = epics_event::VectorInt::decode(data)?;
333            sample_from_parts(
334                year,
335                msg.secondsintoyear,
336                msg.nano,
337                ArchiverValue::VectorInt(msg.val),
338                msg.severity,
339                msg.status,
340                msg.repeatcount,
341                &msg.fieldvalues,
342                msg.fieldactualchange,
343            )
344        }
345        ArchDbType::WaveformEnum => {
346            let msg = epics_event::VectorEnum::decode(data)?;
347            sample_from_parts(
348                year,
349                msg.secondsintoyear,
350                msg.nano,
351                ArchiverValue::VectorEnum(msg.val),
352                msg.severity,
353                msg.status,
354                msg.repeatcount,
355                &msg.fieldvalues,
356                msg.fieldactualchange,
357            )
358        }
359        ArchDbType::WaveformFloat => {
360            let msg = epics_event::VectorFloat::decode(data)?;
361            sample_from_parts(
362                year,
363                msg.secondsintoyear,
364                msg.nano,
365                ArchiverValue::VectorFloat(msg.val),
366                msg.severity,
367                msg.status,
368                msg.repeatcount,
369                &msg.fieldvalues,
370                msg.fieldactualchange,
371            )
372        }
373        ArchDbType::WaveformDouble => {
374            let msg = epics_event::VectorDouble::decode(data)?;
375            sample_from_parts(
376                year,
377                msg.secondsintoyear,
378                msg.nano,
379                ArchiverValue::VectorDouble(msg.val),
380                msg.severity,
381                msg.status,
382                msg.repeatcount,
383                &msg.fieldvalues,
384                msg.fieldactualchange,
385            )
386        }
387        ArchDbType::V4GenericBytes => {
388            let msg = epics_event::V4GenericBytes::decode(data)?;
389            sample_from_parts(
390                year,
391                msg.secondsintoyear,
392                msg.nano,
393                ArchiverValue::V4GenericBytes(msg.val),
394                msg.severity,
395                msg.status,
396                msg.repeatcount,
397                &msg.fieldvalues,
398                msg.fieldactualchange,
399            )
400        }
401    }
402}
403
404#[allow(clippy::too_many_arguments)]
405fn sample_from_parts(
406    year: i32,
407    seconds_into_year: u32,
408    nanos: u32,
409    value: ArchiverValue,
410    severity: Option<i32>,
411    status: Option<i32>,
412    repeat_count: Option<u32>,
413    field_values: &[epics_event::FieldValue],
414    field_actual_change: Option<bool>,
415) -> anyhow::Result<ArchiverSample> {
416    let timestamp = ArchiverSample::timestamp_from_epoch_parts(year, seconds_into_year, nanos)
417        .ok_or_else(|| {
418            anyhow::anyhow!("invalid timestamp: year={year} secs={seconds_into_year} nanos={nanos}")
419        })?;
420    Ok(ArchiverSample {
421        timestamp,
422        value,
423        severity: severity.unwrap_or(0),
424        status: status.unwrap_or(0),
425        repeat_count,
426        field_values: field_values
427            .iter()
428            .map(|fv| (fv.name.clone(), fv.val.clone()))
429            .collect(),
430        field_actual_change: field_actual_change.unwrap_or(false),
431    })
432}