// psmatcher 0.4.0-alpha.0
//
// A pub/sub matcher algorithm implementation
// Documentation
// Copyright (c) Subzero Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::{
    collections::*,
    error::MatcherError,
    event::Event,
    predicate::ElementaryPredicate,
    subscription::{Subscription, SubscriptionKey},
};

/// A node in the matching tree
///
/// Nodes do not own their children. Children are referred to by index into the
/// node list of the owning [`MatchingTree`].
#[derive(Debug, Clone)]
pub struct TreeNode<T: SubscriptionKey> {
    /// The predicate tested at this node, or `None` for a node that performs no
    /// test (the root of a [`MatchingTree`] is created without a predicate)
    pub predicate: Option<ElementaryPredicate>,

    /// Indices of the child nodes to visit when the predicate evaluates to true
    pub true_children: Vec<usize>,

    /// Indices of the child nodes to visit when the predicate evaluates to false
    pub false_children: Vec<usize>,

    /// Keys of the subscriptions registered at this node. During matching they are
    /// treated as candidates and verified against the event before being reported.
    pub subscriptions: HashSet<T>,
}

impl<T: SubscriptionKey> TreeNode<T> {
    /// Creates a new tree node
    pub fn new(predicate: Option<ElementaryPredicate>) -> Self {
        Self {
            predicate,
            true_children: Vec::new(),
            false_children: Vec::new(),
            subscriptions: HashSet::new(),
        }
    }
}

/// The matching tree for efficient event matching
///
/// This implementation follows the tree-based algorithm described in Aguilera et al. (1999)
/// for content-based subscription systems. Subscriptions whose predicate lists start with
/// the same predicates share the corresponding path of nodes, so that during traversal a
/// shared predicate is evaluated once rather than once per subscription.
///
/// ## Tree Structure
///
/// - Nodes are stored in a flat list (`nodes`) and refer to each other by index
/// - The root node carries no predicate; nodes created for a subscription carry one
///   elementary predicate each
/// - Every node has separate child lists for the true and the false outcome of its predicate
/// - A subscription's key is stored at the node reached by following its predicates in order
///   from the root (at the root itself when the subscription has no predicates)
///
/// ## Performance Characteristics
///
/// - Matching walks the tree from the root, evaluating the predicate of each visited node
///   and collecting candidate keys; each candidate is then verified with
///   `Subscription::matches`. The cost therefore grows with the number of nodes visited
///   plus the cost of verifying the candidates.
/// - The root has no predicate, so all of its children are visited for every event.
/// - Space complexity: O(n * p) where n is the number of subscriptions and p is the average
///   number of predicates, since adding a subscription creates at most one node per predicate.
///
/// NOTE(review): no benchmark is visible from this file; how much this saves over evaluating
/// every subscription individually depends on how many predicate prefixes are shared.
#[derive(Debug, Clone)]
pub struct MatchingTree<T: SubscriptionKey> {
    /// The nodes in the tree; child links are indices into this list
    pub nodes: Vec<TreeNode<T>>,

    /// Map from subscription ID to the subscription
    pub subscriptions: HashMap<T, Subscription<T>>,

    /// Index of the root node within `nodes`
    pub root: usize,
}

impl<T: SubscriptionKey> MatchingTree<T> {
    /// Builds an empty tree consisting of the root node only
    ///
    /// The root is stored at index 0 of `nodes`, carries no predicate, and no
    /// subscriptions are registered.
    pub fn new() -> Self {
        let nodes = vec![TreeNode::new(None)];
        let subscriptions = HashMap::new();

        Self {
            nodes,
            subscriptions,
            root: 0,
        }
    }

