use super::{ConstraintValidator, Operator, OperatorResult, PropertySource};
use crate::execution::chunk::{DataChunk, DataChunkBuilder};
use crate::graph::{GraphStore, GraphStoreMut};
use grafeo_common::types::{
EdgeId, EpochId, LogicalType, NodeId, PropertyKey, TransactionId, Value,
};
use std::sync::Arc;
/// Configuration for [`MergeOperator`]: the node pattern to find or create,
/// plus the property assignments applied when a node is created or matched.
pub struct MergeConfig {
    /// Name of the pattern variable. Used in error messages and returned by
    /// `MergeOperator::variable`.
    pub variable: String,
    /// Labels a node must carry (all of them) to match; all are assigned to a
    /// newly created node. The first label, if any, selects the candidate set
    /// scanned for matches.
    pub labels: Vec<String>,
    /// Pattern properties: a node matches only if every key is present with an
    /// equal value. They are also written onto a newly created node.
    pub match_properties: Vec<(String, PropertySource)>,
    /// Properties written only when a new node is created. They override a
    /// pattern property with the same key.
    pub on_create_properties: Vec<(String, PropertySource)>,
    /// Properties written onto an existing node when one matches.
    pub on_match_properties: Vec<(String, PropertySource)>,
    /// Column types of the chunks produced by the operator.
    pub output_schema: Vec<LogicalType>,
    /// Index of the output column that receives the merged node's ID.
    pub output_column: usize,
    /// Input column that must hold a non-NULL node for every row, if set.
    /// A NULL (or missing) column makes `next` return a `TypeMismatch` error.
    pub bound_variable_column: Option<usize>,
}
/// Physical operator for node `MERGE`.
///
/// For each selected input row (or exactly once, when there is no input) it
/// returns an existing node matching the pattern in [`MergeConfig`], or
/// creates a new one.
pub struct MergeOperator {
    /// Store that is searched for matches and written to on create / on match.
    store: Arc<dyn GraphStoreMut>,
    /// Upstream operator; `None` means the MERGE runs a single time.
    input: Option<Box<dyn Operator>>,
    /// Pattern and property assignments.
    config: MergeConfig,
    /// Set after the single-shot (no-input) execution; cleared by `reset`.
    executed: bool,
    /// Epoch recorded by `with_transaction_context`.
    /// NOTE(review): not read anywhere in this file, so matching does not
    /// appear to be epoch-filtered — confirm this is intended.
    viewing_epoch: Option<EpochId>,
    /// When `Some`, on-match property writes use the versioned store setter.
    transaction_id: Option<TransactionId>,
    /// Optional constraint checks performed before a node is created.
    validator: Option<Arc<dyn ConstraintValidator>>,
}
impl MergeOperator {
    /// Creates a node `MERGE` operator.
    ///
    /// With `input == None` the operator performs one MERGE on the first call
    /// to `next`; otherwise it performs one MERGE per selected input row. No
    /// transaction context or validator is attached; see
    /// [`Self::with_transaction_context`] and [`Self::with_validator`].
    pub fn new(
        store: Arc<dyn GraphStoreMut>,
        input: Option<Box<dyn Operator>>,
        config: MergeConfig,
    ) -> Self {
        Self {
            store,
            input,
            config,
            executed: false,
            viewing_epoch: None,
            transaction_id: None,
            validator: None,
        }
    }

    /// Name of the variable bound by the MERGE pattern.
    #[must_use]
    pub fn variable(&self) -> &str {
        &self.config.variable
    }

    /// Records the viewing epoch and optional transaction for this operator.
    ///
    /// When `transaction_id` is `Some`, on-match property writes go through the
    /// store's versioned setter.
    pub fn with_transaction_context(
        mut self,
        epoch: EpochId,
        transaction_id: Option<TransactionId>,
    ) -> Self {
        self.viewing_epoch = Some(epoch);
        self.transaction_id = transaction_id;
        self
    }

