use crate::features::ops;
use crate::features::storage::compaction;
use super::wal_codec::{
bytes_to_u64_list, decode_typed_edge_delta_payload, encode_embedding_pending_wal_record,
encode_typed_edge_delta_payload, TypedEdgeDeltaOp,
};
use super::*;
/// Records a single encoded edge delta.
///
/// The delta is decoded as an `EdgeDelta` entry. If its payload decodes as a typed edge
/// delta, the inbound-edge bookkeeping is updated first. The node's cached logical view is
/// then invalidated, its pending-delta counter is bumped, the raw delta is appended to the
/// WAL, and the decoded entry is stored in the memtable.
///
/// # Errors
/// Propagates failures from decoding, the inbound-edge update, and the WAL append.
pub fn put_edge_delta(handle: &mut StorageHandle, delta: &[u8]) -> Result<()> {
    let decoded = decode_delta(delta, sstable::EntryKind::EdgeDelta)?;
    let source_id = decoded.key;

    if let Some(typed) = decode_typed_edge_delta_payload(&decoded.value) {
        super::reads::apply_typed_inbound_edge_delta(
            handle,
            source_id,
            typed.target_id,
            &typed.rel_type,
            typed.op,
        )?;
    }

    invalidate_logical_node_cache(handle, source_id);
    increment_pending_delta(handle, source_id);

    // Log the raw delta before the entry becomes visible in the memtable.
    let wal_record = encode_wal_record(sstable::EntryKind::EdgeDelta, delta);
    let written = handle.wal.append_record(&wal_record)?;
    handle.metrics.wal_bytes_written += written;
    handle.metrics.logical_bytes_written += delta.len() as u64;

    handle.memtable.put(decoded);
    Ok(())
}
/// Records several encoded edge deltas with a single batched WAL append.
///
/// Each delta goes through the same per-entry steps as `put_edge_delta` (typed inbound-edge
/// update, cache invalidation, pending-delta increment). The WAL records are then appended
/// in one batch, and only after that do the decoded entries enter the memtable.
/// An empty `deltas` slice is a no-op.
///
/// # Errors
/// Returns the first decoding, inbound-edge, or WAL error encountered. Per-entry side
/// effects already applied for earlier deltas in the batch are not rolled back.
pub fn put_edge_deltas_batch(handle: &mut StorageHandle, deltas: &[Vec<u8>]) -> Result<()> {
    if deltas.is_empty() {
        return Ok(());
    }

    let mut staged_entries = Vec::with_capacity(deltas.len());
    let mut wal_records = Vec::with_capacity(deltas.len());
    let mut payload_bytes = 0u64;

    for raw in deltas {
        let decoded = decode_delta(raw, sstable::EntryKind::EdgeDelta)?;
        let source_id = decoded.key;

        if let Some(typed) = decode_typed_edge_delta_payload(&decoded.value) {
            super::reads::apply_typed_inbound_edge_delta(
                handle,
                source_id,
                typed.target_id,
                &typed.rel_type,
                typed.op,
            )?;
        }

        invalidate_logical_node_cache(handle, source_id);
        increment_pending_delta(handle, source_id);

        wal_records.push(encode_wal_record(sstable::EntryKind::EdgeDelta, raw));
        staged_entries.push(decoded);
        payload_bytes += raw.len() as u64;
    }

    let written = handle
        .wal
        .append_records_batch(wal_records.iter().map(Vec::as_slice))?;
    handle.metrics.wal_bytes_written += written;
    handle.metrics.logical_bytes_written += payload_bytes;

    staged_entries.into_iter().for_each(|staged| {
        handle.memtable.put(staged);
    });
    Ok(())
}
/// Marks `node_id` as awaiting an embedding.
///
/// An embedding-pending record is appended to the WAL and the id is added to the
/// in-memory `embedding_pending_nodes` set.
///
/// # Errors
/// Propagates a failed WAL append; the set is left untouched in that case.
pub fn put_embedding_pending(handle: &mut StorageHandle, node_id: u64) -> Result<()> {
    let pending_record = encode_embedding_pending_wal_record(node_id);
    let written = handle.wal.append_record(&pending_record)?;
    handle.metrics.wal_bytes_written += written;

    handle.embedding_pending_nodes.insert(node_id);
    Ok(())
}
/// Records several encoded vector deltas with a single batched WAL append.
///
/// For every delta the payload is decoded and validated:
/// * structured payloads are rejected when their space is retired, otherwise the space is
///   registered for writing; only `Cosine` vectors are queued for HNSW insertion;
/// * legacy raw-f32 payloads are accepted only while the manifest's compatibility flag is
///   on, and are queued for HNSW insertion under space id `0`.
///
/// After the WAL batch append, entries are moved into the memtable (clearing any
/// embedding-pending mark for the node), queued vectors are inserted into HNSW, the HNSW
/// counters are refreshed, and — unless one of the touched spaces is suspended — the
/// rebuild scheduler is consulted.
///
/// An empty `deltas` slice is a no-op.
///
/// # Errors
/// Returns decoding errors, `StorageError::SpaceRetired`, `StorageError::InvalidInput`
/// (legacy payload while compatibility is off), space-registration errors, or WAL errors.
pub fn put_vector_delta_batch(handle: &mut StorageHandle, deltas: &[Vec<u8>]) -> Result<()> {
    if deltas.is_empty() {
        return Ok(());
    }

    let mut staged_entries = Vec::with_capacity(deltas.len());
    let mut wal_records = Vec::with_capacity(deltas.len());
    let mut queued_hnsw: Vec<(u32, u64, Vec<f32>)> = Vec::new();
    let mut touched_spaces: std::collections::HashSet<u32> = std::collections::HashSet::new();
    let mut payload_bytes = 0u64;

    for raw in deltas {
        let staged = decode_delta(raw, sstable::EntryKind::VectorDelta)?;
        let node_id = staged.key;
        let payload = decode_vector_payload(&staged.value).map_err(StorageError::CorruptData)?;

        // Decide whether (and under which space) this vector goes into HNSW.
        let hnsw_target: Option<(u32, Vec<f32>)> = match &payload {
            DecodedVectorPayload::Structured(vector) => {
                let space_id = vector.descriptor.space_id;
                if handle.manifest.is_space_retired(space_id) {
                    return Err(StorageError::SpaceRetired(space_id));
                }
                register_vector_space_for_write(handle, &vector.descriptor)?;
                if vector.descriptor.metric == VectorMetric::Cosine {
                    Some((space_id, vector.values.clone()))
                } else {
                    None
                }
            }
            DecodedVectorPayload::LegacyF32(values) => {
                if !handle.manifest.legacy_vector_raw_f32_compat() {
                    return Err(StorageError::InvalidInput(
                        "legacy raw f32 vector payload writes are disabled".to_string(),
                    ));
                }
                Some((0, values.clone()))
            }
        };
        if let Some((space_id, values)) = hnsw_target {
            touched_spaces.insert(space_id);
            queued_hnsw.push((space_id, node_id, values));
        }

        invalidate_logical_node_cache(handle, node_id);
        increment_pending_delta(handle, node_id);

        wal_records.push(encode_wal_record(sstable::EntryKind::VectorDelta, raw));
        staged_entries.push(staged);
        payload_bytes += raw.len() as u64;
    }

    let written = handle
        .wal
        .append_records_batch(wal_records.iter().map(Vec::as_slice))?;
    handle.metrics.wal_bytes_written += written;
    handle.metrics.logical_bytes_written += payload_bytes;

    for staged in staged_entries {
        handle.embedding_pending_nodes.remove(&staged.key);
        handle.memtable.put(staged);
    }

    let inserted = queued_hnsw.len() as u64;
    for (space_id, node_id, values) in queued_hnsw {
        super::vector::hnsw_insert_raw(handle, space_id, node_id, values);
    }
    handle.hnsw_total_vectors = handle
        .hnsw_graphs
        .values()
        .map(|graph| graph.len() as u64)
        .sum();
    handle.hnsw_updated_vectors = handle.hnsw_updated_vectors.saturating_add(inserted);

    // The scheduler is only consulted when none of the touched spaces is suspended.
    let all_spaces_active = touched_spaces
        .iter()
        .all(|space_id| !handle.suspended_spaces.contains(space_id));
    if all_spaces_active {
        let total = handle.hnsw_total_vectors.max(1);
        let updated = handle.hnsw_updated_vectors;
        if let Some(plan) = handle.hnsw_scheduler.should_rebuild(total, updated) {
            handle.last_hnsw_rebuild_reason = Some(plan.reason);
            handle.hnsw_updated_vectors = 0;
        }
    }
    Ok(())
}
pub fn put_tombstone(handle: &mut StorageHandle, node_id: u64, version: u64) -> Result<()> {
let delta = encode_delta(node_id, version, &[]);
let record = encode_wal_record(sstable::EntryKind::Tombstone, &delta);
let wal_bytes = handle.wal.append_record(&record)?;
handle.metrics.wal_bytes_written += wal_bytes;
handle.tombstoned_node_ids.insert(node_id);
Ok(())
}
/// Marks the vector space `space_id` as retired in the manifest and persists the manifest.
///
/// # Errors
/// Propagates a failure to persist the manifest.
pub fn retire_vector_space(handle: &mut StorageHandle, space_id: u32) -> Result<()> {
    let manifest = &mut handle.manifest;
    manifest.retire_space(space_id);
    manifest.persist()?;
    Ok(())
}
/// Records a single encoded vector delta.
///
/// Structured payloads are rejected when their space is retired; otherwise the space is
/// registered for writing and only `Cosine` vectors are forwarded to HNSW. Legacy raw-f32
/// payloads are accepted only while the manifest's compatibility flag is on and are
/// forwarded to HNSW under space id `0`.
///
/// After validation the node's cache entry is invalidated, its pending-delta counter is
/// bumped, the delta is appended to the WAL and stored in the memtable, and any
/// embedding-pending mark for the node is cleared.
///
/// # Errors
/// Returns decoding errors, `StorageError::SpaceRetired`, `StorageError::InvalidInput`
/// (legacy payload while compatibility is off), space-registration errors, or WAL errors.
pub fn put_vector_delta(handle: &mut StorageHandle, delta: &[u8]) -> Result<()> {
    let staged = decode_delta(delta, sstable::EntryKind::VectorDelta)?;
    let node_id = staged.key;
    let payload = decode_vector_payload(&staged.value).map_err(StorageError::CorruptData)?;

    let hnsw_target = match &payload {
        DecodedVectorPayload::LegacyF32(values) => {
            if !handle.manifest.legacy_vector_raw_f32_compat() {
                return Err(StorageError::InvalidInput(
                    "legacy raw f32 vector payload writes are disabled".to_string(),
                ));
            }
            Some((0, values.clone()))
        }
        DecodedVectorPayload::Structured(vector) => {
            let space_id = vector.descriptor.space_id;
            if handle.manifest.is_space_retired(space_id) {
                return Err(StorageError::SpaceRetired(space_id));
            }
            register_vector_space_for_write(handle, &vector.descriptor)?;
            if vector.descriptor.metric == VectorMetric::Cosine {
                Some((space_id, vector.values.clone()))
            } else {
                None
            }
        }
    };

    invalidate_logical_node_cache(handle, node_id);
    increment_pending_delta(handle, node_id);

    let wal_record = encode_wal_record(sstable::EntryKind::VectorDelta, delta);
    let written = handle.wal.append_record(&wal_record)?;
    handle.metrics.wal_bytes_written += written;
    handle.metrics.logical_bytes_written += delta.len() as u64;

    handle.memtable.put(staged);
    handle.embedding_pending_nodes.remove(&node_id);

    if let Some((space_id, values)) = hnsw_target {
        super::hnsw_insert_for_space(handle, space_id, node_id, values);
    }
    Ok(())
}
/// Replaces the stored adjacency list of `node_id` with `adjacency` at `version`.
///
/// The node's previous adjacency is read first so the inbound-adjacency bookkeeping can be
/// updated from old to new once the write is in place. The full-node record is appended to
/// the WAL, stored in the memtable, the node's cache entry is invalidated, and its
/// pending-delta counter is dropped.
///
/// # Errors
/// Propagates failures from reading the current node, the WAL append, and the
/// inbound-adjacency update.
pub fn put_full_node(
    handle: &mut StorageHandle,
    node_id: u64,
    version: u64,
    adjacency: &[u64],
) -> Result<()> {
    let previous_adjacency = get_logical_node(handle, node_id)?.adjacency();

    let encoded_adjacency = encode_adjacency(adjacency);
    let encoded_len = encoded_adjacency.len() as u64;
    let wal_record = encode_wal_record(
        sstable::EntryKind::FullNode,
        &encode_delta(node_id, version, &encoded_adjacency),
    );

    let written = handle.wal.append_record(&wal_record)?;
    handle.metrics.wal_bytes_written += written;
    handle.metrics.logical_bytes_written += encoded_len;

    // The encoded payload is no longer needed here, so it moves into the entry.
    handle.memtable.put(sstable::Entry {
        key: node_id,
        version,
        kind: sstable::EntryKind::FullNode,
        value: encoded_adjacency,
    });

    invalidate_logical_node_cache(handle, node_id);
    handle.pending_deltas_per_node.remove(&node_id);
    super::reads::update_inbound_adjacency_for_node(
        handle,
        node_id,
        &previous_adjacency,
        adjacency,
    )?;
    Ok(())
}
/// Flushes in-memory state to disk.
///
/// When the memtable holds entries, it is written to a new level-0 SSTable named
/// `ir.l0.<run_id>.sst`, the memtable and the logical-node cache are cleared, the new run
/// is registered and cached, and policy-driven compaction is given a chance to run.
/// Afterwards the bitmap store is flushed, the manifest is persisted, and the WAL is
/// synced. Nothing happens when the memtable is empty and the bitmap store is clean.
///
/// # Errors
/// Propagates failures from the SSTable write, compaction, bitmap flush, manifest
/// persistence, and WAL sync.
pub fn flush(handle: &mut StorageHandle) -> Result<()> {
    let has_entries = !handle.memtable.is_empty();
    if !has_entries && !handle.bitmap_store.is_dirty() {
        return Ok(());
    }

    if has_entries {
        let run_id = handle.manifest.allocate_run_id(0);
        let run_path = handle.sstable_dir.join(format!("ir.l0.{:04}.sst", run_id));
        let table = handle
            .memtable
            .flush_to_sstable_with_reactor(&run_path, handle.reactor.as_ref())?;

        // Size accounting is best-effort: a metadata failure does not fail the flush.
        if let Ok(file_len) = handle.reactor.metadata_len(&run_path) {
            handle.metrics.sstable_bytes_written += file_len;
        }

        handle.memtable.clear();
        clear_logical_node_cache(handle);

        handle.l0_runs.push(run_path);
        let cache_key = handle.l0_runs.last().cloned().expect("pushed run");
        handle.sstable_cache.insert(cache_key, table);

        maybe_trigger_policy_compaction(handle)?;
    }

    handle
        .bitmap_store
        .flush_with_reactor(handle.reactor.as_ref())?;
    handle.manifest.persist()?;
    handle.wal.sync()?;
    Ok(())
}
/// Syncs the write-ahead log.
///
/// # Errors
/// Propagates a failed WAL sync.
pub fn sync(handle: &mut StorageHandle) -> Result<()> {
    let wal = &mut handle.wal;
    wal.sync()?;
    Ok(())
}
/// Runs a tracked compaction unless one is already in progress.
///
/// `compaction_in_progress` is raised for the duration of the run and lowered again
/// whether the run succeeds or fails. A call made while the flag is raised returns
/// `Ok(())` without doing anything.
///
/// # Errors
/// Returns whatever the tracked compaction run returns.
pub fn compact(handle: &mut StorageHandle, level: Option<u32>) -> Result<()> {
    if !handle.compaction_in_progress {
        handle.compaction_in_progress = true;
        let outcome = compact_with_job_tracking(handle, level);
        handle.compaction_in_progress = false;
        return outcome;
    }
    Ok(())
}
/// Runs `compact_inner` as a tracked background job, retrying retryable failures.
///
/// A `"storage.compaction"` job is submitted and started; the started job must be the one
/// just submitted. `compact_inner` is then attempted repeatedly: a retryable error is
/// retried as long as the number of attempts made so far does not exceed
/// `handle.compaction_max_retries`. The job is marked succeeded or failed accordingly;
/// failures to update the job record itself are ignored.
///
/// # Errors
/// Returns `StorageError::Sstable` when the job cannot be submitted or started, or when a
/// different job starts; otherwise the last error from `compact_inner`.
fn compact_with_job_tracking(handle: &mut StorageHandle, level: Option<u32>) -> Result<()> {
    let submitted = handle.compaction_jobs.submit("storage.compaction");
    let job_id = submitted
        .map_err(|err| StorageError::Sstable(format!("compaction job submit failed: {:?}", err)))?;
    handle.last_compaction_job_id = Some(job_id);

    let started_id = match handle.compaction_jobs.start_next() {
        Some(id) => id,
        None => {
            return Err(StorageError::Sstable(
                "compaction job did not start".to_string(),
            ));
        }
    };
    if started_id != job_id {
        return Err(StorageError::Sstable(format!(
            "compaction job mismatch: expected {}, got {}",
            job_id, started_id
        )));
    }

    let mut attempts_made = 0_u32;
    loop {
        attempts_made = attempts_made.saturating_add(1);

        let failure = match compact_inner(handle, level) {
            Ok(()) => {
                let _ = handle.compaction_jobs.mark_succeeded(job_id);
                return Ok(());
            }
            Err(err) => err,
        };

        if is_retryable_compaction_error(&failure)
            && attempts_made <= handle.compaction_max_retries
        {
            let _ = handle.compaction_jobs.retry_running(job_id);
            continue;
        }

        let _ = handle
            .compaction_jobs
            .mark_failed(job_id, &format!("{:?}", failure));
        return Err(failure);
    }
}
/// Reports whether a compaction failure is worth retrying.
///
/// Only `StorageError::Io` and `StorageError::Sstable` are treated as retryable.
fn is_retryable_compaction_error(err: &StorageError) -> bool {
    match err {
        StorageError::Io(_) | StorageError::Sstable(_) => true,
        _ => false,
    }
}
/// Returns the id of the most recently submitted compaction job, if any.
pub fn latest_compaction_job_id(handle: &StorageHandle) -> Option<u64> {
    let latest = handle.last_compaction_job_id;
    latest
}
/// Returns a copy of the job record for `job_id`, or `None` when the job tracker has no
/// record for that id.
pub fn compaction_job_status(
    handle: &StorageHandle,
    job_id: u64,
) -> Option<ops::BackgroundJobRecord> {
    let record = handle.compaction_jobs.status(job_id);
    record.cloned()
}
/// Returns the job tracker's snapshot of compaction job records.
pub fn compaction_jobs_snapshot(handle: &StorageHandle) -> Vec<ops::BackgroundJobRecord> {
    let jobs = &handle.compaction_jobs;
    jobs.snapshot()
}
/// Merges all level-0 runs into a single new level-0 SSTable.
///
/// Only level 0 is supported (`level` of `None` means level 0). A non-empty memtable is
/// flushed first. With at most one run there is nothing to do. Otherwise:
///
/// 1. an `in-progress` marker file is written;
/// 2. the entries of every run are gathered (from the SSTable cache when available,
///    otherwise read through the reactor), dropping structured vector deltas whose space
///    is retired;
/// 3. the merged entries are written to a `.tmp` file, read back, and copied to the final
///    `ir.l0.<run_id>.sst` path;
/// 4. the old runs are removed from the cache and from disk, the run list and caches are
///    reset, the manifest is persisted, and the marker is removed.
///
/// If no entries survive filtering, only the marker is removed and the runs are left as is.
///
/// # Errors
/// Returns `StorageError::InvalidInput` for a non-zero level, and propagates flush, read,
/// write, and manifest errors. The marker file is left in place when an error is returned
/// after it was written.
fn compact_inner(handle: &mut StorageHandle, level: Option<u32>) -> Result<()> {
    let target_level = level.unwrap_or(0);
    if target_level != 0 {
        return Err(StorageError::InvalidInput(
            "only level 0 compaction is currently supported".to_string(),
        ));
    }
    if !handle.memtable.is_empty() {
        flush(handle)?;
    }
    if handle.l0_runs.len() <= 1 {
        return Ok(());
    }

    let marker_path = compaction_marker_path(&handle.sstable_dir);
    handle.reactor.write_file(&marker_path, b"in-progress")?;

    let run_paths = handle.l0_runs.clone();
    let mut merged_entries = Vec::new();
    for path in &run_paths {
        let table = if let Some(cached) = handle.sstable_cache.get(path) {
            cached.clone()
        } else {
            // Go through the reactor like every other SSTable read/write in this module,
            // so all file I/O for a run uses the same backend.
            sstable::read_sstable_with_reactor(path, handle.reactor.as_ref())?
        };
        // Size accounting is best-effort.
        if let Ok(len) = handle.reactor.metadata_len(path) {
            handle.metrics.sstable_bytes_read += len;
        }
        merged_entries.extend(table.entries);
    }

    // Drop structured vector deltas that belong to retired spaces; everything else,
    // including payloads that fail to decode, is kept.
    merged_entries.retain(|entry| {
        if entry.kind != sstable::EntryKind::VectorDelta {
            return true;
        }
        match decode_vector_payload(&entry.value) {
            Ok(DecodedVectorPayload::Structured(v)) => {
                !handle.manifest.is_space_retired(v.descriptor.space_id)
            }
            _ => true,
        }
    });
    if merged_entries.is_empty() {
        std::fs::remove_file(&marker_path).ok();
        return Ok(());
    }

    let run_id = handle.manifest.allocate_run_id(0);
    let temp_path = handle
        .sstable_dir
        .join(format!("ir.l0.{:04}.sst.tmp", run_id));
    let compacted_path = handle.sstable_dir.join(format!("ir.l0.{:04}.sst", run_id));

    // Write to a temporary file, verify it reads back, then copy it to its final name.
    sstable::write_sstable_with_reactor(&temp_path, &merged_entries, handle.reactor.as_ref())?;
    let compacted_table = sstable::read_sstable_with_reactor(&temp_path, handle.reactor.as_ref())?;
    let compacted_bytes = handle.reactor.read_file(&temp_path)?;
    handle
        .reactor
        .write_file(&compacted_path, &compacted_bytes)?;
    std::fs::remove_file(&temp_path).ok();
    if let Ok(len) = handle.reactor.metadata_len(&compacted_path) {
        handle.metrics.sstable_bytes_written += len;
    }

    // Retire the source runs; removal from disk is best-effort.
    for path in &run_paths {
        handle.sstable_cache.remove(path);
        std::fs::remove_file(path).ok();
    }
    handle.l0_runs.clear();
    handle.l0_runs.push(compacted_path.clone());
    handle.sstable_cache.insert(compacted_path, compacted_table);
    clear_logical_node_cache(handle);
    handle.pending_deltas_per_node.clear();
    handle.manifest.persist()?;
    std::fs::remove_file(&marker_path).ok();
    Ok(())
}
/// Rebuilds in-memory state by replaying every record in the WAL.
///
/// Returns the number of records processed. Vector deltas that are skipped because their
/// space is retired are still counted.
///
/// # Errors
/// Propagates WAL replay and record/delta decoding errors, node-read and
/// inbound-adjacency errors, space-registration errors, bitmap-posting errors, and
/// returns `StorageError::CorruptData` for an undecodable vector payload or for a legacy
/// raw-f32 payload encountered while legacy compatibility is disabled.
///
/// NOTE(review): unlike `put_edge_delta`, replayed `EdgeDelta` records are not passed to
/// `apply_typed_inbound_edge_delta` here — confirm the inbound-edge state is restored by
/// another path (e.g. the replayed bitmap postings).
pub fn recover_from_wal(handle: &mut StorageHandle) -> Result<usize> {
let records = handle.wal.replay()?;
let mut applied = 0usize;
for record in records {
match decode_wal_record(&record)? {
WalRecord::Delta(kind, delta) => {
let entry = decode_delta(delta, kind)?;
// For full-node records, capture the adjacency as it was before this record is applied.
let old_adjacency = if entry.kind == sstable::EntryKind::FullNode {
get_logical_node(handle, entry.key)?.adjacency()
} else {
Vec::new()
};
// For vector deltas, validate the payload and decide whether it goes into HNSW.
let vector_for_hnsw = if entry.kind == sstable::EntryKind::VectorDelta {
let decoded =
decode_vector_payload(&entry.value).map_err(StorageError::CorruptData)?;
match &decoded {
DecodedVectorPayload::Structured(vector) => {
// Vectors of retired spaces are dropped, but the record still counts as applied.
if handle.manifest.is_space_retired(vector.descriptor.space_id) {
applied += 1;
continue;
}
register_vector_space_for_write(handle, &vector.descriptor)?;
// Only cosine-metric vectors are inserted into HNSW.
if vector.descriptor.metric == VectorMetric::Cosine {
Some((vector.descriptor.space_id, vector.values.clone()))
} else {
None
}
}
DecodedVectorPayload::LegacyF32(values) => {
if !handle.manifest.legacy_vector_raw_f32_compat() {
return Err(StorageError::CorruptData(
"legacy raw f32 vector payload encountered after compatibility was disabled"
.to_string(),
));
}
// Legacy payloads are inserted under space id 0.
Some((0, values.clone()))
}
}
} else {
None
};
invalidate_logical_node_cache(handle, entry.key);
// Edge and vector deltas add to the node's pending-delta count; a full node resets it.
if entry.kind == sstable::EntryKind::EdgeDelta
|| entry.kind == sstable::EntryKind::VectorDelta
{
increment_pending_delta(handle, entry.key);
}
if entry.kind == sstable::EntryKind::FullNode {
handle.pending_deltas_per_node.remove(&entry.key);
}
let node_id = entry.key;
if entry.kind == sstable::EntryKind::Tombstone {
handle.tombstoned_node_ids.insert(node_id);
}
// Decode the new adjacency before `entry` is moved into the memtable.
let new_adjacency = if entry.kind == sstable::EntryKind::FullNode {
bytes_to_u64_list(&entry.value)
} else {
Vec::new()
};
handle.memtable.put(entry);
// Both lists are empty for non-full-node records, so this only runs for full nodes
// with at least one old or new neighbour.
if !old_adjacency.is_empty() || !new_adjacency.is_empty() {
super::reads::update_inbound_adjacency_for_node(
handle,
node_id,
&old_adjacency,
&new_adjacency,
)?;
}
// A replayed HNSW-bound vector clears the node's embedding-pending mark.
if vector_for_hnsw.is_some() {
handle.embedding_pending_nodes.remove(&node_id);
}
if let Some((space_id, vector)) = vector_for_hnsw {
hnsw_insert_for_space(handle, space_id, node_id, vector);
}
}
WalRecord::BitmapPosting {
index_name,
value_key,
node_id,
} => {
// NOTE(review): the trailing `false` presumably suppresses re-logging to the WAL
// during replay — confirm against `apply_bitmap_posting`.
apply_bitmap_posting(handle, &index_name, &value_key, node_id, false)?;
}
WalRecord::EmbeddingPending { node_id } => {
handle.embedding_pending_nodes.insert(node_id);
}
}
applied += 1;
}
Ok(applied)
}
/// Encodes a delta as `node_id` (8 bytes, little-endian), `version` (8 bytes,
/// little-endian), followed by the raw `payload` bytes.
pub fn encode_delta(node_id: u64, version: u64, payload: &[u8]) -> Vec<u8> {
    let header_len = 2 * std::mem::size_of::<u64>();
    let mut encoded = Vec::with_capacity(header_len + payload.len());
    for word in &[node_id, version] {
        encoded.extend_from_slice(&word.to_le_bytes());
    }
    encoded.extend_from_slice(payload);
    encoded
}
/// Encodes a typed edge delta payload that adds an edge of type `rel_type` to `target_id`.
pub fn encode_typed_edge_delta_add(target_id: u64, rel_type: &str) -> Vec<u8> {
    let op = TypedEdgeDeltaOp::Add;
    encode_typed_edge_delta_payload(target_id, rel_type, op)
}
/// Encodes a typed edge delta payload that deletes the edge of type `rel_type` to
/// `target_id`.
pub fn encode_typed_edge_delta_delete(target_id: u64, rel_type: &str) -> Vec<u8> {
    let op = TypedEdgeDeltaOp::Delete;
    encode_typed_edge_delta_payload(target_id, rel_type, op)
}
/// Encodes an adjacency list as the concatenation of each node id in 8-byte
/// little-endian form, preserving order.
pub fn encode_adjacency(nodes: &[u64]) -> Vec<u8> {
    nodes
        .iter()
        .flat_map(|neighbour| neighbour.to_le_bytes())
        .collect()
}
/// Builds a WAL record: one byte holding the entry kind, followed by the encoded delta.
fn encode_wal_record(kind: sstable::EntryKind, delta: &[u8]) -> Vec<u8> {
    let kind_tag = kind as u8;
    let mut framed = Vec::with_capacity(delta.len() + 1);
    framed.extend(std::iter::once(kind_tag).chain(delta.iter().copied()));
    framed
}
/// Bumps the pending-delta counter of `node_id` by one, saturating at the counter's
/// maximum. A node without a counter starts at one.
fn increment_pending_delta(handle: &mut StorageHandle, node_id: u64) {
    handle
        .pending_deltas_per_node
        .entry(node_id)
        .and_modify(|count| *count = count.saturating_add(1))
        .or_insert(1);
}
/// Asks the compaction policy whether the current level-0 runs should be compacted and,
/// if so, runs that compaction.
///
/// Does nothing while a compaction is already in progress or when there are no level-0
/// runs. Each run is described to the policy by its parsed run id (`0` when the file name
/// cannot be parsed), its on-disk size (`0` when metadata is unavailable), and the largest
/// pending-delta count across all nodes.
///
/// # Errors
/// Propagates the error of the compaction the policy selected.
fn maybe_trigger_policy_compaction(handle: &mut StorageHandle) -> Result<()> {
    if handle.compaction_in_progress || handle.l0_runs.is_empty() {
        return Ok(());
    }

    // Same value for every run and for the graph stats: compute it once instead of
    // rescanning the whole pending-delta map per run.
    let max_pending = handle
        .pending_deltas_per_node
        .values()
        .max()
        .copied()
        .unwrap_or(0);

    let runs = handle
        .l0_runs
        .iter()
        .map(|path| compaction::RunInfo {
            level: 0,
            run_id: parse_run_id(path).unwrap_or(0),
            size_bytes: handle.reactor.metadata_len(path).unwrap_or(0),
            max_delta_count: max_pending,
        })
        .collect::<Vec<_>>();
    let stats = compaction::GraphStats {
        hot_node_ids: handle.pending_deltas_per_node.keys().copied().collect(),
        max_pending_deltas: max_pending,
    };

    if let Some(decision) = handle.compaction_policy.select_compaction(&runs, &stats) {
        compact(handle, Some(decision.source_level))?;
    }
    Ok(())
}