graph_process_manager_core 0.3.1

Utilities to explore parts of a tree-like or graph-like structure that is not known in advance
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
/*
Copyright 2020 Erwan Mahe (github.com/erwanM974)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

use std::collections::HashSet;


use crate::process::config::AbstractProcessConfiguration;
use crate::process::logger::AbstractProcessLogger;
use crate::queue::delegate::ProcessQueueDelegate;
use crate::queue::memorized_node::MemorizedNode;
use crate::queue::priorities::GenericProcessPriorities;
use crate::queue::queued_step::EnqueuedStep;
use crate::queue::strategy::QueueSearchStrategy;

use crate::process::persistent_state::AbstractProcessMutablePersistentState;
use crate::process::handler::AbstractAlgorithmOperationHandler;

use super::filter::GenericFiltersManager;
use super::identifier::UniqueIdentifierGenerator;
use super::logger::*;
use super::node_memoizer::NodeMemoizer;



/** 
 * Keeps track of the internal state (not domain-specific) of the process.
 * **/
pub(crate) struct ProcessManagerInternalStateManager<Conf : AbstractProcessConfiguration> {
    /// before the process starts, the initial node is kept in an Option
    pub initial_node_if_not_yet_started : Option<Conf::DomainSpecificNode>,
    /// this generator guarantees uniqueness of the identifiers of the nodes
    pub identifier_generator : UniqueIdentifierGenerator,
    /// keeps track of nodes that have at least one child
    /// this is used for the HCS queue and "loggers_notify_last_child_step_of_node_processed"
    /// once all the children have been processed this is garbage collected 
    pub node_has_processed_child_tracker : HashSet<u32>,
    /// for memoizing nodes and exploring the process as a graph instead of a tree
    pub node_memoizer : NodeMemoizer<Conf>,
}

impl<Conf: AbstractProcessConfiguration> ProcessManagerInternalStateManager<Conf> {
    pub fn new(
        initial_node: Conf::DomainSpecificNode, 
        node_memoizer: NodeMemoizer<Conf>
    ) -> Self {
        Self { 
            initial_node_if_not_yet_started : Some(initial_node), 
            identifier_generator : UniqueIdentifierGenerator::default(),
            node_has_processed_child_tracker : HashSet::new(),
            node_memoizer 
        }
    }
}



/** 
 * Entity responsible of the execution of the overall process.
 * **/
pub struct GenericProcessManager<Conf : AbstractProcessConfiguration> {
    pub context_and_param : Conf::ContextAndParameterization,
    // ***
    delegate : ProcessQueueDelegate<Conf::DomainSpecificStep,Conf::DomainSpecificNode,Conf::Priorities>,
    // ***
    pub global_state : Conf::MutablePersistentState,
    // ***
    filters_manager : GenericFiltersManager<Conf>,
    // ***
    pub loggers : Vec<Box< dyn AbstractProcessLogger<Conf>>>,
    // ***
    internal_state : ProcessManagerInternalStateManager<Conf>
}