    /// Attaches a constraint validator that is consulted before a node is created.
    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
        self.validator = Some(validator);
        self
    }

    /// Evaluates each `(name, source)` pair for a single row.
    ///
    /// With a chunk, every source is resolved against row `row` of it. Without
    /// a chunk (single-shot MERGE) only constants can be evaluated; any other
    /// source resolves to `Value::Null`.
    fn resolve_properties(
        props: &[(String, PropertySource)],
        chunk: Option<&DataChunk>,
        row: usize,
        store: &dyn GraphStore,
    ) -> Vec<(String, Value)> {
        props
            .iter()
            .map(|(name, source)| {
                let value = if let Some(chunk) = chunk {
                    source.resolve(chunk, row, store)
                } else {
                    match source {
                        PropertySource::Constant(v) => v.clone(),
                        _ => Value::Null,
                    }
                };
                (name.clone(), value)
            })
            .collect()
    }

    /// Returns the first node that carries every configured label and holds an
    /// equal value for every `(key, value)` in `resolved_match_props`.
    ///
    /// Candidates come from `nodes_by_label` for the first configured label, or
    /// from all node IDs when no label is configured.
    fn find_matching_node(&self, resolved_match_props: &[(String, Value)]) -> Option<NodeId> {
        let candidates: Vec<NodeId> = match self.config.labels.first() {
            Some(first_label) => self.store.nodes_by_label(first_label),
            None => self.store.node_ids(),
        };
        // Build the property keys once rather than once per candidate node.
        let expected: Vec<(PropertyKey, &Value)> = resolved_match_props
            .iter()
            .map(|(key, value)| (PropertyKey::new(key.as_str()), value))
            .collect();
        candidates.into_iter().find(|&node_id| {
            self.store.get_node(node_id).is_some_and(|node| {
                self.config.labels.iter().all(|label| node.has_label(label))
                    && expected
                        .iter()
                        .all(|(key, value)| node.properties.get(key).is_some_and(|v| v == *value))
            })
        })
    }

    /// Overlays `overrides` onto `base`.
    ///
    /// A key already present keeps its position but takes the override's
    /// value; new keys are appended in order.
    fn overlay_properties(
        base: &[(String, Value)],
        overrides: &[(String, Value)],
    ) -> Vec<(String, Value)> {
        let mut merged: Vec<(String, Value)> = base.to_vec();
        for (key, value) in overrides {
            match merged.iter_mut().find(|(existing, _)| existing == key) {
                Some(slot) => slot.1 = value.clone(),
                None => merged.push((key.clone(), value.clone())),
            }
        }
        merged
    }

    /// Creates a node with the configured labels and the pattern properties,
    /// overridden by `resolved_create_props`.
    ///
    /// # Errors
    ///
    /// Propagates any error raised by the attached validator (label allowance,
    /// per-property checks, uniqueness, completeness).
    fn create_node(
        &self,
        resolved_match_props: &[(String, Value)],
        resolved_create_props: &[(String, Value)],
    ) -> Result<NodeId, super::OperatorError> {
        // Validate exactly what will be stored. A pattern value that is replaced
        // by an on-create value must not be unique-checked or reported as a
        // duplicate key to the completeness check.
        let final_props = Self::overlay_properties(resolved_match_props, resolved_create_props);
        if let Some(ref validator) = self.validator {
            validator.validate_node_labels_allowed(&self.config.labels)?;
            for (name, value) in &final_props {
                validator.validate_node_property(&self.config.labels, name, value)?;
                validator.check_unique_node_property(&self.config.labels, name, value)?;
            }
            validator.validate_node_complete(&self.config.labels, &final_props)?;
        }
        let props: Vec<(PropertyKey, Value)> = final_props
            .into_iter()
            .map(|(k, v)| (PropertyKey::new(k.as_str()), v))
            .collect();
        let labels: Vec<&str> = self.config.labels.iter().map(String::as_str).collect();
        Ok(self.store.create_node_with_props(&labels, &props))
    }

    /// Performs one MERGE for `row` and returns the matched or created node.
    ///
    /// On-match / on-create properties are resolved only for the branch taken.
    fn merge_node_for_row(
        &self,
        chunk: Option<&DataChunk>,
        row: usize,
    ) -> Result<NodeId, super::OperatorError> {
        let store_ref: &dyn GraphStore = self.store.as_ref();
        let resolved_match =
            Self::resolve_properties(&self.config.match_properties, chunk, row, store_ref);
        if let Some(existing_id) = self.find_matching_node(&resolved_match) {
            let resolved_on_match =
                Self::resolve_properties(&self.config.on_match_properties, chunk, row, store_ref);
            self.apply_on_match(existing_id, &resolved_on_match);
            Ok(existing_id)
        } else {
            let resolved_on_create =
                Self::resolve_properties(&self.config.on_create_properties, chunk, row, store_ref);
            self.create_node(&resolved_match, &resolved_on_create)
        }
    }

    /// Writes the on-match properties onto `node_id`, using the versioned
    /// setter when a transaction is attached.
    ///
    /// NOTE(review): these writes are not passed through the validator — confirm
    /// whether property constraints should apply here as well.
    fn apply_on_match(&self, node_id: NodeId, resolved_on_match: &[(String, Value)]) {
        for (key, value) in resolved_on_match {
            if let Some(tid) = self.transaction_id {
                self.store
                    .set_node_property_versioned(node_id, key.as_str(), value.clone(), tid);
            } else {
                self.store
                    .set_node_property(node_id, key.as_str(), value.clone());
            }
        }
    }
}
impl Operator for MergeOperator {
    /// Produces the next output chunk.
    ///
    /// With an input operator, every selected input row is passed through and
    /// extended with the merged node's ID in `output_column`; `Ok(None)` is
    /// returned once the input is exhausted. Without an input, a single one-row
    /// chunk is produced on the first call and `Ok(None)` afterwards (until
    /// `reset`).
    ///
    /// # Errors
    ///
    /// Returns `TypeMismatch` when `bound_variable_column` is set and is NULL
    /// (or absent) for a row. Also propagates input and validator errors.
    fn next(&mut self) -> OperatorResult {
        if let Some(ref mut input) = self.input {
            let Some(chunk) = input.next()? else {
                return Ok(None);
            };
            let mut builder =
                DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
            for row in chunk.selected_indices() {
                if let Some(bound_col) = self.config.bound_variable_column {
                    // A missing column is treated the same as a NULL value.
                    let is_null = chunk.column(bound_col).is_none_or(|col| col.is_null(row));
                    if is_null {
                        return Err(super::OperatorError::TypeMismatch {
                            expected: format!(
                                "non-null node for MERGE variable '{}'",
                                self.config.variable
                            ),
                            found: "NULL".to_string(),
                        });
                    }
                }
                let node_id = self.merge_node_for_row(Some(&chunk), row)?;
                for col_idx in 0..chunk.column_count() {
                    // The output column receives the node ID below. Copying the
                    // input value into it as well would give that column two
                    // entries for this row and misalign the chunk.
                    if col_idx == self.config.output_column {
                        continue;
                    }
                    if let (Some(src), Some(dst)) =
                        (chunk.column(col_idx), builder.column_mut(col_idx))
                    {
                        dst.push_value(src.get_value(row).unwrap_or(Value::Null));
                    }
                }
                if let Some(dst) = builder.column_mut(self.config.output_column) {
                    dst.push_node_id(node_id);
                }
                builder.advance_row();
            }
            return Ok(Some(builder.finish()));
        }

        // Single-shot mode: no input, so MERGE exactly once.
        if self.executed {
            return Ok(None);
        }
        self.executed = true;
        let node_id = self.merge_node_for_row(None, 0)?;
        let mut builder = DataChunkBuilder::new(&self.config.output_schema);
        if let Some(dst) = builder.column_mut(self.config.output_column) {
            dst.push_node_id(node_id);
        }
        builder.advance_row();
        Ok(Some(builder.finish()))
    }

