use crate::dense_hnsw::exact_dense_search_above_cutoff;
use crate::error::EngineError;
use crate::manifest::{default_manifest, load_manifest, load_manifest_readonly, write_manifest};
use crate::memtable::{encode_range_prop_value, Memtable};
use crate::segment_reader::SegmentReader;
use crate::segment_writer::{
node_prop_eq_sidecar_path, node_prop_range_sidecar_path, segment_dir, segment_tmp_dir,
write_indexes_from_metadata_with_secondary_indexes, write_merged_edges_dat,
write_merged_nodes_dat, write_node_prop_eq_sidecar_to_path,
write_node_prop_range_sidecar_to_path, write_segment_with_secondary_indexes,
write_v3_edges_dat, write_v3_nodes_dat, CompactEdgeMeta, CompactNodeMeta, FastMergeCopyInfo,
SecondaryIndexMaintenanceReport,
};
use crate::source_list::SourceList;
use crate::sparse_postings::{accumulate_sparse_posting_scores, sparse_dot_score};
use crate::types::*;
use crate::wal::{remove_wal_generation, wal_generation_path, WalReader, WalWriter};
use crate::wal_sync::{shutdown_sync_thread, sync_thread_loop, WalSyncState};
use std::cmp::Reverse;
use std::collections::{BTreeMap, BinaryHeap, HashMap, HashSet};
use std::ops::ControlFlow;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::thread::JoinHandle;
use std::time::SystemTime;
/// Nested lookup table of declared secondary indexes:
/// node type id -> property key -> index kind -> manifest entry.
type SecondaryIndexCatalog =
HashMap<u32, HashMap<String, HashMap<SecondaryIndexKind, SecondaryIndexManifestEntry>>>;
/// Flat list of secondary index manifest entries.
type SecondaryIndexEntries = Vec<SecondaryIndexManifestEntry>;
/// Merges one unsorted memtable batch with already-sorted segment batches into a single
/// key-ordered, de-duplicated page.
///
/// * `memtable_items` is sorted here by `key_fn`; every inner vector of
///   `segment_sorted_items` must already be sorted ascending by `key_fn`.
/// * When a key occurs in several sources, the source with the lowest index wins (the
///   memtable first, then the segments in the order given). Later occurrences of that key
///   are dropped, even when the winning item is rejected by `skip_fn`.
/// * `page.after` resumes strictly after the given key. `page.limit` of `None` or `Some(0)`
///   means "no limit".
///
/// `next_cursor` is the key of the last returned item when the limit was reached while
/// unread items remained. Those remaining items may still be duplicates or skipped, so a
/// follow-up page can legitimately come back empty.
fn merge_sorted_paged<T: Clone>(
    mut memtable_items: Vec<T>,
    segment_sorted_items: Vec<Vec<T>>,
    key_fn: impl Fn(&T) -> u64,
    skip_fn: impl Fn(&T) -> bool,
    page: &PageRequest,
) -> PageResult<T> {
    memtable_items.sort_unstable_by_key(|item| key_fn(item));

    // Source 0 is the memtable; sources 1.. are the segments in caller order.
    let sources: Vec<&[T]> = std::iter::once(memtable_items.as_slice())
        .chain(segment_sorted_items.iter().map(Vec::as_slice))
        .collect();

    // Min-heap of (key, source index). `positions[i]` is the index of the NEXT unread item
    // of source `i`, so the item currently represented in the heap sits at `positions[i] - 1`.
    let mut heap: BinaryHeap<Reverse<(u64, usize)>> = BinaryHeap::with_capacity(sources.len());
    let mut positions: Vec<usize> = vec![0; sources.len()];
    for (src_idx, source) in sources.iter().enumerate() {
        let start = match page.after {
            // `partition_point` skips every item whose key is <= cursor. A plain binary
            // search may land on any one of several equal keys and would then re-emit the
            // cursor key on the next page.
            Some(cursor) => source.partition_point(|item| key_fn(item) <= cursor),
            None => 0,
        };
        if let Some(first) = source.get(start) {
            heap.push(Reverse((key_fn(first), src_idx)));
            positions[src_idx] = start + 1;
        }
    }

    // A limit of zero is treated the same as "no limit".
    let limit = page.limit.filter(|&lim| lim > 0);
    let mut result: Vec<T> = Vec::with_capacity(page.limit.unwrap_or(64).min(1024));
    let mut last_seen_key: Option<u64> = None;

    while let Some(Reverse((key, src_idx))) = heap.pop() {
        let source = sources[src_idx];
        let next_pos = positions[src_idx];
        let item = &source[next_pos - 1];

        // Refill the heap from the same source before deciding what to do with `item`.
        if let Some(next) = source.get(next_pos) {
            heap.push(Reverse((key_fn(next), src_idx)));
            positions[src_idx] = next_pos + 1;
        }

        // Equal keys pop in ascending source order, so the first occurrence is the winner.
        if last_seen_key == Some(key) {
            continue;
        }
        last_seen_key = Some(key);

        if skip_fn(item) {
            continue;
        }
        result.push(item.clone());

        if let Some(lim) = limit {
            if result.len() >= lim {
                let next_cursor = if heap.is_empty() { None } else { Some(key) };
                return PageResult {
                    items: result,
                    next_cursor,
                };
            }
        }
    }

    PageResult {
        items: result,
        next_cursor: None,
    }
}
/// Pages over node ids gathered from the memtable and from sorted segment id lists.
///
/// Each id is its own sort key; ids contained in `deleted` are filtered out. All merging,
/// de-duplication and cursor handling is delegated to `merge_sorted_paged`.
fn merge_type_ids_paged(
    memtable_ids: Vec<u64>,
    segment_sorted_ids: Vec<Vec<u64>>,
    deleted: &NodeIdSet,
    page: &PageRequest,
) -> PageResult<u64> {
    let id_as_key = |id: &u64| *id;
    let is_deleted = |id: &u64| deleted.contains(id);
    merge_sorted_paged(
        memtable_ids,
        segment_sorted_ids,
        id_as_key,
        is_deleted,
        page,
    )
}
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, the result is `0`.
fn now_millis() -> i64 {
    let since_epoch = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => std::time::Duration::ZERO,
    };
    since_epoch.as_millis() as i64
}
/// Reconciles the dense-vector configuration stored in the manifest with the one requested
/// through `options`.
///
/// Both configurations (when present) are validated first. Afterwards:
/// * manifest and options both set but different -> `EngineError::InvalidOperation`;
/// * manifest unset, options set -> the requested configuration is copied into the manifest
///   and `Ok(true)` is returned (the manifest changed);
/// * every other combination -> `Ok(false)`.
fn reconcile_dense_vector_manifest(
    manifest: &mut ManifestState,
    options: &DbOptions,
) -> Result<bool, EngineError> {
    // Validate the stored configuration first, then the requested one.
    if let Some(stored) = manifest.dense_vector.as_ref() {
        validate_dense_vector_config(stored)?;
    }
    let Some(requested) = options.dense_vector.as_ref() else {
        // Nothing requested: the manifest is left untouched.
        return Ok(false);
    };
    validate_dense_vector_config(requested)?;

    match manifest.dense_vector.as_ref() {
        Some(existing) if existing != requested => {
            Err(EngineError::InvalidOperation(format!(
                "dense vector configuration mismatch: manifest has {:?}, open requested {:?}",
                existing, requested
            )))
        }
        Some(_) => Ok(false),
        None => {
            manifest.dense_vector = Some(requested.clone());
            Ok(true)
        }
    }
}
/// Builds the `(type_id, prop_key, kind)` lookup key that identifies a secondary index
/// declaration.
fn secondary_index_lookup_key(entry: &SecondaryIndexManifestEntry) -> SecondaryIndexLookupKey {
    let SecondaryIndexTarget::NodeProperty { type_id, prop_key } = &entry.target;
    SecondaryIndexLookupKey {
        type_id: *type_id,
        prop_key: prop_key.clone(),
        kind: entry.kind.clone(),
    }
}
/// Validates the secondary index declarations in `manifest` and repairs the id allocator.
///
/// Fails with `EngineError::ManifestError` when two entries share an index id, share the same
/// lookup key, or when more than one range index targets the same `(type_id, prop_key)`.
///
/// `next_secondary_index_id` is raised (never lowered) so that it is at least `1` and strictly
/// greater than every declared index id. Returns `Ok(true)` when that field was changed.
fn normalize_secondary_index_manifest(manifest: &mut ManifestState) -> Result<bool, EngineError> {
    let mut seen_ids = HashSet::new();
    let mut seen_keys = HashSet::new();
    let mut seen_range_targets = HashSet::new();
    let mut max_index_id = 0u64;

    for entry in manifest.secondary_indexes.iter() {
        if !seen_ids.insert(entry.index_id) {
            return Err(EngineError::ManifestError(format!(
                "duplicate secondary index id {} in manifest",
                entry.index_id
            )));
        }
        if !seen_keys.insert(secondary_index_lookup_key(entry)) {
            return Err(EngineError::ManifestError(format!(
                "duplicate secondary index declaration for {:?}",
                entry.target
            )));
        }
        if matches!(entry.kind, SecondaryIndexKind::Range { .. }) {
            let SecondaryIndexTarget::NodeProperty { type_id, prop_key } = &entry.target;
            let first_for_target = seen_range_targets.insert((*type_id, prop_key.clone()));
            if !first_for_target {
                return Err(EngineError::ManifestError(format!(
                    "duplicate range declaration for node property ({}, {})",
                    type_id, prop_key
                )));
            }
        }
        max_index_id = max_index_id.max(entry.index_id);
    }

    // With no entries (or only id 0) this floor is 1; otherwise it is max id + 1.
    let minimum_next_id = max_index_id.saturating_add(1);
    if manifest.next_secondary_index_id >= minimum_next_id {
        return Ok(false);
    }
    manifest.next_secondary_index_id = minimum_next_id;
    Ok(true)
}
/// Builds the nested `type_id -> prop_key -> kind -> entry` catalog from manifest entries.
///
/// Fails with `EngineError::ManifestError` when two range indexes target the same node
/// property, or when two entries share the same `(type_id, prop_key, kind)` slot.
fn build_secondary_index_catalog(
    entries: &[SecondaryIndexManifestEntry],
) -> Result<SecondaryIndexCatalog, EngineError> {
    let mut catalog: SecondaryIndexCatalog = HashMap::with_capacity(entries.len());
    let mut seen_range_targets = HashSet::new();

    for entry in entries {
        let SecondaryIndexTarget::NodeProperty { type_id, prop_key } = &entry.target;

        let is_range = matches!(entry.kind, SecondaryIndexKind::Range { .. });
        if is_range && !seen_range_targets.insert((*type_id, prop_key.clone())) {
            return Err(EngineError::ManifestError(format!(
                "duplicate range declaration loaded from manifest for node property ({}, {})",
                type_id, prop_key
            )));
        }

        let previous = catalog
            .entry(*type_id)
            .or_default()
            .entry(prop_key.clone())
            .or_default()
            .insert(entry.kind.clone(), entry.clone());
        if previous.is_some() {
            return Err(EngineError::ManifestError(format!(
                "duplicate secondary index declaration loaded from manifest: {:?}",
                entry.target
            )));
        }
    }

    Ok(catalog)
}
/// Replaces the in-memory catalog and the in-memory entry list with state derived from
/// `entries`.
///
/// The catalog is built before either lock is taken, so an invalid entry list leaves both
/// untouched. The two locks are taken one after the other, not together. Panics if either
/// lock is poisoned.
fn sync_secondary_index_runtime_state(
    catalog_lock: &RwLock<SecondaryIndexCatalog>,
    entries_lock: &RwLock<SecondaryIndexEntries>,
    entries: &[SecondaryIndexManifestEntry],
) -> Result<(), EngineError> {
    let rebuilt_catalog = build_secondary_index_catalog(entries)?;
    let entry_list = entries.to_vec();
    {
        let mut catalog_guard = catalog_lock.write().unwrap();
        *catalog_guard = rebuilt_catalog;
    }
    {
        let mut entries_guard = entries_lock.write().unwrap();
        *entries_guard = entry_list;
    }
    Ok(())
}
/// Raises the manifest's id/sequence counters to the values observed in the shared atomics.
///
/// Each counter only ever moves forward: a manifest value that is already larger than the
/// shared value is kept.
fn merge_runtime_manifest_counters_from_shared(
    manifest: &mut ManifestState,
    next_node_id_seen: &AtomicU64,
    next_edge_id_seen: &AtomicU64,
    engine_seq_seen: &AtomicU64,
) {
    /// Stores `observed` into `slot` only when it is larger than the current value.
    fn raise_to(slot: &mut u64, observed: u64) {
        if observed > *slot {
            *slot = observed;
        }
    }

    raise_to(
        &mut manifest.next_node_id,
        next_node_id_seen.load(Ordering::Acquire),
    );
    raise_to(
        &mut manifest.next_edge_id,
        next_edge_id_seen.load(Ordering::Acquire),
    );
    raise_to(
        &mut manifest.next_engine_seq,
        engine_seq_seen.load(Ordering::Acquire),
    );
}
/// Performs a read-modify-write of the on-disk manifest under `manifest_write_lock`.
///
/// Steps, in order: load the manifest from `db_dir`, apply `mutate`, merge the shared runtime
/// counters into it, write it back, then refresh the in-memory catalog and entry list from
/// the written `secondary_indexes`.
///
/// # Errors
/// Returns `EngineError::ManifestError("manifest missing")` when no manifest can be loaded,
/// and propagates any error from `mutate`, the manifest write, or the catalog rebuild. A
/// failure in `mutate` aborts before anything is written.
fn update_secondary_index_manifest_runtime(
    db_dir: &Path,
    manifest_write_lock: &Arc<Mutex<()>>,
    catalog_lock: &Arc<RwLock<SecondaryIndexCatalog>>,
    entries_lock: &Arc<RwLock<SecondaryIndexEntries>>,
    next_node_id_seen: &AtomicU64,
    next_edge_id_seen: &AtomicU64,
    engine_seq_seen: &AtomicU64,
    mutate: impl FnOnce(&mut ManifestState) -> Result<(), EngineError>,
) -> Result<(), EngineError> {
    // Held until the function returns so the whole read-modify-write is serialized.
    let _manifest_guard = manifest_write_lock.lock().unwrap();

    let Some(mut manifest) = load_manifest_readonly(db_dir)? else {
        return Err(EngineError::ManifestError("manifest missing".into()));
    };

    mutate(&mut manifest)?;
    merge_runtime_manifest_counters_from_shared(
        &mut manifest,
        next_node_id_seen,
        next_edge_id_seen,
        engine_seq_seen,
    );
    write_manifest(db_dir, &manifest)?;

    sync_secondary_index_runtime_state(catalog_lock, entries_lock, &manifest.secondary_indexes)
}
/// Returns `true` when `error` is an `EngineError::IoError` whose kind is
/// `std::io::ErrorKind::NotFound`.
fn is_not_found_io_error(error: &EngineError) -> bool {
    match error {
        EngineError::IoError(io_error) => io_error.kind() == std::io::ErrorKind::NotFound,
        _ => false,
    }
}
/// Marks manifest entries as `Failed` according to a maintenance report.
///
/// For every reported index id the first entry with that id is looked up; it is only updated
/// when its kind matches the list it was reported in (equality vs. range). Unknown ids and
/// kind mismatches are ignored.
fn apply_secondary_index_failure_report(
    manifest: &mut ManifestState,
    report: &SecondaryIndexMaintenanceReport,
) {
    for (index_id, message) in &report.failed_equality_indexes {
        let failed_entry = manifest
            .secondary_indexes
            .iter_mut()
            .find(|entry| entry.index_id == *index_id)
            .filter(|entry| matches!(entry.kind, SecondaryIndexKind::Equality));
        if let Some(entry) = failed_entry {
            entry.state = SecondaryIndexState::Failed;
            entry.last_error = Some(message.clone());
        }
    }

    for (index_id, message) in &report.failed_range_indexes {
        let failed_entry = manifest
            .secondary_indexes
            .iter_mut()
            .find(|entry| entry.index_id == *index_id)
            .filter(|entry| matches!(entry.kind, SecondaryIndexKind::Range { .. }));
        if let Some(entry) = failed_entry {
            entry.state = SecondaryIndexState::Failed;
            entry.last_error = Some(message.clone());
        }
    }
}
/// Collects the index ids of all equality-kind entries.
fn equality_index_ids_snapshot(entries: &[SecondaryIndexManifestEntry]) -> NodeIdSet {
    entries
        .iter()
        .filter_map(|entry| {
            matches!(entry.kind, SecondaryIndexKind::Equality).then_some(entry.index_id)
        })
        .collect()
}
/// Collects the index ids of all range-kind entries.
fn range_index_ids_snapshot(entries: &[SecondaryIndexManifestEntry]) -> NodeIdSet {
    entries
        .iter()
        .filter_map(|entry| {
            matches!(entry.kind, SecondaryIndexKind::Range { .. }).then_some(entry.index_id)
        })
        .collect()
}
/// Finds equality indexes that are NOT in `maintained_equality_index_ids` and schedules them
/// for a rebuild.
///
/// Such entries in state `Ready` or `Building` end up in state `Building` with `last_error`
/// cleared; `Failed` entries are left untouched. Returns the affected index ids, sorted
/// ascending and de-duplicated.
fn reconcile_background_output_equality_declarations(
    manifest: &mut ManifestState,
    maintained_equality_index_ids: &NodeIdSet,
) -> Vec<u64> {
    let mut rebuild_index_ids: Vec<u64> = manifest
        .secondary_indexes
        .iter_mut()
        .filter(|entry| {
            matches!(entry.kind, SecondaryIndexKind::Equality)
                && !maintained_equality_index_ids.contains(&entry.index_id)
        })
        .filter(|entry| entry.state != SecondaryIndexState::Failed)
        .map(|entry| {
            // Re-assigning `Building` to an entry that is already building is a no-op.
            entry.state = SecondaryIndexState::Building;
            entry.last_error = None;
            entry.index_id
        })
        .collect();
    rebuild_index_ids.sort_unstable();
    rebuild_index_ids.dedup();
    rebuild_index_ids
}
/// Finds range indexes that are NOT in `maintained_range_index_ids` and schedules them for a
/// rebuild.
///
/// Such entries in state `Ready` or `Building` end up in state `Building` with `last_error`
/// cleared; `Failed` entries are left untouched. Returns the affected index ids, sorted
/// ascending and de-duplicated.
fn reconcile_background_output_range_declarations(
    manifest: &mut ManifestState,
    maintained_range_index_ids: &NodeIdSet,
) -> Vec<u64> {
    let mut rebuild_index_ids: Vec<u64> = manifest
        .secondary_indexes
        .iter_mut()
        .filter(|entry| {
            matches!(entry.kind, SecondaryIndexKind::Range { .. })
                && !maintained_range_index_ids.contains(&entry.index_id)
        })
        .filter(|entry| entry.state != SecondaryIndexState::Failed)
        .map(|entry| {
            // Re-assigning `Building` to an entry that is already building is a no-op.
            entry.state = SecondaryIndexState::Building;
            entry.last_error = None;
            entry.index_id
        })
        .collect();
    rebuild_index_ids.sort_unstable();
    rebuild_index_ids.dedup();
    rebuild_index_ids
}
/// Best-effort: records `error` on the manifest entry `index_id` and sets its state to
/// `Failed`.
///
/// The update goes through `update_secondary_index_manifest_runtime`; any error from that
/// call is deliberately discarded. An unknown `index_id` leaves the entries unchanged.
fn mark_secondary_index_failed(
    db_dir: &Path,
    manifest_write_lock: &Arc<Mutex<()>>,
    catalog_lock: &Arc<RwLock<SecondaryIndexCatalog>>,
    entries_lock: &Arc<RwLock<SecondaryIndexEntries>>,
    next_node_id_seen: &AtomicU64,
    next_edge_id_seen: &AtomicU64,
    engine_seq_seen: &AtomicU64,
    index_id: u64,
    error: &EngineError,
) {
    let message = error.to_string();
    let _ = update_secondary_index_manifest_runtime(
        db_dir,
        manifest_write_lock,
        catalog_lock,
        entries_lock,
        next_node_id_seen,
        next_edge_id_seen,
        engine_seq_seen,
        move |manifest| {
            let failed_entry = manifest
                .secondary_indexes
                .iter_mut()
                .find(|entry| entry.index_id == index_id);
            if let Some(entry) = failed_entry {
                entry.state = SecondaryIndexState::Failed;
                entry.last_error = Some(message);
            }
            Ok(())
        },
    );
}
/// Builds the equality-index groups (`value hash -> sorted, de-duplicated node ids`) for all
/// nodes of `type_id` in `segment` that carry the property `prop_key`.
///
/// For each matching node the value hash is taken from the segment's raw property-hash table
/// when that table is non-empty and contains a pair for the hashed `prop_key`. Otherwise the
/// property value is read from the node record and hashed. Nodes without the property are
/// left out.
///
/// # Errors
/// Propagates errors from reading node metadata or property values, and returns
/// `EngineError::CorruptRecord` when a node's hash pair lies outside the hash table.
fn build_secondary_eq_groups_for_segment(
    segment: &SegmentReader,
    type_id: u32,
    prop_key: &str,
) -> Result<BTreeMap<u64, Vec<u64>>, EngineError> {
    let target_key_hash = hash_prop_key(prop_key);
    let legacy_hashes = segment.raw_node_prop_hashes_mmap();
    let mut groups: BTreeMap<u64, Vec<u64>> = BTreeMap::new();
    for index in 0..segment.node_meta_count() as usize {
        let (
            node_id,
            data_offset,
            _data_len,
            node_type_id,
            _updated_at,
            _weight,
            _key_len,
            prop_hash_offset,
            prop_hash_count,
            _last_write_seq,
        ) = segment.node_meta_at(index)?;
        if node_type_id != type_id {
            continue;
        }
        let mut value_hash = None;
        if !legacy_hashes.is_empty() && prop_hash_count > 0 {
            let base = prop_hash_offset as usize;
            for pair_index in 0..prop_hash_count as usize {
                // Each pair is 16 bytes: an 8-byte key hash followed by an 8-byte value hash.
                // Offsets come from on-disk metadata, so use saturating arithmetic: a corrupt
                // offset must trip the bounds check below instead of overflowing (a panic in
                // debug builds, a silent wrap-around in release builds).
                let pair_off = base.saturating_add(pair_index.saturating_mul(16));
                let pair_end = pair_off.saturating_add(16);
                if pair_end > legacy_hashes.len() {
                    return Err(EngineError::CorruptRecord(format!(
                        "node {} prop hash pair at offset {} exceeds source length {}",
                        node_id,
                        pair_off,
                        legacy_hashes.len()
                    )));
                }
                let key_hash =
                    u64::from_le_bytes(legacy_hashes[pair_off..pair_off + 8].try_into().unwrap());
                if key_hash != target_key_hash {
                    continue;
                }
                value_hash = Some(u64::from_le_bytes(
                    legacy_hashes[pair_off + 8..pair_off + 16]
                        .try_into()
                        .unwrap(),
                ));
                break;
            }
        }
        // No usable hash pair: fall back to hashing the stored property value.
        if value_hash.is_none() {
            value_hash = segment
                .node_property_value_at_offset(node_id, data_offset, prop_key)?
                .map(|value| hash_prop_value(&value));
        }
        if let Some(value_hash) = value_hash {
            groups.entry(value_hash).or_default().push(node_id);
        }
    }
    for ids in groups.values_mut() {
        ids.sort_unstable();
        ids.dedup();
    }
    Ok(groups)
}
/// Builds the range-index entries `(encoded value, node id)` for all nodes of `type_id` in
/// `segment`.
///
/// A node contributes an entry only when it has the property `prop_key` and
/// `encode_range_prop_value` can encode that value for `domain`. The result is sorted and
/// de-duplicated.
///
/// # Errors
/// Propagates errors from reading node metadata or property values.
fn build_secondary_range_entries_for_segment(
    segment: &SegmentReader,
    type_id: u32,
    prop_key: &str,
    domain: SecondaryIndexRangeDomain,
) -> Result<Vec<(u64, u64)>, EngineError> {
    let node_count = segment.node_meta_count() as usize;
    let mut entries = Vec::new();

    for index in 0..node_count {
        // Only id, data offset and type id of the metadata tuple are needed here.
        let (node_id, data_offset, _, node_type_id, _, _, _, _, _, _) =
            segment.node_meta_at(index)?;
        if node_type_id != type_id {
            continue;
        }

        let encoded = segment
            .node_property_value_at_offset(node_id, data_offset, prop_key)?
            .and_then(|value| encode_range_prop_value(domain, &value));
        if let Some(encoded_value) = encoded {
            entries.push((encoded_value, node_id));
        }
    }

    entries.sort_unstable();
    entries.dedup();
    Ok(entries)
}
/// Writes the equality sidecar for `index_id` into `<seg_dir>/secondary_indexes`.
///
/// The sidecar is written to a temporary file in the same directory and then renamed onto its
/// final path, followed by an fsync of the directory. The `secondary_indexes` directory is
/// created when missing.
///
/// # Errors
/// Returns the first I/O or write error. When the write or the rename fails, the temporary
/// file is removed on a best-effort basis so that a failed build leaves no stray file behind.
fn install_secondary_eq_sidecar(
    seg_dir: &Path,
    index_id: u64,
    groups: &BTreeMap<u64, Vec<u64>>,
) -> Result<(), EngineError> {
    let index_dir = seg_dir.join("secondary_indexes");
    match std::fs::create_dir(&index_dir) {
        Ok(()) => {}
        Err(error) if error.kind() == std::io::ErrorKind::AlreadyExists => {}
        Err(error) => return Err(error.into()),
    }
    let final_path = node_prop_eq_sidecar_path(seg_dir, index_id);
    let tmp_path = index_dir.join(format!(".node_prop_eq_{}.tmp", index_id));

    let installed = (|| -> Result<(), EngineError> {
        write_node_prop_eq_sidecar_to_path(&tmp_path, groups)?;
        std::fs::rename(&tmp_path, &final_path)?;
        Ok(())
    })();
    if let Err(error) = installed {
        // Best-effort cleanup; the original error is what the caller needs to see.
        let _ = std::fs::remove_file(&tmp_path);
        return Err(error);
    }

    fsync_dir(&index_dir)?;
    Ok(())
}
/// Writes the range sidecar for `index_id` into `<seg_dir>/secondary_indexes`.
///
/// The sidecar is written to a temporary file in the same directory and then renamed onto its
/// final path, followed by an fsync of the directory. The `secondary_indexes` directory is
/// created when missing.
///
/// # Errors
/// Returns the first I/O or write error. When the write or the rename fails, the temporary
/// file is removed on a best-effort basis so that a failed build leaves no stray file behind.
fn install_secondary_range_sidecar(
    seg_dir: &Path,
    index_id: u64,
    entries: &[(u64, u64)],
) -> Result<(), EngineError> {
    let index_dir = seg_dir.join("secondary_indexes");
    match std::fs::create_dir(&index_dir) {
        Ok(()) => {}
        Err(error) if error.kind() == std::io::ErrorKind::AlreadyExists => {}
        Err(error) => return Err(error.into()),
    }
    let final_path = node_prop_range_sidecar_path(seg_dir, index_id);
    let tmp_path = index_dir.join(format!(".node_prop_range_{}.tmp", index_id));

    let installed = (|| -> Result<(), EngineError> {
        write_node_prop_range_sidecar_to_path(&tmp_path, entries)?;
        std::fs::rename(&tmp_path, &final_path)?;
        Ok(())
    })();
    if let Err(error) = installed {
        // Best-effort cleanup; the original error is what the caller needs to see.
        let _ = std::fs::remove_file(&tmp_path);
        return Err(error);
    }

    fsync_dir(&index_dir)?;
    Ok(())
}
/// Inputs captured from the manifest for one equality-index build pass
/// (produced by `load_secondary_eq_build_snapshot`).
#[derive(Clone)]
struct SecondaryEqBuildSnapshot {
// Dense vector configuration copied from the manifest; handed to `SegmentReader::open`.
dense_config: Option<DenseVectorConfig>,
// Node type id of the indexed property target.
type_id: u32,
// Property key of the indexed property target.
prop_key: String,
// Ids of the manifest's segments at snapshot time, sorted ascending.
segment_ids: Vec<u64>,
}
/// Result of checking whether every segment of a build snapshot has a valid equality sidecar
/// (see `validate_secondary_eq_snapshot_coverage`).
enum SecondaryEqCoverageStatus {
// Every snapshot segment could be opened and its sidecar validated successfully.
Covered,
// At least one segment was missing or its sidecar did not validate.
Incomplete,
// Sidecar validation returned an error; carries the error text.
Failed(String),
// The cancel flag was observed while checking.
Cancelled,
}
/// Outcome of `finalize_secondary_eq_build_snapshot`.
enum SecondaryEqFinalizeOutcome {
// The coverage result was applied to the manifest entry.
Applied,
// The manifest's segment set no longer matches the snapshot; the build should run again.
Retry,
// The entry is gone, is no longer a building equality index, or the pass was cancelled.
Inactive,
}
/// Inputs captured from the manifest for one range-index build pass
/// (produced by `load_secondary_range_build_snapshot`).
#[derive(Clone)]
struct SecondaryRangeBuildSnapshot {
// Dense vector configuration copied from the manifest; handed to `SegmentReader::open`.
dense_config: Option<DenseVectorConfig>,
// Node type id of the indexed property target.
type_id: u32,
// Property key of the indexed property target.
prop_key: String,
// Range domain taken from the index kind; used when encoding property values.
domain: SecondaryIndexRangeDomain,
// Ids of the manifest's segments at snapshot time, sorted ascending.
segment_ids: Vec<u64>,
}
/// Result of checking whether every segment of a build snapshot has a valid range sidecar
/// (see `validate_secondary_range_snapshot_coverage`).
enum SecondaryRangeCoverageStatus {
// Every snapshot segment could be opened and its sidecar validated successfully.
Covered,
// At least one segment was missing or its sidecar did not validate.
Incomplete,
// Sidecar validation returned an error; carries the error text.
Failed(String),
// The cancel flag was observed while checking.
Cancelled,
}
/// Outcome of `finalize_secondary_range_build_snapshot`.
enum SecondaryRangeFinalizeOutcome {
// The coverage result was applied to the manifest entry.
Applied,
// The manifest's segment set no longer matches the snapshot; the build should run again.
Retry,
// The entry is gone, is no longer a building range index, or the pass was cancelled.
Inactive,
}
/// Loads the manifest under `manifest_write_lock` and captures what an equality-index build
/// pass for `index_id` needs.
///
/// Returns `Ok(None)` when no entry has that id, or when the entry is not an equality index in
/// state `Building`. The captured segment ids are sorted ascending.
///
/// # Errors
/// `EngineError::ManifestError("manifest missing")` when no manifest can be loaded, plus any
/// error from loading it.
fn load_secondary_eq_build_snapshot(
    db_dir: &Path,
    manifest_write_lock: &Arc<Mutex<()>>,
    index_id: u64,
) -> Result<Option<SecondaryEqBuildSnapshot>, EngineError> {
    let _manifest_guard = manifest_write_lock.lock().unwrap();
    let Some(manifest) = load_manifest_readonly(db_dir)? else {
        return Err(EngineError::ManifestError("manifest missing".into()));
    };

    let building_eq_entry = manifest
        .secondary_indexes
        .iter()
        .find(|entry| entry.index_id == index_id)
        .filter(|entry| {
            entry.state == SecondaryIndexState::Building
                && matches!(entry.kind, SecondaryIndexKind::Equality)
        });
    let Some(entry) = building_eq_entry else {
        return Ok(None);
    };

    let SecondaryIndexTarget::NodeProperty { type_id, prop_key } = &entry.target;
    let mut segment_ids: Vec<u64> = manifest.segments.iter().map(|segment| segment.id).collect();
    segment_ids.sort_unstable();

    Ok(Some(SecondaryEqBuildSnapshot {
        dense_config: manifest.dense_vector.clone(),
        type_id: *type_id,
        prop_key: prop_key.clone(),
        segment_ids,
    }))
}
fn build_secondary_eq_sidecars_for_snapshot(
db_dir: &Path,
index_id: u64,
snapshot: &SecondaryEqBuildSnapshot,
cancel: &AtomicBool,
) -> Result<(), EngineError> {
for &segment_id in &snapshot.segment_ids {
if cancel.load(Ordering::Relaxed) {
return Ok(());
}
let seg_path = segment_dir(db_dir, segment_id);
if !seg_path.exists() {
continue;
}
let segment =
match SegmentReader::open(&seg_path, segment_id, snapshot.dense_config.as_ref()) {
Ok(segment) => segment,
Err(error) if is_not_found_io_error(&error) => continue,
Err(error) => return Err(error),
};
match segment.validate_secondary_eq_sidecar(index_id) {
Ok(true) => continue,
Ok(false) => {
let groups = build_secondary_eq_groups_for_segment(
&segment,
snapshot.type_id,
&snapshot.prop_key,
)?;
match install_secondary_eq_sidecar(&seg_path, index_id, &groups) {
Ok(()) => {}
Err(error) if is_not_found_io_error(&error) => {}
Err(error) => return Err(error),
}
}
Err(error) if is_not_found_io_error(&error) => {}
Err(_) => {
let groups = build_secondary_eq_groups_for_segment(
&segment,
snapshot.type_id,
&snapshot.prop_key,
)?;
match install_secondary_eq_sidecar(&seg_path, index_id, &groups) {
Ok(()) => {}
Err(error) if is_not_found_io_error(&error) => {}
Err(error) => return Err(error),
}
}
}
}
Ok(())
}
fn validate_secondary_eq_snapshot_coverage(
db_dir: &Path,
index_id: u64,
snapshot: &SecondaryEqBuildSnapshot,
cancel: &AtomicBool,
) -> Result<SecondaryEqCoverageStatus, EngineError> {
let mut all_present = true;
for &segment_id in &snapshot.segment_ids {
if cancel.load(Ordering::Relaxed) {
return Ok(SecondaryEqCoverageStatus::Cancelled);
}
let seg_path = segment_dir(db_dir, segment_id);
if !seg_path.exists() {
all_present = false;
continue;
}
let segment =
match SegmentReader::open(&seg_path, segment_id, snapshot.dense_config.as_ref()) {
Ok(segment) => segment,
Err(error) if is_not_found_io_error(&error) => {
all_present = false;
continue;
}
Err(error) => return Err(error),
};
match segment.validate_secondary_eq_sidecar(index_id) {
Ok(true) => {}
Ok(false) => {
all_present = false;
}
Err(error) => {
return Ok(SecondaryEqCoverageStatus::Failed(error.to_string()));
}
}
}
Ok(if all_present {
SecondaryEqCoverageStatus::Covered
} else {
SecondaryEqCoverageStatus::Incomplete
})
}
/// Applies the coverage result of an equality-index build pass to the manifest.
///
/// Inside one manifest read-modify-write the checks run in this order:
/// 1. entry `index_id` no longer exists -> `Inactive`;
/// 2. the manifest's sorted segment ids differ from the snapshot -> `Retry`;
/// 3. the entry is not an equality index in state `Building` -> `Inactive`;
/// 4. otherwise the coverage is applied: `Covered` -> `Ready`, `Incomplete` -> `Building`,
///    `Failed` -> `Failed` with the message stored in `last_error`; `Cancelled` leaves the
///    entry alone and yields `Inactive`.
///
/// The manifest is rewritten by `update_secondary_index_manifest_runtime` in every case.
///
/// # Errors
/// Propagates errors from `update_secondary_index_manifest_runtime`.
fn finalize_secondary_eq_build_snapshot(
    db_dir: &Path,
    manifest_write_lock: &Arc<Mutex<()>>,
    catalog_lock: &Arc<RwLock<SecondaryIndexCatalog>>,
    entries_lock: &Arc<RwLock<SecondaryIndexEntries>>,
    next_node_id_seen: &AtomicU64,
    next_edge_id_seen: &AtomicU64,
    engine_seq_seen: &AtomicU64,
    index_id: u64,
    snapshot: &SecondaryEqBuildSnapshot,
    coverage: &SecondaryEqCoverageStatus,
) -> Result<SecondaryEqFinalizeOutcome, EngineError> {
    let mut outcome = SecondaryEqFinalizeOutcome::Applied;
    update_secondary_index_manifest_runtime(
        db_dir,
        manifest_write_lock,
        catalog_lock,
        entries_lock,
        next_node_id_seen,
        next_edge_id_seen,
        engine_seq_seen,
        |manifest| {
            let Some(entry) = manifest
                .secondary_indexes
                .iter_mut()
                .find(|entry| entry.index_id == index_id)
            else {
                outcome = SecondaryEqFinalizeOutcome::Inactive;
                return Ok(());
            };

            let mut live_segment_ids: Vec<u64> =
                manifest.segments.iter().map(|segment| segment.id).collect();
            live_segment_ids.sort_unstable();
            if live_segment_ids != snapshot.segment_ids {
                outcome = SecondaryEqFinalizeOutcome::Retry;
                return Ok(());
            }

            let still_building_equality = entry.state == SecondaryIndexState::Building
                && matches!(entry.kind, SecondaryIndexKind::Equality);
            if !still_building_equality {
                outcome = SecondaryEqFinalizeOutcome::Inactive;
                return Ok(());
            }

            let (next_state, next_error) = match coverage {
                SecondaryEqCoverageStatus::Covered => (SecondaryIndexState::Ready, None),
                SecondaryEqCoverageStatus::Incomplete => (SecondaryIndexState::Building, None),
                SecondaryEqCoverageStatus::Failed(message) => {
                    (SecondaryIndexState::Failed, Some(message.clone()))
                }
                SecondaryEqCoverageStatus::Cancelled => {
                    outcome = SecondaryEqFinalizeOutcome::Inactive;
                    return Ok(());
                }
            };
            entry.state = next_state;
            entry.last_error = next_error;
            Ok(())
        },
    )?;
    Ok(outcome)
}
/// Loads the manifest under `manifest_write_lock` and captures what a range-index build pass
/// for `index_id` needs.
///
/// Returns `Ok(None)` when no entry has that id, when the entry is not in state `Building`,
/// or when it is not a range index. The captured segment ids are sorted ascending.
///
/// # Errors
/// `EngineError::ManifestError("manifest missing")` when no manifest can be loaded, plus any
/// error from loading it.
fn load_secondary_range_build_snapshot(
    db_dir: &Path,
    manifest_write_lock: &Arc<Mutex<()>>,
    index_id: u64,
) -> Result<Option<SecondaryRangeBuildSnapshot>, EngineError> {
    let _manifest_guard = manifest_write_lock.lock().unwrap();
    let Some(manifest) = load_manifest_readonly(db_dir)? else {
        return Err(EngineError::ManifestError("manifest missing".into()));
    };

    let building_entry = manifest
        .secondary_indexes
        .iter()
        .find(|entry| entry.index_id == index_id)
        .filter(|entry| entry.state == SecondaryIndexState::Building);
    let Some(entry) = building_entry else {
        return Ok(None);
    };
    let SecondaryIndexKind::Range { domain } = &entry.kind else {
        return Ok(None);
    };

    let SecondaryIndexTarget::NodeProperty { type_id, prop_key } = &entry.target;
    let mut segment_ids: Vec<u64> = manifest.segments.iter().map(|segment| segment.id).collect();
    segment_ids.sort_unstable();

    Ok(Some(SecondaryRangeBuildSnapshot {
        dense_config: manifest.dense_vector.clone(),
        type_id: *type_id,
        prop_key: prop_key.clone(),
        domain: *domain,
        segment_ids,
    }))
}
fn build_secondary_range_sidecars_for_snapshot(
db_dir: &Path,
index_id: u64,
snapshot: &SecondaryRangeBuildSnapshot,
cancel: &AtomicBool,
) -> Result<(), EngineError> {
for &segment_id in &snapshot.segment_ids {
if cancel.load(Ordering::Relaxed) {
return Ok(());
}
let seg_path = segment_dir(db_dir, segment_id);
if !seg_path.exists() {
continue;
}
let segment =
match SegmentReader::open(&seg_path, segment_id, snapshot.dense_config.as_ref()) {
Ok(segment) => segment,
Err(error) if is_not_found_io_error(&error) => continue,
Err(error) => return Err(error),
};
match segment.validate_secondary_range_sidecar(index_id) {
Ok(true) => continue,
Ok(false) => {
let entries = build_secondary_range_entries_for_segment(
&segment,
snapshot.type_id,
&snapshot.prop_key,
snapshot.domain,
)?;
match install_secondary_range_sidecar(&seg_path, index_id, &entries) {
Ok(()) => {}
Err(error) if is_not_found_io_error(&error) => {}
Err(error) => return Err(error),
}
}
Err(error) if is_not_found_io_error(&error) => {}
Err(_) => {
let entries = build_secondary_range_entries_for_segment(
&segment,
snapshot.type_id,
&snapshot.prop_key,
snapshot.domain,
)?;
match install_secondary_range_sidecar(&seg_path, index_id, &entries) {
Ok(()) => {}
Err(error) if is_not_found_io_error(&error) => {}
Err(error) => return Err(error),
}
}
}
}
Ok(())
}
fn validate_secondary_range_snapshot_coverage(
db_dir: &Path,
index_id: u64,
snapshot: &SecondaryRangeBuildSnapshot,
cancel: &AtomicBool,
) -> Result<SecondaryRangeCoverageStatus, EngineError> {
let mut all_present = true;
for &segment_id in &snapshot.segment_ids {
if cancel.load(Ordering::Relaxed) {
return Ok(SecondaryRangeCoverageStatus::Cancelled);
}
let seg_path = segment_dir(db_dir, segment_id);
if !seg_path.exists() {
all_present = false;
continue;
}
let segment =
match SegmentReader::open(&seg_path, segment_id, snapshot.dense_config.as_ref()) {
Ok(segment) => segment,
Err(error) if is_not_found_io_error(&error) => {
all_present = false;
continue;
}
Err(error) => return Err(error),
};
match segment.validate_secondary_range_sidecar(index_id) {
Ok(true) => {}
Ok(false) => {
all_present = false;
}
Err(error) => {
return Ok(SecondaryRangeCoverageStatus::Failed(error.to_string()));
}
}
}
Ok(if all_present {
SecondaryRangeCoverageStatus::Covered
} else {
SecondaryRangeCoverageStatus::Incomplete
})
}
/// Applies the coverage result of a range-index build pass to the manifest.
///
/// Inside one manifest read-modify-write the checks run in this order:
/// 1. entry `index_id` no longer exists -> `Inactive`;
/// 2. the manifest's sorted segment ids differ from the snapshot -> `Retry`;
/// 3. the entry is not a range index in state `Building` -> `Inactive`;
/// 4. otherwise the coverage is applied: `Covered` -> `Ready`, `Incomplete` -> `Building`,
///    `Failed` -> `Failed` with the message stored in `last_error`; `Cancelled` leaves the
///    entry alone and yields `Inactive`.
///
/// The manifest is rewritten by `update_secondary_index_manifest_runtime` in every case.
///
/// # Errors
/// Propagates errors from `update_secondary_index_manifest_runtime`.
fn finalize_secondary_range_build_snapshot(
    db_dir: &Path,
    manifest_write_lock: &Arc<Mutex<()>>,
    catalog_lock: &Arc<RwLock<SecondaryIndexCatalog>>,
    entries_lock: &Arc<RwLock<SecondaryIndexEntries>>,
    next_node_id_seen: &AtomicU64,
    next_edge_id_seen: &AtomicU64,
    engine_seq_seen: &AtomicU64,
    index_id: u64,
    snapshot: &SecondaryRangeBuildSnapshot,
    coverage: &SecondaryRangeCoverageStatus,
) -> Result<SecondaryRangeFinalizeOutcome, EngineError> {
    let mut outcome = SecondaryRangeFinalizeOutcome::Applied;
    update_secondary_index_manifest_runtime(
        db_dir,
        manifest_write_lock,
        catalog_lock,
        entries_lock,
        next_node_id_seen,
        next_edge_id_seen,
        engine_seq_seen,
        |manifest| {
            let Some(entry) = manifest
                .secondary_indexes
                .iter_mut()
                .find(|entry| entry.index_id == index_id)
            else {
                outcome = SecondaryRangeFinalizeOutcome::Inactive;
                return Ok(());
            };

            let mut live_segment_ids: Vec<u64> =
                manifest.segments.iter().map(|segment| segment.id).collect();
            live_segment_ids.sort_unstable();
            if live_segment_ids != snapshot.segment_ids {
                outcome = SecondaryRangeFinalizeOutcome::Retry;
                return Ok(());
            }

            let still_building_range = entry.state == SecondaryIndexState::Building
                && matches!(entry.kind, SecondaryIndexKind::Range { .. });
            if !still_building_range {
                outcome = SecondaryRangeFinalizeOutcome::Inactive;
                return Ok(());
            }

            let (next_state, next_error) = match coverage {
                SecondaryRangeCoverageStatus::Covered => (SecondaryIndexState::Ready, None),
                SecondaryRangeCoverageStatus::Incomplete => (SecondaryIndexState::Building, None),
                SecondaryRangeCoverageStatus::Failed(message) => {
                    (SecondaryIndexState::Failed, Some(message.clone()))
                }
                SecondaryRangeCoverageStatus::Cancelled => {
                    outcome = SecondaryRangeFinalizeOutcome::Inactive;
                    return Ok(());
                }
            };
            entry.state = next_state;
            entry.last_error = next_error;
            Ok(())
        },
    )?;
    Ok(outcome)
}
/// Runs the build job for secondary index `index_id` until it is applied, becomes inactive,
/// or is cancelled.
///
/// Each loop iteration takes a fresh snapshot (equality first, range only when no equality
/// snapshot applies), builds the missing sidecars, validates coverage and finalizes the
/// result in the manifest. A `Retry` outcome from finalization starts another iteration.
/// When neither snapshot applies the job has nothing to do and returns.
///
/// In test builds, an installed pause hook is taken and triggered once before the first
/// snapshot is loaded.
///
/// # Errors
/// Propagates the first error from snapshot loading, sidecar building, coverage validation
/// or finalization.
fn process_secondary_index_build(
    db_dir: &Path,
    manifest_write_lock: &Arc<Mutex<()>>,
    catalog_lock: &Arc<RwLock<SecondaryIndexCatalog>>,
    entries_lock: &Arc<RwLock<SecondaryIndexEntries>>,
    next_node_id_seen: &AtomicU64,
    next_edge_id_seen: &AtomicU64,
    engine_seq_seen: &AtomicU64,
    #[cfg(test)] build_pause: &Arc<Mutex<Option<SecondaryIndexBuildPauseHook>>>,
    index_id: u64,
    cancel: &AtomicBool,
) -> Result<(), EngineError> {
    #[cfg(test)]
    let mut build_pause_applied = false;

    loop {
        if cancel.load(Ordering::Relaxed) {
            return Ok(());
        }

        #[cfg(test)]
        if !build_pause_applied {
            if let Some(hook) = build_pause.lock().unwrap().take() {
                let _ = hook.ready_tx.send(());
                let _ = hook.release_rx.recv();
            }
            build_pause_applied = true;
        }

        let eq_snapshot = load_secondary_eq_build_snapshot(db_dir, manifest_write_lock, index_id)?;
        if let Some(snapshot) = eq_snapshot {
            build_secondary_eq_sidecars_for_snapshot(db_dir, index_id, &snapshot, cancel)?;
            let coverage =
                validate_secondary_eq_snapshot_coverage(db_dir, index_id, &snapshot, cancel)?;
            if matches!(coverage, SecondaryEqCoverageStatus::Cancelled) {
                return Ok(());
            }
            let outcome = finalize_secondary_eq_build_snapshot(
                db_dir,
                manifest_write_lock,
                catalog_lock,
                entries_lock,
                next_node_id_seen,
                next_edge_id_seen,
                engine_seq_seen,
                index_id,
                &snapshot,
                &coverage,
            )?;
            match outcome {
                SecondaryEqFinalizeOutcome::Retry => continue,
                SecondaryEqFinalizeOutcome::Applied | SecondaryEqFinalizeOutcome::Inactive => {
                    return Ok(());
                }
            }
        }

        // Not a building equality index: try it as a range index, otherwise nothing to do.
        let Some(snapshot) =
            load_secondary_range_build_snapshot(db_dir, manifest_write_lock, index_id)?
        else {
            return Ok(());
        };
        build_secondary_range_sidecars_for_snapshot(db_dir, index_id, &snapshot, cancel)?;
        let coverage =
            validate_secondary_range_snapshot_coverage(db_dir, index_id, &snapshot, cancel)?;
        if matches!(coverage, SecondaryRangeCoverageStatus::Cancelled) {
            return Ok(());
        }
        let outcome = finalize_secondary_range_build_snapshot(
            db_dir,
            manifest_write_lock,
            catalog_lock,
            entries_lock,
            next_node_id_seen,
            next_edge_id_seen,
            engine_seq_seen,
            index_id,
            &snapshot,
            &coverage,
        )?;
        match outcome {
            SecondaryRangeFinalizeOutcome::Retry => continue,
            SecondaryRangeFinalizeOutcome::Applied | SecondaryRangeFinalizeOutcome::Inactive => {
                return Ok(());
            }
        }
    }
}
/// Best-effort removal of the equality and range sidecar files of a dropped index from every
/// segment listed in the manifest.
///
/// Failures to delete a file or to fsync its directory are ignored. Stops early with `Ok(())`
/// once `cancel` is set.
///
/// # Errors
/// Only manifest loading can fail (including `ManifestError("manifest missing")`).
fn process_secondary_index_drop_cleanup(
    db_dir: &Path,
    index_id: u64,
    cancel: &AtomicBool,
) -> Result<(), EngineError> {
    let Some(manifest) = load_manifest_readonly(db_dir)? else {
        return Err(EngineError::ManifestError("manifest missing".into()));
    };

    for segment_info in &manifest.segments {
        if cancel.load(Ordering::Relaxed) {
            return Ok(());
        }

        let seg_dir = segment_dir(db_dir, segment_info.id);
        let sidecar_paths = [
            node_prop_eq_sidecar_path(&seg_dir, index_id),
            node_prop_range_sidecar_path(&seg_dir, index_id),
        ];
        for sidecar_path in sidecar_paths {
            if !sidecar_path.exists() {
                continue;
            }
            let _ = std::fs::remove_file(&sidecar_path);
            if let Some(parent) = sidecar_path.parent() {
                let _ = fsync_dir(parent);
            }
        }
    }
    Ok(())
}
/// Worker loop for secondary index background jobs.
///
/// Receives jobs from `rx` until the channel is closed, `cancel` is set, or a `Shutdown` job
/// arrives.
/// * `Build`: runs `process_secondary_index_build`; on error the index is marked failed.
/// * `DropCleanup`: runs `process_secondary_index_drop_cleanup`, ignoring its result.
fn bg_secondary_index_worker(
    rx: std::sync::mpsc::Receiver<SecondaryIndexJob>,
    cancel: Arc<AtomicBool>,
    db_dir: PathBuf,
    manifest_write_lock: Arc<Mutex<()>>,
    catalog_lock: Arc<RwLock<SecondaryIndexCatalog>>,
    entries_lock: Arc<RwLock<SecondaryIndexEntries>>,
    next_node_id_seen: Arc<AtomicU64>,
    next_edge_id_seen: Arc<AtomicU64>,
    engine_seq_seen: Arc<AtomicU64>,
    #[cfg(test)] build_pause: Arc<Mutex<Option<SecondaryIndexBuildPauseHook>>>,
) {
    // `iter()` blocks for the next job and ends once the channel has hung up.
    for job in rx.iter() {
        if cancel.load(Ordering::Relaxed) {
            break;
        }

        let index_id = match job {
            SecondaryIndexJob::Shutdown => break,
            SecondaryIndexJob::DropCleanup { index_id } => {
                let _ = process_secondary_index_drop_cleanup(&db_dir, index_id, &cancel);
                continue;
            }
            SecondaryIndexJob::Build { index_id } => index_id,
        };

        let build_result = process_secondary_index_build(
            &db_dir,
            &manifest_write_lock,
            &catalog_lock,
            &entries_lock,
            next_node_id_seen.as_ref(),
            next_edge_id_seen.as_ref(),
            engine_seq_seen.as_ref(),
            #[cfg(test)]
            &build_pause,
            index_id,
            &cancel,
        );
        if let Err(error) = build_result {
            mark_secondary_index_failed(
                &db_dir,
                &manifest_write_lock,
                &catalog_lock,
                &entries_lock,
                next_node_id_seen.as_ref(),
                next_edge_id_seen.as_ref(),
                engine_seq_seen.as_ref(),
                index_id,
                &error,
            );
        }
    }
}
/// Validates and normalizes borrowed node vectors before a write.
///
/// * A dense vector requires `dense_config`; it is validated against it and cloned.
/// * A sparse vector is passed through `canonicalize_sparse_vector`, whose result may be
///   `None`.
///
/// # Errors
/// `EngineError::InvalidOperation` when a dense vector is supplied without a dense
/// configuration, plus any validation or canonicalization error.
fn normalize_node_vectors_for_write(
    dense_config: Option<&DenseVectorConfig>,
    dense_vector: Option<&DenseVector>,
    sparse_vector: Option<&SparseVector>,
) -> Result<(Option<DenseVector>, Option<SparseVector>), EngineError> {
    let dense_vector = if let Some(values) = dense_vector {
        let Some(config) = dense_config else {
            return Err(EngineError::InvalidOperation(
                "dense vector writes require DbOptions::dense_vector to be configured".into(),
            ));
        };
        validate_dense_vector(values, config)?;
        Some(values.clone())
    } else {
        None
    };

    let sparse_vector = if let Some(values) = sparse_vector {
        canonicalize_sparse_vector(values)?
    } else {
        None
    };

    Ok((dense_vector, sparse_vector))
}
/// Validates and normalizes owned node vectors before a write, without cloning them.
///
/// * A dense vector requires `dense_config`; it is validated against it and returned as is.
/// * A sparse vector is passed through `canonicalize_sparse_vector_owned`, whose result may
///   be `None`.
///
/// # Errors
/// `EngineError::InvalidOperation` when a dense vector is supplied without a dense
/// configuration, plus any validation or canonicalization error.
fn normalize_owned_node_vectors_for_write(
    dense_config: Option<&DenseVectorConfig>,
    dense_vector: Option<DenseVector>,
    sparse_vector: Option<SparseVector>,
) -> Result<(Option<DenseVector>, Option<SparseVector>), EngineError> {
    let dense_vector = if let Some(values) = dense_vector {
        let Some(config) = dense_config else {
            return Err(EngineError::InvalidOperation(
                "dense vector writes require DbOptions::dense_vector to be configured".into(),
            ));
        };
        validate_dense_vector(&values, config)?;
        Some(values)
    } else {
        None
    };

    let sparse_vector = if let Some(values) = sparse_vector {
        canonicalize_sparse_vector_owned(values)?
    } else {
        None
    };

    Ok((dense_vector, sparse_vector))
}
/// Returns a copy of `op` that is ready to be written.
///
/// For `WalOp::UpsertNode` the node's dense and sparse vectors are replaced by their
/// validated / canonicalized forms. Every other operation is cloned unchanged.
///
/// # Errors
/// Propagates errors from `normalize_node_vectors_for_write`.
fn normalize_wal_op_for_write(
    dense_config: Option<&DenseVectorConfig>,
    op: &WalOp,
) -> Result<WalOp, EngineError> {
    let WalOp::UpsertNode(node) = op else {
        return Ok(op.clone());
    };

    let (dense_vector, sparse_vector) = normalize_node_vectors_for_write(
        dense_config,
        node.dense_vector.as_ref(),
        node.sparse_vector.as_ref(),
    )?;

    let mut normalized = node.clone();
    normalized.dense_vector = dense_vector;
    normalized.sparse_vector = sparse_vector;
    Ok(WalOp::UpsertNode(normalized))
}
/// Normalizes an operation read back from the WAL.
///
/// Uses the same normalization as the write path, but an
/// `EngineError::InvalidOperation` raised for an `UpsertNode` is reported as
/// `EngineError::CorruptWal` that names the offending node. All other errors
/// are returned unchanged.
fn normalize_wal_op_for_replay(
    dense_config: Option<&DenseVectorConfig>,
    op: WalOp,
) -> Result<WalOp, EngineError> {
    let failure = match normalize_wal_op_for_write(dense_config, &op) {
        Ok(normalized) => return Ok(normalized),
        Err(failure) => failure,
    };
    match (&op, failure) {
        (WalOp::UpsertNode(node), EngineError::InvalidOperation(message)) => {
            Err(EngineError::CorruptWal(format!(
                "invalid vector payload for replayed node {} (key={}): {}",
                node.id, node.key, message
            )))
        }
        (_, other) => Err(other),
    }
}
/// Returns `true` when `reference_time` lies inside the half-open validity
/// window `[valid_from, valid_to)`.
#[inline]
fn is_edge_valid_at(valid_from: i64, valid_to: i64, reference_time: i64) -> bool {
    (valid_from..valid_to).contains(&reference_time)
}
/// Decides whether a record with the given `type_id`, `updated_at` and
/// `weight` satisfies every constraint of one prune policy.
///
/// Each policy component is optional; an absent component places no
/// restriction. A record matches when:
/// * its type equals `policy_type_id` (if set),
/// * it was updated strictly before `policy_age_cutoff` (if set), and
/// * its weight is not greater than `policy_max_weight` (if set).
fn matches_prune_cutoff(
    type_id: u32,
    updated_at: i64,
    weight: f32,
    policy_age_cutoff: Option<i64>,
    policy_max_weight: Option<f32>,
    policy_type_id: Option<u32>,
) -> bool {
    let type_ok = policy_type_id.map_or(true, |tid| type_id == tid);
    let age_ok = policy_age_cutoff.map_or(true, |cutoff| updated_at < cutoff);
    // Written as a negated `>` (not `<=`) so that a NaN weight keeps matching,
    // exactly like the comparison it replaces.
    let weight_ok = policy_max_weight.map_or(true, |max_w| !(weight > max_w));
    type_ok && age_ok && weight_ok
}
/// Prune policies from a manifest with their age limits already converted to
/// absolute cutoffs, so they can be evaluated repeatedly against many records.
struct PrecomputedPruneCutoffs {
    /// One `(age_cutoff, max_weight, type_id)` triple per policy.
    policies: Vec<(Option<i64>, Option<f32>, Option<u32>)>,
}
impl PrecomputedPruneCutoffs {
    /// Builds the cutoff table from `manifest.prune_policies`, turning each
    /// `max_age_ms` into `now - max_age_ms`.
    fn from_manifest(manifest: &ManifestState, now: i64) -> Self {
        let mut policies = Vec::new();
        for policy in manifest.prune_policies.values() {
            let age_cutoff = match policy.max_age_ms {
                Some(age) => Some(now - age),
                None => None,
            };
            policies.push((age_cutoff, policy.max_weight, policy.type_id));
        }
        Self { policies }
    }
    /// Returns `true` when any policy matches `node`.
    fn excludes(&self, node: &NodeRecord) -> bool {
        self.excludes_fields(node.type_id, node.updated_at, node.weight)
    }
    /// Field-level form of [`Self::excludes`]: returns `true` as soon as one
    /// policy matches the given type, update time and weight.
    fn excludes_fields(&self, node_type_id: u32, updated_at: i64, weight: f32) -> bool {
        self.policies
            .iter()
            .any(|&(age_cutoff, max_weight, policy_type_id)| {
                matches_prune_cutoff(
                    node_type_id,
                    updated_at,
                    weight,
                    age_cutoff,
                    max_weight,
                    policy_type_id,
                )
            })
    }
}
/// Per-node degree statistics stored in `DatabaseEngine::degree_cache`.
///
/// The counters are maintained incrementally by the engine's
/// `degree_cache_increment` / `degree_cache_decrement` /
/// `degree_cache_temporal_adjust` helpers.
#[derive(Debug, Clone, Copy, Default)]
pub(crate) struct DegreeEntry {
/// Number of counted edges whose `from` endpoint is this node.
pub out_degree: u32,
/// Number of counted edges whose `to` endpoint is this node.
pub in_degree: u32,
/// Sum of the weights of the edges counted in `out_degree`.
pub out_weight_sum: f64,
/// Sum of the weights of the edges counted in `in_degree`.
pub in_weight_sum: f64,
/// Number of counted edges where `from == to` (also included in both
/// `out_degree` and `in_degree`).
pub self_loop_count: u32,
/// Sum of the weights of the edges counted in `self_loop_count`.
pub self_loop_weight_sum: f64,
/// Number of incident edges for which `is_cache_bypass_edge` is true.
pub temporal_edge_count: u32,
}
impl DegreeEntry {
/// Entry with every counter and sum set to zero; usable in `const` context.
pub const ZERO: DegreeEntry = DegreeEntry {
out_degree: 0,
in_degree: 0,
out_weight_sum: 0.0,
in_weight_sum: 0.0,
self_loop_count: 0,
self_loop_weight_sum: 0.0,
temporal_edge_count: 0,
};
}
/// Returns `true` for edges whose validity window is "temporal": the window
/// either has a finite end (`valid_to != i64::MAX`) or starts after the edge
/// was created (`valid_from > created_at`).
#[inline]
fn is_cache_bypass_edge(valid_from: i64, valid_to: i64, created_at: i64) -> bool {
    let open_ended = valid_to == i64::MAX;
    let valid_since_creation = valid_from <= created_at;
    !(open_ended && valid_since_creation)
}
/// The storage engine handle: owns the active memtable, frozen (immutable)
/// memtables, open segment readers, WAL writer state and the handles of the
/// background workers.
///
/// Instances are created by `DatabaseEngine::open` and consumed by
/// `close` / `close_fast`.
pub struct DatabaseEngine {
/// Database directory passed to `open`.
db_dir: PathBuf,
/// In-memory copy of the manifest; replaced after every successful
/// `with_runtime_manifest_write`.
manifest: ManifestState,
/// WAL writer used directly in `WalSyncMode::Immediate`; `None` in group-commit mode.
wal_writer_immediate: Option<WalWriter>,
/// Shared WAL state + condvar used in `WalSyncMode::GroupCommit`; `None` in immediate mode.
wal_state: Option<Arc<(Mutex<WalSyncState>, Condvar)>>,
/// Join handle of the group-commit sync thread spawned by `open`.
sync_thread: Option<JoinHandle<()>>,
/// Active (mutable) memtable; on open it is rebuilt by replaying the active WAL generation.
memtable: Memtable,
/// Readers for the manifest's segments, opened in reverse manifest order.
segments: Vec<SegmentReader>,
/// Next node id to hand out; on open, at least one past the largest id seen in manifest/WAL replay.
next_node_id: u64,
/// Next edge id to hand out; computed like `next_node_id`.
next_edge_id: u64,
/// Copied from `DbOptions::edge_uniqueness`.
edge_uniqueness: bool,
/// Copied from `DbOptions::memtable_flush_threshold`.
flush_threshold: usize,
/// One past the largest segment id found in the manifest or on disk at open.
next_segment_id: u64,
/// Copied from `DbOptions::compact_after_n_flushes`.
compact_after_n_flushes: u32,
ingest_saved_compact_after_n_flushes: Option<u32>,
flush_count_since_last_compact: u32,
compacting: bool,
/// WAL durability mode copied from `DbOptions::wal_sync_mode`.
wal_sync_mode: WalSyncMode,
/// Copied from `DbOptions::memtable_hard_cap_bytes`.
memtable_hard_cap: usize,
/// Copied from `DbOptions::max_immutable_memtables`.
max_immutable_memtables: usize,
bg_compact: Option<BgCompactHandle>,
last_compaction_ms: Option<i64>,
/// Per-node degree statistics, rebuilt on open and updated on every edge/node WAL op.
degree_cache: NodeIdMap<DegreeEntry>,
/// Sequence number of the most recently applied WAL operation.
engine_seq: u64,
/// Shared high-water marks of the three counters above; updated with `fetch_max`
/// and also handed to the secondary-index worker thread.
next_node_id_seen: Arc<AtomicU64>,
next_edge_id_seen: Arc<AtomicU64>,
engine_seq_seen: Arc<AtomicU64>,
/// Serializes manifest read-modify-write cycles (shared with the secondary-index worker).
manifest_write_lock: Arc<Mutex<()>>,
/// Frozen memtables awaiting flush; `open` builds this list newest WAL generation first.
immutable_epochs: Vec<ImmutableEpoch>,
/// Sum of `estimated_size()` over `immutable_epochs`.
immutable_bytes_total: usize,
/// WAL generation currently receiving appends.
active_wal_generation_id: u64,
bg_flush: Option<BgFlushHandle>,
/// Secondary index declarations keyed by type id, property key and kind.
secondary_index_catalog: Arc<RwLock<SecondaryIndexCatalog>>,
/// Flat list of secondary index declarations.
secondary_index_entries: Arc<RwLock<SecondaryIndexEntries>>,
/// Handle of the lazily started secondary-index worker thread.
secondary_index_bg: Option<SecondaryIndexBgHandle>,
flush_pipeline_error: Option<FlushPipelineError>,
flush_pipeline_error_reported: bool,
// Test-only hooks and counters below.
#[cfg(test)]
flush_pause: Mutex<Option<FlushPauseHook>>,
#[cfg(test)]
flush_publish_pause: Arc<Mutex<Option<FlushPublishPauseHook>>>,
#[cfg(test)]
bg_compact_pause: Arc<Mutex<Option<BgCompactPauseHook>>>,
#[cfg(test)]
secondary_index_build_pause: Arc<Mutex<Option<SecondaryIndexBuildPauseHook>>>,
#[cfg(test)]
flush_force_error: bool,
#[cfg(test)]
property_query_routes: PropertyQueryRouteCounters,
}
/// Snapshot of an edge's previous state, captured by
/// `capture_old_edge_for_cache` before an upsert/delete is applied so the
/// degree cache can be adjusted afterwards.
struct OldEdgeInfo {
from: u64,
to: u64,
weight: f32,
/// Result of `is_edge_valid_at(valid_from, valid_to, now)` at capture time.
valid_at_now: bool,
created_at: i64,
valid_from: i64,
valid_to: i64,
}
/// Minimal copyable subset of an edge's fields, as returned by
/// `get_edge_core_for_cache` for degree-cache maintenance.
#[derive(Clone, Copy)]
struct EdgeCore {
from: u64,
to: u64,
created_at: i64,
weight: f32,
valid_from: i64,
valid_to: i64,
}
/// Handle to a background compaction thread.
struct BgCompactHandle {
/// Join handle yielding the compaction outcome.
handle: JoinHandle<Result<BgCompactResult, EngineError>>,
/// Shared flag; presumably set to request cancellation (verify against the worker).
cancel: Arc<AtomicBool>,
}
/// Value returned by a background compaction thread (see `BgCompactHandle`).
// NOTE(review): field meanings below are inferred from names and types only;
// confirm against the compaction code before relying on them.
struct BgCompactResult {
seg_info: SegmentInfo,
reader: SegmentReader,
old_seg_dirs: Vec<PathBuf>,
stats: CompactionStats,
input_segment_ids: NodeIdSet,
maintained_equality_index_ids: NodeIdSet,
maintained_range_index_ids: NodeIdSet,
secondary_index_report: SecondaryIndexMaintenanceReport,
}
/// Handle to the background flush pipeline: a work channel, an event channel
/// and the join handles of two threads (`build` and `publish`).
struct BgFlushHandle {
/// Sends `BgFlushWork` items to the pipeline.
work_tx: std::sync::mpsc::Sender<BgFlushWork>,
/// Receives `BgFlushEvent`s produced by the pipeline.
event_rx: Mutex<std::sync::mpsc::Receiver<BgFlushEvent>>,
build_handle: Option<JoinHandle<()>>,
publish_handle: Option<JoinHandle<()>>,
cancel: Arc<AtomicBool>,
// NOTE(review): presumably `events_ready` counts events produced and
// `events_applied` counts events consumed by the engine; confirm at use sites.
events_ready: Arc<AtomicUsize>,
events_applied: usize,
}
/// Hashable key identifying a secondary index by node type, property key and
/// index kind.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SecondaryIndexLookupKey {
type_id: u32,
prop_key: String,
kind: SecondaryIndexKind,
}
/// Message sent to the secondary-index worker thread.
enum SecondaryIndexJob {
/// Build the index with the given id.
Build { index_id: u64 },
/// Run drop cleanup for the index with the given id.
DropCleanup { index_id: u64 },
/// Make the worker loop exit.
Shutdown,
}
/// Handle to the secondary-index worker thread created by
/// `ensure_secondary_index_worker`.
struct SecondaryIndexBgHandle {
/// Job queue feeding the worker.
job_tx: std::sync::mpsc::Sender<SecondaryIndexJob>,
/// Join handle; taken (set to `None`) when the worker is joined on shutdown.
handle: Option<JoinHandle<()>>,
/// Set to `true` on shutdown before the `Shutdown` job is sent.
cancel: Arc<AtomicBool>,
}
/// Test-only atomic counters, one per property-query route
/// (equality/range × scan fallback/index lookup).
#[cfg(test)]
#[derive(Default)]
struct PropertyQueryRouteCounters {
equality_scan_fallback: AtomicUsize,
equality_index_lookup: AtomicUsize,
range_scan_fallback: AtomicUsize,
range_index_lookup: AtomicUsize,
}
/// Test-only plain-value counterpart of `PropertyQueryRouteCounters`, with the
/// same four fields as `usize`.
#[cfg(test)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct PropertyQueryRouteSnapshot {
pub equality_scan_fallback: usize,
pub equality_index_lookup: usize,
pub range_scan_fallback: usize,
pub range_index_lookup: usize,
}
/// Unit of work sent over `BgFlushHandle::work_tx` to the background flush
/// pipeline.
struct BgFlushWork {
epoch_id: u64,
/// The frozen memtable to be flushed.
frozen: Arc<Memtable>,
seg_id: u64,
// NOTE(review): presumably the segment is written to `tmp_dir` and then moved
// to `final_dir`; confirm in the flush worker.
tmp_dir: PathBuf,
final_dir: PathBuf,
dense_config: Option<DenseVectorConfig>,
wal_gen_id: u64,
// Test-only fault-injection / pause controls.
#[cfg(test)]
pause: Option<FlushPauseHook>,
#[cfg(test)]
force_write_error: bool,
}
/// Data describing a flush whose segment has been built.
// NOTE(review): presumably handed from the build stage to the publish stage of
// the flush pipeline; confirm at the use sites.
struct BuiltFlushResult {
epoch_id: u64,
wal_gen_to_retire: u64,
seg_info: SegmentInfo,
seg_id: u64,
final_dir: PathBuf,
dense_config: Option<DenseVectorConfig>,
maintained_equality_index_ids: NodeIdSet,
maintained_range_index_ids: NodeIdSet,
}
/// Payload of `BgFlushEvent::Adopt`: a flushed segment, with an open reader,
/// for the engine to take over.
struct PublishedFlushAdoption {
epoch_id: u64,
wal_gen_to_retire: u64,
seg_info: SegmentInfo,
reader: SegmentReader,
// NOTE(review): presumably ids of secondary indexes that must be rebuilt for
// this segment; confirm where the event is processed.
rebuild_equality_index_ids: Vec<u64>,
rebuild_range_index_ids: Vec<u64>,
}
/// Stage of the background flush pipeline, recorded in `FlushPipelineError`
/// and printed with `{:?}` in its error message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FlushPipelineStage {
Build,
PublishOpenReader,
PublishManifest,
}
/// Failure reported by the background flush pipeline, tagged with the epoch,
/// WAL generation and pipeline stage it belongs to.
#[derive(Debug, Clone)]
struct FlushPipelineError {
    epoch_id: u64,
    wal_generation_id: u64,
    stage: FlushPipelineStage,
    message: String,
}
impl FlushPipelineError {
    /// Renders this failure as an `EngineError::InvalidOperation` whose message
    /// includes the stage, epoch id, WAL generation id and original message.
    fn to_engine_error(&self) -> EngineError {
        let Self {
            epoch_id,
            wal_generation_id,
            stage,
            message,
        } = self;
        let rendered = format!(
            "bg flush {:?} failed for epoch {} wal {}: {}",
            stage, epoch_id, wal_generation_id, message
        );
        EngineError::InvalidOperation(rendered)
    }
}
/// Event emitted by the background flush pipeline and received through
/// `BgFlushHandle::event_rx`.
// The size difference between the variants is accepted, hence the clippy allow.
#[allow(clippy::large_enum_variant)]
enum BgFlushEvent {
/// A flush completed; carries the segment to adopt.
Adopt(PublishedFlushAdoption),
/// A flush failed at some pipeline stage.
Failed(FlushPipelineError),
}
/// A frozen memtable together with the flush epoch and WAL generation it
/// belongs to.
pub(crate) struct ImmutableEpoch {
pub(crate) epoch_id: u64,
/// WAL generation whose records produced this memtable.
pub(crate) wal_generation_id: u64,
/// Shared frozen memtable; the engine uses `Arc::make_mut` when it needs to
/// modify it (e.g. to register secondary indexes).
pub(crate) memtable: Arc<Memtable>,
/// Initialized to `false` for epochs recovered on open.
// NOTE(review): presumably `true` while a background flush of this epoch is running.
pub(crate) in_flight: bool,
}
/// Test-only hook made of a "ready" sender and a "release" receiver.
// NOTE(review): presumably the flush build stage signals `ready_tx` and then
// blocks on `release_rx`; confirm in the worker.
#[cfg(test)]
struct FlushPauseHook {
ready_tx: std::sync::mpsc::SyncSender<()>,
release_rx: std::sync::mpsc::Receiver<()>,
}
/// Test-only hook made of a "ready" sender and a "release" receiver.
// NOTE(review): presumably used to pause the flush publish stage; confirm in the worker.
#[cfg(test)]
struct FlushPublishPauseHook {
ready_tx: std::sync::mpsc::SyncSender<()>,
release_rx: std::sync::mpsc::Receiver<()>,
}
/// Test-only hook made of a "ready" sender and a "release" receiver.
// NOTE(review): presumably used to pause background compaction; confirm in the worker.
#[cfg(test)]
struct BgCompactPauseHook {
ready_tx: std::sync::mpsc::SyncSender<()>,
release_rx: std::sync::mpsc::Receiver<()>,
}
/// Test-only hook made of a "ready" sender and a "release" receiver; a shared
/// slot holding it is passed to the secondary-index worker thread.
#[cfg(test)]
struct SecondaryIndexBuildPauseHook {
ready_tx: std::sync::mpsc::SyncSender<()>,
release_rx: std::sync::mpsc::Receiver<()>,
}
/// Identifies which compaction strategy is used.
// NOTE(review): variant semantics are not visible in this part of the file;
// see the compaction code for what distinguishes the two paths.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CompactionPath {
FastMerge,
UnifiedV3,
}
impl DatabaseEngine {
/// Opens (or creates) the database stored in `path` and recovers its state.
///
/// Steps, in order:
/// 1. Create the directory if allowed, then load or create the manifest.
/// 2. Migrate a legacy `data.wal` file to WAL generation 0 when applicable.
/// 3. Replay frozen-but-unflushed WAL generations into immutable memtables
///    and the active WAL generation into the active memtable.
/// 4. Open segment readers, retire already-published flush epochs and clean
///    up orphaned segments / WAL files.
/// 5. Open the WAL writer, validate group-commit options, start the sync
///    thread if needed and finish in-memory initialization (degree cache,
///    secondary indexes).
///
/// # Errors
/// Returns `EngineError::DatabaseNotFound` when `path` does not exist and
/// `options.create_if_missing` is false, `EngineError::InvalidOperation` for
/// inconsistent recovery state or invalid group-commit options, and
/// propagates I/O, manifest, WAL and segment errors.
pub fn open(path: &Path, options: &DbOptions) -> Result<Self, EngineError> {
if !path.exists() {
if options.create_if_missing {
std::fs::create_dir_all(path)?;
} else {
return Err(EngineError::DatabaseNotFound(format!("{}", path.display())));
}
}
// Load the manifest, or start from the default one for a fresh database.
let loaded_manifest = load_manifest(path)?;
let created_manifest = loaded_manifest.is_none();
let mut manifest = match loaded_manifest {
Some(m) => m,
None => default_manifest(),
};
// Persist the manifest if it is new or if reconciliation/normalization changed it.
let manifest_dirty = reconcile_dense_vector_manifest(&mut manifest, options)?
|| normalize_secondary_index_manifest(&mut manifest)?;
if created_manifest || manifest_dirty {
write_manifest(path, &manifest)?;
};
// Legacy layout: a single `data.wal` becomes generation 0, but only when the
// manifest has never used WAL generations and no generation-0 file exists.
let legacy_wal_path = path.join("data.wal");
let gen0_path = wal_generation_path(path, 0);
if manifest.next_wal_generation_id == 0
&& manifest.active_wal_generation_id == 0
&& manifest.pending_flush_epochs.is_empty()
&& legacy_wal_path.exists()
&& !gen0_path.exists()
{
std::fs::rename(&legacy_wal_path, &gen0_path)?;
}
// Make sure the next WAL generation id is past every generation the manifest references.
let mut max_gen = manifest.active_wal_generation_id;
for epoch in &manifest.pending_flush_epochs {
max_gen = max_gen.max(epoch.wal_generation_id);
}
manifest.next_wal_generation_id = manifest.next_wal_generation_id.max(max_gen + 1);
// Epochs that were frozen but never flushed: replay them oldest generation first.
let mut frozen_epochs: Vec<(u64, u64)> = manifest
.pending_flush_epochs
.iter()
.filter(|e| e.state == FlushEpochState::FrozenPendingFlush)
.map(|e| (e.epoch_id, e.wal_generation_id))
.collect();
frozen_epochs.sort_unstable_by_key(|&(_, gen)| gen);
let mut engine_seq = manifest.next_engine_seq;
let mut immutable_epochs_on_open: Vec<ImmutableEpoch> = Vec::new();
let mut immutable_bytes_on_open: usize = 0;
for &(epoch_id, wal_gen_id) in &frozen_epochs {
let mut frozen_mt = Memtable::new();
let wal_records = WalReader::read_generation(path, wal_gen_id)?;
for (seq, op) in wal_records {
let op = normalize_wal_op_for_replay(manifest.dense_vector.as_ref(), op)?;
engine_seq = engine_seq.max(seq);
frozen_mt.apply_op(&op, seq);
}
immutable_bytes_on_open += frozen_mt.estimated_size();
// Inserting at the front leaves the list ordered newest generation first.
immutable_epochs_on_open.insert(
0,
ImmutableEpoch {
epoch_id,
wal_generation_id: wal_gen_id,
memtable: Arc::new(frozen_mt),
in_flight: false,
},
);
}
// Replay the active WAL generation into the active memtable.
let mut memtable = Memtable::new();
let active_wal_records =
WalReader::read_generation(path, manifest.active_wal_generation_id)?;
for (seq, op) in active_wal_records {
let op = normalize_wal_op_for_replay(manifest.dense_vector.as_ref(), op)?;
engine_seq = engine_seq.max(seq);
memtable.apply_op(&op, seq);
}
// Id counters: the larger of the manifest value and (largest replayed id + 1).
let mut max_node_id = manifest
.next_node_id
.max(memtable.max_node_id().saturating_add(1));
let mut max_edge_id = manifest
.next_edge_id
.max(memtable.max_edge_id().saturating_add(1));
for epoch in &immutable_epochs_on_open {
max_node_id = max_node_id.max(epoch.memtable.max_node_id().saturating_add(1));
max_edge_id = max_edge_id.max(epoch.memtable.max_edge_id().saturating_add(1));
}
let next_node_id = max_node_id;
let next_edge_id = max_edge_id;
// Open segment readers in reverse manifest order. A missing segment directory
// is skipped silently unless a PublishedPendingRetire epoch depends on it.
let mut segments = Vec::new();
for seg_info in manifest.segments.iter().rev() {
let seg_path = segment_dir(path, seg_info.id);
if seg_path.exists() {
let reader =
SegmentReader::open(&seg_path, seg_info.id, manifest.dense_vector.as_ref())?;
segments.push(reader);
} else if manifest.pending_flush_epochs.iter().any(|e| {
e.state == FlushEpochState::PublishedPendingRetire
&& e.segment_id == Some(seg_info.id)
}) {
return Err(EngineError::InvalidOperation(format!(
"manifest references published segment {} for pending flush recovery, but {} is missing",
seg_info.id,
seg_path.display()
)));
}
}
// Epochs whose segment was already published only need their WAL generation
// retired; validate them first.
let live_segment_ids: NodeIdSet = manifest.segments.iter().map(|s| s.id).collect();
let mut published_gen_ids = Vec::new();
for epoch in manifest
.pending_flush_epochs
.iter()
.filter(|e| e.state == FlushEpochState::PublishedPendingRetire)
{
let seg_id = epoch.segment_id.ok_or_else(|| {
EngineError::InvalidOperation(format!(
"PublishedPendingRetire epoch {} is missing segment_id",
epoch.epoch_id
))
})?;
if !live_segment_ids.contains(&seg_id) {
return Err(EngineError::InvalidOperation(format!(
"PublishedPendingRetire epoch {} references segment {} that is not present in the manifest",
epoch.epoch_id, seg_id
)));
}
published_gen_ids.push(epoch.wal_generation_id);
}
if !published_gen_ids.is_empty() {
manifest
.pending_flush_epochs
.retain(|e| e.state != FlushEpochState::PublishedPendingRetire);
// WAL removal is best-effort: failures are ignored here.
for gen_id in &published_gen_ids {
let _ = remove_wal_generation(path, *gen_id);
}
write_manifest(path, &manifest)?;
}
// Next segment id is past anything known to the manifest or present on disk.
let manifest_max = manifest.segments.iter().map(|s| s.id).max().unwrap_or(0);
let fs_max = scan_max_segment_id(path);
let next_segment_id = manifest_max.max(fs_max).saturating_add(1);
cleanup_orphan_segments(path, &manifest);
cleanup_orphan_wal_files(path, &manifest);
let wal_writer = WalWriter::open_generation(path, manifest.active_wal_generation_id)?;
// NOTE(review): the group-commit option checks below run only after the
// directory, manifest and WAL files may already have been created or
// modified above; consider validating options first.
if let WalSyncMode::GroupCommit {
interval_ms,
soft_trigger_bytes,
hard_cap_bytes,
} = &options.wal_sync_mode
{
if *interval_ms == 0 {
return Err(EngineError::InvalidOperation(
"GroupCommit interval_ms must be > 0".into(),
));
}
if *soft_trigger_bytes == 0 {
return Err(EngineError::InvalidOperation(
"GroupCommit soft_trigger_bytes must be > 0".into(),
));
}
if *hard_cap_bytes == 0 {
return Err(EngineError::InvalidOperation(
"GroupCommit hard_cap_bytes must be > 0".into(),
));
}
if *hard_cap_bytes <= *soft_trigger_bytes {
return Err(EngineError::InvalidOperation(format!(
"GroupCommit hard_cap_bytes ({}) must be > soft_trigger_bytes ({})",
hard_cap_bytes, soft_trigger_bytes
)));
}
}
// Immediate mode keeps the writer inline; group commit moves it into shared
// state and starts the periodic sync thread.
let wal_sync_mode = options.wal_sync_mode.clone();
let (wal_writer_immediate, wal_state, sync_thread) = match &wal_sync_mode {
WalSyncMode::Immediate => (Some(wal_writer), None, None),
WalSyncMode::GroupCommit { interval_ms, .. } => {
let state = WalSyncState {
wal_writer,
buffered_bytes: 0,
shutdown: false,
sync_error_count: 0,
poisoned: None,
};
let arc = Arc::new((Mutex::new(state), Condvar::new()));
let arc_clone = Arc::clone(&arc);
let interval = std::time::Duration::from_millis(*interval_ms);
let handle = std::thread::spawn(move || {
sync_thread_loop(arc_clone, interval);
});
(None, Some(arc), Some(handle))
}
};
let active_wal_generation_id = manifest.active_wal_generation_id;
let next_node_id_seen = Arc::new(AtomicU64::new(next_node_id));
let next_edge_id_seen = Arc::new(AtomicU64::new(next_edge_id));
let engine_seq_seen = Arc::new(AtomicU64::new(engine_seq));
let manifest_write_lock = Arc::new(Mutex::new(()));
let mut engine = DatabaseEngine {
db_dir: path.to_path_buf(),
manifest,
wal_writer_immediate,
wal_state,
sync_thread,
memtable,
segments,
next_node_id,
next_edge_id,
edge_uniqueness: options.edge_uniqueness,
flush_threshold: options.memtable_flush_threshold,
next_segment_id,
compact_after_n_flushes: options.compact_after_n_flushes,
ingest_saved_compact_after_n_flushes: None,
flush_count_since_last_compact: 0,
compacting: false,
wal_sync_mode,
memtable_hard_cap: options.memtable_hard_cap_bytes,
max_immutable_memtables: options.max_immutable_memtables,
bg_compact: None,
last_compaction_ms: None,
degree_cache: NodeIdMap::default(),
engine_seq,
next_node_id_seen,
next_edge_id_seen,
engine_seq_seen,
manifest_write_lock,
immutable_epochs: immutable_epochs_on_open,
immutable_bytes_total: immutable_bytes_on_open,
active_wal_generation_id,
bg_flush: None,
secondary_index_catalog: Arc::new(RwLock::new(HashMap::new())),
secondary_index_entries: Arc::new(RwLock::new(Vec::new())),
secondary_index_bg: None,
flush_pipeline_error: None,
flush_pipeline_error_reported: false,
#[cfg(test)]
flush_pause: Mutex::new(None),
#[cfg(test)]
flush_publish_pause: Arc::new(Mutex::new(None)),
#[cfg(test)]
bg_compact_pause: Arc::new(Mutex::new(None)),
#[cfg(test)]
secondary_index_build_pause: Arc::new(Mutex::new(None)),
#[cfg(test)]
flush_force_error: false,
#[cfg(test)]
property_query_routes: PropertyQueryRouteCounters::default(),
};
// Derived in-memory state: degree cache first, then secondary index recovery,
// catalog, worker start, memtable registration and scheduling of pending builds.
engine.rebuild_degree_cache()?;
engine.recover_secondary_index_states_on_open()?;
engine.rebuild_secondary_index_catalog()?;
engine.ensure_secondary_index_worker_if_needed();
engine.seed_secondary_indexes_from_manifest()?;
engine.schedule_building_secondary_indexes();
Ok(engine)
}
/// Shuts the engine down after making pending data durable.
///
/// Applies finished background flushes, flushes any remaining active or
/// frozen memtable data (or just drains the background flush pipeline when
/// there is none), waits for background compaction, stops the flush and
/// secondary-index workers and finally closes the WAL.
///
/// # Errors
/// Precedence: a flush error wins over a WAL/manifest close error, which wins
/// over a recorded flush-pipeline error.
pub fn close(mut self) -> Result<(), EngineError> {
    self.try_apply_all_bg_flushes();
    let has_unflushed_data = !(self.memtable.is_empty() && self.immutable_epochs.is_empty());
    let flush_error = if has_unflushed_data {
        self.flush().err()
    } else {
        self.drain_bg_flush();
        None
    };
    self.wait_for_bg_compact();
    for event in self.shutdown_bg_flush() {
        // Errors from late events are intentionally ignored during shutdown.
        let _ = self.process_bg_flush_event(event);
    }
    self.shutdown_secondary_index_worker();
    // The WAL is always closed, even when the flush above failed.
    let close_result = self.close_inner();
    if let Some(err) = flush_error {
        return Err(err);
    }
    close_result?;
    match self.current_flush_pipeline_error() {
        Some(err) => Err(err),
        None => Ok(()),
    }
}
/// Shuts the engine down without flushing memtables: cancels background
/// compaction, applies already finished background flushes, stops the flush
/// and secondary-index workers and closes the WAL.
///
/// # Errors
/// Returns the WAL/manifest close error if any, otherwise a recorded
/// flush-pipeline error if any.
pub fn close_fast(mut self) -> Result<(), EngineError> {
    self.cancel_bg_compact();
    self.try_apply_all_bg_flushes();
    let remaining_events = self.shutdown_bg_flush();
    for event in remaining_events {
        // Errors from late events are intentionally ignored during shutdown.
        let _ = self.process_bg_flush_event(event);
    }
    self.shutdown_secondary_index_worker();
    self.close_inner()?;
    match self.current_flush_pipeline_error() {
        Some(err) => Err(err),
        None => Ok(()),
    }
}
fn close_inner(&mut self) -> Result<(), EngineError> {
match &self.wal_sync_mode {
WalSyncMode::Immediate => {
if let Some(ref mut w) = self.wal_writer_immediate {
w.sync()?;
}
}
WalSyncMode::GroupCommit { .. } => {
if let Some(ref wal_state) = self.wal_state {
shutdown_sync_thread(wal_state, &mut self.sync_thread)?;
}
}
}
let active_wal_generation_id = self.active_wal_generation_id;
self.with_runtime_manifest_write(|manifest| {
manifest.active_wal_generation_id = active_wal_generation_id;
Ok(())
})
}
/// Borrows the three read sources of the engine (active memtable, frozen
/// memtables, segments) as one `SourceList`.
fn sources(&self) -> SourceList<'_> {
    let active = &self.memtable;
    let immutable = &self.immutable_epochs;
    let segments = &self.segments;
    SourceList {
        active,
        immutable,
        segments,
    }
}
/// Returns a copy of the current secondary index entries, taken under the
/// read lock.
///
/// # Panics
/// Panics if the lock is poisoned.
fn secondary_index_entries_snapshot(&self) -> SecondaryIndexEntries {
    let entries = self.secondary_index_entries.read().unwrap();
    entries.to_vec()
}
/// Re-synchronizes the shared secondary index catalog and entry list with the
/// declarations stored in the in-memory manifest.
fn rebuild_secondary_index_catalog(&mut self) -> Result<(), EngineError> {
    let declared = &self.manifest.secondary_indexes;
    let catalog = &self.secondary_index_catalog;
    let entries = &self.secondary_index_entries;
    sync_secondary_index_runtime_state(catalog, entries, declared)
}
fn recover_secondary_index_states_on_open(&mut self) -> Result<(), EngineError> {
let mut dirty = false;
for entry in &mut self.manifest.secondary_indexes {
if entry.state != SecondaryIndexState::Ready {
continue;
}
for segment in &self.segments {
let validation = match entry.kind {
SecondaryIndexKind::Equality => {
segment.validate_secondary_eq_sidecar(entry.index_id)
}
SecondaryIndexKind::Range { .. } => {
segment.validate_secondary_range_sidecar(entry.index_id)
}
};
match validation {
Ok(true) => continue,
Ok(false) => {
entry.state = SecondaryIndexState::Building;
entry.last_error = None;
dirty = true;
break;
}
Err(error) => {
entry.state = SecondaryIndexState::Failed;
entry.last_error = Some(error.to_string());
dirty = true;
break;
}
}
}
}
if dirty {
write_manifest(&self.db_dir, &self.manifest)?;
}
Ok(())
}
fn seed_secondary_indexes_from_manifest(&mut self) -> Result<(), EngineError> {
let entries = self.secondary_index_entries_snapshot();
for entry in &entries {
self.memtable.register_secondary_index(entry);
}
for epoch in &mut self.immutable_epochs {
let memtable = Arc::make_mut(&mut epoch.memtable);
for entry in &entries {
memtable.register_secondary_index(entry);
}
}
self.refresh_immutable_bytes_total();
Ok(())
}
fn seed_secondary_index_entry(
&mut self,
entry: &SecondaryIndexManifestEntry,
) -> Result<(), EngineError> {
self.memtable.register_secondary_index(entry);
for epoch in &mut self.immutable_epochs {
let memtable = Arc::make_mut(&mut epoch.memtable);
memtable.register_secondary_index(entry);
}
self.refresh_immutable_bytes_total();
Ok(())
}
fn remove_secondary_index_entry_from_memtables(
&mut self,
index_id: u64,
) -> Result<(), EngineError> {
self.memtable.unregister_secondary_index(index_id);
for epoch in &mut self.immutable_epochs {
let memtable = Arc::make_mut(&mut epoch.memtable);
memtable.unregister_secondary_index(index_id);
}
self.refresh_immutable_bytes_total();
Ok(())
}
/// Starts the secondary-index worker thread unless one is already running.
///
/// The worker receives jobs over a channel and shares the database
/// directory, the manifest write lock, the secondary index catalog/entries
/// and the id/sequence high-water marks with the engine.
fn ensure_secondary_index_worker(&mut self) {
    if self.secondary_index_bg.is_none() {
        let (job_tx, job_rx) = std::sync::mpsc::channel();
        let cancel = Arc::new(AtomicBool::new(false));
        let handle = {
            // Everything the worker needs is cloned up front so the spawned
            // closure owns its inputs.
            let worker_cancel = Arc::clone(&cancel);
            let db_dir = self.db_dir.clone();
            let manifest_write_lock = Arc::clone(&self.manifest_write_lock);
            let catalog_lock = Arc::clone(&self.secondary_index_catalog);
            let entries_lock = Arc::clone(&self.secondary_index_entries);
            let next_node_id_seen = Arc::clone(&self.next_node_id_seen);
            let next_edge_id_seen = Arc::clone(&self.next_edge_id_seen);
            let engine_seq_seen = Arc::clone(&self.engine_seq_seen);
            #[cfg(test)]
            let build_pause = Arc::clone(&self.secondary_index_build_pause);
            std::thread::spawn(move || {
                bg_secondary_index_worker(
                    job_rx,
                    worker_cancel,
                    db_dir,
                    manifest_write_lock,
                    catalog_lock,
                    entries_lock,
                    next_node_id_seen,
                    next_edge_id_seen,
                    engine_seq_seen,
                    #[cfg(test)]
                    build_pause,
                )
            })
        };
        self.secondary_index_bg = Some(SecondaryIndexBgHandle {
            job_tx,
            handle: Some(handle),
            cancel,
        });
    }
}
/// Starts the secondary-index worker only when at least one secondary index
/// is declared.
///
/// The emptiness check is done directly under the read lock instead of
/// cloning the whole entry list just to look at its first element.
///
/// # Panics
/// Panics if the entries lock is poisoned.
fn ensure_secondary_index_worker_if_needed(&mut self) {
    // The read guard is a temporary and is released at the end of this
    // statement, before `self` is borrowed mutably below.
    let has_secondary_declarations = !self.secondary_index_entries.read().unwrap().is_empty();
    if has_secondary_declarations {
        self.ensure_secondary_index_worker();
    }
}
/// Sends `job` to the secondary-index worker, starting the worker first if
/// necessary. A failed send (worker gone) is ignored.
fn enqueue_secondary_index_job(&mut self, job: SecondaryIndexJob) {
    self.ensure_secondary_index_worker();
    if let Some(worker) = self.secondary_index_bg.as_ref() {
        let _ = worker.job_tx.send(job);
    }
}
/// Enqueues a `Build` job for every secondary index whose state is
/// `Building`, in entry order.
fn schedule_building_secondary_indexes(&mut self) {
    // The snapshot is an owned copy, so enqueueing while iterating it cannot
    // affect which entries are visited.
    for entry in self.secondary_index_entries_snapshot() {
        if entry.state == SecondaryIndexState::Building {
            let index_id = entry.index_id;
            self.enqueue_secondary_index_job(SecondaryIndexJob::Build { index_id });
        }
    }
}
/// Stops the secondary-index worker, if one is running: raises the cancel
/// flag, sends a `Shutdown` job and joins the thread. Send and join failures
/// are ignored.
fn shutdown_secondary_index_worker(&mut self) {
    let mut worker = match self.secondary_index_bg.take() {
        Some(worker) => worker,
        None => return,
    };
    worker.cancel.store(true, Ordering::Relaxed);
    let _ = worker.job_tx.send(SecondaryIndexJob::Shutdown);
    if let Some(join_handle) = worker.handle.take() {
        let _ = join_handle.join();
    }
}
/// Looks up the secondary index declared for `(type_id, prop_key, kind)` in
/// the shared catalog and returns a copy of its manifest entry, if any.
///
/// # Panics
/// Panics if the catalog lock is poisoned.
fn node_property_index_entry(
    &self,
    type_id: u32,
    prop_key: &str,
    kind: &SecondaryIndexKind,
) -> Option<SecondaryIndexManifestEntry> {
    let catalog = self.secondary_index_catalog.read().unwrap();
    catalog
        .get(&type_id)
        .and_then(|by_prop_key| by_prop_key.get(prop_key))
        .and_then(|by_kind| by_kind.get(kind))
        .cloned()
}
/// Raises the shared node-id high-water mark to at least `next_node_id`.
fn update_next_node_id_seen(&self) {
    let current = self.next_node_id;
    self.next_node_id_seen.fetch_max(current, Ordering::Release);
}
/// Raises the shared edge-id high-water mark to at least `next_edge_id`.
fn update_next_edge_id_seen(&self) {
    let current = self.next_edge_id;
    self.next_edge_id_seen.fetch_max(current, Ordering::Release);
}
/// Recomputes `immutable_bytes_total` as the sum of the estimated sizes of
/// all frozen memtables.
fn refresh_immutable_bytes_total(&mut self) {
    let mut total: usize = 0;
    for epoch in &self.immutable_epochs {
        total += epoch.memtable.estimated_size();
    }
    self.immutable_bytes_total = total;
}
/// Raises the shared engine-sequence high-water mark to at least `engine_seq`.
fn update_engine_seq_seen(&self) {
    let current = self.engine_seq;
    self.engine_seq_seen.fetch_max(current, Ordering::Release);
}
/// Folds the engine's shared id/sequence high-water marks into `manifest` by
/// delegating to `merge_runtime_manifest_counters_from_shared`.
fn merge_runtime_manifest_counters(&self, manifest: &mut ManifestState) {
    let node_ids = &self.next_node_id_seen;
    let edge_ids = &self.next_edge_id_seen;
    let sequence = &self.engine_seq_seen;
    merge_runtime_manifest_counters_from_shared(manifest, node_ids, edge_ids, sequence);
}
/// Loads the on-disk manifest as the base for a read-modify-write cycle.
///
/// The WAL generation ids of the loaded manifest are raised to at least the
/// values held in the engine's in-memory manifest.
///
/// # Errors
/// Returns `EngineError::ManifestError` when no manifest exists on disk and
/// propagates load errors.
fn load_current_manifest_for_write(&self) -> Result<ManifestState, EngineError> {
    let mut manifest = match load_manifest_readonly(&self.db_dir)? {
        Some(manifest) => manifest,
        None => return Err(EngineError::ManifestError("manifest missing".into())),
    };
    let in_memory = &self.manifest;
    manifest.next_wal_generation_id =
        u64::max(manifest.next_wal_generation_id, in_memory.next_wal_generation_id);
    manifest.active_wal_generation_id = u64::max(
        manifest.active_wal_generation_id,
        in_memory.active_wal_generation_id,
    );
    Ok(manifest)
}
/// Runs one manifest read-modify-write cycle under the manifest write lock.
///
/// The current on-disk manifest is loaded, `mutate` is applied, the runtime
/// counters are merged in, the result is written back and finally adopted as
/// the engine's in-memory manifest. If `mutate` or any I/O step fails, nothing
/// is written and the in-memory manifest is left untouched.
///
/// # Panics
/// Panics if the manifest write lock is poisoned.
fn with_runtime_manifest_write<T>(
    &mut self,
    mutate: impl FnOnce(&mut ManifestState) -> Result<T, EngineError>,
) -> Result<T, EngineError> {
    let _write_guard = self.manifest_write_lock.lock().unwrap();
    let mut fresh = self.load_current_manifest_for_write()?;
    let outcome = mutate(&mut fresh)?;
    self.merge_runtime_manifest_counters(&mut fresh);
    write_manifest(&self.db_dir, &fresh)?;
    // Only adopt the new manifest once it is safely on disk.
    self.manifest = fresh;
    Ok(outcome)
}
/// Finds the current core fields of edge `id`.
///
/// Sources are consulted in this order: the active memtable, each frozen
/// memtable, then each segment. The first source that either holds the edge
/// or records it as deleted decides the result.
///
/// # Errors
/// Propagates errors from reading a segment.
fn get_edge_core_for_cache(&self, id: u64) -> Result<Option<EdgeCore>, EngineError> {
    let memtables = std::iter::once(&self.memtable)
        .chain(self.immutable_epochs.iter().map(|epoch| &*epoch.memtable));
    for memtable in memtables {
        if let Some(edge) = memtable.get_edge(id) {
            return Ok(Some(EdgeCore {
                from: edge.from,
                to: edge.to,
                created_at: edge.created_at,
                weight: edge.weight,
                valid_from: edge.valid_from,
                valid_to: edge.valid_to,
            }));
        }
        // A tombstone in this memtable hides the edge in every later source.
        if memtable.deleted_edges().contains_key(&id) {
            return Ok(None);
        }
    }
    for seg in &self.segments {
        if seg.is_edge_deleted(id) {
            return Ok(None);
        }
        let found = seg.get_edge_core(id)?;
        if let Some((from, to, created_at, weight, valid_from, valid_to)) = found {
            return Ok(Some(EdgeCore {
                from,
                to,
                created_at,
                weight,
                valid_from,
                valid_to,
            }));
        }
    }
    Ok(None)
}
/// Captures the pre-existing state of the edge targeted by `op`, for use by
/// the degree-cache update after the operation has been applied.
///
/// Returns `None` for operations that do not target an edge, when the edge
/// does not currently exist, and also when the lookup fails (lookup errors
/// are discarded).
fn capture_old_edge_for_cache(&self, op: &WalOp, now: i64) -> Option<OldEdgeInfo> {
    let edge_id = match op {
        WalOp::UpsertEdge(edge) => edge.id,
        WalOp::DeleteEdge { id, .. } => *id,
        _ => return None,
    };
    let existing = self.get_edge_core_for_cache(edge_id).ok().flatten()?;
    Some(OldEdgeInfo {
        from: existing.from,
        to: existing.to,
        weight: existing.weight,
        valid_at_now: is_edge_valid_at(existing.valid_from, existing.valid_to, now),
        created_at: existing.created_at,
        valid_from: existing.valid_from,
        valid_to: existing.valid_to,
    })
}
/// Adds one edge `from -> to` with the given weight to the degree cache.
///
/// The `from` entry gains an outgoing edge; the `to` entry gains an incoming
/// edge. For a self-loop both are recorded on the single entry together with
/// the self-loop counters. If the edge is temporal (see
/// `is_cache_bypass_edge`), `temporal_edge_count` is bumped once per distinct
/// endpoint.
fn degree_cache_increment(
    &mut self,
    from: u64,
    to: u64,
    weight: f32,
    created_at: i64,
    valid_from: i64,
    valid_to: i64,
) {
    let amount = f64::from(weight);
    // 1 for a temporal edge, 0 otherwise.
    let temporal_bump = u32::from(is_cache_bypass_edge(valid_from, valid_to, created_at));
    let source = self.degree_cache.entry(from).or_default();
    source.out_degree += 1;
    source.out_weight_sum += amount;
    source.temporal_edge_count += temporal_bump;
    if from == to {
        source.in_degree += 1;
        source.in_weight_sum += amount;
        source.self_loop_count += 1;
        source.self_loop_weight_sum += amount;
        return;
    }
    let target = self.degree_cache.entry(to).or_default();
    target.in_degree += 1;
    target.in_weight_sum += amount;
    target.temporal_edge_count += temporal_bump;
}
/// Removes one edge `from -> to` with the given weight from the degree cache.
///
/// Mirrors `degree_cache_increment`, except that entries are never created:
/// an endpoint without a cache entry is skipped. Counters use saturating
/// subtraction.
fn degree_cache_decrement(
    &mut self,
    from: u64,
    to: u64,
    weight: f32,
    created_at: i64,
    valid_from: i64,
    valid_to: i64,
) {
    let amount = f64::from(weight);
    let temporal = is_cache_bypass_edge(valid_from, valid_to, created_at);
    let self_loop = from == to;
    if let Some(source) = self.degree_cache.get_mut(&from) {
        source.out_degree = source.out_degree.saturating_sub(1);
        source.out_weight_sum -= amount;
        if temporal {
            source.temporal_edge_count = source.temporal_edge_count.saturating_sub(1);
        }
        if self_loop {
            // A self-loop also counted as an incoming edge on the same entry.
            source.in_degree = source.in_degree.saturating_sub(1);
            source.in_weight_sum -= amount;
            source.self_loop_count = source.self_loop_count.saturating_sub(1);
            source.self_loop_weight_sum -= amount;
        }
    }
    if !self_loop {
        if let Some(target) = self.degree_cache.get_mut(&to) {
            target.in_degree = target.in_degree.saturating_sub(1);
            target.in_weight_sum -= amount;
            if temporal {
                target.temporal_edge_count = target.temporal_edge_count.saturating_sub(1);
            }
        }
    }
}
/// Adjusts `temporal_edge_count` by `delta` for the endpoints of an edge
/// `from -> to` (only once when `from == to`), creating cache entries as
/// needed.
///
/// A positive `delta` is added; a zero or negative `delta` is subtracted with
/// saturation at zero.
///
/// The magnitude is taken with `unsigned_abs`, so `delta == i32::MIN` no
/// longer overflows while being negated.
fn degree_cache_temporal_adjust(&mut self, from: u64, to: u64, delta: i32) {
    let magnitude = delta.unsigned_abs();
    // Captures only the two `Copy` values above, so it does not borrow `self`.
    let apply = |count: &mut u32| {
        if delta > 0 {
            *count += magnitude;
        } else {
            *count = count.saturating_sub(magnitude);
        }
    };
    apply(&mut self.degree_cache.entry(from).or_default().temporal_edge_count);
    if from != to {
        apply(&mut self.degree_cache.entry(to).or_default().temporal_edge_count);
    }
}
/// Updates the degree cache after `op` has been applied.
///
/// `old_edge` is the state of the targeted edge before the operation (see
/// `capture_old_edge_for_cache`) and `now` is the reference time used to
/// decide whether the new edge is currently valid.
///
/// Invariants maintained per node:
/// * degree and weight sums cover the incident edges that were valid when
///   they were last written;
/// * `temporal_edge_count` covers every live incident temporal edge (see
///   `is_cache_bypass_edge`), valid or not.
///
/// Fix relative to the previous version: for a same-endpoint upsert that
/// flips validity, the temporal adjustment used to be skipped when both the
/// old and the new edge were temporal. The count then drifted by one
/// (-1 for valid -> invalid, +1 for invalid -> valid). The adjustment is now
/// applied whenever the edge not handled by increment/decrement is temporal,
/// matching the different-endpoint branch.
fn update_degree_cache_for_op(&mut self, op: &WalOp, old_edge: Option<OldEdgeInfo>, now: i64) {
    match op {
        WalOp::UpsertEdge(edge) => {
            let new_valid = is_edge_valid_at(edge.valid_from, edge.valid_to, now);
            let new_temporal =
                is_cache_bypass_edge(edge.valid_from, edge.valid_to, edge.created_at);
            match old_edge {
                // Brand-new edge.
                None => {
                    if new_valid {
                        self.degree_cache_increment(
                            edge.from,
                            edge.to,
                            edge.weight,
                            edge.created_at,
                            edge.valid_from,
                            edge.valid_to,
                        );
                    } else if new_temporal {
                        self.degree_cache_temporal_adjust(edge.from, edge.to, 1);
                    }
                }
                Some(old) => {
                    let old_temporal =
                        is_cache_bypass_edge(old.valid_from, old.valid_to, old.created_at);
                    if old.from == edge.from && old.to == edge.to {
                        if old.valid_at_now && new_valid {
                            // Same endpoints, still valid: only weights and the
                            // temporal flag can change.
                            let weight_delta = (edge.weight as f64) - (old.weight as f64);
                            if weight_delta.abs() > f64::EPSILON {
                                let from_entry = self.degree_cache.entry(edge.from).or_default();
                                from_entry.out_weight_sum += weight_delta;
                                if edge.from == edge.to {
                                    from_entry.in_weight_sum += weight_delta;
                                    from_entry.self_loop_weight_sum += weight_delta;
                                } else {
                                    let to_entry = self.degree_cache.entry(edge.to).or_default();
                                    to_entry.in_weight_sum += weight_delta;
                                }
                            }
                            if old_temporal != new_temporal {
                                let delta: i32 = if new_temporal { 1 } else { -1 };
                                self.degree_cache_temporal_adjust(edge.from, edge.to, delta);
                            }
                        } else if old.valid_at_now && !new_valid {
                            // The decrement removes the old edge including its
                            // temporal count; the new (invalid) edge must still
                            // be counted if it is temporal.
                            self.degree_cache_decrement(
                                old.from,
                                old.to,
                                old.weight,
                                old.created_at,
                                old.valid_from,
                                old.valid_to,
                            );
                            if new_temporal {
                                self.degree_cache_temporal_adjust(edge.from, edge.to, 1);
                            }
                        } else if !old.valid_at_now && new_valid {
                            // The increment adds the new edge including its
                            // temporal count; the old (invalid) edge's temporal
                            // count must be removed if it had one.
                            self.degree_cache_increment(
                                edge.from,
                                edge.to,
                                edge.weight,
                                edge.created_at,
                                edge.valid_from,
                                edge.valid_to,
                            );
                            if old_temporal {
                                self.degree_cache_temporal_adjust(edge.from, edge.to, -1);
                            }
                        } else {
                            // Invalid before and after: only the temporal flag matters.
                            if old_temporal != new_temporal {
                                let delta: i32 = if new_temporal { 1 } else { -1 };
                                self.degree_cache_temporal_adjust(edge.from, edge.to, delta);
                            }
                        }
                    } else {
                        // Endpoints changed: remove the old contribution, then
                        // add the new one.
                        if old.valid_at_now {
                            self.degree_cache_decrement(
                                old.from,
                                old.to,
                                old.weight,
                                old.created_at,
                                old.valid_from,
                                old.valid_to,
                            );
                        } else if old_temporal {
                            self.degree_cache_temporal_adjust(old.from, old.to, -1);
                        }
                        if new_valid {
                            self.degree_cache_increment(
                                edge.from,
                                edge.to,
                                edge.weight,
                                edge.created_at,
                                edge.valid_from,
                                edge.valid_to,
                            );
                        } else if new_temporal {
                            self.degree_cache_temporal_adjust(edge.from, edge.to, 1);
                        }
                    }
                }
            }
        }
        WalOp::DeleteEdge { .. } => {
            if let Some(old) = old_edge {
                if old.valid_at_now {
                    self.degree_cache_decrement(
                        old.from,
                        old.to,
                        old.weight,
                        old.created_at,
                        old.valid_from,
                        old.valid_to,
                    );
                } else if is_cache_bypass_edge(old.valid_from, old.valid_to, old.created_at) {
                    self.degree_cache_temporal_adjust(old.from, old.to, -1);
                }
            }
        }
        WalOp::DeleteNode { id, .. } => {
            self.degree_cache.remove(id);
        }
        // Node upserts do not affect degrees.
        WalOp::UpsertNode(_) => {}
    }
}
/// Appends already-normalized operations to the WAL as one batch and then
/// applies them, in order, to the active memtable and the degree cache.
///
/// Operation `i` receives sequence number `engine_seq + 1 + i`. Nothing is
/// applied if the WAL append fails.
fn append_and_apply_normalized(&mut self, ops: &[WalOp]) -> Result<(), EngineError> {
    let base_seq = self.engine_seq;
    let mut sequenced: Vec<(u64, WalOp)> = Vec::with_capacity(ops.len());
    for (offset, op) in ops.iter().enumerate() {
        sequenced.push((base_seq + 1 + offset as u64, op.clone()));
    }
    self.wal_append(|w| w.append_batch(&sequenced))?;
    let now = now_millis();
    for record in &sequenced {
        let (seq, op) = (record.0, &record.1);
        self.engine_seq = seq;
        // The previous edge state must be read before the memtable changes.
        let old_edge = self.capture_old_edge_for_cache(op, now);
        self.memtable.apply_op(op, seq);
        self.update_degree_cache_for_op(op, old_edge, now);
    }
    self.update_engine_seq_seen();
    Ok(())
}
/// Normalizes every operation for writing and then appends and applies the
/// whole batch. Fails on the first operation that cannot be normalized,
/// before anything is written.
fn append_and_apply(&mut self, ops: &[WalOp]) -> Result<(), EngineError> {
    let dense_config = self.manifest.dense_vector.as_ref();
    let mut normalized_ops: Vec<WalOp> = Vec::with_capacity(ops.len());
    for op in ops {
        normalized_ops.push(normalize_wal_op_for_write(dense_config, op)?);
    }
    self.append_and_apply_normalized(&normalized_ops)
}
/// Appends one already-normalized operation to the WAL with sequence number
/// `engine_seq + 1`, then applies it to the active memtable and the degree
/// cache. Nothing is applied if the WAL append fails.
fn append_and_apply_one_normalized(&mut self, op: &WalOp) -> Result<(), EngineError> {
    let next_seq = self.engine_seq + 1;
    self.wal_append(|writer| writer.append(op, next_seq))?;
    self.engine_seq = next_seq;
    let now = now_millis();
    // The previous edge state must be read before the memtable changes.
    let previous_edge = self.capture_old_edge_for_cache(op, now);
    self.memtable.apply_op(op, next_seq);
    self.update_degree_cache_for_op(op, previous_edge, now);
    self.update_engine_seq_seen();
    Ok(())
}
/// Normalizes `op` against the manifest's dense-vector configuration, then appends it to the
/// WAL and applies it in memory.
fn append_and_apply_one(&mut self, op: &WalOp) -> Result<(), EngineError> {
    let dense_config = self.manifest.dense_vector.as_ref();
    let normalized_op = normalize_wal_op_for_write(dense_config, op)?;
    self.append_and_apply_one_normalized(&normalized_op)
}
fn wal_append<F>(&mut self, f: F) -> Result<(), EngineError>
where
F: FnOnce(&mut WalWriter) -> Result<usize, EngineError>,
{
match &self.wal_sync_mode {
WalSyncMode::Immediate => {
let w = self
.wal_writer_immediate
.as_mut()
.expect("immediate WAL writer");
f(w)?;
w.sync()?;
}
WalSyncMode::GroupCommit {
soft_trigger_bytes,
hard_cap_bytes,
..
} => {
let soft = *soft_trigger_bytes;
let hard = *hard_cap_bytes;
let arc = self.wal_state.as_ref().expect("group commit WAL state");
let (lock, cvar) = &**arc;
let mut state = lock.lock().unwrap();
if let Some(ref msg) = state.poisoned {
return Err(EngineError::WalSyncFailed(msg.clone()));
}
while state.buffered_bytes >= hard {
state = cvar.wait(state).unwrap();
if let Some(ref msg) = state.poisoned {
return Err(EngineError::WalSyncFailed(msg.clone()));
}
}
let bytes_written = f(&mut state.wal_writer)?;
state.buffered_bytes += bytes_written;
if state.buffered_bytes >= soft {
cvar.notify_all();
}
}
}
Ok(())
}
pub fn sync(&self) -> Result<(), EngineError> {
match &self.wal_sync_mode {
WalSyncMode::Immediate => Ok(()),
WalSyncMode::GroupCommit { .. } => {
let arc = self.wal_state.as_ref().expect("group commit WAL state");
let (lock, cvar) = &**arc;
let mut state = lock.lock().unwrap();
if let Some(ref msg) = state.poisoned {
return Err(EngineError::WalSyncFailed(msg.clone()));
}
if state.buffered_bytes > 0 {
state.wal_writer.sync()?;
state.buffered_bytes = 0;
state.sync_error_count = 0;
cvar.notify_all();
}
Ok(())
}
}
}
/// Seals the active memtable into an immutable epoch and rotates the WAL to a new generation.
///
/// Returns `Ok(())` without doing anything when the active memtable is empty. Otherwise the
/// steps run in this order:
/// 1. sync the current WAL writer (for group commit: only if bytes are buffered),
/// 2. record a `FrozenPendingFlush` epoch for the old generation in the manifest and
///    allocate the next WAL generation id,
/// 3. open a writer for the new generation and install it,
/// 4. move the memtable into `immutable_epochs` (at the front) and account for its size.
///
/// # Errors
/// Returns `EngineError::WalSyncFailed` if the group-commit state is marked poisoned, and
/// propagates sync, manifest-write, and writer-open errors.
///
/// # Panics
/// Panics if the writer/state for the active sync mode is missing or the state mutex is
/// poisoned.
pub(crate) fn freeze_memtable(&mut self) -> Result<(), EngineError> {
if self.memtable.is_empty() {
return Ok(());
}
// Step 1: sync whatever has been appended to the generation that is about to be sealed.
match &self.wal_sync_mode {
WalSyncMode::Immediate => {
self.wal_writer_immediate
.as_mut()
.expect("immediate WAL writer")
.sync()?;
}
WalSyncMode::GroupCommit { .. } => {
let arc = self.wal_state.as_ref().expect("group commit WAL state");
let (lock, cvar) = &**arc;
let mut state = lock.lock().unwrap();
if let Some(ref msg) = state.poisoned {
return Err(EngineError::WalSyncFailed(msg.clone()));
}
if state.buffered_bytes > 0 {
state.wal_writer.sync()?;
state.buffered_bytes = 0;
state.sync_error_count = 0;
cvar.notify_all();
}
}
}
self.update_engine_seq_seen();
// The epoch id of the frozen memtable is the id of the WAL generation it was written under.
let old_wal_gen = self.active_wal_generation_id;
let epoch_id = old_wal_gen;
// Step 2: register the pending epoch and switch the manifest's active generation.
let new_wal_gen = self.with_runtime_manifest_write(|manifest| {
let new_wal_gen = manifest.next_wal_generation_id;
manifest.next_wal_generation_id = new_wal_gen + 1;
manifest.pending_flush_epochs.push(FlushEpochMeta {
epoch_id,
wal_generation_id: old_wal_gen,
state: FlushEpochState::FrozenPendingFlush,
segment_id: None,
});
manifest.active_wal_generation_id = new_wal_gen;
Ok(new_wal_gen)
})?;
// Step 3: install a writer for the new generation.
// NOTE(review): if opening the new generation fails here, the manifest closure above has
// already run while `self.active_wal_generation_id` and the memtable are unchanged —
// confirm that recovery / a retried freeze tolerates this state.
match &self.wal_sync_mode {
WalSyncMode::Immediate => {
self.wal_writer_immediate =
Some(WalWriter::open_generation(&self.db_dir, new_wal_gen)?);
}
WalSyncMode::GroupCommit { .. } => {
let arc = self.wal_state.as_ref().expect("group commit WAL state");
let (lock, _cvar) = &**arc;
let mut state = lock.lock().unwrap();
state.wal_writer = WalWriter::open_generation(&self.db_dir, new_wal_gen)?;
}
}
self.active_wal_generation_id = new_wal_gen;
// Step 4: swap in a fresh (default) memtable and keep the frozen one at index 0.
let frozen = std::mem::take(&mut self.memtable);
let frozen_size = frozen.estimated_size();
self.immutable_epochs.insert(
0,
ImmutableEpoch {
epoch_id: old_wal_gen,
wal_generation_id: old_wal_gen,
memtable: Arc::new(frozen),
in_flight: false,
},
);
self.immutable_bytes_total += frozen_size;
Ok(())
}
/// Flushes all in-memory data to segments and waits for the background pipeline to finish.
///
/// Pending background compaction/flush results are applied first. The active memtable is
/// frozen if non-empty, every epoch that is not yet in flight is enqueued, and the call then
/// blocks until no epoch is in flight.
///
/// Returns the `SegmentInfo` of the last flush event that produced one, or `None` when
/// nothing was flushed.
///
/// # Errors
/// Returns the recorded flush pipeline error (marking it as reported) if one exists, or any
/// error raised while freezing, enqueueing, or waiting.
pub fn flush(&mut self) -> Result<Option<SegmentInfo>, EngineError> {
    self.try_complete_bg_compact();
    self.try_apply_all_bg_flushes();

    if self.memtable.is_empty() && self.immutable_epochs.is_empty() {
        return match self.current_flush_pipeline_error() {
            Some(err) => Err(err),
            None => Ok(None),
        };
    }
    if !self.memtable.is_empty() {
        self.freeze_memtable()?;
    }
    if self.immutable_epochs.is_empty() {
        return match self.current_flush_pipeline_error() {
            Some(err) => Err(err),
            None => Ok(None),
        };
    }

    self.ensure_bg_flush_worker();
    self.enqueue_all_non_in_flight()?;

    let mut newest_segment = None;
    while self.immutable_epochs.iter().any(|epoch| epoch.in_flight) {
        if let Some(info) = self.wait_for_one_flush()? {
            newest_segment = Some(info);
        }
    }

    match self.current_flush_pipeline_error() {
        Some(err) => Err(err),
        None => Ok(newest_segment),
    }
}
/// Estimated size of the active memtable plus the tracked size of all immutable epochs.
fn total_memtable_bytes(&self) -> usize {
    let active_bytes = self.memtable.estimated_size();
    active_bytes + self.immutable_bytes_total
}
/// Starts a background flush when the active memtable has reached `flush_threshold`.
///
/// Completed background flushes are applied first, and a recorded pipeline error is either
/// surfaced or retried. A threshold of `0` disables the automatic flush. This call never
/// waits for the flush to complete.
fn maybe_auto_flush(&mut self) -> Result<(), EngineError> {
    self.try_apply_all_bg_flushes();
    self.maybe_surface_or_retry_flush_pipeline_error()?;

    let threshold_reached =
        self.flush_threshold > 0 && self.memtable.estimated_size() >= self.flush_threshold;
    if !threshold_reached {
        return Ok(());
    }

    if !self.memtable.is_empty() {
        self.freeze_memtable()?;
    }
    self.ensure_bg_flush_worker();
    self.enqueue_all_non_in_flight()
}
/// Applies write backpressure: when memory limits are exceeded, blocks until one flush has
/// completed.
///
/// Limits checked (each disabled when set to `0`):
/// * `memtable_hard_cap` against the active plus immutable memtable bytes,
/// * `max_immutable_memtables` against the number of immutable epochs.
///
/// When a limit is exceeded:
/// * if a flush is already in flight, wait for one flush event;
/// * otherwise, if there is a frozen epoch, enqueue it and wait;
/// * otherwise freeze the active memtable, enqueue it and wait.
///
/// `freeze_memtable` is a no-op for an empty memtable, so the last case re-checks that an
/// epoch actually exists before enqueueing; `enqueue_flush` panics when there is nothing to
/// enqueue. This mirrors the emptiness guard in `maybe_auto_flush`.
///
/// # Errors
/// Propagates a surfaced flush pipeline error and any freeze/enqueue/wait error.
fn maybe_backpressure_flush(&mut self) -> Result<(), EngineError> {
    if self.bg_compact.is_some() {
        self.try_complete_bg_compact();
    }
    self.try_apply_all_bg_flushes();
    self.maybe_surface_or_retry_flush_pipeline_error()?;

    let bytes_exceeded =
        self.memtable_hard_cap > 0 && self.total_memtable_bytes() >= self.memtable_hard_cap;
    let count_exceeded = self.max_immutable_memtables > 0
        && self.immutable_epochs.len() >= self.max_immutable_memtables;
    if !(bytes_exceeded || count_exceeded) {
        return Ok(());
    }

    // A flush is already running: waiting for it is enough to relieve pressure.
    if self.immutable_epochs.iter().any(|e| e.in_flight) {
        self.wait_for_one_flush()?;
        return Ok(());
    }

    if self.immutable_epochs.is_empty() {
        self.freeze_memtable()?;
        if self.immutable_epochs.is_empty() {
            // The active memtable was empty, so nothing was frozen and there is nothing to
            // flush; enqueueing here would panic.
            return Ok(());
        }
    }

    // No epoch is in flight at this point, so every remaining epoch is eligible.
    self.ensure_bg_flush_worker();
    self.enqueue_flush()?;
    self.wait_for_one_flush()?;
    Ok(())
}
/// Lazily starts the background flush pipeline; does nothing if it is already running.
///
/// Two threads are spawned: one running `bg_flush_build_worker` and one running
/// `bg_flush_publish_worker`. They are connected by three channels:
/// * `work_tx`/`work_rx` — unbounded, engine to build worker,
/// * `built_tx`/`built_rx` — bounded to one item, build worker to publish worker,
/// * `event_tx`/`event_rx` — bounded, both workers back to the engine.
///
/// Both workers share one cancel flag and one `events_ready` counter. The handles are stored
/// in `self.bg_flush`.
fn ensure_bg_flush_worker(&mut self) {
if self.bg_flush.is_some() {
return;
}
let (work_tx, work_rx) = std::sync::mpsc::channel();
let (built_tx, built_rx) = std::sync::mpsc::sync_channel(1);
// Event capacity is derived from the immutable-memtable limit, with a floor of 4, plus one.
let event_cap = self.max_immutable_memtables.max(4) + 1;
let (event_tx, event_rx) = std::sync::mpsc::sync_channel(event_cap);
let cancel = Arc::new(AtomicBool::new(false));
let events_ready = Arc::new(AtomicUsize::new(0));
// Handles moved into the build thread.
let build_cancel = Arc::clone(&cancel);
let build_events_ready = Arc::clone(&events_ready);
let build_event_tx = event_tx.clone();
let build_secondary_indexes = Arc::clone(&self.secondary_index_entries);
let build_handle = std::thread::spawn(move || {
bg_flush_build_worker(
work_rx,
built_tx,
build_event_tx,
build_cancel,
build_events_ready,
build_secondary_indexes,
);
});
// Handles moved into the publish thread.
let publish_cancel = Arc::clone(&cancel);
let publish_events_ready = Arc::clone(&events_ready);
let db_dir = self.db_dir.clone();
let manifest_write_lock = Arc::clone(&self.manifest_write_lock);
let publish_catalog = Arc::clone(&self.secondary_index_catalog);
let publish_entries = Arc::clone(&self.secondary_index_entries);
let next_node_id_seen = Arc::clone(&self.next_node_id_seen);
let next_edge_id_seen = Arc::clone(&self.next_edge_id_seen);
let engine_seq_seen = Arc::clone(&self.engine_seq_seen);
#[cfg(test)]
let publish_pause = Arc::clone(&self.flush_publish_pause);
let publish_handle = std::thread::spawn(move || {
bg_flush_publish_worker(
db_dir,
built_rx,
event_tx,
manifest_write_lock,
publish_catalog,
publish_entries,
next_node_id_seen,
next_edge_id_seen,
engine_seq_seen,
publish_cancel,
publish_events_ready,
#[cfg(test)]
publish_pause,
);
});
// `events_applied` counts events the engine has consumed; it starts at zero.
self.bg_flush = Some(BgFlushHandle {
work_tx,
event_rx: Mutex::new(event_rx),
build_handle: Some(build_handle),
publish_handle: Some(publish_handle),
cancel,
events_ready,
events_applied: 0,
});
}
/// Hands one frozen epoch to the background flush worker.
///
/// The epoch chosen is the last one in `immutable_epochs` that is not already in flight.
/// Since `freeze_memtable` inserts new epochs at index 0, this is the oldest eligible epoch.
/// The epoch is marked in flight and `next_segment_id` is advanced only after the work item
/// was sent successfully.
///
/// # Errors
/// Propagates I/O errors from creating the `segments` directory and returns
/// `EngineError::InvalidOperation` if the worker's channel is closed.
///
/// # Panics
/// Panics if the background worker has not been started or if no epoch is eligible.
fn enqueue_flush(&mut self) -> Result<(), EngineError> {
let bg = self
.bg_flush
.as_ref()
.expect("bg flush worker must be running before enqueue");
let epoch_idx = self
.immutable_epochs
.iter()
.rposition(|e| !e.in_flight)
.expect("enqueue_flush: no non-in-flight epoch available");
let epoch_id = self.immutable_epochs[epoch_idx].epoch_id;
let wal_gen_id = self.immutable_epochs[epoch_idx].wal_generation_id;
// The worker shares the frozen memtable through the `Arc`; the epoch stays in the list.
let frozen = Arc::clone(&self.immutable_epochs[epoch_idx].memtable);
let seg_id = self.next_segment_id;
let segments_dir = self.db_dir.join("segments");
std::fs::create_dir_all(&segments_dir)?;
// In test builds the one-shot pause hook and forced-error flag are consumed here.
let work = BgFlushWork {
epoch_id,
frozen,
seg_id,
tmp_dir: segment_tmp_dir(&self.db_dir, seg_id),
final_dir: segment_dir(&self.db_dir, seg_id),
dense_config: self.manifest.dense_vector.clone(),
wal_gen_id,
#[cfg(test)]
pause: self.flush_pause.lock().unwrap().take(),
#[cfg(test)]
force_write_error: {
let err = self.flush_force_error;
self.flush_force_error = false;
err
},
};
bg.work_tx
.send(work)
.map_err(|_| EngineError::InvalidOperation("bg flush worker died".into()))?;
// Only commit the bookkeeping once the worker has accepted the item.
self.immutable_epochs[epoch_idx].in_flight = true;
self.next_segment_id += 1;
Ok(())
}
/// Enqueues every immutable epoch that is not yet in flight, one at a time.
///
/// Stops at the first enqueue error and returns it.
fn enqueue_all_non_in_flight(&mut self) -> Result<(), EngineError> {
    loop {
        let everything_in_flight = self.immutable_epochs.iter().all(|epoch| epoch.in_flight);
        if everything_in_flight {
            return Ok(());
        }
        self.enqueue_flush()?;
    }
}
/// Applies every background flush event that is already available, without blocking.
///
/// Returns as soon as there is no worker, the `events_ready` counter shows nothing new, or
/// the event channel yields nothing. Errors from processing an event are logged to stderr
/// and do not stop the loop.
fn try_apply_all_bg_flushes(&mut self) {
    loop {
        let pending_event = {
            let Some(bg) = self.bg_flush.as_ref() else {
                return;
            };
            if bg.events_ready.load(Ordering::Acquire) <= bg.events_applied {
                return;
            }
            let receiver = bg.event_rx.lock().unwrap();
            let Ok(received) = receiver.try_recv() else {
                return;
            };
            received
        };

        if let Some(bg) = self.bg_flush.as_mut() {
            bg.events_applied += 1;
        }
        if let Err(e) = self.process_bg_flush_event(pending_event) {
            eprintln!("try_apply_all_bg_flushes: {}", e);
        }
    }
}
/// Blocks until the background flush pipeline delivers one event, then processes it.
///
/// On a processing error the pipeline error is marked as already reported, because the
/// caller receives it directly.
///
/// If the event channel is disconnected, the pipeline is shut down, any events drained
/// during shutdown are processed (their results are ignored), and all in-flight flags are
/// cleared.
///
/// # Errors
/// * `EngineError::InvalidOperation("no bg flush worker")` when no worker exists.
/// * The error from processing the received event.
/// * On disconnect: the recorded pipeline error if there is one, otherwise
///   `EngineError::InvalidOperation("bg flush worker died")`.
fn wait_for_one_flush(&mut self) -> Result<Option<SegmentInfo>, EngineError> {
    let received = {
        let Some(bg) = self.bg_flush.as_ref() else {
            return Err(EngineError::InvalidOperation("no bg flush worker".into()));
        };
        let receiver = bg.event_rx.lock().unwrap();
        receiver.recv()
    };

    if let Ok(event) = received {
        if let Some(bg) = self.bg_flush.as_mut() {
            bg.events_applied += 1;
        }
        let outcome = self.process_bg_flush_event(event);
        if outcome.is_err() {
            self.flush_pipeline_error_reported = true;
        }
        return outcome;
    }

    // Both workers have dropped their senders: tear the pipeline down.
    let leftover_events = self.shutdown_bg_flush();
    for leftover in leftover_events {
        let _ = self.process_bg_flush_event(leftover);
    }
    self.reset_all_flush_in_flight();
    Err(self
        .current_flush_pipeline_error()
        .unwrap_or_else(|| EngineError::InvalidOperation("bg flush worker died".into())))
}
/// Applies one event from the background flush pipeline to the engine's in-memory state.
///
/// * `Adopt`: installs the newly written segment, retires the matching immutable epoch,
///   clears a pipeline error recorded for that epoch, possibly schedules a background
///   compaction, and enqueues rebuild jobs for the listed secondary indexes. Returns
///   `Ok(Some(seg_info))`.
/// * `Failed`: records the pipeline error, clears all in-flight flags, shuts the pipeline
///   down (processing any events drained during shutdown), and returns the error.
fn process_bg_flush_event(
&mut self,
event: BgFlushEvent,
) -> Result<Option<SegmentInfo>, EngineError> {
match event {
BgFlushEvent::Adopt(adoption) => {
// Keep a copy for the return value; the original is moved into the manifest below.
let seg_info = adoption.seg_info.clone();
// Mark equality indexes that need a rebuild as `Building`, unless they already failed.
for index_id in &adoption.rebuild_equality_index_ids {
if let Some(entry) = self
.manifest
.secondary_indexes
.iter_mut()
.find(|entry| entry.index_id == *index_id)
{
if matches!(entry.kind, SecondaryIndexKind::Equality)
&& entry.state != SecondaryIndexState::Failed
{
entry.state = SecondaryIndexState::Building;
entry.last_error = None;
}
}
}
// Same treatment for range indexes.
for index_id in &adoption.rebuild_range_index_ids {
if let Some(entry) = self
.manifest
.secondary_indexes
.iter_mut()
.find(|entry| entry.index_id == *index_id)
{
if matches!(entry.kind, SecondaryIndexKind::Range { .. })
&& entry.state != SecondaryIndexState::Failed
{
entry.state = SecondaryIndexState::Building;
entry.last_error = None;
}
}
}
// Add the segment to the in-memory manifest only if it is not already listed.
if !self
.manifest
.segments
.iter()
.any(|s| s.id == adoption.seg_info.id)
{
self.manifest.segments.push(adoption.seg_info);
}
// Drop the pending-epoch record that this flush completed.
self.manifest.pending_flush_epochs.retain(|epoch| {
!(epoch.epoch_id == adoption.epoch_id
&& epoch.wal_generation_id == adoption.wal_gen_to_retire)
});
// The new segment reader goes to the front of the segment list.
self.segments.insert(0, adoption.reader);
// Retire the frozen memtable and give back its accounted bytes.
if let Some(idx) = self
.immutable_epochs
.iter()
.position(|epoch| epoch.epoch_id == adoption.epoch_id)
{
let removed = self.immutable_epochs.remove(idx);
self.immutable_bytes_total = self
.immutable_bytes_total
.saturating_sub(removed.memtable.estimated_size());
}
// A successful flush of this epoch supersedes an earlier failure recorded for it.
if self
.flush_pipeline_error
.as_ref()
.is_some_and(|err| err.epoch_id == adoption.epoch_id)
{
self.flush_pipeline_error = None;
self.flush_pipeline_error_reported = false;
}
// Flushes that happen while `compacting` is set are not counted towards auto-compaction.
if !self.compacting {
self.flush_count_since_last_compact =
self.flush_count_since_last_compact.saturating_add(1);
let _ = self.maybe_schedule_bg_compact();
}
for index_id in adoption.rebuild_equality_index_ids {
self.enqueue_secondary_index_job(SecondaryIndexJob::Build { index_id });
}
for index_id in adoption.rebuild_range_index_ids {
self.enqueue_secondary_index_job(SecondaryIndexJob::Build { index_id });
}
Ok(Some(seg_info))
}
BgFlushEvent::Failed(err) => {
self.record_flush_pipeline_error(err.clone());
self.reset_all_flush_in_flight();
// Shutting down joins the workers; events they emitted before exiting are still applied.
let shutdown_events = self.shutdown_bg_flush();
for shutdown_event in shutdown_events {
let _ = self.process_bg_flush_event(shutdown_event);
}
Err(err.to_engine_error())
}
}
}
/// Waits until no immutable epoch is marked in flight, logging (not returning) any errors.
///
/// `wait_for_one_flush` fails immediately, without touching the in-flight flags, when no
/// background worker exists. In that state nothing can ever complete the outstanding
/// epochs, so the flags are cleared here. This keeps the loop from spinning forever, which
/// matters because this method is called from `Drop`.
fn drain_bg_flush(&mut self) {
    while self.immutable_epochs.iter().any(|e| e.in_flight) {
        if let Err(e) = self.wait_for_one_flush() {
            eprintln!("drain_bg_flush: error waiting for flush: {}", e);
            if self.bg_flush.is_none() {
                // No worker is left to make progress on the flagged epochs.
                self.reset_all_flush_in_flight();
            }
        }
    }
}
/// Clears the `in_flight` flag on every immutable epoch.
fn reset_all_flush_in_flight(&mut self) {
    self.immutable_epochs
        .iter_mut()
        .for_each(|epoch| epoch.in_flight = false);
}
/// Remembers a flush pipeline failure so it can be surfaced to a later caller.
///
/// An already recorded error is kept when it belongs to a strictly lower WAL generation
/// than `err`; otherwise `err` replaces it and is marked as not yet reported.
fn record_flush_pipeline_error(&mut self, err: FlushPipelineError) {
    let existing_is_older = self
        .flush_pipeline_error
        .as_ref()
        .is_some_and(|existing| existing.wal_generation_id < err.wal_generation_id);
    if existing_is_older {
        return;
    }
    self.flush_pipeline_error = Some(err);
    self.flush_pipeline_error_reported = false;
}
/// Handles a recorded flush pipeline error.
///
/// * No recorded error: returns `Ok(())`.
/// * Recorded but not yet reported: marks it reported and returns it as an error.
/// * Already reported: retries by re-enqueueing every epoch that is not in flight.
fn maybe_surface_or_retry_flush_pipeline_error(&mut self) -> Result<(), EngineError> {
    let Some(recorded) = self.flush_pipeline_error.clone() else {
        return Ok(());
    };

    if !self.flush_pipeline_error_reported {
        self.flush_pipeline_error_reported = true;
        return Err(recorded.to_engine_error());
    }

    let has_idle_epoch = self.immutable_epochs.iter().any(|epoch| !epoch.in_flight);
    if has_idle_epoch {
        self.ensure_bg_flush_worker();
        self.enqueue_all_non_in_flight()?;
    }
    Ok(())
}
/// Returns the recorded flush pipeline error as an `EngineError`, if any, and marks it as
/// reported. The recorded error itself is left in place.
fn current_flush_pipeline_error(&mut self) -> Option<EngineError> {
    let recorded = self.flush_pipeline_error.clone()?;
    self.flush_pipeline_error_reported = true;
    Some(recorded.to_engine_error())
}
/// Stops the background flush pipeline and returns any events it left unconsumed.
///
/// The cancel flag is set, the work channel is closed, and both worker threads are joined
/// (join errors are ignored). The event channel is then drained without blocking. Returns an
/// empty vector when no pipeline was running.
fn shutdown_bg_flush(&mut self) -> Vec<BgFlushEvent> {
    let Some(mut bg) = self.bg_flush.take() else {
        return Vec::new();
    };

    bg.cancel.store(true, Ordering::Relaxed);
    // Closing the sender lets the build worker observe the end of its input.
    drop(bg.work_tx);
    if let Some(build_thread) = bg.build_handle.take() {
        let _ = build_thread.join();
    }
    if let Some(publish_thread) = bg.publish_handle.take() {
        let _ = publish_thread.join();
    }

    let mut leftover = Vec::new();
    {
        let receiver = bg.event_rx.lock().unwrap();
        while let Ok(event) = receiver.try_recv() {
            leftover.push(event);
        }
    }
    leftover
}
/// Starts a background compaction when the auto-compaction conditions are met.
///
/// All of the following must hold: no manual compaction is running, no background
/// compaction is running, auto-compaction is enabled (`compact_after_n_flushes != 0`),
/// enough flushes have happened since the last compaction, and at least two segments exist.
fn maybe_schedule_bg_compact(&mut self) -> Result<(), EngineError> {
    let engine_idle = !self.compacting && self.bg_compact.is_none();
    let flush_quota_met = self.compact_after_n_flushes != 0
        && self.flush_count_since_last_compact >= self.compact_after_n_flushes;
    let enough_segments = self.segments.len() >= 2;

    if engine_idle && flush_quota_met && enough_segments {
        self.start_bg_compact()
    } else {
        Ok(())
    }
}
/// Spawns a background thread that compacts all current segments into one new segment.
///
/// Does nothing when a background compaction is already running or fewer than two segments
/// exist. The output segment id is reserved up front and the flush counter is reset. The
/// worker receives snapshots (segment ids and directories, prune policies, dense-vector
/// config, secondary index entries) together with a cancel flag, so it does not borrow from
/// the engine.
fn start_bg_compact(&mut self) -> Result<(), EngineError> {
if self.bg_compact.is_some() || self.segments.len() < 2 {
return Ok(());
}
// Snapshot of the inputs: (segment id, segment directory) for every live segment.
let input_segments: Vec<(u64, PathBuf)> = self
.segments
.iter()
.map(|s| (s.segment_id, segment_dir(&self.db_dir, s.segment_id)))
.collect();
// Reserve the output id now so later flushes get different ids.
let seg_id = self.next_segment_id;
self.next_segment_id += 1;
self.flush_count_since_last_compact = 0;
let db_dir = self.db_dir.clone();
let prune_policies: Vec<PrunePolicy> =
self.manifest.prune_policies.values().cloned().collect();
let dense_vector = self.manifest.dense_vector.clone();
let secondary_indexes = self.secondary_index_entries_snapshot();
let cancel = Arc::new(AtomicBool::new(false));
let cancel_clone = Arc::clone(&cancel);
#[cfg(test)]
let compact_pause = Arc::clone(&self.bg_compact_pause);
let handle = std::thread::spawn(move || {
bg_compact_worker(
db_dir,
seg_id,
input_segments,
prune_policies,
dense_vector,
secondary_indexes,
&cancel_clone,
#[cfg(test)]
&compact_pause,
)
});
self.bg_compact = Some(BgCompactHandle { handle, cancel });
Ok(())
}
/// Applies the result of a background compaction if its thread has already finished.
///
/// Never blocks: returns `None` when no compaction is running or it is still in progress.
fn try_complete_bg_compact(&mut self) -> Option<CompactionStats> {
    let worker_done = matches!(&self.bg_compact, Some(bg) if bg.handle.is_finished());
    if !worker_done {
        return None;
    }
    let finished = self.bg_compact.take()?;
    self.join_bg_compact(finished)
}
/// Blocks until a running background compaction finishes and applies its result.
///
/// Returns `None` when no background compaction was running.
fn wait_for_bg_compact(&mut self) -> Option<CompactionStats> {
    self.bg_compact
        .take()
        .and_then(|running| self.join_bg_compact(running))
}
/// Requests cancellation of a running background compaction and waits for its thread to
/// exit. The worker's result is discarded.
fn cancel_bg_compact(&mut self) {
    let Some(running) = self.bg_compact.take() else {
        return;
    };
    running.cancel.store(true, Ordering::Relaxed);
    let _ = running.handle.join();
}
/// Joins the background compaction thread and applies its result.
///
/// A worker error or a thread panic is logged to stderr and yields `None`.
fn join_bg_compact(&mut self, bg: BgCompactHandle) -> Option<CompactionStats> {
    let Ok(worker_outcome) = bg.handle.join() else {
        eprintln!("Background compaction thread panicked");
        return None;
    };
    match worker_outcome {
        Ok(result) => self.apply_bg_compact_result(result),
        Err(e) => {
            eprintln!("Background compaction failed: {}", e);
            None
        }
    }
}
/// Publishes the output of a finished background compaction.
///
/// Under the manifest write lock the current manifest is reloaded, the input segments are
/// replaced by the output segment, secondary-index bookkeeping is reconciled, and the
/// manifest is written. The output segment directory is deleted and `None` is returned when
/// the manifest cannot be loaded, when any input segment is no longer listed in the
/// manifest, or when the manifest write fails.
///
/// After a successful write the in-memory state is updated: manifest, secondary index
/// catalog, segment list, degree cache, and pending index rebuild jobs. Failures in those
/// follow-up steps are logged to stderr and do not abort the call.
fn apply_bg_compact_result(&mut self, result: BgCompactResult) -> Option<CompactionStats> {
let (updated_manifest, rebuild_equality_index_ids, rebuild_range_index_ids) = {
// Held for the whole read-modify-write of the manifest.
let _guard = self.manifest_write_lock.lock().unwrap();
let mut manifest = match self.load_current_manifest_for_write() {
Ok(manifest) => manifest,
Err(e) => {
eprintln!("Background compaction: manifest load failed: {}", e);
let output_dir = segment_dir(&self.db_dir, result.stats.output_segment_id);
let _ = std::fs::remove_dir_all(output_dir);
return None;
}
};
// Every input must still be live; otherwise the output is discarded.
let live_seg_ids: NodeIdSet = manifest.segments.iter().map(|s| s.id).collect();
for input_id in &result.input_segment_ids {
if !live_seg_ids.contains(input_id) {
let output_dir = segment_dir(&self.db_dir, result.stats.output_segment_id);
let _ = std::fs::remove_dir_all(output_dir);
return None;
}
}
// Swap inputs for the output; segments not among the inputs are kept.
manifest
.segments
.retain(|s| !result.input_segment_ids.contains(&s.id));
manifest.segments.push(result.seg_info.clone());
apply_secondary_index_failure_report(&mut manifest, &result.secondary_index_report);
let rebuild_equality_index_ids = reconcile_background_output_equality_declarations(
&mut manifest,
&result.maintained_equality_index_ids,
);
let rebuild_range_index_ids = reconcile_background_output_range_declarations(
&mut manifest,
&result.maintained_range_index_ids,
);
self.merge_runtime_manifest_counters(&mut manifest);
if let Err(e) = write_manifest(&self.db_dir, &manifest) {
eprintln!("Background compaction: manifest write failed: {}", e);
let output_dir = segment_dir(&self.db_dir, result.stats.output_segment_id);
let _ = std::fs::remove_dir_all(output_dir);
return None;
}
(
manifest,
rebuild_equality_index_ids,
rebuild_range_index_ids,
)
};
// From here on the manifest has been written; only in-memory state is updated.
self.manifest = updated_manifest;
if let Err(error) = self.rebuild_secondary_index_catalog() {
eprintln!(
"Background compaction: secondary index runtime sync failed: {}",
error
);
}
// The compacted segment is appended at the end of the segment list.
self.segments
.retain(|s| !result.input_segment_ids.contains(&s.segment_id));
self.segments.push(result.reader);
// Best-effort removal of the now-unreferenced input directories.
for dir in &result.old_seg_dirs {
let _ = std::fs::remove_dir_all(dir);
}
self.last_compaction_ms = Some(now_millis());
if let Err(e) = self.rebuild_degree_cache() {
eprintln!("Background compaction: degree cache rebuild failed: {}", e);
}
for index_id in rebuild_equality_index_ids {
self.enqueue_secondary_index_job(SecondaryIndexJob::Build { index_id });
}
for index_id in rebuild_range_index_ids {
self.enqueue_secondary_index_job(SecondaryIndexJob::Build { index_id });
}
// Flushes may have accumulated while the compaction ran.
let _ = self.maybe_schedule_bg_compact();
Some(result.stats)
}
/// Switches the engine into ingest mode by disabling flush-count-triggered compaction.
///
/// The previous `compact_after_n_flushes` value is saved on the first call only, so calling
/// this repeatedly does not overwrite the saved value with `0`.
pub fn ingest_mode(&mut self) {
    let _ = self
        .ingest_saved_compact_after_n_flushes
        .get_or_insert(self.compact_after_n_flushes);
    self.compact_after_n_flushes = 0;
}
/// Leaves ingest mode and runs a full compaction.
///
/// Restores the `compact_after_n_flushes` value saved by `ingest_mode`, if any, and then
/// returns the result of `compact`.
pub fn end_ingest(&mut self) -> Result<Option<CompactionStats>, EngineError> {
    let saved_setting = self.ingest_saved_compact_after_n_flushes.take();
    if let Some(previous) = saved_setting {
        self.compact_after_n_flushes = previous;
    }
    self.compact()
}
pub fn compact(&mut self) -> Result<Option<CompactionStats>, EngineError> {
self.compact_with_progress(|_| true)
}
/// Compacts all segments into one, reporting progress through `callback`.
///
/// A running background compaction is awaited and finished flushes are applied first.
/// Returns `Ok(None)` when fewer than two segments exist at that point. The `compacting`
/// flag is set for the duration of the merge, and the flush counter is reset afterwards
/// whether or not the merge succeeded. The degree cache is rebuilt after a successful
/// compaction.
///
/// `callback` returns `false` to cancel.
pub fn compact_with_progress<F>(
    &mut self,
    mut callback: F,
) -> Result<Option<CompactionStats>, EngineError>
where
    F: FnMut(&CompactionProgress) -> bool,
{
    self.wait_for_bg_compact();
    self.try_apply_all_bg_flushes();
    if self.segments.len() < 2 {
        return Ok(None);
    }

    self.compacting = true;
    let outcome = self.compact_with_progress_inner(&mut callback);
    self.compacting = false;
    self.flush_count_since_last_compact = 0;

    if matches!(&outcome, Ok(Some(_))) {
        self.rebuild_degree_cache()?;
    }
    outcome
}
/// Performs the actual foreground compaction: merges every current segment into one new
/// segment and swaps it into the manifest.
///
/// Steps: flush pending in-memory data, gather input totals, pick the compaction path,
/// reserve a segment id, build the output in a temporary directory, rename it into place,
/// open a reader on it, rewrite the manifest under the write lock, replace the in-memory
/// segment list, and delete the input directories (best effort).
///
/// If building, renaming, or opening the output fails, the reserved segment id is given
/// back and the partial output is removed.
///
/// # Errors
/// Propagates flush, build, rename, open, manifest, and catalog-rebuild errors, including
/// `EngineError::CompactionCancelled` from the build step.
fn compact_with_progress_inner<F>(
&mut self,
callback: &mut F,
) -> Result<Option<CompactionStats>, EngineError>
where
F: FnMut(&CompactionProgress) -> bool,
{
let compact_start = std::time::Instant::now();
// Everything must be in segments before merging.
if !self.memtable.is_empty() || !self.immutable_epochs.is_empty() {
self.flush()?;
}
let input_segment_count = self.segments.len();
let total_input_nodes: u64 = self.segments.iter().map(|s| s.node_count()).sum();
let total_input_edges: u64 = self.segments.iter().map(|s| s.edge_count()).sum();
let has_tombstones = self.segments.iter().any(|s| s.has_tombstones());
let policies: Vec<PrunePolicy> = self.manifest.prune_policies.values().cloned().collect();
let secondary_indexes = self.secondary_index_entries_snapshot();
let compaction_path =
select_compaction_path(&self.segments, has_tombstones, !policies.is_empty());
let segments_dir = self.db_dir.join("segments");
std::fs::create_dir_all(&segments_dir)?;
// Reserve the output id; every failure path before the manifest swap undoes this.
let seg_id = self.next_segment_id;
self.next_segment_id += 1;
let tmp_dir = segment_tmp_dir(&self.db_dir, seg_id);
let final_dir = segment_dir(&self.db_dir, seg_id);
let (seg_info, nodes_auto_pruned, edges_auto_pruned, secondary_index_report) =
match compaction_path {
// The fast path reports zero auto-pruned nodes and edges.
CompactionPath::FastMerge => match self.compact_fast_merge(
&tmp_dir,
seg_id,
callback,
&secondary_indexes,
input_segment_count,
total_input_nodes,
total_input_edges,
) {
Ok((seg_info, report)) => (seg_info, 0, 0, report),
Err(e) => {
self.next_segment_id -= 1;
let _ = std::fs::remove_dir_all(&tmp_dir);
return Err(e);
}
},
CompactionPath::UnifiedV3 => match self.compact_standard(
&tmp_dir,
seg_id,
callback,
has_tombstones,
&policies,
&secondary_indexes,
input_segment_count,
total_input_nodes,
total_input_edges,
) {
Ok(result) => result,
Err(e) => {
self.next_segment_id -= 1;
let _ = std::fs::remove_dir_all(&tmp_dir);
return Err(e);
}
},
};
// Move the finished output from its temporary directory to its final location.
if let Err(e) = std::fs::rename(&tmp_dir, &final_dir) {
self.next_segment_id -= 1;
let _ = std::fs::remove_dir_all(&tmp_dir);
return Err(e.into());
}
let new_reader =
match SegmentReader::open(&final_dir, seg_id, self.manifest.dense_vector.as_ref()) {
Ok(r) => r,
Err(e) => {
self.next_segment_id -= 1;
let _ = std::fs::remove_dir_all(&final_dir);
return Err(e);
}
};
let old_seg_ids: NodeIdSet = self.segments.iter().map(|s| s.segment_id).collect();
let old_seg_dirs: Vec<PathBuf> = old_seg_ids
.iter()
.map(|&id| segment_dir(&self.db_dir, id))
.collect();
// NOTE(review): unlike the failure paths above, an error while loading or writing the
// manifest returns without removing `final_dir` or giving back the segment id — confirm
// that the orphaned directory is cleaned up elsewhere.
let new_manifest = {
let _guard = self.manifest_write_lock.lock().unwrap();
let mut manifest = self.load_current_manifest_for_write()?;
manifest.segments.retain(|s| !old_seg_ids.contains(&s.id));
manifest.segments.push(seg_info.clone());
apply_secondary_index_failure_report(&mut manifest, &secondary_index_report);
self.merge_runtime_manifest_counters(&mut manifest);
write_manifest(&self.db_dir, &manifest)?;
manifest
};
self.manifest = new_manifest;
self.rebuild_secondary_index_catalog()?;
// The compacted segment becomes the only segment.
self.segments.clear();
self.segments.push(new_reader);
for dir in &old_seg_dirs {
let _ = std::fs::remove_dir_all(dir);
}
// "Removed" counts are input totals minus what the output segment reports.
let stats = CompactionStats {
segments_merged: input_segment_count,
nodes_kept: seg_info.node_count,
nodes_removed: total_input_nodes.saturating_sub(seg_info.node_count),
edges_kept: seg_info.edge_count,
edges_removed: total_input_edges.saturating_sub(seg_info.edge_count),
duration_ms: compact_start.elapsed().as_millis() as u64,
output_segment_id: seg_id,
nodes_auto_pruned,
edges_auto_pruned,
};
self.last_compaction_ms = Some(now_millis());
Ok(Some(stats))
}
/// Fast-path compaction: reports progress for each phase and then delegates to
/// `build_fast_merge_output` to produce the merged segment in `tmp_dir`.
///
/// Progress is reported once for tombstone collection, once per segment for nodes and for
/// edges (using the segments' metadata counts), and once before writing the output.
///
/// # Errors
/// Returns `EngineError::CompactionCancelled` as soon as `callback` returns `false`, and
/// propagates directory-creation and output-build errors.
fn compact_fast_merge<F>(
    &self,
    tmp_dir: &Path,
    seg_id: u64,
    callback: &mut F,
    secondary_indexes: &[SecondaryIndexManifestEntry],
    input_segment_count: usize,
    total_input_nodes: u64,
    total_input_edges: u64,
) -> Result<(SegmentInfo, SecondaryIndexMaintenanceReport), EngineError>
where
    F: FnMut(&CompactionProgress) -> bool,
{
    std::fs::create_dir_all(tmp_dir)?;

    // Forwards one progress report and turns a `false` answer into a cancellation error.
    let mut report = |progress: CompactionProgress| -> Result<(), EngineError> {
        if callback(&progress) {
            Ok(())
        } else {
            Err(EngineError::CompactionCancelled)
        }
    };

    report(CompactionProgress {
        phase: CompactionPhase::CollectingTombstones,
        segments_processed: input_segment_count,
        total_segments: input_segment_count,
        records_processed: 0,
        total_records: 0,
    })?;

    let mut nodes_seen: u64 = 0;
    for (index, segment) in self.segments.iter().enumerate() {
        nodes_seen += segment.node_meta_count();
        report(CompactionProgress {
            phase: CompactionPhase::MergingNodes,
            segments_processed: index + 1,
            total_segments: input_segment_count,
            records_processed: nodes_seen,
            total_records: total_input_nodes,
        })?;
    }

    let mut edges_seen: u64 = 0;
    for (index, segment) in self.segments.iter().enumerate() {
        edges_seen += segment.edge_meta_count();
        report(CompactionProgress {
            phase: CompactionPhase::MergingEdges,
            segments_processed: index + 1,
            total_segments: input_segment_count,
            records_processed: edges_seen,
            total_records: total_input_edges,
        })?;
    }

    report(CompactionProgress {
        phase: CompactionPhase::WritingOutput,
        segments_processed: 0,
        total_segments: 1,
        records_processed: 0,
        total_records: total_input_nodes + total_input_edges,
    })?;

    build_fast_merge_output(
        tmp_dir,
        seg_id,
        &self.segments,
        self.manifest.dense_vector.as_ref(),
        secondary_indexes,
    )
}
/// Standard compaction: collects tombstones, plans the surviving records with
/// `v3_plan_winners`, and writes the merged segment with `v3_build_output`.
///
/// Returns the new segment's info, the number of auto-pruned nodes and edges taken from the
/// plan, and the secondary index maintenance report.
///
/// # Errors
/// Returns `EngineError::CompactionCancelled` as soon as `callback` returns `false`, and
/// propagates planning and output-build errors.
#[allow(clippy::too_many_arguments)]
fn compact_standard<F>(
    &self,
    tmp_dir: &Path,
    seg_id: u64,
    callback: &mut F,
    has_tombstones: bool,
    prune_policies: &[PrunePolicy],
    secondary_indexes: &[SecondaryIndexManifestEntry],
    input_segment_count: usize,
    total_input_nodes: u64,
    total_input_edges: u64,
) -> Result<(SegmentInfo, u64, u64, SecondaryIndexMaintenanceReport), EngineError>
where
    F: FnMut(&CompactionProgress) -> bool,
{
    // Forwards one progress report and turns a `false` answer into a cancellation error.
    let mut report = |progress: CompactionProgress| -> Result<(), EngineError> {
        if callback(&progress) {
            Ok(())
        } else {
            Err(EngineError::CompactionCancelled)
        }
    };

    let mut tombstoned_nodes: NodeIdSet = NodeIdSet::default();
    let mut tombstoned_edges: NodeIdSet = NodeIdSet::default();
    if has_tombstones {
        for (index, segment) in self.segments.iter().enumerate() {
            tombstoned_nodes.extend(segment.deleted_node_ids());
            tombstoned_edges.extend(segment.deleted_edge_ids());
            report(CompactionProgress {
                phase: CompactionPhase::CollectingTombstones,
                segments_processed: index + 1,
                total_segments: input_segment_count,
                records_processed: 0,
                total_records: 0,
            })?;
        }
    } else {
        // Nothing to collect: report the phase as complete in a single step.
        report(CompactionProgress {
            phase: CompactionPhase::CollectingTombstones,
            segments_processed: input_segment_count,
            total_segments: input_segment_count,
            records_processed: 0,
            total_records: 0,
        })?;
    }

    report(CompactionProgress {
        phase: CompactionPhase::MergingNodes,
        segments_processed: 0,
        total_segments: input_segment_count,
        records_processed: 0,
        total_records: total_input_nodes,
    })?;

    let plan = v3_plan_winners(
        &self.segments,
        prune_policies,
        &tombstoned_nodes,
        &tombstoned_edges,
    )?;

    report(CompactionProgress {
        phase: CompactionPhase::MergingNodes,
        segments_processed: input_segment_count,
        total_segments: input_segment_count,
        records_processed: total_input_nodes,
        total_records: total_input_nodes,
    })?;
    report(CompactionProgress {
        phase: CompactionPhase::MergingEdges,
        segments_processed: input_segment_count,
        total_segments: input_segment_count,
        records_processed: total_input_edges,
        total_records: total_input_edges,
    })?;
    report(CompactionProgress {
        phase: CompactionPhase::WritingOutput,
        segments_processed: 0,
        total_segments: 1,
        records_processed: 0,
        total_records: total_input_nodes + total_input_edges,
    })?;

    let nodes_auto_pruned = plan.pruned_node_ids.len() as u64;
    let edges_auto_pruned = plan.edges_auto_pruned;
    let (seg_info, secondary_index_report) = v3_build_output(
        tmp_dir,
        seg_id,
        &self.segments,
        &plan,
        self.manifest.dense_vector.as_ref(),
        secondary_indexes,
    )?;

    Ok((
        seg_info,
        nodes_auto_pruned,
        edges_auto_pruned,
        secondary_index_report,
    ))
}
/// Returns a snapshot of engine statistics.
///
/// * `pending_wal_bytes` is always `0` in immediate mode; in group-commit mode it is the
///   buffered byte count, or `0` when the WAL state is absent or its mutex is poisoned.
/// * Tombstone counts come from the active memtable only.
/// * `pending_flush_count` counts immutable epochs currently in flight.
/// * `oldest_retained_wal_generation_id` is the smallest generation among the manifest's
///   pending flush epochs, falling back to the active generation when there are none.
pub fn stats(&self) -> DbStats {
    let (pending_wal_bytes, wal_sync_mode) = match &self.wal_sync_mode {
        WalSyncMode::Immediate => (0, "immediate".to_string()),
        WalSyncMode::GroupCommit { .. } => {
            let buffered = self.wal_state.as_ref().map_or(0, |shared| {
                let (lock, _) = &**shared;
                lock.lock().map(|state| state.buffered_bytes).unwrap_or(0)
            });
            (buffered, "group-commit".to_string())
        }
    };

    let oldest_retained_wal_generation_id = self
        .manifest
        .pending_flush_epochs
        .iter()
        .map(|epoch| epoch.wal_generation_id)
        .min()
        .unwrap_or(self.active_wal_generation_id);

    let pending_flush_count = self
        .immutable_epochs
        .iter()
        .filter(|epoch| epoch.in_flight)
        .count();

    DbStats {
        pending_wal_bytes,
        segment_count: self.segments.len(),
        node_tombstone_count: self.memtable.deleted_nodes().len(),
        edge_tombstone_count: self.memtable.deleted_edges().len(),
        last_compaction_ms: self.last_compaction_ms,
        wal_sync_mode,
        active_memtable_bytes: self.memtable.estimated_size(),
        immutable_memtable_bytes: self.immutable_bytes_total,
        immutable_memtable_count: self.immutable_epochs.len(),
        pending_flush_count,
        active_wal_generation_id: self.active_wal_generation_id,
        oldest_retained_wal_generation_id,
    }
}
/// Writes a single op: normalizes it, appends it to the WAL, applies it in memory, and then
/// advances the id allocators if the op carries a higher node or edge id.
pub fn write_op(&mut self, op: &WalOp) -> Result<(), EngineError> {
    let appended = self.append_and_apply_one(op);
    appended?;
    self.track_id(op);
    Ok(())
}
/// Writes a batch of ops: normalizes, appends, and applies them together, then advances the
/// id allocators for every op in the batch.
///
/// No ids are tracked when the append fails.
pub fn write_op_batch(&mut self, ops: &[WalOp]) -> Result<(), EngineError> {
    self.append_and_apply(ops)?;
    ops.iter().for_each(|op| self.track_id(op));
    Ok(())
}
/// Keeps `next_node_id` / `next_edge_id` strictly above every id seen in an upsert.
///
/// Ops other than upserts, and upserts whose id is already below the allocator, are ignored.
fn track_id(&mut self, op: &WalOp) {
    match op {
        WalOp::UpsertNode(node) if node.id >= self.next_node_id => {
            self.next_node_id = node.id + 1;
            self.update_next_node_id_seen();
        }
        WalOp::UpsertEdge(edge) if edge.id >= self.next_edge_id => {
            self.next_edge_id = edge.id + 1;
            self.update_next_edge_id_seen();
        }
        _ => {}
    }
}
/// Returns the database directory this engine operates on.
pub fn path(&self) -> &Path {
    let db_dir: &Path = &self.db_dir;
    db_dir
}
/// Returns the engine's in-memory manifest state.
pub fn manifest(&self) -> &ManifestState {
&self.manifest
}
/// Sums node counts across the active memtable, all immutable epochs, and all segments.
///
/// For each segment the number of deleted nodes is subtracted (saturating at zero). The
/// per-source counts are simply added together.
pub fn node_count(&self) -> usize {
    let in_frozen_epochs: usize = self
        .immutable_epochs
        .iter()
        .map(|epoch| epoch.memtable.node_count())
        .sum();
    let in_segments: usize = self
        .segments
        .iter()
        .map(|seg| (seg.node_count() as usize).saturating_sub(seg.deleted_node_count()))
        .sum();
    self.memtable.node_count() + in_frozen_epochs + in_segments
}
/// Sums edge counts across the active memtable, all immutable epochs, and all segments.
///
/// For each segment the number of deleted edges is subtracted (saturating at zero). The
/// per-source counts are simply added together.
pub fn edge_count(&self) -> usize {
    let in_frozen_epochs: usize = self
        .immutable_epochs
        .iter()
        .map(|epoch| epoch.memtable.edge_count())
        .sum();
    let in_segments: usize = self
        .segments
        .iter()
        .map(|seg| (seg.edge_count() as usize).saturating_sub(seg.deleted_edge_count()))
        .sum();
    self.memtable.edge_count() + in_frozen_epochs + in_segments
}
/// Returns the current value of the node id allocator (`track_id` keeps it above every
/// upserted node id).
pub fn next_node_id(&self) -> u64 {
self.next_node_id
}
/// Returns the current value of the edge id allocator (`track_id` keeps it above every
/// upserted edge id).
pub fn next_edge_id(&self) -> u64 {
self.next_edge_id
}
/// Returns the number of segments currently open in the engine.
pub fn segment_count(&self) -> usize {
self.segments.len()
}
/// Total number of deleted-node entries reported by all open segments.
pub fn segment_tombstone_node_count(&self) -> usize {
    let mut total = 0;
    for segment in &self.segments {
        total += segment.deleted_node_count();
    }
    total
}
/// Total number of deleted-edge entries reported by all open segments.
pub fn segment_tombstone_edge_count(&self) -> usize {
    let mut total = 0;
    for segment in &self.segments {
        total += segment.deleted_edge_count();
    }
    total
}
}
impl Drop for DatabaseEngine {
    /// Shuts down background work in order: waits for a running background compaction,
    /// drains in-flight flushes, stops the flush pipeline, stops the secondary index worker,
    /// and finally stops the WAL sync thread if one is running.
    fn drop(&mut self) {
        self.wait_for_bg_compact();
        self.drain_bg_flush();
        self.shutdown_bg_flush();
        self.shutdown_secondary_index_worker();

        if self.sync_thread.is_none() {
            return;
        }
        if let Some(wal_state) = self.wal_state.as_ref() {
            // Errors cannot be reported from `drop`, so the result is discarded.
            let _ = shutdown_sync_thread(wal_state, &mut self.sync_thread);
        }
    }
}
/// Returns the largest numeric id among `seg_<id>` entries in `<db_dir>/segments`.
///
/// Entries whose suffix after `seg_` is not a plain `u64` (for example `seg_7.tmp`) are
/// ignored. Returns `0` when the directory cannot be read or contains no matching entry.
fn scan_max_segment_id(db_dir: &Path) -> u64 {
    let Ok(entries) = std::fs::read_dir(db_dir.join("segments")) else {
        return 0;
    };
    entries
        .flatten()
        .filter_map(|entry| {
            let file_name = entry.file_name();
            let file_name = file_name.to_string_lossy();
            file_name
                .strip_prefix("seg_")
                .and_then(|digits| digits.parse::<u64>().ok())
        })
        .max()
        .unwrap_or(0)
}
/// Opens `dir` and calls `sync_all` on it. On Windows the function does nothing and always
/// succeeds.
fn fsync_dir(dir: &Path) -> Result<(), EngineError> {
    #[cfg(not(target_os = "windows"))]
    {
        std::fs::File::open(dir)?.sync_all()?;
    }
    #[cfg(target_os = "windows")]
    let _ = dir;
    Ok(())
}
/// Best-effort removal of stale entries under `<db_dir>/segments`.
///
/// An entry named `seg_<suffix>` is removed when the suffix ends with `.tmp`,
/// or when the suffix parses as a `u64` that is not the id of any segment in
/// `manifest`. Removal errors and an unreadable directory are ignored.
fn cleanup_orphan_segments(db_dir: &Path, manifest: &ManifestState) {
    let dir_iter = match std::fs::read_dir(db_dir.join("segments")) {
        Ok(iter) => iter,
        Err(_) => return,
    };
    let live_ids: NodeIdSet = manifest.segments.iter().map(|seg| seg.id).collect();

    for dir_entry in dir_iter.flatten() {
        let raw_name = dir_entry.file_name();
        let file_name = raw_name.to_string_lossy();
        let suffix = match file_name.strip_prefix("seg_") {
            Some(suffix) => suffix,
            None => continue,
        };
        let is_orphan = suffix.ends_with(".tmp")
            || suffix
                .parse::<u64>()
                .map_or(false, |id| !live_ids.contains(&id));
        if is_orphan {
            let _ = std::fs::remove_dir_all(dir_entry.path());
        }
    }
}
fn cleanup_orphan_wal_files(db_dir: &Path, manifest: &ManifestState) {
let mut live_gens: NodeIdSet = NodeIdSet::default();
live_gens.insert(manifest.active_wal_generation_id);
for epoch in &manifest.pending_flush_epochs {
live_gens.insert(epoch.wal_generation_id);
}
let entries = match std::fs::read_dir(db_dir) {
Ok(e) => e,
Err(_) => return,
};
for entry in entries.flatten() {
let name = entry.file_name();
let name = name.to_string_lossy();
if let Some(rest) = name.strip_prefix("wal_") {
if let Some(id_str) = rest.strip_suffix(".wal") {
if let Ok(gen_id) = id_str.parse::<u64>() {
if !live_gens.contains(&gen_id) {
let _ = std::fs::remove_file(entry.path());
}
}
}
}
}
}
#[cfg(test)]
impl DatabaseEngine {
    /// Creates the two capacity-1 channels shared by every pause hook:
    /// the first pair is the "ready" channel, the second the "release" channel.
    fn pause_hook_channels() -> (
        (std::sync::mpsc::SyncSender<()>, std::sync::mpsc::Receiver<()>),
        (std::sync::mpsc::SyncSender<()>, std::sync::mpsc::Receiver<()>),
    ) {
        (
            std::sync::mpsc::sync_channel(1),
            std::sync::mpsc::sync_channel(1),
        )
    }

