// grafeo_engine/database/backup.rs

1//! Incremental backup and point-in-time recovery.
2//!
3//! Provides `backup_full()`, `backup_incremental()`, and `restore_to_epoch()`
4//! APIs on [`GrafeoDB`](super::GrafeoDB). Full backups capture the entire
5//! database state; incremental backups export only the WAL records since the
6//! last backup. Recovery replays a chain of full + incremental backups to
7//! restore the database to any committed epoch.
8//!
9//! # Backup chain model
10//!
11//! ```text
12//! [Full Snapshot] -> [Incr 1] -> [Incr 2] -> ... -> [Incr N]
13//!   epoch 0-100      101-200     201-300              901-1000
14//! ```
15//!
16//! To restore to epoch 750: load full snapshot (epoch 100), replay
17//! incrementals 1-7, stop at epoch 750.
18
19use std::path::Path;
20
21use grafeo_common::types::EpochId;
22use grafeo_common::utils::error::{Error, Result};
23use serde::{Deserialize, Serialize};
24
25// ── Backup types ───────────────────────────────────────────────────
26
/// The type of a backup segment.
///
/// Marked `#[non_exhaustive]` so additional segment kinds can be added
/// later without breaking downstream `match` arms.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum BackupKind {
    /// A full snapshot of the entire database.
    Full,
    /// WAL records since the last backup checkpoint.
    Incremental,
}
36
/// Metadata for a single backup segment (full or incremental).
///
/// Segments are appended to the [`BackupManifest`] in the order they are
/// created, so manifest order is chain order.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupSegment {
    /// Segment type.
    pub kind: BackupKind,
    /// File name (relative to backup directory).
    pub filename: String,
    /// Start epoch (inclusive).
    pub start_epoch: EpochId,
    /// End epoch (inclusive).
    pub end_epoch: EpochId,
    /// CRC-32 checksum of the segment file, computed with `crc32fast`
    /// over all file bytes (for incrementals this includes the header).
    pub checksum: u32,
    /// Size in bytes.
    pub size_bytes: u64,
    /// Timestamp when this backup was created (ms since UNIX epoch).
    pub created_at_ms: u64,
}
55
/// Tracks the full backup chain for a database.
///
/// Persisted in the backup directory and rewritten after every successful
/// backup operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupManifest {
    /// Manifest format version.
    pub version: u32,
    /// Ordered list of backup segments (full first, then incrementals).
    pub segments: Vec<BackupSegment>,
}
64
65impl BackupManifest {
66    /// Creates a new empty manifest.
67    #[must_use]
68    pub fn new() -> Self {
69        Self {
70            version: 1,
71            segments: Vec::new(),
72        }
73    }
74
75    /// Returns the most recent full backup segment, if any.
76    #[must_use]
77    pub fn latest_full(&self) -> Option<&BackupSegment> {
78        self.segments
79            .iter()
80            .rev()
81            .find(|s| s.kind == BackupKind::Full)
82    }
83
84    /// Returns incremental segments after the given epoch, in order.
85    pub fn incrementals_after(&self, epoch: EpochId) -> Vec<&BackupSegment> {
86        self.segments
87            .iter()
88            .filter(|s| s.kind == BackupKind::Incremental && s.start_epoch > epoch)
89            .collect()
90    }
91
92    /// Returns the epoch range covered by this manifest.
93    #[must_use]
94    pub fn epoch_range(&self) -> Option<(EpochId, EpochId)> {
95        let first = self.segments.first()?;
96        let last = self.segments.last()?;
97        Some((first.start_epoch, last.end_epoch))
98    }
99}
100
101impl Default for BackupManifest {
102    fn default() -> Self {
103        Self::new()
104    }
105}
106
/// Tracks the WAL position of the last completed backup.
///
/// Persisted as `backup_cursor.meta` in the WAL directory. Incremental
/// backup uses `log_sequence` to decide which WAL log files still need to
/// be exported (files with a sequence number at or below it are skipped).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupCursor {
    /// The epoch up to which WAL records have been backed up.
    pub backed_up_epoch: EpochId,
    /// The WAL log sequence number at the time of the last backup.
    pub log_sequence: u64,
    /// Timestamp of the last backup.
    pub timestamp_ms: u64,
}
119
120// ── Manifest I/O ───────────────────────────────────────────────────
121
/// Manifest file name inside the backup directory.
///
/// NOTE(review): despite the `.json` extension, `read_manifest` /
/// `write_manifest` store this file in bincode format.
const MANIFEST_FILENAME: &str = "backup_manifest.json";
/// Backup-cursor file name inside the WAL directory.
const BACKUP_CURSOR_FILENAME: &str = "backup_cursor.meta";
124
125/// Reads the backup manifest from a backup directory.
126///
127/// Returns `None` if no manifest exists.
128///
129/// # Errors
130///
131/// Returns an error if the manifest file exists but cannot be read or parsed.
132pub fn read_manifest(backup_dir: &Path) -> Result<Option<BackupManifest>> {
133    let path = backup_dir.join(MANIFEST_FILENAME);
134    if !path.exists() {
135        return Ok(None);
136    }
137    let data = std::fs::read(&path)
138        .map_err(|e| Error::Internal(format!("failed to read backup manifest: {e}")))?;
139    let (manifest, _): (BackupManifest, _) =
140        bincode::serde::decode_from_slice(&data, bincode::config::standard())
141            .map_err(|e| Error::Internal(format!("failed to parse backup manifest: {e}")))?;
142    Ok(Some(manifest))
143}
144
145/// Writes the backup manifest to a backup directory.
146///
147/// Uses write-to-temp-then-rename for atomicity.
148///
149/// # Errors
150///
151/// Returns an error if the manifest cannot be written.
152pub fn write_manifest(backup_dir: &Path, manifest: &BackupManifest) -> Result<()> {
153    std::fs::create_dir_all(backup_dir)
154        .map_err(|e| Error::Internal(format!("failed to create backup directory: {e}")))?;
155
156    let path = backup_dir.join(MANIFEST_FILENAME);
157    let temp_path = backup_dir.join(format!("{MANIFEST_FILENAME}.tmp"));
158
159    let data = bincode::serde::encode_to_vec(manifest, bincode::config::standard())
160        .map_err(|e| Error::Internal(format!("failed to serialize backup manifest: {e}")))?;
161
162    std::fs::write(&temp_path, data)
163        .map_err(|e| Error::Internal(format!("failed to write backup manifest: {e}")))?;
164    std::fs::rename(&temp_path, &path)
165        .map_err(|e| Error::Internal(format!("failed to finalize backup manifest: {e}")))?;
166
167    Ok(())
168}
169
170// ── Backup cursor I/O ──────────────────────────────────────────────
171
172/// Reads the backup cursor from a WAL directory.
173///
174/// Returns `None` if no cursor exists (no backup has been taken).
175///
176/// # Errors
177///
178/// Returns an error if the cursor file exists but cannot be read.
179pub fn read_backup_cursor(wal_dir: &Path) -> Result<Option<BackupCursor>> {
180    let path = wal_dir.join(BACKUP_CURSOR_FILENAME);
181    if !path.exists() {
182        return Ok(None);
183    }
184    let data = std::fs::read(&path)
185        .map_err(|e| Error::Internal(format!("failed to read backup cursor: {e}")))?;
186    let cursor: BackupCursor =
187        bincode::serde::decode_from_slice(&data, bincode::config::standard())
188            .map(|(c, _)| c)
189            .map_err(|e| Error::Internal(format!("failed to parse backup cursor: {e}")))?;
190    Ok(Some(cursor))
191}
192
193/// Writes the backup cursor to a WAL directory.
194///
195/// Uses write-to-temp-then-rename for atomicity.
196///
197/// # Errors
198///
199/// Returns an error if the cursor cannot be written.
200pub fn write_backup_cursor(wal_dir: &Path, cursor: &BackupCursor) -> Result<()> {
201    let path = wal_dir.join(BACKUP_CURSOR_FILENAME);
202    let temp_path = wal_dir.join(format!("{BACKUP_CURSOR_FILENAME}.tmp"));
203
204    let data = bincode::serde::encode_to_vec(cursor, bincode::config::standard())
205        .map_err(|e| Error::Internal(format!("failed to serialize backup cursor: {e}")))?;
206
207    std::fs::write(&temp_path, &data)
208        .map_err(|e| Error::Internal(format!("failed to write backup cursor: {e}")))?;
209    std::fs::rename(&temp_path, &path)
210        .map_err(|e| Error::Internal(format!("failed to finalize backup cursor: {e}")))?;
211
212    Ok(())
213}
214
215// ── Incremental backup file format ─────────────────────────────────
216
/// Magic bytes for incremental backup files.
pub const BACKUP_MAGIC: [u8; 4] = *b"GBAK";
/// Current backup file version.
pub const BACKUP_VERSION: u32 = 1;