    /// Re-arms single-shot execution and resets the input, if any.
    fn reset(&mut self) {
        self.executed = false;
        if let Some(ref mut input) = self.input {
            input.reset();
        }
    }

    fn name(&self) -> &'static str {
        "Merge"
    }
}
/// Configuration for [`MergeRelationshipOperator`]: the edge pattern between
/// two already-bound nodes, plus the property assignments for create / match.
pub struct MergeRelationshipConfig {
    /// Input column holding the source node ID. NULL or missing is an error.
    pub source_column: usize,
    /// Input column holding the target node ID. NULL or missing is an error.
    pub target_column: usize,
    /// Name of the source variable, used in error messages.
    pub source_variable: String,
    /// Name of the target variable, used in error messages.
    pub target_variable: String,
    /// Edge type that must match exactly; also used for a newly created edge.
    pub edge_type: String,
    /// Pattern properties: an edge matches only if every key is present with
    /// an equal value. They are also written onto a newly created edge.
    pub match_properties: Vec<(String, PropertySource)>,
    /// Properties written only when a new edge is created. They override a
    /// pattern property with the same key.
    pub on_create_properties: Vec<(String, PropertySource)>,
    /// Properties written onto an existing edge when one matches.
    pub on_match_properties: Vec<(String, PropertySource)>,
    /// Column types of the chunks produced by the operator.
    pub output_schema: Vec<LogicalType>,
    /// Index of the output column that receives the merged edge's ID.
    pub edge_output_column: usize,
}
/// Physical operator for relationship `MERGE`.
///
/// For every selected input row it finds an outgoing edge of the configured
/// type from the source node to the target node that matches the pattern
/// properties, or creates one.
pub struct MergeRelationshipOperator {
    /// Store that is searched for matches and written to on create / on match.
    store: Arc<dyn GraphStoreMut>,
    /// Upstream operator supplying the bound endpoint nodes.
    input: Box<dyn Operator>,
    /// Pattern and property assignments.
    config: MergeRelationshipConfig,
    /// Epoch recorded by `with_transaction_context`.
    /// NOTE(review): not read anywhere in this file — confirm this is intended.
    viewing_epoch: Option<EpochId>,
    /// When `Some`, on-match property writes use the versioned store setter.
    transaction_id: Option<TransactionId>,
    /// Optional constraint checks performed before an edge is created.
    validator: Option<Arc<dyn ConstraintValidator>>,
}
impl MergeRelationshipOperator {
    /// Creates a relationship `MERGE` operator over `input`.
    ///
    /// No transaction context or validator is attached; see
    /// [`Self::with_transaction_context`] and [`Self::with_validator`].
    pub fn new(
        store: Arc<dyn GraphStoreMut>,
        input: Box<dyn Operator>,
        config: MergeRelationshipConfig,
    ) -> Self {
        Self {
            store,
            input,
            config,
            viewing_epoch: None,
            transaction_id: None,
            validator: None,
        }
    }

