use std::collections::HashMap;
use crate::error::MutationError;
use crate::traversal::step::Step;
use crate::traversal::{ExecutionContext, Traverser};
use crate::value::{EdgeId, Value, VertexId};
/// Traversal step describing a vertex to be added to the graph.
///
/// The step does not touch storage itself: its `Step::apply` implementation
/// emits a marker map (keyed by `__pending_add_v`) that carries this label and
/// property set.
#[derive(Clone, Debug)]
pub struct AddVStep {
/// Label stored under the marker map's `label` key.
label: String,
/// Initial properties copied into the marker map's `properties` entry.
properties: HashMap<String, Value>,
}
impl AddVStep {
    /// Creates a step for a vertex with the given label and no properties.
    pub fn new(label: impl Into<String>) -> Self {
        Self::with_properties(label, HashMap::new())
    }

    /// Creates a step for a vertex with the given label and initial
    /// properties.
    pub fn with_properties(label: impl Into<String>, properties: HashMap<String, Value>) -> Self {
        let label = label.into();
        Self { label, properties }
    }

    /// Returns the label of the vertex this step describes.
    #[inline]
    pub fn label(&self) -> &str {
        self.label.as_str()
    }

    /// Returns the initial properties of the vertex this step describes.
    #[inline]
    pub fn properties(&self) -> &HashMap<String, Value> {
        &self.properties
    }
}
impl Step for AddVStep {
    type Iter<'a>
        = impl Iterator<Item = Traverser> + 'a
    where
        Self: 'a;

    /// Emits a single traverser carrying a pending "add vertex" marker.
    ///
    /// The incoming traverser stream is ignored: exactly one traverser is
    /// produced regardless of `_input`. Its value is a map with the keys
    /// `__pending_add_v` (`true`), `label` and `properties`; no vertex is
    /// created here. When `ctx.is_tracking_paths()` is true,
    /// `extend_path_unlabeled()` is called on the new traverser.
    fn apply<'a>(
        &'a self,
        ctx: &'a ExecutionContext<'a>,
        _input: Box<dyn Iterator<Item = Traverser> + 'a>,
    ) -> Self::Iter<'a> {
        // Snapshot the step's data so the lazy closure owns everything it needs.
        let label = self.label.clone();
        let properties = self.properties.clone();
        let track_paths = ctx.is_tracking_paths();

        // `once_with` takes an `FnOnce`, so the snapshots are moved into the
        // marker map instead of being cloned a second time.
        std::iter::once_with(move || {
            let marker = crate::value::ValueMap::from([
                ("__pending_add_v".to_string(), Value::Bool(true)),
                ("label".to_string(), Value::String(label)),
                (
                    "properties".to_string(),
                    Value::Map(properties.into_iter().collect()),
                ),
            ]);
            let mut t = Traverser::new(Value::Map(marker));
            if track_paths {
                t.extend_path_unlabeled();
            }
            t
        })
    }

    /// Returns the step name, `"addV"`.
    fn name(&self) -> &'static str {
        "addV"
    }

    /// Reports this step as a side-effect step.
    fn category(&self) -> crate::traversal::explain::StepCategory {
        crate::traversal::explain::StepCategory::SideEffect
    }

    /// Streaming variant: returns the input traverser unchanged.
    ///
    /// No pending marker is produced on this path.
    fn apply_streaming(
        &self,
        _ctx: crate::traversal::context::StreamingContext,
        input: Traverser,
    ) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
        Box::new(std::iter::once(input))
    }
}
/// Traversal step that attaches one key/value property to the element carried
/// by each traverser.
///
/// Like the other mutation steps in this file it only rewrites traverser
/// values into marker maps; it does not write to storage.
#[derive(Clone, Debug)]
pub struct PropertyStep {
/// Property name to set.
key: String,
/// Property value to set.
value: Value,
}
impl PropertyStep {
    /// Creates a step that sets `key` to `value`.
    pub fn new(key: impl Into<String>, value: impl Into<Value>) -> Self {
        let (key, value) = (key.into(), value.into());
        Self { key, value }
    }

    /// Returns the property name this step sets.
    #[inline]
    pub fn key(&self) -> &str {
        self.key.as_str()
    }

