nornir 0.1.0

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
Documentation
//! Cross-repo dependency graph — computation, build-order topological
//! sort, and warehouse persistence.
//!
//! Two phases:
//!
//! 1. **Compute** ([`WorkspaceGraph::build`]) — for every repo, run
//!    `cargo_metadata --no-deps` to learn what it `produces`
//!    (workspace member crate names) and `consumes` (external dep
//!    names). For each pair `(A, B)` add edge `A → B` justified by
//!    `A.consumes ∩ B.produces`. petgraph then gives the build order.
//!
//! 2. **Persist** ([`record_dep_graph`] /
//!    [`query_dep_graph_snapshots`] on [`IcebergWarehouse`]) — every
//!    `record` writes one row per `(snapshot, edge, crate)` into the
//!    long-format `dep_graph_edges` Iceberg table. Reads roll the long
//!    rows back into [`DepGraphSnapshot`] structs grouped by edge.
//!
//! Graphs are immutable historical artefacts: a snapshot belongs to
//! Urðr the moment the workspace's Cargo.tomls have settled.

use std::collections::{BTreeMap, BTreeSet};
use std::path::{Path, PathBuf};
use std::sync::Arc;

use anyhow::{anyhow, Context, Result};
use arrow::array::{Array, RecordBatch, StringArray, TimestampMicrosecondArray};
use cargo_metadata::MetadataCommand;
use chrono::{DateTime, Utc};
use futures::TryStreamExt;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::Catalog;
use petgraph::algo::toposort;
use petgraph::graph::{DiGraph, NodeIndex};
use uuid::Uuid;

use super::iceberg::IcebergWarehouse;
use crate::workspace::descriptor::WorkspaceDescriptor;

/// What a single repository offers to, and needs from, other repositories,
/// as gathered by `inspect_repo`.
#[derive(Debug, Clone)]
pub struct RepoFacts {
    /// Repository name, as supplied when the repo was inspected.
    pub name: String,
    /// Directory in which `cargo metadata` was run for this repo.
    pub root: PathBuf,
    /// Names of the repo's workspace packages, excluding those marked
    /// `publish = false`.
    pub produces: BTreeSet<String>,
    /// Dependency names declared by the repo's packages, minus the names
    /// of every package that is local to the repo itself.
    pub consumes: BTreeSet<String>,
}

/// A directed dependency between two repositories, pointing from the
/// consumer to the producer. Edges created by [`WorkspaceGraph::build`]
/// always carry at least one `via` crate.
#[derive(Debug, Clone)]
pub struct CrossRepoEdge {
    /// Name of the consuming repository.
    pub from: String,
    /// Name of the producing repository.
    pub to: String,
    /// Crate names that justify this edge — `from` consumes these,
    /// `to` produces them.
    pub via: BTreeSet<String>,
}

/// Cross-repo dependency graph for one workspace, as computed by
/// [`WorkspaceGraph::build`].
#[derive(Debug)]
pub struct WorkspaceGraph {
    /// Per-repository facts, keyed by repo name.
    pub facts: BTreeMap<String, RepoFacts>,
    /// Every consumer → producer edge; at most one per ordered repo pair.
    pub edges: Vec<CrossRepoEdge>,
    /// petgraph mirror of `edges` used for the topological sort: node
    /// weights are repo names, edge weights are the number of `via` crates.
    inner: DiGraph<String, usize>,
}