/// Header for an incremental backup file.
///
/// All integers are little-endian; 4 + 4 + 8 + 8 + 8 = 32 bytes total.
///
/// ```text
/// [magic: 4 bytes "GBAK"]
/// [version: u32 LE]
/// [start_epoch: u64 LE]
/// [end_epoch: u64 LE]
/// [record_count: u64 LE]
/// ... WAL frames ...
/// ```
///
/// NOTE(review): `record_count` is currently a per-file approximation
/// (see `do_backup_incremental`), not an exact frame count.
pub const BACKUP_HEADER_SIZE: usize = 32;
233
234/// Writes the incremental backup file header.
235pub fn write_backup_header(
236    buf: &mut Vec<u8>,
237    start_epoch: EpochId,
238    end_epoch: EpochId,
239    record_count: u64,
240) {
241    buf.extend_from_slice(&BACKUP_MAGIC);
242    buf.extend_from_slice(&BACKUP_VERSION.to_le_bytes());
243    buf.extend_from_slice(&start_epoch.as_u64().to_le_bytes());
244    buf.extend_from_slice(&end_epoch.as_u64().to_le_bytes());
245    buf.extend_from_slice(&record_count.to_le_bytes());
246}
247
248/// Reads and validates the incremental backup file header.
249///
250/// Returns `(start_epoch, end_epoch, record_count)` on success.
251///
252/// # Errors
253///
254/// Returns an error if the header is invalid.
255///
256/// # Panics
257///
258/// Cannot panic: all slice indexing is bounds-checked by the length guard.
259pub fn read_backup_header(data: &[u8]) -> Result<(EpochId, EpochId, u64)> {
260    if data.len() < BACKUP_HEADER_SIZE {
261        return Err(Error::Internal(
262            "incremental backup file too short".to_string(),
263        ));
264    }
265    if data[0..4] != BACKUP_MAGIC {
266        return Err(Error::Internal(
267            "invalid backup file magic bytes".to_string(),
268        ));
269    }
270    let version = u32::from_le_bytes(data[4..8].try_into().unwrap());
271    if version > BACKUP_VERSION {
272        return Err(Error::Internal(format!(
273            "unsupported backup version {version}, max supported is {BACKUP_VERSION}"
274        )));
275    }
276    let start_epoch = EpochId::new(u64::from_le_bytes(data[8..16].try_into().unwrap()));
277    let end_epoch = EpochId::new(u64::from_le_bytes(data[16..24].try_into().unwrap()));
278    let record_count = u64::from_le_bytes(data[24..32].try_into().unwrap());
279    Ok((start_epoch, end_epoch, record_count))
280}
281
282/// Returns the timestamp in milliseconds since UNIX epoch.
283// reason: millis since UNIX epoch fits u64 for centuries
284#[allow(clippy::cast_possible_truncation)]
285pub(super) fn now_ms() -> u64 {
286    std::time::SystemTime::now()
287        .duration_since(std::time::UNIX_EPOCH)
288        .map(|d| d.as_millis() as u64)
289        .unwrap_or(0)
290}
291
292// ── Backup operations (called from GrafeoDB) ───────────────────────
293
294use grafeo_storage::file::GrafeoFileManager;
295use grafeo_storage::wal::LpgWal;
296
297/// Creates a full backup by copying the .grafeo container file.
298///
299/// 1. Copies the container file to the backup directory via the locked handle.
300/// 2. Updates the manifest and backup cursor.
301///
302/// Uses [`GrafeoFileManager::copy_to`] instead of `std::fs::copy()` so the
303/// copy reads through the already-locked file handle. `std::fs::copy()` opens
304/// a new handle, which fails on Windows when an exclusive lock is held.
305///
306/// # Errors
307///
308/// Returns an error if the database has no file manager, or if I/O fails.
309pub(super) fn do_backup_full(
310    backup_dir: &Path,
311    fm: &GrafeoFileManager,
312    wal: Option<&LpgWal>,
313    current_epoch: EpochId,
314) -> Result<BackupSegment> {
315    std::fs::create_dir_all(backup_dir)
316        .map_err(|e| Error::Internal(format!("failed to create backup directory: {e}")))?;
317
318    // Determine backup filename
319    let mut manifest = read_manifest(backup_dir)?.unwrap_or_default();
320    let segment_idx = manifest.segments.len();
321    let filename = format!("backup_full_{segment_idx:04}.grafeo");
322    let dest_path = backup_dir.join(&filename);
323
324    // Copy the .grafeo file to the backup directory through the locked handle
325    fm.copy_to(&dest_path)?;
326
327    let file_size = std::fs::metadata(&dest_path).map(|m| m.len()).unwrap_or(0);
328    let file_data = std::fs::read(&dest_path)
329        .map_err(|e| Error::Internal(format!("failed to read backup file for checksum: {e}")))?;
330    let checksum = crc32fast::hash(&file_data);
331
332    let segment = BackupSegment {
333        kind: BackupKind::Full,
334        filename,
335        start_epoch: EpochId::new(0),
336        end_epoch: current_epoch,
337        checksum,
338        size_bytes: file_size,
339        created_at_ms: now_ms(),
340    };
341
342    manifest.segments.push(segment.clone());
343    write_manifest(backup_dir, &manifest)?;
344
345    // Update backup cursor in the WAL directory.
346    // Rotate the WAL so that post-backup writes land in a new file with a
347    // strictly greater sequence number. Without this, writes that append to
348    // the still-active log file are invisible to incremental backup, which
349    // skips files with seq <= cursor.log_sequence. (GrafeoDB/grafeo#267)
350    if let Some(wal) = wal {
351        // Record the sequence of the file that was active during this backup,
352        // then rotate so post-backup writes land in a new file with seq > this.
353        let backed_up_sequence = wal.current_sequence();
354        wal.rotate()
355            .map_err(|e| Error::Internal(format!("failed to rotate WAL after full backup: {e}")))?;
356        let cursor = BackupCursor {
357            backed_up_epoch: current_epoch,
358            log_sequence: backed_up_sequence,
359            timestamp_ms: now_ms(),
360        };
361        write_backup_cursor(wal.dir(), &cursor)?;
362    }
363
364    Ok(segment)
365}
366
/// Creates an incremental backup containing WAL records since the last backup.
///
/// Reads WAL log files from the backup cursor's position forward, copies
/// the raw frames into a backup segment file.
///
/// Requires an existing manifest with at least one full backup and a backup
/// cursor in the WAL directory; both are written by `do_backup_full`.
///
/// # Errors
///
/// Returns an error if no full backup exists, or if the WAL files have been
/// truncated past the cursor.
pub(super) fn do_backup_incremental(
    backup_dir: &Path,
    wal: &LpgWal,
    current_epoch: EpochId,
) -> Result<BackupSegment> {
    let manifest = read_manifest(backup_dir)?.ok_or_else(|| {
        Error::Internal("no backup manifest found; run a full backup first".to_string())
    })?;

    if manifest.latest_full().is_none() {
        return Err(Error::Internal(
            "no full backup in manifest; run a full backup first".to_string(),
        ));
    }

    let cursor = read_backup_cursor(wal.dir())?.ok_or_else(|| {
        Error::Internal("no backup cursor found; run a full backup first".to_string())
    })?;

    let log_files = wal.log_files()?;
    if log_files.is_empty() {
        return Err(Error::Internal("no WAL log files to backup".to_string()));
    }

    // Collect raw WAL bytes from every log file written after the cursor.
    let mut wal_data = Vec::new();
    let mut record_count = 0u64;
    // cursor.backed_up_epoch is used for the start_epoch calculation below.

    for file_path in &log_files {
        // Extract the sequence number from the "wal_<seq>" file stem;
        // unparsable names fall back to 0 and are skipped by the check below.
        let seq = file_path
            .file_stem()
            .and_then(|s| s.to_str())
            .and_then(|s| s.strip_prefix("wal_"))
            .and_then(|s| s.parse::<u64>().ok())
            .unwrap_or(0);

        // Skip files at or before the cursor: the cursor records the
        // sequence number that was active at the time of the last backup,
        // so we need files strictly after that sequence to avoid re-including
        // already-backed-up frames from the active log.
        if seq <= cursor.log_sequence {
            continue;
        }

        let file_bytes = std::fs::read(file_path).map_err(|e| {
            Error::Internal(format!(
                "failed to read WAL file {}: {e}",
                file_path.display()
            ))
        })?;

        if !file_bytes.is_empty() {
            wal_data.extend_from_slice(&file_bytes);
            // NOTE(review): record_count is a per-file approximation, not an
            // exact frame count — exact counting would require parsing frames.
            record_count += 1; // Per-file approximation
        }
    }

    if wal_data.is_empty() {
        return Err(Error::Internal(
            "no new WAL records since last backup".to_string(),
        ));
    }

    // The new segment starts right after the last backed-up epoch.
    let start_epoch = EpochId::new(cursor.backed_up_epoch.as_u64() + 1);
    let end_epoch = current_epoch;

    // Write incremental backup file: header followed by the raw WAL frames.
    let segment_idx = manifest.segments.len();
    let filename = format!("backup_incr_{segment_idx:04}.wal");
    let dest_path = backup_dir.join(&filename);

    let mut output = Vec::new();
    write_backup_header(&mut output, start_epoch, end_epoch, record_count);
    output.extend_from_slice(&wal_data);

    std::fs::write(&dest_path, &output)
        .map_err(|e| Error::Internal(format!("failed to write incremental backup: {e}")))?;

    // Checksum covers the whole segment file (header + frames).
    let checksum = crc32fast::hash(&output);
    let segment = BackupSegment {
        kind: BackupKind::Incremental,
        filename,
        start_epoch,
        end_epoch,
        checksum,
        size_bytes: output.len() as u64,
        created_at_ms: now_ms(),
    };

    // Update manifest
    let mut manifest = manifest;
    manifest.segments.push(segment.clone());
    write_manifest(backup_dir, &manifest)?;

    // Rotate the WAL so subsequent incremental backups see a clean boundary.
    // Same rationale as in do_backup_full (GrafeoDB/grafeo#267).
    let backed_up_sequence = wal.current_sequence();
    wal.rotate().map_err(|e| {
        Error::Internal(format!(
            "failed to rotate WAL after incremental backup: {e}"
        ))
    })?;

    // Update backup cursor
    let new_cursor = BackupCursor {
        backed_up_epoch: current_epoch,
        log_sequence: backed_up_sequence,
        timestamp_ms: now_ms(),
    };
    write_backup_cursor(wal.dir(), &new_cursor)?;

    Ok(segment)
}
492
493// ── Restore ────────────────────────────────────────────────────────
494
/// Restores a database to a specific epoch from a backup chain.
///
/// 1. Finds the most recent full backup with `end_epoch <= target_epoch`.
/// 2. Opens the full backup as a GrafeoDB (via file manager).
/// 3. Replays incremental segments up to `target_epoch` using epoch-bounded
///    WAL recovery.
///
/// # Errors
///
/// Returns an error if the backup chain does not cover the target epoch,
/// if segment checksums fail, or if I/O fails.
pub(super) fn do_restore_to_epoch(
    backup_dir: &Path,
    target_epoch: EpochId,
    output_path: &Path,
) -> Result<()> {
    let manifest = read_manifest(backup_dir)?
        .ok_or_else(|| Error::Internal("no backup manifest found".to_string()))?;

    // Find the best full backup (latest one that doesn't exceed target)
    let full = manifest
        .segments
        .iter()
        .rfind(|s| s.kind == BackupKind::Full && s.end_epoch <= target_epoch)
        .ok_or_else(|| {
            Error::Internal(format!(
                "no full backup covers epoch {}",
                target_epoch.as_u64()
            ))
        })?;

    // Copy full backup to output path.
    // NOTE(review): the full segment's stored checksum is not verified here,
    // unlike the incremental segments below — consider validating it too.
    let full_path = backup_dir.join(&full.filename);
    std::fs::copy(&full_path, output_path)
        .map_err(|e| Error::Internal(format!("failed to copy full backup to output: {e}")))?;

    // Find incremental segments that cover (full.end_epoch, target_epoch]
    let incrementals: Vec<&BackupSegment> = manifest
        .segments
        .iter()
        .filter(|s| {
            s.kind == BackupKind::Incremental
                && s.start_epoch > full.end_epoch
                && s.start_epoch <= target_epoch
        })
        .collect();

    if incrementals.is_empty() {
        // Full backup already covers the target epoch
        return Ok(());
    }

    // Create a temporary WAL directory for replay, named after the output file
    let wal_dir = output_path.parent().unwrap_or(Path::new(".")).join(format!(
        "{}.restore_wal",
        output_path
            .file_name()
            .and_then(|n| n.to_str())
            .unwrap_or("db")
    ));
    std::fs::create_dir_all(&wal_dir)
        .map_err(|e| Error::Internal(format!("failed to create restore WAL directory: {e}")))?;

    // Write incremental WAL data to temp WAL files for recovery
    for (i, incr) in incrementals.iter().enumerate() {
        let incr_path = backup_dir.join(&incr.filename);
        let incr_data = std::fs::read(&incr_path).map_err(|e| {
            Error::Internal(format!(
                "failed to read incremental backup {}: {e}",
                incr.filename
            ))
        })?;

        // Validate checksum (hash covers header + frames, matching how the
        // segment was hashed when the backup was written)
        let actual_crc = crc32fast::hash(&incr_data);
        if actual_crc != incr.checksum {
            return Err(Error::Internal(format!(
                "incremental backup {} CRC mismatch: expected {:08x}, got {actual_crc:08x}",
                incr.filename, incr.checksum,
            )));
        }

        // Skip the backup header, write the raw WAL frames to a temp log file
        if incr_data.len() > BACKUP_HEADER_SIZE {
            let wal_frames = &incr_data[BACKUP_HEADER_SIZE..];
            let wal_file = wal_dir.join(format!("wal_{i:08}.log"));
            std::fs::write(&wal_file, wal_frames).map_err(|e| {
                Error::Internal(format!("failed to write WAL file for restore: {e}"))
            })?;
        }
    }

    // Recover WAL records up to target epoch, then write a trimmed WAL
    // that contains only records within the epoch boundary. This ensures
    // that when GrafeoDB::open() replays the sidecar WAL, it does not
    // advance beyond the target epoch.
    let recovery = grafeo_storage::wal::WalRecovery::new(&wal_dir);
    let records = recovery.recover_until_epoch(target_epoch)?;

    // Write a single trimmed WAL file containing only the bounded records
    let trimmed_dir = wal_dir.parent().unwrap_or(Path::new(".")).join(format!(
        "{}.trimmed_wal",
        wal_dir
            .file_name()
            .and_then(|n| n.to_str())
            .unwrap_or("wal")
    ));
    std::fs::create_dir_all(&trimmed_dir)
        .map_err(|e| Error::Internal(format!("failed to create trimmed WAL directory: {e}")))?;

    if !records.is_empty() {
        use grafeo_storage::wal::{LpgWal, WalConfig};
        let trimmed_wal = LpgWal::with_config(&trimmed_dir, WalConfig::default())?;
        for record in &records {
            trimmed_wal.log(record)?;
        }
        // Explicit flush so any write error surfaces as Err before drop.
        trimmed_wal.flush()?;
        drop(trimmed_wal);
    }

    // Remove the original (untrimmed) restore WAL directory
    std::fs::remove_dir_all(&wal_dir)
        .map_err(|e| Error::Internal(format!("failed to remove restore WAL directory: {e}")))?;

    // Move the trimmed WAL to the sidecar location
    let sidecar_dir = format!("{}.wal", output_path.display());
    let sidecar_path = std::path::Path::new(&sidecar_dir);
    if sidecar_path.exists() {
        std::fs::remove_dir_all(sidecar_path)
            .map_err(|e| Error::Internal(format!("failed to remove existing sidecar WAL: {e}")))?;
    }
    std::fs::rename(&trimmed_dir, sidecar_path)
        .map_err(|e| Error::Internal(format!("failed to move WAL to sidecar location: {e}")))?;

    Ok(())
}
631
632// ── Tests ──────────────────────────────────────────────────────────
633
#[cfg(test)]
mod tests {
    //! Unit tests for manifest/cursor persistence and the incremental
    //! backup header codec.

