use std::collections::HashSet;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use arc_swap::{ArcSwap, Guard};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use dashmap::mapref::one::Ref;
use smallvec::SmallVec;
use crate::core::id::{EdgeId, NodeId};
use crate::index::adjacency::{AdjacencyEntry, AdjacencyIndex};
/// Adjacency index that layers mutable state on top of an immutable snapshot.
///
/// Reads merge a frozen `AdjacencyIndex` with a delta map of inserted edges
/// and a tombstone set of deleted edge ids; `compact()` folds both layers
/// back into a fresh frozen snapshot and swaps it in atomically.
pub struct IncrementalAdjacencyIndex {
    /// Immutable, compacted snapshot; replaced wholesale on compaction/import.
    frozen: ArcSwap<AdjacencyIndex>,
    /// Edges inserted since the last compaction, keyed by source node.
    delta: DashMap<NodeId, SmallVec<[AdjacencyEntry; 8]>>,
    /// Edge ids deleted since the last compaction (filters frozen and delta).
    tombstones: DashMap<EdgeId, Tombstone>,
    /// Lock-free counters driving the read fast path and compaction triggers.
    stats: AdjacencyStats,
    config: IncrementalConfig,
    /// Test hook: when set, the next `compact()` call panics once.
    #[doc(hidden)]
    test_panic_on_compact: AtomicBool,
}
/// Record of a deleted edge, kept until the next compaction folds the
/// deletion into the frozen snapshot.
#[derive(Debug, Clone)]
pub struct Tombstone {
    pub edge_id: EdgeId,
    /// Wall-clock time the deletion was recorded.
    pub deleted_at: DateTime<Utc>,
    /// Transaction time of the deletion. NOTE(review): `delete()` currently
    /// fills this with a second `Utc::now()` call, so it is effectively the
    /// same instant as `deleted_at` — confirm whether a caller-supplied
    /// transaction time was intended.
    pub transaction_time: DateTime<Utc>,
}
/// Lock-free counters tracking the size of each layer. Updated with
/// relaxed/acquire-release atomics, so values are approximate under
/// concurrent mutation — they gate heuristics, not correctness-critical paths.
#[derive(Debug)]
struct AdjacencyStats {
    /// Edges currently pending in the delta layer.
    delta_edge_count: AtomicUsize,
    /// Tombstones currently pending.
    tombstone_count: AtomicUsize,
    /// Edges in the current frozen snapshot.
    frozen_edge_count: AtomicUsize,
    /// Unix timestamp (seconds) of the last completed compaction.
    last_compaction: AtomicU64,
}
/// Tuning knobs for the incremental index and its compaction scheduler.
#[derive(Debug, Clone)]
pub struct IncrementalConfig {
    /// Compact when delta edges reach this fraction of frozen edges.
    pub compaction_ratio: f64,
    /// Compact when the delta layer holds at least this many edges.
    pub max_delta_edges: usize,
    /// Compact when at least this many tombstones are pending.
    pub max_tombstones: usize,
    /// NOTE(review): not read anywhere in this file — the delta `SmallVec`
    /// inline capacity is hard-coded to 8. Confirm whether this knob is
    /// still consumed elsewhere or is dead.
    pub smallvec_capacity: usize,
    /// Polling period of `CompactionScheduler`'s worker loop.
    pub check_interval: Duration,
}
impl Default for IncrementalConfig {
fn default() -> Self {
Self {
compaction_ratio: 0.1, max_delta_edges: 10_000, max_tombstones: 1_000, smallvec_capacity: 8, check_interval: Duration::from_secs(1), }
}
}
impl AdjacencyStats {
fn new() -> Self {
Self {
delta_edge_count: AtomicUsize::new(0),
tombstone_count: AtomicUsize::new(0),
frozen_edge_count: AtomicUsize::new(0),
last_compaction: AtomicU64::new(0),
}
}
}
impl IncrementalAdjacencyIndex {
    /// Creates an empty index with the default configuration.
    pub fn new() -> Self {
        Self::with_config(
            Arc::new(AdjacencyIndex::new()),
            IncrementalConfig::default(),
        )
    }