    /// Records the viewing epoch and optional transaction for this operator.
    ///
    /// When `transaction_id` is `Some`, on-match property writes go through the
    /// store's versioned setter.
    pub fn with_transaction_context(
        mut self,
        epoch: EpochId,
        transaction_id: Option<TransactionId>,
    ) -> Self {
        self.viewing_epoch = Some(epoch);
        self.transaction_id = transaction_id;
        self
    }

    /// Attaches a constraint validator that is consulted before an edge is created.
    pub fn with_validator(mut self, validator: Arc<dyn ConstraintValidator>) -> Self {
        self.validator = Some(validator);
        self
    }

    /// Returns the first outgoing edge `src -> dst` of the configured type that
    /// holds an equal value for every `(key, value)` in `resolved_match_props`.
    fn find_matching_edge(
        &self,
        src: NodeId,
        dst: NodeId,
        resolved_match_props: &[(String, Value)],
    ) -> Option<EdgeId> {
        use crate::graph::Direction;
        for (target, edge_id) in self.store.edges_from(src, Direction::Outgoing) {
            if target != dst {
                continue;
            }
            if let Some(edge) = self.store.get_edge(edge_id) {
                if edge.edge_type.as_str() != self.config.edge_type {
                    continue;
                }
                let has_all_props = resolved_match_props
                    .iter()
                    .all(|(key, expected)| edge.get_property(key).is_some_and(|v| v == expected));
                if has_all_props {
                    return Some(edge_id);
                }
            }
        }
        None
    }