    /// Adds a subscription to the tree
    ///
    /// The subscription's predicates are walked in order, starting at the root. At each
    /// step an existing child carrying an equal predicate is reused; otherwise a new node
    /// is appended to the current node's true-branch children. The subscription key is
    /// stored at the node reached after the last predicate (the root itself when the
    /// subscription has no predicates) and the subscription is recorded in the
    /// subscription map.
    ///
    /// Adding a subscription whose key is already registered replaces the previous one:
    /// the key is first removed from every node, so afterwards it is stored only at the
    /// node that corresponds to the new predicate list.
    ///
    /// # Panics
    ///
    /// Panics if `root` or a stored child index is out of bounds for `nodes`, which can
    /// only happen when the public fields were modified inconsistently.
    pub fn add_subscription(&mut self, subscription: Subscription<T>) {
        let sub_id = subscription.id.clone();

        // Replacing an existing subscription: without this sweep the key would stay
        // behind at the node chosen for the old predicate list.
        if self.subscriptions.contains_key(&sub_id) {
            for node in &mut self.nodes {
                node.subscriptions.remove(&sub_id);
            }
        }

        // A subscription follows exactly one path, so a single cursor is enough.
        let mut current = self.root;

        for predicate in &subscription.predicates {
            let parent = &self.nodes[current];

            // A false-branch child is only visited when the parent's predicate fails,
            // which contradicts a subscription that requires that predicate to hold.
            // Such children may therefore only be reused below a predicate-less node
            // (the root), whose children are all visited unconditionally.
            let reusable_false_children: &[usize] = if parent.predicate.is_none() {
                &parent.false_children
            } else {
                &[]
            };

            let existing = parent
                .true_children
                .iter()
                .chain(reusable_false_children)
                .copied()
                .find(|&child_idx| self.nodes[child_idx].predicate.as_ref() == Some(predicate));

            current = match existing {
                Some(child_idx) => child_idx,
                None => {
                    // No child tests this predicate yet: extend the path with a new node.
                    let child_idx = self.nodes.len();
                    self.nodes.push(TreeNode::new(Some(predicate.clone())));
                    self.nodes[current].true_children.push(child_idx);
                    child_idx
                }
            };
        }

        // With an empty predicate list the cursor never left the root.
        self.nodes[current].subscriptions.insert(sub_id.clone());
        self.subscriptions.insert(sub_id, subscription);
    }

    /// Unregisters the subscription identified by `subscription_id`
    ///
    /// The key is erased from the subscription set of every node and from the
    /// subscription map. Nodes themselves are never deleted, so the shape of the
    /// tree is the same before and after the call.
    ///
    /// # Errors
    ///
    /// Returns [`MatcherError::Other`] when no subscription with this key is
    /// registered; the tree is left untouched in that case.
    pub fn remove_subscription(&mut self, subscription_id: &T) -> Result<(), MatcherError> {
        if self.subscriptions.contains_key(subscription_id) {
            // Sweep the key out of every node before dropping the subscription itself.
            self.nodes.iter_mut().for_each(|node| {
                node.subscriptions.remove(subscription_id);
            });
            self.subscriptions.remove(subscription_id);

            Ok(())
        } else {
            let message = format!("Subscription {subscription_id:?} not found");
            Err(MatcherError::Other(message))
        }
    }

    /// Matches an event against the tree and returns the IDs of matching subscriptions
    ///
    /// The traversal starts at the root and proceeds as follows:
    /// 1. A node without a predicate (the root) performs no test: its subscriptions become
    ///    candidates and all of its children are visited
    /// 2. At a node with a predicate, the predicate is evaluated against the event
    /// 3. If it holds, the node's subscriptions become candidates and the true-branch
    ///    children are visited; otherwise only the false-branch children are visited
    /// 4. Every candidate is finally verified with `Subscription::matches`, and only
    ///    subscriptions passing that check are returned
    ///
    /// Subscriptions registered at a node whose predicate fails are skipped without being
    /// verified. This relies on a subscription being registered at the node of one of its
    /// own predicates, which is how `add_subscription` places it. The final verification
    /// remains as a safety net against stale or foreign entries in the node sets.
    ///
    /// # Errors
    ///
    /// Returns the error of the first predicate evaluation or subscription check that fails.
    ///
    /// # Panics
    ///
    /// Panics if `root` or a stored child index is out of bounds for `nodes`.
    pub fn match_event(&self, event: &impl Event) -> Result<HashSet<T>, MatcherError> {
        let mut candidates = HashSet::new();
        let mut to_visit = vec![self.root];

        while let Some(node_idx) = to_visit.pop() {
            let node = &self.nodes[node_idx];

            // `None` means the node has nothing to test (the root).
            let outcome = match &node.predicate {
                Some(predicate) => Some(predicate.evaluate(event)?),
                None => None,
            };

            // Predicate holds, or there is none: the subscriptions registered here are
            // plausible matches and the true branch is followed.
            if outcome != Some(false) {
                candidates.extend(node.subscriptions.iter().cloned());
                to_visit.extend(node.true_children.iter().copied());
            }

            // Predicate fails, or there is none: the false branch is followed.
            if outcome != Some(true) {
                to_visit.extend(node.false_children.iter().copied());
            }
        }

        // Keep only the candidates whose full predicate list is satisfied by the event.
        let mut final_matches = HashSet::new();
        for sub_id in candidates {
            if let Some(subscription) = self.subscriptions.get(&sub_id) {
                if subscription.matches(event)? {
                    final_matches.insert(sub_id);
                }
            }
        }

        Ok(final_matches)
    }