impl WorkspaceGraph {
    /// Compute the cross-repo graph for the workspace described by `desc`.
    ///
    /// Every resolved repository is inspected, then an edge
    /// `consumer → producer` is added for each pair of distinct repos where
    /// the consumer depends on at least one crate the producer owns.
    ///
    /// # Errors
    ///
    /// Fails when source resolution or repo inspection fails, or when two
    /// repositories produce a crate with the same name.
    pub fn build(desc: &WorkspaceDescriptor) -> Result<Self> {
        let mut facts: BTreeMap<String, RepoFacts> = BTreeMap::new();
        for (repo, root) in crate::workspace::resolve::resolve_sources(desc)? {
            let repo_facts = inspect_repo(&repo, &root)?;
            facts.insert(repo.clone(), repo_facts);
        }

        // Reverse lookup: crate name → name of the repo that produces it.
        let mut owner_of: BTreeMap<&str, &str> = BTreeMap::new();
        for repo in facts.values() {
            for krate in &repo.produces {
                match owner_of.insert(krate.as_str(), repo.name.as_str()) {
                    Some(prev) if prev != repo.name => {
                        return Err(anyhow!(
                            "crate `{krate}` is produced by both `{prev}` and `{}` — \
                             workspaces must produce disjoint crate names",
                            repo.name
                        ));
                    }
                    _ => {}
                }
            }
        }

        // One graph node per repo, registered in name order.
        let mut inner: DiGraph<String, usize> = DiGraph::new();
        let indices: BTreeMap<String, NodeIndex> = facts
            .keys()
            .map(|repo| (repo.clone(), inner.add_node(repo.clone())))
            .collect();

        let mut edges: Vec<CrossRepoEdge> = Vec::new();
        for consumer in facts.values() {
            // Bucket this repo's foreign dependencies by the repo owning them.
            let mut by_owner: BTreeMap<&str, BTreeSet<String>> = BTreeMap::new();
            for dep in &consumer.consumes {
                match owner_of.get(dep.as_str()) {
                    Some(&owner) if owner != consumer.name => {
                        by_owner.entry(owner).or_default().insert(dep.clone());
                    }
                    _ => {}
                }
            }

            let from_idx = indices[&consumer.name];
            for (owner, via) in by_owner {
                // The edge weight records how many crates justify the edge.
                inner.add_edge(from_idx, indices[owner], via.len());
                edges.push(CrossRepoEdge {
                    from: consumer.name.clone(),
                    to: owner.to_string(),
                    via,
                });
            }
        }

        Ok(Self { facts, edges, inner })
    }

    /// Topological build order: dependencies first, consumers last.
    ///
    /// # Errors
    ///
    /// Fails if the cross-repo graph contains a cycle.
    pub fn build_order(&self) -> Result<Vec<String>> {
        // Edges run consumer → producer, so the toposort lists top-level
        // consumers first; flip it to obtain a deps-first order.
        match toposort(&self.inner, None) {
            Ok(mut consumers_first) => {
                consumers_first.reverse();
                Ok(consumers_first
                    .into_iter()
                    .map(|idx| self.inner[idx].clone())
                    .collect())
            }
            Err(cycle) => Err(anyhow!(
                "cross-repo dependency cycle detected at node `{}`",
                self.inner[cycle.node_id()]
            )),
        }
    }

    /// Edges leaving `repo`, i.e. the repositories it directly depends on.
    pub fn dependencies_of(&self, repo: &str) -> Vec<&CrossRepoEdge> {
        let mut outgoing = Vec::new();
        for edge in &self.edges {
            if edge.from == repo {
                outgoing.push(edge);
            }
        }
        outgoing
    }
}

/// Run `cargo metadata --no-deps` in `root` and summarise what the repo
/// called `name` produces and consumes.
///
/// # Errors
///
/// Fails when the metadata command cannot be executed for the repo.
fn inspect_repo(name: &str, root: &Path) -> Result<RepoFacts> {
    let mut command = MetadataCommand::new();
    command.current_dir(root).no_deps();
    let meta = command
        .exec()
        .with_context(|| format!("cargo_metadata for repo `{name}` at {}", root.display()))?;

    let mut local_names: BTreeSet<String> = BTreeSet::new();
    let mut produces: BTreeSet<String> = BTreeSet::new();
    let mut dep_names: BTreeSet<String> = BTreeSet::new();
    for pkg in &meta.packages {
        // An empty registry list is how `publish = false` shows up: the
        // crate is strictly local (typical for `xtask` helpers), so other
        // workspaces cannot see it and it is not counted as produced.
        let publishable = !matches!(&pkg.publish, Some(registries) if registries.is_empty());
        if publishable {
            produces.insert(pkg.name.to_string());
        }
        local_names.insert(pkg.name.to_string());
        dep_names.extend(pkg.dependencies.iter().map(|dep| dep.name.clone()));
    }

    // Every in-workspace name is removed from the consumed set, published
    // or not: private crates may still be intra-workspace dependencies,
    // they just can never be cross-workspace ones.
    let consumes: BTreeSet<String> = dep_names
        .into_iter()
        .filter(|dep| !local_names.contains(dep))
        .collect();

    Ok(RepoFacts {
        name: name.to_string(),
        root: root.to_path_buf(),
        produces,
        consumes,
    })
}

// ─── persistence ────────────────────────────────────────────────────────

