1use std::collections::HashMap;
2use std::fs;
3use std::io::{Read, Seek, SeekFrom};
4use std::path::{Path, PathBuf};
5use std::sync::Arc;
6
7use crate::error::BinlogError;
8use crate::format::MessageFormat;
9use crate::reader::Reader;
10use crate::HEADER_MAGIC;
11
12const TAIL_SCAN_SIZE: u64 = 65536;
13
14pub struct File {
26 path: PathBuf,
27}
28
29impl File {
30 pub fn open(path: impl AsRef<Path>) -> Result<Self, BinlogError> {
32 let path = path.as_ref().to_path_buf();
33 fs::metadata(&path)?;
35 Ok(File { path })
36 }
37
38 pub fn entries(&self) -> Result<Reader<fs::File>, BinlogError> {
40 let file = fs::File::open(&self.path)?;
41 Ok(Reader::new(file))
42 }
43
44 pub fn time_range(&self) -> Result<Option<(u64, u64)>, BinlogError> {
56 let metadata = fs::metadata(&self.path)?;
57 let file_size = metadata.len();
58 if file_size == 0 {
59 return Ok(None);
60 }
61
62 let mut reader = self.entries()?;
64 let mut first_ts: Option<u64> = None;
65
66 for result in reader.by_ref() {
68 let entry = result?;
69 if let Some(ts) = entry.timestamp_usec {
70 first_ts = Some(ts);
71 break;
72 }
73 }
74
75 let first_ts = match first_ts {
76 Some(ts) => ts,
77 None => return Ok(None),
78 };
79
80 let formats = reader.formats().clone();
82
83 let mut file = fs::File::open(&self.path)?;
85 let seek_pos = file_size.saturating_sub(TAIL_SCAN_SIZE);
86 file.seek(SeekFrom::Start(seek_pos))?;
87
88 let mut tail_buf = Vec::new();
89 file.read_to_end(&mut tail_buf)?;
90
91 let last_ts = scan_tail_for_last_timestamp(&tail_buf, &formats);
92
93 match last_ts {
94 Some(ts) => Ok(Some((first_ts, ts.max(first_ts)))),
95 None => Ok(Some((first_ts, first_ts))),
96 }
97 }
98}
99
100fn scan_tail_for_last_timestamp(
103 buf: &[u8],
104 formats: &HashMap<u8, Arc<MessageFormat>>,
105) -> Option<u64> {
106 let mut max_ts: Option<u64> = None;
107 let mut pos = 0;
108
109 while pos + 3 <= buf.len() {
110 if buf[pos] == HEADER_MAGIC[0] && pos + 1 < buf.len() && buf[pos + 1] == HEADER_MAGIC[1] {
112 let msg_type = buf[pos + 2];
113 if let Some(fmt) = formats.get(&msg_type) {
114 let msg_len = fmt.msg_len as usize;
115 if pos + msg_len <= buf.len() {
116 let payload = &buf[(pos + 3)..(pos + msg_len)];
117 if let Some(ts) = fmt.extract_timestamp(payload) {
118 max_ts = Some(max_ts.map_or(ts, |prev: u64| prev.max(ts)));
119 }
120 pos += msg_len;
121 continue;
122 }
123 }
124 }
125 pos += 1;
126 }
127
128 max_ts
129}
130
131#[cfg(test)]
132mod tests {
133 use super::*;
134 use std::io::Write;
135
136 #[test]
137 fn open_nonexistent_file() {
138 assert!(File::open("/tmp/definitely_does_not_exist_12345.bin").is_err());
139 }
140
141 #[test]
142 fn open_and_entries_empty_file() {
143 let dir = std::env::temp_dir().join("binlog_test_empty");
144 fs::create_dir_all(&dir).ok();
145 let path = dir.join("empty.bin");
146 fs::write(&path, []).unwrap();
147
148 let file = File::open(&path).unwrap();
149 let entries: Vec<_> = file
150 .entries()
151 .unwrap()
152 .collect::<Result<Vec<_>, _>>()
153 .unwrap();
154 assert!(entries.is_empty());
155
156 assert_eq!(file.time_range().unwrap(), None);
157 fs::remove_file(&path).ok();
158 }
159
160 #[test]
161 fn time_range_with_data() {
162 let dir = std::env::temp_dir().join("binlog_test_time_range");
163 fs::create_dir_all(&dir).ok();
164 let path = dir.join("time_range.bin");
165
166 let mut data = Vec::new();
167
168 data.extend_from_slice(&HEADER_MAGIC);
170 data.push(0x80);
171 let mut payload = [0u8; 86];
172 payload[0] = 0x80;
173 payload[1] = 89;
174 payload[2..6].copy_from_slice(b"FMT\0");
175 payload[6..11].copy_from_slice(b"BBnNZ");
176 let labels = b"Type,Length,Name,Format,Labels";
177 payload[22..22 + labels.len()].copy_from_slice(labels);
178 data.extend_from_slice(&payload);
179
180 data.extend_from_slice(&HEADER_MAGIC);
182 data.push(0x80);
183 let mut payload = [0u8; 86];
184 payload[0] = 0x81;
185 payload[1] = 11;
186 payload[2..6].copy_from_slice(b"TST\0");
187 payload[6..7].copy_from_slice(b"Q");
188 payload[22..28].copy_from_slice(b"TimeUS");
189 data.extend_from_slice(&payload);
190
191 data.extend_from_slice(&HEADER_MAGIC);
193 data.push(0x81);
194 data.extend_from_slice(&1000u64.to_le_bytes());
195
196 data.extend_from_slice(&HEADER_MAGIC);
198 data.push(0x81);
199 data.extend_from_slice(&5000u64.to_le_bytes());
200
201 let mut f = fs::File::create(&path).unwrap();
202 f.write_all(&data).unwrap();
203 drop(f);
204
205 let file = File::open(&path).unwrap();
206 let range = file.time_range().unwrap();
207 assert_eq!(range, Some((1000, 5000)));
208
209 fs::remove_file(&path).ok();
210 }
211
212 #[test]
213 fn time_range_fmt_only() {
214 let dir = std::env::temp_dir().join("binlog_test_fmt_only");
215 fs::create_dir_all(&dir).ok();
216 let path = dir.join("fmt_only.bin");
217
218 let mut data = Vec::new();
219 data.extend_from_slice(&HEADER_MAGIC);
221 data.push(0x80);
222 let mut payload = [0u8; 86];
223 payload[0] = 0x80;
224 payload[1] = 89;
225 payload[2..6].copy_from_slice(b"FMT\0");
226 payload[6..11].copy_from_slice(b"BBnNZ");
227 let labels = b"Type,Length,Name,Format,Labels";
228 payload[22..22 + labels.len()].copy_from_slice(labels);
229 data.extend_from_slice(&payload);
230
231 fs::write(&path, &data).unwrap();
232
233 let file = File::open(&path).unwrap();
234 assert_eq!(file.time_range().unwrap(), None);
235
236 fs::remove_file(&path).ok();
237 }
238}