use std::collections::HashMap;
use std::marker::PhantomData;
use std::ops::Bound;
use std::path::Path;
use std::sync::Arc;
use parking_lot::RwLock;
use roaring::RoaringBitmap;
use crate::error::StorageError;
#[cfg(feature = "gql")]
use crate::gql::{self, GqlError};
use crate::graph_elements::{GraphEdge, GraphVertex, PersistentEdge, PersistentVertex};
use crate::index::{
BTreeIndex, ElementType, IndexError, IndexSpec, IndexType, PropertyIndex, RTreeIndex,
UniqueIndex,
};
use crate::schema::GraphSchema;
use crate::storage::cow::{EdgeData, GraphState, NodeData};
use crate::storage::interner::StringInterner;
use crate::storage::mmap::MmapGraph;
use crate::storage::{Edge, GraphStorage, StreamableStorage, Vertex};
use crate::traversal::markers::{Edge as EdgeMarker, OutputMarker, Scalar, Vertex as VertexMarker};
use crate::traversal::mutation::{DropStep, PendingMutation, PropertyStep};
use crate::traversal::step::Step;
use crate::traversal::{
ExecutionContext, HasLabelStep, HasStep, HasValueStep, IdStep, InEStep, InStep, InVStep,
LabelStep, LimitStep, OutEStep, OutStep, OutVStep, SkipStep, Traversal, TraversalSource,
Traverser, ValuesStep,
};
use crate::value::{EdgeId, Value, VertexId};
/// Graph handle that combines a memory-mapped store with an in-memory
/// [`GraphState`], an optional schema and a set of named property indexes.
pub struct CowMmapGraph {
    /// Memory-mapped backing store.
    mmap: MmapGraph,
    /// In-memory graph state; cloned whenever a snapshot is taken.
    state: Arc<RwLock<GraphState>>,
    /// Schema attached to the graph, if any.
    schema: RwLock<Option<GraphSchema>>,
    /// Property indexes keyed by index name.
    indexes: RwLock<HashMap<String, Box<dyn PropertyIndex>>>,
    /// Bus on which graph change events are emitted.
    #[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
    event_bus: Arc<crate::storage::events::EventBus>,
    /// Subscription manager handed to snapshots.
    #[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
    subscription_manager: Arc<crate::traversal::reactive::SubscriptionManager>,
}
impl CowMmapGraph {
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, StorageError> {
let mmap = MmapGraph::open(path)?;
let state = Self::load_state_from_mmap(&mmap);
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
let event_bus = std::sync::Arc::new(crate::storage::events::EventBus::new());
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
let subscription_manager = {
let eb = event_bus.clone();
std::sync::Arc::new(crate::traversal::reactive::SubscriptionManager::new(
std::sync::Arc::new(move || eb.subscribe()),
))
};
Ok(Self {
mmap,
state: Arc::new(RwLock::new(state)),
schema: RwLock::new(None),
indexes: RwLock::new(HashMap::new()),
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
event_bus,
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
subscription_manager,
})
}
pub fn open_with_schema<P: AsRef<Path>>(
path: P,
schema: GraphSchema,
) -> Result<Self, StorageError> {
let mmap = MmapGraph::open(path)?;
let state = Self::load_state_from_mmap(&mmap);
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
let event_bus = std::sync::Arc::new(crate::storage::events::EventBus::new());
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
let subscription_manager = {
let eb = event_bus.clone();
std::sync::Arc::new(crate::traversal::reactive::SubscriptionManager::new(
std::sync::Arc::new(move || eb.subscribe()),
))
};
Ok(Self {
mmap,
state: Arc::new(RwLock::new(state)),
schema: RwLock::new(Some(schema)),
indexes: RwLock::new(HashMap::new()),
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
event_bus,
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
subscription_manager,
})
}
fn load_state_from_mmap(mmap: &MmapGraph) -> GraphState {
let mut state = GraphState::new();
let mut max_vertex_id: u64 = 0;
let mut max_edge_id: u64 = 0;
for vertex in mmap.all_vertices() {
let label_id = state.interner.write().intern(&vertex.label);
let node = Arc::new(NodeData {
id: vertex.id,
label_id,
properties: vertex.properties,
out_edges: Vec::new(), in_edges: Vec::new(),
});
state.vertices = state.vertices.update(vertex.id, node);
if vertex.id.0 >= max_vertex_id {
max_vertex_id = vertex.id.0 + 1;
}
let bitmap = state
.vertex_labels
.get(&label_id)
.cloned()
.unwrap_or_else(|| Arc::new(RoaringBitmap::new()));
let mut new_bitmap = (*bitmap).clone();
new_bitmap.insert(vertex.id.0 as u32);
state.vertex_labels = state.vertex_labels.update(label_id, Arc::new(new_bitmap));
}
for edge in mmap.all_edges() {
let label_id = state.interner.write().intern(&edge.label);
let edge_data = Arc::new(EdgeData {
id: edge.id,
label_id,
src: edge.src,
dst: edge.dst,
properties: edge.properties,
});
state.edges = state.edges.update(edge.id, edge_data);
if edge.id.0 >= max_edge_id {
max_edge_id = edge.id.0 + 1;
}
let bitmap = state
.edge_labels
.get(&label_id)
.cloned()
.unwrap_or_else(|| Arc::new(RoaringBitmap::new()));
let mut new_bitmap = (*bitmap).clone();
new_bitmap.insert(edge.id.0 as u32);
state.edge_labels = state.edge_labels.update(label_id, Arc::new(new_bitmap));
if let Some(src_node) = state.vertices.get(&edge.src) {
let mut new_node = (**src_node).clone();
new_node.out_edges.push(edge.id);
state.vertices = state.vertices.update(edge.src, Arc::new(new_node));
}
if let Some(dst_node) = state.vertices.get(&edge.dst) {
let mut new_node = (**dst_node).clone();
new_node.in_edges.push(edge.id);
state.vertices = state.vertices.update(edge.dst, Arc::new(new_node));
}
}
state.next_vertex_id = max_vertex_id;
state.next_edge_id = max_edge_id;
state.version = 0;
state
}
/// Captures the current graph state as a [`CowMmapSnapshot`].
///
/// Both the state and the interner contents are cloned while the state read
/// lock is held. With the `reactive` feature the snapshot also receives the
/// graph's subscription manager and a `snapshot_fn` closure that produces a
/// fresh snapshot on demand; snapshots produced by that closure carry a
/// placeholder manager and a `snapshot_fn` that panics if called.
pub fn snapshot(&self) -> CowMmapSnapshot {
    let guard = self.state.read();
    let frozen_interner = Arc::new(guard.interner.read().clone());
    let frozen_state = Arc::new((*guard).clone());
    CowMmapSnapshot {
        state: frozen_state,
        interner_snapshot: frozen_interner,
        #[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
        subscription_manager: self.subscription_manager.clone(),
        #[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
        snapshot_fn: {
            let shared_state = self.state.clone();
            std::sync::Arc::new(move || {
                let live = shared_state.read();
                let interner_copy = Arc::new(live.interner.read().clone());
                let state_copy = Arc::new((*live).clone());
                let refreshed = CowMmapSnapshot {
                    state: state_copy,
                    interner_snapshot: interner_copy,
                    #[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
                    subscription_manager: std::sync::Arc::new(
                        crate::traversal::reactive::SubscriptionManager::placeholder(),
                    ),
                    #[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
                    snapshot_fn: std::sync::Arc::new(|| {
                        panic!("nested snapshot_fn not supported")
                    }),
                };
                Box::new(refreshed)
            })
        },
    }
}
/// Creates a traversal source over this graph.
///
/// `graph_arc` is forwarded unchanged to `CowMmapTraversalSource::new_with_arc`
/// together with `self`.
pub fn gremlin(&self, graph_arc: Arc<CowMmapGraph>) -> CowMmapTraversalSource<'_> {
CowMmapTraversalSource::new_with_arc(self, graph_arc)
}
/// Returns the `version` counter of the in-memory state, which the mutating
/// methods of this type increment.
pub fn version(&self) -> u64 {
self.state.read().version
}
/// Returns the event bus on which this graph emits change events.
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
pub fn event_bus(&self) -> &crate::storage::events::EventBus {
&self.event_bus
}
/// Returns the shared subscription manager owned by this graph.
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
pub fn subscription_manager(
&self,
) -> &std::sync::Arc<crate::traversal::reactive::SubscriptionManager> {
&self.subscription_manager
}
/// Returns the number of vertices in the in-memory state.
pub fn vertex_count(&self) -> u64 {
self.state.read().vertices.len() as u64
}
/// Returns the number of edges in the in-memory state.
pub fn edge_count(&self) -> u64 {
self.state.read().edges.len() as u64
}
/// Looks up vertex `id` in the in-memory state.
///
/// Returns `None` when the vertex does not exist. A label id the interner
/// cannot resolve yields an empty label string.
pub fn get_vertex(&self, id: VertexId) -> Option<Vertex> {
    let guard = self.state.read();
    let node = guard.vertices.get(&id)?;
    let label = guard
        .interner
        .read()
        .resolve(node.label_id)
        .unwrap_or("")
        .to_string();
    Some(Vertex {
        id: node.id,
        label,
        properties: node.properties.clone(),
    })
}
/// Looks up edge `id` in the in-memory state.
///
/// Returns `None` when the edge does not exist. A label id the interner
/// cannot resolve yields an empty label string.
pub fn get_edge(&self, id: EdgeId) -> Option<Edge> {
    let guard = self.state.read();
    let record = guard.edges.get(&id)?;
    let label = guard
        .interner
        .read()
        .resolve(record.label_id)
        .unwrap_or("")
        .to_string();
    Some(Edge {
        id: record.id,
        label,
        src: record.src,
        dst: record.dst,
        properties: record.properties.clone(),
    })
}
/// Returns a clone of the currently attached schema, or `None` if there is none.
pub fn schema(&self) -> Option<GraphSchema> {
self.schema.read().clone()
}
/// Replaces the attached schema; passing `None` removes it.
pub fn set_schema(&self, schema: Option<GraphSchema>) {
*self.schema.write() = schema;
}
/// Creates the index described by `spec` and fills it from the current state.
///
/// The index map stays write-locked for the whole operation, so the new index
/// only becomes visible once it is fully populated.
///
/// # Errors
/// * [`IndexError::AlreadyExists`] when an index named `spec.name` is registered.
/// * Any error from the index constructor or from inserting existing elements.
pub fn create_index(&self, spec: IndexSpec) -> Result<(), IndexError> {
    let mut registry = self.indexes.write();
    if registry.contains_key(&spec.name) {
        return Err(IndexError::AlreadyExists(spec.name.clone()));
    }
    let mut built: Box<dyn PropertyIndex> = match spec.index_type {
        IndexType::BTree => Box::new(BTreeIndex::new(spec.clone())?),
        IndexType::Unique => Box::new(UniqueIndex::new(spec.clone())?),
        IndexType::RTree => Box::new(RTreeIndex::new(spec.clone())?),
    };
    {
        let current = self.state.read();
        Self::populate_index_internal(&current, &mut *built)?;
        registry.insert(spec.name.clone(), built);
    }
    Ok(())
}
/// Removes the index registered under `name`.
///
/// # Errors
/// Returns [`IndexError::NotFound`] when no such index exists.
pub fn drop_index(&self, name: &str) -> Result<(), IndexError> {
    match self.indexes.write().remove(name) {
        Some(_) => Ok(()),
        None => Err(IndexError::NotFound(name.to_string())),
    }
}
/// Returns a copy of the spec of every registered index, in the index map's
/// iteration order.
pub fn list_indexes(&self) -> Vec<IndexSpec> {
    let registry = self.indexes.read();
    let mut specs = Vec::with_capacity(registry.len());
    for index in registry.values() {
        specs.push(index.spec().clone());
    }
    specs
}
/// Returns `true` when an index named `name` is registered.
pub fn has_index(&self, name: &str) -> bool {
self.indexes.read().contains_key(name)
}
/// Returns the number of registered indexes.
pub fn index_count(&self) -> usize {
self.indexes.read().len()
}
/// Always returns `true`.
pub fn supports_indexes(&self) -> bool {
true
}
/// Returns the vertices whose `property` equals `value`, optionally restricted
/// to `label`.
///
/// The first vertex index on `property` that applies is used: an index without
/// a label always applies, a labelled index applies only when `label` is given
/// and equal. Ids from the index are resolved lazily against the live state,
/// and ids that no longer resolve are skipped. Without a usable index, every
/// vertex of a fresh snapshot is scanned eagerly.
///
/// Only the index map is locked during the index lookup; the graph state is
/// locked per id while results are materialised.
pub fn vertices_by_property(
    &self,
    label: Option<&str>,
    property: &str,
    value: &Value,
) -> Box<dyn Iterator<Item = Vertex> + '_> {
    let indexes = self.indexes.read();
    let indexed_ids: Option<Vec<u64>> = indexes
        .values()
        .find(|index| {
            let spec = index.spec();
            spec.element_type == ElementType::Vertex
                && spec.property == property
                && match (&spec.label, label) {
                    (Some(idx_label), Some(filter_label)) => idx_label == filter_label,
                    (Some(_), None) => false,
                    (None, _) => true,
                }
        })
        .map(|index| index.lookup_eq(value).collect());
    drop(indexes);

    if let Some(ids) = indexed_ids {
        // The returned iterator outlives `label`, so it needs an owned copy.
        let label_owned = label.map(|s| s.to_string());
        return Box::new(
            ids.into_iter()
                .filter_map(move |id| {
                    let state = self.state.read();
                    let node = state.vertices.get(&VertexId(id))?;
                    let label = state.interner.read().resolve(node.label_id)?.to_string();
                    Some(Vertex {
                        id: node.id,
                        label,
                        properties: node.properties.clone(),
                    })
                })
                .filter(move |v| {
                    label_owned
                        .as_deref()
                        .map_or(true, |wanted| v.label == wanted)
                }),
        );
    }

    // Fallback scan is collected eagerly, so the filter can borrow its inputs.
    let vertices: Vec<Vertex> = self
        .snapshot()
        .all_vertices()
        .filter(|v| {
            label.map_or(true, |wanted| v.label == wanted)
                && v.properties.get(property) == Some(value)
        })
        .collect();
    Box::new(vertices.into_iter())
}
/// Returns the edges whose `property` equals `value`, optionally restricted to
/// `label`.
///
/// The first edge index on `property` that applies is used: an index without a
/// label always applies, a labelled index applies only when `label` is given
/// and equal. Ids from the index are resolved lazily against the live state,
/// and ids that no longer resolve are skipped. Without a usable index, every
/// edge of a fresh snapshot is scanned eagerly.
///
/// Only the index map is locked during the index lookup; the graph state is
/// locked per id while results are materialised.
pub fn edges_by_property(
    &self,
    label: Option<&str>,
    property: &str,
    value: &Value,
) -> Box<dyn Iterator<Item = Edge> + '_> {
    let indexes = self.indexes.read();
    let indexed_ids: Option<Vec<u64>> = indexes
        .values()
        .find(|index| {
            let spec = index.spec();
            spec.element_type == ElementType::Edge
                && spec.property == property
                && match (&spec.label, label) {
                    (Some(idx_label), Some(filter_label)) => idx_label == filter_label,
                    (Some(_), None) => false,
                    (None, _) => true,
                }
        })
        .map(|index| index.lookup_eq(value).collect());
    drop(indexes);

    if let Some(ids) = indexed_ids {
        // The returned iterator outlives `label`, so it needs an owned copy.
        let label_owned = label.map(|s| s.to_string());
        return Box::new(
            ids.into_iter()
                .filter_map(move |id| {
                    let state = self.state.read();
                    let edge = state.edges.get(&EdgeId(id))?;
                    let label = state.interner.read().resolve(edge.label_id)?.to_string();
                    Some(Edge {
                        id: edge.id,
                        label,
                        src: edge.src,
                        dst: edge.dst,
                        properties: edge.properties.clone(),
                    })
                })
                .filter(move |e| {
                    label_owned
                        .as_deref()
                        .map_or(true, |wanted| e.label == wanted)
                }),
        );
    }

    // Fallback scan is collected eagerly, so the filter can borrow its inputs.
    let edges: Vec<Edge> = self
        .snapshot()
        .all_edges()
        .filter(|e| {
            label.map_or(true, |wanted| e.label == wanted)
                && e.properties.get(property) == Some(value)
        })
        .collect();
    Box::new(edges.into_iter())
}
/// Returns the vertices whose `property` lies between `start` and `end`,
/// optionally restricted to `label`.
///
/// Only B-tree vertex indexes on `property` are considered; the label rule is
/// the same as for equality lookups (an unlabelled index always applies, a
/// labelled one only for an equal `label`). Without a usable index, every
/// vertex of a fresh snapshot is scanned and compared via `to_comparable()`;
/// vertices lacking the property never match.
pub fn vertices_by_property_range(
    &self,
    label: Option<&str>,
    property: &str,
    start: Bound<&Value>,
    end: Bound<&Value>,
) -> Box<dyn Iterator<Item = Vertex> + '_> {
    let index_guard = self.indexes.read();
    let usable = index_guard.values().find(|index| {
        let spec = index.spec();
        let label_ok = match (&spec.label, label) {
            (Some(idx_label), Some(filter_label)) => idx_label == filter_label,
            (Some(_), None) => false,
            _ => true,
        };
        spec.element_type == ElementType::Vertex
            && spec.property == property
            && spec.index_type == IndexType::BTree
            && label_ok
    });
    if let Some(index) = usable {
        let ids: Vec<u64> = index.lookup_range(start, end).collect();
        let label_filter = label.map(|s| s.to_string());
        drop(index_guard);
        let resolved = ids.into_iter().filter_map(move |id| {
            let state = self.state.read();
            let node = state.vertices.get(&VertexId(id))?;
            let label = state.interner.read().resolve(node.label_id)?.to_string();
            Some(Vertex {
                id: node.id,
                label,
                properties: node.properties.clone(),
            })
        });
        return Box::new(resolved.filter(move |v| {
            label_filter.is_none() || Some(v.label.as_str()) == label_filter.as_deref()
        }));
    }
    drop(index_guard);

    let label_filter = label.map(|s| s.to_string());
    let property_name = property.to_string();
    let lower = start.cloned();
    let upper = end.cloned();
    let matches: Vec<Vertex> = self
        .snapshot()
        .all_vertices()
        .filter(move |v| {
            if let Some(ref wanted) = label_filter {
                if &v.label != wanted {
                    return false;
                }
            }
            let candidate = match v.properties.get(&property_name) {
                Some(found) => found.to_comparable(),
                None => return false,
            };
            let above_lower = match &lower {
                Bound::Included(s) => candidate >= s.to_comparable(),
                Bound::Excluded(s) => candidate > s.to_comparable(),
                Bound::Unbounded => true,
            };
            let below_upper = match &upper {
                Bound::Included(e) => candidate <= e.to_comparable(),
                Bound::Excluded(e) => candidate < e.to_comparable(),
                Bound::Unbounded => true,
            };
            above_lower && below_upper
        })
        .collect();
    Box::new(matches.into_iter())
}
/// Inserts every existing element that matches `index`'s spec into `index`.
///
/// An element matches when it is of the spec's element type, carries the
/// spec's property and, if the spec names a label, resolves to that label.
/// The interner is read-locked once for the whole pass and labels are compared
/// as borrowed strings, so no lock or allocation is paid per element.
///
/// # Errors
/// Stops at and returns the first error reported by `index.insert`.
fn populate_index_internal(
    state: &GraphState,
    index: &mut dyn PropertyIndex,
) -> Result<(), IndexError> {
    let spec = index.spec().clone();
    let wanted_label = spec.label.as_deref();
    let interner = state.interner.read();
    match spec.element_type {
        ElementType::Vertex => {
            for (id, node) in state.vertices.iter() {
                if let Some(wanted) = wanted_label {
                    if interner.resolve(node.label_id) != Some(wanted) {
                        continue;
                    }
                }
                if let Some(value) = node.properties.get(&spec.property) {
                    index.insert(value.clone(), id.0)?;
                }
            }
        }
        ElementType::Edge => {
            for (id, edge) in state.edges.iter() {
                if let Some(wanted) = wanted_label {
                    if interner.resolve(edge.label_id) != Some(wanted) {
                        continue;
                    }
                }
                if let Some(value) = edge.properties.get(&spec.property) {
                    index.insert(value.clone(), id.0)?;
                }
            }
        }
    }
    Ok(())
}
/// Adds vertex `id` to every vertex index that covers `label` and whose
/// property is present in `properties`.
///
/// Insert failures are deliberately ignored (best effort).
fn index_vertex_insert(&self, id: VertexId, label: &str, properties: &HashMap<String, Value>) {
    let mut registry = self.indexes.write();
    for index in registry.values_mut() {
        let spec = index.spec();
        let covers_label = spec.label.as_ref().map_or(true, |wanted| wanted == label);
        if spec.element_type == ElementType::Vertex && covers_label {
            if let Some(value) = properties.get(&spec.property) {
                let _ = index.insert(value.clone(), id.0);
            }
        }
    }
}
/// Removes vertex `id` from every vertex index that covers `label` and whose
/// property is present in `properties`.
///
/// The result of each removal is deliberately ignored (best effort).
fn index_vertex_remove(&self, id: VertexId, label: &str, properties: &HashMap<String, Value>) {
    let mut registry = self.indexes.write();
    for index in registry.values_mut() {
        let spec = index.spec();
        let covers_label = spec.label.as_ref().map_or(true, |wanted| wanted == label);
        if spec.element_type == ElementType::Vertex && covers_label {
            if let Some(value) = properties.get(&spec.property) {
                let _ = index.remove(value, id.0);
            }
        }
    }
}
/// Adds edge `id` to every edge index that covers `label` and whose property
/// is present in `properties`.
///
/// Insert failures are deliberately ignored (best effort).
fn index_edge_insert(&self, id: EdgeId, label: &str, properties: &HashMap<String, Value>) {
    let mut registry = self.indexes.write();
    for index in registry.values_mut() {
        let spec = index.spec();
        let covers_label = spec.label.as_ref().map_or(true, |wanted| wanted == label);
        if spec.element_type == ElementType::Edge && covers_label {
            if let Some(value) = properties.get(&spec.property) {
                let _ = index.insert(value.clone(), id.0);
            }
        }
    }
}
/// Removes edge `id` from every edge index that covers `label` and whose
/// property is present in `properties`.
///
/// The result of each removal is deliberately ignored (best effort).
fn index_edge_remove(&self, id: EdgeId, label: &str, properties: &HashMap<String, Value>) {
    let mut registry = self.indexes.write();
    for index in registry.values_mut() {
        let spec = index.spec();
        let covers_label = spec.label.as_ref().map_or(true, |wanted| wanted == label);
        if spec.element_type == ElementType::Edge && covers_label {
            if let Some(value) = properties.get(&spec.property) {
                let _ = index.remove(value, id.0);
            }
        }
    }
}
/// Re-indexes vertex `id` after `property` changed from `old_value` to
/// `new_value` in every vertex index on `property` that covers `label`.
///
/// If an index rejects the new value, the previous entry is put back
/// (best effort) before the error is returned, so a rejected update does not
/// leave the vertex missing from that index. Indexes processed earlier in the
/// same call are not rolled back.
///
/// # Errors
/// Returns [`StorageError::IndexError`] carrying the message of the first
/// failed insert.
fn update_vertex_property_in_indexes(
    &self,
    id: VertexId,
    label: &str,
    property: &str,
    old_value: Option<&Value>,
    new_value: &Value,
) -> Result<(), StorageError> {
    let mut indexes = self.indexes.write();
    for index in indexes.values_mut() {
        let spec = index.spec();
        if spec.element_type != ElementType::Vertex {
            continue;
        }
        if spec.property != property {
            continue;
        }
        if let Some(ref idx_label) = spec.label {
            if idx_label != label {
                continue;
            }
        }
        if let Some(old) = old_value {
            let _ = index.remove(old, id.0);
        }
        if let Err(e) = index.insert(new_value.clone(), id.0) {
            // Restore the entry removed above; otherwise the vertex would be
            // absent from this index even though the update was rejected.
            if let Some(old) = old_value {
                let _ = index.insert(old.clone(), id.0);
            }
            return Err(StorageError::IndexError(e.to_string()));
        }
    }
    Ok(())
}
/// Re-indexes edge `id` after `property` changed from `old_value` to
/// `new_value` in every edge index on `property` that covers `label`.
///
/// If an index rejects the new value, the previous entry is put back
/// (best effort) before the error is returned, so a rejected update does not
/// leave the edge missing from that index. Indexes processed earlier in the
/// same call are not rolled back.
///
/// # Errors
/// Returns [`StorageError::IndexError`] carrying the message of the first
/// failed insert.
fn update_edge_property_in_indexes(
    &self,
    id: EdgeId,
    label: &str,
    property: &str,
    old_value: Option<&Value>,
    new_value: &Value,
) -> Result<(), StorageError> {
    let mut indexes = self.indexes.write();
    for index in indexes.values_mut() {
        let spec = index.spec();
        if spec.element_type != ElementType::Edge {
            continue;
        }
        if spec.property != property {
            continue;
        }
        if let Some(ref idx_label) = spec.label {
            if idx_label != label {
                continue;
            }
        }
        if let Some(old) = old_value {
            let _ = index.remove(old, id.0);
        }
        if let Err(e) = index.insert(new_value.clone(), id.0) {
            // Restore the entry removed above; otherwise the edge would be
            // absent from this index even though the update was rejected.
            if let Some(old) = old_value {
                let _ = index.insert(old.clone(), id.0);
            }
            return Err(StorageError::IndexError(e.to_string()));
        }
    }
    Ok(())
}
/// Adds a vertex with `label` and `properties` and returns its id.
///
/// The vertex is written to the mmap store first (which assigns the id), then
/// mirrored into the in-memory state, then offered to the property indexes.
/// With the `reactive` feature a `VertexAdded` event is emitted when the bus
/// has subscribers.
///
/// # Errors
/// Returns the error from the mmap store; nothing in memory changes in that case.
pub fn add_vertex(
    &self,
    label: &str,
    properties: HashMap<String, Value>,
) -> Result<VertexId, StorageError> {
    let id = self.mmap.add_vertex(label, properties.clone())?;
    {
        let mut guard = self.state.write();
        if guard.next_vertex_id <= id.0 {
            guard.next_vertex_id = id.0 + 1;
        }
        let label_id = guard.interner.write().intern(label);
        let record = NodeData {
            id,
            label_id,
            properties: properties.clone(),
            out_edges: Vec::new(),
            in_edges: Vec::new(),
        };
        guard.vertices = guard.vertices.update(id, Arc::new(record));
        // Copy the label's membership bitmap, add the id, and store the copy.
        let mut members = match guard.vertex_labels.get(&label_id) {
            Some(existing) => (**existing).clone(),
            None => RoaringBitmap::new(),
        };
        members.insert(id.0 as u32);
        guard.vertex_labels = guard.vertex_labels.update(label_id, Arc::new(members));
        guard.version += 1;
    }
    // The state lock is released before the index lock is taken.
    self.index_vertex_insert(id, label, &properties);
    #[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
    if self.event_bus.subscriber_count() > 0 {
        self.event_bus.emit(crate::storage::events::GraphEvent::VertexAdded {
            id,
            label: label.to_string(),
            properties,
        });
    }
    Ok(id)
}
/// Adds an edge `src -> dst` with `label` and `properties` and returns its id.
///
/// Both endpoints must exist in the in-memory state. The edge is written to
/// the mmap store first, then mirrored into memory (edge map, label bitmap and
/// both endpoints' adjacency lists), then offered to the property indexes.
/// With the `reactive` feature an `EdgeAdded` event is emitted when the bus
/// has subscribers.
///
/// NOTE(review): the endpoint check runs under a read lock that is released
/// before the mmap write, so a concurrent vertex removal can slip in between.
///
/// # Errors
/// * [`StorageError::VertexNotFound`] when `src` or `dst` is missing.
/// * Any error from the mmap store.
pub fn add_edge(
    &self,
    src: VertexId,
    dst: VertexId,
    label: &str,
    properties: HashMap<String, Value>,
) -> Result<EdgeId, StorageError> {
    {
        let guard = self.state.read();
        for endpoint in [src, dst] {
            if !guard.vertices.contains_key(&endpoint) {
                return Err(StorageError::VertexNotFound(endpoint));
            }
        }
    }
    let id = self.mmap.add_edge(src, dst, label, properties.clone())?;
    {
        let mut guard = self.state.write();
        if guard.next_edge_id <= id.0 {
            guard.next_edge_id = id.0 + 1;
        }
        let label_id = guard.interner.write().intern(label);
        let record = EdgeData {
            id,
            label_id,
            src,
            dst,
            properties: properties.clone(),
        };
        guard.edges = guard.edges.update(id, Arc::new(record));
        let mut members = match guard.edge_labels.get(&label_id) {
            Some(existing) => (**existing).clone(),
            None => RoaringBitmap::new(),
        };
        members.insert(id.0 as u32);
        guard.edge_labels = guard.edge_labels.update(label_id, Arc::new(members));
        if let Some(tail) = guard.vertices.get(&src) {
            let mut patched = (**tail).clone();
            patched.out_edges.push(id);
            guard.vertices = guard.vertices.update(src, Arc::new(patched));
        }
        if let Some(head) = guard.vertices.get(&dst) {
            let mut patched = (**head).clone();
            patched.in_edges.push(id);
            guard.vertices = guard.vertices.update(dst, Arc::new(patched));
        }
        guard.version += 1;
    }
    self.index_edge_insert(id, label, &properties);
    #[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
    if self.event_bus.subscriber_count() > 0 {
        self.event_bus.emit(crate::storage::events::GraphEvent::EdgeAdded {
            id,
            src,
            dst,
            label: label.to_string(),
            properties,
        });
    }
    Ok(id)
}
/// Sets property `key` of vertex `id` to `value`.
///
/// Order of effects: the in-memory state is updated (and the version bumped),
/// then the property indexes, then the mmap store. With the `reactive` feature
/// a `VertexPropertyChanged` event is emitted when the bus has subscribers.
///
/// NOTE(review): the in-memory change is not rolled back when the index update
/// or the mmap write fails afterwards, so an `Err` can leave memory ahead of
/// the store.
///
/// # Errors
/// * [`StorageError::VertexNotFound`] when the vertex is missing.
/// * Errors from the index update or from the mmap store.
pub fn set_vertex_property(
    &self,
    id: VertexId,
    key: &str,
    value: Value,
) -> Result<(), StorageError> {
    let (label, old_value) = {
        let mut guard = self.state.write();
        let current = guard
            .vertices
            .get(&id)
            .ok_or(StorageError::VertexNotFound(id))?;
        let label = guard
            .interner
            .read()
            .resolve(current.label_id)
            .map(str::to_string)
            .unwrap_or_default();
        let previous = current.properties.get(key).cloned();
        let mut replacement = (**current).clone();
        replacement.properties.insert(key.to_string(), value.clone());
        guard.vertices = guard.vertices.update(id, Arc::new(replacement));
        guard.version += 1;
        (label, previous)
    };
    self.update_vertex_property_in_indexes(id, &label, key, old_value.as_ref(), &value)?;
    self.mmap.set_vertex_property(id, key, value.clone())?;
    #[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
    if self.event_bus.subscriber_count() > 0 {
        self.event_bus.emit(crate::storage::events::GraphEvent::VertexPropertyChanged {
            id,
            key: key.to_string(),
            old_value,
            new_value: value,
        });
    }
    Ok(())
}
/// Sets property `key` of edge `id` to `value`.
///
/// Order of effects: the in-memory state is updated (and the version bumped),
/// then the property indexes, then the mmap store. With the `reactive` feature
/// an `EdgePropertyChanged` event is emitted when the bus has subscribers.
///
/// NOTE(review): the in-memory change is not rolled back when the index update
/// or the mmap write fails afterwards, so an `Err` can leave memory ahead of
/// the store.
///
/// # Errors
/// * [`StorageError::EdgeNotFound`] when the edge is missing.
/// * Errors from the index update or from the mmap store.
pub fn set_edge_property(
    &self,
    id: EdgeId,
    key: &str,
    value: Value,
) -> Result<(), StorageError> {
    let (label, old_value) = {
        let mut guard = self.state.write();
        let current = guard.edges.get(&id).ok_or(StorageError::EdgeNotFound(id))?;
        let label = guard
            .interner
            .read()
            .resolve(current.label_id)
            .map(str::to_string)
            .unwrap_or_default();
        let previous = current.properties.get(key).cloned();
        let mut replacement = (**current).clone();
        replacement.properties.insert(key.to_string(), value.clone());
        guard.edges = guard.edges.update(id, Arc::new(replacement));
        guard.version += 1;
        (label, previous)
    };
    self.update_edge_property_in_indexes(id, &label, key, old_value.as_ref(), &value)?;
    self.mmap.set_edge_property(id, key, value.clone())?;
    #[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
    if self.event_bus.subscriber_count() > 0 {
        self.event_bus.emit(crate::storage::events::GraphEvent::EdgePropertyChanged {
            id,
            key: key.to_string(),
            old_value,
            new_value: value,
        });
    }
    Ok(())
}
/// Removes vertex `id` together with every edge attached to it.
///
/// Order of effects: the in-memory state is updated (incident edges, label
/// bitmaps, neighbours' adjacency lists, the vertex itself, version bump),
/// then the property indexes, then the mmap store. With the `reactive` feature
/// one `EdgeRemoved` event per removed edge and a final `VertexRemoved` event
/// are emitted when the bus has subscribers.
///
/// A self-loop appears in both adjacency lists of the vertex; it is collected
/// only once so that it is un-indexed and announced exactly once.
///
/// NOTE(review): the in-memory removal is not rolled back when the mmap call
/// fails afterwards.
///
/// # Errors
/// * [`StorageError::VertexNotFound`] when the vertex is missing.
/// * Any error from `mmap.remove_vertex`.
pub fn remove_vertex(&self, id: VertexId) -> Result<(), StorageError> {
    let mut state = self.state.write();
    let node = state
        .vertices
        .get(&id)
        .ok_or(StorageError::VertexNotFound(id))?
        .clone();
    let label = state
        .interner
        .read()
        .resolve(node.label_id)
        .map(|s| s.to_string())
        .unwrap_or_default();
    let properties = node.properties.clone();

    // (edge id, src, dst, label, properties) for every incident edge.
    #[allow(clippy::type_complexity)]
    let mut edges_to_remove: Vec<(EdgeId, VertexId, VertexId, String, HashMap<String, Value>)> =
        Vec::with_capacity(node.out_edges.len() + node.in_edges.len());
    for &edge_id in node.out_edges.iter().chain(node.in_edges.iter()) {
        if let Some(e) = state.edges.get(&edge_id) {
            // Only self-loops can show up twice, so the scan is limited to them.
            if e.src == e.dst && edges_to_remove.iter().any(|(seen, ..)| *seen == edge_id) {
                continue;
            }
            let edge_label = state
                .interner
                .read()
                .resolve(e.label_id)
                .map(|s| s.to_string())
                .unwrap_or_default();
            edges_to_remove.push((edge_id, e.src, e.dst, edge_label, e.properties.clone()));
        }
    }

    for (edge_id, _, _, _, _) in &edges_to_remove {
        let edge_info = state.edges.get(edge_id).map(|e| (e.label_id, e.src, e.dst));
        if let Some((label_id, src, dst)) = edge_info {
            if let Some(bitmap) = state.edge_labels.get(&label_id) {
                let mut new_bitmap = (**bitmap).clone();
                new_bitmap.remove(edge_id.0 as u32);
                state.edge_labels = state.edge_labels.update(label_id, Arc::new(new_bitmap));
            }
            // Detach the edge from the vertex at its other end.
            let other_vertex = if src == id { dst } else { src };
            if let Some(other_node) = state.vertices.get(&other_vertex) {
                let mut new_node = (**other_node).clone();
                new_node.out_edges.retain(|e| e != edge_id);
                new_node.in_edges.retain(|e| e != edge_id);
                state.vertices = state.vertices.update(other_vertex, Arc::new(new_node));
            }
            state.edges = state.edges.without(edge_id);
        }
    }
    if let Some(bitmap) = state.vertex_labels.get(&node.label_id) {
        let mut new_bitmap = (**bitmap).clone();
        new_bitmap.remove(id.0 as u32);
        state.vertex_labels = state
            .vertex_labels
            .update(node.label_id, Arc::new(new_bitmap));
    }
    state.vertices = state.vertices.without(&id);
    state.version += 1;
    drop(state);

    self.index_vertex_remove(id, &label, &properties);
    for (edge_id, _edge_src, _edge_dst, edge_label, edge_props) in &edges_to_remove {
        self.index_edge_remove(*edge_id, edge_label, edge_props);
    }
    self.mmap.remove_vertex(id)?;
    #[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
    if self.event_bus.subscriber_count() > 0 {
        for (edge_id, edge_src, edge_dst, edge_label, _) in edges_to_remove {
            self.event_bus.emit(crate::storage::events::GraphEvent::EdgeRemoved {
                id: edge_id,
                src: edge_src,
                dst: edge_dst,
                label: edge_label,
            });
        }
        self.event_bus.emit(crate::storage::events::GraphEvent::VertexRemoved {
            id,
            label,
        });
    }
    Ok(())
}
/// Removes edge `id`.
///
/// Order of effects: the in-memory state is updated (both endpoints' adjacency
/// lists, the label bitmap, the edge map, version bump), then the property
/// indexes, then the mmap store. With the `reactive` feature an `EdgeRemoved`
/// event is emitted when the bus has subscribers.
///
/// NOTE(review): the in-memory removal is not rolled back when the mmap call
/// fails afterwards.
///
/// # Errors
/// * [`StorageError::EdgeNotFound`] when the edge is missing.
/// * Any error from `mmap.remove_edge`.
pub fn remove_edge(&self, id: EdgeId) -> Result<(), StorageError> {
    let mut guard = self.state.write();
    let edge = match guard.edges.get(&id) {
        Some(found) => found.clone(),
        None => return Err(StorageError::EdgeNotFound(id)),
    };
    let label = guard
        .interner
        .read()
        .resolve(edge.label_id)
        .map(str::to_string)
        .unwrap_or_default();
    #[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
    let (src, dst) = (edge.src, edge.dst);
    let properties = edge.properties.clone();

    if let Some(tail) = guard.vertices.get(&edge.src) {
        let mut patched = (**tail).clone();
        patched.out_edges.retain(|candidate| *candidate != id);
        guard.vertices = guard.vertices.update(edge.src, Arc::new(patched));
    }
    if let Some(head) = guard.vertices.get(&edge.dst) {
        let mut patched = (**head).clone();
        patched.in_edges.retain(|candidate| *candidate != id);
        guard.vertices = guard.vertices.update(edge.dst, Arc::new(patched));
    }
    if let Some(members) = guard.edge_labels.get(&edge.label_id) {
        let mut trimmed = (**members).clone();
        trimmed.remove(id.0 as u32);
        guard.edge_labels = guard.edge_labels.update(edge.label_id, Arc::new(trimmed));
    }
    guard.edges = guard.edges.without(&id);
    guard.version += 1;
    drop(guard);

    self.index_edge_remove(id, &label, &properties);
    self.mmap.remove_edge(id)?;
    #[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
    if self.event_bus.subscriber_count() > 0 {
        self.event_bus.emit(crate::storage::events::GraphEvent::EdgeRemoved {
            id, src, dst, label,
        });
    }
    Ok(())
}
/// Runs `f` against a batch context and applies its operations as one unit.
///
/// A batch is opened on the mmap store and `f` works on a private clone of the
/// in-memory state. When `f` succeeds, the recorded operations are replayed on
/// the mmap store, the batch is committed, the cloned state replaces the live
/// one and (with the `reactive` feature) the collected events are emitted as a
/// single `Batch` event if the bus has subscribers.
///
/// The mmap batch is aborted (best effort) on every failure path: when `f`
/// fails, when replaying an operation fails, and when the commit fails. The
/// live in-memory state is left untouched in all of those cases.
///
/// NOTE(review): no property-index maintenance is visible in this method;
/// confirm whether the batch context handles it.
///
/// # Errors
/// * The error returned by `f`.
/// * [`BatchError::Storage`] for failures of the mmap store.
pub fn batch<F, T>(&self, f: F) -> Result<T, BatchError>
where
    F: FnOnce(&mut CowMmapBatchContext) -> Result<T, BatchError>,
{
    self.mmap.begin_batch().map_err(BatchError::Storage)?;
    let pending_state = self.state.read().clone();
    let mut ctx = CowMmapBatchContext {
        graph: self,
        pending_state,
        operations: Vec::new(),
        #[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
        pending_events: Vec::new(),
    };
    let result = match f(&mut ctx) {
        Ok(result) => result,
        Err(e) => {
            let _ = self.mmap.abort_batch();
            return Err(e);
        }
    };
    let persisted = ctx
        .operations
        .iter()
        .try_for_each(|op| self.apply_operation_to_mmap(op))
        .and_then(|()| self.mmap.commit_batch().map(|_| ()));
    if let Err(e) = persisted {
        // Do not leave the batch open on the store after a failed replay/commit.
        let _ = self.mmap.abort_batch();
        return Err(BatchError::Storage(e));
    }
    *self.state.write() = ctx.pending_state;
    #[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
    if !ctx.pending_events.is_empty() && self.event_bus.subscriber_count() > 0 {
        self.event_bus.emit(crate::storage::events::GraphEvent::Batch(ctx.pending_events));
    }
    Ok(result)
}
/// Replays one recorded batch operation on the mmap store, discarding any id
/// the store returns.
///
/// # Errors
/// Returns the error of the underlying mmap call.
fn apply_operation_to_mmap(&self, op: &BatchOperation) -> Result<(), StorageError> {
    match op {
        BatchOperation::AddVertex { label, properties } => self
            .mmap
            .add_vertex(label, properties.clone())
            .map(|_| ()),
        BatchOperation::AddEdge {
            src,
            dst,
            label,
            properties,
        } => self
            .mmap
            .add_edge(*src, *dst, label, properties.clone())
            .map(|_| ()),
        BatchOperation::SetVertexProperty { id, key, value } => self
            .mmap
            .set_vertex_property(*id, key, value.clone())
            .map(|_| ()),
        BatchOperation::SetEdgeProperty { id, key, value } => self
            .mmap
            .set_edge_property(*id, key, value.clone())
            .map(|_| ()),
        BatchOperation::RemoveVertex { id } => self.mmap.remove_vertex(*id).map(|_| ()),
        BatchOperation::RemoveEdge { id } => self.mmap.remove_edge(*id).map(|_| ()),
    }
}
/// Parses and executes a GQL statement.
///
/// Read-only statements are compiled against a fresh snapshot. All other
/// statements are executed as mutations through a `CowMmapGraphMutWrapper`
/// around this graph, using the currently attached schema (if any).
///
/// # Errors
/// * Parse errors from `gql::parse_statement`.
/// * [`GqlError::Compile`] for read-only statements that fail to compile.
/// * [`GqlError::Mutation`] carrying the message of a failed mutation.
#[cfg(feature = "gql")]
pub fn gql(&self, query: &str) -> Result<Vec<Value>, GqlError> {
    let stmt = gql::parse_statement(query)?;
    if !stmt.is_read_only() {
        let mut wrapper = CowMmapGraphMutWrapper { graph: self };
        let schema = self.schema();
        let outcome = gql::execute_mutation_with_schema(&stmt, &mut wrapper, schema.as_ref());
        return outcome.map_err(|e| GqlError::Mutation(e.to_string()));
    }
    let snapshot = self.snapshot();
    gql::compile_statement(&stmt, &snapshot).map_err(GqlError::Compile)
}
/// Parses and executes a read-only GQL statement with `params`.
///
/// # Errors
/// * Parse errors from `gql::parse_statement`.
/// * [`GqlError::Compile`] when the statement fails to compile.
/// * [`GqlError::Mutation`] for any statement that is not read-only;
///   parameterized mutations are rejected.
#[cfg(feature = "gql")]
pub fn gql_with_params(
    &self,
    query: &str,
    params: &gql::Parameters,
) -> Result<Vec<Value>, GqlError> {
    let stmt = gql::parse_statement(query)?;
    if !stmt.is_read_only() {
        return Err(GqlError::Mutation(
            "Parameterized mutations are not yet supported".into(),
        ));
    }
    let snapshot = self.snapshot();
    gql::compile_statement_with_params(&stmt, &snapshot, params).map_err(GqlError::Compile)
}
/// Runs the Gremlin `query` against a fresh snapshot of the graph and returns
/// whatever the snapshot's `query` method returns.
#[cfg(feature = "gremlin")]
pub fn query(
&self,
query: &str,
) -> Result<crate::gremlin::ExecutionResult, crate::gremlin::GremlinError> {
self.snapshot().query(query)
}
/// Executes a Gremlin query that may contain mutation steps.
///
/// The query is compiled and evaluated against a fresh snapshot. Every
/// resulting value that decodes as a `PendingMutation` is then applied to this
/// graph; all other values are passed through unchanged. Successful
/// add/set mutations contribute the affected element (or its raw integer id
/// when the value's map contains the `__extract_id` key); drops contribute
/// nothing. The collected values are finally shaped by the query's terminal step.
///
/// Storage errors from individual mutations are discarded: a failed mutation
/// simply contributes no value.
///
/// # Errors
/// Returns parse and compile errors only.
#[cfg(feature = "gremlin")]
pub fn mutate(
    &self,
    query: &str,
) -> Result<crate::gremlin::ExecutionResult, crate::gremlin::GremlinError> {
    use crate::gremlin::{compile, parse, ExecutionResult, TerminalStep};
    use crate::traversal::mutation::PendingMutation;

    let ast = parse(query)?;
    let snapshot = self.snapshot();
    let source = snapshot.gremlin();
    let compiled = compile(&ast, &source)?;
    let terminal = compiled.terminal().cloned();
    let raw_values = compiled.traversal.to_list();

    let mut collected = Vec::with_capacity(raw_values.len());
    for value in raw_values {
        let mutation = match PendingMutation::from_value(&value) {
            Some(decoded) => decoded,
            None => {
                // Plain value: forward untouched.
                collected.push(value);
                continue;
            }
        };
        let wants_raw_id = value
            .as_map()
            .map_or(false, |m| m.contains_key("__extract_id"));
        let applied = match mutation {
            PendingMutation::AddVertex { label, properties } => {
                self.add_vertex(&label, properties).ok().map(Value::Vertex)
            }
            PendingMutation::AddEdge {
                label,
                from,
                to,
                properties,
            } => self
                .add_edge(from, to, &label, properties)
                .ok()
                .map(Value::Edge),
            PendingMutation::SetVertexProperty { id, key, value } => self
                .set_vertex_property(id, &key, value)
                .ok()
                .map(|()| Value::Vertex(id)),
            PendingMutation::SetEdgeProperty { id, key, value } => self
                .set_edge_property(id, &key, value)
                .ok()
                .map(|()| Value::Edge(id)),
            PendingMutation::DropVertex { id } => {
                let _ = self.remove_vertex(id);
                None
            }
            PendingMutation::DropEdge { id } => {
                let _ = self.remove_edge(id);
                None
            }
        };
        if let Some(outcome) = applied {
            let emitted = if wants_raw_id {
                match outcome {
                    Value::Vertex(vid) => Value::Int(vid.0 as i64),
                    Value::Edge(eid) => Value::Int(eid.0 as i64),
                    other => other,
                }
            } else {
                outcome
            };
            collected.push(emitted);
        }
    }

    let shaped = match terminal {
        None | Some(TerminalStep::ToList { .. }) => ExecutionResult::List(collected),
        Some(TerminalStep::Next { count: None, .. }) => {
            ExecutionResult::Single(collected.into_iter().next())
        }
        Some(TerminalStep::Next { count: Some(n), .. }) => {
            ExecutionResult::List(collected.into_iter().take(n as usize).collect())
        }
        Some(TerminalStep::ToSet { .. }) => {
            ExecutionResult::Set(collected.into_iter().collect())
        }
        Some(TerminalStep::Iterate { .. }) => ExecutionResult::Unit,
        Some(TerminalStep::HasNext { .. }) => ExecutionResult::Bool(!collected.is_empty()),
        Some(TerminalStep::Explain { .. }) => {
            ExecutionResult::Explain("explain not supported in mmap backend".to_string())
        }
    };
    Ok(shaped)
}
/// Executes a Gremlin `script` starting from an empty variable context.
///
/// Delegates to `execute_script_with_context` with
/// `crate::gremlin::VariableContext::new()`.
#[cfg(feature = "gremlin")]
pub fn execute_script(
&self,
script: &str,
) -> Result<crate::gremlin::ScriptResult, crate::gremlin::GremlinError> {
self.execute_script_with_context(script, crate::gremlin::VariableContext::new())
}
/// Executes a multi-statement Gremlin script, starting from the supplied variable bindings.
///
/// Statements run in order. For each one a fresh snapshot is taken, the traversal is
/// compiled against it with the current bindings, evaluated to a list, and any pending
/// mutations found in the output are applied through `process_script_mutations`.
///
/// * An assignment binds its variable to the first result when the terminal step is a
///   count-less `Next`, or to the sole result otherwise, and then resets the running
///   script result to `ExecutionResult::Unit`.
/// * A plain traversal stores its output, shaped by its terminal step, as the running
///   script result.
///
/// Returns the result of the last statement together with the final variable context.
///
/// # Errors
///
/// Propagates parse and compile errors, and returns a `CompileError::InvalidArguments`
/// error when an assignment does not produce a value that can be bound.
#[cfg(feature = "gremlin")]
pub fn execute_script_with_context(
    &self,
    script: &str,
    context: crate::gremlin::VariableContext,
) -> Result<crate::gremlin::ScriptResult, crate::gremlin::GremlinError> {
    use crate::gremlin::{parse_script, ExecutionResult, ScriptResult, Statement};
    use crate::gremlin::{CompileError, GremlinError, TerminalStep};

    let script_ast = parse_script(script)?;
    let mut vars = context;
    let mut outcome = ExecutionResult::Unit;

    for stmt in &script_ast.statements {
        match stmt {
            Statement::Assignment {
                name, traversal, ..
            } => {
                let snap = self.snapshot();
                let source = snap.gremlin();
                let plan = crate::gremlin::compile_with_vars(traversal, &source, &vars)?;
                let terminal = plan.terminal().cloned();
                let produced = self.process_script_mutations(plan.traversal.to_list());

                // A count-less `.next()` binds the first value; every other terminal
                // only binds when exactly one value was produced.
                let mut produced = produced.into_iter();
                let bindable = match terminal {
                    Some(TerminalStep::Next { count: None, .. }) => produced.next(),
                    _ => match (produced.next(), produced.next()) {
                        (Some(only), None) => Some(only),
                        _ => None,
                    },
                };

                match bindable {
                    Some(value) => {
                        vars.bind(name.clone(), value);
                    }
                    None => {
                        return Err(GremlinError::Compile(CompileError::InvalidArguments {
                            step: "assignment".to_string(),
                            message: format!(
                                "assignment to '{}' requires single value from .next()",
                                name
                            ),
                        }));
                    }
                }
                outcome = ExecutionResult::Unit;
            }
            Statement::Traversal { traversal, .. } => {
                let snap = self.snapshot();
                let source = snap.gremlin();
                let plan = crate::gremlin::compile_with_vars(traversal, &source, &vars)?;
                let terminal = plan.terminal().cloned();
                let produced = self.process_script_mutations(plan.traversal.to_list());

                outcome = match terminal {
                    None | Some(TerminalStep::ToList { .. }) => ExecutionResult::List(produced),
                    Some(TerminalStep::Next { count: None, .. }) => {
                        ExecutionResult::Single(produced.into_iter().next())
                    }
                    Some(TerminalStep::Next { count: Some(n), .. }) => {
                        ExecutionResult::List(produced.into_iter().take(n as usize).collect())
                    }
                    Some(TerminalStep::ToSet { .. }) => {
                        ExecutionResult::Set(produced.into_iter().collect())
                    }
                    Some(TerminalStep::Iterate { .. }) => ExecutionResult::Unit,
                    Some(TerminalStep::HasNext { .. }) => {
                        ExecutionResult::Bool(!produced.is_empty())
                    }
                    Some(TerminalStep::Explain { .. }) => ExecutionResult::Explain(
                        "explain not supported in mmap backend".to_string(),
                    ),
                };
            }
        }
    }

    Ok(ScriptResult {
        result: outcome,
        variables: vars,
    })
}
/// Applies any pending mutations found in `raw_values` and returns the resulting values.
///
/// Values that do not decode as a `PendingMutation` are passed through unchanged. For a
/// decoded mutation the corresponding graph operation is attempted:
///
/// * add / set-property operations yield the affected `Value::Vertex` / `Value::Edge` on
///   success and nothing on failure (errors are deliberately dropped);
/// * drop operations never yield a value.
///
/// When the original value is a map containing the `"__extract_id"` key, a resulting
/// vertex or edge is emitted as `Value::Int` holding its numeric id instead.
#[cfg(feature = "gremlin")]
fn process_script_mutations(&self, raw_values: Vec<Value>) -> Vec<Value> {
    use crate::traversal::mutation::PendingMutation;

    let mut out = Vec::with_capacity(raw_values.len());
    for raw in raw_values {
        let mutation = match PendingMutation::from_value(&raw) {
            Some(decoded) => decoded,
            None => {
                // Ordinary traversal output: forward untouched.
                out.push(raw);
                continue;
            }
        };
        let wants_raw_id = raw
            .as_map()
            .map_or(false, |m| m.contains_key("__extract_id"));

        let applied: Option<Value> = match mutation {
            PendingMutation::AddVertex { label, properties } => {
                self.add_vertex(&label, properties).ok().map(Value::Vertex)
            }
            PendingMutation::AddEdge {
                label,
                from,
                to,
                properties,
            } => self
                .add_edge(from, to, &label, properties)
                .ok()
                .map(Value::Edge),
            PendingMutation::SetVertexProperty { id, key, value } => self
                .set_vertex_property(id, &key, value)
                .ok()
                .map(|_| Value::Vertex(id)),
            PendingMutation::SetEdgeProperty { id, key, value } => self
                .set_edge_property(id, &key, value)
                .ok()
                .map(|_| Value::Edge(id)),
            PendingMutation::DropVertex { id } => {
                // Best effort: a failed drop is ignored and produces no output.
                let _ = self.remove_vertex(id);
                None
            }
            PendingMutation::DropEdge { id } => {
                let _ = self.remove_edge(id);
                None
            }
        };

        if let Some(result) = applied {
            out.push(match result {
                Value::Vertex(vid) if wants_raw_id => Value::Int(vid.0 as i64),
                Value::Edge(eid) if wants_raw_id => Value::Int(eid.0 as i64),
                other => other,
            });
        }
    }
    out
}
/// Checkpoints the underlying memory-mapped graph by delegating to
/// `MmapGraph::checkpoint`.
///
/// # Errors
///
/// Returns whatever `StorageError` the underlying call reports.
pub fn checkpoint(&self) -> Result<(), StorageError> {
self.mmap.checkpoint()
}
/// Reports the batch-mode flag of the underlying `MmapGraph`.
pub fn is_batch_mode(&self) -> bool {
self.mmap.is_batch_mode()
}
/// Returns a shared reference to the wrapped `MmapGraph`.
pub fn mmap_graph(&self) -> &MmapGraph {
&self.mmap
}
/// Saves a named query by forwarding all arguments to `MmapGraph::save_query`.
///
/// Returns the `u32` produced by the underlying call (presumably the id of the
/// stored query — confirm against `MmapGraph`).
///
/// # Errors
///
/// Returns whatever `QueryError` the underlying call reports.
pub fn save_query(
&self,
name: &str,
query_type: crate::query::QueryType,
description: &str,
query_text: &str,
) -> Result<u32, crate::error::QueryError> {
self.mmap
.save_query(name, query_type, description, query_text)
}
/// Looks up a saved query by name via the underlying `MmapGraph`.
///
/// Returns `None` when the underlying lookup yields nothing.
pub fn get_query(&self, name: &str) -> Option<crate::query::SavedQuery> {
self.mmap.get_query(name)
}
/// Looks up a saved query by numeric id via the underlying `MmapGraph`.
///
/// Returns `None` when the underlying lookup yields nothing.
pub fn get_query_by_id(&self, id: u32) -> Option<crate::query::SavedQuery> {
self.mmap.get_query_by_id(id)
}
/// Returns the saved queries reported by the underlying `MmapGraph`.
pub fn list_queries(&self) -> Vec<crate::query::SavedQuery> {
self.mmap.list_queries()
}
/// Deletes the saved query with the given name via the underlying `MmapGraph`.
///
/// # Errors
///
/// Returns whatever `QueryError` the underlying call reports.
pub fn delete_query(&self, name: &str) -> Result<(), crate::error::QueryError> {
self.mmap.delete_query(name)
}
}
/// A view over a shared `GraphState` paired with a shared `StringInterner`.
///
/// All fields are reference-counted handles, so the snapshot itself holds no
/// exclusively owned graph data.
pub struct CowMmapSnapshot {
// Shared graph state that every read on this snapshot goes through.
state: Arc<GraphState>,
// Interner used to translate between label strings and label ids.
interner_snapshot: Arc<StringInterner>,
// Subscription manager exposed through `SnapshotLike` (reactive, non-wasm builds only).
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
subscription_manager: std::sync::Arc<crate::traversal::reactive::SubscriptionManager>,
// Factory that produces a boxed `SnapshotLike` on demand (reactive, non-wasm builds only).
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
snapshot_fn: std::sync::Arc<
dyn Fn() -> Box<dyn crate::traversal::context::SnapshotLike + Send> + Send + Sync,
>,
}
impl CowMmapSnapshot {
/// Returns the `version` field of the captured graph state.
pub fn version(&self) -> u64 {
self.state.version
}
/// Returns the string interner associated with this snapshot.
pub fn interner(&self) -> &StringInterner {
&self.interner_snapshot
}
/// Creates a traversal source that reads from this snapshot.
pub fn gremlin(&self) -> crate::traversal::GraphTraversalSource<'_> {
crate::traversal::GraphTraversalSource::from_snapshot(self)
}
/// Parses `query`, compiles it against this snapshot's traversal source and
/// executes the compiled traversal.
///
/// # Errors
///
/// Propagates errors from parsing and compiling the query.
#[cfg(feature = "gremlin")]
pub fn query(
&self,
query: &str,
) -> Result<crate::gremlin::ExecutionResult, crate::gremlin::GremlinError> {
let ast = crate::gremlin::parse(query)?;
let g = self.gremlin();
let compiled = crate::gremlin::compile(&ast, &g)?;
Ok(compiled.execute())
}
}
/// Exposes the snapshot through the traversal engine's `SnapshotLike` interface.
impl crate::traversal::SnapshotLike for CowMmapSnapshot {
    /// The snapshot is its own storage backend.
    fn storage(&self) -> &dyn GraphStorage {
        self
    }

