1use byteorder::{ByteOrder, LittleEndian};
22use std::io::{Cursor, Read, Seek, SeekFrom};
23
24use crate::IdbError;
25
26use super::checksum::validate_event_checksum;
27use super::constants::*;
28use super::event::{BinlogEvent, BinlogEventType, CommonEventHeader};
29use super::header::{FormatDescriptionEvent, RotateEvent};
30
/// Object-safe union of [`Read`] and [`Seek`].
///
/// A trait object can name only one non-auto trait, so this helper lets a
/// single `Box<dyn ReadSeek>` hold any reader that is also seekable.
trait ReadSeek: Read + Seek {}

/// Blanket implementation: anything that is both `Read` and `Seek` qualifies.
impl<R> ReadSeek for R where R: Read + Seek {}
34
35pub struct BinlogFile {
40 reader: Box<dyn ReadSeek>,
41 file_size: u64,
42 fde: Option<FormatDescriptionEvent>,
43}
44
45impl BinlogFile {
46 pub fn open(path: &str) -> Result<Self, IdbError> {
50 let file = std::fs::File::open(path)
51 .map_err(|e| IdbError::Io(format!("cannot open {path}: {e}")))?;
52 let metadata = file
53 .metadata()
54 .map_err(|e| IdbError::Io(format!("cannot stat {path}: {e}")))?;
55 let file_size = metadata.len();
56
57 let mut binlog = Self {
58 reader: Box::new(file),
59 file_size,
60 fde: None,
61 };
62 binlog.read_header()?;
63 Ok(binlog)
64 }
65
66 pub fn from_bytes(data: Vec<u8>) -> Result<Self, IdbError> {
68 let file_size = data.len() as u64;
69 let mut binlog = Self {
70 reader: Box::new(Cursor::new(data)),
71 file_size,
72 fde: None,
73 };
74 binlog.read_header()?;
75 Ok(binlog)
76 }
77
78 pub fn file_size(&self) -> u64 {
80 self.file_size
81 }
82
83 pub fn format_description(&self) -> Option<&FormatDescriptionEvent> {
85 self.fde.as_ref()
86 }
87
88 pub fn has_checksum(&self) -> bool {
90 self.fde.as_ref().is_some_and(|f| f.has_checksum())
91 }
92
93 pub fn read_event_at(
98 &mut self,
99 offset: u64,
100 ) -> Result<Option<(CommonEventHeader, BinlogEvent)>, IdbError> {
101 if offset >= self.file_size {
102 return Ok(None);
103 }
104
105 self.reader
106 .seek(SeekFrom::Start(offset))
107 .map_err(|e| IdbError::Io(format!("seek to offset {offset}: {e}")))?;
108
109 let mut hdr_buf = [0u8; COMMON_HEADER_SIZE];
111 if let Err(e) = self.reader.read_exact(&mut hdr_buf) {
112 if e.kind() == std::io::ErrorKind::UnexpectedEof {
113 return Ok(None);
114 }
115 return Err(IdbError::Io(format!("read event header at {offset}: {e}")));
116 }
117
118 let header = CommonEventHeader::parse(&hdr_buf)
119 .ok_or_else(|| IdbError::Parse("invalid event header".into()))?;
120
121 let event_len = header.event_length as usize;
122 if event_len < COMMON_HEADER_SIZE {
123 return Err(IdbError::Parse(format!(
124 "event at offset {offset} has invalid length {event_len}"
125 )));
126 }
127
128 let mut event_data = vec![0u8; event_len];
130 event_data[..COMMON_HEADER_SIZE].copy_from_slice(&hdr_buf);
131 if event_len > COMMON_HEADER_SIZE {
132 self.reader
133 .read_exact(&mut event_data[COMMON_HEADER_SIZE..])
134 .map_err(|e| IdbError::Io(format!("read event body at {offset}: {e}")))?;
135 }
136
137 let checksum_enabled = self.has_checksum();
138
139 let payload_end = if checksum_enabled {
141 event_len.saturating_sub(BINLOG_CHECKSUM_LEN)
142 } else {
143 event_len
144 };
145 let payload = &event_data[COMMON_HEADER_SIZE..payload_end];
146
147 let event = match header.type_code {
148 BinlogEventType::FormatDescription => {
149 let fde_with_crc = if !checksum_enabled
155 && event_len > COMMON_HEADER_SIZE + BINLOG_CHECKSUM_LEN
156 {
157 let stripped = &event_data[COMMON_HEADER_SIZE..event_len - BINLOG_CHECKSUM_LEN];
158 FormatDescriptionEvent::parse(stripped).filter(|fde| fde.has_checksum())
159 } else {
160 None
161 };
162
163 let parsed = fde_with_crc.or_else(|| FormatDescriptionEvent::parse(payload));
164
165 match parsed {
166 Some(fde) => BinlogEvent::FormatDescription(fde),
167 None => BinlogEvent::Unknown {
168 type_code: header.type_code.type_code(),
169 payload: payload.to_vec(),
170 },
171 }
172 }
173 BinlogEventType::RotateEvent => match RotateEvent::parse(payload) {
174 Some(re) => BinlogEvent::Rotate(re),
175 None => BinlogEvent::Unknown {
176 type_code: header.type_code.type_code(),
177 payload: payload.to_vec(),
178 },
179 },
180 BinlogEventType::StopEvent => BinlogEvent::Stop,
181 BinlogEventType::QueryEvent => BinlogEvent::Query {
182 payload: payload.to_vec(),
183 },
184 BinlogEventType::XidEvent => {
185 if payload.len() >= 8 {
186 let xid = LittleEndian::read_u64(payload);
187 BinlogEvent::Xid { xid }
188 } else {
189 BinlogEvent::Unknown {
190 type_code: header.type_code.type_code(),
191 payload: payload.to_vec(),
192 }
193 }
194 }
195 _ => BinlogEvent::Unknown {
196 type_code: header.type_code.type_code(),
197 payload: payload.to_vec(),
198 },
199 };
200
201 Ok(Some((header, event)))
202 }
203
204 pub fn events(&mut self) -> BinlogEventIterator<'_> {
208 BinlogEventIterator {
209 binlog: self,
210 offset: BINLOG_MAGIC_SIZE as u64,
211 done: false,
212 }
213 }
214
215 pub fn validate_checksum_at(&mut self, offset: u64) -> Result<Option<bool>, IdbError> {
220 if !self.has_checksum() {
221 return Ok(None);
222 }
223
224 self.reader
225 .seek(SeekFrom::Start(offset))
226 .map_err(|e| IdbError::Io(format!("seek to {offset}: {e}")))?;
227
228 let mut hdr_buf = [0u8; COMMON_HEADER_SIZE];
230 self.reader
231 .read_exact(&mut hdr_buf)
232 .map_err(|e| IdbError::Io(format!("read header at {offset}: {e}")))?;
233
234 let header = CommonEventHeader::parse(&hdr_buf)
235 .ok_or_else(|| IdbError::Parse("invalid event header".into()))?;
236
237 let event_len = header.event_length as usize;
238 let mut event_data = vec![0u8; event_len];
239 event_data[..COMMON_HEADER_SIZE].copy_from_slice(&hdr_buf);
240 if event_len > COMMON_HEADER_SIZE {
241 self.reader
242 .read_exact(&mut event_data[COMMON_HEADER_SIZE..])
243 .map_err(|e| IdbError::Io(format!("read event at {offset}: {e}")))?;
244 }
245
246 Ok(Some(validate_event_checksum(&event_data)))
247 }
248
249 fn read_header(&mut self) -> Result<(), IdbError> {
251 self.reader
252 .seek(SeekFrom::Start(0))
253 .map_err(|e| IdbError::Io(format!("seek to start: {e}")))?;
254
255 let mut magic = [0u8; BINLOG_MAGIC_SIZE];
257 self.reader
258 .read_exact(&mut magic)
259 .map_err(|e| IdbError::Io(format!("read magic bytes: {e}")))?;
260
261 if magic != BINLOG_MAGIC {
262 return Err(IdbError::Parse(format!(
263 "invalid binlog magic bytes: expected {:02x?}, got {:02x?}",
264 BINLOG_MAGIC, magic
265 )));
266 }
267
268 match self.read_event_at(BINLOG_MAGIC_SIZE as u64)? {
270 Some((header, BinlogEvent::FormatDescription(fde))) => {
271 if header.type_code != BinlogEventType::FormatDescription {
272 return Err(IdbError::Parse(
273 "first event is not FORMAT_DESCRIPTION_EVENT".into(),
274 ));
275 }
276 self.fde = Some(fde);
277 Ok(())
278 }
279 Some((header, _)) => Err(IdbError::Parse(format!(
280 "first event is {} (expected FORMAT_DESCRIPTION_EVENT)",
281 header.type_code
282 ))),
283 None => Err(IdbError::Parse("no events after magic bytes".into())),
284 }
285 }
286}
287
288pub struct BinlogEventIterator<'a> {
292 binlog: &'a mut BinlogFile,
293 offset: u64,
294 done: bool,
295}
296
297impl<'a> Iterator for BinlogEventIterator<'a> {
298 type Item = Result<(u64, CommonEventHeader, BinlogEvent), IdbError>;
299
300 fn next(&mut self) -> Option<Self::Item> {
301 if self.done {
302 return None;
303 }
304
305 let current_offset = self.offset;
306 match self.binlog.read_event_at(current_offset) {
307 Ok(Some((header, event))) => {
308 let next = header.next_position;
310 if next == 0 || next as u64 <= current_offset {
311 self.done = true;
314 } else {
315 self.offset = next as u64;
316 }
317 Some(Ok((current_offset, header, event)))
318 }
319 Ok(None) => {
320 self.done = true;
321 None
322 }
323 Err(e) => {
324 self.done = true;
325 Some(Err(e))
326 }
327 }
328 }
329}
330
331#[cfg(test)]
332mod tests {
333 use super::*;
334
335 fn build_synthetic_binlog() -> Vec<u8> {
339 let mut buf = Vec::new();
340
341 buf.extend_from_slice(&BINLOG_MAGIC);
343
344 let mut fde_payload = vec![0u8; 59]; LittleEndian::write_u16(&mut fde_payload[0..], 4); let ver = b"8.0.35";
348 fde_payload[2..2 + ver.len()].copy_from_slice(ver);
349 LittleEndian::write_u32(&mut fde_payload[52..], 1_700_000_000); fde_payload[56] = 19; fde_payload[57] = 0; fde_payload[58] = BINLOG_CHECKSUM_ALG_CRC32; let event_len = COMMON_HEADER_SIZE + fde_payload.len() + BINLOG_CHECKSUM_LEN;
355 let next_pos = BINLOG_MAGIC_SIZE + event_len;
356
357 let mut hdr = vec![0u8; COMMON_HEADER_SIZE];
359 LittleEndian::write_u32(&mut hdr[0..], 1_700_000_000); hdr[4] = FORMAT_DESCRIPTION_EVENT; LittleEndian::write_u32(&mut hdr[5..], 1); LittleEndian::write_u32(&mut hdr[9..], event_len as u32); LittleEndian::write_u32(&mut hdr[13..], next_pos as u32); LittleEndian::write_u16(&mut hdr[17..], LOG_EVENT_BINLOG_IN_USE_F); let mut event = Vec::new();
368 event.extend_from_slice(&hdr);
369 event.extend_from_slice(&fde_payload);
370
371 let crc = crc32c::crc32c(&event);
373 let mut crc_bytes = [0u8; 4];
374 LittleEndian::write_u32(&mut crc_bytes, crc);
375 event.extend_from_slice(&crc_bytes);
376
377 buf.extend_from_slice(&event);
378 buf
379 }
380
381 fn build_stop_event(offset: usize) -> Vec<u8> {
383 let event_len = COMMON_HEADER_SIZE + BINLOG_CHECKSUM_LEN;
384 let mut hdr = vec![0u8; COMMON_HEADER_SIZE];
385 LittleEndian::write_u32(&mut hdr[0..], 1_700_000_001);
386 hdr[4] = STOP_EVENT;
387 LittleEndian::write_u32(&mut hdr[5..], 1);
388 LittleEndian::write_u32(&mut hdr[9..], event_len as u32);
389 LittleEndian::write_u32(&mut hdr[13..], 0); let crc = crc32c::crc32c(&hdr);
392 let mut crc_bytes = [0u8; 4];
393 LittleEndian::write_u32(&mut crc_bytes, crc);
394
395 let mut event = hdr;
396 event.extend_from_slice(&crc_bytes);
397 let _ = offset; event
399 }
400
401 fn build_rotate_event(offset: usize, next_filename: &str) -> Vec<u8> {
403 let mut payload = vec![0u8; 8 + next_filename.len()];
404 LittleEndian::write_u64(&mut payload[0..], 4); payload[8..].copy_from_slice(next_filename.as_bytes());
406
407 let event_len = COMMON_HEADER_SIZE + payload.len() + BINLOG_CHECKSUM_LEN;
408 let mut hdr = vec![0u8; COMMON_HEADER_SIZE];
409 LittleEndian::write_u32(&mut hdr[0..], 1_700_000_002);
410 hdr[4] = ROTATE_EVENT;
411 LittleEndian::write_u32(&mut hdr[5..], 1);
412 LittleEndian::write_u32(&mut hdr[9..], event_len as u32);
413 LittleEndian::write_u32(&mut hdr[13..], (offset + event_len) as u32);
414
415 let mut event = Vec::new();
416 event.extend_from_slice(&hdr);
417 event.extend_from_slice(&payload);
418
419 let crc = crc32c::crc32c(&event);
420 let mut crc_bytes = [0u8; 4];
421 LittleEndian::write_u32(&mut crc_bytes, crc);
422 event.extend_from_slice(&crc_bytes);
423
424 event
425 }
426
/// A freshly built synthetic image opens and exposes the expected
/// format description fields.
#[test]
fn open_synthetic_binlog() {
    let binlog = BinlogFile::from_bytes(build_synthetic_binlog()).unwrap();
    assert!(binlog.has_checksum());

    let description = binlog.format_description().unwrap();
    assert_eq!(description.binlog_version, 4);
    assert_eq!(description.server_version, "8.0.35");
    assert_eq!(description.header_length, 19);
}
438
/// A buffer of zeros is rejected with a parse error that mentions the
/// magic bytes.
#[test]
fn invalid_magic_bytes() {
    let outcome = BinlogFile::from_bytes(vec![0u8; 100]);
    match outcome {
        Ok(_) => panic!("expected Parse error, got Ok"),
        Err(IdbError::Parse(msg)) => assert!(msg.contains("magic bytes")),
        Err(e) => panic!("expected Parse error, got: {e}"),
    }
}
448
/// Iterating a log made of a format description, a rotate and a stop event
/// yields exactly those three events, in order.
#[test]
fn iterate_events() {
    /// Rewrites the `next_position` of the event starting at `event_start`
    /// and recomputes that event's trailing CRC.
    fn set_next_position(image: &mut [u8], event_start: usize, next: usize) {
        let field_at = event_start + EVENT_NEXT_POSITION_OFFSET;
        LittleEndian::write_u32(&mut image[field_at..], next as u32);

        let event_len =
            LittleEndian::read_u32(&image[event_start + EVENT_LENGTH_OFFSET..]) as usize;
        let crc_at = event_start + event_len - BINLOG_CHECKSUM_LEN;
        let crc = crc32c::crc32c(&image[event_start..crc_at]);
        LittleEndian::write_u32(&mut image[crc_at..], crc);
    }

    let mut data = build_synthetic_binlog();

    // Rotate event goes right after the format description event.
    let rotate_offset = data.len();
    let rotate = build_rotate_event(rotate_offset, "mysql-bin.000002");
    set_next_position(&mut data, BINLOG_MAGIC_SIZE, rotate_offset);
    data.extend_from_slice(&rotate);

    // Stop event goes right after the rotate event.
    let stop_offset = data.len();
    let stop = build_stop_event(stop_offset);
    set_next_position(&mut data, rotate_offset, stop_offset);
    data.extend_from_slice(&stop);

    let mut binlog = BinlogFile::from_bytes(data).unwrap();
    let events = binlog.events().collect::<Result<Vec<_>, _>>().unwrap();

    assert_eq!(events.len(), 3);

    let (_, fde_header, fde_event) = &events[0];
    assert_eq!(fde_header.type_code, BinlogEventType::FormatDescription);
    assert!(matches!(fde_event, BinlogEvent::FormatDescription(_)));

    let (_, rotate_header, rotate_event) = &events[1];
    assert_eq!(rotate_header.type_code, BinlogEventType::RotateEvent);
    match rotate_event {
        BinlogEvent::Rotate(re) => {
            assert_eq!(re.next_filename, "mysql-bin.000002");
            assert_eq!(re.position, 4);
        }
        _ => panic!("expected Rotate event"),
    }

    let (_, stop_header, stop_event) = &events[2];
    assert_eq!(stop_header.type_code, BinlogEventType::StopEvent);
    assert!(matches!(stop_event, BinlogEvent::Stop));
}
512
/// The format description event of the synthetic image passes checksum
/// validation.
#[test]
fn validate_checksum_at_offset() {
    let mut binlog = BinlogFile::from_bytes(build_synthetic_binlog()).unwrap();

    let first_event_offset = BINLOG_MAGIC_SIZE as u64;
    let verdict = binlog.validate_checksum_at(first_event_offset).unwrap();

    assert_eq!(verdict, Some(true));
}
523
/// An XID event appended after the format description event is decoded
/// into `BinlogEvent::Xid` with the transaction id that was written.
#[test]
fn xid_event_parsing() {
    let mut data = build_synthetic_binlog();
    let xid_offset = data.len();

    // Point the format description event at the XID event and refresh its CRC.
    let fde_start = BINLOG_MAGIC_SIZE;
    LittleEndian::write_u32(
        &mut data[fde_start + EVENT_NEXT_POSITION_OFFSET..],
        xid_offset as u32,
    );
    let fde_len = LittleEndian::read_u32(&data[fde_start + EVENT_LENGTH_OFFSET..]) as usize;
    let fde_crc_at = fde_start + fde_len - BINLOG_CHECKSUM_LEN;
    let fde_crc = crc32c::crc32c(&data[fde_start..fde_crc_at]);
    LittleEndian::write_u32(&mut data[fde_crc_at..], fde_crc);

    // XID event: common header, 8-byte little-endian transaction id, CRC.
    let event_len = COMMON_HEADER_SIZE + 8 + BINLOG_CHECKSUM_LEN;
    let mut event = vec![0u8; COMMON_HEADER_SIZE];
    event[0..4].copy_from_slice(&1_700_000_003u32.to_le_bytes());
    event[4] = XID_EVENT;
    event[5..9].copy_from_slice(&1u32.to_le_bytes());
    event[9..13].copy_from_slice(&(event_len as u32).to_le_bytes());
    event[13..17].copy_from_slice(&0u32.to_le_bytes());
    event.extend_from_slice(&42u64.to_le_bytes());
    let crc = crc32c::crc32c(&event);
    event.extend_from_slice(&crc.to_le_bytes());

    data.extend_from_slice(&event);

    let mut binlog = BinlogFile::from_bytes(data).unwrap();
    let events = binlog.events().collect::<Result<Vec<_>, _>>().unwrap();

    assert_eq!(events.len(), 2);
    match &events[1].2 {
        BinlogEvent::Xid { xid } => assert_eq!(*xid, 42),
        _ => panic!("expected Xid event"),
    }
}
572}