prompt_graph_core/
execution_router.rs

1use crate::build_runtime_graph::graph_parse::CleanedDefinitionGraph;
2use crate::proto::{ChangeValue, ChangeValueWithCounter, DispatchResult, NodeWillExecute, WrappedChangeValue};
3
/// Storage interface the execution router uses to read and record state.
///
/// Keys are passed as raw bytes. Within this file, value addresses are built by joining
/// the path segments with `:` and node keys are the bytes of the node name.
pub trait ExecutionState {
    /// Returns the execution counter recorded for `node`, or `None` when there is none.
    fn get_count_node_execution(&self, node: &[u8]) -> Option<u64>;
    /// Increments the execution counter recorded for `node`.
    /// NOTE(review): the returned `u64` is presumably the counter after the increment —
    /// confirm against the implementations.
    fn inc_counter_node_execution(&mut self, node: &[u8]) -> u64;
    /// Returns the value stored at `address` together with the counter it was stored
    /// with, or `None` when nothing is stored there.
    fn get_value(&self, address: &[u8]) -> Option<(u64, ChangeValue)>;
    /// Stores `value` at `address`, tagged with `counter`.
    fn set_value(&mut self, address: &[u8], counter: u64, value: ChangeValue);
}
10
11
12/// This is used to evaluate if a newly introduced node should be immediately evaluated
13/// against the state of the system.
14pub fn evaluate_changes_against_node(
15    state: &impl ExecutionState,
16    paths_to_satisfy: &Vec<Vec<String>>
17) -> Option<Vec<WrappedChangeValue>> {
18    // for each of the matched nodes, we need to evaluate the query against the current state
19    // check if the updated state object is satisfying all necessary paths for this query
20    let mut satisfied_paths = vec![];
21
22    for path in paths_to_satisfy {
23        if let Some(change_value) = state.get_value(path.join(":").as_bytes()) {
24            satisfied_paths.push(change_value.clone());
25        }
26    }
27
28    if satisfied_paths.len() != paths_to_satisfy.len() { return None }
29
30    Some(satisfied_paths.into_iter().map(|(counter, v)| WrappedChangeValue {
31        monotonic_counter: counter,
32        change_value: Some(v),
33    }).collect())
34}
35
36
37/// This dispatch method is responsible for identifying which nodes should execute based on
38/// a current key value state and a clean definition graph. It returns a list of nodes that
39/// should be executed, and the path references that they were satisfied with. This exists in
40/// the core implementation because it may be used in client code or our server. It will mutate
41/// the provided ExecutionState to reflect the application of the provided change. Execution state
42/// may internally persist records of what environment this change occurred in.
43pub fn dispatch_and_mutate_state(
44    clean_definition_graph: &CleanedDefinitionGraph,
45    state: &mut impl ExecutionState,
46    change_value_with_counter: &ChangeValueWithCounter
47) -> DispatchResult {
48    let g = clean_definition_graph;
49
50    // TODO: dispatch with an vec![] address path should do what?
51
52    // First pass we update the values present in the change
53    for filled_value in &change_value_with_counter.filled_values {
54        let filled_value_address = &filled_value.clone().path.unwrap().address;
55
56        // In order to avoid double-execution of nodes, we need to check if the value has changed.
57        // matching here means that the state we are assessing execution of has already bee applied to our state.
58        // The state may have already been applied in a parent branch if the execution is taking place there as well.
59        if let Some((prev_counter, _prev_change_value)) = state.get_value(filled_value_address.join(":").as_bytes()) {
60            if prev_counter >= change_value_with_counter.monotonic_counter {
61                // Value has not updated - skip this change reflecting it and continue to the next change
62                continue
63            }
64        }
65
66        state.set_value(
67            filled_value_address.join(":").as_bytes().clone(),
68            change_value_with_counter.monotonic_counter,
69            filled_value.clone());
70
71    }
72
73
74    // node_executions looks like a list of node names and their inputs
75    // Nodes may execute _multiple times_ in response to some changes that might occur.
76    let mut node_executions: Vec<NodeWillExecute> = vec![];
77    // Apply a second pass to resolve into nodes that should execute
78    for filled_value in &change_value_with_counter.filled_values {
79        let filled_value_address = &filled_value.clone().path.unwrap().address;
80
81        // TODO: if we're subscribed to all of the outputs of a node this will-eval a lot
82        // filter to nodes matched by the affected path -> name
83        // nodes with no queries are referred to by the empty string (derived from empty vec![]) and are always matched
84        if let Some(matched_node_names) = g.dispatch_table.get(filled_value_address.join(":").as_str()) {
85            for node_that_should_exec in matched_node_names {
86                if let Some(choice_paths_to_satisfy) = g.query_paths.get(node_that_should_exec) {
87                    for (idx, opt_paths_to_satisfy) in choice_paths_to_satisfy.iter().enumerate() {
88                        // TODO: NodeWillExecute should include _which_ query was satisfied
89                        if let Some(paths_to_satisfy) = opt_paths_to_satisfy {
90                            if let Some(change_values_used_in_execution)  = evaluate_changes_against_node(state, paths_to_satisfy) {
91                                let node_will_execute = NodeWillExecute {
92                                    source_node: node_that_should_exec.clone(),
93                                    change_values_used_in_execution,
94                                    matched_query_index: idx as u64
95                                };
96                                node_executions.push(node_will_execute);
97                            }
98                        } else {
99                            // No paths to satisfy
100                            // we've already executed this node, so we don't need to do it again
101                            if state.get_count_node_execution(node_that_should_exec.as_bytes()).unwrap_or(0) > 0 {
102                                continue;
103                            }
104                            node_executions.push(NodeWillExecute {
105                                source_node: node_that_should_exec.clone(),
106                                change_values_used_in_execution: vec![],
107                                matched_query_index: idx as u64
108                            });
109                        }
110                        state.inc_counter_node_execution(node_that_should_exec.as_bytes());
111
112                    }
113
114                }
115            }
116        }
117    }
118
119    // we only _tell_ what we think should happen. We don't actually do it.
120    // it is up to the wrapping SDK what to do or not do with our information
121    DispatchResult {
122        operations: node_executions,
123    }
124}
125
126
127#[cfg(test)]
128mod tests {
129    use crate::proto::{File, item, Item, ItemCore, OutputType, Path, PromptGraphNodeEcho, Query, SerializedValue};
130    use crate::graph_definition::DefinitionGraph;
131    use std::collections::HashMap;
132    use crate::proto::serialized_value::Val;
133
134    use super::*;
135
136    #[derive(Debug)]
137    pub struct TestState {
138        value: HashMap<Vec<u8>, (u64, ChangeValue)>,
139        node_executions: HashMap<Vec<u8>, u64>
140    }
141    impl TestState {
142        fn new() -> Self {
143            Self {
144                value: HashMap::new(),
145                node_executions: HashMap::new()
146            }
147        }
148    }
149
150    impl ExecutionState for TestState {
151        fn inc_counter_node_execution(&mut self, node: &[u8]) -> u64 {
152            let v = self.node_executions.entry(node.to_vec()).or_insert(0);
153            *v += 1;
154            *v
155        }
156
157        fn get_count_node_execution(&self, node: &[u8]) -> Option<u64> {
158            self.node_executions.get(node).map(|x| *x)
159        }
160
161        fn get_value(&self, address: &[u8]) -> Option<(u64, ChangeValue)> {
162            self.value.get(address).cloned()
163        }
164
165        fn set_value(&mut self, address: &[u8], counter: u64, value: ChangeValue) {
166            self.value.insert(address.to_vec(), (counter, value));
167        }
168    }
169
170    fn get_file_empty_query() -> File {
171        File {
172            id: "test".to_string(),
173            nodes: vec![Item{
174                core: Some(ItemCore {
175                    name: "EmptyNode".to_string(),
176                    triggers: vec![Query{ query: None}],
177                    output: Some(OutputType {
178                        output: "{}".to_string(),
179                    }),
180                    output_tables: vec![],
181                }),
182                item: Some(item::Item::NodeEcho(PromptGraphNodeEcho {
183                }))}],
184        }
185    }
186
187    fn get_file() -> File {
188        File {
189            id: "test".to_string(),
190            nodes: vec![Item{
191                core: Some(ItemCore {
192                    name: "".to_string(),
193                    triggers: vec![Query {
194                        query: None,
195                    }],
196                    output: Some(OutputType {
197                        output: "{} ".to_string(),
198                    }),
199                    output_tables: vec![]
200                }),
201                item: Some(item::Item::NodeEcho(PromptGraphNodeEcho {
202                }))}],
203        }
204    }
205
206    fn get_file_with_paths() -> File {
207        File {
208            id: "test".to_string(),
209            nodes: vec![Item{
210                core: Some(ItemCore {
211                    name: "test_node".to_string(),
212                    triggers: vec![Query {
213                        query: Some("SELECT path1, path2 FROM source".to_string()),
214                    }],
215                    output: Some(OutputType {
216                        output: "{} ".to_string(),
217                    }),
218                    output_tables: vec![]
219                }),
220                item: Some(item::Item::NodeEcho(PromptGraphNodeEcho { }))}],
221        }
222    }
223
224
225    #[test]
226    fn test_dispatch_with_file_and_change() {
227        let mut state = TestState::new();
228        let file = get_file();
229        let d = DefinitionGraph::from_file(file);
230        let g = CleanedDefinitionGraph::new(&d);
231        let c =  ChangeValueWithCounter {
232            filled_values: vec![],
233            parent_monotonic_counters: vec![],
234            monotonic_counter: 0,
235            branch: 0,
236            source_node: "".to_string(),
237        };
238        let result = dispatch_and_mutate_state(&g, &mut state, &c);
239        assert_eq!(result.operations.len(), 0);
240    }
241
242    #[test]
243    fn test_we_dispatch_nodes_that_have_no_query_once() {
244        let mut state = TestState::new();
245        let file = get_file_empty_query();
246        let d = DefinitionGraph::from_file(file);
247        let g = CleanedDefinitionGraph::new(&d);
248        let c =  ChangeValueWithCounter {
249            filled_values: vec![ChangeValue {
250                path: Some(Path {
251                    address: vec![],
252                }),
253                value: None,
254                branch: 0,
255            }],
256            parent_monotonic_counters: vec![],
257            monotonic_counter: 0,
258            branch: 0,
259            source_node: "EmptyNode".to_string(),
260        };
261        let result = dispatch_and_mutate_state(&g, &mut state, &c);
262        assert_eq!(result.operations.len(), 1);
263        assert_eq!(result.operations[0], NodeWillExecute {
264            source_node: "EmptyNode".to_string(),
265            change_values_used_in_execution: vec![],
266            matched_query_index: 0
267        });
268
269        // Does not re-execute
270        let result = dispatch_and_mutate_state(&g, &mut state, &c);
271        assert_eq!(result.operations.len(), 0);
272    }
273
274    #[test]
275    fn test_all_paths_must_be_satisfied_before_dispatch() {
276        // State should start empty
277        let mut state = TestState::new();
278        let file = get_file_with_paths();
279        let d = DefinitionGraph::from_file(file);
280        let g = CleanedDefinitionGraph::new(&d);
281
282        // Confirm the dispatch table has the paths that we expect
283        assert_eq!(g.dispatch_table.get("source:path1"), Some(&vec!["test_node".to_string()]));
284        assert_eq!(g.dispatch_table.get("source:path2"), Some(&vec!["test_node".to_string()]));
285
286        // Dispatch a change that satisfies only one of the two paths
287        let c = ChangeValueWithCounter {
288            filled_values: vec![
289                ChangeValue {
290                    path: Some(Path {
291                        address: vec!["source".to_string(), "path1".to_string()],
292                    }),
293                    value: Some(SerializedValue{ val: Some(Val::String("value".to_string()))}),
294                    branch: 0,
295                },
296            ],
297            parent_monotonic_counters: vec![],
298            monotonic_counter: 1,
299            branch: 0,
300            source_node: "__initialize__".to_string(),
301        };
302
303        let result = dispatch_and_mutate_state(&g, &mut state, &c);
304        // The dispatch should return no operations
305        assert_eq!(result.operations.len(), 0);
306
307
308        // Fill the second path
309        let c = ChangeValueWithCounter {
310            filled_values: vec![
311                ChangeValue {
312                    path: Some(Path {
313                        address: vec!["source".to_string(), "path2".to_string()],
314                    }),
315                    value: Some(SerializedValue{ val: Some(Val::String("value".to_string()))}),
316                    branch: 0,
317                },
318            ],
319            parent_monotonic_counters: vec![],
320            monotonic_counter: 1,
321            branch: 0,
322            source_node: "__initialize__".to_string(),
323        };
324        let result = dispatch_and_mutate_state(&g, &mut state, &c);
325        // This should now return an operation because both paths have been satisfied
326        assert_eq!(result.operations.len(), 1);
327        assert_eq!(result.operations[0], NodeWillExecute {
328            source_node: "test_node".to_string(),
329            change_values_used_in_execution: vec![
330                WrappedChangeValue {
331                    monotonic_counter: 1,
332                    change_value: Some(ChangeValue {
333                        path: Some(Path {
334                            address: vec!["source".to_string(), "path1".to_string()],
335                        }),
336                        value: Some(SerializedValue{ val: Some(Val::String("value".to_string()))}),
337                        branch: 0,
338                    }),
339                },
340                WrappedChangeValue {
341                    monotonic_counter: 1,
342                    change_value: Some(ChangeValue {
343                        path: Some(Path {
344                            address: vec!["source".to_string(), "path2".to_string()],
345                        }),
346                        value: Some(SerializedValue{ val: Some(Val::String("value".to_string()))}),
347                        branch: 0,
348                    }),
349                },
350            ],
351            matched_query_index: 0,
352        });
353    }
354
355}