Skip to main content

idb/binlog/
file.rs

1//! Binlog file reader with event iteration.
2//!
3//! [`BinlogFile`] opens a MySQL binary log file, validates the 4-byte magic
4//! header, reads the FORMAT_DESCRIPTION_EVENT, and provides random-access
5//! event reading and sequential iteration.
6//!
7//! # Examples
8//!
9//! ```rust,ignore
10//! use idb::binlog::BinlogFile;
11//!
12//! let mut binlog = BinlogFile::open("mysql-bin.000001")?;
13//! println!("Server: {}", binlog.format_description().unwrap().server_version);
14//!
15//! for result in binlog.events() {
16//!     let (offset, header, event) = result?;
17//!     println!("{offset}: {}", header.type_code);
18//! }
19//! ```
20
21use byteorder::{ByteOrder, LittleEndian};
22use std::io::{Cursor, Read, Seek, SeekFrom};
23
24use crate::IdbError;
25
26use super::checksum::validate_event_checksum;
27use super::constants::*;
28use super::event::{BinlogEvent, BinlogEventType, CommonEventHeader};
29use super::header::{FormatDescriptionEvent, RotateEvent};
30
/// Object-safe combination of [`Read`] and [`Seek`].
///
/// `dyn Read + Seek` is not a valid trait object, so this private supertrait
/// gives the reader a single trait to be boxed behind.
trait ReadSeek: Read + Seek {}

