Skip to main content

partiql_eval/eval/
mod.rs

1use itertools::Itertools;
2use std::any::Any;
3use std::borrow::Cow;
4use std::cell::RefCell;
5
6use delegate::delegate;
7use petgraph::algo::toposort;
8use petgraph::dot::Dot;
9use petgraph::prelude::StableGraph;
10use petgraph::{Directed, Incoming, Outgoing};
11use std::fmt::Debug;
12
13use partiql_value::{BindingsName, Value};
14
15use crate::env::basic::MapBindings;
16
17use petgraph::graph::NodeIndex;
18
19use crate::error::{EvalErr, EvaluationError};
20use partiql_catalog::context::{Bindings, SessionContext, SystemContext};
21use petgraph::visit::EdgeRef;
22use rustc_hash::FxHashMap;
23use unicase::UniCase;
24
25use crate::eval::evaluable::{EvalType, Evaluable};
26use crate::plan::EvaluationMode;
27
28#[cfg(feature = "serde")]
29use serde::{Deserialize, Serialize};
30
31pub(crate) mod eval_expr_wrapper;
32pub mod evaluable;
33pub mod expr;
34pub mod graph;
35
/// Represents a `PartiQL` evaluation query plan which is a plan that can be evaluated to produce
/// a result. The plan uses a directed `petgraph::StableGraph`.
///
/// Graph nodes are boxed [`Evaluable`] operators. Each edge carries a `u8` weight that
/// `execute` uses as the index of the input slot (out of two) of the destination operator that
/// receives the source operator's output.
#[derive(Debug)]
pub struct EvalPlan {
    /// Evaluation mode; in `EvaluationMode::Strict`, `execute` stops at the first operator
    /// evaluation after which the context holds errors.
    mode: EvaluationMode,
    /// Operator graph: nodes are operators, edge weights select the destination input slot.
    plan_graph: StableGraph<Box<dyn Evaluable>, u8, Directed>,
}
43
44impl Default for EvalPlan {
45    fn default() -> Self {
46        Self::new(EvaluationMode::Permissive, Default::default())
47    }
48}
49
50#[inline]
51fn err_illegal_state(msg: impl AsRef<str>) -> EvalErr {
52    EvalErr {
53        errors: vec![EvaluationError::IllegalState(msg.as_ref().to_string())],
54    }
55}
56
57impl EvalPlan {
58    /// Creates a new evaluation plan.
59    #[must_use]
60    pub fn new(
61        mode: EvaluationMode,
62        plan_graph: StableGraph<Box<dyn Evaluable>, u8, Directed>,
63    ) -> Self {
64        EvalPlan { mode, plan_graph }
65    }
66
67    #[inline]
68    fn plan_graph(&self) -> &StableGraph<Box<dyn Evaluable>, u8> {
69        &self.plan_graph
70    }
71
72    #[inline]
73    fn get_node(&self, idx: NodeIndex) -> Result<&dyn Evaluable, EvalErr> {
74        self.plan_graph()
75            .node_weight(idx)
76            .map(|node| node.as_ref())
77            .ok_or_else(|| err_illegal_state("Error in retrieving node"))
78    }
79
80    /// Executes the plan while mutating its state by changing the inputs and outputs of plan
81    /// operators.
82    pub fn execute(&self, ctx: &dyn EvalContext) -> Result<Evaluated, EvalErr> {
83        // We are only interested in DAGs that can be used as execution plans, which leads to the
84        // following definition.
85        // A DAG is a directed, cycle-free graph G = (V, E) with a denoted root node v0 ∈ V such
86        // that all v ∈ V \{v0} are reachable from v0. Note that this is the definition of trees
87        // without the condition |E| = |V | − 1. Hence, all trees are DAGs.
88        // Reference: https://link.springer.com/article/10.1007/s00450-009-0061-0
89        let ops = toposort(&self.plan_graph, None).map_err(|e| EvalErr {
90            errors: vec![EvaluationError::InvalidEvaluationPlan(format!(
91                "Malformed evaluation plan detected: {e:?}"
92            ))],
93        })?;
94        let mut inputs: FxHashMap<NodeIndex, [Option<Value>; 2]> = FxHashMap::default();
95
96        // Set source node inputs to empty
97        for idx in ops.clone() {
98            let source_node = self.plan_graph.edges_directed(idx, Incoming).count() == 0;
99            let managed = self
100                .get_node(idx)
101                .map(|d| d.eval_type() != EvalType::GraphManaged)
102                .unwrap_or(false);
103            if source_node || managed {
104                inputs.insert(idx, [None, None]);
105            }
106        }
107
108        let mut result = None;
109        for idx in ops {
110            let destinations: Vec<(usize, (u8, NodeIndex))> = self
111                .plan_graph()
112                .edges_directed(idx, Outgoing)
113                .map(|e| (*e.weight(), e.target()))
114                .enumerate()
115                .collect_vec();
116
117            // Some evaluables (i.e., `JOIN`) manage their own inputs
118            let graph_managed = destinations.is_empty()
119                || destinations.iter().any(|(_, (_, dest_idx))| {
120                    matches!(
121                        self.get_node(*dest_idx).map(|d| d.eval_type()),
122                        Ok(EvalType::GraphManaged)
123                    )
124                });
125            if graph_managed {
126                let src = self.get_node(idx)?;
127                let input = inputs
128                    .remove(&idx)
129                    .ok_or_else(|| err_illegal_state("Error in retrieving node input"))?;
130                result = Some(src.evaluate(input, ctx));
131
132                // return on first evaluation error
133                if ctx.has_errors() && self.mode == EvaluationMode::Strict {
134                    return Err(EvalErr {
135                        errors: ctx.errors(),
136                    });
137                }
138
139                let num_destinations = destinations.len();
140                for (i, (branch_num, dst_id)) in destinations {
141                    let res = if i == num_destinations - 1 {
142                        result.take()
143                    } else {
144                        result.clone()
145                    };
146
147                    let res =
148                        res.ok_or_else(|| err_illegal_state("Error in retrieving source value"))?;
149                    let inputs = inputs.entry(dst_id).or_insert_with(|| [None, None]);
150                    inputs[branch_num as usize] = Some(res);
151                }
152            }
153        }
154
155        let result = result.ok_or_else(|| err_illegal_state("Error in retrieving eval output"))?;
156        Ok(Evaluated { result })
157    }
158
159    #[must_use]
160    pub fn to_dot_graph(&self) -> String {
161        format!("{:?}", Dot::with_config(&self.plan_graph, &[]))
162    }
163}
164
/// Result of evaluating a plan: the [`Evaluated`] value on success, or an [`EvalErr`] carrying
/// the evaluation errors on failure.
pub type EvalResult = Result<Evaluated, EvalErr>;
167
/// Represents result of evaluation as an evaluated entity.
///
/// Marked `#[non_exhaustive]`, so code outside this crate cannot build it with a struct
/// literal or match it exhaustively.
#[non_exhaustive]
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Evaluated {
    /// The value produced by the evaluation.
    pub result: Value,
}
175
/// Represents an evaluation context that is used during evaluation of a plan.
///
/// On top of variable bindings ([`Bindings`]) and session information ([`SessionContext`]), a
/// context collects the errors reported during evaluation. Every method takes `&self`, so an
/// implementation that stores errors has to rely on interior mutability.
pub trait EvalContext: Bindings<Value> + SessionContext + Debug {
    /// Records `error` on this context.
    fn add_error(&self, error: EvaluationError);
    /// Returns `true` when at least one error is currently recorded.
    fn has_errors(&self) -> bool;
    /// Returns the recorded errors.
    ///
    /// NOTE(review): `BasicContext` empties its error list when this is called, so callers
    /// should not assume the same errors can be read twice.
    fn errors(&self) -> Vec<EvaluationError>;
}
182
/// Stand-alone [`EvalContext`] implementation that owns its bindings and its error list.
#[derive(Debug)]
pub struct BasicContext<'u> {
    /// Variable bindings consulted by `Bindings::get`.
    pub bindings: MapBindings<Value>,

    /// System context returned by `SessionContext::system_context`.
    pub sys: SystemContext,
    /// User-supplied context values, borrowed for `'u` and looked up by name through
    /// `SessionContext::user_context`. Keys are `UniCase<String>`, so names are expected to
    /// compare case-insensitively.
    pub user: FxHashMap<UniCase<String>, &'u (dyn Any)>,

    /// Errors recorded through `EvalContext::add_error`; kept in a `RefCell` because the
    /// trait methods only receive `&self`.
    pub errors: RefCell<Vec<EvaluationError>>,
}
192
193impl BasicContext<'_> {
194    #[must_use]
195    pub fn new(bindings: MapBindings<Value>, sys: SystemContext) -> Self {
196        BasicContext {
197            bindings,
198            sys,
199            user: Default::default(),
200            errors: RefCell::new(vec![]),
201        }
202    }
203}
204
205impl SessionContext for BasicContext<'_> {
206    fn system_context(&self) -> &SystemContext {
207        &self.sys
208    }
209
210    fn user_context(&self, name: &str) -> Option<&(dyn Any)> {
211        let key = name.into();
212        self.user.get(&key).copied()
213    }
214}
215
impl Bindings<Value> for BasicContext<'_> {
    /// Resolves `name` against this context's own bindings; there is no fallback.
    fn get<'a>(&'a self, name: &BindingsName<'_>) -> Option<Cow<'a, Value>> {
        self.bindings.get(name)
    }
}
221
222impl EvalContext for BasicContext<'_> {
223    fn add_error(&self, error: EvaluationError) {
224        self.errors.borrow_mut().push(error);
225    }
226
227    fn has_errors(&self) -> bool {
228        !self.errors.borrow().is_empty()
229    }
230
231    fn errors(&self) -> Vec<EvaluationError> {
232        self.errors.take()
233    }
234}
235
/// [`EvalContext`] that layers its own bindings on top of a parent context.
///
/// Name lookups try `bindings` first and fall back to `parent`; session information and error
/// handling are forwarded to `parent`.
#[derive(Debug)]
pub struct NestedContext<'c> {
    /// Bindings local to this context; they take precedence over the parent's.
    pub bindings: MapBindings<Value>,
    /// Enclosing context, borrowed for `'c`.
    pub parent: &'c dyn EvalContext,
}
241
242impl<'c> NestedContext<'c> {
243    pub fn new(bindings: MapBindings<Value>, parent: &'c dyn EvalContext) -> Self {
244        NestedContext { bindings, parent }
245    }
246}
247
248impl SessionContext for NestedContext<'_> {
249    delegate! {
250        to self.parent {
251            fn system_context(&self) -> &SystemContext;
252            fn user_context(&self, name: &str) -> Option<& (dyn Any )>;
253        }
254    }
255}
256
257impl Bindings<Value> for NestedContext<'_> {
258    fn get<'a>(&'a self, name: &BindingsName<'_>) -> Option<Cow<'a, Value>> {
259        match self.bindings.get(name) {
260            Some(v) => Some(v),
261            None => self.parent.get(name),
262        }
263    }
264}
265
266impl EvalContext for NestedContext<'_> {
267    delegate! {
268        to self.parent {
269            fn add_error(&self, error: EvaluationError);
270            fn has_errors(&self) -> bool;
271            fn errors(&self) -> Vec<EvaluationError>;
272        }
273    }
274}