use super::filter::{ExpressionPredicate, FilterExpression, SessionContext};
use super::{Operator, OperatorError, OperatorResult};
use crate::execution::DataChunk;
use crate::graph::GraphStore;
use crate::graph::lpg::{Edge, Node};
use grafeo_common::types::{EpochId, LogicalType, PropertyKey, TransactionId, Value};
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
/// One output column of a [`ProjectOperator`], described as a recipe that is
/// evaluated once per selected input row.
pub enum ProjectExpr {
/// Copy the values of the input column at this index.
Column(usize),
/// Emit the same value for every selected row.
Constant(Value),
/// Read a single property from the entity referenced by an input column.
///
/// The column entry is tried as a node id, then as an edge id, and finally
/// as a map value keyed by the property name; anything else yields null.
PropertyAccess {
/// Index of the input column holding the entity (or map).
column: usize,
/// Name of the property to read.
property: String,
},
/// Emit the type of the edge referenced by an input column as a string,
/// or null when the entry is not an edge id or the store has no type for it.
EdgeType {
/// Index of the input column holding edge ids.
column: usize,
},
/// Evaluate an arbitrary expression per row through `ExpressionPredicate`.
Expression {
/// The expression to evaluate.
expr: FilterExpression,
/// Maps variable names used by `expr` to input column indices.
variable_columns: HashMap<String, usize>,
},
/// Expand a node id into a map of `_id`, `_labels` and the node's properties.
NodeResolve {
/// Index of the input column holding node ids.
column: usize,
},
/// Expand an edge id into a map of `_id`, `_type`, `_source`, `_target`
/// and the edge's properties.
EdgeResolve {
/// Index of the input column holding edge ids.
column: usize,
},
/// Emit the value of `first`, falling back to `second` when `first` is
/// null or has no value for the row.
Coalesce {
/// Index of the preferred input column.
first: usize,
/// Index of the fallback input column.
second: usize,
},
}
/// Operator that turns each chunk produced by its child into a new chunk with
/// one column per [`ProjectExpr`].
pub struct ProjectOperator {
/// Upstream operator supplying the input chunks.
child: Box<dyn Operator>,
/// One expression per output column, in output order.
projections: Vec<ProjectExpr>,
/// Logical type of each output column; same length as `projections`
/// (asserted by the constructors).
output_types: Vec<LogicalType>,
/// Graph store used by the projections that look up nodes or edges.
/// `None` when the operator was built with `new`.
store: Option<Arc<dyn GraphStore>>,
/// Transaction used for versioned store reads, when set together with
/// `viewing_epoch`.
transaction_id: Option<TransactionId>,
/// Epoch used for store reads; when `None` the unversioned store getters
/// are used.
viewing_epoch: Option<EpochId>,
/// Session context forwarded to the expression evaluator.
session_context: SessionContext,
}
impl ProjectOperator {
    /// Creates a projection over `child` that has no graph store attached.
    ///
    /// Projections that need the store (property access, edge type,
    /// expressions, node/edge resolution) fail at execution time on an
    /// operator built this way; use [`ProjectOperator::with_store`] for those.
    ///
    /// # Panics
    ///
    /// Panics if `projections` and `output_types` have different lengths.
    pub fn new(
        child: Box<dyn Operator>,
        projections: Vec<ProjectExpr>,
        output_types: Vec<LogicalType>,
    ) -> Self {
        assert_eq!(projections.len(), output_types.len());
        Self {
            store: None,
            transaction_id: None,
            viewing_epoch: None,
            session_context: SessionContext::default(),
            child,
            projections,
            output_types,
        }
    }

    /// Creates a projection over `child` with a graph store attached.
    ///
    /// # Panics
    ///
    /// Panics if `projections` and `output_types` have different lengths.
    pub fn with_store(
        child: Box<dyn Operator>,
        projections: Vec<ProjectExpr>,
        output_types: Vec<LogicalType>,
        store: Arc<dyn GraphStore>,
    ) -> Self {
        // Identical to `new` apart from the attached store.
        let mut operator = Self::new(child, projections, output_types);
        operator.store = Some(store);
        operator
    }