    /// Cached degree entry for `node_id`, or `DegreeEntry::ZERO` when absent.
    pub(crate) fn degree_cache_entry(&self, node_id: u64) -> DegreeEntry {
        match self.degree_cache.get(&node_id) {
            Some(entry) => *entry,
            None => DegreeEntry::ZERO,
        }
    }

    /// Number of immutable epochs currently held.
    pub(crate) fn immutable_memtable_count(&self) -> usize {
        self.immutable_epochs.len()
    }

    /// Id of the active WAL generation.
    pub(crate) fn active_wal_generation(&self) -> u64 {
        self.active_wal_generation_id
    }

    /// Current value of `engine_seq`.
    pub(crate) fn engine_seq_for_test(&self) -> u64 {
        self.engine_seq
    }

    /// Borrows the active memtable.
    pub(crate) fn active_memtable(&self) -> &Memtable {
        &self.memtable
    }

    /// Borrows the memtable of the immutable epoch at `idx`; panics when
    /// `idx` is out of bounds.
    pub(crate) fn immutable_memtable(&self, idx: usize) -> &Memtable {
        &self.immutable_epochs[idx].memtable
    }

    /// Reads the four property-query route counters with relaxed ordering.
    pub(crate) fn property_query_route_snapshot(&self) -> PropertyQueryRouteSnapshot {
        let routes = &self.property_query_routes;
        let equality_scan_fallback = routes.equality_scan_fallback.load(Ordering::Relaxed);
        let equality_index_lookup = routes.equality_index_lookup.load(Ordering::Relaxed);
        let range_scan_fallback = routes.range_scan_fallback.load(Ordering::Relaxed);
        let range_index_lookup = routes.range_index_lookup.load(Ordering::Relaxed);
        PropertyQueryRouteSnapshot {
            equality_scan_fallback,
            equality_index_lookup,
            range_scan_fallback,
            range_index_lookup,
        }
    }