    /// Wraps an existing frozen snapshot with the default configuration.
    pub fn from_frozen(frozen: Arc<AdjacencyIndex>) -> Self {
        Self::with_config(frozen, IncrementalConfig::default())
    }

    /// Wraps an existing frozen snapshot with an explicit configuration.
    pub fn with_config(frozen: Arc<AdjacencyIndex>, config: IncrementalConfig) -> Self {
        let frozen_edge_count = frozen.edge_count();
        Self {
            // Share the caller's Arc directly. The previous code deep-cloned
            // the entire index (`ArcSwap::from_pointee((*frozen).clone())`),
            // which costs O(edges) time and memory for no benefit: ArcSwap
            // only needs an Arc, and compaction replaces it wholesale anyway.
            frozen: ArcSwap::from(frozen),
            delta: DashMap::new(),
            tombstones: DashMap::new(),
            stats: AdjacencyStats {
                frozen_edge_count: AtomicUsize::new(frozen_edge_count),
                ..AdjacencyStats::new()
            },
            config,
            test_panic_on_compact: AtomicBool::new(false),
        }
    }

    /// Arms a one-shot panic inside the next `compact()` call (test-only hook).
    #[doc(hidden)]
    pub fn test_inject_panic_on_compact(&self) {
        self.test_panic_on_compact.store(true, Ordering::Relaxed);
    }

    /// Number of edges in the current frozen snapshot.
    pub fn frozen_edge_count(&self) -> usize {
        self.stats.frozen_edge_count.load(Ordering::Acquire)
    }

    /// Exports the frozen snapshot in CSR form. Pending delta edges and
    /// tombstones are NOT included — call `compact()` first for a complete
    /// export.
    pub fn export_frozen_csr(&self) -> (Vec<u64>, Vec<u64>, Vec<u64>) {
        self.frozen.load().export_csr()
    }

    /// Replaces the frozen snapshot and DISCARDS all pending delta edges and
    /// tombstones. Prefer `try_import_frozen_csr` unless dropping uncommitted
    /// state is intended.
    ///
    /// Readers racing with this call may briefly observe the new snapshot
    /// merged with the old delta/tombstone state.
    pub fn import_frozen_csr(&self, frozen_csr: Arc<AdjacencyIndex>) {
        self.frozen.store(frozen_csr);
        self.delta.clear();
        self.tombstones.clear();
        let frozen_count = self.frozen.load().edge_count();
        self.stats
            .frozen_edge_count
            .store(frozen_count, Ordering::Relaxed);
        self.stats.delta_edge_count.store(0, Ordering::Relaxed);
        self.stats.tombstone_count.store(0, Ordering::Relaxed);
    }

    /// Like `import_frozen_csr`, but refuses to drop uncommitted state.
    ///
    /// # Errors
    /// Returns a diagnostic message when pending delta edges or tombstones
    /// exist. (`delta.len()` counts source nodes with pending edges, not
    /// individual edges; it serves as an emptiness check plus diagnostics.)
    pub fn try_import_frozen_csr(&self, frozen_csr: Arc<AdjacencyIndex>) -> Result<(), String> {
        let delta_count = self.delta.len();
        let tombstone_count = self.tombstones.len();
        if delta_count > 0 || tombstone_count > 0 {
            return Err(format!(
                "Cannot import: {} uncommitted delta edges and {} tombstones would be lost. \
                Call compact() first or use import_frozen_csr() to force.",
                delta_count, tombstone_count
            ));
        }
        self.import_frozen_csr(frozen_csr);
        Ok(())
    }

    /// Approximate number of edges pending in the delta layer.
    pub fn delta_edge_count(&self) -> usize {
        self.stats.delta_edge_count.load(Ordering::Relaxed)
    }

    /// Approximate number of pending tombstones.
    pub fn tombstone_count(&self) -> usize {
        self.stats.tombstone_count.load(Ordering::Relaxed)
    }

    /// Records a new edge in the delta layer.
    pub fn insert(&self, source: NodeId, entry: AdjacencyEntry) {
        // Increment BEFORE pushing: `get_adjacency` takes its fast path only
        // when this counter reads zero, so the counter must become non-zero
        // no later than the entry becomes visible in `delta`. The reverse
        // order could let a reader skip the delta layer and miss this edge.
        self.stats.delta_edge_count.fetch_add(1, Ordering::Relaxed);
        self.delta.entry(source).or_default().push(entry);
    }

