use tracing::info;
use crate::storage::snapshot_restore::parse_utc_timestamp;
pub use crate::storage::snapshot_restore::{PitrTarget, RestoreDryRun, dry_run_restore};
use crate::types::Lsn;
/// Catalog entry describing a single snapshot and the LSN range it spans.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SnapshotMeta {
/// Numeric identifier of this snapshot.
pub snapshot_id: u64,
/// LSN at which this snapshot's range begins.
pub begin_lsn: Lsn,
/// LSN at which this snapshot's range ends; catalog lookups compare it
/// against the restore target.
pub end_lsn: Lsn,
/// Creation time; presumably microseconds since the Unix epoch (TODO confirm).
pub created_at_us: u64,
/// Who produced the snapshot; presumably a node identifier (TODO confirm).
pub created_by: String,
/// Whether this entry is a base or a delta snapshot.
pub kind: SnapshotKind,
/// Identifier of a parent snapshot, if any; presumably only set for
/// deltas (verify against the code that produces entries).
pub parent_id: Option<u64>,
/// A size in bytes; presumably the snapshot's payload size (TODO confirm).
pub data_bytes: u64,
}
/// Distinguishes the two kinds of snapshot tracked by the catalog.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum SnapshotKind {
/// The kind of entry `SnapshotCatalog::find_base` selects from.
Base,
/// The kind of entry `SnapshotCatalog::find_deltas` selects from.
Delta,
}
/// Collection of snapshot metadata that is queried when planning a restore.
#[derive(Debug, Clone)]
pub struct SnapshotCatalog {
/// Registered entries, in the order they were passed to `add`.
snapshots: Vec<SnapshotMeta>,
}
impl SnapshotCatalog {
/// Creates a catalog with no registered snapshots.
pub fn new() -> Self {
    let snapshots = Vec::new();
    Self { snapshots }
}
/// Registers `meta` in the catalog.
///
/// Emits an info-level log event carrying the snapshot's id, kind and LSN
/// range, then appends the entry after all previously registered ones.
pub fn add(&mut self, meta: SnapshotMeta) {
    let id = meta.snapshot_id;
    let begin_lsn = meta.begin_lsn.as_u64();
    let end_lsn = meta.end_lsn.as_u64();
    // Shorthand fields take their name from the local variable, so the
    // emitted field names stay `id`, `begin_lsn` and `end_lsn`.
    info!(id, kind = ?meta.kind, begin_lsn, end_lsn, "registered snapshot");
    self.snapshots.push(meta);
}
/// Returns the base snapshot with the greatest `end_lsn` that does not
/// exceed `target_lsn`.
///
/// Returns `None` when no base snapshot ends at or before `target_lsn`.
/// If several candidates share the greatest `end_lsn`, the one registered
/// last is returned.
pub fn find_base(&self, target_lsn: Lsn) -> Option<&SnapshotMeta> {
    let mut best: Option<&SnapshotMeta> = None;
    for snap in &self.snapshots {
        if snap.kind != SnapshotKind::Base || snap.end_lsn > target_lsn {
            continue;
        }
        // Only keep the current pick when it is strictly newer, so that a
        // later entry wins a tie.
        match best {
            Some(current) if current.end_lsn > snap.end_lsn => {}
            _ => best = Some(snap),
        }
    }
    best
}
/// Returns every delta snapshot whose range lies within
/// `base_lsn..=target_lsn`, ordered by ascending `begin_lsn`.
///
/// A delta qualifies when its `begin_lsn` is at least `base_lsn` and its
/// `end_lsn` is at most `target_lsn`. The result is empty when nothing
/// qualifies.
pub fn find_deltas(&self, base_lsn: Lsn, target_lsn: Lsn) -> Vec<&SnapshotMeta> {
    let mut in_range = Vec::new();
    for snap in &self.snapshots {
        let is_delta = snap.kind == SnapshotKind::Delta;
        if is_delta && snap.begin_lsn >= base_lsn && snap.end_lsn <= target_lsn {
            in_range.push(snap);
        }
    }
    // Stable sort: deltas sharing a begin LSN keep their registration order.
    in_range.sort_by(|a, b| a.begin_lsn.cmp(&b.begin_lsn));
    in_range
}
/// Works out which snapshots to load and how far to replay in order to
/// restore to `target_timestamp_us`.
///
/// `lsn_for_timestamp` maps the target timestamp to the LSN to replay up
/// to. The plan consists of the base snapshot chosen by `find_base` for that
/// LSN, the deltas `find_deltas` returns between the base's `end_lsn` and
/// that LSN, and `wal_records_to_replay`: the distance from the `end_lsn`
/// of the last delta (or of the base when there are no deltas) to the
/// replay LSN.
///
/// Returns `None` when the callback yields no LSN or when no base snapshot
/// ends at or before it.
pub fn resolve_pitr<F>(
    &self,
    target_timestamp_us: u64,
    lsn_for_timestamp: F,
) -> Option<PitrTarget>
where
    F: Fn(u64) -> Option<Lsn>,
{
    let replay_lsn = lsn_for_timestamp(target_timestamp_us)?;
    let base_snapshot = self.find_base(replay_lsn)?.clone();

    let mut deltas = Vec::new();
    for delta in self.find_deltas(base_snapshot.end_lsn, replay_lsn) {
        deltas.push(delta.clone());
    }

    // The WAL replay starts from wherever the last selected snapshot ends.
    let covered_up_to = match deltas.last() {
        Some(last_delta) => last_delta.end_lsn,
        None => base_snapshot.end_lsn,
    };
    let wal_records_to_replay = replay_lsn
        .as_u64()
        .saturating_sub(covered_up_to.as_u64());

    Some(PitrTarget {
        base_snapshot,
        deltas,
        replay_lsn,
        wal_records_to_replay,
    })
}
/// Number of snapshots currently registered.
pub fn len(&self) -> usize {
    self.all().len()
}
/// Returns `true` when no snapshot has been registered.
pub fn is_empty(&self) -> bool {
    self.all().is_empty()
}
/// Borrows every registered snapshot, in registration order.
pub fn all(&self) -> &[SnapshotMeta] {
    self.snapshots.as_slice()
}
pub fn emit_begin_marker(&self, current_lsn: Lsn) -> SnapshotMarker {
info!(lsn = current_lsn.as_u64(), "snapshot BEGIN marker");
SnapshotMarker {
marker_type: MarkerType::Begin,
lsn: current_lsn,
timestamp_us: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_micros() as u64,
}
}
pub fn emit_end_marker(&self, end_lsn: Lsn) -> SnapshotMarker {
info!(lsn = end_lsn.as_u64(), "snapshot END marker");
SnapshotMarker {
marker_type: MarkerType::End,
lsn: end_lsn,
timestamp_us: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_micros() as u64,
}
}
pub fn resolve_pitr_utc<F>(
&self,
utc_input: &str,
lsn_for_timestamp: F,
) -> crate::Result<PitrTarget>
where
F: Fn(u64) -> Option<Lsn>,
{
let timestamp_us = parse_utc_timestamp(utc_input)?;
self.resolve_pitr(timestamp_us, lsn_for_timestamp)
.ok_or_else(|| crate::Error::Storage {
engine: "snapshot".into(),
detail: format!(
"no snapshot available for PITR target timestamp {utc_input} \
(resolved to {timestamp_us}µs)"
),
})
}
}
/// Marker value produced by `SnapshotCatalog::emit_begin_marker` and
/// `SnapshotCatalog::emit_end_marker`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SnapshotMarker {
/// Whether this marker opens or closes a snapshot.
pub marker_type: MarkerType,
/// LSN the marker was emitted for.
pub lsn: Lsn,
/// Wall-clock time the marker was built, in microseconds since the Unix epoch.
pub timestamp_us: u64,
}
/// Which boundary of a snapshot a `SnapshotMarker` denotes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum MarkerType {
/// Set by `SnapshotCatalog::emit_begin_marker`.
Begin,
/// Set by `SnapshotCatalog::emit_end_marker`.
End,
}
impl Default for SnapshotCatalog {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Creation timestamp shared by the fixtures below.
    const FIXTURE_CREATED_AT_US: u64 = 1_700_000_000_000_000;