    /// Zeroes the four property-query route counters with relaxed ordering.
    pub(crate) fn reset_property_query_routes(&self) {
        let routes = &self.property_query_routes;
        routes.equality_scan_fallback.store(0, Ordering::Relaxed);
        routes.equality_index_lookup.store(0, Ordering::Relaxed);
        routes.range_scan_fallback.store(0, Ordering::Relaxed);
        routes.range_index_lookup.store(0, Ordering::Relaxed);
    }

    /// Installs a flush pause hook and returns the test-side ends of its
    /// channels: the "ready" receiver and the "release" sender.
    pub(crate) fn set_flush_pause(
        &mut self,
    ) -> (
        std::sync::mpsc::Receiver<()>,
        std::sync::mpsc::SyncSender<()>,
    ) {
        let ((ready_tx, ready_rx), (release_tx, release_rx)) = Self::pause_hook_channels();
        let hook = FlushPauseHook {
            ready_tx,
            release_rx,
        };
        *self.flush_pause.lock().unwrap() = Some(hook);
        (ready_rx, release_tx)
    }

    /// Installs a flush-publish pause hook and returns the test-side ends of
    /// its channels: the "ready" receiver and the "release" sender.
    pub(crate) fn set_flush_publish_pause(
        &self,
    ) -> (
        std::sync::mpsc::Receiver<()>,
        std::sync::mpsc::SyncSender<()>,
    ) {
        let ((ready_tx, ready_rx), (release_tx, release_rx)) = Self::pause_hook_channels();
        let hook = FlushPublishPauseHook {
            ready_tx,
            release_rx,
        };
        *self.flush_publish_pause.lock().unwrap() = Some(hook);
        (ready_rx, release_tx)
    }

