1use crate::build_runtime_graph::graph_parse::CleanedDefinitionGraph;
2use crate::proto::{ChangeValue, ChangeValueWithCounter, DispatchResult, NodeWillExecute, WrappedChangeValue};
3
4pub trait ExecutionState {
5 fn get_count_node_execution(&self, node: &[u8]) -> Option<u64>;
6 fn inc_counter_node_execution(&mut self, node: &[u8]) -> u64;
7 fn get_value(&self, address: &[u8]) -> Option<(u64, ChangeValue)>;
8 fn set_value(&mut self, address: &[u8], counter: u64, value: ChangeValue);
9}
10
11
12pub fn evaluate_changes_against_node(
15 state: &impl ExecutionState,
16 paths_to_satisfy: &Vec<Vec<String>>
17) -> Option<Vec<WrappedChangeValue>> {
18 let mut satisfied_paths = vec![];
21
22 for path in paths_to_satisfy {
23 if let Some(change_value) = state.get_value(path.join(":").as_bytes()) {
24 satisfied_paths.push(change_value.clone());
25 }
26 }
27
28 if satisfied_paths.len() != paths_to_satisfy.len() { return None }
29
30 Some(satisfied_paths.into_iter().map(|(counter, v)| WrappedChangeValue {
31 monotonic_counter: counter,
32 change_value: Some(v),
33 }).collect())
34}
35
36
37pub fn dispatch_and_mutate_state(
44 clean_definition_graph: &CleanedDefinitionGraph,
45 state: &mut impl ExecutionState,
46 change_value_with_counter: &ChangeValueWithCounter
47) -> DispatchResult {
48 let g = clean_definition_graph;
49
50 for filled_value in &change_value_with_counter.filled_values {
54 let filled_value_address = &filled_value.clone().path.unwrap().address;
55
56 if let Some((prev_counter, _prev_change_value)) = state.get_value(filled_value_address.join(":").as_bytes()) {
60 if prev_counter >= change_value_with_counter.monotonic_counter {
61 continue
63 }
64 }
65
66 state.set_value(
67 filled_value_address.join(":").as_bytes().clone(),
68 change_value_with_counter.monotonic_counter,
69 filled_value.clone());
70
71 }
72
73
74 let mut node_executions: Vec<NodeWillExecute> = vec![];
77 for filled_value in &change_value_with_counter.filled_values {
79 let filled_value_address = &filled_value.clone().path.unwrap().address;
80
81 if let Some(matched_node_names) = g.dispatch_table.get(filled_value_address.join(":").as_str()) {
85 for node_that_should_exec in matched_node_names {
86 if let Some(choice_paths_to_satisfy) = g.query_paths.get(node_that_should_exec) {
87 for (idx, opt_paths_to_satisfy) in choice_paths_to_satisfy.iter().enumerate() {
88 if let Some(paths_to_satisfy) = opt_paths_to_satisfy {
90 if let Some(change_values_used_in_execution) = evaluate_changes_against_node(state, paths_to_satisfy) {
91 let node_will_execute = NodeWillExecute {
92 source_node: node_that_should_exec.clone(),
93 change_values_used_in_execution,
94 matched_query_index: idx as u64
95 };
96 node_executions.push(node_will_execute);
97 }
98 } else {
99 if state.get_count_node_execution(node_that_should_exec.as_bytes()).unwrap_or(0) > 0 {
102 continue;
103 }
104 node_executions.push(NodeWillExecute {
105 source_node: node_that_should_exec.clone(),
106 change_values_used_in_execution: vec![],
107 matched_query_index: idx as u64
108 });
109 }
110 state.inc_counter_node_execution(node_that_should_exec.as_bytes());
111
112 }
113
114 }
115 }
116 }
117 }
118
119 DispatchResult {
122 operations: node_executions,
123 }
124}
125
126
#[cfg(test)]
mod tests {
    use crate::proto::{File, item, Item, ItemCore, OutputType, Path, PromptGraphNodeEcho, Query, SerializedValue};
    use crate::graph_definition::DefinitionGraph;
    use std::collections::HashMap;
    use crate::proto::serialized_value::Val;