    /// Sets the epoch, and optionally the transaction, used for store reads.
    pub fn with_transaction_context(
        mut self,
        epoch: EpochId,
        transaction_id: Option<TransactionId>,
    ) -> Self {
        self.transaction_id = transaction_id;
        self.viewing_epoch = Some(epoch);
        self
    }

    /// Replaces the session context handed to expression evaluation.
    pub fn with_session_context(mut self, context: SessionContext) -> Self {
        self.session_context = context;
        self
    }

    /// Convenience constructor that selects (and possibly reorders) input
    /// columns by index, without any store.
    ///
    /// # Panics
    ///
    /// Panics if `columns` and `types` have different lengths.
    pub fn select_columns(
        child: Box<dyn Operator>,
        columns: Vec<usize>,
        types: Vec<LogicalType>,
    ) -> Self {
        let mut projections = Vec::with_capacity(columns.len());
        for index in columns {
            projections.push(ProjectExpr::Column(index));
        }
        Self::new(child, projections, types)
    }
}
impl Operator for ProjectOperator {
    /// Pulls the next chunk from the child and evaluates every projection
    /// against its selected rows.
    ///
    /// Returns `Ok(None)` once the child is exhausted. Every projection pushes
    /// exactly one value per selected input row, so all output columns stay
    /// aligned with the row count set on the output chunk.
    ///
    /// # Errors
    ///
    /// - Any error returned by the child operator.
    /// - `OperatorError::ColumnNotFound` when a projection references an input
    ///   column index the chunk does not have.
    /// - `OperatorError::Execution` when a projection needs the graph store
    ///   but the operator was built without one.
    ///
    /// # Panics
    ///
    /// Panics if the output chunk has no column for a projection index. The
    /// constructors assert that `projections` and `output_types` have the
    /// same length, which is the invariant this relies on.
    fn next(&mut self) -> OperatorResult {
        let Some(input) = self.child.next()? else {
            return Ok(None);
        };
        let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());

        // The read snapshot does not change while a chunk is processed.
        let epoch = self.viewing_epoch;
        let tx_id = self.transaction_id;

        // Looks up an input column, reporting the offending index on failure.
        let input_column = |index: usize| {
            input
                .column(index)
                .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {index}")))
        };