    /// Installs a background-compaction pause hook and returns the test-side
    /// ends of its channels: the "ready" receiver and the "release" sender.
    pub(crate) fn set_bg_compact_pause(
        &self,
    ) -> (
        std::sync::mpsc::Receiver<()>,
        std::sync::mpsc::SyncSender<()>,
    ) {
        let ((ready_tx, ready_rx), (release_tx, release_rx)) = Self::pause_hook_channels();
        let hook = BgCompactPauseHook {
            ready_tx,
            release_rx,
        };
        *self.bg_compact_pause.lock().unwrap() = Some(hook);
        (ready_rx, release_tx)
    }

    /// Sets the `flush_force_error` flag.
    pub(crate) fn set_flush_force_error(&mut self) {
        self.flush_force_error = true;
    }

    /// Installs a secondary-index-build pause hook and returns the test-side
    /// ends of its channels: the "ready" receiver and the "release" sender.
    pub(crate) fn set_secondary_index_build_pause(
        &self,
    ) -> (
        std::sync::mpsc::Receiver<()>,
        std::sync::mpsc::SyncSender<()>,
    ) {
        let ((ready_tx, ready_rx), (release_tx, release_rx)) = Self::pause_hook_channels();
        let hook = SecondaryIndexBuildPauseHook {
            ready_tx,
            release_rx,
        };
        *self.secondary_index_build_pause.lock().unwrap() = Some(hook);
        (ready_rx, release_tx)
    }

