use crate::datatypes::values::Value;
use crate::graph::core::iterators::{
DiskEdgeIndices, DiskEdgeReferences, DiskEdges, DiskEdgesConnecting, DiskNeighbors,
DiskNodeIndices,
};
use crate::graph::schema::{EdgeData, InternedKey, NodeData};
use crate::graph::storage::mapped::mmap_vec::MmapOrVec;
use petgraph::graph::{EdgeIndex, NodeIndex};
use petgraph::Direction;
use std::borrow::Cow;
use std::cell::UnsafeCell;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use super::csr::{CsrEdge, DiskNodeSlot, EdgeEndpoints, TOMBSTONE_EDGE};
use super::edge_properties::EdgePropertyStore;
use super::property_index;
/// Builds the directory name used for segment `id`: the literal prefix `seg_`
/// followed by the id zero-padded to at least three digits (`seg_000`, `seg_042`,
/// `seg_1234`).
pub(crate) fn segment_subdir(id: u32) -> String {
    let padded_id = format!("{:03}", id);
    ["seg_", padded_id.as_str()].concat()
}
/// Lists the segment directories that sit directly under `root`.
///
/// An entry is kept only when it is a directory whose name is valid UTF-8 and
/// consists of `seg_` followed by text that parses as a `u32`. Entries that
/// cannot be read or do not match are skipped silently, and an unreadable
/// `root` yields an empty list. The result is ordered by ascending segment id.
pub(crate) fn enumerate_segment_dirs(root: &Path) -> Vec<(u32, PathBuf)> {
    let mut segments: Vec<(u32, PathBuf)> = Vec::new();

    if let Ok(listing) = std::fs::read_dir(root) {
        for entry in listing.flatten() {
            let is_dir = matches!(entry.file_type(), Ok(kind) if kind.is_dir());
            if !is_dir {
                continue;
            }
            let raw_name = entry.file_name();
            let parsed_id = raw_name
                .to_str()
                .and_then(|name| name.strip_prefix("seg_"))
                .and_then(|digits| digits.parse::<u32>().ok());
            if let Some(id) = parsed_id {
                segments.push((id, entry.path()));
            }
        }
    }

    segments.sort_by_key(|(id, _)| *id);
    segments
}
/// Version tag for the CSR layout.
/// NOTE(review): where this value is persisted and checked is not visible in this
/// part of the file — confirm at the use sites before bumping it.
pub(crate) const CURRENT_CSR_LAYOUT_VERSION: u8 = 1;
/// Graph storage whose node slots, CSR adjacency arrays and edge endpoint table are
/// kept in `MmapOrVec` buffers, with the backing files living under `data_dir`.
///
/// Edges that exist when the CSR arrays are built live in `out_edges` / `in_edges`;
/// edges added afterwards are kept in the `overflow_*` maps (or in `pending_edges`
/// while `defer_csr` is set). Removed nodes and edges are tombstoned in place.
pub struct DiskGraph {
/// One slot per node index: node type, column-store row id and flags (alive bit).
pub(crate) node_slots: MmapOrVec<DiskNodeSlot>,
/// Number of live nodes; bumped by `add_node`, decremented by `remove_node`.
pub(super) node_count: usize,
/// Indices of removed nodes; `add_node` pops from here before growing `node_slots`.
pub(super) free_node_slots: Vec<u32>,
/// Owns the boxed `NodeData` values behind the references returned by `node_weight`.
#[allow(clippy::vec_box)]
pub(super) node_arena: std::sync::Mutex<Vec<Box<NodeData>>>,
/// Column store per node type; node properties, ids and titles are read from here.
pub(crate) column_stores:
HashMap<InternedKey, Arc<crate::graph::storage::column_store::ColumnStore>>,
/// CSR offsets: node `i`'s outgoing edges are `out_edges[out_offsets[i]..out_offsets[i + 1]]`.
pub(super) out_offsets: MmapOrVec<u64>,
/// Outgoing CSR entries (`peer` is the target node).
pub(super) out_edges: MmapOrVec<CsrEdge>,
/// CSR offsets for incoming edges, same layout as `out_offsets`.
pub(super) in_offsets: MmapOrVec<u64>,
/// Incoming CSR entries (`peer` is the source node).
pub(super) in_edges: MmapOrVec<CsrEdge>,
/// Indexed by edge index: source, target and connection type. A removed edge has
/// `source == TOMBSTONE_EDGE`.
pub(crate) edge_endpoints: MmapOrVec<EdgeEndpoints>,
/// Number of live edges.
pub(crate) edge_count: usize,
/// Next edge index `add_edge` will hand out; also the exclusive upper bound used by
/// every scan over `edge_endpoints`.
pub(crate) next_edge_idx: u32,
/// Property lists of edges that have any properties, keyed by edge index.
pub(super) edge_properties: EdgePropertyStore,
/// Owns the boxed `EdgeData` values behind the references returned by `materialize_edge`.
#[allow(clippy::vec_box)]
pub(super) edge_arena: std::sync::Mutex<Vec<Box<EdgeData>>>,
/// Edge edits staged by `edge_weight_mut`; written back by `clear_arenas`.
pub(super) edge_mut_cache: HashMap<u32, EdgeData>,
/// Node edits staged by `node_weight_mut`; written back by `flush_node_mut_cache`.
pub(super) node_mut_cache: HashMap<u32, NodeData>,
/// `(source, target, connection_type)` triples recorded by `add_edge` while `defer_csr` is set.
pub(crate) pending_edges: UnsafeCell<MmapOrVec<(u32, u32, u64)>>,
/// Outgoing edges added outside the CSR arrays, keyed by source node.
pub(super) overflow_out: HashMap<u32, Vec<CsrEdge>>,
/// Incoming edges added outside the CSR arrays, keyed by target node.
pub(super) overflow_in: HashMap<u32, Vec<CsrEdge>>,
/// Indices of removed edges (pushed by `remove_edge`).
pub(super) free_edge_slots: Vec<u32>,
/// Segment directory holding this graph's files.
pub(crate) data_dir: PathBuf,
/// Set by the mutating operations in this impl.
/// NOTE(review): presumably consulted when persisting metadata — confirm.
pub(super) metadata_dirty: bool,
/// When true, each node's CSR range is binary-searched by connection type instead of
/// being filtered entry by entry.
pub(crate) csr_sorted_by_type: bool,
/// When true, `add_edge` only appends to `pending_edges` instead of updating
/// `edge_endpoints` and the overflow maps.
pub(crate) defer_csr: bool,
/// NOTE(review): presumably cached per-connection-type edge counts; not used in the
/// visible part of the file — confirm.
pub(crate) edge_type_counts_raw: Option<HashMap<u64, usize>>,
/// Connection-type index: connection types, binary-searched in ascending order.
pub(crate) conn_type_index_types: MmapOrVec<u64>,
/// For type position `k`, its sources are `conn_type_index_sources[offsets[k]..offsets[k + 1]]`.
pub(crate) conn_type_index_offsets: MmapOrVec<u64>,
/// Source node ids, grouped per connection type.
pub(crate) conn_type_index_sources: MmapOrVec<u32>,
/// NOTE(review): the `peer_count_*` arrays are not used in the visible part of the
/// file; their layout presumably mirrors the connection-type index — confirm.
pub(crate) peer_count_types: MmapOrVec<u64>,
// `peer_count_offsets`, `peer_count_entries` and `has_tombstones` share one line.
// `has_tombstones` is set once any node or edge has been removed and disables the
// counting shortcut in `count_edges_filtered`.
pub(crate) peer_count_offsets: MmapOrVec<u64>, pub(crate) peer_count_entries: MmapOrVec<u32>, pub(crate) has_tombstones: bool,
/// Property indexes keyed by a pair of strings; `None` is a cached value as well.
/// NOTE(review): the pair is presumably (node type, property) — confirm.
pub(crate) property_indexes: PropertyIndexCache,
/// Property indexes keyed by a single string.
pub(crate) global_indexes: GlobalIndexCache,
/// NOTE(review): segment bookkeeping; semantics are defined in `segment_summary` — confirm.
pub(crate) segment_manifest: super::segment_summary::SegmentManifest,
/// NOTE(review): presumably the node bound covered by sealed segments — confirm.
pub(crate) sealed_nodes_bound: u32,
}
/// Lock-protected cache of property indexes keyed by a pair of strings. The value is an
/// `Option`, so a `None` result can be cached too.
/// NOTE(review): the key is presumably (node type, property name) — confirm at the use sites.
type PropertyIndexCache =
std::sync::RwLock<HashMap<(String, String), Option<Arc<property_index::PropertyIndex>>>>;
/// Lock-protected cache of property indexes keyed by a single string, with the same
/// `Option` convention as `PropertyIndexCache`.
type GlobalIndexCache =
std::sync::RwLock<HashMap<String, Option<Arc<property_index::PropertyIndex>>>>;
use std::sync::Arc;
// SAFETY(review): `DiskGraph` contains at least one field that blocks the automatic
// impls (`pending_edges` is an `UnsafeCell`, which is never `Sync`). These impls
// assert that sharing and sending the graph across threads is sound anyway. That holds
// only if `pending_edges` is never accessed through `&self` concurrently with another
// access, and if the arena-backed references handed out by `node_weight` /
// `materialize_edge` are not used after the arenas are cleared — TODO confirm both.
unsafe impl Send for DiskGraph {}
unsafe impl Sync for DiskGraph {}
/// One-line summary (live node count, live edge count, data directory); graph
/// contents are not dumped.
impl std::fmt::Debug for DiskGraph {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let nodes = self.node_count;
        let edges = self.edge_count;
        let dir = self.data_dir.display();
        f.write_fmt(format_args!(
            "DiskGraph({} nodes, {} edges, dir={:?})",
            nodes, edges, dir
        ))
    }
}
impl DiskGraph {
pub fn new_at_path(root_dir: &Path) -> std::io::Result<Self> {
std::fs::create_dir_all(root_dir)?;
let data_dir = root_dir.join(segment_subdir(0));
std::fs::create_dir_all(&data_dir)?;
let data_dir = data_dir.as_path();
Ok(DiskGraph {
node_slots: MmapOrVec::mapped(&data_dir.join("node_slots.bin"), 1024)?,
node_count: 0,
free_node_slots: Vec::new(),
node_arena: std::sync::Mutex::new(Vec::with_capacity(256)),
column_stores: HashMap::new(),
out_offsets: MmapOrVec::mapped(&data_dir.join("out_offsets.bin"), 1025)?,
out_edges: MmapOrVec::new(),
in_offsets: MmapOrVec::mapped(&data_dir.join("in_offsets.bin"), 1025)?,
in_edges: MmapOrVec::new(),
edge_endpoints: MmapOrVec::new(),
edge_count: 0,
next_edge_idx: 0,
edge_properties: EdgePropertyStore::new(),
edge_arena: std::sync::Mutex::new(Vec::with_capacity(256)),
edge_mut_cache: HashMap::new(),
node_mut_cache: HashMap::new(),
pending_edges: UnsafeCell::new(
MmapOrVec::mapped(&data_dir.join("_pending_edges.bin"), 1 << 20)
.unwrap_or_else(|_| MmapOrVec::new()),
),
overflow_out: HashMap::new(),
overflow_in: HashMap::new(),
free_edge_slots: Vec::new(),
data_dir: data_dir.to_path_buf(),
metadata_dirty: false,
csr_sorted_by_type: false,
defer_csr: false,
edge_type_counts_raw: None,
conn_type_index_types: MmapOrVec::new(),
conn_type_index_offsets: MmapOrVec::new(),
conn_type_index_sources: MmapOrVec::new(),
peer_count_types: MmapOrVec::new(),
peer_count_offsets: MmapOrVec::new(),
peer_count_entries: MmapOrVec::new(),
has_tombstones: false,
property_indexes: std::sync::RwLock::new(HashMap::new()),
global_indexes: std::sync::RwLock::new(HashMap::new()),
segment_manifest: super::segment_summary::SegmentManifest::new(),
sealed_nodes_bound: 0,
})
}
/// Builds a `DiskGraph` from an in-memory petgraph `StableDiGraph`, writing the node
/// slots, CSR arrays and edge endpoint table to files in segment 0 under `root_dir`.
///
/// Node indices are preserved: vacant indices below `graph.node_bound()` become
/// default (non-alive) slots. Edge indices are NOT preserved: edges are renumbered
/// densely from 0 in `graph.edge_references()` order.
///
/// `column_stores` is left empty; callers attach stores via `set_column_stores`.
/// Only `graph` reads are performed here even though it is taken by `&mut`.
///
/// # Errors
/// Returns the I/O error from creating the directories or mapping any of the files.
pub fn from_stable_digraph(
graph: &mut petgraph::stable_graph::StableDiGraph<NodeData, EdgeData>,
root_dir: &Path,
) -> std::io::Result<Self> {
use petgraph::visit::{EdgeRef, IntoEdgeReferences, NodeIndexable};
std::fs::create_dir_all(root_dir)?;
let data_dir_buf = root_dir.join(segment_subdir(0));
std::fs::create_dir_all(&data_dir_buf)?;
let data_dir = data_dir_buf.as_path();
let node_bound = graph.node_bound();
let edge_count = graph.edge_count();
// Pass 1: one slot per node index, live or vacant.
// NOTE(review): the second argument of `MmapOrVec::mapped` looks like a capacity,
// since every element is still pushed below — confirm.
let mut node_slots = MmapOrVec::mapped(&data_dir.join("node_slots.bin"), node_bound)?;
let mut node_count = 0usize;
for i in 0..node_bound {
let idx = NodeIndex::new(i);
if let Some(node) = graph.node_weight(idx) {
// Columnar nodes keep their store row; any other storage uses the node index.
let row_id = match &node.properties {
crate::graph::schema::PropertyStorage::Columnar { row_id, .. } => *row_id,
_ => i as u32,
};
node_slots.push(DiskNodeSlot {
node_type: node.node_type.as_u64(),
row_id,
flags: DiskNodeSlot::ALIVE_BIT,
});
node_count += 1;
} else {
node_slots.push(DiskNodeSlot::default()); }
}
// Pass 2: per-node out/in degree.
let mut out_counts = vec![0u64; node_bound];
let mut in_counts = vec![0u64; node_bound];
for edge in graph.edge_references() {
let s = edge.source().index();
let t = edge.target().index();
out_counts[s] += 1;
in_counts[t] += 1;
}
// Prefix sums: `node_bound + 1` offsets, the last one being the total edge count.
let mut out_offsets = MmapOrVec::mapped(&data_dir.join("out_offsets.bin"), node_bound + 1)?;
let mut in_offsets = MmapOrVec::mapped(&data_dir.join("in_offsets.bin"), node_bound + 1)?;
let mut out_acc = 0u64;
let mut in_acc = 0u64;
for i in 0..node_bound {
out_offsets.push(out_acc);
in_offsets.push(in_acc);
out_acc += out_counts[i];
in_acc += in_counts[i];
}
out_offsets.push(out_acc);
in_offsets.push(in_acc);
let mut out_edges = MmapOrVec::mapped(&data_dir.join("out_edges.bin"), edge_count)?;
let mut in_edges = MmapOrVec::mapped(&data_dir.join("in_edges.bin"), edge_count)?;
let mut edge_endpoints_vec =
MmapOrVec::mapped(&data_dir.join("edge_endpoints.bin"), edge_count)?;
let mut edge_properties: HashMap<u32, Vec<(InternedKey, Value)>> = HashMap::new();
// Pre-fill with defaults so the scatter pass below can use `set` at arbitrary positions.
for _ in 0..edge_count {
out_edges.push(CsrEdge::default());
in_edges.push(CsrEdge::default());
edge_endpoints_vec.push(EdgeEndpoints::default());
}
// Pass 3: scatter each edge into its source's out-range and its target's in-range.
// The cursors track how many entries of each node's range are already filled.
let mut out_cursors = vec![0u64; node_bound];
let mut in_cursors = vec![0u64; node_bound];
let mut edge_idx = 0u32;
for edge in graph.edge_references() {
let s = edge.source().index();
let t = edge.target().index();
let ct = edge.weight().connection_type;
let csr_out = CsrEdge {
peer: t as u32,
edge_idx,
};
let out_pos = out_offsets.get(s) + out_cursors[s];
out_edges.set(out_pos as usize, csr_out);
out_cursors[s] += 1;
let csr_in = CsrEdge {
peer: s as u32,
edge_idx,
};
let in_pos = in_offsets.get(t) + in_cursors[t];
in_edges.set(in_pos as usize, csr_in);
in_cursors[t] += 1;
edge_endpoints_vec.set(
edge_idx as usize,
EdgeEndpoints {
source: s as u32,
target: t as u32,
connection_type: ct.as_u64(),
},
);
// Only edges that actually carry properties get an entry in the overlay map.
if !edge.weight().properties.is_empty() {
edge_properties.insert(edge_idx, edge.weight().properties.clone());
}
edge_idx += 1;
}
// Unlike `new_at_path`, `pending_edges` is not file-mapped here, and the CSR is
// reported as not sorted by connection type.
Ok(DiskGraph {
node_slots,
node_count,
free_node_slots: Vec::new(),
node_arena: std::sync::Mutex::new(Vec::with_capacity(1024)),
column_stores: HashMap::new(), out_offsets,
out_edges,
in_offsets,
in_edges,
edge_endpoints: edge_endpoints_vec,
edge_count,
next_edge_idx: edge_idx,
edge_properties: EdgePropertyStore::from_overlay(edge_properties),
edge_arena: std::sync::Mutex::new(Vec::with_capacity(1024)),
edge_mut_cache: HashMap::new(),
node_mut_cache: HashMap::new(),
pending_edges: UnsafeCell::new(MmapOrVec::new()),
overflow_out: HashMap::new(),
overflow_in: HashMap::new(),
free_edge_slots: Vec::new(),
data_dir: data_dir.to_path_buf(),
metadata_dirty: false,
csr_sorted_by_type: false,
defer_csr: false,
edge_type_counts_raw: None,
conn_type_index_types: MmapOrVec::new(),
conn_type_index_offsets: MmapOrVec::new(),
conn_type_index_sources: MmapOrVec::new(),
peer_count_types: MmapOrVec::new(),
peer_count_offsets: MmapOrVec::new(),
peer_count_entries: MmapOrVec::new(),
has_tombstones: false,
global_indexes: std::sync::RwLock::new(HashMap::new()),
property_indexes: std::sync::RwLock::new(HashMap::new()),
segment_manifest: super::segment_summary::SegmentManifest::new(),
sealed_nodes_bound: 0,
})
}
/// Replaces the whole per-node-type column store map with `stores`; the previous
/// map is dropped.
pub fn set_column_stores(
    &mut self,
    stores: HashMap<InternedKey, Arc<crate::graph::storage::column_store::ColumnStore>>,
) {
    drop(std::mem::replace(&mut self.column_stores, stores));
}
/// Iterates over `(node type, column store)` pairs in the map's (unspecified) order.
pub fn column_stores_iter(
    &self,
) -> impl Iterator<
    Item = (
        &InternedKey,
        &Arc<crate::graph::storage::column_store::ColumnStore>,
    ),
> {
    let stores = &self.column_stores;
    stores.iter()
}
/// Returns the node type stored in the slot for `idx`, or `None` when the index is
/// out of range or the slot is not alive.
#[inline]
pub fn node_type_of(&self, idx: NodeIndex) -> Option<InternedKey> {
    let pos = idx.index();
    if pos < self.node_slots.len() {
        let slot = self.node_slots.get(pos);
        if slot.is_alive() {
            return Some(InternedKey::from_u64(slot.node_type));
        }
    }
    None
}
/// Reads property `key` of node `idx` from the column store of the node's type.
///
/// When the store has no value under `key`, the keys `"title"` and `"id"` fall back
/// to the store's dedicated title / id accessors. Returns `None` for an out-of-range
/// or dead node, for a node type without a column store, and when no value is found.
#[inline]
pub fn get_node_property(&self, idx: NodeIndex, key: InternedKey) -> Option<Value> {
    let pos = idx.index();
    if pos >= self.node_slots.len() {
        return None;
    }
    let slot = self.node_slots.get(pos);
    if !slot.is_alive() {
        return None;
    }
    let store = self
        .column_stores
        .get(&InternedKey::from_u64(slot.node_type))?;
    store.get(slot.row_id, key).or_else(|| {
        // Fallbacks are only consulted after the regular column lookup misses.
        if key == InternedKey::from_str("title") {
            store.get_title(slot.row_id)
        } else if key == InternedKey::from_str("id") {
            store.get_id(slot.row_id)
        } else {
            None
        }
    })
}
/// Returns the id value of node `idx` from its type's column store, or `None` when
/// the node is out of range or dead, its type has no store, or the store has no id
/// for the row.
#[inline]
pub fn get_node_id(&self, idx: NodeIndex) -> Option<Value> {
    let pos = idx.index();
    let slot = (pos < self.node_slots.len())
        .then(|| self.node_slots.get(pos))
        .filter(|slot| slot.is_alive())?;
    self.column_stores
        .get(&InternedKey::from_u64(slot.node_type))
        .and_then(|store| store.get_id(slot.row_id))
}
/// Returns the title value of node `idx` from its type's column store, or `None`
/// when the node is out of range or dead, its type has no store, or the store has
/// no title for the row.
#[inline]
pub fn get_node_title(&self, idx: NodeIndex) -> Option<Value> {
    let pos = idx.index();
    let slot = (pos < self.node_slots.len())
        .then(|| self.node_slots.get(pos))
        .filter(|slot| slot.is_alive())?;
    self.column_stores
        .get(&InternedKey::from_u64(slot.node_type))
        .and_then(|store| store.get_title(slot.row_id))
}
/// Returns the raw slot at position `i`, or a default slot when `i` is out of range.
/// No liveness check is performed.
#[inline]
pub fn node_slot(&self, i: usize) -> DiskNodeSlot {
    (i < self.node_slots.len())
        .then(|| self.node_slots.get(i))
        .unwrap_or_else(DiskNodeSlot::default)
}
/// Materializes a `NodeData` view of node `idx` and returns a reference to it.
///
/// The value is rebuilt on every call from the node's slot and, when one exists,
/// the column store of its type (id, title and a `Columnar` property handle).
/// Without a store, id and title are `Null` and the properties are an empty map.
/// `extra_labels` is always empty. Edits staged through `node_weight_mut` are NOT
/// reflected here until they are flushed.
///
/// Returns `None` when the index is out of range or the slot is not alive.
///
/// Each call pushes one box into `node_arena`; the arena only shrinks when
/// `clear_arenas` or `reset_arenas` runs.
#[inline]
pub fn node_weight(&self, idx: NodeIndex) -> Option<&NodeData> {
let i = idx.index();
if i >= self.node_slots.len() {
return None;
}
let slot = self.node_slots.get(i);
if !slot.is_alive() {
return None;
}
// Debug builds only: warn when a non-empty staged map write exists for this node,
// because the value returned below would not include it.
#[cfg(debug_assertions)]
if let Some(staged) = self.node_mut_cache.get(&(i as u32)) {
use crate::graph::schema::PropertyStorage;
if matches!(staged.properties, PropertyStorage::Map(_))
&& !matches!(staged.properties, PropertyStorage::Map(ref m) if m.is_empty())
{
eprintln!(
"BUG: DiskGraph::node_weight({}) called while node_mut_cache holds a \
staged Map-typed write for that index. Missing flush_pending_writes() \
call. See 0.9.0 readiness Cluster 6 / node_weight_mut docs.",
i
);
}
}
let node_type_key = InternedKey::from_u64(slot.node_type);
let store = self.column_stores.get(&node_type_key);
let node_data = if let Some(store) = store {
let id = store.get_id(slot.row_id).unwrap_or(Value::Null);
let title = store.get_title(slot.row_id).unwrap_or(Value::Null);
NodeData {
id,
title,
node_type: node_type_key,
extra_labels: Vec::new(),
properties: crate::graph::schema::PropertyStorage::Columnar {
store: Arc::clone(store),
row_id: slot.row_id,
},
}
} else {
NodeData {
id: Value::Null,
title: Value::Null,
node_type: node_type_key,
extra_labels: Vec::new(),
properties: crate::graph::schema::PropertyStorage::Map(HashMap::new()),
}
};
// Take the heap address before handing the box to the arena; moving the `Box`
// into the `Vec` does not move the boxed `NodeData`.
let boxed = Box::new(node_data);
let ptr: *const NodeData = &*boxed;
self.node_arena.lock().unwrap().push(boxed);
// SAFETY(review): the pointee is owned by `node_arena` and stays valid until the
// arena is cleared. `clear_arenas` needs `&mut self`, so it cannot overlap with
// the returned borrow, but `reset_arenas` takes `&self` and would leave this
// reference dangling if called while it is still in use — TODO confirm callers.
unsafe { Some(&*ptr) }
}
/// Returns a mutable staging copy of node `idx`, creating it when needed.
///
/// The staged `NodeData` lives in `node_mut_cache` and starts with the node's
/// current id and title (or `Null` when its type has no column store) and an EMPTY
/// property map — existing column values are not copied in. An existing staged
/// entry is reused only if its properties are map-backed; anything else is
/// replaced by a fresh entry. Staged edits are applied to the column store later,
/// by `flush_node_mut_cache`.
///
/// Returns `None` when the index is out of range or the slot is not alive.
pub fn node_weight_mut(&mut self, idx: NodeIndex) -> Option<&mut NodeData> {
    use crate::graph::schema::PropertyStorage;

    let pos = idx.index();
    if pos >= self.node_slots.len() {
        return None;
    }
    let slot = self.node_slots.get(pos);
    if !slot.is_alive() {
        return None;
    }

    let node_type = InternedKey::from_u64(slot.node_type);
    let cache_key = pos as u32;
    let has_staged_map = matches!(
        self.node_mut_cache.get(&cache_key),
        Some(staged) if matches!(staged.properties, PropertyStorage::Map(_))
    );

    if !has_staged_map {
        let (id, title) = match self.column_stores.get(&node_type) {
            Some(store) => (
                store.get_id(slot.row_id).unwrap_or(Value::Null),
                store.get_title(slot.row_id).unwrap_or(Value::Null),
            ),
            None => (Value::Null, Value::Null),
        };
        let seed = NodeData {
            id,
            title,
            node_type,
            extra_labels: Vec::new(),
            properties: PropertyStorage::Map(HashMap::new()),
        };
        self.node_mut_cache.insert(cache_key, seed);
    }

    // The entry is guaranteed to exist at this point.
    self.node_mut_cache.get_mut(&cache_key)
}
/// Number of live nodes (the counter maintained by `add_node` / `remove_node`).
#[inline]
pub fn node_count(&self) -> usize {
self.node_count
}
/// Exclusive upper bound on node indices: the length of the slot table, which
/// includes removed (dead) slots.
#[inline]
pub fn node_bound(&self) -> usize {
self.node_slots.len()
}
/// Adds a node and returns its index.
///
/// Pending staged writes are flushed and the arenas cleared first. A previously
/// freed slot is reused when one is available; otherwise the slot table grows by
/// one and both CSR offset arrays get one more entry repeating their last value
/// (the new node starts with an empty CSR range).
///
/// Only the node type and row id of `data` are recorded here. The row id comes
/// from `Columnar` storage when present; otherwise it is the slot table length at
/// the time of the call (also when a freed slot ends up being reused).
pub fn add_node(&mut self, data: NodeData) -> NodeIndex {
    self.clear_arenas();
    self.metadata_dirty = true;

    let row_id = if let crate::graph::schema::PropertyStorage::Columnar { row_id, .. } =
        &data.properties
    {
        *row_id
    } else {
        self.node_slots.len() as u32
    };
    let slot = DiskNodeSlot {
        node_type: data.node_type.as_u64(),
        row_id,
        flags: DiskNodeSlot::ALIVE_BIT,
    };

    let position = match self.free_node_slots.pop() {
        Some(recycled) => {
            let position = recycled as usize;
            self.node_slots.set(position, slot);
            position
        }
        None => {
            let position = self.node_slots.len();
            self.node_slots.push(slot);

            let out_tail = match self.out_offsets.len() {
                0 => 0,
                n => self.out_offsets.get(n - 1),
            };
            self.out_offsets.push(out_tail);

            let in_tail = match self.in_offsets.len() {
                0 => 0,
                n => self.in_offsets.get(n - 1),
            };
            self.in_offsets.push(in_tail);

            position
        }
    };

    self.node_count += 1;
    NodeIndex::new(position)
}
pub fn remove_node(&mut self, idx: NodeIndex) -> Option<NodeData> {
self.metadata_dirty = true;
self.clear_arenas();
let i = idx.index();
if i >= self.node_slots.len() {
return None;
}
let slot = self.node_slots.get(i);
if !slot.is_alive() {
return None;
}
let node_type_key = InternedKey::from_u64(slot.node_type);
let store = self.column_stores.get(&node_type_key).cloned();
let (id_val, title_val) = if let Some(ref s) = store {
(
s.get_id(slot.row_id).unwrap_or(Value::Null),
s.get_title(slot.row_id).unwrap_or(Value::Null),
)
} else {
(Value::Null, Value::Null)
};
let data = NodeData {
id: id_val,
title: title_val,
node_type: node_type_key,
extra_labels: Vec::new(),
properties: if let Some(s) = store {
crate::graph::schema::PropertyStorage::Columnar {
store: s,
row_id: slot.row_id,
}
} else {
crate::graph::schema::PropertyStorage::Map(HashMap::new())
},
};
let mut dead_slot = slot;
dead_slot.flags = 0;
self.node_slots.set(i, dead_slot);
self.node_count -= 1;
self.free_node_slots.push(i as u32);
self.has_tombstones = true;
self.tombstone_edges_for_node(i);
Some(data)
}
pub fn update_row_id(&mut self, node_idx: NodeIndex, row_id: u32) {
let i = node_idx.index();
if i < self.node_slots.len() {
let mut slot = self.node_slots.get(i);
slot.row_id = row_id;
self.node_slots.set(i, slot);
}
}
/// Returns an iterator over node indices, built from the slot table.
/// NOTE(review): presumably yields only alive slots — confirm in `DiskNodeIndices`.
pub fn node_indices_iter(&self) -> DiskNodeIndices<'_> {
DiskNodeIndices::new(&self.node_slots)
}
/// Builds an `EdgeData` for `edge_idx` (connection type from the endpoint table,
/// properties cloned from the property store, empty when none are stored) and
/// returns a reference to it.
///
/// No bounds or tombstone check is done here; callers validate `edge_idx` first.
/// Each call pushes one box into `edge_arena`; the arena only shrinks when
/// `clear_arenas` or `reset_arenas` runs.
#[inline]
pub(crate) fn materialize_edge(&self, edge_idx: u32) -> &EdgeData {
let ep = self.edge_endpoints.get(edge_idx as usize);
let ct = InternedKey::from_u64(ep.connection_type);
// Skip the per-edge lookup entirely when no edge has properties.
let props = if self.edge_properties.is_empty() {
Vec::new()
} else {
self.edge_properties
.get(edge_idx)
.map(|cow| cow.into_owned())
.unwrap_or_default()
};
let boxed = Box::new(EdgeData {
connection_type: ct,
properties: props,
});
// Take the heap address before the box moves into the arena; the boxed value
// itself does not move.
let ptr = &*boxed as *const EdgeData;
let mut arena = self.edge_arena.lock().unwrap();
arena.push(boxed);
// SAFETY(review): the pointee is owned by `edge_arena` and stays valid until the
// arena is cleared. `clear_arenas` needs `&mut self`, but `reset_arenas` takes
// `&self` and would leave this reference dangling if called while it is still in
// use — TODO confirm callers.
unsafe { &*ptr }
}
/// Counts the edges of `node` in direction `dir`, optionally restricted to one
/// connection type and/or to peers of one node type. Both the CSR range and the
/// overflow list of the node are included.
///
/// A node index beyond the CSR offset table counts as 0 (its overflow list is not
/// consulted in that case).
///
/// # Errors
/// Returns `Err("Query timed out")` when `deadline` has passed. The deadline is only
/// checked on the slow path, once every 2^20 CSR entries (including the first).
pub fn count_edges_filtered(
&self,
node: NodeIndex,
dir: Direction,
conn_type: Option<u64>,
other_node_type: Option<InternedKey>,
deadline: Option<std::time::Instant>,
) -> Result<usize, String> {
self.ensure_csr();
let idx = node.index();
let (offsets, edges) = match dir {
Direction::Outgoing => (&self.out_offsets, &self.out_edges),
Direction::Incoming => (&self.in_offsets, &self.in_edges),
};
if idx >= offsets.len().saturating_sub(1) {
return Ok(0);
}
let mut start = offsets.get(idx) as usize;
let mut end = offsets.get(idx + 1) as usize;
// With a type-sorted CSR the range can be narrowed to exactly the entries of
// `conn_type`, so no per-entry type check is needed afterwards.
if let Some(ct) = conn_type {
if self.csr_sorted_by_type {
let (lo, hi) = crate::graph::core::iterators::binary_search_conn_type(
edges,
&self.edge_endpoints,
start,
end,
ct,
);
start = lo;
end = hi;
}
}
// Fast path: when nothing was ever removed, no peer-type filter applies and the
// range is already exact, the CSR contribution is just the range length.
let can_shortcut = !self.has_tombstones
&& other_node_type.is_none()
&& (conn_type.is_none() || self.csr_sorted_by_type);
if can_shortcut {
let overflow = match dir {
Direction::Outgoing => self.overflow_out.get(&(idx as u32)),
Direction::Incoming => self.overflow_in.get(&(idx as u32)),
};
// Overflow lists are never type-sorted, so they are filtered entry by entry.
let mut overflow_count = 0usize;
if let Some(list) = overflow {
for e in list {
if let Some(ct) = conn_type {
if self.edge_endpoints.get(e.edge_idx as usize).connection_type != ct {
continue;
}
}
overflow_count += 1;
}
}
return Ok(end.saturating_sub(start) + overflow_count);
}
// Slow path: inspect every CSR entry in the (possibly narrowed) range.
let mut count = 0usize;
for i in start..end {
if (i - start).is_multiple_of(1 << 20) {
if let Some(dl) = deadline {
if std::time::Instant::now() > dl {
return Err("Query timed out".to_string());
}
}
}
let e = edges.get(i);
if e.edge_idx == TOMBSTONE_EDGE {
continue;
}
if let Some(ct) = conn_type {
if !self.csr_sorted_by_type
&& self.edge_endpoints.get(e.edge_idx as usize).connection_type != ct
{
continue;
}
}
// A peer whose type cannot be resolved (dead or out of range) never matches.
if let Some(required_type) = other_node_type {
let peer_idx = NodeIndex::new(e.peer as usize);
if let Some(nt) = self.node_type_of(peer_idx) {
if nt != required_type {
continue;
}
} else {
continue;
}
}
count += 1;
}
// Same filters for the overflow list; the type check is unconditional here.
let overflow = match dir {
Direction::Outgoing => self.overflow_out.get(&(idx as u32)),
Direction::Incoming => self.overflow_in.get(&(idx as u32)),
};
if let Some(list) = overflow {
for e in list {
if e.edge_idx == TOMBSTONE_EDGE {
continue;
}
if let Some(ct) = conn_type {
if self.edge_endpoints.get(e.edge_idx as usize).connection_type != ct {
continue;
}
}
if let Some(required_type) = other_node_type {
let peer_idx = NodeIndex::new(e.peer as usize);
if let Some(nt) = self.node_type_of(peer_idx) {
if nt != required_type {
continue;
}
} else {
continue;
}
}
count += 1;
}
}
Ok(count)
}
/// Collects `(peer node, edge index)` for every live edge of `node` in direction
/// `dir`, optionally restricted to connection type `conn_type`.
///
/// CSR entries come first, followed by the node's overflow entries. A node index
/// beyond the CSR offset table yields an empty vector (its overflow list is not
/// consulted in that case).
pub fn iter_peers_filtered(
    &self,
    node: NodeIndex,
    dir: Direction,
    conn_type: Option<u64>,
) -> Vec<(NodeIndex, u32)> {
    self.ensure_csr();
    let node_pos = node.index();
    let (offsets, csr, overflow_map) = match dir {
        Direction::Outgoing => (&self.out_offsets, &self.out_edges, &self.overflow_out),
        Direction::Incoming => (&self.in_offsets, &self.in_edges, &self.overflow_in),
    };
    if node_pos >= offsets.len().saturating_sub(1) {
        return Vec::new();
    }

    let full_range = (
        offsets.get(node_pos) as usize,
        offsets.get(node_pos + 1) as usize,
    );
    // A type-sorted CSR lets us narrow the range instead of filtering per entry.
    let (start, end) = match conn_type {
        Some(ct) if self.csr_sorted_by_type => {
            crate::graph::core::iterators::binary_search_conn_type(
                csr,
                &self.edge_endpoints,
                full_range.0,
                full_range.1,
                ct,
            )
        }
        _ => full_range,
    };

    let type_of = |edge_idx: u32| self.edge_endpoints.get(edge_idx as usize).connection_type;
    // Per-entry type check for CSR entries is only needed when the range was not narrowed.
    let csr_type_check = conn_type.filter(|_| !self.csr_sorted_by_type);

    let mut peers = Vec::with_capacity(end - start);
    for pos in start..end {
        let edge = csr.get(pos);
        if edge.edge_idx == TOMBSTONE_EDGE {
            continue;
        }
        if csr_type_check.is_some_and(|ct| type_of(edge.edge_idx) != ct) {
            continue;
        }
        peers.push((NodeIndex::new(edge.peer as usize), edge.edge_idx));
    }

    if let Some(extra) = overflow_map.get(&(node_pos as u32)) {
        for edge in extra {
            if edge.edge_idx == TOMBSTONE_EDGE {
                continue;
            }
            if conn_type.is_some_and(|ct| type_of(edge.edge_idx) != ct) {
                continue;
            }
            peers.push((NodeIndex::new(edge.peer as usize), edge.edge_idx));
        }
    }
    peers
}
/// Scans the whole endpoint table and counts live edges of `conn_type` per node.
///
/// The grouping key is the edge's target for `Direction::Outgoing` and its source
/// for `Direction::Incoming`. Access-pattern hints are given to the endpoint table
/// before and after the scan.
///
/// # Errors
/// Returns `Err("Query timed out")` when `deadline` has passed; the deadline is
/// checked once every 2^20 edges (including the first).
pub fn count_edges_grouped_by_peer(
    &self,
    conn_type: u64,
    dir: Direction,
    deadline: Option<std::time::Instant>,
) -> Result<HashMap<u32, i64>, String> {
    self.ensure_csr();
    let mut per_peer: HashMap<u32, i64> = HashMap::new();
    self.edge_endpoints.advise_sequential();

    for edge_idx in 0..self.next_edge_idx as usize {
        let timed_out = edge_idx.is_multiple_of(1 << 20)
            && deadline.is_some_and(|dl| std::time::Instant::now() > dl);
        if timed_out {
            self.edge_endpoints.advise_dontneed();
            return Err("Query timed out".to_string());
        }

        let endpoints = self.edge_endpoints.get(edge_idx);
        if endpoints.source == TOMBSTONE_EDGE || endpoints.connection_type != conn_type {
            continue;
        }
        let peer = if matches!(dir, Direction::Outgoing) {
            endpoints.target
        } else {
            endpoints.source
        };
        *per_peer.entry(peer).or_default() += 1;
    }

    self.edge_endpoints.advise_dontneed();
    Ok(per_peer)
}
/// Uncapped variant of `sources_for_conn_type_bounded`: forwards `conn_type` with
/// `max = None`.
#[allow(dead_code)] pub fn sources_for_conn_type(&self, conn_type: u64) -> Option<Vec<u32>> {
self.sources_for_conn_type_bounded(conn_type, None)
}
/// Lists the source nodes that have at least one edge of `conn_type`.
///
/// Returns `None` when there is nothing to answer from (no connection-type index
/// and no overflow edges). Otherwise the result combines:
/// * the index entry for `conn_type`, truncated to at most `max` sources, and
/// * every node in `overflow_out` with a live overflow edge of that type.
///
/// `max` caps only the index portion; overflow sources are added on top of it.
/// The combined list is sorted and de-duplicated only when overflow edges exist.
pub fn sources_for_conn_type_bounded(
    &self,
    conn_type: u64,
    max: Option<usize>,
) -> Option<Vec<u32>> {
    let indexed_types = &self.conn_type_index_types;
    if indexed_types.is_empty() && self.overflow_out.is_empty() {
        return None;
    }

    let mut found = Vec::new();

    // Binary search over the indexed connection types (no-op for an empty index).
    let (mut lo, mut hi) = (0usize, indexed_types.len());
    while lo < hi {
        let mid = lo + (hi - lo) / 2;
        match indexed_types.get(mid).cmp(&conn_type) {
            std::cmp::Ordering::Less => lo = mid + 1,
            std::cmp::Ordering::Greater => hi = mid,
            std::cmp::Ordering::Equal => {
                let start = self.conn_type_index_offsets.get(mid) as usize;
                let end = self.conn_type_index_offsets.get(mid + 1) as usize;
                let take_end = match max {
                    Some(limit) => start + (end - start).min(limit),
                    None => end,
                };
                found.reserve(take_end - start);
                found.extend((start..take_end).map(|i| self.conn_type_index_sources.get(i)));
                break;
            }
        }
    }

    if !self.overflow_out.is_empty() {
        for (&node_id, edges) in &self.overflow_out {
            // One matching live edge is enough to list the node.
            let has_match = edges.iter().any(|e| {
                e.edge_idx != TOMBSTONE_EDGE
                    && self.edge_endpoints.get(e.edge_idx as usize).connection_type == conn_type
            });
            if has_match {
                found.push(node_id);
            }
        }
        found.sort_unstable();
        found.dedup();
    }

    Some(found)
}
/// Calls `f(source, target, edge_idx)` for each live edge of `conn_type` and stops
/// as soon as `f` returns `false`.
///
/// CSR edges are reached through the connection-type index: the index is
/// binary-searched for `conn_type`, then the outgoing CSR range of every listed
/// source is visited. When the index is empty, or has no entry for `conn_type`, NO
/// CSR edge is visited. Overflow edges are always scanned afterwards, in the
/// iteration order of `overflow_out`.
pub fn for_each_edge_of_conn_type<F>(&self, conn_type: u64, mut f: F)
where
F: FnMut(NodeIndex, NodeIndex, u32) -> bool,
{
self.ensure_csr();
if !self.conn_type_index_types.is_empty() {
// Locate `conn_type` in the type list and read its slice of sources.
let num_types = self.conn_type_index_types.len();
let mut lo = 0usize;
let mut hi = num_types;
let mut range: Option<(usize, usize)> = None;
while lo < hi {
let mid = lo + (hi - lo) / 2;
let mid_type = self.conn_type_index_types.get(mid);
if mid_type < conn_type {
lo = mid + 1;
} else if mid_type > conn_type {
hi = mid;
} else {
let s = self.conn_type_index_offsets.get(mid) as usize;
let e = self.conn_type_index_offsets.get(mid + 1) as usize;
range = Some((s, e));
break;
}
}
if let Some((src_start, src_end)) = range {
// Number of nodes covered by the CSR (offset table has one extra entry).
let out_offsets_len = self.out_offsets.len().saturating_sub(1);
for i in src_start..src_end {
let src_u32 = self.conn_type_index_sources.get(i);
let src_idx = src_u32 as usize;
// Sources outside the CSR are skipped.
if src_idx >= out_offsets_len {
continue;
}
let csr_start = self.out_offsets.get(src_idx) as usize;
let csr_end = self.out_offsets.get(src_idx + 1) as usize;
if self.csr_sorted_by_type {
// Type-sorted range: narrow to the matching run, no per-edge type check.
let (lo_p, hi_p) = crate::graph::core::iterators::binary_search_conn_type(
&self.out_edges,
&self.edge_endpoints,
csr_start,
csr_end,
conn_type,
);
for p in lo_p..hi_p {
let e = self.out_edges.get(p);
if e.edge_idx == TOMBSTONE_EDGE {
continue;
}
if !f(
NodeIndex::new(src_u32 as usize),
NodeIndex::new(e.peer as usize),
e.edge_idx,
) {
return;
}
}
} else {
// Unsorted range: check each edge's type in the endpoint table.
for p in csr_start..csr_end {
let e = self.out_edges.get(p);
if e.edge_idx == TOMBSTONE_EDGE {
continue;
}
let ep = self.edge_endpoints.get(e.edge_idx as usize);
if ep.connection_type == conn_type
&& !f(
NodeIndex::new(src_u32 as usize),
NodeIndex::new(e.peer as usize),
e.edge_idx,
)
{
return;
}
}
}
}
}
}
// Overflow edges are not covered by the index, so all of them are checked.
for (&src_u32, edges) in &self.overflow_out {
for e in edges {
if e.edge_idx == TOMBSTONE_EDGE {
continue;
}
let ep = self.edge_endpoints.get(e.edge_idx as usize);
if ep.connection_type == conn_type
&& !f(
NodeIndex::new(src_u32 as usize),
NodeIndex::new(e.peer as usize),
e.edge_idx,
)
{
return;
}
}
}
}
/// Returns the stored property list of `edge_idx`, or `None` when the property
/// store has no entry for it. No bounds or tombstone check is done here.
#[inline]
pub fn edge_properties_at(&self, edge_idx: u32) -> Option<Cow<'_, [(InternedKey, Value)]>> {
self.edge_properties.get(edge_idx)
}
/// Walks the endpoint table in edge-index order and calls
/// `f(source, target, edge_idx)` for every live edge of `conn_type`, stopping as
/// soon as `f` returns `false`. Neither the CSR arrays nor the connection-type
/// index are consulted.
pub fn scan_edges_of_conn_type_linear<F>(&self, conn_type: u64, mut f: F)
where
    F: FnMut(NodeIndex, NodeIndex, u32) -> bool,
{
    for edge_idx in 0..self.next_edge_idx {
        let endpoints = self.edge_endpoints.get(edge_idx as usize);
        let is_match =
            endpoints.source != TOMBSTONE_EDGE && endpoints.connection_type == conn_type;
        if is_match
            && !f(
                NodeIndex::new(endpoints.source as usize),
                NodeIndex::new(endpoints.target as usize),
                edge_idx,
            )
        {
            return;
        }
    }
}
/// Issues a "will need" access hint for both CSR offset arrays.
/// NOTE(review): presumably maps to `madvise(MADV_WILLNEED)` for mapped buffers —
/// confirm in `MmapOrVec`.
pub fn prefetch_hot_regions(&self) {
self.out_offsets.advise_willneed();
self.in_offsets.advise_willneed();
}
/// Currently a no-op, called at the top of the read paths.
/// NOTE(review): with `defer_csr` set, `add_edge` advances `next_edge_idx` without
/// growing `edge_endpoints`, so reads rely on the CSR being built elsewhere before
/// they run — TODO confirm.
#[inline]
fn ensure_csr(&self) {
}
#[inline]
pub(crate) fn clear_arenas(&mut self) {
for (edge_idx, edge_data) in self.edge_mut_cache.drain() {
if edge_data.properties.is_empty() {
self.edge_properties.remove(edge_idx);
} else {
self.edge_properties.insert(edge_idx, edge_data.properties);
}
}
self.flush_node_mut_cache();
self.node_arena.lock().unwrap().clear();
self.edge_arena.lock().unwrap().clear();
}
/// Writes every staged `node_weight_mut` edit back into the column stores and
/// leaves `node_mut_cache` empty.
///
/// Staged nodes are grouped by the node type currently recorded in their slot.
/// For each type that has a column store and at least one real change, the store
/// is cloned, the changes are applied to the clone, and the clone replaces the
/// shared `Arc` in `column_stores` (holders of the old `Arc` keep the old data).
///
/// Per staged node:
/// * a dead slot tombstones its row in the store;
/// * a non-`Null` staged title that differs from the stored one is written;
/// * every entry of a staged property map is written.
///
/// Staged entries are dropped without effect when their index is out of range or
/// their type has no column store. Errors from `set_title` / `set` are ignored
/// (best effort). Staged `id` values are never written back.
fn flush_node_mut_cache(&mut self) {
    use crate::graph::schema::PropertyStorage;

    if self.node_mut_cache.is_empty() {
        return;
    }

    // Bucket the staged nodes by the type recorded in their slot.
    let mut by_type: HashMap<InternedKey, Vec<(u32, NodeData)>> = HashMap::new();
    for (i, nd) in self.node_mut_cache.drain() {
        if (i as usize) >= self.node_slots.len() {
            continue;
        }
        let slot = self.node_slots.get(i as usize);
        by_type
            .entry(InternedKey::from_u64(slot.node_type))
            .or_default()
            .push((i, nd));
    }

    for (type_key, updates) in by_type {
        let Some(current_arc) = self.column_stores.get(&type_key) else {
            continue;
        };

        // Cloning a whole store is expensive, so first check that at least one
        // staged node would actually change something.
        let any_writes_needed = updates.iter().any(|(i, nd)| {
            let slot = self.node_slots.get(*i as usize);
            if !slot.is_alive() {
                return true;
            }
            if let PropertyStorage::Map(map) = &nd.properties {
                if !map.is_empty() {
                    return true;
                }
            }
            if !matches!(nd.title, Value::Null) {
                // A missing stored title always counts as different.
                return current_arc.get_title(slot.row_id).as_ref() != Some(&nd.title);
            }
            false
        });
        if !any_writes_needed {
            continue;
        }

        let mut new_store: crate::graph::storage::column_store::ColumnStore =
            (**current_arc).clone();
        for (i, nd) in updates {
            let slot = self.node_slots.get(i as usize);
            let row_id = slot.row_id;
            if !slot.is_alive() {
                new_store.tombstone(row_id);
                continue;
            }
            if !matches!(nd.title, Value::Null) {
                let current = new_store.get_title(row_id);
                if current.as_ref() != Some(&nd.title) {
                    let _ = new_store.set_title(row_id, &nd.title);
                }
            }
            if let PropertyStorage::Map(map) = &nd.properties {
                for (key, value) in map {
                    let _ = new_store.set(row_id, *key, value, None);
                }
            }
        }
        self.column_stores
            .insert(type_key, std::sync::Arc::new(new_store));
    }
}
/// Drops every boxed `NodeData` / `EdgeData` held by the two arenas, without
/// flushing staged writes.
///
/// NOTE(review): this takes `&self`, so the borrow checker cannot stop it from
/// running while references returned by `node_weight` / `materialize_edge` are
/// still alive; such references would dangle afterwards. Callers must make sure
/// none are in use — TODO confirm.
pub fn reset_arenas(&self) {
self.node_arena.lock().unwrap().clear();
self.edge_arena.lock().unwrap().clear();
}
/// Iterates over the edges of `a` in direction `dir`, with no connection-type
/// filter (delegates to `edges_directed_filtered_iter`).
pub fn edges_directed_iter(&self, a: NodeIndex, dir: Direction) -> DiskEdges<'_> {
self.edges_directed_filtered_iter(a, dir, None)
}
/// Builds an iterator over the edges of `a` in direction `dir`, covering the
/// node's CSR range and its overflow list, optionally restricted to one
/// connection type.
///
/// A node index beyond the CSR offset table gets an empty CSR range, but its
/// overflow list (if any) is still passed to the iterator.
pub fn edges_directed_filtered_iter(
    &self,
    a: NodeIndex,
    dir: Direction,
    conn_type_filter: Option<u64>,
) -> DiskEdges<'_> {
    self.ensure_csr();
    let node_pos = a.index();
    let (offsets, csr, overflow_map) = match dir {
        Direction::Outgoing => (&self.out_offsets, &self.out_edges, &self.overflow_out),
        Direction::Incoming => (&self.in_offsets, &self.in_edges, &self.overflow_in),
    };
    let overflow = overflow_map.get(&(node_pos as u32));

    let in_csr = node_pos < offsets.len().saturating_sub(1);
    let (start, end) = match in_csr {
        true => (
            offsets.get(node_pos) as usize,
            offsets.get(node_pos + 1) as usize,
        ),
        false => (0, 0),
    };

    let unfiltered = DiskEdges::new(self, dir, a, csr, start, end, overflow);
    match conn_type_filter {
        Some(ct) => unfiltered.with_conn_type_filter(ct),
        None => unfiltered,
    }
}
/// Returns an iterator over edge references of the whole graph.
/// NOTE(review): iteration order and tombstone handling are defined by
/// `DiskEdgeReferences` — confirm there.
pub fn edge_references_iter(&self) -> DiskEdgeReferences<'_> {
self.ensure_csr();
DiskEdgeReferences::new(self)
}
/// Returns an iterator over edge indices, built from `next_edge_idx` and the
/// endpoint table.
/// NOTE(review): presumably skips tombstoned edges — confirm in `DiskEdgeIndices`.
pub fn edge_indices_iter(&self) -> DiskEdgeIndices<'_> {
self.ensure_csr();
DiskEdgeIndices::new(self.next_edge_idx, &self.edge_endpoints)
}
/// Number of live edges (the counter maintained by `add_edge` / `remove_edge`).
#[inline]
pub fn edge_count(&self) -> usize {
self.edge_count
}
/// Materializes the `EdgeData` of edge `idx`, or returns `None` when the index is
/// at or beyond `next_edge_idx` or the edge has been removed. Edits staged through
/// `edge_weight_mut` are not visible here until they are flushed.
pub fn edge_weight(&self, idx: EdgeIndex) -> Option<&EdgeData> {
    self.ensure_csr();
    let slot = idx.index();
    let in_range = slot < self.next_edge_idx as usize;
    if in_range && self.edge_endpoints.get(slot).source != TOMBSTONE_EDGE {
        Some(self.materialize_edge(slot as u32))
    } else {
        None
    }
}
/// Returns a mutable staging copy of edge `idx`, creating it on first access.
///
/// The staged `EdgeData` lives in `edge_mut_cache` and is seeded with the edge's
/// connection type and a clone of its stored properties. An existing staged entry
/// is returned as-is. Staged property edits are written back by `clear_arenas`;
/// the staged connection type is not.
///
/// Returns `None` when the index is at or beyond `next_edge_idx` or the edge has
/// been removed. `metadata_dirty` is set whenever a staged entry is handed out.
pub fn edge_weight_mut(&mut self, idx: EdgeIndex) -> Option<&mut EdgeData> {
    let ei = idx.index();
    if ei >= self.next_edge_idx as usize {
        return None;
    }
    let ep = self.edge_endpoints.get(ei);
    if ep.source == TOMBSTONE_EDGE {
        return None;
    }
    self.metadata_dirty = true;

    let key = ei as u32;
    let ct = InternedKey::from_u64(ep.connection_type);
    // Borrow the property store separately so the seeding closure does not need
    // `self` while `edge_mut_cache` is mutably borrowed.
    let edge_properties = &self.edge_properties;
    // Seed lazily: the property list is cloned only when no staged entry exists
    // yet, and the entry is returned without a second hash lookup.
    let staged = self.edge_mut_cache.entry(key).or_insert_with(|| EdgeData {
        connection_type: ct,
        properties: edge_properties
            .get(key)
            .map(|cow| cow.into_owned())
            .unwrap_or_default(),
    });
    Some(staged)
}
/// Returns `(source, target)` of edge `idx`, or `None` when the index is at or
/// beyond `next_edge_idx` or the edge has been removed.
pub fn edge_endpoints_fn(&self, idx: EdgeIndex) -> Option<(NodeIndex, NodeIndex)> {
    self.ensure_csr();
    let slot = idx.index();
    if slot >= self.next_edge_idx as usize {
        return None;
    }
    let endpoints = self.edge_endpoints.get(slot);
    let is_live = endpoints.source != TOMBSTONE_EDGE;
    is_live.then(|| {
        (
            NodeIndex::new(endpoints.source as usize),
            NodeIndex::new(endpoints.target as usize),
        )
    })
}
pub fn add_edge(&mut self, a: NodeIndex, b: NodeIndex, data: EdgeData) -> EdgeIndex {
self.clear_arenas();
self.metadata_dirty = true;
let edge_idx = self.next_edge_idx;
self.next_edge_idx += 1;
let ct = data.connection_type;
if !data.properties.is_empty() {
self.edge_properties.insert(edge_idx, data.properties);
}
let src = a.index() as u32;
let tgt = b.index() as u32;
let ct_u64 = ct.as_u64();
if self.defer_csr {
self.pending_edges.get_mut().push((src, tgt, ct_u64));
} else {
self.edge_endpoints.push(EdgeEndpoints {
source: src,
target: tgt,
connection_type: ct_u64,
});
self.overflow_out.entry(src).or_default().push(CsrEdge {
peer: tgt,
edge_idx,
});
self.overflow_in.entry(tgt).or_default().push(CsrEdge {
peer: src,
edge_idx,
});
}
self.edge_count += 1;
EdgeIndex::new(edge_idx as usize)
}
/// Removes edge `idx` and returns its data (connection type plus the stored
/// properties, which are taken out of the property store), or `None` when the
/// index is at or beyond `next_edge_idx` or the edge was already removed.
///
/// Staged writes are flushed and the arenas cleared before anything else, and
/// `metadata_dirty` is set even when the call ends up returning `None`. The edge
/// is tombstoned in both CSR arrays, dropped from both overflow lists, its
/// endpoint entry is overwritten with tombstone markers, and its index is recorded
/// in `free_edge_slots`.
pub fn remove_edge(&mut self, idx: EdgeIndex) -> Option<EdgeData> {
    self.clear_arenas();
    self.metadata_dirty = true;

    let slot = idx.index();
    if slot >= self.next_edge_idx as usize {
        return None;
    }
    let endpoints = self.edge_endpoints.get(slot);
    if endpoints.source == TOMBSTONE_EDGE {
        return None;
    }

    let edge_id = slot as u32;
    let removed = EdgeData {
        connection_type: InternedKey::from_u64(endpoints.connection_type),
        properties: self.edge_properties.take(edge_id).unwrap_or_default(),
    };
    let (source, target) = (endpoints.source, endpoints.target);

    // CSR copies are tombstoned in place; overflow copies are simply removed.
    Self::tombstone_in_array(
        &self.out_offsets,
        &mut self.out_edges,
        source as usize,
        edge_id,
    );
    Self::tombstone_in_array(
        &self.in_offsets,
        &mut self.in_edges,
        target as usize,
        edge_id,
    );
    if let Some(outgoing) = self.overflow_out.get_mut(&source) {
        outgoing.retain(|e| e.edge_idx != edge_id);
    }
    if let Some(incoming) = self.overflow_in.get_mut(&target) {
        incoming.retain(|e| e.edge_idx != edge_id);
    }

    let tombstone = EdgeEndpoints {
        source: TOMBSTONE_EDGE,
        target: TOMBSTONE_EDGE,
        connection_type: 0,
    };
    self.edge_endpoints.set(slot, tombstone);

    self.edge_count -= 1;
    self.free_edge_slots.push(edge_id);
    self.has_tombstones = true;
    Some(removed)
}
pub fn find_edge(&self, a: NodeIndex, b: NodeIndex) -> Option<EdgeIndex> {
self.ensure_csr();
let src = a.index();
let tgt = b.index() as u32;
if src < self.out_offsets.len().saturating_sub(1) {
let start = self.out_offsets.get(src) as usize;
let end = self.out_offsets.get(src + 1) as usize;
for i in start..end {
let e = self.out_edges.get(i);
if e.edge_idx != TOMBSTONE_EDGE && e.peer == tgt {
return Some(EdgeIndex::new(e.edge_idx as usize));
}
}
}
if let Some(list) = self.overflow_out.get(&(src as u32)) {
for e in list {
if e.edge_idx != TOMBSTONE_EDGE && e.peer == tgt {
return Some(EdgeIndex::new(e.edge_idx as usize));
}
}
}
None
}
/// Builds a `DiskEdgesConnecting` iterator for the node pair `(a, b)`.
///
/// `ensure_csr` is called before the iterator is constructed. How edges are
/// matched (direction, tombstone filtering) is defined by
/// `DiskEdgesConnecting` itself, not by this wrapper.
pub fn edges_connecting_iter(&self, a: NodeIndex, b: NodeIndex) -> DiskEdgesConnecting<'_> {
self.ensure_csr();
DiskEdgesConnecting::new(self, a, b)
}
/// Lazily yields the `EdgeData` of every non-tombstoned edge, in ascending
/// edge-index order.
///
/// Each index below `next_edge_idx` is checked against the endpoint table;
/// records whose source is `TOMBSTONE_EDGE` are skipped and the rest are
/// passed to `materialize_edge`.
pub fn edge_weights_iter(&self) -> Box<dyn Iterator<Item = &EdgeData> + '_> {
    self.ensure_csr();
    let live_edges = (0..self.next_edge_idx)
        .filter(move |&slot| self.edge_endpoints.get(slot as usize).source != TOMBSTONE_EDGE)
        .map(move |slot| self.materialize_edge(slot));
    Box::new(live_edges)
}
/// Iterates the neighbours of `a` reachable along `dir`.
///
/// For `Direction::Outgoing` the out-CSR arrays and `overflow_out` are used,
/// for `Direction::Incoming` the in-CSR arrays and `overflow_in`.
///
/// A node whose index lies at or past `offsets.len() - 1` has no CSR range,
/// but it can still own overflow edges (`add_edge` registers every
/// non-deferred edge in the overflow maps regardless of the CSR bounds).
/// Such a node is given an empty CSR range together with its overflow list,
/// so those edges are no longer dropped. This matches `find_edge` and
/// `neighbors_undirected_iter`, which consult overflow independently of the
/// CSR bounds.
pub fn neighbors_directed_iter(&self, a: NodeIndex, dir: Direction) -> DiskNeighbors {
    self.ensure_csr();
    let node = a.index();
    let (offsets, edges) = match dir {
        Direction::Outgoing => (&self.out_offsets, &self.out_edges),
        Direction::Incoming => (&self.in_offsets, &self.in_edges),
    };
    let overflow = match dir {
        Direction::Outgoing => self.overflow_out.get(&(node as u32)),
        Direction::Incoming => self.overflow_in.get(&(node as u32)),
    };

    if node < offsets.len().saturating_sub(1) {
        let start = offsets.get(node) as usize;
        let end = offsets.get(node + 1) as usize;
        return DiskNeighbors::new(edges, start, end, overflow);
    }

    // No CSR range for this node: only overflow edges can exist.
    match overflow {
        Some(list) if !list.is_empty() => DiskNeighbors::new(edges, 0, 0, overflow),
        _ => DiskNeighbors::new_empty(),
    }
}
/// Collects the peers of `a` over both directions and wraps them in a
/// `DiskNeighbors`.
///
/// Peers are gathered in this order: CSR out-edges, overflow out-edges, CSR
/// in-edges, overflow in-edges. Entries whose edge index is `TOMBSTONE_EDGE`
/// are skipped. No de-duplication is performed, so a peer connected in both
/// directions appears once per edge.
pub fn neighbors_undirected_iter(&self, a: NodeIndex) -> DiskNeighbors {
    self.ensure_csr();
    let node = a.index();
    let overflow_key = node as u32;
    let mut peers = Vec::new();

    let directions = [
        (&self.out_offsets, &self.out_edges, &self.overflow_out),
        (&self.in_offsets, &self.in_edges, &self.overflow_in),
    ];
    for (offsets, edges, overflow) in directions {
        // Nodes at or past `offsets.len() - 1` have no CSR range.
        if node < offsets.len().saturating_sub(1) {
            let lo = offsets.get(node) as usize;
            let hi = offsets.get(node + 1) as usize;
            peers.extend(
                (lo..hi)
                    .map(|slot| edges.get(slot))
                    .filter(|edge| edge.edge_idx != TOMBSTONE_EDGE)
                    .map(|edge| NodeIndex::new(edge.peer as usize)),
            );
        }
        if let Some(spill) = overflow.get(&overflow_key) {
            peers.extend(
                spill
                    .iter()
                    .filter(|edge| edge.edge_idx != TOMBSTONE_EDGE)
                    .map(|edge| NodeIndex::new(edge.peer as usize)),
            );
        }
    }

    DiskNeighbors::from_collected(peers)
}
/// Reports whether any overflow list, outgoing or incoming, is non-empty.
pub fn has_overflow(&self) -> bool {
    self.overflow_out
        .values()
        .chain(self.overflow_in.values())
        .any(|spill| !spill.is_empty())
}
/// Builds the CSR arrays from the queued `pending_edges`.
///
/// Does nothing, and leaves `defer_csr` untouched, when the queue is empty.
/// Otherwise the build algorithm is chosen by the `KGLITE_CSR_ALGO`
/// environment variable (`merge_sort` selects the merge-sort builder,
/// anything else the partitioned builder), and `defer_csr` is cleared
/// afterwards. Setting `KGLITE_BUILD_DEBUG` to any value enables the
/// builders' verbose flag.
pub fn build_csr_from_pending(&mut self) {
    let queued = self.pending_edges.get_mut();
    if queued.is_empty() {
        return;
    }
    let edge_count = queued.len();
    let node_bound = self.node_slots.len();
    let verbose = std::env::var("KGLITE_BUILD_DEBUG").is_ok();

    match std::env::var("KGLITE_CSR_ALGO") {
        Ok(algo) if algo == "merge_sort" => {
            self.build_csr_merge_sort(node_bound, edge_count, verbose);
        }
        _ => {
            self.build_csr_partitioned(node_bound, edge_count, verbose);
        }
    }
    self.defer_csr = false;
}
/// Folds overflow edges back into the CSR arrays and drops tombstoned edges.
///
/// Returns the number of entries that were in `overflow_out` when the call
/// started; returns `0` without doing anything when there were none.
///
/// Surviving edges are renumbered densely from `0`, so edge indices handed
/// out before this call are not valid afterwards. An edge survives when its
/// endpoint record is not tombstoned and both endpoints are below
/// `node_slots.len()`.
///
/// Setting the `KGLITE_BUILD_DEBUG` environment variable prints progress to
/// stderr.
pub fn compact(&mut self) -> usize {
// Only the outgoing map is counted; `add_edge` registers each overflow edge
// in both maps.
let overflow_count: usize = self.overflow_out.values().map(|v| v.len()).sum();
if overflow_count == 0 {
return 0;
}
let verbose = std::env::var("KGLITE_BUILD_DEBUG").is_ok();
if verbose {
eprintln!(
"Compacting: {} CSR edges + {} overflow edges",
self.edge_count.saturating_sub(overflow_count),
overflow_count
);
}
let node_bound = self.node_slots.len();
let mut live_count = 0usize;
let total_endpoints = self.next_edge_idx as usize;
// Scratch list of surviving edges; falls back to `MmapOrVec::with_capacity`
// when the file-backed variant cannot be created.
let pending_path = self.data_dir.join("_compact_pending.bin");
let mut new_pending: MmapOrVec<(u32, u32, u64)> =
MmapOrVec::mapped(&pending_path, total_endpoints)
.unwrap_or_else(|_| MmapOrVec::with_capacity(total_endpoints));
// old edge index -> new dense index; `TOMBSTONE_EDGE` marks dropped edges.
let mut idx_remap: Vec<u32> = vec![TOMBSTONE_EDGE; total_endpoints];
for (old_idx, remap_slot) in idx_remap.iter_mut().enumerate().take(total_endpoints) {
let ep = self.edge_endpoints.get(old_idx);
if ep.source != TOMBSTONE_EDGE
&& (ep.source as usize) < node_bound
&& (ep.target as usize) < node_bound
{
*remap_slot = live_count as u32;
new_pending.push((ep.source, ep.target, ep.connection_type));
live_count += 1;
}
}
// Move properties of surviving edges to their new indices; properties of
// dropped edges are discarded together with `old_props`.
let old_props = std::mem::take(&mut self.edge_properties);
let upper = old_props.upper_bound();
for old_idx in 0..upper {
if let Some(cow) = old_props.get(old_idx) {
// NOTE(review): indexing panics if `upper_bound()` can exceed
// `next_edge_idx` — confirm the property store never holds such indices.
let new_idx = idx_remap[old_idx as usize];
if new_idx != TOMBSTONE_EDGE {
self.edge_properties.insert(new_idx, cow.into_owned());
}
}
}
drop(old_props);
self.overflow_out.clear();
self.overflow_in.clear();
self.free_edge_slots.clear();
self.edge_count = live_count;
self.next_edge_idx = live_count as u32;
// Swap the surviving-edge list in as the pending queue and delete the file
// that backed the previous queue, if it had one.
let old_pending_path = self
.pending_edges
.get_mut()
.file_path()
.map(|p| p.to_path_buf());
*self.pending_edges.get_mut() = new_pending;
if let Some(path) = old_pending_path {
let _ = std::fs::remove_file(path);
}
// Rebuild from the pending queue. `build_csr_from_pending` returns early
// when the queue is empty, i.e. when no edge survived.
self.build_csr_from_pending();
// Best-effort cleanup of the scratch file; a failure is ignored.
let _ = std::fs::remove_file(&pending_path);
if verbose {
eprintln!(
"Compaction done: {} live edges (removed {} tombstoned)",
live_count,
total_endpoints - live_count
);
}
overflow_count
}
/// Returns the stored peer → count table for connection type `conn_type`.
///
/// `peer_count_types` is binary-searched for `conn_type` (so it is assumed
/// to be sorted ascending — TODO confirm against the writer). On a hit at
/// position `k`, the pairs `offsets[k]..offsets[k + 1]` are read from
/// `peer_count_entries`, which stores each pair as two consecutive values
/// (peer, then count). Returns `None` when the type table is empty or does
/// not contain `conn_type`.
pub fn lookup_peer_counts(&self, conn_type: u64) -> Option<HashMap<u32, i64>> {
    if self.peer_count_types.is_empty() {
        return None;
    }

    let mut lo = 0usize;
    let mut hi = self.peer_count_types.len();
    let mut found = None;
    while lo < hi {
        let mid = lo + (hi - lo) / 2;
        let probe = self.peer_count_types.get(mid);
        if probe < conn_type {
            lo = mid + 1;
        } else if probe > conn_type {
            hi = mid;
        } else {
            found = Some(mid);
            break;
        }
    }
    let type_slot = found?;

    let first_pair = self.peer_count_offsets.get(type_slot) as usize;
    let last_pair = self.peer_count_offsets.get(type_slot + 1) as usize;
    let mut counts: HashMap<u32, i64> = HashMap::with_capacity(last_pair - first_pair);
    counts.extend((first_pair..last_pair).map(|pair| {
        let peer = self.peer_count_entries.get(pair * 2);
        let count = self.peer_count_entries.get(pair * 2 + 1);
        (peer, count as i64)
    }));
    Some(counts)
}
/// Tombstones every live edge incident to `node`.
///
/// Four sources are swept in order: the node's CSR out-range, its CSR
/// in-range, its `overflow_out` list and its `overflow_in` list (both
/// overflow lists are removed from their maps). For each live edge the
/// matching entry on the peer's side is tombstoned too, and the edge's
/// bookkeeping is retired via `retire_edge_record`.
///
/// NOTE(review): unlike `remove_edge`, neither `has_tombstones` nor
/// `metadata_dirty` is set here — presumably the caller does so; confirm.
fn tombstone_edges_for_node(&mut self, node: usize) {
    let overflow_key = node as u32;

    if node < self.out_offsets.len().saturating_sub(1) {
        let lo = self.out_offsets.get(node) as usize;
        let hi = self.out_offsets.get(node + 1) as usize;
        for slot in lo..hi {
            let mut entry = self.out_edges.get(slot);
            if entry.edge_idx == TOMBSTONE_EDGE {
                continue;
            }
            let edge_id = entry.edge_idx;
            entry.edge_idx = TOMBSTONE_EDGE;
            self.out_edges.set(slot, entry);
            self.tombstone_in_edge_for(entry.peer as usize, edge_id);
            self.retire_edge_record(edge_id);
        }
    }

    if node < self.in_offsets.len().saturating_sub(1) {
        let lo = self.in_offsets.get(node) as usize;
        let hi = self.in_offsets.get(node + 1) as usize;
        for slot in lo..hi {
            let mut entry = self.in_edges.get(slot);
            // Self-loops were already tombstoned here by the out-range sweep.
            if entry.edge_idx == TOMBSTONE_EDGE {
                continue;
            }
            let edge_id = entry.edge_idx;
            entry.edge_idx = TOMBSTONE_EDGE;
            self.in_edges.set(slot, entry);
            self.tombstone_out_edge_for(entry.peer as usize, edge_id);
            self.retire_edge_record(edge_id);
        }
    }

    if let Some(spill) = self.overflow_out.remove(&overflow_key) {
        for entry in spill.iter().filter(|e| e.edge_idx != TOMBSTONE_EDGE) {
            self.tombstone_in_edge_for(entry.peer as usize, entry.edge_idx);
            self.retire_edge_record(entry.edge_idx);
        }
    }

    if let Some(spill) = self.overflow_in.remove(&overflow_key) {
        for entry in spill.iter().filter(|e| e.edge_idx != TOMBSTONE_EDGE) {
            self.tombstone_out_edge_for(entry.peer as usize, entry.edge_idx);
            self.retire_edge_record(entry.edge_idx);
        }
    }
}

