// idb/innodb/backup.rs
1//! Incremental backup analysis.
2//!
3//! Detects changed pages between two tablespace snapshots via LSN comparison,
4//! and validates backup chains by parsing XtraBackup checkpoint files for
5//! LSN continuity.
6//!
7//! # Delta detection
8//!
9//! [`diff_backup_lsn`] compares a base (backup) tablespace against a current
10//! (live) copy page-by-page using their FIL header LSN values. Pages where
11//! the current LSN exceeds the base LSN are classified as modified.
12//!
13//! # Chain validation
14//!
15//! [`scan_backup_chain`] walks a directory of XtraBackup backup sets, parses
16//! each `xtrabackup_checkpoints` file, and verifies that the `to_lsn` of each
17//! backup connects to the `from_lsn` of the next.
18
19use std::collections::BTreeMap;
20use std::path::{Path, PathBuf};
21
22use serde::Serialize;
23
24use crate::innodb::checksum::validate_checksum;
25use crate::innodb::page::FilHeader;
26use crate::innodb::tablespace::Tablespace;
27use crate::IdbError;
28
29// ---------------------------------------------------------------------------
30// Delta detection (Issue #152)
31// ---------------------------------------------------------------------------
32
/// Classification of a page's change status between two snapshots,
/// as produced by [`diff_backup_lsn`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum PageChangeStatus {
    /// LSN identical in both snapshots.
    Unchanged,
    /// Current LSN is greater than base LSN.
    Modified,
    /// Page exists only in the current tablespace (tablespace grew).
    Added,
    /// Page exists only in the base tablespace (tablespace shrank).
    Removed,
    /// Current LSN is less than base LSN (unusual — possible point-in-time restore).
    Regressed,
}
47
/// Per-page delta detail. Only collected when [`diff_backup_lsn`] runs in
/// verbose mode.
#[derive(Debug, Clone, Serialize)]
pub struct PageDelta {
    /// Page number.
    pub page_number: u64,
    /// Change status.
    pub status: PageChangeStatus,
    /// Page type name from FIL header ("UNKNOWN" when the header fails to parse).
    pub page_type: String,
    /// LSN in the base/backup tablespace (`None` for added pages).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub base_lsn: Option<u64>,
    /// LSN in the current tablespace (`None` for removed pages).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub current_lsn: Option<u64>,
    /// Whether the current page's checksum is valid. Always `false` for
    /// removed pages, which have no current copy to validate.
    pub checksum_valid: bool,
}
66
/// Summary counts of changed pages, one counter per [`PageChangeStatus`].
#[derive(Debug, Clone, Serialize)]
pub struct BackupDiffSummary {
    /// Pages whose LSN is identical in both snapshots.
    pub unchanged: u64,
    /// Pages whose current LSN exceeds the base LSN.
    pub modified: u64,
    /// Pages present only in the current tablespace.
    pub added: u64,
    /// Pages present only in the base tablespace.
    pub removed: u64,
    /// Pages whose current LSN is lower than the base LSN.
    pub regressed: u64,
}
76
/// Full diff report between two tablespace snapshots, produced by
/// [`diff_backup_lsn`].
#[derive(Debug, Clone, Serialize)]
pub struct BackupDiffReport {
    /// Path to the base/backup tablespace (as supplied by the caller).
    pub base_file: String,
    /// Path to the current tablespace (as supplied by the caller).
    pub current_file: String,
    /// Tablespace space ID (shared by both snapshots).
    pub space_id: u32,
    /// Page size in bytes.
    pub page_size: u32,
    /// Number of pages in the base tablespace.
    pub base_page_count: u64,
    /// Number of pages in the current tablespace.
    pub current_page_count: u64,
    /// Maximum LSN observed across all base pages (0 if none parsed).
    pub base_max_lsn: u64,
    /// Maximum LSN observed across all current pages (0 if none parsed).
    pub current_max_lsn: u64,
    /// Summary of page change counts.
    pub summary: BackupDiffSummary,
    /// Per-page delta details (only populated in verbose mode).
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub pages: Vec<PageDelta>,
    /// Distribution of modified pages by page type.
    pub modified_page_types: BTreeMap<String, u64>,
}
104
105/// Compare two tablespace snapshots using LSN-based delta detection.
106///
107/// Opens both tablespaces, validates they share the same space ID, then
108/// compares each page's LSN to classify it as unchanged, modified, added,
109/// removed, or regressed.
110pub fn diff_backup_lsn(
111    base: &mut Tablespace,
112    current: &mut Tablespace,
113    base_path: &str,
114    current_path: &str,
115    verbose: bool,
116) -> Result<BackupDiffReport, IdbError> {
117    let page_size = current.page_size();
118
119    // Validate same space_id
120    let base_space_id = base.fsp_header().map(|f| f.space_id).unwrap_or(0);
121    let current_space_id = current.fsp_header().map(|f| f.space_id).unwrap_or(0);
122    if base_space_id != current_space_id {
123        return Err(IdbError::Argument(format!(
124            "Space ID mismatch: base has {} but current has {}",
125            base_space_id, current_space_id
126        )));
127    }
128
129    let base_count = base.page_count();
130    let current_count = current.page_count();
131    let max_pages = base_count.max(current_count);
132    let vendor_info = current.vendor_info().clone();
133
134    let mut summary = BackupDiffSummary {
135        unchanged: 0,
136        modified: 0,
137        added: 0,
138        removed: 0,
139        regressed: 0,
140    };
141    let mut pages = Vec::new();
142    let mut modified_types: BTreeMap<String, u64> = BTreeMap::new();
143    let mut base_max_lsn = 0u64;
144    let mut current_max_lsn = 0u64;
145
146    for page_num in 0..max_pages {
147        let base_page = if page_num < base_count {
148            Some(base.read_page(page_num)?)
149        } else {
150            None
151        };
152        let current_page = if page_num < current_count {
153            Some(current.read_page(page_num)?)
154        } else {
155            None
156        };
157
158        match (&base_page, &current_page) {
159            (Some(bp), Some(cp)) => {
160                let bh = FilHeader::parse(bp);
161                let ch = FilHeader::parse(cp);
162                let b_lsn = bh.as_ref().map(|h| h.lsn).unwrap_or(0);
163                let c_lsn = ch.as_ref().map(|h| h.lsn).unwrap_or(0);
164                let is_empty = cp.iter().all(|&b| b == 0);
165
166                base_max_lsn = base_max_lsn.max(b_lsn);
167                current_max_lsn = current_max_lsn.max(c_lsn);
168
169                let status = if is_empty && bp.iter().all(|&b| b == 0) {
170                    PageChangeStatus::Unchanged
171                } else if c_lsn > b_lsn {
172                    PageChangeStatus::Modified
173                } else if c_lsn == b_lsn {
174                    PageChangeStatus::Unchanged
175                } else {
176                    PageChangeStatus::Regressed
177                };
178
179                let page_type_name = ch
180                    .as_ref()
181                    .map(|h| h.page_type.name().to_string())
182                    .unwrap_or_else(|| "UNKNOWN".to_string());
183
184                if status == PageChangeStatus::Modified {
185                    *modified_types.entry(page_type_name.clone()).or_insert(0) += 1;
186                }
187
188                let checksum_valid = validate_checksum(cp, page_size, Some(&vendor_info)).valid;
189
190                match status {
191                    PageChangeStatus::Unchanged => summary.unchanged += 1,
192                    PageChangeStatus::Modified => summary.modified += 1,
193                    PageChangeStatus::Regressed => summary.regressed += 1,
194                    _ => {}
195                }
196
197                if verbose {
198                    pages.push(PageDelta {
199                        page_number: page_num,
200                        status,
201                        page_type: page_type_name,
202                        base_lsn: Some(b_lsn),
203                        current_lsn: Some(c_lsn),
204                        checksum_valid,
205                    });
206                }
207            }
208            (None, Some(cp)) => {
209                // Page only in current (tablespace grew)
210                let ch = FilHeader::parse(cp);
211                let c_lsn = ch.as_ref().map(|h| h.lsn).unwrap_or(0);
212                current_max_lsn = current_max_lsn.max(c_lsn);
213                let page_type_name = ch
214                    .as_ref()
215                    .map(|h| h.page_type.name().to_string())
216                    .unwrap_or_else(|| "UNKNOWN".to_string());
217                let checksum_valid = validate_checksum(cp, page_size, Some(&vendor_info)).valid;
218
219                summary.added += 1;
220                if verbose {
221                    pages.push(PageDelta {
222                        page_number: page_num,
223                        status: PageChangeStatus::Added,
224                        page_type: page_type_name,
225                        base_lsn: None,
226                        current_lsn: Some(c_lsn),
227                        checksum_valid,
228                    });
229                }
230            }
231            (Some(bp), None) => {
232                // Page only in base (tablespace shrank)
233                let bh = FilHeader::parse(bp);
234                let b_lsn = bh.as_ref().map(|h| h.lsn).unwrap_or(0);
235                base_max_lsn = base_max_lsn.max(b_lsn);
236                let page_type_name = bh
237                    .as_ref()
238                    .map(|h| h.page_type.name().to_string())
239                    .unwrap_or_else(|| "UNKNOWN".to_string());
240
241                summary.removed += 1;
242                if verbose {
243                    pages.push(PageDelta {
244                        page_number: page_num,
245                        status: PageChangeStatus::Removed,
246                        page_type: page_type_name,
247                        base_lsn: Some(b_lsn),
248                        current_lsn: None,
249                        checksum_valid: false,
250                    });
251                }
252            }
253            (None, None) => {}
254        }
255    }
256
257    Ok(BackupDiffReport {
258        base_file: base_path.to_string(),
259        current_file: current_path.to_string(),
260        space_id: current_space_id,
261        page_size,
262        base_page_count: base_count,
263        current_page_count: current_count,
264        base_max_lsn,
265        current_max_lsn,
266        summary,
267        pages,
268        modified_page_types: modified_types,
269    })
270}
271
272// ---------------------------------------------------------------------------
273// Backup chain validation (Issue #153)
274// ---------------------------------------------------------------------------
275
/// Parsed XtraBackup checkpoint metadata from an `xtrabackup_checkpoints` file.
#[derive(Debug, Clone, Serialize)]
pub struct BackupCheckpoint {
    /// Path to the backup directory (parent of the checkpoint file).
    pub path: PathBuf,
    /// Backup type string (e.g., "full-backuped", "incremental").
    pub backup_type: String,
    /// Start LSN of the backup (0 for full backups).
    pub from_lsn: u64,
    /// End LSN of the backup.
    pub to_lsn: u64,
    /// Last LSN seen during the backup process (optional in the file).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_lsn: Option<u64>,
    /// Whether the backup was compacted (`compact = 1` in the file).
    pub compact: bool,
}
293
/// Kind of chain anomaly. `Gap` and `MissingFull` invalidate the chain;
/// `Overlap` is reported but does not break it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum ChainAnomalyKind {
    /// LSN gap between consecutive backups.
    Gap,
    /// LSN overlap between consecutive backups.
    Overlap,
    /// No full backup found as chain anchor.
    MissingFull,
}
304
/// A gap, overlap, or other anomaly in the backup chain.
#[derive(Debug, Clone, Serialize)]
pub struct ChainAnomaly {
    /// Kind of anomaly.
    pub kind: ChainAnomalyKind,
    /// Indices of the two backups involved (0-based, into the sorted
    /// backups array). `(0, 0)` for anomalies not tied to a specific pair.
    pub between: (usize, usize),
    /// The LSN gap/overlap boundary values (lower bound first).
    pub from_lsn: u64,
    pub to_lsn: u64,
    /// Human-readable description (uses 1-based backup numbering).
    pub message: String,
}
318
/// Backup chain analysis report, produced by [`scan_backup_chain`].
#[derive(Debug, Clone, Serialize)]
pub struct BackupChainReport {
    /// Path to the backup directory that was scanned.
    pub chain_dir: String,
    /// Backup sets sorted by from_lsn.
    pub backups: Vec<BackupCheckpoint>,
    /// Whether the chain is valid (no gaps, has full backup).
    pub chain_valid: bool,
    /// Total LSN range covered (min from_lsn, max to_lsn); `None` when no
    /// backups were found.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_lsn_range: Option<(u64, u64)>,
    /// Anomalies detected in the chain.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub anomalies: Vec<ChainAnomaly>,
    /// Whether a full backup was found.
    pub has_full_backup: bool,
}
337
338/// Parse an XtraBackup `xtrabackup_checkpoints` file.
339///
340/// The file contains `key = value` lines. Required fields are `backup_type`,
341/// `from_lsn`, and `to_lsn`. Unknown keys are silently ignored.
342pub fn parse_xtrabackup_checkpoints(path: &Path) -> Result<BackupCheckpoint, IdbError> {
343    let content = std::fs::read_to_string(path)
344        .map_err(|e| IdbError::Io(format!("Failed to read {}: {}", path.display(), e)))?;
345
346    parse_xtrabackup_checkpoints_str(&content, path)
347}
348
349/// Parse checkpoint content from a string (testable without filesystem).
350fn parse_xtrabackup_checkpoints_str(
351    content: &str,
352    path: &Path,
353) -> Result<BackupCheckpoint, IdbError> {
354    let mut backup_type = None;
355    let mut from_lsn = None;
356    let mut to_lsn = None;
357    let mut last_lsn = None;
358    let mut compact = false;
359
360    for line in content.lines() {
361        let line = line.trim();
362        if line.is_empty() || line.starts_with('#') {
363            continue;
364        }
365        if let Some((key, value)) = line.split_once('=') {
366            let key = key.trim();
367            let value = value.trim();
368            match key {
369                "backup_type" => backup_type = Some(value.to_string()),
370                "from_lsn" => from_lsn = value.parse::<u64>().ok(),
371                "to_lsn" => to_lsn = value.parse::<u64>().ok(),
372                "last_lsn" => last_lsn = value.parse::<u64>().ok(),
373                "compact" => compact = value == "1",
374                _ => {} // ignore unknown keys
375            }
376        }
377    }
378
379    let backup_type = backup_type
380        .ok_or_else(|| IdbError::Parse(format!("Missing backup_type in {}", path.display())))?;
381    let from_lsn = from_lsn
382        .ok_or_else(|| IdbError::Parse(format!("Missing from_lsn in {}", path.display())))?;
383    let to_lsn =
384        to_lsn.ok_or_else(|| IdbError::Parse(format!("Missing to_lsn in {}", path.display())))?;
385
386    // Use parent directory as the backup path (the checkpoint file is inside it)
387    let backup_dir = path.parent().unwrap_or(path).to_path_buf();
388
389    Ok(BackupCheckpoint {
390        path: backup_dir,
391        backup_type,
392        from_lsn,
393        to_lsn,
394        last_lsn,
395        compact,
396    })
397}
398
399/// Scan a directory for XtraBackup backup sets and validate chain continuity.
400///
401/// Walks immediate subdirectories looking for `xtrabackup_checkpoints` files,
402/// parses each, sorts by `from_lsn`, and validates that the LSN chain is
403/// continuous (each backup's `to_lsn` connects to the next's `from_lsn`).
404#[cfg(not(target_arch = "wasm32"))]
405pub fn scan_backup_chain(dir: &Path) -> Result<BackupChainReport, IdbError> {
406    let mut checkpoints = Vec::new();
407
408    // Check the directory itself for a checkpoint file
409    let root_checkpoint = dir.join("xtrabackup_checkpoints");
410    if root_checkpoint.exists() {
411        if let Ok(cp) = parse_xtrabackup_checkpoints(&root_checkpoint) {
412            checkpoints.push(cp);
413        }
414    }
415
416    // Walk immediate subdirectories
417    let entries = std::fs::read_dir(dir)
418        .map_err(|e| IdbError::Io(format!("Failed to read directory {}: {}", dir.display(), e)))?;
419
420    for entry in entries {
421        let entry = match entry {
422            Ok(e) => e,
423            Err(_) => continue,
424        };
425        let path = entry.path();
426        if path.is_dir() {
427            let checkpoint_file = path.join("xtrabackup_checkpoints");
428            if checkpoint_file.exists() {
429                if let Ok(cp) = parse_xtrabackup_checkpoints(&checkpoint_file) {
430                    checkpoints.push(cp);
431                }
432            }
433        }
434    }
435
436    if checkpoints.is_empty() {
437        return Ok(BackupChainReport {
438            chain_dir: dir.to_string_lossy().to_string(),
439            backups: Vec::new(),
440            chain_valid: false,
441            total_lsn_range: None,
442            anomalies: vec![ChainAnomaly {
443                kind: ChainAnomalyKind::MissingFull,
444                between: (0, 0),
445                from_lsn: 0,
446                to_lsn: 0,
447                message: "No xtrabackup_checkpoints files found".to_string(),
448            }],
449            has_full_backup: false,
450        });
451    }
452
453    // Sort by from_lsn
454    checkpoints.sort_by_key(|c| c.from_lsn);
455
456    validate_chain(dir, checkpoints)
457}
458
459/// Validate a sorted list of backup checkpoints for chain continuity.
460#[cfg(not(target_arch = "wasm32"))]
461fn validate_chain(
462    dir: &Path,
463    backups: Vec<BackupCheckpoint>,
464) -> Result<BackupChainReport, IdbError> {
465    let mut anomalies = Vec::new();
466
467    // Check for full backup
468    let has_full = backups
469        .iter()
470        .any(|b| b.backup_type.contains("full") || b.from_lsn == 0);
471
472    if !has_full {
473        anomalies.push(ChainAnomaly {
474            kind: ChainAnomalyKind::MissingFull,
475            between: (0, 0),
476            from_lsn: 0,
477            to_lsn: 0,
478            message: "No full backup found in chain".to_string(),
479        });
480    }
481
482    // Check LSN continuity between consecutive backups
483    for i in 0..backups.len().saturating_sub(1) {
484        let prev_to = backups[i].to_lsn;
485        let next_from = backups[i + 1].from_lsn;
486
487        if prev_to < next_from {
488            anomalies.push(ChainAnomaly {
489                kind: ChainAnomalyKind::Gap,
490                between: (i, i + 1),
491                from_lsn: prev_to,
492                to_lsn: next_from,
493                message: format!(
494                    "LSN gap between backup {} and {}: {} → {}",
495                    i + 1,
496                    i + 2,
497                    prev_to,
498                    next_from
499                ),
500            });
501        } else if prev_to > next_from {
502            anomalies.push(ChainAnomaly {
503                kind: ChainAnomalyKind::Overlap,
504                between: (i, i + 1),
505                from_lsn: next_from,
506                to_lsn: prev_to,
507                message: format!(
508                    "LSN overlap between backup {} and {}: {} overlaps {}",
509                    i + 1,
510                    i + 2,
511                    prev_to,
512                    next_from
513                ),
514            });
515        }
516    }
517
518    let chain_valid = has_full
519        && !anomalies
520            .iter()
521            .any(|a| a.kind == ChainAnomalyKind::Gap || a.kind == ChainAnomalyKind::MissingFull);
522
523    let total_lsn_range = if !backups.is_empty() {
524        Some((
525            backups.first().map(|b| b.from_lsn).unwrap_or(0),
526            backups.last().map(|b| b.to_lsn).unwrap_or(0),
527        ))
528    } else {
529        None
530    };
531
532    Ok(BackupChainReport {
533        chain_dir: dir.to_string_lossy().to_string(),
534        backups,
535        chain_valid,
536        total_lsn_range,
537        anomalies,
538        has_full_backup: has_full,
539    })
540}
541
542// ---------------------------------------------------------------------------
543// Tests
544// ---------------------------------------------------------------------------
545
#[cfg(test)]
mod tests {
    use super::*;
    use crate::innodb::constants::*;
    use byteorder::{BigEndian, ByteOrder};
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Build a page with given page_number, page_type, LSN, and space_id.
    ///
    /// The page carries the LSN low 32 bits in the trailer and a matching
    /// CRC-32C checksum pair in the header and trailer, so it passes
    /// checksum validation.
    fn build_page(
        page_number: u32,
        page_type: u16,
        lsn: u64,
        space_id: u32,
        page_size: u32,
    ) -> Vec<u8> {
        let ps = page_size as usize;
        let mut page = vec![0u8; ps];

        BigEndian::write_u32(&mut page[FIL_PAGE_OFFSET..], page_number);
        BigEndian::write_u32(&mut page[FIL_PAGE_PREV..], 0xFFFFFFFF);
        BigEndian::write_u32(&mut page[FIL_PAGE_NEXT..], 0xFFFFFFFF);
        BigEndian::write_u64(&mut page[FIL_PAGE_LSN..], lsn);
        BigEndian::write_u16(&mut page[FIL_PAGE_TYPE..], page_type);
        BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_ID..], space_id);

        // Trailer LSN low 32 bits
        BigEndian::write_u32(&mut page[ps - 4..], lsn as u32);

        // CRC-32C checksum
        let crc1 = crc32c::crc32c(&page[4..26]);
        let crc2 = crc32c::crc32c(&page[38..ps - 8]);
        let checksum = crc1 ^ crc2;
        BigEndian::write_u32(&mut page[0..4], checksum);
        BigEndian::write_u32(&mut page[ps - 8..ps - 4], checksum);

        page
    }

    /// Build an FSP_HDR page (page 0) carrying `space_id` in its FSP header.
    fn build_fsp_page(space_id: u32, lsn: u64, page_size: u32) -> Vec<u8> {
        let mut page = build_page(0, 8, lsn, space_id, page_size); // FSP_HDR = 8
        let base = FIL_PAGE_DATA;
        BigEndian::write_u32(&mut page[base..], space_id);

        // Re-stamp checksum
        let ps = page_size as usize;
        let crc1 = crc32c::crc32c(&page[4..26]);
        let crc2 = crc32c::crc32c(&page[38..ps - 8]);
        let checksum = crc1 ^ crc2;
        BigEndian::write_u32(&mut page[0..4], checksum);
        BigEndian::write_u32(&mut page[ps - 8..ps - 4], checksum);

        page
    }

    /// Concatenate pages into a temp file serving as a minimal tablespace.
    fn write_tablespace(pages: &[Vec<u8>]) -> NamedTempFile {
        let mut file = NamedTempFile::new().unwrap();
        for page in pages {
            file.write_all(page).unwrap();
        }
        file.flush().unwrap();
        file
    }

    // -- Delta detection tests --

    #[test]
    fn test_diff_identical_tablespaces() {
        let ps = 16384u32;
        let fsp = build_fsp_page(1, 1000, ps);
        let idx = build_page(1, 17855, 2000, 1, ps);
        let base_file = write_tablespace(&[fsp.clone(), idx.clone()]);
        let current_file = write_tablespace(&[fsp, idx]);

        let mut base = Tablespace::open(base_file.path().to_str().unwrap()).unwrap();
        let mut current = Tablespace::open(current_file.path().to_str().unwrap()).unwrap();

        let report =
            diff_backup_lsn(&mut base, &mut current, "base.ibd", "current.ibd", false).unwrap();

        assert_eq!(report.summary.unchanged, 2);
        assert_eq!(report.summary.modified, 0);
        assert_eq!(report.summary.added, 0);
        assert_eq!(report.summary.removed, 0);
    }

    #[test]
    fn test_diff_modified_pages() {
        let ps = 16384u32;
        let fsp = build_fsp_page(1, 1000, ps);
        let idx_base = build_page(1, 17855, 2000, 1, ps);
        let idx_current = build_page(1, 17855, 3000, 1, ps); // higher LSN

        let base_file = write_tablespace(&[fsp.clone(), idx_base]);
        let current_file = write_tablespace(&[fsp, idx_current]);

        let mut base = Tablespace::open(base_file.path().to_str().unwrap()).unwrap();
        let mut current = Tablespace::open(current_file.path().to_str().unwrap()).unwrap();

        let report = diff_backup_lsn(&mut base, &mut current, "b", "c", true).unwrap();

        assert_eq!(report.summary.modified, 1);
        assert_eq!(report.summary.unchanged, 1); // FSP page same
        assert_eq!(report.current_max_lsn, 3000);

        let modified = report.pages.iter().find(|p| p.page_number == 1).unwrap();
        assert_eq!(modified.status, PageChangeStatus::Modified);
        assert_eq!(modified.base_lsn, Some(2000));
        assert_eq!(modified.current_lsn, Some(3000));
    }

    #[test]
    fn test_diff_grown_tablespace() {
        let ps = 16384u32;
        let fsp = build_fsp_page(1, 1000, ps);
        let idx = build_page(1, 17855, 2000, 1, ps);
        let extra = build_page(2, 17855, 3000, 1, ps);

        let base_file = write_tablespace(&[fsp.clone(), idx.clone()]);
        let current_file = write_tablespace(&[fsp, idx, extra]);

        let mut base = Tablespace::open(base_file.path().to_str().unwrap()).unwrap();
        let mut current = Tablespace::open(current_file.path().to_str().unwrap()).unwrap();

        let report = diff_backup_lsn(&mut base, &mut current, "b", "c", false).unwrap();

        assert_eq!(report.summary.added, 1);
        assert_eq!(report.base_page_count, 2);
        assert_eq!(report.current_page_count, 3);
    }

    #[test]
    fn test_diff_shrunk_tablespace() {
        let ps = 16384u32;
        let fsp = build_fsp_page(1, 1000, ps);
        let idx1 = build_page(1, 17855, 2000, 1, ps);
        let idx2 = build_page(2, 17855, 3000, 1, ps);

        let base_file = write_tablespace(&[fsp.clone(), idx1.clone(), idx2]);
        let current_file = write_tablespace(&[fsp, idx1]);

        let mut base = Tablespace::open(base_file.path().to_str().unwrap()).unwrap();
        let mut current = Tablespace::open(current_file.path().to_str().unwrap()).unwrap();

        let report = diff_backup_lsn(&mut base, &mut current, "b", "c", false).unwrap();

        assert_eq!(report.summary.removed, 1);
    }

    #[test]
    fn test_diff_space_id_mismatch() {
        let ps = 16384u32;
        let fsp1 = build_fsp_page(1, 1000, ps);
        let fsp2 = build_fsp_page(2, 1000, ps); // different space_id

        let base_file = write_tablespace(&[fsp1]);
        let current_file = write_tablespace(&[fsp2]);

        let mut base = Tablespace::open(base_file.path().to_str().unwrap()).unwrap();
        let mut current = Tablespace::open(current_file.path().to_str().unwrap()).unwrap();

        let result = diff_backup_lsn(&mut base, &mut current, "b", "c", false);
        match result {
            Err(IdbError::Argument(msg)) => assert!(msg.contains("Space ID mismatch")),
            _ => panic!("Expected Argument error for space_id mismatch"),
        }
    }

    #[test]
    fn test_diff_regressed_lsn() {
        let ps = 16384u32;
        let fsp = build_fsp_page(1, 1000, ps);
        let idx_base = build_page(1, 17855, 5000, 1, ps);
        let idx_current = build_page(1, 17855, 2000, 1, ps); // lower LSN

        let base_file = write_tablespace(&[fsp.clone(), idx_base]);
        let current_file = write_tablespace(&[fsp, idx_current]);

        let mut base = Tablespace::open(base_file.path().to_str().unwrap()).unwrap();
        let mut current = Tablespace::open(current_file.path().to_str().unwrap()).unwrap();

        let report = diff_backup_lsn(&mut base, &mut current, "b", "c", false).unwrap();

        assert_eq!(report.summary.regressed, 1);
    }

    // -- XtraBackup checkpoint parsing tests --

    #[test]
    fn test_parse_xtrabackup_checkpoints() {
        let content = "\
backup_type = full-backuped
from_lsn = 0
to_lsn = 12345678
last_lsn = 12345679
compact = 0
recover_binlog_info = 0
";
        let cp = parse_xtrabackup_checkpoints_str(content, Path::new("/backups/full")).unwrap();
        assert_eq!(cp.backup_type, "full-backuped");
        assert_eq!(cp.from_lsn, 0);
        assert_eq!(cp.to_lsn, 12345678);
        assert_eq!(cp.last_lsn, Some(12345679));
        assert!(!cp.compact);
    }

    #[test]
    fn test_parse_checkpoints_missing_field() {
        let content = "backup_type = incremental\nfrom_lsn = 100\n";
        let result = parse_xtrabackup_checkpoints_str(content, Path::new("/backups/inc1"));
        match result {
            Err(IdbError::Parse(msg)) => assert!(msg.contains("to_lsn")),
            _ => panic!("Expected Parse error for missing to_lsn"),
        }
    }

    // -- Chain validation tests --

    #[test]
    fn test_chain_valid() {
        let backups = vec![
            BackupCheckpoint {
                path: PathBuf::from("/backups/full"),
                backup_type: "full-backuped".to_string(),
                from_lsn: 0,
                to_lsn: 1000,
                last_lsn: Some(1001),
                compact: false,
            },
            BackupCheckpoint {
                path: PathBuf::from("/backups/inc1"),
                backup_type: "incremental".to_string(),
                from_lsn: 1000,
                to_lsn: 2000,
                last_lsn: Some(2001),
                compact: false,
            },
            BackupCheckpoint {
                path: PathBuf::from("/backups/inc2"),
                backup_type: "incremental".to_string(),
                from_lsn: 2000,
                to_lsn: 3000,
                last_lsn: Some(3001),
                compact: false,
            },
        ];

        let report = validate_chain(Path::new("/backups"), backups).unwrap();
        assert!(report.chain_valid);
        assert!(report.has_full_backup);
        assert!(report.anomalies.is_empty());
        assert_eq!(report.total_lsn_range, Some((0, 3000)));
    }

    #[test]
    fn test_chain_gap() {
        let backups = vec![
            BackupCheckpoint {
                path: PathBuf::from("/backups/full"),
                backup_type: "full-backuped".to_string(),
                from_lsn: 0,
                to_lsn: 1000,
                last_lsn: None,
                compact: false,
            },
            BackupCheckpoint {
                path: PathBuf::from("/backups/inc1"),
                backup_type: "incremental".to_string(),
                from_lsn: 2000, // gap: 1000 < 2000
                to_lsn: 3000,
                last_lsn: None,
                compact: false,
            },
        ];

        let report = validate_chain(Path::new("/backups"), backups).unwrap();
        assert!(!report.chain_valid);
        assert_eq!(report.anomalies.len(), 1);
        assert_eq!(report.anomalies[0].kind, ChainAnomalyKind::Gap);
    }

    #[test]
    fn test_chain_no_full_backup() {
        let backups = vec![BackupCheckpoint {
            path: PathBuf::from("/backups/inc1"),
            backup_type: "incremental".to_string(),
            from_lsn: 1000,
            to_lsn: 2000,
            last_lsn: None,
            compact: false,
        }];

        let report = validate_chain(Path::new("/backups"), backups).unwrap();
        assert!(!report.chain_valid);
        assert!(!report.has_full_backup);
        assert!(report
            .anomalies
            .iter()
            .any(|a| a.kind == ChainAnomalyKind::MissingFull));
    }

    #[test]
    fn test_chain_overlap() {
        let backups = vec![
            BackupCheckpoint {
                path: PathBuf::from("/backups/full"),
                backup_type: "full-backuped".to_string(),
                from_lsn: 0,
                to_lsn: 2000,
                last_lsn: None,
                compact: false,
            },
            BackupCheckpoint {
                path: PathBuf::from("/backups/inc1"),
                backup_type: "incremental".to_string(),
                from_lsn: 1500, // overlap: 2000 > 1500
                to_lsn: 3000,
                last_lsn: None,
                compact: false,
            },
        ];

        let report = validate_chain(Path::new("/backups"), backups).unwrap();
        // Overlap is a warning, not chain-breaking
        assert!(report.chain_valid);
        assert!(report
            .anomalies
            .iter()
            .any(|a| a.kind == ChainAnomalyKind::Overlap));
    }
}