1pub(crate) mod common;
15pub mod lpg;
16
17#[cfg(feature = "rdf")]
18pub mod rdf;
19
20pub use lpg::Planner;
22
23use crate::query::plan::{
24 AggregateFunction as LogicalAggregateFunction, BinaryOp, LogicalExpression, UnaryOp,
25};
26use grafeo_common::types::LogicalType;
27use grafeo_common::utils::error::{Error, Result};
28use grafeo_core::execution::AdaptiveContext;
29use grafeo_core::execution::operators::{
30 AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, FilterExpression, Operator,
31 UnaryFilterOp,
32};
33
/// A physical query plan: an executable operator bundled with the metadata
/// that travels alongside it.
pub struct PhysicalPlan {
    /// The boxed operator owned by this plan.
    ///
    /// NOTE(review): presumably the root of the physical operator tree;
    /// confirm against the planner that builds this struct.
    pub operator: Box<dyn Operator>,
    /// Column names associated with the plan.
    ///
    /// NOTE(review): presumably the output columns in result order; confirm.
    pub columns: Vec<String>,
    /// Optional adaptive-execution context attached to the plan; `None` when
    /// no context is present.
    pub adaptive_context: Option<AdaptiveContext>,
}
47
48impl PhysicalPlan {
49 #[must_use]
51 pub fn columns(&self) -> &[String] {
52 &self.columns
53 }
54
55 pub fn into_operator(self) -> Box<dyn Operator> {
57 self.operator
58 }
59
60 #[must_use]
62 pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
63 self.adaptive_context.as_ref()
64 }
65
66 pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
68 self.adaptive_context.take()
69 }
70}
71
72pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
78 match op {
79 BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
80 BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
81 BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
82 BinaryOp::Le => Ok(BinaryFilterOp::Le),
83 BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
84 BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
85 BinaryOp::And => Ok(BinaryFilterOp::And),
86 BinaryOp::Or => Ok(BinaryFilterOp::Or),
87 BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
88 BinaryOp::Add => Ok(BinaryFilterOp::Add),
89 BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
90 BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
91 BinaryOp::Div => Ok(BinaryFilterOp::Div),
92 BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
93 BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
94 BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
95 BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
96 BinaryOp::In => Ok(BinaryFilterOp::In),
97 BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
98 BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
99 BinaryOp::Concat => Ok(BinaryFilterOp::Concat),
100 BinaryOp::Like => Ok(BinaryFilterOp::Like),
101 }
102}
103
104pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
106 match op {
107 UnaryOp::Not => Ok(UnaryFilterOp::Not),
108 UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
109 UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
110 UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
111 }
112}
113
114pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
116 match func {
117 LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
118 LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
119 LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
120 LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
121 LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
122 LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
123 LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
124 LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
125 LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
126 LogicalAggregateFunction::Variance => PhysicalAggregateFunction::Variance,
127 LogicalAggregateFunction::VariancePop => PhysicalAggregateFunction::VariancePop,
128 LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
129 LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
130 LogicalAggregateFunction::GroupConcat => PhysicalAggregateFunction::GroupConcat,
131 LogicalAggregateFunction::Sample => PhysicalAggregateFunction::Sample,
132 LogicalAggregateFunction::CovarSamp => PhysicalAggregateFunction::CovarSamp,
133 LogicalAggregateFunction::CovarPop => PhysicalAggregateFunction::CovarPop,
134 LogicalAggregateFunction::Corr => PhysicalAggregateFunction::Corr,
135 LogicalAggregateFunction::RegrSlope => PhysicalAggregateFunction::RegrSlope,
136 LogicalAggregateFunction::RegrIntercept => PhysicalAggregateFunction::RegrIntercept,
137 LogicalAggregateFunction::RegrR2 => PhysicalAggregateFunction::RegrR2,
138 LogicalAggregateFunction::RegrCount => PhysicalAggregateFunction::RegrCount,
139 LogicalAggregateFunction::RegrSxx => PhysicalAggregateFunction::RegrSxx,
140 LogicalAggregateFunction::RegrSyy => PhysicalAggregateFunction::RegrSyy,
141 LogicalAggregateFunction::RegrSxy => PhysicalAggregateFunction::RegrSxy,
142 LogicalAggregateFunction::RegrAvgx => PhysicalAggregateFunction::RegrAvgx,
143 LogicalAggregateFunction::RegrAvgy => PhysicalAggregateFunction::RegrAvgy,
144 }
145}
146
147pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
151 match expr {
152 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
153 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
154 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
155 variable: variable.clone(),
156 property: property.clone(),
157 }),
158 LogicalExpression::Binary { left, op, right } => {
159 let left_expr = convert_filter_expression(left)?;
160 let right_expr = convert_filter_expression(right)?;
161 let filter_op = convert_binary_op(*op)?;
162 Ok(FilterExpression::Binary {
163 left: Box::new(left_expr),
164 op: filter_op,
165 right: Box::new(right_expr),
166 })
167 }
168 LogicalExpression::Unary { op, operand } => {
169 let operand_expr = convert_filter_expression(operand)?;
170 let filter_op = convert_unary_op(*op)?;
171 Ok(FilterExpression::Unary {
172 op: filter_op,
173 operand: Box::new(operand_expr),
174 })
175 }
176 LogicalExpression::FunctionCall { name, args, .. } => {
177 let filter_args: Vec<FilterExpression> = args
178 .iter()
179 .map(convert_filter_expression)
180 .collect::<Result<Vec<_>>>()?;
181 Ok(FilterExpression::FunctionCall {
182 name: name.clone(),
183 args: filter_args,
184 })
185 }
186 LogicalExpression::Case {
187 operand,
188 when_clauses,
189 else_clause,
190 } => {
191 let filter_operand = operand
192 .as_ref()
193 .map(|e| convert_filter_expression(e))
194 .transpose()?
195 .map(Box::new);
196 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
197 .iter()
198 .map(|(cond, result)| {
199 Ok((
200 convert_filter_expression(cond)?,
201 convert_filter_expression(result)?,
202 ))
203 })
204 .collect::<Result<Vec<_>>>()?;
205 let filter_else = else_clause
206 .as_ref()
207 .map(|e| convert_filter_expression(e))
208 .transpose()?
209 .map(Box::new);
210 Ok(FilterExpression::Case {
211 operand: filter_operand,
212 when_clauses: filter_when_clauses,
213 else_clause: filter_else,
214 })
215 }
216 LogicalExpression::List(items) => {
217 let filter_items: Vec<FilterExpression> = items
218 .iter()
219 .map(convert_filter_expression)
220 .collect::<Result<Vec<_>>>()?;
221 Ok(FilterExpression::List(filter_items))
222 }
223 LogicalExpression::Map(pairs) => {
224 let filter_pairs: Vec<(String, FilterExpression)> = pairs
225 .iter()
226 .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
227 .collect::<Result<Vec<_>>>()?;
228 Ok(FilterExpression::Map(filter_pairs))
229 }
230 LogicalExpression::IndexAccess { base, index } => {
231 let base_expr = convert_filter_expression(base)?;
232 let index_expr = convert_filter_expression(index)?;
233 Ok(FilterExpression::IndexAccess {
234 base: Box::new(base_expr),
235 index: Box::new(index_expr),
236 })
237 }
238 LogicalExpression::SliceAccess { base, start, end } => {
239 let base_expr = convert_filter_expression(base)?;
240 let start_expr = start
241 .as_ref()
242 .map(|s| convert_filter_expression(s))
243 .transpose()?
244 .map(Box::new);
245 let end_expr = end
246 .as_ref()
247 .map(|e| convert_filter_expression(e))
248 .transpose()?
249 .map(Box::new);
250 Ok(FilterExpression::SliceAccess {
251 base: Box::new(base_expr),
252 start: start_expr,
253 end: end_expr,
254 })
255 }
256 LogicalExpression::Parameter(_) => Err(Error::Internal(
257 "Parameters not yet supported in filters".to_string(),
258 )),
259 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
260 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
261 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
262 LogicalExpression::ListComprehension {
263 variable,
264 list_expr,
265 filter_expr,
266 map_expr,
267 } => {
268 let list = convert_filter_expression(list_expr)?;
269 let filter = filter_expr
270 .as_ref()
271 .map(|f| convert_filter_expression(f))
272 .transpose()?
273 .map(Box::new);
274 let map = convert_filter_expression(map_expr)?;
275 Ok(FilterExpression::ListComprehension {
276 variable: variable.clone(),
277 list_expr: Box::new(list),
278 filter_expr: filter,
279 map_expr: Box::new(map),
280 })
281 }
282 LogicalExpression::ListPredicate {
283 kind,
284 variable,
285 list_expr,
286 predicate,
287 } => {
288 use crate::query::plan::ListPredicateKind as LPK;
289 let filter_kind = match kind {
290 LPK::All => grafeo_core::execution::operators::ListPredicateKind::All,
291 LPK::Any => grafeo_core::execution::operators::ListPredicateKind::Any,
292 LPK::None => grafeo_core::execution::operators::ListPredicateKind::None,
293 LPK::Single => grafeo_core::execution::operators::ListPredicateKind::Single,
294 };
295 let list = convert_filter_expression(list_expr)?;
296 let pred = convert_filter_expression(predicate)?;
297 Ok(FilterExpression::ListPredicate {
298 kind: filter_kind,
299 variable: variable.clone(),
300 list_expr: Box::new(list),
301 predicate: Box::new(pred),
302 })
303 }
304 LogicalExpression::ExistsSubquery(_) | LogicalExpression::CountSubquery(_) => {
305 Err(Error::Internal(
310 "Subquery expressions in this position require the semi-join or Apply rewrite; \
311 move the EXISTS/COUNT subquery to a top-level WHERE predicate"
312 .to_string(),
313 ))
314 }
315 LogicalExpression::MapProjection { base, entries } => {
316 let physical_entries: Vec<(String, FilterExpression)> = entries
317 .iter()
318 .map(|entry| match entry {
319 crate::query::plan::MapProjectionEntry::PropertySelector(name) => Ok((
320 name.clone(),
321 FilterExpression::Property {
322 variable: base.clone(),
323 property: name.clone(),
324 },
325 )),
326 crate::query::plan::MapProjectionEntry::LiteralEntry(key, expr) => {
327 Ok((key.clone(), convert_filter_expression(expr)?))
328 }
329 crate::query::plan::MapProjectionEntry::AllProperties => Ok((
330 "*".to_string(),
331 FilterExpression::FunctionCall {
332 name: "properties".to_string(),
333 args: vec![FilterExpression::Variable(base.clone())],
334 },
335 )),
336 })
337 .collect::<Result<Vec<_>>>()?;
338 Ok(FilterExpression::Map(physical_entries))
339 }
340 LogicalExpression::Reduce {
341 accumulator,
342 initial,
343 variable,
344 list,
345 expression,
346 } => Ok(FilterExpression::Reduce {
347 accumulator: accumulator.clone(),
348 initial: Box::new(convert_filter_expression(initial)?),
349 variable: variable.clone(),
350 list: Box::new(convert_filter_expression(list)?),
351 expression: Box::new(convert_filter_expression(expression)?),
352 }),
353 LogicalExpression::PatternComprehension { projection, .. } => {
354 let proj = convert_filter_expression(projection)?;
355 Ok(FilterExpression::FunctionCall {
356 name: "collect".to_string(),
357 args: vec![proj],
358 })
359 }
360 }
361}
362
363pub(crate) fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
365 use grafeo_common::types::Value;
366 match value {
367 Value::Null => LogicalType::String,
368 Value::Bool(_) => LogicalType::Bool,
369 Value::Int64(_) => LogicalType::Int64,
370 Value::Float64(_) => LogicalType::Float64,
371 Value::String(_) => LogicalType::String,
372 Value::Bytes(_) => LogicalType::String,
373 Value::Timestamp(_) => LogicalType::Timestamp,
374 Value::Date(_) => LogicalType::Date,
375 Value::Time(_) => LogicalType::Time,
376 Value::Duration(_) => LogicalType::Duration,
377 Value::ZonedDatetime(_) => LogicalType::ZonedDatetime,
378 Value::List(_) => LogicalType::String,
379 Value::Map(_) => LogicalType::String,
380 Value::Vector(v) => LogicalType::Vector(v.len()),
381 Value::Path { .. } => LogicalType::Any,
382 }
383}
384
385pub(crate) fn eval_constant_expression(
390 expr: &crate::query::plan::LogicalExpression,
391) -> Result<grafeo_common::types::Value> {
392 use crate::query::plan::LogicalExpression;
393 use grafeo_common::types::Value;
394
395 match expr {
396 LogicalExpression::Literal(val) => Ok(val.clone()),
397 LogicalExpression::Unary {
398 op: crate::query::plan::UnaryOp::Neg,
399 operand,
400 } => {
401 let val = eval_constant_expression(operand)?;
402 match val {
403 Value::Int64(n) => Ok(Value::Int64(-n)),
404 Value::Float64(f) => Ok(Value::Float64(-f)),
405 _ => Err(Error::Internal("Cannot negate non-numeric value".into())),
406 }
407 }
408 _ => Err(Error::Internal(
409 "Procedure argument must be a constant value".into(),
410 )),
411 }
412}