use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
#[cfg(feature = "bench-profile")]
use std::time::Instant;
use parking_lot::{Mutex, RwLock};
use crate::cache::manager::CacheManager;
use crate::constants::*;
use crate::core::pager::{create_pager, is_valid_page_size, open_pager, pages_to_store, FilePager};
use crate::core::snapshot::reader::SnapshotData;
use crate::core::wal::buffer::WalBuffer;
use crate::error::{KiteError, Result};
use crate::mvcc::{GcConfig, MvccManager};
use crate::replication::primary::PrimaryReplication;
use crate::replication::replica::ReplicaReplication;
use crate::replication::types::ReplicationRole;
use crate::types::*;
use crate::util::compression::CompressionOptions;
use crate::util::mmap::map_file;
use crate::vector::store::{create_vector_store, vector_store_delete, vector_store_insert};
use crate::vector::types::VectorStoreConfig;
use super::recovery::{committed_transactions, replay_wal_record, scan_wal_records};
use super::vector::{materialize_vector_store_from_lazy_entries, vector_store_state_from_snapshot};
use super::{CheckpointStatus, SingleFileDB};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SyncMode {
#[default]
Full,
Normal,
Off,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SnapshotParseMode {
#[default]
Strict,
Salvage,
}
#[derive(Debug, Clone)]
pub struct SingleFileOpenOptions {
pub read_only: bool,
pub create_if_missing: bool,
pub mvcc: bool,
pub mvcc_gc_interval_ms: Option<u64>,
pub mvcc_retention_ms: Option<u64>,
pub mvcc_max_chain_depth: Option<usize>,
pub page_size: usize,
pub wal_size: usize,
pub auto_checkpoint: bool,
pub checkpoint_threshold: f64,
pub background_checkpoint: bool,
pub cache: Option<CacheOptions>,
pub checkpoint_compression: Option<CompressionOptions>,
pub sync_mode: SyncMode,
pub group_commit_enabled: bool,
pub group_commit_window_ms: u64,
pub snapshot_parse_mode: SnapshotParseMode,
pub replication_role: ReplicationRole,
pub replication_sidecar_path: Option<PathBuf>,
pub replication_source_db_path: Option<PathBuf>,
pub replication_source_sidecar_path: Option<PathBuf>,
pub replication_fail_after_append_for_testing: Option<u64>,
pub replication_segment_max_bytes: Option<u64>,
pub replication_retention_min_entries: Option<u64>,
pub replication_retention_min_ms: Option<u64>,
}
impl Default for SingleFileOpenOptions {
fn default() -> Self {
Self {
read_only: false,
create_if_missing: true,
mvcc: false,
mvcc_gc_interval_ms: None,
mvcc_retention_ms: None,
mvcc_max_chain_depth: None,
page_size: DEFAULT_PAGE_SIZE,
wal_size: WAL_DEFAULT_SIZE,
auto_checkpoint: true,
checkpoint_threshold: 0.5,
background_checkpoint: true,
cache: None,
checkpoint_compression: Some(CompressionOptions {
enabled: true,
..Default::default()
}),
sync_mode: SyncMode::Full,
group_commit_enabled: false,
group_commit_window_ms: 2,
snapshot_parse_mode: SnapshotParseMode::Strict,
replication_role: ReplicationRole::Disabled,
replication_sidecar_path: None,
replication_source_db_path: None,
replication_source_sidecar_path: None,
replication_fail_after_append_for_testing: None,
replication_segment_max_bytes: None,
replication_retention_min_entries: None,
replication_retention_min_ms: None,
}
}
}
impl SingleFileOpenOptions {
pub fn new() -> Self {
Self::default()
}
pub fn read_only(mut self, value: bool) -> Self {
self.read_only = value;
self
}
pub fn create_if_missing(mut self, value: bool) -> Self {
self.create_if_missing = value;
self
}
pub fn mvcc(mut self, value: bool) -> Self {
self.mvcc = value;
self
}
pub fn mvcc_gc_interval_ms(mut self, value: u64) -> Self {
self.mvcc_gc_interval_ms = Some(value);
self
}
pub fn mvcc_retention_ms(mut self, value: u64) -> Self {
self.mvcc_retention_ms = Some(value);
self
}
pub fn mvcc_max_chain_depth(mut self, value: usize) -> Self {
self.mvcc_max_chain_depth = Some(value);
self
}
pub fn page_size(mut self, value: usize) -> Self {
self.page_size = value;
self
}
pub fn wal_size(mut self, value: usize) -> Self {
self.wal_size = value;
self
}
pub fn auto_checkpoint(mut self, value: bool) -> Self {
self.auto_checkpoint = value;
self
}
pub fn checkpoint_threshold(mut self, value: f64) -> Self {
self.checkpoint_threshold = value.clamp(0.0, 1.0);
self
}
pub fn background_checkpoint(mut self, value: bool) -> Self {
self.background_checkpoint = value;
self
}
pub fn cache(mut self, options: Option<CacheOptions>) -> Self {
self.cache = options;
self
}
pub fn checkpoint_compression(mut self, options: Option<CompressionOptions>) -> Self {
self.checkpoint_compression = options;
self
}
pub fn disable_checkpoint_compression(mut self) -> Self {
self.checkpoint_compression = None;
self
}
pub fn enable_cache(mut self) -> Self {
self.cache = Some(CacheOptions {
enabled: true,
..Default::default()
});
self
}
pub fn sync_mode(mut self, mode: SyncMode) -> Self {
self.sync_mode = mode;
self
}
pub fn group_commit_enabled(mut self, value: bool) -> Self {
self.group_commit_enabled = value;
self
}
pub fn group_commit_window_ms(mut self, value: u64) -> Self {
self.group_commit_window_ms = value;
self
}
pub fn sync_normal(mut self) -> Self {
self.sync_mode = SyncMode::Normal;
self
}
pub fn sync_off(mut self) -> Self {
self.sync_mode = SyncMode::Off;
self
}
pub fn snapshot_parse_mode(mut self, mode: SnapshotParseMode) -> Self {
self.snapshot_parse_mode = mode;
self
}
pub fn replication_role(mut self, role: ReplicationRole) -> Self {
self.replication_role = role;
self
}
pub fn replication_sidecar_path<P: AsRef<Path>>(mut self, path: P) -> Self {
self.replication_sidecar_path = Some(path.as_ref().to_path_buf());
self
}
pub fn replication_source_db_path<P: AsRef<Path>>(mut self, path: P) -> Self {
self.replication_source_db_path = Some(path.as_ref().to_path_buf());
self
}
pub fn replication_source_sidecar_path<P: AsRef<Path>>(mut self, path: P) -> Self {
self.replication_source_sidecar_path = Some(path.as_ref().to_path_buf());
self
}
pub fn replication_fail_after_append_for_testing(mut self, value: u64) -> Self {
self.replication_fail_after_append_for_testing = Some(value);
self
}
pub fn replication_segment_max_bytes(mut self, value: u64) -> Self {
self.replication_segment_max_bytes = Some(value);
self
}
pub fn replication_retention_min_entries(mut self, value: u64) -> Self {
self.replication_retention_min_entries = Some(value);
self
}
pub fn replication_retention_min_ms(mut self, value: u64) -> Self {
self.replication_retention_min_ms = Some(value);
self
}
}
#[derive(Debug, Clone, Copy, Default)]
pub struct SingleFileCloseOptions {
pub checkpoint_if_wal_usage_at_least: Option<f64>,
}
impl SingleFileCloseOptions {
pub fn new() -> Self {
Self::default()
}
pub fn checkpoint_if_wal_usage_at_least(mut self, threshold: f64) -> Self {
self.checkpoint_if_wal_usage_at_least = Some(threshold);
self
}
}
struct SnapshotLoadState<'a> {
header: &'a DbHeaderV1,
pager: &'a mut FilePager,
options: &'a SingleFileOpenOptions,
label_names: &'a mut HashMap<String, LabelId>,
label_ids: &'a mut HashMap<LabelId, String>,
etype_names: &'a mut HashMap<String, ETypeId>,
etype_ids: &'a mut HashMap<ETypeId, String>,
propkey_names: &'a mut HashMap<String, PropKeyId>,
propkey_ids: &'a mut HashMap<PropKeyId, String>,
next_node_id: &'a mut NodeId,
next_label_id: &'a mut LabelId,
next_etype_id: &'a mut ETypeId,
next_propkey_id: &'a mut PropKeyId,
#[cfg(feature = "bench-profile")]
profile: &'a mut OpenProfileCounters,
#[cfg(feature = "bench-profile")]
profile_enabled: bool,
crc_chunk_size: Option<usize>,
#[cfg(feature = "bench-profile")]
snapshot_profile: Option<&'a mut SnapshotOpenProfile>,
}
#[cfg(feature = "bench-profile")]
#[derive(Debug, Default)]
struct OpenProfileCounters {
snapshot_parse_ns: u64,
snapshot_crc_ns: u64,
snapshot_decode_ns: u64,
schema_hydrate_ns: u64,
wal_scan_ns: u64,
wal_replay_ns: u64,
vector_init_ns: u64,
}
#[cfg(feature = "bench-profile")]
fn elapsed_ns(started: Instant) -> u64 {
started.elapsed().as_nanos().min(u128::from(u64::MAX)) as u64
}
#[cfg(feature = "bench-profile")]
#[derive(Debug, Clone, Default)]
struct SnapshotOpenProfile {
parse_total_ns: u64,
snapshot_crc_ns: u64,
snapshot_crc_bytes: usize,
snapshot_crc_chunk_size: usize,
snapshot_crc_sections: Vec<crate::core::snapshot::reader::SnapshotCrcSectionProfile>,
}
fn snapshot_crc_chunk_size_from_env() -> Option<usize> {
std::env::var("KITEDB_SNAPSHOT_CRC_CHUNK_BYTES")
.ok()
.and_then(|raw| raw.parse::<usize>().ok())
.filter(|value| *value > 0)
}
#[cfg(feature = "bench-profile")]
fn open_profile_enabled() -> bool {
if std::env::var_os("KITEDB_BENCH_PROFILE_OPEN").is_none() {
return false;
}
std::env::var("KITEDB_BENCH_PROFILE_OPEN")
.map(|value| {
let value = value.to_lowercase();
value == "1" || value == "true" || value == "yes"
})
.unwrap_or(true)
}
#[cfg(feature = "bench-profile")]
fn log_open_profile(path: &Path, profile: &SnapshotOpenProfile) {
if profile.parse_total_ns == 0 {
return;
}
let snapshot_decode_ns = profile
.parse_total_ns
.saturating_sub(profile.snapshot_crc_ns);
println!(
"[open_profile] path={} snapshot_crc_ns={} snapshot_decode_ns={} snapshot_crc_bytes={} snapshot_crc_chunk_bytes={}",
path.display(),
profile.snapshot_crc_ns,
snapshot_decode_ns,
profile.snapshot_crc_bytes,
profile.snapshot_crc_chunk_size,
);
for section in &profile.snapshot_crc_sections {
let section_name = section
.section_id
.map(|id| format!("{id:?}"))
.unwrap_or_else(|| "__non_section__".to_string());
println!(
"[open_profile] path={} snapshot_crc_section={} bytes={} ns={}",
path.display(),
section_name,
section.bytes,
section.crc_ns
);
}
}
fn load_snapshot_and_schema(state: &mut SnapshotLoadState<'_>) -> Result<Option<SnapshotData>> {
if state.header.snapshot_page_count == 0 {
return Ok(None);
}
let snapshot_offset = (state.header.snapshot_start_page * state.header.page_size as u64) as usize;
let mut parse_options = crate::core::snapshot::reader::ParseSnapshotOptions::default();
if matches!(
state.options.snapshot_parse_mode,
SnapshotParseMode::Salvage
) {
parse_options.skip_crc_validation = true;
}
parse_options.crc_chunk_size = state.crc_chunk_size;
#[cfg(feature = "bench-profile")]
{
let mmap = std::sync::Arc::new({
map_file(state.pager.file())?
});
let parse_started = Instant::now();
let _ = SnapshotData::parse_at_offset(mmap.clone(), snapshot_offset, &parse_options);
let parse_total_ns = elapsed_ns(parse_started);
state.profile.snapshot_parse_ns = state
.profile
.snapshot_parse_ns
.saturating_add(parse_total_ns);
if state.profile_enabled && !parse_options.skip_crc_validation {
let mut decode_options = parse_options.clone();
decode_options.skip_crc_validation = true;
let decode_started = Instant::now();
if SnapshotData::parse_at_offset(mmap, snapshot_offset, &decode_options).is_ok() {
let decode_ns = elapsed_ns(decode_started);
state.profile.snapshot_decode_ns =
state.profile.snapshot_decode_ns.saturating_add(decode_ns);
state.profile.snapshot_crc_ns = state
.profile
.snapshot_crc_ns
.saturating_add(parse_total_ns.saturating_sub(decode_ns));
} else {
state.profile.snapshot_decode_ns = state
.profile
.snapshot_decode_ns
.saturating_add(parse_total_ns);
}
} else {
state.profile.snapshot_decode_ns = state
.profile
.snapshot_decode_ns
.saturating_add(parse_total_ns);
}
}
#[cfg(feature = "bench-profile")]
let profile_sink = if state.snapshot_profile.is_some() && !parse_options.skip_crc_validation {
Some(Arc::new(std::sync::Mutex::new(None)))
} else {
None
};
#[cfg(feature = "bench-profile")]
{
parse_options.crc_profile_sink = profile_sink.clone();
}
#[cfg(feature = "bench-profile")]
let parse_start = Instant::now();
let parse_result = SnapshotData::parse_at_offset(
Arc::new({
map_file(state.pager.file())?
}),
snapshot_offset,
&parse_options,
);
#[cfg(feature = "bench-profile")]
let parse_total_ns = parse_start.elapsed().as_nanos() as u64;
match parse_result {
Ok(snap) => {
#[cfg(feature = "bench-profile")]
let schema_started = Instant::now();
#[cfg(feature = "bench-profile")]
{
if let Some(profile) = state.snapshot_profile.as_deref_mut() {
profile.parse_total_ns = parse_total_ns;
if let Some(sink) = profile_sink {
if let Ok(mut guard) = sink.lock() {
if let Some(crc_profile) = guard.take() {
profile.snapshot_crc_ns = crc_profile.total_ns;
profile.snapshot_crc_bytes = crc_profile.total_bytes;
profile.snapshot_crc_chunk_size = crc_profile.chunk_size;
profile.snapshot_crc_sections = crc_profile.sections;
}
}
}
}
}
for i in 1..=snap.header.num_labels as u32 {
if let Some(name) = snap.label_name(i) {
state.label_names.insert(name.to_string(), i);
state.label_ids.insert(i, name.to_string());
}
}
for i in 1..=snap.header.num_etypes as u32 {
if let Some(name) = snap.etype_name(i) {
state.etype_names.insert(name.to_string(), i);
state.etype_ids.insert(i, name.to_string());
}
}
for i in 1..=snap.header.num_propkeys as u32 {
if let Some(name) = snap.propkey_name(i) {
state.propkey_names.insert(name.to_string(), i);
state.propkey_ids.insert(i, name.to_string());
}
}
*state.next_node_id = snap.header.max_node_id + 1;
*state.next_label_id = snap.header.num_labels as u32 + 1;
*state.next_etype_id = snap.header.num_etypes as u32 + 1;
*state.next_propkey_id = snap.header.num_propkeys as u32 + 1;
#[cfg(feature = "bench-profile")]
{
state.profile.schema_hydrate_ns = state
.profile
.schema_hydrate_ns
.saturating_add(elapsed_ns(schema_started));
}
Ok(Some(snap))
}
Err(e) => match state.options.snapshot_parse_mode {
SnapshotParseMode::Strict => Err(e),
SnapshotParseMode::Salvage => {
eprintln!("Warning: Failed to parse snapshot: {e}");
Ok(None)
}
},
}
}
fn init_mvcc_from_wal(
options: &SingleFileOpenOptions,
next_tx_id: TxId,
next_commit_ts: u64,
committed_in_order: &[(TxId, Vec<&crate::core::wal::record::ParsedWalRecord>)],
delta: &DeltaState,
) -> Option<std::sync::Arc<MvccManager>> {
if !options.mvcc {
return None;
}
let mut gc_config = GcConfig::default();
if let Some(v) = options.mvcc_gc_interval_ms {
gc_config.interval_ms = v;
}
if let Some(v) = options.mvcc_retention_ms {
gc_config.retention_ms = v;
}
if let Some(v) = options.mvcc_max_chain_depth {
gc_config.max_chain_depth = v;
}
let mvcc = std::sync::Arc::new(MvccManager::new(next_tx_id, next_commit_ts, gc_config));
if !committed_in_order.is_empty() {
use crate::core::wal::record::{
parse_add_edge_payload, parse_add_edge_props_payload, parse_add_edges_batch_payload,
parse_add_edges_props_batch_payload, parse_add_node_label_payload, parse_create_node_payload,
parse_create_nodes_batch_payload, parse_del_edge_prop_payload, parse_del_node_prop_payload,
parse_delete_edge_payload, parse_delete_node_payload, parse_remove_node_label_payload,
parse_set_edge_prop_payload, parse_set_edge_props_payload, parse_set_node_prop_payload,
};
let mut commit_ts: u64 = 1;
for (txid, records) in committed_in_order {
for record in records {
match record.record_type {
WalRecordType::CreateNode => {
if let Some(data) = parse_create_node_payload(&record.payload) {
if let Some(node_delta) = delta.created_nodes.get(&data.node_id) {
let mut vc = mvcc.version_chain.lock();
vc.append_node_version(
data.node_id,
NodeVersionData {
node_id: data.node_id,
delta: node_delta.for_version(),
},
*txid,
commit_ts,
);
}
}
}
WalRecordType::CreateNodesBatch => {
if let Some(nodes) = parse_create_nodes_batch_payload(&record.payload) {
for data in nodes {
if let Some(node_delta) = delta.created_nodes.get(&data.node_id) {
let mut vc = mvcc.version_chain.lock();
vc.append_node_version(
data.node_id,
NodeVersionData {
node_id: data.node_id,
delta: node_delta.for_version(),
},
*txid,
commit_ts,
);
}
}
}
}
WalRecordType::DeleteNode => {
if let Some(data) = parse_delete_node_payload(&record.payload) {
let mut vc = mvcc.version_chain.lock();
vc.delete_node_version(data.node_id, *txid, commit_ts);
}
}
WalRecordType::AddEdge => {
if let Some(data) = parse_add_edge_payload(&record.payload) {
let mut vc = mvcc.version_chain.lock();
vc.append_edge_version(data.src, data.etype, data.dst, true, *txid, commit_ts);
}
}
WalRecordType::AddEdgesBatch => {
if let Some(edges) = parse_add_edges_batch_payload(&record.payload) {
let mut vc = mvcc.version_chain.lock();
for data in edges {
vc.append_edge_version(data.src, data.etype, data.dst, true, *txid, commit_ts);
}
}
}
WalRecordType::AddEdgeProps => {
if let Some(data) = parse_add_edge_props_payload(&record.payload) {
let mut vc = mvcc.version_chain.lock();
vc.append_edge_version(data.src, data.etype, data.dst, true, *txid, commit_ts);
for (key_id, value) in data.props {
vc.append_edge_prop_version(
data.src,
data.etype,
data.dst,
key_id,
Some(std::sync::Arc::new(value)),
*txid,
commit_ts,
);
}
}
}
WalRecordType::AddEdgesPropsBatch => {
if let Some(edges) = parse_add_edges_props_batch_payload(&record.payload) {
let mut vc = mvcc.version_chain.lock();
for data in edges {
vc.append_edge_version(data.src, data.etype, data.dst, true, *txid, commit_ts);
for (key_id, value) in data.props {
vc.append_edge_prop_version(
data.src,
data.etype,
data.dst,
key_id,
Some(std::sync::Arc::new(value)),
*txid,
commit_ts,
);
}
}
}
}
WalRecordType::DeleteEdge => {
if let Some(data) = parse_delete_edge_payload(&record.payload) {
let mut vc = mvcc.version_chain.lock();
vc.append_edge_version(data.src, data.etype, data.dst, false, *txid, commit_ts);
}
}
WalRecordType::SetNodeProp => {
if let Some(data) = parse_set_node_prop_payload(&record.payload) {
let mut vc = mvcc.version_chain.lock();
vc.append_node_prop_version(
data.node_id,
data.key_id,
Some(std::sync::Arc::new(data.value)),
*txid,
commit_ts,
);
}
}
WalRecordType::DelNodeProp => {
if let Some(data) = parse_del_node_prop_payload(&record.payload) {
let mut vc = mvcc.version_chain.lock();
vc.append_node_prop_version(data.node_id, data.key_id, None, *txid, commit_ts);
}
}
WalRecordType::SetEdgeProp => {
if let Some(data) = parse_set_edge_prop_payload(&record.payload) {
let mut vc = mvcc.version_chain.lock();
vc.append_edge_prop_version(
data.src,
data.etype,
data.dst,
data.key_id,
Some(std::sync::Arc::new(data.value)),
*txid,
commit_ts,
);
}
}
WalRecordType::SetEdgeProps => {
if let Some(data) = parse_set_edge_props_payload(&record.payload) {
let mut vc = mvcc.version_chain.lock();
for (key_id, value) in data.props {
vc.append_edge_prop_version(
data.src,
data.etype,
data.dst,
key_id,
Some(std::sync::Arc::new(value)),
*txid,
commit_ts,
);
}
}
}
WalRecordType::DelEdgeProp => {
if let Some(data) = parse_del_edge_prop_payload(&record.payload) {
let mut vc = mvcc.version_chain.lock();
vc.append_edge_prop_version(
data.src,
data.etype,
data.dst,
data.key_id,
None,
*txid,
commit_ts,
);
}
}
WalRecordType::AddNodeLabel => {
if let Some(data) = parse_add_node_label_payload(&record.payload) {
let mut vc = mvcc.version_chain.lock();
vc.append_node_label_version(
data.node_id,
data.label_id,
Some(true),
*txid,
commit_ts,
);
}
}
WalRecordType::RemoveNodeLabel => {
if let Some(data) = parse_remove_node_label_payload(&record.payload) {
let mut vc = mvcc.version_chain.lock();
vc.append_node_label_version(data.node_id, data.label_id, None, *txid, commit_ts);
}
}
_ => {}
}
}
commit_ts += 1;
}
}
mvcc.start();
Some(mvcc)
}
pub fn open_single_file<P: AsRef<Path>>(
path: P,
options: SingleFileOpenOptions,
) -> Result<SingleFileDB> {
let path = path.as_ref();
#[cfg(feature = "bench-profile")]
let open_started = Instant::now();
#[cfg(feature = "bench-profile")]
let mut open_profile = OpenProfileCounters::default();
#[cfg(feature = "bench-profile")]
let profile_enabled = open_profile_enabled();
if !is_valid_page_size(options.page_size) {
return Err(KiteError::Internal(format!(
"Invalid page size: {}. Must be power of 2 between 4KB and 64KB",
options.page_size
)));
}
let file_exists = path.exists();
if !file_exists && !options.create_if_missing {
return Err(KiteError::InvalidPath(format!(
"Database does not exist at {}",
path.display()
)));
}
if !file_exists && options.read_only {
return Err(KiteError::ReadOnly);
}
let (mut pager, mut header, is_new) = if file_exists {
let mut pager = open_pager(path, options.page_size)?;
let header_data = pager.read_page(0)?;
let header = DbHeaderV1::parse(&header_data)?;
let expected_wal_pages = pages_to_store(options.wal_size, header.page_size as usize) as u64;
if header.wal_page_count != expected_wal_pages {
return Err(KiteError::InvalidSnapshot(format!(
"WAL size mismatch: header has {} pages, options require {} pages",
header.wal_page_count, expected_wal_pages
)));
}
(pager, header, false)
} else {
let mut pager = create_pager(path, options.page_size)?;
let wal_page_count = pages_to_store(options.wal_size, options.page_size) as u64;
let header = DbHeaderV1::new(options.page_size as u32, wal_page_count);
let header_bytes = header.serialize_to_page();
pager.write_page(0, &header_bytes)?;
pager.allocate_pages(wal_page_count as u32)?;
pager.sync()?;
(pager, header, true)
};
let mut wal_buffer = WalBuffer::from_header(&header);
if header.checkpoint_in_progress != 0 {
wal_buffer.recover_incomplete_checkpoint(&mut pager)?;
wal_buffer.flush(&mut pager)?;
header.active_wal_region = 0;
header.checkpoint_in_progress = 0;
header.wal_head = wal_buffer.head();
header.wal_tail = wal_buffer.tail();
header.wal_primary_head = wal_buffer.primary_head();
header.wal_secondary_head = wal_buffer.secondary_head();
header.change_counter += 1;
let header_bytes = header.serialize_to_page();
pager.write_page(0, &header_bytes)?;
pager.sync()?;
}
let mut next_node_id = INITIAL_NODE_ID;
let mut next_label_id = INITIAL_LABEL_ID;
let mut next_etype_id = INITIAL_ETYPE_ID;
let mut next_propkey_id = INITIAL_PROPKEY_ID;
let next_tx_id = header.next_tx_id;
if header.max_node_id > 0 {
next_node_id = header.max_node_id + 1;
}
let mut delta = DeltaState::new();
let mut next_commit_ts: u64 = 1;
let mut committed_in_order: Vec<(TxId, Vec<&crate::core::wal::record::ParsedWalRecord>)> =
Vec::new();
let mut label_names: HashMap<String, LabelId> = HashMap::new();
let mut label_ids: HashMap<LabelId, String> = HashMap::new();
let mut etype_names: HashMap<String, ETypeId> = HashMap::new();
let mut etype_ids: HashMap<ETypeId, String> = HashMap::new();
let mut propkey_names: HashMap<String, PropKeyId> = HashMap::new();
let mut propkey_ids: HashMap<PropKeyId, String> = HashMap::new();
let crc_chunk_size = snapshot_crc_chunk_size_from_env();
#[cfg(feature = "bench-profile")]
let mut snapshot_profile = if open_profile_enabled() {
Some(SnapshotOpenProfile::default())
} else {
None
};
let mut snapshot_state = SnapshotLoadState {
header: &header,
pager: &mut pager,
options: &options,
label_names: &mut label_names,
label_ids: &mut label_ids,
etype_names: &mut etype_names,
etype_ids: &mut etype_ids,
propkey_names: &mut propkey_names,
propkey_ids: &mut propkey_ids,
next_node_id: &mut next_node_id,
next_label_id: &mut next_label_id,
next_etype_id: &mut next_etype_id,
next_propkey_id: &mut next_propkey_id,
#[cfg(feature = "bench-profile")]
profile: &mut open_profile,
#[cfg(feature = "bench-profile")]
profile_enabled,
crc_chunk_size,
#[cfg(feature = "bench-profile")]
snapshot_profile: snapshot_profile.as_mut(),
};
let snapshot = load_snapshot_and_schema(&mut snapshot_state)?;
#[cfg(feature = "bench-profile")]
if let Some(profile) = snapshot_profile.as_ref() {
log_open_profile(path, profile);
}
let mut _wal_records_storage: Option<Vec<crate::core::wal::record::ParsedWalRecord>>;
if !is_new && header.wal_head > 0 {
#[cfg(feature = "bench-profile")]
let wal_scan_started = Instant::now();
_wal_records_storage = Some(scan_wal_records(&mut pager, &header)?);
#[cfg(feature = "bench-profile")]
{
open_profile.wal_scan_ns = open_profile
.wal_scan_ns
.saturating_add(elapsed_ns(wal_scan_started));
}
if let Some(ref wal_records) = _wal_records_storage {
committed_in_order = committed_transactions(wal_records);
#[cfg(feature = "bench-profile")]
let wal_replay_started = Instant::now();
for (_txid, records) in &committed_in_order {
for record in records {
replay_wal_record(
record,
snapshot.as_ref(),
&mut delta,
&mut next_node_id,
&mut next_label_id,
&mut next_etype_id,
&mut next_propkey_id,
&mut label_names,
&mut label_ids,
&mut etype_names,
&mut etype_ids,
&mut propkey_names,
&mut propkey_ids,
);
}
next_commit_ts += 1;
}
#[cfg(feature = "bench-profile")]
{
open_profile.wal_replay_ns = open_profile
.wal_replay_ns
.saturating_add(elapsed_ns(wal_replay_started));
}
}
} else {
_wal_records_storage = None;
}
#[cfg(feature = "bench-profile")]
let vector_init_started = Instant::now();
let (mut vector_stores, mut vector_store_lazy_entries) = if let Some(ref snapshot) = snapshot {
if snapshot
.header
.flags
.contains(SnapshotFlags::HAS_VECTOR_STORES)
|| snapshot.header.flags.contains(SnapshotFlags::HAS_VECTORS)
{
vector_store_state_from_snapshot(snapshot)?
} else {
(HashMap::new(), HashMap::new())
}
} else {
(HashMap::new(), HashMap::new())
};
for ((node_id, prop_key_id), operation) in delta.pending_vectors.drain() {
if let Some(ref snapshot) = snapshot {
materialize_vector_store_from_lazy_entries(
snapshot,
&mut vector_stores,
&mut vector_store_lazy_entries,
prop_key_id,
)?;
}
match operation {
Some(vector) => {
let store = vector_stores.entry(prop_key_id).or_insert_with(|| {
let config = VectorStoreConfig::new(vector.len());
create_vector_store(config)
});
vector_store_insert(store, node_id, vector.as_ref()).map_err(|e| {
KiteError::InvalidWal(format!(
"Failed to apply vector insert during WAL replay for node {node_id} (prop {prop_key_id}): {e}"
))
})?;
}
None => {
if let Some(store) = vector_stores.get_mut(&prop_key_id) {
vector_store_delete(store, node_id);
}
}
}
}
#[cfg(feature = "bench-profile")]
{
open_profile.vector_init_ns = open_profile
.vector_init_ns
.saturating_add(elapsed_ns(vector_init_started));
}
let cache = options.cache.clone().map(CacheManager::new);
let mvcc = init_mvcc_from_wal(
&options,
next_tx_id,
next_commit_ts,
&committed_in_order,
&delta,
);
let (primary_replication, replica_replication) = match options.replication_role {
ReplicationRole::Disabled => (None, None),
ReplicationRole::Primary => (
Some(PrimaryReplication::open(
path,
options.replication_sidecar_path.clone(),
options.replication_segment_max_bytes,
options.replication_retention_min_entries,
options.replication_retention_min_ms,
options.sync_mode,
options.replication_fail_after_append_for_testing,
)?),
None,
),
ReplicationRole::Replica => (
None,
Some(ReplicaReplication::open(
path,
options.replication_sidecar_path.clone(),
options.replication_source_db_path.clone(),
options.replication_source_sidecar_path.clone(),
)?),
),
};
#[cfg(feature = "bench-profile")]
{
if profile_enabled {
let total_ns = elapsed_ns(open_started);
let wal_records = _wal_records_storage.as_ref().map(|r| r.len()).unwrap_or(0);
eprintln!(
"[bench-profile][open] path={} total_ns={} snapshot_parse_ns={} snapshot_crc_ns={} snapshot_decode_ns={} schema_hydrate_ns={} wal_scan_ns={} wal_replay_ns={} vector_init_ns={} snapshot_loaded={} wal_records={} wal_txs={} vector_stores={} vector_lazy_entries={}",
path.display(),
total_ns,
open_profile.snapshot_parse_ns,
open_profile.snapshot_crc_ns,
open_profile.snapshot_decode_ns,
open_profile.schema_hydrate_ns,
open_profile.wal_scan_ns,
open_profile.wal_replay_ns,
open_profile.vector_init_ns,
usize::from(snapshot.is_some()),
wal_records,
committed_in_order.len(),
vector_stores.len(),
vector_store_lazy_entries.len(),
);
}
}
Ok(SingleFileDB {
path: path.to_path_buf(),
read_only: options.read_only,
pager: Mutex::new(pager),
header: RwLock::new(header),
wal_buffer: Mutex::new(wal_buffer),
snapshot: RwLock::new(snapshot),
delta: RwLock::new(delta),
next_node_id: AtomicU64::new(next_node_id),
next_label_id: AtomicU32::new(next_label_id),
next_etype_id: AtomicU32::new(next_etype_id),
next_propkey_id: AtomicU32::new(next_propkey_id),
next_tx_id: AtomicU64::new(next_tx_id),
current_tx: Mutex::new(HashMap::new()),
active_writers: AtomicUsize::new(0),
commit_lock: Mutex::new(()),
group_commit_state: Mutex::new(super::GroupCommitState::default()),
group_commit_cv: parking_lot::Condvar::new(),
mvcc,
label_names: RwLock::new(label_names),
label_ids: RwLock::new(label_ids),
etype_names: RwLock::new(etype_names),
etype_ids: RwLock::new(etype_ids),
propkey_names: RwLock::new(propkey_names),
propkey_ids: RwLock::new(propkey_ids),
auto_checkpoint: options.auto_checkpoint,
checkpoint_threshold: options.checkpoint_threshold,
background_checkpoint: options.background_checkpoint,
checkpoint_status: Mutex::new(CheckpointStatus::Idle),
vector_stores: RwLock::new(vector_stores),
vector_store_lazy_entries: RwLock::new(vector_store_lazy_entries),
cache: RwLock::new(cache),
checkpoint_compression: options.checkpoint_compression.clone(),
sync_mode: options.sync_mode,
group_commit_enabled: options.group_commit_enabled,
group_commit_window_ms: options.group_commit_window_ms,
primary_replication,
replica_replication,
#[cfg(feature = "bench-profile")]
commit_lock_wait_ns: AtomicU64::new(0),
#[cfg(feature = "bench-profile")]
wal_flush_ns: AtomicU64::new(0),
})
}
pub fn close_single_file_with_options(
db: SingleFileDB,
options: SingleFileCloseOptions,
) -> Result<()> {
if let Some(threshold_raw) = options.checkpoint_if_wal_usage_at_least {
if !threshold_raw.is_finite() {
return Err(KiteError::Internal(format!(
"invalid close checkpoint threshold: {threshold_raw}"
)));
}
let threshold = threshold_raw.clamp(0.0, 1.0);
if !db.read_only && db.should_checkpoint(threshold) {
db.checkpoint()?;
}
}
if let Some(ref mvcc) = db.mvcc {
mvcc.stop();
}
let mut pager = db.pager.lock();
let mut wal_buffer = db.wal_buffer.lock();
wal_buffer.flush(&mut pager)?;
{
let mut header = db.header.write();
header.wal_head = wal_buffer.head();
header.wal_tail = wal_buffer.tail();
header.max_node_id = db.next_node_id.load(Ordering::SeqCst).saturating_sub(1);
header.next_tx_id = db.next_tx_id.load(Ordering::SeqCst);
let header_bytes = header.serialize_to_page();
pager.write_page(0, &header_bytes)?;
}
pager.sync()?;
Ok(())
}
pub fn close_single_file(db: SingleFileDB) -> Result<()> {
close_single_file_with_options(db, SingleFileCloseOptions::default())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::single_file::recovery::read_wal_area;
use crate::core::single_file::{
close_single_file, close_single_file_with_options, SingleFileCloseOptions,
};
use crate::core::wal::record::parse_wal_record;
use crate::util::binary::{align_up, read_u32};
use tempfile::tempdir;
fn corrupt_last_wal_record(db: &SingleFileDB) {
let mut pager = db.pager.lock();
let header = db.header.read().clone();
let wal_data = read_wal_area(&mut pager, &header).expect("expected value");
let mut pos = header.wal_tail as usize;
let head = header.wal_head as usize;
let mut last_start = None;
while pos < head {
let rec_len = read_u32(&wal_data, pos) as usize;
if rec_len == 0 {
break;
}
if parse_wal_record(&wal_data, pos).is_none() {
break;
}
last_start = Some(pos);
let aligned_size = align_up(rec_len, WAL_RECORD_ALIGNMENT);
pos += aligned_size;
}
let last_start = last_start.expect("wal record");
let rec_len = read_u32(&wal_data, last_start) as usize;
let crc_offset = last_start + rec_len - 4;
let wal_start = header.wal_start_page as usize * header.page_size as usize;
let file_offset = wal_start + crc_offset;
let page_size = header.page_size as usize;
let page_num = (file_offset / page_size) as u32;
let page_offset = file_offset % page_size;
if page_offset + 4 <= page_size {
let mut page = pager.read_page(page_num).expect("expected value");
page[page_offset..page_offset + 4].fill(0);
pager.write_page(page_num, &page).expect("expected value");
} else {
let first_len = page_size - page_offset;
let mut page = pager.read_page(page_num).expect("expected value");
page[page_offset..].fill(0);
pager.write_page(page_num, &page).expect("expected value");
let mut next_page = pager.read_page(page_num + 1).expect("expected value");
next_page[..(4 - first_len)].fill(0);
pager
.write_page(page_num + 1, &next_page)
.expect("expected value");
}
pager.sync().expect("expected value");
}
#[test]
fn test_recover_incomplete_background_checkpoint() {
let temp_dir = tempdir().expect("expected value");
let db_path = temp_dir.path().join("checkpoint-recover.kitedb");
let db = open_single_file(&db_path, SingleFileOpenOptions::new()).expect("expected value");
db.begin(false).expect("expected value");
let _n1 = db.create_node(Some("n1")).expect("expected value");
db.commit().expect("expected value");
{
let mut pager = db.pager.lock();
let mut wal = db.wal_buffer.lock();
let mut header = db.header.write();
wal.switch_to_secondary();
header.active_wal_region = 1;
header.checkpoint_in_progress = 1;
header.wal_primary_head = wal.primary_head();
header.wal_secondary_head = wal.secondary_head();
header.wal_head = wal.head();
header.wal_tail = wal.tail();
header.change_counter += 1;
let header_bytes = header.serialize_to_page();
pager.write_page(0, &header_bytes).expect("expected value");
pager.sync().expect("expected value");
}
db.begin(false).expect("expected value");
let _n2 = db.create_node(Some("n2")).expect("expected value");
db.commit().expect("expected value");
close_single_file(db).expect("expected value");
let db = open_single_file(&db_path, SingleFileOpenOptions::new()).expect("expected value");
assert!(db.node_by_key("n1").is_some());
assert!(db.node_by_key("n2").is_some());
close_single_file(db).expect("expected value");
}
#[test]
fn test_group_commit_flush_and_persist() {
let temp_dir = tempdir().expect("expected value");
let db_path = temp_dir.path().join("group-commit.kitedb");
let db = open_single_file(
&db_path,
SingleFileOpenOptions::new()
.sync_mode(SyncMode::Normal)
.group_commit_enabled(true)
.group_commit_window_ms(0),
)
.expect("expected value");
db.begin(false).expect("expected value");
let node_id = db.create_node(Some("n1")).expect("expected value");
let key_id = db.propkey_id_or_create("value");
db.set_node_prop(node_id, key_id, crate::types::PropValue::I64(42))
.expect("expected value");
db.commit().expect("expected value");
assert!(!db.wal_buffer.lock().has_pending_writes());
close_single_file(db).expect("expected value");
let reopened = open_single_file(
&db_path,
SingleFileOpenOptions::new()
.sync_mode(SyncMode::Normal)
.group_commit_enabled(true)
.group_commit_window_ms(0),
)
.expect("expected value");
let value = reopened.node_prop(node_id, key_id).expect("prop value");
assert_eq!(value, crate::types::PropValue::I64(42));
close_single_file(reopened).expect("expected value");
}
#[test]
fn test_open_rejects_wal_size_mismatch() {
let temp_dir = tempdir().expect("expected value");
let db_path = temp_dir.path().join("wal-size-mismatch.kitedb");
let db = open_single_file(&db_path, SingleFileOpenOptions::new().wal_size(64 * 1024))
.expect("expected value");
close_single_file(db).expect("expected value");
let reopen = open_single_file(
&db_path,
SingleFileOpenOptions::new().wal_size(64 * 1024 * 1024),
);
assert!(reopen.is_err(), "expected wal size mismatch to error");
}
#[test]
fn test_recover_checkpoint_with_partial_header_update() {
let temp_dir = tempdir().expect("expected value");
let db_path = temp_dir
.path()
.join("checkpoint-recover-partial-header.kitedb");
let db = open_single_file(&db_path, SingleFileOpenOptions::new()).expect("expected value");
db.begin(false).expect("expected value");
let _n1 = db.create_node(Some("n1")).expect("expected value");
db.commit().expect("expected value");
{
let mut pager = db.pager.lock();
let mut wal = db.wal_buffer.lock();
let mut header = db.header.write();
wal.switch_to_secondary();
header.active_wal_region = 1;
header.checkpoint_in_progress = 1;
header.wal_primary_head = wal.primary_head();
header.wal_secondary_head = wal.secondary_head();
header.wal_head = wal.head();
header.wal_tail = wal.tail();
header.change_counter += 1;
let header_bytes = header.serialize_to_page();
pager.write_page(0, &header_bytes).expect("expected value");
pager.sync().expect("expected value");
}
db.begin(false).expect("expected value");
let _n2 = db.create_node(Some("n2")).expect("expected value");
db.commit().expect("expected value");
{
let mut pager = db.pager.lock();
let mut wal = db.wal_buffer.lock();
wal.flush(&mut pager).expect("expected value");
let mut header = db.header.write();
header.active_wal_region = 1;
header.checkpoint_in_progress = 1;
header.wal_primary_head = wal.primary_head();
header.wal_head = wal.head();
header.wal_tail = wal.tail();
header.wal_secondary_head = wal.primary_region_size();
header.change_counter += 1;
let header_bytes = header.serialize_to_page();
pager.write_page(0, &header_bytes).expect("expected value");
pager.sync().expect("expected value");
}
drop(db);
let db = open_single_file(&db_path, SingleFileOpenOptions::new()).expect("expected value");
assert!(db.node_by_key("n1").is_some());
assert!(db.node_by_key("n2").is_some());
close_single_file(db).expect("expected value");
}
#[test]
fn test_recover_checkpoint_with_missing_primary_head() {
let temp_dir = tempdir().expect("expected value");
let db_path = temp_dir
.path()
.join("checkpoint-recover-missing-primary-head.kitedb");
let db = open_single_file(&db_path, SingleFileOpenOptions::new()).expect("expected value");
db.begin(false).expect("expected value");
let _n1 = db.create_node(Some("n1")).expect("expected value");
db.commit().expect("expected value");
{
let mut pager = db.pager.lock();
let wal = db.wal_buffer.lock();
let mut header = db.header.write();
header.active_wal_region = 1;
header.checkpoint_in_progress = 1;
header.wal_primary_head = 0;
header.wal_secondary_head = wal.secondary_head();
header.wal_head = wal.head();
header.wal_tail = wal.tail();
header.change_counter += 1;
let header_bytes = header.serialize_to_page();
pager.write_page(0, &header_bytes).expect("expected value");
pager.sync().expect("expected value");
}
drop(db);
let db = open_single_file(&db_path, SingleFileOpenOptions::new()).expect("expected value");
assert!(db.node_by_key("n1").is_some());
close_single_file(db).expect("expected value");
}
#[test]
fn test_recover_wal_with_truncated_record() {
let temp_dir = tempdir().expect("expected value");
let db_path = temp_dir.path().join("wal-truncated.kitedb");
let db = open_single_file(&db_path, SingleFileOpenOptions::new()).expect("expected value");
db.begin(false).expect("expected value");
let _n1 = db.create_node(Some("n1")).expect("expected value");
db.commit().expect("expected value");
db.begin(false).expect("expected value");
let _n2 = db.create_node(Some("n2")).expect("expected value");
db.commit().expect("expected value");
corrupt_last_wal_record(&db);
drop(db);
let db = open_single_file(&db_path, SingleFileOpenOptions::new()).expect("expected value");
assert!(db.node_by_key("n1").is_some());
assert!(db.node_by_key("n2").is_none());
close_single_file(db).expect("expected value");
}
#[test]
fn test_recover_ignores_uncommitted_transaction() {
let temp_dir = tempdir().expect("expected value");
let db_path = temp_dir.path().join("wal-uncommitted.kitedb");
let db = open_single_file(&db_path, SingleFileOpenOptions::new()).expect("expected value");
db.begin(false).expect("expected value");
let _n1 = db.create_node(Some("n1")).expect("expected value");
{
let mut pager = db.pager.lock();
let wal = db.wal_buffer.lock();
let mut header = db.header.write();
header.wal_head = wal.head();
header.wal_tail = wal.tail();
header.wal_primary_head = wal.primary_head();
header.wal_secondary_head = wal.secondary_head();
header.active_wal_region = wal.active_region();
header.change_counter += 1;
let header_bytes = header.serialize_to_page();
pager.write_page(0, &header_bytes).expect("expected value");
pager.sync().expect("expected value");
}
drop(db);
let db = open_single_file(&db_path, SingleFileOpenOptions::new()).expect("expected value");
assert!(db.node_by_key("n1").is_none());
close_single_file(db).expect("expected value");
}
#[test]
fn test_checkpoint_replay_after_crash() {
let temp_dir = tempdir().expect("expected value");
let db_path = temp_dir.path().join("checkpoint-replay.kitedb");
let db = open_single_file(&db_path, SingleFileOpenOptions::new()).expect("expected value");
db.begin(false).expect("expected value");
let _n1 = db.create_node(Some("n1")).expect("expected value");
db.commit().expect("expected value");
db.checkpoint().expect("expected value");
db.begin(false).expect("expected value");
let _n2 = db.create_node(Some("n2")).expect("expected value");
db.commit().expect("expected value");
drop(db);
let db = open_single_file(&db_path, SingleFileOpenOptions::new()).expect("expected value");
assert!(db.node_by_key("n1").is_some());
assert!(db.node_by_key("n2").is_some());
close_single_file(db).expect("expected value");
}
#[test]
fn test_close_with_checkpoint_if_wal_over_clears_wal() {
let temp_dir = tempdir().expect("expected value");
let db_path = temp_dir.path().join("close-with-checkpoint.kitedb");
let db = open_single_file(
&db_path,
SingleFileOpenOptions::new().auto_checkpoint(false),
)
.expect("expected value");
db.begin(false).expect("expected value");
let _ = db.create_node(Some("n1")).expect("expected value");
db.commit().expect("expected value");
assert!(db.should_checkpoint(0.0));
close_single_file_with_options(
db,
SingleFileCloseOptions::new().checkpoint_if_wal_usage_at_least(0.0),
)
.expect("expected value");
let reopened = open_single_file(
&db_path,
SingleFileOpenOptions::new().auto_checkpoint(false),
)
.expect("expected value");
let header = reopened.header.read().clone();
assert_eq!(header.wal_head, 0);
assert_eq!(header.wal_tail, 0);
close_single_file(reopened).expect("expected value");
}
#[test]
fn test_close_with_high_threshold_keeps_wal() {
let temp_dir = tempdir().expect("expected value");
let db_path = temp_dir.path().join("close-without-checkpoint.kitedb");
let db = open_single_file(
&db_path,
SingleFileOpenOptions::new().auto_checkpoint(false),
)
.expect("expected value");
db.begin(false).expect("expected value");
let _ = db.create_node(Some("n1")).expect("expected value");
db.commit().expect("expected value");
close_single_file_with_options(
db,
SingleFileCloseOptions::new().checkpoint_if_wal_usage_at_least(1.0),
)
.expect("expected value");
let reopened = open_single_file(
&db_path,
SingleFileOpenOptions::new().auto_checkpoint(false),
)
.expect("expected value");
let header = reopened.header.read().clone();
assert!(header.wal_head > 0);
close_single_file(reopened).expect("expected value");
}
}