Skip to main content

grafeo_engine/database/
backup.rs

1//! Incremental backup and point-in-time recovery.
2//!
3//! Provides `backup_full()`, `backup_incremental()`, and `restore_to_epoch()`
4//! APIs on [`GrafeoDB`](super::GrafeoDB). Full backups capture the entire
5//! database state; incremental backups export only the WAL records since the
6//! last backup. Recovery replays a chain of full + incremental backups to
7//! restore the database to any committed epoch.
8//!
9//! # Backup chain model
10//!
11//! ```text
12//! [Full Snapshot] -> [Incr 1] -> [Incr 2] -> ... -> [Incr N]
13//!   epoch 0-100      101-200     201-300              901-1000
14//! ```
15//!
16//! To restore to epoch 750: load full snapshot (epoch 100), replay
17//! incrementals 1-7, stop at epoch 750.
18
use std::fs::File;
use std::io::Write;
use std::path::Path;

use grafeo_common::types::EpochId;
use grafeo_common::utils::error::{Error, Result};
use serde::{Deserialize, Serialize};
24
25// ── Backup types ───────────────────────────────────────────────────
26
/// The type of a backup segment.
///
/// NOTE: manifests are persisted with bincode (see `read_manifest` /
/// `write_manifest`), which encodes the variant *index*, so the variant
/// order below is part of the on-disk format — append new variants, never
/// reorder existing ones.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum BackupKind {
    /// A full snapshot of the entire database.
    Full,
    /// WAL records since the last backup checkpoint.
    Incremental,
}
36
/// Metadata for a single backup segment (full or incremental).
///
/// Both epoch bounds are inclusive. For incremental segments the checksum
/// and size cover the entire segment file, header included; for full
/// segments they cover the copied container file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupSegment {
    /// Segment type.
    pub kind: BackupKind,
    /// File name (relative to backup directory).
    pub filename: String,
    /// Start epoch (inclusive).
    pub start_epoch: EpochId,
    /// End epoch (inclusive).
    pub end_epoch: EpochId,
    /// CRC-32 checksum of the segment file (computed with `crc32fast`).
    pub checksum: u32,
    /// Size of the segment file in bytes.
    pub size_bytes: u64,
    /// Timestamp when this backup was created (ms since UNIX epoch).
    pub created_at_ms: u64,
}
55
/// Tracks the full backup chain for a database.
///
/// `segments` is ordered by creation (the backup operations in this module
/// always append) and may contain more than one full snapshot;
/// `epoch_range` relies on this ordering.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupManifest {
    /// Manifest format version.
    pub version: u32,
    /// Ordered list of backup segments (full first, then incrementals).
    pub segments: Vec<BackupSegment>,
}
64
65impl BackupManifest {
66    /// Creates a new empty manifest.
67    #[must_use]
68    pub fn new() -> Self {
69        Self {
70            version: 1,
71            segments: Vec::new(),
72        }
73    }
74
75    /// Returns the most recent full backup segment, if any.
76    #[must_use]
77    pub fn latest_full(&self) -> Option<&BackupSegment> {
78        self.segments
79            .iter()
80            .rev()
81            .find(|s| s.kind == BackupKind::Full)
82    }
83
84    /// Returns incremental segments after the given epoch, in order.
85    pub fn incrementals_after(&self, epoch: EpochId) -> Vec<&BackupSegment> {
86        self.segments
87            .iter()
88            .filter(|s| s.kind == BackupKind::Incremental && s.start_epoch > epoch)
89            .collect()
90    }
91
92    /// Returns the epoch range covered by this manifest.
93    #[must_use]
94    pub fn epoch_range(&self) -> Option<(EpochId, EpochId)> {
95        let first = self.segments.first()?;
96        let last = self.segments.last()?;
97        Some((first.start_epoch, last.end_epoch))
98    }
99}
100
101impl Default for BackupManifest {
102    fn default() -> Self {
103        Self::new()
104    }
105}
106
/// Tracks the WAL position of the last completed backup.
///
/// Persisted as `backup_cursor.meta` in the WAL directory.
///
/// Invariant maintained by the backup operations in this module: WAL log
/// files with sequence numbers `<= log_sequence` are already covered by a
/// backup, so incremental backup only exports files with strictly greater
/// sequence numbers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupCursor {
    /// The epoch up to which WAL records have been backed up (inclusive).
    pub backed_up_epoch: EpochId,
    /// The WAL log sequence number at the time of the last backup.
    pub log_sequence: u64,
    /// Timestamp of the last backup (ms since UNIX epoch, from `now_ms`).
    pub timestamp_ms: u64,
}
119
// ── Manifest I/O ───────────────────────────────────────────────────