    /// Returns the property value this step sets.
    #[inline]
    pub fn value(&self) -> &Value {
        &self.value
    }
}
impl Step for PropertyStep {
    type Iter<'a>
        = impl Iterator<Item = Traverser> + 'a
    where
        Self: 'a;

    /// Records the property on every incoming traverser.
    ///
    /// * A traverser holding a pending creation marker (`__pending_add_v` or
    ///   `__pending_add_e`) gets the key/value inserted into the marker's
    ///   `properties` map, if that entry is a map.
    /// * A traverser holding a vertex or edge has its value replaced by a
    ///   `__pending_property_vertex` / `__pending_property_edge` marker map
    ///   with `id`, `key` and `value` entries.
    /// * Any other traverser passes through unchanged.
    ///
    /// NOTE(review): a traverser that already carries a
    /// `__pending_property_*` marker matches none of the cases above, so a
    /// second `property` step applied to it leaves it untouched.
    fn apply<'a>(
        &'a self,
        _ctx: &'a ExecutionContext<'a>,
        input: Box<dyn Iterator<Item = Traverser> + 'a>,
    ) -> Self::Iter<'a> {
        let key = self.key.clone();
        let value = self.value.clone();

        input.map(move |mut t| {
            // Pending creations accumulate properties in their embedded map.
            if let Value::Map(map) = &mut t.value {
                let pending_creation = map.get("__pending_add_v").is_some()
                    || map.get("__pending_add_e").is_some();
                if pending_creation {
                    if let Some(Value::Map(props)) = map.get_mut("properties") {
                        props.insert(key.clone(), value.clone());
                    }
                    return t;
                }
            }

            // Existing elements are turned into a pending property marker.
            let (marker, element) = match &t.value {
                Value::Vertex(id) => ("__pending_property_vertex", Value::Vertex(*id)),
                Value::Edge(id) => ("__pending_property_edge", Value::Edge(*id)),
                _ => return t,
            };
            t.value = Value::Map(crate::value::ValueMap::from([
                (marker.to_string(), Value::Bool(true)),
                ("id".to_string(), element),
                ("key".to_string(), Value::String(key.clone())),
                ("value".to_string(), value.clone()),
            ]));
            t
        })
    }

    /// Returns the step name, `"property"`.
    fn name(&self) -> &'static str {
        "property"
    }

    /// Reports this step as a side-effect step.
    fn category(&self) -> crate::traversal::explain::StepCategory {
        crate::traversal::explain::StepCategory::SideEffect
    }

    /// Streaming variant: returns the input traverser unchanged.
    ///
    /// No property marker is produced on this path.
    fn apply_streaming(
        &self,
        _ctx: crate::traversal::context::StreamingContext,
        input: Traverser,
    ) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
        Box::new(std::iter::once(input))
    }
}
/// Traversal step that marks the vertex or edge carried by each traverser for
/// removal. It has no configuration.
#[derive(Clone, Debug, Default)]
pub struct DropStep;
impl DropStep {
pub fn new() -> Self {
Self
}
}
impl Step for DropStep {
    type Iter<'a>
        = impl Iterator<Item = Traverser> + 'a
    where
        Self: 'a;