/// One recorded dependency-graph snapshot, reassembled from the
/// long-format `dep_graph_edges` rows (one per `(snapshot, edge, crate)`)
/// that share a snapshot id and timestamp. [`query_dep_graph_snapshots`]
/// performs the fold from rows back into this struct.
#[derive(Debug, Clone)]
pub struct DepGraphSnapshot {
    /// Identifier shared by every row of the snapshot; generated by
    /// [`record_dep_graph`].
    pub snapshot_id: Uuid,
    /// Workspace the snapshot was recorded for.
    pub workspace_name: String,
    /// Recording time, stored with microsecond resolution.
    pub timestamp: DateTime<Utc>,
    /// Cross-repo edges of the snapshot; empty when only the no-edges
    /// marker row was stored.
    pub edges: Vec<CrossRepoEdge>,
}

/// Append a graph snapshot to the warehouse. Returns the snapshot UUID.
///
/// One row is written per `(edge, via crate)` pair; every row carries the
/// same freshly generated snapshot id, the workspace name and a single
/// recording timestamp. An edge whose `via` set is empty contributes no
/// rows (this shouldn't happen — [`WorkspaceGraph::build`] only emits
/// edges with at least one crate). When the graph yields no rows at all,
/// one marker row with empty `from`/`to`/`via` is written so the snapshot
/// itself is still recorded.
///
/// # Errors
///
/// Fails when the `dep_graph_edges` table cannot be loaded, the record
/// batch does not match the table schema, or the append fails.
pub async fn record_dep_graph(
    wh: &IcebergWarehouse,
    workspace_name: &str,
    graph: &WorkspaceGraph,
) -> Result<Uuid> {
    let snapshot_id = Uuid::new_v4();
    let ts_micros = Utc::now().timestamp_micros();

    // Flatten the graph into (from, to, via) triples, one per future row.
    let mut triples: Vec<(&str, &str, &str)> = graph
        .edges
        .iter()
        .flat_map(|edge| {
            edge.via
                .iter()
                .map(move |krate| (edge.from.as_str(), edge.to.as_str(), krate.as_str()))
        })
        .collect();
    if triples.is_empty() {
        // Empty snapshot — still leave a no-edges marker row. (Future: a
        // separate snapshots table makes this cleaner.)
        triples.push(("", "", ""));
    }

    // Materialise the six columns; the first three are constant per call.
    let row_count = triples.len();
    let snapshot_ids: Vec<String> = vec![snapshot_id.to_string(); row_count];
    let ws_names: Vec<String> = vec![workspace_name.to_string(); row_count];
    let ts_vals: Vec<i64> = vec![ts_micros; row_count];
    let from_repos: Vec<String> = triples.iter().map(|&(from, _, _)| from.to_owned()).collect();
    let to_repos: Vec<String> = triples.iter().map(|&(_, to, _)| to.to_owned()).collect();
    let via_crates: Vec<String> = triples.iter().map(|&(_, _, via)| via.to_owned()).collect();

    let table = wh
        .catalog()
        .load_table(&wh.table_ident(super::iceberg::TABLE_DEP_GRAPH_EDGES))
        .await?;
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
    let cols: Vec<Arc<dyn Array>> = vec![
        Arc::new(StringArray::from(snapshot_ids)),
        Arc::new(StringArray::from(ws_names)),
        Arc::new(TimestampMicrosecondArray::from(ts_vals).with_timezone("+00:00")),
        Arc::new(StringArray::from(from_repos)),
        Arc::new(StringArray::from(to_repos)),
        Arc::new(StringArray::from(via_crates)),
    ];
    let batch = RecordBatch::try_new(arrow_schema, cols)?;
    super::iceberg::append_batch(wh.catalog(), table, batch).await?;
    Ok(snapshot_id)
}