        for (i, proj) in self.projections.iter().enumerate() {
            let output_col = output
                .column_mut(i)
                .expect("column exists: index matches projection schema");

            match proj {
                ProjectExpr::Column(col_idx) => {
                    let input_col = input_column(*col_idx)?;
                    for row in input.selected_indices() {
                        // A row without a readable value must still occupy a
                        // slot; skipping it would leave this column shorter
                        // than the others and shift every later row.
                        output_col.push_value(input_col.get_value(row).unwrap_or(Value::Null));
                    }
                }
                ProjectExpr::Constant(value) => {
                    for _ in input.selected_indices() {
                        output_col.push_value(value.clone());
                    }
                }
                ProjectExpr::PropertyAccess { column, property } => {
                    let input_col = input_column(*column)?;
                    let store = self.store.as_ref().ok_or_else(|| {
                        OperatorError::Execution("Store required for property access".to_string())
                    })?;
                    let prop_key = PropertyKey::new(property);

                    // Snapshot-aware lookups: transaction-versioned when both
                    // epoch and transaction are set, epoch-only when just the
                    // epoch is set, otherwise the plain store getter.
                    let fetch_node = |id| match (epoch, tx_id) {
                        (Some(ep), Some(tx)) => store.get_node_versioned(id, ep, tx),
                        (Some(ep), None) => store.get_node_at_epoch(id, ep),
                        (None, _) => store.get_node(id),
                    };
                    let fetch_edge = |id| match (epoch, tx_id) {
                        (Some(ep), Some(tx)) => store.get_edge_versioned(id, ep, tx),
                        (Some(ep), None) => store.get_edge_at_epoch(id, ep),
                        (None, _) => store.get_edge(id),
                    };
                    let edge_property =
                        |id| fetch_edge(id).and_then(|e| e.get_property(property).cloned());

                    for row in input.selected_indices() {
                        let value = if let Some(node_id) = input_col.get_node_id(row) {
                            match fetch_node(node_id)
                                .and_then(|n| n.get_property(property).cloned())
                            {
                                Some(prop) => prop,
                                // The node had no such property (or was not
                                // found): retry the same entry as an edge id.
                                // NOTE(review): presumably some columns can be
                                // read both ways — confirm against the column
                                // implementation.
                                None => input_col
                                    .get_edge_id(row)
                                    .and_then(&edge_property)
                                    .unwrap_or(Value::Null),
                            }
                        } else if let Some(edge_id) = input_col.get_edge_id(row) {
                            edge_property(edge_id).unwrap_or(Value::Null)
                        } else if let Some(Value::Map(map)) = input_col.get_value(row) {
                            map.get(&prop_key).cloned().unwrap_or(Value::Null)
                        } else {
                            Value::Null
                        };
                        output_col.push_value(value);
                    }
                }
                ProjectExpr::EdgeType { column } => {
                    let input_col = input_column(*column)?;
                    let store = self.store.as_ref().ok_or_else(|| {
                        OperatorError::Execution("Store required for edge type access".to_string())
                    })?;
                    for row in input.selected_indices() {
                        let value = if let Some(edge_id) = input_col.get_edge_id(row) {
                            // NOTE(review): unlike the node/edge lookups there
                            // is no epoch-only variant here, so an epoch
                            // without a transaction uses the plain lookup.
                            let etype = match (epoch, tx_id) {
                                (Some(ep), Some(tx)) => store.edge_type_versioned(edge_id, ep, tx),
                                _ => store.edge_type(edge_id),
                            };
                            etype.map_or(Value::Null, Value::String)
                        } else {
                            Value::Null
                        };
                        output_col.push_value(value);
                    }
                }
                ProjectExpr::Expression {
                    expr,
                    variable_columns,
                } => {
                    let store = self.store.as_ref().ok_or_else(|| {
                        OperatorError::Execution(
                            "Store required for expression evaluation".to_string(),
                        )
                    })?;
                    let mut evaluator = ExpressionPredicate::new(
                        expr.clone(),
                        variable_columns.clone(),
                        Arc::clone(store),
                    )
                    .with_session_context(self.session_context.clone());
                    // The transaction is optional; only the epoch is required
                    // to give the evaluator a read snapshot.
                    if let Some(ep) = epoch {
                        evaluator = evaluator.with_transaction_context(ep, tx_id);
                    }
                    for row in input.selected_indices() {
                        // A row the evaluator yields nothing for becomes null.
                        let value = evaluator.eval_at(&input, row).unwrap_or(Value::Null);
                        output_col.push_value(value);
                    }
                }
                ProjectExpr::NodeResolve { column } => {
                    let input_col = input_column(*column)?;
                    let store = self.store.as_ref().ok_or_else(|| {
                        OperatorError::Execution("Store required for node resolution".to_string())
                    })?;
                    let fetch_node = |id| match (epoch, tx_id) {
                        (Some(ep), Some(tx)) => store.get_node_versioned(id, ep, tx),
                        (Some(ep), None) => store.get_node_at_epoch(id, ep),
                        (None, _) => store.get_node(id),
                    };
                    for row in input.selected_indices() {
                        // Non-node entries and missing nodes both become null.
                        let value = input_col
                            .get_node_id(row)
                            .and_then(&fetch_node)
                            .map_or(Value::Null, |n| node_to_map(&n));
                        output_col.push_value(value);
                    }
                }
                ProjectExpr::EdgeResolve { column } => {
                    let input_col = input_column(*column)?;
                    let store = self.store.as_ref().ok_or_else(|| {
                        OperatorError::Execution("Store required for edge resolution".to_string())
                    })?;
                    let fetch_edge = |id| match (epoch, tx_id) {
                        (Some(ep), Some(tx)) => store.get_edge_versioned(id, ep, tx),
                        (Some(ep), None) => store.get_edge_at_epoch(id, ep),
                        (None, _) => store.get_edge(id),
                    };
                    for row in input.selected_indices() {
                        // Non-edge entries and missing edges both become null.
                        let value = input_col
                            .get_edge_id(row)
                            .and_then(&fetch_edge)
                            .map_or(Value::Null, |e| edge_to_map(&e));
                        output_col.push_value(value);
                    }
                }
                ProjectExpr::Coalesce { first, second } => {
                    let first_col = input_column(*first)?;
                    let second_col = input_column(*second)?;
                    for row in input.selected_indices() {
                        let value = match first_col.get_value(row) {
                            Some(Value::Null) | None => {
                                second_col.get_value(row).unwrap_or(Value::Null)
                            }
                            Some(v) => v,
                        };
                        output_col.push_value(value);
                    }
                }
            }
        }

