// hinge 0.1.0
//
// SQL-native ELT engine — dependency graph resolved automatically from FROM/JOIN clauses, parallel execution, single binary.
// Documentation
mod error;

use crate::domain::asset::Asset;
pub use crate::domain::graph::error::GraphError;
use std::collections::HashMap;

/// A directed acyclic graph (DAG) of SQL assets.
///
/// Nodes are [`Asset`] values; a directed edge `A → B` means *B depends on A*
/// (A must execute before B). Built by [`BuildGraph::from_dir`] or from a raw
/// adjacency map via [`Graph::build`].
///
/// Construction enforces:
/// - At least one node (no empty graphs)
/// - No self-dependencies
/// - No cycles
///
/// [`BuildGraph::from_dir`]: crate::BuildGraph::from_dir
#[derive(Debug, Clone)]
pub struct Graph {
    // Forward adjacency: `edges[A]` lists the children of A, i.e. the assets
    // that depend on A. `Graph::build` gives every node a key here, including
    // nodes that have no children.
    edges: HashMap<Asset, Vec<Asset>>,
    // Reverse adjacency: `reverse_edges[B]` lists the parents of B, i.e. the
    // assets B depends on. The length of each list is the node's in-degree
    // used by the topological ordering.
    reverse_edges: HashMap<Asset, Vec<Asset>>,
}

impl Graph {
    /// Build a graph from an adjacency map where `edges[A]` lists *children* of A
    /// (i.e. assets that depend on A and must run after A).
    ///
    /// Children that never appear as a key of `edges` are still registered as
    /// nodes; they simply have no children of their own.
    ///
    /// # Errors
    ///
    /// - [`GraphError::EmptyGraph`] if `edges` is empty.
    /// - [`GraphError::SelfDependency`] if an asset lists itself as a child.
    /// - [`GraphError::CycleDetected`] if the edges contain a cycle.
    pub fn build(mut edges: HashMap<Asset, Vec<Asset>>) -> Result<Self, GraphError> {
        if edges.is_empty() {
            return Err(GraphError::EmptyGraph);
        }

        let mut reverse_edges: HashMap<Asset, Vec<Asset>> = HashMap::with_capacity(edges.len());
        // Children that are not keys of `edges`. They cannot be inserted while
        // `edges` is borrowed for iteration, so they are collected here and
        // added once the loop is done.
        let mut leaf_only: Vec<Asset> = Vec::new();

        for (parent, children) in &edges {
            // Every node gets a reverse entry, even when nothing points at it,
            // so that roots show up with an in-degree of zero.
            reverse_edges.entry(parent.clone()).or_default();

            for child in children {
                if parent == child {
                    return Err(GraphError::SelfDependency(parent.to_string()));
                }

                reverse_edges
                    .entry(child.clone())
                    .or_default()
                    .push(parent.clone());

                if !edges.contains_key(child) {
                    leaf_only.push(child.clone());
                }
            }
        }

        // `entry` makes repeated leaves harmless.
        for leaf in leaf_only {
            edges.entry(leaf).or_default();
        }

        let graph = Self {
            edges,
            reverse_edges,
        };

        graph.ensure_no_cycles()?;

        Ok(graph)
    }

    /// Fails with [`GraphError::CycleDetected`] when the graph contains a cycle.
    ///
    /// A node that sits on a cycle never reaches in-degree zero, so it is never
    /// placed in any wave; fewer placed nodes than total nodes means a cycle.
    fn ensure_no_cycles(&self) -> Result<(), GraphError> {
        let placed: usize = self.levels_by_ref().iter().map(Vec::len).sum();

        if placed == self.edges.len() {
            Ok(())
        } else {
            Err(GraphError::CycleDetected)
        }
    }

    /// Computes the execution waves as borrowed nodes, without cloning any asset.
    ///
    /// Wave 0 holds every node with no parents. A node joins the wave that
    /// follows the one in which its last outstanding parent was placed.
    fn levels_by_ref(&self) -> Vec<Vec<&Asset>> {
        let mut indegree: HashMap<&Asset, usize> = self
            .reverse_edges
            .iter()
            .map(|(node, parents)| (node, parents.len()))
            .collect();

        let mut current: Vec<&Asset> = indegree
            .iter()
            .filter(|&(_, &deg)| deg == 0)
            .map(|(&node, _)| node)
            .collect();

        let mut levels = Vec::new();

        while !current.is_empty() {
            let mut next = Vec::new();

            for &node in &current {
                for child in self.children_of(node) {
                    if let Some(deg) = indegree.get_mut(child) {
                        *deg -= 1;
                        if *deg == 0 {
                            next.push(child);
                        }
                    }
                }
            }

            // Move the finished wave into the result and continue with the next one.
            levels.push(std::mem::replace(&mut current, next));
        }

        levels
    }

    /// Returns nodes grouped by execution wave.
    ///
    /// All nodes in wave N can run **in parallel**; wave N+1 starts only after
    /// every node in wave N has completed. The order of nodes *within* a wave
    /// is unspecified.
    ///
    /// ```text
    /// wave 0 : [raw]
    /// wave 1 : [staging]
    /// wave 2 : [mart_a, mart_b]   ← independent, safe to run concurrently
    /// ```
    pub fn topo_levels(&self) -> Vec<Vec<Asset>> {
        self.levels_by_ref()
            .into_iter()
            .map(|wave| wave.into_iter().cloned().collect())
            .collect()
    }

    /// Returns all nodes in topological order (dependencies before dependents).
    pub fn topo_sort(&self) -> Vec<Asset> {
        self.levels_by_ref()
            .into_iter()
            .flatten()
            .cloned()
            .collect()
    }

    /// Iterate over all nodes in the graph (order unspecified).
    pub fn nodes(&self) -> impl Iterator<Item = &Asset> {
        self.edges.keys()
    }

    /// Find an asset by `schema` + `name`. Returns `None` if not in the graph.
    pub fn find(&self, schema: &str, name: &str) -> Option<&Asset> {
        self.edges.keys().find(|asset| {
            let reference = asset.reference();
            reference.schema() == schema && reference.name() == name
        })
    }

    /// Returns the direct children of `node`, i.e. the assets that depend on it.
    ///
    /// Yields an empty slice when `node` has no children or is not in the graph.
    pub fn children_of(&self, node: &Asset) -> &[Asset] {
        self.edges.get(node).map(Vec::as_slice).unwrap_or(&[])
    }

    /// Returns the direct dependencies of `node`, i.e. the assets it depends on.
    ///
    /// Yields an empty slice when `node` has no dependencies or is not in the graph.
    pub fn dependencies_of(&self, node: &Asset) -> &[Asset] {
        self.reverse_edges
            .get(node)
            .map(Vec::as_slice)
            .unwrap_or(&[])
    }
}