use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use crate::core::GLOBAL_INTERNER;
use crate::core::error::{Result, StorageError};
use crate::core::graph::{Edge, Node};
use crate::core::id::{EdgeId, NodeId, VersionId};
use crate::core::interning::InternedString;
use crate::core::version::VersionData;
use crate::storage::current::CurrentStorage;
use crate::storage::historical::HistoricalStorage;
use crate::storage::index_persistence::{
GRAPH_MAGIC, IndexPersistenceError, IndexPersistenceManager, MANIFEST_VERSION, TEMPORAL_MAGIC,
formats::{
GraphIndexData, GraphIndexManifestEntry, IndexManifest, PersistedEdge, PersistedNode,
StringInternerManifestEntry, TemporalIndexData, TemporalIndexManifestEntry,
},
graph::{persist_property_map, restore_property_map},
};
use crate::storage::redb_cold_storage::RedbColdStorage;
use crate::storage::wal::LSN;
use crate::storage::wal::concurrent_system::ConcurrentWalSystem;
/// Minimum zstd compression level accepted by `CheckpointConfig`.
pub const MIN_ZSTD_LEVEL: i32 = 1;
/// Maximum zstd compression level accepted by `CheckpointConfig`.
pub const MAX_ZSTD_LEVEL: i32 = 22;
/// Default zstd compression level used by `CheckpointConfig::default()`.
pub const DEFAULT_ZSTD_LEVEL: i32 = 3;
/// Adapts an `IndexPersistenceError` into the crate-wide error type by
/// wrapping it as a `StorageError::CheckpointError`, preserving the message.
fn persistence_err(e: IndexPersistenceError) -> crate::core::error::Error {
    StorageError::CheckpointError {
        reason: e.to_string(),
    }
    .into()
}
/// Configuration for checkpoint creation and recovery.
#[derive(Debug, Clone)]
pub struct CheckpointConfig {
    // Root directory under which checkpoint indexes are persisted.
    pub data_dir: PathBuf,
    // Time-based trigger: a checkpoint is due once this much time has elapsed.
    pub checkpoint_interval: Duration,
    // LSN-based trigger: a checkpoint is due after this many WAL entries.
    pub min_wal_entries: u64,
    // Whether the graph index is zstd-compressed on disk.
    pub enable_compression: bool,
    // zstd level; only validated (1-22) when `enable_compression` is true.
    pub compression_level: i32,
}
impl Default for CheckpointConfig {
fn default() -> Self {
Self {
data_dir: PathBuf::from("data"),
checkpoint_interval: Duration::from_secs(300), min_wal_entries: 1000,
enable_compression: true,
compression_level: DEFAULT_ZSTD_LEVEL,
}
}
}
impl CheckpointConfig {
    /// Builds a config rooted at `data_dir`, leaving every other field at
    /// its default value.
    pub fn with_data_dir(data_dir: impl Into<PathBuf>) -> Self {
        let mut config = Self::default();
        config.data_dir = data_dir.into();
        config
    }
}
/// Outcome of a recovery pass, including which LSN sources were used.
pub struct RecoveryResult {
    // Rebuilt latest-state storage.
    pub current: CurrentStorage,
    // Rebuilt version-history storage.
    pub historical: HistoricalStorage,
    // Last LSN reached after WAL replay.
    pub final_lsn: LSN,
    // LSN recorded in the loaded checkpoint manifest, if one was used.
    pub checkpoint_lsn: Option<LSN>,
    // LSN reported as flushed by cold storage, if available.
    pub flushed_lsn: Option<LSN>,
    // The LSN recovery was effectively caught up to before replay began.
    pub effective_lsn: LSN,
    // Count of WAL entries replayed (final minus replay start).
    pub wal_entries_replayed: u64,
}
impl RecoveryResult {
    /// True when cold storage advanced recovery beyond the checkpoint,
    /// or substituted for a missing checkpoint entirely.
    pub fn used_cold_storage(&self) -> bool {
        match self.flushed_lsn {
            Some(flushed) => self
                .checkpoint_lsn
                .map_or(true, |checkpoint| flushed.0 > checkpoint.0),
            None => false,
        }
    }
    /// True when an on-disk checkpoint manifest was loaded during recovery.
    pub fn used_checkpoint(&self) -> bool {
        self.checkpoint_lsn.is_some()
    }
    /// Number of WAL entries that did not need replaying because cold
    /// storage already covered them.
    pub fn wal_entries_skipped_from_cold(&self) -> u64 {
        let Some(flushed) = self.flushed_lsn else {
            return 0;
        };
        match self.checkpoint_lsn {
            Some(checkpoint) if flushed.0 > checkpoint.0 => flushed.0 - checkpoint.0,
            Some(_) => 0,
            None => flushed.0,
        }
    }
}
/// Coordinates checkpoint creation and checkpoint/WAL-based recovery.
pub struct CheckpointManager {
    // Triggers and on-disk layout settings.
    config: CheckpointConfig,
    // Handles reading/writing the persisted index files and manifest.
    persistence_manager: IndexPersistenceManager,
    // Wall-clock time of the last successful checkpoint (UNIX_EPOCH until one happens).
    last_checkpoint_time: SystemTime,
    // LSN the last checkpoint (or recovery) was taken at.
    last_checkpoint_lsn: LSN,
}
impl CheckpointManager {
/// Creates a manager for `config`, validating the compression level and
/// ensuring the on-disk index directories exist.
///
/// # Errors
/// Returns `StorageError::CheckpointError` when compression is enabled with
/// a level outside `MIN_ZSTD_LEVEL..=MAX_ZSTD_LEVEL`, or when the index
/// directories cannot be created.
pub fn new(config: CheckpointConfig) -> Result<Self> {
    // Validate up front so checkpoint creation can't fail later on a bad level.
    if config.enable_compression
        && !(MIN_ZSTD_LEVEL..=MAX_ZSTD_LEVEL).contains(&config.compression_level)
    {
        return Err(StorageError::CheckpointError {
            // Derive the bounds from the constants so the message can never
            // drift out of sync with the accepted range.
            reason: format!(
                "Invalid compression level {}: must be {}-{} for zstd",
                config.compression_level, MIN_ZSTD_LEVEL, MAX_ZSTD_LEVEL
            ),
        }
        .into());
    }
    let persistence_manager = IndexPersistenceManager::new(&config.data_dir);
    persistence_manager
        .ensure_directories()
        .map_err(persistence_err)?;
    Ok(Self {
        config,
        persistence_manager,
        // UNIX_EPOCH guarantees the first time-based trigger fires immediately.
        last_checkpoint_time: UNIX_EPOCH,
        last_checkpoint_lsn: LSN::initial(),
    })
}
/// Returns true when either the time-based or the LSN-based checkpoint
/// trigger has fired since the last checkpoint.
pub fn should_checkpoint(&self, current_lsn: LSN) -> bool {
    // A clock that moved backwards yields Err from duration_since; treating
    // that as "infinite" elapsed time forces a checkpoint rather than
    // suppressing one.
    let elapsed = SystemTime::now()
        .duration_since(self.last_checkpoint_time)
        .unwrap_or(Duration::MAX);
    elapsed >= self.config.checkpoint_interval
        || current_lsn.0.saturating_sub(self.last_checkpoint_lsn.0)
            >= self.config.min_wal_entries
}
/// Creates a full checkpoint at `lsn`: persists the string interner, the
/// graph index (optionally zstd-compressed), the temporal index, and the
/// manifest that ties them together, then records the checkpoint time/LSN.
///
/// # Errors
/// Returns `StorageError::CheckpointError` when any index or the manifest
/// fails to persist.
pub fn create_checkpoint(
    &mut self,
    lsn: LSN,
    current: &CurrentStorage,
    historical: &HistoricalStorage,
) -> Result<CheckpointStats> {
    let start_time = std::time::Instant::now();
    let mut bytes_written = 0u64;
    // Take both snapshots under the same write lock so they describe one
    // consistent point in time.
    let (current_snapshot, historical_snapshot) = {
        let _lock = current.snapshot_lock.write();
        let c = current.create_snapshot(lsn);
        let h = historical.create_snapshot(lsn);
        (c, h)
    };
    self.persistence_manager
        .save_string_interner()
        .map_err(persistence_err)?;
    // File sizes are best-effort accounting; a failed metadata read counts 0.
    bytes_written += std::fs::metadata(self.persistence_manager.interner_path())
        .map(|m| m.len())
        .unwrap_or(0);
    // Fixed: `&current_snapshot` had been corrupted to `¤t_snapshot`
    // by an HTML-entity mangling of the source.
    let graph_data = self.extract_graph_data_from_snapshot(&current_snapshot)?;
    let graph_path = self.persistence_manager.graph_path().join("adjacency.idx");
    if self.config.enable_compression {
        crate::storage::index_persistence::graph::save_graph_index_compressed(
            &graph_data,
            &graph_path,
            self.config.compression_level,
        )
        .map_err(persistence_err)?;
    } else {
        crate::storage::index_persistence::graph::save_graph_index(&graph_data, &graph_path)
            .map_err(persistence_err)?;
    }
    bytes_written += std::fs::metadata(&graph_path).map(|m| m.len()).unwrap_or(0);
    let temporal_data = self.extract_temporal_data_from_snapshot(&historical_snapshot)?;
    let temporal_path = self
        .persistence_manager
        .temporal_path()
        .join("versions.idx");
    crate::storage::index_persistence::temporal::save_temporal_index(
        &temporal_data,
        &temporal_path,
    )
    .map_err(persistence_err)?;
    bytes_written += std::fs::metadata(&temporal_path)
        .map(|m| m.len())
        .unwrap_or(0);
    // The manifest is written last so it only ever references files that
    // were persisted above.
    let mut manifest = IndexManifest::new(lsn.0);
    manifest.graph_index = Some(GraphIndexManifestEntry {
        adjacency_file: "graph/adjacency.idx".to_string(),
        node_count: graph_data.node_count,
        edge_count: graph_data.edge_count,
    });
    manifest.temporal_index = Some(TemporalIndexManifestEntry {
        // Node and edge versions share a single file.
        node_versions_file: "temporal/versions.idx".to_string(),
        edge_versions_file: "temporal/versions.idx".to_string(),
        version_count: (temporal_data.node_versions.len() + temporal_data.edge_versions.len())
            as u64,
    });
    let string_count = GLOBAL_INTERNER.len() as u64;
    manifest.string_interner = Some(StringInternerManifestEntry {
        interner_file: "strings/interner.idx".to_string(),
        string_count,
    });
    self.persistence_manager
        .save_manifest(&manifest)
        .map_err(persistence_err)?;
    bytes_written += std::fs::metadata(self.persistence_manager.manifest_path())
        .map(|m| m.len())
        .unwrap_or(0);
    self.last_checkpoint_time = SystemTime::now();
    self.last_checkpoint_lsn = lsn;
    Ok(CheckpointStats {
        duration: start_time.elapsed(),
        bytes_written,
        lsn,
        node_count: graph_data.node_count as usize,
        edge_count: graph_data.edge_count as usize,
        version_count: temporal_data.node_versions.len() + temporal_data.edge_versions.len(),
    })
}
/// Recovers storage from the latest checkpoint (if any) plus a WAL replay
/// of everything after it. Returns the rebuilt storages and the final LSN.
///
/// # Errors
/// Fails when the manifest cannot be loaded, when the checkpoint LSN is
/// ahead of the WAL (mismatched/corrupted checkpoint), when version IDs
/// would overflow, or when WAL replay fails.
pub fn recover(
    &mut self,
    wal: &ConcurrentWalSystem,
) -> Result<(CurrentStorage, HistoricalStorage, LSN)> {
    // No checkpoint on disk: replay the entire WAL from scratch.
    if !self.persistence_manager.indexes_exist() {
        return self.recover_from_wal_only(wal);
    }
    let manifest = self
        .persistence_manager
        .load_manifest_and_strings()
        .map_err(persistence_err)?;
    let checkpoint_lsn = LSN(manifest.lsn);
    let wal_current_lsn = wal.current_lsn();
    // Sanity check: a checkpoint can never be newer than the WAL it was cut from.
    if checkpoint_lsn.0 > wal_current_lsn.0 {
        return Err(StorageError::CheckpointError {
            reason: format!(
                "Checkpoint LSN {} is ahead of WAL current LSN {}, \
                checkpoint may be from a different WAL or corrupted",
                checkpoint_lsn.0, wal_current_lsn.0
            ),
        }
        .into());
    }
    let current = self.load_current_storage(&manifest)?;
    let (historical, historical_max_version_id) = self.load_historical_storage(&manifest)?;
    // Make sure freshly minted version IDs can't collide with restored history.
    if historical_max_version_id > 0 {
        use crate::core::id::MAX_VALID_ID;
        let next_version_id = historical_max_version_id.saturating_add(1);
        if next_version_id > MAX_VALID_ID {
            return Err(StorageError::CheckpointError {
                reason: format!(
                    "Historical max version ID {} would overflow MAX_VALID_ID on recovery",
                    historical_max_version_id
                ),
            }
            .into());
        }
        current.ensure_version_id_generator_at_least(next_version_id);
    }
    // Replay only the WAL suffix the checkpoint does not already cover.
    let start_lsn = checkpoint_lsn.next();
    let (current, historical, final_lsn) =
        self.replay_wal(wal, current, historical, start_lsn)?;
    self.last_checkpoint_lsn = checkpoint_lsn;
    Ok((current, historical, final_lsn))
}
/// Reports whether a previous checkpoint left index files on disk.
pub fn has_persisted_state(&self) -> bool {
    self.persistence_manager.indexes_exist()
}
/// Reads the LSN recorded in the persisted manifest, or `None` when no
/// checkpoint exists or the manifest cannot be loaded.
pub fn get_persisted_lsn(&self) -> Option<LSN> {
    if self.persistence_manager.indexes_exist() {
        let manifest = self.persistence_manager.load_manifest_and_strings().ok()?;
        Some(LSN(manifest.lsn))
    } else {
        None
    }
}
/// Recovers like [`recover`], but additionally consults cold storage's
/// flushed LSN: when cold storage has flushed past the checkpoint, WAL
/// replay starts after the flushed LSN instead, skipping already-flushed
/// entries. Returns a [`RecoveryResult`] describing which sources were used.
///
/// # Errors
/// Fails on manifest load errors, on a checkpoint or flushed LSN that is
/// ahead of the WAL (inconsistency), on version-ID overflow, or replay failure.
pub fn recover_with_cold_storage(
    &mut self,
    wal: &ConcurrentWalSystem,
    cold_storage: Option<&Arc<RedbColdStorage>>,
) -> Result<RecoveryResult> {
    // Best-effort read: a missing or erroring cold store is treated as "no flush info".
    let flushed_lsn = cold_storage.and_then(|cs| cs.get_flushed_lsn().ok().flatten());
    if !self.persistence_manager.indexes_exist() {
        return self.recover_from_wal_with_cold_storage(wal, flushed_lsn);
    }
    let manifest = self
        .persistence_manager
        .load_manifest_and_strings()
        .map_err(persistence_err)?;
    let checkpoint_lsn = LSN(manifest.lsn);
    let wal_current_lsn = wal.current_lsn();
    // Sanity check: a checkpoint can never be newer than the WAL it was cut from.
    if checkpoint_lsn.0 > wal_current_lsn.0 {
        return Err(StorageError::CheckpointError {
            reason: format!(
                "Checkpoint LSN {} is ahead of WAL current LSN {}, \
                checkpoint may be from a different WAL or corrupted",
                checkpoint_lsn.0, wal_current_lsn.0
            ),
        }
        .into());
    }
    let current = self.load_current_storage(&manifest)?;
    let (historical, historical_max_version_id) = self.load_historical_storage(&manifest)?;
    // Make sure freshly minted version IDs can't collide with restored history.
    if historical_max_version_id > 0 {
        use crate::core::id::MAX_VALID_ID;
        let next_version_id = historical_max_version_id.saturating_add(1);
        if next_version_id > MAX_VALID_ID {
            return Err(StorageError::CheckpointError {
                reason: format!(
                    "Historical max version ID {} would overflow MAX_VALID_ID on recovery",
                    historical_max_version_id
                ),
            }
            .into());
        }
        current.ensure_version_id_generator_at_least(next_version_id);
    }
    // The effective LSN is the furthest point already covered by either the
    // checkpoint or cold storage; replay resumes just after it.
    let effective_lsn = match flushed_lsn {
        Some(flushed) if flushed.0 > checkpoint_lsn.0 => {
            // Cold storage claiming more than the WAL holds is a hard inconsistency.
            if flushed.0 > wal_current_lsn.0 {
                return Err(StorageError::CheckpointError {
                    reason: format!(
                        "Cold storage flushed_lsn {} is ahead of WAL current LSN {}, \
                        data inconsistency detected",
                        flushed.0, wal_current_lsn.0
                    ),
                }
                .into());
            }
            flushed
        }
        _ => checkpoint_lsn,
    };
    let start_lsn = effective_lsn.next();
    let (current, historical, final_lsn) =
        self.replay_wal(wal, current, historical, start_lsn)?;
    self.last_checkpoint_lsn = checkpoint_lsn;
    Ok(RecoveryResult {
        current,
        historical,
        final_lsn,
        checkpoint_lsn: Some(checkpoint_lsn),
        flushed_lsn,
        effective_lsn,
        wal_entries_replayed: final_lsn.0.saturating_sub(start_lsn.0),
    })
}
/// Recovery path with no on-disk checkpoint: start from empty storage and
/// replay the WAL, skipping entries cold storage already flushed.
fn recover_from_wal_with_cold_storage(
    &self,
    wal: &ConcurrentWalSystem,
    flushed_lsn: Option<LSN>,
) -> Result<RecoveryResult> {
    // With no checkpoint, cold storage (when present) defines how far
    // recovery is already caught up.
    let effective_lsn = flushed_lsn.unwrap_or(LSN::initial());
    let start_lsn = flushed_lsn.map_or(LSN::initial(), |lsn| lsn.next());
    let (current, historical, final_lsn) = self.replay_wal(
        wal,
        CurrentStorage::new(),
        HistoricalStorage::new(),
        start_lsn,
    )?;
    Ok(RecoveryResult {
        current,
        historical,
        final_lsn,
        checkpoint_lsn: None,
        flushed_lsn,
        effective_lsn,
        wal_entries_replayed: final_lsn.0.saturating_sub(start_lsn.0),
    })
}
/// Converts a current-storage snapshot into the serializable
/// `GraphIndexData` form: one persisted record per node and per edge.
fn extract_graph_data_from_snapshot(
    &self,
    snapshot: &crate::storage::snapshot::CurrentStorageSnapshot,
) -> Result<GraphIndexData> {
    let mut persisted_nodes = Vec::with_capacity(snapshot.node_count());
    for node in snapshot.iter_nodes() {
        let properties = persist_property_map(&node.properties).map_err(persistence_err)?;
        persisted_nodes.push(PersistedNode {
            id: node.id.as_u64(),
            label_idx: node.label.as_u32(),
            version_id: node.current_version.as_u64(),
            properties,
        });
    }
    let mut persisted_edges = Vec::with_capacity(snapshot.edge_count());
    for edge in snapshot.iter_edges() {
        let properties = persist_property_map(&edge.properties).map_err(persistence_err)?;
        persisted_edges.push(PersistedEdge {
            id: edge.id.as_u64(),
            source_id: edge.source.as_u64(),
            target_id: edge.target.as_u64(),
            label_idx: edge.label.as_u32(),
            version_id: edge.current_version.as_u64(),
            properties,
        });
    }
    Ok(GraphIndexData {
        magic: GRAPH_MAGIC,
        version: MANIFEST_VERSION,
        node_count: persisted_nodes.len() as u64,
        edge_count: persisted_edges.len() as u64,
        nodes: persisted_nodes,
        edges: persisted_edges,
        // This extraction path leaves the adjacency vectors empty.
        outgoing_node_ids: Vec::new(),
        outgoing_offsets: Vec::new(),
        outgoing_neighbors: Vec::new(),
        incoming_node_ids: Vec::new(),
        incoming_offsets: Vec::new(),
        incoming_neighbors: Vec::new(),
    })
}
/// Convenience wrapper: snapshot `current` at LSN 0 and extract graph data.
#[allow(dead_code)]
fn extract_graph_data(&self, current: &CurrentStorage) -> Result<GraphIndexData> {
    self.extract_graph_data_from_snapshot(&current.create_snapshot(LSN(0)))
}
/// Converts a historical-storage snapshot into the serializable
/// `TemporalIndexData` form: one entry per node/edge version, plus anchor
/// tables that record the full property state at each anchor version.
fn extract_temporal_data_from_snapshot(
    &self,
    snapshot: &crate::storage::snapshot::HistoricalStorageSnapshot,
) -> Result<TemporalIndexData> {
    use crate::core::property::PropertyMapBuilder;
    use crate::storage::index_persistence::formats::{
        EdgeAnchorEntry, EdgeVersionEntry, NodeAnchorEntry, NodeVersionEntry,
        PersistedVersionType,
    };
    let mut node_versions = Vec::with_capacity(snapshot.node_version_count());
    let mut node_anchors = Vec::with_capacity(snapshot.node_version_count());
    let mut edge_versions = Vec::with_capacity(snapshot.edge_version_count());
    let mut edge_anchors = Vec::with_capacity(snapshot.edge_version_count());
    for version_arc in snapshot.iter_node_versions() {
        let version = &*version_arc;
        let version_id = version.id;
        // Anchors persist their full property state (and are additionally
        // recorded in the anchor table); deltas persist only the changed
        // properties plus the keys that were removed.
        let (version_type, properties, vector_snapshot_id) = match &version.data {
            VersionData::Anchor {
                properties,
                vector_snapshot_id,
            } => {
                node_anchors.push(NodeAnchorEntry {
                    node_id: version.node_id.as_u64(),
                    anchor_tx_time: version.temporal.transaction_time().start().wallclock(),
                    full_state: persist_property_map(properties).map_err(persistence_err)?,
                    vector_snapshot_id: vector_snapshot_id.map(|id| id as u64),
                });
                (
                    PersistedVersionType::Anchor,
                    persist_property_map(properties).map_err(persistence_err)?,
                    vector_snapshot_id.map(|id| id as u64),
                )
            }
            VersionData::Delta { delta } => {
                // Flatten the changed-property set into a property map for persistence.
                let mut builder = PropertyMapBuilder::new();
                for (key, value) in &delta.changed {
                    builder = builder.insert_by_key(*key, value.clone());
                }
                let changed_props = builder.build();
                let removed_keys: Vec<u32> = delta
                    .removed
                    .iter()
                    .map(|k: &crate::core::interning::InternedString| k.as_u32())
                    .collect();
                (
                    PersistedVersionType::Delta {
                        base_anchor_tx: version.temporal.transaction_time().start().wallclock(),
                        base_anchor_tx_logical: version
                            .temporal
                            .transaction_time()
                            .start()
                            .logical(),
                        removed_keys,
                    },
                    persist_property_map(&changed_props).map_err(persistence_err)?,
                    // Deltas carry no vector snapshot reference.
                    None,
                )
            }
        };
        let valid_time = version.temporal.valid_time();
        let entry = NodeVersionEntry {
            version_id: version_id.as_u64(),
            node_id: version.node_id.as_u64(),
            label_idx: version.label.as_u32(),
            valid_from: valid_time.start().wallclock(),
            valid_from_logical: valid_time.start().logical(),
            // Open-ended ("current") valid intervals are persisted as None.
            valid_to: if valid_time.is_current() {
                None
            } else {
                Some(valid_time.end().wallclock())
            },
            valid_to_logical: if valid_time.is_current() {
                None
            } else {
                Some(valid_time.end().logical())
            },
            tx_time: version.temporal.transaction_time().start().wallclock(),
            tx_time_logical: version.temporal.transaction_time().start().logical(),
            version_type,
            properties,
            vector_snapshot_id,
        };
        node_versions.push(entry);
    }
    // Edge versions follow the same anchor/delta scheme, but have no
    // vector snapshot field.
    for version_arc in snapshot.iter_edge_versions() {
        let version = &*version_arc;
        let version_id = version.id;
        let (version_type, properties) = match &version.data {
            VersionData::Anchor { properties, .. } => {
                edge_anchors.push(EdgeAnchorEntry {
                    edge_id: version.edge_id.as_u64(),
                    anchor_tx_time: version.temporal.transaction_time().start().wallclock(),
                    full_state: persist_property_map(properties).map_err(persistence_err)?,
                });
                (
                    PersistedVersionType::Anchor,
                    persist_property_map(properties).map_err(persistence_err)?,
                )
            }
            VersionData::Delta { delta } => {
                let mut builder = PropertyMapBuilder::new();
                for (key, value) in &delta.changed {
                    builder = builder.insert_by_key(*key, value.clone());
                }
                let changed_props = builder.build();
                let removed_keys: Vec<u32> = delta
                    .removed
                    .iter()
                    .map(|k: &crate::core::interning::InternedString| k.as_u32())
                    .collect();
                (
                    PersistedVersionType::Delta {
                        base_anchor_tx: version.temporal.transaction_time().start().wallclock(),
                        base_anchor_tx_logical: version
                            .temporal
                            .transaction_time()
                            .start()
                            .logical(),
                        removed_keys,
                    },
                    persist_property_map(&changed_props).map_err(persistence_err)?,
                )
            }
        };
        let valid_time = version.temporal.valid_time();
        let entry = EdgeVersionEntry {
            version_id: version_id.as_u64(),
            edge_id: version.edge_id.as_u64(),
            source_id: version.source.as_u64(),
            target_id: version.target.as_u64(),
            label_idx: version.label.as_u32(),
            valid_from: valid_time.start().wallclock(),
            valid_from_logical: valid_time.start().logical(),
            valid_to: if valid_time.is_current() {
                None
            } else {
                Some(valid_time.end().wallclock())
            },
            valid_to_logical: if valid_time.is_current() {
                None
            } else {
                Some(valid_time.end().logical())
            },
            tx_time: version.temporal.transaction_time().start().wallclock(),
            tx_time_logical: version.temporal.transaction_time().start().logical(),
            version_type,
            properties,
        };
        edge_versions.push(entry);
    }
    Ok(TemporalIndexData {
        magic: TEMPORAL_MAGIC,
        version: MANIFEST_VERSION,
        node_versions,
        node_anchors,
        edge_versions,
        edge_anchors,
    })
}
/// Convenience wrapper: snapshot `historical` at LSN 0 and extract temporal data.
#[allow(dead_code)]
fn extract_temporal_data(&self, historical: &HistoricalStorage) -> Result<TemporalIndexData> {
    self.extract_temporal_data_from_snapshot(&historical.create_snapshot(LSN(0)))
}
/// Rebuilds `CurrentStorage` from the persisted graph index referenced by
/// `manifest`, then seeds the node/edge/version ID generators past the
/// highest restored IDs. Returns empty storage when no graph index entry exists.
fn load_current_storage(&self, manifest: &IndexManifest) -> Result<CurrentStorage> {
    let current = CurrentStorage::new();
    if let Some(ref graph_entry) = manifest.graph_index {
        let graph_path = self
            .persistence_manager
            .indexes_path()
            .join(&graph_entry.adjacency_file);
        let graph_data =
            crate::storage::index_persistence::graph::load_graph_index(&graph_path)
                .map_err(persistence_err)?;
        // Track the highest version ID seen so the generator resumes past it.
        let mut max_version_id: u64 = 0;
        for persisted_node in &graph_data.nodes {
            let node_id = NodeId::new(persisted_node.id)?;
            let label = InternedString::from_raw(persisted_node.label_idx);
            let properties =
                restore_property_map(&persisted_node.properties).map_err(persistence_err)?;
            let version_id = VersionId::new(persisted_node.version_id)?;
            max_version_id = max_version_id.max(persisted_node.version_id);
            let node = Node::new(node_id, label, properties, version_id);
            current.insert_node_direct(node, crate::core::temporal::time::now())?;
        }
        for persisted_edge in &graph_data.edges {
            let edge_id = EdgeId::new(persisted_edge.id)?;
            let source = NodeId::new(persisted_edge.source_id)?;
            let target = NodeId::new(persisted_edge.target_id)?;
            let label = InternedString::from_raw(persisted_edge.label_idx);
            let properties =
                restore_property_map(&persisted_edge.properties).map_err(persistence_err)?;
            let version_id = VersionId::new(persisted_edge.version_id)?;
            max_version_id = max_version_id.max(persisted_edge.version_id);
            let edge = Edge::new(edge_id, label, source, target, properties, version_id);
            current.insert_edge_direct(edge)?;
        }
        // Generators resume past the restored maxima so new IDs never collide.
        current.init_version_id_generator(max_version_id + 1);
        if let Some(max_node_id) = graph_data.nodes.iter().map(|n| n.id).max() {
            current.init_node_id_generator(max_node_id + 1);
        }
        if let Some(max_edge_id) = graph_data.edges.iter().map(|e| e.id).max() {
            current.init_edge_id_generator(max_edge_id + 1);
        }
    }
    Ok(current)
}
/// Rebuilds `HistoricalStorage` from the persisted temporal index referenced
/// by `manifest`, reconstructing each node/edge version's bitemporal interval
/// and anchor/delta payload. Returns the storage together with the highest
/// version ID restored (0 when nothing was restored), which the caller uses
/// to advance the version ID generator.
fn load_historical_storage(
    &self,
    manifest: &IndexManifest,
) -> Result<(HistoricalStorage, u64)> {
    use crate::core::version::PropertyDelta;
    let mut historical = HistoricalStorage::new();
    let mut max_version_id: u64 = 0;
    if let Some(ref temporal_entry) = manifest.temporal_index {
        // Node and edge versions share one file; the node-versions path is used.
        let temporal_path = self
            .persistence_manager
            .indexes_path()
            .join(&temporal_entry.node_versions_file);
        let temporal_data =
            crate::storage::index_persistence::temporal::load_temporal_index(&temporal_path)
                .map_err(persistence_err)?;
        historical.reserve_restoration_capacity(
            temporal_data.node_versions.len(),
            temporal_data.edge_versions.len(),
        );
        for entry in &temporal_data.node_versions {
            max_version_id = max_version_id.max(entry.version_id);
            let version_id = VersionId::new(entry.version_id)?;
            let node_id = NodeId::new(entry.node_id)?;
            use crate::core::hlc::HybridTimestamp;
            use crate::core::temporal::{TIMESTAMP_MAX, TimeRange};
            let valid_start =
                HybridTimestamp::new_unchecked(entry.valid_from, entry.valid_from_logical);
            // A persisted None end means the version is still current (open interval).
            let valid_end = entry
                .valid_to
                .map(|t| HybridTimestamp::new_unchecked(t, entry.valid_to_logical.unwrap_or(0)))
                .unwrap_or(TIMESTAMP_MAX);
            let valid_time = TimeRange::new(valid_start, valid_end).map_err(|e| {
                StorageError::CheckpointError {
                    reason: format!("Invalid valid time range: {}", e),
                }
            })?;
            let tx_start = HybridTimestamp::new_unchecked(entry.tx_time, entry.tx_time_logical);
            let tx_time = TimeRange::from(tx_start);
            let temporal = crate::core::temporal::BiTemporalInterval::new(valid_time, tx_time);
            let properties =
                restore_property_map(&entry.properties).map_err(persistence_err)?;
            let label = InternedString::from_raw(entry.label_idx);
            // Rebuild the version payload: anchors hold full state; deltas are
            // reassembled from the persisted changed map and removed-key list.
            let data = match &entry.version_type {
                crate::storage::index_persistence::formats::PersistedVersionType::Anchor => {
                    let vector_snapshot_id = entry.vector_snapshot_id.map(|id| id as usize);
                    VersionData::Anchor {
                        properties,
                        vector_snapshot_id,
                    }
                }
                crate::storage::index_persistence::formats::PersistedVersionType::Delta {
                    removed_keys,
                    ..
                } => {
                    let mut delta = PropertyDelta::new();
                    for (key, value) in properties.iter() {
                        delta.changed.insert(*key, value.clone());
                    }
                    for key_idx in removed_keys {
                        delta.removed.insert(InternedString::from_raw(*key_idx));
                    }
                    VersionData::Delta { delta }
                }
            };
            // Version-chain links are not persisted; they stay None after restore.
            let version = crate::core::version::NodeVersion {
                id: version_id,
                node_id,
                commit_timestamp: temporal.transaction_time().start(),
                temporal,
                label,
                data,
                next_version: None,
                prev_version: None,
            };
            historical.insert_restored_node_version(version)?;
        }
        // Edge versions are restored the same way, with endpoints and no
        // vector snapshot reference.
        for entry in &temporal_data.edge_versions {
            max_version_id = max_version_id.max(entry.version_id);
            let version_id = VersionId::new(entry.version_id)?;
            let edge_id = EdgeId::new(entry.edge_id)?;
            let source = NodeId::new(entry.source_id)?;
            let target = NodeId::new(entry.target_id)?;
            use crate::core::hlc::HybridTimestamp;
            use crate::core::temporal::{TIMESTAMP_MAX, TimeRange};
            let valid_start =
                HybridTimestamp::new_unchecked(entry.valid_from, entry.valid_from_logical);
            let valid_end = entry
                .valid_to
                .map(|t| HybridTimestamp::new_unchecked(t, entry.valid_to_logical.unwrap_or(0)))
                .unwrap_or(TIMESTAMP_MAX);
            let valid_time = TimeRange::new(valid_start, valid_end).map_err(|e| {
                StorageError::CheckpointError {
                    reason: format!("Invalid valid time range: {}", e),
                }
            })?;
            let tx_start = HybridTimestamp::new_unchecked(entry.tx_time, entry.tx_time_logical);
            let tx_time = TimeRange::from(tx_start);
            let temporal = crate::core::temporal::BiTemporalInterval::new(valid_time, tx_time);
            let properties =
                restore_property_map(&entry.properties).map_err(persistence_err)?;
            let label = InternedString::from_raw(entry.label_idx);
            let data = match &entry.version_type {
                crate::storage::index_persistence::formats::PersistedVersionType::Anchor => {
                    VersionData::Anchor {
                        properties,
                        vector_snapshot_id: None,
                    }
                }
                crate::storage::index_persistence::formats::PersistedVersionType::Delta {
                    removed_keys,
                    ..
                } => {
                    let mut delta = PropertyDelta::new();
                    for (key, value) in properties.iter() {
                        delta.changed.insert(*key, value.clone());
                    }
                    for key_idx in removed_keys {
                        delta.removed.insert(InternedString::from_raw(*key_idx));
                    }
                    VersionData::Delta { delta }
                }
            };
            let version = crate::core::version::EdgeVersion {
                id: version_id,
                edge_id,
                source,
                target,
                commit_timestamp: temporal.transaction_time().start(),
                temporal,
                label,
                data,
                next_version: None,
                prev_version: None,
            };
            historical.insert_restored_edge_version(version)?;
        }
    }
    Ok((historical, max_version_id))
}
/// Cold-start recovery: no checkpoint exists, so replay the entire WAL
/// into freshly created storages from the initial LSN.
fn recover_from_wal_only(
    &mut self,
    wal: &ConcurrentWalSystem,
) -> Result<(CurrentStorage, HistoricalStorage, LSN)> {
    self.replay_wal(
        wal,
        CurrentStorage::new(),
        HistoricalStorage::new(),
        LSN::initial(),
    )
}
/// Replays WAL entries starting at `start_lsn` into the given storages,
/// then re-seeds the node/edge ID generators from the maxima observed and
/// advances the version ID generator. Returns the storages and final LSN.
fn replay_wal(
    &self,
    wal: &ConcurrentWalSystem,
    current: CurrentStorage,
    mut historical: HistoricalStorage,
    start_lsn: LSN,
) -> Result<(CurrentStorage, HistoricalStorage, LSN)> {
    let initial_version_id = current.get_version_id_generator_current();
    let (final_lsn, max_node_id, max_edge_id, next_version_id) =
        crate::storage::recovery::replay_wal_into_storage(
            wal,
            // Fixed: `&current` had been corrupted to `¤t` by an
            // HTML-entity mangling of the source.
            &current,
            &mut historical,
            start_lsn,
            initial_version_id,
        )?;
    if let Some(max_node_id) = max_node_id {
        current.init_node_id_generator(max_node_id + 1);
    }
    if let Some(max_edge_id) = max_edge_id {
        current.init_edge_id_generator(max_edge_id + 1);
    }
    // Never move the version ID generator backwards relative to replay.
    current.ensure_version_id_generator_at_least(next_version_id);
    Ok((current, historical, final_lsn))
}
}
/// Summary of a completed checkpoint, returned by `create_checkpoint`.
#[derive(Debug, Clone)]
pub struct CheckpointStats {
    // Wall-clock time the checkpoint took.
    pub duration: Duration,
    // Total size of the files written (best-effort, from fs metadata).
    pub bytes_written: u64,
    // LSN the checkpoint was taken at.
    pub lsn: LSN,
    // Nodes persisted into the graph index.
    pub node_count: usize,
    // Edges persisted into the graph index.
    pub edge_count: usize,
    // Node plus edge versions persisted into the temporal index.
    pub version_count: usize,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::GLOBAL_INTERNER;
use crate::PropertyMapBuilder;
use crate::core::id::NodeId;
use crate::core::temporal::{BiTemporalInterval, time};
use crate::storage::wal::WalOperation;
use crate::storage::wal::concurrent_system::ConcurrentWalSystemConfig;
use tempfile::TempDir;
// An empty-storage checkpoint must still succeed and write manifest/interner
// files. (Fixed: `&current` had been corrupted to `¤t`.)
#[test]
fn test_checkpoint_creation_basic() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let config = CheckpointConfig::with_data_dir(temp_dir.path());
    let mut manager = CheckpointManager::new(config)?;
    let current = CurrentStorage::new();
    let historical = HistoricalStorage::new();
    let stats = manager.create_checkpoint(LSN(100), &current, &historical)?;
    assert_eq!(stats.lsn, LSN(100));
    assert_eq!(stats.node_count, 0);
    assert_eq!(stats.edge_count, 0);
    assert!(stats.bytes_written > 0);
    Ok(())
}
// Checkpointing ten nodes must record them in the stats and leave the
// manifest, interner, and adjacency files on disk.
// (Fixed: `&current` had been corrupted to `¤t`.)
#[test]
fn test_checkpoint_persists_graph_data() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let config = CheckpointConfig::with_data_dir(temp_dir.path());
    let mut manager = CheckpointManager::new(config)?;
    let current = CurrentStorage::new();
    for i in 1..=10 {
        let props = PropertyMapBuilder::new()
            .insert("name", format!("Node{}", i))
            .build();
        let node_id = NodeId::new(i)?;
        let label = GLOBAL_INTERNER
            .intern("Person")
            .map_err(|e| StorageError::WalError {
                reason: e.to_string(),
            })?;
        let version_id = VersionId::new(i)?;
        let node = Node::new(node_id, label, props, version_id);
        current.insert_node_direct(node, time::now())?;
    }
    let historical = HistoricalStorage::new();
    let stats = manager.create_checkpoint(LSN(50), &current, &historical)?;
    assert_eq!(stats.node_count, 10);
    assert!(manager.persistence_manager.manifest_path().exists());
    assert!(manager.persistence_manager.interner_path().exists());
    assert!(
        manager
            .persistence_manager
            .graph_path()
            .join("adjacency.idx")
            .exists()
    );
    Ok(())
}
// A fresh manager pointed at the same data dir must restore exactly the
// checkpointed nodes and their properties, with no WAL entries replayed.
// (Fixed: `&current` had been corrupted to `¤t`.)
#[test]
fn test_checkpoint_recovery_loads_state() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let wal_dir = temp_dir.path().join("wal");
    let data_dir = temp_dir.path().join("data");
    let wal_config = ConcurrentWalSystemConfig::new(&wal_dir);
    let wal = ConcurrentWalSystem::new(wal_config)?;
    {
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let current = CurrentStorage::new();
        for i in 1..=5 {
            let props = PropertyMapBuilder::new()
                .insert("name", format!("Node{}", i))
                .build();
            let node_id = NodeId::new(i)?;
            let label =
                GLOBAL_INTERNER
                    .intern("Document")
                    .map_err(|e| StorageError::WalError {
                        reason: e.to_string(),
                    })?;
            let version_id = VersionId::new(i)?;
            let node = Node::new(node_id, label, props, version_id);
            current.insert_node_direct(node, time::now())?;
        }
        let historical = HistoricalStorage::new();
        manager.create_checkpoint(LSN(0), &current, &historical)?;
    }
    {
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let (recovered_current, _recovered_historical, lsn) = manager.recover(&wal)?;
        assert_eq!(recovered_current.node_count(), 5);
        assert_eq!(lsn, LSN::initial());
        for i in 1..=5 {
            let node = recovered_current.get_node(NodeId::new(i)?)?;
            let name = node.get_property("name").unwrap().as_str().unwrap();
            assert_eq!(name, format!("Node{}", i));
        }
    }
    Ok(())
}
// Recovery must combine the checkpointed nodes (1-3) with nodes appended to
// the WAL after the checkpoint (4-5). (Fixed: `&current` → from `¤t`.)
#[test]
fn test_checkpoint_recovery_with_wal_replay() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let wal_dir = temp_dir.path().join("wal");
    let data_dir = temp_dir.path().join("data");
    let wal_config = ConcurrentWalSystemConfig::new(&wal_dir);
    let wal = ConcurrentWalSystem::new(wal_config)?;
    for i in 1..=3 {
        let props = PropertyMapBuilder::new()
            .insert("name", format!("Initial{}", i))
            .build();
        wal.append(WalOperation::CreateNode {
            node_id: NodeId::new(i)?,
            label: GLOBAL_INTERNER.intern("Person").unwrap(),
            properties: props,
            valid_from: time::now(),
        })?;
    }
    wal.flush()?;
    // Checkpoint covers everything appended so far.
    let checkpoint_lsn = LSN(wal.current_lsn().0.saturating_sub(1));
    {
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let current = CurrentStorage::new();
        for i in 1..=3 {
            let props = PropertyMapBuilder::new()
                .insert("name", format!("Initial{}", i))
                .build();
            let node_id = NodeId::new(i)?;
            let label =
                GLOBAL_INTERNER
                    .intern("Person")
                    .map_err(|e| StorageError::WalError {
                        reason: e.to_string(),
                    })?;
            let version_id = VersionId::new(i)?;
            let node = Node::new(node_id, label, props, version_id);
            current.insert_node_direct(node, time::now())?;
        }
        let historical = HistoricalStorage::new();
        manager.create_checkpoint(checkpoint_lsn, &current, &historical)?;
    }
    // These two nodes exist only in the WAL, past the checkpoint LSN.
    for i in 4..=5 {
        let props = PropertyMapBuilder::new()
            .insert("name", format!("WalNode{}", i))
            .build();
        wal.append(WalOperation::CreateNode {
            node_id: NodeId::new(i)?,
            label: GLOBAL_INTERNER.intern("Person").unwrap(),
            properties: props,
            valid_from: time::now(),
        })?;
    }
    wal.flush()?;
    let config = CheckpointConfig::with_data_dir(&data_dir);
    let mut manager = CheckpointManager::new(config)?;
    let (recovered_current, _recovered_historical, _lsn) = manager.recover(&wal)?;
    assert_eq!(recovered_current.node_count(), 5);
    for i in 1..=3 {
        let node = recovered_current.get_node(NodeId::new(i)?)?;
        let name = node.get_property("name").unwrap().as_str().unwrap();
        assert_eq!(name, format!("Initial{}", i));
    }
    for i in 4..=5 {
        let node = recovered_current.get_node(NodeId::new(i)?)?;
        let name = node.get_property("name").unwrap().as_str().unwrap();
        assert_eq!(name, format!("WalNode{}", i));
    }
    Ok(())
}
// The LSN persisted in the manifest must round-trip through
// `get_persisted_lsn`. (Fixed: `&current` had been corrupted to `¤t`.)
#[test]
fn test_lsn_consistency() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let config = CheckpointConfig::with_data_dir(temp_dir.path());
    let mut manager = CheckpointManager::new(config)?;
    let current = CurrentStorage::new();
    let historical = HistoricalStorage::new();
    manager.create_checkpoint(LSN(42), &current, &historical)?;
    let persisted_lsn = manager.get_persisted_lsn();
    assert_eq!(persisted_lsn, Some(LSN(42)));
    Ok(())
}
// Exercises both checkpoint triggers: the epoch-initialized timestamp fires
// the time trigger immediately; afterwards only a large enough LSN gap fires.
#[test]
fn test_should_checkpoint_logic() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let config = CheckpointConfig {
        data_dir: temp_dir.path().to_path_buf(),
        checkpoint_interval: Duration::from_secs(3600),
        min_wal_entries: 100,
        ..Default::default()
    };
    let mut manager = CheckpointManager::new(config)?;
    // Fresh manager: last checkpoint time is the epoch, so time trigger fires.
    assert!(manager.should_checkpoint(LSN(1)));
    manager.last_checkpoint_time = SystemTime::now();
    manager.last_checkpoint_lsn = LSN(50);
    // 10 entries since the last checkpoint: below the 100-entry threshold.
    assert!(!manager.should_checkpoint(LSN(60)));
    // 150 entries: above the threshold, LSN trigger fires.
    assert!(manager.should_checkpoint(LSN(200)));
    Ok(())
}
// With no checkpoint on disk, recovery must rebuild state purely from the WAL.
#[test]
fn test_recovery_without_persisted_state() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let wal_dir = temp_dir.path().join("wal");
    let data_dir = temp_dir.path().join("data");
    let wal = ConcurrentWalSystem::new(ConcurrentWalSystemConfig::new(&wal_dir))?;
    for i in 1..=3 {
        let props = PropertyMapBuilder::new().insert("value", i as i64).build();
        wal.append(WalOperation::CreateNode {
            node_id: NodeId::new(i)?,
            label: GLOBAL_INTERNER.intern("Test").unwrap(),
            properties: props,
            valid_from: time::now(),
        })?;
    }
    wal.flush()?;
    let mut manager = CheckpointManager::new(CheckpointConfig::with_data_dir(&data_dir))?;
    let (recovered_current, _recovered_historical, _lsn) = manager.recover(&wal)?;
    assert_eq!(recovered_current.node_count(), 3);
    Ok(())
}
// Checkpointing with compression enabled must persist all nodes and report
// bytes written. (Fixed: `&current` had been corrupted to `¤t`.)
#[test]
fn test_checkpoint_with_compression() -> Result<()> {
    let temp_dir = TempDir::new().unwrap();
    let config = CheckpointConfig {
        data_dir: temp_dir.path().to_path_buf(),
        enable_compression: true,
        compression_level: 3,
        ..Default::default()
    };
    let mut manager = CheckpointManager::new(config)?;
    let current = CurrentStorage::new();
    for i in 1..=100 {
        let props = PropertyMapBuilder::new()
            .insert("name", format!("Node{} with some longer text for compression", i))
            .insert("description", "This is a longer description that should compress well when repeated across many nodes")
            .build();
        let node_id = NodeId::new(i)?;
        let label = GLOBAL_INTERNER
            .intern("Document")
            .map_err(|e| StorageError::WalError {
                reason: e.to_string(),
            })?;
        let version_id = VersionId::new(i)?;
        let node = Node::new(node_id, label, props, version_id);
        current.insert_node_direct(node, time::now())?;
    }
    let historical = HistoricalStorage::new();
    let stats = manager.create_checkpoint(LSN(100), &current, &historical)?;
    assert_eq!(stats.node_count, 100);
    assert!(stats.bytes_written > 0);
    Ok(())
}
#[test]
fn test_has_persisted_state() -> Result<()> {
    // has_persisted_state flips from false to true once a checkpoint exists.
    let temp_dir = TempDir::new().unwrap();
    let config = CheckpointConfig::with_data_dir(temp_dir.path());
    let mut manager = CheckpointManager::new(config)?;
    assert!(!manager.has_persisted_state());
    let current = CurrentStorage::new();
    let historical = HistoricalStorage::new();
    // Fixed: `&current` had been mangled into an HTML entity (`&curren;`).
    manager.create_checkpoint(LSN(1), &current, &historical)?;
    assert!(manager.has_persisted_state());
    Ok(())
}
#[test]
fn test_checkpoint_preserves_properties() -> Result<()> {
    // Round-trip every property type (string, int, float, bool) through a
    // checkpoint + recovery cycle and verify the values survive intact.
    let temp_dir = TempDir::new().unwrap();
    let wal_dir = temp_dir.path().join("wal");
    let data_dir = temp_dir.path().join("data");
    {
        // Phase 1: write a checkpoint containing one node with mixed props.
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let current = CurrentStorage::new();
        let props = PropertyMapBuilder::new()
            .insert("string_prop", "hello")
            .insert("int_prop", 42i64)
            .insert("float_prop", 3.15f64)
            .insert("bool_prop", true)
            .build();
        let node_id = NodeId::new(1)?;
        let label = GLOBAL_INTERNER
            .intern("TestNode")
            .map_err(|e| StorageError::WalError {
                reason: e.to_string(),
            })?;
        let version_id = VersionId::new(1)?;
        let node = Node::new(node_id, label, props, version_id);
        current.insert_node_direct(node, time::now())?;
        let historical = HistoricalStorage::new();
        // Fixed: `&current` had been mangled into an HTML entity (`&curren;`).
        manager.create_checkpoint(LSN(1), &current, &historical)?;
    }
    {
        // Phase 2: recover from the checkpoint (empty WAL) and check values.
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let wal_config = ConcurrentWalSystemConfig::new(&wal_dir);
        let wal = ConcurrentWalSystem::new(wal_config)?;
        let (recovered_current, _recovered_historical, _lsn) = manager.recover(&wal)?;
        let node = recovered_current.get_node(NodeId::new(1)?)?;
        assert_eq!(
            node.get_property("string_prop").unwrap().as_str().unwrap(),
            "hello"
        );
        assert_eq!(node.get_property("int_prop").unwrap().as_int().unwrap(), 42);
        // Floats compared with a tolerance to allow for serialization rounding.
        assert!(
            (node.get_property("float_prop").unwrap().as_float().unwrap() - 3.15).abs() < 0.001
        );
        assert!(node.get_property("bool_prop").unwrap().as_bool().unwrap());
    }
    Ok(())
}
#[test]
fn test_invalid_compression_level_error() {
    // Compression levels outside the valid zstd range must be rejected at
    // construction time when compression is enabled.
    let tmp = TempDir::new().unwrap();
    // Level below the minimum.
    let too_low = CheckpointConfig {
        data_dir: tmp.path().to_path_buf(),
        enable_compression: true,
        compression_level: 0,
        ..Default::default()
    };
    let result = CheckpointManager::new(too_low);
    assert!(result.is_err());
    match result {
        Err(e) => assert!(e.to_string().contains("Invalid compression level")),
        Ok(_) => panic!("Expected error"),
    }
    // Level above the maximum.
    let too_high = CheckpointConfig {
        data_dir: tmp.path().to_path_buf(),
        enable_compression: true,
        compression_level: 25,
        ..Default::default()
    };
    assert!(CheckpointManager::new(too_high).is_err());
}
#[test]
fn test_compression_disabled_ignores_level() -> Result<()> {
    // An out-of-range level is irrelevant when compression is disabled, so
    // manager construction must still succeed.
    let tmp = TempDir::new().unwrap();
    let cfg = CheckpointConfig {
        data_dir: tmp.path().to_path_buf(),
        enable_compression: false,
        compression_level: 0,
        ..Default::default()
    };
    let _mgr = CheckpointManager::new(cfg)?;
    Ok(())
}
#[test]
fn test_checkpoint_without_compression() -> Result<()> {
    // An uncompressed checkpoint must round-trip through recovery, even when
    // the recovering manager uses the default (compression-enabled) config.
    let temp_dir = TempDir::new().unwrap();
    let wal_dir = temp_dir.path().join("wal");
    let data_dir = temp_dir.path().join("data");
    {
        let config = CheckpointConfig {
            data_dir: data_dir.clone(),
            enable_compression: false,
            compression_level: 1,
            ..Default::default()
        };
        let mut manager = CheckpointManager::new(config)?;
        let current = CurrentStorage::new();
        for i in 1..=5 {
            let props = PropertyMapBuilder::new()
                .insert("name", format!("Node{}", i))
                .build();
            let node_id = NodeId::new(i)?;
            let label =
                GLOBAL_INTERNER
                    .intern("Uncompressed")
                    .map_err(|e| StorageError::WalError {
                        reason: e.to_string(),
                    })?;
            let version_id = VersionId::new(i)?;
            let node = Node::new(node_id, label, props, version_id);
            current.insert_node_direct(node, time::now())?;
        }
        let historical = HistoricalStorage::new();
        // Fixed: `&current` had been mangled into an HTML entity (`&curren;`).
        let stats = manager.create_checkpoint(LSN(0), &current, &historical)?;
        assert_eq!(stats.node_count, 5);
    }
    {
        // Recover with a default config; the file format self-describes
        // whether it is compressed.
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let wal_config = ConcurrentWalSystemConfig::new(&wal_dir);
        let wal = ConcurrentWalSystem::new(wal_config)?;
        let (recovered_current, _recovered_historical, _lsn) = manager.recover(&wal)?;
        assert_eq!(recovered_current.node_count(), 5);
    }
    Ok(())
}
#[test]
fn test_checkpoint_with_edges() -> Result<()> {
    // A checkpoint must persist edges alongside nodes, and recovery must
    // restore edge endpoints and properties exactly.
    use crate::core::graph::Edge;
    use crate::core::id::EdgeId;
    let temp_dir = TempDir::new().unwrap();
    let wal_dir = temp_dir.path().join("wal");
    let data_dir = temp_dir.path().join("data");
    {
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let current = CurrentStorage::new();
        for i in 1..=3 {
            let props = PropertyMapBuilder::new()
                .insert("name", format!("Person{}", i))
                .build();
            let node_id = NodeId::new(i)?;
            let label =
                GLOBAL_INTERNER
                    .intern("Person")
                    .map_err(|e| StorageError::WalError {
                        reason: e.to_string(),
                    })?;
            let version_id = VersionId::new(i)?;
            let node = Node::new(node_id, label, props, version_id);
            current.insert_node_direct(node, time::now())?;
        }
        let edge_label =
            GLOBAL_INTERNER
                .intern("KNOWS")
                .map_err(|e| StorageError::WalError {
                    reason: e.to_string(),
                })?;
        let edge1 = Edge::new(
            EdgeId::new(1)?,
            edge_label,
            NodeId::new(1)?,
            NodeId::new(2)?,
            PropertyMapBuilder::new().insert("since", 2020i64).build(),
            VersionId::new(4)?,
        );
        let edge2 = Edge::new(
            EdgeId::new(2)?,
            edge_label,
            NodeId::new(2)?,
            NodeId::new(3)?,
            PropertyMapBuilder::new().insert("since", 2021i64).build(),
            VersionId::new(5)?,
        );
        current.insert_edge_direct(edge1)?;
        current.insert_edge_direct(edge2)?;
        let historical = HistoricalStorage::new();
        // Fixed: `&current` had been mangled into an HTML entity (`&curren;`).
        let stats = manager.create_checkpoint(LSN(0), &current, &historical)?;
        assert_eq!(stats.node_count, 3);
        assert_eq!(stats.edge_count, 2);
    }
    {
        // Fresh manager + empty WAL: all state must come from the checkpoint.
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let wal_config = ConcurrentWalSystemConfig::new(&wal_dir);
        let wal = ConcurrentWalSystem::new(wal_config)?;
        let (recovered_current, _recovered_historical, _lsn) = manager.recover(&wal)?;
        assert_eq!(recovered_current.node_count(), 3);
        assert_eq!(recovered_current.edge_count(), 2);
        let edge1 = recovered_current.get_edge(EdgeId::new(1)?)?;
        assert_eq!(edge1.source.as_u64(), 1);
        assert_eq!(edge1.target.as_u64(), 2);
        assert_eq!(edge1.get_property("since").unwrap().as_int().unwrap(), 2020);
        let edge2 = recovered_current.get_edge(EdgeId::new(2)?)?;
        assert_eq!(edge2.source.as_u64(), 2);
        assert_eq!(edge2.target.as_u64(), 3);
    }
    Ok(())
}
#[test]
fn test_checkpoint_lsn_ahead_of_wal_error() -> Result<()> {
    // A checkpoint recorded at LSN 1000 against an empty WAL is inconsistent:
    // recovery must refuse with a "checkpoint ahead of WAL" error.
    let temp_dir = TempDir::new().unwrap();
    let wal_dir = temp_dir.path().join("wal");
    let data_dir = temp_dir.path().join("data");
    {
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let current = CurrentStorage::new();
        let historical = HistoricalStorage::new();
        // Fixed: `&current` had been mangled into an HTML entity (`&curren;`).
        manager.create_checkpoint(LSN(1000), &current, &historical)?;
    }
    {
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let wal_config = ConcurrentWalSystemConfig::new(&wal_dir);
        // The WAL is brand new, so its highest LSN is far behind 1000.
        let wal = ConcurrentWalSystem::new(wal_config)?;
        let result = manager.recover(&wal);
        assert!(result.is_err());
        match result {
            Err(e) => {
                let err_str = e.to_string();
                assert!(err_str.contains("Checkpoint LSN"));
                assert!(err_str.contains("ahead of WAL"));
            }
            Ok(_) => panic!("Expected error"),
        }
    }
    Ok(())
}
#[test]
fn test_wal_replay_create_edge() -> Result<()> {
    // Replaying CreateNode + CreateEdge operations rebuilds the current graph
    // and records one historical edge version.
    use crate::core::id::EdgeId;
    let tmp = TempDir::new().unwrap();
    let wal_dir = tmp.path().join("wal");
    let data_dir = tmp.path().join("data");
    let mut mgr = CheckpointManager::new(CheckpointConfig::with_data_dir(&data_dir))?;
    let wal = ConcurrentWalSystem::new(ConcurrentWalSystemConfig::new(&wal_dir))?;
    // Two endpoint nodes, then one edge between them.
    for i in 1..=2 {
        wal.append(WalOperation::CreateNode {
            node_id: NodeId::new(i)?,
            label: GLOBAL_INTERNER.intern("Person").unwrap(),
            properties: PropertyMapBuilder::new()
                .insert("name", format!("Person{}", i))
                .build(),
            valid_from: time::now(),
        })?;
    }
    wal.append(WalOperation::CreateEdge {
        edge_id: EdgeId::new(1)?,
        source: NodeId::new(1)?,
        target: NodeId::new(2)?,
        label: GLOBAL_INTERNER.intern("KNOWS").unwrap(),
        properties: PropertyMapBuilder::new().insert("since", 2023i64).build(),
        valid_from: time::now(),
    })?;
    wal.flush()?;
    let (current, historical, _lsn) = mgr.recover(&wal)?;
    assert_eq!(current.node_count(), 2);
    assert_eq!(current.edge_count(), 1);
    let edge = current.get_edge(EdgeId::new(1)?)?;
    assert_eq!(edge.source.as_u64(), 1);
    assert_eq!(edge.target.as_u64(), 2);
    assert_eq!(historical.get_edge_versions().len(), 1);
    Ok(())
}
#[test]
fn test_wal_replay_update_node() -> Result<()> {
    // After replaying a create followed by an update, the current store holds
    // the latest node state while history keeps both versions.
    let tmp = TempDir::new().unwrap();
    let wal_dir = tmp.path().join("wal");
    let data_dir = tmp.path().join("data");
    let mut mgr = CheckpointManager::new(CheckpointConfig::with_data_dir(&data_dir))?;
    let wal = ConcurrentWalSystem::new(ConcurrentWalSystemConfig::new(&wal_dir))?;
    let node_id = NodeId::new(1)?;
    wal.append(WalOperation::CreateNode {
        node_id,
        label: GLOBAL_INTERNER.intern("Person").unwrap(),
        properties: PropertyMapBuilder::new()
            .insert("name", "Alice")
            .insert("age", 30i64)
            .build(),
        valid_from: time::now(),
    })?;
    // Second version bumps the age.
    wal.append(WalOperation::UpdateNode {
        node_id,
        version_id: VersionId::new(2)?,
        label: GLOBAL_INTERNER.intern("Person").unwrap(),
        properties: PropertyMapBuilder::new()
            .insert("name", "Alice")
            .insert("age", 31i64)
            .build(),
        valid_from: time::now(),
    })?;
    wal.flush()?;
    let (current, historical, _lsn) = mgr.recover(&wal)?;
    assert_eq!(current.node_count(), 1);
    let node = current.get_node(node_id)?;
    assert_eq!(node.get_property("age").unwrap().as_int().unwrap(), 31);
    assert_eq!(historical.get_node_versions().len(), 2);
    Ok(())
}
#[test]
fn test_wal_replay_update_edge() -> Result<()> {
    // Edge updates replayed from the WAL leave the latest properties in the
    // current store and two versions in history.
    use crate::core::id::EdgeId;
    let tmp = TempDir::new().unwrap();
    let wal_dir = tmp.path().join("wal");
    let data_dir = tmp.path().join("data");
    let mut mgr = CheckpointManager::new(CheckpointConfig::with_data_dir(&data_dir))?;
    let wal = ConcurrentWalSystem::new(ConcurrentWalSystemConfig::new(&wal_dir))?;
    for i in 1..=2 {
        wal.append(WalOperation::CreateNode {
            node_id: NodeId::new(i)?,
            label: GLOBAL_INTERNER.intern("Person").unwrap(),
            properties: PropertyMapBuilder::new().build(),
            valid_from: time::now(),
        })?;
    }
    let edge_id = EdgeId::new(1)?;
    wal.append(WalOperation::CreateEdge {
        edge_id,
        source: NodeId::new(1)?,
        target: NodeId::new(2)?,
        label: GLOBAL_INTERNER.intern("KNOWS").unwrap(),
        properties: PropertyMapBuilder::new().insert("strength", 5i64).build(),
        valid_from: time::now(),
    })?;
    // Second version doubles the strength.
    wal.append(WalOperation::UpdateEdge {
        edge_id,
        version_id: VersionId::new(4)?,
        label: GLOBAL_INTERNER.intern("KNOWS").unwrap(),
        properties: PropertyMapBuilder::new().insert("strength", 10i64).build(),
        valid_from: time::now(),
    })?;
    wal.flush()?;
    let (current, historical, _lsn) = mgr.recover(&wal)?;
    assert_eq!(current.edge_count(), 1);
    let edge = current.get_edge(edge_id)?;
    assert_eq!(edge.get_property("strength").unwrap().as_int().unwrap(), 10);
    assert_eq!(historical.get_edge_versions().len(), 2);
    Ok(())
}
#[test]
fn test_wal_replay_delete_node() -> Result<()> {
    // A create followed by a delete leaves the current store empty while
    // history retains both the creation and the tombstone version.
    let tmp = TempDir::new().unwrap();
    let wal_dir = tmp.path().join("wal");
    let data_dir = tmp.path().join("data");
    let mut mgr = CheckpointManager::new(CheckpointConfig::with_data_dir(&data_dir))?;
    let wal = ConcurrentWalSystem::new(ConcurrentWalSystemConfig::new(&wal_dir))?;
    let node_id = NodeId::new(1)?;
    wal.append(WalOperation::CreateNode {
        node_id,
        label: GLOBAL_INTERNER.intern("ToDelete").unwrap(),
        properties: PropertyMapBuilder::new().insert("temp", true).build(),
        valid_from: time::now(),
    })?;
    wal.append(WalOperation::DeleteNode {
        node_id,
        valid_from: time::now(),
    })?;
    wal.flush()?;
    let (current, historical, _lsn) = mgr.recover(&wal)?;
    assert_eq!(current.node_count(), 0);
    assert_eq!(historical.get_node_versions().len(), 2);
    Ok(())
}
#[test]
fn test_wal_replay_delete_edge() -> Result<()> {
    // Deleting an edge via WAL replay removes it from the current store but
    // leaves its endpoint nodes and both edge versions in history.
    use crate::core::id::EdgeId;
    let tmp = TempDir::new().unwrap();
    let wal_dir = tmp.path().join("wal");
    let data_dir = tmp.path().join("data");
    let mut mgr = CheckpointManager::new(CheckpointConfig::with_data_dir(&data_dir))?;
    let wal = ConcurrentWalSystem::new(ConcurrentWalSystemConfig::new(&wal_dir))?;
    for i in 1..=2 {
        wal.append(WalOperation::CreateNode {
            node_id: NodeId::new(i)?,
            label: GLOBAL_INTERNER.intern("Person").unwrap(),
            properties: PropertyMapBuilder::new().build(),
            valid_from: time::now(),
        })?;
    }
    let edge_id = EdgeId::new(1)?;
    wal.append(WalOperation::CreateEdge {
        edge_id,
        source: NodeId::new(1)?,
        target: NodeId::new(2)?,
        label: GLOBAL_INTERNER.intern("TEMP_EDGE").unwrap(),
        properties: PropertyMapBuilder::new().build(),
        valid_from: time::now(),
    })?;
    wal.append(WalOperation::DeleteEdge {
        edge_id,
        valid_from: time::now(),
    })?;
    wal.flush()?;
    let (current, historical, _lsn) = mgr.recover(&wal)?;
    assert_eq!(current.edge_count(), 0);
    assert_eq!(current.node_count(), 2);
    assert_eq!(historical.get_edge_versions().len(), 2);
    Ok(())
}
#[test]
fn test_wal_replay_checkpoint_marker() -> Result<()> {
    // A Checkpoint marker interleaved between CreateNode entries must not
    // disturb replay: both nodes are recovered.
    let tmp = TempDir::new().unwrap();
    let wal_dir = tmp.path().join("wal");
    let data_dir = tmp.path().join("data");
    let mut mgr = CheckpointManager::new(CheckpointConfig::with_data_dir(&data_dir))?;
    let wal = ConcurrentWalSystem::new(ConcurrentWalSystemConfig::new(&wal_dir))?;
    wal.append(WalOperation::CreateNode {
        node_id: NodeId::new(1)?,
        label: GLOBAL_INTERNER.intern("Test").unwrap(),
        properties: PropertyMapBuilder::new().build(),
        valid_from: time::now(),
    })?;
    // Marker in the middle of the stream.
    wal.append(WalOperation::Checkpoint {
        lsn: LSN(1),
        timestamp: time::now(),
    })?;
    wal.append(WalOperation::CreateNode {
        node_id: NodeId::new(2)?,
        label: GLOBAL_INTERNER.intern("Test").unwrap(),
        properties: PropertyMapBuilder::new().build(),
        valid_from: time::now(),
    })?;
    wal.flush()?;
    let (current, _historical, _lsn) = mgr.recover(&wal)?;
    assert_eq!(current.node_count(), 2);
    Ok(())
}
#[test]
fn test_checkpoint_with_temporal_node_versions() -> Result<()> {
    // Historical node versions must be persisted by the checkpoint and
    // restored on recovery alongside the current state.
    let temp_dir = TempDir::new().unwrap();
    let wal_dir = temp_dir.path().join("wal");
    let data_dir = temp_dir.path().join("data");
    {
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let current = CurrentStorage::new();
        let mut historical = HistoricalStorage::new();
        let node_id = NodeId::new(1)?;
        let label = GLOBAL_INTERNER
            .intern("Document")
            .map_err(|e| StorageError::WalError {
                reason: e.to_string(),
            })?;
        // Latest state lives in current storage...
        let props = PropertyMapBuilder::new()
            .insert("title", "Version 2")
            .build();
        let version_id = VersionId::new(2)?;
        let node = Node::new(node_id, label, props, version_id);
        current.insert_node_direct(node, time::now())?;
        // ...while the superseded version is recorded in historical storage.
        let anchor_props = PropertyMapBuilder::new()
            .insert("title", "Version 1")
            .build();
        let now = time::now();
        historical.add_node_version(
            node_id,
            VersionId::new(1)?,
            now,
            now,
            label,
            anchor_props,
            false,
        )?;
        // Fixed: `&current` had been mangled into an HTML entity (`&curren;`).
        let stats = manager.create_checkpoint(LSN(0), &current, &historical)?;
        assert_eq!(stats.node_count, 1);
        assert!(stats.version_count >= 1);
    }
    {
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let wal_config = ConcurrentWalSystemConfig::new(&wal_dir);
        let wal = ConcurrentWalSystem::new(wal_config)?;
        let (recovered_current, recovered_historical, _lsn) = manager.recover(&wal)?;
        assert_eq!(recovered_current.node_count(), 1);
        assert_eq!(recovered_historical.get_node_versions().len(), 1);
    }
    Ok(())
}
#[test]
fn test_checkpoint_with_temporal_edge_versions() -> Result<()> {
    // Historical edge versions must survive a checkpoint + recovery cycle.
    use crate::core::graph::Edge;
    use crate::core::id::EdgeId;
    let temp_dir = TempDir::new().unwrap();
    let wal_dir = temp_dir.path().join("wal");
    let data_dir = temp_dir.path().join("data");
    {
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let current = CurrentStorage::new();
        let mut historical = HistoricalStorage::new();
        let person_label =
            GLOBAL_INTERNER
                .intern("Person")
                .map_err(|e| StorageError::WalError {
                    reason: e.to_string(),
                })?;
        for i in 1..=2 {
            let node = Node::new(
                NodeId::new(i)?,
                person_label,
                PropertyMapBuilder::new().build(),
                VersionId::new(i)?,
            );
            current.insert_node_direct(node, time::now())?;
        }
        let edge_label =
            GLOBAL_INTERNER
                .intern("KNOWS")
                .map_err(|e| StorageError::WalError {
                    reason: e.to_string(),
                })?;
        // Latest edge version (strength 10) lives in current storage.
        let edge = Edge::new(
            EdgeId::new(1)?,
            edge_label,
            NodeId::new(1)?,
            NodeId::new(2)?,
            PropertyMapBuilder::new().insert("strength", 10i64).build(),
            VersionId::new(4)?,
        );
        current.insert_edge_direct(edge)?;
        // The superseded version (strength 5) is recorded in history.
        let now = time::now();
        historical.add_edge_version(
            EdgeId::new(1)?,
            VersionId::new(3)?,
            now,
            now,
            edge_label,
            NodeId::new(1)?,
            NodeId::new(2)?,
            PropertyMapBuilder::new().insert("strength", 5i64).build(),
            false,
        )?;
        // Fixed: `&current` had been mangled into an HTML entity (`&curren;`).
        let stats = manager.create_checkpoint(LSN(0), &current, &historical)?;
        assert_eq!(stats.edge_count, 1);
        assert_eq!(stats.version_count, 1);
    }
    {
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let wal_config = ConcurrentWalSystemConfig::new(&wal_dir);
        let wal = ConcurrentWalSystem::new(wal_config)?;
        let (recovered_current, recovered_historical, _lsn) = manager.recover(&wal)?;
        assert_eq!(recovered_current.edge_count(), 1);
        assert_eq!(recovered_historical.get_edge_versions().len(), 1);
    }
    Ok(())
}
#[test]
fn test_get_persisted_lsn_none() -> Result<()> {
    // A brand-new manager with no checkpoint on disk reports no persisted LSN.
    let tmp = TempDir::new().unwrap();
    let mgr = CheckpointManager::new(CheckpointConfig::with_data_dir(tmp.path()))?;
    assert!(mgr.get_persisted_lsn().is_none());
    Ok(())
}
#[test]
fn test_checkpoint_config_default() {
    // Pin the documented defaults so accidental changes are caught by CI.
    let cfg = CheckpointConfig::default();
    assert_eq!(cfg.data_dir, PathBuf::from("data"));
    assert_eq!(cfg.checkpoint_interval, Duration::from_secs(300));
    assert_eq!(cfg.min_wal_entries, 1000);
    assert!(cfg.enable_compression);
    assert_eq!(cfg.compression_level, 3);
}
#[test]
fn test_checkpoint_updates_tracking() -> Result<()> {
    // create_checkpoint must advance the manager's LSN and timestamp tracking.
    let temp_dir = TempDir::new().unwrap();
    let config = CheckpointConfig::with_data_dir(temp_dir.path());
    let mut manager = CheckpointManager::new(config)?;
    assert_eq!(manager.last_checkpoint_lsn, LSN::initial());
    let current = CurrentStorage::new();
    let historical = HistoricalStorage::new();
    // Fixed: `&current` had been mangled into an HTML entity (`&curren;`).
    manager.create_checkpoint(LSN(42), &current, &historical)?;
    assert_eq!(manager.last_checkpoint_lsn, LSN(42));
    assert!(manager.last_checkpoint_time > UNIX_EPOCH);
    Ok(())
}
#[test]
fn test_should_checkpoint_time_threshold() -> Result<()> {
    // With the entry threshold set absurdly high, elapsed wall-clock time
    // past checkpoint_interval alone must trigger a checkpoint.
    let tmp = TempDir::new().unwrap();
    let cfg = CheckpointConfig {
        data_dir: tmp.path().to_path_buf(),
        checkpoint_interval: Duration::from_millis(1),
        min_wal_entries: 1_000_000,
        ..Default::default()
    };
    let mut mgr = CheckpointManager::new(cfg)?;
    mgr.last_checkpoint_time = SystemTime::now();
    mgr.last_checkpoint_lsn = LSN(100);
    // Sleep comfortably past the 1 ms interval.
    std::thread::sleep(Duration::from_millis(5));
    assert!(mgr.should_checkpoint(LSN(101)));
    Ok(())
}
#[test]
fn test_checkpoint_with_closed_valid_time() -> Result<()> {
    // A historical version whose valid-time interval is already closed must
    // still be checkpointed and restored.
    let temp_dir = TempDir::new().unwrap();
    let wal_dir = temp_dir.path().join("wal");
    let data_dir = temp_dir.path().join("data");
    {
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let current = CurrentStorage::new();
        let mut historical = HistoricalStorage::new();
        let node_id = NodeId::new(1)?;
        let label =
            GLOBAL_INTERNER
                .intern("ClosedNode")
                .map_err(|e| StorageError::WalError {
                    reason: e.to_string(),
                })?;
        let now = time::now();
        // valid_from == valid_to: the interval is closed immediately.
        historical.add_node_version(
            node_id,
            VersionId::new(1)?,
            now,
            now,
            label,
            PropertyMapBuilder::new().insert("deleted", true).build(),
            false,
        )?;
        // Fixed: `&current` had been mangled into an HTML entity (`&curren;`).
        let stats = manager.create_checkpoint(LSN(0), &current, &historical)?;
        assert_eq!(stats.version_count, 1);
    }
    {
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let wal_config = ConcurrentWalSystemConfig::new(&wal_dir);
        let wal = ConcurrentWalSystem::new(wal_config)?;
        let (_recovered_current, recovered_historical, _lsn) = manager.recover(&wal)?;
        assert_eq!(recovered_historical.get_node_versions().len(), 1);
    }
    Ok(())
}
#[test]
fn test_recovery_id_generator_initialization() -> Result<()> {
    // After recovery, the node/edge ID generators must resume above the
    // highest IDs found in the checkpoint so new entities never collide.
    use crate::core::graph::Edge;
    use crate::core::id::EdgeId;
    let temp_dir = TempDir::new().unwrap();
    let wal_dir = temp_dir.path().join("wal");
    let data_dir = temp_dir.path().join("data");
    {
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let current = CurrentStorage::new();
        let label = GLOBAL_INTERNER
            .intern("Test")
            .map_err(|e| StorageError::WalError {
                reason: e.to_string(),
            })?;
        // Deliberately high IDs (100 / 200) to test generator seeding.
        let node = Node::new(
            NodeId::new(100)?,
            label,
            PropertyMapBuilder::new().build(),
            VersionId::new(1)?,
        );
        current.insert_node_direct(node, time::now())?;
        let edge = Edge::new(
            EdgeId::new(200)?,
            label,
            NodeId::new(100)?,
            NodeId::new(100)?,
            PropertyMapBuilder::new().build(),
            VersionId::new(2)?,
        );
        current.insert_edge_direct(edge)?;
        let historical = HistoricalStorage::new();
        // Fixed: `&current` had been mangled into an HTML entity (`&curren;`).
        manager.create_checkpoint(LSN(0), &current, &historical)?;
    }
    {
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let wal_config = ConcurrentWalSystemConfig::new(&wal_dir);
        let wal = ConcurrentWalSystem::new(wal_config)?;
        let (recovered_current, _recovered_historical, _lsn) = manager.recover(&wal)?;
        // Fresh IDs must come out above the recovered maxima.
        let new_node_id =
            recovered_current.create_node("NewNode", PropertyMapBuilder::new().build())?;
        assert!(new_node_id.as_u64() > 100);
        let new_edge_id = recovered_current.create_edge(
            NodeId::new(100)?,
            new_node_id,
            "NEW_EDGE",
            PropertyMapBuilder::new().build(),
        )?;
        assert!(new_edge_id.as_u64() > 200);
    }
    Ok(())
}
#[test]
fn test_wal_replay_updates_id_generators() -> Result<()> {
    // IDs seen only during WAL replay (beyond the checkpoint) must also push
    // the ID generators forward.
    use crate::core::id::EdgeId;
    let temp_dir = TempDir::new().unwrap();
    let wal_dir = temp_dir.path().join("wal");
    let data_dir = temp_dir.path().join("data");
    {
        // Checkpoint contains only node 1.
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let current = CurrentStorage::new();
        let label = GLOBAL_INTERNER
            .intern("Test")
            .map_err(|e| StorageError::WalError {
                reason: e.to_string(),
            })?;
        let node = Node::new(
            NodeId::new(1)?,
            label,
            PropertyMapBuilder::new().build(),
            VersionId::new(1)?,
        );
        current.insert_node_direct(node, time::now())?;
        let historical = HistoricalStorage::new();
        // Fixed: `&current` had been mangled into an HTML entity (`&curren;`).
        manager.create_checkpoint(LSN(0), &current, &historical)?;
    }
    // WAL carries much higher IDs (500 / 600) than the checkpoint.
    let wal_config = ConcurrentWalSystemConfig::new(&wal_dir);
    let wal = ConcurrentWalSystem::new(wal_config)?;
    wal.append(WalOperation::CreateNode {
        node_id: NodeId::new(500)?,
        label: GLOBAL_INTERNER.intern("HighId").unwrap(),
        properties: PropertyMapBuilder::new().build(),
        valid_from: time::now(),
    })?;
    wal.append(WalOperation::CreateEdge {
        edge_id: EdgeId::new(600)?,
        source: NodeId::new(1)?,
        target: NodeId::new(500)?,
        label: GLOBAL_INTERNER.intern("HighEdge").unwrap(),
        properties: PropertyMapBuilder::new().build(),
        valid_from: time::now(),
    })?;
    wal.flush()?;
    {
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let (recovered_current, _recovered_historical, _lsn) = manager.recover(&wal)?;
        // Fresh IDs must clear the highest IDs seen during replay.
        let new_node_id =
            recovered_current.create_node("NewNode", PropertyMapBuilder::new().build())?;
        assert!(new_node_id.as_u64() > 500);
        let new_edge_id = recovered_current.create_edge(
            NodeId::new(1)?,
            new_node_id,
            "NEW_EDGE",
            PropertyMapBuilder::new().build(),
        )?;
        assert!(new_edge_id.as_u64() > 600);
    }
    Ok(())
}
#[test]
fn test_checkpoint_stats_fields() -> Result<()> {
    // Even for an empty store, the stats must carry the LSN, zero counts, a
    // nonzero duration, and a nonzero byte count (manifest/header overhead).
    let temp_dir = TempDir::new().unwrap();
    let config = CheckpointConfig::with_data_dir(temp_dir.path());
    let mut manager = CheckpointManager::new(config)?;
    let current = CurrentStorage::new();
    let historical = HistoricalStorage::new();
    // Fixed: `&current` had been mangled into an HTML entity (`&curren;`).
    let stats = manager.create_checkpoint(LSN(99), &current, &historical)?;
    assert_eq!(stats.lsn, LSN(99));
    assert_eq!(stats.node_count, 0);
    assert_eq!(stats.edge_count, 0);
    assert_eq!(stats.version_count, 0);
    assert!(stats.duration.as_nanos() > 0);
    assert!(stats.bytes_written > 0);
    Ok(())
}
#[test]
fn test_recovery_historical_version_id_tracking() -> Result<()> {
    // Version IDs seen only in historical storage (here: 100) must not break
    // post-recovery node creation, which allocates fresh version IDs.
    let temp_dir = TempDir::new().unwrap();
    let wal_dir = temp_dir.path().join("wal");
    let data_dir = temp_dir.path().join("data");
    {
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let current = CurrentStorage::new();
        let mut historical = HistoricalStorage::new();
        let node_id = NodeId::new(1)?;
        let label =
            GLOBAL_INTERNER
                .intern("Versioned")
                .map_err(|e| StorageError::WalError {
                    reason: e.to_string(),
                })?;
        let node = Node::new(
            node_id,
            label,
            PropertyMapBuilder::new().insert("v", 1i64).build(),
            VersionId::new(1)?,
        );
        current.insert_node_direct(node, time::now())?;
        // Historical version with a far larger version ID than current's.
        let now = time::now();
        historical.add_node_version(
            node_id,
            VersionId::new(100)?,
            now,
            now,
            label,
            PropertyMapBuilder::new().insert("v", 100i64).build(),
            false,
        )?;
        // Fixed: `&current` had been mangled into an HTML entity (`&curren;`).
        manager.create_checkpoint(LSN(0), &current, &historical)?;
    }
    {
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let wal_config = ConcurrentWalSystemConfig::new(&wal_dir);
        let wal = ConcurrentWalSystem::new(wal_config)?;
        let (recovered_current, _recovered_historical, _lsn) = manager.recover(&wal)?;
        // Creating a node after recovery must succeed despite version ID 100
        // existing in history.
        let _new_node_id =
            recovered_current.create_node("NewNode", PropertyMapBuilder::new().build())?;
        assert_eq!(recovered_current.node_count(), 2);
    }
    Ok(())
}
#[test]
fn test_persistence_err_conversion() {
    // persistence_err wraps an IndexPersistenceError into a checkpoint error
    // whose message retains the original description.
    use crate::storage::index_persistence::IndexPersistenceError;
    use std::path::PathBuf;
    let source = IndexPersistenceError::InvalidMagic {
        path: PathBuf::from("/test/path"),
        expected: [0x12, 0x34, 0x56, 0x78],
        got: [0xAB, 0xCD, 0xEF, 0x00],
    };
    let message = persistence_err(source).to_string();
    assert!(message.contains("Invalid magic bytes"));
}
#[test]
fn test_recovery_with_no_cold_storage() -> Result<()> {
    // With nothing persisted anywhere, cold-storage-aware recovery yields an
    // empty store and reports that neither data source was used.
    let tmp = TempDir::new().unwrap();
    let wal_dir = tmp.path().join("wal");
    let data_dir = tmp.path().join("data");
    let wal = ConcurrentWalSystem::new(ConcurrentWalSystemConfig::new(&wal_dir))?;
    let mut mgr = CheckpointManager::new(CheckpointConfig::with_data_dir(&data_dir))?;
    let result = mgr.recover_with_cold_storage(&wal, None)?;
    assert_eq!(result.current.node_count(), 0);
    assert!(result.checkpoint_lsn.is_none());
    assert!(result.flushed_lsn.is_none());
    assert!(!result.used_cold_storage());
    assert!(!result.used_checkpoint());
    Ok(())
}
#[test]
fn test_recovery_with_checkpoint_no_cold_storage() -> Result<()> {
    // When a checkpoint exists but cold storage is absent, recovery starts
    // from the checkpoint and reports that cold storage was not used.
    let temp_dir = TempDir::new().unwrap();
    let wal_dir = temp_dir.path().join("wal");
    let data_dir = temp_dir.path().join("data");
    let wal_config = ConcurrentWalSystemConfig::new(&wal_dir);
    let wal = ConcurrentWalSystem::new(wal_config)?;
    // Fill the WAL to LSN 100 so the checkpoint LSN below is valid.
    for i in 1..=100 {
        let op = WalOperation::CreateNode {
            node_id: NodeId::new(i)?,
            label: GLOBAL_INTERNER.intern(format!("Node{}", i)).unwrap(),
            properties: PropertyMapBuilder::new().build(),
            valid_from: time::now(),
        };
        wal.append_async(op)?;
    }
    wal.flush()?;
    {
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let current = CurrentStorage::new();
        for i in 1..=5 {
            let props = PropertyMapBuilder::new()
                .insert("name", format!("Node{}", i))
                .build();
            let node_id = NodeId::new(i)?;
            let label = GLOBAL_INTERNER.intern("Person").unwrap();
            let version_id = VersionId::new(i)?;
            let node = Node::new(node_id, label, props, version_id);
            current.insert_node_direct(node, time::now())?;
        }
        let historical = HistoricalStorage::new();
        // Fixed: `&current` had been mangled into an HTML entity (`&curren;`).
        manager.create_checkpoint(LSN(100), &current, &historical)?;
    }
    {
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let result = manager.recover_with_cold_storage(&wal, None)?;
        // Checkpoint at LSN 100 covers the whole WAL: only its 5 nodes remain.
        assert_eq!(result.current.node_count(), 5);
        assert_eq!(result.checkpoint_lsn, Some(LSN(100)));
        assert!(result.flushed_lsn.is_none());
        assert!(!result.used_cold_storage());
        assert!(result.used_checkpoint());
    }
    Ok(())
}
#[test]
fn test_recovery_loads_cold_storage_first() -> Result<()> {
    // When cold storage holds flushed data, recovery must consult it and
    // surface its flushed LSN in the result.
    use crate::storage::redb_cold_storage::{RedbColdStorage, RedbConfig};
    let tmp = TempDir::new().unwrap();
    let wal_dir = tmp.path().join("wal");
    let data_dir = tmp.path().join("data");
    let cold_dir = tmp.path().join("cold");
    let wal = ConcurrentWalSystem::new(ConcurrentWalSystemConfig::new(&wal_dir))?;
    let cold_storage = Arc::new(RedbColdStorage::new(
        cold_dir.join("cold.redb"),
        RedbConfig::new(),
    )?);
    // Seed cold storage with one anchor version flushed at LSN 50.
    let anchor = crate::core::version::NodeVersion::new_anchor(
        VersionId::new(1)?,
        NodeId::new(1)?,
        BiTemporalInterval::current(time::now()),
        GLOBAL_INTERNER.intern("Test").unwrap(),
        PropertyMapBuilder::new().build(),
    );
    cold_storage.store_batch_with_lsn(&[anchor], &[], LSN(50))?;
    assert_eq!(cold_storage.get_flushed_lsn()?, Some(LSN(50)));
    let mut mgr = CheckpointManager::new(CheckpointConfig::with_data_dir(&data_dir))?;
    let result = mgr.recover_with_cold_storage(&wal, Some(&cold_storage))?;
    assert_eq!(result.flushed_lsn, Some(LSN(50)));
    assert!(result.used_cold_storage());
    Ok(())
}
#[test]
fn test_recovery_replays_wal_from_flushed_lsn() -> Result<()> {
    // With a checkpoint at LSN 50 and cold storage flushed to LSN 100, the
    // effective recovery point is the cold-storage LSN and the 50 WAL entries
    // it already covers are skipped.
    use crate::storage::redb_cold_storage::{RedbColdStorage, RedbConfig};
    let temp_dir = TempDir::new().unwrap();
    let wal_dir = temp_dir.path().join("wal");
    let data_dir = temp_dir.path().join("data");
    let cold_dir = temp_dir.path().join("cold");
    let wal_config = ConcurrentWalSystemConfig::new(&wal_dir);
    let wal = ConcurrentWalSystem::new(wal_config)?;
    for i in 1..=100 {
        let op = WalOperation::CreateNode {
            node_id: NodeId::new(i)?,
            label: GLOBAL_INTERNER.intern(format!("Node{}", i)).unwrap(),
            properties: PropertyMapBuilder::new().build(),
            valid_from: time::now(),
        };
        wal.append_async(op)?;
    }
    wal.flush()?;
    {
        // Checkpoint lags behind at LSN 50.
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let current = CurrentStorage::new();
        let historical = HistoricalStorage::new();
        // Fixed: `&current` had been mangled into an HTML entity (`&curren;`).
        manager.create_checkpoint(LSN(50), &current, &historical)?;
    }
    // Cold storage is flushed further ahead, at LSN 100.
    let cold_storage = Arc::new(RedbColdStorage::new(
        cold_dir.join("cold.redb"),
        RedbConfig::new(),
    )?);
    let node = crate::core::version::NodeVersion::new_anchor(
        VersionId::new(1)?,
        NodeId::new(1)?,
        BiTemporalInterval::current(time::now()),
        GLOBAL_INTERNER.intern("Test").unwrap(),
        PropertyMapBuilder::new().build(),
    );
    cold_storage.store_batch_with_lsn(&[node], &[], LSN(100))?;
    {
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let result = manager.recover_with_cold_storage(&wal, Some(&cold_storage))?;
        assert_eq!(result.checkpoint_lsn, Some(LSN(50)));
        assert_eq!(result.flushed_lsn, Some(LSN(100)));
        // The larger of the two LSNs wins as the effective recovery point.
        assert_eq!(result.effective_lsn, LSN(100));
        assert!(result.used_cold_storage());
        // Entries 51..=100 are already in cold storage and are skipped.
        assert_eq!(result.wal_entries_skipped_from_cold(), 50);
    }
    Ok(())
}
#[test]
fn test_recovery_with_no_wal_segments() -> Result<()> {
    // With an empty WAL, recovery should replay nothing and adopt the
    // cold-storage flushed LSN as the effective LSN.
    use crate::storage::redb_cold_storage::{RedbColdStorage, RedbConfig};
    let tmp = TempDir::new().unwrap();
    let (wal_dir, data_dir, cold_dir) = (
        tmp.path().join("wal"),
        tmp.path().join("data"),
        tmp.path().join("cold"),
    );

    // A freshly created WAL: no segments, no entries to replay.
    let wal = ConcurrentWalSystem::new(ConcurrentWalSystemConfig::new(&wal_dir))?;

    // Cold storage already holds one node, flushed through LSN 75.
    let cold = Arc::new(RedbColdStorage::new(
        cold_dir.join("cold.redb"),
        RedbConfig::new(),
    )?);
    let anchor = crate::core::version::NodeVersion::new_anchor(
        VersionId::new(1)?,
        NodeId::new(1)?,
        BiTemporalInterval::current(time::now()),
        GLOBAL_INTERNER.intern("Test").unwrap(),
        PropertyMapBuilder::new().build(),
    );
    cold.store_batch_with_lsn(&[anchor], &[], LSN(75))?;

    let mut manager = CheckpointManager::new(CheckpointConfig::with_data_dir(&data_dir))?;
    let outcome = manager.recover_with_cold_storage(&wal, Some(&cold))?;

    // Nothing in the WAL, so zero replays and the cold LSN wins.
    assert_eq!(outcome.wal_entries_replayed, 0);
    assert_eq!(outcome.effective_lsn, LSN(75));
    Ok(())
}
#[test]
fn test_recovery_validates_lsn_consistency() -> Result<()> {
    // Scenario: cold storage claims flushed_lsn = 1000 but the WAL only
    // contains entries up to LSN 50. That is an impossible state (cold
    // storage cannot be ahead of everything the WAL ever recorded), so
    // recovery must fail with an error describing the LSN inconsistency.
    use crate::storage::redb_cold_storage::{RedbColdStorage, RedbConfig};
    let temp_dir = TempDir::new().unwrap();
    let wal_dir = temp_dir.path().join("wal");
    let data_dir = temp_dir.path().join("data");
    let cold_dir = temp_dir.path().join("cold");
    let wal_config = ConcurrentWalSystemConfig::new(&wal_dir);
    let wal = ConcurrentWalSystem::new(wal_config)?;
    // Append 50 CreateNode operations so the WAL spans LSNs 1..=50.
    for i in 1..=50 {
        let op = WalOperation::CreateNode {
            node_id: NodeId::new(i)?,
            label: GLOBAL_INTERNER.intern(format!("Node{}", i)).unwrap(),
            properties: PropertyMapBuilder::new().build(),
            valid_from: time::now(),
        };
        wal.append_async(op)?;
    }
    wal.flush()?;
    // Take a checkpoint at LSN 10 from empty in-memory storages.
    // (Fixed mojibake: `¤t` was a garbled `&current`.)
    {
        let config = CheckpointConfig::with_data_dir(&data_dir);
        let mut manager = CheckpointManager::new(config)?;
        let current = CurrentStorage::new();
        let historical = HistoricalStorage::new();
        manager.create_checkpoint(LSN(10), &current, &historical)?;
    }
    // Cold storage lies: it reports a flushed LSN far beyond the WAL's tail.
    let cold_storage = Arc::new(RedbColdStorage::new(
        cold_dir.join("cold.redb"),
        RedbConfig::new(),
    )?);
    let node = crate::core::version::NodeVersion::new_anchor(
        VersionId::new(1)?,
        NodeId::new(1)?,
        BiTemporalInterval::current(time::now()),
        GLOBAL_INTERNER.intern("Test").unwrap(),
        PropertyMapBuilder::new().build(),
    );
    cold_storage.store_batch_with_lsn(&[node], &[], LSN(1000))?;
    let config = CheckpointConfig::with_data_dir(&data_dir);
    let mut manager = CheckpointManager::new(config)?;
    let result = manager.recover_with_cold_storage(&wal, Some(&cold_storage));
    assert!(result.is_err());
    // The error message should point at the bogus flushed LSN.
    let err = result.err().expect("Expected error").to_string();
    assert!(
        err.contains("flushed_lsn") || err.contains("inconsistency"),
        "Error should mention LSN inconsistency: {}",
        err
    );
    Ok(())
}
#[test]
fn test_recovery_result_helpers() {
    // Builds a RecoveryResult with fresh empty storages and zero replayed
    // entries; only the LSN bookkeeping varies between cases.
    fn result_with(
        final_lsn: LSN,
        checkpoint_lsn: Option<LSN>,
        flushed_lsn: Option<LSN>,
        effective_lsn: LSN,
    ) -> RecoveryResult {
        RecoveryResult {
            current: CurrentStorage::new(),
            historical: HistoricalStorage::new(),
            final_lsn,
            checkpoint_lsn,
            flushed_lsn,
            effective_lsn,
            wal_entries_replayed: 0,
        }
    }

    // Case 1: neither checkpoint nor cold storage participated.
    let r = result_with(LSN(0), None, None, LSN(0));
    assert!(!r.used_cold_storage());
    assert!(!r.used_checkpoint());
    assert_eq!(r.wal_entries_skipped_from_cold(), 0);

    // Case 2: checkpoint only — no cold storage, nothing skipped.
    let r = result_with(LSN(50), Some(LSN(50)), None, LSN(50));
    assert!(!r.used_cold_storage());
    assert!(r.used_checkpoint());
    assert_eq!(r.wal_entries_skipped_from_cold(), 0);

    // Case 3: checkpoint at 50 plus cold storage flushed through 100 —
    // the 50 entries in between count as skipped.
    let r = result_with(LSN(100), Some(LSN(50)), Some(LSN(100)), LSN(100));
    assert!(r.used_cold_storage());
    assert!(r.used_checkpoint());
    assert_eq!(r.wal_entries_skipped_from_cold(), 50);

    // Case 4: cold storage only — all 75 flushed entries are skipped.
    let r = result_with(LSN(75), None, Some(LSN(75)), LSN(75));
    assert!(r.used_cold_storage());
    assert!(!r.used_checkpoint());
    assert_eq!(r.wal_entries_skipped_from_cold(), 75);
}
}