    /// Creates an edge `src -> dst` of the configured type.
    ///
    /// The edge gets the pattern properties, overridden by
    /// `resolved_create_props`.
    ///
    /// # Errors
    ///
    /// Propagates any error raised by the attached validator (type allowance,
    /// per-property checks, completeness).
    fn create_edge(
        &self,
        src: NodeId,
        dst: NodeId,
        resolved_match_props: &[(String, Value)],
        resolved_create_props: &[(String, Value)],
    ) -> Result<EdgeId, super::OperatorError> {
        // Overlay the on-create values first so validation sees exactly what
        // will be stored. Otherwise a replaced pattern value would still be
        // checked and duplicate keys would reach the completeness check.
        let mut final_props: Vec<(String, Value)> = resolved_match_props.to_vec();
        for (key, value) in resolved_create_props {
            match final_props.iter_mut().find(|(existing, _)| existing == key) {
                Some(slot) => slot.1 = value.clone(),
                None => final_props.push((key.clone(), value.clone())),
            }
        }
        if let Some(ref validator) = self.validator {
            validator.validate_edge_type_allowed(&self.config.edge_type)?;
            for (name, value) in &final_props {
                validator.validate_edge_property(&self.config.edge_type, name, value)?;
            }
            validator.validate_edge_complete(&self.config.edge_type, &final_props)?;
        }
        let props: Vec<(PropertyKey, Value)> = final_props
            .into_iter()
            .map(|(k, v)| (PropertyKey::new(k.as_str()), v))
            .collect();
        Ok(self
            .store
            .create_edge_with_props(src, dst, &self.config.edge_type, &props))
    }

    /// Writes the on-match properties onto `edge_id`, using the versioned
    /// setter when a transaction is attached.
    ///
    /// NOTE(review): these writes are not passed through the validator — confirm
    /// whether property constraints should apply here as well.
    fn apply_on_match_edge(&self, edge_id: EdgeId, resolved_on_match: &[(String, Value)]) {
        for (key, value) in resolved_on_match {
            if let Some(tid) = self.transaction_id {
                self.store
                    .set_edge_property_versioned(edge_id, key.as_str(), value.clone(), tid);
            } else {
                self.store
                    .set_edge_property(edge_id, key.as_str(), value.clone());
            }
        }
    }
}
impl Operator for MergeRelationshipOperator {
fn next(&mut self) -> OperatorResult {
use super::OperatorError;
if let Some(chunk) = self.input.next()? {
let mut builder =
DataChunkBuilder::with_capacity(&self.config.output_schema, chunk.row_count());
for row in chunk.selected_indices() {
let src_val = chunk
.column(self.config.source_column)
.and_then(|c| c.get_node_id(row))
.ok_or_else(|| OperatorError::TypeMismatch {
expected: format!(
"non-null node for MERGE variable '{}'",
self.config.source_variable
),
found: "NULL".to_string(),
})?;
let dst_val = chunk
.column(self.config.target_column)
.and_then(|c| c.get_node_id(row))
.ok_or_else(|| OperatorError::TypeMismatch {
expected: format!(
"non-null node for MERGE variable '{}'",
self.config.target_variable
),
found: "None".to_string(),
})?;
let store_ref: &dyn GraphStore = self.store.as_ref();
let resolved_match = MergeOperator::resolve_properties(
&self.config.match_properties,
Some(&chunk),
row,
store_ref,
);
let edge_id = if let Some(existing) =
self.find_matching_edge(src_val, dst_val, &resolved_match)
{
let resolved_on_match = MergeOperator::resolve_properties(
&self.config.on_match_properties,
Some(&chunk),
row,
store_ref,
);
self.apply_on_match_edge(existing, &resolved_on_match);
existing
} else {
let resolved_on_create = MergeOperator::resolve_properties(
&self.config.on_create_properties,
Some(&chunk),
row,
store_ref,
);
self.create_edge(src_val, dst_val, &resolved_match, &resolved_on_create)?
};
for col_idx in 0..self.config.output_schema.len() {
if col_idx == self.config.edge_output_column {
if let Some(dst_col) = builder.column_mut(col_idx) {
dst_col.push_edge_id(edge_id);
}
} else if let (Some(src_col), Some(dst_col)) =
(chunk.column(col_idx), builder.column_mut(col_idx))
&& let Some(val) = src_col.get_value(row)
{
dst_col.push_value(val);
}
}
builder.advance_row();
}
return Ok(Some(builder.finish()));
}
Ok(None)
}
fn reset(&mut self) {
self.input.reset();
}
fn name(&self) -> &'static str {
"MergeRelationship"
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::lpg::LpgStore;

