graph_process_manager_core 0.4.0

Utilities to explore parts of a tree-like or graph-like structure that is not known in advance
Documentation
/*
Copyright 2020 Erwan Mahe (github.com/erwanM974)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

use std::collections::VecDeque;
use std::sync::Arc;

use crate::process::config::AbstractProcessConfiguration;
use crate::process::event::ExplorationEvent;
use crate::process::persistent_state::AbstractProcessMutablePersistentState;
use crate::queue::delegate::ProcessQueueDelegate;
use crate::queue::memorized_node::MemorizedNode;
use crate::queue::priorities::GenericProcessPriorities;
use crate::queue::queued_step::EnqueuedStep;
use crate::queue::strategy::QueueSearchStrategy;

use super::filter::GenericFiltersManager;
use super::node_memoizer::NodeMemoizer;



pub(crate) struct ProcessManagerInternalState<Conf: AbstractProcessConfiguration> {
    pub initial_node: Option<Conf::DomainSpecificNode>,
    pub node_memoizer: NodeMemoizer<Conf>,
}

impl<Conf: AbstractProcessConfiguration> ProcessManagerInternalState<Conf> {
    pub fn new(initial_node: Conf::DomainSpecificNode, node_memoizer: NodeMemoizer<Conf>) -> Self {
        Self {
            initial_node: Some(initial_node),
            node_memoizer,
        }
    }
}


pub struct GenericProcessManager<Conf: AbstractProcessConfiguration> {
    pub context_and_param: Conf::ContextAndParameterization,
    delegate: ProcessQueueDelegate<Conf::DomainSpecificStep, Conf::DomainSpecificNode, Conf::Priorities>,
    pub global_state: Conf::MutablePersistentState,
    filters_manager: GenericFiltersManager<Conf>,
    internal_state: ProcessManagerInternalState<Conf>,
    pending_events: VecDeque<ExplorationEvent<Conf>>,
    terminated: bool,
    next_id : u32
}


impl<Conf: 'static + AbstractProcessConfiguration> GenericProcessManager<Conf> {

    pub fn new(
        context_and_param: Conf::ContextAndParameterization,
        strategy: QueueSearchStrategy,
        priorities: GenericProcessPriorities<Conf::Priorities>,
        filters_manager: GenericFiltersManager<Conf>,
        is_memoized: bool,
        initial_node: Conf::DomainSpecificNode,
    ) -> Self {
        let global_state = Conf::MutablePersistentState::get_initial_state(
            &context_and_param,
            &initial_node,
        );
        let internal_state = ProcessManagerInternalState::new(
            initial_node,
            NodeMemoizer::new(is_memoized),
        );
        Self {
            context_and_param,
            delegate: ProcessQueueDelegate::new(strategy, priorities),
            global_state,
            filters_manager,
            internal_state,
            pending_events: VecDeque::new(),
            terminated: false,
            next_id : 1
        }
    }

    pub fn get_strategy(&self) -> &QueueSearchStrategy {
        self.delegate.get_strategy()
    }

    pub fn get_filters_manager(&self) -> &GenericFiltersManager<Conf> {
        &self.filters_manager
    }

    pub fn is_memoized(&self) -> bool {
        self.internal_state.node_memoizer.is_memoized()
    }

    pub fn get_priorities(&self) -> &GenericProcessPriorities<Conf::Priorities> {
        self.delegate.get_priorities()
    }
}


impl<Conf: 'static + AbstractProcessConfiguration> Iterator for GenericProcessManager<Conf> {
    type Item = ExplorationEvent<Conf>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(event) = self.pending_events.pop_front() {
            return Some(event);
        }
        if self.terminated {
            return None;
        }
        // First call: process the initial node.
        if let Some(initial_node) = self.internal_state.initial_node.take() {
            let node_id = self.next_id;
            self.next_id += 1;
            let arc = Arc::new(initial_node);
            self.emit_new_node(Arc::clone(&arc), node_id);
            if !self.terminated {
                self.process_node(arc, node_id);
            }
        } else {
            self.advance();
        }
        self.pending_events.pop_front()
    }
}


impl<Conf: 'static + AbstractProcessConfiguration> GenericProcessManager<Conf> {

    // Dequeue and handle one step from the queue.
    fn advance(&mut self) {
        let (step, mut opt_parent) = match self.delegate.extract_from_queue() {
            None => {
                self.terminated = true;
                return;
            }
            Some(x) => x,
        };
        let parent_node_id = step.parent_node_id;

        // Arc::clone is a cheap refcount increment — no node data is copied.
        let parent_arc = match opt_parent.as_ref() {
            Some(p) => Arc::clone(&p.domain_specific_node),
            None => Arc::clone(&self.delegate.get_memorized_node(parent_node_id).domain_specific_node),
        };

        let step_not_filtered = self.handle_step(step, &parent_arc, parent_node_id);

        // Update the per-node "had a real child" flag now that we know if the step was filtered.
        if step_not_filtered {
            match opt_parent.as_mut() {
                Some(p) => p.had_at_least_one_processed_child = true,
                None => {
                    self.delegate
                        .get_mut_memorized_node(parent_node_id)
                        .had_at_least_one_processed_child = true;
                }
            }
        }

        // When opt_parent is Some, the queue has exhausted all steps for this parent
        // (it removed the MemorizedNode from its map and returned it here).
        if let Some(parent) = opt_parent {
            if !parent.had_at_least_one_processed_child {
                // Every step from this parent was filtered: it is effectively terminal.
                self.delegate.queue_set_last_reached_has_no_child();
            }
            self.pending_events
                .push_back(ExplorationEvent::AllChildrenProcessed { parent_node_id });
        }
    }


    // Apply step filters, call the handler, and push the relevant events.
    // Returns true if the step was processed (not filtered).
    fn handle_step(
        &mut self,
        step: EnqueuedStep<Conf::DomainSpecificStep>,
        parent_domain_node: &Conf::DomainSpecificNode,
        parent_node_id: u32,
    ) -> bool {
        if let Some(filtration_result) = self.filters_manager.apply_step_filters(
            &self.context_and_param,
            &self.global_state,
            parent_domain_node,
            &step.domain_specific_step,
        ) {
            self.global_state.update_on_filtered(
                &self.context_and_param,
                parent_domain_node,
                &filtration_result,
            );
            self.pending_events.push_back(ExplorationEvent::Filtered {
                parent_node_id,
                filtration_result,
            });
            if self.global_state.warrants_termination_of_the_process(&self.context_and_param) {
                self.terminated = true;
            }
            return false;
        }

        let successor = Conf::process_new_step(
            &self.context_and_param,
            &mut self.global_state,
            parent_domain_node,
            &step.domain_specific_step,
        );

        let (successor_id, is_new) = match self.internal_state.node_memoizer.check_memo(&successor) {
            Some(known_id) => (known_id, false),
            None => {
                let successor_id = self.next_id;
                self.next_id += 1;
                (successor_id, true)
            },
        };

        // Wrap the successor in Arc once; all subsequent uses are cheap refcount increments.
        // None when the node was already memoized (no further exploration needed).
        let arc_if_new = if is_new {
            let arc = Arc::new(successor);
            self.emit_new_node(Arc::clone(&arc), successor_id);
            Some(arc)
        } else {
            None
        };

        self.pending_events.push_back(ExplorationEvent::NewStep {
            origin_node_id: parent_node_id,
            step: step.domain_specific_step,
            target_node_id: successor_id,
        });

        // Decide what to explore from the new node.
        if let Some(arc) = arc_if_new {
            if !self.terminated {
                self.process_node(arc, successor_id);
            }
        }

        true
    }


    // Memoize a new node, push a NewNode event, and update the global state.
    // Takes Arc by value: stores one clone in the memoizer, moves the other into the event.
    fn emit_new_node(&mut self, arc: Arc<Conf::DomainSpecificNode>, node_id: u32) {
        self.internal_state.node_memoizer.memoize_new_node(Arc::clone(&arc), node_id);
        self.global_state.update_on_node_reached(&self.context_and_param, &arc);
        self.pending_events.push_back(ExplorationEvent::NewNode { id: node_id, node: arc });
        if self.global_state.warrants_termination_of_the_process(&self.context_and_param) {
            self.terminated = true;
        }
    }


    // Apply node filters and enqueue children (or mark as terminal).
    // Takes Arc by value so it can be moved into MemorizedNode without cloning.
    fn process_node(&mut self, arc: Arc<Conf::DomainSpecificNode>, node_id: u32) {
        if self.global_state.warrants_termination_of_the_process(&self.context_and_param) {
            return;
        }

        // Pre-filter: runs before collecting steps.
        if let Some(filtration_result) = self.filters_manager.apply_node_pre_filters(
            &self.context_and_param,
            &self.global_state,
            &arc,
        ) {
            self.push_node_filtration(&arc, node_id, filtration_result);
            return;
        }

        let next_steps = Conf::collect_next_steps(
            &self.context_and_param,
            &self.global_state,
            &arc,
        );
        self.global_state.update_on_next_steps_collected_reached(
            &self.context_and_param,
            &arc,
            &next_steps,
        );

        // Post-filter: runs after collecting steps, so it can inspect them.
        if let Some(filtration_result) = self.filters_manager.apply_node_post_filters(
            &self.context_and_param,
            &self.global_state,
            &arc,
            &next_steps,
        ) {
            self.push_node_filtration(&arc, node_id, filtration_result);
            return;
        }

        if next_steps.is_empty() {
            self.delegate.queue_set_last_reached_has_no_child();
            self.pending_events
                .push_back(ExplorationEvent::NodeWithoutChildren { node_id });
        } else {
            let to_enqueue: Vec<EnqueuedStep<Conf::DomainSpecificStep>> = next_steps
                .into_iter()
                .enumerate()
                .map(|(i, s)| EnqueuedStep::new(node_id, i as u32 + 1, s))
                .collect();
            // Move the Arc into MemorizedNode — no clone of the node data.
            let memorized = MemorizedNode::new(arc);
            self.delegate.enqueue_new_steps(memorized, node_id, to_enqueue);
        }
    }


    // Shared logic for when a node filter fires (pre or post).
    fn push_node_filtration(
        &mut self,
        node: &Conf::DomainSpecificNode,
        node_id: u32,
        filtration_result: Conf::FiltrationResult,
    ) {
        self.global_state
            .update_on_filtered(&self.context_and_param, node, &filtration_result);
        self.pending_events.push_back(ExplorationEvent::Filtered {
            parent_node_id: node_id,
            filtration_result,
        });
        if self.global_state.warrants_termination_of_the_process(&self.context_and_param) {
            self.terminated = true;
        }
        self.delegate.queue_set_last_reached_has_no_child();
        self.pending_events
            .push_back(ExplorationEvent::NodeWithoutChildren { node_id });
    }
}