    use super::*;
    use tempfile::TempDir;

    #[test]
    fn test_manifest_new() {
        // A fresh manifest is version 1 with no segments and no coverage.
        let manifest = BackupManifest::new();
        assert_eq!(manifest.version, 1);
        assert!(manifest.segments.is_empty());
        assert!(manifest.latest_full().is_none());
        assert!(manifest.epoch_range().is_none());
    }

    #[test]
    fn test_manifest_with_segments() {
        // Full (0-100) followed by one incremental (101-200).
        let mut manifest = BackupManifest::new();
        manifest.segments.push(BackupSegment {
            kind: BackupKind::Full,
            filename: "backup_full_0000.grafeo".to_string(),
            start_epoch: EpochId::new(0),
            end_epoch: EpochId::new(100),
            checksum: 12345,
            size_bytes: 1024,
            created_at_ms: 1000,
        });
        manifest.segments.push(BackupSegment {
            kind: BackupKind::Incremental,
            filename: "backup_incr_0001.wal".to_string(),
            start_epoch: EpochId::new(101),
            end_epoch: EpochId::new(200),
            checksum: 67890,
            size_bytes: 256,
            created_at_ms: 2000,
        });

        let full = manifest.latest_full().unwrap();
        assert_eq!(full.end_epoch, EpochId::new(100));

        // Only segments strictly after epoch 100 qualify.
        let incrs = manifest.incrementals_after(EpochId::new(100));
        assert_eq!(incrs.len(), 1);
        assert_eq!(incrs[0].start_epoch, EpochId::new(101));

        let (start, end) = manifest.epoch_range().unwrap();
        assert_eq!(start, EpochId::new(0));
        assert_eq!(end, EpochId::new(200));
    }