    /// Borrows the interner associated with this snapshot.
    fn interner(&self) -> &StringInterner {
        &self.interner_snapshot
    }

    /// Views the snapshot as a `SnapshotLike` trait object.
    fn as_dyn(&self) -> &dyn crate::traversal::SnapshotLike {
        self
    }

    /// Wraps a clone of the snapshot in a new `Arc` as shareable storage.
    fn arc_storage(&self) -> std::sync::Arc<dyn GraphStorage + Send + Sync> {
        let owned: CowMmapSnapshot = self.clone();
        std::sync::Arc::new(owned)
    }

    /// Returns another handle to the shared interner.
    fn arc_interner(&self) -> std::sync::Arc<StringInterner> {
        self.interner_snapshot.clone()
    }

    /// Wraps a clone of the snapshot in a new `Arc` as streamable storage.
    fn arc_streamable(&self) -> std::sync::Arc<dyn StreamableStorage> {
        // Written out directly (instead of calling the inherent method of the same
        // name) so the body cannot be mistaken for a recursive call.
        let owned: CowMmapSnapshot = self.clone();
        std::sync::Arc::new(owned)
    }

    /// Returns the subscription manager carried by this snapshot.
    #[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
    fn subscription_manager(&self) -> Option<&crate::traversal::reactive::SubscriptionManager> {
        Some(&*self.subscription_manager)
    }

