audit_trail/readers/
file.rs1use std::fs::File;
4use std::io::{self, BufReader, Read};
5use std::path::Path;
6
7use crate::codec;
8use crate::error::{Error, Result};
9use crate::owned::OwnedRecord;
10
11pub struct FileReader<R: Read> {
18 reader: R,
19 state: State,
20 scratch: Vec<u8>,
22}
23
24enum State {
25 Fresh,
27 Reading,
29 Done,
31}
32
33impl FileReader<BufReader<File>> {
34 pub fn open(path: impl AsRef<Path>) -> io::Result<Self> {
41 let file = File::open(path)?;
42 Ok(Self::new(BufReader::new(file)))
43 }
44}
45
46impl<R: Read> FileReader<R> {
47 #[inline]
50 pub fn new(reader: R) -> Self {
51 Self {
52 reader,
53 state: State::Fresh,
54 scratch: Vec::with_capacity(256),
55 }
56 }
57
58 #[inline]
60 pub fn into_reader(self) -> R {
61 self.reader
62 }
63
64 fn read_header(&mut self) -> Result<()> {
65 let mut header = [0u8; codec::FILE_HEADER_LEN];
66 match self.reader.read_exact(&mut header) {
67 Ok(()) => codec::verify_file_header(&header),
68 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => Err(Error::Truncated),
69 Err(_) => Err(Error::Io),
70 }
71 }
72
73 fn read_record(&mut self) -> Option<Result<OwnedRecord>> {
74 let mut len_buf = [0u8; 4];
75 match self.reader.read_exact(&mut len_buf) {
76 Ok(()) => {}
77 Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return None,
78 Err(_) => return Some(Err(Error::Io)),
79 }
80 let body_len = u32::from_be_bytes(len_buf) as usize;
81 let frame_len = 4 + body_len;
82 self.scratch.clear();
83 self.scratch.resize(frame_len, 0);
84 self.scratch[0..4].copy_from_slice(&len_buf);
85 if let Err(e) = self.reader.read_exact(&mut self.scratch[4..]) {
86 return Some(Err(match e.kind() {
87 io::ErrorKind::UnexpectedEof => Error::Truncated,
88 _ => Error::Io,
89 }));
90 }
91 match codec::decode_record(&self.scratch) {
92 Ok((record, consumed)) if consumed == frame_len => Some(Ok(record)),
93 Ok(_) => Some(Err(Error::InvalidFormat)),
94 Err(e) => Some(Err(e)),
95 }
96 }
97}
98
99impl<R: Read> Iterator for FileReader<R> {
100 type Item = Result<OwnedRecord>;
101
102 fn next(&mut self) -> Option<Self::Item> {
103 loop {
104 match self.state {
105 State::Fresh => match self.read_header() {
106 Ok(()) => self.state = State::Reading,
107 Err(e) => {
108 self.state = State::Done;
109 return Some(Err(e));
110 }
111 },
112 State::Reading => match self.read_record() {
113 None => {
114 self.state = State::Done;
115 return None;
116 }
117 Some(Err(e)) => {
118 self.state = State::Done;
119 return Some(Err(e));
120 }
121 Some(Ok(record)) => return Some(Ok(record)),
122 },
123 State::Done => return None,
124 }
125 }
126 }
127}