    #[test]
    fn test_manifest_round_trip() {
        // write_manifest then read_manifest must preserve the segment list.
        let dir = TempDir::new().unwrap();
        let mut manifest = BackupManifest::new();
        manifest.segments.push(BackupSegment {
            kind: BackupKind::Full,
            filename: "test.grafeo".to_string(),
            start_epoch: EpochId::new(0),
            end_epoch: EpochId::new(50),
            checksum: 0,
            size_bytes: 512,
            created_at_ms: 0,
        });

        write_manifest(dir.path(), &manifest).unwrap();
        let loaded = read_manifest(dir.path()).unwrap().unwrap();
        assert_eq!(loaded.segments.len(), 1);
        assert_eq!(loaded.segments[0].filename, "test.grafeo");
    }

    #[test]
    fn test_manifest_not_found() {
        // Missing manifest file is Ok(None), not an error.
        let dir = TempDir::new().unwrap();
        assert!(read_manifest(dir.path()).unwrap().is_none());
    }

    #[test]
    fn test_backup_cursor_round_trip() {
        let dir = TempDir::new().unwrap();
        let cursor = BackupCursor {
            backed_up_epoch: EpochId::new(42),
            log_sequence: 7,
            timestamp_ms: 12345,
        };

        write_backup_cursor(dir.path(), &cursor).unwrap();
        let loaded = read_backup_cursor(dir.path()).unwrap().unwrap();
        assert_eq!(loaded.backed_up_epoch, EpochId::new(42));
        assert_eq!(loaded.log_sequence, 7);
        assert_eq!(loaded.timestamp_ms, 12345);
    }