    /// Converts vertex and edge traversers into pending drop markers.
    ///
    /// Each vertex becomes a fresh traverser (built with `Traverser::new`)
    /// whose value is a map with `__pending_drop_vertex` and `id`; each edge
    /// becomes one with `__pending_drop_edge` and `id`. Traversers holding
    /// any other value are filtered out of the stream.
    fn apply<'a>(
        &'a self,
        _ctx: &'a ExecutionContext<'a>,
        input: Box<dyn Iterator<Item = Traverser> + 'a>,
    ) -> Self::Iter<'a> {
        input.filter_map(move |t| {
            // Only graph elements can be dropped; everything else is discarded.
            let (marker, element) = match &t.value {
                Value::Vertex(id) => ("__pending_drop_vertex", Value::Vertex(*id)),
                Value::Edge(id) => ("__pending_drop_edge", Value::Edge(*id)),
                _ => return None,
            };
            let pending = crate::value::ValueMap::from([
                (marker.to_string(), Value::Bool(true)),
                ("id".to_string(), element),
            ]);
            Some(Traverser::new(Value::Map(pending)))
        })
    }

    /// Returns the step name, `"drop"`.
    fn name(&self) -> &'static str {
        "drop"
    }

    /// Reports this step as a side-effect step.
    fn category(&self) -> crate::traversal::explain::StepCategory {
        crate::traversal::explain::StepCategory::SideEffect
    }

    /// Streaming variant: returns the input traverser unchanged.
    ///
    /// No drop marker is produced on this path.
    fn apply_streaming(
        &self,
        _ctx: crate::traversal::context::StreamingContext,
        input: Traverser,
    ) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
        Box::new(std::iter::once(input))
    }
}
/// How one end of a new edge is determined when `AddEStep` runs.
#[derive(Clone, Debug)]
pub enum EdgeEndpoint {
/// A fixed vertex id, used as-is.
VertexId(VertexId),
/// The vertex carried by the current traverser (resolved through
/// `Traverser::as_vertex_id`).
Traverser,
/// The first path value recorded under this step label, which must be a
/// vertex.
StepLabel(String),
}
/// Traversal step describing an edge to be added to the graph.
///
/// The step only emits marker maps (keyed by `__pending_add_e`); it does not
/// touch storage itself.
#[derive(Clone, Debug)]
pub struct AddEStep {
/// Label of the edge to create.
label: String,
/// Source endpoint. When unset, `apply` resolves it from the current
/// traverser.
from: Option<EdgeEndpoint>,
/// Target endpoint. When unset, `apply` emits nothing for any traverser.
to: Option<EdgeEndpoint>,
/// Initial properties of the edge.
properties: HashMap<String, Value>,
}
impl AddEStep {
    /// Creates a step for an edge with the given label, no endpoints and no
    /// properties.
    pub fn new(label: impl Into<String>) -> Self {
        Self {
            label: label.into(),
            from: None,
            to: None,
            properties: HashMap::new(),
        }
    }

    /// Sets the source endpoint to a fixed vertex id.
    pub fn from_vertex(self, id: VertexId) -> Self {
        Self {
            from: Some(EdgeEndpoint::VertexId(id)),
            ..self
        }
    }

    /// Sets the source endpoint to the vertex carried by the traverser.
    pub fn from_traverser(self) -> Self {
        Self {
            from: Some(EdgeEndpoint::Traverser),
            ..self
        }
    }

    /// Sets the source endpoint to the vertex recorded under a step label.
    pub fn from_label(self, label: impl Into<String>) -> Self {
        Self {
            from: Some(EdgeEndpoint::StepLabel(label.into())),
            ..self
        }
    }

    /// Sets the target endpoint to a fixed vertex id.
    pub fn to_vertex(self, id: VertexId) -> Self {
        Self {
            to: Some(EdgeEndpoint::VertexId(id)),
            ..self
        }
    }

    /// Sets the target endpoint to the vertex carried by the traverser.
    pub fn to_traverser(self) -> Self {
        Self {
            to: Some(EdgeEndpoint::Traverser),
            ..self
        }
    }

    /// Sets the target endpoint to the vertex recorded under a step label.
    pub fn to_label(self, label: impl Into<String>) -> Self {
        Self {
            to: Some(EdgeEndpoint::StepLabel(label.into())),
            ..self
        }
    }

    /// Adds (or overwrites) one initial property of the edge.
    pub fn property(mut self, key: impl Into<String>, value: impl Into<Value>) -> Self {
        let (key, value) = (key.into(), value.into());
        self.properties.insert(key, value);
        self
    }

    /// Returns the label of the edge this step describes.
    #[inline]
    pub fn label(&self) -> &str {
        self.label.as_str()
    }

    /// Returns the configured source endpoint, if any.
    #[inline]
    pub fn from_endpoint(&self) -> Option<&EdgeEndpoint> {
        self.from.as_ref()
    }

    /// Returns the configured target endpoint, if any.
    #[inline]
    pub fn to_endpoint(&self) -> Option<&EdgeEndpoint> {
        self.to.as_ref()
    }

