1pub(crate) mod common;
15pub mod lpg;
16
17#[cfg(feature = "triple-store")]
18pub mod rdf;
19
20pub use lpg::Planner;
22
23use crate::query::plan::{
24 AggregateFunction as LogicalAggregateFunction, BinaryOp, LogicalExpression, UnaryOp,
25};
26use grafeo_common::types::LogicalType;
27use grafeo_common::utils::error::{Error, Result};
28use grafeo_core::execution::AdaptiveContext;
29use grafeo_core::execution::operators::{
30 AggregateFunction as PhysicalAggregateFunction, BinaryFilterOp, FilterExpression, Operator,
31 UnaryFilterOp,
32};
33
34pub struct PhysicalPlan {
36 pub operator: Box<dyn Operator>,
38 pub columns: Vec<String>,
40 pub adaptive_context: Option<AdaptiveContext>,
46}
47
48impl PhysicalPlan {
49 #[must_use]
51 pub fn columns(&self) -> &[String] {
52 &self.columns
53 }
54
55 pub fn into_operator(self) -> Box<dyn Operator> {
57 self.operator
58 }
59
60 #[must_use]
62 pub fn adaptive_context(&self) -> Option<&AdaptiveContext> {
63 self.adaptive_context.as_ref()
64 }
65
66 pub fn take_adaptive_context(&mut self) -> Option<AdaptiveContext> {
68 self.adaptive_context.take()
69 }
70}
71
72pub fn convert_binary_op(op: BinaryOp) -> Result<BinaryFilterOp> {
83 match op {
84 BinaryOp::Eq => Ok(BinaryFilterOp::Eq),
85 BinaryOp::Ne => Ok(BinaryFilterOp::Ne),
86 BinaryOp::Lt => Ok(BinaryFilterOp::Lt),
87 BinaryOp::Le => Ok(BinaryFilterOp::Le),
88 BinaryOp::Gt => Ok(BinaryFilterOp::Gt),
89 BinaryOp::Ge => Ok(BinaryFilterOp::Ge),
90 BinaryOp::And => Ok(BinaryFilterOp::And),
91 BinaryOp::Or => Ok(BinaryFilterOp::Or),
92 BinaryOp::Xor => Ok(BinaryFilterOp::Xor),
93 BinaryOp::Add => Ok(BinaryFilterOp::Add),
94 BinaryOp::Sub => Ok(BinaryFilterOp::Sub),
95 BinaryOp::Mul => Ok(BinaryFilterOp::Mul),
96 BinaryOp::Div => Ok(BinaryFilterOp::Div),
97 BinaryOp::Mod => Ok(BinaryFilterOp::Mod),
98 BinaryOp::StartsWith => Ok(BinaryFilterOp::StartsWith),
99 BinaryOp::EndsWith => Ok(BinaryFilterOp::EndsWith),
100 BinaryOp::Contains => Ok(BinaryFilterOp::Contains),
101 BinaryOp::In => Ok(BinaryFilterOp::In),
102 BinaryOp::Regex => Ok(BinaryFilterOp::Regex),
103 BinaryOp::Pow => Ok(BinaryFilterOp::Pow),
104 BinaryOp::Concat => Ok(BinaryFilterOp::Concat),
105 BinaryOp::Like => Ok(BinaryFilterOp::Like),
106 }
107}
108
109pub fn convert_unary_op(op: UnaryOp) -> Result<UnaryFilterOp> {
116 match op {
117 UnaryOp::Not => Ok(UnaryFilterOp::Not),
118 UnaryOp::IsNull => Ok(UnaryFilterOp::IsNull),
119 UnaryOp::IsNotNull => Ok(UnaryFilterOp::IsNotNull),
120 UnaryOp::Neg => Ok(UnaryFilterOp::Neg),
121 }
122}
123
124pub fn convert_aggregate_function(func: LogicalAggregateFunction) -> PhysicalAggregateFunction {
126 match func {
127 LogicalAggregateFunction::Count => PhysicalAggregateFunction::Count,
128 LogicalAggregateFunction::CountNonNull => PhysicalAggregateFunction::CountNonNull,
129 LogicalAggregateFunction::Sum => PhysicalAggregateFunction::Sum,
130 LogicalAggregateFunction::Avg => PhysicalAggregateFunction::Avg,
131 LogicalAggregateFunction::Min => PhysicalAggregateFunction::Min,
132 LogicalAggregateFunction::Max => PhysicalAggregateFunction::Max,
133 LogicalAggregateFunction::Collect => PhysicalAggregateFunction::Collect,
134 LogicalAggregateFunction::StdDev => PhysicalAggregateFunction::StdDev,
135 LogicalAggregateFunction::StdDevPop => PhysicalAggregateFunction::StdDevPop,
136 LogicalAggregateFunction::Variance => PhysicalAggregateFunction::Variance,
137 LogicalAggregateFunction::VariancePop => PhysicalAggregateFunction::VariancePop,
138 LogicalAggregateFunction::PercentileDisc => PhysicalAggregateFunction::PercentileDisc,
139 LogicalAggregateFunction::PercentileCont => PhysicalAggregateFunction::PercentileCont,
140 LogicalAggregateFunction::GroupConcat => PhysicalAggregateFunction::GroupConcat,
141 LogicalAggregateFunction::Sample => PhysicalAggregateFunction::Sample,
142 LogicalAggregateFunction::CovarSamp => PhysicalAggregateFunction::CovarSamp,
143 LogicalAggregateFunction::CovarPop => PhysicalAggregateFunction::CovarPop,
144 LogicalAggregateFunction::Corr => PhysicalAggregateFunction::Corr,
145 LogicalAggregateFunction::RegrSlope => PhysicalAggregateFunction::RegrSlope,
146 LogicalAggregateFunction::RegrIntercept => PhysicalAggregateFunction::RegrIntercept,
147 LogicalAggregateFunction::RegrR2 => PhysicalAggregateFunction::RegrR2,
148 LogicalAggregateFunction::RegrCount => PhysicalAggregateFunction::RegrCount,
149 LogicalAggregateFunction::RegrSxx => PhysicalAggregateFunction::RegrSxx,
150 LogicalAggregateFunction::RegrSyy => PhysicalAggregateFunction::RegrSyy,
151 LogicalAggregateFunction::RegrSxy => PhysicalAggregateFunction::RegrSxy,
152 LogicalAggregateFunction::RegrAvgx => PhysicalAggregateFunction::RegrAvgx,
153 LogicalAggregateFunction::RegrAvgy => PhysicalAggregateFunction::RegrAvgy,
154 }
155}
156
157pub fn convert_filter_expression(expr: &LogicalExpression) -> Result<FilterExpression> {
166 match expr {
167 LogicalExpression::Literal(v) => Ok(FilterExpression::Literal(v.clone())),
168 LogicalExpression::Variable(name) => Ok(FilterExpression::Variable(name.clone())),
169 LogicalExpression::Property { variable, property } => Ok(FilterExpression::Property {
170 variable: variable.clone(),
171 property: property.clone(),
172 }),
173 LogicalExpression::Binary { left, op, right } => {
174 let left_expr = convert_filter_expression(left)?;
175 let right_expr = convert_filter_expression(right)?;
176 let filter_op = convert_binary_op(*op)?;
177 Ok(FilterExpression::Binary {
178 left: Box::new(left_expr),
179 op: filter_op,
180 right: Box::new(right_expr),
181 })
182 }
183 LogicalExpression::Unary { op, operand } => {
184 let operand_expr = convert_filter_expression(operand)?;
185 let filter_op = convert_unary_op(*op)?;
186 Ok(FilterExpression::Unary {
187 op: filter_op,
188 operand: Box::new(operand_expr),
189 })
190 }
191 LogicalExpression::FunctionCall { name, args, .. } => {
192 let filter_args: Vec<FilterExpression> = args
193 .iter()
194 .map(convert_filter_expression)
195 .collect::<Result<Vec<_>>>()?;
196 Ok(FilterExpression::FunctionCall {
197 name: name.clone(),
198 args: filter_args,
199 })
200 }
201 LogicalExpression::Case {
202 operand,
203 when_clauses,
204 else_clause,
205 } => {
206 let filter_operand = operand
207 .as_ref()
208 .map(|e| convert_filter_expression(e))
209 .transpose()?
210 .map(Box::new);
211 let filter_when_clauses: Vec<(FilterExpression, FilterExpression)> = when_clauses
212 .iter()
213 .map(|(cond, result)| {
214 Ok((
215 convert_filter_expression(cond)?,
216 convert_filter_expression(result)?,
217 ))
218 })
219 .collect::<Result<Vec<_>>>()?;
220 let filter_else = else_clause
221 .as_ref()
222 .map(|e| convert_filter_expression(e))
223 .transpose()?
224 .map(Box::new);
225 Ok(FilterExpression::Case {
226 operand: filter_operand,
227 when_clauses: filter_when_clauses,
228 else_clause: filter_else,
229 })
230 }
231 LogicalExpression::List(items) => {
232 let filter_items: Vec<FilterExpression> = items
233 .iter()
234 .map(convert_filter_expression)
235 .collect::<Result<Vec<_>>>()?;
236 Ok(FilterExpression::List(filter_items))
237 }
238 LogicalExpression::Map(pairs) => {
239 let filter_pairs: Vec<(String, FilterExpression)> = pairs
240 .iter()
241 .map(|(k, v)| Ok((k.clone(), convert_filter_expression(v)?)))
242 .collect::<Result<Vec<_>>>()?;
243 Ok(FilterExpression::Map(filter_pairs))
244 }
245 LogicalExpression::IndexAccess { base, index } => {
246 let base_expr = convert_filter_expression(base)?;
247 let index_expr = convert_filter_expression(index)?;
248 Ok(FilterExpression::IndexAccess {
249 base: Box::new(base_expr),
250 index: Box::new(index_expr),
251 })
252 }
253 LogicalExpression::SliceAccess { base, start, end } => {
254 let base_expr = convert_filter_expression(base)?;
255 let start_expr = start
256 .as_ref()
257 .map(|s| convert_filter_expression(s))
258 .transpose()?
259 .map(Box::new);
260 let end_expr = end
261 .as_ref()
262 .map(|e| convert_filter_expression(e))
263 .transpose()?
264 .map(Box::new);
265 Ok(FilterExpression::SliceAccess {
266 base: Box::new(base_expr),
267 start: start_expr,
268 end: end_expr,
269 })
270 }
271 LogicalExpression::Parameter(_) => Err(Error::Internal(
272 "Parameters not yet supported in filters".to_string(),
273 )),
274 LogicalExpression::Labels(var) => Ok(FilterExpression::Labels(var.clone())),
275 LogicalExpression::Type(var) => Ok(FilterExpression::Type(var.clone())),
276 LogicalExpression::Id(var) => Ok(FilterExpression::Id(var.clone())),
277 LogicalExpression::ListComprehension {
278 variable,
279 list_expr,
280 filter_expr,
281 map_expr,
282 } => {
283 let list = convert_filter_expression(list_expr)?;
284 let filter = filter_expr
285 .as_ref()
286 .map(|f| convert_filter_expression(f))
287 .transpose()?
288 .map(Box::new);
289 let map = convert_filter_expression(map_expr)?;
290 Ok(FilterExpression::ListComprehension {
291 variable: variable.clone(),
292 list_expr: Box::new(list),
293 filter_expr: filter,
294 map_expr: Box::new(map),
295 })
296 }
297 LogicalExpression::ListPredicate {
298 kind,
299 variable,
300 list_expr,
301 predicate,
302 } => {
303 use crate::query::plan::ListPredicateKind as LPK;
304 let filter_kind = match kind {
305 LPK::All => grafeo_core::execution::operators::ListPredicateKind::All,
306 LPK::Any => grafeo_core::execution::operators::ListPredicateKind::Any,
307 LPK::None => grafeo_core::execution::operators::ListPredicateKind::None,
308 LPK::Single => grafeo_core::execution::operators::ListPredicateKind::Single,
309 };
310 let list = convert_filter_expression(list_expr)?;
311 let pred = convert_filter_expression(predicate)?;
312 Ok(FilterExpression::ListPredicate {
313 kind: filter_kind,
314 variable: variable.clone(),
315 list_expr: Box::new(list),
316 predicate: Box::new(pred),
317 })
318 }
319 LogicalExpression::ExistsSubquery(_)
320 | LogicalExpression::CountSubquery(_)
321 | LogicalExpression::ValueSubquery(_) => {
322 Err(Error::Internal(
327 "Subquery expressions in this position require the semi-join or Apply rewrite; \
328 move the EXISTS/COUNT/VALUE subquery to a top-level WHERE predicate or RETURN"
329 .to_string(),
330 ))
331 }
332 LogicalExpression::MapProjection { base, entries } => {
333 let physical_entries: Vec<(String, FilterExpression)> = entries
334 .iter()
335 .map(|entry| match entry {
336 crate::query::plan::MapProjectionEntry::PropertySelector(name) => Ok((
337 name.clone(),
338 FilterExpression::Property {
339 variable: base.clone(),
340 property: name.clone(),
341 },
342 )),
343 crate::query::plan::MapProjectionEntry::LiteralEntry(key, expr) => {
344 Ok((key.clone(), convert_filter_expression(expr)?))
345 }
346 crate::query::plan::MapProjectionEntry::AllProperties => Ok((
347 "*".to_string(),
348 FilterExpression::FunctionCall {
349 name: "properties".to_string(),
350 args: vec![FilterExpression::Variable(base.clone())],
351 },
352 )),
353 })
354 .collect::<Result<Vec<_>>>()?;
355 Ok(FilterExpression::Map(physical_entries))
356 }
357 LogicalExpression::Reduce {
358 accumulator,
359 initial,
360 variable,
361 list,
362 expression,
363 } => Ok(FilterExpression::Reduce {
364 accumulator: accumulator.clone(),
365 initial: Box::new(convert_filter_expression(initial)?),
366 variable: variable.clone(),
367 list: Box::new(convert_filter_expression(list)?),
368 expression: Box::new(convert_filter_expression(expression)?),
369 }),
370 LogicalExpression::PatternComprehension { projection, .. } => {
371 let proj = convert_filter_expression(projection)?;
372 Ok(FilterExpression::FunctionCall {
373 name: "collect".to_string(),
374 args: vec![proj],
375 })
376 }
377 }
378}
379
380pub(crate) fn value_to_logical_type(value: &grafeo_common::types::Value) -> LogicalType {
382 use grafeo_common::types::Value;
383 match value {
384 Value::Null => LogicalType::String,
385 Value::Bool(_) => LogicalType::Bool,
386 Value::Int64(_) => LogicalType::Int64,
387 Value::Float64(_) => LogicalType::Float64,
388 Value::String(_) => LogicalType::String,
389 Value::Bytes(_) => LogicalType::String,
390 Value::Timestamp(_) => LogicalType::Timestamp,
391 Value::Date(_) => LogicalType::Date,
392 Value::Time(_) => LogicalType::Time,
393 Value::Duration(_) => LogicalType::Duration,
394 Value::ZonedDatetime(_) => LogicalType::ZonedDatetime,
395 Value::List(_) => LogicalType::String,
396 Value::Map(_) => LogicalType::String,
397 Value::Vector(v) => LogicalType::Vector(v.len()),
398 Value::Path { .. } => LogicalType::Any,
399 Value::GCounter(_) | Value::OnCounter { .. } => LogicalType::Any,
400 _ => LogicalType::Any,
401 }
402}
403
/// Evaluates an expression that must be a compile-time constant.
///
/// Only two shapes are accepted: a literal, and a unary negation whose operand
/// is itself a constant expression that evaluates to an `Int64` or `Float64`.
///
/// # Errors
///
/// Returns `Error::Internal` when:
/// * the expression is neither a literal nor a negation,
/// * the negated operand is not numeric, or
/// * negating an `Int64` would overflow (the operand is `i64::MIN`).
#[cfg(feature = "algos")]
pub(crate) fn eval_constant_expression(
    expr: &crate::query::plan::LogicalExpression,
) -> Result<grafeo_common::types::Value> {
    use crate::query::plan::LogicalExpression;
    use grafeo_common::types::Value;

    match expr {
        LogicalExpression::Literal(val) => Ok(val.clone()),
        LogicalExpression::Unary {
            op: crate::query::plan::UnaryOp::Neg,
            operand,
        } => match eval_constant_expression(operand)? {
            // `-i64::MIN` is not representable: plain `-n` would panic in
            // debug builds and silently wrap in release builds.
            Value::Int64(n) => n.checked_neg().map(Value::Int64).ok_or_else(|| {
                Error::Internal("Integer overflow while negating constant value".into())
            }),
            Value::Float64(f) => Ok(Value::Float64(-f)),
            _ => Err(Error::Internal("Cannot negate non-numeric value".into())),
        },
        _ => Err(Error::Internal(
            "Procedure argument must be a constant value".into(),
        )),
    }
}