use std::sync::Arc;
use crate::storage::index::{IndexRegistry, IndexScope};
use super::label_registry::Namespace;
use super::*;
impl GraphStore {
pub fn new() -> Self {
Self::with_registry(Arc::new(LabelRegistry::with_legacy_seed()))
}
pub fn with_registry(registry: Arc<LabelRegistry>) -> Self {
const SHARD_COUNT: usize = 16;
let initial_node_page = Page::new(PageType::GraphNode, 0);
let initial_edge_page = Page::new(PageType::GraphEdge, 0);
Self {
node_index: ShardedIndex::new(SHARD_COUNT),
edge_index: EdgeIndex::new(SHARD_COUNT),
node_secondary: Arc::new(NodeSecondaryIndex::new(8192)),
registry,
node_pages: RwLock::new(vec![initial_node_page]),
edge_pages: RwLock::new(vec![initial_edge_page]),
current_node_page: AtomicU32::new(0),
current_edge_page: AtomicU32::new(0),
stats: GraphStats::default(),
node_count: AtomicU64::new(0),
edge_count: AtomicU64::new(0),
}
}
pub fn intern_node_label(&self, label: &str) -> Result<LabelId, GraphStoreError> {
self.registry
.intern(Namespace::Node, label)
.map_err(|e| GraphStoreError::InvalidData(e.to_string()))
}
pub fn intern_edge_label(&self, label: &str) -> Result<LabelId, GraphStoreError> {
self.registry
.intern(Namespace::Edge, label)
.map_err(|e| GraphStoreError::InvalidData(e.to_string()))
}
pub fn publish_indexes(&self, registry: &IndexRegistry, collection: &str) {
registry.register(
IndexScope::graph(collection),
Arc::clone(&self.node_secondary) as Arc<dyn crate::storage::index::IndexBase>,
);
}
pub fn add_node_with_label(
&self,
id: &str,
display_label: &str,
category: &str,
) -> Result<RecordLocation, GraphStoreError> {
if self.node_index.contains(id) {
return Err(GraphStoreError::NodeExists(id.to_string()));
}
let label_id = self.intern_node_label(category)?;
let node = StoredNode {
id: id.to_string(),
label: display_label.to_string(),
node_type: category.to_string(),
label_id,
flags: 0,
out_edge_count: 0,
in_edge_count: 0,
page_id: 0,
slot: 0,
table_ref: None,
vector_ref: None,
};
let location = self.write_node_record(id, &node)?;
self.node_index.insert(id.to_string(), location);
self.node_secondary.insert(id, label_id, display_label);
self.node_count.fetch_add(1, Ordering::Relaxed);
Ok(location)
}
pub fn add_edge_with_label(
&self,
source_id: &str,
target_id: &str,
category: &str,
weight: f32,
) -> Result<RecordLocation, GraphStoreError> {
if !self.node_index.contains(source_id) {
return Err(GraphStoreError::NodeNotFound(source_id.to_string()));
}
if !self.node_index.contains(target_id) {
return Err(GraphStoreError::NodeNotFound(target_id.to_string()));
}
let label_id = self.intern_edge_label(category)?;
let edge = StoredEdge {
source_id: source_id.to_string(),
target_id: target_id.to_string(),
edge_type: category.to_string(),
label_id,
weight,
page_id: 0,
slot: 0,
};
let location = self.write_edge_record(source_id, target_id, label_id, &edge)?;
self.edge_index
.add_edge(source_id, target_id, category, weight);
self.edge_count.fetch_add(1, Ordering::Relaxed);
Ok(location)
}
fn write_node_record(
&self,
id: &str,
node: &StoredNode,
) -> Result<RecordLocation, GraphStoreError> {
let encoded = node.encode();
let mut pages = self
.node_pages
.write()
.map_err(|_| GraphStoreError::LockPoisoned)?;
let current_page_id = self.current_node_page.load(Ordering::Acquire);
let page = &mut pages[current_page_id as usize];
match page.insert_cell(id.as_bytes(), &encoded) {
Ok(slot) => Ok(RecordLocation {
page_id: current_page_id,
slot: slot as u16,
}),
Err(_) => {
let new_page_id = pages.len() as u32;
let mut new_page = Page::new(PageType::GraphNode, new_page_id);
let slot = new_page
.insert_cell(id.as_bytes(), &encoded)
.map_err(|_| GraphStoreError::PageFull)?;
pages.push(new_page);
self.current_node_page.store(new_page_id, Ordering::Release);
Ok(RecordLocation {
page_id: new_page_id,
slot: slot as u16,
})
}
}
}
fn write_edge_record(
&self,
source_id: &str,
target_id: &str,
label_id: LabelId,
edge: &StoredEdge,
) -> Result<RecordLocation, GraphStoreError> {
let encoded = edge.encode();
let edge_key = format!("{}|{}|{}", source_id, label_id.as_u32(), target_id);
let mut pages = self
.edge_pages
.write()
.map_err(|_| GraphStoreError::LockPoisoned)?;
let current_page_id = self.current_edge_page.load(Ordering::Acquire);
let page = &mut pages[current_page_id as usize];
match page.insert_cell(edge_key.as_bytes(), &encoded) {
Ok(slot) => Ok(RecordLocation {
page_id: current_page_id,
slot: slot as u16,
}),
Err(_) => {
let new_page_id = pages.len() as u32;
let mut new_page = Page::new(PageType::GraphEdge, new_page_id);
let slot = new_page
.insert_cell(edge_key.as_bytes(), &encoded)
.map_err(|_| GraphStoreError::PageFull)?;
pages.push(new_page);
self.current_edge_page.store(new_page_id, Ordering::Release);
Ok(RecordLocation {
page_id: new_page_id,
slot: slot as u16,
})
}
}
}
pub fn add_node_linked(
&self,
id: &str,
label: &str,
category: &str,
table_id: u16,
row_id: u64,
) -> Result<RecordLocation, GraphStoreError> {
if self.node_index.contains(id) {
return Err(GraphStoreError::NodeExists(id.to_string()));
}
let label_id = self.intern_node_label(category)?;
let node = StoredNode {
id: id.to_string(),
label: label.to_string(),
node_type: category.to_string(),
label_id,
flags: NODE_FLAG_HAS_TABLE_REF,
out_edge_count: 0,
in_edge_count: 0,
page_id: 0,
slot: 0,
table_ref: Some(TableRef::new(table_id, row_id)),
vector_ref: None,
};
let location = self.write_node_record(id, &node)?;
self.node_index.insert(id.to_string(), location);
self.node_secondary.insert(id, label_id, label);
self.node_count.fetch_add(1, Ordering::Relaxed);
Ok(location)
}
pub fn get_node_table_ref(&self, node_id: &str) -> Option<TableRef> {
self.get_node(node_id).and_then(|n| n.table_ref)
}
pub fn get_node(&self, id: &str) -> Option<StoredNode> {
let location = self.node_index.get(id)?;
let pages = self.node_pages.read().ok()?;
let page = pages.get(location.page_id as usize)?;
let (_, value) = page.read_cell(location.slot as usize).ok()?;
StoredNode::decode(&value, location.page_id, location.slot)
}
#[inline]
pub fn outgoing_edges(&self, source_id: &str) -> Vec<(String, String, f32)> {
self.edge_index.outgoing(source_id)
}
#[inline]
pub fn incoming_edges(&self, target_id: &str) -> Vec<(String, String, f32)> {
self.edge_index.incoming(target_id)
}
#[inline]
pub fn outgoing_of_type(&self, source_id: &str, edge_label: &str) -> Vec<(String, f32)> {
self.edge_index.outgoing_of_type(source_id, edge_label)
}
#[inline]
pub fn has_node(&self, id: &str) -> bool {
self.node_index.contains(id)
}
#[inline]
pub fn node_count(&self) -> u64 {
self.node_count.load(Ordering::Relaxed)
}
#[inline]
pub fn edge_count(&self) -> u64 {
self.edge_count.load(Ordering::Relaxed)
}
pub fn iter_nodes(&self) -> NodeIterator<'_> {
NodeIterator {
store: self,
page_idx: 0,
cell_idx: 0,
}
}
pub fn iter_all_edges(&self) -> Vec<StoredEdge> {
let mut edges = Vec::new();
for node in self.iter_nodes() {
for (edge_label, target_id, weight) in self.outgoing_edges(&node.id) {
let label_id = self
.registry
.lookup(Namespace::Edge, &edge_label)
.unwrap_or(UNSET_LABEL_ID);
edges.push(StoredEdge {
source_id: node.id.clone(),
target_id,
edge_type: edge_label,
label_id,
weight,
page_id: 0,
slot: 0,
});
}
}
edges
}
pub fn nodes_of_label(&self, label_id: LabelId) -> Vec<StoredNode> {
self.node_secondary
.nodes_by_type(label_id)
.into_iter()
.filter_map(|id| self.get_node(&id))
.collect()
}
pub fn nodes_by_label(&self, label: &str) -> Vec<StoredNode> {
self.node_secondary
.nodes_by_label(label)
.into_iter()
.filter_map(|id| self.get_node(&id))
.collect()
}
pub fn nodes_with_category(&self, category: &str) -> Vec<StoredNode> {
let Some(label_id) = self.registry.lookup(Namespace::Node, category) else {
return Vec::new();
};
self.nodes_of_label(label_id)
}
pub fn may_contain_label(&self, label: &str) -> bool {
self.node_secondary.may_contain_label(label)
}
pub fn node_secondary_index(&self) -> &NodeSecondaryIndex {
&self.node_secondary
}
pub fn stats(&self) -> GraphStats {
let mut stats = GraphStats {
node_count: self.node_count.load(Ordering::Relaxed),
edge_count: self.edge_count.load(Ordering::Relaxed),
node_pages: self.node_pages.read().map(|p| p.len() as u32).unwrap_or(0),
edge_pages: self.edge_pages.read().map(|p| p.len() as u32).unwrap_or(0),
..Default::default()
};
for (label_id, count) in self.node_secondary.label_id_counts() {
if let Some((Namespace::Node, label)) = self.registry.resolve(label_id) {
stats.nodes_by_label.insert(label, count);
}
}
stats
}
pub fn serialize(&self) -> Vec<u8> {
let mut buf = Vec::new();
buf.extend_from_slice(b"RBGR"); buf.extend_from_slice(&2u32.to_le_bytes()); buf.extend_from_slice(&self.node_count.load(Ordering::Relaxed).to_le_bytes());
buf.extend_from_slice(&self.edge_count.load(Ordering::Relaxed).to_le_bytes());
let registry_bytes = self.registry.encode().unwrap_or_default();
buf.extend_from_slice(&(registry_bytes.len() as u32).to_le_bytes());
buf.extend_from_slice(®istry_bytes);
if let Ok(pages) = self.node_pages.read() {
buf.extend_from_slice(&(pages.len() as u32).to_le_bytes());
for page in pages.iter() {
buf.extend_from_slice(page.as_bytes());
}
}
if let Ok(pages) = self.edge_pages.read() {
buf.extend_from_slice(&(pages.len() as u32).to_le_bytes());
for page in pages.iter() {
buf.extend_from_slice(page.as_bytes());
}
}
buf
}
pub fn deserialize(data: &[u8]) -> Result<Self, GraphStoreError> {
if data.len() < 24 {
return Err(GraphStoreError::InvalidData("Too short".to_string()));
}
if &data[0..4] != b"RBGR" {
return Err(GraphStoreError::InvalidData("Invalid magic".to_string()));
}
let version = u32::from_le_bytes([data[4], data[5], data[6], data[7]]);
let node_count = u64::from_le_bytes([
data[8], data[9], data[10], data[11], data[12], data[13], data[14], data[15],
]);
let edge_count = u64::from_le_bytes([
data[16], data[17], data[18], data[19], data[20], data[21], data[22], data[23],
]);
let mut offset = 24;
let registry: Arc<LabelRegistry> = match version {
1 => Arc::new(LabelRegistry::with_legacy_seed()),
2 => {
if data.len() < offset + 4 {
return Err(GraphStoreError::InvalidData(
"Truncated v2 header".to_string(),
));
}
let reg_len = u32::from_le_bytes([
data[offset],
data[offset + 1],
data[offset + 2],
data[offset + 3],
]) as usize;
offset += 4;
if data.len() < offset + reg_len {
return Err(GraphStoreError::InvalidData(
"Truncated registry blob".to_string(),
));
}
let reg = LabelRegistry::decode(&data[offset..offset + reg_len])
.map_err(|e| GraphStoreError::InvalidData(e.to_string()))?;
offset += reg_len;
Arc::new(reg)
}
v => {
return Err(GraphStoreError::InvalidData(format!(
"Unsupported graph file version {}",
v
)));
}
};
if data.len() < offset + 4 {
return Err(GraphStoreError::InvalidData(
"Truncated before node-page count".to_string(),
));
}
let node_page_count = u32::from_le_bytes([
data[offset],
data[offset + 1],
data[offset + 2],
data[offset + 3],
]) as usize;
offset += 4;
let mut node_pages = Vec::with_capacity(node_page_count);
for _ in 0..node_page_count {
if offset + PAGE_SIZE > data.len() {
return Err(GraphStoreError::InvalidData(
"Truncated node pages".to_string(),
));
}
let page = Page::from_slice(&data[offset..offset + PAGE_SIZE])
.map_err(|_| GraphStoreError::InvalidData("Invalid page".to_string()))?;
node_pages.push(page);
offset += PAGE_SIZE;
}
if data.len() < offset + 4 {
return Err(GraphStoreError::InvalidData(
"Truncated before edge-page count".to_string(),
));
}
let edge_page_count = u32::from_le_bytes([
data[offset],
data[offset + 1],
data[offset + 2],
data[offset + 3],
]) as usize;
offset += 4;
let mut edge_pages = Vec::with_capacity(edge_page_count);
for _ in 0..edge_page_count {
if offset + PAGE_SIZE > data.len() {
return Err(GraphStoreError::InvalidData(
"Truncated edge pages".to_string(),
));
}
let page = Page::from_slice(&data[offset..offset + PAGE_SIZE])
.map_err(|_| GraphStoreError::InvalidData("Invalid page".to_string()))?;
edge_pages.push(page);
offset += PAGE_SIZE;
}
if version == 1 {
let store = Self::with_registry(Arc::clone(®istry));
for (page_idx, page) in node_pages.iter().enumerate() {
let cell_count = page.cell_count() as usize;
for cell_idx in 0..cell_count {
if let Ok((_, value)) = page.read_cell(cell_idx) {
if let Some(n) =
StoredNode::decode_v1(&value, page_idx as u32, cell_idx as u16)
{
store.add_node_with_label(&n.id, &n.label, &n.node_type)?;
}
}
}
}
for (page_idx, page) in edge_pages.iter().enumerate() {
let cell_count = page.cell_count() as usize;
for cell_idx in 0..cell_count {
if let Ok((_, value)) = page.read_cell(cell_idx) {
if let Some(e) =
StoredEdge::decode_v1(&value, page_idx as u32, cell_idx as u16)
{
if !store.has_node(&e.source_id) || !store.has_node(&e.target_id) {
continue;
}
store.add_edge_with_label(
&e.source_id,
&e.target_id,
&e.edge_type,
e.weight,
)?;
}
}
}
}
let _ = (node_count, edge_count);
return Ok(store);
}
let store = Self {
node_index: ShardedIndex::new(16),
edge_index: EdgeIndex::new(16),
node_secondary: Arc::new(NodeSecondaryIndex::new(8192)),
registry,
node_pages: RwLock::new(node_pages),
edge_pages: RwLock::new(edge_pages),
current_node_page: AtomicU32::new(0),
current_edge_page: AtomicU32::new(0),
stats: GraphStats::default(),
node_count: AtomicU64::new(node_count),
edge_count: AtomicU64::new(edge_count),
};
store.rebuild_indexes(version)?;
Ok(store)
}
fn rebuild_indexes(&self, version: u32) -> Result<(), GraphStoreError> {
let decode_node = |bytes: &[u8], page_idx: u32, slot: u16| match version {
1 => StoredNode::decode_v1(bytes, page_idx, slot),
_ => StoredNode::decode(bytes, page_idx, slot),
};
let decode_edge = |bytes: &[u8], page_idx: u32, slot: u16| match version {
1 => StoredEdge::decode_v1(bytes, page_idx, slot),
_ => StoredEdge::decode(bytes, page_idx, slot),
};
self.node_secondary.clear();
if let Ok(pages) = self.node_pages.read() {
for (page_idx, page) in pages.iter().enumerate() {
let cell_count = page.cell_count() as usize;
for cell_idx in 0..cell_count {
if let Ok((key, value)) = page.read_cell(cell_idx) {
let id = String::from_utf8_lossy(&key).to_string();
self.node_index.insert(
id.clone(),
RecordLocation {
page_id: page_idx as u32,
slot: cell_idx as u16,
},
);
if let Some(node) = decode_node(&value, page_idx as u32, cell_idx as u16) {
self.node_secondary.insert(&id, node.label_id, &node.label);
}
}
}
}
if !pages.is_empty() {
self.current_node_page
.store((pages.len() - 1) as u32, Ordering::Release);
}
}
if let Ok(pages) = self.edge_pages.read() {
for (page_idx, page) in pages.iter().enumerate() {
let cell_count = page.cell_count() as usize;
for cell_idx in 0..cell_count {
if let Ok((_, value)) = page.read_cell(cell_idx) {
if let Some(edge) = decode_edge(&value, page_idx as u32, cell_idx as u16) {
self.edge_index.add_edge(
&edge.source_id,
&edge.target_id,
&edge.edge_type,
edge.weight,
);
}
}
}
}
if !pages.is_empty() {
self.current_edge_page
.store((pages.len() - 1) as u32, Ordering::Release);
}
}
Ok(())
}
}