use std::fs::File;
use std::io::{BufRead, BufReader, Seek, SeekFrom};
use std::path::Path;
use std::time::SystemTime;
use archiver_proto::epics_event::{self, PayloadInfo};
use prost::Message;
use tracing::warn;
use crate::storage::plainpb::codec;
use crate::storage::plainpb::search::binary_search_pb_file;
use crate::storage::traits::EventStream;
use crate::types::{ArchDbType, ArchiverSample, ArchiverValue, EventStreamDesc};
/// Event stream that reads samples from a PB file on disk.
///
/// Constructed via `open` or `open_seeked`; samples are pulled through the
/// `EventStream` implementation.
pub struct PbFileReader {
// Stream description built from the `PayloadInfo` decoded from the file's first line.
desc: EventStreamDesc,
// Buffered handle on the file; each `next_event` call reads newline-delimited records from it.
reader: BufReader<File>,
}
impl PbFileReader {
pub fn open(path: &Path) -> anyhow::Result<Self> {
let file = File::open(path)?;
let mut reader = BufReader::new(file);
let mut header_line = Vec::new();
reader.read_until(codec::NEWLINE, &mut header_line)?;
if header_line.last() == Some(&codec::NEWLINE) {
header_line.pop();
}
let header_bytes = codec::unescape(&header_line);
let payload_info = PayloadInfo::decode(header_bytes.as_slice())?;
let desc = EventStreamDesc::from_payload_info(&payload_info);
Ok(Self { desc, reader })
}
pub fn open_seeked(path: &Path, start_time: SystemTime) -> anyhow::Result<Self> {
let offset = binary_search_pb_file(path, start_time).ok().flatten();
let file = File::open(path)?;
let mut reader = BufReader::new(file);
let mut header_line = Vec::new();
reader.read_until(codec::NEWLINE, &mut header_line)?;
if header_line.last() == Some(&codec::NEWLINE) {
header_line.pop();
}
let header_bytes = codec::unescape(&header_line);
let payload_info = PayloadInfo::decode(header_bytes.as_slice())?;
let desc = EventStreamDesc::from_payload_info(&payload_info);
if let Some(off) = offset {
reader.seek(SeekFrom::Start(off))?;
}
Ok(Self { desc, reader })
}
}
impl EventStream for PbFileReader {
    /// Returns the stream description decoded from the file header.
    fn description(&self) -> &EventStreamDesc {
        &self.desc
    }

    /// Reads records until one decodes into a sample, or the input is exhausted.
    ///
    /// Empty lines and records that fail to decode are skipped (the latter with a
    /// warning). A final record that lacks a terminating newline is treated as
    /// truncated: it is dropped with a warning and the stream ends.
    ///
    /// # Errors
    ///
    /// Returns an error only when reading from the underlying file fails.
    fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
        loop {
            let mut record = Vec::new();
            if self.reader.read_until(codec::NEWLINE, &mut record)? == 0 {
                // Nothing left to read.
                return Ok(None);
            }

            if record.ends_with(&[codec::NEWLINE]) {
                // Strip the record terminator.
                record.pop();
            } else if !record.is_empty() {
                warn!(
                    "PB stream: dropping {} truncated trailing bytes (no newline at EOF)",
                    record.len()
                );
                return Ok(None);
            }

            if record.is_empty() {
                continue;
            }

            let payload = codec::unescape(&record);
            let e = match decode_sample(self.desc.db_type, self.desc.year, &payload) {
                Ok(sample) => return Ok(Some(sample)),
                Err(failure) => failure,
            };
            // A bad record is not fatal; report it and move on to the next line.
            warn!(
                "PB stream: skipping undecodable sample ({} bytes): {e}",
                payload.len()
            );
        }
    }
}
/// Event stream that reads samples from PB data held in memory.
///
/// Constructed via `from_bytes`; samples are pulled through the `EventStream`
/// implementation.
pub struct PbBytesReader {
// Stream description built from the `PayloadInfo` decoded from the buffer's first line.
desc: EventStreamDesc,
// Buffered cursor over the owned bytes; each `next_event` call reads newline-delimited records from it.
reader: BufReader<std::io::Cursor<Vec<u8>>>,
}
impl PbBytesReader {
    /// Takes ownership of `bytes`, decodes the header line at its start, and leaves the
    /// stream positioned immediately after that header.
    ///
    /// # Errors
    ///
    /// Returns an error if reading the header line fails or if it does not decode as a
    /// `PayloadInfo` message.
    pub fn from_bytes(bytes: Vec<u8>) -> anyhow::Result<Self> {
        let cursor = std::io::Cursor::new(bytes);
        let mut reader = BufReader::new(cursor);

        let mut header = Vec::new();
        reader.read_until(codec::NEWLINE, &mut header)?;
        // The terminator is framing only; remove it before unescaping.
        if header.ends_with(&[codec::NEWLINE]) {
            header.pop();
        }

        let unescaped = codec::unescape(&header);
        let info = PayloadInfo::decode(unescaped.as_slice())?;
        Ok(Self {
            desc: EventStreamDesc::from_payload_info(&info),
            reader,
        })
    }
}
impl EventStream for PbBytesReader {
    /// Returns the stream description decoded from the buffer's header line.
    fn description(&self) -> &EventStreamDesc {
        &self.desc
    }

