Skip to main content

idb/innodb/
undo.rs

1//! UNDO log page parsing.
2//!
3//! UNDO log pages (page type 2 / `FIL_PAGE_UNDO_LOG`) store previous versions
4//! of modified records for MVCC and rollback. Each undo page has an
5//! [`UndoPageHeader`] at `FIL_PAGE_DATA` (byte 38) describing the undo type
6//! and free space pointers, followed by an [`UndoSegmentHeader`] with the
7//! segment state and transaction metadata.
8
9use byteorder::{BigEndian, ByteOrder};
10use serde::Serialize;
11
12use crate::innodb::constants::{FIL_NULL, FIL_PAGE_DATA};
13use crate::innodb::page::FilHeader;
14use crate::innodb::page_types::PageType;
15use crate::innodb::tablespace::Tablespace;
16use crate::IdbError;
17
18/// Undo log page header offsets (relative to FIL_PAGE_DATA).
19///
20/// From trx0undo.h in MySQL source.
21const TRX_UNDO_PAGE_TYPE: usize = 0; // 2 bytes
22const TRX_UNDO_PAGE_START: usize = 2; // 2 bytes
23const TRX_UNDO_PAGE_FREE: usize = 4; // 2 bytes
24#[allow(dead_code)]
25const TRX_UNDO_PAGE_NODE: usize = 6; // 12 bytes (FLST_NODE)
26const TRX_UNDO_PAGE_HDR_SIZE: usize = 18;
27
28/// Undo segment header offsets (relative to FIL_PAGE_DATA + TRX_UNDO_PAGE_HDR_SIZE).
29const TRX_UNDO_STATE: usize = 0; // 2 bytes
30const TRX_UNDO_LAST_LOG: usize = 2; // 2 bytes
31#[allow(dead_code)]
32const TRX_UNDO_FSEG_HEADER: usize = 4; // 10 bytes (FSEG_HEADER)
33#[allow(dead_code)]
34const TRX_UNDO_PAGE_LIST: usize = 14; // 16 bytes (FLST_BASE_NODE)
35const TRX_UNDO_SEG_HDR_SIZE: usize = 30;
36
37/// Undo log header offsets (at the start of the undo log within the page).
38const TRX_UNDO_TRX_ID: usize = 0; // 8 bytes
39const TRX_UNDO_TRX_NO: usize = 8; // 8 bytes
40const TRX_UNDO_DEL_MARKS: usize = 16; // 2 bytes
41const TRX_UNDO_LOG_START: usize = 18; // 2 bytes
42const TRX_UNDO_XID_EXISTS: usize = 20; // 1 byte
43const TRX_UNDO_DICT_TRANS: usize = 21; // 1 byte
44const TRX_UNDO_TABLE_ID: usize = 22; // 8 bytes
45const TRX_UNDO_NEXT_LOG: usize = 30; // 2 bytes
46const TRX_UNDO_PREV_LOG: usize = 32; // 2 bytes
47
48/// Undo page types.
49///
50/// # Examples
51///
52/// ```
53/// use idb::innodb::undo::UndoPageType;
54///
55/// let insert = UndoPageType::from_u16(1);
56/// assert_eq!(insert, UndoPageType::Insert);
57/// assert_eq!(insert.name(), "INSERT");
58///
59/// let update = UndoPageType::from_u16(2);
60/// assert_eq!(update, UndoPageType::Update);
61/// assert_eq!(update.name(), "UPDATE");
62///
63/// let unknown = UndoPageType::from_u16(99);
64/// assert_eq!(unknown, UndoPageType::Unknown(99));
65/// assert_eq!(unknown.name(), "UNKNOWN");
66/// ```
67#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
68pub enum UndoPageType {
69    /// Insert undo log (INSERT operations only)
70    Insert,
71    /// Update undo log (UPDATE and DELETE operations)
72    Update,
73    /// Unknown type
74    Unknown(u16),
75}
76
77impl UndoPageType {
78    /// Convert a raw u16 value from the undo page header to an `UndoPageType`.
79    pub fn from_u16(value: u16) -> Self {
80        match value {
81            1 => UndoPageType::Insert,
82            2 => UndoPageType::Update,
83            v => UndoPageType::Unknown(v),
84        }
85    }
86
87    /// Returns the MySQL source-style name for this undo page type.
88    pub fn name(&self) -> &'static str {
89        match self {
90            UndoPageType::Insert => "INSERT",
91            UndoPageType::Update => "UPDATE",
92            UndoPageType::Unknown(_) => "UNKNOWN",
93        }
94    }
95}
96
97/// Undo segment states.
98///
99/// # Examples
100///
101/// ```
102/// use idb::innodb::undo::UndoState;
103///
104/// assert_eq!(UndoState::from_u16(1), UndoState::Active);
105/// assert_eq!(UndoState::from_u16(2), UndoState::Cached);
106/// assert_eq!(UndoState::from_u16(3), UndoState::ToFree);
107/// assert_eq!(UndoState::from_u16(4), UndoState::ToPurge);
108/// assert_eq!(UndoState::from_u16(5), UndoState::Prepared);
109///
110/// assert_eq!(UndoState::Active.name(), "ACTIVE");
111/// assert_eq!(UndoState::ToPurge.name(), "TO_PURGE");
112///
113/// let unknown = UndoState::from_u16(0);
114/// assert_eq!(unknown, UndoState::Unknown(0));
115/// assert_eq!(unknown.name(), "UNKNOWN");
116/// ```
117#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
118pub enum UndoState {
119    /// Active transaction is using this segment
120    Active,
121    /// Cached for reuse
122    Cached,
123    /// Insert undo segment can be freed
124    ToFree,
125    /// Update undo segment will not be freed (has delete marks)
126    ToPurge,
127    /// Prepared transaction undo
128    Prepared,
129    /// Unknown state
130    Unknown(u16),
131}
132
133impl UndoState {
134    /// Convert a raw u16 value from the undo segment header to an `UndoState`.
135    pub fn from_u16(value: u16) -> Self {
136        match value {
137            1 => UndoState::Active,
138            2 => UndoState::Cached,
139            3 => UndoState::ToFree,
140            4 => UndoState::ToPurge,
141            5 => UndoState::Prepared,
142            v => UndoState::Unknown(v),
143        }
144    }
145
146    /// Returns the MySQL source-style name for this undo state.
147    pub fn name(&self) -> &'static str {
148        match self {
149            UndoState::Active => "ACTIVE",
150            UndoState::Cached => "CACHED",
151            UndoState::ToFree => "TO_FREE",
152            UndoState::ToPurge => "TO_PURGE",
153            UndoState::Prepared => "PREPARED",
154            UndoState::Unknown(_) => "UNKNOWN",
155        }
156    }
157}
158
159/// Parsed undo log page header.
160#[derive(Debug, Clone, Serialize)]
161pub struct UndoPageHeader {
162    /// Type of undo log (INSERT or UPDATE).
163    pub page_type: UndoPageType,
164    /// Offset of the start of undo log records on this page.
165    pub start: u16,
166    /// Offset of the first free byte on this page.
167    pub free: u16,
168}
169
170/// Parsed undo segment header (only on first page of undo segment).
171#[derive(Debug, Clone, Serialize)]
172pub struct UndoSegmentHeader {
173    /// State of the undo segment.
174    pub state: UndoState,
175    /// Offset of the last undo log header on the segment.
176    pub last_log: u16,
177}
178
179impl UndoPageHeader {
180    /// Parse an undo page header from a full page buffer.
181    ///
182    /// The undo page header starts at FIL_PAGE_DATA (byte 38).
183    ///
184    /// # Examples
185    ///
186    /// ```
187    /// use idb::innodb::undo::{UndoPageHeader, UndoPageType};
188    /// use byteorder::{BigEndian, ByteOrder};
189    ///
190    /// // Build a minimal page buffer (at least 38 + 18 = 56 bytes).
191    /// let mut page = vec![0u8; 64];
192    /// let base = 38; // FIL_PAGE_DATA
193    ///
194    /// // Undo page type = UPDATE (2) at offset base+0
195    /// BigEndian::write_u16(&mut page[base..], 2);
196    /// // Start offset at base+2
197    /// BigEndian::write_u16(&mut page[base + 2..], 80);
198    /// // Free offset at base+4
199    /// BigEndian::write_u16(&mut page[base + 4..], 160);
200    ///
201    /// let hdr = UndoPageHeader::parse(&page).unwrap();
202    /// assert_eq!(hdr.page_type, UndoPageType::Update);
203    /// assert_eq!(hdr.start, 80);
204    /// assert_eq!(hdr.free, 160);
205    /// ```
206    pub fn parse(page_data: &[u8]) -> Option<Self> {
207        let base = FIL_PAGE_DATA;
208        if page_data.len() < base + TRX_UNDO_PAGE_HDR_SIZE {
209            return None;
210        }
211
212        let d = &page_data[base..];
213        Some(UndoPageHeader {
214            page_type: UndoPageType::from_u16(BigEndian::read_u16(&d[TRX_UNDO_PAGE_TYPE..])),
215            start: BigEndian::read_u16(&d[TRX_UNDO_PAGE_START..]),
216            free: BigEndian::read_u16(&d[TRX_UNDO_PAGE_FREE..]),
217        })
218    }
219}
220
221impl UndoSegmentHeader {
222    /// Parse an undo segment header from a full page buffer.
223    ///
224    /// The segment header follows the page header at FIL_PAGE_DATA + TRX_UNDO_PAGE_HDR_SIZE.
225    ///
226    /// # Examples
227    ///
228    /// ```
229    /// use idb::innodb::undo::{UndoSegmentHeader, UndoState};
230    /// use byteorder::{BigEndian, ByteOrder};
231    ///
232    /// // Need at least 38 (FIL header) + 18 (page header) + 30 (seg header) = 86 bytes.
233    /// let mut page = vec![0u8; 96];
234    /// let base = 38 + 18; // FIL_PAGE_DATA + TRX_UNDO_PAGE_HDR_SIZE
235    ///
236    /// // State = CACHED (2) at base+0
237    /// BigEndian::write_u16(&mut page[base..], 2);
238    /// // Last log offset at base+2
239    /// BigEndian::write_u16(&mut page[base + 2..], 200);
240    ///
241    /// let hdr = UndoSegmentHeader::parse(&page).unwrap();
242    /// assert_eq!(hdr.state, UndoState::Cached);
243    /// assert_eq!(hdr.last_log, 200);
244    /// ```
245    pub fn parse(page_data: &[u8]) -> Option<Self> {
246        let base = FIL_PAGE_DATA + TRX_UNDO_PAGE_HDR_SIZE;
247        if page_data.len() < base + TRX_UNDO_SEG_HDR_SIZE {
248            return None;
249        }
250
251        let d = &page_data[base..];
252        Some(UndoSegmentHeader {
253            state: UndoState::from_u16(BigEndian::read_u16(&d[TRX_UNDO_STATE..])),
254            last_log: BigEndian::read_u16(&d[TRX_UNDO_LAST_LOG..]),
255        })
256    }
257}
258
259/// Parsed undo log record header (at the start of an undo log within the page).
260#[derive(Debug, Clone, Serialize)]
261pub struct UndoLogHeader {
262    /// Transaction ID that created this undo log.
263    pub trx_id: u64,
264    /// Transaction serial number.
265    pub trx_no: u64,
266    /// Whether delete marks exist in this undo log.
267    pub del_marks: bool,
268    /// Offset of the first undo log record.
269    pub log_start: u16,
270    /// Whether XID info exists (distributed transactions).
271    pub xid_exists: bool,
272    /// Whether this is a DDL transaction.
273    pub dict_trans: bool,
274    /// Table ID (for insert undo logs).
275    pub table_id: u64,
276    /// Offset of the next undo log header (0 if last).
277    pub next_log: u16,
278    /// Offset of the previous undo log header (0 if first).
279    pub prev_log: u16,
280}
281
282impl UndoLogHeader {
283    /// Parse an undo log header from a page at the given offset.
284    ///
285    /// The `log_offset` is typically obtained from UndoSegmentHeader::last_log
286    /// or UndoPageHeader::start.
287    ///
288    /// # Examples
289    ///
290    /// ```
291    /// use idb::innodb::undo::UndoLogHeader;
292    /// use byteorder::{BigEndian, ByteOrder};
293    ///
294    /// // The undo log header is 34 bytes starting at log_offset.
295    /// let log_offset = 100;
296    /// let mut page = vec![0u8; log_offset + 34];
297    ///
298    /// // trx_id (8 bytes) at offset 0
299    /// BigEndian::write_u64(&mut page[log_offset..], 1001);
300    /// // trx_no (8 bytes) at offset 8
301    /// BigEndian::write_u64(&mut page[log_offset + 8..], 500);
302    /// // del_marks (2 bytes) at offset 16
303    /// BigEndian::write_u16(&mut page[log_offset + 16..], 1);
304    /// // log_start (2 bytes) at offset 18
305    /// BigEndian::write_u16(&mut page[log_offset + 18..], 120);
306    /// // xid_exists (1 byte) at offset 20
307    /// page[log_offset + 20] = 1;
308    /// // dict_trans (1 byte) at offset 21
309    /// page[log_offset + 21] = 0;
310    /// // table_id (8 bytes) at offset 22
311    /// BigEndian::write_u64(&mut page[log_offset + 22..], 42);
312    /// // next_log (2 bytes) at offset 30
313    /// BigEndian::write_u16(&mut page[log_offset + 30..], 0);
314    /// // prev_log (2 bytes) at offset 32
315    /// BigEndian::write_u16(&mut page[log_offset + 32..], 0);
316    ///
317    /// let hdr = UndoLogHeader::parse(&page, log_offset).unwrap();
318    /// assert_eq!(hdr.trx_id, 1001);
319    /// assert_eq!(hdr.trx_no, 500);
320    /// assert!(hdr.del_marks);
321    /// assert_eq!(hdr.log_start, 120);
322    /// assert!(hdr.xid_exists);
323    /// assert!(!hdr.dict_trans);
324    /// assert_eq!(hdr.table_id, 42);
325    /// assert_eq!(hdr.next_log, 0);
326    /// assert_eq!(hdr.prev_log, 0);
327    /// ```
328    pub fn parse(page_data: &[u8], log_offset: usize) -> Option<Self> {
329        if page_data.len() < log_offset + 34 {
330            return None;
331        }
332
333        let d = &page_data[log_offset..];
334        Some(UndoLogHeader {
335            trx_id: BigEndian::read_u64(&d[TRX_UNDO_TRX_ID..]),
336            trx_no: BigEndian::read_u64(&d[TRX_UNDO_TRX_NO..]),
337            del_marks: BigEndian::read_u16(&d[TRX_UNDO_DEL_MARKS..]) != 0,
338            log_start: BigEndian::read_u16(&d[TRX_UNDO_LOG_START..]),
339            xid_exists: d[TRX_UNDO_XID_EXISTS] != 0,
340            dict_trans: d[TRX_UNDO_DICT_TRANS] != 0,
341            table_id: BigEndian::read_u64(&d[TRX_UNDO_TABLE_ID..]),
342            next_log: BigEndian::read_u16(&d[TRX_UNDO_NEXT_LOG..]),
343            prev_log: BigEndian::read_u16(&d[TRX_UNDO_PREV_LOG..]),
344        })
345    }
346}
347
348/// Rollback segment array page header (page type FIL_PAGE_RSEG_ARRAY, MySQL 8.0+).
349///
350/// This page is the first page of an undo tablespace (.ibu) and contains
351/// an array of rollback segment page numbers.
352#[derive(Debug, Clone, Serialize)]
353pub struct RsegArrayHeader {
354    /// Number of rollback segment slots.
355    pub size: u32,
356}
357
358impl RsegArrayHeader {
359    /// Parse a rollback segment array header from a full page buffer.
360    ///
361    /// RSEG array header starts at FIL_PAGE_DATA.
362    pub fn parse(page_data: &[u8]) -> Option<Self> {
363        let base = FIL_PAGE_DATA;
364        if page_data.len() < base + 4 {
365            return None;
366        }
367
368        Some(RsegArrayHeader {
369            size: BigEndian::read_u32(&page_data[base..]),
370        })
371    }
372
373    /// Read rollback segment page numbers from the array.
374    ///
375    /// Each slot is a 4-byte page number. Returns up to `max_slots` entries.
376    pub fn read_slots(page_data: &[u8], max_slots: usize) -> Vec<u32> {
377        let base = FIL_PAGE_DATA + 4; // After the size field
378        let mut slots = Vec::new();
379
380        for i in 0..max_slots {
381            let offset = base + i * 4;
382            if offset + 4 > page_data.len() {
383                break;
384            }
385            let page_no = BigEndian::read_u32(&page_data[offset..]);
386            if page_no != 0 && page_no != crate::innodb::constants::FIL_NULL {
387                slots.push(page_no);
388            }
389        }
390
391        slots
392    }
393}
394
395// ---------------------------------------------------------------------------
396// Rollback segment header (RSEG header page pointed to by RSEG array slots)
397// ---------------------------------------------------------------------------
398
399/// Offsets within the rollback segment header page (at FIL_PAGE_DATA).
400const TRX_RSEG_MAX_SIZE: usize = 0; // 4 bytes
401const TRX_RSEG_HISTORY_SIZE: usize = 4; // 4 bytes
402#[allow(dead_code)]
403const TRX_RSEG_HISTORY: usize = 8; // 16 bytes (FLST_BASE_NODE)
404const TRX_RSEG_SLOTS_OFFSET: usize = 24; // 1024 * 4 bytes of page numbers
405
406/// Maximum number of undo segment slots per rollback segment.
407const TRX_RSEG_N_SLOTS: usize = 1024;
408
409/// Parsed rollback segment header (the page pointed to by RSEG array slots).
410///
411/// Contains the maximum size, history list length, and an array of up to 1024
412/// undo segment page number slots.
413#[derive(Debug, Clone, Serialize)]
414pub struct RollbackSegmentHeader {
415    /// Maximum number of undo pages this RSEG can use.
416    pub max_size: u32,
417    /// Number of committed transactions in the history list.
418    pub history_size: u32,
419    /// Undo segment page numbers (FIL_NULL = empty slot).
420    pub slots: Vec<u32>,
421}
422
423impl RollbackSegmentHeader {
424    /// Parse a rollback segment header from a full page buffer.
425    ///
426    /// The RSEG header starts at FIL_PAGE_DATA on the page pointed to by an
427    /// RSEG array slot.
428    pub fn parse(page_data: &[u8]) -> Option<Self> {
429        let base = FIL_PAGE_DATA;
430        let min_size = base + TRX_RSEG_SLOTS_OFFSET + TRX_RSEG_N_SLOTS * 4;
431        if page_data.len() < min_size {
432            return None;
433        }
434
435        let d = &page_data[base..];
436        let max_size = BigEndian::read_u32(&d[TRX_RSEG_MAX_SIZE..]);
437        let history_size = BigEndian::read_u32(&d[TRX_RSEG_HISTORY_SIZE..]);
438
439        let mut slots = Vec::new();
440        for i in 0..TRX_RSEG_N_SLOTS {
441            let offset = TRX_RSEG_SLOTS_OFFSET + i * 4;
442            let page_no = BigEndian::read_u32(&d[offset..]);
443            slots.push(page_no);
444        }
445
446        Some(RollbackSegmentHeader {
447            max_size,
448            history_size,
449            slots,
450        })
451    }
452
453    /// Return only the active (non-FIL_NULL, non-zero) slot page numbers.
454    pub fn active_slots(&self) -> Vec<u32> {
455        self.slots
456            .iter()
457            .copied()
458            .filter(|&s| s != FIL_NULL && s != 0)
459            .collect()
460    }
461}
462
463// ---------------------------------------------------------------------------
464// InnoDB compressed integer reader (mach0data.h encoding)
465// ---------------------------------------------------------------------------
466
467/// Read an InnoDB compressed integer from `data` at the given `offset`.
468///
469/// Returns `(value, bytes_consumed)` or `None` if insufficient data.
470///
471/// Encoding rules (from `mach0data.h`):
472/// - `byte < 0x80`: 1 byte, value = byte
473/// - `byte < 0xC0`: 2 bytes, value = `(byte-0x80)<<8 | next`
474/// - `byte < 0xE0`: 3 bytes, value = `(byte-0xC0)<<16 | next_2`
475/// - `byte < 0xF0`: 4 bytes, value = `(byte-0xE0)<<24 | next_3`
476/// - `byte == 0xF0`: 5 bytes, value = `next_4_bytes`
477///
478/// # Examples
479///
480/// ```
481/// use idb::innodb::undo::read_compressed;
482///
483/// // 1-byte: value 0x7F = 127
484/// assert_eq!(read_compressed(&[0x7F], 0), Some((127, 1)));
485///
486/// // 2-byte: (0x80 - 0x80) << 8 | 0x01 = 1
487/// assert_eq!(read_compressed(&[0x80, 0x01], 0), Some((1, 2)));
488///
489/// // Insufficient data
490/// assert_eq!(read_compressed(&[0x80], 0), None);
491/// ```
492pub fn read_compressed(data: &[u8], offset: usize) -> Option<(u64, usize)> {
493    if offset >= data.len() {
494        return None;
495    }
496    let b = data[offset];
497    if b < 0x80 {
498        Some((b as u64, 1))
499    } else if b < 0xC0 {
500        if offset + 2 > data.len() {
501            return None;
502        }
503        let val = ((b as u64 - 0x80) << 8) | data[offset + 1] as u64;
504        Some((val, 2))
505    } else if b < 0xE0 {
506        if offset + 3 > data.len() {
507            return None;
508        }
509        let val =
510            ((b as u64 - 0xC0) << 16) | (data[offset + 1] as u64) << 8 | data[offset + 2] as u64;
511        Some((val, 3))
512    } else if b < 0xF0 {
513        if offset + 4 > data.len() {
514            return None;
515        }
516        let val = ((b as u64 - 0xE0) << 24)
517            | (data[offset + 1] as u64) << 16
518            | (data[offset + 2] as u64) << 8
519            | data[offset + 3] as u64;
520        Some((val, 4))
521    } else if b == 0xF0 {
522        if offset + 5 > data.len() {
523            return None;
524        }
525        let val = BigEndian::read_u32(&data[offset + 1..]) as u64;
526        Some((val, 5))
527    } else {
528        // Invalid leading byte (0xF1..0xFF not defined)
529        None
530    }
531}
532
533// ---------------------------------------------------------------------------
534// Undo log header chain traversal
535// ---------------------------------------------------------------------------
536
537/// Walk the chain of undo log headers within a single page.
538///
539/// Starts from `start_offset` (typically `UndoSegmentHeader::last_log`) and
540/// follows `prev_log` pointers backwards. Returns headers in reverse
541/// chronological order (newest first).
542pub fn walk_undo_log_headers(page_data: &[u8], start_offset: u16) -> Vec<UndoLogHeader> {
543    let mut headers = Vec::new();
544    let mut offset = start_offset as usize;
545    let max_iterations = 1000; // safety limit
546
547    for _ in 0..max_iterations {
548        if offset == 0 || offset >= page_data.len() {
549            break;
550        }
551
552        match UndoLogHeader::parse(page_data, offset) {
553            Some(hdr) => {
554                let prev = hdr.prev_log;
555                headers.push(hdr);
556                if prev == 0 {
557                    break;
558                }
559                offset = prev as usize;
560            }
561            None => break,
562        }
563    }
564
565    headers
566}
567
568// ---------------------------------------------------------------------------
569// Undo record type classification and chain traversal
570// ---------------------------------------------------------------------------
571
572/// Undo record operation types from trx0undo.h.
573///
574/// # Examples
575///
576/// ```
577/// use idb::innodb::undo::UndoRecordType;
578///
579/// assert_eq!(UndoRecordType::from_type_byte(11), UndoRecordType::InsertRec);
580/// assert_eq!(UndoRecordType::from_type_byte(14), UndoRecordType::DelMarkRec);
581/// assert_eq!(UndoRecordType::from_u8(11), UndoRecordType::InsertRec);
582/// assert_eq!(UndoRecordType::InsertRec.name(), "INSERT");
583/// ```
584#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
585pub enum UndoRecordType {
586    /// TRX_UNDO_INSERT_REC (11) — fresh insert record
587    InsertRec,
588    /// TRX_UNDO_UPD_EXIST_REC (12) — update of a non-delete-marked record
589    UpdExistRec,
590    /// TRX_UNDO_UPD_DEL_REC (13) — update of a previously delete-marked record (undelete)
591    UpdDelRec,
592    /// TRX_UNDO_DEL_MARK_REC (14) — delete marking of a record
593    DelMarkRec,
594    /// Unrecognized type code
595    Unknown(u8),
596}
597
598impl UndoRecordType {
599    /// Classify an undo record from the type/compilation info byte.
600    ///
601    /// The type code is stored in the lower 4 bits of the first byte of
602    /// each undo record (the upper bits contain compilation info flags).
603    pub fn from_type_byte(byte: u8) -> Self {
604        match byte & 0x0F {
605            11 => UndoRecordType::InsertRec,
606            12 => UndoRecordType::UpdExistRec,
607            13 => UndoRecordType::UpdDelRec,
608            14 => UndoRecordType::DelMarkRec,
609            other => UndoRecordType::Unknown(other),
610        }
611    }
612
613    /// Convert a raw type code to an `UndoRecordType`.
614    ///
615    /// Unlike `from_type_byte`, this does NOT mask the upper bits.
616    /// Caller is responsible for extracting the lower 4 bits if needed.
617    pub fn from_u8(val: u8) -> Self {
618        match val {
619            11 => UndoRecordType::InsertRec,
620            12 => UndoRecordType::UpdExistRec,
621            13 => UndoRecordType::UpdDelRec,
622            14 => UndoRecordType::DelMarkRec,
623            v => UndoRecordType::Unknown(v),
624        }
625    }
626
627    /// Returns the MySQL source-style name for this record type.
628    pub fn name(&self) -> &'static str {
629        match self {
630            UndoRecordType::InsertRec => "INSERT",
631            UndoRecordType::UpdExistRec => "UPD_EXIST",
632            UndoRecordType::UpdDelRec => "UPD_DEL",
633            UndoRecordType::DelMarkRec => "DEL_MARK",
634            UndoRecordType::Unknown(_) => "UNKNOWN",
635        }
636    }
637}
638
639impl std::fmt::Display for UndoRecordType {
640    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
641        write!(f, "{}", self.name())
642    }
643}
644
645/// A single field update from an undo record's update vector.
646#[derive(Debug, Clone, Serialize)]
647pub struct UndoUpdateField {
648    /// Field number within the row (0-based).
649    pub field_no: u64,
650    /// Raw field data bytes.
651    pub data: Vec<u8>,
652}
653
654/// A parsed undo record within an undo log page.
655#[derive(Debug, Clone, Serialize)]
656pub struct UndoRecord {
657    /// Byte offset of this record within the page.
658    pub offset: usize,
659    /// Operation type.
660    pub record_type: UndoRecordType,
661    /// Info bits (compilation info flags from upper bits of type byte).
662    pub info_bits: u8,
663    /// Offset of the next undo record (2-byte pointer), 0 if last.
664    pub next_offset: u16,
665    /// Approximate data length of this record (bytes until next record or free pointer).
666    pub data_len: usize,
667}
668
669/// Walk the chain of undo records within a single page.
670///
671/// Starts at `start_offset` (typically `UndoPageHeader::start` or
672/// `UndoLogHeader::log_start`) and follows 2-byte "next record" pointers.
673/// Terminates at offset 0 or when reaching `free_offset` (from `UndoPageHeader::free`).
674///
675/// Returns records in forward order (oldest first).
676pub fn walk_undo_records(
677    page_data: &[u8],
678    start_offset: u16,
679    free_offset: u16,
680    max_records: usize,
681) -> Vec<UndoRecord> {
682    let mut records = Vec::new();
683    let mut offset = start_offset as usize;
684
685    for _ in 0..max_records {
686        if offset == 0 || offset >= page_data.len() || offset as u16 >= free_offset {
687            break;
688        }
689
690        // Each undo record starts with a 2-byte "next record" pointer
691        // followed by a type/compilation_info byte
692        if offset + 3 > page_data.len() {
693            break;
694        }
695
696        let next_offset = BigEndian::read_u16(&page_data[offset..]);
697        let type_byte = page_data[offset + 2];
698        let record_type = UndoRecordType::from_type_byte(type_byte);
699        let info_bits = type_byte >> 4;
700
701        // Estimate data length: from current offset to next record or free pointer
702        let end = if next_offset > 0 && (next_offset as usize) < page_data.len() {
703            next_offset as usize
704        } else {
705            free_offset as usize
706        };
707        let data_len = if end > offset + 3 {
708            end - offset - 3
709        } else {
710            0
711        };
712
713        records.push(UndoRecord {
714            offset,
715            record_type,
716            info_bits,
717            next_offset,
718            data_len,
719        });
720
721        if next_offset == 0 {
722            break;
723        }
724        offset = next_offset as usize;
725    }
726
727    records
728}
729
730// ---------------------------------------------------------------------------
731// Detailed undo record parsing (with compressed field decoding)
732// ---------------------------------------------------------------------------
733
734/// A fully-parsed undo record with decoded fields.
735///
736/// Unlike [`UndoRecord`] which only captures offsets and type info, this struct
737/// decodes compressed integers (undo_no, table_id), primary key fields,
738/// transaction IDs, roll pointers, and update vector fields.
739#[derive(Debug, Clone, Serialize)]
740pub struct DetailedUndoRecord {
741    /// Byte offset of this record within the page.
742    pub offset: usize,
743    /// Undo operation type.
744    pub record_type: UndoRecordType,
745    /// Undo record sequence number within the transaction.
746    pub undo_no: u64,
747    /// Table ID this record belongs to.
748    pub table_id: u64,
749    /// Raw primary key field bytes.
750    pub pk_fields: Vec<Vec<u8>>,
751    /// Transaction ID (for DEL_MARK and UPD_EXIST records).
752    #[serde(skip_serializing_if = "Option::is_none")]
753    pub trx_id: Option<u64>,
754    /// Roll pointer (7 bytes, for DEL_MARK and UPD_EXIST records).
755    #[serde(skip_serializing_if = "Option::is_none")]
756    pub roll_ptr: Option<[u8; 7]>,
757    /// Update vector fields (for UPD_EXIST/UPD_DEL/DEL_MARK).
758    #[serde(skip_serializing_if = "Vec::is_empty")]
759    pub update_fields: Vec<UndoUpdateField>,
760}
761
762/// Parse undo records on a single UNDO_LOG page with full field decoding.
763///
764/// Iterates from `UndoPageHeader::start` to `free`, parsing each record
765/// including compressed undo_no, table_id, PK fields, trx_id, roll_ptr,
766/// and update vector. Returns all successfully parsed records; stops on
767/// parse failure or when reaching the free pointer.
768///
769/// # Format (from `trx0rec.cc`)
770///
771/// Each undo record starts with:
772/// 1. 2 bytes: next record offset
773/// 2. 1 byte: `type_cmpl` (lower 4 bits = type code)
774/// 3. Compressed: `undo_no`
775/// 4. Compressed: `table_id`
776///
777/// For UPD_EXIST/UPD_DEL/DEL_MARK (from `trx_undo_page_report_modify`):
778/// 5. 6 bytes: `trx_id` (fixed big-endian, via `mach_write_to_6`)
779/// 6. 7 bytes: `roll_ptr`
780/// 7. Update vector: compressed field count, then per-field
781///    (compressed field_no, compressed len, raw data)
782/// 8. PK fields: for each PK column, compressed length + raw bytes
783///    (we read 1 PK field; multi-column PKs need schema info)
784///
785/// For INSERT_REC (from `trx_undo_page_report_insert`):
786/// 5. PK fields: for each PK column, compressed length + raw bytes
787pub fn parse_undo_records(page_data: &[u8]) -> Vec<DetailedUndoRecord> {
788    let mut records = Vec::new();
789
790    let page_hdr = match UndoPageHeader::parse(page_data) {
791        Some(h) => h,
792        None => return records,
793    };
794
795    let start = page_hdr.start as usize;
796    let free = page_hdr.free as usize;
797
798    if start == 0 || start >= page_data.len() || free == 0 || start >= free {
799        return records;
800    }
801
802    let mut pos = start;
803    let mut visited = std::collections::HashSet::new();
804
805    while pos >= start && pos < free && pos + 3 <= page_data.len() {
806        if !visited.insert(pos) {
807            break; // cycle detection
808        }
809
810        let rec_offset = pos;
811
812        // 2 bytes: next record offset
813        if pos + 2 > page_data.len() {
814            break;
815        }
816        let next = BigEndian::read_u16(&page_data[pos..]) as usize;
817        pos += 2;
818
819        // 1 byte: type_cmpl
820        if pos >= page_data.len() {
821            break;
822        }
823        let type_cmpl = page_data[pos];
824        let rec_type = UndoRecordType::from_u8(type_cmpl & 0x0F);
825        pos += 1;
826
827        // Compressed: undo_no
828        let (undo_no, consumed) = match read_compressed(page_data, pos) {
829            Some(v) => v,
830            None => break,
831        };
832        pos += consumed;
833
834        // Compressed: table_id
835        let (table_id, consumed) = match read_compressed(page_data, pos) {
836            Some(v) => v,
837            None => break,
838        };
839        pos += consumed;
840
841        let mut trx_id = None;
842        let mut roll_ptr = None;
843        let mut update_fields = Vec::new();
844
845        let is_modify = matches!(
846            rec_type,
847            UndoRecordType::UpdExistRec | UndoRecordType::UpdDelRec | UndoRecordType::DelMarkRec
848        );
849
850        // For UPD_EXIST/UPD_DEL/DEL_MARK: trx_id + roll_ptr + update vector come
851        // BEFORE PK fields (per trx_undo_page_report_modify in trx0rec.cc).
852        if is_modify {
853            // trx_id: fixed 6-byte big-endian (mach_write_to_6 / mach_read_from_6)
854            if pos + 6 <= page_data.len() {
855                let mut buf = [0u8; 8];
856                buf[2..8].copy_from_slice(&page_data[pos..pos + 6]);
857                trx_id = Some(BigEndian::read_u64(&buf));
858                pos += 6;
859            }
860
861            // 7-byte roll_ptr
862            if pos + 7 <= page_data.len() {
863                let mut rp = [0u8; 7];
864                rp.copy_from_slice(&page_data[pos..pos + 7]);
865                roll_ptr = Some(rp);
866                pos += 7;
867            }
868
869            // Update vector: compressed field count
870            if let Some((n_fields, consumed)) = read_compressed(page_data, pos) {
871                pos += consumed;
872                for _ in 0..n_fields.min(256) {
873                    // compressed field_no
874                    let (field_no, c1) = match read_compressed(page_data, pos) {
875                        Some(v) => v,
876                        None => break,
877                    };
878                    pos += c1;
879
880                    // compressed length
881                    let (flen, c2) = match read_compressed(page_data, pos) {
882                        Some(v) => v,
883                        None => break,
884                    };
885                    pos += c2;
886
887                    let flen = flen as usize;
888                    if flen > 0 && pos + flen <= page_data.len() && flen < 65536 {
889                        update_fields.push(UndoUpdateField {
890                            field_no,
891                            data: page_data[pos..pos + flen].to_vec(),
892                        });
893                        pos += flen;
894                    } else if flen == 0 {
895                        update_fields.push(UndoUpdateField {
896                            field_no,
897                            data: Vec::new(),
898                        });
899                    } else {
900                        break;
901                    }
902                }
903            }
904        }
905
906        // PK fields: for all record types, compressed length + raw bytes
907        // (for INSERT_REC this comes right after table_id; for modify types,
908        // after the update vector)
909        let mut pk_fields = Vec::new();
910        if let Some((pk_len, consumed)) = read_compressed(page_data, pos) {
911            pos += consumed;
912            let pk_len = pk_len as usize;
913            if pk_len > 0 && pos + pk_len <= page_data.len() && pk_len < 8192 {
914                pk_fields.push(page_data[pos..pos + pk_len].to_vec());
915                let _ = pk_len; // pos not needed after this point
916            }
917        }
918
919        records.push(DetailedUndoRecord {
920            offset: rec_offset,
921            record_type: rec_type,
922            undo_no,
923            table_id,
924            pk_fields,
925            trx_id,
926            roll_ptr,
927            update_fields,
928        });
929
930        // Advance to next record
931        if next == 0 || next >= free || next <= rec_offset {
932            break;
933        }
934        pos = next;
935    }
936
937    records
938}
939
940// ---------------------------------------------------------------------------
941// Undo segment and tablespace analysis
942// ---------------------------------------------------------------------------
943
944/// Aggregated information about a single undo segment within a tablespace.
945#[derive(Debug, Clone, Serialize)]
946pub struct UndoSegmentInfo {
947    /// Page number of the undo segment header page.
948    pub page_no: u64,
949    /// Parsed undo page header.
950    pub page_header: UndoPageHeader,
951    /// Parsed undo segment header.
952    pub segment_header: UndoSegmentHeader,
953    /// Undo log headers found by walking the chain.
954    pub log_headers: Vec<UndoLogHeader>,
955    /// Number of undo records found on this page.
956    pub record_count: usize,
957}
958
959/// Top-level analysis result for an undo tablespace.
960#[derive(Debug, Clone, Serialize)]
961pub struct UndoAnalysis {
962    /// RSEG array slot page numbers (from page 0 of MySQL 8.0+ undo tablespace).
963    #[serde(skip_serializing_if = "Vec::is_empty")]
964    pub rseg_slots: Vec<u32>,
965    /// Per-rollback-segment details.
966    #[serde(skip_serializing_if = "Vec::is_empty")]
967    pub rseg_headers: Vec<RsegInfo>,
968    /// Per-undo-segment details (from RSEG slot traversal or direct scan).
969    pub segments: Vec<UndoSegmentInfo>,
970    /// Total undo log headers found.
971    pub total_transactions: usize,
972    /// Count of segments in ACTIVE state.
973    pub active_transactions: usize,
974}
975
976/// Rollback segment summary within an undo tablespace.
977#[derive(Debug, Clone, Serialize)]
978pub struct RsegInfo {
979    /// Page number of the RSEG header page.
980    pub page_no: u32,
981    /// Maximum undo pages this RSEG can use.
982    pub max_size: u32,
983    /// History list length.
984    pub history_size: u32,
985    /// Number of active (non-empty) undo segment slots.
986    pub active_slot_count: usize,
987}
988
989/// Analyze an undo tablespace (MySQL 8.0+ `.ibu` file).
990///
991/// Reads the RSEG array from page 0 (if it's an RSEG_ARRAY page), then
992/// follows the RSEG slots to find rollback segment header pages, and finally
993/// reads undo segment pages to collect log headers.
994///
995/// For non-RSEG-array tablespaces, falls back to scanning all pages for
996/// undo log pages (FIL_PAGE_UNDO_LOG).
997pub fn analyze_undo_tablespace(ts: &mut Tablespace) -> Result<UndoAnalysis, IdbError> {
998    let page0 = ts.read_page(0)?;
999    let page0_type = FilHeader::parse(&page0).map(|h| h.page_type);
1000
1001    if page0_type == Some(PageType::RsegArray) {
1002        analyze_via_rseg_array(ts)
1003    } else {
1004        analyze_via_scan(ts)
1005    }
1006}
1007
1008/// Analyze using RSEG array structure (MySQL 8.0+ undo tablespaces).
1009fn analyze_via_rseg_array(ts: &mut Tablespace) -> Result<UndoAnalysis, IdbError> {
1010    let page0 = ts.read_page(0)?;
1011    let rseg_array = RsegArrayHeader::parse(&page0);
1012    let rseg_slots = rseg_array
1013        .map(|a| {
1014            let max = a.size.min(128) as usize;
1015            RsegArrayHeader::read_slots(&page0, max)
1016        })
1017        .unwrap_or_default();
1018
1019    let mut rseg_headers = Vec::new();
1020    let mut segments = Vec::new();
1021
1022    for &rseg_page_no in &rseg_slots {
1023        let rseg_page = match ts.read_page(rseg_page_no as u64) {
1024            Ok(p) => p,
1025            Err(_) => continue,
1026        };
1027
1028        if let Some(rseg_hdr) = RollbackSegmentHeader::parse(&rseg_page) {
1029            let active_slots = rseg_hdr.active_slots();
1030            rseg_headers.push(RsegInfo {
1031                page_no: rseg_page_no,
1032                max_size: rseg_hdr.max_size,
1033                history_size: rseg_hdr.history_size,
1034                active_slot_count: active_slots.len(),
1035            });
1036
1037            for &undo_page_no in &active_slots {
1038                if let Ok(info) = read_undo_segment(ts, undo_page_no as u64) {
1039                    segments.push(info);
1040                }
1041            }
1042        }
1043    }
1044
1045    let total_transactions: usize = segments.iter().map(|s| s.log_headers.len()).sum();
1046    let active_transactions = segments
1047        .iter()
1048        .filter(|s| s.segment_header.state == UndoState::Active)
1049        .count();
1050
1051    Ok(UndoAnalysis {
1052        rseg_slots,
1053        rseg_headers,
1054        segments,
1055        total_transactions,
1056        active_transactions,
1057    })
1058}
1059
1060/// Fallback: scan all pages for undo log pages.
1061fn analyze_via_scan(ts: &mut Tablespace) -> Result<UndoAnalysis, IdbError> {
1062    let page_count = ts.page_count();
1063    let mut segments = Vec::new();
1064
1065    for page_num in 0..page_count {
1066        let page_data = match ts.read_page(page_num) {
1067            Ok(d) => d,
1068            Err(_) => continue,
1069        };
1070
1071        let fil_hdr = match FilHeader::parse(&page_data) {
1072            Some(h) => h,
1073            None => continue,
1074        };
1075
1076        if fil_hdr.page_type != PageType::UndoLog {
1077            continue;
1078        }
1079
1080        // Only process segment header pages (those with a valid segment header)
1081        let page_hdr = match UndoPageHeader::parse(&page_data) {
1082            Some(h) => h,
1083            None => continue,
1084        };
1085
1086        let seg_hdr = match UndoSegmentHeader::parse(&page_data) {
1087            Some(h) => h,
1088            None => continue,
1089        };
1090
1091        // Only walk log headers if this appears to be a segment's first page
1092        // (indicated by having a non-zero last_log offset)
1093        if seg_hdr.last_log == 0 {
1094            continue;
1095        }
1096
1097        let log_headers = walk_undo_log_headers(&page_data, seg_hdr.last_log);
1098        let record_count =
1099            walk_undo_records(&page_data, page_hdr.start, page_hdr.free, 10000).len();
1100
1101        segments.push(UndoSegmentInfo {
1102            page_no: page_num,
1103            page_header: page_hdr,
1104            segment_header: seg_hdr,
1105            log_headers,
1106            record_count,
1107        });
1108    }
1109
1110    let total_transactions: usize = segments.iter().map(|s| s.log_headers.len()).sum();
1111    let active_transactions = segments
1112        .iter()
1113        .filter(|s| s.segment_header.state == UndoState::Active)
1114        .count();
1115
1116    Ok(UndoAnalysis {
1117        rseg_slots: Vec::new(),
1118        rseg_headers: Vec::new(),
1119        segments,
1120        total_transactions,
1121        active_transactions,
1122    })
1123}
1124
1125/// Read a single undo segment page and extract its headers.
1126fn read_undo_segment(ts: &mut Tablespace, page_no: u64) -> Result<UndoSegmentInfo, IdbError> {
1127    let page_data = ts.read_page(page_no)?;
1128
1129    let page_header = UndoPageHeader::parse(&page_data)
1130        .ok_or_else(|| IdbError::Parse("Cannot parse undo page header".to_string()))?;
1131
1132    let segment_header = UndoSegmentHeader::parse(&page_data)
1133        .ok_or_else(|| IdbError::Parse("Cannot parse undo segment header".to_string()))?;
1134
1135    let log_headers = walk_undo_log_headers(&page_data, segment_header.last_log);
1136
1137    let record_count =
1138        walk_undo_records(&page_data, page_header.start, page_header.free, 10000).len();
1139
1140    Ok(UndoSegmentInfo {
1141        page_no,
1142        page_header,
1143        segment_header,
1144        log_headers,
1145        record_count,
1146    })
1147}
1148
1149#[cfg(test)]
1150mod tests {
1151    use super::*;
1152
1153    #[test]
1154    fn test_undo_page_type() {
1155        assert_eq!(UndoPageType::from_u16(1), UndoPageType::Insert);
1156        assert_eq!(UndoPageType::from_u16(2), UndoPageType::Update);
1157        assert_eq!(UndoPageType::from_u16(1).name(), "INSERT");
1158        assert_eq!(UndoPageType::from_u16(2).name(), "UPDATE");
1159    }
1160
1161    #[test]
1162    fn test_undo_state() {
1163        assert_eq!(UndoState::from_u16(1), UndoState::Active);
1164        assert_eq!(UndoState::from_u16(2), UndoState::Cached);
1165        assert_eq!(UndoState::from_u16(3), UndoState::ToFree);
1166        assert_eq!(UndoState::from_u16(4), UndoState::ToPurge);
1167        assert_eq!(UndoState::from_u16(5), UndoState::Prepared);
1168        assert_eq!(UndoState::from_u16(1).name(), "ACTIVE");
1169    }
1170
1171    #[test]
1172    fn test_undo_page_header_parse() {
1173        let mut page = vec![0u8; 256];
1174        let base = FIL_PAGE_DATA;
1175
1176        // Set page type = INSERT (1)
1177        BigEndian::write_u16(&mut page[base + TRX_UNDO_PAGE_TYPE..], 1);
1178        // Set start offset
1179        BigEndian::write_u16(&mut page[base + TRX_UNDO_PAGE_START..], 100);
1180        // Set free offset
1181        BigEndian::write_u16(&mut page[base + TRX_UNDO_PAGE_FREE..], 200);
1182
1183        let hdr = UndoPageHeader::parse(&page).unwrap();
1184        assert_eq!(hdr.page_type, UndoPageType::Insert);
1185        assert_eq!(hdr.start, 100);
1186        assert_eq!(hdr.free, 200);
1187    }
1188
1189    #[test]
1190    fn test_walk_undo_log_headers_single() {
1191        // Build a page with one undo log header at offset 86 (after page hdr + seg hdr)
1192        let mut page = vec![0u8; 256];
1193        let seg_base = FIL_PAGE_DATA + TRX_UNDO_PAGE_HDR_SIZE;
1194        // last_log = 86 (= FIL_PAGE_DATA + TRX_UNDO_PAGE_HDR_SIZE + TRX_UNDO_SEG_HDR_SIZE)
1195        let log_offset = seg_base + TRX_UNDO_SEG_HDR_SIZE;
1196        BigEndian::write_u16(&mut page[seg_base + TRX_UNDO_LAST_LOG..], log_offset as u16);
1197
1198        // Write undo log header at log_offset
1199        BigEndian::write_u64(&mut page[log_offset..], 1001); // trx_id
1200        BigEndian::write_u64(&mut page[log_offset + 8..], 500); // trx_no
1201        BigEndian::write_u16(&mut page[log_offset + 16..], 1); // del_marks
1202        BigEndian::write_u16(&mut page[log_offset + 18..], 120); // log_start
1203        BigEndian::write_u16(&mut page[log_offset + 30..], 0); // next_log
1204        BigEndian::write_u16(&mut page[log_offset + 32..], 0); // prev_log
1205
1206        let headers = walk_undo_log_headers(&page, log_offset as u16);
1207        assert_eq!(headers.len(), 1);
1208        assert_eq!(headers[0].trx_id, 1001);
1209        assert_eq!(headers[0].trx_no, 500);
1210        assert!(headers[0].del_marks);
1211    }
1212
1213    #[test]
1214    fn test_walk_undo_log_headers_chain() {
1215        // Two undo log headers chained via prev_log
1216        let mut page = vec![0u8; 512];
1217        let offset1 = 100usize;
1218        let offset2 = 200usize;
1219
1220        // Header at offset2 (newer, start of chain)
1221        BigEndian::write_u64(&mut page[offset2..], 2002); // trx_id
1222        BigEndian::write_u64(&mut page[offset2 + 8..], 600);
1223        BigEndian::write_u16(&mut page[offset2 + 30..], 0); // next_log
1224        BigEndian::write_u16(&mut page[offset2 + 32..], offset1 as u16); // prev_log → offset1
1225
1226        // Header at offset1 (older)
1227        BigEndian::write_u64(&mut page[offset1..], 1001); // trx_id
1228        BigEndian::write_u64(&mut page[offset1 + 8..], 500);
1229        BigEndian::write_u16(&mut page[offset1 + 30..], offset2 as u16); // next_log → offset2
1230        BigEndian::write_u16(&mut page[offset1 + 32..], 0); // prev_log (end)
1231
1232        let headers = walk_undo_log_headers(&page, offset2 as u16);
1233        assert_eq!(headers.len(), 2);
1234        assert_eq!(headers[0].trx_id, 2002); // newest first
1235        assert_eq!(headers[1].trx_id, 1001);
1236    }
1237
1238    #[test]
1239    fn test_rollback_segment_header_parse() {
1240        let page_size = 16384;
1241        let mut page = vec![0u8; page_size];
1242        let base = FIL_PAGE_DATA;
1243
1244        BigEndian::write_u32(&mut page[base + TRX_RSEG_MAX_SIZE..], 1000);
1245        BigEndian::write_u32(&mut page[base + TRX_RSEG_HISTORY_SIZE..], 42);
1246
1247        // Write slot 0 with a page number, slot 1 with FIL_NULL
1248        BigEndian::write_u32(&mut page[base + TRX_RSEG_SLOTS_OFFSET..], 5);
1249        BigEndian::write_u32(&mut page[base + TRX_RSEG_SLOTS_OFFSET + 4..], FIL_NULL);
1250
1251        let hdr = RollbackSegmentHeader::parse(&page).unwrap();
1252        assert_eq!(hdr.max_size, 1000);
1253        assert_eq!(hdr.history_size, 42);
1254        let active = hdr.active_slots();
1255        assert_eq!(active, vec![5]);
1256    }
1257
1258    #[test]
1259    fn test_undo_segment_header_parse() {
1260        let mut page = vec![0u8; 256];
1261        let base = FIL_PAGE_DATA + TRX_UNDO_PAGE_HDR_SIZE;
1262
1263        // Set state = ACTIVE (1)
1264        BigEndian::write_u16(&mut page[base + TRX_UNDO_STATE..], 1);
1265        // Set last log offset
1266        BigEndian::write_u16(&mut page[base + TRX_UNDO_LAST_LOG..], 150);
1267
1268        let hdr = UndoSegmentHeader::parse(&page).unwrap();
1269        assert_eq!(hdr.state, UndoState::Active);
1270        assert_eq!(hdr.last_log, 150);
1271    }
1272
1273    #[test]
1274    fn test_undo_record_type_classification() {
1275        assert_eq!(
1276            UndoRecordType::from_type_byte(11),
1277            UndoRecordType::InsertRec
1278        );
1279        assert_eq!(
1280            UndoRecordType::from_type_byte(12),
1281            UndoRecordType::UpdExistRec
1282        );
1283        assert_eq!(
1284            UndoRecordType::from_type_byte(13),
1285            UndoRecordType::UpdDelRec
1286        );
1287        assert_eq!(
1288            UndoRecordType::from_type_byte(14),
1289            UndoRecordType::DelMarkRec
1290        );
1291        assert_eq!(
1292            UndoRecordType::from_type_byte(0),
1293            UndoRecordType::Unknown(0)
1294        );
1295    }
1296
1297    // -----------------------------------------------------------------------
1298    // read_compressed tests
1299    // -----------------------------------------------------------------------
1300
1301    #[test]
1302    fn test_read_compressed_1byte() {
1303        // Values 0..0x7F are encoded as a single byte
1304        assert_eq!(read_compressed(&[0x00], 0), Some((0, 1)));
1305        assert_eq!(read_compressed(&[0x7F], 0), Some((127, 1)));
1306        assert_eq!(read_compressed(&[0x42], 0), Some((0x42, 1)));
1307    }
1308
1309    #[test]
1310    fn test_read_compressed_2byte() {
1311        // 0x80..0xBF: (b-0x80)<<8 | next
1312        assert_eq!(read_compressed(&[0x80, 0x01], 0), Some((1, 2)));
1313        assert_eq!(read_compressed(&[0xBF, 0xFF], 0), Some((0x3FFF, 2)));
1314    }
1315
1316    #[test]
1317    fn test_read_compressed_3byte() {
1318        // 0xC0..0xDF: (b-0xC0)<<16 | next_2
1319        assert_eq!(read_compressed(&[0xC0, 0x00, 0x01], 0), Some((1, 3)));
1320        assert_eq!(read_compressed(&[0xDF, 0xFF, 0xFF], 0), Some((0x1FFFFF, 3)));
1321    }
1322
1323    #[test]
1324    fn test_read_compressed_4byte() {
1325        // 0xE0..0xEF: (b-0xE0)<<24 | next_3
1326        assert_eq!(read_compressed(&[0xE0, 0x00, 0x00, 0x01], 0), Some((1, 4)));
1327        assert_eq!(
1328            read_compressed(&[0xEF, 0xFF, 0xFF, 0xFF], 0),
1329            Some((0x0FFFFFFF, 4))
1330        );
1331    }
1332
1333    #[test]
1334    fn test_undo_record_type_masks_upper_bits() {
1335        // Upper 4 bits are compilation info flags — should be masked off
1336        assert_eq!(
1337            UndoRecordType::from_type_byte(0xFB), // 0xF0 | 11
1338            UndoRecordType::InsertRec
1339        );
1340        assert_eq!(
1341            UndoRecordType::from_type_byte(0x2E), // 0x20 | 14
1342            UndoRecordType::DelMarkRec
1343        );
1344    }
1345
1346    #[test]
1347    fn test_read_compressed_5byte() {
1348        // 0xF0: next 4 bytes as u32
1349        assert_eq!(
1350            read_compressed(&[0xF0, 0x00, 0x00, 0x00, 0x42], 0),
1351            Some((0x42, 5))
1352        );
1353        assert_eq!(
1354            read_compressed(&[0xF0, 0xFF, 0xFF, 0xFF, 0xFF], 0),
1355            Some((0xFFFFFFFF, 5))
1356        );
1357    }
1358
1359    #[test]
1360    fn test_undo_record_type_names() {
1361        assert_eq!(UndoRecordType::InsertRec.name(), "INSERT");
1362        assert_eq!(UndoRecordType::UpdExistRec.name(), "UPD_EXIST");
1363        assert_eq!(UndoRecordType::UpdDelRec.name(), "UPD_DEL");
1364        assert_eq!(UndoRecordType::DelMarkRec.name(), "DEL_MARK");
1365        assert_eq!(UndoRecordType::Unknown(0).name(), "UNKNOWN");
1366    }
1367
1368    #[test]
1369    fn test_walk_undo_records_single() {
1370        // Build a page with one undo record at offset 100
1371        let mut page = vec![0u8; 256];
1372        let offset = 100usize;
1373
1374        // next_offset = 0 (last record)
1375        BigEndian::write_u16(&mut page[offset..], 0);
1376        // type byte = 11 (INSERT)
1377        page[offset + 2] = 11;
1378
1379        let records = walk_undo_records(&page, offset as u16, 200, 100);
1380        assert_eq!(records.len(), 1);
1381        assert_eq!(records[0].offset, 100);
1382        assert_eq!(records[0].record_type, UndoRecordType::InsertRec);
1383        assert_eq!(records[0].next_offset, 0);
1384    }
1385
1386    #[test]
1387    fn test_walk_undo_records_chain() {
1388        // Build a page with 3 chained undo records
1389        let mut page = vec![0u8; 512];
1390        let o1 = 100usize;
1391        let o2 = 150usize;
1392        let o3 = 200usize;
1393
1394        // Record 1: next -> o2, type INSERT
1395        BigEndian::write_u16(&mut page[o1..], o2 as u16);
1396        page[o1 + 2] = 11;
1397
1398        // Record 2: next -> o3, type UPD_EXIST
1399        BigEndian::write_u16(&mut page[o2..], o3 as u16);
1400        page[o2 + 2] = 12;
1401
1402        // Record 3: next -> 0 (end), type DEL_MARK
1403        BigEndian::write_u16(&mut page[o3..], 0);
1404        page[o3 + 2] = 14;
1405
1406        let records = walk_undo_records(&page, o1 as u16, 300, 100);
1407        assert_eq!(records.len(), 3);
1408        assert_eq!(records[0].record_type, UndoRecordType::InsertRec);
1409        assert_eq!(records[1].record_type, UndoRecordType::UpdExistRec);
1410        assert_eq!(records[2].record_type, UndoRecordType::DelMarkRec);
1411        assert_eq!(records[0].data_len, 47); // 150 - 100 - 3
1412        assert_eq!(records[1].data_len, 47); // 200 - 150 - 3
1413    }
1414
1415    #[test]
1416    fn test_walk_undo_records_respects_free_offset() {
1417        // Record chain continues past free_offset -- should stop
1418        let mut page = vec![0u8; 512];
1419        let o1 = 100usize;
1420        let o2 = 200usize;
1421
1422        BigEndian::write_u16(&mut page[o1..], o2 as u16);
1423        page[o1 + 2] = 11;
1424        BigEndian::write_u16(&mut page[o2..], 0);
1425        page[o2 + 2] = 12;
1426
1427        // free_offset = 150, so o2 (200) is past free -- should get only 1 record
1428        let records = walk_undo_records(&page, o1 as u16, 150, 100);
1429        assert_eq!(records.len(), 1);
1430    }
1431
1432    #[test]
1433    fn test_walk_undo_records_respects_max() {
1434        // Chain of 3 but max_records = 2
1435        let mut page = vec![0u8; 512];
1436        let o1 = 100usize;
1437        let o2 = 150usize;
1438        let o3 = 200usize;
1439
1440        BigEndian::write_u16(&mut page[o1..], o2 as u16);
1441        page[o1 + 2] = 11;
1442        BigEndian::write_u16(&mut page[o2..], o3 as u16);
1443        page[o2 + 2] = 12;
1444        BigEndian::write_u16(&mut page[o3..], 0);
1445        page[o3 + 2] = 14;
1446
1447        let records = walk_undo_records(&page, o1 as u16, 300, 2);
1448        assert_eq!(records.len(), 2);
1449    }
1450
1451    #[test]
1452    fn test_walk_undo_records_empty() {
1453        let page = vec![0u8; 256];
1454        // start_offset = 0 means no records
1455        let records = walk_undo_records(&page, 0, 200, 100);
1456        assert_eq!(records.len(), 0);
1457    }
1458
1459    #[test]
1460    fn test_read_compressed_insufficient_data() {
1461        assert_eq!(read_compressed(&[], 0), None);
1462        assert_eq!(read_compressed(&[0x80], 0), None); // needs 2 bytes
1463        assert_eq!(read_compressed(&[0xC0, 0x00], 0), None); // needs 3 bytes
1464        assert_eq!(read_compressed(&[0xE0, 0x00, 0x00], 0), None); // needs 4 bytes
1465        assert_eq!(read_compressed(&[0xF0, 0x00, 0x00, 0x00], 0), None); // needs 5 bytes
1466    }
1467
1468    #[test]
1469    fn test_read_compressed_with_offset() {
1470        let data = [0x00, 0x00, 0x42];
1471        assert_eq!(read_compressed(&data, 2), Some((0x42, 1)));
1472    }
1473
1474    #[test]
1475    fn test_read_compressed_invalid_leading_byte() {
1476        // 0xF1..0xFF are not valid leading bytes
1477        assert_eq!(read_compressed(&[0xF1], 0), None);
1478        assert_eq!(read_compressed(&[0xFF], 0), None);
1479    }
1480
1481    // -----------------------------------------------------------------------
1482    // UndoRecordType from_u8 tests
1483    // -----------------------------------------------------------------------
1484
1485    #[test]
1486    fn test_undo_record_type_from_u8() {
1487        assert_eq!(UndoRecordType::from_u8(11), UndoRecordType::InsertRec);
1488        assert_eq!(UndoRecordType::from_u8(12), UndoRecordType::UpdExistRec);
1489        assert_eq!(UndoRecordType::from_u8(13), UndoRecordType::UpdDelRec);
1490        assert_eq!(UndoRecordType::from_u8(14), UndoRecordType::DelMarkRec);
1491        assert_eq!(UndoRecordType::from_u8(99), UndoRecordType::Unknown(99));
1492    }
1493
1494    // -----------------------------------------------------------------------
1495    // parse_undo_records tests (detailed undo record parsing)
1496    // -----------------------------------------------------------------------
1497
1498    #[test]
1499    fn test_parse_undo_records_empty_page() {
1500        // Page where start == free (no records)
1501        let mut page = vec![0u8; 256];
1502        let base = FIL_PAGE_DATA;
1503        BigEndian::write_u16(&mut page[base + TRX_UNDO_PAGE_TYPE..], 2); // UPDATE
1504        BigEndian::write_u16(&mut page[base + TRX_UNDO_PAGE_START..], 100);
1505        BigEndian::write_u16(&mut page[base + TRX_UNDO_PAGE_FREE..], 100); // start == free
1506        assert!(parse_undo_records(&page).is_empty());
1507    }
1508
1509    #[test]
1510    fn test_parse_undo_records_single_insert() {
1511        let mut page = vec![0u8; 512];
1512        let base = FIL_PAGE_DATA;
1513
1514        let start_offset: u16 = 100;
1515        let free_offset: u16 = 120;
1516
1517        BigEndian::write_u16(&mut page[base + TRX_UNDO_PAGE_TYPE..], 1); // INSERT
1518        BigEndian::write_u16(&mut page[base + TRX_UNDO_PAGE_START..], start_offset);
1519        BigEndian::write_u16(&mut page[base + TRX_UNDO_PAGE_FREE..], free_offset);
1520
1521        let mut pos = start_offset as usize;
1522
1523        // next record offset = 0 (last record)
1524        BigEndian::write_u16(&mut page[pos..], 0);
1525        pos += 2;
1526
1527        // type_cmpl = 11 (InsertRec)
1528        page[pos] = 11;
1529        pos += 1;
1530
1531        // undo_no = 5 (single byte compressed)
1532        page[pos] = 5;
1533        pos += 1;
1534
1535        // table_id = 42 (single byte compressed)
1536        page[pos] = 42;
1537        pos += 1;
1538
1539        // PK field: length=4, data=[0,0,0,1]
1540        page[pos] = 4; // compressed length
1541        pos += 1;
1542        BigEndian::write_u32(&mut page[pos..], 1);
1543
1544        let records = parse_undo_records(&page);
1545        assert_eq!(records.len(), 1);
1546        assert_eq!(records[0].record_type, UndoRecordType::InsertRec);
1547        assert_eq!(records[0].undo_no, 5);
1548        assert_eq!(records[0].table_id, 42);
1549        assert_eq!(records[0].pk_fields.len(), 1);
1550        assert_eq!(records[0].pk_fields[0], vec![0, 0, 0, 1]);
1551        assert!(records[0].trx_id.is_none());
1552    }
1553
1554    #[test]
1555    fn test_parse_undo_records_del_mark() {
1556        let mut page = vec![0u8; 512];
1557        let base = FIL_PAGE_DATA;
1558
1559        let start_offset: u16 = 100;
1560        let free_offset: u16 = 200;
1561
1562        BigEndian::write_u16(&mut page[base + TRX_UNDO_PAGE_TYPE..], 2); // UPDATE
1563        BigEndian::write_u16(&mut page[base + TRX_UNDO_PAGE_START..], start_offset);
1564        BigEndian::write_u16(&mut page[base + TRX_UNDO_PAGE_FREE..], free_offset);
1565
1566        let mut pos = start_offset as usize;
1567
1568        // next = 0 (last)
1569        BigEndian::write_u16(&mut page[pos..], 0);
1570        pos += 2;
1571
1572        // type_cmpl = 14 (DelMarkRec)
1573        page[pos] = 14;
1574        pos += 1;
1575
1576        // undo_no = 10
1577        page[pos] = 10;
1578        pos += 1;
1579
1580        // table_id = 7
1581        page[pos] = 7;
1582        pos += 1;
1583
1584        // For DEL_MARK_REC: trx_id + roll_ptr + update_vector come BEFORE PK fields
1585
1586        // trx_id = 100 (fixed 6-byte big-endian via mach_write_to_6)
1587        page[pos] = 0;
1588        page[pos + 1] = 0;
1589        page[pos + 2] = 0;
1590        page[pos + 3] = 0;
1591        page[pos + 4] = 0;
1592        page[pos + 5] = 100;
1593        pos += 6;
1594
1595        // roll_ptr = 7 bytes
1596        for i in 0..7 {
1597            page[pos + i] = (i + 1) as u8;
1598        }
1599        pos += 7;
1600
1601        // update vector: 0 fields
1602        page[pos] = 0;
1603        pos += 1;
1604
1605        // PK field: length=2, data=[0x00, 0x05]
1606        page[pos] = 2;
1607        pos += 1;
1608        page[pos] = 0x00;
1609        page[pos + 1] = 0x05;
1610
1611        let records = parse_undo_records(&page);
1612        assert_eq!(records.len(), 1);
1613        assert_eq!(records[0].record_type, UndoRecordType::DelMarkRec);
1614        assert_eq!(records[0].table_id, 7);
1615        assert_eq!(records[0].trx_id, Some(100));
1616        assert_eq!(records[0].roll_ptr, Some([1, 2, 3, 4, 5, 6, 7]));
1617    }
1618
1619    #[test]
1620    fn test_parse_undo_records_bounds_safety() {
1621        // Page too small to parse even the header
1622        let page = vec![0u8; 30];
1623        assert!(parse_undo_records(&page).is_empty());
1624    }
1625
1626    #[test]
1627    fn test_detailed_undo_record_serialization() {
1628        let rec = DetailedUndoRecord {
1629            offset: 100,
1630            record_type: UndoRecordType::DelMarkRec,
1631            undo_no: 5,
1632            table_id: 42,
1633            pk_fields: vec![vec![0, 0, 0, 1]],
1634            trx_id: Some(100),
1635            roll_ptr: Some([1, 2, 3, 4, 5, 6, 7]),
1636            update_fields: vec![],
1637        };
1638        let json = serde_json::to_string(&rec).unwrap();
1639        assert!(json.contains("\"record_type\":\"DelMarkRec\""));
1640        assert!(json.contains("\"table_id\":42"));
1641        assert!(json.contains("\"trx_id\":100"));
1642    }
1643}