    /// Records a deletion for `edge_id`. Idempotent: deleting the same edge
    /// twice keeps a single tombstone and counts it once.
    pub fn delete(&self, edge_id: EdgeId) {
        // Increment BEFORE inserting (mirrors `insert`) so readers never take
        // the fast path while an unobserved tombstone exists.
        self.stats.tombstone_count.fetch_add(1, Ordering::Relaxed);
        let tombstone = Tombstone {
            edge_id,
            deleted_at: Utc::now(),
            transaction_time: Utc::now(),
        };
        if self.tombstones.insert(edge_id, tombstone).is_some() {
            // BUGFIX: the edge was already tombstoned. The old code counted
            // every delete() call while the map deduplicates by edge id, and
            // compact() only subtracts the number of DISTINCT drained
            // tombstones — so repeated deletes permanently inflated
            // `tombstone_count`, skewing should_compact() and disabling the
            // read fast path forever. Undo the provisional increment.
            self.stats.tombstone_count.fetch_sub(1, Ordering::Relaxed);
        }
    }

    /// Returns a clone of the tombstone recorded for `edge_id`, if any.
    pub fn get_tombstone(&self, edge_id: EdgeId) -> Option<Tombstone> {
        self.tombstones.get(&edge_id).map(|t| t.clone())
    }