    /// Returns a handle to the snapshot factory carried by this snapshot.
    #[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
    fn reactive_snapshot_fn(
        &self,
    ) -> Option<
        std::sync::Arc<
            dyn Fn() -> Box<dyn crate::traversal::context::SnapshotLike + Send> + Send + Sync,
        >,
    > {
        Some(std::sync::Arc::clone(&self.snapshot_fn))
    }
}
/// Cloning a snapshot clones each reference-counted handle it holds; the data
/// behind those handles is shared, not copied.
impl Clone for CowMmapSnapshot {
fn clone(&self) -> Self {
Self {
state: Arc::clone(&self.state),
interner_snapshot: Arc::clone(&self.interner_snapshot),
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
subscription_manager: self.subscription_manager.clone(),
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
snapshot_fn: self.snapshot_fn.clone(),
}
}
}
// Manual assertions that a snapshot may be moved to and shared between threads.
// NOTE(review): every field is an `Arc`, so these impls are only needed if one of
// the pointees (e.g. `GraphState` or `StringInterner`) is not automatically
// `Send + Sync`. The soundness argument is not visible here — confirm that all
// reachable data is either immutable or internally synchronised.
unsafe impl Send for CowMmapSnapshot {}
unsafe impl Sync for CowMmapSnapshot {}
impl StreamableStorage for CowMmapSnapshot {
fn stream_all_vertices(&self) -> Box<dyn Iterator<Item = VertexId> + Send> {
let vertices = self.state.vertices.clone();
Box::new(vertices.into_iter().map(|(id, _)| id))
}
fn stream_all_edges(&self) -> Box<dyn Iterator<Item = EdgeId> + Send> {
let edges = self.state.edges.clone();
Box::new(edges.into_iter().map(|(id, _)| id))
}
fn stream_vertices_with_label(&self, label: &str) -> Box<dyn Iterator<Item = VertexId> + Send> {
let label_id = self.interner_snapshot.lookup(label);
if let Some(lid) = label_id {
if let Some(bitmap) = self.state.vertex_labels.get(&lid) {
let bitmap_owned: RoaringBitmap = (**bitmap).clone();
return Box::new(bitmap_owned.into_iter().map(|id| VertexId(id as u64)));
}
}
Box::new(std::iter::empty())
}
fn stream_edges_with_label(&self, label: &str) -> Box<dyn Iterator<Item = EdgeId> + Send> {
let label_id = self.interner_snapshot.lookup(label);
if let Some(lid) = label_id {
if let Some(bitmap) = self.state.edge_labels.get(&lid) {
let bitmap_owned: RoaringBitmap = (**bitmap).clone();
return Box::new(bitmap_owned.into_iter().map(|id| EdgeId(id as u64)));
}
}
Box::new(std::iter::empty())
}
fn stream_out_edges(&self, vertex: VertexId) -> Box<dyn Iterator<Item = EdgeId> + Send> {
if let Some(node) = self.state.vertices.get(&vertex) {
let out_edges = node.out_edges.clone();
Box::new(out_edges.into_iter())
} else {
Box::new(std::iter::empty())
}
}
fn stream_in_edges(&self, vertex: VertexId) -> Box<dyn Iterator<Item = EdgeId> + Send> {
if let Some(node) = self.state.vertices.get(&vertex) {
let in_edges = node.in_edges.clone();
Box::new(in_edges.into_iter())
} else {
Box::new(std::iter::empty())
}
}
fn stream_out_neighbors(
&self,
vertex: VertexId,
label_ids: &[u32],
) -> Box<dyn Iterator<Item = VertexId> + Send> {
let state = Arc::clone(&self.state);
let label_ids_owned: Vec<u32> = label_ids.to_vec();
if let Some(node) = state.vertices.get(&vertex) {
let out_edges = node.out_edges.clone();
let state_for_iter = Arc::clone(&state);
Box::new(out_edges.into_iter().filter_map(move |edge_id| {
state_for_iter.edges.get(&edge_id).and_then(|edge| {
if label_ids_owned.is_empty() || label_ids_owned.contains(&edge.label_id) {
Some(edge.dst)
} else {
None
}
})
}))
} else {
Box::new(std::iter::empty())
}
}
fn stream_in_neighbors(
&self,
vertex: VertexId,
label_ids: &[u32],
) -> Box<dyn Iterator<Item = VertexId> + Send> {
let state = Arc::clone(&self.state);
let label_ids_owned: Vec<u32> = label_ids.to_vec();
if let Some(node) = state.vertices.get(&vertex) {
let in_edges = node.in_edges.clone();
let state_for_iter = Arc::clone(&state);
Box::new(in_edges.into_iter().filter_map(move |edge_id| {
state_for_iter.edges.get(&edge_id).and_then(|edge| {
if label_ids_owned.is_empty() || label_ids_owned.contains(&edge.label_id) {
Some(edge.src)
} else {
None
}
})
}))
} else {
Box::new(std::iter::empty())
}
}
}
impl CowMmapSnapshot {
/// Wraps a clone of this snapshot in a new `Arc` typed as `dyn StreamableStorage`.
#[inline]
pub fn arc_streamable(&self) -> Arc<dyn StreamableStorage> {
Arc::new(self.clone())
}
}
/// Read-only graph access backed by the snapshot's captured state.
///
/// Labels are resolved through the snapshot's interner; a label id the interner cannot
/// resolve is reported as the empty string.
impl GraphStorage for CowMmapSnapshot {
    /// Materialises the vertex with the given id, if present.
    fn get_vertex(&self, id: VertexId) -> Option<Vertex> {
        let node = self.state.vertices.get(&id)?;
        let label = self
            .interner_snapshot
            .resolve(node.label_id)
            .unwrap_or("")
            .to_string();
        Some(Vertex {
            id: node.id,
            label,
            properties: node.properties.clone(),
        })
    }

