1use itertools::Itertools;
2use std::any::Any;
3use std::borrow::Cow;
4use std::cell::RefCell;
5
6use delegate::delegate;
7use petgraph::algo::toposort;
8use petgraph::dot::Dot;
9use petgraph::prelude::StableGraph;
10use petgraph::{Directed, Incoming, Outgoing};
11use std::fmt::Debug;
12
13use partiql_value::{BindingsName, Value};
14
15use crate::env::basic::MapBindings;
16
17use petgraph::graph::NodeIndex;
18
19use crate::error::{EvalErr, EvaluationError};
20use partiql_catalog::context::{Bindings, SessionContext, SystemContext};
21use petgraph::visit::EdgeRef;
22use rustc_hash::FxHashMap;
23use unicase::UniCase;
24
25use crate::eval::evaluable::{EvalType, Evaluable};
26use crate::plan::EvaluationMode;
27
28#[cfg(feature = "serde")]
29use serde::{Deserialize, Serialize};
30
31pub(crate) mod eval_expr_wrapper;
32pub mod evaluable;
33pub mod expr;
34pub mod graph;
35
36#[derive(Debug)]
39pub struct EvalPlan {
40 mode: EvaluationMode,
41 plan_graph: StableGraph<Box<dyn Evaluable>, u8, Directed>,
42}
43
44impl Default for EvalPlan {
45 fn default() -> Self {
46 Self::new(EvaluationMode::Permissive, Default::default())
47 }
48}
49
50#[inline]
51fn err_illegal_state(msg: impl AsRef<str>) -> EvalErr {
52 EvalErr {
53 errors: vec![EvaluationError::IllegalState(msg.as_ref().to_string())],
54 }
55}
56
57impl EvalPlan {
58 #[must_use]
60 pub fn new(
61 mode: EvaluationMode,
62 plan_graph: StableGraph<Box<dyn Evaluable>, u8, Directed>,
63 ) -> Self {
64 EvalPlan { mode, plan_graph }
65 }
66
67 #[inline]
68 fn plan_graph(&self) -> &StableGraph<Box<dyn Evaluable>, u8> {
69 &self.plan_graph
70 }
71
72 #[inline]
73 fn get_node(&self, idx: NodeIndex) -> Result<&dyn Evaluable, EvalErr> {
74 self.plan_graph()
75 .node_weight(idx)
76 .map(|node| node.as_ref())
77 .ok_or_else(|| err_illegal_state("Error in retrieving node"))
78 }
79
80 pub fn execute(&self, ctx: &dyn EvalContext) -> Result<Evaluated, EvalErr> {
83 let ops = toposort(&self.plan_graph, None).map_err(|e| EvalErr {
90 errors: vec![EvaluationError::InvalidEvaluationPlan(format!(
91 "Malformed evaluation plan detected: {e:?}"
92 ))],
93 })?;
94 let mut inputs: FxHashMap<NodeIndex, [Option<Value>; 2]> = FxHashMap::default();
95
96 for idx in ops.clone() {
98 let source_node = self.plan_graph.edges_directed(idx, Incoming).count() == 0;
99 let managed = self
100 .get_node(idx)
101 .map(|d| d.eval_type() != EvalType::GraphManaged)
102 .unwrap_or(false);
103 if source_node || managed {
104 inputs.insert(idx, [None, None]);
105 }
106 }
107
108 let mut result = None;
109 for idx in ops {
110 let destinations: Vec<(usize, (u8, NodeIndex))> = self
111 .plan_graph()
112 .edges_directed(idx, Outgoing)
113 .map(|e| (*e.weight(), e.target()))
114 .enumerate()
115 .collect_vec();
116
117 let graph_managed = destinations.is_empty()
119 || destinations.iter().any(|(_, (_, dest_idx))| {
120 matches!(
121 self.get_node(*dest_idx).map(|d| d.eval_type()),
122 Ok(EvalType::GraphManaged)
123 )
124 });
125 if graph_managed {
126 let src = self.get_node(idx)?;
127 let input = inputs
128 .remove(&idx)
129 .ok_or_else(|| err_illegal_state("Error in retrieving node input"))?;
130 result = Some(src.evaluate(input, ctx));
131
132 if ctx.has_errors() && self.mode == EvaluationMode::Strict {
134 return Err(EvalErr {
135 errors: ctx.errors(),
136 });
137 }
138
139 let num_destinations = destinations.len();
140 for (i, (branch_num, dst_id)) in destinations {
141 let res = if i == num_destinations - 1 {
142 result.take()
143 } else {
144 result.clone()
145 };
146
147 let res =
148 res.ok_or_else(|| err_illegal_state("Error in retrieving source value"))?;
149 let inputs = inputs.entry(dst_id).or_insert_with(|| [None, None]);
150 inputs[branch_num as usize] = Some(res);
151 }
152 }
153 }
154
155 let result = result.ok_or_else(|| err_illegal_state("Error in retrieving eval output"))?;
156 Ok(Evaluated { result })
157 }
158
159 #[must_use]
160 pub fn to_dot_graph(&self) -> String {
161 format!("{:?}", Dot::with_config(&self.plan_graph, &[]))
162 }
163}
164
165pub type EvalResult = Result<Evaluated, EvalErr>;
167
168#[non_exhaustive]
170#[derive(Debug)]
171#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
172pub struct Evaluated {
173 pub result: Value,
174}
175
176pub trait EvalContext: Bindings<Value> + SessionContext + Debug {
178 fn add_error(&self, error: EvaluationError);
179 fn has_errors(&self) -> bool;
180 fn errors(&self) -> Vec<EvaluationError>;
181}
182
183#[derive(Debug)]
184pub struct BasicContext<'u> {
185 pub bindings: MapBindings<Value>,
186
187 pub sys: SystemContext,
188 pub user: FxHashMap<UniCase<String>, &'u (dyn Any)>,
189
190 pub errors: RefCell<Vec<EvaluationError>>,
191}
192
193impl BasicContext<'_> {
194 #[must_use]
195 pub fn new(bindings: MapBindings<Value>, sys: SystemContext) -> Self {
196 BasicContext {
197 bindings,
198 sys,
199 user: Default::default(),
200 errors: RefCell::new(vec![]),
201 }
202 }
203}
204
205impl SessionContext for BasicContext<'_> {
206 fn system_context(&self) -> &SystemContext {
207 &self.sys
208 }
209
210 fn user_context(&self, name: &str) -> Option<&(dyn Any)> {
211 let key = name.into();
212 self.user.get(&key).copied()
213 }
214}
215
216impl Bindings<Value> for BasicContext<'_> {
217 fn get<'a>(&'a self, name: &BindingsName<'_>) -> Option<Cow<'a, Value>> {
218 self.bindings.get(name)
219 }
220}
221
222impl EvalContext for BasicContext<'_> {
223 fn add_error(&self, error: EvaluationError) {
224 self.errors.borrow_mut().push(error);
225 }
226
227 fn has_errors(&self) -> bool {
228 !self.errors.borrow().is_empty()
229 }
230
231 fn errors(&self) -> Vec<EvaluationError> {
232 self.errors.take()
233 }
234}
235
236#[derive(Debug)]
237pub struct NestedContext<'c> {
238 pub bindings: MapBindings<Value>,
239 pub parent: &'c dyn EvalContext,
240}
241
242impl<'c> NestedContext<'c> {
243 pub fn new(bindings: MapBindings<Value>, parent: &'c dyn EvalContext) -> Self {
244 NestedContext { bindings, parent }
245 }
246}
247
248impl SessionContext for NestedContext<'_> {
249 delegate! {
250 to self.parent {
251 fn system_context(&self) -> &SystemContext;
252 fn user_context(&self, name: &str) -> Option<& (dyn Any )>;
253 }
254 }
255}
256
257impl Bindings<Value> for NestedContext<'_> {
258 fn get<'a>(&'a self, name: &BindingsName<'_>) -> Option<Cow<'a, Value>> {
259 match self.bindings.get(name) {
260 Some(v) => Some(v),
261 None => self.parent.get(name),
262 }
263 }
264}
265
266impl EvalContext for NestedContext<'_> {
267 delegate! {
268 to self.parent {
269 fn add_error(&self, error: EvaluationError);
270 fn has_errors(&self) -> bool;
271 fn errors(&self) -> Vec<EvaluationError>;
272 }
273 }
274}