    /// Returns a merged read guard over frozen + delta entries for `node`,
    /// filtered by tombstones.
    ///
    /// The guard is a point-in-time snapshot: writes racing with this call
    /// may or may not be visible. When both counters read zero the guard
    /// skips all DashMap lookups and serves straight from the frozen slice.
    pub fn get_adjacency(&self, node: NodeId) -> MergedAdjacencyGuard<'_> {
        let frozen_guard = self.frozen.load();
        let delta_empty = self.stats.delta_edge_count.load(Ordering::Relaxed) == 0;
        let tombstones_empty = self.stats.tombstone_count.load(Ordering::Relaxed) == 0;
        if delta_empty && tombstones_empty {
            return MergedAdjacencyGuard {
                node,
                frozen: frozen_guard,
                delta: None,
                tombstones: &self.tombstones,
                fast_path: true,
            };
        }
        let delta_guard = self.delta.get(&node);
        MergedAdjacencyGuard {
            node,
            frozen: frozen_guard,
            delta: delta_guard,
            tombstones: &self.tombstones,
            fast_path: false,
        }
    }

    /// True when any compaction trigger fires: absolute delta size, the
    /// delta-to-frozen ratio, or the pending tombstone count.
    pub fn should_compact(&self) -> bool {
        let delta = self.stats.delta_edge_count.load(Ordering::Relaxed);
        let frozen = self.stats.frozen_edge_count.load(Ordering::Acquire);
        let tombstones = self.stats.tombstone_count.load(Ordering::Relaxed);
        if delta >= self.config.max_delta_edges {
            return true;
        }
        if frozen > 0 && delta as f64 >= frozen as f64 * self.config.compaction_ratio {
            return true;
        }
        if tombstones >= self.config.max_tombstones {
            return true;
        }
        false
    }

    /// Folds pending delta edges and tombstones into a fresh frozen snapshot
    /// and publishes it atomically.
    ///
    /// Concurrent inserts/deletes that land mid-compaction simply remain
    /// pending for the next compaction (DashMap `retain` drains shard by
    /// shard; late arrivals survive the drain).
    pub fn compact(&self) {
        // Test hook: one-shot injected panic, consumed by the swap.
        if self.test_panic_on_compact.swap(false, Ordering::Relaxed) {
            panic!("Test-injected panic during compaction");
        }
        let frozen = self.frozen.load();
        // Drain tombstones into a local set: `retain(.. false)` removes each
        // entry while letting us observe every key exactly once.
        let mut local_tombstones = HashSet::new();
        self.tombstones.retain(|edge_id, _| {
            local_tombstones.insert(*edge_id);
            false
        });
        self.stats
            .tombstone_count
            .fetch_sub(local_tombstones.len(), Ordering::Relaxed);
        // Drain the delta layer the same way.
        let delta_count_estimate = self.stats.delta_edge_count.load(Ordering::Relaxed);
        let mut local_delta = Vec::with_capacity(delta_count_estimate);
        let mut drained_edge_count = 0;
        self.delta.retain(|source, edges| {
            for entry in edges.iter() {
                local_delta.push((*source, *entry));
                drained_edge_count += 1;
            }
            false
        });
        self.stats
            .delta_edge_count
            .fetch_sub(drained_edge_count, Ordering::Relaxed);
        // Rebuild: surviving frozen edges plus surviving delta edges.
        let estimated_capacity = frozen.edge_count() + drained_edge_count;
        let mut all_edges = Vec::with_capacity(estimated_capacity);
        for node_id in frozen.iter_nodes() {
            for adj in frozen.get_adjacency(node_id) {
                if !local_tombstones.contains(&adj.edge_id) {
                    all_edges.push((node_id, adj.target, adj.edge_id, adj.label));
                }
            }
        }
        for (source, adj) in local_delta {
            if !local_tombstones.contains(&adj.edge_id) {
                all_edges.push((source, adj.target, adj.edge_id, adj.label));
            }
        }
        let new_frozen = AdjacencyIndex::build(all_edges);
        let new_edge_count = new_frozen.edge_count();
        // Publish snapshot first, then stats; the Release stores pair with
        // the Acquire loads in `frozen_edge_count`/`should_compact`.
        self.frozen.store(Arc::new(new_frozen));
        self.stats
            .frozen_edge_count
            .store(new_edge_count, Ordering::Release);
        self.stats
            .last_compaction
            .store(Utc::now().timestamp() as u64, Ordering::Release);
    }

    /// Fast frozen-only view, available only when no delta or tombstone state
    /// is pending; returns `None` otherwise.
    pub fn frozen_view(&self) -> Option<FrozenAdjacencyView> {
        if self.stats.delta_edge_count.load(Ordering::Relaxed) > 0 {
            return None;
        }
        if self.stats.tombstone_count.load(Ordering::Relaxed) > 0 {
            return None;
        }
        Some(FrozenAdjacencyView {
            frozen: self.frozen.load(),
        })
    }
}
/// Read view over the frozen snapshot alone, with no delta merging or
/// tombstone filtering; only obtainable via `frozen_view()` when no pending
/// state exists. Holds an `ArcSwap` guard, pinning the snapshot alive.
pub struct FrozenAdjacencyView {
    frozen: Guard<Arc<AdjacencyIndex>>,
}
impl FrozenAdjacencyView {
#[inline]
pub fn get_adjacency(&self, node: NodeId) -> &[AdjacencyEntry] {
self.frozen.get_adjacency(node)
}
}
/// Read guard merging the frozen snapshot with the delta layer for one node,
/// filtering out tombstoned edges. Holds an `ArcSwap` guard (pins the frozen
/// snapshot) and, on the slow path, a DashMap shard read reference — drop
/// promptly to avoid blocking writers to the same shard.
pub struct MergedAdjacencyGuard<'a> {
    node: NodeId,
    frozen: Guard<Arc<AdjacencyIndex>>,
    /// Delta entries for `node`; `None` on the fast path or when the node has
    /// no pending inserts.
    delta: Option<Ref<'a, NodeId, SmallVec<[AdjacencyEntry; 8]>>>,
    tombstones: &'a DashMap<EdgeId, Tombstone>,
    /// True when the index had no delta edges or tombstones at creation time;
    /// tombstone lookups are then skipped entirely.
    fast_path: bool,
}
impl<'a> MergedAdjacencyGuard<'a> {
    /// Iterates frozen entries first, then delta entries, skipping any entry
    /// whose edge id is tombstoned. On the fast path the tombstone lookup is
    /// skipped entirely.
    #[inline]
    pub fn iter(&self) -> impl Iterator<Item = &AdjacencyEntry> + '_ {
        let skip_tombstones = self.fast_path;
        // Captures only Copy data (&self and a bool), so the closure itself
        // is Copy and can be handed to both filters.
        let live = move |entry: &&AdjacencyEntry| {
            skip_tombstones || !self.tombstones.contains_key(&entry.edge_id)
        };
        let frozen_part = self.frozen.get_adjacency(self.node).iter().filter(live);
        let delta_part = self
            .delta
            .as_ref()
            .map(|entries| entries.iter())
            .into_iter()
            .flatten()
            .filter(live);
        frozen_part.chain(delta_part)
    }
    /// Number of visible entries. O(n): walks the merged iterator.
    #[inline]
    pub fn len(&self) -> usize {
        self.iter().fold(0, |count, _| count + 1)
    }
    /// True when no entry survives tombstone filtering (short-circuits).
    #[inline]
    pub fn is_empty(&self) -> bool {
        match self.iter().next() {
            Some(_) => false,
            None => true,
        }
    }
    /// Positional lookup into the merged sequence. O(index).
    #[inline]
    pub fn get(&self, index: usize) -> Option<&AdjacencyEntry> {
        self.iter().skip(index).next()
    }
    /// Upper bound on entry count, ignoring tombstones (cheap).
    #[inline]
    pub fn capacity_hint(&self) -> usize {
        let pending = match self.delta.as_ref() {
            Some(entries) => entries.len(),
            None => 0,
        };
        self.frozen.get_adjacency(self.node).len() + pending
    }
    /// Exact count in O(1), available only on the fast path.
    #[inline]
    pub fn fast_len(&self) -> Option<usize> {
        if !self.fast_path {
            return None;
        }
        Some(self.frozen.get_adjacency(self.node).len())
    }
    /// Borrows the frozen slice directly when no overlay could apply
    /// (no delta reference held and the tombstone map is empty).
    #[inline]
    pub fn as_slice(&self) -> Option<&[AdjacencyEntry]> {
        let overlay_free = self.delta.is_none() && self.tombstones.is_empty();
        if overlay_free {
            Some(self.frozen.get_adjacency(self.node))
        } else {
            None
        }
    }
    /// The frozen layer only, unfiltered.
    #[inline]
    pub fn frozen_slice(&self) -> &[AdjacencyEntry] {
        self.frozen.get_adjacency(self.node)
    }
    /// The delta layer only, unfiltered; empty when no inserts are pending.
    #[inline]
    pub fn delta_slice(&self) -> &[AdjacencyEntry] {
        match self.delta.as_ref() {
            Some(entries) => entries.as_slice(),
            None => &[],
        }
    }
    /// Whether `edge_id` is tombstoned; always false on the fast path.
    #[inline]
    pub fn is_tombstoned(&self, edge_id: EdgeId) -> bool {
        if self.fast_path {
            return false;
        }
        self.tombstones.contains_key(&edge_id)
    }
}
impl<'a> std::ops::Index<usize> for MergedAdjacencyGuard<'a> {
    type Output = AdjacencyEntry;
    /// Panics with a descriptive message when `index >= self.len()`.
    fn index(&self, index: usize) -> &Self::Output {
        match self.get(index) {
            Some(entry) => entry,
            None => panic!(
                "index {} out of bounds for MergedAdjacencyGuard (len: {})",
                index,
                self.len()
            ),
        }
    }
}
impl<'a> std::fmt::Debug for MergedAdjacencyGuard<'a> {
    /// Summarizes the guard by node and visible entry count (computing the
    /// count walks the merged iterator).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("MergedAdjacencyGuard");
        builder.field("node", &self.node);
        builder.field("entry_count", &self.len());
        builder.finish()
    }
}
impl<'a> PartialEq for MergedAdjacencyGuard<'a> {
    /// Two guards are equal when their merged, tombstone-filtered entry
    /// sequences are element-wise equal.
    fn eq(&self, other: &Self) -> bool {
        // Compare lazily instead of materializing both sides into Vecs: the
        // old code allocated two temporary vectors just to compare them and
        // never short-circuited on the first mismatch. `Iterator::eq` yields
        // the identical result (same element-wise comparison, same length
        // check) with zero allocation.
        self.iter().eq(other.iter())
    }
}
impl<'a> Eq for MergedAdjacencyGuard<'a> {}
impl Default for IncrementalAdjacencyIndex {
fn default() -> Self {
Self::new()
}
}
/// Background driver that periodically calls `compact()` on an
/// `IncrementalAdjacencyIndex` when `should_compact()` fires, with panic
/// isolation and a consecutive-panic circuit breaker.
pub struct CompactionScheduler {
    index: Arc<IncrementalAdjacencyIndex>,
    /// Worker loop runs while true; see `start()`/`shutdown()`.
    running: Arc<AtomicBool>,
    /// When true the loop keeps polling but skips compaction.
    paused: Arc<AtomicBool>,
    /// Total compaction panics observed since creation.
    panic_count: Arc<AtomicUsize>,
    /// Panics since the last successful compaction; reaching
    /// `MAX_CONSECUTIVE_PANICS` stops the worker.
    consecutive_panics: Arc<AtomicUsize>,
}
const MAX_CONSECUTIVE_PANICS: usize = 5;
impl CompactionScheduler {
    /// Creates a scheduler for `index`; the worker is not started until
    /// `start()` is called.
    pub fn new(index: Arc<IncrementalAdjacencyIndex>) -> Self {
        Self {
            index,
            running: Arc::new(AtomicBool::new(false)),
            paused: Arc::new(AtomicBool::new(false)),
            panic_count: Arc::new(AtomicUsize::new(0)),
            consecutive_panics: Arc::new(AtomicUsize::new(0)),
        }
    }
    /// Total number of panics caught during compaction since creation.
    pub fn panic_count(&self) -> usize {
        self.panic_count.load(Ordering::Relaxed)
    }
    /// Spawns the background worker and returns its join handle.
    ///
    /// The worker polls every `config.check_interval`, compacting when
    /// `should_compact()` fires. Panics inside `compact()` are caught and
    /// counted; after `MAX_CONSECUTIVE_PANICS` back-to-back failures the
    /// worker stops itself. On exit it attempts one best-effort final
    /// compaction to flush remaining delta/tombstone state.
    ///
    /// NOTE(review): calling `start()` twice spawns two workers sharing the
    /// same flags — callers are expected to call it once.
    pub fn start(&self) -> JoinHandle<()> {
        self.running.store(true, Ordering::SeqCst);
        let index = Arc::clone(&self.index);
        let running = Arc::clone(&self.running);
        let paused = Arc::clone(&self.paused);
        let panic_count = Arc::clone(&self.panic_count);
        let consecutive_panics = Arc::clone(&self.consecutive_panics);
        thread::spawn(move || {
            let check_interval = index.config.check_interval;
            while running.load(Ordering::SeqCst) {
                if !paused.load(Ordering::SeqCst) && index.should_compact() {
                    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                        index.compact();
                    }));
                    match result {
                        // A clean compaction resets the circuit breaker.
                        Ok(()) => consecutive_panics.store(0, Ordering::Relaxed),
                        Err(panic_payload) => {
                            panic_count.fetch_add(1, Ordering::Relaxed);
                            let consecutive =
                                consecutive_panics.fetch_add(1, Ordering::Relaxed) + 1;
                            eprintln!(
                                "[CompactionScheduler] Panic during compaction (total: {}, consecutive: {}): {:?}",
                                panic_count.load(Ordering::Relaxed),
                                consecutive,
                                panic_payload
                            );
                            if consecutive >= MAX_CONSECUTIVE_PANICS {
                                eprintln!(
                                    "[CompactionScheduler] Exiting after {} consecutive panics",
                                    consecutive
                                );
                                // BUGFIX: the old code broke out of the loop
                                // with `running` still true, leaving the
                                // scheduler looking alive while its thread was
                                // dead. Clear the flag so observers (and a
                                // later shutdown()) see a consistent state.
                                running.store(false, Ordering::SeqCst);
                                break;
                            }
                        }
                    }
                }
                thread::sleep(check_interval);
            }
            // Best-effort final flush of any pending state; panics ignored.
            if index.delta_edge_count() > 0 || index.tombstone_count() > 0 {
                let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    index.compact();
                }));
            }
        })
    }
    /// Suspends compaction without stopping the worker thread.
    pub fn pause(&self) {
        self.paused.store(true, Ordering::SeqCst);
    }
    /// Resumes compaction after `pause()`.
    pub fn resume(&self) {
        self.paused.store(false, Ordering::SeqCst);
    }
    /// Signals the worker to exit after its current poll cycle. Does not
    /// join; use the handle returned by `start()` for that.
    pub fn shutdown(&self) {
        self.running.store(false, Ordering::SeqCst);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed index reports zero everywhere.
    #[test]
    fn test_new_creates_empty_index() {
        let index = IncrementalAdjacencyIndex::new();
        assert_eq!(index.frozen_edge_count(), 0);
        assert_eq!(index.delta_edge_count(), 0);
        assert_eq!(index.tombstone_count(), 0);
    }

    /// `capacity_hint` counts both layers; `fast_len` is only available while
    /// no delta edges or tombstones are pending.
    #[test]
    fn test_guard_capacity_hint_and_fast_len() {
        use crate::core::interning::InternedString;
        let index = IncrementalAdjacencyIndex::new();
        let source = NodeId::new(1).unwrap();
        // Empty index: fast path available, everything reads zero.
        {
            let view = index.get_adjacency(source);
            assert_eq!(view.capacity_hint(), 0);
            assert_eq!(view.fast_len(), Some(0));
        }
        let pending = AdjacencyEntry::new(
            NodeId::new(2).unwrap(),
            EdgeId::new(1).unwrap(),
            InternedString::from_raw(1),
        );
        index.insert(source, pending);
        // A pending delta edge disables the fast path.
        {
            let view = index.get_adjacency(source);
            assert_eq!(view.capacity_hint(), 1);
            assert_eq!(view.fast_len(), None);
        }
        index.compact();
        // After compaction the edge lives in the frozen layer again.
        {
            let view = index.get_adjacency(source);
            assert_eq!(view.capacity_hint(), 1);
            assert_eq!(view.fast_len(), Some(1));
        }
        index.delete(EdgeId::new(1).unwrap());
        // A pending tombstone likewise disables the fast path.
        {
            let view = index.get_adjacency(source);
            assert_eq!(view.capacity_hint(), 1);
            assert_eq!(view.fast_len(), None);
        }
    }

    /// `frozen_slice` exposes only the compacted layer.
    #[test]
    fn test_guard_frozen_slice() {
        use crate::core::interning::InternedString;
        let index = IncrementalAdjacencyIndex::new();
        let source = NodeId::new(1).unwrap();
        let view = index.get_adjacency(source);
        assert!(view.frozen_slice().is_empty());
        drop(view);
        let edge = EdgeId::new(1).unwrap();
        let pending =
            AdjacencyEntry::new(NodeId::new(2).unwrap(), edge, InternedString::from_raw(1));
        index.insert(source, pending);
        index.compact();
        let view = index.get_adjacency(source);
        assert_eq!(view.frozen_slice().len(), 1);
        assert_eq!(view.frozen_slice()[0].edge_id, edge);
    }

    /// `delta_slice` exposes only the uncompacted layer.
    #[test]
    fn test_guard_delta_slice() {
        use crate::core::interning::InternedString;
        let index = IncrementalAdjacencyIndex::new();
        let source = NodeId::new(1).unwrap();
        let view = index.get_adjacency(source);
        assert!(view.delta_slice().is_empty());
        drop(view);
        let edge = EdgeId::new(1).unwrap();
        let pending =
            AdjacencyEntry::new(NodeId::new(2).unwrap(), edge, InternedString::from_raw(1));
        index.insert(source, pending);
        let view = index.get_adjacency(source);
        assert_eq!(view.delta_slice().len(), 1);
        assert_eq!(view.delta_slice()[0].edge_id, edge);
    }

    /// `is_tombstoned` reflects pending deletions for the exact edge id only.
    #[test]
    fn test_guard_is_tombstoned() {
        use crate::core::interning::InternedString;
        let index = IncrementalAdjacencyIndex::new();
        let source = NodeId::new(1).unwrap();
        let edge = EdgeId::new(1).unwrap();
        let other_edge = EdgeId::new(2).unwrap();
        let pending =
            AdjacencyEntry::new(NodeId::new(2).unwrap(), edge, InternedString::from_raw(1));
        index.insert(source, pending);
        index.compact();
        let view = index.get_adjacency(source);
        assert!(!view.is_tombstoned(edge));
        drop(view);
        index.delete(edge);
        let view = index.get_adjacency(source);
        assert!(view.is_tombstoned(edge));
        assert!(!view.is_tombstoned(other_edge));
    }
}