use std::collections::HashMap;
use std::ops::Bound;
use std::sync::Arc;
use im::HashMap as ImHashMap;
use parking_lot::RwLock;
use roaring::RoaringBitmap;
use crate::error::StorageError;
#[cfg(feature = "gql")]
use crate::gql::{self, GqlError};
use crate::graph_elements::{GraphEdge, GraphVertex, InMemoryEdge, InMemoryVertex};
use crate::index::{
BTreeIndex, ElementType, IndexError, IndexSpec, IndexType, PropertyIndex, RTreeIndex,
UniqueIndex,
};
use crate::schema::GraphSchema;
use crate::storage::interner::StringInterner;
use crate::storage::{Edge, GraphStorage, StreamableStorage, Vertex};
use crate::traversal::markers::{Edge as EdgeMarker, OutputMarker, Scalar, Vertex as VertexMarker};
use crate::traversal::mutation::{DropStep, PendingMutation, PropertyStep};
use crate::traversal::step::Step;
use crate::traversal::{
ExecutionContext, HasLabelStep, HasStep, HasValueStep, IdStep, InEStep, InStep, InVStep,
LabelStep, LimitStep, OutEStep, OutStep, OutVStep, SkipStep, Traversal, TraversalSource,
Traverser, ValuesStep,
};
use crate::value::{EdgeId, IntoVertexId, Value, VertexId};
use std::marker::PhantomData;
#[derive(Clone, Debug)]
pub(crate) struct NodeData {
pub id: VertexId,
pub label_id: u32,
pub properties: HashMap<String, Value>,
pub out_edges: Vec<EdgeId>,
pub in_edges: Vec<EdgeId>,
}
#[derive(Clone, Debug)]
pub(crate) struct EdgeData {
pub id: EdgeId,
pub label_id: u32,
pub src: VertexId,
pub dst: VertexId,
pub properties: HashMap<String, Value>,
}
#[derive(Clone)]
pub struct GraphState {
pub(crate) vertices: ImHashMap<VertexId, Arc<NodeData>>,
pub(crate) edges: ImHashMap<EdgeId, Arc<EdgeData>>,
pub(crate) vertex_labels: ImHashMap<u32, Arc<RoaringBitmap>>,
pub(crate) edge_labels: ImHashMap<u32, Arc<RoaringBitmap>>,
pub(crate) interner: Arc<RwLock<StringInterner>>,
pub(crate) version: u64,
pub(crate) next_vertex_id: u64,
pub(crate) next_edge_id: u64,
}
impl GraphState {
pub fn new() -> Self {
Self {
vertices: ImHashMap::new(),
edges: ImHashMap::new(),
vertex_labels: ImHashMap::new(),
edge_labels: ImHashMap::new(),
interner: Arc::new(RwLock::new(StringInterner::new())),
version: 0,
next_vertex_id: 0,
next_edge_id: 0,
}
}
}
impl Default for GraphState {
fn default() -> Self {
Self::new()
}
}
pub struct Graph {
state: Arc<RwLock<GraphState>>,
schema: RwLock<Option<GraphSchema>>,
indexes: RwLock<HashMap<String, Box<dyn PropertyIndex>>>,
#[cfg(feature = "full-text")]
text_indexes_vertex:
RwLock<HashMap<String, std::sync::Arc<dyn crate::storage::text::TextIndex>>>,
#[cfg(feature = "full-text")]
text_indexes_edge: RwLock<HashMap<String, std::sync::Arc<dyn crate::storage::text::TextIndex>>>,
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
event_bus: std::sync::Arc<crate::storage::events::EventBus>,
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
subscription_manager: std::sync::Arc<crate::traversal::reactive::SubscriptionManager>,
}
impl Graph {
pub fn new() -> Self {
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
let event_bus = std::sync::Arc::new(crate::storage::events::EventBus::new());
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
let subscription_manager = {
let eb = event_bus.clone();
let event_sub_fn: std::sync::Arc<
dyn Fn() -> std::sync::mpsc::Receiver<crate::storage::events::GraphEvent>
+ Send
+ Sync,
> = std::sync::Arc::new(move || eb.subscribe());
std::sync::Arc::new(crate::traversal::reactive::SubscriptionManager::new(
event_sub_fn,
))
};
Self {
state: Arc::new(RwLock::new(GraphState::new())),
schema: RwLock::new(None),
indexes: RwLock::new(HashMap::new()),
#[cfg(feature = "full-text")]
text_indexes_vertex: RwLock::new(HashMap::new()),
#[cfg(feature = "full-text")]
text_indexes_edge: RwLock::new(HashMap::new()),
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
event_bus,
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
subscription_manager,
}
}
#[inline]
pub fn in_memory() -> Self {
Self::new()
}
#[inline]
pub fn new_arc() -> Arc<Self> {
Arc::new(Self::new())
}
#[inline]
pub fn in_memory_with_schema(schema: GraphSchema) -> Self {
Self::with_schema(schema)
}
pub fn with_schema(schema: GraphSchema) -> Self {
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
let event_bus = std::sync::Arc::new(crate::storage::events::EventBus::new());
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
let subscription_manager = {
let eb = event_bus.clone();
let event_sub_fn: std::sync::Arc<
dyn Fn() -> std::sync::mpsc::Receiver<crate::storage::events::GraphEvent>
+ Send
+ Sync,
> = std::sync::Arc::new(move || eb.subscribe());
std::sync::Arc::new(crate::traversal::reactive::SubscriptionManager::new(
event_sub_fn,
))
};
Self {
state: Arc::new(RwLock::new(GraphState::new())),
schema: RwLock::new(Some(schema)),
indexes: RwLock::new(HashMap::new()),
#[cfg(feature = "full-text")]
text_indexes_vertex: RwLock::new(HashMap::new()),
#[cfg(feature = "full-text")]
text_indexes_edge: RwLock::new(HashMap::new()),
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
event_bus,
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
subscription_manager,
}
}
pub fn snapshot(&self) -> GraphSnapshot {
let state = self.state.read();
let interner_snapshot = Arc::new(state.interner.read().clone());
GraphSnapshot {
state: Arc::new((*state).clone()),
interner_snapshot,
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
subscription_manager: self.subscription_manager.clone(),
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
snapshot_fn: {
let state_arc = self.state.clone();
std::sync::Arc::new(move || {
let state = state_arc.read();
let interner_snapshot = Arc::new(state.interner.read().clone());
let snap = GraphSnapshot {
state: Arc::new((*state).clone()),
interner_snapshot,
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
subscription_manager: std::sync::Arc::new(
crate::traversal::reactive::SubscriptionManager::placeholder(),
),
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
snapshot_fn: std::sync::Arc::new(|| {
panic!("nested snapshot_fn not supported")
}),
};
Box::new(snap)
})
},
}
}
pub fn as_storage_mut(&self) -> GraphMutWrapper<'_> {
GraphMutWrapper { graph: self }
}
pub fn gremlin(&self, graph_arc: Arc<Graph>) -> CowTraversalSource<'_> {
CowTraversalSource::new_with_arc(self, graph_arc)
}
pub fn typed_gremlin<'a>(
&self,
snapshot: &'a GraphSnapshot,
graph_arc: Arc<Graph>,
) -> crate::traversal::typed::TypedTraversalSource<'a> {
crate::traversal::typed::TypedTraversalSource::new(snapshot, graph_arc)
}
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
pub fn event_bus(&self) -> &crate::storage::events::EventBus {
&self.event_bus
}
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
pub fn subscription_manager(
&self,
) -> &std::sync::Arc<crate::traversal::reactive::SubscriptionManager> {
&self.subscription_manager
}
pub fn vertex_count(&self) -> u64 {
self.state.read().vertices.len() as u64
}
pub fn edge_count(&self) -> u64 {
self.state.read().edges.len() as u64
}
pub fn version(&self) -> u64 {
self.state.read().version
}
pub fn schema(&self) -> Option<GraphSchema> {
self.schema.read().clone()
}
pub fn set_schema(&self, schema: Option<GraphSchema>) {
*self.schema.write() = schema;
}
pub fn create_index(&self, spec: IndexSpec) -> Result<(), IndexError> {
let mut indexes = self.indexes.write();
if indexes.contains_key(&spec.name) {
return Err(IndexError::AlreadyExists(spec.name.clone()));
}
let mut index: Box<dyn PropertyIndex> = match spec.index_type {
IndexType::BTree => Box::new(BTreeIndex::new(spec.clone())?),
IndexType::Unique => Box::new(UniqueIndex::new(spec.clone())?),
IndexType::RTree => Box::new(RTreeIndex::new(spec.clone())?),
};
let state = self.state.read();
Self::populate_index_internal(&state, &mut *index)?;
indexes.insert(spec.name.clone(), index);
Ok(())
}
pub fn drop_index(&self, name: &str) -> Result<(), IndexError> {
self.indexes
.write()
.remove(name)
.map(|_| ())
.ok_or_else(|| IndexError::NotFound(name.to_string()))
}
pub fn list_indexes(&self) -> Vec<IndexSpec> {
self.indexes
.read()
.values()
.map(|idx| idx.spec().clone())
.collect()
}
pub fn has_index(&self, name: &str) -> bool {
self.indexes.read().contains_key(name)
}
pub fn index_count(&self) -> usize {
self.indexes.read().len()
}
pub fn supports_indexes(&self) -> bool {
true
}
#[cfg(feature = "full-text")]
pub fn create_text_index_v(
&self,
property: &str,
config: crate::storage::text::TextIndexConfig,
) -> Result<(), crate::storage::text::TextIndexError> {
self.create_text_index_inner(crate::index::ElementType::Vertex, property, config)
}
#[cfg(feature = "full-text")]
pub fn create_text_index_e(
&self,
property: &str,
config: crate::storage::text::TextIndexConfig,
) -> Result<(), crate::storage::text::TextIndexError> {
self.create_text_index_inner(crate::index::ElementType::Edge, property, config)
}
#[cfg(feature = "full-text")]
fn create_text_index_inner(
&self,
element_type: crate::index::ElementType,
property: &str,
config: crate::storage::text::TextIndexConfig,
) -> Result<(), crate::storage::text::TextIndexError> {
use crate::index::ElementType;
use crate::storage::text::{TantivyTextIndex, TextIndex, TextIndexError};
let mut vmap = self.text_indexes_vertex.write();
let mut emap = self.text_indexes_edge.write();
if vmap.contains_key(property) || emap.contains_key(property) {
return Err(TextIndexError::Storage(StorageError::IndexError(format!(
"text index already exists for property `{property}` (property names are \
globally unique across vertex and edge indexes)"
))));
}
let index = TantivyTextIndex::in_memory(element_type, config)?;
let arc: std::sync::Arc<dyn TextIndex> = std::sync::Arc::new(index);
let state = self.state.read();
match element_type {
ElementType::Vertex => {
for (vid, node) in state.vertices.iter() {
if let Some(Value::String(s)) = node.properties.get(property) {
arc.upsert(vid.0, s.as_str())?;
}
}
}
ElementType::Edge => {
for (eid, edge) in state.edges.iter() {
if let Some(Value::String(s)) = edge.properties.get(property) {
arc.upsert(eid.0, s.as_str())?;
}
}
}
}
drop(state);
arc.commit()?;
match element_type {
ElementType::Vertex => {
vmap.insert(property.to_string(), arc);
}
ElementType::Edge => {
emap.insert(property.to_string(), arc);
}
}
Ok(())
}
#[cfg(feature = "full-text")]
pub fn drop_text_index_v(
&self,
property: &str,
) -> Result<(), crate::storage::text::TextIndexError> {
use crate::storage::text::TextIndexError;
self.text_indexes_vertex
.write()
.remove(property)
.map(|_| ())
.ok_or_else(|| {
TextIndexError::Storage(StorageError::IndexError(format!(
"no vertex text index registered for property `{property}`"
)))
})
}
#[cfg(feature = "full-text")]
pub fn drop_text_index_e(
&self,
property: &str,
) -> Result<(), crate::storage::text::TextIndexError> {
use crate::storage::text::TextIndexError;
self.text_indexes_edge
.write()
.remove(property)
.map(|_| ())
.ok_or_else(|| {
TextIndexError::Storage(StorageError::IndexError(format!(
"no edge text index registered for property `{property}`"
)))
})
}
#[cfg(feature = "full-text")]
pub fn text_index_v(
&self,
property: &str,
) -> Option<std::sync::Arc<dyn crate::storage::text::TextIndex>> {
self.text_indexes_vertex.read().get(property).cloned()
}
#[cfg(feature = "full-text")]
pub fn text_index_e(
&self,
property: &str,
) -> Option<std::sync::Arc<dyn crate::storage::text::TextIndex>> {
self.text_indexes_edge.read().get(property).cloned()
}
#[cfg(feature = "full-text")]
pub fn has_text_index_v(&self, property: &str) -> bool {
self.text_indexes_vertex.read().contains_key(property)
}
#[cfg(feature = "full-text")]
pub fn has_text_index_e(&self, property: &str) -> bool {
self.text_indexes_edge.read().contains_key(property)
}
#[cfg(feature = "full-text")]
pub fn list_text_indexes_v(&self) -> Vec<String> {
self.text_indexes_vertex.read().keys().cloned().collect()
}
#[cfg(feature = "full-text")]
pub fn list_text_indexes_e(&self) -> Vec<String> {
self.text_indexes_edge.read().keys().cloned().collect()
}
#[cfg(feature = "full-text")]
pub fn text_index_count_v(&self) -> usize {
self.text_indexes_vertex.read().len()
}
#[cfg(feature = "full-text")]
pub fn text_index_count_e(&self) -> usize {
self.text_indexes_edge.read().len()
}
pub fn vertices_by_property(
&self,
label: Option<&str>,
property: &str,
value: &Value,
) -> Box<dyn Iterator<Item = Vertex> + '_> {
let indexes = self.indexes.read();
let state = self.state.read();
for index in indexes.values() {
let spec = index.spec();
if spec.element_type != ElementType::Vertex {
continue;
}
if spec.property != property {
continue;
}
match (&spec.label, label) {
(Some(idx_label), Some(filter_label)) if idx_label != filter_label => continue,
(Some(_), None) => continue, _ => {}
}
let ids: Vec<u64> = index.lookup_eq(value).collect();
let label_owned = label.map(|s| s.to_string());
drop(indexes);
drop(state);
return Box::new(
ids.into_iter()
.filter_map(move |id| {
let state = self.state.read();
let node = state.vertices.get(&VertexId(id))?;
let label = state.interner.read().resolve(node.label_id)?.to_string();
Some(Vertex {
id: node.id,
label,
properties: node.properties.clone(),
})
})
.filter(move |v| {
label_owned.is_none() || Some(v.label.as_str()) == label_owned.as_deref()
}),
);
}
drop(indexes);
drop(state);
let label_owned = label.map(|s| s.to_string());
let property_owned = property.to_string();
let value_clone = value.clone();
let vertices: Vec<Vertex> = self
.snapshot()
.all_vertices()
.filter(move |v| {
if let Some(ref l) = label_owned {
if &v.label != l {
return false;
}
}
v.properties.get(&property_owned) == Some(&value_clone)
})
.collect();
Box::new(vertices.into_iter())
}
pub fn edges_by_property(
&self,
label: Option<&str>,
property: &str,
value: &Value,
) -> Box<dyn Iterator<Item = Edge> + '_> {
let indexes = self.indexes.read();
let state = self.state.read();
for index in indexes.values() {
let spec = index.spec();
if spec.element_type != ElementType::Edge {
continue;
}
if spec.property != property {
continue;
}
match (&spec.label, label) {
(Some(idx_label), Some(filter_label)) if idx_label != filter_label => continue,
(Some(_), None) => continue, _ => {}
}
let ids: Vec<u64> = index.lookup_eq(value).collect();
let label_owned = label.map(|s| s.to_string());
drop(indexes);
drop(state);
return Box::new(
ids.into_iter()
.filter_map(move |id| {
let state = self.state.read();
let edge = state.edges.get(&EdgeId(id))?;
let label = state.interner.read().resolve(edge.label_id)?.to_string();
Some(Edge {
id: edge.id,
label,
src: edge.src,
dst: edge.dst,
properties: edge.properties.clone(),
})
})
.filter(move |e| {
label_owned.is_none() || Some(e.label.as_str()) == label_owned.as_deref()
}),
);
}
drop(indexes);
drop(state);
let label_owned = label.map(|s| s.to_string());
let property_owned = property.to_string();
let value_clone = value.clone();
let edges: Vec<Edge> = self
.snapshot()
.all_edges()
.filter(move |e| {
if let Some(ref l) = label_owned {
if &e.label != l {
return false;
}
}
e.properties.get(&property_owned) == Some(&value_clone)
})
.collect();
Box::new(edges.into_iter())
}
pub fn vertices_by_property_range(
&self,
label: Option<&str>,
property: &str,
start: Bound<&Value>,
end: Bound<&Value>,
) -> Box<dyn Iterator<Item = Vertex> + '_> {
let indexes = self.indexes.read();
for index in indexes.values() {
let spec = index.spec();
if spec.element_type != ElementType::Vertex {
continue;
}
if spec.property != property {
continue;
}
if spec.index_type != IndexType::BTree {
continue;
}
match (&spec.label, label) {
(Some(idx_label), Some(filter_label)) if idx_label != filter_label => continue,
(Some(_), None) => continue, _ => {}
}
let ids: Vec<u64> = index.lookup_range(start, end).collect();
let label_owned = label.map(|s| s.to_string());
drop(indexes);
return Box::new(
ids.into_iter()
.filter_map(move |id| {
let state = self.state.read();
let node = state.vertices.get(&VertexId(id))?;
let label = state.interner.read().resolve(node.label_id)?.to_string();
Some(Vertex {
id: node.id,
label,
properties: node.properties.clone(),
})
})
.filter(move |v| {
label_owned.is_none() || Some(v.label.as_str()) == label_owned.as_deref()
}),
);
}
drop(indexes);
let label_owned = label.map(|s| s.to_string());
let property_owned = property.to_string();
let start_clone = match start {
Bound::Included(v) => Bound::Included(v.clone()),
Bound::Excluded(v) => Bound::Excluded(v.clone()),
Bound::Unbounded => Bound::Unbounded,
};
let end_clone = match end {
Bound::Included(v) => Bound::Included(v.clone()),
Bound::Excluded(v) => Bound::Excluded(v.clone()),
Bound::Unbounded => Bound::Unbounded,
};
let vertices: Vec<Vertex> = self
.snapshot()
.all_vertices()
.filter(move |v| {
if let Some(ref l) = label_owned {
if &v.label != l {
return false;
}
}
if let Some(prop_value) = v.properties.get(&property_owned) {
let prop_cmp = prop_value.to_comparable();
let in_start = match &start_clone {
Bound::Included(s) => prop_cmp >= s.to_comparable(),
Bound::Excluded(s) => prop_cmp > s.to_comparable(),
Bound::Unbounded => true,
};
let in_end = match &end_clone {
Bound::Included(e) => prop_cmp <= e.to_comparable(),
Bound::Excluded(e) => prop_cmp < e.to_comparable(),
Bound::Unbounded => true,
};
in_start && in_end
} else {
false
}
})
.collect();
Box::new(vertices.into_iter())
}
fn populate_index_internal(
state: &GraphState,
index: &mut dyn PropertyIndex,
) -> Result<(), IndexError> {
let spec = index.spec().clone();
match spec.element_type {
ElementType::Vertex => {
for (id, node) in state.vertices.iter() {
if let Some(ref label) = spec.label {
let node_label = state
.interner
.read()
.resolve(node.label_id)
.map(|s| s.to_string());
if node_label.as_deref() != Some(label.as_str()) {
continue;
}
}
if let Some(value) = node.properties.get(&spec.property) {
index.insert(value.clone(), id.0)?;
}
}
}
ElementType::Edge => {
for (id, edge) in state.edges.iter() {
if let Some(ref label) = spec.label {
let edge_label = state
.interner
.read()
.resolve(edge.label_id)
.map(|s| s.to_string());
if edge_label.as_deref() != Some(label.as_str()) {
continue;
}
}
if let Some(value) = edge.properties.get(&spec.property) {
index.insert(value.clone(), id.0)?;
}
}
}
}
Ok(())
}
fn index_vertex_insert(&self, id: VertexId, label: &str, properties: &HashMap<String, Value>) {
let mut indexes = self.indexes.write();
for index in indexes.values_mut() {
let spec = index.spec();
if spec.element_type != ElementType::Vertex {
continue;
}
if let Some(ref idx_label) = spec.label {
if idx_label != label {
continue;
}
}
if let Some(value) = properties.get(&spec.property) {
let _ = index.insert(value.clone(), id.0);
}
}
}
fn index_vertex_remove(&self, id: VertexId, label: &str, properties: &HashMap<String, Value>) {
let mut indexes = self.indexes.write();
for index in indexes.values_mut() {
let spec = index.spec();
if spec.element_type != ElementType::Vertex {
continue;
}
if let Some(ref idx_label) = spec.label {
if idx_label != label {
continue;
}
}
if let Some(value) = properties.get(&spec.property) {
let _ = index.remove(value, id.0);
}
}
}
fn index_edge_insert(&self, id: EdgeId, label: &str, properties: &HashMap<String, Value>) {
let mut indexes = self.indexes.write();
for index in indexes.values_mut() {
let spec = index.spec();
if spec.element_type != ElementType::Edge {
continue;
}
if let Some(ref idx_label) = spec.label {
if idx_label != label {
continue;
}
}
if let Some(value) = properties.get(&spec.property) {
let _ = index.insert(value.clone(), id.0);
}
}
}
fn index_edge_remove(&self, id: EdgeId, label: &str, properties: &HashMap<String, Value>) {
let mut indexes = self.indexes.write();
for index in indexes.values_mut() {
let spec = index.spec();
if spec.element_type != ElementType::Edge {
continue;
}
if let Some(ref idx_label) = spec.label {
if idx_label != label {
continue;
}
}
if let Some(value) = properties.get(&spec.property) {
let _ = index.remove(value, id.0);
}
}
}
fn update_vertex_property_in_indexes(
&self,
id: VertexId,
label: &str,
property: &str,
old_value: Option<&Value>,
new_value: &Value,
) -> Result<(), StorageError> {
let mut indexes = self.indexes.write();
for index in indexes.values_mut() {
let spec = index.spec();
if spec.element_type != ElementType::Vertex {
continue;
}
if spec.property != property {
continue;
}
if let Some(ref idx_label) = spec.label {
if idx_label != label {
continue;
}
}
if let Some(old) = old_value {
let _ = index.remove(old, id.0);
}
index
.insert(new_value.clone(), id.0)
.map_err(|e| StorageError::IndexError(e.to_string()))?;
}
Ok(())
}
fn update_edge_property_in_indexes(
&self,
id: EdgeId,
label: &str,
property: &str,
old_value: Option<&Value>,
new_value: &Value,
) -> Result<(), StorageError> {
let mut indexes = self.indexes.write();
for index in indexes.values_mut() {
let spec = index.spec();
if spec.element_type != ElementType::Edge {
continue;
}
if spec.property != property {
continue;
}
if let Some(ref idx_label) = spec.label {
if idx_label != label {
continue;
}
}
if let Some(old) = old_value {
let _ = index.remove(old, id.0);
}
index
.insert(new_value.clone(), id.0)
.map_err(|e| StorageError::IndexError(e.to_string()))?;
}
Ok(())
}
#[cfg(feature = "full-text")]
fn text_index_vertex_insert(
&self,
id: VertexId,
properties: &HashMap<String, Value>,
) -> Result<(), crate::storage::text::TextIndexError> {
let indexes = self.text_indexes_vertex.read();
for (prop, idx) in indexes.iter() {
if let Some(Value::String(s)) = properties.get(prop) {
idx.upsert(id.0, s.as_str())?;
idx.commit()?;
}
}
Ok(())
}
#[cfg(feature = "full-text")]
fn text_index_vertex_remove(
&self,
id: VertexId,
) -> Result<(), crate::storage::text::TextIndexError> {
let indexes = self.text_indexes_vertex.read();
for idx in indexes.values() {
idx.delete(id.0)?;
idx.commit()?;
}
Ok(())
}
#[cfg(feature = "full-text")]
fn text_index_property_update(
&self,
id: VertexId,
property: &str,
new_value: &Value,
) -> Result<(), crate::storage::text::TextIndexError> {
let indexes = self.text_indexes_vertex.read();
let Some(idx) = indexes.get(property) else {
return Ok(());
};
match new_value {
Value::String(s) => {
idx.upsert(id.0, s.as_str())?;
idx.commit()
}
_ => {
idx.delete(id.0)?;
idx.commit()
}
}
}
#[cfg(feature = "full-text")]
fn text_index_edge_insert(
&self,
id: EdgeId,
properties: &HashMap<String, Value>,
) -> Result<(), crate::storage::text::TextIndexError> {
let indexes = self.text_indexes_edge.read();
for (prop, idx) in indexes.iter() {
if let Some(Value::String(s)) = properties.get(prop) {
idx.upsert(id.0, s.as_str())?;
idx.commit()?;
}
}
Ok(())
}
#[cfg(feature = "full-text")]
fn text_index_edge_remove(
&self,
id: EdgeId,
) -> Result<(), crate::storage::text::TextIndexError> {
let indexes = self.text_indexes_edge.read();
for idx in indexes.values() {
idx.delete(id.0)?;
idx.commit()?;
}
Ok(())
}
#[cfg(feature = "full-text")]
fn text_index_edge_property_update(
&self,
id: EdgeId,
property: &str,
new_value: &Value,
) -> Result<(), crate::storage::text::TextIndexError> {
let indexes = self.text_indexes_edge.read();
let Some(idx) = indexes.get(property) else {
return Ok(());
};
match new_value {
Value::String(s) => {
idx.upsert(id.0, s.as_str())?;
idx.commit()
}
_ => {
idx.delete(id.0)?;
idx.commit()
}
}
}
pub fn add_vertex(&self, label: &str, properties: HashMap<String, Value>) -> VertexId {
let mut state = self.state.write();
let id = VertexId(state.next_vertex_id);
state.next_vertex_id += 1;
let label_id = state.interner.write().intern(label);
let node = Arc::new(NodeData {
id,
label_id,
properties: properties.clone(),
out_edges: Vec::new(),
in_edges: Vec::new(),
});
state.vertices = state.vertices.update(id, node);
let bitmap = state
.vertex_labels
.get(&label_id)
.cloned()
.unwrap_or_else(|| Arc::new(RoaringBitmap::new()));
let mut new_bitmap = (*bitmap).clone();
new_bitmap.insert(id.0 as u32);
state.vertex_labels = state.vertex_labels.update(label_id, Arc::new(new_bitmap));
state.version += 1;
drop(state);
self.index_vertex_insert(id, label, &properties);
#[cfg(feature = "full-text")]
{
let _ = self.text_index_vertex_insert(id, &properties);
}
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
if self.event_bus.subscriber_count() > 0 {
self.event_bus.emit(crate::storage::events::GraphEvent::VertexAdded {
id,
label: label.to_string(),
properties,
});
}
id
}
pub fn add_edge(
&self,
src: VertexId,
dst: VertexId,
label: &str,
properties: HashMap<String, Value>,
) -> Result<EdgeId, StorageError> {
let mut state = self.state.write();
if !state.vertices.contains_key(&src) {
return Err(StorageError::VertexNotFound(src));
}
if !state.vertices.contains_key(&dst) {
return Err(StorageError::VertexNotFound(dst));
}
let edge_id = EdgeId(state.next_edge_id);
state.next_edge_id += 1;
let label_id = state.interner.write().intern(label);
let edge = Arc::new(EdgeData {
id: edge_id,
label_id,
src,
dst,
properties: properties.clone(),
});
state.edges = state.edges.update(edge_id, edge);
if let Some(src_node) = state.vertices.get(&src) {
let mut new_src = (**src_node).clone();
new_src.out_edges.push(edge_id);
state.vertices = state.vertices.update(src, Arc::new(new_src));
}
if let Some(dst_node) = state.vertices.get(&dst) {
let mut new_dst = (**dst_node).clone();
new_dst.in_edges.push(edge_id);
state.vertices = state.vertices.update(dst, Arc::new(new_dst));
}
let bitmap = state
.edge_labels
.get(&label_id)
.cloned()
.unwrap_or_else(|| Arc::new(RoaringBitmap::new()));
let mut new_bitmap = (*bitmap).clone();
new_bitmap.insert(edge_id.0 as u32);
state.edge_labels = state.edge_labels.update(label_id, Arc::new(new_bitmap));
state.version += 1;
drop(state);
self.index_edge_insert(edge_id, label, &properties);
#[cfg(feature = "full-text")]
{
self.text_index_edge_insert(edge_id, &properties)
.map_err(|e| StorageError::IndexError(e.to_string()))?;
}
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
if self.event_bus.subscriber_count() > 0 {
self.event_bus.emit(crate::storage::events::GraphEvent::EdgeAdded {
id: edge_id,
src,
dst,
label: label.to_string(),
properties,
});
}
Ok(edge_id)
}
pub fn set_vertex_property(
&self,
id: VertexId,
key: &str,
value: Value,
) -> Result<(), StorageError> {
let mut state = self.state.write();
let node = state
.vertices
.get(&id)
.ok_or(StorageError::VertexNotFound(id))?;
let label = state
.interner
.read()
.resolve(node.label_id)
.map(|s| s.to_string())
.unwrap_or_default();
let old_value = node.properties.get(key).cloned();
let mut new_node = (**node).clone();
new_node.properties.insert(key.to_string(), value.clone());
state.vertices = state.vertices.update(id, Arc::new(new_node));
state.version += 1;
drop(state);
self.update_vertex_property_in_indexes(id, &label, key, old_value.as_ref(), &value)?;
#[cfg(feature = "full-text")]
{
self.text_index_property_update(id, key, &value)
.map_err(|e| StorageError::IndexError(e.to_string()))?;
}
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
if self.event_bus.subscriber_count() > 0 {
self.event_bus.emit(crate::storage::events::GraphEvent::VertexPropertyChanged {
id,
key: key.to_string(),
old_value,
new_value: value,
});
}
Ok(())
}
pub fn set_edge_property(
&self,
id: EdgeId,
key: &str,
value: Value,
) -> Result<(), StorageError> {
let mut state = self.state.write();
let edge = state.edges.get(&id).ok_or(StorageError::EdgeNotFound(id))?;
let label = state
.interner
.read()
.resolve(edge.label_id)
.map(|s| s.to_string())
.unwrap_or_default();
let old_value = edge.properties.get(key).cloned();
let mut new_edge = (**edge).clone();
new_edge.properties.insert(key.to_string(), value.clone());
state.edges = state.edges.update(id, Arc::new(new_edge));
state.version += 1;
drop(state);
self.update_edge_property_in_indexes(id, &label, key, old_value.as_ref(), &value)?;
#[cfg(feature = "full-text")]
{
self.text_index_edge_property_update(id, key, &value)
.map_err(|e| StorageError::IndexError(e.to_string()))?;
}
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
if self.event_bus.subscriber_count() > 0 {
self.event_bus.emit(crate::storage::events::GraphEvent::EdgePropertyChanged {
id,
key: key.to_string(),
old_value,
new_value: value,
});
}
Ok(())
}
pub fn remove_vertex(&self, id: VertexId) -> Result<(), StorageError> {
let mut state = self.state.write();
let node = state
.vertices
.get(&id)
.ok_or(StorageError::VertexNotFound(id))?
.clone();
let label = state
.interner
.read()
.resolve(node.label_id)
.map(|s| s.to_string())
.unwrap_or_default();
let properties = node.properties.clone();
#[allow(clippy::type_complexity)]
let edges_to_remove: Vec<(EdgeId, VertexId, VertexId, String, HashMap<String, Value>)> = node
.out_edges
.iter()
.chain(node.in_edges.iter())
.filter_map(|&edge_id| {
state.edges.get(&edge_id).map(|e| {
let edge_label = state
.interner
.read()
.resolve(e.label_id)
.map(|s| s.to_string())
.unwrap_or_default();
(edge_id, e.src, e.dst, edge_label, e.properties.clone())
})
})
.collect();
state.vertices = state.vertices.without(&id);
if let Some(bitmap) = state.vertex_labels.get(&node.label_id) {
let mut new_bitmap = (**bitmap).clone();
new_bitmap.remove(id.0 as u32);
if new_bitmap.is_empty() {
state.vertex_labels = state.vertex_labels.without(&node.label_id);
} else {
state.vertex_labels = state
.vertex_labels
.update(node.label_id, Arc::new(new_bitmap));
}
}
for (edge_id, _, _, _, _) in &edges_to_remove {
Self::remove_edge_internal(&mut state, *edge_id, Some(id));
}
state.version += 1;
drop(state);
self.index_vertex_remove(id, &label, &properties);
#[cfg(feature = "full-text")]
{
let _ = self.text_index_vertex_remove(id);
}
for (edge_id, _edge_src, _edge_dst, edge_label, edge_props) in &edges_to_remove {
self.index_edge_remove(*edge_id, edge_label, edge_props);
#[cfg(feature = "full-text")]
{
let _ = self.text_index_edge_remove(*edge_id);
}
}
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
if self.event_bus.subscriber_count() > 0 {
for (edge_id, edge_src, edge_dst, edge_label, _) in edges_to_remove {
self.event_bus.emit(crate::storage::events::GraphEvent::EdgeRemoved {
id: edge_id,
src: edge_src,
dst: edge_dst,
label: edge_label,
});
}
self.event_bus.emit(crate::storage::events::GraphEvent::VertexRemoved {
id,
label,
});
}
Ok(())
}
pub fn remove_edge(&self, id: EdgeId) -> Result<(), StorageError> {
let mut state = self.state.write();
let edge = state.edges.get(&id).ok_or(StorageError::EdgeNotFound(id))?;
let label = state
.interner
.read()
.resolve(edge.label_id)
.map(|s| s.to_string())
.unwrap_or_default();
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
let (src, dst) = (edge.src, edge.dst);
let properties = edge.properties.clone();
Self::remove_edge_internal(&mut state, id, None);
state.version += 1;
drop(state);
self.index_edge_remove(id, &label, &properties);
#[cfg(feature = "full-text")]
{
let _ = self.text_index_edge_remove(id);
}
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
if self.event_bus.subscriber_count() > 0 {
self.event_bus.emit(crate::storage::events::GraphEvent::EdgeRemoved {
id, src, dst, label,
});
}
Ok(())
}
fn remove_edge_internal(state: &mut GraphState, id: EdgeId, skip_vertex: Option<VertexId>) {
let Some(edge) = state.edges.get(&id).cloned() else {
return;
};
state.edges = state.edges.without(&id);
if let Some(bitmap) = state.edge_labels.get(&edge.label_id) {
let mut new_bitmap = (**bitmap).clone();
new_bitmap.remove(id.0 as u32);
if new_bitmap.is_empty() {
state.edge_labels = state.edge_labels.without(&edge.label_id);
} else {
state.edge_labels = state
.edge_labels
.update(edge.label_id, Arc::new(new_bitmap));
}
}
if skip_vertex != Some(edge.src) {
if let Some(src_node) = state.vertices.get(&edge.src) {
let mut new_src = (**src_node).clone();
new_src.out_edges.retain(|&e| e != id);
state.vertices = state.vertices.update(edge.src, Arc::new(new_src));
}
}
if skip_vertex != Some(edge.dst) {
if let Some(dst_node) = state.vertices.get(&edge.dst) {
let mut new_dst = (**dst_node).clone();
new_dst.in_edges.retain(|&e| e != id);
state.vertices = state.vertices.update(edge.dst, Arc::new(new_dst));
}
}
}
#[cfg(feature = "gql")]
pub fn gql(self: &Arc<Self>, query: &str) -> Result<Vec<Value>, GqlError> {
let stmt = gql::parse_statement(query)?;
if stmt.is_read_only() {
let snapshot = self.snapshot();
gql::compile_statement_with_graph(&stmt, &snapshot, Some(Arc::clone(self)))
.map_err(GqlError::Compile)
} else {
let mut wrapper = GraphMutWrapper {
graph: self.as_ref(),
};
let schema = self.schema();
gql::execute_mutation_with_schema(&stmt, &mut wrapper, schema.as_ref())
.map_err(|e| GqlError::Mutation(e.to_string()))
}
}
#[cfg(feature = "gql")]
pub fn gql_with_params(
self: &Arc<Self>,
query: &str,
params: &gql::Parameters,
) -> Result<Vec<Value>, GqlError> {
let stmt = gql::parse_statement(query)?;
if stmt.is_read_only() {
let snapshot = self.snapshot();
gql::compile_statement_with_params_and_graph(
&stmt,
&snapshot,
params,
Some(Arc::clone(self)),
)
.map_err(GqlError::Compile)
} else {
Err(GqlError::Mutation(
"Parameterized mutations are not yet supported".into(),
))
}
}
#[cfg(feature = "gql")]
pub fn ddl(&self, query: &str) -> Result<GraphSchema, GqlError> {
let stmt = gql::parse_statement(query)?;
let ddl = match stmt {
gql::Statement::Ddl(ddl) => ddl,
_ => {
return Err(GqlError::Compile(gql::CompileError::UnsupportedFeature(
"Expected DDL statement (CREATE TYPE, ALTER TYPE, DROP TYPE, SET SCHEMA VALIDATION)".into(),
)))
}
};
let mut schema = self.schema.read().clone().unwrap_or_default();
gql::execute_ddl(&mut schema, &ddl)
.map_err(|e| GqlError::Compile(gql::CompileError::UnsupportedFeature(e.to_string())))?;
*self.schema.write() = Some(schema.clone());
Ok(schema)
}
pub fn batch<F, T>(&self, f: F) -> Result<T, BatchError>
where
F: FnOnce(&mut BatchContext) -> Result<T, BatchError>,
{
let mut working_state = self.state.read().clone();
let mut ctx = BatchContext {
state: &mut working_state,
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
pending_events: Vec::new(),
};
let result = f(&mut ctx)?;
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
let pending_events = std::mem::take(&mut ctx.pending_events);
*self.state.write() = working_state;
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
if !pending_events.is_empty() && self.event_bus.subscriber_count() > 0 {
self.event_bus.emit(crate::storage::events::GraphEvent::Batch(pending_events));
}
Ok(result)
}
#[cfg(feature = "graphson")]
pub fn to_graphson(&self) -> Result<String, serde_json::Error> {
crate::graphson::to_string(&self.snapshot())
}
#[cfg(feature = "graphson")]
pub fn to_graphson_pretty(&self) -> Result<String, serde_json::Error> {
crate::graphson::to_string_pretty(&self.snapshot())
}
#[cfg(feature = "graphson")]
pub fn from_graphson(json: &str) -> crate::graphson::Result<Self> {
crate::graphson::from_str(json)
}
}
#[cfg(feature = "gremlin")]
impl Graph {
pub fn query(
self: &Arc<Self>,
query: &str,
) -> Result<crate::gremlin::ExecutionResult, crate::gremlin::GremlinError> {
let snapshot = self.snapshot();
let ast = crate::gremlin::parse(query)?;
let g = crate::traversal::GraphTraversalSource::from_snapshot_with_graph(
&snapshot,
Arc::clone(self),
);
let compiled = crate::gremlin::compile(&ast, &g)?;
Ok(compiled.execute())
}
#[cfg(feature = "gremlin")]
pub fn mutate(
&self,
query: &str,
) -> Result<crate::gremlin::ExecutionResult, crate::gremlin::GremlinError> {
use crate::gremlin::{compile, parse, ExecutionResult};
use crate::traversal::mutation::PendingMutation;
let ast = parse(query)?;
let snapshot = self.snapshot();
let g = snapshot.gremlin();
let compiled = compile(&ast, &g)?;
let terminal = compiled.terminal().cloned();
let raw_values = compiled.traversal.to_list();
let mut final_results = Vec::with_capacity(raw_values.len());
for value in raw_values {
if let Some(mutation) = PendingMutation::from_value(&value) {
let extract_id = value
.as_map()
.map(|m| m.contains_key("__extract_id"))
.unwrap_or(false);
let result = match mutation {
PendingMutation::AddVertex { label, properties } => {
let id = self.add_vertex(&label, properties);
Some(Value::Vertex(id))
}
PendingMutation::AddEdge {
label,
from,
to,
properties,
} => match self.add_edge(from, to, &label, properties) {
Ok(id) => Some(Value::Edge(id)),
Err(_) => None,
},
PendingMutation::SetVertexProperty { id, key, value } => {
if self.set_vertex_property(id, &key, value).is_ok() {
Some(Value::Vertex(id))
} else {
None
}
}
PendingMutation::SetEdgeProperty { id, key, value } => {
if self.set_edge_property(id, &key, value).is_ok() {
Some(Value::Edge(id))
} else {
None
}
}
PendingMutation::DropVertex { id } => {
let _ = self.remove_vertex(id);
None
}
PendingMutation::DropEdge { id } => {
let _ = self.remove_edge(id);
None
}
};
if let Some(result) = result {
if extract_id {
let id_value = match result {
Value::Vertex(vid) => Value::Int(vid.0 as i64),
Value::Edge(eid) => Value::Int(eid.0 as i64),
other => other,
};
final_results.push(id_value);
} else {
final_results.push(result);
}
}
} else {
final_results.push(value);
}
}
Ok(match terminal {
None | Some(crate::gremlin::TerminalStep::ToList { .. }) => {
ExecutionResult::List(final_results)
}
Some(crate::gremlin::TerminalStep::Next { count: None, .. }) => {
ExecutionResult::Single(final_results.into_iter().next())
}
Some(crate::gremlin::TerminalStep::Next { count: Some(n), .. }) => {
ExecutionResult::List(final_results.into_iter().take(n as usize).collect())
}
Some(crate::gremlin::TerminalStep::ToSet { .. }) => {
ExecutionResult::Set(final_results.into_iter().collect())
}
Some(crate::gremlin::TerminalStep::Iterate { .. }) => ExecutionResult::Unit,
Some(crate::gremlin::TerminalStep::HasNext { .. }) => {
ExecutionResult::Bool(!final_results.is_empty())
}
Some(crate::gremlin::TerminalStep::Explain { .. }) => {
ExecutionResult::List(final_results)
}
})
}
#[cfg(feature = "gremlin")]
pub fn execute_script(
self: &Arc<Self>,
script: &str,
) -> Result<crate::gremlin::ScriptResult, crate::gremlin::GremlinError> {
self.execute_script_with_context(script, crate::gremlin::VariableContext::new())
}
#[cfg(feature = "gremlin")]
pub fn execute_script_with_context(
self: &Arc<Self>,
script: &str,
context: crate::gremlin::VariableContext,
) -> Result<crate::gremlin::ScriptResult, crate::gremlin::GremlinError> {
use crate::gremlin::{parse_script, ExecutionResult, ScriptResult, Statement};
let ast = parse_script(script)?;
let mut ctx = context;
let mut last_result = ExecutionResult::Unit;
for statement in &ast.statements {
match statement {
Statement::Assignment {
name, traversal, ..
} => {
let snapshot = self.snapshot();
let g = crate::traversal::GraphTraversalSource::from_snapshot_with_graph(
&snapshot,
Arc::clone(self),
);
let compiled = crate::gremlin::compile_with_vars(traversal, &g, &ctx)?;
let terminal = compiled.terminal().cloned();
let raw_values = compiled.traversal.to_list();
let final_results = self.process_mutations(raw_values);
let result = match terminal {
Some(crate::gremlin::TerminalStep::Next { count: None, .. }) => {
ExecutionResult::Single(final_results.into_iter().next())
}
_ => ExecutionResult::List(final_results),
};
match result {
ExecutionResult::Single(Some(value)) => {
ctx.bind(name.clone(), value);
}
ExecutionResult::List(values) if values.len() == 1 => {
ctx.bind(name.clone(), values.into_iter().next().unwrap());
}
_ => {
return Err(crate::gremlin::GremlinError::Compile(
crate::gremlin::CompileError::InvalidArguments {
step: "assignment".to_string(),
message: format!(
"assignment to '{}' requires single value from .next()",
name
),
},
));
}
}
last_result = ExecutionResult::Unit;
}
Statement::Traversal { traversal, .. } => {
let snapshot = self.snapshot();
let g = crate::traversal::GraphTraversalSource::from_snapshot_with_graph(
&snapshot,
Arc::clone(self),
);
let compiled = crate::gremlin::compile_with_vars(traversal, &g, &ctx)?;
let terminal = compiled.terminal().cloned();
let raw_values = compiled.traversal.to_list();
let final_results = self.process_mutations(raw_values);
last_result = match terminal {
None | Some(crate::gremlin::TerminalStep::ToList { .. }) => {
ExecutionResult::List(final_results)
}
Some(crate::gremlin::TerminalStep::Next { count: None, .. }) => {
ExecutionResult::Single(final_results.into_iter().next())
}
Some(crate::gremlin::TerminalStep::Next { count: Some(n), .. }) => {
ExecutionResult::List(
final_results.into_iter().take(n as usize).collect(),
)
}
Some(crate::gremlin::TerminalStep::ToSet { .. }) => {
ExecutionResult::Set(final_results.into_iter().collect())
}
Some(crate::gremlin::TerminalStep::Iterate { .. }) => ExecutionResult::Unit,
Some(crate::gremlin::TerminalStep::HasNext { .. }) => {
ExecutionResult::Bool(!final_results.is_empty())
}
Some(crate::gremlin::TerminalStep::Explain { .. }) => {
ExecutionResult::List(final_results)
}
};
}
}
}
Ok(ScriptResult {
result: last_result,
variables: ctx,
})
}
#[cfg(feature = "gremlin")]
fn process_mutations(&self, raw_values: Vec<Value>) -> Vec<Value> {
use crate::traversal::mutation::PendingMutation;
let mut final_results = Vec::with_capacity(raw_values.len());
for value in raw_values {
if let Some(mutation) = PendingMutation::from_value(&value) {
let extract_id = value
.as_map()
.map(|m| m.contains_key("__extract_id"))
.unwrap_or(false);
let result = match mutation {
PendingMutation::AddVertex { label, properties } => {
let id = self.add_vertex(&label, properties);
Some(Value::Vertex(id))
}
PendingMutation::AddEdge {
label,
from,
to,
properties,
} => match self.add_edge(from, to, &label, properties) {
Ok(id) => Some(Value::Edge(id)),
Err(_) => None,
},
PendingMutation::SetVertexProperty { id, key, value } => {
if self.set_vertex_property(id, &key, value).is_ok() {
Some(Value::Vertex(id))
} else {
None
}
}
PendingMutation::SetEdgeProperty { id, key, value } => {
if self.set_edge_property(id, &key, value).is_ok() {
Some(Value::Edge(id))
} else {
None
}
}
PendingMutation::DropVertex { id } => {
let _ = self.remove_vertex(id);
None
}
PendingMutation::DropEdge { id } => {
let _ = self.remove_edge(id);
None
}
};
if let Some(result) = result {
if extract_id {
let id_value = match result {
Value::Vertex(vid) => Value::Int(vid.0 as i64),
Value::Edge(eid) => Value::Int(eid.0 as i64),
other => other,
};
final_results.push(id_value);
} else {
final_results.push(result);
}
}
} else {
final_results.push(value);
}
}
final_results
}
}
impl Default for Graph {
fn default() -> Self {
Self::new()
}
}
pub struct CowTraversalSource<'g> {
graph: &'g Graph,
graph_arc: Arc<Graph>,
}
impl<'g> CowTraversalSource<'g> {
pub fn new_with_arc(graph: &'g Graph, graph_arc: Arc<Graph>) -> Self {
Self { graph, graph_arc }
}
pub fn v(&self) -> CowBoundTraversal<'g, (), Value, VertexMarker> {
CowBoundTraversal::new_typed(
self.graph,
Arc::clone(&self.graph_arc),
Traversal::with_source(TraversalSource::AllVertices),
)
}
pub fn v_ids<I>(&self, ids: I) -> CowBoundTraversal<'g, (), Value, VertexMarker>
where
I: IntoIterator<Item = VertexId>,
{
CowBoundTraversal::new_typed(
self.graph,
Arc::clone(&self.graph_arc),
Traversal::with_source(TraversalSource::Vertices(ids.into_iter().collect())),
)
}
pub fn v_id(&self, id: VertexId) -> CowBoundTraversal<'g, (), Value, VertexMarker> {
self.v_ids([id])
}
pub fn v_ref(
&self,
vertex: impl IntoVertexId,
) -> CowBoundTraversal<'g, (), Value, VertexMarker> {
self.v_id(vertex.into_vertex_id())
}
#[cfg(feature = "full-text")]
pub fn search_text(
&self,
property: &str,
query: &str,
k: usize,
) -> Result<CowBoundTraversal<'g, (), Value, VertexMarker>, crate::storage::text::TextIndexError>
{
use crate::storage::text::TextQuery;
self.search_text_query(property, &TextQuery::Match(query.to_string()), k)
}
#[cfg(feature = "full-text")]
pub fn search_text_query(
&self,
property: &str,
query: &crate::storage::text::TextQuery,
k: usize,
) -> Result<CowBoundTraversal<'g, (), Value, VertexMarker>, crate::storage::text::TextIndexError>
{
let index = self.graph.text_index_v(property).ok_or_else(|| {
crate::storage::text::TextIndexError::Storage(crate::error::StorageError::IndexError(
format!("no vertex text index registered for property {property:?}"),
))
})?;
let hits = index.search(query, k)?;
let scored: Vec<(VertexId, f32)> = hits
.into_iter()
.filter_map(|h| h.element.as_vertex().map(|v| (v, h.score)))
.collect();
Ok(CowBoundTraversal::new_typed(
self.graph,
Arc::clone(&self.graph_arc),
Traversal::with_source(TraversalSource::VerticesWithTextScore(scored)),
))
}
#[cfg(feature = "full-text")]
pub fn search_text_e(
&self,
property: &str,
query: &str,
k: usize,
) -> Result<CowBoundTraversal<'g, (), Value, EdgeMarker>, crate::storage::text::TextIndexError>
{
use crate::storage::text::TextQuery;
self.search_text_query_e(property, &TextQuery::Match(query.to_string()), k)
}
#[cfg(feature = "full-text")]
pub fn search_text_query_e(
&self,
property: &str,
query: &crate::storage::text::TextQuery,
k: usize,
) -> Result<CowBoundTraversal<'g, (), Value, EdgeMarker>, crate::storage::text::TextIndexError>
{
let index = self.graph.text_index_e(property).ok_or_else(|| {
crate::storage::text::TextIndexError::Storage(crate::error::StorageError::IndexError(
format!("no edge text index registered for property {property:?}"),
))
})?;
let hits = index.search(query, k)?;
let scored: Vec<(EdgeId, f32)> = hits
.into_iter()
.filter_map(|h| h.element.as_edge().map(|e| (e, h.score)))
.collect();
Ok(CowBoundTraversal::new_typed(
self.graph,
Arc::clone(&self.graph_arc),
Traversal::with_source(TraversalSource::EdgesWithTextScore(scored)),
))
}
pub fn e(&self) -> CowBoundTraversal<'g, (), Value, EdgeMarker> {
CowBoundTraversal::new_typed(
self.graph,
Arc::clone(&self.graph_arc),
Traversal::with_source(TraversalSource::AllEdges),
)
}
pub fn e_ids<I>(&self, ids: I) -> CowBoundTraversal<'g, (), Value, EdgeMarker>
where
I: IntoIterator<Item = EdgeId>,
{
CowBoundTraversal::new_typed(
self.graph,
Arc::clone(&self.graph_arc),
Traversal::with_source(TraversalSource::Edges(ids.into_iter().collect())),
)
}
pub fn add_v(
&self,
label: impl Into<String>,
) -> CowBoundTraversal<'g, (), Value, VertexMarker> {
use crate::traversal::mutation::AddVStep;
let mut traversal = Traversal::<(), Value>::with_source(TraversalSource::Inject(vec![]));
traversal = traversal.add_step(AddVStep::new(label));
CowBoundTraversal::new_typed(self.graph, Arc::clone(&self.graph_arc), traversal)
}
pub fn add_e(&self, label: impl Into<String>) -> CowAddEdgeBuilder<'g> {
CowAddEdgeBuilder::new_with_arc(self.graph, Arc::clone(&self.graph_arc), label.into())
}
pub fn inject<I>(&self, values: I) -> CowBoundTraversal<'g, (), Value, Scalar>
where
I: IntoIterator<Item = Value>,
{
CowBoundTraversal::new_typed(
self.graph,
Arc::clone(&self.graph_arc),
Traversal::with_source(TraversalSource::Inject(values.into_iter().collect())),
)
}
pub fn v_untyped(&self) -> CowBoundTraversal<'g, (), Value, Scalar> {
CowBoundTraversal::new_typed(
self.graph,
Arc::clone(&self.graph_arc),
Traversal::with_source(TraversalSource::AllVertices),
)
}
pub fn v_ids_untyped<I>(&self, ids: I) -> CowBoundTraversal<'g, (), Value, Scalar>
where
I: IntoIterator<Item = VertexId>,
{
CowBoundTraversal::new_typed(
self.graph,
Arc::clone(&self.graph_arc),
Traversal::with_source(TraversalSource::Vertices(ids.into_iter().collect())),
)
}
pub fn e_untyped(&self) -> CowBoundTraversal<'g, (), Value, Scalar> {
CowBoundTraversal::new_typed(
self.graph,
Arc::clone(&self.graph_arc),
Traversal::with_source(TraversalSource::AllEdges),
)
}
pub fn e_ids_untyped<I>(&self, ids: I) -> CowBoundTraversal<'g, (), Value, Scalar>
where
I: IntoIterator<Item = EdgeId>,
{
CowBoundTraversal::new_typed(
self.graph,
Arc::clone(&self.graph_arc),
Traversal::with_source(TraversalSource::Edges(ids.into_iter().collect())),
)
}
}
pub struct CowBoundTraversal<'g, In, Out, Marker: OutputMarker = Scalar> {
graph: &'g Graph,
graph_arc: Arc<Graph>,
traversal: Traversal<In, Out>,
track_paths: bool,
_marker: PhantomData<Marker>,
}
impl<'g, In, Out, Marker: OutputMarker> CowBoundTraversal<'g, In, Out, Marker> {
pub(crate) fn new_typed(
graph: &'g Graph,
graph_arc: Arc<Graph>,
traversal: Traversal<In, Out>,
) -> Self {
Self {
graph,
graph_arc,
traversal,
track_paths: false,
_marker: PhantomData,
}
}
pub fn with_path(mut self) -> Self {
self.track_paths = true;
self
}
pub fn add_step_same<NewOut>(
self,
step: impl crate::traversal::step::Step,
) -> CowBoundTraversal<'g, In, NewOut, Marker> {
CowBoundTraversal {
graph: self.graph,
graph_arc: self.graph_arc,
traversal: self.traversal.add_step(step),
track_paths: self.track_paths,
_marker: PhantomData,
}
}
pub fn add_step<NewOut>(
self,
step: impl crate::traversal::step::Step,
) -> CowBoundTraversal<'g, In, NewOut, Marker> {
self.add_step_same(step)
}
pub fn add_step_with_marker<NewOut, NewMarker: OutputMarker>(
self,
step: impl crate::traversal::step::Step,
) -> CowBoundTraversal<'g, In, NewOut, NewMarker> {
CowBoundTraversal {
graph: self.graph,
graph_arc: self.graph_arc,
traversal: self.traversal.add_step(step),
track_paths: self.track_paths,
_marker: PhantomData,
}
}
pub fn append<Mid>(self, anon: Traversal<Out, Mid>) -> CowBoundTraversal<'g, In, Mid, Scalar> {
CowBoundTraversal {
graph: self.graph,
graph_arc: self.graph_arc,
traversal: self.traversal.append(anon),
track_paths: self.track_paths,
_marker: PhantomData,
}
}
fn execute_with_mutations(self) -> Vec<Value> {
use crate::traversal::step::StartStep;
use crate::traversal::traverser::TraversalSource;
let (source, steps) = self.traversal.into_steps();
let is_mutation_only =
matches!(&source, Some(TraversalSource::Inject(values)) if values.is_empty());
let results: Vec<Traverser> = if is_mutation_only {
let snapshot = self.graph.snapshot();
let interner = snapshot.interner();
let storage_ref: &dyn GraphStorage = &snapshot;
let ctx = if self.track_paths {
ExecutionContext::with_path_tracking(storage_ref, interner)
} else {
ExecutionContext::new(storage_ref, interner)
};
let mut current: Vec<Traverser> = Vec::new();
if !steps.is_empty() {
current = vec![Traverser::new(Value::Null)];
}
for step in &steps {
current = step
.apply_dyn(&ctx, Box::new(current.into_iter()))
.collect();
}
current
} else {
let snapshot = self.graph.snapshot();
let interner = snapshot.interner();
let storage_ref: &dyn GraphStorage = &snapshot;
let ctx = if self.track_paths {
ExecutionContext::with_path_tracking(storage_ref, interner)
} else {
ExecutionContext::new(storage_ref, interner)
};
let mut current: Vec<Traverser> = match source {
Some(src) => {
let start_step = StartStep::new(src);
start_step
.apply(&ctx, Box::new(std::iter::empty()))
.collect()
}
None => Vec::new(),
};
for step in &steps {
current = step
.apply_dyn(&ctx, Box::new(current.into_iter()))
.collect();
}
current
};
let mut wrapper = GraphMutWrapper { graph: self.graph };
let mut final_results = Vec::with_capacity(results.len());
for traverser in results {
if let Some(mutation) = PendingMutation::from_value(&traverser.value) {
let extract_id = traverser
.value
.as_map()
.map(|m| m.contains_key("__extract_id"))
.unwrap_or(false);
if let Some(result) = Self::execute_mutation(&mut wrapper, mutation) {
if extract_id {
let id_value = match result {
Value::Vertex(vid) => Value::Int(vid.0 as i64),
Value::Edge(eid) => Value::Int(eid.0 as i64),
other => other,
};
final_results.push(id_value);
} else {
final_results.push(result);
}
}
} else {
final_results.push(traverser.value);
}
}
final_results
}
fn execute_mutation(
wrapper: &mut GraphMutWrapper<'_>,
mutation: PendingMutation,
) -> Option<Value> {
use crate::storage::GraphStorageMut;
match mutation {
PendingMutation::AddVertex { label, properties } => {
let id = wrapper.add_vertex(&label, properties);
Some(Value::Vertex(id))
}
PendingMutation::AddEdge {
label,
from,
to,
properties,
} => match wrapper.add_edge(from, to, &label, properties) {
Ok(id) => Some(Value::Edge(id)),
Err(_) => None,
},
PendingMutation::SetVertexProperty { id, key, value } => {
wrapper.set_vertex_property(id, &key, value).ok()?;
Some(Value::Vertex(id))
}
PendingMutation::SetEdgeProperty { id, key, value } => {
wrapper.set_edge_property(id, &key, value).ok()?;
Some(Value::Edge(id))
}
PendingMutation::DropVertex { id } => {
wrapper.remove_vertex(id).ok()?;
None
}
PendingMutation::DropEdge { id } => {
wrapper.remove_edge(id).ok()?;
None
}
}
}
pub fn to_value_list(self) -> Vec<Value> {
self.execute_with_mutations()
}
pub fn next_value(self) -> Option<Value> {
self.execute_with_mutations().into_iter().next()
}
pub fn iterate(self) {
let _ = self.execute_with_mutations();
}
pub fn has_next(self) -> bool {
!self.execute_with_mutations().is_empty()
}
}
impl<'g, In, Out> CowBoundTraversal<'g, In, Out, VertexMarker> {
pub fn next(self) -> Option<InMemoryVertex> {
let graph_arc = Arc::clone(&self.graph_arc);
self.execute_with_mutations()
.into_iter()
.find_map(|v| match v {
Value::Vertex(id) => Some(GraphVertex::new(id, Arc::clone(&graph_arc))),
_ => None,
})
}
pub fn to_list(self) -> Vec<InMemoryVertex> {
let graph_arc = Arc::clone(&self.graph_arc);
self.execute_with_mutations()
.into_iter()
.filter_map(|v| match v {
Value::Vertex(id) => Some(GraphVertex::new(id, Arc::clone(&graph_arc))),
_ => None,
})
.collect()
}
pub fn one(self) -> Result<InMemoryVertex, crate::error::TraversalError> {
let graph_arc = Arc::clone(&self.graph_arc);
let ids: Vec<_> = self
.execute_with_mutations()
.into_iter()
.filter_map(|v| match v {
Value::Vertex(id) => Some(id),
_ => None,
})
.take(2)
.collect();
match ids.len() {
1 => Ok(GraphVertex::new(ids[0], graph_arc)),
n => Err(crate::error::TraversalError::NotOne(n)),
}
}
pub fn count(self) -> u64 {
self.execute_with_mutations()
.into_iter()
.filter(|v| matches!(v, Value::Vertex(_)))
.count() as u64
}
#[allow(clippy::mutable_key_type)]
pub fn to_set(self) -> std::collections::HashSet<InMemoryVertex> {
self.to_list().into_iter().collect()
}
}
impl<'g, In, Out> CowBoundTraversal<'g, In, Out, EdgeMarker> {
pub fn next(self) -> Option<InMemoryEdge> {
let graph_arc = Arc::clone(&self.graph_arc);
self.execute_with_mutations()
.into_iter()
.find_map(|v| match v {
Value::Edge(id) => Some(GraphEdge::new(id, Arc::clone(&graph_arc))),
_ => None,
})
}
pub fn to_list(self) -> Vec<InMemoryEdge> {
let graph_arc = Arc::clone(&self.graph_arc);
self.execute_with_mutations()
.into_iter()
.filter_map(|v| match v {
Value::Edge(id) => Some(GraphEdge::new(id, Arc::clone(&graph_arc))),
_ => None,
})
.collect()
}
pub fn one(self) -> Result<InMemoryEdge, crate::error::TraversalError> {
let graph_arc = Arc::clone(&self.graph_arc);
let ids: Vec<_> = self
.execute_with_mutations()
.into_iter()
.filter_map(|v| match v {
Value::Edge(id) => Some(id),
_ => None,
})
.take(2)
.collect();
match ids.len() {
1 => Ok(GraphEdge::new(ids[0], graph_arc)),
n => Err(crate::error::TraversalError::NotOne(n)),
}
}
pub fn count(self) -> u64 {
self.execute_with_mutations()
.into_iter()
.filter(|v| matches!(v, Value::Edge(_)))
.count() as u64
}
#[allow(clippy::mutable_key_type)]
pub fn to_set(self) -> std::collections::HashSet<InMemoryEdge> {
self.to_list().into_iter().collect()
}
}
impl<'g, In, Out> CowBoundTraversal<'g, In, Out, Scalar> {
pub fn next(self) -> Option<Value> {
self.execute_with_mutations().into_iter().next()
}
pub fn to_list(self) -> Vec<Value> {
self.execute_with_mutations()
}
pub fn one(self) -> Result<Value, crate::error::TraversalError> {
let results: Vec<_> = self.execute_with_mutations().into_iter().take(2).collect();
match results.len() {
1 => Ok(results.into_iter().next().unwrap()),
n => Err(crate::error::TraversalError::NotOne(n)),
}
}
pub fn count(self) -> u64 {
self.execute_with_mutations().len() as u64
}
pub fn to_set(self) -> std::collections::HashSet<Value> {
self.execute_with_mutations().into_iter().collect()
}
pub fn sum(self) -> Value {
let mut int_sum: i64 = 0;
let mut float_sum: f64 = 0.0;
let mut has_float = false;
for value in self.execute_with_mutations() {
match value {
Value::Int(n) => int_sum += n,
Value::Float(f) => {
has_float = true;
float_sum += f;
}
_ => {}
}
}
if has_float {
Value::Float(int_sum as f64 + float_sum)
} else {
Value::Int(int_sum)
}
}
}
impl<'g, In> CowBoundTraversal<'g, In, Value, VertexMarker> {
pub fn has_label(
self,
label: impl Into<String>,
) -> CowBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_same(HasLabelStep::single(label))
}
pub fn has(self, key: impl Into<String>) -> CowBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_same(HasStep::new(key.into()))
}
pub fn has_value(
self,
key: impl Into<String>,
value: Value,
) -> CowBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_same(HasValueStep::new(key.into(), value))
}
pub fn out(self) -> CowBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_same(OutStep::new())
}
pub fn out_label(
self,
label: impl Into<String>,
) -> CowBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_same(OutStep::with_labels(vec![label.into()]))
}
pub fn in_(self) -> CowBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_same(InStep::new())
}
pub fn in_label(
self,
label: impl Into<String>,
) -> CowBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_same(InStep::with_labels(vec![label.into()]))
}
pub fn both(self) -> CowBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_same(crate::traversal::BothStep::new())
}
pub fn out_e(self) -> CowBoundTraversal<'g, In, Value, EdgeMarker> {
self.add_step_with_marker(OutEStep::new())
}
pub fn in_e(self) -> CowBoundTraversal<'g, In, Value, EdgeMarker> {
self.add_step_with_marker(InEStep::new())
}
pub fn values(self, key: impl Into<String>) -> CowBoundTraversal<'g, In, Value, Scalar> {
self.add_step_with_marker(ValuesStep::new(key.into()))
}
pub fn property(
self,
key: impl Into<String>,
value: impl Into<Value>,
) -> CowBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_same(PropertyStep::new(key.into(), value.into()))
}
pub fn drop(self) -> CowBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_same(DropStep)
}
pub fn add_e(self, label: impl Into<String>) -> CowBoundAddEdgeBuilder<'g, In> {
CowBoundAddEdgeBuilder::new_with_arc(
self.graph,
self.graph_arc,
self.traversal,
label.into(),
self.track_paths,
)
}
pub fn limit(self, n: usize) -> CowBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_same(LimitStep::new(n))
}
pub fn skip(self, n: usize) -> CowBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_same(SkipStep::new(n))
}
pub fn id(self) -> CowBoundTraversal<'g, In, Value, Scalar> {
self.add_step_with_marker(IdStep)
}
pub fn label(self) -> CowBoundTraversal<'g, In, Value, Scalar> {
self.add_step_with_marker(LabelStep)
}
}
impl<'g, In> CowBoundTraversal<'g, In, Value, EdgeMarker> {
pub fn has_label(
self,
label: impl Into<String>,
) -> CowBoundTraversal<'g, In, Value, EdgeMarker> {
self.add_step_same(HasLabelStep::single(label))
}
pub fn has(self, key: impl Into<String>) -> CowBoundTraversal<'g, In, Value, EdgeMarker> {
self.add_step_same(HasStep::new(key.into()))
}
pub fn has_value(
self,
key: impl Into<String>,
value: Value,
) -> CowBoundTraversal<'g, In, Value, EdgeMarker> {
self.add_step_same(HasValueStep::new(key.into(), value))
}
pub fn in_v(self) -> CowBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_with_marker(InVStep)
}
pub fn out_v(self) -> CowBoundTraversal<'g, In, Value, VertexMarker> {
self.add_step_with_marker(OutVStep)
}
pub fn values(self, key: impl Into<String>) -> CowBoundTraversal<'g, In, Value, Scalar> {
self.add_step_with_marker(ValuesStep::new(key.into()))
}
pub fn property(
self,
key: impl Into<String>,
value: impl Into<Value>,
) -> CowBoundTraversal<'g, In, Value, EdgeMarker> {
self.add_step_same(PropertyStep::new(key.into(), value.into()))
}
pub fn drop(self) -> CowBoundTraversal<'g, In, Value, EdgeMarker> {
self.add_step_same(DropStep)
}
pub fn limit(self, n: usize) -> CowBoundTraversal<'g, In, Value, EdgeMarker> {
self.add_step_same(LimitStep::new(n))
}
pub fn skip(self, n: usize) -> CowBoundTraversal<'g, In, Value, EdgeMarker> {
self.add_step_same(SkipStep::new(n))
}
pub fn id(self) -> CowBoundTraversal<'g, In, Value, Scalar> {
self.add_step_with_marker(IdStep)
}
pub fn label(self) -> CowBoundTraversal<'g, In, Value, Scalar> {
self.add_step_with_marker(LabelStep)
}
}
impl<'g, In> CowBoundTraversal<'g, In, Value, Scalar> {
pub fn has(self, key: impl Into<String>) -> CowBoundTraversal<'g, In, Value, Scalar> {
self.add_step_same(HasStep::new(key.into()))
}
pub fn limit(self, n: usize) -> CowBoundTraversal<'g, In, Value, Scalar> {
self.add_step_same(LimitStep::new(n))
}
pub fn skip(self, n: usize) -> CowBoundTraversal<'g, In, Value, Scalar> {
self.add_step_same(SkipStep::new(n))
}
}
pub struct CowAddEdgeBuilder<'g> {
graph: &'g Graph,
graph_arc: Arc<Graph>,
label: String,
from: Option<VertexId>,
to: Option<VertexId>,
properties: HashMap<String, Value>,
}
impl<'g> CowAddEdgeBuilder<'g> {
fn new_with_arc(graph: &'g Graph, graph_arc: Arc<Graph>, label: String) -> Self {
Self {
graph,
graph_arc,
label,
from: None,
to: None,
properties: HashMap::new(),
}
}
pub fn from_id(mut self, id: VertexId) -> Self {
self.from = Some(id);
self
}
pub fn to_id(mut self, id: VertexId) -> Self {
self.to = Some(id);
self
}
pub fn from(self, vertex: impl IntoVertexId) -> Self {
self.from_id(vertex.into_vertex_id())
}
pub fn to(self, vertex: impl IntoVertexId) -> Self {
self.to_id(vertex.into_vertex_id())
}
pub fn property(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
self.properties.insert(key.into(), value.into());
self
}
pub fn next(self) -> Option<InMemoryEdge> {
let from = self.from?;
let to = self.to?;
match self.graph.add_edge(from, to, &self.label, self.properties) {
Ok(id) => Some(GraphEdge::new(id, self.graph_arc)),
Err(_) => None,
}
}
pub fn iterate(self) {
let _ = self.next();
}
pub fn to_list(self) -> Vec<InMemoryEdge> {
self.next().into_iter().collect()
}
}
pub struct CowBoundAddEdgeBuilder<'g, In> {
graph: &'g Graph,
graph_arc: Arc<Graph>,
traversal: Traversal<In, Value>,
label: String,
to: Option<VertexId>,
properties: HashMap<String, Value>,
track_paths: bool,
}
impl<'g, In> CowBoundAddEdgeBuilder<'g, In> {
fn new_with_arc(
graph: &'g Graph,
graph_arc: Arc<Graph>,
traversal: Traversal<In, Value>,
label: String,
track_paths: bool,
) -> Self {
Self {
graph,
graph_arc,
traversal,
label,
to: None,
properties: HashMap::new(),
track_paths,
}
}
pub fn to_id(mut self, id: VertexId) -> Self {
self.to = Some(id);
self
}
pub fn to(self, vertex: impl IntoVertexId) -> Self {
self.to_id(vertex.into_vertex_id())
}
pub fn property(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
self.properties.insert(key.into(), value.into());
self
}
pub fn to_list(self) -> Vec<InMemoryEdge> {
use crate::traversal::mutation::AddEStep;
let to_id = match self.to {
Some(id) => id,
None => return vec![],
};
let mut step = AddEStep::new(&self.label);
step = step.to_vertex(to_id);
for (k, v) in self.properties {
step = step.property(k, v);
}
let traversal: Traversal<In, Value> = self.traversal.add_step(step);
let bound: CowBoundTraversal<'_, In, Value, EdgeMarker> = CowBoundTraversal {
graph: self.graph,
graph_arc: self.graph_arc,
traversal,
track_paths: self.track_paths,
_marker: PhantomData,
};
bound.to_list()
}
pub fn next(self) -> Option<InMemoryEdge> {
self.to_list().into_iter().next()
}
pub fn iterate(self) {
let _ = self.to_list();
}
}
#[derive(Clone)]
pub struct GraphSnapshot {
state: Arc<GraphState>,
interner_snapshot: Arc<StringInterner>,
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
subscription_manager: std::sync::Arc<crate::traversal::reactive::SubscriptionManager>,
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
snapshot_fn: std::sync::Arc<
dyn Fn() -> Box<dyn crate::traversal::context::SnapshotLike + Send> + Send + Sync,
>,
}
impl GraphSnapshot {
pub fn version(&self) -> u64 {
self.state.version
}
pub fn interner(&self) -> &StringInterner {
&self.interner_snapshot
}
#[inline]
pub fn arc_storage(&self) -> Arc<dyn GraphStorage + Send + Sync> {
Arc::new(self.clone())
}
#[inline]
pub fn arc_interner(&self) -> Arc<StringInterner> {
Arc::clone(&self.interner_snapshot)
}
pub fn gremlin(&self) -> crate::traversal::GraphTraversalSource<'_> {
crate::traversal::GraphTraversalSource::from_snapshot(self)
}
}
#[cfg(feature = "gremlin")]
impl GraphSnapshot {
pub fn query(
&self,
query: &str,
) -> Result<crate::gremlin::ExecutionResult, crate::gremlin::GremlinError> {
let ast = crate::gremlin::parse(query)?;
let g = self.gremlin();
let compiled = crate::gremlin::compile(&ast, &g)?;
Ok(compiled.execute())
}
}
impl crate::traversal::SnapshotLike for GraphSnapshot {
fn storage(&self) -> &dyn GraphStorage {
self
}
fn interner(&self) -> &StringInterner {
&self.interner_snapshot
}
fn as_dyn(&self) -> &dyn crate::traversal::SnapshotLike {
self
}
fn arc_storage(&self) -> std::sync::Arc<dyn GraphStorage + Send + Sync> {
self.arc_storage()
}
fn arc_interner(&self) -> std::sync::Arc<StringInterner> {
self.arc_interner()
}
fn arc_streamable(&self) -> std::sync::Arc<dyn StreamableStorage> {
self.arc_streamable()
}
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
fn subscription_manager(&self) -> Option<&crate::traversal::reactive::SubscriptionManager> {
Some(&self.subscription_manager)
}
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
fn reactive_snapshot_fn(
&self,
) -> Option<
std::sync::Arc<
dyn Fn() -> Box<dyn crate::traversal::context::SnapshotLike + Send> + Send + Sync,
>,
> {
Some(self.snapshot_fn.clone())
}
}
impl GraphStorage for GraphSnapshot {
fn get_vertex(&self, id: VertexId) -> Option<Vertex> {
let node = self.state.vertices.get(&id)?;
let label = self.interner_snapshot.resolve(node.label_id)?.to_string();
Some(Vertex {
id: node.id,
label,
properties: node.properties.clone(),
})
}
fn vertex_count(&self) -> u64 {
self.state.vertices.len() as u64
}
fn get_edge(&self, id: EdgeId) -> Option<Edge> {
let edge = self.state.edges.get(&id)?;
let label = self.interner_snapshot.resolve(edge.label_id)?.to_string();
Some(Edge {
id: edge.id,
label,
src: edge.src,
dst: edge.dst,
properties: edge.properties.clone(),
})
}
fn edge_count(&self) -> u64 {
self.state.edges.len() as u64
}
fn out_edges(&self, vertex: VertexId) -> Box<dyn Iterator<Item = Edge> + '_> {
let Some(node) = self.state.vertices.get(&vertex) else {
return Box::new(std::iter::empty());
};
let edges: Vec<Edge> = node
.out_edges
.iter()
.filter_map(|&eid| self.get_edge(eid))
.collect();
Box::new(edges.into_iter())
}
fn in_edges(&self, vertex: VertexId) -> Box<dyn Iterator<Item = Edge> + '_> {
let Some(node) = self.state.vertices.get(&vertex) else {
return Box::new(std::iter::empty());
};
let edges: Vec<Edge> = node
.in_edges
.iter()
.filter_map(|&eid| self.get_edge(eid))
.collect();
Box::new(edges.into_iter())
}
fn vertices_with_label(&self, label: &str) -> Box<dyn Iterator<Item = Vertex> + '_> {
let label_id = self.interner_snapshot.lookup(label);
let Some(id) = label_id else {
return Box::new(std::iter::empty());
};
let Some(bitmap) = self.state.vertex_labels.get(&id) else {
return Box::new(std::iter::empty());
};
let vertices: Vec<Vertex> = bitmap
.iter()
.filter_map(|vid| self.get_vertex(VertexId(vid as u64)))
.collect();
Box::new(vertices.into_iter())
}
fn edges_with_label(&self, label: &str) -> Box<dyn Iterator<Item = Edge> + '_> {
let label_id = self.interner_snapshot.lookup(label);
let Some(id) = label_id else {
return Box::new(std::iter::empty());
};
let Some(bitmap) = self.state.edge_labels.get(&id) else {
return Box::new(std::iter::empty());
};
let edges: Vec<Edge> = bitmap
.iter()
.filter_map(|eid| self.get_edge(EdgeId(eid as u64)))
.collect();
Box::new(edges.into_iter())
}
fn all_vertices(&self) -> Box<dyn Iterator<Item = Vertex> + '_> {
let vertices: Vec<Vertex> = self
.state
.vertices
.keys()
.filter_map(|&id| self.get_vertex(id))
.collect();
Box::new(vertices.into_iter())
}
fn all_edges(&self) -> Box<dyn Iterator<Item = Edge> + '_> {
let edges: Vec<Edge> = self
.state
.edges
.keys()
.filter_map(|&id| self.get_edge(id))
.collect();
Box::new(edges.into_iter())
}
fn interner(&self) -> &StringInterner {
&self.interner_snapshot
}
fn vertices_by_property(
&self,
label: Option<&str>,
property: &str,
value: &Value,
) -> Box<dyn Iterator<Item = Vertex> + '_> {
let label_owned = label.map(|s| s.to_string());
let property_owned = property.to_string();
let value_clone = value.clone();
let vertices: Vec<Vertex> = self
.all_vertices()
.filter(move |v| {
if let Some(ref l) = label_owned {
if &v.label != l {
return false;
}
}
v.properties.get(&property_owned) == Some(&value_clone)
})
.collect();
Box::new(vertices.into_iter())
}
fn edges_by_property(
&self,
label: Option<&str>,
property: &str,
value: &Value,
) -> Box<dyn Iterator<Item = Edge> + '_> {
let label_owned = label.map(|s| s.to_string());
let property_owned = property.to_string();
let value_clone = value.clone();
let edges: Vec<Edge> = self
.all_edges()
.filter(move |e| {
if let Some(ref l) = label_owned {
if &e.label != l {
return false;
}
}
e.properties.get(&property_owned) == Some(&value_clone)
})
.collect();
Box::new(edges.into_iter())
}
fn vertices_by_property_range(
&self,
label: Option<&str>,
property: &str,
start: Bound<&Value>,
end: Bound<&Value>,
) -> Box<dyn Iterator<Item = Vertex> + '_> {
let label_owned = label.map(|s| s.to_string());
let property_owned = property.to_string();
let start_clone = match start {
Bound::Included(v) => Bound::Included(v.clone()),
Bound::Excluded(v) => Bound::Excluded(v.clone()),
Bound::Unbounded => Bound::Unbounded,
};
let end_clone = match end {
Bound::Included(v) => Bound::Included(v.clone()),
Bound::Excluded(v) => Bound::Excluded(v.clone()),
Bound::Unbounded => Bound::Unbounded,
};
let vertices: Vec<Vertex> = self
.all_vertices()
.filter(move |v| {
if let Some(ref l) = label_owned {
if &v.label != l {
return false;
}
}
if let Some(prop_value) = v.properties.get(&property_owned) {
let prop_cmp = prop_value.to_comparable();
let in_start = match &start_clone {
Bound::Included(s) => prop_cmp >= s.to_comparable(),
Bound::Excluded(s) => prop_cmp > s.to_comparable(),
Bound::Unbounded => true,
};
let in_end = match &end_clone {
Bound::Included(e) => prop_cmp <= e.to_comparable(),
Bound::Excluded(e) => prop_cmp < e.to_comparable(),
Bound::Unbounded => true,
};
in_start && in_end
} else {
false
}
})
.collect();
Box::new(vertices.into_iter())
}
}
unsafe impl Send for GraphSnapshot {}
unsafe impl Sync for GraphSnapshot {}
impl StreamableStorage for GraphSnapshot {
fn stream_all_vertices(&self) -> Box<dyn Iterator<Item = VertexId> + Send> {
let vertices = self.state.vertices.clone();
Box::new(vertices.into_iter().map(|(id, _)| id))
}
fn stream_all_edges(&self) -> Box<dyn Iterator<Item = EdgeId> + Send> {
let edges = self.state.edges.clone();
Box::new(edges.into_iter().map(|(id, _)| id))
}
fn stream_vertices_with_label(&self, label: &str) -> Box<dyn Iterator<Item = VertexId> + Send> {
let label_id = self.interner_snapshot.lookup(label);
if let Some(lid) = label_id {
if let Some(bitmap) = self.state.vertex_labels.get(&lid) {
let bitmap_owned: RoaringBitmap = (**bitmap).clone();
return Box::new(bitmap_owned.into_iter().map(|id| VertexId(id as u64)));
}
}
Box::new(std::iter::empty())
}
fn stream_edges_with_label(&self, label: &str) -> Box<dyn Iterator<Item = EdgeId> + Send> {
let label_id = self.interner_snapshot.lookup(label);
if let Some(lid) = label_id {
if let Some(bitmap) = self.state.edge_labels.get(&lid) {
let bitmap_owned: RoaringBitmap = (**bitmap).clone();
return Box::new(bitmap_owned.into_iter().map(|id| EdgeId(id as u64)));
}
}
Box::new(std::iter::empty())
}
fn stream_out_edges(&self, vertex: VertexId) -> Box<dyn Iterator<Item = EdgeId> + Send> {
if let Some(node) = self.state.vertices.get(&vertex) {
let out_edges = node.out_edges.clone();
Box::new(out_edges.into_iter())
} else {
Box::new(std::iter::empty())
}
}
fn stream_in_edges(&self, vertex: VertexId) -> Box<dyn Iterator<Item = EdgeId> + Send> {
if let Some(node) = self.state.vertices.get(&vertex) {
let in_edges = node.in_edges.clone();
Box::new(in_edges.into_iter())
} else {
Box::new(std::iter::empty())
}
}
fn stream_out_neighbors(
&self,
vertex: VertexId,
label_ids: &[u32],
) -> Box<dyn Iterator<Item = VertexId> + Send> {
let state = Arc::clone(&self.state);
let label_ids_owned: Vec<u32> = label_ids.to_vec();
if let Some(node) = state.vertices.get(&vertex) {
let out_edges = node.out_edges.clone();
let state_for_iter = Arc::clone(&state);
Box::new(out_edges.into_iter().filter_map(move |edge_id| {
state_for_iter.edges.get(&edge_id).and_then(|edge| {
if label_ids_owned.is_empty() || label_ids_owned.contains(&edge.label_id) {
Some(edge.dst)
} else {
None
}
})
}))
} else {
Box::new(std::iter::empty())
}
}
fn stream_in_neighbors(
&self,
vertex: VertexId,
label_ids: &[u32],
) -> Box<dyn Iterator<Item = VertexId> + Send> {
let state = Arc::clone(&self.state);
let label_ids_owned: Vec<u32> = label_ids.to_vec();
if let Some(node) = state.vertices.get(&vertex) {
let in_edges = node.in_edges.clone();
let state_for_iter = Arc::clone(&state);
Box::new(in_edges.into_iter().filter_map(move |edge_id| {
state_for_iter.edges.get(&edge_id).and_then(|edge| {
if label_ids_owned.is_empty() || label_ids_owned.contains(&edge.label_id) {
Some(edge.src)
} else {
None
}
})
}))
} else {
Box::new(std::iter::empty())
}
}
}
impl GraphSnapshot {
#[inline]
pub fn arc_streamable(&self) -> Arc<dyn StreamableStorage> {
Arc::new(self.clone())
}
}
#[derive(Debug, thiserror::Error)]
pub enum BatchError {
#[error("storage error: {0}")]
Storage(#[from] StorageError),
#[error("{0}")]
Custom(String),
}
pub struct BatchContext<'a> {
state: &'a mut GraphState,
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
pub(crate) pending_events: Vec<crate::storage::events::GraphEvent>,
}
impl<'a> BatchContext<'a> {
pub fn add_vertex(&mut self, label: &str, properties: HashMap<String, Value>) -> VertexId {
let id = VertexId(self.state.next_vertex_id);
self.state.next_vertex_id += 1;
let label_id = self.state.interner.write().intern(label);
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
let properties_clone = properties.clone();
let node = Arc::new(NodeData {
id,
label_id,
properties,
out_edges: Vec::new(),
in_edges: Vec::new(),
});
self.state.vertices = self.state.vertices.update(id, node);
let bitmap = self
.state
.vertex_labels
.get(&label_id)
.cloned()
.unwrap_or_else(|| Arc::new(RoaringBitmap::new()));
let mut new_bitmap = (*bitmap).clone();
new_bitmap.insert(id.0 as u32);
self.state.vertex_labels = self
.state
.vertex_labels
.update(label_id, Arc::new(new_bitmap));
self.state.version += 1;
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
self.pending_events.push(crate::storage::events::GraphEvent::VertexAdded {
id,
label: label.to_string(),
properties: properties_clone,
});
id
}
pub fn add_edge(
&mut self,
src: VertexId,
dst: VertexId,
label: &str,
properties: HashMap<String, Value>,
) -> Result<EdgeId, BatchError> {
if !self.state.vertices.contains_key(&src) {
return Err(BatchError::Storage(StorageError::VertexNotFound(src)));
}
if !self.state.vertices.contains_key(&dst) {
return Err(BatchError::Storage(StorageError::VertexNotFound(dst)));
}
let edge_id = EdgeId(self.state.next_edge_id);
self.state.next_edge_id += 1;
let label_id = self.state.interner.write().intern(label);
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
let properties_clone = properties.clone();
let edge = Arc::new(EdgeData {
id: edge_id,
label_id,
src,
dst,
properties,
});
self.state.edges = self.state.edges.update(edge_id, edge);
if let Some(src_node) = self.state.vertices.get(&src) {
let mut new_src = (**src_node).clone();
new_src.out_edges.push(edge_id);
self.state.vertices = self.state.vertices.update(src, Arc::new(new_src));
}
if let Some(dst_node) = self.state.vertices.get(&dst) {
let mut new_dst = (**dst_node).clone();
new_dst.in_edges.push(edge_id);
self.state.vertices = self.state.vertices.update(dst, Arc::new(new_dst));
}
let bitmap = self
.state
.edge_labels
.get(&label_id)
.cloned()
.unwrap_or_else(|| Arc::new(RoaringBitmap::new()));
let mut new_bitmap = (*bitmap).clone();
new_bitmap.insert(edge_id.0 as u32);
self.state.edge_labels = self
.state
.edge_labels
.update(label_id, Arc::new(new_bitmap));
self.state.version += 1;
#[cfg(all(feature = "reactive", not(target_arch = "wasm32")))]
self.pending_events.push(crate::storage::events::GraphEvent::EdgeAdded {
id: edge_id,
src,
dst,
label: label.to_string(),
properties: properties_clone,
});
Ok(edge_id)
}
pub fn get_vertex(&self, id: VertexId) -> Option<Vertex> {
let node = self.state.vertices.get(&id)?;
let label = self
.state
.interner
.read()
.resolve(node.label_id)?
.to_string();
Some(Vertex {
id: node.id,
label,
properties: node.properties.clone(),
})
}
}
pub struct GraphMutWrapper<'a> {
graph: &'a Graph,
}
impl<'a> GraphStorage for GraphMutWrapper<'a> {
fn get_vertex(&self, id: VertexId) -> Option<Vertex> {
let state = self.graph.state.read();
let node = state.vertices.get(&id)?;
let label = state.interner.read().resolve(node.label_id)?.to_string();
Some(Vertex {
id: node.id,
label,
properties: node.properties.clone(),
})
}
fn vertex_count(&self) -> u64 {
self.graph.state.read().vertices.len() as u64
}
fn get_edge(&self, id: EdgeId) -> Option<Edge> {
let state = self.graph.state.read();
let edge = state.edges.get(&id)?;
let label = state.interner.read().resolve(edge.label_id)?.to_string();
Some(Edge {
id: edge.id,
label,
src: edge.src,
dst: edge.dst,
properties: edge.properties.clone(),
})
}
fn edge_count(&self) -> u64 {
self.graph.state.read().edges.len() as u64
}
fn out_edges(&self, vertex: VertexId) -> Box<dyn Iterator<Item = Edge> + '_> {
let state = self.graph.state.read();
let Some(node) = state.vertices.get(&vertex) else {
return Box::new(std::iter::empty());
};
let edge_ids: Vec<EdgeId> = node.out_edges.clone();
drop(state);
let edges: Vec<Edge> = edge_ids
.into_iter()
.filter_map(|eid| self.get_edge(eid))
.collect();
Box::new(edges.into_iter())
}
fn in_edges(&self, vertex: VertexId) -> Box<dyn Iterator<Item = Edge> + '_> {
let state = self.graph.state.read();
let Some(node) = state.vertices.get(&vertex) else {
return Box::new(std::iter::empty());
};
let edge_ids: Vec<EdgeId> = node.in_edges.clone();
drop(state);
let edges: Vec<Edge> = edge_ids
.into_iter()
.filter_map(|eid| self.get_edge(eid))
.collect();
Box::new(edges.into_iter())
}
fn vertices_with_label(&self, label: &str) -> Box<dyn Iterator<Item = Vertex> + '_> {
let state = self.graph.state.read();
let label_id = state.interner.read().lookup(label);
let Some(id) = label_id else {
return Box::new(std::iter::empty());
};
let Some(bitmap) = state.vertex_labels.get(&id) else {
return Box::new(std::iter::empty());
};
let vertex_ids: Vec<u64> = bitmap.iter().map(|v| v as u64).collect();
drop(state);
let vertices: Vec<Vertex> = vertex_ids
.into_iter()
.filter_map(|vid| self.get_vertex(VertexId(vid)))
.collect();
Box::new(vertices.into_iter())
}
fn edges_with_label(&self, label: &str) -> Box<dyn Iterator<Item = Edge> + '_> {
let state = self.graph.state.read();
let label_id = state.interner.read().lookup(label);
let Some(id) = label_id else {
return Box::new(std::iter::empty());
};
let Some(bitmap) = state.edge_labels.get(&id) else {
return Box::new(std::iter::empty());
};
let edge_ids: Vec<u64> = bitmap.iter().map(|e| e as u64).collect();
drop(state);
let edges: Vec<Edge> = edge_ids
.into_iter()
.filter_map(|eid| self.get_edge(EdgeId(eid)))
.collect();
Box::new(edges.into_iter())
}
fn all_vertices(&self) -> Box<dyn Iterator<Item = Vertex> + '_> {
let state = self.graph.state.read();
let vertex_ids: Vec<VertexId> = state.vertices.keys().copied().collect();
drop(state);
let vertices: Vec<Vertex> = vertex_ids
.into_iter()
.filter_map(|id| self.get_vertex(id))
.collect();
Box::new(vertices.into_iter())
}
fn all_edges(&self) -> Box<dyn Iterator<Item = Edge> + '_> {
let state = self.graph.state.read();
let edge_ids: Vec<EdgeId> = state.edges.keys().copied().collect();
drop(state);
let edges: Vec<Edge> = edge_ids
.into_iter()
.filter_map(|id| self.get_edge(id))
.collect();
Box::new(edges.into_iter())
}
fn interner(&self) -> &StringInterner {
unsafe {
let state = self.graph.state.read();
let guard = state.interner.read();
let ptr: *const StringInterner = &*guard;
std::mem::forget(guard);
&*ptr
}
}
}
impl<'a> crate::storage::GraphStorageMut for GraphMutWrapper<'a> {
fn add_vertex(&mut self, label: &str, properties: HashMap<String, Value>) -> VertexId {
self.graph.add_vertex(label, properties)
}
fn add_edge(
&mut self,
src: VertexId,
dst: VertexId,
label: &str,
properties: HashMap<String, Value>,
) -> Result<EdgeId, StorageError> {
self.graph.add_edge(src, dst, label, properties)
}
fn set_vertex_property(
&mut self,
id: VertexId,
key: &str,
value: Value,
) -> Result<(), StorageError> {
self.graph.set_vertex_property(id, key, value)
}
fn set_edge_property(
&mut self,
id: EdgeId,
key: &str,
value: Value,
) -> Result<(), StorageError> {
self.graph.set_edge_property(id, key, value)
}
fn remove_vertex(&mut self, id: VertexId) -> Result<(), StorageError> {
self.graph.remove_vertex(id)
}
fn remove_edge(&mut self, id: EdgeId) -> Result<(), StorageError> {
self.graph.remove_edge(id)
}
}
impl crate::graph_access::GraphAccess for Arc<Graph> {
fn get_vertex(&self, id: VertexId) -> Option<Vertex> {
self.snapshot().get_vertex(id)
}
fn get_edge(&self, id: EdgeId) -> Option<Edge> {
self.snapshot().get_edge(id)
}
fn out_edge_ids(&self, vertex: VertexId) -> Vec<EdgeId> {
self.snapshot().out_edges(vertex).map(|e| e.id).collect()
}
fn in_edge_ids(&self, vertex: VertexId) -> Vec<EdgeId> {
self.snapshot().in_edges(vertex).map(|e| e.id).collect()
}
fn set_vertex_property(
&self,
id: VertexId,
key: &str,
value: Value,
) -> Result<(), StorageError> {
Graph::set_vertex_property(self, id, key, value)
}
fn set_edge_property(&self, id: EdgeId, key: &str, value: Value) -> Result<(), StorageError> {
Graph::set_edge_property(self, id, key, value)
}
fn add_edge(
&self,
src: VertexId,
dst: VertexId,
label: &str,
properties: HashMap<String, Value>,
) -> Result<EdgeId, StorageError> {
Graph::add_edge(self, src, dst, label, properties)
}
fn remove_vertex(&self, id: VertexId) -> Result<(), StorageError> {
Graph::remove_vertex(self, id)
}
fn remove_edge(&self, id: EdgeId) -> Result<(), StorageError> {
Graph::remove_edge(self, id)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_new_graph_is_empty() {
let graph = Graph::new();
assert_eq!(graph.vertex_count(), 0);
assert_eq!(graph.edge_count(), 0);
assert_eq!(graph.version(), 0);
}
#[test]
fn test_add_vertex() {
let graph = Graph::new();
let id = graph.add_vertex("person", HashMap::new());
assert_eq!(id.0, 0);
assert_eq!(graph.vertex_count(), 1);
assert_eq!(graph.version(), 1);
}
#[test]
fn test_add_vertex_with_properties() {
let graph = Graph::new();
let props = HashMap::from([
("name".to_string(), Value::String("Alice".to_string())),
("age".to_string(), Value::Int(30)),
]);
let id = graph.add_vertex("person", props);
let snap = graph.snapshot();
let vertex = snap.get_vertex(id).unwrap();
assert_eq!(vertex.label, "person");
assert_eq!(
vertex.properties.get("name"),
Some(&Value::String("Alice".to_string()))
);
assert_eq!(vertex.properties.get("age"), Some(&Value::Int(30)));
}
#[test]
fn test_add_edge() {
let graph = Graph::new();
let v1 = graph.add_vertex("person", HashMap::new());
let v2 = graph.add_vertex("person", HashMap::new());
let edge_id = graph.add_edge(v1, v2, "knows", HashMap::new()).unwrap();
let snap = graph.snapshot();
let edge = snap.get_edge(edge_id).unwrap();
assert_eq!(edge.src, v1);
assert_eq!(edge.dst, v2);
assert_eq!(edge.label, "knows");
}
#[test]
fn test_add_edge_missing_source() {
let graph = Graph::new();
let v1 = graph.add_vertex("person", HashMap::new());
let result = graph.add_edge(VertexId(999), v1, "knows", HashMap::new());
assert!(matches!(result, Err(StorageError::VertexNotFound(_))));
}
#[test]
fn test_snapshot_isolation() {
let graph = Graph::new();
let v1 = graph.add_vertex(
"person",
HashMap::from([("name".to_string(), Value::String("Alice".to_string()))]),
);
let snap = graph.snapshot();
graph
.set_vertex_property(v1, "name", Value::String("Alicia".to_string()))
.unwrap();
let vertex = snap.get_vertex(v1).unwrap();
assert_eq!(
vertex.properties.get("name"),
Some(&Value::String("Alice".to_string()))
);
let snap2 = graph.snapshot();
let vertex2 = snap2.get_vertex(v1).unwrap();
assert_eq!(
vertex2.properties.get("name"),
Some(&Value::String("Alicia".to_string()))
);
}
#[test]
fn test_snapshot_survives_modification() {
let graph = Graph::new();
for i in 0..1000 {
graph.add_vertex("node", HashMap::from([("id".to_string(), Value::Int(i))]));
}
let snap = graph.snapshot();
assert_eq!(snap.vertex_count(), 1000);
for i in 1000..2000 {
graph.add_vertex("node", HashMap::from([("id".to_string(), Value::Int(i))]));
}
assert_eq!(snap.vertex_count(), 1000);
assert_eq!(graph.snapshot().vertex_count(), 2000);
}
#[test]
fn test_remove_vertex() {
let graph = Graph::new();
let v1 = graph.add_vertex("person", HashMap::new());
let v2 = graph.add_vertex("person", HashMap::new());
graph.add_edge(v1, v2, "knows", HashMap::new()).unwrap();
graph.remove_vertex(v1).unwrap();
assert_eq!(graph.vertex_count(), 1);
assert_eq!(graph.edge_count(), 0); }
#[test]
fn test_remove_edge() {
let graph = Graph::new();
let v1 = graph.add_vertex("person", HashMap::new());
let v2 = graph.add_vertex("person", HashMap::new());
let edge_id = graph.add_edge(v1, v2, "knows", HashMap::new()).unwrap();
graph.remove_edge(edge_id).unwrap();
assert_eq!(graph.edge_count(), 0);
assert_eq!(graph.vertex_count(), 2); }
#[test]
fn test_vertices_with_label() {
let graph = Graph::new();
graph.add_vertex("person", HashMap::new());
graph.add_vertex("person", HashMap::new());
graph.add_vertex("software", HashMap::new());
let snap = graph.snapshot();
let people: Vec<_> = snap.vertices_with_label("person").collect();
let software: Vec<_> = snap.vertices_with_label("software").collect();
assert_eq!(people.len(), 2);
assert_eq!(software.len(), 1);
}
#[test]
fn test_out_edges() {
let graph = Graph::new();
let v1 = graph.add_vertex("person", HashMap::new());
let v2 = graph.add_vertex("person", HashMap::new());
let v3 = graph.add_vertex("person", HashMap::new());
graph.add_edge(v1, v2, "knows", HashMap::new()).unwrap();
graph.add_edge(v1, v3, "knows", HashMap::new()).unwrap();
graph.add_edge(v2, v1, "knows", HashMap::new()).unwrap();
let snap = graph.snapshot();
let out: Vec<_> = snap.out_edges(v1).collect();
assert_eq!(out.len(), 2);
assert!(out.iter().all(|e| e.src == v1));
}
#[test]
fn test_in_edges() {
let graph = Graph::new();
let v1 = graph.add_vertex("person", HashMap::new());
let v2 = graph.add_vertex("person", HashMap::new());
let v3 = graph.add_vertex("person", HashMap::new());
graph.add_edge(v2, v1, "knows", HashMap::new()).unwrap();
graph.add_edge(v3, v1, "knows", HashMap::new()).unwrap();
graph.add_edge(v1, v2, "knows", HashMap::new()).unwrap();
let snap = graph.snapshot();
let in_e: Vec<_> = snap.in_edges(v1).collect();
assert_eq!(in_e.len(), 2);
assert!(in_e.iter().all(|e| e.dst == v1));
}
#[test]
fn test_batch_success() {
let graph = Graph::new();
graph
.batch(|ctx| {
let alice = ctx.add_vertex(
"Person",
HashMap::from([("name".to_string(), "Alice".into())]),
);
let bob = ctx.add_vertex(
"Person",
HashMap::from([("name".to_string(), "Bob".into())]),
);
ctx.add_edge(alice, bob, "knows", HashMap::new())?;
Ok(())
})
.unwrap();
assert_eq!(graph.vertex_count(), 2);
assert_eq!(graph.edge_count(), 1);
}
#[test]
fn test_batch_rollback_on_error() {
let graph = Graph::new();
graph.add_vertex("existing", HashMap::new());
let result: Result<(), BatchError> = graph.batch(|ctx| {
ctx.add_vertex("new", HashMap::new());
ctx.add_edge(VertexId(0), VertexId(999), "invalid", HashMap::new())?;
Ok(())
});
assert!(result.is_err());
assert_eq!(graph.vertex_count(), 1);
}
#[test]
fn test_snapshot_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<GraphSnapshot>();
assert_send_sync::<Graph>();
}
#[test]
fn test_snapshot_can_outlive_scope() {
let snap = {
let graph = Graph::new();
graph.add_vertex("person", HashMap::new());
graph.snapshot()
};
assert_eq!(snap.vertex_count(), 1);
}
#[test]
fn test_concurrent_snapshots() {
use std::sync::Arc;
use std::thread;
let graph = Arc::new(Graph::new());
for i in 0..100 {
graph.add_vertex("node", HashMap::from([("id".to_string(), Value::Int(i))]));
}
let handles: Vec<_> = (0..10)
.map(|_| {
let g = Arc::clone(&graph);
thread::spawn(move || {
let snap = g.snapshot();
snap.vertex_count()
})
})
.collect();
for handle in handles {
assert_eq!(handle.join().unwrap(), 100);
}
}
#[test]
fn test_self_loop_edge() {
let graph = Graph::new();
let v1 = graph.add_vertex("person", HashMap::new());
let e = graph.add_edge(v1, v1, "self", HashMap::new()).unwrap();
let snap = graph.snapshot();
let out: Vec<_> = snap.out_edges(v1).collect();
let in_e: Vec<_> = snap.in_edges(v1).collect();
assert_eq!(out.len(), 1);
assert_eq!(in_e.len(), 1);
assert_eq!(out[0].id, e);
assert_eq!(in_e[0].id, e);
}
#[test]
fn test_remove_vertex_with_self_loop() {
let graph = Graph::new();
let v1 = graph.add_vertex("person", HashMap::new());
graph.add_edge(v1, v1, "self", HashMap::new()).unwrap();
graph.remove_vertex(v1).unwrap();
assert_eq!(graph.vertex_count(), 0);
assert_eq!(graph.edge_count(), 0);
}
#[test]
fn test_cow_to_vertex_list() {
use std::sync::Arc;
let graph = Arc::new(Graph::new());
graph.add_vertex(
"person",
HashMap::from([("name".to_string(), "Alice".into())]),
);
graph.add_vertex(
"person",
HashMap::from([("name".to_string(), "Bob".into())]),
);
let g = graph.gremlin(Arc::clone(&graph));
let vertices = g.v().to_list();
assert_eq!(vertices.len(), 2);
let names: Vec<_> = vertices.iter().filter_map(|v| v.property("name")).collect();
assert_eq!(names.len(), 2);
}
#[test]
fn test_cow_next_vertex() {
use std::sync::Arc;
let graph = Arc::new(Graph::new());
graph.add_vertex(
"person",
HashMap::from([("name".to_string(), "Alice".into())]),
);
let g = graph.gremlin(Arc::clone(&graph));
let v = g.v().next();
assert!(v.is_some());
assert_eq!(
v.unwrap().property("name"),
Some(crate::value::Value::String("Alice".to_string()))
);
}
#[test]
fn test_cow_one_vertex() {
use std::sync::Arc;
let graph = Arc::new(Graph::new());
let id = graph.add_vertex("person", HashMap::new());
let g = graph.gremlin(Arc::clone(&graph));
let result = g.v().one();
assert!(result.is_ok());
assert_eq!(result.unwrap().id(), id);
graph.add_vertex("person", HashMap::new());
let g2 = graph.gremlin(Arc::clone(&graph));
let result = g2.v().one();
assert!(result.is_err());
}
#[test]
fn test_cow_to_edge_list() {
use std::sync::Arc;
let graph = Arc::new(Graph::new());
let a = graph.add_vertex("person", HashMap::new());
let b = graph.add_vertex("person", HashMap::new());
graph.add_edge(a, b, "knows", HashMap::new()).unwrap();
let g = graph.gremlin(Arc::clone(&graph));
let edges = g.e().to_list();
assert_eq!(edges.len(), 1);
assert_eq!(edges[0].label(), Some("knows".to_string()));
}
#[test]
fn test_cow_next_edge() {
use std::sync::Arc;
let graph = Arc::new(Graph::new());
let a = graph.add_vertex("person", HashMap::new());
let b = graph.add_vertex("person", HashMap::new());
graph.add_edge(a, b, "knows", HashMap::new()).unwrap();
let g = graph.gremlin(Arc::clone(&graph));
let e = g.e().next();
assert!(e.is_some());
assert_eq!(e.unwrap().label(), Some("knows".to_string()));
}
#[test]
fn test_cow_one_edge() {
use std::sync::Arc;
let graph = Arc::new(Graph::new());
let a = graph.add_vertex("person", HashMap::new());
let b = graph.add_vertex("person", HashMap::new());
let edge_id = graph.add_edge(a, b, "knows", HashMap::new()).unwrap();
let g = graph.gremlin(Arc::clone(&graph));
let result = g.e().one();
assert!(result.is_ok());
assert_eq!(result.unwrap().id(), edge_id);
graph.add_edge(b, a, "knows", HashMap::new()).unwrap();
let g2 = graph.gremlin(Arc::clone(&graph));
let result = g2.e().one();
assert!(result.is_err());
}
#[test]
fn test_cow_typed_gremlin() {
use std::sync::Arc;
let graph = Arc::new(Graph::new());
graph.add_vertex(
"person",
HashMap::from([("name".to_string(), "Alice".into())]),
);
let snapshot = graph.snapshot();
let g = graph.typed_gremlin(&snapshot, Arc::clone(&graph));
let v = g.v().next();
assert!(v.is_some());
assert_eq!(
v.unwrap().property("name"),
Some(crate::value::Value::String("Alice".to_string()))
);
}
#[test]
fn test_cow_typed_vertex_traversal() {
use std::sync::Arc;
let graph = Arc::new(Graph::new());
let alice = graph.add_vertex(
"person",
HashMap::from([("name".to_string(), "Alice".into())]),
);
let bob = graph.add_vertex(
"person",
HashMap::from([("name".to_string(), "Bob".into())]),
);
graph.add_edge(alice, bob, "knows", HashMap::new()).unwrap();
let g = graph.gremlin(Arc::clone(&graph));
let vertices = g
.v()
.has_value("name", crate::value::Value::String("Alice".to_string()))
.to_list();
assert_eq!(vertices.len(), 1);
assert_eq!(
vertices[0].property("name"),
Some(crate::value::Value::String("Alice".to_string()))
);
let friends = vertices[0].out("knows").to_list();
assert_eq!(friends.len(), 1);
assert_eq!(
friends[0].property("name"),
Some(crate::value::Value::String("Bob".to_string()))
);
}
#[test]
fn test_from_to_accept_graph_vertex() {
use std::sync::Arc;
let graph = Arc::new(Graph::new());
let g = graph.gremlin(Arc::clone(&graph));
let alice = g.add_v("person").property("name", "Alice").next().unwrap();
let bob = g.add_v("person").property("name", "Bob").next().unwrap();
let edge = g.add_e("knows").from(&alice).to(&bob).next();
assert!(edge.is_some());
let names: Vec<String> = g
.v_ref(&alice)
.out_label("knows")
.values("name")
.to_list()
.into_iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect();
assert_eq!(names, vec!["Bob"]);
}
#[test]
fn test_from_to_accept_vertex_id() {
use std::sync::Arc;
let graph = Arc::new(Graph::new());
let g = graph.gremlin(Arc::clone(&graph));
let alice = g.add_v("person").next().unwrap();
let bob = g.add_v("person").next().unwrap();
let edge = g.add_e("knows").from(alice.id()).to(bob.id()).next();
assert!(edge.is_some());
}
#[test]
fn test_from_to_accept_u64() {
use std::sync::Arc;
let graph = Arc::new(Graph::new());
let g = graph.gremlin(Arc::clone(&graph));
g.add_v("person").next();
g.add_v("person").next();
let edge = g.add_e("knows").from(0u64).to(1u64).next();
assert!(edge.is_some());
}
#[test]
fn test_v_ref_accepts_graph_vertex_ref() {
use std::sync::Arc;
let graph = Arc::new(Graph::new());
let g = graph.gremlin(Arc::clone(&graph));
let alice = g
.add_v("person")
.property("name", "Alice")
.property("age", 30i64)
.next()
.unwrap();
let values = g.v_ref(&alice).values("name").to_list();
assert_eq!(values.len(), 1);
assert_eq!(values[0].as_str(), Some("Alice"));
}
#[test]
fn test_v_ref_accepts_vertex_id() {
use std::sync::Arc;
let graph = Arc::new(Graph::new());
let g = graph.gremlin(Arc::clone(&graph));
let alice = g.add_v("person").property("name", "Alice").next().unwrap();
let values = g.v_ref(alice.id()).values("name").to_list();
assert_eq!(values.len(), 1);
assert_eq!(values[0].as_str(), Some("Alice"));
}
#[test]
fn test_full_gremlin_workflow_with_variables() {
use std::sync::Arc;
let graph = Arc::new(Graph::new());
let g = graph.gremlin(Arc::clone(&graph));
let alice = g
.add_v("person")
.property("name", "Alice")
.property("age", 30i64)
.next()
.unwrap();
let bob = g
.add_v("person")
.property("name", "Bob")
.property("age", 25i64)
.next()
.unwrap();
let charlie = g
.add_v("person")
.property("name", "Charlie")
.property("age", 35i64)
.next()
.unwrap();
let acme = g
.add_v("company")
.property("name", "Acme Corp")
.next()
.unwrap();
g.add_e("knows")
.from(&alice)
.to(&bob)
.property("since", 2020i64)
.next();
g.add_e("knows")
.from(&bob)
.to(&charlie)
.property("since", 2021i64)
.next();
g.add_e("works_at").from(&alice).to(&acme).next();
g.add_e("works_at").from(&bob).to(&acme).next();
assert_eq!(graph.vertex_count(), 4);
assert_eq!(graph.edge_count(), 4);
let alice_knows: Vec<String> = g
.v_ref(&alice)
.out_label("knows")
.values("name")
.to_list()
.into_iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect();
assert_eq!(alice_knows, vec!["Bob"]);
let alice_knows_knows: Vec<String> = g
.v_ref(&alice)
.out_label("knows")
.out_label("knows")
.values("name")
.to_list()
.into_iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect();
assert_eq!(alice_knows_knows, vec!["Charlie"]);
let acme_employees: Vec<String> = g
.v_ref(&acme)
.in_label("works_at")
.values("name")
.to_list()
.into_iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect();
assert_eq!(acme_employees.len(), 2);
assert!(acme_employees.contains(&"Alice".to_string()));
assert!(acme_employees.contains(&"Bob".to_string()));
}
#[test]
fn test_bound_add_edge_builder_to_accepts_graph_vertex() {
use std::sync::Arc;
let graph = Arc::new(Graph::new());
let g = graph.gremlin(Arc::clone(&graph));
let alice = g.add_v("person").property("name", "Alice").next().unwrap();
let bob = g.add_v("person").property("name", "Bob").next().unwrap();
let edge = g.v_ref(&alice).add_e("knows").to(&bob).next();
assert!(edge.is_some());
assert_eq!(edge.unwrap().label(), Some("knows".to_string()));
assert_eq!(graph.edge_count(), 1);
}
#[test]
fn test_mixed_from_to_types() {
use std::sync::Arc;
let graph = Arc::new(Graph::new());
let g = graph.gremlin(Arc::clone(&graph));
let alice = g.add_v("person").next().unwrap();
let bob_id = g.add_v("person").next().unwrap().id();
let edge = g.add_e("knows").from(&alice).to(bob_id).next();
assert!(edge.is_some());
let charlie = g.add_v("person").next().unwrap().id();
let edge2 = g.add_e("knows").from(charlie).to(1u64).next();
assert!(edge2.is_some());
}
}