impl<Conf : 'static + AbstractProcessConfiguration> GenericProcessManager<Conf> {

    pub fn new(
        context_and_param : Conf::ContextAndParameterization,
        strategy: QueueSearchStrategy,
        priorities: GenericProcessPriorities<Conf::Priorities>,
        filters_manager : GenericFiltersManager<Conf>,
        loggers : Vec<Box< dyn AbstractProcessLogger<Conf>>>,
        is_memoized : bool,
        initial_node : Conf::DomainSpecificNode
    ) -> GenericProcessManager<Conf> {
        let initial_global_state = Conf::MutablePersistentState::get_initial_state(
            &context_and_param,
            &initial_node
        );
        let internal_state = ProcessManagerInternalStateManager::new(
            initial_node, 
            NodeMemoizer::new(is_memoized)
        );
        GenericProcessManager{
            context_and_param,
            delegate : ProcessQueueDelegate::new(strategy, priorities),
            global_state : initial_global_state,
            filters_manager,
            loggers,
            internal_state
        }
    }

    pub fn get_logger(&self, logger_id : usize) -> Option<&dyn AbstractProcessLogger<Conf>> {
        self.loggers.get(logger_id).map(|x| &**x)
    }

    pub fn start_process(
        &mut self
    ) -> bool {

        if self.internal_state.initial_node_if_not_yet_started.is_none() {
            return false;
        }

        loggers_initialize(
            self.loggers.iter_mut(),
            &self.context_and_param,
            self.delegate.get_strategy(),
            self.delegate.get_priorities(),
            &self.filters_manager,
            &self.global_state,
            self.internal_state.node_memoizer.is_memoized()
        );

        let initial_node = self.internal_state.initial_node_if_not_yet_started.take().unwrap();
        
        let warrants_termination = {
            let new_node_id = self.internal_state.identifier_generator.get_next();
            self.pre_process_new_node(
                &initial_node,
                new_node_id
            );
            self.process_new_node_and_check_termination(
                initial_node,
                new_node_id
            )
        };

        if !warrants_termination {

            'process_step_loop : while let Some(
                (step_to_process,mut opt_parent_node)
            ) = self.delegate.extract_from_queue() {
                
                {
                    // this is isolated to avoid borrow checker problems

                    let parent_node =
                    opt_parent_node.as_mut().unwrap_or_else(|| self.delegate.get_mut_memorized_node(step_to_process.parent_node_id));
                    
                    // we will process the step that may be fired from the parent node
                    // in any case, we update the parent node's remainign to process childrens
                    parent_node.remaining_child_steps_ids_to_process.remove(&step_to_process.id_as_potential_step_from_parent);
                }

                // we need an immutable reference to the parent node
                // but it may be under self.delegate
                // so then when calling "self.process_step_and_check_termination(step_to_process,parent_node)"
                // we run into borrow checker problem
                // for now the solution is to clone the node even though not ideal
                let parent_node_clone = match opt_parent_node {
                    None => {
                        self.delegate.get_memorized_node(step_to_process.parent_node_id).clone()
                    },
                    Some(x) => {
                        x
                    }
                };

                let warrants_termination_inner = self.process_step_and_check_termination(
                    step_to_process,
                    &parent_node_clone
                );
                if warrants_termination_inner {
                    break 'process_step_loop;
                }
            }

        }

        loggers_terminate_process(
            self.loggers.iter_mut(),
            &self.context_and_param,
            &self.global_state
        );

        // the process has terminated successfully
        true 
    }

    

    fn process_step_and_check_termination(
        &mut self,
        step_to_process : EnqueuedStep<Conf::DomainSpecificStep>,
        parent_node : &MemorizedNode<Conf::DomainSpecificNode>
    ) -> bool {
        let mut step_to_process = step_to_process;
        // apply the step filters
        let warrants_termination = match self.filters_manager.apply_step_filters(
            &self.context_and_param,
            &self.global_state,
            &parent_node.domain_specific_node,
            &step_to_process.domain_specific_step
        ) {
            Some(filtration_result) => {
                // here, a filter was activated
                // this means that we won't explore further the successors from this specific step
                // ***
                // below we notify the loggers
                let filtration_result_id = self.internal_state.identifier_generator.get_next();
                loggers_filtered(
                    self.loggers.iter_mut(), 
                    &self.context_and_param,
                    step_to_process.parent_node_id,
                    filtration_result_id, 
                    &filtration_result
                );
                // and we update the global state
                self.global_state.update_on_filtered(
                    &self.context_and_param,
                    &parent_node.domain_specific_node,
                    &filtration_result
                );
                // the filtration may warrant process termination
                self.global_state.warrants_termination_of_the_process(&self.context_and_param)
            },
            None => {
                // here there are no filter that prevent the firing of the step
                // ***
                // because we can process it, this means that the parent node of the step (from which the step is fired)
                // is guaranteed to have at least one child
                // thus we update the tracker
                self.internal_state.node_has_processed_child_tracker.insert(step_to_process.id_as_potential_step_from_parent);
                // ***
                // processing the step yields a successor node
                // thus we process it to get the successor node
                let successor_node = Conf::AlgorithmOperationHandler::process_new_step(
                    &self.context_and_param,
                    &mut self.global_state,
                    &parent_node.domain_specific_node,
                    &mut step_to_process.domain_specific_step
                );
                // now, if the memoization option is active,
                // we check if this node has already been reached previously
                // and return the id of the successor node
                let (successor_node_id,check_termination) = match self.internal_state.node_memoizer.check_memo(&successor_node) {
                    Some(memoized_node_id) => {
                        // here the sucessor node is already known and memoized, so we return its unique id
                        // also because the global state is not updated, termination is not warranted
                        (memoized_node_id,false)
                    },
                    None => {
                        // here the successor node is entirely new
                        // so we create a new unique identifier
                        let new_node_id = self.internal_state.identifier_generator.get_next();
                        // we pre-process the new node
                        self.pre_process_new_node(
                            &successor_node,
                            new_node_id
                        );
                        // here the fact that we have a new node
                        // requires us to check termination
                        (new_node_id,true)
                    },
                };
                // now that we have the "successor_node_id", we can log the new step
                loggers_new_step(
                    self.loggers.iter_mut(),
                    &self.context_and_param,
                    step_to_process.parent_node_id,
                    &step_to_process.domain_specific_step,
                    successor_node_id,
                    &successor_node
                );
                // ***
                // and we propagate "warrants_termination"
                if check_termination {
                    // here we process the new node further
                    // and incidentally check termination
                    self.process_new_node_and_check_termination(
                        successor_node,
                        successor_node_id
                    )
                } else {
                    false
                }
            }
        };
        // ***
        if parent_node.remaining_child_steps_ids_to_process.is_empty() {
            let parent_had_at_least_one_processed_child = self.internal_state.node_has_processed_child_tracker.remove(
                &step_to_process.id_as_potential_step_from_parent
            );
            if !parent_had_at_least_one_processed_child {
                // for the HCS queue to know the node id'ed by parent_id is terminal
                self.delegate.queue_set_last_reached_has_no_child();
            }
            loggers_notify_last_child_step_of_node_processed(
                self.loggers.iter_mut(),
                &self.context_and_param,
                step_to_process.parent_node_id
            )
        }
        // and we propagate "warrants_termination"
        warrants_termination
    }


    /** 
     * We preprocess the new node that it to be considered.
     * We separate this code from "process_new_node_and_check_termination"
     * so that we may only use a reference to the new node
     * and notify the loggers of the new node
     * before notifying the loggers of the new step between the parent node and this new node
     * **/
    fn pre_process_new_node(
        &mut self,
        new_node : &Conf::DomainSpecificNode,
        new_node_id : u32) {
        // we notify the memoizer of the new node (actually memoizes only if the memoizer is active)
        self.internal_state.node_memoizer.memoize_new_node(new_node,new_node_id);
        // we notify the loggers of the new node
        loggers_new_node(
            self.loggers.iter_mut(),
            &self.context_and_param, 
            new_node_id, 
            new_node
        );
        // we update the global state
        self.global_state.update_on_node_reached(
            &self.context_and_param,
            new_node
        );
    }


    fn process_new_node_and_check_termination(
        &mut self,
        new_node : Conf::DomainSpecificNode,
        new_node_id : u32
    ) -> bool {
        // updating the global state may warrant termination
        if self.global_state.warrants_termination_of_the_process(&self.context_and_param) {
            return true;
        }
        // ***
        // here it does not warrant termination
        // so we process the new node further
        // ***
        // we apply the node pre filters
        let (has_no_children,warrants_termination) = match self.filters_manager.apply_node_pre_filters(
            &self.context_and_param,
            &self.global_state,
            &new_node
        ) {
            Some(filtration_result) => {
                // here, a filter was activated
                // this means that we won't explore further the successors from this specific node
                // ***
                // below we notify the loggers of the filtration
                let filtration_result_id = self.internal_state.identifier_generator.get_next();
                loggers_filtered(
                    self.loggers.iter_mut(), 
                    &self.context_and_param,
                    new_node_id,
                    filtration_result_id, 
                    &filtration_result
                );
                // and we update the global state
                self.global_state.update_on_filtered(
                    &self.context_and_param,
                    &new_node,
                    &filtration_result
                );
                // the filtration may warrant process termination
                let warrants_termination = self.global_state.warrants_termination_of_the_process(&self.context_and_param);
                // ***
                (true,warrants_termination)
            },
            None => {
                // here no node pre filters were activated
                // so we can collect the next steps that may be fired from that node
                let next_steps = Conf::AlgorithmOperationHandler::collect_next_steps(
                    &self.context_and_param,
                    &mut self.global_state,
                    &new_node
                );
                // we update the global state
                self.global_state.update_on_next_steps_collected_reached(
                    &self.context_and_param, 
                    &new_node, 
                    &next_steps,
                );
                // we apply the node post filters
                match self.filters_manager.apply_node_post_filters(
                    &self.context_and_param,
                    &self.global_state,
                    &new_node,
                    &next_steps
                ) {
                    Some(filtration_result) => {
                        // here, a filter was activated
                        // this means that we won't explore further the successors from this specific node
                        // ***
                        // below we notify the loggers of the filtration
                        let filtration_result_id = self.internal_state.identifier_generator.get_next();
                        loggers_filtered(
                            self.loggers.iter_mut(), 
                            &self.context_and_param,
                            new_node_id,
                            filtration_result_id, 
                            &filtration_result
                        );
                        // and we update the global state
                        self.global_state.update_on_filtered(
                            &self.context_and_param,
                            &new_node,
                            &filtration_result
                        );
                        // the filtration may warrant process termination
                        let warrants_termination = self.global_state.warrants_termination_of_the_process(&self.context_and_param);
                        // ***
                        (true,warrants_termination)
                    },
                    None => {
                        let warrants_termination = false;
                        // here no node post filters were activated
                        // this means we can enqueue all these next steps
                        // if there are any
                        let has_no_children = if next_steps.is_empty() {
                            true
                        } else {
                            let mut to_enqueue = vec![];
                            let mut max_id_of_child = 0;
                            for domain_specific_step in next_steps {
                                max_id_of_child += 1;
                                to_enqueue.push( 
                                    EnqueuedStep::new(
                                        new_node_id, 
                                        max_id_of_child, 
                                        domain_specific_step
                                    )
                                );
                            }
                            let remaining_ids_to_process : HashSet<u32> = HashSet::from_iter((1..(max_id_of_child+1)).collect::<Vec<u32>>().iter().cloned() );
                            let memorized_node = MemorizedNode::new(
                                new_node,
                                remaining_ids_to_process
                            );
                            self.delegate.enqueue_new_steps(
                                memorized_node,
                                new_node_id,
                                to_enqueue
                            );
                            false
                        };
                        (has_no_children,warrants_termination)
                    }
                }
            }
        };
        if has_no_children {
            // the node does not have any children : it is a terminal node
            // notifies the queue
            self.delegate.queue_set_last_reached_has_no_child();
            // notifies the loggers
            loggers_notify_node_without_children(
                self.loggers.iter_mut(),
                &self.context_and_param,
                new_node_id
            );
        }
        // and we propagate "warrants_termination"
        warrants_termination
    }

}