use std::collections::HashMap;
use std::sync::Arc;
use grafeo_common::grafeo_warn;
use grafeo_common::types::{LogicalType, TransactionId, Value};
use grafeo_common::utils::error::{Error, Result};
use grafeo_core::execution::DataChunk;
use grafeo_core::execution::operators::{
BinaryFilterOp, FilterExpression, FilterOperator, HashAggregateOperator, Operator,
OperatorError, Predicate, ProjectExpr, ProjectOperator, SimpleAggregateOperator,
SingleRowOperator, SortOperator, UnaryFilterOp,
};
use grafeo_core::graph::GraphStore;
use grafeo_core::graph::rdf::{Literal, RdfStore, Term, Triple, TriplePattern};
use crate::query::plan::{
AddGraphOp, AggregateFunction as LogicalAggregateFunction, AggregateOp, AntiJoinOp, BindOp,
ClearGraphOp, CopyGraphOp, CreateGraphOp, DeleteTripleOp, DistinctOp, DropGraphOp, FilterOp,
InsertTripleOp, LeftJoinOp, LimitOp, LogicalExpression, LogicalOperator, LogicalPlan, ModifyOp,
MoveGraphOp, SkipOp, SortOp, TripleComponent, TripleScanOp, TripleTemplate,
};
use crate::query::planner::{PhysicalPlan, convert_aggregate_function, convert_filter_expression};
#[cfg(feature = "regex")]
use regex::Regex;
#[cfg(all(feature = "regex-lite", not(feature = "regex")))]
use regex_lite::Regex;
const DEFAULT_CHUNK_SIZE: usize = 1024;
#[cfg(feature = "wal")]
fn log_rdf_wal(wal: &Option<Arc<RdfWal>>, record: &grafeo_adapters::storage::wal::WalRecord) {
if let Some(wal) = wal
&& let Err(err) = wal.log(record)
{
grafeo_warn!("RDF WAL log failed: {err}");
}
}
#[cfg(feature = "wal")]
fn term_to_wal(term: &Term) -> String {
term.to_string()
}
#[cfg(feature = "cdc")]
fn record_cdc_triple_insert(
cdc_log: &Option<Arc<crate::cdc::CdcLog>>,
subject: &Term,
predicate: &Term,
object: &Term,
graph: Option<&str>,
epoch: grafeo_common::types::EpochId,
) {
if let Some(log) = cdc_log {
log.record_triple_insert(
&subject.to_string(),
&predicate.to_string(),
&object.to_string(),
graph,
epoch,
);
}
}
#[cfg(feature = "cdc")]
fn record_cdc_triple_delete(
cdc_log: &Option<Arc<crate::cdc::CdcLog>>,
subject: &Term,
predicate: &Term,
object: &Term,
graph: Option<&str>,
epoch: grafeo_common::types::EpochId,
) {
if let Some(log) = cdc_log {
log.record_triple_delete(
&subject.to_string(),
&predicate.to_string(),
&object.to_string(),
graph,
epoch,
);
}
}
#[cfg(feature = "wal")]
type RdfWal = grafeo_adapters::storage::wal::LpgWal;
struct TripleOperands {
subject: TripleComponent,
predicate: TripleComponent,
object: TripleComponent,
column_map: HashMap<String, usize>,
}
pub struct RdfPlanner {
store: Arc<RdfStore>,
chunk_size: usize,
transaction_id: Option<TransactionId>,
profiling: std::cell::Cell<bool>,
profile_entries: std::cell::RefCell<Vec<crate::query::profile::ProfileEntry>>,
#[cfg(feature = "wal")]
wal: Option<Arc<RdfWal>>,
#[cfg(feature = "cdc")]
cdc_log: Option<Arc<crate::cdc::CdcLog>>,
#[cfg(feature = "cdc")]
cdc_epoch: grafeo_common::types::EpochId,
}
impl RdfPlanner {
#[must_use]
pub fn new(store: Arc<RdfStore>) -> Self {
Self {
store,
chunk_size: DEFAULT_CHUNK_SIZE,
transaction_id: None,
profiling: std::cell::Cell::new(false),
profile_entries: std::cell::RefCell::new(Vec::new()),
#[cfg(feature = "wal")]
wal: None,
#[cfg(feature = "cdc")]
cdc_log: None,
#[cfg(feature = "cdc")]
cdc_epoch: grafeo_common::types::EpochId(0),
}
}
#[must_use]
pub fn with_chunk_size(mut self, chunk_size: usize) -> Self {
self.chunk_size = chunk_size;
self
}
#[must_use]
pub fn with_transaction_id(mut self, transaction_id: Option<TransactionId>) -> Self {
self.transaction_id = transaction_id;
self
}
#[cfg(feature = "wal")]
#[must_use]
pub fn with_wal(mut self, wal: Option<Arc<RdfWal>>) -> Self {
self.wal = wal;
self
}
#[cfg(feature = "cdc")]
#[must_use]
pub fn with_cdc_log(
mut self,
cdc_log: Option<Arc<crate::cdc::CdcLog>>,
epoch: grafeo_common::types::EpochId,
) -> Self {
self.cdc_log = cdc_log;
self.cdc_epoch = epoch;
self
}
pub fn plan(&self, logical_plan: &LogicalPlan) -> Result<PhysicalPlan> {
let (operator, columns) = self.plan_operator(&logical_plan.root)?;
Ok(PhysicalPlan {
operator,
columns,
adaptive_context: None,
})
}
pub fn plan_profiled(
&self,
logical_plan: &LogicalPlan,
) -> Result<(PhysicalPlan, Vec<crate::query::profile::ProfileEntry>)> {
self.profiling.set(true);
self.profile_entries.borrow_mut().clear();
let result = self.plan_operator(&logical_plan.root);
self.profiling.set(false);
let (operator, columns) = result?;
let entries = self.profile_entries.borrow_mut().drain(..).collect();
Ok((
PhysicalPlan {
operator,
columns,
adaptive_context: None,
},
entries,
))
}
fn maybe_profile(
&self,
result: Result<(Box<dyn Operator>, Vec<String>)>,
op: &LogicalOperator,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
if self.profiling.get() {
let (physical, columns) = result?;
let (entry, stats) =
crate::query::profile::ProfileEntry::new(physical.name(), op.display_label());
let profiled = grafeo_core::execution::ProfiledOperator::new(physical, stats);
self.profile_entries.borrow_mut().push(entry);
Ok((Box::new(profiled), columns))
} else {
result
}
}
fn plan_operator(&self, op: &LogicalOperator) -> Result<(Box<dyn Operator>, Vec<String>)> {
let result = match op {
LogicalOperator::TripleScan(scan) => self.plan_triple_scan(scan),
LogicalOperator::Filter(filter) => self.plan_filter(filter),
LogicalOperator::Project(project) => self.plan_project(project),
LogicalOperator::Limit(limit) => self.plan_limit(limit),
LogicalOperator::Skip(skip) => self.plan_skip(skip),
LogicalOperator::Sort(sort) => self.plan_sort(sort),
LogicalOperator::Aggregate(agg) => self.plan_aggregate(agg),
LogicalOperator::Return(ret) => self.plan_return(ret),
LogicalOperator::Join(join) => self.plan_join(join),
LogicalOperator::LeftJoin(join) => self.plan_left_join(join),
LogicalOperator::AntiJoin(join) => self.plan_anti_join(join),
LogicalOperator::Union(union) => self.plan_union(union),
LogicalOperator::Distinct(distinct) => self.plan_distinct(distinct),
LogicalOperator::InsertTriple(insert) => self.plan_insert_triple(insert),
LogicalOperator::DeleteTriple(delete) => self.plan_delete_triple(delete),
LogicalOperator::Modify(modify) => self.plan_modify(modify),
LogicalOperator::ClearGraph(clear) => self.plan_clear_graph(clear),
LogicalOperator::CreateGraph(create) => self.plan_create_graph(create),
LogicalOperator::DropGraph(drop_op) => self.plan_drop_graph(drop_op),
LogicalOperator::CopyGraph(copy) => self.plan_copy_graph(copy),
LogicalOperator::MoveGraph(move_op) => self.plan_move_graph(move_op),
LogicalOperator::AddGraph(add) => self.plan_add_graph(add),
LogicalOperator::Bind(bind) => self.plan_bind(bind),
LogicalOperator::MultiWayJoin(mwj) => self.plan_multi_way_join(mwj),
LogicalOperator::Empty => {
let op: Box<dyn Operator> = Box::new(SingleRowOperator::new());
Ok((op, vec![]))
}
_ => Err(Error::Internal(format!(
"Unsupported RDF operator: {:?}",
std::mem::discriminant(op)
))),
};
self.maybe_profile(result, op)
}
fn plan_triple_scan(&self, scan: &TripleScanOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
let pattern = self.build_triple_pattern(scan);
let mut columns = Vec::new();
let mut output_mask = [false, false, false, false];
if let TripleComponent::Variable(name) = &scan.subject {
columns.push(name.clone());
output_mask[0] = true;
}
if let TripleComponent::Variable(name) = &scan.predicate {
columns.push(name.clone());
output_mask[1] = true;
}
if let TripleComponent::Variable(name) = &scan.object {
columns.push(name.clone());
output_mask[2] = true;
}
if let Some(TripleComponent::Variable(name)) = &scan.graph {
columns.push(name.clone());
output_mask[3] = true;
}
let (graph_iri, scan_all_graphs) = match &scan.graph {
Some(TripleComponent::Iri(iri)) => (Some(iri.clone()), false),
Some(TripleComponent::Literal(Value::String(iri))) => (Some(iri.to_string()), false),
Some(TripleComponent::Variable(_)) => (None, true),
_ => (None, false),
};
let operator = Box::new(RdfTripleScanOperator::new(
Arc::clone(&self.store),
pattern,
output_mask,
self.chunk_size,
graph_iri,
scan_all_graphs,
));
Ok((operator, columns))
}
fn build_triple_pattern(&self, scan: &TripleScanOp) -> TriplePattern {
TriplePattern {
subject: component_to_term(&scan.subject),
predicate: component_to_term(&scan.predicate),
object: component_to_term(&scan.object),
}
}
fn plan_return(
&self,
ret: &crate::query::plan::ReturnOp,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
let (input_op, _input_columns) = self.plan_operator(&ret.input)?;
let columns: Vec<String> = ret
.items
.iter()
.map(|item| {
item.alias
.clone()
.unwrap_or_else(|| expression_to_string(&item.expression))
})
.collect();
Ok((input_op, columns))
}
fn plan_filter(&self, filter: &FilterOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
if let Some((subquery, is_negated)) = self.extract_exists_pattern(&filter.predicate) {
return self.plan_exists_as_join(&filter.input, subquery, is_negated);
}
let (input_op, columns) = self.plan_operator(&filter.input)?;
let variable_columns: HashMap<String, usize> = columns
.iter()
.enumerate()
.map(|(i, name)| (name.clone(), i))
.collect();
let filter_expr = convert_filter_expression(&filter.predicate)?;
let predicate = RdfExpressionPredicate::new(filter_expr, variable_columns);
let operator = Box::new(FilterOperator::new(input_op, Box::new(predicate)));
Ok((operator, columns))
}
fn extract_exists_pattern<'a>(
&self,
predicate: &'a LogicalExpression,
) -> Option<(&'a LogicalOperator, bool)> {
use crate::query::plan::UnaryOp;
match predicate {
LogicalExpression::ExistsSubquery(subquery) => Some((subquery.as_ref(), false)),
LogicalExpression::Unary {
op: UnaryOp::Not,
operand,
} => {
if let LogicalExpression::ExistsSubquery(subquery) = operand.as_ref() {
Some((subquery.as_ref(), true))
} else {
None
}
}
_ => None,
}
}
fn plan_exists_as_join(
&self,
input: &LogicalOperator,
subquery: &LogicalOperator,
is_negated: bool,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
use crate::query::planner::common;
let (left_op, left_columns) = self.plan_operator(input)?;
let (right_op, right_columns) = self.plan_operator(subquery)?;
let schema = derive_rdf_schema(&left_columns);
let result = if is_negated {
common::build_anti_join(left_op, right_op, left_columns, &right_columns, schema)
} else {
common::build_semi_join(left_op, right_op, left_columns, &right_columns, schema)
};
Ok(result)
}
fn plan_distinct(&self, distinct: &DistinctOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
use crate::query::planner::common;
let (input_op, columns) = self.plan_operator(&distinct.input)?;
let schema = derive_rdf_schema(&columns);
Ok(common::build_distinct(
input_op,
columns,
distinct.columns.as_deref(),
schema,
))
}
fn plan_limit(&self, limit: &LimitOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
use crate::query::planner::common;
let (input_op, columns) = self.plan_operator(&limit.input)?;
let schema = derive_rdf_schema(&columns);
Ok(common::build_limit(
input_op,
columns,
limit.count.value(),
schema,
))
}
fn plan_skip(&self, skip: &SkipOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
use crate::query::planner::common;
let (input_op, columns) = self.plan_operator(&skip.input)?;
let schema = derive_rdf_schema(&columns);
Ok(common::build_skip(
input_op,
columns,
skip.count.value(),
schema,
))
}
fn plan_sort(&self, sort: &SortOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
use crate::query::plan::SortOrder;
use grafeo_core::execution::operators::{
FilterExpression, NullOrder, ProjectExpr, ProjectOperator, SortDirection, SortKey,
};
let (mut input_op, columns) = self.plan_operator(&sort.input)?;
let mut variable_columns: HashMap<String, usize> = columns
.iter()
.enumerate()
.map(|(i, name)| (name.clone(), i))
.collect();
let mut expression_projections: Vec<(FilterExpression, String)> = Vec::new();
let mut next_col_idx = columns.len();
for key in &sort.keys {
match &key.expression {
LogicalExpression::Variable(_) => {}
_ => {
let col_name = format!("__expr_{:?}", key.expression);
if !variable_columns.contains_key(&col_name) {
let filter_expr = convert_filter_expression(&key.expression)?;
expression_projections.push((filter_expr, col_name.clone()));
variable_columns.insert(col_name, next_col_idx);
next_col_idx += 1;
}
}
}
}
if !expression_projections.is_empty() {
let mut projections: Vec<ProjectExpr> =
(0..columns.len()).map(ProjectExpr::Column).collect();
let mut output_types: Vec<LogicalType> =
columns.iter().map(|_| LogicalType::String).collect();
for (filter_expr, _col_name) in &expression_projections {
projections.push(ProjectExpr::Expression {
expr: filter_expr.clone(),
variable_columns: variable_columns.clone(),
});
output_types.push(LogicalType::Any);
}
input_op = Box::new(ProjectOperator::with_store(
input_op,
projections,
output_types,
Arc::new(grafeo_core::graph::NullGraphStore) as Arc<dyn GraphStore>,
));
}
let physical_keys: Vec<SortKey> = sort
.keys
.iter()
.map(|key| {
let col_idx = resolve_expression(&key.expression, &variable_columns)?;
Ok(SortKey {
column: col_idx,
direction: match key.order {
SortOrder::Ascending => SortDirection::Ascending,
SortOrder::Descending => SortDirection::Descending,
},
null_order: NullOrder::NullsLast,
})
})
.collect::<Result<Vec<_>>>()?;
let output_schema = derive_rdf_schema(&columns);
let operator = Box::new(SortOperator::new(input_op, physical_keys, output_schema));
Ok((operator, columns))
}
fn plan_project(
&self,
project: &crate::query::plan::ProjectOp,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
let (input_op, input_columns) = self.plan_operator(&project.input)?;
let variable_columns: HashMap<String, usize> = input_columns
.iter()
.enumerate()
.map(|(i, name)| (name.clone(), i))
.collect();
let mut projections = Vec::new();
let mut output_columns = Vec::new();
let mut output_types = Vec::new();
for proj in &project.projections {
match &proj.expression {
LogicalExpression::Variable(name) => {
if let Some(&col_idx) = variable_columns.get(name) {
projections.push(RdfProjectExpr::Column(col_idx));
output_columns.push(proj.alias.clone().unwrap_or_else(|| name.clone()));
output_types.push(LogicalType::Any); } else {
return Err(Error::Internal(format!(
"Variable '{}' not found in input columns",
name
)));
}
}
LogicalExpression::Literal(value) => {
projections.push(RdfProjectExpr::Constant(value.clone()));
output_columns.push(proj.alias.clone().unwrap_or_else(|| format!("{value}")));
output_types.push(LogicalType::Any);
}
expr => {
let filter_expr = convert_filter_expression(expr)?;
projections.push(RdfProjectExpr::Expression {
expr: filter_expr,
variable_columns: variable_columns.clone(),
});
output_columns.push(proj.alias.clone().unwrap_or_else(|| format!("{expr:?}")));
output_types.push(LogicalType::Any);
}
}
}
if projections.is_empty() {
return Ok((input_op, input_columns));
}
let operator: Box<dyn Operator> =
Box::new(RdfProjectOperator::new(input_op, projections, output_types));
Ok((operator, output_columns))
}
fn plan_bind(&self, bind: &BindOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
let (input_op, input_columns) = self.plan_operator(&bind.input)?;
let variable_columns: HashMap<String, usize> = input_columns
.iter()
.enumerate()
.map(|(i, name)| (name.clone(), i))
.collect();
let filter_expr = convert_filter_expression(&bind.expression)?;
let mut output_columns = input_columns;
output_columns.push(bind.variable.clone());
let operator = Box::new(RdfBindOperator::new(
input_op,
filter_expr,
variable_columns,
));
Ok((operator, output_columns))
}
fn plan_aggregate(&self, agg: &AggregateOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
use grafeo_core::execution::operators::AggregateExpr as PhysicalAggregateExpr;
let (mut input_op, input_columns) = self.plan_operator(&agg.input)?;
let mut variable_columns: HashMap<String, usize> = input_columns
.iter()
.enumerate()
.map(|(i, name)| (name.clone(), i))
.collect();
let mut expression_projections: Vec<(FilterExpression, String)> = Vec::new();
let mut next_col_idx = input_columns.len();
for expr in &agg.group_by {
match expr {
LogicalExpression::Variable(_) => {}
_ => {
let col_name = format!("__expr_{:?}", expr);
if !variable_columns.contains_key(&col_name) {
let filter_expr = convert_filter_expression(expr)?;
expression_projections.push((filter_expr, col_name.clone()));
variable_columns.insert(col_name, next_col_idx);
next_col_idx += 1;
}
}
}
}
for agg_expr in &agg.aggregates {
for expr_opt in [&agg_expr.expression, &agg_expr.expression2] {
let Some(expr) = expr_opt else { continue };
match expr {
LogicalExpression::Variable(_) => {}
_ => {
let col_name = format!("__expr_{:?}", expr);
if !variable_columns.contains_key(&col_name) {
let filter_expr = convert_filter_expression(expr)?;
expression_projections.push((filter_expr, col_name.clone()));
variable_columns.insert(col_name, next_col_idx);
next_col_idx += 1;
}
}
}
}
}
if !expression_projections.is_empty() {
let mut projections: Vec<ProjectExpr> =
(0..input_columns.len()).map(ProjectExpr::Column).collect();
let mut output_types: Vec<LogicalType> =
input_columns.iter().map(|_| LogicalType::String).collect();
for (filter_expr, _col_name) in &expression_projections {
projections.push(ProjectExpr::Expression {
expr: filter_expr.clone(),
variable_columns: variable_columns.clone(),
});
output_types.push(LogicalType::Any);
}
input_op = Box::new(ProjectOperator::with_store(
input_op,
projections,
output_types,
Arc::new(grafeo_core::graph::NullGraphStore) as Arc<dyn GraphStore>,
));
}
let group_columns: Vec<usize> = agg
.group_by
.iter()
.map(|expr| resolve_expression(expr, &variable_columns))
.collect::<Result<Vec<_>>>()?;
let physical_aggregates: Vec<PhysicalAggregateExpr> = agg
.aggregates
.iter()
.map(|agg_expr| {
let column = agg_expr
.expression
.as_ref()
.map(|e| resolve_expression(e, &variable_columns))
.transpose()?;
let column2 = agg_expr
.expression2
.as_ref()
.map(|e| resolve_expression(e, &variable_columns))
.transpose()?;
Ok(PhysicalAggregateExpr {
function: convert_aggregate_function(agg_expr.function),
column,
column2,
distinct: agg_expr.distinct,
alias: agg_expr.alias.clone(),
percentile: agg_expr.percentile,
separator: agg_expr.separator.clone(),
})
})
.collect::<Result<Vec<_>>>()?;
let mut output_schema = Vec::new();
let mut output_columns = Vec::new();
for expr in &agg.group_by {
output_schema.push(LogicalType::String);
output_columns.push(expression_to_string(expr));
}
for agg_expr in &agg.aggregates {
let result_type = match agg_expr.function {
LogicalAggregateFunction::Count | LogicalAggregateFunction::CountNonNull => {
LogicalType::Int64
}
LogicalAggregateFunction::Sum => LogicalType::Float64,
LogicalAggregateFunction::Avg => LogicalType::Float64,
_ => LogicalType::String,
};
output_schema.push(result_type);
output_columns.push(
agg_expr
.alias
.clone()
.unwrap_or_else(|| format!("{:?}(...)", agg_expr.function).to_lowercase()),
);
}
let mut operator: Box<dyn Operator> = if group_columns.is_empty() {
Box::new(SimpleAggregateOperator::new(
input_op,
physical_aggregates,
output_schema,
))
} else {
Box::new(HashAggregateOperator::new(
input_op,
group_columns,
physical_aggregates,
output_schema,
))
};
if let Some(having_expr) = &agg.having {
let having_var_columns: HashMap<String, usize> = output_columns
.iter()
.enumerate()
.map(|(i, name)| (name.clone(), i))
.collect();
let filter_expr = convert_filter_expression(having_expr)?;
let predicate = RdfExpressionPredicate::new(filter_expr, having_var_columns);
operator = Box::new(FilterOperator::new(operator, Box::new(predicate)));
}
Ok((operator, output_columns))
}
fn plan_join(
&self,
join: &crate::query::plan::JoinOp,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
use crate::query::planner::common;
let (left_op, left_columns) = self.plan_operator(&join.left)?;
let (right_op, right_columns) = self.plan_operator(&join.right)?;
let cardinalities = estimate_operator_cardinality(&join.left, &self.store)
.zip(estimate_operator_cardinality(&join.right, &self.store));
Ok(common::build_inner_join(
left_op,
right_op,
&left_columns,
&right_columns,
derive_rdf_schema,
cardinalities,
))
}
fn plan_left_join(&self, join: &LeftJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
use crate::query::planner::common;
let (left_op, left_columns) = self.plan_operator(&join.left)?;
let (right_op, right_columns) = self.plan_operator(&join.right)?;
Ok(common::build_left_join(
left_op,
right_op,
&left_columns,
&right_columns,
derive_rdf_schema,
))
}
fn plan_anti_join(&self, join: &AntiJoinOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
use crate::query::planner::common;
let (left_op, left_columns) = self.plan_operator(&join.left)?;
let (right_op, right_columns) = self.plan_operator(&join.right)?;
let schema = derive_rdf_schema(&left_columns);
Ok(common::build_anti_join(
left_op,
right_op,
left_columns,
&right_columns,
schema,
))
}
fn plan_multi_way_join(
&self,
mwj: &crate::query::plan::MultiWayJoinOp,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
use crate::query::planner::common;
if mwj.inputs.is_empty() {
return Err(Error::Internal(
"MultiWayJoin requires at least one input".to_string(),
));
}
let mut planned: Vec<(Box<dyn Operator>, Vec<String>, f64)> = Vec::new();
for input in &mwj.inputs {
let (op, cols) = self.plan_operator(input)?;
let card = estimate_operator_cardinality(input, &self.store).unwrap_or(1000.0);
planned.push((op, cols, card));
}
planned.sort_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal));
let (mut current_op, mut current_cols, mut current_card) = planned.remove(0);
for (right_op, right_cols, right_card) in planned {
let cardinalities = Some((current_card, right_card));
let (joined_op, joined_cols) = common::build_inner_join(
current_op,
right_op,
¤t_cols,
&right_cols,
derive_rdf_schema,
cardinalities,
);
current_card = (current_card * right_card * 0.1).max(1.0);
current_op = joined_op;
current_cols = joined_cols;
}
Ok((current_op, current_cols))
}
fn plan_union(
&self,
union: &crate::query::plan::UnionOp,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
if union.inputs.is_empty() {
return Err(Error::Internal("Empty UNION".to_string()));
}
let mut operators: Vec<Box<dyn Operator>> = Vec::new();
let mut columns = Vec::new();
for (i, input) in union.inputs.iter().enumerate() {
let (op, cols) = self.plan_operator(input)?;
operators.push(op);
if i == 0 {
columns = cols;
}
}
if operators.len() == 1 {
return Ok((
operators
.into_iter()
.next()
.expect("single-element iterator"),
columns,
));
}
let operator = Box::new(RdfUnionOperator::new(operators));
Ok((operator, columns))
}
fn plan_insert_triple(
&self,
insert: &InsertTripleOp,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
let has_variables = matches!(&insert.subject, TripleComponent::Variable(_))
|| matches!(&insert.predicate, TripleComponent::Variable(_))
|| matches!(&insert.object, TripleComponent::Variable(_));
if has_variables {
if let Some(ref input) = insert.input {
let (input_op, input_columns) = self.plan_operator(input)?;
let column_map: HashMap<String, usize> = input_columns
.iter()
.enumerate()
.map(|(i, name)| (name.clone(), i))
.collect();
let operator = Box::new(RdfInsertPatternOperator::new(
Arc::clone(&self.store),
input_op,
TripleOperands {
subject: insert.subject.clone(),
predicate: insert.predicate.clone(),
object: insert.object.clone(),
column_map,
},
#[cfg(feature = "wal")]
self.wal.clone(),
#[cfg(feature = "cdc")]
self.cdc_log.clone(),
#[cfg(feature = "cdc")]
self.cdc_epoch,
));
return Ok((operator, Vec::new()));
}
}
let subject = self.component_to_term(&insert.subject)?;
let predicate = self.component_to_term(&insert.predicate)?;
let object = self.component_to_term(&insert.object)?;
let triple = Triple::new(subject, predicate, object);
let operator = Box::new(RdfInsertTripleOperator::new(
Arc::clone(&self.store),
triple,
insert.graph.clone(),
self.transaction_id,
#[cfg(feature = "wal")]
self.wal.clone(),
#[cfg(feature = "cdc")]
self.cdc_log.clone(),
#[cfg(feature = "cdc")]
self.cdc_epoch,
));
Ok((operator, Vec::new()))
}
fn component_to_term(&self, component: &TripleComponent) -> Result<Term> {
match component {
TripleComponent::Iri(iri) => Ok(Term::Iri(iri.clone().into())),
TripleComponent::Literal(value) => {
let lit = match value {
Value::String(s) => Literal::simple(s.to_string()),
Value::Int64(n) => Literal::integer(*n),
Value::Float64(f) => {
Literal::typed(f.to_string(), "http://www.w3.org/2001/XMLSchema#double")
}
Value::Bool(b) => {
Literal::typed(b.to_string(), "http://www.w3.org/2001/XMLSchema#boolean")
}
_ => Literal::simple(format!("{:?}", value)),
};
Ok(Term::Literal(lit))
}
TripleComponent::LangLiteral { value, lang } => {
Ok(Term::lang_literal(value.clone(), lang.clone()))
}
TripleComponent::BlankNode(label) => Ok(Term::blank(label.clone())),
TripleComponent::Variable(name) => {
Err(Error::Internal(format!(
"Unbound variable '{}' in INSERT DATA",
name
)))
}
}
}
fn plan_delete_triple(
&self,
delete: &DeleteTripleOp,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
let has_variables = matches!(&delete.subject, TripleComponent::Variable(_))
|| matches!(&delete.predicate, TripleComponent::Variable(_))
|| matches!(&delete.object, TripleComponent::Variable(_));
if has_variables {
if let Some(ref input) = delete.input {
let (input_op, input_columns) = self.plan_operator(input)?;
let column_map: HashMap<String, usize> = input_columns
.iter()
.enumerate()
.map(|(i, name)| (name.clone(), i))
.collect();
let operator = Box::new(RdfDeletePatternOperator::new(
Arc::clone(&self.store),
input_op,
TripleOperands {
subject: delete.subject.clone(),
predicate: delete.predicate.clone(),
object: delete.object.clone(),
column_map,
},
#[cfg(feature = "wal")]
self.wal.clone(),
#[cfg(feature = "cdc")]
self.cdc_log.clone(),
#[cfg(feature = "cdc")]
self.cdc_epoch,
));
return Ok((operator, Vec::new()));
}
}
let subject = self.component_to_term(&delete.subject)?;
let predicate = self.component_to_term(&delete.predicate)?;
let object = self.component_to_term(&delete.object)?;
let triple = Triple::new(subject, predicate, object);
let operator = Box::new(RdfDeleteTripleOperator::new(
Arc::clone(&self.store),
triple,
delete.graph.clone(),
self.transaction_id,
#[cfg(feature = "wal")]
self.wal.clone(),
#[cfg(feature = "cdc")]
self.cdc_log.clone(),
#[cfg(feature = "cdc")]
self.cdc_epoch,
));
Ok((operator, Vec::new()))
}
fn plan_clear_graph(&self, clear: &ClearGraphOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
let operator = Box::new(RdfClearGraphOperator::new(
Arc::clone(&self.store),
clear.graph.clone(),
clear.silent,
#[cfg(feature = "wal")]
self.wal.clone(),
));
Ok((operator, Vec::new()))
}
fn plan_create_graph(
&self,
create: &CreateGraphOp,
) -> Result<(Box<dyn Operator>, Vec<String>)> {
let operator = Box::new(RdfCreateGraphOperator::new(
Arc::clone(&self.store),
create.graph.clone(),
create.silent,
#[cfg(feature = "wal")]
self.wal.clone(),
));
Ok((operator, Vec::new()))
}
fn plan_drop_graph(&self, drop_op: &DropGraphOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
let operator = Box::new(RdfDropGraphOperator::new(
Arc::clone(&self.store),
drop_op.graph.clone(),
drop_op.silent,
#[cfg(feature = "wal")]
self.wal.clone(),
));
Ok((operator, Vec::new()))
}
fn plan_copy_graph(&self, copy: &CopyGraphOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
let operator = Box::new(RdfCopyGraphOperator::new(
Arc::clone(&self.store),
copy.source.clone(),
copy.destination.clone(),
copy.silent,
));
Ok((operator, Vec::new()))
}
fn plan_move_graph(&self, move_op: &MoveGraphOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
let operator = Box::new(RdfMoveGraphOperator::new(
Arc::clone(&self.store),
move_op.source.clone(),
move_op.destination.clone(),
move_op.silent,
));
Ok((operator, Vec::new()))
}
fn plan_add_graph(&self, add: &AddGraphOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
let operator = Box::new(RdfAddGraphOperator::new(
Arc::clone(&self.store),
add.source.clone(),
add.destination.clone(),
add.silent,
));
Ok((operator, Vec::new()))
}
fn plan_modify(&self, modify: &ModifyOp) -> Result<(Box<dyn Operator>, Vec<String>)> {
let (where_op, where_columns) = self.plan_operator(&modify.where_clause)?;
let column_map: HashMap<String, usize> = where_columns
.iter()
.enumerate()
.map(|(i, name)| (name.clone(), i))
.collect();
let operator = Box::new(RdfModifyOperator::new(
Arc::clone(&self.store),
where_op,
modify.delete_templates.clone(),
modify.insert_templates.clone(),
column_map,
#[cfg(feature = "cdc")]
self.cdc_log.clone(),
#[cfg(feature = "cdc")]
self.cdc_epoch,
));
Ok((operator, Vec::new()))
}
}
struct RdfInsertTripleOperator {
store: Arc<RdfStore>,
triple: Triple,
graph_name: Option<String>,
transaction_id: Option<TransactionId>,
inserted: bool,
#[cfg(feature = "wal")]
wal: Option<Arc<RdfWal>>,
#[cfg(feature = "cdc")]
cdc_log: Option<Arc<crate::cdc::CdcLog>>,
#[cfg(feature = "cdc")]
cdc_epoch: grafeo_common::types::EpochId,
}
impl RdfInsertTripleOperator {
fn new(
store: Arc<RdfStore>,
triple: Triple,
graph_name: Option<String>,
transaction_id: Option<TransactionId>,
#[cfg(feature = "wal")] wal: Option<Arc<RdfWal>>,
#[cfg(feature = "cdc")] cdc_log: Option<Arc<crate::cdc::CdcLog>>,
#[cfg(feature = "cdc")] cdc_epoch: grafeo_common::types::EpochId,
) -> Self {
Self {
store,
triple,
graph_name,
transaction_id,
inserted: false,
#[cfg(feature = "wal")]
wal,
#[cfg(feature = "cdc")]
cdc_log,
#[cfg(feature = "cdc")]
cdc_epoch,
}
}
}
impl Operator for RdfInsertTripleOperator {
fn next(&mut self) -> std::result::Result<Option<DataChunk>, OperatorError> {
if self.inserted {
return Ok(None);
}
let target = match &self.graph_name {
Some(name) => self.store.graph_or_create(name),
None => Arc::clone(&self.store),
};
if let Some(transaction_id) = self.transaction_id {
target.insert_in_transaction(transaction_id, self.triple.clone());
} else {
target.insert(self.triple.clone());
}
#[cfg(feature = "wal")]
log_rdf_wal(
&self.wal,
&grafeo_adapters::storage::wal::WalRecord::InsertRdfTriple {
subject: term_to_wal(self.triple.subject()),
predicate: term_to_wal(self.triple.predicate()),
object: term_to_wal(self.triple.object()),
graph: self.graph_name.clone(),
},
);
#[cfg(feature = "cdc")]
record_cdc_triple_insert(
&self.cdc_log,
self.triple.subject(),
self.triple.predicate(),
self.triple.object(),
self.graph_name.as_deref(),
self.cdc_epoch,
);
self.inserted = true;
Ok(None)
}
fn reset(&mut self) {
self.inserted = false;
}
fn name(&self) -> &'static str {
"RdfInsertTriple"
}
}
struct RdfInsertPatternOperator {
store: Arc<RdfStore>,
input: Box<dyn Operator>,
subject: TripleComponent,
predicate: TripleComponent,
object: TripleComponent,
column_map: HashMap<String, usize>,
done: bool,
#[cfg(feature = "wal")]
wal: Option<Arc<RdfWal>>,
#[cfg(feature = "cdc")]
cdc_log: Option<Arc<crate::cdc::CdcLog>>,
#[cfg(feature = "cdc")]
cdc_epoch: grafeo_common::types::EpochId,
}
impl RdfInsertPatternOperator {
fn new(
store: Arc<RdfStore>,
input: Box<dyn Operator>,
operands: TripleOperands,
#[cfg(feature = "wal")] wal: Option<Arc<RdfWal>>,
#[cfg(feature = "cdc")] cdc_log: Option<Arc<crate::cdc::CdcLog>>,
#[cfg(feature = "cdc")] cdc_epoch: grafeo_common::types::EpochId,
) -> Self {
Self {
store,
input,
subject: operands.subject,
predicate: operands.predicate,
object: operands.object,
column_map: operands.column_map,
done: false,
#[cfg(feature = "wal")]
wal,
#[cfg(feature = "cdc")]
cdc_log,
#[cfg(feature = "cdc")]
cdc_epoch,
}
}
fn resolve_component(
&self,
component: &TripleComponent,
chunk: &DataChunk,
row: usize,
) -> Option<Term> {
match component {
TripleComponent::Iri(iri) => Some(Term::Iri(iri.clone().into())),
TripleComponent::BlankNode(label) => Some(Term::blank(label.clone())),
TripleComponent::Literal(value) => {
let lit = match value {
Value::String(s) => Literal::simple(s.to_string()),
Value::Int64(n) => Literal::integer(*n),
Value::Float64(f) => {
Literal::typed(f.to_string(), "http://www.w3.org/2001/XMLSchema#double")
}
Value::Bool(b) => {
Literal::typed(b.to_string(), "http://www.w3.org/2001/XMLSchema#boolean")
}
_ => Literal::simple(format!("{:?}", value)),
};
Some(Term::Literal(lit))
}
TripleComponent::LangLiteral { value, lang } => {
Some(Term::lang_literal(value.clone(), lang.clone()))
}
TripleComponent::Variable(name) => {
let var_name = name.strip_prefix('?').unwrap_or(name);
if let Some(&col_idx) = self.column_map.get(var_name)
&& let Some(col) = chunk.column(col_idx)
&& let Some(value) = col.get_value(row)
{
return Self::value_to_term(&value);
}
None
}
}
}
fn value_to_term(value: &Value) -> Option<Term> {
match value {
Value::String(s) => {
if s.starts_with("http://") || s.starts_with("https://") || s.starts_with("urn:") {
Some(Term::Iri(s.to_string().into()))
} else if let Ok(n) = s.parse::<i64>() {
Some(Term::Literal(Literal::integer(n)))
} else if let Ok(f) = s.parse::<f64>() {
Some(Term::Literal(Literal::typed(
f.to_string(),
"http://www.w3.org/2001/XMLSchema#double",
)))
} else {
Some(Term::Literal(Literal::simple(s.to_string())))
}
}
Value::Int64(n) => Some(Term::Literal(Literal::integer(*n))),
Value::Float64(f) => Some(Term::Literal(Literal::typed(
f.to_string(),
"http://www.w3.org/2001/XMLSchema#double",
))),
Value::Bool(b) => Some(Term::Literal(Literal::typed(
b.to_string(),
"http://www.w3.org/2001/XMLSchema#boolean",
))),
_ => None,
}
}
}
impl Operator for RdfInsertPatternOperator {
fn next(&mut self) -> std::result::Result<Option<DataChunk>, OperatorError> {
if self.done {
return Ok(None);
}
let mut triples_to_insert = Vec::new();
while let Some(chunk) = self.input.next()? {
for row in 0..chunk.row_count() {
let subject = self.resolve_component(&self.subject, &chunk, row);
let predicate = self.resolve_component(&self.predicate, &chunk, row);
let object = self.resolve_component(&self.object, &chunk, row);
if let (Some(s), Some(p), Some(o)) = (subject, predicate, object) {
triples_to_insert.push(Triple::new(s, p, o));
}
}
}
for triple in &triples_to_insert {
self.store.insert(triple.clone());
}
#[cfg(feature = "wal")]
for triple in &triples_to_insert {
log_rdf_wal(
&self.wal,
&grafeo_adapters::storage::wal::WalRecord::InsertRdfTriple {
subject: term_to_wal(triple.subject()),
predicate: term_to_wal(triple.predicate()),
object: term_to_wal(triple.object()),
graph: None,
},
);
}
#[cfg(feature = "cdc")]
for triple in &triples_to_insert {
record_cdc_triple_insert(
&self.cdc_log,
triple.subject(),
triple.predicate(),
triple.object(),
None,
self.cdc_epoch,
);
}
self.done = true;
Ok(None)
}
fn reset(&mut self) {
self.done = false;
self.input.reset();
}
fn name(&self) -> &'static str {
"RdfInsertPattern"
}
}
struct RdfDeleteTripleOperator {
store: Arc<RdfStore>,
triple: Triple,
graph_name: Option<String>,
transaction_id: Option<TransactionId>,
deleted: bool,
#[cfg(feature = "wal")]
wal: Option<Arc<RdfWal>>,
#[cfg(feature = "cdc")]
cdc_log: Option<Arc<crate::cdc::CdcLog>>,
#[cfg(feature = "cdc")]
cdc_epoch: grafeo_common::types::EpochId,
}
impl RdfDeleteTripleOperator {
fn new(
store: Arc<RdfStore>,
triple: Triple,
graph_name: Option<String>,
transaction_id: Option<TransactionId>,
#[cfg(feature = "wal")] wal: Option<Arc<RdfWal>>,
#[cfg(feature = "cdc")] cdc_log: Option<Arc<crate::cdc::CdcLog>>,
#[cfg(feature = "cdc")] cdc_epoch: grafeo_common::types::EpochId,
) -> Self {
Self {
store,
triple,
graph_name,
transaction_id,
deleted: false,
#[cfg(feature = "wal")]
wal,
#[cfg(feature = "cdc")]
cdc_log,
#[cfg(feature = "cdc")]
cdc_epoch,
}
}
}
impl Operator for RdfDeleteTripleOperator {
fn next(&mut self) -> std::result::Result<Option<DataChunk>, OperatorError> {
if self.deleted {
return Ok(None);
}
let target = match &self.graph_name {
Some(name) => self.store.graph_or_create(name),
None => Arc::clone(&self.store),
};
if let Some(transaction_id) = self.transaction_id {
target.remove_in_transaction(transaction_id, self.triple.clone());
} else {
target.remove(&self.triple);
}
#[cfg(feature = "wal")]
log_rdf_wal(
&self.wal,
&grafeo_adapters::storage::wal::WalRecord::DeleteRdfTriple {
subject: term_to_wal(self.triple.subject()),
predicate: term_to_wal(self.triple.predicate()),
object: term_to_wal(self.triple.object()),
graph: self.graph_name.clone(),
},
);
#[cfg(feature = "cdc")]
record_cdc_triple_delete(
&self.cdc_log,
self.triple.subject(),
self.triple.predicate(),
self.triple.object(),
self.graph_name.as_deref(),
self.cdc_epoch,
);
self.deleted = true;
Ok(None)
}
fn reset(&mut self) {
self.deleted = false;
}
fn name(&self) -> &'static str {
"RdfDeleteTriple"
}
}
struct RdfDeletePatternOperator {
store: Arc<RdfStore>,
input: Box<dyn Operator>,
subject: TripleComponent,
predicate: TripleComponent,
object: TripleComponent,
column_map: HashMap<String, usize>,
done: bool,
#[cfg(feature = "wal")]
wal: Option<Arc<RdfWal>>,
#[cfg(feature = "cdc")]
cdc_log: Option<Arc<crate::cdc::CdcLog>>,
#[cfg(feature = "cdc")]
cdc_epoch: grafeo_common::types::EpochId,
}
impl RdfDeletePatternOperator {
fn new(
store: Arc<RdfStore>,
input: Box<dyn Operator>,
operands: TripleOperands,
#[cfg(feature = "wal")] wal: Option<Arc<RdfWal>>,
#[cfg(feature = "cdc")] cdc_log: Option<Arc<crate::cdc::CdcLog>>,
#[cfg(feature = "cdc")] cdc_epoch: grafeo_common::types::EpochId,
) -> Self {
Self {
store,
input,
subject: operands.subject,
predicate: operands.predicate,
object: operands.object,
column_map: operands.column_map,
done: false,
#[cfg(feature = "wal")]
wal,
#[cfg(feature = "cdc")]
cdc_log,
#[cfg(feature = "cdc")]
cdc_epoch,
}
}
fn resolve_component(
&self,
component: &TripleComponent,
chunk: &DataChunk,
row: usize,
) -> Option<Term> {
match component {
TripleComponent::Iri(iri) => Some(Term::Iri(iri.clone().into())),
TripleComponent::BlankNode(label) => Some(Term::blank(label.clone())),
TripleComponent::Literal(value) => {
let lit = match value {
Value::String(s) => Literal::simple(s.to_string()),
Value::Int64(n) => Literal::integer(*n),
Value::Float64(f) => {
Literal::typed(f.to_string(), "http://www.w3.org/2001/XMLSchema#double")
}
Value::Bool(b) => {
Literal::typed(b.to_string(), "http://www.w3.org/2001/XMLSchema#boolean")
}
_ => Literal::simple(format!("{:?}", value)),
};
Some(Term::Literal(lit))
}
TripleComponent::LangLiteral { value, lang } => {
Some(Term::lang_literal(value.clone(), lang.clone()))
}
TripleComponent::Variable(name) => {
let var_name = name.strip_prefix('?').unwrap_or(name);
if let Some(&col_idx) = self.column_map.get(var_name)
&& let Some(col) = chunk.column(col_idx)
&& let Some(value) = col.get_value(row)
{
return Self::value_to_term(&value);
}
None
}
}
}
fn value_to_term(value: &Value) -> Option<Term> {
match value {
Value::String(s) => {
if s.starts_with("http://") || s.starts_with("https://") || s.starts_with("urn:") {
Some(Term::Iri(s.to_string().into()))
} else if let Ok(n) = s.parse::<i64>() {
Some(Term::Literal(Literal::integer(n)))
} else if let Ok(f) = s.parse::<f64>() {
Some(Term::Literal(Literal::typed(
f.to_string(),
"http://www.w3.org/2001/XMLSchema#double",
)))
} else {
Some(Term::Literal(Literal::simple(s.to_string())))
}
}
Value::Int64(n) => Some(Term::Literal(Literal::integer(*n))),
Value::Float64(f) => Some(Term::Literal(Literal::typed(
f.to_string(),
"http://www.w3.org/2001/XMLSchema#double",
))),
Value::Bool(b) => Some(Term::Literal(Literal::typed(
b.to_string(),
"http://www.w3.org/2001/XMLSchema#boolean",
))),
_ => None,
}
}
}
impl Operator for RdfDeletePatternOperator {
fn next(&mut self) -> std::result::Result<Option<DataChunk>, OperatorError> {
if self.done {
return Ok(None);
}
let mut triples_to_delete = Vec::new();
while let Some(chunk) = self.input.next()? {
for row in 0..chunk.row_count() {
let subject = self.resolve_component(&self.subject, &chunk, row);
let predicate = self.resolve_component(&self.predicate, &chunk, row);
let object = self.resolve_component(&self.object, &chunk, row);
if let (Some(s), Some(p), Some(o)) = (subject, predicate, object) {
triples_to_delete.push(Triple::new(s, p, o));
}
}
}
for triple in &triples_to_delete {
self.store.remove(triple);
}
#[cfg(feature = "wal")]
for triple in &triples_to_delete {
log_rdf_wal(
&self.wal,
&grafeo_adapters::storage::wal::WalRecord::DeleteRdfTriple {
subject: term_to_wal(triple.subject()),
predicate: term_to_wal(triple.predicate()),
object: term_to_wal(triple.object()),
graph: None,
},
);
}
#[cfg(feature = "cdc")]
for triple in &triples_to_delete {
record_cdc_triple_delete(
&self.cdc_log,
triple.subject(),
triple.predicate(),
triple.object(),
None,
self.cdc_epoch,
);
}
self.done = true;
Ok(None)
}
fn reset(&mut self) {
self.done = false;
self.input.reset();
}
fn name(&self) -> &'static str {
"RdfDeletePattern"
}
}
struct RdfClearGraphOperator {
store: Arc<RdfStore>,
graph: Option<String>,
cleared: bool,
#[cfg(feature = "wal")]
wal: Option<Arc<RdfWal>>,
}
impl RdfClearGraphOperator {
fn new(
store: Arc<RdfStore>,
graph: Option<String>,
_silent: bool,
#[cfg(feature = "wal")] wal: Option<Arc<RdfWal>>,
) -> Self {
Self {
store,
graph,
cleared: false,
#[cfg(feature = "wal")]
wal,
}
}
}
impl Operator for RdfClearGraphOperator {
fn next(&mut self) -> std::result::Result<Option<DataChunk>, OperatorError> {
if self.cleared {
return Ok(None);
}
self.store.clear_graph(self.graph.as_deref());
#[cfg(feature = "wal")]
log_rdf_wal(
&self.wal,
&grafeo_adapters::storage::wal::WalRecord::ClearRdfGraph {
graph: self.graph.clone(),
},
);
self.cleared = true;
Ok(None)
}
fn reset(&mut self) {
self.cleared = false;
}
fn name(&self) -> &'static str {
"RdfClearGraph"
}
}
struct RdfCreateGraphOperator {
store: Arc<RdfStore>,
graph: String,
silent: bool,
done: bool,
#[cfg(feature = "wal")]
wal: Option<Arc<RdfWal>>,
}
impl RdfCreateGraphOperator {
fn new(
store: Arc<RdfStore>,
graph: String,
silent: bool,
#[cfg(feature = "wal")] wal: Option<Arc<RdfWal>>,
) -> Self {
Self {
store,
graph,
silent,
done: false,
#[cfg(feature = "wal")]
wal,
}
}
}
impl Operator for RdfCreateGraphOperator {
fn next(&mut self) -> std::result::Result<Option<DataChunk>, OperatorError> {
if self.done {
return Ok(None);
}
self.done = true;
let created = self.store.create_graph(&self.graph);
if !created && !self.silent {
return Err(OperatorError::Execution(format!(
"Graph <{}> already exists",
self.graph
)));
}
#[cfg(feature = "wal")]
if created {
log_rdf_wal(
&self.wal,
&grafeo_adapters::storage::wal::WalRecord::CreateRdfGraph {
name: self.graph.clone(),
},
);
}
Ok(None)
}
fn reset(&mut self) {
self.done = false;
}
fn name(&self) -> &'static str {
"RdfCreateGraph"
}
}
struct RdfDropGraphOperator {
store: Arc<RdfStore>,
graph: Option<String>,
silent: bool,
done: bool,
#[cfg(feature = "wal")]
wal: Option<Arc<RdfWal>>,
}
impl RdfDropGraphOperator {
fn new(
store: Arc<RdfStore>,
graph: Option<String>,
silent: bool,
#[cfg(feature = "wal")] wal: Option<Arc<RdfWal>>,
) -> Self {
Self {
store,
graph,
silent,
done: false,
#[cfg(feature = "wal")]
wal,
}
}
}
impl Operator for RdfDropGraphOperator {
fn next(&mut self) -> std::result::Result<Option<DataChunk>, OperatorError> {
if self.done {
return Ok(None);
}
self.done = true;
match &self.graph {
None => {
self.store.clear();
}
Some(name) => {
let dropped = self.store.drop_graph(name);
if !dropped && !self.silent {
return Err(OperatorError::Execution(format!(
"Graph <{name}> does not exist"
)));
}
}
}
#[cfg(feature = "wal")]
log_rdf_wal(
&self.wal,
&grafeo_adapters::storage::wal::WalRecord::DropRdfGraph {
name: self.graph.clone(),
},
);
Ok(None)
}
fn reset(&mut self) {
self.done = false;
}
fn name(&self) -> &'static str {
"RdfDropGraph"
}
}
struct RdfCopyGraphOperator {
store: Arc<RdfStore>,
source: Option<String>,
destination: Option<String>,
silent: bool,
done: bool,
}
impl RdfCopyGraphOperator {
fn new(
store: Arc<RdfStore>,
source: Option<String>,
destination: Option<String>,
silent: bool,
) -> Self {
Self {
store,
source,
destination,
silent,
done: false,
}
}
}
impl Operator for RdfCopyGraphOperator {
fn next(&mut self) -> std::result::Result<Option<DataChunk>, OperatorError> {
if self.done {
return Ok(None);
}
self.done = true;
if !self.silent
&& let Some(ref name) = self.source
&& self.store.graph(name).is_none()
{
return Err(OperatorError::Execution(format!(
"Source graph <{name}> does not exist"
)));
}
self.store
.copy_graph(self.source.as_deref(), self.destination.as_deref());
Ok(None)
}
fn reset(&mut self) {
self.done = false;
}
fn name(&self) -> &'static str {
"RdfCopyGraph"
}
}
struct RdfMoveGraphOperator {
store: Arc<RdfStore>,
source: Option<String>,
destination: Option<String>,
silent: bool,
done: bool,
}
impl RdfMoveGraphOperator {
fn new(
store: Arc<RdfStore>,
source: Option<String>,
destination: Option<String>,
silent: bool,
) -> Self {
Self {
store,
source,
destination,
silent,
done: false,
}
}
}
impl Operator for RdfMoveGraphOperator {
fn next(&mut self) -> std::result::Result<Option<DataChunk>, OperatorError> {
if self.done {
return Ok(None);
}
self.done = true;
if !self.silent
&& let Some(ref name) = self.source
&& self.store.graph(name).is_none()
{
return Err(OperatorError::Execution(format!(
"Source graph <{name}> does not exist"
)));
}
self.store
.move_graph(self.source.as_deref(), self.destination.as_deref());
Ok(None)
}
fn reset(&mut self) {
self.done = false;
}
fn name(&self) -> &'static str {
"RdfMoveGraph"
}
}
struct RdfAddGraphOperator {
store: Arc<RdfStore>,
source: Option<String>,
destination: Option<String>,
silent: bool,
done: bool,
}
impl RdfAddGraphOperator {
fn new(
store: Arc<RdfStore>,
source: Option<String>,
destination: Option<String>,
silent: bool,
) -> Self {
Self {
store,
source,
destination,
silent,
done: false,
}
}
}
impl Operator for RdfAddGraphOperator {
fn next(&mut self) -> std::result::Result<Option<DataChunk>, OperatorError> {
if self.done {
return Ok(None);
}
self.done = true;
if !self.silent
&& let Some(ref name) = self.source
&& self.store.graph(name).is_none()
{
return Err(OperatorError::Execution(format!(
"Source graph <{name}> does not exist"
)));
}
self.store
.add_graph(self.source.as_deref(), self.destination.as_deref());
Ok(None)
}
fn reset(&mut self) {
self.done = false;
}
fn name(&self) -> &'static str {
"RdfAddGraph"
}
}
struct RdfModifyOperator {
store: Arc<RdfStore>,
input: Box<dyn Operator>,
delete_templates: Vec<TripleTemplate>,
insert_templates: Vec<TripleTemplate>,
column_map: HashMap<String, usize>,
done: bool,
#[cfg(feature = "cdc")]
cdc_log: Option<Arc<crate::cdc::CdcLog>>,
#[cfg(feature = "cdc")]
cdc_epoch: grafeo_common::types::EpochId,
}
impl RdfModifyOperator {
fn new(
store: Arc<RdfStore>,
input: Box<dyn Operator>,
delete_templates: Vec<TripleTemplate>,
insert_templates: Vec<TripleTemplate>,
column_map: HashMap<String, usize>,
#[cfg(feature = "cdc")] cdc_log: Option<Arc<crate::cdc::CdcLog>>,
#[cfg(feature = "cdc")] cdc_epoch: grafeo_common::types::EpochId,
) -> Self {
Self {
store,
input,
delete_templates,
insert_templates,
column_map,
done: false,
#[cfg(feature = "cdc")]
cdc_log,
#[cfg(feature = "cdc")]
cdc_epoch,
}
}
fn resolve_component(
&self,
component: &TripleComponent,
chunk: &DataChunk,
row: usize,
) -> Option<Term> {
match component {
TripleComponent::Iri(iri) => Some(Term::Iri(iri.clone().into())),
TripleComponent::BlankNode(label) => Some(Term::blank(label.clone())),
TripleComponent::Literal(value) => {
let lit = match value {
Value::String(s) => Literal::simple(s.to_string()),
Value::Int64(n) => Literal::integer(*n),
Value::Float64(f) => {
Literal::typed(f.to_string(), "http://www.w3.org/2001/XMLSchema#double")
}
Value::Bool(b) => {
Literal::typed(b.to_string(), "http://www.w3.org/2001/XMLSchema#boolean")
}
_ => Literal::simple(format!("{:?}", value)),
};
Some(Term::Literal(lit))
}
TripleComponent::LangLiteral { value, lang } => {
Some(Term::lang_literal(value.clone(), lang.clone()))
}
TripleComponent::Variable(name) => {
let var_name = name.strip_prefix('?').unwrap_or(name);
if let Some(&col_idx) = self.column_map.get(var_name)
&& let Some(col) = chunk.column(col_idx)
&& let Some(value) = col.get_value(row)
{
return Self::value_to_term(&value);
}
None
}
}
}
fn value_to_term(value: &Value) -> Option<Term> {
match value {
Value::String(s) => {
if s.starts_with("http://") || s.starts_with("https://") || s.starts_with("urn:") {
Some(Term::Iri(s.to_string().into()))
} else if let Ok(n) = s.parse::<i64>() {
Some(Term::Literal(Literal::integer(n)))
} else if let Ok(f) = s.parse::<f64>() {
Some(Term::Literal(Literal::typed(
f.to_string(),
"http://www.w3.org/2001/XMLSchema#double",
)))
} else {
Some(Term::Literal(Literal::simple(s.to_string())))
}
}
Value::Int64(n) => Some(Term::Literal(Literal::integer(*n))),
Value::Float64(f) => Some(Term::Literal(Literal::typed(
f.to_string(),
"http://www.w3.org/2001/XMLSchema#double",
))),
Value::Bool(b) => Some(Term::Literal(Literal::typed(
b.to_string(),
"http://www.w3.org/2001/XMLSchema#boolean",
))),
_ => None,
}
}
}
impl Operator for RdfModifyOperator {
fn next(&mut self) -> std::result::Result<Option<DataChunk>, OperatorError> {
if self.done {
return Ok(None);
}
let mut bindings: Vec<(DataChunk, usize)> = Vec::new();
while let Some(chunk) = self.input.next()? {
for row in 0..chunk.row_count() {
bindings.push((chunk.clone(), row));
}
}
for template in &self.delete_templates {
for (chunk, row) in &bindings {
let subject = self.resolve_component(&template.subject, chunk, *row);
let predicate = self.resolve_component(&template.predicate, chunk, *row);
let object = self.resolve_component(&template.object, chunk, *row);
if let (Some(s), Some(p), Some(o)) = (subject, predicate, object) {
let triple = Triple::new(s, p, o);
#[cfg(feature = "cdc")]
record_cdc_triple_delete(
&self.cdc_log,
triple.subject(),
triple.predicate(),
triple.object(),
None,
self.cdc_epoch,
);
self.store.remove(&triple);
}
}
}
for template in &self.insert_templates {
for (chunk, row) in &bindings {
let subject = self.resolve_component(&template.subject, chunk, *row);
let predicate = self.resolve_component(&template.predicate, chunk, *row);
let object = self.resolve_component(&template.object, chunk, *row);
if let (Some(s), Some(p), Some(o)) = (subject, predicate, object) {
let triple = Triple::new(s, p, o);
#[cfg(feature = "cdc")]
record_cdc_triple_insert(
&self.cdc_log,
triple.subject(),
triple.predicate(),
triple.object(),
None,
self.cdc_epoch,
);
self.store.insert(triple);
}
}
}
self.done = true;
Ok(None)
}
fn reset(&mut self) {
self.done = false;
self.input.reset();
}
fn name(&self) -> &'static str {
"RdfModify"
}
}
struct RdfUnionOperator {
operators: Vec<Box<dyn Operator>>,
current_idx: usize,
}
impl RdfUnionOperator {
fn new(operators: Vec<Box<dyn Operator>>) -> Self {
Self {
operators,
current_idx: 0,
}
}
}
impl Operator for RdfUnionOperator {
fn next(&mut self) -> std::result::Result<Option<DataChunk>, OperatorError> {
while self.current_idx < self.operators.len() {
let op = &mut self.operators[self.current_idx];
match op.next()? {
Some(chunk) => return Ok(Some(chunk)),
None => self.current_idx += 1,
}
}
Ok(None)
}
fn reset(&mut self) {
self.current_idx = 0;
for op in &mut self.operators {
op.reset();
}
}
fn name(&self) -> &'static str {
"RdfUnion"
}
}
struct RdfBindOperator {
child: Box<dyn Operator>,
expression: FilterExpression,
variable_columns: HashMap<String, usize>,
}
impl RdfBindOperator {
fn new(
child: Box<dyn Operator>,
expression: FilterExpression,
variable_columns: HashMap<String, usize>,
) -> Self {
Self {
child,
expression,
variable_columns,
}
}
}
impl Operator for RdfBindOperator {
fn next(&mut self) -> std::result::Result<Option<DataChunk>, OperatorError> {
let Some(input) = self.child.next()? else {
return Ok(None);
};
let input_col_count = input.column_count();
let row_count = input.row_count();
let mut output_types: Vec<LogicalType> = Vec::with_capacity(input_col_count + 1);
for col_idx in 0..input_col_count {
if let Some(col) = input.column(col_idx) {
output_types.push(col.logical_type());
} else {
output_types.push(LogicalType::String);
}
}
output_types.push(LogicalType::Any);
let mut output = DataChunk::with_capacity(&output_types, row_count);
for col_idx in 0..input_col_count {
let output_col = output
.column_mut(col_idx)
.expect("column exists: index within schema bounds");
if let Some(input_col) = input.column(col_idx) {
for row in input.selected_indices() {
if let Some(value) = input_col.get_value(row) {
output_col.push_value(value);
} else {
output_col.push_value(Value::Null);
}
}
}
}
let evaluator =
RdfExpressionPredicate::new(self.expression.clone(), self.variable_columns.clone());
let bind_col = output
.column_mut(input_col_count)
.expect("column exists: bind column is last in schema");
for row in input.selected_indices() {
let value = evaluator.eval(&input, row).unwrap_or(Value::Null);
bind_col.push_value(value);
}
output.set_count(row_count);
Ok(Some(output))
}
fn reset(&mut self) {
self.child.reset();
}
fn name(&self) -> &'static str {
"RdfBind"
}
}
enum RdfProjectExpr {
Column(usize),
Constant(Value),
Expression {
expr: FilterExpression,
variable_columns: HashMap<String, usize>,
},
}
struct RdfProjectOperator {
child: Box<dyn Operator>,
projections: Vec<RdfProjectExpr>,
output_types: Vec<LogicalType>,
}
impl RdfProjectOperator {
fn new(
child: Box<dyn Operator>,
projections: Vec<RdfProjectExpr>,
output_types: Vec<LogicalType>,
) -> Self {
assert_eq!(projections.len(), output_types.len());
Self {
child,
projections,
output_types,
}
}
}
impl Operator for RdfProjectOperator {
fn next(&mut self) -> std::result::Result<Option<DataChunk>, OperatorError> {
let Some(input) = self.child.next()? else {
return Ok(None);
};
let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
for (i, proj) in self.projections.iter().enumerate() {
let output_col = output
.column_mut(i)
.expect("column exists: index matches projection schema");
match proj {
RdfProjectExpr::Column(col_idx) => {
let input_col = input.column(*col_idx).ok_or_else(|| {
OperatorError::ColumnNotFound(format!("Column {col_idx}"))
})?;
for row in input.selected_indices() {
if let Some(value) = input_col.get_value(row) {
output_col.push_value(value);
} else {
output_col.push_value(Value::Null);
}
}
}
RdfProjectExpr::Constant(value) => {
for _ in input.selected_indices() {
output_col.push_value(value.clone());
}
}
RdfProjectExpr::Expression {
expr,
variable_columns,
} => {
let evaluator =
RdfExpressionPredicate::new(expr.clone(), variable_columns.clone());
for row in input.selected_indices() {
let value = evaluator.eval(&input, row).unwrap_or(Value::Null);
output_col.push_value(value);
}
}
}
}
output.set_count(input.row_count());
Ok(Some(output))
}
fn reset(&mut self) {
self.child.reset();
}
fn name(&self) -> &'static str {
"RdfProject"
}
}
struct RdfTripleScanOperator {
store: Arc<RdfStore>,
pattern: TriplePattern,
output_mask: [bool; 4],
graph: Option<String>,
scan_all_graphs: bool,
chunk_size: usize,
triples: Option<Vec<(Option<String>, Arc<Triple>)>>,
position: usize,
}
impl RdfTripleScanOperator {
fn new(
store: Arc<RdfStore>,
pattern: TriplePattern,
output_mask: [bool; 4],
chunk_size: usize,
graph: Option<String>,
scan_all_graphs: bool,
) -> Self {
Self {
store,
pattern,
output_mask,
graph,
scan_all_graphs,
chunk_size,
triples: None,
position: 0,
}
}
fn ensure_triples(&mut self) {
if self.triples.is_none() {
self.triples = Some(if self.scan_all_graphs {
self.store.find_in_graphs(&self.pattern, Some(&[]))
} else if let Some(ref graph_iri) = self.graph {
self.store
.graph(graph_iri)
.map(|g| {
g.find(&self.pattern)
.into_iter()
.map(|t| (Some(graph_iri.clone()), t))
.collect()
})
.unwrap_or_default()
} else {
self.store
.find(&self.pattern)
.into_iter()
.map(|t| (None, t))
.collect()
});
}
}
fn output_column_count(&self) -> usize {
self.output_mask.iter().filter(|&&b| b).count()
}
}
impl Operator for RdfTripleScanOperator {
fn next(&mut self) -> std::result::Result<Option<DataChunk>, OperatorError> {
self.ensure_triples();
let triples = self
.triples
.as_ref()
.expect("triples populated by ensure_triples");
if self.position >= triples.len() {
return Ok(None);
}
let end = (self.position + self.chunk_size).min(triples.len());
let batch_size = end - self.position;
let col_count = self.output_column_count();
let schema: Vec<LogicalType> = (0..col_count).map(|_| LogicalType::String).collect();
let mut chunk = DataChunk::with_capacity(&schema, batch_size);
for i in self.position..end {
let (ref graph_name, ref triple) = triples[i];
let mut col_idx = 0;
if self.output_mask[0] {
if let Some(col) = chunk.column_mut(col_idx) {
col.push_string(term_to_string(triple.subject()));
}
col_idx += 1;
}
if self.output_mask[1] {
if let Some(col) = chunk.column_mut(col_idx) {
col.push_string(term_to_string(triple.predicate()));
}
col_idx += 1;
}
if self.output_mask[2] {
if let Some(col) = chunk.column_mut(col_idx) {
push_term_value(col, triple.object());
}
col_idx += 1;
}
if self.output_mask[3] {
if let Some(col) = chunk.column_mut(col_idx) {
match graph_name {
Some(name) => col.push_string(name.clone()),
None => col.push_value(Value::Null),
}
}
}
}
chunk.set_count(batch_size);
self.position = end;
Ok(Some(chunk))
}
fn reset(&mut self) {
self.position = 0;
}
fn name(&self) -> &'static str {
"RdfTripleScan"
}
}
struct RdfExpressionPredicate {
expression: FilterExpression,
variable_columns: HashMap<String, usize>,
}
impl RdfExpressionPredicate {
fn new(expression: FilterExpression, variable_columns: HashMap<String, usize>) -> Self {
Self {
expression,
variable_columns,
}
}
fn eval(&self, chunk: &DataChunk, row: usize) -> Option<Value> {
self.eval_expr(&self.expression, chunk, row)
}
fn eval_expr(&self, expr: &FilterExpression, chunk: &DataChunk, row: usize) -> Option<Value> {
match expr {
FilterExpression::Literal(v) => Some(v.clone()),
FilterExpression::Variable(name) => {
let col_idx = *self.variable_columns.get(name)?;
chunk.column(col_idx)?.get_value(row)
}
FilterExpression::Property { variable, .. } => {
let col_idx = *self.variable_columns.get(variable)?;
chunk.column(col_idx)?.get_value(row)
}
FilterExpression::Binary { left, op, right } => {
if *op == BinaryFilterOp::In {
let left_val = self.eval_expr(left, chunk, row)?;
let right_val = self.eval_expr(right, chunk, row)?;
return match right_val {
Value::List(items) => {
if left_val.is_null() {
return Some(Value::Null);
}
let mut has_null = false;
for item in items.iter() {
if item.is_null() {
has_null = true;
} else if rdf_values_equal(&left_val, item) {
return Some(Value::Bool(true));
}
}
if has_null {
Some(Value::Null)
} else {
Some(Value::Bool(false))
}
}
_ => None,
};
}
let left_val = self.eval_expr(left, chunk, row)?;
let right_val = self.eval_expr(right, chunk, row)?;
self.eval_binary_op(&left_val, *op, &right_val)
}
FilterExpression::Unary { op, operand } => {
let val = self.eval_expr(operand, chunk, row);
self.eval_unary_op(*op, val)
}
FilterExpression::Id(var)
| FilterExpression::Labels(var)
| FilterExpression::Type(var) => {
let col_idx = *self.variable_columns.get(var)?;
chunk.column(col_idx)?.get_value(row)
}
FilterExpression::FunctionCall { name, args } => {
self.eval_function_call(name, args, chunk, row)
}
FilterExpression::List(items) => {
let values: Vec<Value> = items
.iter()
.filter_map(|item| self.eval_expr(item, chunk, row))
.collect();
Some(Value::List(values.into()))
}
FilterExpression::Case { .. }
| FilterExpression::Map(_)
| FilterExpression::IndexAccess { .. }
| FilterExpression::SliceAccess { .. }
| FilterExpression::ListComprehension { .. }
| FilterExpression::ListPredicate { .. }
| FilterExpression::ExistsSubquery { .. }
| FilterExpression::CountSubquery { .. }
| FilterExpression::Reduce { .. } => None,
}
}
fn eval_binary_op(&self, left: &Value, op: BinaryFilterOp, right: &Value) -> Option<Value> {
match op {
BinaryFilterOp::And => Some(Value::Bool(left.as_bool()? && right.as_bool()?)),
BinaryFilterOp::Or => Some(Value::Bool(left.as_bool()? || right.as_bool()?)),
BinaryFilterOp::Xor => Some(Value::Bool(left.as_bool()? != right.as_bool()?)),
BinaryFilterOp::Eq => compare_values(left, right, |o| o.is_eq()),
BinaryFilterOp::Ne => compare_values(left, right, |o| o.is_ne()),
BinaryFilterOp::Lt => compare_values(left, right, |o| o.is_lt()),
BinaryFilterOp::Le => compare_values(left, right, |o| o.is_le()),
BinaryFilterOp::Gt => compare_values(left, right, |o| o.is_gt()),
BinaryFilterOp::Ge => compare_values(left, right, |o| o.is_ge()),
BinaryFilterOp::Add => {
fn to_f64(v: &Value) -> Option<f64> {
match v {
Value::Int64(i) => Some(*i as f64),
Value::Float64(f) => Some(*f),
Value::String(s) => s.parse::<f64>().ok(),
_ => None,
}
}
match (left, right) {
(Value::Int64(l), Value::Int64(r)) => Some(Value::Int64(l + r)),
(Value::Float64(l), Value::Float64(r)) => Some(Value::Float64(l + r)),
(Value::Int64(l), Value::Float64(r)) => Some(Value::Float64(*l as f64 + r)),
(Value::Float64(l), Value::Int64(r)) => Some(Value::Float64(l + *r as f64)),
_ => {
let l = to_f64(left)?;
let r = to_f64(right)?;
Some(Value::Float64(l + r))
}
}
}
BinaryFilterOp::Sub => {
fn to_f64(v: &Value) -> Option<f64> {
match v {
Value::Int64(i) => Some(*i as f64),
Value::Float64(f) => Some(*f),
Value::String(s) => s.parse::<f64>().ok(),
_ => None,
}
}
match (left, right) {
(Value::Int64(l), Value::Int64(r)) => Some(Value::Int64(l - r)),
(Value::Float64(l), Value::Float64(r)) => Some(Value::Float64(l - r)),
(Value::Int64(l), Value::Float64(r)) => Some(Value::Float64(*l as f64 - r)),
(Value::Float64(l), Value::Int64(r)) => Some(Value::Float64(l - *r as f64)),
_ => {
let l = to_f64(left)?;
let r = to_f64(right)?;
Some(Value::Float64(l - r))
}
}
}
BinaryFilterOp::Mul => {
fn to_f64(v: &Value) -> Option<f64> {
match v {
Value::Int64(i) => Some(*i as f64),
Value::Float64(f) => Some(*f),
Value::String(s) => s.parse::<f64>().ok(),
_ => None,
}
}
match (left, right) {
(Value::Int64(l), Value::Int64(r)) => Some(Value::Int64(l * r)),
(Value::Float64(l), Value::Float64(r)) => Some(Value::Float64(l * r)),
(Value::Int64(l), Value::Float64(r)) => Some(Value::Float64(*l as f64 * r)),
(Value::Float64(l), Value::Int64(r)) => Some(Value::Float64(l * *r as f64)),
_ => {
let l = to_f64(left)?;
let r = to_f64(right)?;
Some(Value::Float64(l * r))
}
}
}
BinaryFilterOp::Div => {
fn to_f64(v: &Value) -> Option<f64> {
match v {
Value::Int64(i) => Some(*i as f64),
Value::Float64(f) => Some(*f),
Value::String(s) => s.parse::<f64>().ok(),
_ => None,
}
}
match (left, right) {
(Value::Int64(l), Value::Int64(r)) if *r != 0 => Some(Value::Int64(l / r)),
(Value::Float64(l), Value::Float64(r)) if *r != 0.0 => {
Some(Value::Float64(l / r))
}
(Value::Int64(l), Value::Float64(r)) if *r != 0.0 => {
Some(Value::Float64(*l as f64 / r))
}
(Value::Float64(l), Value::Int64(r)) if *r != 0 => {
Some(Value::Float64(l / *r as f64))
}
_ => {
let l = to_f64(left)?;
let r = to_f64(right)?;
if r != 0.0 {
Some(Value::Float64(l / r))
} else {
None
}
}
}
}
BinaryFilterOp::Mod => match (left, right) {
(Value::Int64(l), Value::Int64(r)) if *r != 0 => Some(Value::Int64(l % r)),
_ => None,
},
BinaryFilterOp::Contains => match (left, right) {
(Value::String(l), Value::String(r)) => Some(Value::Bool(l.contains(&**r))),
_ => None,
},
BinaryFilterOp::StartsWith => match (left, right) {
(Value::String(l), Value::String(r)) => Some(Value::Bool(l.starts_with(&**r))),
_ => None,
},
BinaryFilterOp::EndsWith => match (left, right) {
(Value::String(l), Value::String(r)) => Some(Value::Bool(l.ends_with(&**r))),
_ => None,
},
BinaryFilterOp::In => {
None
}
BinaryFilterOp::Regex => {
match (left, right) {
(Value::String(text), Value::String(pattern)) => {
match Regex::new(pattern) {
Ok(re) => Some(Value::Bool(re.is_match(text))),
Err(_) => None, }
}
_ => None,
}
}
BinaryFilterOp::Pow => {
match (left, right) {
(Value::Int64(base), Value::Int64(exp)) => {
Some(Value::Float64((*base as f64).powf(*exp as f64)))
}
(Value::Float64(base), Value::Float64(exp)) => {
Some(Value::Float64(base.powf(*exp)))
}
(Value::Int64(base), Value::Float64(exp)) => {
Some(Value::Float64((*base as f64).powf(*exp)))
}
(Value::Float64(base), Value::Int64(exp)) => {
Some(Value::Float64(base.powf(*exp as f64)))
}
_ => None,
}
}
BinaryFilterOp::Like => {
match (left, right) {
(Value::String(s), Value::String(pattern)) => {
let mut re_pat = String::with_capacity(pattern.len() + 4);
re_pat.push('^');
let mut chars = pattern.chars().peekable();
while let Some(ch) = chars.next() {
match ch {
'%' => re_pat.push_str(".*"),
'_' => re_pat.push('.'),
'\\' => {
if let Some(next) = chars.next() {
if ".+*?^${}()|[]\\".contains(next) {
re_pat.push('\\');
}
re_pat.push(next);
}
}
_ => {
if ".+*?^${}()|[]\\".contains(ch) {
re_pat.push('\\');
}
re_pat.push(ch);
}
}
}
re_pat.push('$');
match Regex::new(&re_pat) {
Ok(re) => Some(Value::Bool(re.is_match(s))),
Err(_) => None,
}
}
_ => None,
}
}
BinaryFilterOp::Concat => match (left, right) {
(Value::String(a), Value::String(b)) => {
let mut s = String::with_capacity(a.len() + b.len());
s.push_str(a);
s.push_str(b);
Some(Value::String(s.into()))
}
(Value::String(a), other) => {
let b = match other {
Value::Int64(i) => i.to_string(),
Value::Float64(f) => f.to_string(),
Value::Bool(b) => b.to_string(),
_ => return None,
};
let mut s = String::with_capacity(a.len() + b.len());
s.push_str(a);
s.push_str(&b);
Some(Value::String(s.into()))
}
(other, Value::String(b)) => {
let a = match other {
Value::Int64(i) => i.to_string(),
Value::Float64(f) => f.to_string(),
Value::Bool(bo) => bo.to_string(),
_ => return None,
};
let mut s = String::with_capacity(a.len() + b.len());
s.push_str(&a);
s.push_str(b);
Some(Value::String(s.into()))
}
_ => None,
},
}
}
fn eval_unary_op(&self, op: UnaryFilterOp, val: Option<Value>) -> Option<Value> {
match op {
UnaryFilterOp::Not => Some(Value::Bool(!val?.as_bool()?)),
UnaryFilterOp::IsNull => Some(Value::Bool(val.is_none())),
UnaryFilterOp::IsNotNull => Some(Value::Bool(val.is_some())),
UnaryFilterOp::Neg => match val? {
Value::Int64(v) => Some(Value::Int64(-v)),
Value::Float64(v) => Some(Value::Float64(-v)),
_ => None,
},
}
}
fn eval_function_call(
&self,
name: &str,
args: &[FilterExpression],
chunk: &DataChunk,
row: usize,
) -> Option<Value> {
let func_name = name.to_uppercase();
match func_name.as_str() {
"CONCAT" => {
let mut result = String::new();
for arg in args {
if let Some(Value::String(s)) = self.eval_expr(arg, chunk, row) {
result.push_str(&s);
} else if let Some(val) = self.eval_expr(arg, chunk, row) {
result.push_str(&value_to_string(&val));
}
}
Some(Value::String(result.into()))
}
"REPLACE" => {
if args.len() < 3 {
return None;
}
let text = match self.eval_expr(&args[0], chunk, row)? {
Value::String(s) => s.to_string(),
v => value_to_string(&v),
};
let pattern = match self.eval_expr(&args[1], chunk, row)? {
Value::String(s) => s.to_string(),
v => value_to_string(&v),
};
let replacement = match self.eval_expr(&args[2], chunk, row)? {
Value::String(s) => s.to_string(),
v => value_to_string(&v),
};
if args.len() >= 4
&& let Some(Value::String(flags)) = self.eval_expr(&args[3], chunk, row)
&& (flags.contains('r') || flags.contains('i'))
{
let regex_pattern = if flags.contains('i') {
format!("(?i){}", &pattern)
} else {
pattern.clone()
};
if let Ok(re) = Regex::new(®ex_pattern) {
return Some(Value::String(re.replace_all(&text, &replacement).into()));
}
}
Some(Value::String(text.replace(&pattern, &replacement).into()))
}
"STRLEN" => {
if args.is_empty() {
return None;
}
let text = match self.eval_expr(&args[0], chunk, row)? {
Value::String(s) => s.to_string(),
v => value_to_string(&v),
};
Some(Value::Int64(text.chars().count() as i64))
}
"UCASE" | "UPPER" => {
if args.is_empty() {
return None;
}
let text = match self.eval_expr(&args[0], chunk, row)? {
Value::String(s) => s.to_string(),
v => value_to_string(&v),
};
Some(Value::String(text.to_uppercase().into()))
}
"LCASE" | "LOWER" => {
if args.is_empty() {
return None;
}
let text = match self.eval_expr(&args[0], chunk, row)? {
Value::String(s) => s.to_string(),
v => value_to_string(&v),
};
Some(Value::String(text.to_lowercase().into()))
}
"SUBSTR" | "SUBSTRING" => {
if args.len() < 2 {
return None;
}
let text = match self.eval_expr(&args[0], chunk, row)? {
Value::String(s) => s.to_string(),
v => value_to_string(&v),
};
let start = match self.eval_expr(&args[1], chunk, row)? {
Value::Int64(i) => (i.max(1) - 1) as usize, _ => return None,
};
let len = if args.len() >= 3 {
match self.eval_expr(&args[2], chunk, row)? {
Value::Int64(i) => Some(i.max(0) as usize),
_ => return None,
}
} else {
None
};
let chars: Vec<char> = text.chars().collect();
let substr: String = if let Some(len) = len {
chars.iter().skip(start).take(len).collect()
} else {
chars.iter().skip(start).collect()
};
Some(Value::String(substr.into()))
}
"STRSTARTS" => {
if args.len() < 2 {
return None;
}
let text = match self.eval_expr(&args[0], chunk, row)? {
Value::String(s) => s.to_string(),
v => value_to_string(&v),
};
let prefix = match self.eval_expr(&args[1], chunk, row)? {
Value::String(s) => s.to_string(),
v => value_to_string(&v),
};
Some(Value::Bool(text.starts_with(&prefix)))
}
"STRENDS" => {
if args.len() < 2 {
return None;
}
let text = match self.eval_expr(&args[0], chunk, row)? {
Value::String(s) => s.to_string(),
v => value_to_string(&v),
};
let suffix = match self.eval_expr(&args[1], chunk, row)? {
Value::String(s) => s.to_string(),
v => value_to_string(&v),
};
Some(Value::Bool(text.ends_with(&suffix)))
}
"CONTAINS" => {
if args.len() < 2 {
return None;
}
let text = match self.eval_expr(&args[0], chunk, row)? {
Value::String(s) => s.to_string(),
v => value_to_string(&v),
};
let pattern = match self.eval_expr(&args[1], chunk, row)? {
Value::String(s) => s.to_string(),
v => value_to_string(&v),
};
Some(Value::Bool(text.contains(&pattern)))
}
"STRBEFORE" => {
if args.len() < 2 {
return None;
}
let text = match self.eval_expr(&args[0], chunk, row)? {
Value::String(s) => s.to_string(),
v => value_to_string(&v),
};
let pattern = match self.eval_expr(&args[1], chunk, row)? {
Value::String(s) => s.to_string(),
v => value_to_string(&v),
};
if let Some(pos) = text.find(&pattern) {
Some(Value::String(text[..pos].to_string().into()))
} else {
Some(Value::String("".into()))
}
}
"STRAFTER" => {
if args.len() < 2 {
return None;
}
let text = match self.eval_expr(&args[0], chunk, row)? {
Value::String(s) => s.to_string(),
v => value_to_string(&v),
};
let pattern = match self.eval_expr(&args[1], chunk, row)? {
Value::String(s) => s.to_string(),
v => value_to_string(&v),
};
if let Some(pos) = text.find(&pattern) {
Some(Value::String(
text[pos + pattern.len()..].to_string().into(),
))
} else {
Some(Value::String("".into()))
}
}
"ENCODE_FOR_URI" => {
if args.is_empty() {
return None;
}
let text = match self.eval_expr(&args[0], chunk, row)? {
Value::String(s) => s.to_string(),
v => value_to_string(&v),
};
let encoded: String = text
.chars()
.map(|c| match c {
'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_' | '.' | '~' => c.to_string(),
_ => format!("%{:02X}", c as u32),
})
.collect();
Some(Value::String(encoded.into()))
}
"COALESCE" => {
for arg in args {
if let Some(val) = self.eval_expr(arg, chunk, row)
&& !matches!(val, Value::Null)
{
return Some(val);
}
}
None
}
"IF" => {
if args.len() < 3 {
return None;
}
let condition = self.eval_expr(&args[0], chunk, row)?;
if condition.as_bool()? {
self.eval_expr(&args[1], chunk, row)
} else {
self.eval_expr(&args[2], chunk, row)
}
}
"BOUND" => {
if args.is_empty() {
return None;
}
if let FilterExpression::Variable(var_name) = &args[0] {
if let Some(&col_idx) = self.variable_columns.get(var_name)
&& let Some(col) = chunk.column(col_idx)
{
return Some(Value::Bool(!col.is_null(row)));
}
return Some(Value::Bool(false));
}
let is_bound = self.eval_expr(&args[0], chunk, row).is_some();
Some(Value::Bool(is_bound))
}
"STR" => {
if args.is_empty() {
return None;
}
let val = self.eval_expr(&args[0], chunk, row)?;
Some(Value::String(value_to_string(&val).into()))
}
"ISIRI" | "ISURI" => {
if args.is_empty() {
return None;
}
let val = self.eval_expr(&args[0], chunk, row)?;
if let Value::String(s) = val {
let is_iri = s.contains("://") || s.starts_with("urn:");
Some(Value::Bool(is_iri))
} else {
Some(Value::Bool(false))
}
}
"ISBLANK" => {
if args.is_empty() {
return None;
}
let val = self.eval_expr(&args[0], chunk, row)?;
if let Value::String(s) = val {
Some(Value::Bool(s.starts_with("_:")))
} else {
Some(Value::Bool(false))
}
}
"ISLITERAL" => {
if args.is_empty() {
return None;
}
let val = self.eval_expr(&args[0], chunk, row)?;
match &val {
Value::String(s) => {
Some(Value::Bool(!s.contains("://") && !s.starts_with("_:")))
}
_ => Some(Value::Bool(true)),
}
}
"ISNUMERIC" => {
if args.is_empty() {
return None;
}
let val = self.eval_expr(&args[0], chunk, row)?;
let is_numeric = matches!(val, Value::Int64(_) | Value::Float64(_))
|| matches!(&val, Value::String(s) if s.parse::<f64>().is_ok());
Some(Value::Bool(is_numeric))
}
"ABS" => {
if args.is_empty() {
return None;
}
match self.eval_expr(&args[0], chunk, row)? {
Value::Int64(v) => Some(Value::Int64(v.abs())),
Value::Float64(v) => Some(Value::Float64(v.abs())),
_ => None,
}
}
"CEIL" => {
if args.is_empty() {
return None;
}
match self.eval_expr(&args[0], chunk, row)? {
Value::Int64(v) => Some(Value::Int64(v)),
Value::Float64(v) => Some(Value::Float64(v.ceil())),
_ => None,
}
}
"FLOOR" => {
if args.is_empty() {
return None;
}
match self.eval_expr(&args[0], chunk, row)? {
Value::Int64(v) => Some(Value::Int64(v)),
Value::Float64(v) => Some(Value::Float64(v.floor())),
_ => None,
}
}
"ROUND" => {
if args.is_empty() {
return None;
}
match self.eval_expr(&args[0], chunk, row)? {
Value::Int64(v) => Some(Value::Int64(v)),
Value::Float64(v) => Some(Value::Float64(v.round())),
_ => None,
}
}
"REGEX" => {
if args.len() < 2 {
return None;
}
let text = match self.eval_expr(&args[0], chunk, row)? {
Value::String(s) => s.to_string(),
v => value_to_string(&v),
};
let pattern = match self.eval_expr(&args[1], chunk, row)? {
Value::String(s) => s.to_string(),
_ => return None,
};
let regex_pattern = if args.len() >= 3
&& let Some(Value::String(flags)) = self.eval_expr(&args[2], chunk, row)
&& flags.contains('i')
{
format!("(?i){pattern}")
} else {
pattern
};
match Regex::new(®ex_pattern) {
Ok(re) => Some(Value::Bool(re.is_match(&text))),
Err(_) => None,
}
}
"NOW" => {
let ts = grafeo_common::types::Timestamp::now();
Some(Value::Timestamp(ts))
}
"YEAR" => {
let val = self.eval_expr(args.first()?, chunk, row)?;
match val {
Value::Date(d) => Some(Value::Int64(i64::from(d.year()))),
Value::Timestamp(ts) => Some(Value::Int64(i64::from(ts.to_date().year()))),
Value::String(s) => grafeo_common::types::Date::parse(&s)
.map(|d| Value::Int64(i64::from(d.year()))),
_ => None,
}
}
"MONTH" => {
let val = self.eval_expr(args.first()?, chunk, row)?;
match val {
Value::Date(d) => Some(Value::Int64(i64::from(d.month()))),
Value::Timestamp(ts) => Some(Value::Int64(i64::from(ts.to_date().month()))),
Value::String(s) => grafeo_common::types::Date::parse(&s)
.map(|d| Value::Int64(i64::from(d.month()))),
_ => None,
}
}
"DAY" => {
let val = self.eval_expr(args.first()?, chunk, row)?;
match val {
Value::Date(d) => Some(Value::Int64(i64::from(d.day()))),
Value::Timestamp(ts) => Some(Value::Int64(i64::from(ts.to_date().day()))),
Value::String(s) => grafeo_common::types::Date::parse(&s)
.map(|d| Value::Int64(i64::from(d.day()))),
_ => None,
}
}
"HOURS" => {
let val = self.eval_expr(args.first()?, chunk, row)?;
match val {
Value::Time(t) => Some(Value::Int64(i64::from(t.hour()))),
Value::Timestamp(ts) => Some(Value::Int64(i64::from(ts.to_time().hour()))),
Value::String(s) => grafeo_common::types::Time::parse(&s)
.map(|t| Value::Int64(i64::from(t.hour()))),
_ => None,
}
}
"MINUTES" => {
let val = self.eval_expr(args.first()?, chunk, row)?;
match val {
Value::Time(t) => Some(Value::Int64(i64::from(t.minute()))),
Value::Timestamp(ts) => Some(Value::Int64(i64::from(ts.to_time().minute()))),
Value::String(s) => grafeo_common::types::Time::parse(&s)
.map(|t| Value::Int64(i64::from(t.minute()))),
_ => None,
}
}
"SECONDS" => {
let val = self.eval_expr(args.first()?, chunk, row)?;
let to_secs = |t: &grafeo_common::types::Time| {
f64::from(t.second()) + f64::from(t.nanosecond()) / 1_000_000_000.0
};
match val {
Value::Time(t) => Some(Value::Float64(to_secs(&t))),
Value::Timestamp(ts) => Some(Value::Float64(to_secs(&ts.to_time()))),
Value::String(s) => {
grafeo_common::types::Time::parse(&s).map(|t| Value::Float64(to_secs(&t)))
}
_ => None,
}
}
"TIMEZONE" => {
let val = self.eval_expr(args.first()?, chunk, row)?;
match val {
Value::Time(t) => t.offset_seconds().map(|offset| {
Value::Duration(grafeo_common::types::Duration::from_seconds(i64::from(
offset,
)))
}),
Value::ZonedDatetime(zdt) => Some(Value::Duration(
grafeo_common::types::Duration::from_seconds(i64::from(
zdt.offset_seconds(),
)),
)),
_ => None,
}
}
"TZ" => {
let val = self.eval_expr(args.first()?, chunk, row)?;
match val {
Value::Time(t) => {
if let Some(offset) = t.offset_seconds() {
Some(Value::String(format_tz_offset(offset).into()))
} else {
Some(Value::String(String::new().into()))
}
}
Value::ZonedDatetime(zdt) => {
Some(Value::String(format_tz_offset(zdt.offset_seconds()).into()))
}
_ => Some(Value::String(String::new().into())),
}
}
"MD5" => {
let val = self.eval_expr(args.first()?, chunk, row)?;
let s = value_to_string(&val);
let digest = md5::compute(s.as_bytes());
Some(Value::String(format!("{digest:x}").into()))
}
"SHA1" => {
let val = self.eval_expr(args.first()?, chunk, row)?;
let s = value_to_string(&val);
use sha1::Digest as _;
let hash = sha1::Sha1::digest(s.as_bytes());
Some(Value::String(format!("{hash:x}").into()))
}
"SHA256" => {
let val = self.eval_expr(args.first()?, chunk, row)?;
let s = value_to_string(&val);
use sha2::Digest as _;
let hash = sha2::Sha256::digest(s.as_bytes());
Some(Value::String(format!("{hash:x}").into()))
}
"SHA384" => {
let val = self.eval_expr(args.first()?, chunk, row)?;
let s = value_to_string(&val);
use sha2::Digest as _;
let hash = sha2::Sha384::digest(s.as_bytes());
Some(Value::String(format!("{hash:x}").into()))
}
"SHA512" => {
let val = self.eval_expr(args.first()?, chunk, row)?;
let s = value_to_string(&val);
use sha2::Digest as _;
let hash = sha2::Sha512::digest(s.as_bytes());
Some(Value::String(format!("{hash:x}").into()))
}
"LANG" => {
let val = self.eval_expr(args.first()?, chunk, row)?;
match val {
Value::String(_) => Some(Value::String(String::new().into())),
_ => Some(Value::String(String::new().into())),
}
}
"DATATYPE" => {
let val = self.eval_expr(args.first()?, chunk, row)?;
let dt = match &val {
Value::String(_) => "http://www.w3.org/2001/XMLSchema#string",
Value::Int64(_) => "http://www.w3.org/2001/XMLSchema#integer",
Value::Float64(_) => "http://www.w3.org/2001/XMLSchema#double",
Value::Bool(_) => "http://www.w3.org/2001/XMLSchema#boolean",
Value::Date(_) => "http://www.w3.org/2001/XMLSchema#date",
Value::Time(_) => "http://www.w3.org/2001/XMLSchema#time",
Value::Timestamp(_) => "http://www.w3.org/2001/XMLSchema#dateTime",
Value::Duration(_) => "http://www.w3.org/2001/XMLSchema#duration",
_ => return None,
};
Some(Value::String(dt.to_string().into()))
}
"IRI" | "URI" => {
let val = self.eval_expr(args.first()?, chunk, row)?;
match val {
Value::String(s) => Some(Value::String(s)),
v => Some(Value::String(value_to_string(&v).into())),
}
}
"BNODE" => {
if args.is_empty() {
use std::sync::atomic::{AtomicU64, Ordering};
static BNODE_COUNTER: AtomicU64 = AtomicU64::new(0);
let id = BNODE_COUNTER.fetch_add(1, Ordering::Relaxed);
Some(Value::String(format!("_:b{id}").into()))
} else {
let val = self.eval_expr(&args[0], chunk, row)?;
let label = value_to_string(&val);
Some(Value::String(format!("_:b{label}").into()))
}
}
"STRDT" => {
if args.len() < 2 {
return None;
}
self.eval_expr(&args[0], chunk, row)
}
"STRLANG" => {
if args.len() < 2 {
return None;
}
self.eval_expr(&args[0], chunk, row)
}
"UUID" => {
use std::sync::atomic::{AtomicU64, Ordering};
static UUID_COUNTER: AtomicU64 = AtomicU64::new(0);
let id = UUID_COUNTER.fetch_add(1, Ordering::Relaxed);
let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_or(0, |d| d.as_nanos());
Some(Value::String(format!("urn:uuid:{ts:032x}-{id:04x}").into()))
}
"STRUUID" => {
use std::sync::atomic::{AtomicU64, Ordering};
static STRUUID_COUNTER: AtomicU64 = AtomicU64::new(0);
let id = STRUUID_COUNTER.fetch_add(1, Ordering::Relaxed);
let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_or(0, |d| d.as_nanos());
Some(Value::String(format!("{ts:032x}-{id:04x}").into()))
}
"SAMETERM" => {
if args.len() < 2 {
return None;
}
let a = self.eval_expr(&args[0], chunk, row)?;
let b = self.eval_expr(&args[1], chunk, row)?;
Some(Value::Bool(a == b))
}
"RAND" => {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicU64, Ordering};
static RAND_STATE: AtomicU64 = AtomicU64::new(0);
let state = RAND_STATE.fetch_add(1, Ordering::Relaxed);
let mut hasher = DefaultHasher::new();
state.hash(&mut hasher);
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_or(0u64, |d| d.as_nanos() as u64)
.hash(&mut hasher);
let bits = hasher.finish();
let value = (bits >> 11) as f64 / (1u64 << 53) as f64;
Some(Value::Float64(value))
}
_ => None,
}
}
}
fn format_tz_offset(offset_secs: i32) -> String {
if offset_secs == 0 {
return "Z".to_string();
}
let sign = if offset_secs >= 0 { '+' } else { '-' };
let abs = offset_secs.unsigned_abs();
let hours = abs / 3600;
let minutes = (abs % 3600) / 60;
format!("{sign}{hours:02}:{minutes:02}")
}
impl Predicate for RdfExpressionPredicate {
fn evaluate(&self, chunk: &DataChunk, row: usize) -> bool {
matches!(self.eval(chunk, row), Some(Value::Bool(true)))
}
}
fn term_to_string(term: &Term) -> String {
match term {
Term::Iri(iri) => iri.as_str().to_string(),
Term::BlankNode(bnode) => format!("_:{}", bnode.id()),
Term::Literal(lit) => lit.value().to_string(),
}
}
fn push_term_value(col: &mut grafeo_core::execution::ValueVector, term: &Term) {
match term {
Term::Iri(iri) => col.push_string(iri.as_str().to_string()),
Term::BlankNode(bnode) => col.push_string(format!("_:{}", bnode.id())),
Term::Literal(lit) => {
col.push_string(lit.value().to_string());
}
}
}
fn component_to_term(component: &TripleComponent) -> Option<Term> {
match component {
TripleComponent::Variable(_) => None,
TripleComponent::BlankNode(label) => Some(Term::blank(label.clone())),
TripleComponent::Iri(iri) => Some(Term::iri(iri.clone())),
TripleComponent::Literal(value) => match value {
Value::String(s) => Some(Term::literal(s.clone())),
Value::Int64(i) => Some(Term::typed_literal(i.to_string(), Literal::XSD_INTEGER)),
Value::Float64(f) => Some(Term::typed_literal(f.to_string(), Literal::XSD_DOUBLE)),
Value::Bool(b) => Some(Term::typed_literal(b.to_string(), Literal::XSD_BOOLEAN)),
_ => Some(Term::literal(value.to_string())),
},
TripleComponent::LangLiteral { value, lang } => {
Some(Term::lang_literal(value.clone(), lang.clone()))
}
}
}
fn derive_rdf_schema(columns: &[String]) -> Vec<LogicalType> {
columns.iter().map(|_| LogicalType::String).collect()
}
fn estimate_operator_cardinality(
op: &crate::query::plan::LogicalOperator,
store: &RdfStore,
) -> Option<f64> {
use crate::query::plan::LogicalOperator;
match op {
LogicalOperator::TripleScan(scan) => {
let stats = store.stats();
let total = stats.triple_count as f64;
if total == 0.0 {
return Some(0.0);
}
let s_bound = matches!(
scan.subject,
crate::query::plan::TripleComponent::Iri(_)
| crate::query::plan::TripleComponent::Literal(_)
| crate::query::plan::TripleComponent::LangLiteral { .. }
);
let p_bound = matches!(
scan.predicate,
crate::query::plan::TripleComponent::Iri(_)
| crate::query::plan::TripleComponent::Literal(_)
| crate::query::plan::TripleComponent::LangLiteral { .. }
);
let o_bound = matches!(
scan.object,
crate::query::plan::TripleComponent::Iri(_)
| crate::query::plan::TripleComponent::Literal(_)
| crate::query::plan::TripleComponent::LangLiteral { .. }
);
let estimate = match (s_bound, p_bound, o_bound) {
(true, true, true) => 1.0,
(true, true, false) => total / stats.subject_count.max(1) as f64,
(true, false, true) => total / stats.subject_count.max(1) as f64,
(false, true, true) => total / stats.predicate_count.max(1) as f64,
(true, false, false) => total / stats.subject_count.max(1) as f64,
(false, true, false) => total / stats.predicate_count.max(1) as f64,
(false, false, true) => total / stats.object_count.max(1) as f64,
(false, false, false) => total,
};
Some(estimate.max(1.0))
}
LogicalOperator::Filter(f) => {
estimate_operator_cardinality(&f.input, store).map(|c| (c * 0.33).max(1.0))
}
LogicalOperator::Join(j) => {
let left = estimate_operator_cardinality(&j.left, store)?;
let right = estimate_operator_cardinality(&j.right, store)?;
Some((left * right * 0.1).max(1.0))
}
LogicalOperator::Limit(l) => {
if let crate::query::plan::CountExpr::Literal(n) = l.count {
Some(n as f64)
} else {
estimate_operator_cardinality(&l.input, store)
}
}
_ => None,
}
}
fn resolve_expression(
expr: &LogicalExpression,
variable_columns: &HashMap<String, usize>,
) -> Result<usize> {
crate::query::planner::common::resolve_expression_to_column(expr, variable_columns, "")
}
use crate::query::planner::common::expression_to_string;
fn value_to_string(value: &Value) -> String {
match value {
Value::Null => String::new(),
Value::Bool(b) => b.to_string(),
Value::Int64(i) => i.to_string(),
Value::Float64(f) => f.to_string(),
Value::String(s) => s.to_string(),
Value::Bytes(b) => String::from_utf8_lossy(b).to_string(),
Value::Timestamp(t) => t.to_string(),
Value::Date(d) => d.to_string(),
Value::Time(t) => t.to_string(),
Value::Duration(d) => d.to_string(),
Value::List(items) => {
let parts: Vec<String> = items.iter().map(value_to_string).collect();
format!("[{}]", parts.join(", "))
}
Value::Map(entries) => {
let parts: Vec<String> = entries
.iter()
.map(|(k, v)| format!("{}: {}", k, value_to_string(v)))
.collect();
format!("{{{}}}", parts.join(", "))
}
Value::Vector(v) => {
let parts: Vec<String> = v.iter().map(|f| f.to_string()).collect();
format!("vector([{}])", parts.join(", "))
}
Value::ZonedDatetime(zdt) => zdt.to_string(),
Value::Path { nodes, edges } => {
format!("<path: {} nodes, {} edges>", nodes.len(), edges.len())
}
Value::GCounter(counts) => {
let total: u64 = counts.values().sum();
format!("GCounter({total})")
}
Value::OnCounter { pos, neg } => {
let pos_sum: i64 = pos.values().copied().map(|v| v as i64).sum();
let neg_sum: i64 = neg.values().copied().map(|v| v as i64).sum();
format!("OnCounter({})", pos_sum - neg_sum)
}
}
}
fn compare_values<F>(left: &Value, right: &Value, cmp: F) -> Option<Value>
where
F: Fn(std::cmp::Ordering) -> bool,
{
let ordering = match (left, right) {
(Value::Int64(l), Value::Int64(r)) => l.cmp(r),
(Value::Float64(l), Value::Float64(r)) => l.partial_cmp(r)?,
(Value::String(l), Value::String(r)) => {
if let (Ok(l_num), Ok(r_num)) = (l.parse::<f64>(), r.parse::<f64>()) {
l_num.partial_cmp(&r_num)?
} else {
l.cmp(r)
}
}
(Value::Int64(l), Value::Float64(r)) => (*l as f64).partial_cmp(r)?,
(Value::Float64(l), Value::Int64(r)) => l.partial_cmp(&(*r as f64))?,
(Value::String(s), Value::Int64(r)) => {
let l_num = s.parse::<f64>().ok()?;
l_num.partial_cmp(&(*r as f64))?
}
(Value::String(s), Value::Float64(r)) => {
let l_num = s.parse::<f64>().ok()?;
l_num.partial_cmp(r)?
}
(Value::Int64(l), Value::String(s)) => {
let r_num = s.parse::<f64>().ok()?;
(*l as f64).partial_cmp(&r_num)?
}
(Value::Float64(l), Value::String(s)) => {
let r_num = s.parse::<f64>().ok()?;
l.partial_cmp(&r_num)?
}
_ => return None,
};
Some(Value::Bool(cmp(ordering)))
}
fn rdf_values_equal(left: &Value, right: &Value) -> bool {
match (left, right) {
(Value::Bool(a), Value::Bool(b)) => a == b,
(Value::Int64(a), Value::Int64(b)) => a == b,
(Value::Float64(a), Value::Float64(b)) => (a - b).abs() < f64::EPSILON,
(Value::String(a), Value::String(b)) => a == b,
(Value::Int64(a), Value::Float64(b)) | (Value::Float64(b), Value::Int64(a)) => {
(*a as f64 - b).abs() < f64::EPSILON
}
(Value::String(s), Value::Int64(i)) | (Value::Int64(i), Value::String(s)) => {
s.parse::<i64>().is_ok_and(|n| n == *i)
}
(Value::String(s), Value::Float64(f)) | (Value::Float64(f), Value::String(s)) => {
s.parse::<f64>().is_ok_and(|n| (n - f).abs() < f64::EPSILON)
}
_ => false,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::query::plan::LogicalPlan;
#[test]
fn test_rdf_planner_simple_scan() {
let store = Arc::new(RdfStore::new());
store.insert(Triple::new(
Term::iri("http://example.org/alix"),
Term::iri("http://xmlns.com/foaf/0.1/name"),
Term::literal("Alix"),
));
let planner = RdfPlanner::new(store);
let scan = TripleScanOp {
subject: TripleComponent::Variable("s".to_string()),
predicate: TripleComponent::Variable("p".to_string()),
object: TripleComponent::Variable("o".to_string()),
graph: None,
input: None,
};
let plan = LogicalPlan::new(LogicalOperator::TripleScan(scan));
let physical = planner.plan(&plan).unwrap();
assert_eq!(physical.columns, vec!["s", "p", "o"]);
}
#[test]
fn test_rdf_planner_with_pattern() {
let store = Arc::new(RdfStore::new());
store.insert(Triple::new(
Term::iri("http://example.org/alix"),
Term::iri("http://xmlns.com/foaf/0.1/name"),
Term::literal("Alix"),
));
store.insert(Triple::new(
Term::iri("http://example.org/gus"),
Term::iri("http://xmlns.com/foaf/0.1/name"),
Term::literal("Gus"),
));
store.insert(Triple::new(
Term::iri("http://example.org/alix"),
Term::iri("http://xmlns.com/foaf/0.1/age"),
Term::typed_literal("30", "http://www.w3.org/2001/XMLSchema#integer"),
));
let planner = RdfPlanner::new(store);
let scan = TripleScanOp {
subject: TripleComponent::Variable("s".to_string()),
predicate: TripleComponent::Iri("http://xmlns.com/foaf/0.1/name".to_string()),
object: TripleComponent::Variable("o".to_string()),
graph: None,
input: None,
};
let plan = LogicalPlan::new(LogicalOperator::TripleScan(scan));
let physical = planner.plan(&plan).unwrap();
assert_eq!(physical.columns, vec!["s", "o"]);
}
#[test]
fn test_rdf_scan_operator_chunking() {
let store = Arc::new(RdfStore::new());
for i in 0..100 {
store.insert(Triple::new(
Term::iri(format!("http://example.org/item{}", i)),
Term::iri("http://example.org/value"),
Term::literal(i.to_string()),
));
}
let pattern = TriplePattern {
subject: None,
predicate: None,
object: None,
};
let mut operator = RdfTripleScanOperator::new(
Arc::clone(&store),
pattern,
[true, true, true, false],
30,
None,
false,
);
let mut total_rows = 0;
while let Ok(Some(chunk)) = operator.next() {
total_rows += chunk.row_count();
assert!(chunk.row_count() <= 30); }
assert_eq!(total_rows, 100);
}
#[test]
fn test_copy_graph_operator() {
let store = Arc::new(RdfStore::new());
store.create_graph("http://example.org/src");
let src = store.graph("http://example.org/src").unwrap();
src.insert(Triple::new(
Term::iri("http://example.org/a"),
Term::iri("http://example.org/p"),
Term::literal("val"),
));
assert_eq!(src.len(), 1);
let plan = LogicalPlan::new(LogicalOperator::CopyGraph(CopyGraphOp {
source: Some("http://example.org/src".to_string()),
destination: Some("http://example.org/dst".to_string()),
silent: false,
}));
let planner = RdfPlanner::new(Arc::clone(&store));
let physical = planner.plan(&plan).unwrap();
let mut op = physical.operator;
while op.next().unwrap().is_some() {}
assert_eq!(store.graph("http://example.org/src").unwrap().len(), 1);
assert_eq!(store.graph("http://example.org/dst").unwrap().len(), 1);
}
#[test]
fn test_move_graph_operator() {
let store = Arc::new(RdfStore::new());
store.create_graph("http://example.org/src");
let src = store.graph("http://example.org/src").unwrap();
src.insert(Triple::new(
Term::iri("http://example.org/a"),
Term::iri("http://example.org/p"),
Term::literal("val"),
));
let plan = LogicalPlan::new(LogicalOperator::MoveGraph(MoveGraphOp {
source: Some("http://example.org/src".to_string()),
destination: Some("http://example.org/dst".to_string()),
silent: false,
}));
let planner = RdfPlanner::new(Arc::clone(&store));
let physical = planner.plan(&plan).unwrap();
let mut op = physical.operator;
while op.next().unwrap().is_some() {}
assert!(store.graph("http://example.org/src").is_none());
assert_eq!(store.graph("http://example.org/dst").unwrap().len(), 1);
}
#[test]
fn test_add_graph_operator() {
let store = Arc::new(RdfStore::new());
store.create_graph("http://example.org/src");
store
.graph("http://example.org/src")
.unwrap()
.insert(Triple::new(
Term::iri("http://example.org/a"),
Term::iri("http://example.org/p"),
Term::literal("from-src"),
));
store.create_graph("http://example.org/dst");
store
.graph("http://example.org/dst")
.unwrap()
.insert(Triple::new(
Term::iri("http://example.org/b"),
Term::iri("http://example.org/q"),
Term::literal("from-dst"),
));
let plan = LogicalPlan::new(LogicalOperator::AddGraph(AddGraphOp {
source: Some("http://example.org/src".to_string()),
destination: Some("http://example.org/dst".to_string()),
silent: false,
}));
let planner = RdfPlanner::new(Arc::clone(&store));
let physical = planner.plan(&plan).unwrap();
let mut op = physical.operator;
while op.next().unwrap().is_some() {}
assert_eq!(store.graph("http://example.org/src").unwrap().len(), 1);
assert_eq!(store.graph("http://example.org/dst").unwrap().len(), 2);
}
#[test]
fn test_copy_nonexistent_source_errors_without_silent() {
let store = Arc::new(RdfStore::new());
let plan = LogicalPlan::new(LogicalOperator::CopyGraph(CopyGraphOp {
source: Some("http://example.org/nope".to_string()),
destination: Some("http://example.org/dst".to_string()),
silent: false,
}));
let planner = RdfPlanner::new(Arc::clone(&store));
let physical = planner.plan(&plan).unwrap();
let mut op = physical.operator;
let result = op.next();
assert!(result.is_err());
}
#[test]
fn test_copy_nonexistent_source_silent_ok() {
let store = Arc::new(RdfStore::new());
let plan = LogicalPlan::new(LogicalOperator::CopyGraph(CopyGraphOp {
source: Some("http://example.org/nope".to_string()),
destination: Some("http://example.org/dst".to_string()),
silent: true,
}));
let planner = RdfPlanner::new(Arc::clone(&store));
let physical = planner.plan(&plan).unwrap();
let mut op = physical.operator;
assert!(op.next().is_ok());
}
}