    /// Hook for restructuring the tree so that matching becomes cheaper
    ///
    /// No optimization is implemented yet: the call leaves the tree untouched and
    /// always returns `Ok(())`. It is the intended place for the optimization
    /// techniques described in Aguilera et al. (1999).
    pub fn optimize(&mut self) -> Result<(), MatcherError> {
        // Intentionally a no-op for now. Candidate techniques for a real implementation:
        //   - reorder predicates based on selectivity
        //   - merge nodes with the same predicate
        //   - remove redundant nodes
        //   - balance the tree
        Ok(())
    }

    /// Returns statistics about the tree
    ///
    /// - `node_count`: number of entries in `nodes`
    /// - `subscription_count`: number of registered subscriptions
    /// - `max_depth`: largest number of child links between the root and any node
    ///   reachable from it; a tree consisting of the root only has depth 0
    /// - `avg_branching_factor`: total number of child links divided by the number of
    ///   nodes minus one, or `0.0` when the tree has at most one node
    ///
    /// The depth computation visits every node at most once and ignores child indices
    /// that are out of bounds, so it terminates even if the public fields were edited
    /// into a cyclic or dangling structure.
    pub fn stats(&self) -> TreeStats {
        let node_count = self.nodes.len();

        let total_children: usize = self
            .nodes
            .iter()
            .map(|node| node.true_children.len() + node.false_children.len())
            .sum();

        // NOTE(review): in a tree where every non-root node has exactly one parent this
        // ratio is always 1.0; the formula is kept as-is for backward compatibility.
        let avg_branching_factor = if node_count > 1 {
            total_children as f64 / (node_count - 1) as f64
        } else {
            0.0
        };

        // Depth-first walk from the root, carrying each node's depth along with its index.
        let mut max_depth = 0usize;
        let mut visited = vec![false; node_count];
        let mut pending = vec![(self.root, 0usize)];

        while let Some((node_idx, depth)) = pending.pop() {
            if let Some(node) = self.nodes.get(node_idx) {
                if visited[node_idx] {
                    continue;
                }
                visited[node_idx] = true;
                max_depth = max_depth.max(depth);

                pending.extend(
                    node.true_children
                        .iter()
                        .chain(&node.false_children)
                        .map(|&child_idx| (child_idx, depth + 1)),
                );
            }
        }

        TreeStats {
            node_count,
            subscription_count: self.subscriptions.len(),
            max_depth,
            avg_branching_factor,
        }
    }
}

impl<T: SubscriptionKey> Default for MatchingTree<T> {
    fn default() -> Self {
        Self::new()
    }
}

/// Statistics about the matching tree, as produced by `MatchingTree::stats`
#[derive(Debug, Clone)]
pub struct TreeStats {
    /// Number of nodes in the tree, including the root
    pub node_count: usize,

    /// Number of subscriptions registered in the tree
    pub subscription_count: usize,

    /// Maximum depth of the tree
    pub max_depth: usize,

    /// Average branching factor of the tree: the total number of child links divided
    /// by the number of nodes minus one (`0.0` when the tree has at most one node)
    pub avg_branching_factor: f64,
}