        output.set_count(input.row_count());
        Ok(Some(output))
    }

    /// Resets the child; the projection itself keeps no per-run state.
    fn reset(&mut self) {
        self.child.reset();
    }

    /// Name of this operator.
    fn name(&self) -> &'static str {
        "Project"
    }
}
/// Flattens a node into a `Value::Map`.
///
/// The map holds `_id` (the node id as a signed 64-bit integer), `_labels`
/// (a list of label strings) and every node property under its own key.
/// Properties are written last, so a property named `_id` or `_labels`
/// replaces the reserved entry.
fn node_to_map(node: &Node) -> Value {
    let id_value = Value::Int64(node.id.as_u64() as i64);
    let label_values: Vec<Value> = node
        .labels
        .iter()
        .map(|label| Value::String(label.clone()))
        .collect();

    let mut fields = BTreeMap::from([
        (PropertyKey::new("_id"), id_value),
        (PropertyKey::new("_labels"), Value::List(label_values.into())),
    ]);
    for (key, value) in &node.properties {
        fields.insert(key.clone(), value.clone());
    }
    Value::Map(Arc::new(fields))
}
/// Flattens an edge into a `Value::Map`.
///
/// The map holds `_id`, `_source` and `_target` (ids as signed 64-bit
/// integers), `_type` (the edge type string) and every edge property under
/// its own key. Properties are written last, so a property that reuses one of
/// the reserved names replaces the reserved entry.
fn edge_to_map(edge: &Edge) -> Value {
    let mut fields = BTreeMap::from([
        (
            PropertyKey::new("_id"),
            Value::Int64(edge.id.as_u64() as i64),
        ),
        (
            PropertyKey::new("_type"),
            Value::String(edge.edge_type.clone()),
        ),
        (
            PropertyKey::new("_source"),
            Value::Int64(edge.src.as_u64() as i64),
        ),
        (
            PropertyKey::new("_target"),
            Value::Int64(edge.dst.as_u64() as i64),
        ),
    ]);
    for (key, value) in &edge.properties {
        fields.insert(key.clone(), value.clone());
    }
    Value::Map(Arc::new(fields))
}
// Unit tests for `ProjectOperator`, driven by an in-memory mock child operator.
#[cfg(test)]
mod tests {
use super::*;
use crate::execution::chunk::DataChunkBuilder;
use crate::graph::lpg::LpgStore;
use grafeo_common::types::Value;
// Child operator that hands out a fixed list of chunks, one per `next` call.
struct MockScanOperator {
chunks: Vec<DataChunk>,
position: usize,
}
impl Operator for MockScanOperator {
fn next(&mut self) -> OperatorResult {
if self.position < self.chunks.len() {
// Move the chunk out, leaving an empty placeholder in its slot.
let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
self.position += 1;
Ok(Some(chunk))
} else {
Ok(None)
}
}
fn reset(&mut self) {
// Only rewinds the cursor; chunks already handed out stay replaced by
// empty placeholders.
self.position = 0;
}
fn name(&self) -> &'static str {
"MockScan"
}
}
// Selecting columns [2, 0] reorders and drops input columns.
#[test]
fn test_project_select_columns() {
let mut builder =
DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String, LogicalType::Int64]);
builder.column_mut(0).unwrap().push_int64(1);
builder.column_mut(1).unwrap().push_string("hello");
builder.column_mut(2).unwrap().push_int64(100);
builder.advance_row();
builder.column_mut(0).unwrap().push_int64(2);
builder.column_mut(1).unwrap().push_string("world");
builder.column_mut(2).unwrap().push_int64(200);
builder.advance_row();
let chunk = builder.finish();
let mock_scan = MockScanOperator {
chunks: vec![chunk],
position: 0,
};
let mut project = ProjectOperator::select_columns(
Box::new(mock_scan),
vec![2, 0],
vec![LogicalType::Int64, LogicalType::Int64],
);
let result = project.next().unwrap().unwrap();
assert_eq!(result.column_count(), 2);
assert_eq!(result.row_count(), 2);
// Output column 0 comes from input column 2, output column 1 from input column 0.
assert_eq!(result.column(0).unwrap().get_int64(0), Some(100));
assert_eq!(result.column(1).unwrap().get_int64(0), Some(1));
}
// A constant projection repeats its value on every row.
#[test]
fn test_project_constant() {
let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
builder.column_mut(0).unwrap().push_int64(1);
builder.advance_row();
builder.column_mut(0).unwrap().push_int64(2);
builder.advance_row();
let chunk = builder.finish();
let mock_scan = MockScanOperator {
chunks: vec![chunk],
position: 0,
};
let mut project = ProjectOperator::new(
Box::new(mock_scan),
vec![
ProjectExpr::Column(0),
ProjectExpr::Constant(Value::String("constant".into())),
],
vec![LogicalType::Int64, LogicalType::String],
);
let result = project.next().unwrap().unwrap();
assert_eq!(result.column_count(), 2);
assert_eq!(result.column(1).unwrap().get_string(0), Some("constant"));
assert_eq!(result.column(1).unwrap().get_string(1), Some("constant"));
}
// An exhausted child makes the projection report end of stream.
#[test]
fn test_project_empty_input() {
let mock_scan = MockScanOperator {
chunks: vec![],
position: 0,
};
let mut project =
ProjectOperator::select_columns(Box::new(mock_scan), vec![0], vec![LogicalType::Int64]);
assert!(project.next().unwrap().is_none());
}
// Referencing an input column index that does not exist is an error, not a panic.
#[test]
fn test_project_column_not_found() {
let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
builder.column_mut(0).unwrap().push_int64(1);
builder.advance_row();
let chunk = builder.finish();
let mock_scan = MockScanOperator {
chunks: vec![chunk],
position: 0,
};
let mut project = ProjectOperator::new(
Box::new(mock_scan),
vec![ProjectExpr::Column(5)],
vec![LogicalType::Int64],
);
let result = project.next();
assert!(result.is_err(), "Should fail with ColumnNotFound");
}
// Several constants of different types can be projected side by side.
#[test]
fn test_project_multiple_constants() {
let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
builder.column_mut(0).unwrap().push_int64(1);
builder.advance_row();
let chunk = builder.finish();
let mock_scan = MockScanOperator {
chunks: vec![chunk],
position: 0,
};
let mut project = ProjectOperator::new(
Box::new(mock_scan),
vec![
ProjectExpr::Constant(Value::Int64(42)),
ProjectExpr::Constant(Value::String("fixed".into())),
ProjectExpr::Constant(Value::Bool(true)),
],
vec![LogicalType::Int64, LogicalType::String, LogicalType::Bool],
);
let result = project.next().unwrap().unwrap();
assert_eq!(result.column_count(), 3);
assert_eq!(result.column(0).unwrap().get_int64(0), Some(42));
assert_eq!(result.column(1).unwrap().get_string(0), Some("fixed"));
assert_eq!(
result.column(2).unwrap().get_value(0),
Some(Value::Bool(true))
);
}
// Selecting every column in order reproduces the input values.
#[test]
fn test_project_identity() {
let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
builder.column_mut(0).unwrap().push_int64(10);
builder.column_mut(1).unwrap().push_string("test");
builder.advance_row();
let chunk = builder.finish();
let mock_scan = MockScanOperator {
chunks: vec![chunk],
position: 0,
};
let mut project = ProjectOperator::select_columns(
Box::new(mock_scan),
vec![0, 1],
vec![LogicalType::Int64, LogicalType::String],
);
let result = project.next().unwrap().unwrap();
assert_eq!(result.column(0).unwrap().get_int64(0), Some(10));
assert_eq!(result.column(1).unwrap().get_string(0), Some("test"));
}
// The operator reports its name as "Project".
#[test]
fn test_project_name() {
let mock_scan = MockScanOperator {
chunks: vec![],
position: 0,
};
let project =
ProjectOperator::select_columns(Box::new(mock_scan), vec![0], vec![LogicalType::Int64]);
assert_eq!(project.name(), "Project");
}
// NodeResolve expands a node id into a map with `_id`, `_labels` and properties.
#[test]
fn test_project_node_resolve() {
let store = LpgStore::new().unwrap();
let node_id = store.create_node(&["Person"]);
store.set_node_property(node_id, "name", Value::String("Alix".into()));
store.set_node_property(node_id, "age", Value::Int64(30));
let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
builder.column_mut(0).unwrap().push_node_id(node_id);
builder.advance_row();
let chunk = builder.finish();
let mock_scan = MockScanOperator {
chunks: vec![chunk],
position: 0,
};
let mut project = ProjectOperator::with_store(
Box::new(mock_scan),
vec![ProjectExpr::NodeResolve { column: 0 }],
vec![LogicalType::Any],
Arc::new(store),
);
let result = project.next().unwrap().unwrap();
assert_eq!(result.column_count(), 1);
let value = result.column(0).unwrap().get_value(0).unwrap();
if let Value::Map(map) = value {
assert_eq!(
map.get(&PropertyKey::new("_id")),
Some(&Value::Int64(node_id.as_u64() as i64))
);
assert!(map.get(&PropertyKey::new("_labels")).is_some());
assert_eq!(
map.get(&PropertyKey::new("name")),
Some(&Value::String("Alix".into()))
);
assert_eq!(map.get(&PropertyKey::new("age")), Some(&Value::Int64(30)));
} else {
panic!("Expected Value::Map, got {:?}", value);
}
}
// EdgeResolve expands an edge id into a map with `_id`, `_type`, `_source`,
// `_target` and properties.
#[test]
fn test_project_edge_resolve() {
let store = LpgStore::new().unwrap();
let src = store.create_node(&["Person"]);
let dst = store.create_node(&["Company"]);
let edge_id = store.create_edge(src, dst, "WORKS_AT");
store.set_edge_property(edge_id, "since", Value::Int64(2020));
let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
builder.column_mut(0).unwrap().push_edge_id(edge_id);
builder.advance_row();
let chunk = builder.finish();
let mock_scan = MockScanOperator {
chunks: vec![chunk],
position: 0,
};
let mut project = ProjectOperator::with_store(
Box::new(mock_scan),
vec![ProjectExpr::EdgeResolve { column: 0 }],
vec![LogicalType::Any],
Arc::new(store),
);
let result = project.next().unwrap().unwrap();
let value = result.column(0).unwrap().get_value(0).unwrap();
if let Value::Map(map) = value {
assert_eq!(
map.get(&PropertyKey::new("_id")),
Some(&Value::Int64(edge_id.as_u64() as i64))
);
assert_eq!(
map.get(&PropertyKey::new("_type")),
Some(&Value::String("WORKS_AT".into()))
);
assert_eq!(
map.get(&PropertyKey::new("_source")),
Some(&Value::Int64(src.as_u64() as i64))
);
assert_eq!(
map.get(&PropertyKey::new("_target")),
Some(&Value::Int64(dst.as_u64() as i64))
);
assert_eq!(
map.get(&PropertyKey::new("since")),
Some(&Value::Int64(2020))
);
} else {
panic!("Expected Value::Map, got {:?}", value);
}
}
// Resolving an id the store does not know yields null rather than an error.
#[test]
fn test_project_resolve_missing_entity() {
use grafeo_common::types::NodeId;
let store = LpgStore::new().unwrap();
let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
builder
.column_mut(0)
.unwrap()
.push_node_id(NodeId::new(999));
builder.advance_row();
let chunk = builder.finish();
let mock_scan = MockScanOperator {
chunks: vec![chunk],
position: 0,
};
let mut project = ProjectOperator::with_store(
Box::new(mock_scan),
vec![ProjectExpr::NodeResolve { column: 0 }],
vec![LogicalType::Any],
Arc::new(store),
);
let result = project.next().unwrap().unwrap();
assert_eq!(result.column(0).unwrap().get_value(0), Some(Value::Null));
}
}