    /// Turns `(key, value)` pairs into constant property sources.
    fn const_props(props: Vec<(&str, Value)>) -> Vec<(String, PropertySource)> {
        let mut sources = Vec::with_capacity(props.len());
        for (key, value) in props {
            sources.push((key.to_string(), PropertySource::Constant(value)));
        }
        sources
    }

    /// A fresh, empty in-memory store behind the mutable graph trait.
    fn new_store() -> Arc<dyn GraphStoreMut> {
        Arc::new(LpgStore::new().unwrap())
    }

    /// Builds a standalone `MERGE (n:Person {name: <name>})` operator with the
    /// given on-create / on-match constant assignments.
    fn merge_person(
        store: &Arc<dyn GraphStoreMut>,
        name: Value,
        on_create: Vec<(&str, Value)>,
        on_match: Vec<(&str, Value)>,
    ) -> MergeOperator {
        let config = MergeConfig {
            variable: "n".to_string(),
            labels: vec!["Person".to_string()],
            match_properties: const_props(vec![("name", name)]),
            on_create_properties: const_props(on_create),
            on_match_properties: const_props(on_match),
            output_schema: vec![LogicalType::Node],
            output_column: 0,
            bound_variable_column: None,
        };
        MergeOperator::new(Arc::clone(store), None, config)
    }

    #[test]
    fn test_merge_creates_new_node() {
        let store = new_store();
        let mut merge = merge_person(&store, Value::String("Alix".into()), vec![], vec![]);

        assert!(merge.next().unwrap().is_some());

        let people = store.nodes_by_label("Person");
        assert_eq!(people.len(), 1);
        let created = store.get_node(people[0]).unwrap();
        assert!(created.has_label("Person"));
        assert_eq!(
            created.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Alix".into()))
        );
    }

    #[test]
    fn test_merge_matches_existing_node() {
        let store = new_store();
        store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Gus".into()))],
        );
        let mut merge = merge_person(&store, Value::String("Gus".into()), vec![], vec![]);

        assert!(merge.next().unwrap().is_some());
        assert_eq!(store.nodes_by_label("Person").len(), 1);
    }

    #[test]
    fn test_merge_with_on_create() {
        let store = new_store();
        let mut merge = merge_person(
            &store,
            Value::String("Vincent".into()),
            vec![("created", Value::Bool(true))],
            vec![],
        );

        let _ = merge.next().unwrap();

        let people = store.nodes_by_label("Person");
        let created = store.get_node(people[0]).unwrap();
        assert_eq!(
            created.properties.get(&PropertyKey::new("name")),
            Some(&Value::String("Vincent".into()))
        );
        assert_eq!(
            created.properties.get(&PropertyKey::new("created")),
            Some(&Value::Bool(true))
        );
    }

    #[test]
    fn test_merge_with_on_match() {
        let store = new_store();
        let jules = store.create_node_with_props(
            &["Person"],
            &[(PropertyKey::new("name"), Value::String("Jules".into()))],
        );
        let mut merge = merge_person(
            &store,
            Value::String("Jules".into()),
            vec![],
            vec![("updated", Value::Bool(true))],
        );

        let _ = merge.next().unwrap();

        let matched = store.get_node(jules).unwrap();
        assert_eq!(
            matched.properties.get(&PropertyKey::new("updated")),
            Some(&Value::Bool(true))
        );
    }
}