    /// Resolves an endpoint description to a concrete vertex id for one
    /// traverser.
    ///
    /// # Errors
    ///
    /// * `MissingEdgeEndpoint` when the endpoint is the traverser itself but
    ///   the traverser does not hold a vertex.
    /// * `StepLabelNotFound` when the traverser's path has no entry for the
    ///   label.
    /// * `StepLabelNotVertex` when the first value under the label is absent
    ///   or is not a vertex.
    fn resolve_endpoint(
        endpoint: &EdgeEndpoint,
        traverser: &Traverser,
    ) -> Result<VertexId, MutationError> {
        match endpoint {
            EdgeEndpoint::VertexId(id) => Ok(*id),
            EdgeEndpoint::Traverser => traverser.as_vertex_id().ok_or(
                MutationError::MissingEdgeEndpoint("traverser is not a vertex"),
            ),
            EdgeEndpoint::StepLabel(label) => {
                let Some(values) = traverser.path.get(label) else {
                    return Err(MutationError::StepLabelNotFound(label.clone()));
                };
                // Only the first value recorded under the label is considered.
                values
                    .first()
                    .and_then(|pv| pv.as_vertex_id())
                    .ok_or_else(|| MutationError::StepLabelNotVertex(label.clone()))
            }
        }
    }
}
impl Step for AddEStep {
type Iter<'a>
= impl Iterator<Item = Traverser> + 'a
where
Self: 'a;
fn apply<'a>(
&'a self,
ctx: &'a ExecutionContext<'a>,
input: Box<dyn Iterator<Item = Traverser> + 'a>,
) -> Self::Iter<'a> {
let label = self.label.clone();
let from = self.from.clone();
let to = self.to.clone();
let properties = self.properties.clone();
let track_paths = ctx.is_tracking_paths();
let explicit_endpoints = matches!(
(&from, &to),
(
Some(EdgeEndpoint::VertexId(_)),
Some(EdgeEndpoint::VertexId(_))
)
);
if explicit_endpoints {
let from_id = match &from {
Some(EdgeEndpoint::VertexId(id)) => *id,
_ => unreachable!(),
};
let to_id = match &to {
Some(EdgeEndpoint::VertexId(id)) => *id,
_ => unreachable!(),
};
let iter: Box<dyn Iterator<Item = Traverser> + 'a> =
Box::new(std::iter::once_with(move || {
let mut new_t = Traverser::new(Value::Map(crate::value::ValueMap::from([
("__pending_add_e".to_string(), Value::Bool(true)),
("label".to_string(), Value::String(label.clone())),
("from".to_string(), Value::Vertex(from_id)),
("to".to_string(), Value::Vertex(to_id)),
(
"properties".to_string(),
Value::Map(
properties
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect(),
),
),
])));
if track_paths {
new_t.extend_path_unlabeled();
}
new_t
}));
return iter;
}
Box::new(input.filter_map(move |t| {
let from_endpoint = from.as_ref().unwrap_or(&EdgeEndpoint::Traverser);
let to_endpoint = to.as_ref()?;
let from_id = Self::resolve_endpoint(from_endpoint, &t).ok()?;
let to_id = Self::resolve_endpoint(to_endpoint, &t).ok()?;
let mut new_t = Traverser::new(Value::Map(crate::value::ValueMap::from([
("__pending_add_e".to_string(), Value::Bool(true)),
("label".to_string(), Value::String(label.clone())),
("from".to_string(), Value::Vertex(from_id)),
("to".to_string(), Value::Vertex(to_id)),
(
"properties".to_string(),
Value::Map(
properties
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect(),
),
),
])));
if track_paths {
new_t.extend_path_unlabeled();
}
Some(new_t)
}))
}
fn name(&self) -> &'static str {
"addE"
}
fn category(&self) -> crate::traversal::explain::StepCategory {
crate::traversal::explain::StepCategory::SideEffect
}
fn apply_streaming(
&self,
_ctx: crate::traversal::context::StreamingContext,
input: Traverser,
) -> Box<dyn Iterator<Item = Traverser> + Send + 'static> {
Box::new(std::iter::once(input))
}
}
/// A graph mutation decoded from a marker map, ready to be applied to
/// storage by `MutationExecutor`.
#[derive(Clone, Debug)]
pub enum PendingMutation {
/// Create a vertex with the given label and properties.
AddVertex {
label: String,
properties: HashMap<String, Value>,
},
/// Create an edge between two vertices with the given label and
/// properties.
AddEdge {
label: String,
from: VertexId,
to: VertexId,
properties: HashMap<String, Value>,
},
/// Set one property on an existing vertex.
SetVertexProperty {
id: VertexId,
key: String,
value: Value,
},
/// Set one property on an existing edge.
SetEdgeProperty {
id: EdgeId,
key: String,
value: Value,
},
/// Remove a vertex.
DropVertex { id: VertexId },
/// Remove an edge.
DropEdge { id: EdgeId },
}
impl PendingMutation {
pub fn from_value(value: &Value) -> Option<Self> {
let map = match value {
Value::Map(m) => m,
_ => return None,
};
if map.get("__pending_add_v").is_some() {
let label = map
.get("label")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let properties = map
.get("properties")
.and_then(|v| match v {
Value::Map(m) => Some(m.clone().into_iter().collect()),
_ => None,
})
.unwrap_or_default();
return Some(PendingMutation::AddVertex { label, properties });
}
if map.get("__pending_add_e").is_some() {
let label = map
.get("label")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let from = map.get("from").and_then(|v| v.as_vertex_id())?;
let to = map.get("to").and_then(|v| v.as_vertex_id())?;
let properties = map
.get("properties")
.and_then(|v| match v {
Value::Map(m) => Some(m.clone().into_iter().collect()),
_ => None,
})
.unwrap_or_default();
return Some(PendingMutation::AddEdge {
label,
from,
to,
properties,
});
}
if map.get("__pending_property_vertex").is_some() {
let id = map.get("id").and_then(|v| v.as_vertex_id())?;
let key = map
.get("key")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let value = map.get("value").cloned().unwrap_or(Value::Null);
return Some(PendingMutation::SetVertexProperty { id, key, value });
}
if map.get("__pending_property_edge").is_some() {
let id = map.get("id").and_then(|v| v.as_edge_id())?;
let key = map
.get("key")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let value = map.get("value").cloned().unwrap_or(Value::Null);
return Some(PendingMutation::SetEdgeProperty { id, key, value });
}
if map.get("__pending_drop_vertex").is_some() {
let id = map.get("id").and_then(|v| v.as_vertex_id())?;
return Some(PendingMutation::DropVertex { id });
}
if map.get("__pending_drop_edge").is_some() {
let id = map.get("id").and_then(|v| v.as_edge_id())?;
return Some(PendingMutation::DropEdge { id });
}
None
}
}
/// Summary of what `MutationExecutor::execute` did.
#[derive(Debug)]
pub struct MutationResult {
/// Output values: ids of created or updated elements, plus the untouched
/// values of traversers that carried no pending mutation.
pub values: Vec<Value>,
/// Number of vertices created.
pub vertices_added: usize,
/// Number of edges created.
pub edges_added: usize,
/// Number of vertices removed.
pub vertices_removed: usize,
/// Number of edges removed.
pub edges_removed: usize,
/// Number of vertex or edge properties successfully set.
pub properties_set: usize,
}
impl MutationResult {
pub fn new() -> Self {
Self {
values: Vec::new(),
vertices_added: 0,
edges_added: 0,
vertices_removed: 0,
edges_removed: 0,
properties_set: 0,
}
}
}
impl Default for MutationResult {
fn default() -> Self {
Self::new()
}
}
/// Applies pending mutations to a mutable graph storage backend.
pub struct MutationExecutor<'s, S: crate::storage::GraphStorageMut> {
/// Storage that receives the mutations; borrowed mutably for the lifetime
/// of the executor.
storage: &'s mut S,
}
impl<'s, S: crate::storage::GraphStorageMut> MutationExecutor<'s, S> {
    /// Creates an executor that writes to `storage`.
    pub fn new(storage: &'s mut S) -> Self {
        Self { storage }
    }