    /// Materialises the edge with the given id, if present.
    fn get_edge(&self, id: EdgeId) -> Option<Edge> {
        let stored = self.state.edges.get(&id)?;
        let label = self
            .interner_snapshot
            .resolve(stored.label_id)
            .unwrap_or("")
            .to_string();
        Some(Edge {
            id: stored.id,
            label,
            src: stored.src,
            dst: stored.dst,
            properties: stored.properties.clone(),
        })
    }

    /// Number of vertices in the snapshot.
    fn vertex_count(&self) -> u64 {
        self.state.vertices.len() as u64
    }

    /// Number of edges in the snapshot.
    fn edge_count(&self) -> u64 {
        self.state.edges.len() as u64
    }

    /// Iterates the edges leaving `vertex`; empty when the vertex is absent.
    fn out_edges(&self, vertex: VertexId) -> Box<dyn Iterator<Item = Edge> + '_> {
        let edge_ids = match self.state.vertices.get(&vertex) {
            Some(node) => node.out_edges.clone(),
            None => return Box::new(std::iter::empty()),
        };
        Box::new(
            edge_ids
                .into_iter()
                .filter_map(move |edge_id| self.get_edge(edge_id)),
        )
    }

    /// Iterates the edges entering `vertex`; empty when the vertex is absent.
    fn in_edges(&self, vertex: VertexId) -> Box<dyn Iterator<Item = Edge> + '_> {
        let edge_ids = match self.state.vertices.get(&vertex) {
            Some(node) => node.in_edges.clone(),
            None => return Box::new(std::iter::empty()),
        };
        Box::new(
            edge_ids
                .into_iter()
                .filter_map(move |edge_id| self.get_edge(edge_id)),
        )
    }

    /// Iterates the vertices carrying `label`; empty when the label is unknown to the
    /// interner or has no bitmap.
    fn vertices_with_label(&self, label: &str) -> Box<dyn Iterator<Item = Vertex> + '_> {
        let members = self
            .interner_snapshot
            .lookup(label)
            .and_then(|lid| self.state.vertex_labels.get(&lid).cloned());
        // Ids are collected up front; the vertices themselves are built lazily.
        let ids: Vec<u64> = match members {
            Some(bitmap) => bitmap.iter().map(u64::from).collect(),
            None => return Box::new(std::iter::empty()),
        };
        Box::new(
            ids.into_iter()
                .filter_map(move |raw| self.get_vertex(VertexId(raw))),
        )
    }

    /// Iterates the edges carrying `label`; empty when the label is unknown to the
    /// interner or has no bitmap.
    fn edges_with_label(&self, label: &str) -> Box<dyn Iterator<Item = Edge> + '_> {
        let members = self
            .interner_snapshot
            .lookup(label)
            .and_then(|lid| self.state.edge_labels.get(&lid).cloned());
        // Ids are collected up front; the edges themselves are built lazily.
        let ids: Vec<u64> = match members {
            Some(bitmap) => bitmap.iter().map(u64::from).collect(),
            None => return Box::new(std::iter::empty()),
        };
        Box::new(
            ids.into_iter()
                .filter_map(move |raw| self.get_edge(EdgeId(raw))),
        )
    }

    /// Iterates every vertex in the snapshot.
    fn all_vertices(&self) -> Box<dyn Iterator<Item = Vertex> + '_> {
        let ids: Vec<VertexId> = self.state.vertices.keys().copied().collect();
        Box::new(ids.into_iter().filter_map(move |vid| self.get_vertex(vid)))
    }

    /// Iterates every edge in the snapshot.
    fn all_edges(&self) -> Box<dyn Iterator<Item = Edge> + '_> {
        let ids: Vec<EdgeId> = self.state.edges.keys().copied().collect();
        Box::new(ids.into_iter().filter_map(move |eid| self.get_edge(eid)))
    }

    /// Borrows the interner associated with this snapshot.
    fn interner(&self) -> &StringInterner {
        &self.interner_snapshot
    }
}
/// Errors reported by batch operations on `CowMmapBatchContext`.
#[derive(Debug)]
pub enum BatchError {
/// An error from the storage layer.
Storage(StorageError),
/// The referenced vertex is not present in the batch's pending state.
VertexNotFound(VertexId),
/// The referenced edge is not present in the batch's pending state.
EdgeNotFound(EdgeId),
/// A free-form error message.
Custom(String),
}
impl std::fmt::Display for BatchError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BatchError::Storage(e) => write!(f, "storage error: {}", e),
BatchError::VertexNotFound(id) => write!(f, "vertex not found: {:?}", id),
BatchError::EdgeNotFound(id) => write!(f, "edge not found: {:?}", id),
BatchError::Custom(msg) => write!(f, "{}", msg),
}
}
}
// Uses the trait's default methods, so no `source()` is reported even for the
// `Storage` variant.
impl std::error::Error for BatchError {}
/// Wraps a `StorageError` in `BatchError::Storage`, which lets `?` convert
/// storage errors inside functions returning `BatchError`.
impl From<StorageError> for BatchError {
fn from(e: StorageError) -> Self {
BatchError::Storage(e)
}
}
/// One entry in a batch's operation log; each `CowMmapBatchContext` mutation
/// method pushes the matching variant after updating the pending state.
enum BatchOperation {
/// A vertex was added with the given label and properties.
AddVertex {
label: String,
properties: HashMap<String, Value>,
},
/// An edge from `src` to `dst` was added with the given label and properties.
AddEdge {
src: VertexId,
dst: VertexId,
label: String,
properties: HashMap<String, Value>,
},
/// Property `key` of vertex `id` was set to `value`.
SetVertexProperty {
id: VertexId,
key: String,
value: Value,
},
/// Property `key` of edge `id` was set to `value`.
SetEdgeProperty {
id: EdgeId,
key: String,
value: Value,
},
/// Vertex `id` was removed.
RemoveVertex {
id: VertexId,
},
/// Edge `id` was removed.
RemoveEdge {
id: EdgeId,
},
}
/// Mutable working context for a batch of graph mutations.
///
/// Mutations are applied to `pending_state` and recorded in `operations`.
pub struct CowMmapBatchContext<'g> {
// Graph this batch was created for; not read by the methods in this impl.
#[allow(dead_code)] graph: &'g CowMmapGraph,
// Working copy of the graph state that the batch methods mutate.
pending_state: GraphState,
// Log of the operations performed so far, in call order.
operations: Vec<BatchOperation>,
// Events queued by the batch methods (reactive, non-wasm builds only).
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
pub(crate) pending_events: Vec<crate::storage::events::GraphEvent>,
}
impl<'g> CowMmapBatchContext<'g> {
/// Stages a new vertex in the batch's pending state and returns its id.
///
/// The id comes from the pending state's `next_vertex_id` counter. The vertex is inserted
/// into the pending vertex map and its label bitmap, the pending version is bumped, and
/// the operation is appended to the batch's log. In reactive (non-wasm) builds a
/// `VertexAdded` event is queued as well.
///
/// NOTE(review): the label bitmap stores `id.0 as u32`, so an id above `u32::MAX` would
/// be truncated there — confirm ids stay within that range.
pub fn add_vertex(&mut self, label: &str, properties: HashMap<String, Value>) -> VertexId {
    let state = &mut self.pending_state;

    let vertex_id = VertexId(state.next_vertex_id);
    state.next_vertex_id += 1;
    let label_id = state.interner.write().intern(label);

    let vertex = NodeData {
        id: vertex_id,
        label_id,
        properties: properties.clone(),
        out_edges: Vec::new(),
        in_edges: Vec::new(),
    };
    state.vertices = state.vertices.update(vertex_id, Arc::new(vertex));

    // Bitmaps are shared, so extend a private copy and swap it in.
    let mut members = match state.vertex_labels.get(&label_id) {
        Some(existing) => RoaringBitmap::clone(existing),
        None => RoaringBitmap::new(),
    };
    members.insert(vertex_id.0 as u32);
    state.vertex_labels = state.vertex_labels.update(label_id, Arc::new(members));
    state.version += 1;

    self.operations.push(BatchOperation::AddVertex {
        label: label.to_string(),
        properties: properties.clone(),
    });
    #[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
    self.pending_events.push(crate::storage::events::GraphEvent::VertexAdded {
        id: vertex_id,
        label: label.to_string(),
        properties,
    });
    vertex_id
}
pub fn add_edge(
&mut self,
src: VertexId,
dst: VertexId,
label: &str,
properties: HashMap<String, Value>,
) -> Result<EdgeId, BatchError> {
if !self.pending_state.vertices.contains_key(&src) {
return Err(BatchError::VertexNotFound(src));
}
if !self.pending_state.vertices.contains_key(&dst) {
return Err(BatchError::VertexNotFound(dst));
}
let id = EdgeId(self.pending_state.next_edge_id);
self.pending_state.next_edge_id += 1;
let label_id = self.pending_state.interner.write().intern(label);
let edge = Arc::new(EdgeData {
id,
label_id,
src,
dst,
properties: properties.clone(),
});
self.pending_state.edges = self.pending_state.edges.update(id, edge);
let bitmap = self
.pending_state
.edge_labels
.get(&label_id)
.cloned()
.unwrap_or_else(|| Arc::new(RoaringBitmap::new()));
let mut new_bitmap = (*bitmap).clone();
new_bitmap.insert(id.0 as u32);
self.pending_state.edge_labels = self
.pending_state
.edge_labels
.update(label_id, Arc::new(new_bitmap));
if let Some(src_node) = self.pending_state.vertices.get(&src) {
let mut new_node = (**src_node).clone();
new_node.out_edges.push(id);
self.pending_state.vertices =
self.pending_state.vertices.update(src, Arc::new(new_node));
}
if let Some(dst_node) = self.pending_state.vertices.get(&dst) {
let mut new_node = (**dst_node).clone();
new_node.in_edges.push(id);
self.pending_state.vertices =
self.pending_state.vertices.update(dst, Arc::new(new_node));
}
self.pending_state.version += 1;
self.operations.push(BatchOperation::AddEdge {
src,
dst,
label: label.to_string(),
properties: properties.clone(),
});
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
self.pending_events.push(crate::storage::events::GraphEvent::EdgeAdded {
id,
src,
dst,
label: label.to_string(),
properties,
});
Ok(id)
}
pub fn set_vertex_property(
&mut self,
id: VertexId,
key: &str,
value: Value,
) -> Result<(), BatchError> {
let node = self
.pending_state
.vertices
.get(&id)
.ok_or(BatchError::VertexNotFound(id))?;
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
let old_value = node.properties.get(key).cloned();
let mut new_node = (**node).clone();
new_node.properties.insert(key.to_string(), value.clone());
self.pending_state.vertices = self.pending_state.vertices.update(id, Arc::new(new_node));
self.pending_state.version += 1;
self.operations.push(BatchOperation::SetVertexProperty {
id,
key: key.to_string(),
value: value.clone(),
});
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
self.pending_events.push(crate::storage::events::GraphEvent::VertexPropertyChanged {
id,
key: key.to_string(),
old_value,
new_value: value,
});
Ok(())
}
pub fn set_edge_property(
&mut self,
id: EdgeId,
key: &str,
value: Value,
) -> Result<(), BatchError> {
let edge = self
.pending_state
.edges
.get(&id)
.ok_or(BatchError::EdgeNotFound(id))?;
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
let old_value = edge.properties.get(key).cloned();
let mut new_edge = (**edge).clone();
new_edge.properties.insert(key.to_string(), value.clone());
self.pending_state.edges = self.pending_state.edges.update(id, Arc::new(new_edge));
self.pending_state.version += 1;
self.operations.push(BatchOperation::SetEdgeProperty {
id,
key: key.to_string(),
value: value.clone(),
});
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
self.pending_events.push(crate::storage::events::GraphEvent::EdgePropertyChanged {
id,
key: key.to_string(),
old_value,
new_value: value,
});
Ok(())
}
/// Removes vertex `id` and every edge incident to it from the batch's pending state.
///
/// For each incident edge the edge is dropped from its label bitmap, from the adjacency
/// lists of the vertex at its other end, and from the pending edge map. The vertex is then
/// dropped from its label bitmap and the pending vertex map, the pending version is
/// bumped once, and a single `RemoveVertex` operation is appended to the batch's log.
/// In reactive (non-wasm) builds one `EdgeRemoved` event per removed edge is queued,
/// followed by a `VertexRemoved` event.
///
/// A self-loop is listed in both the vertex's `out_edges` and `in_edges`; it is collected
/// only once so that it is processed, and reported, a single time.
///
/// # Errors
///
/// Returns `BatchError::VertexNotFound` when the vertex is missing from the pending state.
pub fn remove_vertex(&mut self, id: VertexId) -> Result<(), BatchError> {
    let node = self
        .pending_state
        .vertices
        .get(&id)
        .ok_or(BatchError::VertexNotFound(id))?
        .clone();
    #[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
    let vertex_label = self
        .pending_state
        .interner
        .read()
        .resolve(node.label_id)
        .map(|s| s.to_string())
        .unwrap_or_default();

    // Outgoing edges first, then incoming ones. An incoming edge that starts at this
    // same vertex and is already listed as outgoing is a self-loop: skip the second
    // occurrence so it is not handled (and announced) twice.
    let edges_to_remove: Vec<EdgeId> = {
        let pending_edges = &self.pending_state.edges;
        node.out_edges
            .iter()
            .copied()
            .chain(node.in_edges.iter().copied().filter(|edge_id| {
                let already_listed = pending_edges
                    .get(edge_id)
                    .map_or(false, |edge| edge.src == id && node.out_edges.contains(edge_id));
                !already_listed
            }))
            .collect()
    };

    // Capture event payloads before the edges disappear from the pending state.
    #[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
    let edge_infos: Vec<(EdgeId, VertexId, VertexId, String)> = edges_to_remove
        .iter()
        .filter_map(|&edge_id| {
            self.pending_state.edges.get(&edge_id).map(|e| {
                let elabel = self
                    .pending_state
                    .interner
                    .read()
                    .resolve(e.label_id)
                    .map(|s| s.to_string())
                    .unwrap_or_default();
                (edge_id, e.src, e.dst, elabel)
            })
        })
        .collect();

    for edge_id in &edges_to_remove {
        if let Some(edge) = self.pending_state.edges.get(edge_id) {
            let label_id = edge.label_id;
            if let Some(bitmap) = self.pending_state.edge_labels.get(&label_id) {
                let mut new_bitmap = (**bitmap).clone();
                new_bitmap.remove(edge_id.0 as u32);
                self.pending_state.edge_labels = self
                    .pending_state
                    .edge_labels
                    .update(label_id, Arc::new(new_bitmap));
            }
            // Detach the edge from the vertex at its other end.
            let other_vertex = if edge.src == id { edge.dst } else { edge.src };
            if let Some(other_node) = self.pending_state.vertices.get(&other_vertex) {
                let mut new_node = (**other_node).clone();
                new_node.out_edges.retain(|e| e != edge_id);
                new_node.in_edges.retain(|e| e != edge_id);
                self.pending_state.vertices = self
                    .pending_state
                    .vertices
                    .update(other_vertex, Arc::new(new_node));
            }
            self.pending_state.edges = self.pending_state.edges.without(edge_id);
        }
    }

    if let Some(bitmap) = self.pending_state.vertex_labels.get(&node.label_id) {
        let mut new_bitmap = (**bitmap).clone();
        new_bitmap.remove(id.0 as u32);
        self.pending_state.vertex_labels = self
            .pending_state
            .vertex_labels
            .update(node.label_id, Arc::new(new_bitmap));
    }
    self.pending_state.vertices = self.pending_state.vertices.without(&id);
    self.pending_state.version += 1;
    self.operations.push(BatchOperation::RemoveVertex { id });

    #[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
    {
        for (eid, src, dst, elabel) in edge_infos {
            self.pending_events.push(crate::storage::events::GraphEvent::EdgeRemoved {
                id: eid,
                src,
                dst,
                label: elabel,
            });
        }
        self.pending_events.push(crate::storage::events::GraphEvent::VertexRemoved {
            id,
            label: vertex_label,
        });
    }
    Ok(())
}
pub fn remove_edge(&mut self, id: EdgeId) -> Result<(), BatchError> {
let edge = self
.pending_state
.edges
.get(&id)
.ok_or(BatchError::EdgeNotFound(id))?
.clone();
if let Some(src_node) = self.pending_state.vertices.get(&edge.src) {
let mut new_node = (**src_node).clone();
new_node.out_edges.retain(|e| *e != id);
self.pending_state.vertices = self
.pending_state
.vertices
.update(edge.src, Arc::new(new_node));
}
if let Some(dst_node) = self.pending_state.vertices.get(&edge.dst) {
let mut new_node = (**dst_node).clone();
new_node.in_edges.retain(|e| *e != id);
self.pending_state.vertices = self
.pending_state
.vertices
.update(edge.dst, Arc::new(new_node));
}
if let Some(bitmap) = self.pending_state.edge_labels.get(&edge.label_id) {
let mut new_bitmap = (**bitmap).clone();
new_bitmap.remove(id.0 as u32);
self.pending_state.edge_labels = self
.pending_state
.edge_labels
.update(edge.label_id, Arc::new(new_bitmap));
}
self.pending_state.edges = self.pending_state.edges.without(&id);
self.pending_state.version += 1;
self.operations.push(BatchOperation::RemoveEdge { id });
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
{
let elabel = self
.pending_state
.interner
.read()
.resolve(edge.label_id)
.map(|s| s.to_string())
.unwrap_or_default();
self.pending_events.push(crate::storage::events::GraphEvent::EdgeRemoved {
id,
src: edge.src,
dst: edge.dst,
label: elabel,
});
}
Ok(())
}
}
/// Borrowed adapter that exposes a `CowMmapGraph` through the `GraphStorage` and
/// `GraphStorageMut` traits.
struct CowMmapGraphMutWrapper<'a> {
// Graph that every call is forwarded to.
graph: &'a CowMmapGraph,
}
/// Read access for the mutation wrapper.
///
/// Point lookups and counts are forwarded to the graph directly. The iterator
/// methods take a fresh snapshot per call and collect its results into a `Vec`
/// before returning, so the returned iterators own their items.
impl<'a> GraphStorage for CowMmapGraphMutWrapper<'a> {
fn get_vertex(&self, id: VertexId) -> Option<Vertex> {
self.graph.get_vertex(id)
}
fn vertex_count(&self) -> u64 {
self.graph.vertex_count()
}
fn get_edge(&self, id: EdgeId) -> Option<Edge> {
self.graph.get_edge(id)
}
fn edge_count(&self) -> u64 {
self.graph.edge_count()
}
// Collected eagerly because the snapshot is a local that cannot outlive the call.
fn out_edges(&self, vertex: VertexId) -> Box<dyn Iterator<Item = Edge> + '_> {
let snapshot = self.graph.snapshot();
let edges: Vec<Edge> = snapshot.out_edges(vertex).collect();
Box::new(edges.into_iter())
}
fn in_edges(&self, vertex: VertexId) -> Box<dyn Iterator<Item = Edge> + '_> {
let snapshot = self.graph.snapshot();
let edges: Vec<Edge> = snapshot.in_edges(vertex).collect();
Box::new(edges.into_iter())
}
fn vertices_with_label(&self, label: &str) -> Box<dyn Iterator<Item = Vertex> + '_> {
let snapshot = self.graph.snapshot();
let vertices: Vec<Vertex> = snapshot.vertices_with_label(label).collect();
Box::new(vertices.into_iter())
}
fn edges_with_label(&self, label: &str) -> Box<dyn Iterator<Item = Edge> + '_> {
let snapshot = self.graph.snapshot();
let edges: Vec<Edge> = snapshot.edges_with_label(label).collect();
Box::new(edges.into_iter())
}
fn all_vertices(&self) -> Box<dyn Iterator<Item = Vertex> + '_> {
let snapshot = self.graph.snapshot();
let vertices: Vec<Vertex> = snapshot.all_vertices().collect();
Box::new(vertices.into_iter())
}
fn all_edges(&self) -> Box<dyn Iterator<Item = Edge> + '_> {
let snapshot = self.graph.snapshot();
let edges: Vec<Edge> = snapshot.all_edges().collect();
Box::new(edges.into_iter())
}
// Returns a reference to the interner stored behind the state's lock by turning
// the guarded reference into a raw pointer and forgetting the read guard.
//
// NOTE(review): `mem::forget(guard)` never releases the interner's read lock, so
// each call presumably leaves one reader registered forever and could block any
// later `interner.write()` — confirm. The outer `state` guard is released when
// this function returns, so the returned reference is only valid if the interner
// allocation outlives `self` and is not mutated concurrently; that invariant is
// not visible here and there is no `SAFETY` argument for the `unsafe` block.
fn interner(&self) -> &StringInterner {
let state = self.graph.state.read();
let guard = state.interner.read();
let ptr = &*guard as *const StringInterner;
std::mem::forget(guard);
unsafe { &*ptr }
}
}
/// Write access for the mutation wrapper; every method forwards to the method of
/// the same name on the wrapped `CowMmapGraph`.
impl<'a> crate::storage::GraphStorageMut for CowMmapGraphMutWrapper<'a> {
// The trait signature has no error channel here, so a failure of the graph's
// `add_vertex` is turned into a panic.
//
// # Panics
// Panics with "add_vertex failed during GQL mutation" when the graph reports an error.
fn add_vertex(&mut self, label: &str, properties: HashMap<String, Value>) -> VertexId {
self.graph
.add_vertex(label, properties)
.expect("add_vertex failed during GQL mutation")
}
fn add_edge(
&mut self,
src: VertexId,
dst: VertexId,
label: &str,
properties: HashMap<String, Value>,
) -> Result<EdgeId, StorageError> {
self.graph.add_edge(src, dst, label, properties)
}
fn set_vertex_property(
&mut self,
id: VertexId,
key: &str,
value: Value,
) -> Result<(), StorageError> {
self.graph.set_vertex_property(id, key, value)
}
fn set_edge_property(
&mut self,
id: EdgeId,
key: &str,
value: Value,
) -> Result<(), StorageError> {
self.graph.set_edge_property(id, key, value)
}
fn remove_vertex(&mut self, id: VertexId) -> Result<(), StorageError> {
self.graph.remove_vertex(id)
}
fn remove_edge(&mut self, id: EdgeId) -> Result<(), StorageError> {
self.graph.remove_edge(id)
}
}
/// Entry point for building traversals bound to a `CowMmapGraph`.
pub struct CowMmapTraversalSource<'g> {
// Borrowed graph handed to every traversal created from this source.
graph: &'g CowMmapGraph,
// Shared handle to a graph, cloned into every traversal created from this source.
graph_arc: Arc<CowMmapGraph>,
}
impl<'g> CowMmapTraversalSource<'g> {
pub fn new_with_arc(graph: &'g CowMmapGraph, graph_arc: Arc<CowMmapGraph>) -> Self {
Self { graph, graph_arc }
}
pub fn v(&self) -> CowMmapBoundTraversal<'g, (), Value, VertexMarker> {
CowMmapBoundTraversal::new_typed(
self.graph,
Arc::clone(&self.graph_arc),
Traversal::with_source(TraversalSource::AllVertices),
)
}
pub fn v_ids<I>(&self, ids: I) -> CowMmapBoundTraversal<'g, (), Value, VertexMarker>
where
I: IntoIterator<Item = VertexId>,
{
CowMmapBoundTraversal::new_typed(
self.graph,
Arc::clone(&self.graph_arc),
Traversal::with_source(TraversalSource::Vertices(ids.into_iter().collect())),
)
}
pub fn v_id(&self, id: VertexId) -> CowMmapBoundTraversal<'g, (), Value, VertexMarker> {
self.v_ids([id])
}
pub fn e(&self) -> CowMmapBoundTraversal<'g, (), Value, EdgeMarker> {
CowMmapBoundTraversal::new_typed(
self.graph,
Arc::clone(&self.graph_arc),
Traversal::with_source(TraversalSource::AllEdges),
)
}
pub fn e_ids<I>(&self, ids: I) -> CowMmapBoundTraversal<'g, (), Value, EdgeMarker>
where
I: IntoIterator<Item = EdgeId>,
{
CowMmapBoundTraversal::new_typed(
self.graph,
Arc::clone(&self.graph_arc),
Traversal::with_source(TraversalSource::Edges(ids.into_iter().collect())),
)
}
pub fn add_v(
&self,
label: impl Into<String>,
) -> CowMmapBoundTraversal<'g, (), Value, VertexMarker> {
use crate::traversal::mutation::AddVStep;
let mut traversal = Traversal::<(), Value>::with_source(TraversalSource::Inject(vec![]));
traversal = traversal.add_step(AddVStep::new(label));
CowMmapBoundTraversal::new_typed(self.graph, Arc::clone(&self.graph_arc), traversal)
}
pub fn add_e(&self, label: impl Into<String>) -> CowMmapAddEdgeBuilder<'g> {
CowMmapAddEdgeBuilder::new(self.graph, Arc::clone(&self.graph_arc), label.into())
}
pub fn inject<I>(&self, values: I) -> CowMmapBoundTraversal<'g, (), Value, Scalar>
where
I: IntoIterator<Item = Value>,
{
CowMmapBoundTraversal::new_typed(
self.graph,
Arc::clone(&self.graph_arc),
Traversal::with_source(TraversalSource::Inject(values.into_iter().collect())),
)
}
pub fn v_untyped(&self) -> CowMmapBoundTraversal<'g, (), Value, Scalar> {
CowMmapBoundTraversal::new_typed(
self.graph,
Arc::clone(&self.graph_arc),
Traversal::with_source(TraversalSource::AllVertices),
)
}
pub fn v_ids_untyped<I>(&self, ids: I) -> CowMmapBoundTraversal<'g, (), Value, Scalar>
where
I: IntoIterator<Item = VertexId>,
{
CowMmapBoundTraversal::new_typed(
self.graph,
Arc::clone(&self.graph_arc),
Traversal::with_source(TraversalSource::Vertices(ids.into_iter().collect())),
)
}
pub fn e_untyped(&self) -> CowMmapBoundTraversal<'g, (), Value, Scalar> {
CowMmapBoundTraversal::new_typed(
self.graph,
Arc::clone(&self.graph_arc),
Traversal::with_source(TraversalSource::AllEdges),
)
}
pub fn e_ids_untyped<I>(&self, ids: I) -> CowMmapBoundTraversal<'g, (), Value, Scalar>
where
I: IntoIterator<Item = EdgeId>,
{
CowMmapBoundTraversal::new_typed(
self.graph,
Arc::clone(&self.graph_arc),
Traversal::with_source(TraversalSource::Edges(ids.into_iter().collect())),
)
}
}
/// A traversal bound to a `CowMmapGraph`, tagged with a compile-time output marker.
pub struct CowMmapBoundTraversal<'g, In, Out, Marker: OutputMarker = Scalar> {
// Graph the traversal is executed against and mutations are applied to.
graph: &'g CowMmapGraph,
// Shared graph handle, passed to the graph elements produced by terminal steps.
graph_arc: Arc<CowMmapGraph>,
// The traversal (source plus steps) built so far.
traversal: Traversal<In, Out>,
// Whether execution uses a path-tracking execution context.
track_paths: bool,
// Zero-sized tag carrying the output marker type.
_marker: PhantomData<Marker>,
}
impl<'g, In, Out, Marker: OutputMarker> CowMmapBoundTraversal<'g, In, Out, Marker> {
/// Creates a bound traversal from its parts, with path tracking disabled.
pub(crate) fn new_typed(
graph: &'g CowMmapGraph,
graph_arc: Arc<CowMmapGraph>,
traversal: Traversal<In, Out>,
) -> Self {
Self {
graph,
graph_arc,
traversal,
track_paths: false,
_marker: PhantomData,
}
}
/// Enables path tracking for this traversal and returns it.
pub fn with_path(mut self) -> Self {
self.track_paths = true;
self
}
pub fn add_step_same<NewOut>(
self,
step: impl crate::traversal::step::Step,
) -> CowMmapBoundTraversal<'g, In, NewOut, Marker> {
CowMmapBoundTraversal {
graph: self.graph,
graph_arc: self.graph_arc,
traversal: self.traversal.add_step(step),
track_paths: self.track_paths,
_marker: PhantomData,
}
}
/// Appends `step` to the traversal; forwards to `add_step_same`, so the output
/// marker is kept.
pub fn add_step<NewOut>(
self,
step: impl crate::traversal::step::Step,
) -> CowMmapBoundTraversal<'g, In, NewOut, Marker> {
self.add_step_same(step)
}
pub fn add_step_with_marker<NewOut, NewMarker: OutputMarker>(
self,
step: impl crate::traversal::step::Step,
) -> CowMmapBoundTraversal<'g, In, NewOut, NewMarker> {
CowMmapBoundTraversal {
graph: self.graph,
graph_arc: self.graph_arc,
traversal: self.traversal.add_step(step),
track_paths: self.track_paths,
_marker: PhantomData,
}
}
pub fn append<Mid>(
self,
anon: Traversal<Out, Mid>,
) -> CowMmapBoundTraversal<'g, In, Mid, Marker> {
CowMmapBoundTraversal {
graph: self.graph,
graph_arc: self.graph_arc,
traversal: self.traversal.append(anon),
track_paths: self.track_paths,
_marker: PhantomData,
}
}
/// Runs the traversal against a fresh snapshot and then applies any pending mutations
/// found in its output to the graph.
///
/// The traversal is cloned, so `self` stays reusable. Two start modes exist:
///
/// * an empty `Inject` source: the steps are fed a single `Value::Null` traverser (or
///   nothing at all when there are no steps);
/// * any other source: the steps are fed whatever a `StartStep` over that source emits.
///
/// Output values that decode as a `PendingMutation` are executed through
/// `execute_mutation` and replaced by its result (if any); all other values are returned
/// unchanged.
fn execute_with_mutations(&self) -> Vec<Value> {
    use crate::traversal::step::StartStep;
    use crate::traversal::traverser::TraversalSource;

    let (source, steps) = self.traversal.clone().into_steps();
    let is_mutation_only =
        matches!(&source, Some(TraversalSource::Inject(values)) if values.is_empty());

    // Reads all go through one snapshot taken before any mutation is applied.
    let snapshot = self.graph.snapshot();
    let interner = snapshot.interner();
    let storage_ref: &dyn GraphStorage = &snapshot;
    let ctx = if self.track_paths {
        ExecutionContext::with_path_tracking(storage_ref, interner)
    } else {
        ExecutionContext::new(storage_ref, interner)
    };

    let seed: Vec<Traverser> = if is_mutation_only {
        if steps.is_empty() {
            Vec::new()
        } else {
            vec![Traverser::new(Value::Null)]
        }
    } else {
        match source {
            Some(src) => {
                let start_step = StartStep::new(src);
                start_step
                    .apply(&ctx, Box::new(std::iter::empty()))
                    .collect()
            }
            None => Vec::new(),
        }
    };

    // Each step fully consumes the previous stage before the next one runs.
    let mut stage = seed;
    for step in &steps {
        stage = step
            .apply_dyn(&ctx, Box::new(stage.into_iter()))
            .collect();
    }

    let mut wrapper = CowMmapGraphMutWrapper { graph: self.graph };
    let mut final_results = Vec::with_capacity(stage.len());
    for traverser in stage {
        match PendingMutation::from_value(&traverser.value) {
            Some(mutation) => {
                if let Some(applied) = Self::execute_mutation(&mut wrapper, mutation) {
                    final_results.push(applied);
                }
            }
            None => final_results.push(traverser.value),
        }
    }
    final_results
}
/// Applies a single pending mutation through the mutable graph wrapper.
///
/// Returns the element affected by the mutation:
/// - `AddVertex` yields `Value::Vertex` with the new id.
/// - `AddEdge` yields `Value::Edge` with the new id, or `None` if the edge
///   could not be added.
/// - `SetVertexProperty` / `SetEdgeProperty` yield the updated element, or
///   `None` if the update failed.
/// - `DropVertex` / `DropEdge` always yield `None`.
///
/// Storage errors are not propagated; they only show up as a `None` result.
fn execute_mutation(
    wrapper: &mut CowMmapGraphMutWrapper<'_>,
    mutation: PendingMutation,
) -> Option<Value> {
    use crate::storage::GraphStorageMut;
    match mutation {
        PendingMutation::AddVertex { label, properties } => {
            Some(Value::Vertex(wrapper.add_vertex(&label, properties)))
        }
        PendingMutation::AddEdge {
            label,
            from,
            to,
            properties,
        } => wrapper
            .add_edge(from, to, &label, properties)
            .ok()
            .map(Value::Edge),
        PendingMutation::SetVertexProperty { id, key, value } => wrapper
            .set_vertex_property(id, &key, value)
            .ok()
            .map(|_| Value::Vertex(id)),
        PendingMutation::SetEdgeProperty { id, key, value } => wrapper
            .set_edge_property(id, &key, value)
            .ok()
            .map(|_| Value::Edge(id)),
        PendingMutation::DropVertex { id } => {
            // Removal never produces a value, whether or not it succeeded.
            let _ = wrapper.remove_vertex(id);
            None
        }
        PendingMutation::DropEdge { id } => {
            let _ = wrapper.remove_edge(id);
            None
        }
    }
}
}
/// Terminal operations shared by every output marker.
impl<'g, In, Out, Marker: OutputMarker> CowMmapBoundTraversal<'g, In, Out, Marker> {
    /// Executes the traversal, applying any pending mutations, and discards
    /// the results.
    pub fn iterate(self) {
        std::mem::drop(self.execute_with_mutations());
    }

