bare_metrics_reader/
lib.rs1use anyhow::bail;
2use bare_metrics_core::structures::{
3 get_supported_version, Frame, LogHeader, UnixTimestampMilliseconds,
4};
5use std::io::{Read, Seek, SeekFrom};
6
7#[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash)]
10pub struct SeekToken {
11 pub start_ts: UnixTimestampMilliseconds,
12 pub offset: u64,
13}
14
15pub struct MetricsLogReader<R: Read> {
19 reader: R,
20 pub header: LogHeader,
21 last_read_ts: UnixTimestampMilliseconds,
22}
23
24impl<R: Read> MetricsLogReader<R> {
25 pub fn new(mut reader: R) -> anyhow::Result<Self> {
27 let header: LogHeader = serde_bare::from_reader(&mut reader)?;
28 if header.bare_metrics_version != get_supported_version() {
29 bail!("Wrong version. Expected {:?} got {:?}. Later versions of Bare Metrics may use a stable format.", get_supported_version(), header.bare_metrics_version);
30 }
31 let last_read_ts = header.start_time;
32 Ok(MetricsLogReader {
33 reader,
34 header,
35 last_read_ts,
36 })
37 }
38
39 pub fn read_frame(&mut self) -> anyhow::Result<Option<(UnixTimestampMilliseconds, Frame)>> {
42 let mut interceptor = EofTrackingReadInterceptor::new(&mut self.reader);
43 match serde_bare::from_reader::<_, Frame>(&mut interceptor) {
44 Ok(frame) => {
45 let start_ts = self.last_read_ts;
46 self.last_read_ts = frame.end_time;
47 Ok(Some((start_ts, frame)))
48 }
49 Err(err) if err.classify().is_eof() => Ok(None),
51 Err(other_err) => {
52 let eof_flag = interceptor.was_eof();
53 if eof_flag == Some(true) {
54 Ok(None)
55 } else {
56 bail!(
57 "Failed to read frame: {:?} class {:?}, intercepted eof flag {:?}",
58 other_err,
59 other_err.classify(),
60 eof_flag
61 );
62 }
63 }
64 }
65 }
66}
67
/// A pass-through [`Read`] adapter that remembers whether the first read
/// which actually requested bytes hit end-of-stream.
///
/// This lets a caller distinguish "the stream was already exhausted before
/// anything was read" from "the stream ended part-way through".
struct EofTrackingReadInterceptor<R: Read> {
    /// The wrapped reader; every read is forwarded to it unchanged.
    inner: R,
    /// `None` until the first read that requested at least one byte completes
    /// successfully; then `Some(true)` if that read returned zero bytes and
    /// `Some(false)` otherwise. Never changes once set.
    was_eof_flag: Option<bool>,
}

impl<R: Read> EofTrackingReadInterceptor<R> {
    /// Wraps `inner` with no read observed yet.
    pub fn new(inner: R) -> EofTrackingReadInterceptor<R> {
        EofTrackingReadInterceptor {
            inner,
            was_eof_flag: None,
        }
    }

    /// Consumes the interceptor and reports what the first read observed.
    ///
    /// Returns `None` if no classifying read has completed, `Some(true)` if
    /// the first such read returned zero bytes, and `Some(false)` otherwise.
    pub fn was_eof(self) -> Option<bool> {
        self.was_eof_flag
    }
}

impl<R: Read> Read for EofTrackingReadInterceptor<R> {
    /// Forwards to the inner reader, classifying only the first read that
    /// requested at least one byte.
    ///
    /// A failed read leaves the flag untouched, so a retried read is still
    /// treated as the first one.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let count = self.inner.read(buf)?;
        // `Ok(0)` for a zero-length buffer says nothing about end-of-stream,
        // so such a read must not be recorded as the first observation.
        if self.was_eof_flag.is_none() && !buf.is_empty() {
            self.was_eof_flag = Some(count == 0);
        }
        Ok(count)
    }
}
104
105impl<R: Read + Seek> MetricsLogReader<R> {
106 pub fn read_frame_rewindable(
109 &mut self,
110 ) -> anyhow::Result<Option<(SeekToken, UnixTimestampMilliseconds, Frame)>> {
111 let current_pos_in_file = self.reader.stream_position()?;
112 if let Some((timestamp, frame)) = self.read_frame()? {
114 Ok(Some((
115 SeekToken {
116 start_ts: timestamp,
117 offset: current_pos_in_file,
118 },
119 timestamp,
120 frame,
121 )))
122 } else {
123 self.reader.seek(SeekFrom::Start(current_pos_in_file))?;
126 Ok(None)
127 }
128 }
129
130 pub fn seek(&mut self, seek_token: SeekToken) -> anyhow::Result<SeekToken> {
135 let SeekToken {
136 start_ts: seek_timestamp,
137 offset: seek_pos,
138 } = seek_token;
139 let old_pos_in_file = self.reader.stream_position()?;
140 let old_timestamp = self.last_read_ts;
141
142 self.reader.seek(SeekFrom::Start(seek_pos))?;
144 self.last_read_ts = seek_timestamp;
145
146 Ok(SeekToken {
147 start_ts: old_timestamp,
148 offset: old_pos_in_file,
149 })
150 }
151}