    /// Base snapshot fixture spanning LSN 1 up to `end_lsn`.
    fn base_snapshot(id: u64, end_lsn: u64) -> SnapshotMeta {
        SnapshotMeta {
            snapshot_id: id,
            begin_lsn: Lsn::new(1),
            end_lsn: Lsn::new(end_lsn),
            created_at_us: FIXTURE_CREATED_AT_US,
            created_by: "node-1".into(),
            kind: SnapshotKind::Base,
            parent_id: None,
            data_bytes: 1_000_000,
        }
    }

    /// Delta snapshot fixture spanning `begin..end` with the given parent id.
    fn delta_snapshot(id: u64, begin: u64, end: u64, parent: u64) -> SnapshotMeta {
        SnapshotMeta {
            snapshot_id: id,
            begin_lsn: Lsn::new(begin),
            end_lsn: Lsn::new(end),
            created_at_us: FIXTURE_CREATED_AT_US + end * 1000,
            created_by: "node-1".into(),
            kind: SnapshotKind::Delta,
            parent_id: Some(parent),
            data_bytes: 100_000,
        }
    }

    /// Builds a catalog by registering `entries` in order.
    fn catalog_of(entries: Vec<SnapshotMeta>) -> SnapshotCatalog {
        let mut catalog = SnapshotCatalog::new();
        for entry in entries {
            catalog.add(entry);
        }
        catalog
    }