    /// Returns `true` when executing the traversal yields at least one result.
    ///
    /// This runs the full traversal through `execute_with_mutations`, so any
    /// pending mutations it produces are applied by this call.
    pub fn has_next(&self) -> bool {
        self.execute_with_mutations().first().is_some()
    }
}
impl<'g, In, Out> CowMmapBoundTraversal<'g, In, Out, VertexMarker> {
pub fn next(self) -> Option<PersistentVertex> {
let graph_arc = Arc::clone(&self.graph_arc);
self.execute_with_mutations()
.into_iter()
.find_map(|v| match v {
Value::Vertex(id) => Some(GraphVertex::new(id, Arc::clone(&graph_arc))),
_ => None,
})
}
pub fn to_list(self) -> Vec<PersistentVertex> {
let graph_arc = Arc::clone(&self.graph_arc);
self.execute_with_mutations()
.into_iter()
.filter_map(|v| match v {
Value::Vertex(id) => Some(GraphVertex::new(id, Arc::clone(&graph_arc))),
_ => None,
})
.collect()
}
pub fn one(self) -> Result<PersistentVertex, crate::error::TraversalError> {
let graph_arc = Arc::clone(&self.graph_arc);
let ids: Vec<_> = self
.execute_with_mutations()
.into_iter()
.filter_map(|v| match v {
Value::Vertex(id) => Some(id),
_ => None,
})
.take(2)
.collect();
match ids.len() {
1 => Ok(GraphVertex::new(ids[0], graph_arc)),
n => Err(crate::error::TraversalError::NotOne(n)),
}
}
pub fn count(self) -> u64 {
self.execute_with_mutations()
.into_iter()
.filter(|v| matches!(v, Value::Vertex(_)))
.count() as u64
}
#[allow(clippy::mutable_key_type)]
pub fn to_set(self) -> std::collections::HashSet<PersistentVertex> {
self.to_list().into_iter().collect()
}
}
impl<'g, In, Out> CowMmapBoundTraversal<'g, In, Out, EdgeMarker> {
pub fn next(self) -> Option<PersistentEdge> {
let graph_arc = Arc::clone(&self.graph_arc);
self.execute_with_mutations()
.into_iter()
.find_map(|v| match v {
Value::Edge(id) => Some(GraphEdge::new(id, Arc::clone(&graph_arc))),
_ => None,
})
}
pub fn to_list(self) -> Vec<PersistentEdge> {
let graph_arc = Arc::clone(&self.graph_arc);
self.execute_with_mutations()
.into_iter()
.filter_map(|v| match v {
Value::Edge(id) => Some(GraphEdge::new(id, Arc::clone(&graph_arc))),
_ => None,
})
.collect()
}
pub(crate) fn into_list_values(self) -> Vec<Value> {
self.execute_with_mutations()
}
pub fn one(self) -> Result<PersistentEdge, crate::error::TraversalError> {
let graph_arc = Arc::clone(&self.graph_arc);
let ids: Vec<_> = self
.execute_with_mutations()
.into_iter()
.filter_map(|v| match v {
Value::Edge(id) => Some(id),
_ => None,
})
.take(2)
.collect();
match ids.len() {
1 => Ok(GraphEdge::new(ids[0], graph_arc)),
n => Err(crate::error::TraversalError::NotOne(n)),
}
}
pub fn count(self) -> u64 {
self.execute_with_mutations()
.into_iter()
.filter(|v| matches!(v, Value::Edge(_)))
.count() as u64
}
#[allow(clippy::mutable_key_type)]
pub fn to_set(self) -> std::collections::HashSet<PersistentEdge> {
self.to_list().into_iter().collect()
}
}
impl<'g, In, Out> CowMmapBoundTraversal<'g, In, Out, Scalar> {
pub fn next(self) -> Option<Value> {
self.execute_with_mutations().into_iter().next()
}
pub fn to_list(self) -> Vec<Value> {
self.execute_with_mutations()
}
pub fn one(self) -> Result<Value, crate::error::TraversalError> {
let results: Vec<_> = self.execute_with_mutations().into_iter().take(2).collect();
match results.len() {
1 => Ok(results.into_iter().next().unwrap()),
n => Err(crate::error::TraversalError::NotOne(n)),
}
}
pub fn count(self) -> u64 {
self.execute_with_mutations().len() as u64
}
pub fn to_set(self) -> std::collections::HashSet<Value> {
self.execute_with_mutations().into_iter().collect()
}
pub fn to_vertex_list(self, graph: Arc<CowMmapGraph>) -> Vec<PersistentVertex> {
self.execute_with_mutations()
.into_iter()
.filter_map(|v| match v {
Value::Vertex(id) => Some(GraphVertex::new(id, Arc::clone(&graph))),
_ => None,
})
.collect()
}
#[deprecated(note = "Use typed traversal with `next()` instead")]
pub fn next_vertex(self, graph: Arc<CowMmapGraph>) -> Option<PersistentVertex> {
self.execute_with_mutations()
.into_iter()
.find_map(|v| match v {
Value::Vertex(id) => Some(GraphVertex::new(id, Arc::clone(&graph))),
_ => None,
})
}
#[deprecated(note = "Use typed traversal with `one()` instead")]
pub fn one_vertex(
self,
graph: Arc<CowMmapGraph>,
) -> Result<PersistentVertex, crate::error::TraversalError> {
let ids: Vec<_> = self
.execute_with_mutations()
.into_iter()
.filter_map(|v| match v {
Value::Vertex(id) => Some(id),
_ => None,
})
.take(2)
.collect();
match ids.len() {
1 => Ok(GraphVertex::new(ids[0], graph)),
n => Err(crate::error::TraversalError::NotOne(n)),
}
}
#[deprecated(note = "Use typed traversal with `to_list()` instead")]
pub fn to_edge_list(self, graph: Arc<CowMmapGraph>) -> Vec<PersistentEdge> {
self.execute_with_mutations()
.into_iter()
.filter_map(|v| match v {
Value::Edge(id) => Some(GraphEdge::new(id, Arc::clone(&graph))),
_ => None,
})
.collect()
}
#[deprecated(note = "Use typed traversal with `next()` instead")]
pub fn next_edge(self, graph: Arc<CowMmapGraph>) -> Option<PersistentEdge> {
self.execute_with_mutations()
.into_iter()
.find_map(|v| match v {
Value::Edge(id) => Some(GraphEdge::new(id, Arc::clone(&graph))),
_ => None,
})
}
pub fn one_edge(
self,
graph: Arc<CowMmapGraph>,
) -> Result<PersistentEdge, crate::error::TraversalError> {
let ids: Vec<_> = self
.execute_with_mutations()
.into_iter()
.filter_map(|v| match v {
Value::Edge(id) => Some(id),
_ => None,
})
.take(2)
.collect();
match ids.len() {
1 => Ok(GraphEdge::new(ids[0], graph)),
n => Err(crate::error::TraversalError::NotOne(n)),
}
}
}
impl<'g, In> CowMmapBoundTraversal<'g, In, Value> {
pub fn has_label(self, label: impl Into<String>) -> CowMmapBoundTraversal<'g, In, Value> {
self.add_step(HasLabelStep::single(label))
}
pub fn has(self, key: impl Into<String>) -> CowMmapBoundTraversal<'g, In, Value> {
self.add_step(HasStep::new(key.into()))
}
pub fn has_value(
self,
key: impl Into<String>,
value: Value,
) -> CowMmapBoundTraversal<'g, In, Value> {
self.add_step(HasValueStep::new(key.into(), value))
}
pub fn out(self) -> CowMmapBoundTraversal<'g, In, Value> {
self.add_step(OutStep::new())
}
pub fn out_label(self, label: impl Into<String>) -> CowMmapBoundTraversal<'g, In, Value> {
self.add_step(OutStep::with_labels(vec![label.into()]))
}
pub fn in_(self) -> CowMmapBoundTraversal<'g, In, Value> {
self.add_step(InStep::new())
}
pub fn in_label(self, label: impl Into<String>) -> CowMmapBoundTraversal<'g, In, Value> {
self.add_step(InStep::with_labels(vec![label.into()]))
}
pub fn out_e(self) -> CowMmapBoundTraversal<'g, In, Value> {
self.add_step(OutEStep::new())
}
pub fn in_e(self) -> CowMmapBoundTraversal<'g, In, Value> {
self.add_step(InEStep::new())
}
pub fn in_v(self) -> CowMmapBoundTraversal<'g, In, Value> {
self.add_step(InVStep)
}
pub fn out_v(self) -> CowMmapBoundTraversal<'g, In, Value> {
self.add_step(OutVStep)
}
pub fn values(self, key: impl Into<String>) -> CowMmapBoundTraversal<'g, In, Value> {
self.add_step(ValuesStep::new(key.into()))
}
pub fn property(
self,
key: impl Into<String>,
value: impl Into<Value>,
) -> CowMmapBoundTraversal<'g, In, Value> {
self.add_step(PropertyStep::new(key.into(), value.into()))
}
pub fn drop(self) -> CowMmapBoundTraversal<'g, In, Value> {
self.add_step(DropStep)
}
pub fn add_e(self, label: impl Into<String>) -> CowMmapBoundAddEdgeBuilder<'g, In> {
CowMmapBoundAddEdgeBuilder::new(
self.graph,
self.graph_arc,
self.traversal,
label.into(),
self.track_paths,
)
}
pub fn limit(self, n: usize) -> CowMmapBoundTraversal<'g, In, Value> {
self.add_step(LimitStep::new(n))
}
pub fn skip(self, n: usize) -> CowMmapBoundTraversal<'g, In, Value> {
self.add_step(SkipStep::new(n))
}
pub fn id(self) -> CowMmapBoundTraversal<'g, In, Value> {
self.add_step(IdStep)
}
pub fn label(self) -> CowMmapBoundTraversal<'g, In, Value> {
self.add_step(LabelStep)
}
}
impl<'g, In> CowMmapBoundTraversal<'g, In, Value, VertexMarker> {
pub fn has_label(
self,
label: impl Into<String>,
) -> CowMmapBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_same(HasLabelStep::single(label))
}
pub fn has(self, key: impl Into<String>) -> CowMmapBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_same(HasStep::new(key.into()))
}
pub fn has_value(
self,
key: impl Into<String>,
value: Value,
) -> CowMmapBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_same(HasValueStep::new(key.into(), value))
}
pub fn out(self) -> CowMmapBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_same(OutStep::new())
}
pub fn out_label(
self,
label: impl Into<String>,
) -> CowMmapBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_same(OutStep::with_labels(vec![label.into()]))
}
pub fn in_(self) -> CowMmapBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_same(InStep::new())
}
pub fn in_label(
self,
label: impl Into<String>,
) -> CowMmapBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_same(InStep::with_labels(vec![label.into()]))
}
pub fn both(self) -> CowMmapBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_same(crate::traversal::BothStep::new())
}
pub fn out_e(self) -> CowMmapBoundTraversal<'g, In, Value, EdgeMarker> {
self.add_step_with_marker(OutEStep::new())
}
pub fn in_e(self) -> CowMmapBoundTraversal<'g, In, Value, EdgeMarker> {
self.add_step_with_marker(InEStep::new())
}
pub fn property(
self,
key: impl Into<String>,
value: impl Into<Value>,
) -> CowMmapBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_same(PropertyStep::new(key.into(), value.into()))
}
pub fn drop(self) -> CowMmapBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_same(DropStep)
}
pub fn values(self, key: impl Into<String>) -> CowMmapBoundTraversal<'g, In, Value, Scalar> {
self.add_step_with_marker(ValuesStep::new(key.into()))
}
pub fn id(self) -> CowMmapBoundTraversal<'g, In, Value, Scalar> {
self.add_step_with_marker(IdStep)
}
pub fn label(self) -> CowMmapBoundTraversal<'g, In, Value, Scalar> {
self.add_step_with_marker(LabelStep)
}
pub fn limit(self, n: usize) -> CowMmapBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_same(LimitStep::new(n))
}
pub fn skip(self, n: usize) -> CowMmapBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_same(SkipStep::new(n))
}
pub fn add_e(self, label: impl Into<String>) -> CowMmapBoundAddEdgeBuilder<'g, In> {
CowMmapBoundAddEdgeBuilder::new(
self.graph,
self.graph_arc,
self.traversal,
label.into(),
self.track_paths,
)
}
}
impl<'g, In> CowMmapBoundTraversal<'g, In, Value, EdgeMarker> {
pub fn has_label(
self,
label: impl Into<String>,
) -> CowMmapBoundTraversal<'g, In, Value, EdgeMarker> {
self.add_step_same(HasLabelStep::single(label))
}
pub fn has(self, key: impl Into<String>) -> CowMmapBoundTraversal<'g, In, Value, EdgeMarker> {
self.add_step_same(HasStep::new(key.into()))
}
pub fn has_value(
self,
key: impl Into<String>,
value: Value,
) -> CowMmapBoundTraversal<'g, In, Value, EdgeMarker> {
self.add_step_same(HasValueStep::new(key.into(), value))
}
pub fn in_v(self) -> CowMmapBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_with_marker(InVStep)
}
pub fn out_v(self) -> CowMmapBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_with_marker(OutVStep)
}
pub fn property(
self,
key: impl Into<String>,
value: impl Into<Value>,
) -> CowMmapBoundTraversal<'g, In, Value, EdgeMarker> {
self.add_step_same(PropertyStep::new(key.into(), value.into()))
}
pub fn drop(self) -> CowMmapBoundTraversal<'g, In, Value, EdgeMarker> {
self.add_step_same(DropStep)
}
pub fn values(self, key: impl Into<String>) -> CowMmapBoundTraversal<'g, In, Value, Scalar> {
self.add_step_with_marker(ValuesStep::new(key.into()))
}
pub fn id(self) -> CowMmapBoundTraversal<'g, In, Value, Scalar> {
self.add_step_with_marker(IdStep)
}
pub fn label(self) -> CowMmapBoundTraversal<'g, In, Value, Scalar> {
self.add_step_with_marker(LabelStep)
}
pub fn limit(self, n: usize) -> CowMmapBoundTraversal<'g, In, Value, EdgeMarker> {
self.add_step_same(LimitStep::new(n))
}
pub fn skip(self, n: usize) -> CowMmapBoundTraversal<'g, In, Value, EdgeMarker> {
self.add_step_same(SkipStep::new(n))
}
}
/// Builder for adding a single edge directly to a `CowMmapGraph`.
///
/// Both endpoints must be supplied (`from_id` / `to_id`) before a terminal
/// method is called; otherwise the terminal methods create nothing.
pub struct CowMmapAddEdgeBuilder<'g> {
/// Graph the edge is added to.
graph: &'g CowMmapGraph,
/// Shared graph handle given to the returned `PersistentEdge`.
graph_arc: Arc<CowMmapGraph>,
/// Label of the edge to create.
label: String,
/// Source vertex, if set via `from_id`.
from: Option<VertexId>,
/// Target vertex, if set via `to_id`.
to: Option<VertexId>,
/// Properties collected via `property`.
properties: HashMap<String, Value>,
}
impl<'g> CowMmapAddEdgeBuilder<'g> {
    /// Creates a builder for an edge labelled `label` with no endpoints and
    /// no properties.
    fn new(graph: &'g CowMmapGraph, graph_arc: Arc<CowMmapGraph>, label: String) -> Self {
        let properties = HashMap::new();
        Self {
            graph,
            graph_arc,
            label,
            from: None,
            to: None,
            properties,
        }
    }