/// Retires the bookkeeping of one edge: overwrites its endpoint record with
/// tombstones, drops its properties, decrements `edge_count` and records the
/// index in `free_edge_slots`.
fn retire_edge_record(&mut self, edge_idx: u32) {
    self.edge_endpoints.set(
        edge_idx as usize,
        EdgeEndpoints {
            source: TOMBSTONE_EDGE,
            target: TOMBSTONE_EDGE,
            connection_type: 0,
        },
    );
    self.edge_properties.remove(edge_idx);
    self.edge_count -= 1;
    self.free_edge_slots.push(edge_idx);
}
/// Removes edge `edge_idx` from the incoming side of `node`.
///
/// If the edge is found in the node's CSR in-range, that entry's index is
/// overwritten with `TOMBSTONE_EDGE` and the function returns. Otherwise the
/// edge is filtered out of the node's `overflow_in` list, if one exists.
fn tombstone_in_edge_for(&mut self, node: usize, edge_idx: u32) {
    if node < self.in_offsets.len().saturating_sub(1) {
        let lo = self.in_offsets.get(node) as usize;
        let hi = self.in_offsets.get(node + 1) as usize;
        let hit = (lo..hi).find(|&slot| self.in_edges.get(slot).edge_idx == edge_idx);
        if let Some(slot) = hit {
            let mut entry = self.in_edges.get(slot);
            entry.edge_idx = TOMBSTONE_EDGE;
            self.in_edges.set(slot, entry);
            return;
        }
    }
    if let Some(spill) = self.overflow_in.get_mut(&(node as u32)) {
        spill.retain(|entry| entry.edge_idx != edge_idx);
    }
}
/// Removes edge `edge_idx` from the outgoing side of `node`.
///
/// If the edge is found in the node's CSR out-range, that entry's index is
/// overwritten with `TOMBSTONE_EDGE` and the function returns. Otherwise the
/// edge is filtered out of the node's `overflow_out` list, if one exists.
fn tombstone_out_edge_for(&mut self, node: usize, edge_idx: u32) {
    if node < self.out_offsets.len().saturating_sub(1) {
        let lo = self.out_offsets.get(node) as usize;
        let hi = self.out_offsets.get(node + 1) as usize;
        let hit = (lo..hi).find(|&slot| self.out_edges.get(slot).edge_idx == edge_idx);
        if let Some(slot) = hit {
            let mut entry = self.out_edges.get(slot);
            entry.edge_idx = TOMBSTONE_EDGE;
            self.out_edges.set(slot, entry);
            return;
        }
    }
    if let Some(spill) = self.overflow_out.get_mut(&(node as u32)) {
        spill.retain(|entry| entry.edge_idx != edge_idx);
    }
}
/// Tombstones the first entry for `edge_idx` inside `node`'s CSR range.
///
/// `offsets` and `edges` are one direction's CSR arrays. Nothing happens
/// when `node` has no range (`node >= offsets.len() - 1`) or when the range
/// holds no entry with that edge index.
fn tombstone_in_array(
    offsets: &MmapOrVec<u64>,
    edges: &mut MmapOrVec<CsrEdge>,
    node: usize,
    edge_idx: u32,
) {
    if node >= offsets.len().saturating_sub(1) {
        return;
    }
    let lo = offsets.get(node) as usize;
    let hi = offsets.get(node + 1) as usize;
    let hit = (lo..hi).find(|&slot| edges.get(slot).edge_idx == edge_idx);
    if let Some(slot) = hit {
        let mut entry = edges.get(slot);
        entry.edge_idx = TOMBSTONE_EDGE;
        edges.set(slot, entry);
    }
}
}
impl Clone for DiskGraph {
fn clone(&self) -> Self {
let mut node_slots = MmapOrVec::with_capacity(self.node_slots.len());
for i in 0..self.node_slots.len() {
node_slots.push(self.node_slots.get(i));
}
let mut out_offsets = MmapOrVec::with_capacity(self.out_offsets.len());
for i in 0..self.out_offsets.len() {
out_offsets.push(self.out_offsets.get(i));
}
let mut out_edges = MmapOrVec::with_capacity(self.out_edges.len());
for i in 0..self.out_edges.len() {
out_edges.push(self.out_edges.get(i));
}
let mut in_offsets = MmapOrVec::with_capacity(self.in_offsets.len());
for i in 0..self.in_offsets.len() {
in_offsets.push(self.in_offsets.get(i));
}
let mut in_edges = MmapOrVec::with_capacity(self.in_edges.len());
for i in 0..self.in_edges.len() {
in_edges.push(self.in_edges.get(i));
}
let mut edge_endpoints = MmapOrVec::with_capacity(self.edge_endpoints.len());
for i in 0..self.edge_endpoints.len() {
edge_endpoints.push(self.edge_endpoints.get(i));
}
DiskGraph {
node_slots,
node_count: self.node_count,
free_node_slots: self.free_node_slots.clone(),
node_arena: std::sync::Mutex::new(Vec::new()),
column_stores: self.column_stores.clone(),
out_offsets,
out_edges,
in_offsets,
in_edges,
edge_endpoints,
edge_count: self.edge_count,
next_edge_idx: self.next_edge_idx,
edge_properties: self.edge_properties.deep_clone(),
edge_arena: std::sync::Mutex::new(Vec::new()),
edge_mut_cache: HashMap::new(),
node_mut_cache: HashMap::new(),
pending_edges: UnsafeCell::new(MmapOrVec::new()),
overflow_out: self.overflow_out.clone(),
overflow_in: self.overflow_in.clone(),
free_edge_slots: self.free_edge_slots.clone(),
data_dir: self.data_dir.clone(),
metadata_dirty: false,
csr_sorted_by_type: self.csr_sorted_by_type,
defer_csr: false,
edge_type_counts_raw: None,
conn_type_index_types: MmapOrVec::new(),
conn_type_index_offsets: MmapOrVec::new(),
conn_type_index_sources: MmapOrVec::new(),
peer_count_types: MmapOrVec::new(),
peer_count_offsets: MmapOrVec::new(),
peer_count_entries: MmapOrVec::new(),
global_indexes: std::sync::RwLock::new(HashMap::new()),
has_tombstones: self.has_tombstones,
property_indexes: std::sync::RwLock::new(HashMap::new()),
segment_manifest: self.segment_manifest.clone(),
sealed_nodes_bound: self.sealed_nodes_bound,
}
}
}
/// Writes metadata on drop when `metadata_dirty` is set.
///
/// The result of `write_metadata` is discarded on purpose: `drop` has no way
/// to report a failure.
impl Drop for DiskGraph {
    fn drop(&mut self) {
        if !self.metadata_dirty {
            return;
        }
        let _ = self.write_metadata();
    }
}
/// `graph[node_index]` access to node data.
///
/// # Panics
///
/// Panics with `"DiskGraph: node not found"` when `node_weight` yields
/// nothing for `index`.
impl std::ops::Index<NodeIndex> for DiskGraph {
    type Output = NodeData;

    #[inline]
    fn index(&self, index: NodeIndex) -> &NodeData {
        let weight = self.node_weight(index);
        weight.expect("DiskGraph: node not found")
    }
}
/// `graph[edge_index]` access to edge data.
///
/// # Panics
///
/// Panics with `"DiskGraph: edge not found"` when `edge_weight` yields
/// nothing for `index`.
impl std::ops::Index<EdgeIndex> for DiskGraph {
    type Output = EdgeData;

    #[inline]
    fn index(&self, index: EdgeIndex) -> &EdgeData {
        let weight = self.edge_weight(index);
        weight.expect("DiskGraph: edge not found")
    }
}
// Unit tests live in the sibling file `graph_tests.rs` and are compiled only
// for test builds.
#[cfg(test)]
#[path = "graph_tests.rs"]
mod tests;