graph_process_manager_core/process/
manager.rs

1/*
2Copyright 2020 Erwan Mahe (github.com/erwanM974)
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17use std::collections::HashSet;
18
19
20use crate::process::config::AbstractProcessConfiguration;
21use crate::process::logger::AbstractProcessLogger;
22use crate::queue::delegate::ProcessQueueDelegate;
23use crate::queue::memorized_node::MemorizedNode;
24use crate::queue::priorities::GenericProcessPriorities;
25use crate::queue::queued_step::EnqueuedStep;
26use crate::queue::strategy::QueueSearchStrategy;
27
28use crate::process::persistent_state::AbstractProcessMutablePersistentState;
29use crate::process::handler::AbstractAlgorithmOperationHandler;
30
31use super::filter::GenericFiltersManager;
32use super::identifier::UniqueIdentifierGenerator;
33use super::logger::*;
34use super::node_memoizer::NodeMemoizer;
35
36
37
38/** 
39 * Keeps track of the internal state (not domain-specific) of the process.
40 * **/
41pub(crate) struct ProcessManagerInternalStateManager<Conf : AbstractProcessConfiguration> {
42    /// before the process starts, the initial node is kept in an Option
43    pub initial_node_if_not_yet_started : Option<Conf::DomainSpecificNode>,
44    /// this generator guarantees uniqueness of the identifiers of the nodes
45    pub identifier_generator : UniqueIdentifierGenerator,
46    /// keeps track of nodes that have at least one child
47    /// this is used for the HCS queue and "loggers_notify_last_child_step_of_node_processed"
48    /// once all the children have been processed this is garbage collected 
49    pub node_has_processed_child_tracker : HashSet<u32>,
50    /// for memoizing nodes and exploring the process as a graph instead of a tree
51    pub node_memoizer : NodeMemoizer<Conf>,
52}
53
54impl<Conf: AbstractProcessConfiguration> ProcessManagerInternalStateManager<Conf> {
55    pub fn new(
56        initial_node: Conf::DomainSpecificNode, 
57        node_memoizer: NodeMemoizer<Conf>
58    ) -> Self {
59        Self { 
60            initial_node_if_not_yet_started : Some(initial_node), 
61            identifier_generator : UniqueIdentifierGenerator::default(),
62            node_has_processed_child_tracker : HashSet::new(),
63            node_memoizer 
64        }
65    }
66}
67
68
69
70/** 
71 * Entity responsible of the execution of the overall process.
72 * **/
73pub struct GenericProcessManager<Conf : AbstractProcessConfiguration> {
74    pub context_and_param : Conf::ContextAndParameterization,
75    // ***
76    delegate : ProcessQueueDelegate<Conf::DomainSpecificStep,Conf::DomainSpecificNode,Conf::Priorities>,
77    // ***
78    pub global_state : Conf::MutablePersistentState,
79    // ***
80    filters_manager : GenericFiltersManager<Conf>,
81    // ***
82    pub loggers : Vec<Box< dyn AbstractProcessLogger<Conf>>>,
83    // ***
84    internal_state : ProcessManagerInternalStateManager<Conf>
85}
86
87
88
89impl<Conf : 'static + AbstractProcessConfiguration> GenericProcessManager<Conf> {
90
91    pub fn new(
92        context_and_param : Conf::ContextAndParameterization,
93        strategy: QueueSearchStrategy,
94        priorities: GenericProcessPriorities<Conf::Priorities>,
95        filters_manager : GenericFiltersManager<Conf>,
96        loggers : Vec<Box< dyn AbstractProcessLogger<Conf>>>,
97        is_memoized : bool,
98        initial_node : Conf::DomainSpecificNode
99    ) -> GenericProcessManager<Conf> {
100        let initial_global_state = Conf::MutablePersistentState::get_initial_state(
101            &context_and_param,
102            &initial_node
103        );
104        let internal_state = ProcessManagerInternalStateManager::new(
105            initial_node, 
106            NodeMemoizer::new(is_memoized)
107        );
108        GenericProcessManager{
109            context_and_param,
110            delegate : ProcessQueueDelegate::new(strategy, priorities),
111            global_state : initial_global_state,
112            filters_manager,
113            loggers,
114            internal_state
115        }
116    }
117
118    pub fn get_logger(&self, logger_id : usize) -> Option<&dyn AbstractProcessLogger<Conf>> {
119        self.loggers.get(logger_id).map(|x| &**x)
120    }
121
122    pub fn start_process(
123        &mut self
124    ) -> bool {
125
126        if self.internal_state.initial_node_if_not_yet_started.is_none() {
127            return false;
128        }
129
130        loggers_initialize(
131            self.loggers.iter_mut(),
132            &self.context_and_param,
133            self.delegate.get_strategy(),
134            self.delegate.get_priorities(),
135            &self.filters_manager,
136            &self.global_state,
137            self.internal_state.node_memoizer.is_memoized()
138        );
139
140        let initial_node = self.internal_state.initial_node_if_not_yet_started.take().unwrap();
141        
142        let warrants_termination = {
143            let new_node_id = self.internal_state.identifier_generator.get_next();
144            self.pre_process_new_node(
145                &initial_node,
146                new_node_id
147            );
148            self.process_new_node_and_check_termination(
149                initial_node,
150                new_node_id
151            )
152        };
153
154        if !warrants_termination {
155
156            'process_step_loop : while let Some(
157                (step_to_process,mut opt_parent_node)
158            ) = self.delegate.extract_from_queue() {
159                
160                {
161                    // this is isolated to avoid borrow checker problems
162
163                    let parent_node =
164                    opt_parent_node.as_mut().unwrap_or_else(|| self.delegate.get_mut_memorized_node(step_to_process.parent_node_id));
165                    
166                    // we will process the step that may be fired from the parent node
167                    // in any case, we update the parent node's remainign to process childrens
168                    parent_node.remaining_child_steps_ids_to_process.remove(&step_to_process.id_as_potential_step_from_parent);
169                }
170
171                // we need an immutable reference to the parent node
172                // but it may be under self.delegate
173                // so then when calling "self.process_step_and_check_termination(step_to_process,parent_node)"
174                // we run into borrow checker problem
175                // for now the solution is to clone the node even though not ideal
176                let parent_node_clone = match opt_parent_node {
177                    None => {
178                        self.delegate.get_memorized_node(step_to_process.parent_node_id).clone()
179                    },
180                    Some(x) => {
181                        x
182                    }
183                };
184
185                let warrants_termination_inner = self.process_step_and_check_termination(
186                    step_to_process,
187                    &parent_node_clone
188                );
189                if warrants_termination_inner {
190                    break 'process_step_loop;
191                }
192            }
193
194        }
195
196        loggers_terminate_process(
197            self.loggers.iter_mut(),
198            &self.context_and_param,
199            &self.global_state
200        );
201
202        // the process has terminated successfully
203        true 
204    }
205
206    
207
208    fn process_step_and_check_termination(
209        &mut self,
210        step_to_process : EnqueuedStep<Conf::DomainSpecificStep>,
211        parent_node : &MemorizedNode<Conf::DomainSpecificNode>
212    ) -> bool {
213        let mut step_to_process = step_to_process;
214        // apply the step filters
215        let warrants_termination = match self.filters_manager.apply_step_filters(
216            &self.context_and_param,
217            &self.global_state,
218            &parent_node.domain_specific_node,
219            &step_to_process.domain_specific_step
220        ) {
221            Some(filtration_result) => {
222                // here, a filter was activated
223                // this means that we won't explore further the successors from this specific step
224                // ***
225                // below we notify the loggers
226                let filtration_result_id = self.internal_state.identifier_generator.get_next();
227                loggers_filtered(
228                    self.loggers.iter_mut(), 
229                    &self.context_and_param,
230                    step_to_process.parent_node_id,
231                    filtration_result_id, 
232                    &filtration_result
233                );
234                // and we update the global state
235                self.global_state.update_on_filtered(
236                    &self.context_and_param,
237                    &parent_node.domain_specific_node,
238                    &filtration_result
239                );
240                // the filtration may warrant process termination
241                self.global_state.warrants_termination_of_the_process(&self.context_and_param)
242            },
243            None => {
244                // here there are no filter that prevent the firing of the step
245                // ***
246                // because we can process it, this means that the parent node of the step (from which the step is fired)
247                // is guaranteed to have at least one child
248                // thus we update the tracker
249                self.internal_state.node_has_processed_child_tracker.insert(step_to_process.id_as_potential_step_from_parent);
250                // ***
251                // processing the step yields a successor node
252                // thus we process it to get the successor node
253                let successor_node = Conf::AlgorithmOperationHandler::process_new_step(
254                    &self.context_and_param,
255                    &mut self.global_state,
256                    &parent_node.domain_specific_node,
257                    &mut step_to_process.domain_specific_step
258                );
259                // now, if the memoization option is active,
260                // we check if this node has already been reached previously
261                // and return the id of the successor node
262                let (successor_node_id,check_termination) = match self.internal_state.node_memoizer.check_memo(&successor_node) {
263                    Some(memoized_node_id) => {
264                        // here the sucessor node is already known and memoized, so we return its unique id
265                        // also because the global state is not updated, termination is not warranted
266                        (memoized_node_id,false)
267                    },
268                    None => {
269                        // here the successor node is entirely new
270                        // so we create a new unique identifier
271                        let new_node_id = self.internal_state.identifier_generator.get_next();
272                        // we pre-process the new node
273                        self.pre_process_new_node(
274                            &successor_node,
275                            new_node_id
276                        );
277                        // here the fact that we have a new node
278                        // requires us to check termination
279                        (new_node_id,true)
280                    },
281                };
282                // now that we have the "successor_node_id", we can log the new step
283                loggers_new_step(
284                    self.loggers.iter_mut(),
285                    &self.context_and_param,
286                    step_to_process.parent_node_id,
287                    &step_to_process.domain_specific_step,
288                    successor_node_id,
289                    &successor_node
290                );
291                // ***
292                // and we propagate "warrants_termination"
293                if check_termination {
294                    // here we process the new node further
295                    // and incidentally check termination
296                    self.process_new_node_and_check_termination(
297                        successor_node,
298                        successor_node_id
299                    )
300                } else {
301                    false
302                }
303            }
304        };
305        // ***
306        if parent_node.remaining_child_steps_ids_to_process.is_empty() {
307            let parent_had_at_least_one_processed_child = self.internal_state.node_has_processed_child_tracker.remove(
308                &step_to_process.id_as_potential_step_from_parent
309            );
310            if !parent_had_at_least_one_processed_child {
311                // for the HCS queue to know the node id'ed by parent_id is terminal
312                self.delegate.queue_set_last_reached_has_no_child();
313            }
314            loggers_notify_last_child_step_of_node_processed(
315                self.loggers.iter_mut(),
316                &self.context_and_param,
317                step_to_process.parent_node_id
318            )
319        }
320        // and we propagate "warrants_termination"
321        warrants_termination
322    }
323
324
325    /** 
326     * We preprocess the new node that it to be considered.
327     * We separate this code from "process_new_node_and_check_termination"
328     * so that we may only use a reference to the new node
329     * and notify the loggers of the new node
330     * before notifying the loggers of the new step between the parent node and this new node
331     * **/
332    fn pre_process_new_node(
333        &mut self,
334        new_node : &Conf::DomainSpecificNode,
335        new_node_id : u32) {
336        // we notify the memoizer of the new node (actually memoizes only if the memoizer is active)
337        self.internal_state.node_memoizer.memoize_new_node(new_node,new_node_id);
338        // we notify the loggers of the new node
339        loggers_new_node(
340            self.loggers.iter_mut(),
341            &self.context_and_param, 
342            new_node_id, 
343            new_node
344        );
345        // we update the global state
346        self.global_state.update_on_node_reached(
347            &self.context_and_param,
348            new_node
349        );
350    }
351
352
353    fn process_new_node_and_check_termination(
354        &mut self,
355        new_node : Conf::DomainSpecificNode,
356        new_node_id : u32
357    ) -> bool {
358        // updating the global state may warrant termination
359        if self.global_state.warrants_termination_of_the_process(&self.context_and_param) {
360            return true;
361        }
362        // ***
363        // here it does not warrant termination
364        // so we process the new node further
365        // ***
366        // we apply the node pre filters
367        let (has_no_children,warrants_termination) = match self.filters_manager.apply_node_pre_filters(
368            &self.context_and_param,
369            &self.global_state,
370            &new_node
371        ) {
372            Some(filtration_result) => {
373                // here, a filter was activated
374                // this means that we won't explore further the successors from this specific node
375                // ***
376                // below we notify the loggers of the filtration
377                let filtration_result_id = self.internal_state.identifier_generator.get_next();
378                loggers_filtered(
379                    self.loggers.iter_mut(), 
380                    &self.context_and_param,
381                    new_node_id,
382                    filtration_result_id, 
383                    &filtration_result
384                );
385                // and we update the global state
386                self.global_state.update_on_filtered(
387                    &self.context_and_param,
388                    &new_node,
389                    &filtration_result
390                );
391                // the filtration may warrant process termination
392                let warrants_termination = self.global_state.warrants_termination_of_the_process(&self.context_and_param);
393                // ***
394                (true,warrants_termination)
395            },
396            None => {
397                // here no node pre filters were activated
398                // so we can collect the next steps that may be fired from that node
399                let next_steps = Conf::AlgorithmOperationHandler::collect_next_steps(
400                    &self.context_and_param,
401                    &mut self.global_state,
402                    &new_node
403                );
404                // we update the global state
405                self.global_state.update_on_next_steps_collected_reached(
406                    &self.context_and_param, 
407                    &new_node, 
408                    &next_steps,
409                );
410                // we apply the node post filters
411                match self.filters_manager.apply_node_post_filters(
412                    &self.context_and_param,
413                    &self.global_state,
414                    &new_node,
415                    &next_steps
416                ) {
417                    Some(filtration_result) => {
418                        // here, a filter was activated
419                        // this means that we won't explore further the successors from this specific node
420                        // ***
421                        // below we notify the loggers of the filtration
422                        let filtration_result_id = self.internal_state.identifier_generator.get_next();
423                        loggers_filtered(
424                            self.loggers.iter_mut(), 
425                            &self.context_and_param,
426                            new_node_id,
427                            filtration_result_id, 
428                            &filtration_result
429                        );
430                        // and we update the global state
431                        self.global_state.update_on_filtered(
432                            &self.context_and_param,
433                            &new_node,
434                            &filtration_result
435                        );
436                        // the filtration may warrant process termination
437                        let warrants_termination = self.global_state.warrants_termination_of_the_process(&self.context_and_param);
438                        // ***
439                        (true,warrants_termination)
440                    },
441                    None => {
442                        let warrants_termination = false;
443                        // here no node post filters were activated
444                        // this means we can enqueue all these next steps
445                        // if there are any
446                        let has_no_children = if next_steps.is_empty() {
447                            true
448                        } else {
449                            let mut to_enqueue = vec![];
450                            let mut max_id_of_child = 0;
451                            for domain_specific_step in next_steps {
452                                max_id_of_child += 1;
453                                to_enqueue.push( 
454                                    EnqueuedStep::new(
455                                        new_node_id, 
456                                        max_id_of_child, 
457                                        domain_specific_step
458                                    )
459                                );
460                            }
461                            let remaining_ids_to_process : HashSet<u32> = HashSet::from_iter((1..(max_id_of_child+1)).collect::<Vec<u32>>().iter().cloned() );
462                            let memorized_node = MemorizedNode::new(
463                                new_node,
464                                remaining_ids_to_process
465                            );
466                            self.delegate.enqueue_new_steps(
467                                memorized_node,
468                                new_node_id,
469                                to_enqueue
470                            );
471                            false
472                        };
473                        (has_no_children,warrants_termination)
474                    }
475                }
476            }
477        };
478        if has_no_children {
479            // the node does not have any children : it is a terminal node
480            // notifies the queue
481            self.delegate.queue_set_last_reached_has_no_child();
482            // notifies the loggers
483            loggers_notify_node_without_children(
484                self.loggers.iter_mut(),
485                &self.context_and_param,
486                new_node_id
487            );
488        }
489        // and we propagate "warrants_termination"
490        warrants_termination
491    }
492
493}