    /// Sets the source vertex of the edge.
    pub fn from_id(self, id: VertexId) -> Self {
        Self {
            from: Some(id),
            ..self
        }
    }

    /// Sets the target vertex of the edge.
    pub fn to_id(self, id: VertexId) -> Self {
        Self {
            to: Some(id),
            ..self
        }
    }

    /// Records a property for the edge; a later value for the same key
    /// replaces the earlier one.
    pub fn property(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
        let (name, stored) = (key.into(), value.into());
        self.properties.insert(name, stored);
        self
    }

    /// Adds the edge to the graph and returns it.
    ///
    /// Returns `None` when either endpoint is missing or when the graph
    /// rejects the edge.
    pub fn next(self) -> Option<PersistentEdge> {
        let Self {
            graph,
            graph_arc,
            label,
            from,
            to,
            properties,
        } = self;
        let (src, dst) = (from?, to?);
        graph
            .add_edge(src, dst, &label, properties)
            .ok()
            .map(|id| GraphEdge::new(id, graph_arc))
    }

    /// Adds the edge and discards the result.
    pub fn iterate(self) {
        std::mem::drop(self.next());
    }

    /// Adds the edge and returns it as a list of zero or one elements.
    pub fn to_list(self) -> Vec<PersistentEdge> {
        match self.next() {
            Some(edge) => vec![edge],
            None => Vec::new(),
        }
    }
}
/// Builder for adding edges as the final step of an existing bound traversal.
///
/// The target vertex must be supplied via `to_id` before a terminal method
/// is called; otherwise the terminal methods produce no results.
pub struct CowMmapBoundAddEdgeBuilder<'g, In> {
/// Graph the traversal is bound to.
graph: &'g CowMmapGraph,
/// Shared graph handle forwarded to the traversal that executes the step.
graph_arc: Arc<CowMmapGraph>,
/// Traversal recorded so far; the add-edge step is appended to it.
traversal: Traversal<In, Value>,
/// Label of the edges to create.
label: String,
/// Target vertex, if set via `to_id`.
to: Option<VertexId>,
/// Properties collected via `property`.
properties: HashMap<String, Value>,
/// Path-tracking flag forwarded to the traversal that executes the step.
track_paths: bool,
}
impl<'g, In> CowMmapBoundAddEdgeBuilder<'g, In> {
    /// Creates a builder on top of `traversal` for edges labelled `label`,
    /// with no target vertex and no properties.
    fn new(
        graph: &'g CowMmapGraph,
        graph_arc: Arc<CowMmapGraph>,
        traversal: Traversal<In, Value>,
        label: String,
        track_paths: bool,
    ) -> Self {
        let properties = HashMap::new();
        Self {
            graph,
            graph_arc,
            traversal,
            label,
            to: None,
            properties,
            track_paths,
        }
    }

