Skip to main content

grafeo_engine/database/
backup.rs

1//! Incremental backup and point-in-time recovery.
2//!
3//! Provides `backup_full()`, `backup_incremental()`, and `restore_to_epoch()`
4//! APIs on [`GrafeoDB`](super::GrafeoDB). Full backups capture the entire
5//! database state; incremental backups export only the WAL records since the
6//! last backup. Recovery replays a chain of full + incremental backups to
7//! restore the database to any committed epoch.
8//!
9//! # Backup chain model
10//!
11//! ```text
12//! [Full Snapshot] -> [Incr 1] -> [Incr 2] -> ... -> [Incr N]
13//!   epoch 0-100      101-200     201-300              901-1000
14//! ```
15//!
16//! To restore to epoch 750: load full snapshot (epoch 100), replay
17//! incrementals 1-7, stop at epoch 750.
18
19use std::path::Path;
20
21use grafeo_common::types::EpochId;
22use grafeo_common::utils::error::{Error, Result};
23use serde::{Deserialize, Serialize};
24
25// ── Backup types ───────────────────────────────────────────────────
26
27/// The type of a backup segment.
28#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
29#[non_exhaustive]
30pub enum BackupKind {
31    /// A full snapshot of the entire database.
32    Full,
33    /// WAL records since the last backup checkpoint.
34    Incremental,
35}
36
37/// Metadata for a single backup segment (full or incremental).
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct BackupSegment {
40    /// Segment type.
41    pub kind: BackupKind,
42    /// File name (relative to backup directory).
43    pub filename: String,
44    /// Start epoch (inclusive).
45    pub start_epoch: EpochId,
46    /// End epoch (inclusive).
47    pub end_epoch: EpochId,
48    /// CRC-32 checksum of the segment file.
49    pub checksum: u32,
50    /// Size in bytes.
51    pub size_bytes: u64,
52    /// Timestamp when this backup was created (ms since UNIX epoch).
53    pub created_at_ms: u64,
54}
55
56/// Tracks the full backup chain for a database.
57#[derive(Debug, Clone, Serialize, Deserialize)]
58pub struct BackupManifest {
59    /// Manifest format version.
60    pub version: u32,
61    /// Ordered list of backup segments (full first, then incrementals).
62    pub segments: Vec<BackupSegment>,
63}
64
65impl BackupManifest {
66    /// Creates a new empty manifest.
67    #[must_use]
68    pub fn new() -> Self {
69        Self {
70            version: 1,
71            segments: Vec::new(),
72        }
73    }
74
75    /// Returns the most recent full backup segment, if any.
76    #[must_use]
77    pub fn latest_full(&self) -> Option<&BackupSegment> {
78        self.segments
79            .iter()
80            .rev()
81            .find(|s| s.kind == BackupKind::Full)
82    }
83
84    /// Returns incremental segments after the given epoch, in order.
85    pub fn incrementals_after(&self, epoch: EpochId) -> Vec<&BackupSegment> {
86        self.segments
87            .iter()
88            .filter(|s| s.kind == BackupKind::Incremental && s.start_epoch > epoch)
89            .collect()
90    }
91
92    /// Returns the epoch range covered by this manifest.
93    #[must_use]
94    pub fn epoch_range(&self) -> Option<(EpochId, EpochId)> {
95        let first = self.segments.first()?;
96        let last = self.segments.last()?;
97        Some((first.start_epoch, last.end_epoch))
98    }
99}
100
101impl Default for BackupManifest {
102    fn default() -> Self {
103        Self::new()
104    }
105}
106
107/// Tracks the WAL position of the last completed backup.
108///
109/// Persisted as `backup_cursor.meta` in the WAL directory.
110#[derive(Debug, Clone, Serialize, Deserialize)]
111pub struct BackupCursor {
112    /// The epoch up to which WAL records have been backed up.
113    pub backed_up_epoch: EpochId,
114    /// The WAL log sequence number at the time of the last backup.
115    pub log_sequence: u64,
116    /// Timestamp of the last backup.
117    pub timestamp_ms: u64,
118}
119
120// ── Manifest I/O ───────────────────────────────────────────────────
121
/// File name of the backup manifest inside a backup directory.
///
/// The contents are bincode-encoded despite the `.json` extension.
const MANIFEST_FILENAME: &str = "backup_manifest.json";
/// File name of the backup cursor inside a WAL directory.
const BACKUP_CURSOR_FILENAME: &str = "backup_cursor.meta";
124
125/// Reads the backup manifest from a backup directory.
126///
127/// Returns `None` if no manifest exists.
128///
129/// # Errors
130///
131/// Returns an error if the manifest file exists but cannot be read or parsed.
132pub fn read_manifest(backup_dir: &Path) -> Result<Option<BackupManifest>> {
133    let path = backup_dir.join(MANIFEST_FILENAME);
134    if !path.exists() {
135        return Ok(None);
136    }
137    let data = std::fs::read(&path)
138        .map_err(|e| Error::Internal(format!("failed to read backup manifest: {e}")))?;
139    let (manifest, _): (BackupManifest, _) =
140        bincode::serde::decode_from_slice(&data, bincode::config::standard())
141            .map_err(|e| Error::Internal(format!("failed to parse backup manifest: {e}")))?;
142    Ok(Some(manifest))
143}
144
145/// Writes the backup manifest to a backup directory.
146///
147/// Uses write-to-temp-then-rename for atomicity.
148///
149/// # Errors
150///
151/// Returns an error if the manifest cannot be written.
152pub fn write_manifest(backup_dir: &Path, manifest: &BackupManifest) -> Result<()> {
153    std::fs::create_dir_all(backup_dir)
154        .map_err(|e| Error::Internal(format!("failed to create backup directory: {e}")))?;
155
156    let path = backup_dir.join(MANIFEST_FILENAME);
157    let temp_path = backup_dir.join(format!("{MANIFEST_FILENAME}.tmp"));
158
159    let data = bincode::serde::encode_to_vec(manifest, bincode::config::standard())
160        .map_err(|e| Error::Internal(format!("failed to serialize backup manifest: {e}")))?;
161
162    std::fs::write(&temp_path, data)
163        .map_err(|e| Error::Internal(format!("failed to write backup manifest: {e}")))?;
164    std::fs::rename(&temp_path, &path)
165        .map_err(|e| Error::Internal(format!("failed to finalize backup manifest: {e}")))?;
166
167    Ok(())
168}
169
170// ── Backup cursor I/O ──────────────────────────────────────────────
171
172/// Reads the backup cursor from a WAL directory.
173///
174/// Returns `None` if no cursor exists (no backup has been taken).
175///
176/// # Errors
177///
178/// Returns an error if the cursor file exists but cannot be read.
179pub fn read_backup_cursor(wal_dir: &Path) -> Result<Option<BackupCursor>> {
180    let path = wal_dir.join(BACKUP_CURSOR_FILENAME);
181    if !path.exists() {
182        return Ok(None);
183    }
184    let data = std::fs::read(&path)
185        .map_err(|e| Error::Internal(format!("failed to read backup cursor: {e}")))?;
186    let cursor: BackupCursor =
187        bincode::serde::decode_from_slice(&data, bincode::config::standard())
188            .map(|(c, _)| c)
189            .map_err(|e| Error::Internal(format!("failed to parse backup cursor: {e}")))?;
190    Ok(Some(cursor))
191}
192
193/// Writes the backup cursor to a WAL directory.
194///
195/// Uses write-to-temp-then-rename for atomicity.
196///
197/// # Errors
198///
199/// Returns an error if the cursor cannot be written.
200pub fn write_backup_cursor(wal_dir: &Path, cursor: &BackupCursor) -> Result<()> {
201    let path = wal_dir.join(BACKUP_CURSOR_FILENAME);
202    let temp_path = wal_dir.join(format!("{BACKUP_CURSOR_FILENAME}.tmp"));
203
204    let data = bincode::serde::encode_to_vec(cursor, bincode::config::standard())
205        .map_err(|e| Error::Internal(format!("failed to serialize backup cursor: {e}")))?;
206
207    std::fs::write(&temp_path, &data)
208        .map_err(|e| Error::Internal(format!("failed to write backup cursor: {e}")))?;
209    std::fs::rename(&temp_path, &path)
210        .map_err(|e| Error::Internal(format!("failed to finalize backup cursor: {e}")))?;
211
212    Ok(())
213}
214
215// ── Incremental backup file format ─────────────────────────────────
216
/// Four-byte tag that opens every incremental backup file.
pub const BACKUP_MAGIC: [u8; 4] = *b"GBAK";
/// Format version written into new incremental backup files.
pub const BACKUP_VERSION: u32 = 1;

