moduforge_rules_engine/handler/
traversal.rs1use ahash::HashMap;
3use fixedbitset::FixedBitSet;
4use petgraph::data::DataMap;
5use petgraph::matrix_graph::Zero;
6use petgraph::prelude::{EdgeIndex, NodeIndex, StableDiGraph};
7use petgraph::visit::{EdgeRef, IntoNodeIdentifiers, VisitMap, Visitable};
8use petgraph::{Incoming, Outgoing};
9use serde_json::json;
10use std::rc::Rc;
11use std::sync::atomic::Ordering;
12use std::sync::Arc;
13use std::time::Instant;
14
15use crate::config::ZEN_CONFIG;
16use crate::model::{
17 DecisionEdge, DecisionNode, DecisionNodeKind, SwitchStatement,
18 SwitchStatementHitPolicy,
19};
20use crate::DecisionGraphTrace;
21use moduforge_rules_expression::variable::Variable;
22use moduforge_rules_expression::Isolate;
23
24pub(crate) type StableDiDecisionGraph =
81 StableDiGraph<Arc<DecisionNode>, Arc<DecisionEdge>>;
82
83pub(crate) struct GraphWalker {
85 ordered: FixedBitSet,
87 to_visit: Vec<NodeIndex>,
89 node_data: HashMap<NodeIndex, Variable>,
91 iter: usize,
93 visited_switch_nodes: Vec<NodeIndex>,
95 nodes_in_context: bool,
97}
98
99const ITER_MAX: usize = 1_000;
101
102impl GraphWalker {
103 pub fn new(graph: &StableDiDecisionGraph) -> Self {
106 let mut topo = Self::empty(graph);
107 topo.extend_with_initials(graph);
108 topo
109 }
110
111 fn extend_with_initials(
114 &mut self,
115 g: &StableDiDecisionGraph,
116 ) {
117 self.to_visit.extend(g.node_identifiers().filter(move |&nid| {
118 g.node_weight(nid).is_some_and(|n| {
119 matches!(n.kind, DecisionNodeKind::InputNode { content: _ })
120 })
121 }));
122 }
123
124 fn empty(graph: &StableDiDecisionGraph) -> Self {
127 Self {
128 ordered: graph.visit_map(),
129 to_visit: Vec::new(),
130 node_data: Default::default(),
131 visited_switch_nodes: Default::default(),
132 iter: 0,
133 nodes_in_context: ZEN_CONFIG
134 .nodes_in_context
135 .load(Ordering::Relaxed),
136 }
137 }
138
139 pub fn reset(
142 &mut self,
143 g: &StableDiDecisionGraph,
144 ) {
145 self.ordered.clear();
146 self.to_visit.clear();
147 self.extend_with_initials(g);
148 self.iter += 1;
149 }
150
151 pub fn get_node_data(
154 &self,
155 node_id: NodeIndex,
156 ) -> Option<Variable> {
157 self.node_data.get(&node_id).cloned()
158 }
159
160 pub fn ending_variables(
163 &self,
164 g: &StableDiDecisionGraph,
165 ) -> Variable {
166 g.node_indices()
167 .filter(|nid| {
168 self.ordered.is_visited(nid)
169 && g.neighbors_directed(*nid, Outgoing).count().is_zero()
170 })
171 .fold(Variable::empty_object(), |mut acc, curr| {
172 match self.node_data.get(&curr) {
173 None => acc,
174 Some(data) => acc.merge(data),
175 }
176 })
177 }
178
179 pub fn get_all_node_data(
182 &self,
183 g: &StableDiDecisionGraph,
184 ) -> Variable {
185 let node_values = self
186 .node_data
187 .iter()
188 .filter_map(|(idx, value)| {
189 let weight = g.node_weight(*idx)?;
190 Some((Rc::from(weight.name.as_str()), value.clone()))
191 })
192 .collect();
193
194 Variable::from_object(node_values)
195 }
196
197 pub fn set_node_data(
200 &mut self,
201 node_id: NodeIndex,
202 value: Variable,
203 ) {
204 self.node_data.insert(node_id, value);
205 }
206
207 pub fn incoming_node_data(
210 &self,
211 g: &StableDiDecisionGraph,
212 node_id: NodeIndex,
213 with_nodes: bool,
214 ) -> Variable {
215 let value = self
216 .merge_node_data(g.neighbors_directed(node_id, Incoming))
217 .depth_clone(1);
218 if self.nodes_in_context {
219 if let Some(object_ref) =
220 with_nodes.then_some(value.as_object()).flatten()
221 {
222 let mut object = object_ref.borrow_mut();
223 object.insert(Rc::from("$nodes"), self.get_all_node_data(g));
224 }
225 }
226 value
227 }
228
229 pub fn merge_node_data<I>(
232 &self,
233 iter: I,
234 ) -> Variable
235 where
236 I: Iterator<Item = NodeIndex>,
237 {
238 let default_map = Variable::empty_object();
239 iter.fold(Variable::empty_object(), |mut prev, curr| {
240 let data = self.node_data.get(&curr).unwrap_or(&default_map);
241 prev.merge_clone(data)
242 })
243 }
244
245 pub fn next<F: FnMut(DecisionGraphTrace)>(
269 &mut self,
270 g: &mut StableDiDecisionGraph,
271 mut on_trace: Option<F>,
272 ) -> Option<NodeIndex> {
273 let start = Instant::now();
275
276 if self.iter >= ITER_MAX {
278 return None;
279 }
280
281 while let Some(nid) = self.to_visit.pop() {
283 let decision_node = g.node_weight(nid)?.clone();
285
286 if self.ordered.is_visited(&nid) {
288 continue;
289 }
290
291 if !self.all_dependencies_resolved(g, nid) {
294 self.to_visit.push(nid);
295 self.to_visit.extend(self.get_unresolved_dependencies(g, nid));
296 continue;
297 }
298
299 self.ordered.visit(nid);
301
302 if let DecisionNodeKind::SwitchNode { content } =
304 &decision_node.kind
305 {
306 if !self.visited_switch_nodes.contains(&nid) {
308 let input_data = self.incoming_node_data(g, nid, true);
310 let env = input_data.depth_clone(1);
311 env.dot_insert("$", input_data.depth_clone(1));
312 let mut isolate = Isolate::with_environment(env);
313
314 let mut statement_iter = content.statements.iter();
316 let valid_statements: Vec<&SwitchStatement> =
317 match content.hit_policy {
318 SwitchStatementHitPolicy::First => statement_iter
320 .find(|&s| {
321 switch_statement_evaluate(&mut isolate, &s)
322 })
323 .into_iter()
324 .collect(),
325 SwitchStatementHitPolicy::Collect => statement_iter
327 .filter(|&s| {
328 switch_statement_evaluate(&mut isolate, &s)
329 })
330 .collect(),
331 };
332
333 let valid_statements_trace = Variable::from_array(
335 valid_statements
336 .iter()
337 .map(|&statement| {
338 let v = Variable::empty_object();
339 v.dot_insert(
340 "id",
341 Variable::String(Rc::from(
342 statement.id.as_str(),
343 )),
344 );
345 v
346 })
347 .collect(),
348 );
349
350 input_data.dot_remove("$nodes");
352
353 if let Some(on_trace) = &mut on_trace {
355 on_trace(DecisionGraphTrace {
356 id: decision_node.id.clone(),
357 name: decision_node.name.clone(),
358 input: input_data.shallow_clone(),
359 output: input_data.shallow_clone(),
360 order: 0,
361 performance: Some(format!(
362 "{:.1?}",
363 start.elapsed()
364 )),
365 trace_data: Some(
366 json!({ "statements": valid_statements_trace })
367 .into(),
368 ),
369 });
370 }
371
372 let edges_to_remove: Vec<EdgeIndex> = g
375 .edges_directed(nid, Outgoing)
376 .filter(|edge| {
377 edge.weight().source_handle.as_ref().map_or(
378 true,
379 |handle| {
380 !valid_statements
381 .iter()
382 .any(|s| s.id == *handle)
383 },
384 )
385 })
386 .map(|edge| edge.id())
387 .collect();
388
389 let edges_remove_count = edges_to_remove.len();
391
392 for edge in edges_to_remove {
394 remove_edge_recursive(g, edge);
395 }
396
397 self.visited_switch_nodes.push(nid);
399
400 if edges_remove_count > 0 {
402 self.reset(g);
403 continue;
404 }
405 }
406 }
407
408 let successors = g.neighbors_directed(nid, Outgoing);
410 self.to_visit.extend(successors);
411
412 return Some(nid);
414 }
415
416 None
418 }
419
420 fn all_dependencies_resolved(
423 &self,
424 g: &StableDiDecisionGraph,
425 nid: NodeIndex,
426 ) -> bool {
427 g.neighbors_directed(nid, Incoming)
428 .all(|dep| self.ordered.is_visited(&dep))
429 }
430
431 fn get_unresolved_dependencies(
434 &self,
435 g: &StableDiDecisionGraph,
436 nid: NodeIndex,
437 ) -> Vec<NodeIndex> {
438 g.neighbors_directed(nid, Incoming)
439 .filter(|dep| !self.ordered.is_visited(dep))
440 .collect()
441 }
442}
443
444fn switch_statement_evaluate<'a>(
447 isolate: &mut Isolate<'a>,
448 switch_statement: &'a SwitchStatement,
449) -> bool {
450 if switch_statement.condition.is_empty() {
451 return true;
452 }
453
454 isolate
456 .run_standard(switch_statement.condition.as_str())
457 .map_or(false, |v| v.as_bool().unwrap_or(false))
458}
459
460fn remove_edge_recursive(
463 g: &mut StableDiDecisionGraph,
464 edge_id: EdgeIndex,
465) {
466 let Some((source_nid, target_nid)) = g.edge_endpoints(edge_id) else {
467 return;
468 };
469
470 g.remove_edge(edge_id);
471
472 let target_incoming_count = g.edges_directed(target_nid, Incoming).count();
474 if target_incoming_count.is_zero() {
475 let edge_ids: Vec<EdgeIndex> = g
476 .edges_directed(target_nid, Outgoing)
477 .map(|edge| edge.id())
478 .collect();
479
480 edge_ids.iter().for_each(|edge_id| {
481 remove_edge_recursive(g, edge_id.clone());
482 });
483
484 if g.edges(target_nid).count().is_zero() {
485 g.remove_node(target_nid);
486 }
487 }
488
489 let source_outgoing_count = g.edges_directed(source_nid, Outgoing).count();
491 if source_outgoing_count.is_zero() {
492 let edge_ids: Vec<EdgeIndex> = g
493 .edges_directed(source_nid, Incoming)
494 .map(|edge| edge.id())
495 .collect();
496
497 edge_ids.iter().for_each(|edge_id| {
498 remove_edge_recursive(g, edge_id.clone());
499 });
500
501 if g.edges(source_nid).count().is_zero() {
502 g.remove_node(source_nid);
503 }
504 }
505}