    /// Reads records until one decodes into a sample, or the buffer is exhausted.
    ///
    /// Empty lines and records that fail to decode are skipped (the latter with a
    /// warning). A final record that lacks a terminating newline is treated as
    /// truncated: it is dropped with a warning and the stream ends.
    ///
    /// # Errors
    ///
    /// Returns an error only when reading from the underlying buffer fails.
    fn next_event(&mut self) -> anyhow::Result<Option<ArchiverSample>> {
        loop {
            let mut record = Vec::new();
            if self.reader.read_until(codec::NEWLINE, &mut record)? == 0 {
                // Nothing left to read.
                return Ok(None);
            }

            if record.ends_with(&[codec::NEWLINE]) {
                // Strip the record terminator.
                record.pop();
            } else if !record.is_empty() {
                warn!(
                    "PB bytes-stream: dropping {} truncated trailing bytes",
                    record.len()
                );
                return Ok(None);
            }

            if record.is_empty() {
                continue;
            }

            let payload = codec::unescape(&record);
            let e = match decode_sample(self.desc.db_type, self.desc.year, &payload) {
                Ok(sample) => return Ok(Some(sample)),
                Err(failure) => failure,
            };
            // A bad record is not fatal; report it and move on to the next line.
            warn!("PB bytes-stream: skipping undecodable sample: {e}");
        }
    }
}
pub fn decode_sample(
dbr_type: ArchDbType,
year: i32,
data: &[u8],
) -> anyhow::Result<ArchiverSample> {
match dbr_type {
ArchDbType::ScalarString => {
let msg = epics_event::ScalarString::decode(data)?;
sample_from_parts(
year,
msg.secondsintoyear,
msg.nano,
ArchiverValue::ScalarString(msg.val),
msg.severity,
msg.status,
msg.repeatcount,
&msg.fieldvalues,
msg.fieldactualchange,
)
}
ArchDbType::ScalarByte => {
let msg = epics_event::ScalarByte::decode(data)?;
sample_from_parts(
year,
msg.secondsintoyear,
msg.nano,
ArchiverValue::ScalarByte(msg.val),
msg.severity,
msg.status,
msg.repeatcount,
&msg.fieldvalues,
msg.fieldactualchange,
)
}
ArchDbType::ScalarShort => {
let msg = epics_event::ScalarShort::decode(data)?;
sample_from_parts(
year,
msg.secondsintoyear,
msg.nano,
ArchiverValue::ScalarShort(msg.val),
msg.severity,
msg.status,
msg.repeatcount,
&msg.fieldvalues,
msg.fieldactualchange,
)
}
ArchDbType::ScalarInt => {
let msg = epics_event::ScalarInt::decode(data)?;
sample_from_parts(
year,
msg.secondsintoyear,
msg.nano,
ArchiverValue::ScalarInt(msg.val),
msg.severity,
msg.status,
msg.repeatcount,
&msg.fieldvalues,
msg.fieldactualchange,
)
}
ArchDbType::ScalarEnum => {
let msg = epics_event::ScalarEnum::decode(data)?;
sample_from_parts(
year,
msg.secondsintoyear,
msg.nano,
ArchiverValue::ScalarEnum(msg.val),
msg.severity,
msg.status,
msg.repeatcount,
&msg.fieldvalues,
msg.fieldactualchange,
)
}
ArchDbType::ScalarFloat => {
let msg = epics_event::ScalarFloat::decode(data)?;
sample_from_parts(
year,
msg.secondsintoyear,
msg.nano,
ArchiverValue::ScalarFloat(msg.val),
msg.severity,
msg.status,
msg.repeatcount,
&msg.fieldvalues,
msg.fieldactualchange,
)
}
ArchDbType::ScalarDouble => {
let msg = epics_event::ScalarDouble::decode(data)?;
sample_from_parts(
year,
msg.secondsintoyear,
msg.nano,
ArchiverValue::ScalarDouble(msg.val),
msg.severity,
msg.status,
msg.repeatcount,
&msg.fieldvalues,
msg.fieldactualchange,
)
}
ArchDbType::WaveformString => {
let msg = epics_event::VectorString::decode(data)?;
sample_from_parts(
year,
msg.secondsintoyear,
msg.nano,
ArchiverValue::VectorString(msg.val),
msg.severity,
msg.status,
msg.repeatcount,
&msg.fieldvalues,
msg.fieldactualchange,
)
}
ArchDbType::WaveformByte => {
let msg = epics_event::VectorChar::decode(data)?;
sample_from_parts(
year,
msg.secondsintoyear,
msg.nano,
ArchiverValue::VectorChar(msg.val),
msg.severity,
msg.status,
msg.repeatcount,
&msg.fieldvalues,
msg.fieldactualchange,
)
}
ArchDbType::WaveformShort => {
let msg = epics_event::VectorShort::decode(data)?;
sample_from_parts(
year,
msg.secondsintoyear,
msg.nano,
ArchiverValue::VectorShort(msg.val),
msg.severity,
msg.status,
msg.repeatcount,
&msg.fieldvalues,
msg.fieldactualchange,
)
}
ArchDbType::WaveformInt => {
let msg = epics_event::VectorInt::decode(data)?;
sample_from_parts(
year,
msg.secondsintoyear,
msg.nano,
ArchiverValue::VectorInt(msg.val),
msg.severity,
msg.status,
msg.repeatcount,
&msg.fieldvalues,
msg.fieldactualchange,
)
}
ArchDbType::WaveformEnum => {
let msg = epics_event::VectorEnum::decode(data)?;
sample_from_parts(
year,
msg.secondsintoyear,
msg.nano,
ArchiverValue::VectorEnum(msg.val),
msg.severity,
msg.status,
msg.repeatcount,
&msg.fieldvalues,
msg.fieldactualchange,
)
}
ArchDbType::WaveformFloat => {
let msg = epics_event::VectorFloat::decode(data)?;
sample_from_parts(
year,
msg.secondsintoyear,
msg.nano,
ArchiverValue::VectorFloat(msg.val),
msg.severity,
msg.status,
msg.repeatcount,
&msg.fieldvalues,
msg.fieldactualchange,
)
}
ArchDbType::WaveformDouble => {
let msg = epics_event::VectorDouble::decode(data)?;
sample_from_parts(
year,
msg.secondsintoyear,
msg.nano,
ArchiverValue::VectorDouble(msg.val),
msg.severity,
msg.status,
msg.repeatcount,
&msg.fieldvalues,
msg.fieldactualchange,
)
}
ArchDbType::V4GenericBytes => {
let msg = epics_event::V4GenericBytes::decode(data)?;
sample_from_parts(
year,
msg.secondsintoyear,
msg.nano,
ArchiverValue::V4GenericBytes(msg.val),
msg.severity,
msg.status,
msg.repeatcount,
&msg.fieldvalues,
msg.fieldactualchange,
)
}
}
}
/// Assembles an [`ArchiverSample`] from the individual fields of a decoded message.
///
/// Missing `severity` and `status` become `0`, a missing `field_actual_change` becomes
/// `false`, and `repeat_count` is passed through unchanged. Each entry of
/// `field_values` is copied as a `(name, val)` pair.
///
/// # Errors
///
/// Returns an error when `ArchiverSample::timestamp_from_epoch_parts` yields no
/// timestamp for the given `year`, `seconds_into_year` and `nanos`.
// The argument list mirrors the flat field set shared by every message type.
#[allow(clippy::too_many_arguments)]
fn sample_from_parts(
    year: i32,
    seconds_into_year: u32,
    nanos: u32,
    value: ArchiverValue,
    severity: Option<i32>,
    status: Option<i32>,
    repeat_count: Option<u32>,
    field_values: &[epics_event::FieldValue],
    field_actual_change: Option<bool>,
) -> anyhow::Result<ArchiverSample> {
    let timestamp =
        match ArchiverSample::timestamp_from_epoch_parts(year, seconds_into_year, nanos) {
            Some(ts) => ts,
            None => {
                return Err(anyhow::anyhow!(
                    "invalid timestamp: year={year} secs={seconds_into_year} nanos={nanos}"
                ));
            }
        };

    // Copy the borrowed name/value pairs so the sample owns its field data.
    let copied_fields = field_values
        .iter()
        .map(|field| (field.name.clone(), field.val.clone()))
        .collect();

    Ok(ArchiverSample {
        timestamp,
        value,
        severity: severity.unwrap_or_default(),
        status: status.unwrap_or_default(),
        repeat_count,
        field_values: copied_fields,
        field_actual_change: field_actual_change.unwrap_or_default(),
    })
}