    use super::*;

    /// In-memory [`ExecutionState`] backed by hash maps, used only by tests.
    #[derive(Debug)]
    pub struct TestState {
        value: HashMap<Vec<u8>, (u64, ChangeValue)>,
        node_executions: HashMap<Vec<u8>, u64>,
    }

    impl TestState {
        /// Creates a state with no stored values and no execution counters.
        fn new() -> Self {
            Self {
                value: HashMap::new(),
                node_executions: HashMap::new(),
            }
        }
    }

    impl ExecutionState for TestState {
        fn get_count_node_execution(&self, node: &[u8]) -> Option<u64> {
            self.node_executions.get(node).copied()
        }

        fn inc_counter_node_execution(&mut self, node: &[u8]) -> u64 {
            let count = self.node_executions.entry(node.to_vec()).or_insert(0);
            *count += 1;
            *count
        }

        fn get_value(&self, address: &[u8]) -> Option<(u64, ChangeValue)> {
            self.value.get(address).cloned()
        }

        fn set_value(&mut self, address: &[u8], counter: u64, value: ChangeValue) {
            self.value.insert(address.to_vec(), (counter, value));
        }
    }

    /// Builds a `File` with id `"test"` that contains a single echo node with
    /// one trigger.
    fn single_echo_node_file(name: &str, query: Option<&str>, output: &str) -> File {
        File {
            id: "test".to_string(),
            nodes: vec![Item {
                core: Some(ItemCore {
                    name: name.to_string(),
                    triggers: vec![Query {
                        query: query.map(String::from),
                    }],
                    output: Some(OutputType {
                        output: output.to_string(),
                    }),
                    output_tables: vec![],
                }),
                item: Some(item::Item::NodeEcho(PromptGraphNodeEcho {})),
            }],
        }
    }

    /// One named node whose only trigger has no query.
    fn get_file_empty_query() -> File {
        single_echo_node_file("EmptyNode", None, "{}")
    }

    /// One unnamed node whose only trigger has no query.
    fn get_file() -> File {
        single_echo_node_file("", None, "{} ")
    }

    /// One node whose trigger selects `path1` and `path2` from `source`.
    fn get_file_with_paths() -> File {
        single_echo_node_file(
            "test_node",
            Some("SELECT path1, path2 FROM source"),
            "{} ",
        )
    }

    /// Builds a `ChangeValue` on branch 0 that holds the string `value` at
    /// `address`.
    fn string_change(address: &[&str], value: &str) -> ChangeValue {
        ChangeValue {
            path: Some(Path {
                address: address.iter().map(|segment| (*segment).to_string()).collect(),
            }),
            value: Some(SerializedValue {
                val: Some(Val::String(value.to_string())),
            }),
            branch: 0,
        }
    }

    /// Builds a `ChangeValueWithCounter` on branch 0 with no parent counters.
    fn change_set(
        monotonic_counter: u64,
        source_node: &str,
        filled_values: Vec<ChangeValue>,
    ) -> ChangeValueWithCounter {
        ChangeValueWithCounter {
            filled_values,
            parent_monotonic_counters: vec![],
            monotonic_counter,
            branch: 0,
            source_node: source_node.to_string(),
        }
    }

    #[test]
    fn test_dispatch_with_file_and_change() {
        let mut state = TestState::new();
        let definition = DefinitionGraph::from_file(get_file());
        let graph = CleanedDefinitionGraph::new(&definition);

        // A change that fills no values cannot trigger anything.
        let change = change_set(0, "", vec![]);

        let result = dispatch_and_mutate_state(&graph, &mut state, &change);
        assert_eq!(result.operations.len(), 0);
    }

