mod assignment;
mod catalog;
mod composite_property_index;
mod delete;
mod delete_set;
mod factory_reset;
mod property_index;
mod remove;
mod text_index;
mod vector_index;
use std::sync::Arc;
use roaring::RoaringBitmap;
use selene_core::{
Change, DbString, EdgeId, GraphId, LabelDiff, LabelSet, NodeId, PropertyDiff, PropertyMap,
SchemaChange,
};
use crate::adjacency::AdjacencyEdge;
use crate::error::{GraphError, GraphResult};
use crate::graph_types::{GraphTypeDef, PropertyTypeDef};
use crate::index_provider::{IndexProvider, ProviderTag};
use crate::store::RowIndex;
use crate::type_validator::{EntityId, TypeViolation};
use crate::write_txn::WriteTxn;
pub struct Mutator<'tx, 'g> {
txn: &'tx mut WriteTxn<'g>,
}
impl<'tx, 'g> Mutator<'tx, 'g> {
pub(crate) fn new(txn: &'tx mut WriteTxn<'g>) -> Self {
Self { txn }
}
pub fn create_node(&mut self, labels: LabelSet, mut props: PropertyMap) -> GraphResult<NodeId> {
fill_node_defaults(self.txn.read(), &labels, &mut props)?;
assignment::coerce_node_properties(self.txn.read(), &labels, &mut props)?;
let id = self.txn.allocator.allocate_node();
{
let graph = self.txn.guard_mut();
let row = u32::try_from(graph.node_store.len())
.ok()
.filter(|&row| row != u32::MAX)
.ok_or(GraphError::RowSpaceExhausted {
kind: "node",
rows: graph.node_store.len() as u64,
max_rows: u32::MAX as u64,
})?;
crate::property_index::apply_node_create(
&mut graph.property_index,
&labels,
&props,
row,
)?;
crate::composite_property_index::apply_node_create(
&mut graph.composite_property_index,
&labels,
&props,
row,
)?;
crate::vector_index::apply_node_create(&mut graph.vector_index, &labels, &props, row)?;
crate::text_index::apply_node_create(&mut graph.text_index, &labels, &props, row, id);
graph.node_store.labels.push(labels.clone());
graph.node_store.properties.push(props.clone());
graph.node_store.row_to_id.push(id);
graph.node_store.alive_mut().insert(row);
graph.node_id_to_row.insert(id, RowIndex::new(row));
insert_node_labels(&mut graph.idx_label, row, &labels);
}
self.txn.changes.push(Change::NodeCreated {
id,
labels,
properties: props,
});
Ok(id)
}
pub fn create_edge(
&mut self,
label: DbString,
source: NodeId,
target: NodeId,
mut props: PropertyMap,
) -> GraphResult<EdgeId> {
self.require_live_node(source)?;
self.require_live_node(target)?;
fill_edge_defaults(self.txn.read(), label.clone(), source, target, &mut props)?;
assignment::coerce_edge_properties(
self.txn.read(),
label.clone(),
source,
target,
&mut props,
)?;
let id = self.txn.allocator.allocate_edge();
{
let graph = self.txn.guard_mut();
let row = u32::try_from(graph.edge_store.len())
.ok()
.filter(|&row| row != u32::MAX) .ok_or(GraphError::RowSpaceExhausted {
kind: "edge",
rows: graph.edge_store.len() as u64,
max_rows: u32::MAX as u64,
})?;
graph.edge_store.label.push(label.clone());
graph.edge_store.source.push(source);
graph.edge_store.target.push(target);
graph.edge_store.properties.push(props.clone());
graph.edge_store.row_to_id.push(id);
graph.edge_store.alive_mut().insert(row);
graph.edge_id_to_row.insert(id, RowIndex::new(row));
insert_index_row(&mut graph.idx_edge_label, label.clone(), row);
graph
.adjacency_out
.entry(source)
.or_default()
.add(AdjacencyEdge {
label: label.clone(),
neighbor: target,
edge_id: id,
});
graph
.adjacency_in
.entry(target)
.or_default()
.add(AdjacencyEdge {
label: label.clone(),
neighbor: source,
edge_id: id,
});
}
self.txn.changes.push(Change::EdgeCreated {
id,
label,
source,
target,
properties: props,
});
Ok(id)
}
pub fn update_node(
&mut self,
id: NodeId,
labels_diff: LabelDiff,
mut props_diff: PropertyDiff,
) -> GraphResult<()> {
let row = self.require_live_node(id)?;
let old_labels = self
.txn
.read()
.node_store
.labels
.get(row)
.cloned()
.unwrap_or_default();
let mut labels = old_labels.clone();
for label in labels_diff.added.iter().cloned() {
labels.insert(label);
}
for label in labels_diff.removed.iter() {
labels.remove(label);
}
reject_immutable_node_update(self.txn.read(), id, &old_labels, &props_diff)?;
reject_immutable_node_update(self.txn.read(), id, &labels, &props_diff)?;
assignment::coerce_node_property_diff(self.txn.read(), &labels, &mut props_diff)?;
let old_props = self
.txn
.read()
.node_store
.properties
.get(row)
.cloned()
.unwrap_or_default();
let mut props = old_props.clone();
apply_property_diff(&mut props, &props_diff)?;
{
let graph = self.txn.guard_mut();
crate::property_index::apply_node_update(
&mut graph.property_index,
&old_labels,
&old_props,
&labels,
&props,
row as u32,
)?;
crate::composite_property_index::apply_node_update(
&mut graph.composite_property_index,
&old_labels,
&old_props,
&labels,
&props,
row as u32,
)?;
crate::vector_index::apply_node_update(
&mut graph.vector_index,
&old_labels,
&old_props,
&labels,
&props,
row as u32,
)?;
crate::text_index::apply_node_update(
&mut graph.text_index,
&old_labels,
&old_props,
&labels,
&props,
row as u32,
id,
);
graph.node_store.labels.set(row, labels);
graph.node_store.properties.set(row, props);
for label in labels_diff.added.iter().cloned() {
insert_index_row(&mut graph.idx_label, label, row as u32);
}
for label in labels_diff.removed.iter() {
remove_index_row(&mut graph.idx_label, label, row as u32);
}
}
self.txn.changes.push(Change::NodeUpdated {
id,
labels_diff,
properties_diff: props_diff,
});
Ok(())
}
pub fn update_edge(&mut self, id: EdgeId, mut props_diff: PropertyDiff) -> GraphResult<()> {
let row = self.require_live_edge(id)?;
reject_immutable_edge_update(self.txn.read(), id, &props_diff)?;
assignment::coerce_edge_property_diff(self.txn.read(), id, &mut props_diff)?;
let mut props = self
.txn
.read()
.edge_store
.properties
.get(row)
.cloned()
.unwrap_or_default();
apply_property_diff(&mut props, &props_diff)?;
self.txn.guard_mut().edge_store.properties.set(row, props);
self.txn.changes.push(Change::EdgeUpdated {
id,
properties_diff: props_diff,
});
Ok(())
}
pub fn schema_change(&mut self, graph: GraphId, change: SchemaChange) {
self.txn
.changes
.push(Change::SchemaChanged { graph, change });
}
#[must_use]
pub fn index_provider_by_tag(&self, tag: ProviderTag) -> Option<Arc<dyn IndexProvider>> {
self.txn
.providers
.iter()
.find(|provider| provider.provider_tag() == tag)
.map(Arc::clone)
}
#[must_use]
pub fn read(&self) -> &crate::SeleneGraph {
self.txn.read()
}
fn require_live_node(&self, id: NodeId) -> GraphResult<usize> {
let graph = self.txn.read();
let row = graph
.row_for_node_id(id)
.ok_or(GraphError::NodeNotFound { id })?
.get();
if row as usize >= graph.node_store.len() {
return Err(GraphError::NodeNotFound { id });
}
if !graph.node_store.is_alive(row) {
return Err(GraphError::NodeNotAlive { id });
}
Ok(row as usize)
}
fn require_live_edge(&self, id: EdgeId) -> GraphResult<usize> {
let graph = self.txn.read();
let row = graph
.row_for_edge_id(id)
.ok_or(GraphError::EdgeNotFound { id })?
.get();
if row as usize >= graph.edge_store.len() {
return Err(GraphError::EdgeNotFound { id });
}
if !graph.edge_store.is_alive(row) {
return Err(GraphError::EdgeNotAlive { id });
}
Ok(row as usize)
}
}
fn insert_node_labels(
index: &mut imbl::HashMap<DbString, RoaringBitmap>,
row: u32,
labels: &LabelSet,
) {
for label in labels.iter().cloned() {
insert_index_row(index, label, row);
}
}
fn remove_node_labels(
index: &mut imbl::HashMap<DbString, RoaringBitmap>,
row: u32,
labels: &LabelSet,
) {
for label in labels.iter() {
remove_index_row(index, label, row);
}
}
fn insert_index_row(index: &mut imbl::HashMap<DbString, RoaringBitmap>, label: DbString, row: u32) {
index.entry(label).or_default().insert(row);
}
fn remove_index_row(
index: &mut imbl::HashMap<DbString, RoaringBitmap>,
label: &DbString,
row: u32,
) {
let now_empty = match index.get_mut(label) {
Some(bitmap) => {
bitmap.remove(row);
bitmap.is_empty()
}
None => false,
};
if now_empty {
index.remove(label);
}
}
fn fill_node_defaults(
graph: &crate::SeleneGraph,
labels: &LabelSet,
props: &mut PropertyMap,
) -> GraphResult<()> {
let Some(graph_type) = graph.meta.bound_type.as_deref() else {
return Ok(());
};
let Some(node_type) = graph_type.find_node_type(labels) else {
return Ok(());
};
fill_property_defaults(&node_type.properties, props)
}
fn fill_edge_defaults(
graph: &crate::SeleneGraph,
label: DbString,
source: NodeId,
target: NodeId,
props: &mut PropertyMap,
) -> GraphResult<()> {
let Some(graph_type) = graph.meta.bound_type.as_deref() else {
return Ok(());
};
let Some(source_type) = node_type_index_for_node(graph, graph_type, source) else {
return Ok(());
};
let Some(target_type) = node_type_index_for_node(graph, graph_type, target) else {
return Ok(());
};
let Some(edge_type) = graph_type.find_edge_type(label, source_type, target_type) else {
return Ok(());
};
fill_property_defaults(&edge_type.properties, props)
}
fn fill_property_defaults(
declarations: &[PropertyTypeDef],
props: &mut PropertyMap,
) -> GraphResult<()> {
for declaration in declarations {
if props.contains_key(&declaration.name) {
continue;
}
if let Some(default) = &declaration.default {
props.set(declaration.name.clone(), default.to_value()?)?;
}
}
Ok(())
}
fn reject_immutable_node_update(
graph: &crate::SeleneGraph,
id: NodeId,
labels: &LabelSet,
diff: &PropertyDiff,
) -> GraphResult<()> {
let Some(graph_type) = graph.meta.bound_type.as_deref() else {
return Ok(());
};
let Some(node_type) = graph_type.find_node_type(labels) else {
return Ok(());
};
reject_immutable_property_update(
EntityId::Node(id),
node_type.name.clone(),
&node_type.properties,
diff,
)
}
fn reject_immutable_edge_update(
graph: &crate::SeleneGraph,
id: EdgeId,
diff: &PropertyDiff,
) -> GraphResult<()> {
let Some(graph_type) = graph.meta.bound_type.as_deref() else {
return Ok(());
};
let Some(label) = graph.edge_label(id).cloned() else {
return Ok(());
};
let Some((source, target)) = graph.edge_endpoints(id) else {
return Ok(());
};
let Some(source_type) = node_type_index_for_node(graph, graph_type, source) else {
return Ok(());
};
let Some(target_type) = node_type_index_for_node(graph, graph_type, target) else {
return Ok(());
};
let Some(edge_type) = graph_type.find_edge_type(label, source_type, target_type) else {
return Ok(());
};
reject_immutable_property_update(
EntityId::Edge(id),
edge_type.name.clone(),
&edge_type.properties,
diff,
)
}
fn node_type_index_for_node(
graph: &crate::SeleneGraph,
graph_type: &GraphTypeDef,
node: NodeId,
) -> Option<u32> {
let labels = graph.node_labels(node)?;
graph_type.find_node_type_index(labels)
}
fn reject_immutable_property_update(
entity_id: EntityId,
declared_in: DbString,
declarations: &[PropertyTypeDef],
diff: &PropertyDiff,
) -> GraphResult<()> {
for (key, _) in &diff.set {
reject_if_immutable(entity_id, declared_in.clone(), declarations, key.clone())?;
}
for key in &diff.removed {
reject_if_immutable(entity_id, declared_in.clone(), declarations, key.clone())?;
}
Ok(())
}
fn reject_if_immutable(
entity_id: EntityId,
declared_in: DbString,
declarations: &[PropertyTypeDef],
property: DbString,
) -> GraphResult<()> {
if declarations
.iter()
.any(|declaration| declaration.name == property && declaration.immutable)
{
return Err(GraphError::TypeViolation(
TypeViolation::ImmutablePropertyUpdate {
entity_id,
property,
declared_in,
},
));
}
Ok(())
}
fn apply_property_diff(map: &mut PropertyMap, diff: &PropertyDiff) -> GraphResult<()> {
for (key, value) in diff.set.iter() {
map.set(key.clone(), value.clone())?;
}
for key in diff.removed.iter() {
map.remove(key);
}
Ok(())
}
#[cfg(test)]
mod tests;
#[cfg(test)]
mod id_map_tests;
#[cfg(test)]
mod row_cap_tests;
#[cfg(test)]
mod truncate_tests;
#[cfg(test)]
mod factory_reset_tests;
#[cfg(test)]
mod hub_delete_tests;
#[cfg(test)]
mod delete_set_tests;
#[cfg(test)]
mod payload_clear_tests;