    /// Sets the target vertex of the edges to create.
    pub fn to_id(self, id: VertexId) -> Self {
        Self {
            to: Some(id),
            ..self
        }
    }

    /// Records a property for the edges; a later value for the same key
    /// replaces the earlier one.
    pub fn property(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
        let (name, stored) = (key.into(), value.into());
        self.properties.insert(name, stored);
        self
    }

    /// Appends the add-edge step to the traversal, executes it and returns
    /// the raw result values.
    ///
    /// Returns an empty list without executing anything when no target
    /// vertex was set.
    pub fn to_list(self) -> Vec<Value> {
        use crate::traversal::mutation::AddEStep;
        let Self {
            graph,
            graph_arc,
            traversal,
            label,
            to,
            properties,
            track_paths,
        } = self;
        let target = match to {
            Some(id) => id,
            None => return vec![],
        };
        let step = properties.into_iter().fold(
            AddEStep::new(&label).to_vertex(target),
            |step, (key, value)| step.property(key, value),
        );
        let extended: Traversal<In, Value> = traversal.add_step(step);
        let bound: CowMmapBoundTraversal<'_, In, Value, EdgeMarker> = CowMmapBoundTraversal {
            graph,
            graph_arc,
            traversal: extended,
            track_paths,
            _marker: PhantomData,
        };
        bound.into_list_values()
    }

    /// Executes the traversal and returns the first result value, if any.
    pub fn next(self) -> Option<Value> {
        let mut produced = self.to_list();
        if produced.is_empty() {
            None
        } else {
            Some(produced.swap_remove(0))
        }
    }

    /// Executes the traversal and discards the results.
    pub fn iterate(self) {
        std::mem::drop(self.to_list());
    }
}
/// `GraphAccess` for a shared graph handle.
///
/// Reads go through a fresh snapshot; writes are forwarded to the
/// `CowMmapGraph` methods of the same name. The write calls use the explicit
/// `CowMmapGraph::…` path so they cannot resolve back to this trait impl.
impl crate::graph_access::GraphAccess for Arc<CowMmapGraph> {
    /// Looks up a vertex in a fresh snapshot.
    fn get_vertex(&self, id: VertexId) -> Option<Vertex> {
        let snapshot = self.snapshot();
        let found = snapshot.get_vertex(id);
        found
    }

    /// Looks up an edge in a fresh snapshot.
    fn get_edge(&self, id: EdgeId) -> Option<Edge> {
        let snapshot = self.snapshot();
        let found = snapshot.get_edge(id);
        found
    }

    /// Collects the ids of the edges leaving `vertex`.
    fn out_edge_ids(&self, vertex: VertexId) -> Vec<EdgeId> {
        let snapshot = self.snapshot();
        let ids: Vec<EdgeId> = snapshot.out_edges(vertex).map(|edge| edge.id).collect();
        ids
    }

    /// Collects the ids of the edges entering `vertex`.
    fn in_edge_ids(&self, vertex: VertexId) -> Vec<EdgeId> {
        let snapshot = self.snapshot();
        let ids: Vec<EdgeId> = snapshot.in_edges(vertex).map(|edge| edge.id).collect();
        ids
    }

    /// Sets a property on a vertex.
    fn set_vertex_property(
        &self,
        id: VertexId,
        key: &str,
        value: Value,
    ) -> Result<(), StorageError> {
        let graph: &CowMmapGraph = self;
        CowMmapGraph::set_vertex_property(graph, id, key, value)
    }