    #[test]
    fn test_we_dispatch_nodes_that_have_no_query_once() {
        let mut state = TestState::new();
        let definition = DefinitionGraph::from_file(get_file_empty_query());
        let graph = CleanedDefinitionGraph::new(&definition);

        let change = change_set(
            0,
            "EmptyNode",
            vec![ChangeValue {
                path: Some(Path { address: vec![] }),
                value: None,
                branch: 0,
            }],
        );

        let first = dispatch_and_mutate_state(&graph, &mut state, &change);
        assert_eq!(first.operations.len(), 1);
        assert_eq!(
            first.operations[0],
            NodeWillExecute {
                source_node: "EmptyNode".to_string(),
                change_values_used_in_execution: vec![],
                matched_query_index: 0,
            }
        );

        // The same change again must not schedule the node a second time.
        let second = dispatch_and_mutate_state(&graph, &mut state, &change);
        assert_eq!(second.operations.len(), 0);
    }

    #[test]
    fn test_all_paths_must_be_satisfied_before_dispatch() {
        let mut state = TestState::new();
        let definition = DefinitionGraph::from_file(get_file_with_paths());
        let graph = CleanedDefinitionGraph::new(&definition);

        assert_eq!(
            graph.dispatch_table.get("source:path1"),
            Some(&vec!["test_node".to_string()])
        );
        assert_eq!(
            graph.dispatch_table.get("source:path2"),
            Some(&vec!["test_node".to_string()])
        );

        // Only `path1` is filled: the query is not satisfied yet.
        let fill_path1 = change_set(
            1,
            "__initialize__",
            vec![string_change(&["source", "path1"], "value")],
        );
        let result = dispatch_and_mutate_state(&graph, &mut state, &fill_path1);
        assert_eq!(result.operations.len(), 0);

        // Filling `path2` completes the query and schedules the node.
        let fill_path2 = change_set(
            1,
            "__initialize__",
            vec![string_change(&["source", "path2"], "value")],
        );
        let result = dispatch_and_mutate_state(&graph, &mut state, &fill_path2);
        assert_eq!(result.operations.len(), 1);
        assert_eq!(
            result.operations[0],
            NodeWillExecute {
                source_node: "test_node".to_string(),
                change_values_used_in_execution: vec![
                    WrappedChangeValue {
                        monotonic_counter: 1,
                        change_value: Some(string_change(&["source", "path1"], "value")),
                    },
                    WrappedChangeValue {
                        monotonic_counter: 1,
                        change_value: Some(string_change(&["source", "path2"], "value")),
                    },
                ],
                matched_query_index: 0,
            }
        );
    }

    #[test]
    fn test_stale_change_does_not_overwrite_state() {
        let mut state = TestState::new();
        let definition = DefinitionGraph::from_file(get_file_with_paths());
        let graph = CleanedDefinitionGraph::new(&definition);

        let newest = string_change(&["source", "path1"], "first");
        let change = change_set(2, "__initialize__", vec![newest.clone()]);
        dispatch_and_mutate_state(&graph, &mut state, &change);
        assert_eq!(
            state.get_value("source:path1".as_bytes()),
            Some((2, newest.clone()))
        );

        // A lower counter must leave the stored value untouched.
        let older = change_set(
            1,
            "__initialize__",
            vec![string_change(&["source", "path1"], "second")],
        );
        dispatch_and_mutate_state(&graph, &mut state, &older);
        assert_eq!(
            state.get_value("source:path1".as_bytes()),
            Some((2, newest.clone()))
        );

        // An equal counter is also treated as stale.
        let same_counter = change_set(
            2,
            "__initialize__",
            vec![string_change(&["source", "path1"], "third")],
        );
        dispatch_and_mutate_state(&graph, &mut state, &same_counter);
        assert_eq!(
            state.get_value("source:path1".as_bytes()),
            Some((2, newest))
        );
    }
}