// hinge 0.1.0
//
// SQL-native ELT engine: the dependency graph is resolved automatically from
// FROM/JOIN clauses, with parallel execution, shipped as a single binary.
//
// Documentation
use crate::domain::{Asset, Graph};
use std::collections::{HashMap, HashSet, VecDeque};

pub struct Engine {
    graph: Graph,
}

impl Engine {
    pub fn new(graph: Graph) -> Self {
        Self { graph }
    }

    /// All nodes in topological order. Useful for display / dry-run.
    pub fn all_nodes(&self) -> Vec<Asset> {
        self.graph.topo_sort()
    }

    /// Strict ancestors of `start` in topo order.
    pub fn propagate_upstream(&self, start: &Asset) -> Vec<Asset> {
        let set = self.ancestors(start);
        self.topo_sort_subset(&set)
    }

    /// Strict descendants of `start` in topo order.
    pub fn propagate_downstream(&self, start: &Asset) -> Vec<Asset> {
        let set = self.descendants(start);
        self.topo_sort_subset(&set)
    }

    // ── level-based (used by execute()) ───────────────────────────────────────

    /// All nodes grouped into parallel waves (wave N+1 waits for wave N).
    pub fn topo_levels(&self) -> Vec<Vec<Asset>> {
        self.graph.topo_levels()
    }

    /// Ancestors of `start` grouped into parallel waves.
    pub fn upstream_levels(&self, start: &Asset) -> Vec<Vec<Asset>> {
        let set = self.ancestors(start);
        self.filter_levels(set)
    }

    /// Descendants of `start` grouped into parallel waves.
    pub fn downstream_levels(&self, start: &Asset) -> Vec<Vec<Asset>> {
        let set = self.descendants(start);
        self.filter_levels(set)
    }

    fn filter_levels(&self, subset: HashSet<Asset>) -> Vec<Vec<Asset>> {
        self.graph
            .topo_levels()
            .into_iter()
            .map(|level| level.into_iter().filter(|a| subset.contains(a)).collect::<Vec<_>>())
            .filter(|level| !level.is_empty())
            .collect()
    }

    fn ancestors(&self, start: &Asset) -> HashSet<Asset> {
        self.bfs_traverse(start, |node| self.graph.dependencies_of(node).to_vec())
    }

    fn descendants(&self, start: &Asset) -> HashSet<Asset> {
        self.bfs_traverse(start, |node| self.graph.children_of(node).to_vec())
    }

    fn bfs_traverse(&self, start: &Asset, neighbors: impl Fn(&Asset) -> Vec<Asset>) -> HashSet<Asset> {
        let mut visited = HashSet::new();
        let mut queue = VecDeque::new();
        queue.push_back(start.clone());
        while let Some(node) = queue.pop_front() {
            for neighbor in neighbors(&node) {
                if visited.insert(neighbor.clone()) {
                    queue.push_back(neighbor);
                }
            }
        }
        visited
    }

    // Kahn's algorithm restricted to `subset` — O(A + EA) vs O(V + E) for full topo.
    fn topo_sort_subset(&self, subset: &HashSet<Asset>) -> Vec<Asset> {
        let mut in_degree: HashMap<Asset, usize> = subset
            .iter()
            .map(|node| {
                let count = self.graph.dependencies_of(node)
                    .iter()
                    .filter(|dep| subset.contains(*dep))
                    .count();
                (node.clone(), count)
            })
            .collect();

        let mut queue: VecDeque<Asset> = in_degree
            .iter()
            .filter(|&(_, d)| *d == 0)
            .map(|(a, _)| a.clone())
            .collect();

        let mut result = Vec::with_capacity(subset.len());
        while let Some(node) = queue.pop_front() {
            for child in self.graph.children_of(&node) {
                if let Some(d) = in_degree.get_mut(child) {
                    *d -= 1;
                    if *d == 0 {
                        queue.push_back(child.clone());
                    }
                }
            }
            result.push(node);
        }
        result
    }
}