    /// Ensures the background flush worker exists, then enqueues one flush.
    pub(crate) fn enqueue_one_flush(&mut self) -> Result<(), EngineError> {
        self.ensure_bg_flush_worker();
        self.enqueue_flush()
    }

    /// Forwards to `wait_for_one_flush`.
    pub(crate) fn wait_one_flush(&mut self) -> Result<Option<SegmentInfo>, EngineError> {
        self.wait_for_one_flush()
    }

    /// Forwards to `wait_for_bg_compact`.
    pub(crate) fn wait_for_bg_compaction(&mut self) -> Option<CompactionStats> {
        self.wait_for_bg_compact()
    }

    /// Number of immutable epochs currently held.
    pub(crate) fn immutable_epoch_count(&self) -> usize {
        self.immutable_epochs.len()
    }

    /// Number of immutable epochs whose `in_flight` flag is set.
    pub(crate) fn in_flight_count(&self) -> usize {
        let mut pending = 0usize;
        for epoch in &self.immutable_epochs {
            if epoch.in_flight {
                pending += 1;
            }
        }
        pending
    }
}
/// Returns `true` when no two segments have overlapping node-id ranges and no
/// two have overlapping edge-id ranges.
///
/// Segments reporting no range (`None`) are left out. Ranges are inclusive:
/// sharing a single boundary id counts as an overlap.
fn segments_are_non_overlapping(segments: &[SegmentReader]) -> bool {
    /// Sorts the `(min, max)` ranges by `min` and checks that each range ends
    /// strictly before the next one starts.
    fn ranges_are_disjoint(mut ranges: Vec<(u64, u64)>) -> bool {
        ranges.sort_unstable_by_key(|&(low, _)| low);
        ranges.windows(2).all(|pair| pair[0].1 < pair[1].0)
    }

    // Edge ranges are only gathered when the node ranges are already disjoint.
    ranges_are_disjoint(
        segments
            .iter()
            .filter_map(|seg| seg.node_id_range())
            .collect(),
    ) && ranges_are_disjoint(
        segments
            .iter()
            .filter_map(|seg| seg.edge_id_range())
            .collect(),
    )
}
/// Picks the compaction strategy for `segments`.
///
/// `CompactionPath::FastMerge` is chosen only when there are no tombstones,
/// no active prune policies, and the segments' id ranges do not overlap;
/// otherwise `CompactionPath::UnifiedV3` is used.
fn select_compaction_path(
    segments: &[SegmentReader],
    has_tombstones: bool,
    has_active_prune_policies: bool,
) -> CompactionPath {
    // The overlap scan only runs when the two cheap flags did not already
    // force the rewriting path.
    let rewrite_required = has_tombstones
        || has_active_prune_policies
        || !segments_are_non_overlapping(segments);
    if rewrite_required {
        CompactionPath::UnifiedV3
    } else {
        CompactionPath::FastMerge
    }
}
/// Sends `event` on `tx` and, only if the send succeeded, increments
/// `events_ready` with release ordering. A failed send is silently ignored.
fn send_bg_flush_event(
    tx: &std::sync::mpsc::SyncSender<BgFlushEvent>,
    events_ready: &AtomicUsize,
    event: BgFlushEvent,
) {
    if tx.send(event).is_err() {
        return;
    }
    events_ready.fetch_add(1, Ordering::Release);
}
fn bg_flush_build_worker(
rx: std::sync::mpsc::Receiver<BgFlushWork>,
built_tx: std::sync::mpsc::SyncSender<BuiltFlushResult>,
event_tx: std::sync::mpsc::SyncSender<BgFlushEvent>,
cancel: Arc<AtomicBool>,
events_ready: Arc<AtomicUsize>,
secondary_index_entries: Arc<RwLock<SecondaryIndexEntries>>,
) {
while let Ok(work) = rx.recv() {
if cancel.load(Ordering::Relaxed) {
break;
}
#[cfg(test)]
{
if let Some(hook) = work.pause {
let _ = hook.ready_tx.send(()); let _ = hook.release_rx.recv(); }
if work.force_write_error {
send_bg_flush_event(
&event_tx,
&events_ready,
BgFlushEvent::Failed(FlushPipelineError {
epoch_id: work.epoch_id,
wal_generation_id: work.wal_gen_id,
stage: FlushPipelineStage::Build,
message: "injected test failure".into(),
}),
);
cancel.store(true, Ordering::Relaxed);
break;
}
}
if let Some(parent) = work.tmp_dir.parent() {
let _ = std::fs::create_dir_all(parent);
}
let current_secondary_indexes = secondary_index_entries.read().unwrap().clone();
let maintained_equality_index_ids = equality_index_ids_snapshot(¤t_secondary_indexes);
let maintained_range_index_ids = range_index_ids_snapshot(¤t_secondary_indexes);
let needs_reseed = current_secondary_indexes.iter().any(|entry| {
!work
.frozen
.secondary_index_declarations()
.contains_key(&entry.index_id)
});
let reseeded_frozen = needs_reseed.then(|| {
let mut memtable = (*work.frozen).clone();
for entry in ¤t_secondary_indexes {
memtable.register_secondary_index(entry);
}
memtable
});
let frozen_ref: &Memtable = if let Some(memtable) = reseeded_frozen.as_ref() {
memtable
} else {
work.frozen.as_ref()
};
let built_result = match write_segment_with_secondary_indexes(
&work.tmp_dir,
work.seg_id,
frozen_ref,
work.dense_config.as_ref(),
¤t_secondary_indexes,
) {
Ok(seg_info) => match std::fs::rename(&work.tmp_dir, &work.final_dir) {
Ok(()) => {
if let Some(parent) = work.final_dir.parent() {
if let Err(e) = fsync_dir(parent) {
let _ = std::fs::remove_dir_all(&work.final_dir);
send_bg_flush_event(
&event_tx,
&events_ready,
BgFlushEvent::Failed(FlushPipelineError {
epoch_id: work.epoch_id,
wal_generation_id: work.wal_gen_id,
stage: FlushPipelineStage::Build,
message: format!("segment parent fsync failed: {}", e),
}),
);
cancel.store(true, Ordering::Relaxed);
break;
} else {
BuiltFlushResult {
epoch_id: work.epoch_id,
wal_gen_to_retire: work.wal_gen_id,
seg_info,
seg_id: work.seg_id,
final_dir: work.final_dir,
dense_config: work.dense_config,
maintained_equality_index_ids,
maintained_range_index_ids,
}
}
} else {
let _ = std::fs::remove_dir_all(&work.final_dir);
send_bg_flush_event(
&event_tx,
&events_ready,
BgFlushEvent::Failed(FlushPipelineError {
epoch_id: work.epoch_id,
wal_generation_id: work.wal_gen_id,
stage: FlushPipelineStage::Build,
message: "segment final dir is missing a parent".into(),
}),
);
cancel.store(true, Ordering::Relaxed);
break;
}
}
Err(e) => {
let _ = std::fs::remove_dir_all(&work.tmp_dir);
send_bg_flush_event(
&event_tx,
&events_ready,
BgFlushEvent::Failed(FlushPipelineError {
epoch_id: work.epoch_id,
wal_generation_id: work.wal_gen_id,
stage: FlushPipelineStage::Build,
message: format!("segment rename failed: {}", e),
}),
);
cancel.store(true, Ordering::Relaxed);
break;
}
},
Err(e) => {
let _ = std::fs::remove_dir_all(&work.tmp_dir);
send_bg_flush_event(
&event_tx,
&events_ready,
BgFlushEvent::Failed(FlushPipelineError {
epoch_id: work.epoch_id,
wal_generation_id: work.wal_gen_id,
stage: FlushPipelineStage::Build,
message: format!("segment write failed: {}", e),
}),
);
cancel.store(true, Ordering::Relaxed);
break;
}
};
if built_tx.send(built_result).is_err() {
break;
}
}
}
/// Worker loop for the publish stage of the background flush pipeline.
///
/// For every `BuiltFlushResult` received on `rx` the worker opens a
/// `SegmentReader` on the built segment, rewrites the manifest while holding
/// `manifest_write_lock`, removes the retired WAL generation (best effort) and
/// sends a `BgFlushEvent::Adopt` carrying the reader on `event_tx`.
///
/// If opening the reader or publishing the manifest fails, a
/// `BgFlushEvent::Failed` is sent instead, `cancel` is set, and the loop exits.
/// The loop also exits when `cancel` is already set or `rx` is closed.
#[allow(clippy::too_many_arguments)]
fn bg_flush_publish_worker(
db_dir: PathBuf,
rx: std::sync::mpsc::Receiver<BuiltFlushResult>,
event_tx: std::sync::mpsc::SyncSender<BgFlushEvent>,
manifest_write_lock: Arc<Mutex<()>>,
secondary_index_catalog: Arc<RwLock<SecondaryIndexCatalog>>,
secondary_index_entries: Arc<RwLock<SecondaryIndexEntries>>,
next_node_id_seen: Arc<AtomicU64>,
next_edge_id_seen: Arc<AtomicU64>,
engine_seq_seen: Arc<AtomicU64>,
cancel: Arc<AtomicBool>,
events_ready: Arc<AtomicUsize>,
#[cfg(test)] publish_pause: Arc<Mutex<Option<FlushPublishPauseHook>>>,
) {
while let Ok(result) = rx.recv() {
if cancel.load(Ordering::Relaxed) {
break;
}
// Open the segment before touching the manifest, so a segment that cannot
// be opened is never published.
let reader = match SegmentReader::open(
&result.final_dir,
result.seg_id,
result.dense_config.as_ref(),
) {
Ok(reader) => reader,
Err(e) => {
send_bg_flush_event(
&event_tx,
&events_ready,
BgFlushEvent::Failed(FlushPipelineError {
epoch_id: result.epoch_id,
wal_generation_id: result.wal_gen_to_retire,
stage: FlushPipelineStage::PublishOpenReader,
message: format!("failed to open segment {}: {}", result.seg_id, e),
}),
);
cancel.store(true, Ordering::Relaxed);
break;
}
};
// Test hook: announce that the worker is about to publish, then block until
// the test releases it. The hook is taken, so it fires at most once.
#[cfg(test)]
if let Some(hook) = publish_pause.lock().unwrap().take() {
let _ = hook.ready_tx.send(());
let _ = hook.release_rx.recv();
}
// The whole manifest read-modify-write runs inside this closure so that the
// lock guard is dropped on every exit path, including `?` early returns.
let publish_result: Result<(Vec<u64>, Vec<u64>), EngineError> = (|| {
let _guard = manifest_write_lock.lock().unwrap();
let mut manifest = load_manifest_readonly(&db_dir)?
.ok_or_else(|| EngineError::ManifestError("manifest missing".into()))?;
// Add the segment only if the manifest does not already list its id.
if !manifest
.segments
.iter()
.any(|seg| seg.id == result.seg_info.id)
{
manifest.segments.push(result.seg_info.clone());
}
// The epoch being published must still be recorded as FrozenPendingFlush
// with the same epoch id and WAL generation; otherwise publishing fails.
let pending_idx = manifest
.pending_flush_epochs
.iter()
.position(|epoch| {
epoch.epoch_id == result.epoch_id
&& epoch.wal_generation_id == result.wal_gen_to_retire
&& epoch.state == FlushEpochState::FrozenPendingFlush
})
.ok_or_else(|| {
EngineError::InvalidOperation(format!(
"missing FrozenPendingFlush epoch {} wal {} during publish",
result.epoch_id, result.wal_gen_to_retire
))
})?;
manifest.pending_flush_epochs.remove(pending_idx);
// Counters only ever move forward: keep the larger of the manifest value
// and the value observed through the shared atomics.
manifest.next_node_id = manifest
.next_node_id
.max(next_node_id_seen.load(Ordering::Acquire));
manifest.next_edge_id = manifest
.next_edge_id
.max(next_edge_id_seen.load(Ordering::Acquire));
manifest.next_engine_seq = manifest
.next_engine_seq
.max(engine_seq_seen.load(Ordering::Acquire));
// Reconcile the manifest's index declarations against the index ids that
// were maintained while this segment was built; the returned ids are
// forwarded in the Adopt event as `rebuild_*_index_ids`.
let rebuild_equality_index_ids = reconcile_background_output_equality_declarations(
&mut manifest,
&result.maintained_equality_index_ids,
);
let rebuild_range_index_ids = reconcile_background_output_range_declarations(
&mut manifest,
&result.maintained_range_index_ids,
);
write_manifest(&db_dir, &manifest)?;
// Runs after the manifest write, still under the lock, with the
// `secondary_indexes` of the manifest that was just written.
sync_secondary_index_runtime_state(
&secondary_index_catalog,
&secondary_index_entries,
&manifest.secondary_indexes,
)?;
Ok((rebuild_equality_index_ids, rebuild_range_index_ids))
})();
let (rebuild_equality_index_ids, rebuild_range_index_ids) = match publish_result {
Ok(ids) => ids,
Err(e) => {
send_bg_flush_event(
&event_tx,
&events_ready,
BgFlushEvent::Failed(FlushPipelineError {
epoch_id: result.epoch_id,
wal_generation_id: result.wal_gen_to_retire,
stage: FlushPipelineStage::PublishManifest,
message: e.to_string(),
}),
);
cancel.store(true, Ordering::Relaxed);
break;
}
};
// The WAL generation is removed only after the publish step succeeded;
// a failed removal is ignored.
let _ = remove_wal_generation(&db_dir, result.wal_gen_to_retire);
send_bg_flush_event(
&event_tx,
&events_ready,
BgFlushEvent::Adopt(PublishedFlushAdoption {
epoch_id: result.epoch_id,
wal_gen_to_retire: result.wal_gen_to_retire,
seg_info: result.seg_info,
reader,
rebuild_equality_index_ids,
rebuild_range_index_ids,
}),
);
}
}
fn bg_compact_worker(
db_dir: PathBuf,
seg_id: u64,
input_segments: Vec<(u64, PathBuf)>,
prune_policies: Vec<PrunePolicy>,
dense_vector: Option<DenseVectorConfig>,
secondary_indexes: SecondaryIndexEntries,
cancel: &AtomicBool,
#[cfg(test)] compact_pause: &Arc<Mutex<Option<BgCompactPauseHook>>>,
) -> Result<BgCompactResult, EngineError> {
let compact_start = std::time::Instant::now();
let maintained_equality_index_ids = equality_index_ids_snapshot(&secondary_indexes);
let maintained_range_index_ids = range_index_ids_snapshot(&secondary_indexes);
#[cfg(test)]
if let Some(hook) = compact_pause.lock().unwrap().take() {
let _ = hook.ready_tx.send(());
let _ = hook.release_rx.recv();
}
let mut segments = Vec::with_capacity(input_segments.len());
for (id, path) in &input_segments {
segments.push(SegmentReader::open(path, *id, dense_vector.as_ref())?);
}
let input_segment_count = segments.len();
let total_input_nodes: u64 = segments.iter().map(|s| s.node_count()).sum();
let total_input_edges: u64 = segments.iter().map(|s| s.edge_count()).sum();
let has_tombstones = segments.iter().any(|s| s.has_tombstones());
let compaction_path =
select_compaction_path(&segments, has_tombstones, !prune_policies.is_empty());
let segments_dir = db_dir.join("segments");
std::fs::create_dir_all(&segments_dir)?;
let tmp_dir = segment_tmp_dir(&db_dir, seg_id);
let final_dir = segment_dir(&db_dir, seg_id);
let (seg_info, nodes_auto_pruned, edges_auto_pruned, secondary_index_report) =
match compaction_path {
CompactionPath::FastMerge => {
match bg_fast_merge(
&segments,
&tmp_dir,
seg_id,
dense_vector.as_ref(),
&secondary_indexes,
cancel,
) {
Ok((seg_info, report)) => (seg_info, 0, 0, report),
Err(e) => {
let _ = std::fs::remove_dir_all(&tmp_dir);
return Err(e);
}
}
}
CompactionPath::UnifiedV3 => match bg_standard_merge(
&segments,
&tmp_dir,
seg_id,
has_tombstones,
&prune_policies,
dense_vector.as_ref(),
&secondary_indexes,
cancel,
) {
Ok(result) => result,
Err(e) => {
let _ = std::fs::remove_dir_all(&tmp_dir);
return Err(e);
}
},
};
if let Err(e) = std::fs::rename(&tmp_dir, &final_dir) {
let _ = std::fs::remove_dir_all(&tmp_dir);
return Err(e.into());
}
let reader = match SegmentReader::open(&final_dir, seg_id, dense_vector.as_ref()) {
Ok(r) => r,
Err(e) => {
let _ = std::fs::remove_dir_all(&final_dir);
return Err(e);
}
};
let input_segment_ids: NodeIdSet = input_segments.iter().map(|(id, _)| *id).collect();
let old_seg_dirs: Vec<PathBuf> = input_segments
.iter()
.map(|(id, _)| segment_dir(&db_dir, *id))
.collect();
let stats = CompactionStats {
segments_merged: input_segment_count,
nodes_kept: seg_info.node_count,
nodes_removed: total_input_nodes.saturating_sub(seg_info.node_count),
edges_kept: seg_info.edge_count,
edges_removed: total_input_edges.saturating_sub(seg_info.edge_count),
duration_ms: compact_start.elapsed().as_millis() as u64,
output_segment_id: seg_id,
nodes_auto_pruned,
edges_auto_pruned,
};
Ok(BgCompactResult {
seg_info,
reader,
old_seg_dirs,
stats,
input_segment_ids,
maintained_equality_index_ids,
maintained_range_index_ids,
secondary_index_report,
})
}
/// Metadata of the node record selected by `v3_plan_winners` to represent one
/// node id in the compaction output. All values are copied from the source
/// segment's `node_meta_at` / `node_vector_meta_at` results.
struct NodeWinner {
// Index into the `segments` slice of the segment that supplied the record.
seg_idx: usize,
// Offset of the record's data in the source segment.
data_offset: u64,
// Length of the record's data.
data_len: u32,
type_id: u32,
updated_at: i64,
weight: f32,
key_len: u16,
prop_hash_offset: u64,
prop_hash_count: u32,
dense_vector_offset: u64,
dense_vector_len: u32,
sparse_vector_offset: u64,
sparse_vector_len: u32,
last_write_seq: u64,
}
/// Metadata of the edge record selected by `v3_plan_winners` to represent one
/// edge id in the compaction output. All values are copied from the source
/// segment's `edge_meta_at` result.
struct EdgeWinner {
// Index into the `segments` slice of the segment that supplied the record.
seg_idx: usize,
// Offset of the record's data in the source segment.
data_offset: u64,
// Length of the record's data.
data_len: u32,
// Ids of the edge's endpoint nodes.
from: u64,
to: u64,
type_id: u32,
updated_at: i64,
weight: f32,
valid_from: i64,
valid_to: i64,
last_write_seq: u64,
}
/// Result of the planning pass of the unified V3 compaction
/// (`v3_plan_winners`), consumed by `v3_build_output`.
struct V3Plan {
// Surviving node records, keyed by node id (iterated in ascending id order).
node_winners: BTreeMap<u64, NodeWinner>,
// Surviving edge records, keyed by edge id (iterated in ascending id order).
edge_winners: BTreeMap<u64, EdgeWinner>,
// Ids of nodes dropped because they matched a prune policy.
pruned_node_ids: NodeIdSet,
// Number of edges dropped because at least one endpoint is in `pruned_node_ids`.
edges_auto_pruned: u64,
}
/// Returns `true` when at least one policy in `policies` matches the given
/// record metadata according to `matches_prune_cutoff`.
///
/// For each policy the age cutoff passed on is `now - max_age_ms` when the
/// policy has a `max_age_ms`, and `None` otherwise. Evaluation stops at the
/// first matching policy.
fn matches_any_prune_policy_meta(
    type_id: u32,
    updated_at: i64,
    weight: f32,
    policies: &[PrunePolicy],
    now: i64,
) -> bool {
    policies.iter().any(|policy| {
        matches_prune_cutoff(
            type_id,
            updated_at,
            weight,
            policy.max_age_ms.map(|age| now - age),
            policy.max_weight,
            policy.type_id,
        )
    })
}
/// Planning pass of the unified V3 compaction: decides which node and edge
/// records survive.
///
/// Segments are scanned in slice order and the first record seen for an id
/// wins; later records with the same id are ignored. A node is dropped when
/// its id is in `deleted_nodes` or when it matches any prune policy (in which
/// case its id is recorded in `pruned_node_ids`). An edge is dropped when its
/// id is in `deleted_edges`, when either endpoint is in `deleted_nodes`, or
/// when either endpoint is not a surviving node; in the last case it counts
/// towards `edges_auto_pruned` if an endpoint was pruned by policy.
///
/// # Errors
/// Propagates any error from reading segment metadata.
fn v3_plan_winners(
    segments: &[SegmentReader],
    prune_policies: &[PrunePolicy],
    deleted_nodes: &NodeIdSet,
    deleted_edges: &NodeIdSet,
) -> Result<V3Plan, EngineError> {
    let now = now_millis();
    let has_policies = !prune_policies.is_empty();

    // ---- nodes ----
    let mut node_winners: BTreeMap<u64, NodeWinner> = BTreeMap::new();
    let mut pruned_node_ids: NodeIdSet = NodeIdSet::default();
    let mut seen_nodes: NodeIdSet = NodeIdSet::default();
    for (seg_idx, seg) in segments.iter().enumerate() {
        for idx in 0..seg.node_meta_count() as usize {
            let (
                node_id,
                data_offset,
                data_len,
                type_id,
                updated_at,
                weight,
                key_len,
                prop_hash_offset,
                prop_hash_count,
                last_write_seq,
            ) = seg.node_meta_at(idx)?;
            let (dense_vector_offset, dense_vector_len, sparse_vector_offset, sparse_vector_len) =
                seg.node_vector_meta_at(idx)?;

            // An id is marked as seen even when it turns out to be deleted, so
            // records for the same id in later segments are skipped as well.
            if !seen_nodes.insert(node_id) || deleted_nodes.contains(&node_id) {
                continue;
            }
            let pruned_by_policy = has_policies
                && matches_any_prune_policy_meta(type_id, updated_at, weight, prune_policies, now);
            if pruned_by_policy {
                pruned_node_ids.insert(node_id);
                continue;
            }
            let winner = NodeWinner {
                seg_idx,
                data_offset,
                data_len,
                type_id,
                updated_at,
                weight,
                key_len,
                prop_hash_offset,
                prop_hash_count,
                dense_vector_offset,
                dense_vector_len,
                sparse_vector_offset,
                sparse_vector_len,
                last_write_seq,
            };
            node_winners.insert(node_id, winner);
        }
    }

    // ---- edges ----
    let surviving_node_ids: NodeIdSet = node_winners.keys().copied().collect();
    let mut edge_winners: BTreeMap<u64, EdgeWinner> = BTreeMap::new();
    let mut seen_edges: NodeIdSet = NodeIdSet::default();
    let mut edges_auto_pruned: u64 = 0;
    for (seg_idx, seg) in segments.iter().enumerate() {
        for idx in 0..seg.edge_meta_count() as usize {
            let (
                edge_id,
                data_offset,
                data_len,
                from,
                to,
                type_id,
                updated_at,
                weight,
                valid_from,
                valid_to,
                last_write_seq,
            ) = seg.edge_meta_at(idx)?;

            let explicitly_dropped = !seen_edges.insert(edge_id)
                || deleted_edges.contains(&edge_id)
                || deleted_nodes.contains(&from)
                || deleted_nodes.contains(&to);
            if explicitly_dropped {
                continue;
            }
            let endpoints_survive =
                surviving_node_ids.contains(&from) && surviving_node_ids.contains(&to);
            if !endpoints_survive {
                // Only edges lost to policy pruning are counted.
                if pruned_node_ids.contains(&from) || pruned_node_ids.contains(&to) {
                    edges_auto_pruned += 1;
                }
                continue;
            }
            let winner = EdgeWinner {
                seg_idx,
                data_offset,
                data_len,
                from,
                to,
                type_id,
                updated_at,
                weight,
                valid_from,
                valid_to,
                last_write_seq,
            };
            edge_winners.insert(edge_id, winner);
        }
    }

    Ok(V3Plan {
        node_winners,
        edge_winners,
        pruned_node_ids,
        edges_auto_pruned,
    })
}
/// Materializes a `V3Plan` into a segment directory at `tmp_dir`.
///
/// Writes the node and edge data files from the planned winners, checks that
/// the writers returned exactly one entry per winner with matching ids and in
/// the same order, then writes all indexes from the combined metadata.
///
/// # Errors
/// Returns `EngineError::CorruptRecord` when the writers' output does not
/// line up with the plan, and propagates I/O and writer errors.
fn v3_build_output(
    tmp_dir: &Path,
    seg_id: u64,
    segments: &[SegmentReader],
    plan: &V3Plan,
    dense_config: Option<&DenseVectorConfig>,
    secondary_indexes: &[SecondaryIndexManifestEntry],
) -> Result<(SegmentInfo, SecondaryIndexMaintenanceReport), EngineError> {
    std::fs::create_dir_all(tmp_dir)?;

    // (id, source segment index, source offset, length) for each winner.
    let node_sources: Vec<(u64, usize, u64, u32)> = plan
        .node_winners
        .iter()
        .map(|(id, winner)| (*id, winner.seg_idx, winner.data_offset, winner.data_len))
        .collect();
    let edge_sources: Vec<(u64, usize, u64, u32)> = plan
        .edge_winners
        .iter()
        .map(|(id, winner)| (*id, winner.seg_idx, winner.data_offset, winner.data_len))
        .collect();
    let written_nodes = write_v3_nodes_dat(tmp_dir, segments, &node_sources)?;
    let written_edges = write_v3_edges_dat(tmp_dir, segments, &edge_sources)?;

    let planned_nodes = plan.node_winners.len();
    if planned_nodes != written_nodes.len() {
        return Err(EngineError::CorruptRecord(format!(
            "compaction node winner count ({}) != output data count ({})",
            planned_nodes,
            written_nodes.len()
        )));
    }
    let planned_edges = plan.edge_winners.len();
    if planned_edges != written_edges.len() {
        return Err(EngineError::CorruptRecord(format!(
            "compaction edge winner count ({}) != output data count ({})",
            planned_edges,
            written_edges.len()
        )));
    }

    // Pair every winner with the location the writer reported for it.
    let node_metas = plan
        .node_winners
        .iter()
        .zip(written_nodes.iter())
        .map(|((&node_id, winner), &(data_id, new_data_offset, data_len))| {
            if node_id != data_id {
                return Err(EngineError::CorruptRecord(format!(
                    "compaction node ID mismatch: winner={}, data={}",
                    node_id, data_id
                )));
            }
            Ok(CompactNodeMeta {
                node_id,
                new_data_offset,
                data_len,
                type_id: winner.type_id,
                updated_at: winner.updated_at,
                weight: winner.weight,
                key_len: winner.key_len,
                prop_hash_offset: winner.prop_hash_offset,
                prop_hash_count: winner.prop_hash_count,
                dense_vector_offset: winner.dense_vector_offset,
                dense_vector_len: winner.dense_vector_len,
                sparse_vector_offset: winner.sparse_vector_offset,
                sparse_vector_len: winner.sparse_vector_len,
                src_seg_idx: winner.seg_idx,
                src_data_offset: winner.data_offset,
                last_write_seq: winner.last_write_seq,
            })
        })
        .collect::<Result<Vec<_>, EngineError>>()?;

    let edge_metas = plan
        .edge_winners
        .iter()
        .zip(written_edges.iter())
        .map(|((&edge_id, winner), &(data_id, new_data_offset, data_len))| {
            if edge_id != data_id {
                return Err(EngineError::CorruptRecord(format!(
                    "compaction edge ID mismatch: winner={}, data={}",
                    edge_id, data_id
                )));
            }
            Ok(CompactEdgeMeta {
                edge_id,
                new_data_offset,
                data_len,
                from: winner.from,
                to: winner.to,
                type_id: winner.type_id,
                updated_at: winner.updated_at,
                weight: winner.weight,
                valid_from: winner.valid_from,
                valid_to: winner.valid_to,
                last_write_seq: winner.last_write_seq,
            })
        })
        .collect::<Result<Vec<_>, EngineError>>()?;

    let secondary_index_report = write_indexes_from_metadata_with_secondary_indexes(
        tmp_dir,
        segments,
        &node_metas,
        &edge_metas,
        dense_config,
        secondary_indexes,
    )?;

    let seg_info = SegmentInfo {
        id: seg_id,
        node_count: planned_nodes as u64,
        edge_count: planned_edges as u64,
    };
    Ok((seg_info, secondary_index_report))
}
/// Gathers the metadata of every node in `segments` for a fast merge,
/// rebasing each record's data offset into the merged data file.
///
/// `copy_info[i]` must describe how segment `i`'s data section was copied:
/// the new offset is `new_data_base + (data_offset - orig_data_start)`.
/// The result is sorted by node id.
///
/// # Errors
/// Returns `EngineError::CorruptRecord` when `copy_info` has no entry for a
/// segment, when an offset precedes its data section, when the rebased offset
/// overflows, or when the same node id appears more than once. Errors from
/// reading segment metadata are propagated.
fn collect_fast_merge_node_metas(
    segments: &[SegmentReader],
    copy_info: &[FastMergeCopyInfo],
) -> Result<Vec<CompactNodeMeta>, EngineError> {
    let expected_total: usize = segments
        .iter()
        .map(|seg| seg.node_meta_count() as usize)
        .sum();
    let mut metas = Vec::with_capacity(expected_total);
    for (seg_idx, seg) in segments.iter().enumerate() {
        // Report a missing entry as corruption instead of panicking on an
        // out-of-bounds index.
        let info = copy_info.get(seg_idx).ok_or_else(|| {
            EngineError::CorruptRecord(format!(
                "segment {} has no node copy info ({} entries for {} segments)",
                seg.segment_id,
                copy_info.len(),
                segments.len()
            ))
        })?;
        for i in 0..seg.node_meta_count() as usize {
            let (
                node_id,
                data_offset,
                data_len,
                type_id,
                updated_at,
                weight,
                key_len,
                prop_hash_offset,
                prop_hash_count,
                last_write_seq,
            ) = seg.node_meta_at(i)?;
            let (dense_vector_offset, dense_vector_len, sparse_vector_offset, sparse_vector_len) =
                seg.node_vector_meta_at(i)?;
            // Position of the record relative to the start of the copied section.
            let relative_offset =
                data_offset
                    .checked_sub(info.orig_data_start)
                    .ok_or_else(|| {
                        EngineError::CorruptRecord(format!(
                            "segment {} node {} data offset {} precedes data section {}",
                            seg.segment_id, node_id, data_offset, info.orig_data_start
                        ))
                    })?;
            let rebased_offset = info
                .new_data_base
                .checked_add(relative_offset)
                .ok_or_else(|| {
                    EngineError::CorruptRecord(format!(
                        "segment {} node {} merged offset overflow",
                        seg.segment_id, node_id
                    ))
                })?;
            metas.push(CompactNodeMeta {
                node_id,
                new_data_offset: rebased_offset,
                data_len,
                type_id,
                updated_at,
                weight,
                key_len,
                prop_hash_offset,
                prop_hash_count,
                dense_vector_offset,
                dense_vector_len,
                sparse_vector_offset,
                sparse_vector_len,
                src_seg_idx: seg_idx,
                src_data_offset: data_offset,
                last_write_seq,
            });
        }
    }
    // After sorting, duplicates are adjacent and can be found pairwise.
    metas.sort_unstable_by_key(|m| m.node_id);
    for pair in metas.windows(2) {
        if pair[0].node_id == pair[1].node_id {
            return Err(EngineError::CorruptRecord(format!(
                "fast-merge requires non-overlapping node IDs, found duplicate {}",
                pair[0].node_id
            )));
        }
    }
    Ok(metas)
}
/// Gathers the metadata of every edge in `segments` for a fast merge,
/// rebasing each record's data offset into the merged data file.
///
/// `copy_info[i]` must describe how segment `i`'s data section was copied:
/// the new offset is `new_data_base + (data_offset - orig_data_start)`.
/// The result is sorted by edge id.
///
/// # Errors
/// Returns `EngineError::CorruptRecord` when `copy_info` has no entry for a
/// segment, when an offset precedes its data section, when the rebased offset
/// overflows, or when the same edge id appears more than once. Errors from
/// reading segment metadata are propagated.
fn collect_fast_merge_edge_metas(
    segments: &[SegmentReader],
    copy_info: &[FastMergeCopyInfo],
) -> Result<Vec<CompactEdgeMeta>, EngineError> {
    let expected_total: usize = segments
        .iter()
        .map(|seg| seg.edge_meta_count() as usize)
        .sum();
    let mut metas = Vec::with_capacity(expected_total);
    for (seg_idx, seg) in segments.iter().enumerate() {
        // Report a missing entry as corruption instead of panicking on an
        // out-of-bounds index.
        let info = copy_info.get(seg_idx).ok_or_else(|| {
            EngineError::CorruptRecord(format!(
                "segment {} has no edge copy info ({} entries for {} segments)",
                seg.segment_id,
                copy_info.len(),
                segments.len()
            ))
        })?;
        for i in 0..seg.edge_meta_count() as usize {
            let (
                edge_id,
                data_offset,
                data_len,
                from,
                to,
                type_id,
                updated_at,
                weight,
                valid_from,
                valid_to,
                last_write_seq,
            ) = seg.edge_meta_at(i)?;
            // Position of the record relative to the start of the copied section.
            let relative_offset =
                data_offset
                    .checked_sub(info.orig_data_start)
                    .ok_or_else(|| {
                        EngineError::CorruptRecord(format!(
                            "segment {} edge {} data offset {} precedes data section {}",
                            seg.segment_id, edge_id, data_offset, info.orig_data_start
                        ))
                    })?;
            let rebased_offset = info
                .new_data_base
                .checked_add(relative_offset)
                .ok_or_else(|| {
                    EngineError::CorruptRecord(format!(
                        "segment {} edge {} merged offset overflow",
                        seg.segment_id, edge_id
                    ))
                })?;
            metas.push(CompactEdgeMeta {
                edge_id,
                new_data_offset: rebased_offset,
                data_len,
                from,
                to,
                type_id,
                updated_at,
                weight,
                valid_from,
                valid_to,
                last_write_seq,
            });
        }
    }
    // After sorting, duplicates are adjacent and can be found pairwise.
    metas.sort_unstable_by_key(|m| m.edge_id);
    for pair in metas.windows(2) {
        if pair[0].edge_id == pair[1].edge_id {
            return Err(EngineError::CorruptRecord(format!(
                "fast-merge requires non-overlapping edge IDs, found duplicate {}",
                pair[0].edge_id
            )));
        }
    }
    Ok(metas)
}
/// Builds the fast-merge output segment in `tmp_dir`.
///
/// Writes the merged node and edge data files, collects the rebased metadata
/// for both, then writes all indexes from that metadata. The returned
/// `SegmentInfo` carries `seg_id` and the number of collected node and edge
/// metadata entries.
///
/// # Errors
/// Propagates any error from directory creation, the data writers, metadata
/// collection, or index writing.
fn build_fast_merge_output(
    tmp_dir: &Path,
    seg_id: u64,
    segments: &[SegmentReader],
    dense_config: Option<&DenseVectorConfig>,
    secondary_indexes: &[SecondaryIndexManifestEntry],
) -> Result<(SegmentInfo, SecondaryIndexMaintenanceReport), EngineError> {
    std::fs::create_dir_all(tmp_dir)?;

    // Both data files are written before any metadata is collected.
    let nodes_copied = write_merged_nodes_dat(tmp_dir, segments)?;
    let edges_copied = write_merged_edges_dat(tmp_dir, segments)?;
    let merged_nodes = collect_fast_merge_node_metas(segments, &nodes_copied)?;
    let merged_edges = collect_fast_merge_edge_metas(segments, &edges_copied)?;

    let report = write_indexes_from_metadata_with_secondary_indexes(
        tmp_dir,
        segments,
        &merged_nodes,
        &merged_edges,
        dense_config,
        secondary_indexes,
    )?;

    let summary = SegmentInfo {
        id: seg_id,
        node_count: merged_nodes.len() as u64,
        edge_count: merged_edges.len() as u64,
    };
    Ok((summary, report))
}
/// Runs the fast-merge build with cancellation checks before and after it.
///
/// # Errors
/// Returns `EngineError::CompactionCancelled` when `cancel` is set at either
/// check, and propagates any error from `build_fast_merge_output`.
fn bg_fast_merge(
    segments: &[SegmentReader],
    tmp_dir: &Path,
    seg_id: u64,
    dense_config: Option<&DenseVectorConfig>,
    secondary_indexes: &[SecondaryIndexManifestEntry],
    cancel: &AtomicBool,
) -> Result<(SegmentInfo, SecondaryIndexMaintenanceReport), EngineError> {
    let ensure_not_cancelled = || -> Result<(), EngineError> {
        if cancel.load(Ordering::Relaxed) {
            Err(EngineError::CompactionCancelled)
        } else {
            Ok(())
        }
    };

    ensure_not_cancelled()?;
    let merged =
        build_fast_merge_output(tmp_dir, seg_id, segments, dense_config, secondary_indexes)?;
    // A cancellation that arrived during the build discards the finished output.
    ensure_not_cancelled()?;
    Ok(merged)
}
/// Runs the unified V3 merge: collects tombstones, plans the surviving
/// records, and builds the output segment in `tmp_dir`.
///
/// Tombstoned ids are gathered from all segments only when `has_tombstones`
/// is set. Cancellation is checked before and after planning. Returns the
/// segment info, the number of nodes pruned by policy, the number of edges
/// auto-pruned, and the secondary-index maintenance report.
///
/// # Errors
/// Returns `EngineError::CompactionCancelled` when `cancel` is set at either
/// check, and propagates errors from planning and building.
fn bg_standard_merge(
    segments: &[SegmentReader],
    tmp_dir: &Path,
    seg_id: u64,
    has_tombstones: bool,
    prune_policies: &[PrunePolicy],
    dense_config: Option<&DenseVectorConfig>,
    secondary_indexes: &[SecondaryIndexManifestEntry],
    cancel: &AtomicBool,
) -> Result<(SegmentInfo, u64, u64, SecondaryIndexMaintenanceReport), EngineError> {
    let ensure_not_cancelled = || -> Result<(), EngineError> {
        if cancel.load(Ordering::Relaxed) {
            Err(EngineError::CompactionCancelled)
        } else {
            Ok(())
        }
    };

    let mut tombstoned_nodes: NodeIdSet = NodeIdSet::default();
    let mut tombstoned_edges: NodeIdSet = NodeIdSet::default();
    if has_tombstones {
        for seg in segments.iter() {
            tombstoned_nodes.extend(seg.deleted_node_ids());
            tombstoned_edges.extend(seg.deleted_edge_ids());
        }
    }

    ensure_not_cancelled()?;
    let plan = v3_plan_winners(
        segments,
        prune_policies,
        &tombstoned_nodes,
        &tombstoned_edges,
    )?;
    ensure_not_cancelled()?;

    let nodes_auto_pruned = plan.pruned_node_ids.len() as u64;
    let edges_auto_pruned = plan.edges_auto_pruned;
    let (seg_info, secondary_index_report) = v3_build_output(
        tmp_dir,
        seg_id,
        segments,
        &plan,
        dense_config,
        secondary_indexes,
    )?;
    Ok((
        seg_info,
        nodes_auto_pruned,
        edges_auto_pruned,
        secondary_index_report,
    ))
}
include!("graph_ops.rs");
include!("write.rs");
include!("read.rs");
#[cfg(test)]
#[allow(clippy::field_reassign_with_default)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a type-1 node with the given `id` and `key`.
    ///
    /// The node has a single `"name"` property holding `key`, weight 0.5, no
    /// vectors, `created_at = 1000 * id` and `updated_at = created_at + 1`.
    fn make_node(id: u64, key: &str) -> NodeRecord {
        let created_at = 1000 * (id as i64);
        NodeRecord {
            id,
            type_id: 1,
            key: key.to_string(),
            props: std::iter::once(("name".to_string(), PropValue::String(key.to_string())))
                .collect(),
            created_at,
            updated_at: created_at + 1,
            weight: 0.5,
            dense_vector: None,
            sparse_vector: None,
            last_write_seq: 0,
        }
    }

    /// Builds a type-10 edge `from -> to` with the given `id`.
    ///
    /// The edge has no properties, weight 1.0, validity `[0, i64::MAX]`,
    /// `created_at = 2000 * id` and `updated_at = created_at + 1`.
    fn make_edge(id: u64, from: u64, to: u64) -> EdgeRecord {
        let created_at = 2000 * (id as i64);
        EdgeRecord {
            id,
            from,
            to,
            type_id: 10,
            props: BTreeMap::new(),
            created_at,
            updated_at: created_at + 1,
            weight: 1.0,
            valid_from: 0,
            valid_to: i64::MAX,
            last_write_seq: 0,
        }
    }

    include!("tests/lifecycle.rs");
    include!("tests/write.rs");
    include!("tests/read.rs");
    include!("tests/graph_ops.rs");
}