/// Manifest file name inside the backup directory.
// NOTE(review): despite the `.json` extension, the manifest is written with
// bincode (see read_manifest/write_manifest), so the file content is binary.
// Confirm whether the extension or the codec is the intended one; changing
// either now would break existing backup directories.
const MANIFEST_FILENAME: &str = "backup_manifest.json";
/// Cursor file name inside the WAL directory.
const BACKUP_CURSOR_FILENAME: &str = "backup_cursor.meta";
124
125/// Reads the backup manifest from a backup directory.
126///
127/// Returns `None` if no manifest exists.
128///
129/// # Errors
130///
131/// Returns an error if the manifest file exists but cannot be read or parsed.
132pub fn read_manifest(backup_dir: &Path) -> Result<Option<BackupManifest>> {
133    let path = backup_dir.join(MANIFEST_FILENAME);
134    if !path.exists() {
135        return Ok(None);
136    }
137    let data = std::fs::read(&path)
138        .map_err(|e| Error::Internal(format!("failed to read backup manifest: {e}")))?;
139    let (manifest, _): (BackupManifest, _) =
140        bincode::serde::decode_from_slice(&data, bincode::config::standard())
141            .map_err(|e| Error::Internal(format!("failed to parse backup manifest: {e}")))?;
142    Ok(Some(manifest))
143}
144
145/// Writes the backup manifest to a backup directory.
146///
147/// Uses write-to-temp-then-rename for atomicity.
148///
149/// # Errors
150///
151/// Returns an error if the manifest cannot be written.
152pub fn write_manifest(backup_dir: &Path, manifest: &BackupManifest) -> Result<()> {
153    std::fs::create_dir_all(backup_dir)
154        .map_err(|e| Error::Internal(format!("failed to create backup directory: {e}")))?;
155
156    let path = backup_dir.join(MANIFEST_FILENAME);
157    let temp_path = backup_dir.join(format!("{MANIFEST_FILENAME}.tmp"));
158
159    let data = bincode::serde::encode_to_vec(manifest, bincode::config::standard())
160        .map_err(|e| Error::Internal(format!("failed to serialize backup manifest: {e}")))?;
161
162    std::fs::write(&temp_path, data)
163        .map_err(|e| Error::Internal(format!("failed to write backup manifest: {e}")))?;
164    std::fs::rename(&temp_path, &path)
165        .map_err(|e| Error::Internal(format!("failed to finalize backup manifest: {e}")))?;
166
167    Ok(())
168}
169
170// ── Backup cursor I/O ──────────────────────────────────────────────
171
172/// Reads the backup cursor from a WAL directory.
173///
174/// Returns `None` if no cursor exists (no backup has been taken).
175///
176/// # Errors
177///
178/// Returns an error if the cursor file exists but cannot be read.
179pub fn read_backup_cursor(wal_dir: &Path) -> Result<Option<BackupCursor>> {
180    let path = wal_dir.join(BACKUP_CURSOR_FILENAME);
181    if !path.exists() {
182        return Ok(None);
183    }
184    let data = std::fs::read(&path)
185        .map_err(|e| Error::Internal(format!("failed to read backup cursor: {e}")))?;
186    let cursor: BackupCursor =
187        bincode::serde::decode_from_slice(&data, bincode::config::standard())
188            .map(|(c, _)| c)
189            .map_err(|e| Error::Internal(format!("failed to parse backup cursor: {e}")))?;
190    Ok(Some(cursor))
191}
192
193/// Writes the backup cursor to a WAL directory.
194///
195/// Uses write-to-temp-then-rename for atomicity.
196///
197/// # Errors
198///
199/// Returns an error if the cursor cannot be written.
200pub fn write_backup_cursor(wal_dir: &Path, cursor: &BackupCursor) -> Result<()> {
201    let path = wal_dir.join(BACKUP_CURSOR_FILENAME);
202    let temp_path = wal_dir.join(format!("{BACKUP_CURSOR_FILENAME}.tmp"));
203
204    let data = bincode::serde::encode_to_vec(cursor, bincode::config::standard())
205        .map_err(|e| Error::Internal(format!("failed to serialize backup cursor: {e}")))?;
206
207    std::fs::write(&temp_path, &data)
208        .map_err(|e| Error::Internal(format!("failed to write backup cursor: {e}")))?;
209    std::fs::rename(&temp_path, &path)
210        .map_err(|e| Error::Internal(format!("failed to finalize backup cursor: {e}")))?;
211
212    Ok(())
213}
214
// ── Incremental backup file format ─────────────────────────────────