    /// Applies every pending mutation found in `traversers` and summarises
    /// the outcome.
    ///
    /// Traversers whose value does not decode to a [`PendingMutation`] have
    /// that value passed through into `values`. Successful vertex/edge
    /// creations and property updates push the affected element into
    /// `values` and bump the matching counter; successful drops only bump a
    /// counter.
    ///
    /// Storage errors are not reported: a failed operation simply leaves
    /// `values` and the counters untouched.
    pub fn execute(
        &mut self,
        traversers: impl Iterator<Item = crate::traversal::Traverser>,
    ) -> MutationResult {
        let mut outcome = MutationResult::new();

        for traverser in traversers {
            // Anything that is not a mutation marker flows through as-is.
            let Some(mutation) = PendingMutation::from_value(&traverser.value) else {
                outcome.values.push(traverser.value);
                continue;
            };

            match mutation {
                PendingMutation::AddVertex { label, properties } => {
                    let id = self.storage.add_vertex(&label, properties);
                    outcome.vertices_added += 1;
                    outcome.values.push(Value::Vertex(id));
                }
                PendingMutation::AddEdge {
                    label,
                    from,
                    to,
                    properties,
                } => {
                    if let Ok(id) = self.storage.add_edge(from, to, &label, properties) {
                        outcome.edges_added += 1;
                        outcome.values.push(Value::Edge(id));
                    }
                }
                PendingMutation::SetVertexProperty { id, key, value } => {
                    if self.storage.set_vertex_property(id, &key, value).is_ok() {
                        outcome.values.push(Value::Vertex(id));
                        outcome.properties_set += 1;
                    }
                }
                PendingMutation::SetEdgeProperty { id, key, value } => {
                    if self.storage.set_edge_property(id, &key, value).is_ok() {
                        outcome.values.push(Value::Edge(id));
                        outcome.properties_set += 1;
                    }
                }
                PendingMutation::DropVertex { id } => {
                    if self.storage.remove_vertex(id).is_ok() {
                        outcome.vertices_removed += 1;
                    }
                }
                PendingMutation::DropEdge { id } => {
                    if self.storage.remove_edge(id).is_ok() {
                        outcome.edges_removed += 1;
                    }
                }
            }
        }

        outcome
    }