/// Blanket implementation: anything that is both readable and seekable
/// automatically qualifies as a [`ReadSeek`].
impl<T> ReadSeek for T where T: Read + Seek {}
34
/// A MySQL binary log file reader.
///
/// Validates the magic bytes on open, parses the FORMAT_DESCRIPTION_EVENT,
/// and provides methods to read individual events or iterate through all of them.
pub struct BinlogFile {
    /// Type-erased source of the binlog bytes (a `File` when created by
    /// `open`, an in-memory `Cursor` when created by `from_bytes`).
    reader: Box<dyn ReadSeek>,
    /// Size of the source in bytes, captured once at construction time.
    file_size: u64,
    /// The FORMAT_DESCRIPTION_EVENT parsed by `read_header`; `None` only
    /// while the header is still being read during construction.
    fde: Option<FormatDescriptionEvent>,
}
44
45impl BinlogFile {
46    /// Open a binlog file from the filesystem.
47    ///
48    /// Validates the 4-byte magic header and reads the FORMAT_DESCRIPTION_EVENT.
49    pub fn open(path: &str) -> Result<Self, IdbError> {
50        let file = std::fs::File::open(path)
51            .map_err(|e| IdbError::Io(format!("cannot open {path}: {e}")))?;
52        let metadata = file
53            .metadata()
54            .map_err(|e| IdbError::Io(format!("cannot stat {path}: {e}")))?;
55        let file_size = metadata.len();
56
57        let mut binlog = Self {
58            reader: Box::new(file),
59            file_size,
60            fde: None,
61        };
62        binlog.read_header()?;
63        Ok(binlog)
64    }
65
66    /// Create a binlog reader from in-memory bytes (useful for WASM).
67    pub fn from_bytes(data: Vec<u8>) -> Result<Self, IdbError> {
68        let file_size = data.len() as u64;
69        let mut binlog = Self {
70            reader: Box::new(Cursor::new(data)),
71            file_size,
72            fde: None,
73        };
74        binlog.read_header()?;
75        Ok(binlog)
76    }
77
    /// Total size of the underlying source in bytes.
    ///
    /// This is the value captured when the reader was constructed (file
    /// metadata length for `open`, buffer length for `from_bytes`); it is
    /// not refreshed afterwards.
    pub fn file_size(&self) -> u64 {
        self.file_size
    }
82
    /// The FORMAT_DESCRIPTION_EVENT parsed from the file header, if available.
    ///
    /// Both constructors fail unless the header was parsed, so this is
    /// `Some` for any reader obtained through `open` or `from_bytes`.
    pub fn format_description(&self) -> Option<&FormatDescriptionEvent> {
        self.fde.as_ref()
    }
87
88    /// Whether CRC-32 checksums are enabled for events in this file.
89    pub fn has_checksum(&self) -> bool {
90        self.fde.as_ref().is_some_and(|f| f.has_checksum())
91    }
92
93    /// Read and parse the event at the given absolute file offset.
94    ///
95    /// Returns `Ok(None)` if the offset is at or beyond EOF. Returns the
96    /// common header and parsed event payload on success.
97    pub fn read_event_at(
98        &mut self,
99        offset: u64,
100    ) -> Result<Option<(CommonEventHeader, BinlogEvent)>, IdbError> {
101        if offset >= self.file_size {
102            return Ok(None);
103        }
104
105        self.reader
106            .seek(SeekFrom::Start(offset))
107            .map_err(|e| IdbError::Io(format!("seek to offset {offset}: {e}")))?;
108
109        // Read common header
110        let mut hdr_buf = [0u8; COMMON_HEADER_SIZE];
111        if let Err(e) = self.reader.read_exact(&mut hdr_buf) {
112            if e.kind() == std::io::ErrorKind::UnexpectedEof {
113                return Ok(None);
114            }
115            return Err(IdbError::Io(format!("read event header at {offset}: {e}")));
116        }
117
118        let header = CommonEventHeader::parse(&hdr_buf)
119            .ok_or_else(|| IdbError::Parse("invalid event header".into()))?;
120
121        let event_len = header.event_length as usize;
122        if event_len < COMMON_HEADER_SIZE {
123            return Err(IdbError::Parse(format!(
124                "event at offset {offset} has invalid length {event_len}"
125            )));
126        }
127
128        // Read full event data (including header) for checksum validation
129        let mut event_data = vec![0u8; event_len];
130        event_data[..COMMON_HEADER_SIZE].copy_from_slice(&hdr_buf);
131        if event_len > COMMON_HEADER_SIZE {
132            self.reader
133                .read_exact(&mut event_data[COMMON_HEADER_SIZE..])
134                .map_err(|e| IdbError::Io(format!("read event body at {offset}: {e}")))?;
135        }
136
137        let checksum_enabled = self.has_checksum();
138
139        // Extract payload (between header and checksum)
140        let payload_end = if checksum_enabled {
141            event_len.saturating_sub(BINLOG_CHECKSUM_LEN)
142        } else {
143            event_len
144        };
145        let payload = &event_data[COMMON_HEADER_SIZE..payload_end];
146
147        let event = match header.type_code {
148            BinlogEventType::FormatDescription => {
149                // Bootstrap: when reading the FDE, self.fde is None so
150                // checksum_enabled is false and the CRC-32 bytes (if present)
151                // are still in `payload`. Try parsing with CRC stripped first
152                // (most common case since MySQL 5.6.6+), then fall back to
153                // parsing the full payload.
154                let fde_with_crc = if !checksum_enabled
155                    && event_len > COMMON_HEADER_SIZE + BINLOG_CHECKSUM_LEN
156                {
157                    let stripped = &event_data[COMMON_HEADER_SIZE..event_len - BINLOG_CHECKSUM_LEN];
158                    FormatDescriptionEvent::parse(stripped).filter(|fde| fde.has_checksum())
159                } else {
160                    None
161                };
162
163                let parsed = fde_with_crc.or_else(|| FormatDescriptionEvent::parse(payload));
164
165                match parsed {
166                    Some(fde) => BinlogEvent::FormatDescription(fde),
167                    None => BinlogEvent::Unknown {
168                        type_code: header.type_code.type_code(),
169                        payload: payload.to_vec(),
170                    },
171                }
172            }
173            BinlogEventType::RotateEvent => match RotateEvent::parse(payload) {
174                Some(re) => BinlogEvent::Rotate(re),
175                None => BinlogEvent::Unknown {
176                    type_code: header.type_code.type_code(),
177                    payload: payload.to_vec(),
178                },
179            },
180            BinlogEventType::StopEvent => BinlogEvent::Stop,
181            BinlogEventType::QueryEvent => BinlogEvent::Query {
182                payload: payload.to_vec(),
183            },
184            BinlogEventType::XidEvent => {
185                if payload.len() >= 8 {
186                    let xid = LittleEndian::read_u64(payload);
187                    BinlogEvent::Xid { xid }
188                } else {
189                    BinlogEvent::Unknown {
190                        type_code: header.type_code.type_code(),
191                        payload: payload.to_vec(),
192                    }
193                }
194            }
195            _ => BinlogEvent::Unknown {
196                type_code: header.type_code.type_code(),
197                payload: payload.to_vec(),
198            },
199        };
200
201        Ok(Some((header, event)))
202    }
203
204    /// Return an iterator over all events in the file, starting after the magic bytes.
205    ///
206    /// Each item is `(file_offset, CommonEventHeader, BinlogEvent)`.
207    pub fn events(&mut self) -> BinlogEventIterator<'_> {
208        BinlogEventIterator {
209            binlog: self,
210            offset: BINLOG_MAGIC_SIZE as u64,
211            done: false,
212        }
213    }
214
215    /// Validate the CRC-32C checksum of the event at the given offset.
216    ///
217    /// Returns `None` if the event cannot be read, `Some(true)` if valid,
218    /// `Some(false)` if the checksum does not match.
219    pub fn validate_checksum_at(&mut self, offset: u64) -> Result<Option<bool>, IdbError> {
220        if !self.has_checksum() {
221            return Ok(None);
222        }
223
224        self.reader
225            .seek(SeekFrom::Start(offset))
226            .map_err(|e| IdbError::Io(format!("seek to {offset}: {e}")))?;
227
228        // Read just the header to get event length
229        let mut hdr_buf = [0u8; COMMON_HEADER_SIZE];
230        self.reader
231            .read_exact(&mut hdr_buf)
232            .map_err(|e| IdbError::Io(format!("read header at {offset}: {e}")))?;
233
234        let header = CommonEventHeader::parse(&hdr_buf)
235            .ok_or_else(|| IdbError::Parse("invalid event header".into()))?;
236
237        let event_len = header.event_length as usize;
238        let mut event_data = vec![0u8; event_len];
239        event_data[..COMMON_HEADER_SIZE].copy_from_slice(&hdr_buf);
240        if event_len > COMMON_HEADER_SIZE {
241            self.reader
242                .read_exact(&mut event_data[COMMON_HEADER_SIZE..])
243                .map_err(|e| IdbError::Io(format!("read event at {offset}: {e}")))?;
244        }
245
246        Ok(Some(validate_event_checksum(&event_data)))
247    }
248
249    /// Read and validate the magic bytes and FORMAT_DESCRIPTION_EVENT.
250    fn read_header(&mut self) -> Result<(), IdbError> {
251        self.reader
252            .seek(SeekFrom::Start(0))
253            .map_err(|e| IdbError::Io(format!("seek to start: {e}")))?;
254
255        // Validate magic bytes
256        let mut magic = [0u8; BINLOG_MAGIC_SIZE];
257        self.reader
258            .read_exact(&mut magic)
259            .map_err(|e| IdbError::Io(format!("read magic bytes: {e}")))?;
260
261        if magic != BINLOG_MAGIC {
262            return Err(IdbError::Parse(format!(
263                "invalid binlog magic bytes: expected {:02x?}, got {:02x?}",
264                BINLOG_MAGIC, magic
265            )));
266        }
267
268        // Read the first event (should be FORMAT_DESCRIPTION_EVENT)
269        match self.read_event_at(BINLOG_MAGIC_SIZE as u64)? {
270            Some((header, BinlogEvent::FormatDescription(fde))) => {
271                if header.type_code != BinlogEventType::FormatDescription {
272                    return Err(IdbError::Parse(
273                        "first event is not FORMAT_DESCRIPTION_EVENT".into(),
274                    ));
275                }
276                self.fde = Some(fde);
277                Ok(())
278            }
279            Some((header, _)) => Err(IdbError::Parse(format!(
280                "first event is {} (expected FORMAT_DESCRIPTION_EVENT)",
281                header.type_code
282            ))),
283            None => Err(IdbError::Parse("no events after magic bytes".into())),
284        }
285    }
286}
287
/// Iterator over binlog events.
///
/// Yields `(offset, CommonEventHeader, BinlogEvent)` for each event in the file.
/// Created by `BinlogFile::events`; it borrows the reader mutably for its
/// whole lifetime because every step seeks and reads.
pub struct BinlogEventIterator<'a> {
    /// The reader events are pulled from.
    binlog: &'a mut BinlogFile,
    /// Absolute file offset of the next event to read.
    offset: u64,
    /// Set once iteration has finished (EOF, error, or a non-advancing
    /// `next_position`); afterwards `next` always returns `None`.
    done: bool,
}
296
297impl<'a> Iterator for BinlogEventIterator<'a> {
298    type Item = Result<(u64, CommonEventHeader, BinlogEvent), IdbError>;
299
300    fn next(&mut self) -> Option<Self::Item> {
301        if self.done {
302            return None;
303        }
304
305        let current_offset = self.offset;
306        match self.binlog.read_event_at(current_offset) {
307            Ok(Some((header, event))) => {
308                // Advance to next event
309                let next = header.next_position;
310                if next == 0 || next as u64 <= current_offset {
311                    // next_position == 0 means this is the last event (e.g. in relay logs)
312                    // or next_position didn't advance — stop to avoid infinite loop
313                    self.done = true;
314                } else {
315                    self.offset = next as u64;
316                }
317                Some(Ok((current_offset, header, event)))
318            }
319            Ok(None) => {
320                self.done = true;
321                None
322            }
323            Err(e) => {
324                self.done = true;
325                Some(Err(e))
326            }
327        }
328    }
329}
330
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a zero-initialised common event header and fill in its fields.
    ///
    /// `flags` is written at byte 17; pass 0 for events without flags.
    fn build_common_header(
        timestamp: u32,
        type_code: u8,
        event_len: usize,
        next_pos: usize,
        flags: u16,
    ) -> Vec<u8> {
        let mut hdr = vec![0u8; COMMON_HEADER_SIZE];
        LittleEndian::write_u32(&mut hdr[0..], timestamp); // timestamp
        hdr[4] = type_code; // type
        LittleEndian::write_u32(&mut hdr[5..], 1); // server_id
        LittleEndian::write_u32(&mut hdr[9..], event_len as u32); // event_length
        LittleEndian::write_u32(&mut hdr[13..], next_pos as u32); // next_position
        LittleEndian::write_u16(&mut hdr[17..], flags); // flags
        hdr
    }

    /// Compute the checksum over everything currently in `event` and append
    /// it as 4 little-endian bytes.
    fn append_checksum(event: &mut Vec<u8>) {
        let crc = crc32c::crc32c(event);
        event.extend_from_slice(&crc.to_le_bytes());
    }

    /// Rewrite the `next_position` field of the event starting at
    /// `event_start` and recompute that event's trailing checksum so the
    /// event stays valid.
    fn set_next_position(data: &mut [u8], event_start: usize, next_pos: usize) {
        LittleEndian::write_u32(
            &mut data[event_start + EVENT_NEXT_POSITION_OFFSET..],
            next_pos as u32,
        );
        let event_len =
            LittleEndian::read_u32(&data[event_start + EVENT_LENGTH_OFFSET..]) as usize;
        let crc_offset = event_start + event_len - BINLOG_CHECKSUM_LEN;
        let crc = crc32c::crc32c(&data[event_start..crc_offset]);
        LittleEndian::write_u32(&mut data[crc_offset..], crc);
    }

    /// Build a synthetic binlog file with magic bytes and a FORMAT_DESCRIPTION_EVENT.
    ///
    /// The FDE has checksums enabled and contains a minimal post-header-lengths
    /// array. Its `next_position` points at the end of the returned buffer.
    fn build_synthetic_binlog() -> Vec<u8> {
        // Build FDE payload
        let mut fde_payload = vec![0u8; 59]; // 57 fixed + 1 post-header-length + 1 checksum algo
        LittleEndian::write_u16(&mut fde_payload[0..], 4); // binlog_version
        let ver = b"8.0.35";
        fde_payload[2..2 + ver.len()].copy_from_slice(ver);
        LittleEndian::write_u32(&mut fde_payload[52..], 1_700_000_000); // create_timestamp
        fde_payload[56] = 19; // header_length
        fde_payload[57] = 0; // one post-header-length entry
        fde_payload[58] = BINLOG_CHECKSUM_ALG_CRC32; // checksum algo

        let event_len = COMMON_HEADER_SIZE + fde_payload.len() + BINLOG_CHECKSUM_LEN;
        let next_pos = BINLOG_MAGIC_SIZE + event_len;

        // Assemble event: header + payload + checksum
        let mut event = build_common_header(
            1_700_000_000,
            FORMAT_DESCRIPTION_EVENT,
            event_len,
            next_pos,
            LOG_EVENT_BINLOG_IN_USE_F,
        );
        event.extend_from_slice(&fde_payload);
        append_checksum(&mut event);

        let mut buf = Vec::with_capacity(BINLOG_MAGIC_SIZE + event.len());
        buf.extend_from_slice(&BINLOG_MAGIC);
        buf.extend_from_slice(&event);
        buf
    }

    /// Build a STOP_EVENT (no payload, just header + checksum).
    ///
    /// `_offset` documents where the caller will place the event; it is not
    /// needed because `next_position` is 0 (marks the last event).
    fn build_stop_event(_offset: usize) -> Vec<u8> {
        let event_len = COMMON_HEADER_SIZE + BINLOG_CHECKSUM_LEN;
        let mut event = build_common_header(1_700_000_001, STOP_EVENT, event_len, 0, 0);
        append_checksum(&mut event);
        event
    }

    /// Build a ROTATE_EVENT placed at `offset` that names `next_filename`.
    fn build_rotate_event(offset: usize, next_filename: &str) -> Vec<u8> {
        let mut payload = vec![0u8; 8 + next_filename.len()];
        LittleEndian::write_u64(&mut payload[0..], 4); // position
        payload[8..].copy_from_slice(next_filename.as_bytes());

        let event_len = COMMON_HEADER_SIZE + payload.len() + BINLOG_CHECKSUM_LEN;
        let mut event = build_common_header(
            1_700_000_002,
            ROTATE_EVENT,
            event_len,
            offset + event_len,
            0,
        );
        event.extend_from_slice(&payload);
        append_checksum(&mut event);
        event
    }

    #[test]
    fn open_synthetic_binlog() {
        let data = build_synthetic_binlog();
        let binlog = BinlogFile::from_bytes(data).unwrap();

        assert!(binlog.has_checksum());
        let fde = binlog.format_description().unwrap();
        assert_eq!(fde.binlog_version, 4);
        assert_eq!(fde.server_version, "8.0.35");
        assert_eq!(fde.header_length, 19);
    }

    #[test]
    fn invalid_magic_bytes() {
        let data = vec![0u8; 100];
        match BinlogFile::from_bytes(data) {
            Err(IdbError::Parse(msg)) => assert!(msg.contains("magic bytes")),
            Ok(_) => panic!("expected Parse error, got Ok"),
            Err(e) => panic!("expected Parse error, got: {e}"),
        }
    }

    #[test]
    fn iterate_events() {
        let mut data = build_synthetic_binlog();

        // Append a ROTATE_EVENT directly after the FDE. The builders already
        // chain next_position to the end of each event; re-pointing it
        // explicitly keeps this test independent of that detail.
        let rotate_offset = data.len();
        set_next_position(&mut data, BINLOG_MAGIC_SIZE, rotate_offset);
        data.extend_from_slice(&build_rotate_event(rotate_offset, "mysql-bin.000002"));

        // Append a STOP_EVENT and make the rotate event point at it.
        let stop_offset = data.len();
        set_next_position(&mut data, rotate_offset, stop_offset);
        data.extend_from_slice(&build_stop_event(stop_offset));

        let mut binlog = BinlogFile::from_bytes(data).unwrap();

        let events: Vec<_> = binlog.events().collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(events.len(), 3);

        // First: FDE
        assert_eq!(events[0].1.type_code, BinlogEventType::FormatDescription);
        assert!(matches!(events[0].2, BinlogEvent::FormatDescription(_)));

        // Second: ROTATE
        assert_eq!(events[1].1.type_code, BinlogEventType::RotateEvent);
        if let BinlogEvent::Rotate(ref re) = events[1].2 {
            assert_eq!(re.next_filename, "mysql-bin.000002");
            assert_eq!(re.position, 4);
        } else {
            panic!("expected Rotate event");
        }

        // Third: STOP
        assert_eq!(events[2].1.type_code, BinlogEventType::StopEvent);
        assert!(matches!(events[2].2, BinlogEvent::Stop));
    }

    #[test]
    fn validate_checksum_at_offset() {
        let data = build_synthetic_binlog();
        let mut binlog = BinlogFile::from_bytes(data).unwrap();

        let result = binlog
            .validate_checksum_at(BINLOG_MAGIC_SIZE as u64)
            .unwrap();
        assert_eq!(result, Some(true));
    }

    #[test]
    fn xid_event_parsing() {
        let mut data = build_synthetic_binlog();

        // Build a XID_EVENT carrying the value 42; next_position 0 = last event.
        let xid_offset = data.len();
        let xid_value: u64 = 42;
        let mut xid_payload = [0u8; 8];
        LittleEndian::write_u64(&mut xid_payload, xid_value);

        let event_len = COMMON_HEADER_SIZE + xid_payload.len() + BINLOG_CHECKSUM_LEN;
        let mut event = build_common_header(1_700_000_003, XID_EVENT, event_len, 0, 0);
        event.extend_from_slice(&xid_payload);
        append_checksum(&mut event);

        // Make the FDE point at the XID event, then append it.
        set_next_position(&mut data, BINLOG_MAGIC_SIZE, xid_offset);
        data.extend_from_slice(&event);

        let mut binlog = BinlogFile::from_bytes(data).unwrap();
        let events: Vec<_> = binlog.events().collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(events.len(), 2);
        if let BinlogEvent::Xid { xid } = &events[1].2 {
            assert_eq!(*xid, 42);
        } else {
            panic!("expected Xid event");
        }
    }
}