1use std::fs::File;
2use std::io::{BufRead, BufReader, Seek, SeekFrom};
3use std::path::Path;
4use std::time::SystemTime;
5
6use archiver_proto::epics_event::{self, PayloadInfo};
7use prost::Message;
8use tracing::warn;
9
10use crate::storage::plainpb::codec;
11use crate::storage::plainpb::search::binary_search_pb_file;
12use crate::storage::traits::EventStream;
13use crate::types::{ArchDbType, ArchiverSample, ArchiverValue, EventStreamDesc};
14
15pub struct PbFileReader {
18 desc: EventStreamDesc,
19 reader: BufReader<File>,
20}
21
22impl PbFileReader {
23 pub fn open(path: &Path) -> anyhow::Result<Self> {
25 let file = File::open(path)?;
26 let mut reader = BufReader::new(file);
27
28 let mut header_line = Vec::new();
30 reader.read_until(codec::NEWLINE, &mut header_line)?;
31 if header_line.last() == Some(&codec::NEWLINE) {
33 header_line.pop();
34 }
35
36 let header_bytes = codec::unescape(&header_line);
37 let payload_info = PayloadInfo::decode(header_bytes.as_slice())?;
38 let desc = EventStreamDesc::from_payload_info(&payload_info);
39
40 Ok(Self { desc, reader })
41 }
42
43 pub fn open_seeked(path: &Path, start_time: SystemTime) -> anyhow::Result<Self> {
46 let offset = binary_search_pb_file(path, start_time).ok().flatten();
48
49 let file = File::open(path)?;
51 let mut reader = BufReader::new(file);
52
53 let mut header_line = Vec::new();
55 reader.read_until(codec::NEWLINE, &mut header_line)?;
56 if header_line.last() == Some(&codec::NEWLINE) {
57 header_line.pop();
58 }
59 let header_bytes = codec::unescape(&header_line);
60 let payload_info = PayloadInfo::decode(header_bytes.as_slice())?;
61 let desc = EventStreamDesc::from_payload_info(&payload_info);
62
63 if let Some(off) = offset {
65 reader.seek(SeekFrom::Start(off))?;
66 }
67
68 Ok(Self { desc, reader })
69 }
70}
71
72impl EventStream for PbFileReader {
73 fn description(&self) -> &EventStreamDesc {
74 &self.desc
75 }
76
77 fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
78 loop {
79 let mut line_buf = Vec::new();
80 let bytes_read = self.reader.read_until(codec::NEWLINE, &mut line_buf)?;
81 if bytes_read == 0 {
82 return Ok(None);
83 }
84
85 let had_newline = line_buf.last() == Some(&codec::NEWLINE);
89 if had_newline {
90 line_buf.pop();
91 } else if !line_buf.is_empty() {
92 warn!(
93 "PB stream: dropping {} truncated trailing bytes (no newline at EOF)",
94 line_buf.len()
95 );
96 return Ok(None);
97 }
98
99 if line_buf.is_empty() {
100 continue;
101 }
102
103 let raw_bytes = codec::unescape(&line_buf);
104 match decode_sample(self.desc.db_type, self.desc.year, &raw_bytes) {
108 Ok(sample) => return Ok(Some(sample)),
109 Err(e) => {
110 warn!(
111 "PB stream: skipping undecodable sample ({} bytes): {e}",
112 raw_bytes.len()
113 );
114 continue;
115 }
116 }
117 }
118 }
119}
120
121pub struct PbBytesReader {
125 desc: EventStreamDesc,
126 reader: BufReader<std::io::Cursor<Vec<u8>>>,
127}
128
129impl PbBytesReader {
130 pub fn from_bytes(bytes: Vec<u8>) -> anyhow::Result<Self> {
132 let mut reader = BufReader::new(std::io::Cursor::new(bytes));
133
134 let mut header_line = Vec::new();
135 reader.read_until(codec::NEWLINE, &mut header_line)?;
136 if header_line.last() == Some(&codec::NEWLINE) {
137 header_line.pop();
138 }
139 let header_bytes = codec::unescape(&header_line);
140 let payload_info = PayloadInfo::decode(header_bytes.as_slice())?;
141 let desc = EventStreamDesc::from_payload_info(&payload_info);
142
143 Ok(Self { desc, reader })
144 }
145}
146
147impl EventStream for PbBytesReader {
148 fn description(&self) -> &EventStreamDesc {
149 &self.desc
150 }
151
152 fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
153 loop {
154 let mut line_buf = Vec::new();
155 let bytes_read = self.reader.read_until(codec::NEWLINE, &mut line_buf)?;
156 if bytes_read == 0 {
157 return Ok(None);
158 }
159 let had_newline = line_buf.last() == Some(&codec::NEWLINE);
160 if had_newline {
161 line_buf.pop();
162 } else if !line_buf.is_empty() {
163 warn!(
164 "PB bytes-stream: dropping {} truncated trailing bytes",
165 line_buf.len()
166 );
167 return Ok(None);
168 }
169 if line_buf.is_empty() {
170 continue;
171 }
172 let raw_bytes = codec::unescape(&line_buf);
173 match decode_sample(self.desc.db_type, self.desc.year, &raw_bytes) {
174 Ok(sample) => return Ok(Some(sample)),
175 Err(e) => {
176 warn!("PB bytes-stream: skipping undecodable sample: {e}");
177 continue;
178 }
179 }
180 }
181 }
182}
183
184pub fn decode_sample(
186 dbr_type: ArchDbType,
187 year: i32,
188 data: &[u8],
189) -> anyhow::Result<ArchiverSample> {
190 match dbr_type {
191 ArchDbType::ScalarString => {
192 let msg = epics_event::ScalarString::decode(data)?;
193 sample_from_parts(
194 year,
195 msg.secondsintoyear,
196 msg.nano,
197 ArchiverValue::ScalarString(msg.val),
198 msg.severity,
199 msg.status,
200 msg.repeatcount,
201 &msg.fieldvalues,
202 msg.fieldactualchange,
203 )
204 }
205 ArchDbType::ScalarByte => {
206 let msg = epics_event::ScalarByte::decode(data)?;
207 sample_from_parts(
208 year,
209 msg.secondsintoyear,
210 msg.nano,
211 ArchiverValue::ScalarByte(msg.val),
212 msg.severity,
213 msg.status,
214 msg.repeatcount,
215 &msg.fieldvalues,
216 msg.fieldactualchange,
217 )
218 }
219 ArchDbType::ScalarShort => {
220 let msg = epics_event::ScalarShort::decode(data)?;
221 sample_from_parts(
222 year,
223 msg.secondsintoyear,
224 msg.nano,
225 ArchiverValue::ScalarShort(msg.val),
226 msg.severity,
227 msg.status,
228 msg.repeatcount,
229 &msg.fieldvalues,
230 msg.fieldactualchange,
231 )
232 }
233 ArchDbType::ScalarInt => {
234 let msg = epics_event::ScalarInt::decode(data)?;
235 sample_from_parts(
236 year,
237 msg.secondsintoyear,
238 msg.nano,
239 ArchiverValue::ScalarInt(msg.val),
240 msg.severity,
241 msg.status,
242 msg.repeatcount,
243 &msg.fieldvalues,
244 msg.fieldactualchange,
245 )
246 }
247 ArchDbType::ScalarEnum => {
248 let msg = epics_event::ScalarEnum::decode(data)?;
249 sample_from_parts(
250 year,
251 msg.secondsintoyear,
252 msg.nano,
253 ArchiverValue::ScalarEnum(msg.val),
254 msg.severity,
255 msg.status,
256 msg.repeatcount,
257 &msg.fieldvalues,
258 msg.fieldactualchange,
259 )
260 }
261 ArchDbType::ScalarFloat => {
262 let msg = epics_event::ScalarFloat::decode(data)?;
263 sample_from_parts(
264 year,
265 msg.secondsintoyear,
266 msg.nano,
267 ArchiverValue::ScalarFloat(msg.val),
268 msg.severity,
269 msg.status,
270 msg.repeatcount,
271 &msg.fieldvalues,
272 msg.fieldactualchange,
273 )
274 }
275 ArchDbType::ScalarDouble => {
276 let msg = epics_event::ScalarDouble::decode(data)?;
277 sample_from_parts(
278 year,
279 msg.secondsintoyear,
280 msg.nano,
281 ArchiverValue::ScalarDouble(msg.val),
282 msg.severity,
283 msg.status,
284 msg.repeatcount,
285 &msg.fieldvalues,
286 msg.fieldactualchange,
287 )
288 }
289 ArchDbType::WaveformString => {
290 let msg = epics_event::VectorString::decode(data)?;
291 sample_from_parts(
292 year,
293 msg.secondsintoyear,
294 msg.nano,
295 ArchiverValue::VectorString(msg.val),
296 msg.severity,
297 msg.status,
298 msg.repeatcount,
299 &msg.fieldvalues,
300 msg.fieldactualchange,
301 )
302 }
303 ArchDbType::WaveformByte => {
304 let msg = epics_event::VectorChar::decode(data)?;
305 sample_from_parts(
306 year,
307 msg.secondsintoyear,
308 msg.nano,
309 ArchiverValue::VectorChar(msg.val),
310 msg.severity,
311 msg.status,
312 msg.repeatcount,
313 &msg.fieldvalues,
314 msg.fieldactualchange,
315 )
316 }
317 ArchDbType::WaveformShort => {
318 let msg = epics_event::VectorShort::decode(data)?;
319 sample_from_parts(
320 year,
321 msg.secondsintoyear,
322 msg.nano,
323 ArchiverValue::VectorShort(msg.val),
324 msg.severity,
325 msg.status,
326 msg.repeatcount,
327 &msg.fieldvalues,
328 msg.fieldactualchange,
329 )
330 }
331 ArchDbType::WaveformInt => {
332 let msg = epics_event::VectorInt::decode(data)?;
333 sample_from_parts(
334 year,
335 msg.secondsintoyear,
336 msg.nano,
337 ArchiverValue::VectorInt(msg.val),
338 msg.severity,
339 msg.status,
340 msg.repeatcount,
341 &msg.fieldvalues,
342 msg.fieldactualchange,
343 )
344 }
345 ArchDbType::WaveformEnum => {
346 let msg = epics_event::VectorEnum::decode(data)?;
347 sample_from_parts(
348 year,
349 msg.secondsintoyear,
350 msg.nano,
351 ArchiverValue::VectorEnum(msg.val),
352 msg.severity,
353 msg.status,
354 msg.repeatcount,
355 &msg.fieldvalues,
356 msg.fieldactualchange,
357 )
358 }
359 ArchDbType::WaveformFloat => {
360 let msg = epics_event::VectorFloat::decode(data)?;
361 sample_from_parts(
362 year,
363 msg.secondsintoyear,
364 msg.nano,
365 ArchiverValue::VectorFloat(msg.val),
366 msg.severity,
367 msg.status,
368 msg.repeatcount,
369 &msg.fieldvalues,
370 msg.fieldactualchange,
371 )
372 }
373 ArchDbType::WaveformDouble => {
374 let msg = epics_event::VectorDouble::decode(data)?;
375 sample_from_parts(
376 year,
377 msg.secondsintoyear,
378 msg.nano,
379 ArchiverValue::VectorDouble(msg.val),
380 msg.severity,
381 msg.status,
382 msg.repeatcount,
383 &msg.fieldvalues,
384 msg.fieldactualchange,
385 )
386 }
387 ArchDbType::V4GenericBytes => {
388 let msg = epics_event::V4GenericBytes::decode(data)?;
389 sample_from_parts(
390 year,
391 msg.secondsintoyear,
392 msg.nano,
393 ArchiverValue::V4GenericBytes(msg.val),
394 msg.severity,
395 msg.status,
396 msg.repeatcount,
397 &msg.fieldvalues,
398 msg.fieldactualchange,
399 )
400 }
401 }
402}
403
404#[allow(clippy::too_many_arguments)]
405fn sample_from_parts(
406 year: i32,
407 seconds_into_year: u32,
408 nanos: u32,
409 value: ArchiverValue,
410 severity: Option<i32>,
411 status: Option<i32>,
412 repeat_count: Option<u32>,
413 field_values: &[epics_event::FieldValue],
414 field_actual_change: Option<bool>,
415) -> anyhow::Result<ArchiverSample> {
416 let timestamp = ArchiverSample::timestamp_from_epoch_parts(year, seconds_into_year, nanos)
417 .ok_or_else(|| {
418 anyhow::anyhow!("invalid timestamp: year={year} secs={seconds_into_year} nanos={nanos}")
419 })?;
420 Ok(ArchiverSample {
421 timestamp,
422 value,
423 severity: severity.unwrap_or(0),
424 status: status.unwrap_or(0),
425 repeat_count,
426 field_values: field_values
427 .iter()
428 .map(|fv| (fv.name.clone(), fv.val.clone()))
429 .collect(),
430 field_actual_change: field_actual_change.unwrap_or(false),
431 })
432}