/// Size in bytes of the incremental backup file header.
///
/// The header is laid out as follows, with the WAL frames directly after it:
///
/// ```text
/// [magic: 4 bytes "GBAK"]
/// [version: u32 LE]
/// [start_epoch: u64 LE]
/// [end_epoch: u64 LE]
/// [record_count: u64 LE]
/// ... WAL frames ...
/// ```
pub const BACKUP_HEADER_SIZE: usize =
    BACKUP_MAGIC.len() + std::mem::size_of::<u32>() + 3 * std::mem::size_of::<u64>();
233
234/// Writes the incremental backup file header.
235pub fn write_backup_header(
236    buf: &mut Vec<u8>,
237    start_epoch: EpochId,
238    end_epoch: EpochId,
239    record_count: u64,
240) {
241    buf.extend_from_slice(&BACKUP_MAGIC);
242    buf.extend_from_slice(&BACKUP_VERSION.to_le_bytes());
243    buf.extend_from_slice(&start_epoch.as_u64().to_le_bytes());
244    buf.extend_from_slice(&end_epoch.as_u64().to_le_bytes());
245    buf.extend_from_slice(&record_count.to_le_bytes());
246}
247
248/// Reads and validates the incremental backup file header.
249///
250/// Returns `(start_epoch, end_epoch, record_count)` on success.
251///
252/// # Errors
253///
254/// Returns an error if the header is invalid.
255///
256/// # Panics
257///
258/// Cannot panic: all slice indexing is bounds-checked by the length guard.
259pub fn read_backup_header(data: &[u8]) -> Result<(EpochId, EpochId, u64)> {
260    if data.len() < BACKUP_HEADER_SIZE {
261        return Err(Error::Internal(
262            "incremental backup file too short".to_string(),
263        ));
264    }
265    if data[0..4] != BACKUP_MAGIC {
266        return Err(Error::Internal(
267            "invalid backup file magic bytes".to_string(),
268        ));
269    }
270    let version = u32::from_le_bytes(data[4..8].try_into().unwrap());
271    if version > BACKUP_VERSION {
272        return Err(Error::Internal(format!(
273            "unsupported backup version {version}, max supported is {BACKUP_VERSION}"
274        )));
275    }
276    let start_epoch = EpochId::new(u64::from_le_bytes(data[8..16].try_into().unwrap()));
277    let end_epoch = EpochId::new(u64::from_le_bytes(data[16..24].try_into().unwrap()));
278    let record_count = u64::from_le_bytes(data[24..32].try_into().unwrap());
279    Ok((start_epoch, end_epoch, record_count))
280}
281
282/// Returns the timestamp in milliseconds since UNIX epoch.
283pub(super) fn now_ms() -> u64 {
284    std::time::SystemTime::now()
285        .duration_since(std::time::UNIX_EPOCH)
286        .map(|d| d.as_millis() as u64)
287        .unwrap_or(0)
288}
289
290// ── Backup operations (called from GrafeoDB) ───────────────────────
291
292use grafeo_storage::file::GrafeoFileManager;
293use grafeo_storage::wal::LpgWal;
294
295/// Creates a full backup by copying the .grafeo container file.
296///
297/// 1. Copies the container file to the backup directory via the locked handle.
298/// 2. Updates the manifest and backup cursor.
299///
300/// Uses [`GrafeoFileManager::copy_to`] instead of `std::fs::copy()` so the
301/// copy reads through the already-locked file handle. `std::fs::copy()` opens
302/// a new handle, which fails on Windows when an exclusive lock is held.
303///
304/// # Errors
305///
306/// Returns an error if the database has no file manager, or if I/O fails.
307pub(super) fn do_backup_full(
308    backup_dir: &Path,
309    fm: &GrafeoFileManager,
310    wal: Option<&LpgWal>,
311    current_epoch: EpochId,
312) -> Result<BackupSegment> {
313    std::fs::create_dir_all(backup_dir)
314        .map_err(|e| Error::Internal(format!("failed to create backup directory: {e}")))?;
315
316    // Determine backup filename
317    let mut manifest = read_manifest(backup_dir)?.unwrap_or_default();
318    let segment_idx = manifest.segments.len();
319    let filename = format!("backup_full_{segment_idx:04}.grafeo");
320    let dest_path = backup_dir.join(&filename);
321
322    // Copy the .grafeo file to the backup directory through the locked handle
323    fm.copy_to(&dest_path)?;
324
325    let file_size = std::fs::metadata(&dest_path).map(|m| m.len()).unwrap_or(0);
326    let file_data = std::fs::read(&dest_path)
327        .map_err(|e| Error::Internal(format!("failed to read backup file for checksum: {e}")))?;
328    let checksum = crc32fast::hash(&file_data);
329
330    let segment = BackupSegment {
331        kind: BackupKind::Full,
332        filename,
333        start_epoch: EpochId::new(0),
334        end_epoch: current_epoch,
335        checksum,
336        size_bytes: file_size,
337        created_at_ms: now_ms(),
338    };
339
340    manifest.segments.push(segment.clone());
341    write_manifest(backup_dir, &manifest)?;
342
343    // Update backup cursor in the WAL directory.
344    // Rotate the WAL so that post-backup writes land in a new file with a
345    // strictly greater sequence number. Without this, writes that append to
346    // the still-active log file are invisible to incremental backup, which
347    // skips files with seq <= cursor.log_sequence. (GrafeoDB/grafeo#267)
348    if let Some(wal) = wal {
349        // Record the sequence of the file that was active during this backup,
350        // then rotate so post-backup writes land in a new file with seq > this.
351        let backed_up_sequence = wal.current_sequence();
352        wal.rotate()
353            .map_err(|e| Error::Internal(format!("failed to rotate WAL after full backup: {e}")))?;
354        let cursor = BackupCursor {
355            backed_up_epoch: current_epoch,
356            log_sequence: backed_up_sequence,
357            timestamp_ms: now_ms(),
358        };
359        write_backup_cursor(wal.dir(), &cursor)?;
360    }
361
362    Ok(segment)
363}
364
365/// Creates an incremental backup containing WAL records since the last backup.
366///
367/// Reads WAL log files from the backup cursor's position forward, copies
368/// the raw frames into a backup segment file.
369///
370/// # Errors
371///
372/// Returns an error if no full backup exists, or if the WAL files have been
373/// truncated past the cursor.
374pub(super) fn do_backup_incremental(
375    backup_dir: &Path,
376    wal: &LpgWal,
377    current_epoch: EpochId,
378) -> Result<BackupSegment> {
379    let manifest = read_manifest(backup_dir)?.ok_or_else(|| {
380        Error::Internal("no backup manifest found; run a full backup first".to_string())
381    })?;
382
383    if manifest.latest_full().is_none() {
384        return Err(Error::Internal(
385            "no full backup in manifest; run a full backup first".to_string(),
386        ));
387    }
388
389    let cursor = read_backup_cursor(wal.dir())?.ok_or_else(|| {
390        Error::Internal("no backup cursor found; run a full backup first".to_string())
391    })?;
392
393    let log_files = wal.log_files()?;
394    if log_files.is_empty() {
395        return Err(Error::Internal("no WAL log files to backup".to_string()));
396    }
397
398    // Read WAL files from cursor position onward
399    let mut wal_data = Vec::new();
400    let mut record_count = 0u64;
401    // cursor.backed_up_epoch used for start_epoch calculation below
402
403    for file_path in &log_files {
404        let seq = file_path
405            .file_stem()
406            .and_then(|s| s.to_str())
407            .and_then(|s| s.strip_prefix("wal_"))
408            .and_then(|s| s.parse::<u64>().ok())
409            .unwrap_or(0);
410
411        // Skip files at or before the cursor: the cursor records the
412        // sequence number that was active at the time of the last backup,
413        // so we need files strictly after that sequence to avoid re-including
414        // already-backed-up frames from the active log.
415        if seq <= cursor.log_sequence {
416            continue;
417        }
418
419        let file_bytes = std::fs::read(file_path).map_err(|e| {
420            Error::Internal(format!(
421                "failed to read WAL file {}: {e}",
422                file_path.display()
423            ))
424        })?;
425
426        if !file_bytes.is_empty() {
427            wal_data.extend_from_slice(&file_bytes);
428            // Count records by scanning for frame markers (rough count)
429            // Exact count would require parsing, but we record approximate
430            record_count += 1; // Per-file approximation
431        }
432    }
433
434    if wal_data.is_empty() {
435        return Err(Error::Internal(
436            "no new WAL records since last backup".to_string(),
437        ));
438    }
439
440    let start_epoch = EpochId::new(cursor.backed_up_epoch.as_u64() + 1);
441    let end_epoch = current_epoch;
442
443    // Write incremental backup file
444    let segment_idx = manifest.segments.len();
445    let filename = format!("backup_incr_{segment_idx:04}.wal");
446    let dest_path = backup_dir.join(&filename);
447
448    let mut output = Vec::new();
449    write_backup_header(&mut output, start_epoch, end_epoch, record_count);
450    output.extend_from_slice(&wal_data);
451
452    std::fs::write(&dest_path, &output)
453        .map_err(|e| Error::Internal(format!("failed to write incremental backup: {e}")))?;
454
455    let checksum = crc32fast::hash(&output);
456    let segment = BackupSegment {
457        kind: BackupKind::Incremental,
458        filename,
459        start_epoch,
460        end_epoch,
461        checksum,
462        size_bytes: output.len() as u64,
463        created_at_ms: now_ms(),
464    };
465
466    // Update manifest
467    let mut manifest = manifest;
468    manifest.segments.push(segment.clone());
469    write_manifest(backup_dir, &manifest)?;
470
471    // Rotate the WAL so subsequent incremental backups see a clean boundary.
472    // Same rationale as in do_backup_full (GrafeoDB/grafeo#267).
473    let backed_up_sequence = wal.current_sequence();
474    wal.rotate().map_err(|e| {
475        Error::Internal(format!(
476            "failed to rotate WAL after incremental backup: {e}"
477        ))
478    })?;
479
480    // Update backup cursor
481    let new_cursor = BackupCursor {
482        backed_up_epoch: current_epoch,
483        log_sequence: backed_up_sequence,
484        timestamp_ms: now_ms(),
485    };
486    write_backup_cursor(wal.dir(), &new_cursor)?;
487
488    Ok(segment)
489}
490
491// ── Restore ────────────────────────────────────────────────────────
492
493/// Restores a database to a specific epoch from a backup chain.
494///
495/// 1. Finds the most recent full backup with `end_epoch <= target_epoch`.
496/// 2. Opens the full backup as a GrafeoDB (via file manager).
497/// 3. Replays incremental segments up to `target_epoch` using epoch-bounded
498///    WAL recovery.
499///
500/// # Errors
501///
502/// Returns an error if the backup chain does not cover the target epoch,
503/// if segment checksums fail, or if I/O fails.
504pub(super) fn do_restore_to_epoch(
505    backup_dir: &Path,
506    target_epoch: EpochId,
507    output_path: &Path,
508) -> Result<()> {
509    let manifest = read_manifest(backup_dir)?
510        .ok_or_else(|| Error::Internal("no backup manifest found".to_string()))?;
511
512    // Find the best full backup (latest one that doesn't exceed target)
513    let full = manifest
514        .segments
515        .iter()
516        .rfind(|s| s.kind == BackupKind::Full && s.end_epoch <= target_epoch)
517        .ok_or_else(|| {
518            Error::Internal(format!(
519                "no full backup covers epoch {}",
520                target_epoch.as_u64()
521            ))
522        })?;
523
524    // Copy full backup to output path
525    let full_path = backup_dir.join(&full.filename);
526    std::fs::copy(&full_path, output_path)
527        .map_err(|e| Error::Internal(format!("failed to copy full backup to output: {e}")))?;
528
529    // Find incremental segments that cover (full.end_epoch, target_epoch]
530    let incrementals: Vec<&BackupSegment> = manifest
531        .segments
532        .iter()
533        .filter(|s| {
534            s.kind == BackupKind::Incremental
535                && s.start_epoch > full.end_epoch
536                && s.start_epoch <= target_epoch
537        })
538        .collect();
539
540    if incrementals.is_empty() {
541        // Full backup already covers the target epoch
542        return Ok(());
543    }
544
545    // Create a temporary WAL directory for replay
546    let wal_dir = output_path.parent().unwrap_or(Path::new(".")).join(format!(
547        "{}.restore_wal",
548        output_path
549            .file_name()
550            .and_then(|n| n.to_str())
551            .unwrap_or("db")
552    ));
553    std::fs::create_dir_all(&wal_dir)
554        .map_err(|e| Error::Internal(format!("failed to create restore WAL directory: {e}")))?;
555
556    // Write incremental WAL data to temp WAL files for recovery
557    for (i, incr) in incrementals.iter().enumerate() {
558        let incr_path = backup_dir.join(&incr.filename);
559        let incr_data = std::fs::read(&incr_path).map_err(|e| {
560            Error::Internal(format!(
561                "failed to read incremental backup {}: {e}",
562                incr.filename
563            ))
564        })?;
565
566        // Validate checksum
567        let actual_crc = crc32fast::hash(&incr_data);
568        if actual_crc != incr.checksum {
569            return Err(Error::Internal(format!(
570                "incremental backup {} CRC mismatch: expected {:08x}, got {actual_crc:08x}",
571                incr.filename, incr.checksum,
572            )));
573        }
574
575        // Skip the backup header, write the raw WAL frames to a temp log file
576        if incr_data.len() > BACKUP_HEADER_SIZE {
577            let wal_frames = &incr_data[BACKUP_HEADER_SIZE..];
578            let wal_file = wal_dir.join(format!("wal_{i:08}.log"));
579            std::fs::write(&wal_file, wal_frames).map_err(|e| {
580                Error::Internal(format!("failed to write WAL file for restore: {e}"))
581            })?;
582        }
583    }
584
585    // Recover WAL records up to target epoch, then write a trimmed WAL
586    // that contains only records within the epoch boundary. This ensures
587    // that when GrafeoDB::open() replays the sidecar WAL, it does not
588    // advance beyond the target epoch.
589    let recovery = grafeo_storage::wal::WalRecovery::new(&wal_dir);
590    let records = recovery.recover_until_epoch(target_epoch)?;
591
592    // Write a single trimmed WAL file containing only the bounded records
593    let trimmed_dir = wal_dir.parent().unwrap_or(Path::new(".")).join(format!(
594        "{}.trimmed_wal",
595        wal_dir
596            .file_name()
597            .and_then(|n| n.to_str())
598            .unwrap_or("wal")
599    ));
600    std::fs::create_dir_all(&trimmed_dir)
601        .map_err(|e| Error::Internal(format!("failed to create trimmed WAL directory: {e}")))?;
602
603    if !records.is_empty() {
604        use grafeo_storage::wal::{LpgWal, WalConfig};
605        let trimmed_wal = LpgWal::with_config(&trimmed_dir, WalConfig::default())?;
606        for record in &records {
607            trimmed_wal.log(record)?;
608        }
609        trimmed_wal.flush()?;
610        drop(trimmed_wal);
611    }
612
613    // Remove the original (untrimmed) restore WAL directory
614    std::fs::remove_dir_all(&wal_dir)
615        .map_err(|e| Error::Internal(format!("failed to remove restore WAL directory: {e}")))?;
616
617    // Move the trimmed WAL to the sidecar location
618    let sidecar_dir = format!("{}.wal", output_path.display());
619    let sidecar_path = std::path::Path::new(&sidecar_dir);
620    if sidecar_path.exists() {
621        std::fs::remove_dir_all(sidecar_path)
622            .map_err(|e| Error::Internal(format!("failed to remove existing sidecar WAL: {e}")))?;
623    }
624    std::fs::rename(&trimmed_dir, sidecar_path)
625        .map_err(|e| Error::Internal(format!("failed to move WAL to sidecar location: {e}")))?;
626
627    Ok(())
628}
629
630// ── Tests ──────────────────────────────────────────────────────────
631
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a segment fixture covering the inclusive epoch range `(start, end)`.
    fn segment(
        kind: BackupKind,
        filename: &str,
        (start, end): (u64, u64),
        checksum: u32,
        size_bytes: u64,
        created_at_ms: u64,
    ) -> BackupSegment {
        BackupSegment {
            kind,
            filename: filename.to_string(),
            start_epoch: EpochId::new(start),
            end_epoch: EpochId::new(end),
            checksum,
            size_bytes,
            created_at_ms,
        }
    }

    // A fresh manifest is version 1 and reports nothing.
    #[test]
    fn test_manifest_new() {
        let empty = BackupManifest::new();
        assert_eq!(empty.version, 1);
        assert!(empty.segments.is_empty());
        assert!(empty.latest_full().is_none());
        assert!(empty.epoch_range().is_none());
    }

    // Queries over a one-full, one-incremental chain.
    #[test]
    fn test_manifest_with_segments() {
        let mut chain = BackupManifest::new();
        chain.segments.extend([
            segment(
                BackupKind::Full,
                "backup_full_0000.grafeo",
                (0, 100),
                12345,
                1024,
                1000,
            ),
            segment(
                BackupKind::Incremental,
                "backup_incr_0001.wal",
                (101, 200),
                67890,
                256,
                2000,
            ),
        ]);

        let newest_full = chain.latest_full().unwrap();
        assert_eq!(newest_full.end_epoch, EpochId::new(100));

        let later = chain.incrementals_after(EpochId::new(100));
        assert_eq!(later.len(), 1);
        assert_eq!(later[0].start_epoch, EpochId::new(101));

        let (start, end) = chain.epoch_range().unwrap();
        assert_eq!(start, EpochId::new(0));
        assert_eq!(end, EpochId::new(200));
    }

    // A manifest written to disk reads back with its segment intact.
    #[test]
    fn test_manifest_round_trip() {
        let scratch = TempDir::new().unwrap();
        let mut original = BackupManifest::new();
        original
            .segments
            .push(segment(BackupKind::Full, "test.grafeo", (0, 50), 0, 512, 0));

        write_manifest(scratch.path(), &original).unwrap();

        let reloaded = read_manifest(scratch.path()).unwrap().unwrap();
        assert_eq!(reloaded.segments.len(), 1);
        assert_eq!(reloaded.segments[0].filename, "test.grafeo");
    }

    // An empty directory has no manifest.
    #[test]
    fn test_manifest_not_found() {
        let scratch = TempDir::new().unwrap();
        let found = read_manifest(scratch.path()).unwrap();
        assert!(found.is_none());
    }

    // A cursor written to disk reads back field for field.
    #[test]
    fn test_backup_cursor_round_trip() {
        let scratch = TempDir::new().unwrap();
        let original = BackupCursor {
            backed_up_epoch: EpochId::new(42),
            log_sequence: 7,
            timestamp_ms: 12345,
        };

        write_backup_cursor(scratch.path(), &original).unwrap();

        let reloaded = read_backup_cursor(scratch.path()).unwrap().unwrap();
        assert_eq!(reloaded.backed_up_epoch, EpochId::new(42));
        assert_eq!(reloaded.log_sequence, 7);
        assert_eq!(reloaded.timestamp_ms, 12345);
    }

    // An empty directory has no cursor.
    #[test]
    fn test_backup_cursor_not_found() {
        let scratch = TempDir::new().unwrap();
        let found = read_backup_cursor(scratch.path()).unwrap();
        assert!(found.is_none());
    }

    // A written header is exactly header-sized and parses back unchanged.
    #[test]
    fn test_backup_header_round_trip() {
        let mut header = Vec::new();
        write_backup_header(&mut header, EpochId::new(101), EpochId::new(200), 500);
        assert_eq!(header.len(), BACKUP_HEADER_SIZE);

        let (start, end, count) = read_backup_header(&header).unwrap();
        assert_eq!(start, EpochId::new(101));
        assert_eq!(end, EpochId::new(200));
        assert_eq!(count, 500);
    }

    // A full-length header with the wrong magic bytes is rejected.
    #[test]
    fn test_backup_header_invalid_magic() {
        let mut data = vec![0u8; BACKUP_HEADER_SIZE];
        data[..4].fill(0xFF);
        assert!(read_backup_header(&data).is_err());
    }

    // Input shorter than the header is rejected.
    #[test]
    fn test_backup_header_too_short() {
        let data = [0u8; 4];
        assert!(read_backup_header(&data).is_err());
    }

    // `BackupKind` survives a bincode encode/decode cycle.
    #[test]
    fn test_backup_kind_serialization() {
        let config = bincode::config::standard();
        let bytes = bincode::serde::encode_to_vec(BackupKind::Full, config).unwrap();
        let (decoded, _): (BackupKind, _) =
            bincode::serde::decode_from_slice(&bytes, config).unwrap();
        assert_eq!(decoded, BackupKind::Full);
    }
}