    /// Sets a property on an edge.
    fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) -> Result<(), StorageError> {
        let graph: &CowMmapGraph = self;
        CowMmapGraph::set_edge_property(graph, id, key, value)
    }

    /// Adds an edge from `src` to `dst`.
    fn add_edge(
        &self,
        src: VertexId,
        dst: VertexId,
        label: &str,
        properties: HashMap<String, Value>,
    ) -> Result<EdgeId, StorageError> {
        let graph: &CowMmapGraph = self;
        CowMmapGraph::add_edge(graph, src, dst, label, properties)
    }

    /// Removes a vertex.
    fn remove_vertex(&self, id: VertexId) -> Result<(), StorageError> {
        let graph: &CowMmapGraph = self;
        CowMmapGraph::remove_vertex(graph, id)
    }

    /// Removes an edge.
    fn remove_edge(&self, id: EdgeId) -> Result<(), StorageError> {
        let graph: &CowMmapGraph = self;
        CowMmapGraph::remove_edge(graph, id)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
/// Creates a temporary directory and returns it together with the path of a
/// `test.db` file inside it.
///
/// The directory guard is returned so the caller decides how long it lives.
fn temp_db_path() -> (tempfile::TempDir, std::path::PathBuf) {
    let scratch = tempdir().unwrap();
    let db_file = scratch.path().join("test.db");
    (scratch, db_file)
}
/// A freshly opened database has no vertices and no edges.
#[test]
fn test_open_new_database() {
    let (_dir, path) = temp_db_path();
    let fresh = CowMmapGraph::open(&path).unwrap();
    let vertices = fresh.vertex_count();
    assert_eq!(vertices, 0);
    let edges = fresh.edge_count();
    assert_eq!(edges, 0);
}
/// Adding a vertex stores its label and properties and bumps the count.
#[test]
fn test_add_vertex() {
    let (_dir, path) = temp_db_path();
    let graph = CowMmapGraph::open(&path).unwrap();

    let alice_props = HashMap::from([("name".to_string(), Value::String("Alice".into()))]);
    let alice = graph.add_vertex("person", alice_props).unwrap();
    assert_eq!(graph.vertex_count(), 1);

    let stored = graph.get_vertex(alice).unwrap();
    assert_eq!(stored.label, "person");
    assert_eq!(
        stored.properties.get("name"),
        Some(&Value::String("Alice".into()))
    );
}
/// Adding an edge stores its label and both endpoints.
#[test]
fn test_add_edge() {
    let (_dir, path) = temp_db_path();
    let graph = CowMmapGraph::open(&path).unwrap();

    let source = graph.add_vertex("person", HashMap::new()).unwrap();
    let target = graph.add_vertex("person", HashMap::new()).unwrap();
    let edge_id = graph
        .add_edge(source, target, "knows", HashMap::new())
        .unwrap();
    assert_eq!(graph.edge_count(), 1);

    let stored = graph.get_edge(edge_id).unwrap();
    assert_eq!(stored.label, "knows");
    assert_eq!(stored.src, source);
    assert_eq!(stored.dst, target);
}
/// A snapshot keeps seeing the state at the time it was taken.
#[test]
fn test_snapshot_isolation() {
    let (_dir, path) = temp_db_path();
    let graph = CowMmapGraph::open(&path).unwrap();

    graph.add_vertex("person", HashMap::new()).unwrap();
    let before = graph.snapshot();

    graph.add_vertex("person", HashMap::new()).unwrap();
    let after = graph.snapshot();

    assert_eq!(before.vertex_count(), 1);
    assert_eq!(after.vertex_count(), 2);
    assert_eq!(graph.vertex_count(), 2);
}
/// A snapshot can be moved to and read from another thread.
#[test]
fn test_snapshot_send_sync() {
    let (_dir, path) = temp_db_path();
    let graph = CowMmapGraph::open(&path).unwrap();
    graph.add_vertex("person", HashMap::new()).unwrap();

    let snapshot = graph.snapshot();
    let worker = std::thread::spawn(move || {
        assert_eq!(snapshot.vertex_count(), 1);
    });
    worker.join().unwrap();
}
/// Data written before a checkpoint is visible after reopening the file.
#[test]
fn test_persistence() {
    let (_dir, path) = temp_db_path();

    // First session: write one vertex and checkpoint it.
    {
        let writer = CowMmapGraph::open(&path).unwrap();
        let alice_props = HashMap::from([("name".to_string(), Value::String("Alice".into()))]);
        writer.add_vertex("person", alice_props).unwrap();
        writer.checkpoint().unwrap();
    }

    // Second session: reopen the same file and read the vertex back.
    {
        let reader = CowMmapGraph::open(&path).unwrap();
        assert_eq!(reader.vertex_count(), 1);
        let restored = reader.get_vertex(VertexId(0)).unwrap();
        assert_eq!(restored.label, "person");
        assert_eq!(
            restored.properties.get("name"),
            Some(&Value::String("Alice".into()))
        );
    }
}
/// A successful batch makes all of its vertices and edges visible.
#[test]
fn test_batch_operations() {
    let (_dir, path) = temp_db_path();
    let graph = CowMmapGraph::open(&path).unwrap();

    let created = graph.batch(|ctx| {
        let alice = ctx.add_vertex(
            "person",
            HashMap::from([("name".to_string(), Value::String("Alice".into()))]),
        );
        let bob = ctx.add_vertex(
            "person",
            HashMap::from([("name".to_string(), Value::String("Bob".into()))]),
        );
        let edge = ctx.add_edge(alice, bob, "knows", HashMap::new())?;
        Ok((alice, bob, edge))
    });
    let (alice, bob, edge) = created.unwrap();

    assert_eq!(graph.vertex_count(), 2);
    assert_eq!(graph.edge_count(), 1);
    assert!(graph.get_vertex(alice).is_some());
    assert!(graph.get_vertex(bob).is_some());
    assert!(graph.get_edge(edge).is_some());
}
/// A failing batch leaves the graph exactly as it was before the batch.
#[test]
fn test_batch_rollback() {
    let (_dir, path) = temp_db_path();
    let graph = CowMmapGraph::open(&path).unwrap();
    graph.add_vertex("person", HashMap::new()).unwrap();
    assert_eq!(graph.vertex_count(), 1);

    // The edge refers to vertices that do not exist, so the batch must fail.
    let outcome: Result<(), BatchError> = graph.batch(|ctx| {
        ctx.add_vertex("person", HashMap::new());
        ctx.add_edge(VertexId(100), VertexId(101), "knows", HashMap::new())?;
        Ok(())
    });

    assert!(outcome.is_err());
    assert_eq!(graph.vertex_count(), 1);
}
/// Removing a vertex also removes the edge attached to it.
#[test]
fn test_remove_vertex() {
    let (_dir, path) = temp_db_path();
    let graph = CowMmapGraph::open(&path).unwrap();

    let removed = graph.add_vertex("person", HashMap::new()).unwrap();
    let kept = graph.add_vertex("person", HashMap::new()).unwrap();
    let _edge = graph
        .add_edge(removed, kept, "knows", HashMap::new())
        .unwrap();
    assert_eq!(graph.vertex_count(), 2);
    assert_eq!(graph.edge_count(), 1);

    graph.remove_vertex(removed).unwrap();

    assert_eq!(graph.vertex_count(), 1);
    assert_eq!(graph.edge_count(), 0);
    assert!(graph.get_vertex(removed).is_none());
    assert!(graph.get_vertex(kept).is_some());
}
/// Removing an edge leaves both endpoint vertices in place.
#[test]
fn test_remove_edge() {
    let (_dir, path) = temp_db_path();
    let graph = CowMmapGraph::open(&path).unwrap();

    let source = graph.add_vertex("person", HashMap::new()).unwrap();
    let target = graph.add_vertex("person", HashMap::new()).unwrap();
    let edge_id = graph
        .add_edge(source, target, "knows", HashMap::new())
        .unwrap();
    assert_eq!(graph.edge_count(), 1);

    graph.remove_edge(edge_id).unwrap();

    assert_eq!(graph.edge_count(), 0);
    assert_eq!(graph.vertex_count(), 2);
}
/// A property set on an existing vertex can be read back.
#[test]
fn test_set_vertex_property() {
    let (_dir, path) = temp_db_path();
    let graph = CowMmapGraph::open(&path).unwrap();
    let vertex_id = graph.add_vertex("person", HashMap::new()).unwrap();

    let new_name = Value::String("Alice".into());
    graph
        .set_vertex_property(vertex_id, "name", new_name)
        .unwrap();

    let stored = graph.get_vertex(vertex_id).unwrap();
    assert_eq!(
        stored.properties.get("name"),
        Some(&Value::String("Alice".into()))
    );
}
/// Outgoing and incoming edges can be enumerated through a snapshot.
#[test]
fn test_adjacency_via_snapshot() {
    let (_dir, path) = temp_db_path();
    let graph = CowMmapGraph::open(&path).unwrap();

    let alice = graph.add_vertex("person", HashMap::new()).unwrap();
    let bob = graph.add_vertex("person", HashMap::new()).unwrap();
    let charlie = graph.add_vertex("person", HashMap::new()).unwrap();
    graph.add_edge(alice, bob, "knows", HashMap::new()).unwrap();
    graph
        .add_edge(alice, charlie, "knows", HashMap::new())
        .unwrap();

    let snapshot = graph.snapshot();

    let leaving_alice: Vec<_> = snapshot.out_edges(alice).collect();
    assert_eq!(leaving_alice.len(), 2);

    let entering_bob: Vec<_> = snapshot.in_edges(bob).collect();
    assert_eq!(entering_bob.len(), 1);
    assert_eq!(entering_bob[0].src, alice);
}
/// `g.v().to_list()` returns persistent vertices with label and properties.
#[test]
fn test_to_vertex_list_returns_persistent_vertices() {
    let (_dir, path) = temp_db_path();
    let graph = Arc::new(CowMmapGraph::open(&path).unwrap());

    let alice_props = HashMap::from([("name".to_string(), Value::String("Alice".into()))]);
    graph.add_vertex("person", alice_props).unwrap();
    let bob_props = HashMap::from([("name".to_string(), Value::String("Bob".into()))]);
    graph.add_vertex("person", bob_props).unwrap();

    let g = graph.gremlin(Arc::clone(&graph));
    let people = g.v().to_list();
    assert_eq!(people.len(), 2);
    for person in &people {
        assert_eq!(person.label(), Some("person".to_string()));
        assert!(person.property("name").is_some());
    }
}
/// `g.v().next()` returns a persistent vertex exposing label and properties.
#[test]
fn test_next_vertex_returns_persistent_vertex() {
    let (_dir, path) = temp_db_path();
    let graph = Arc::new(CowMmapGraph::open(&path).unwrap());

    let alice_props = HashMap::from([("name".to_string(), Value::String("Alice".into()))]);
    graph.add_vertex("person", alice_props).unwrap();

    let g = graph.gremlin(Arc::clone(&graph));
    let first = g.v().next().unwrap();
    assert_eq!(first.label(), Some("person".to_string()));
    assert_eq!(first.property("name"), Some(Value::String("Alice".into())));
}
/// `g.v().next()` on an empty graph yields nothing.
#[test]
fn test_next_vertex_returns_none_when_empty() {
    let (_dir, path) = temp_db_path();
    let graph = Arc::new(CowMmapGraph::open(&path).unwrap());
    let g = graph.gremlin(Arc::clone(&graph));

    let first = g.v().next();
    assert!(first.is_none());
}
/// `g.v().one()` succeeds when the graph holds exactly one vertex.
#[test]
fn test_one_vertex_success() {
    let (_dir, path) = temp_db_path();
    let graph = Arc::new(CowMmapGraph::open(&path).unwrap());

    let alice_props = HashMap::from([("name".to_string(), Value::String("Alice".into()))]);
    graph.add_vertex("person", alice_props).unwrap();

    let g = graph.gremlin(Arc::clone(&graph));
    let only = g.v().one().unwrap();
    assert_eq!(only.label(), Some("person".to_string()));
}
/// `g.v().one()` fails when more than one vertex exists.
#[test]
fn test_one_vertex_fails_when_multiple() {
    let (_dir, path) = temp_db_path();
    let graph = Arc::new(CowMmapGraph::open(&path).unwrap());
    for _ in 0..2 {
        graph.add_vertex("person", HashMap::new()).unwrap();
    }

    let g = graph.gremlin(Arc::clone(&graph));
    let outcome = g.v().one();
    assert!(outcome.is_err());
}
/// `g.v().one()` fails on an empty graph.
#[test]
fn test_one_vertex_fails_when_empty() {
    let (_dir, path) = temp_db_path();
    let graph = Arc::new(CowMmapGraph::open(&path).unwrap());
    let g = graph.gremlin(Arc::clone(&graph));

    let outcome = g.v().one();
    assert!(outcome.is_err());
}
/// `g.e().to_list()` returns persistent edges with label and both endpoints.
#[test]
fn test_to_edge_list_returns_persistent_edges() {
    let (_dir, path) = temp_db_path();
    let graph = Arc::new(CowMmapGraph::open(&path).unwrap());

    let alice = graph.add_vertex("person", HashMap::new()).unwrap();
    let bob = graph.add_vertex("person", HashMap::new()).unwrap();
    graph.add_edge(alice, bob, "knows", HashMap::new()).unwrap();
    graph
        .add_edge(bob, alice, "knows_too", HashMap::new())
        .unwrap();

    let g = graph.gremlin(Arc::clone(&graph));
    let relations = g.e().to_list();
    assert_eq!(relations.len(), 2);
    for relation in &relations {
        assert!(relation.label().is_some());
        assert!(relation.out_v().is_some());
        assert!(relation.in_v().is_some());
    }
}
/// `g.e().next()` returns a persistent edge exposing label and endpoints.
#[test]
fn test_next_edge_returns_persistent_edge() {
    let (_dir, path) = temp_db_path();
    let graph = Arc::new(CowMmapGraph::open(&path).unwrap());

    let alice = graph.add_vertex("person", HashMap::new()).unwrap();
    let bob = graph.add_vertex("person", HashMap::new()).unwrap();
    graph.add_edge(alice, bob, "knows", HashMap::new()).unwrap();

    let g = graph.gremlin(Arc::clone(&graph));
    let relation = g.e().next().unwrap();
    assert_eq!(relation.label(), Some("knows".to_string()));

    let tail = relation.out_v().unwrap();
    let head = relation.in_v().unwrap();
    assert_eq!(tail.id(), alice);
    assert_eq!(head.id(), bob);
}
/// `g.e().next()` on an empty graph yields nothing.
#[test]
fn test_next_edge_returns_none_when_empty() {
    let (_dir, path) = temp_db_path();
    let graph = Arc::new(CowMmapGraph::open(&path).unwrap());
    let g = graph.gremlin(Arc::clone(&graph));

    let first = g.e().next();
    assert!(first.is_none());
}
/// `g.e().one()` succeeds when the graph holds exactly one edge.
#[test]
fn test_one_edge_success() {
    let (_dir, path) = temp_db_path();
    let graph = Arc::new(CowMmapGraph::open(&path).unwrap());

    let alice = graph.add_vertex("person", HashMap::new()).unwrap();
    let bob = graph.add_vertex("person", HashMap::new()).unwrap();
    graph.add_edge(alice, bob, "knows", HashMap::new()).unwrap();

    let g = graph.gremlin(Arc::clone(&graph));
    let only = g.e().one().unwrap();
    assert_eq!(only.label(), Some("knows".to_string()));
}
/// `g.e().one()` fails when more than one edge exists.
#[test]
fn test_one_edge_fails_when_multiple() {
    let (_dir, path) = temp_db_path();
    let graph = Arc::new(CowMmapGraph::open(&path).unwrap());

    let alice = graph.add_vertex("person", HashMap::new()).unwrap();
    let bob = graph.add_vertex("person", HashMap::new()).unwrap();
    graph.add_edge(alice, bob, "knows", HashMap::new()).unwrap();
    graph.add_edge(bob, alice, "knows", HashMap::new()).unwrap();

    let g = graph.gremlin(Arc::clone(&graph));
    let outcome = g.e().one();
    assert!(outcome.is_err());
}
/// A persistent vertex can be used as the starting point of a traversal.
#[test]
fn test_persistent_vertex_traversal() {
    let (_dir, path) = temp_db_path();
    let graph = Arc::new(CowMmapGraph::open(&path).unwrap());

    let alice = graph.add_vertex("person", HashMap::new()).unwrap();
    let bob = graph.add_vertex("person", HashMap::new()).unwrap();
    graph.add_edge(alice, bob, "knows", HashMap::new()).unwrap();

    let g = graph.gremlin(Arc::clone(&graph));
    let start = g.v_id(alice).next().unwrap();
    let known: Vec<_> = start.out("knows").to_list();
    assert_eq!(known.len(), 1);
    assert_eq!(known[0].id(), bob);
}
/// Setting a property through a persistent vertex updates the graph.
#[test]
fn test_persistent_vertex_mutation() {
    let (_dir, path) = temp_db_path();
    let graph = Arc::new(CowMmapGraph::open(&path).unwrap());

    let alice_props: HashMap<String, Value> =
        HashMap::from([("name".into(), "Alice".into())]);
    let alice = graph.add_vertex("person", alice_props).unwrap();

    let g = graph.gremlin(Arc::clone(&graph));
    let handle = g.v_id(alice).next().unwrap();
    handle.property_set("age", 30i64).unwrap();

    let stored = graph.get_vertex(alice).unwrap();
    assert_eq!(stored.properties.get("age"), Some(&Value::Int(30)));
}
/// Setting a property through a persistent edge updates the graph.
#[test]
fn test_persistent_edge_mutation() {
    let (_dir, path) = temp_db_path();
    let graph = Arc::new(CowMmapGraph::open(&path).unwrap());

    let alice = graph.add_vertex("person", HashMap::new()).unwrap();
    let bob = graph.add_vertex("person", HashMap::new()).unwrap();
    let edge_id = graph.add_edge(alice, bob, "knows", HashMap::new()).unwrap();

    let g = graph.gremlin(Arc::clone(&graph));
    let handle = g.e().next().unwrap();
    handle.property_set("since", 2020i64).unwrap();

    let stored = graph.get_edge(edge_id).unwrap();
    assert_eq!(stored.properties.get("since"), Some(&Value::Int(2020)));
}
/// A textual Gremlin query returns one name per stored person.
#[test]
#[cfg(feature = "gremlin")]
fn test_gremlin_query() {
    use crate::gremlin::ExecutionResult;

    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().join("gremlin_query.db");
    let graph = CowMmapGraph::open(&path).unwrap();

    let alice_props = HashMap::from([("name".to_string(), Value::String("Alice".to_string()))]);
    graph.add_vertex("person", alice_props).unwrap();
    let bob_props = HashMap::from([("name".to_string(), Value::String("Bob".to_string()))]);
    graph.add_vertex("person", bob_props).unwrap();

    let outcome = graph
        .query("g.V().hasLabel('person').values('name').toList()")
        .unwrap();
    match outcome {
        ExecutionResult::List(names) => {
            assert_eq!(names.len(), 2);
        }
        _ => panic!("Expected List result"),
    }
}
/// A textual `addV` mutation creates a vertex that later queries can see.
#[test]
#[cfg(feature = "gremlin")]
fn test_gremlin_mutate_add_vertex() {
    use crate::gremlin::ExecutionResult;

    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().join("gremlin_mutate.db");
    let graph = CowMmapGraph::open(&path).unwrap();
    assert_eq!(graph.vertex_count(), 0);

    let mutation = graph
        .mutate("g.addV('person').property('name', 'Alice')")
        .unwrap();
    match mutation {
        ExecutionResult::List(values) => {
            assert_eq!(values.len(), 1);
            assert!(
                values[0].is_vertex(),
                "Expected Vertex, got: {:?}",
                values[0]
            );
        }
        _ => panic!("Expected List result"),
    }
    assert_eq!(graph.vertex_count(), 1, "Vertex was not created");

    let lookup = graph
        .query("g.V().hasLabel('person').values('name').toList()")
        .unwrap();
    match lookup {
        ExecutionResult::List(names) => {
            assert_eq!(names.len(), 1);
            assert_eq!(names[0], Value::String("Alice".to_string()));
        }
        _ => panic!("Expected List result"),
    }
}
/// A textual `addE` mutation creates an edge between two existing vertices.
#[test]
#[cfg(feature = "gremlin")]
fn test_gremlin_mutate_add_edge() {
    use crate::gremlin::ExecutionResult;

    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().join("gremlin_edge.db");
    let graph = CowMmapGraph::open(&path).unwrap();

    let alice_props = HashMap::from([("name".to_string(), Value::String("Alice".to_string()))]);
    let alice = graph.add_vertex("person", alice_props).unwrap();
    let bob_props = HashMap::from([("name".to_string(), Value::String("Bob".to_string()))]);
    let bob = graph.add_vertex("person", bob_props).unwrap();
    assert_eq!(graph.edge_count(), 0);

    let script = format!("g.addE('knows').from({}).to({})", alice.0, bob.0);
    let mutation = graph.mutate(&script).unwrap();
    match mutation {
        ExecutionResult::List(values) => {
            assert_eq!(values.len(), 1);
            assert!(values[0].is_edge(), "Expected Edge, got: {:?}", values[0]);
        }
        _ => panic!("Expected List result"),
    }
    assert_eq!(graph.edge_count(), 1, "Edge was not created");
}
/// A textual `drop()` mutation removes the addressed vertex.
#[test]
#[cfg(feature = "gremlin")]
fn test_gremlin_mutate_drop() {
    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().join("gremlin_drop.db");
    let graph = CowMmapGraph::open(&path).unwrap();

    let alice_props = HashMap::from([("name".to_string(), Value::String("Alice".to_string()))]);
    let alice = graph.add_vertex("person", alice_props).unwrap();
    let bob_props = HashMap::from([("name".to_string(), Value::String("Bob".to_string()))]);
    graph.add_vertex("person", bob_props).unwrap();
    assert_eq!(graph.vertex_count(), 2);

    let script = format!("g.V({}).drop()", alice.0);
    graph.mutate(&script).unwrap();
    assert_eq!(graph.vertex_count(), 1, "Vertex was not dropped");
}
/// Textual queries on a snapshot keep seeing the state at snapshot time,
/// while queries on the graph see later writes.
#[test]
#[cfg(feature = "gremlin")]
fn test_gremlin_snapshot_query() {
    use crate::gremlin::ExecutionResult;

    let dir = tempfile::tempdir().unwrap();
    let path = dir.path().join("snapshot_query.db");
    let graph = CowMmapGraph::open(&path).unwrap();

    let alice_props = HashMap::from([("name".to_string(), Value::String("Alice".to_string()))]);
    graph.add_vertex("person", alice_props).unwrap();

    let snapshot = graph.snapshot();
    let names_in_snapshot = snapshot
        .query("g.V().hasLabel('person').values('name').toList()")
        .unwrap();
    match names_in_snapshot {
        ExecutionResult::List(names) => {
            assert_eq!(names.len(), 1);
            assert_eq!(names[0], Value::String("Alice".to_string()));
        }
        _ => panic!("Expected List result"),
    }

    // Written after the snapshot was taken, so only the graph may see it.
    let bob_props = HashMap::from([("name".to_string(), Value::String("Bob".to_string()))]);
    graph.add_vertex("person", bob_props).unwrap();

    let people_in_snapshot = snapshot.query("g.V().hasLabel('person').toList()").unwrap();
    match people_in_snapshot {
        ExecutionResult::List(vertices) => {
            assert_eq!(vertices.len(), 1, "Snapshot should be isolated");
        }
        _ => panic!("Expected List result"),
    }

    let people_in_graph = graph.query("g.V().hasLabel('person').toList()").unwrap();
    match people_in_graph {
        ExecutionResult::List(vertices) => {
            assert_eq!(vertices.len(), 2, "Graph should have both vertices");
        }
        _ => panic!("Expected List result"),
    }
}
/// Exercises the fluent traversal API end to end: adding two vertices and an
/// edge, reading them back, setting a property, and dropping a vertex, with
/// the graph's counts checked after each mutation.
#[test]
fn test_gremlin_fluent_api_mutations() {
    // `tmp` must stay bound for the whole test so the directory outlives the graph.
    let tmp = tempfile::tempdir().unwrap();
    let db_file = tmp.path().join("fluent_mutations.db");
    let graph = Arc::new(CowMmapGraph::open(&db_file).unwrap());
    let g = graph.gremlin(Arc::clone(&graph));

    // Vertex creation.
    let maybe_alice = g.add_v("Person").property("name", "Alice").next();
    assert!(maybe_alice.is_some(), "Should return a PersistentVertex");
    assert_eq!(graph.vertex_count(), 1, "Vertex should be created");

    let maybe_bob = g.add_v("Person").property("name", "Bob").next();
    assert!(maybe_bob.is_some());
    assert_eq!(graph.vertex_count(), 2);

    let alice_id = maybe_alice.unwrap().id();
    let bob_id = maybe_bob.unwrap().id();

    // Edge creation between the two vertices.
    let maybe_edge = g.add_e("KNOWS").from_id(alice_id).to_id(bob_id).next();
    assert!(maybe_edge.is_some(), "Should return a PersistentEdge");
    assert_eq!(graph.edge_count(), 1, "Edge should be created");

    // Read-back through label filter and outgoing-edge navigation.
    let persons = g.v().has_label("Person").to_list();
    assert_eq!(persons.len(), 2, "Should find 2 people");

    let known_by_alice = g.v_id(alice_id).out_label("KNOWS").to_list();
    assert_eq!(known_by_alice.len(), 1, "Alice should know 1 person");
    assert_eq!(known_by_alice[0].id(), bob_id);

    // Property update on an existing vertex.
    g.v_id(alice_id).property("age", 30i64).iterate();
    let stored_alice = graph.get_vertex(alice_id).unwrap();
    let stored_age = stored_alice.properties.get("age");
    assert_eq!(stored_age, Some(&Value::Int(30)));

    // Dropping Bob is expected to remove the KNOWS edge as well.
    g.v_id(bob_id).drop().iterate();
    assert_eq!(graph.vertex_count(), 1, "Bob should be deleted");
    assert_eq!(graph.edge_count(), 0, "Edge should be cascade deleted");
}
}