/// Read every dep-graph snapshot row for a workspace, optionally
/// limited to the most recent N snapshots (after grouping).
pub async fn query_dep_graph_snapshots(
    wh: &IcebergWarehouse,
    workspace_name: &str,
    limit: Option<usize>,
) -> Result<Vec<DepGraphSnapshot>> {
    let table = wh.catalog()
        .load_table(&wh.table_ident(super::iceberg::TABLE_DEP_GRAPH_EDGES))
        .await?;
    let scan = table.scan().build()?;
    let stream = scan.to_arrow().await?;
    let batches: Vec<RecordBatch> = stream.try_collect().await?;

    // (snapshot_id, ts) → (workspace_name, BTreeMap<(from,to), BTreeSet<via>>)
    let mut by_snapshot: BTreeMap<
        (Uuid, i64),
        (String, BTreeMap<(String, String), BTreeSet<String>>),
    > = BTreeMap::new();

    for batch in &batches {
        let ids = downcast::<StringArray>(batch, 0)?;
        let wss = downcast::<StringArray>(batch, 1)?;
        let tss = downcast::<TimestampMicrosecondArray>(batch, 2)?;
        let froms = downcast::<StringArray>(batch, 3)?;
        let tos = downcast::<StringArray>(batch, 4)?;
        let vias = downcast::<StringArray>(batch, 5)?;
        for i in 0..batch.num_rows() {
            if wss.value(i) != workspace_name {
                continue;
            }
            let uid = Uuid::parse_str(ids.value(i))?;
            let key = (uid, tss.value(i));
            let entry = by_snapshot
                .entry(key)
                .or_insert_with(|| (wss.value(i).to_string(), BTreeMap::new()));
            let f = froms.value(i).to_string();
            let t = tos.value(i).to_string();
            if !f.is_empty() || !t.is_empty() {
                entry.1.entry((f, t)).or_default().insert(vias.value(i).to_string());
            }
        }
    }

    let mut out: Vec<DepGraphSnapshot> = by_snapshot
        .into_iter()
        .map(|((snapshot_id, ts_micros), (ws, edge_map))| {
            let edges = edge_map
                .into_iter()
                .map(|((from, to), via)| CrossRepoEdge { from, to, via })
                .collect();
            let timestamp = chrono::TimeZone::timestamp_micros(&Utc, ts_micros)
                .single()
                .unwrap_or_else(Utc::now);
            DepGraphSnapshot { snapshot_id, workspace_name: ws, timestamp, edges }
        })
        .collect();

    out.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
    if let Some(n) = limit {
        let drop_n = out.len().saturating_sub(n);
        out.drain(..drop_n);
    }
    Ok(out)
}

/// Order `repos` so that every repository appears after the repositories
/// it depends on, using only the dependency `edges` supplied.
///
/// Edges that mention a repository not listed in `repos` are skipped.
/// The convention — dependencies first, dependents last — is the same as
/// [`WorkspaceGraph::build_order`]. When no complete ordering can be
/// produced (a cycle, or otherwise partial coverage of `repos`), the input
/// order is returned unchanged, so callers always receive a deterministic
/// permutation.
pub fn topo_order_from_edges(repos: &[String], edges: &[CrossRepoEdge]) -> Vec<String> {
    use std::collections::{BTreeMap, VecDeque};

    // Unmet-dependency count per repo; its key set doubles as the
    // membership test for edge endpoints.
    let mut pending: BTreeMap<&str, usize> = BTreeMap::new();
    for repo in repos {
        pending.insert(repo.as_str(), 0);
    }

    // producer → consumers waiting on it, in edge order.
    let mut dependents: BTreeMap<&str, Vec<&str>> = BTreeMap::new();
    for edge in edges {
        let consumer = edge.from.as_str();
        let producer = edge.to.as_str();
        if !pending.contains_key(producer) {
            continue;
        }
        if let Some(count) = pending.get_mut(consumer) {
            *count += 1;
            dependents.entry(producer).or_default().push(consumer);
        }
    }

    // Seed with the repos that depend on nothing, in name order.
    let mut ready: VecDeque<&str> = pending
        .iter()
        .filter(|&(_, &count)| count == 0)
        .map(|(&repo, _)| repo)
        .collect();

    let mut order: Vec<String> = Vec::with_capacity(repos.len());
    while let Some(current) = ready.pop_front() {
        order.push(current.to_owned());
        for &waiter in dependents.get(current).into_iter().flatten() {
            let count = pending
                .get_mut(waiter)
                .expect("edge endpoints were checked against `repos`");
            *count -= 1;
            if *count == 0 {
                ready.push_back(waiter);
            }
        }
    }

    if order.len() != repos.len() {
        return repos.to_vec();
    }
    order
}

/// Borrow column `idx` of `batch` as the concrete Arrow array type `T`.
///
/// # Errors
///
/// Returns an error when `idx` is past the batch's last column, or when
/// the column at `idx` is not a `T`.
fn downcast<T: 'static>(batch: &RecordBatch, idx: usize) -> Result<&T> {
    // `RecordBatch::column` panics on an out-of-range index; look the
    // column up through the slice so a schema mismatch surfaces as an error.
    let column = batch.columns().get(idx).ok_or_else(|| {
        anyhow!("column {idx} is out of range: batch has {} columns", batch.num_columns())
    })?;
    column
        .as_any()
        .downcast_ref::<T>()
        .ok_or_else(|| anyhow!("column {idx} has unexpected type {:?}", column.data_type()))
}