    #[test]
    fn empty_catalog() {
        let catalog = SnapshotCatalog::new();
        assert!(catalog.is_empty());
        assert!(catalog.find_base(Lsn::new(100)).is_none());
    }

    #[test]
    fn find_base_snapshot() {
        let catalog = catalog_of(vec![base_snapshot(1, 100), base_snapshot(2, 500)]);
        let picked_id = |lsn: u64| catalog.find_base(Lsn::new(lsn)).map(|s| s.snapshot_id);

        // Only the first base ends at or before 300.
        assert_eq!(picked_id(300), Some(1));
        // Both qualify at 600; the one ending later wins.
        assert_eq!(picked_id(600), Some(2));
        // Nothing ends at or before 50.
        assert_eq!(picked_id(50), None);
    }

    #[test]
    fn find_deltas_in_range() {
        let catalog = catalog_of(vec![
            base_snapshot(1, 100),
            delta_snapshot(2, 100, 200, 1),
            delta_snapshot(3, 200, 300, 1),
            delta_snapshot(4, 300, 400, 1),
        ]);

        let ids: Vec<u64> = catalog
            .find_deltas(Lsn::new(100), Lsn::new(350))
            .iter()
            .map(|d| d.snapshot_id)
            .collect();

        // Delta 4 ends at 400, past the target of 350, so it is left out.
        assert_eq!(ids, vec![2, 3]);
    }

    #[test]
    fn resolve_pitr() {
        let catalog = catalog_of(vec![base_snapshot(1, 100), delta_snapshot(2, 100, 200, 1)]);

        let plan = catalog
            .resolve_pitr(1_700_000_000_250_000, |_| Some(Lsn::new(250)))
            .unwrap();

        assert_eq!(plan.base_snapshot.snapshot_id, 1);
        assert_eq!(plan.deltas.len(), 1);
        assert_eq!(plan.deltas[0].snapshot_id, 2);
        assert_eq!(plan.replay_lsn, Lsn::new(250));
        // Replay LSN 250 minus the delta's end LSN 200.
        assert_eq!(plan.wal_records_to_replay, 50);
    }

    #[test]
    fn dry_run_valid() {
        let plan = PitrTarget {
            base_snapshot: base_snapshot(1, 100),
            deltas: vec![delta_snapshot(2, 100, 200, 1)],
            replay_lsn: Lsn::new(250),
            wal_records_to_replay: 50,
        };

        let report = dry_run_restore(&plan);

        assert!(report.valid);
        assert!(report.issues.is_empty());
        assert_eq!(report.files_to_read, 2);
        assert_eq!(report.wal_records, 50);
        assert!(report.plan_description.contains("base snapshot #1"));
    }

    #[test]
    fn dry_run_detects_gap() {
        let plan = PitrTarget {
            base_snapshot: base_snapshot(1, 100),
            // The base ends at 100 but the delta only begins at 150.
            deltas: vec![delta_snapshot(2, 150, 200, 1)],
            replay_lsn: Lsn::new(250),
            wal_records_to_replay: 50,
        };

        let report = dry_run_restore(&plan);

        assert!(!report.valid);
        assert!(!report.issues.is_empty());
        assert!(report.issues[0].contains("gap"));
    }

    #[test]
    fn pitr_no_deltas_needed() {
        let catalog = catalog_of(vec![base_snapshot(1, 100)]);

        let plan = catalog
            .resolve_pitr(1_700_000_000_110_000, |_| Some(Lsn::new(110)))
            .unwrap();

        assert!(plan.deltas.is_empty());
        // Replay LSN 110 minus the base's end LSN 100.
        assert_eq!(plan.wal_records_to_replay, 10);
    }
}