    /// Applies a single mutation and returns the affected element, if any.
    ///
    /// Returns `Some` with the created or updated vertex/edge on success.
    /// Returns `None` when the storage operation fails, and always for drop
    /// mutations, so a `None` from a drop does not tell success from failure.
    pub fn execute_mutation(&mut self, mutation: PendingMutation) -> Option<Value> {
        match mutation {
            PendingMutation::AddVertex { label, properties } => {
                Some(Value::Vertex(self.storage.add_vertex(&label, properties)))
            }
            PendingMutation::AddEdge {
                label,
                from,
                to,
                properties,
            } => self
                .storage
                .add_edge(from, to, &label, properties)
                .ok()
                .map(Value::Edge),
            PendingMutation::SetVertexProperty { id, key, value } => self
                .storage
                .set_vertex_property(id, &key, value)
                .ok()
                .map(|_| Value::Vertex(id)),
            PendingMutation::SetEdgeProperty { id, key, value } => self
                .storage
                .set_edge_property(id, &key, value)
                .ok()
                .map(|_| Value::Edge(id)),
            PendingMutation::DropVertex { id } => {
                // A drop yields no value whether or not it succeeded.
                let _ = self.storage.remove_vertex(id);
                None
            }
            PendingMutation::DropEdge { id } => {
                // A drop yields no value whether or not it succeeded.
                let _ = self.storage.remove_edge(id);
                None
            }
        }
    }
}
// Unit tests for step construction/accessors and for decoding marker maps
// into `PendingMutation` values. None of them touch graph storage.
#[cfg(test)]
mod tests {
use super::*;
use crate::traversal::step::DynStep;
// `AddVStep::new` stores the label and starts with no properties.
#[test]
fn add_v_step_new() {
let step = AddVStep::new("person");
assert_eq!(step.label(), "person");
assert!(step.properties().is_empty());
assert_eq!(step.name(), "addV");
}
// `AddVStep::with_properties` keeps the supplied property map.
#[test]
fn add_v_step_with_properties() {
let props = HashMap::from([
("name".to_string(), Value::String("Alice".into())),
("age".to_string(), Value::Int(30)),
]);
let step = AddVStep::with_properties("person", props.clone());
assert_eq!(step.label(), "person");
assert_eq!(step.properties().len(), 2);
}
// A boxed clone of the step reports the same name.
#[test]
fn add_v_step_clone_box() {
let step = AddVStep::new("person");
let cloned = DynStep::clone_box(&step);
assert_eq!(cloned.dyn_name(), "addV");
}
// `PropertyStep::new` converts and stores the key and value.
#[test]
fn property_step_new() {
let step = PropertyStep::new("name", "Alice");
assert_eq!(step.key(), "name");
assert_eq!(step.value(), &Value::String("Alice".to_string()));
assert_eq!(step.name(), "property");
}
// A boxed clone of the step reports the same name.
#[test]
fn property_step_clone_box() {
let step = PropertyStep::new("name", "Alice");
let cloned = DynStep::clone_box(&step);
assert_eq!(cloned.dyn_name(), "property");
}
// `DropStep` reports the expected step name.
#[test]
fn drop_step_new() {
let step = DropStep::new();
assert_eq!(step.name(), "drop");
}
// A boxed clone of the step reports the same name.
#[test]
fn drop_step_clone_box() {
let step = DropStep::new();
let cloned = DynStep::clone_box(&step);
assert_eq!(cloned.dyn_name(), "drop");
}
// The builder records explicit vertex ids for both endpoints.
#[test]
fn add_e_step_builder() {
let step = AddEStep::new("knows")
.from_vertex(VertexId(1))
.to_vertex(VertexId(2))
.property("since", 2020i64);
assert_eq!(step.label(), "knows");
assert!(matches!(
step.from_endpoint(),
Some(EdgeEndpoint::VertexId(VertexId(1)))
));
assert!(matches!(
step.to_endpoint(),
Some(EdgeEndpoint::VertexId(VertexId(2)))
));
assert_eq!(step.name(), "addE");
}
// `from_traverser` records the traverser endpoint kind.
#[test]
fn add_e_step_from_traverser() {
let step = AddEStep::new("knows")
.from_traverser()
.to_vertex(VertexId(2));
assert!(matches!(
step.from_endpoint(),
Some(EdgeEndpoint::Traverser)
));
}
// `from_label` / `to_label` record step-label endpoints.
#[test]
fn add_e_step_from_label() {
let step = AddEStep::new("knows").from_label("start").to_label("end");
assert!(matches!(
step.from_endpoint(),
Some(EdgeEndpoint::StepLabel(ref s)) if s == "start"
));
assert!(matches!(
step.to_endpoint(),
Some(EdgeEndpoint::StepLabel(ref s)) if s == "end"
));
}
// A boxed clone of the step reports the same name.
#[test]
fn add_e_step_clone_box() {
let step = AddEStep::new("knows");
let cloned = DynStep::clone_box(&step);
assert_eq!(cloned.dyn_name(), "addE");
}
// An add-vertex marker map decodes to `AddVertex` with its label and
// properties.
#[test]
fn pending_mutation_from_add_v() {
let value = Value::Map(crate::value::ValueMap::from([
("__pending_add_v".to_string(), Value::Bool(true)),
("label".to_string(), Value::String("person".to_string())),
(
"properties".to_string(),
Value::Map(crate::value::ValueMap::from([(
"name".to_string(),
Value::String("Alice".to_string()),
)])),
),
]));
let mutation = PendingMutation::from_value(&value);
assert!(matches!(
mutation,
Some(PendingMutation::AddVertex { label, properties })
if label == "person" && properties.len() == 1
));
}
// An add-edge marker map decodes to `AddEdge` with its label and endpoints.
#[test]
fn pending_mutation_from_add_e() {
let value = Value::Map(crate::value::ValueMap::from([
("__pending_add_e".to_string(), Value::Bool(true)),
("label".to_string(), Value::String("knows".to_string())),
("from".to_string(), Value::Vertex(VertexId(1))),
("to".to_string(), Value::Vertex(VertexId(2))),
(
"properties".to_string(),
Value::Map(crate::value::ValueMap::new()),
),
]));
let mutation = PendingMutation::from_value(&value);
assert!(matches!(
mutation,
Some(PendingMutation::AddEdge { label, from, to, .. })
if label == "knows" && from == VertexId(1) && to == VertexId(2)
));
}
// A drop-vertex marker map decodes to `DropVertex` with the vertex id.
#[test]
fn pending_mutation_from_drop_vertex() {
let value = Value::Map(crate::value::ValueMap::from([
("__pending_drop_vertex".to_string(), Value::Bool(true)),
("id".to_string(), Value::Vertex(VertexId(42))),
]));
let mutation = PendingMutation::from_value(&value);
assert!(matches!(
mutation,
Some(PendingMutation::DropVertex { id })
if id == VertexId(42)
));
}
// Non-map values never decode to a pending mutation.
#[test]
fn pending_mutation_from_regular_value() {
assert!(PendingMutation::from_value(&Value::Int(42)).is_none());
assert!(PendingMutation::from_value(&Value::String("test".into())).is_none());
assert!(PendingMutation::from_value(&Value::Vertex(VertexId(1))).is_none());
}
}