bare_metrics_reader/
lib.rs

1use anyhow::bail;
2use bare_metrics_core::structures::{
3    get_supported_version, Frame, LogHeader, UnixTimestampMilliseconds,
4};
5use std::io::{Read, Seek, SeekFrom};
6
7/// Token that is known to be usable for seeking to a frame in a metrics log.
8/// TODO identify which reader is applicable? Or don't bother?
9#[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq, Hash)]
10pub struct SeekToken {
11    pub start_ts: UnixTimestampMilliseconds,
12    pub offset: u64,
13}
14
15/// A streaming reader for metric logs.
16/// If the underlying reader supports seeks, it is possible to get SeekTokens which may be used
17/// to seek to previous frames in the stream.
18pub struct MetricsLogReader<R: Read> {
19    reader: R,
20    pub header: LogHeader,
21    last_read_ts: UnixTimestampMilliseconds,
22}
23
24impl<R: Read> MetricsLogReader<R> {
25    /// Constructs a bare metrics log reader.
26    pub fn new(mut reader: R) -> anyhow::Result<Self> {
27        let header: LogHeader = serde_bare::from_reader(&mut reader)?;
28        if header.bare_metrics_version != get_supported_version() {
29            bail!("Wrong version. Expected {:?} got {:?}. Later versions of Bare Metrics may use a stable format.", get_supported_version(), header.bare_metrics_version);
30        }
31        let last_read_ts = header.start_time;
32        Ok(MetricsLogReader {
33            reader,
34            header,
35            last_read_ts,
36        })
37    }
38
39    /// Reads a frame.
40    /// Returns the start time of the frame and the frame itself.
41    pub fn read_frame(&mut self) -> anyhow::Result<Option<(UnixTimestampMilliseconds, Frame)>> {
42        let mut interceptor = EofTrackingReadInterceptor::new(&mut self.reader);
43        match serde_bare::from_reader::<_, Frame>(&mut interceptor) {
44            Ok(frame) => {
45                let start_ts = self.last_read_ts;
46                self.last_read_ts = frame.end_time;
47                Ok(Some((start_ts, frame)))
48            }
49            // This doesn't seem to work properly for some reason...
50            Err(err) if err.classify().is_eof() => Ok(None),
51            Err(other_err) => {
52                let eof_flag = interceptor.was_eof();
53                if eof_flag == Some(true) {
54                    Ok(None)
55                } else {
56                    bail!(
57                        "Failed to read frame: {:?} class {:?}, intercepted eof flag {:?}",
58                        other_err,
59                        other_err.classify(),
60                        eof_flag
61                    );
62                }
63            }
64        }
65    }
66}
67
68struct EofTrackingReadInterceptor<R: Read> {
69    inner: R,
70    /// None if no reads have taken place yet.
71    /// Some(true) if the first read that took place and was EOF
72    /// Some(false) if the first read that took place and was not EOF
73    was_eof_flag: Option<bool>,
74}
75
76impl<R: Read> EofTrackingReadInterceptor<R> {
77    pub fn new(inner: R) -> EofTrackingReadInterceptor<R> {
78        EofTrackingReadInterceptor {
79            inner,
80            was_eof_flag: None,
81        }
82    }
83
84    pub fn was_eof(self) -> Option<bool> {
85        self.was_eof_flag
86    }
87}
88
89impl<R: Read> Read for EofTrackingReadInterceptor<R> {
90    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
91        if self.was_eof_flag.is_none() {
92            let count = self.inner.read(buf)?;
93            if count == 0 {
94                self.was_eof_flag = Some(true);
95            } else {
96                self.was_eof_flag = Some(false);
97            }
98            Ok(count)
99        } else {
100            self.inner.read(buf)
101        }
102    }
103}
104
105impl<R: Read + Seek> MetricsLogReader<R> {
106    /// Reads a new frame, and returns a seek token that can be used to rewind such that you can
107    /// 'undo' the read.
108    pub fn read_frame_rewindable(
109        &mut self,
110    ) -> anyhow::Result<Option<(SeekToken, UnixTimestampMilliseconds, Frame)>> {
111        let current_pos_in_file = self.reader.stream_position()?;
112        // TODO should we rewind in case of error?
113        if let Some((timestamp, frame)) = self.read_frame()? {
114            Ok(Some((
115                SeekToken {
116                    start_ts: timestamp,
117                    offset: current_pos_in_file,
118                },
119                timestamp,
120                frame,
121            )))
122        } else {
123            // EOF (no more things to read here).
124            // Rewind to where we were before.
125            self.reader.seek(SeekFrom::Start(current_pos_in_file))?;
126            Ok(None)
127        }
128    }
129
130    /// Seeks to a position in the stream.
131    /// The given seek token MUST have come from this instance's `read_from_rewindable` function.
132    /// Otherwise, corrupt frames may be read.
133    /// The old position is returned as a seek token.
134    pub fn seek(&mut self, seek_token: SeekToken) -> anyhow::Result<SeekToken> {
135        let SeekToken {
136            start_ts: seek_timestamp,
137            offset: seek_pos,
138        } = seek_token;
139        let old_pos_in_file = self.reader.stream_position()?;
140        let old_timestamp = self.last_read_ts;
141
142        // TODO should we rewind in case of error?
143        self.reader.seek(SeekFrom::Start(seek_pos))?;
144        self.last_read_ts = seek_timestamp;
145
146        Ok(SeekToken {
147            start_ts: old_timestamp,
148            offset: old_pos_in_file,
149        })
150    }
151}