/// Magic bytes for incremental backup files ("GBAK").
pub const BACKUP_MAGIC: [u8; 4] = *b"GBAK";
/// Current backup file version. Readers accept any version `<=` this.
pub const BACKUP_VERSION: u32 = 1;

/// Header size for an incremental backup file:
/// 4 (magic) + 4 (version) + 8 + 8 + 8 = 32 bytes. All integers are
/// little-endian.
///
/// ```text
/// [magic: 4 bytes "GBAK"]
/// [version: u32 LE]
/// [start_epoch: u64 LE]
/// [end_epoch: u64 LE]
/// [record_count: u64 LE]
/// ... WAL frames ...
/// ```
///
/// NOTE: `record_count` is currently a per-file approximation, not an exact
/// WAL frame count — see `do_backup_incremental`.
pub const BACKUP_HEADER_SIZE: usize = 32;
233
234/// Writes the incremental backup file header.
235pub fn write_backup_header(
236    buf: &mut Vec<u8>,
237    start_epoch: EpochId,
238    end_epoch: EpochId,
239    record_count: u64,
240) {
241    buf.extend_from_slice(&BACKUP_MAGIC);
242    buf.extend_from_slice(&BACKUP_VERSION.to_le_bytes());
243    buf.extend_from_slice(&start_epoch.as_u64().to_le_bytes());
244    buf.extend_from_slice(&end_epoch.as_u64().to_le_bytes());
245    buf.extend_from_slice(&record_count.to_le_bytes());
246}
247
248/// Reads and validates the incremental backup file header.
249///
250/// Returns `(start_epoch, end_epoch, record_count)` on success.
251///
252/// # Errors
253///
254/// Returns an error if the header is invalid.
255///
256/// # Panics
257///
258/// Cannot panic: all slice indexing is bounds-checked by the length guard.
259pub fn read_backup_header(data: &[u8]) -> Result<(EpochId, EpochId, u64)> {
260    if data.len() < BACKUP_HEADER_SIZE {
261        return Err(Error::Internal(
262            "incremental backup file too short".to_string(),
263        ));
264    }
265    if data[0..4] != BACKUP_MAGIC {
266        return Err(Error::Internal(
267            "invalid backup file magic bytes".to_string(),
268        ));
269    }
270    let version = u32::from_le_bytes(data[4..8].try_into().unwrap());
271    if version > BACKUP_VERSION {
272        return Err(Error::Internal(format!(
273            "unsupported backup version {version}, max supported is {BACKUP_VERSION}"
274        )));
275    }
276    let start_epoch = EpochId::new(u64::from_le_bytes(data[8..16].try_into().unwrap()));
277    let end_epoch = EpochId::new(u64::from_le_bytes(data[16..24].try_into().unwrap()));
278    let record_count = u64::from_le_bytes(data[24..32].try_into().unwrap());
279    Ok((start_epoch, end_epoch, record_count))
280}
281
282/// Returns the timestamp in milliseconds since UNIX epoch.
283// reason: millis since UNIX epoch fits u64 for centuries
284#[allow(clippy::cast_possible_truncation)]
285pub(super) fn now_ms() -> u64 {
286    std::time::SystemTime::now()
287        .duration_since(std::time::UNIX_EPOCH)
288        .map_or(0, |d| d.as_millis() as u64)
289}
290
291// ── Backup operations (called from GrafeoDB) ───────────────────────
292
293use grafeo_storage::file::GrafeoFileManager;
294use grafeo_storage::wal::LpgWal;
295
296/// Creates a full backup by copying the .grafeo container file.
297///
298/// 1. Copies the container file to the backup directory via the locked handle.
299/// 2. Updates the manifest and backup cursor.
300///
301/// Uses [`GrafeoFileManager::copy_to`] instead of `std::fs::copy()` so the
302/// copy reads through the already-locked file handle. `std::fs::copy()` opens
303/// a new handle, which fails on Windows when an exclusive lock is held.
304///
305/// # Errors
306///
307/// Returns an error if the database has no file manager, or if I/O fails.
308pub(super) fn do_backup_full(
309    backup_dir: &Path,
310    fm: &GrafeoFileManager,
311    wal: Option<&LpgWal>,
312    current_epoch: EpochId,
313) -> Result<BackupSegment> {
314    std::fs::create_dir_all(backup_dir)
315        .map_err(|e| Error::Internal(format!("failed to create backup directory: {e}")))?;
316
317    // Determine backup filename
318    let mut manifest = read_manifest(backup_dir)?.unwrap_or_default();
319    let segment_idx = manifest.segments.len();
320    let filename = format!("backup_full_{segment_idx:04}.grafeo");
321    let dest_path = backup_dir.join(&filename);
322
323    // Copy the .grafeo file to the backup directory through the locked handle
324    fm.copy_to(&dest_path)?;
325
326    let file_size = std::fs::metadata(&dest_path).map_or(0, |m| m.len());
327    let file_data = std::fs::read(&dest_path)
328        .map_err(|e| Error::Internal(format!("failed to read backup file for checksum: {e}")))?;
329    let checksum = crc32fast::hash(&file_data);
330
331    let segment = BackupSegment {
332        kind: BackupKind::Full,
333        filename,
334        start_epoch: EpochId::new(0),
335        end_epoch: current_epoch,
336        checksum,
337        size_bytes: file_size,
338        created_at_ms: now_ms(),
339    };
340
341    manifest.segments.push(segment.clone());
342    write_manifest(backup_dir, &manifest)?;
343
344    // Update backup cursor in the WAL directory.
345    // Rotate the WAL so that post-backup writes land in a new file with a
346    // strictly greater sequence number. Without this, writes that append to
347    // the still-active log file are invisible to incremental backup, which
348    // skips files with seq <= cursor.log_sequence. (GrafeoDB/grafeo#267)
349    if let Some(wal) = wal {
350        // Record the sequence of the file that was active during this backup,
351        // then rotate so post-backup writes land in a new file with seq > this.
352        let backed_up_sequence = wal.current_sequence();
353        wal.rotate()
354            .map_err(|e| Error::Internal(format!("failed to rotate WAL after full backup: {e}")))?;
355        let cursor = BackupCursor {
356            backed_up_epoch: current_epoch,
357            log_sequence: backed_up_sequence,
358            timestamp_ms: now_ms(),
359        };
360        write_backup_cursor(wal.dir(), &cursor)?;
361    }
362
363    Ok(segment)
364}
365
/// Creates an incremental backup containing WAL records since the last backup.
///
/// Reads WAL log files from the backup cursor's position forward, copies
/// the raw frames into a backup segment file, then rotates the WAL and
/// advances the cursor.
///
/// Step ordering matters: the segment file and manifest are written *before*
/// the WAL is rotated and the cursor advanced.
/// NOTE(review): a crash between the manifest write and the cursor update
/// would cause the next incremental to re-export the same frames into a new
/// segment — confirm that restore/replay tolerates duplicated frames.
///
/// # Errors
///
/// Returns an error if no full backup exists, or if the WAL files have been
/// truncated past the cursor.
pub(super) fn do_backup_incremental(
    backup_dir: &Path,
    wal: &LpgWal,
    current_epoch: EpochId,
) -> Result<BackupSegment> {
    // A full backup must exist: it establishes both the manifest and the
    // cursor this function builds on.
    let manifest = read_manifest(backup_dir)?.ok_or_else(|| {
        Error::Internal("no backup manifest found; run a full backup first".to_string())
    })?;

    if manifest.latest_full().is_none() {
        return Err(Error::Internal(
            "no full backup in manifest; run a full backup first".to_string(),
        ));
    }

    let cursor = read_backup_cursor(wal.dir())?.ok_or_else(|| {
        Error::Internal("no backup cursor found; run a full backup first".to_string())
    })?;

    let log_files = wal.log_files()?;
    if log_files.is_empty() {
        return Err(Error::Internal("no WAL log files to backup".to_string()));
    }

    // Read WAL files from cursor position onward
    let mut wal_data = Vec::new();
    let mut record_count = 0u64;
    // cursor.backed_up_epoch used for start_epoch calculation below

    for file_path in &log_files {
        // Extract the sequence number from the "wal_NNN…" file stem. Files
        // that do not match the pattern parse as 0 and are therefore always
        // skipped by the cursor comparison below.
        let seq = file_path
            .file_stem()
            .and_then(|s| s.to_str())
            .and_then(|s| s.strip_prefix("wal_"))
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or(0);

        // Skip files at or before the cursor: the cursor records the
        // sequence number that was active at the time of the last backup,
        // so we need files strictly after that sequence to avoid re-including
        // already-backed-up frames from the active log.
        if seq <= cursor.log_sequence {
            continue;
        }

        let file_bytes = std::fs::read(file_path).map_err(|e| {
            Error::Internal(format!(
                "failed to read WAL file {}: {e}",
                file_path.display()
            ))
        })?;

        if !file_bytes.is_empty() {
            wal_data.extend_from_slice(&file_bytes);
            // Approximation: one count per non-empty file, not per WAL frame.
            // An exact frame count would require parsing every file here;
            // the header's record_count field is therefore only indicative.
            record_count += 1; // Per-file approximation
        }
    }

    if wal_data.is_empty() {
        return Err(Error::Internal(
            "no new WAL records since last backup".to_string(),
        ));
    }

    // The previous backup covered up to backed_up_epoch inclusive, so this
    // segment starts one epoch later.
    let start_epoch = EpochId::new(cursor.backed_up_epoch.as_u64() + 1);
    let end_epoch = current_epoch;

    // Write incremental backup file
    let segment_idx = manifest.segments.len();
    let filename = format!("backup_incr_{segment_idx:04}.wal");
    let dest_path = backup_dir.join(&filename);

    let mut output = Vec::new();
    write_backup_header(&mut output, start_epoch, end_epoch, record_count);
    output.extend_from_slice(&wal_data);

    std::fs::write(&dest_path, &output)
        .map_err(|e| Error::Internal(format!("failed to write incremental backup: {e}")))?;

    // Checksum covers the whole file: header plus frames.
    let checksum = crc32fast::hash(&output);
    let segment = BackupSegment {
        kind: BackupKind::Incremental,
        filename,
        start_epoch,
        end_epoch,
        checksum,
        size_bytes: output.len() as u64,
        created_at_ms: now_ms(),
    };

    // Update manifest
    let mut manifest = manifest;
    manifest.segments.push(segment.clone());
    write_manifest(backup_dir, &manifest)?;

    // Rotate the WAL so subsequent incremental backups see a clean boundary.
    // Same rationale as in do_backup_full (GrafeoDB/grafeo#267).
    let backed_up_sequence = wal.current_sequence();
    wal.rotate().map_err(|e| {
        Error::Internal(format!(
            "failed to rotate WAL after incremental backup: {e}"
        ))
    })?;

    // Update backup cursor
    let new_cursor = BackupCursor {
        backed_up_epoch: current_epoch,
        log_sequence: backed_up_sequence,
        timestamp_ms: now_ms(),
    };
    write_backup_cursor(wal.dir(), &new_cursor)?;

    Ok(segment)
}
491
492// ── Restore ────────────────────────────────────────────────────────
493
494/// Restores a database to a specific epoch from a backup chain.
495///
496/// 1. Finds the most recent full backup with `end_epoch <= target_epoch`.
497/// 2. Opens the full backup as a GrafeoDB (via file manager).
498/// 3. Replays incremental segments up to `target_epoch` using epoch-bounded
499///    WAL recovery.
500///
501/// # Errors
502///
503/// Returns an error if the backup chain does not cover the target epoch,
504/// if segment checksums fail, or if I/O fails.
505pub(super) fn do_restore_to_epoch(
506    backup_dir: &Path,
507    target_epoch: EpochId,
508    output_path: &Path,
509) -> Result<()> {
510    let manifest = read_manifest(backup_dir)?
511        .ok_or_else(|| Error::Internal("no backup manifest found".to_string()))?;
512
513    // Find the best full backup (latest one that doesn't exceed target)
514    let full = manifest
515        .segments
516        .iter()
517        .rfind(|s| s.kind == BackupKind::Full && s.end_epoch <= target_epoch)
518        .ok_or_else(|| {
519            Error::Internal(format!(
520                "no full backup covers epoch {}",
521                target_epoch.as_u64()
522            ))
523        })?;
524
525    // Copy full backup to output path
526    let full_path = backup_dir.join(&full.filename);
527    std::fs::copy(&full_path, output_path)
528        .map_err(|e| Error::Internal(format!("failed to copy full backup to output: {e}")))?;
529
530    // Find incremental segments that cover (full.end_epoch, target_epoch]
531    let incrementals: Vec<&BackupSegment> = manifest
532        .segments
533        .iter()
534        .filter(|s| {
535            s.kind == BackupKind::Incremental
536                && s.start_epoch > full.end_epoch
537                && s.start_epoch <= target_epoch
538        })
539        .collect();
540
541    if incrementals.is_empty() {
542        // Full backup already covers the target epoch
543        return Ok(());
544    }
545
546    // Create a temporary WAL directory for replay
547    let wal_dir = output_path.parent().unwrap_or(Path::new(".")).join(format!(
548        "{}.restore_wal",
549        output_path
550            .file_name()
551            .and_then(|n| n.to_str())
552            .unwrap_or("db")
553    ));
554    std::fs::create_dir_all(&wal_dir)
555        .map_err(|e| Error::Internal(format!("failed to create restore WAL directory: {e}")))?;
556
557    // Write incremental WAL data to temp WAL files for recovery
558    for (i, incr) in incrementals.iter().enumerate() {
559        let incr_path = backup_dir.join(&incr.filename);
560        let incr_data = std::fs::read(&incr_path).map_err(|e| {
561            Error::Internal(format!(
562                "failed to read incremental backup {}: {e}",
563                incr.filename
564            ))
565        })?;
566
567        // Validate checksum
568        let actual_crc = crc32fast::hash(&incr_data);
569        if actual_crc != incr.checksum {
570            return Err(Error::Internal(format!(
571                "incremental backup {} CRC mismatch: expected {:08x}, got {actual_crc:08x}",
572                incr.filename, incr.checksum,
573            )));
574        }
575
576        // Skip the backup header, write the raw WAL frames to a temp log file
577        if incr_data.len() > BACKUP_HEADER_SIZE {
578            let wal_frames = &incr_data[BACKUP_HEADER_SIZE..];
579            let wal_file = wal_dir.join(format!("wal_{i:08}.log"));
580            std::fs::write(&wal_file, wal_frames).map_err(|e| {
581                Error::Internal(format!("failed to write WAL file for restore: {e}"))
582            })?;
583        }
584    }
585
586    // Recover WAL records up to target epoch, then write a trimmed WAL
587    // that contains only records within the epoch boundary. This ensures
588    // that when GrafeoDB::open() replays the sidecar WAL, it does not
589    // advance beyond the target epoch.
590    let recovery = grafeo_storage::wal::WalRecovery::new(&wal_dir);
591    let records = recovery.recover_until_epoch(target_epoch)?;
592
593    // Write a single trimmed WAL file containing only the bounded records
594    let trimmed_dir = wal_dir.parent().unwrap_or(Path::new(".")).join(format!(
595        "{}.trimmed_wal",
596        wal_dir
597            .file_name()
598            .and_then(|n| n.to_str())
599            .unwrap_or("wal")
600    ));
601    std::fs::create_dir_all(&trimmed_dir)
602        .map_err(|e| Error::Internal(format!("failed to create trimmed WAL directory: {e}")))?;
603
604    if !records.is_empty() {
605        use grafeo_storage::wal::{LpgWal, WalConfig};
606        let trimmed_wal = LpgWal::with_config(&trimmed_dir, WalConfig::default())?;
607        for record in &records {
608            trimmed_wal.log(record)?;
609        }
610        trimmed_wal.flush()?;
611        drop(trimmed_wal);
612    }
613
614    // Remove the original (untrimmed) restore WAL directory
615    std::fs::remove_dir_all(&wal_dir)
616        .map_err(|e| Error::Internal(format!("failed to remove restore WAL directory: {e}")))?;
617
618    // Move the trimmed WAL to the sidecar location
619    let sidecar_dir = format!("{}.wal", output_path.display());
620    let sidecar_path = std::path::Path::new(&sidecar_dir);
621    if sidecar_path.exists() {
622        std::fs::remove_dir_all(sidecar_path)
623            .map_err(|e| Error::Internal(format!("failed to remove existing sidecar WAL: {e}")))?;
624    }
625    std::fs::rename(&trimmed_dir, sidecar_path)
626        .map_err(|e| Error::Internal(format!("failed to move WAL to sidecar location: {e}")))?;
627
628    Ok(())
629}
630
631// ── Tests ──────────────────────────────────────────────────────────
632
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn test_manifest_new() {
        // A fresh manifest is version 1 with no segments.
        let fresh = BackupManifest::new();
        assert_eq!(fresh.version, 1);
        assert!(fresh.segments.is_empty());
        assert!(fresh.latest_full().is_none());
        assert!(fresh.epoch_range().is_none());
    }

    #[test]
    fn test_manifest_with_segments() {
        // One full snapshot (0..=100) followed by one incremental (101..=200).
        let full_seg = BackupSegment {
            kind: BackupKind::Full,
            filename: "backup_full_0000.grafeo".to_string(),
            start_epoch: EpochId::new(0),
            end_epoch: EpochId::new(100),
            checksum: 12345,
            size_bytes: 1024,
            created_at_ms: 1000,
        };
        let incr_seg = BackupSegment {
            kind: BackupKind::Incremental,
            filename: "backup_incr_0001.wal".to_string(),
            start_epoch: EpochId::new(101),
            end_epoch: EpochId::new(200),
            checksum: 67890,
            size_bytes: 256,
            created_at_ms: 2000,
        };
        let mut chain = BackupManifest::new();
        chain.segments.extend([full_seg, incr_seg]);

        assert_eq!(chain.latest_full().unwrap().end_epoch, EpochId::new(100));

        let after_full = chain.incrementals_after(EpochId::new(100));
        assert_eq!(after_full.len(), 1);
        assert_eq!(after_full[0].start_epoch, EpochId::new(101));

        let (lo, hi) = chain.epoch_range().unwrap();
        assert_eq!(lo, EpochId::new(0));
        assert_eq!(hi, EpochId::new(200));
    }

    #[test]
    fn test_manifest_round_trip() {
        let tmp = TempDir::new().unwrap();
        let mut original = BackupManifest::new();
        original.segments.push(BackupSegment {
            kind: BackupKind::Full,
            filename: "test.grafeo".to_string(),
            start_epoch: EpochId::new(0),
            end_epoch: EpochId::new(50),
            checksum: 0,
            size_bytes: 512,
            created_at_ms: 0,
        });

        write_manifest(tmp.path(), &original).unwrap();

        let reloaded = read_manifest(tmp.path()).unwrap().unwrap();
        assert_eq!(reloaded.segments.len(), 1);
        assert_eq!(reloaded.segments[0].filename, "test.grafeo");
    }

    #[test]
    fn test_manifest_not_found() {
        // An empty directory has no manifest.
        let tmp = TempDir::new().unwrap();
        assert!(read_manifest(tmp.path()).unwrap().is_none());
    }

    #[test]
    fn test_backup_cursor_round_trip() {
        let tmp = TempDir::new().unwrap();
        let saved = BackupCursor {
            backed_up_epoch: EpochId::new(42),
            log_sequence: 7,
            timestamp_ms: 12345,
        };
        write_backup_cursor(tmp.path(), &saved).unwrap();

        let restored = read_backup_cursor(tmp.path()).unwrap().unwrap();
        assert_eq!(restored.backed_up_epoch, EpochId::new(42));
        assert_eq!(restored.log_sequence, 7);
        assert_eq!(restored.timestamp_ms, 12345);
    }

    #[test]
    fn test_backup_cursor_not_found() {
        // An empty directory has no cursor.
        let tmp = TempDir::new().unwrap();
        assert!(read_backup_cursor(tmp.path()).unwrap().is_none());
    }

    #[test]
    fn test_backup_header_round_trip() {
        let mut header = Vec::new();
        write_backup_header(&mut header, EpochId::new(101), EpochId::new(200), 500);
        assert_eq!(header.len(), BACKUP_HEADER_SIZE);

        let (start, end, count) = read_backup_header(&header).unwrap();
        assert_eq!(start, EpochId::new(101));
        assert_eq!(end, EpochId::new(200));
        assert_eq!(count, 500);
    }

    #[test]
    fn test_backup_header_invalid_magic() {
        // Correct length, wrong magic bytes.
        let mut bad = vec![0u8; BACKUP_HEADER_SIZE];
        bad[..4].copy_from_slice(&[0xFF; 4]);
        assert!(read_backup_header(&bad).is_err());
    }

    #[test]
    fn test_backup_header_too_short() {
        // Four bytes is well under BACKUP_HEADER_SIZE.
        assert!(read_backup_header(&[0u8; 4]).is_err());
    }

    #[test]
    fn test_backup_kind_serialization() {
        // Encode and decode a variant through the same bincode config the
        // manifest I/O uses.
        let config = bincode::config::standard();
        let bytes = bincode::serde::encode_to_vec(BackupKind::Full, config).unwrap();
        let (decoded, _): (BackupKind, _) =
            bincode::serde::decode_from_slice(&bytes, config).unwrap();
        assert_eq!(decoded, BackupKind::Full);
    }
}