graph_process_manager_core/process/manager.rs
1/*
2Copyright 2020 Erwan Mahe (github.com/erwanM974)
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17use std::collections::HashSet;
18
19
20use crate::process::config::AbstractProcessConfiguration;
21use crate::process::logger::AbstractProcessLogger;
22use crate::queue::delegate::ProcessQueueDelegate;
23use crate::queue::memorized_node::MemorizedNode;
24use crate::queue::priorities::GenericProcessPriorities;
25use crate::queue::queued_step::EnqueuedStep;
26use crate::queue::strategy::QueueSearchStrategy;
27
28use crate::process::persistent_state::AbstractProcessMutablePersistentState;
29use crate::process::handler::AbstractAlgorithmOperationHandler;
30
31use super::filter::GenericFiltersManager;
32use super::identifier::UniqueIdentifierGenerator;
33use super::logger::*;
34use super::node_memoizer::NodeMemoizer;
35
36
37
38/**
39 * Keeps track of the internal state (not domain-specific) of the process.
40 * **/
41pub(crate) struct ProcessManagerInternalStateManager<Conf : AbstractProcessConfiguration> {
42 /// before the process starts, the initial node is kept in an Option
43 pub initial_node_if_not_yet_started : Option<Conf::DomainSpecificNode>,
44 /// this generator guarantees uniqueness of the identifiers of the nodes
45 pub identifier_generator : UniqueIdentifierGenerator,
46 /// keeps track of nodes that have at least one child
47 /// this is used for the HCS queue and "loggers_notify_last_child_step_of_node_processed"
48 /// once all the children have been processed this is garbage collected
49 pub node_has_processed_child_tracker : HashSet<u32>,
50 /// for memoizing nodes and exploring the process as a graph instead of a tree
51 pub node_memoizer : NodeMemoizer<Conf>,
52}
53
54impl<Conf: AbstractProcessConfiguration> ProcessManagerInternalStateManager<Conf> {
55 pub fn new(
56 initial_node: Conf::DomainSpecificNode,
57 node_memoizer: NodeMemoizer<Conf>
58 ) -> Self {
59 Self {
60 initial_node_if_not_yet_started : Some(initial_node),
61 identifier_generator : UniqueIdentifierGenerator::default(),
62 node_has_processed_child_tracker : HashSet::new(),
63 node_memoizer
64 }
65 }
66}
67
68
69
70/**
71 * Entity responsible of the execution of the overall process.
72 * **/
73pub struct GenericProcessManager<Conf : AbstractProcessConfiguration> {
74 pub context_and_param : Conf::ContextAndParameterization,
75 // ***
76 delegate : ProcessQueueDelegate<Conf::DomainSpecificStep,Conf::DomainSpecificNode,Conf::Priorities>,
77 // ***
78 pub global_state : Conf::MutablePersistentState,
79 // ***
80 filters_manager : GenericFiltersManager<Conf>,
81 // ***
82 pub loggers : Vec<Box< dyn AbstractProcessLogger<Conf>>>,
83 // ***
84 internal_state : ProcessManagerInternalStateManager<Conf>
85}
86
87
88
89impl<Conf : 'static + AbstractProcessConfiguration> GenericProcessManager<Conf> {
90
91 pub fn new(
92 context_and_param : Conf::ContextAndParameterization,
93 strategy: QueueSearchStrategy,
94 priorities: GenericProcessPriorities<Conf::Priorities>,
95 filters_manager : GenericFiltersManager<Conf>,
96 loggers : Vec<Box< dyn AbstractProcessLogger<Conf>>>,
97 is_memoized : bool,
98 initial_node : Conf::DomainSpecificNode
99 ) -> GenericProcessManager<Conf> {
100 let initial_global_state = Conf::MutablePersistentState::get_initial_state(
101 &context_and_param,
102 &initial_node
103 );
104 let internal_state = ProcessManagerInternalStateManager::new(
105 initial_node,
106 NodeMemoizer::new(is_memoized)
107 );
108 GenericProcessManager{
109 context_and_param,
110 delegate : ProcessQueueDelegate::new(strategy, priorities),
111 global_state : initial_global_state,
112 filters_manager,
113 loggers,
114 internal_state
115 }
116 }
117
118 pub fn get_logger(&self, logger_id : usize) -> Option<&dyn AbstractProcessLogger<Conf>> {
119 self.loggers.get(logger_id).map(|x| &**x)
120 }
121
122 pub fn start_process(
123 &mut self
124 ) -> bool {
125
126 if self.internal_state.initial_node_if_not_yet_started.is_none() {
127 return false;
128 }
129
130 loggers_initialize(
131 self.loggers.iter_mut(),
132 &self.context_and_param,
133 self.delegate.get_strategy(),
134 self.delegate.get_priorities(),
135 &self.filters_manager,
136 &self.global_state,
137 self.internal_state.node_memoizer.is_memoized()
138 );
139
140 let initial_node = self.internal_state.initial_node_if_not_yet_started.take().unwrap();
141
142 let warrants_termination = {
143 let new_node_id = self.internal_state.identifier_generator.get_next();
144 self.pre_process_new_node(
145 &initial_node,
146 new_node_id
147 );
148 self.process_new_node_and_check_termination(
149 initial_node,
150 new_node_id
151 )
152 };
153
154 if !warrants_termination {
155
156 'process_step_loop : while let Some(
157 (step_to_process,mut opt_parent_node)
158 ) = self.delegate.extract_from_queue() {
159
160 {
161 // this is isolated to avoid borrow checker problems
162
163 let parent_node =
164 opt_parent_node.as_mut().unwrap_or_else(|| self.delegate.get_mut_memorized_node(step_to_process.parent_node_id));
165
166 // we will process the step that may be fired from the parent node
167 // in any case, we update the parent node's remainign to process childrens
168 parent_node.remaining_child_steps_ids_to_process.remove(&step_to_process.id_as_potential_step_from_parent);
169 }
170
171 // we need an immutable reference to the parent node
172 // but it may be under self.delegate
173 // so then when calling "self.process_step_and_check_termination(step_to_process,parent_node)"
174 // we run into borrow checker problem
175 // for now the solution is to clone the node even though not ideal
176 let parent_node_clone = match opt_parent_node {
177 None => {
178 self.delegate.get_memorized_node(step_to_process.parent_node_id).clone()
179 },
180 Some(x) => {
181 x
182 }
183 };
184
185 let warrants_termination_inner = self.process_step_and_check_termination(
186 step_to_process,
187 &parent_node_clone
188 );
189 if warrants_termination_inner {
190 break 'process_step_loop;
191 }
192 }
193
194 }
195
196 loggers_terminate_process(
197 self.loggers.iter_mut(),
198 &self.context_and_param,
199 &self.global_state
200 );
201
202 // the process has terminated successfully
203 true
204 }
205
206
207
208 fn process_step_and_check_termination(
209 &mut self,
210 step_to_process : EnqueuedStep<Conf::DomainSpecificStep>,
211 parent_node : &MemorizedNode<Conf::DomainSpecificNode>
212 ) -> bool {
213 let mut step_to_process = step_to_process;
214 // apply the step filters
215 let warrants_termination = match self.filters_manager.apply_step_filters(
216 &self.context_and_param,
217 &self.global_state,
218 &parent_node.domain_specific_node,
219 &step_to_process.domain_specific_step
220 ) {
221 Some(filtration_result) => {
222 // here, a filter was activated
223 // this means that we won't explore further the successors from this specific step
224 // ***
225 // below we notify the loggers
226 let filtration_result_id = self.internal_state.identifier_generator.get_next();
227 loggers_filtered(
228 self.loggers.iter_mut(),
229 &self.context_and_param,
230 step_to_process.parent_node_id,
231 filtration_result_id,
232 &filtration_result
233 );
234 // and we update the global state
235 self.global_state.update_on_filtered(
236 &self.context_and_param,
237 &parent_node.domain_specific_node,
238 &filtration_result
239 );
240 // the filtration may warrant process termination
241 self.global_state.warrants_termination_of_the_process(&self.context_and_param)
242 },
243 None => {
244 // here there are no filter that prevent the firing of the step
245 // ***
246 // because we can process it, this means that the parent node of the step (from which the step is fired)
247 // is guaranteed to have at least one child
248 // thus we update the tracker
249 self.internal_state.node_has_processed_child_tracker.insert(step_to_process.id_as_potential_step_from_parent);
250 // ***
251 // processing the step yields a successor node
252 // thus we process it to get the successor node
253 let successor_node = Conf::AlgorithmOperationHandler::process_new_step(
254 &self.context_and_param,
255 &mut self.global_state,
256 &parent_node.domain_specific_node,
257 &mut step_to_process.domain_specific_step
258 );
259 // now, if the memoization option is active,
260 // we check if this node has already been reached previously
261 // and return the id of the successor node
262 let (successor_node_id,check_termination) = match self.internal_state.node_memoizer.check_memo(&successor_node) {
263 Some(memoized_node_id) => {
264 // here the sucessor node is already known and memoized, so we return its unique id
265 // also because the global state is not updated, termination is not warranted
266 (memoized_node_id,false)
267 },
268 None => {
269 // here the successor node is entirely new
270 // so we create a new unique identifier
271 let new_node_id = self.internal_state.identifier_generator.get_next();
272 // we pre-process the new node
273 self.pre_process_new_node(
274 &successor_node,
275 new_node_id
276 );
277 // here the fact that we have a new node
278 // requires us to check termination
279 (new_node_id,true)
280 },
281 };
282 // now that we have the "successor_node_id", we can log the new step
283 loggers_new_step(
284 self.loggers.iter_mut(),
285 &self.context_and_param,
286 step_to_process.parent_node_id,
287 &step_to_process.domain_specific_step,
288 successor_node_id,
289 &successor_node
290 );
291 // ***
292 // and we propagate "warrants_termination"
293 if check_termination {
294 // here we process the new node further
295 // and incidentally check termination
296 self.process_new_node_and_check_termination(
297 successor_node,
298 successor_node_id
299 )
300 } else {
301 false
302 }
303 }
304 };
305 // ***
306 if parent_node.remaining_child_steps_ids_to_process.is_empty() {
307 let parent_had_at_least_one_processed_child = self.internal_state.node_has_processed_child_tracker.remove(
308 &step_to_process.id_as_potential_step_from_parent
309 );
310 if !parent_had_at_least_one_processed_child {
311 // for the HCS queue to know the node id'ed by parent_id is terminal
312 self.delegate.queue_set_last_reached_has_no_child();
313 }
314 loggers_notify_last_child_step_of_node_processed(
315 self.loggers.iter_mut(),
316 &self.context_and_param,
317 step_to_process.parent_node_id
318 )
319 }
320 // and we propagate "warrants_termination"
321 warrants_termination
322 }
323
324
325 /**
326 * We preprocess the new node that it to be considered.
327 * We separate this code from "process_new_node_and_check_termination"
328 * so that we may only use a reference to the new node
329 * and notify the loggers of the new node
330 * before notifying the loggers of the new step between the parent node and this new node
331 * **/
332 fn pre_process_new_node(
333 &mut self,
334 new_node : &Conf::DomainSpecificNode,
335 new_node_id : u32) {
336 // we notify the memoizer of the new node (actually memoizes only if the memoizer is active)
337 self.internal_state.node_memoizer.memoize_new_node(new_node,new_node_id);
338 // we notify the loggers of the new node
339 loggers_new_node(
340 self.loggers.iter_mut(),
341 &self.context_and_param,
342 new_node_id,
343 new_node
344 );
345 // we update the global state
346 self.global_state.update_on_node_reached(
347 &self.context_and_param,
348 new_node
349 );
350 }
351
352
353 fn process_new_node_and_check_termination(
354 &mut self,
355 new_node : Conf::DomainSpecificNode,
356 new_node_id : u32
357 ) -> bool {
358 // updating the global state may warrant termination
359 if self.global_state.warrants_termination_of_the_process(&self.context_and_param) {
360 return true;
361 }
362 // ***
363 // here it does not warrant termination
364 // so we process the new node further
365 // ***
366 // we apply the node pre filters
367 let (has_no_children,warrants_termination) = match self.filters_manager.apply_node_pre_filters(
368 &self.context_and_param,
369 &self.global_state,
370 &new_node
371 ) {
372 Some(filtration_result) => {
373 // here, a filter was activated
374 // this means that we won't explore further the successors from this specific node
375 // ***
376 // below we notify the loggers of the filtration
377 let filtration_result_id = self.internal_state.identifier_generator.get_next();
378 loggers_filtered(
379 self.loggers.iter_mut(),
380 &self.context_and_param,
381 new_node_id,
382 filtration_result_id,
383 &filtration_result
384 );
385 // and we update the global state
386 self.global_state.update_on_filtered(
387 &self.context_and_param,
388 &new_node,
389 &filtration_result
390 );
391 // the filtration may warrant process termination
392 let warrants_termination = self.global_state.warrants_termination_of_the_process(&self.context_and_param);
393 // ***
394 (true,warrants_termination)
395 },
396 None => {
397 // here no node pre filters were activated
398 // so we can collect the next steps that may be fired from that node
399 let next_steps = Conf::AlgorithmOperationHandler::collect_next_steps(
400 &self.context_and_param,
401 &mut self.global_state,
402 &new_node
403 );
404 // we update the global state
405 self.global_state.update_on_next_steps_collected_reached(
406 &self.context_and_param,
407 &new_node,
408 &next_steps,
409 );
410 // we apply the node post filters
411 match self.filters_manager.apply_node_post_filters(
412 &self.context_and_param,
413 &self.global_state,
414 &new_node,
415 &next_steps
416 ) {
417 Some(filtration_result) => {
418 // here, a filter was activated
419 // this means that we won't explore further the successors from this specific node
420 // ***
421 // below we notify the loggers of the filtration
422 let filtration_result_id = self.internal_state.identifier_generator.get_next();
423 loggers_filtered(
424 self.loggers.iter_mut(),
425 &self.context_and_param,
426 new_node_id,
427 filtration_result_id,
428 &filtration_result
429 );
430 // and we update the global state
431 self.global_state.update_on_filtered(
432 &self.context_and_param,
433 &new_node,
434 &filtration_result
435 );
436 // the filtration may warrant process termination
437 let warrants_termination = self.global_state.warrants_termination_of_the_process(&self.context_and_param);
438 // ***
439 (true,warrants_termination)
440 },
441 None => {
442 let warrants_termination = false;
443 // here no node post filters were activated
444 // this means we can enqueue all these next steps
445 // if there are any
446 let has_no_children = if next_steps.is_empty() {
447 true
448 } else {
449 let mut to_enqueue = vec![];
450 let mut max_id_of_child = 0;
451 for domain_specific_step in next_steps {
452 max_id_of_child += 1;
453 to_enqueue.push(
454 EnqueuedStep::new(
455 new_node_id,
456 max_id_of_child,
457 domain_specific_step
458 )
459 );
460 }
461 let remaining_ids_to_process : HashSet<u32> = HashSet::from_iter((1..(max_id_of_child+1)).collect::<Vec<u32>>().iter().cloned() );
462 let memorized_node = MemorizedNode::new(
463 new_node,
464 remaining_ids_to_process
465 );
466 self.delegate.enqueue_new_steps(
467 memorized_node,
468 new_node_id,
469 to_enqueue
470 );
471 false
472 };
473 (has_no_children,warrants_termination)
474 }
475 }
476 }
477 };
478 if has_no_children {
479 // the node does not have any children : it is a terminal node
480 // notifies the queue
481 self.delegate.queue_set_last_reached_has_no_child();
482 // notifies the loggers
483 loggers_notify_node_without_children(
484 self.loggers.iter_mut(),
485 &self.context_and_param,
486 new_node_id
487 );
488 }
489 // and we propagate "warrants_termination"
490 warrants_termination
491 }
492
493}