    #[test]
    fn test_backup_cursor_not_found() {
        // Missing cursor file is Ok(None), not an error.
        let dir = TempDir::new().unwrap();
        assert!(read_backup_cursor(dir.path()).unwrap().is_none());
    }

    #[test]
    fn test_backup_header_round_trip() {
        let mut buf = Vec::new();
        write_backup_header(&mut buf, EpochId::new(101), EpochId::new(200), 500);
        assert_eq!(buf.len(), BACKUP_HEADER_SIZE);

        let (start, end, count) = read_backup_header(&buf).unwrap();
        assert_eq!(start, EpochId::new(101));
        assert_eq!(end, EpochId::new(200));
        assert_eq!(count, 500);
    }

    #[test]
    fn test_backup_header_invalid_magic() {
        // 32 bytes (full header size) but wrong magic in the first 4 bytes.
        let data = vec![
            0xFF, 0xFF, 0xFF, 0xFF, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
            0, 0, 0, 0, 0, 0, 0,
        ];
        assert!(read_backup_header(&data).is_err());
    }

    #[test]
    fn test_backup_header_too_short() {
        let data = vec![0, 0, 0, 0];
        assert!(read_backup_header(&data).is_err());
    }

    #[test]
    fn test_backup_kind_serialization() {
        let config = bincode::config::standard();
        let encoded = bincode::serde::encode_to_vec(BackupKind::Full, config).unwrap();
        let (parsed, _): (BackupKind, _) =
            bincode::serde::decode_from_slice(&encoded, config).unwrap();
        assert_eq!(parsed, BackupKind::Full);
    }
}