use std::collections::HashMap;
use std::sync::atomic::Ordering;
use crate::core::pager::{pages_to_store, FilePager};
use crate::core::snapshot::reader::SnapshotData;
use crate::core::snapshot::writer::{
build_snapshot_to_memory, EdgeData, NodeData, SnapshotBuildInput,
};
use crate::error::{KiteError, Result};
use crate::types::*;
use crate::util::mmap::map_file;
use crate::vector::store::vector_store_node_vector;
use super::vector::vector_stores_from_snapshot;
use super::{CheckpointStatus, SingleFileDB};
/// Everything `SingleFileDB::collect_graph_data` hands to the snapshot builder:
/// the node list, the edge list, and the three id -> name dictionaries.
type GraphData = (
    Vec<NodeData>,              // nodes to persist
    Vec<EdgeData>,              // edges to persist
    HashMap<LabelId, String>,   // label id -> label name
    HashMap<ETypeId, String>,   // edge-type id -> edge-type name
    HashMap<PropKeyId, String>, // property-key id -> property-key name
);
impl SingleFileDB {
pub fn checkpoint(&self) -> Result<()> {
if self.read_only {
return Err(KiteError::ReadOnly);
}
if self.has_any_transaction() {
return Err(KiteError::TransactionInProgress);
}
let (nodes, edges, labels, etypes, propkeys) = self.collect_graph_data();
let header = self.header.read().clone();
let new_gen = header.active_snapshot_gen + 1;
let snapshot_buffer = build_snapshot_to_memory(SnapshotBuildInput {
generation: new_gen,
nodes,
edges,
labels,
etypes,
propkeys,
compression: self.checkpoint_compression.clone(),
})?;
let wal_end_page = header.wal_start_page + header.wal_page_count;
let new_snapshot_start_page = wal_end_page;
let new_snapshot_page_count =
pages_to_store(snapshot_buffer.len(), header.page_size as usize) as u64;
{
let mut pager = self.pager.lock();
self.write_snapshot_pages(
&mut pager,
new_snapshot_start_page as u32,
&snapshot_buffer,
header.page_size as usize,
)?;
}
{
let mut pager = self.pager.lock();
let mut wal_buffer = self.wal_buffer.lock();
let mut header = self.header.write();
header.active_snapshot_gen = new_gen;
header.snapshot_start_page = new_snapshot_start_page;
header.snapshot_page_count = new_snapshot_page_count;
header.db_size_pages = new_snapshot_start_page + new_snapshot_page_count;
header.max_node_id = self.next_node_id.load(Ordering::SeqCst).saturating_sub(1);
header.next_tx_id = self.next_tx_id.load(Ordering::SeqCst);
header.wal_head = 0;
header.wal_tail = 0;
wal_buffer.reset();
header.change_counter += 1;
let header_bytes = header.serialize_to_page();
pager.write_page(0, &header_bytes)?;
pager.sync()?;
}
self.delta.write().clear();
self.reload_snapshot()?;
Ok(())
}
pub(crate) fn reload_snapshot(&self) -> Result<()> {
let header = self.header.read();
if header.snapshot_page_count == 0 {
*self.snapshot.write() = None;
self.vector_stores.write().clear();
return Ok(());
}
let snapshot_offset = (header.snapshot_start_page * header.page_size as u64) as usize;
let pager = self.pager.lock();
let new_snapshot = SnapshotData::parse_at_offset(
std::sync::Arc::new({
map_file(pager.file())?
}),
snapshot_offset,
&crate::core::snapshot::reader::ParseSnapshotOptions::default(),
)?;
*self.snapshot.write() = Some(new_snapshot);
if let Some(ref snapshot) = *self.snapshot.read() {
let stores = vector_stores_from_snapshot(snapshot)?;
*self.vector_stores.write() = stores;
}
Ok(())
}
pub fn is_checkpoint_running(&self) -> bool {
let status = *self.checkpoint_status.lock();
matches!(
status,
CheckpointStatus::Running | CheckpointStatus::Completing
)
}
pub fn checkpoint_status(&self) -> CheckpointStatus {
*self.checkpoint_status.lock()
}
pub fn background_checkpoint(&self) -> Result<()> {
if self.read_only {
return Err(KiteError::ReadOnly);
}
{
let mut status = self.checkpoint_status.lock();
match *status {
CheckpointStatus::Running => {
return Ok(());
}
CheckpointStatus::Completing => {
return Ok(());
}
CheckpointStatus::Idle => {
*status = CheckpointStatus::Running;
}
}
}
{
let mut pager = self.pager.lock();
let mut wal_buffer = self.wal_buffer.lock();
let mut header = self.header.write();
wal_buffer.switch_to_secondary();
header.active_wal_region = 1;
header.checkpoint_in_progress = 1;
header.wal_primary_head = wal_buffer.primary_head();
header.wal_secondary_head = wal_buffer.secondary_head();
header.change_counter += 1;
let header_bytes = header.serialize_to_page();
pager.write_page(0, &header_bytes)?;
pager.sync()?;
}
let snapshot_info = match self.build_and_write_snapshot() {
Ok(info) => info,
Err(e) => {
self.recover_from_checkpoint_error();
return Err(e);
}
};
self.complete_background_checkpoint(snapshot_info)?;
Ok(())
}
fn build_and_write_snapshot(&self) -> Result<(u64, u64, u64)> {
let (nodes, edges, labels, etypes, propkeys) = self.collect_graph_data();
let header = self.header.read().clone();
let new_gen = header.active_snapshot_gen + 1;
let snapshot_buffer = build_snapshot_to_memory(SnapshotBuildInput {
generation: new_gen,
nodes,
edges,
labels,
etypes,
propkeys,
compression: self.checkpoint_compression.clone(),
})?;
let wal_end_page = header.wal_start_page + header.wal_page_count;
let new_snapshot_start_page = wal_end_page;
let new_snapshot_page_count =
pages_to_store(snapshot_buffer.len(), header.page_size as usize) as u64;
{
let mut pager = self.pager.lock();
self.write_snapshot_pages(
&mut pager,
new_snapshot_start_page as u32,
&snapshot_buffer,
header.page_size as usize,
)?;
}
Ok((new_gen, new_snapshot_start_page, new_snapshot_page_count))
}
fn complete_background_checkpoint(&self, snapshot_info: (u64, u64, u64)) -> Result<()> {
let (new_gen, new_snapshot_start_page, new_snapshot_page_count) = snapshot_info;
*self.checkpoint_status.lock() = CheckpointStatus::Completing;
{
let mut pager = self.pager.lock();
let mut wal_buffer = self.wal_buffer.lock();
let mut header = self.header.write();
let old_snapshot_start_page = header.snapshot_start_page;
let old_snapshot_page_count = header.snapshot_page_count;
wal_buffer.merge_secondary_into_primary(&mut pager)?;
wal_buffer.flush(&mut pager)?;
header.active_snapshot_gen = new_gen;
header.snapshot_start_page = new_snapshot_start_page;
header.snapshot_page_count = new_snapshot_page_count;
header.db_size_pages = new_snapshot_start_page + new_snapshot_page_count;
header.max_node_id = self.next_node_id.load(Ordering::SeqCst).saturating_sub(1);
header.next_tx_id = self.next_tx_id.load(Ordering::SeqCst);
header.wal_head = wal_buffer.head();
header.wal_tail = wal_buffer.tail();
header.wal_primary_head = wal_buffer.primary_head();
header.wal_secondary_head = wal_buffer.secondary_head();
header.active_wal_region = 0;
header.checkpoint_in_progress = 0;
header.change_counter += 1;
let header_bytes = header.serialize_to_page();
pager.write_page(0, &header_bytes)?;
pager.sync()?;
if old_snapshot_page_count > 0 && old_snapshot_start_page != new_snapshot_start_page {
pager.free_pages(
old_snapshot_start_page as u32,
old_snapshot_page_count as u32,
);
}
}
self.delta.write().clear();
self.reload_snapshot()?;
*self.checkpoint_status.lock() = CheckpointStatus::Idle;
Ok(())
}
fn recover_from_checkpoint_error(&self) {
if let Some(mut pager) = self.pager.try_lock() {
if let Some(mut wal_buffer) = self.wal_buffer.try_lock() {
if let Some(mut header) = self.header.try_write() {
wal_buffer.switch_to_primary(false);
header.active_wal_region = 0;
header.checkpoint_in_progress = 0;
let header_bytes = header.serialize_to_page();
if let Err(err) = pager.write_page(0, &header_bytes) {
eprintln!("Warning: Failed to write checkpoint header during recovery: {err}");
}
if let Err(err) = pager.sync() {
eprintln!("Warning: Failed to sync checkpoint header during recovery: {err}");
}
}
}
}
*self.checkpoint_status.lock() = CheckpointStatus::Idle;
}
pub(crate) fn write_snapshot_pages(
&self,
pager: &mut FilePager,
start_page: u32,
buffer: &[u8],
page_size: usize,
) -> Result<()> {
let num_pages = pages_to_store(buffer.len(), page_size);
let required_pages = start_page + num_pages;
let current_pages = (pager.file_size() as usize).div_ceil(page_size);
if required_pages as usize > current_pages {
pager.allocate_pages(required_pages - current_pages as u32)?;
}
for i in 0..num_pages {
let mut page_data = vec![0u8; page_size];
let src_offset = i as usize * page_size;
let src_end = std::cmp::min(src_offset + page_size, buffer.len());
page_data[..src_end - src_offset].copy_from_slice(&buffer[src_offset..src_end]);
pager.write_page(start_page + i, &page_data)?;
}
pager.sync()?;
Ok(())
}
pub(crate) fn collect_graph_data(&self) -> GraphData {
let mut nodes = Vec::new();
let mut edges = Vec::new();
let mut labels = HashMap::new();
let mut etypes = HashMap::new();
let mut propkeys = HashMap::new();
let delta = self.delta.read();
for (&id, name) in self.label_ids.read().iter() {
labels.insert(id, name.clone());
}
for (&id, name) in self.etype_ids.read().iter() {
etypes.insert(id, name.clone());
}
for (&id, name) in self.propkey_ids.read().iter() {
propkeys.insert(id, name.clone());
}
if let Some(ref snapshot) = *self.snapshot.read() {
let num_nodes = snapshot.header.num_nodes as usize;
for phys in 0..num_nodes {
let node_id = match snapshot.node_id(phys as u32) {
Some(id) => id,
None => continue,
};
if delta.is_node_deleted(node_id) {
continue;
}
let key = snapshot.node_key(phys as u32);
let mut props = HashMap::new();
if let Some(snapshot_props) = snapshot.node_props(phys as u32) {
for (key_id, value) in snapshot_props {
props.insert(key_id, value);
}
}
if let Some(node_delta) = delta.node_delta(node_id) {
if let Some(ref delta_props) = node_delta.props {
for (&key_id, value) in delta_props {
match value {
Some(v) => {
props.insert(key_id, v.as_ref().clone());
}
None => {
props.remove(&key_id);
}
}
}
}
}
let mut node_labels: std::collections::HashSet<LabelId> = std::collections::HashSet::new();
if let Some(snapshot_labels) = snapshot.node_labels(phys as u32) {
node_labels.extend(snapshot_labels.into_iter());
}
if let Some(node_delta) = delta.node_delta(node_id) {
if let Some(ref labels) = node_delta.labels {
node_labels.extend(labels.iter().copied());
}
if let Some(ref deleted) = node_delta.labels_deleted {
for label_id in deleted {
node_labels.remove(label_id);
}
}
}
let mut node_labels: Vec<LabelId> = node_labels.into_iter().collect();
node_labels.sort_unstable();
nodes.push(NodeData {
node_id,
key,
labels: node_labels,
props,
});
for edge_info in snapshot.out_edges(phys as u32) {
let dst_node_id = match snapshot.node_id(edge_info.dst) {
Some(id) => id,
None => continue,
};
if delta.is_node_deleted(dst_node_id) {
continue;
}
if delta.is_edge_deleted(node_id, edge_info.etype, dst_node_id) {
continue;
}
let mut edge_props = HashMap::new();
if let Some(edge_idx) =
snapshot.find_edge_index(phys as u32, edge_info.etype, edge_info.dst)
{
if let Some(snapshot_edge_props) = snapshot.edge_props(edge_idx) {
edge_props = snapshot_edge_props;
}
}
let edge_key = (node_id, edge_info.etype, dst_node_id);
if let Some(delta_edge_props) = delta.edge_props.get(&edge_key) {
for (&key_id, value) in delta_edge_props {
match value {
Some(v) => {
edge_props.insert(key_id, v.as_ref().clone());
}
None => {
edge_props.remove(&key_id);
}
}
}
}
edges.push(EdgeData {
src: node_id,
etype: edge_info.etype,
dst: dst_node_id,
props: edge_props,
});
}
}
}
for (&node_id, node_delta) in &delta.created_nodes {
let mut props = HashMap::new();
if let Some(ref delta_props) = node_delta.props {
for (&key_id, value) in delta_props {
if let Some(v) = value {
props.insert(key_id, v.as_ref().clone());
}
}
}
let mut node_labels: Vec<LabelId> = node_delta
.labels
.as_ref()
.map(|l| l.iter().copied().collect())
.unwrap_or_default();
node_labels.sort_unstable();
nodes.push(NodeData {
node_id,
key: node_delta.key.clone(),
labels: node_labels,
props,
});
}
for (&src, patches) in &delta.out_add {
if delta.is_node_deleted(src) {
continue;
}
for patch in patches {
if delta.is_node_deleted(patch.other) {
continue;
}
let mut edge_props = HashMap::new();
let edge_key = (src, patch.etype, patch.other);
if let Some(delta_edge_props) = delta.edge_props.get(&edge_key) {
for (&key_id, value) in delta_edge_props {
if let Some(v) = value {
edge_props.insert(key_id, v.as_ref().clone());
}
}
}
edges.push(EdgeData {
src,
etype: patch.etype,
dst: patch.other,
props: edge_props,
});
}
}
if !self.vector_stores.read().is_empty() {
let mut node_index: HashMap<NodeId, usize> = HashMap::new();
for (idx, node) in nodes.iter().enumerate() {
node_index.insert(node.node_id, idx);
}
let stores = self.vector_stores.read();
for (&prop_key_id, store) in stores.iter() {
for &node_id in store.node_to_vector.keys() {
if delta.is_node_deleted(node_id) {
continue;
}
let Some(&idx) = node_index.get(&node_id) else {
continue;
};
if let Some(vec) = vector_store_node_vector(store, node_id) {
nodes[idx]
.props
.insert(prop_key_id, PropValue::VectorF32(vec.to_vec()));
}
}
}
}
(nodes, edges, labels, etypes, propkeys)
}
pub fn should_checkpoint(&self, threshold: f64) -> bool {
let usage = self.wal_buffer.lock().usage_ratio();
usage >= threshold
}
}