use std::collections::{BTreeMap, BTreeSet};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use arrow::array::{Array, RecordBatch, StringArray, TimestampMicrosecondArray};
use cargo_metadata::MetadataCommand;
use chrono::{DateTime, Utc};
use futures::TryStreamExt;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::expr::Reference;
use iceberg::spec::Datum;
use iceberg::Catalog;
use petgraph::algo::toposort;
use petgraph::graph::{DiGraph, NodeIndex};
use uuid::Uuid;
use super::iceberg::IcebergWarehouse;
use crate::workspace::descriptor::WorkspaceDescriptor;
/// Facts gathered about one workspace repo from its `cargo metadata --no-deps` output.
#[derive(Debug, Clone)]
pub struct RepoFacts {
/// Repo name as supplied by the workspace descriptor's resolved sources.
pub name: String,
/// Directory in which `cargo metadata` was run for this repo.
pub root: PathBuf,
/// Names of this repo's packages whose `publish` field is not an empty list (i.e. not
/// `publish = false`); used to attribute cross-repo dependencies to this repo.
pub produces: BTreeSet<String>,
/// Names of dependencies declared by any of this repo's packages that are not themselves
/// packages of this repo — third-party crates as well as crates produced by sibling repos.
pub consumes: BTreeSet<String>,
}
/// A dependency of one workspace repo on another.
#[derive(Debug, Clone)]
pub struct CrossRepoEdge {
/// The consuming (depending) repo.
pub from: String,
/// The repo producing the crates that `from` consumes.
pub to: String,
/// Crates produced by `to` through which `from` depends on it. `WorkspaceGraph::build`
/// only creates an edge when at least one such crate exists.
pub via: BTreeSet<String>,
}
/// Cross-repo dependency graph of a workspace.
#[derive(Debug)]
pub struct WorkspaceGraph {
/// Per-repo facts, keyed by repo name.
pub facts: BTreeMap<String, RepoFacts>,
/// One edge per (consumer, producer) repo pair; most query methods walk this list directly.
pub edges: Vec<CrossRepoEdge>,
/// petgraph view of the edges (node weight = repo name, edge weight = number of `via`
/// crates); read by `build_order` for topological sorting.
inner: DiGraph<String, usize>,
}
impl WorkspaceGraph {
pub fn from_query_parts(facts: BTreeMap<String, RepoFacts>, edges: Vec<CrossRepoEdge>) -> Self {
Self { facts, edges, inner: DiGraph::new() }
}
pub fn build(desc: &WorkspaceDescriptor) -> Result<Self> {
let resolved = crate::workspace::resolve::resolve_sources(desc)?;
let mut facts: BTreeMap<String, RepoFacts> = BTreeMap::new();
for (name, root) in resolved {
facts.insert(name.clone(), inspect_repo(&name, &root)?);
}
let mut producer: BTreeMap<&str, &str> = BTreeMap::new();
for f in facts.values() {
for c in &f.produces {
if let Some(prev) = producer.insert(c.as_str(), f.name.as_str()) {
if prev != f.name {
return Err(anyhow!(
"crate `{c}` is produced by both `{prev}` and `{}` — \
workspaces must produce disjoint crate names",
f.name
));
}
}
}
}
let mut edges: Vec<CrossRepoEdge> = Vec::new();
let mut inner: DiGraph<String, usize> = DiGraph::new();
let mut indices: BTreeMap<String, NodeIndex> = BTreeMap::new();
for name in facts.keys() {
indices.insert(name.clone(), inner.add_node(name.clone()));
}
for from_facts in facts.values() {
let mut grouped: BTreeMap<&str, BTreeSet<String>> = BTreeMap::new();
for consumed in &from_facts.consumes {
if let Some(&owner) = producer.get(consumed.as_str()) {
if owner != from_facts.name {
grouped.entry(owner).or_default().insert(consumed.clone());
}
}
}
for (to_name, via) in grouped {
let weight = via.len();
inner.add_edge(indices[&from_facts.name], indices[to_name], weight);
edges.push(CrossRepoEdge {
from: from_facts.name.clone(),
to: to_name.to_string(),
via,
});
}
}
Ok(Self { facts, edges, inner })
}
pub fn build_order(&self) -> Result<Vec<String>> {
let order = toposort(&self.inner, None).map_err(|cyc| {
anyhow!(
"cross-repo dependency cycle detected at node `{}`",
self.inner[cyc.node_id()]
)
})?;
Ok(order.into_iter().rev().map(|n| self.inner[n].clone()).collect())
}
pub fn dependencies_of(&self, repo: &str) -> Vec<&CrossRepoEdge> {
self.edges.iter().filter(|e| e.from == repo).collect()
}
pub fn dependents_of(&self, repo: &str) -> Vec<&CrossRepoEdge> {
self.edges.iter().filter(|e| e.to == repo).collect()
}
pub fn deps_transitive(&self, repo: &str) -> BTreeSet<String> {
self.reachable(repo, Direction::Forward)
}
pub fn dependents_transitive(&self, repo: &str) -> BTreeSet<String> {
self.reachable(repo, Direction::Reverse)
}
fn reachable(&self, start: &str, dir: Direction) -> BTreeSet<String> {
use std::collections::VecDeque;
let mut seen: BTreeSet<String> = BTreeSet::new();
let mut queue: VecDeque<String> = VecDeque::new();
queue.push_back(start.to_string());
while let Some(cur) = queue.pop_front() {
for e in &self.edges {
let next = match dir {
Direction::Forward if e.from == cur => &e.to,
Direction::Reverse if e.to == cur => &e.from,
_ => continue,
};
if seen.insert(next.clone()) {
queue.push_back(next.clone());
}
}
}
seen.remove(start);
seen
}
pub fn affected_by_change(&self, changed: &[String]) -> Vec<String> {
let mut set: BTreeSet<String> = BTreeSet::new();
for c in changed {
set.insert(c.clone());
set.extend(self.dependents_transitive(c));
}
let repos: Vec<String> = set.into_iter().collect();
topo_order_from_edges(&repos, &self.edges)
}
pub fn dep_path(&self, from: &str, to: &str) -> Option<Vec<String>> {
use std::collections::VecDeque;
if from == to {
return self.facts.contains_key(from).then(|| vec![from.to_string()]);
}
let mut parent: BTreeMap<String, String> = BTreeMap::new();
let mut seen: BTreeSet<String> = BTreeSet::new();
let mut queue: VecDeque<String> = VecDeque::new();
seen.insert(from.to_string());
queue.push_back(from.to_string());
while let Some(cur) = queue.pop_front() {
for e in &self.edges {
if e.from != cur || !seen.insert(e.to.clone()) {
continue;
}
parent.insert(e.to.clone(), cur.clone());
if e.to == to {
let mut path = vec![to.to_string()];
let mut node = to.to_string();
while let Some(p) = parent.get(&node) {
path.push(p.clone());
node = p.clone();
}
path.reverse();
return Some(path);
}
queue.push_back(e.to.clone());
}
}
None
}
pub fn external_deps(&self, repo: &str) -> BTreeSet<String> {
let produced: BTreeSet<&str> = self
.facts
.values()
.flat_map(|f| f.produces.iter().map(String::as_str))
.collect();
match self.facts.get(repo) {
Some(f) => f
.consumes
.iter()
.filter(|c| !produced.contains(c.as_str()))
.cloned()
.collect(),
None => BTreeSet::new(),
}
}
pub fn external_dep_users(&self, krate: &str) -> Vec<String> {
self.facts
.values()
.filter(|f| f.consumes.contains(krate))
.map(|f| f.name.clone())
.collect()
}
}
/// Which way `WorkspaceGraph::reachable` follows edges.
#[derive(Clone, Copy)]
enum Direction {
/// From a repo to the repos it depends on (`from` -> `to`).
Forward,
/// From a repo to the repos that depend on it (`to` -> `from`).
Reverse,
}
/// Runs `cargo metadata --no-deps` in `root` and summarises which crates the repo
/// produces and which crates from outside the repo it consumes.
///
/// A package counts as produced unless its `publish` list is empty (`publish = false`).
/// Consumed crates are every declared dependency name that is not one of the repo's own
/// packages.
///
/// # Errors
/// Fails when `cargo metadata` cannot be run or parsed; the error names the repo and path.
fn inspect_repo(name: &str, root: &Path) -> Result<RepoFacts> {
    let metadata = MetadataCommand::new()
        .current_dir(root)
        .no_deps()
        .exec()
        .with_context(|| format!("cargo_metadata for repo `{name}` at {}", root.display()))?;

    let packages = &metadata.packages;
    let local: BTreeSet<String> = packages.iter().map(|pkg| pkg.name.to_string()).collect();
    let produces: BTreeSet<String> = packages
        .iter()
        .filter(|pkg| !matches!(&pkg.publish, Some(registries) if registries.is_empty()))
        .map(|pkg| pkg.name.to_string())
        .collect();
    let consumes: BTreeSet<String> = packages
        .iter()
        .flat_map(|pkg| pkg.dependencies.iter())
        .map(|dep| dep.name.clone())
        .filter(|dep_name| !local.contains(dep_name))
        .collect();

    Ok(RepoFacts {
        name: name.to_string(),
        root: root.to_path_buf(),
        produces,
        consumes,
    })
}
/// A dependency graph previously persisted by `record_dep_graph`, as reassembled by
/// `query_dep_graph_snapshots`.
#[derive(Debug, Clone)]
pub struct DepGraphSnapshot {
/// Random (v4) id generated when the snapshot was recorded.
pub snapshot_id: Uuid,
/// Workspace the snapshot belongs to.
pub workspace_name: String,
/// Capture time, stored with microsecond precision.
pub timestamp: DateTime<Utc>,
/// Edges regrouped from the stored per-crate rows; empty when the recorded graph had none.
pub edges: Vec<CrossRepoEdge>,
}
/// Persists `graph`'s cross-repo edges as a new snapshot in the dep-graph edges table.
///
/// Each (edge, via-crate) pair becomes one row carrying a fresh snapshot id, the workspace
/// name and the capture time. A graph with no edges is still recorded, as a single row
/// whose repo and crate columns are empty strings, so the snapshot stays discoverable.
///
/// Returns the id of the new snapshot.
///
/// # Errors
/// Fails when the table cannot be loaded, its schema cannot be converted to Arrow, the
/// batch does not match that schema, or the append fails.
pub async fn record_dep_graph(
    wh: &IcebergWarehouse,
    workspace_name: &str,
    graph: &WorkspaceGraph,
) -> Result<Uuid> {
    let snapshot_id = Uuid::new_v4();
    let captured_micros = Utc::now().timestamp_micros();

    // (from_repo, to_repo, via_crate) for every crate carrying a cross-repo edge.
    let mut rows: Vec<(&str, &str, &str)> = graph
        .edges
        .iter()
        .flat_map(|edge| {
            edge.via
                .iter()
                .map(move |krate| (edge.from.as_str(), edge.to.as_str(), krate.as_str()))
        })
        .collect();
    if rows.is_empty() {
        // Placeholder row so an edge-less snapshot is still visible to queries.
        rows.push(("", "", ""));
    }
    let row_count = rows.len();

    let table = wh
        .catalog()
        .load_table(&wh.table_ident(super::iceberg::TABLE_DEP_GRAPH_EDGES))
        .await?;
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

    let from_col: Vec<&str> = rows.iter().map(|row| row.0).collect();
    let to_col: Vec<&str> = rows.iter().map(|row| row.1).collect();
    let via_col: Vec<&str> = rows.iter().map(|row| row.2).collect();
    let cols: Vec<Arc<dyn Array>> = vec![
        Arc::new(StringArray::from(vec![snapshot_id.to_string(); row_count])),
        Arc::new(StringArray::from(vec![workspace_name; row_count])),
        Arc::new(
            TimestampMicrosecondArray::from(vec![captured_micros; row_count])
                .with_timezone("+00:00"),
        ),
        Arc::new(StringArray::from(from_col)),
        Arc::new(StringArray::from(to_col)),
        Arc::new(StringArray::from(via_col)),
    ];

    let batch = RecordBatch::try_new(arrow_schema, cols)?;
    super::iceberg::append_batch(wh.catalog(), table, batch).await?;
    Ok(snapshot_id)
}
pub async fn query_dep_graph_snapshots(
wh: &IcebergWarehouse,
workspace_name: &str,
limit: Option<usize>,
) -> Result<Vec<DepGraphSnapshot>> {
let table = wh.catalog()
.load_table(&wh.table_ident(super::iceberg::TABLE_DEP_GRAPH_EDGES))
.await?;
let scan = table
.scan()
.with_filter(Reference::new("workspace_name").equal_to(Datum::string(workspace_name)))
.select(["snapshot_id", "workspace_name", "ts_micros", "from_repo", "to_repo", "via_crate"])
.build()?;
let stream = scan.to_arrow().await?;
let batches: Vec<RecordBatch> = stream.try_collect().await?;
let mut by_snapshot: BTreeMap<
(Uuid, i64),
(String, BTreeMap<(String, String), BTreeSet<String>>),
> = BTreeMap::new();
for batch in &batches {
let ids = col::<StringArray>(batch, "snapshot_id")?;
let wss = col::<StringArray>(batch, "workspace_name")?;
let tss = col::<TimestampMicrosecondArray>(batch, "ts_micros")?;
let froms = col::<StringArray>(batch, "from_repo")?;
let tos = col::<StringArray>(batch, "to_repo")?;
let vias = col::<StringArray>(batch, "via_crate")?;
for i in 0..batch.num_rows() {
if wss.value(i) != workspace_name {
continue;
}
let uid = Uuid::parse_str(ids.value(i))?;
let key = (uid, tss.value(i));
let entry = by_snapshot
.entry(key)
.or_insert_with(|| (wss.value(i).to_string(), BTreeMap::new()));
let f = froms.value(i).to_string();
let t = tos.value(i).to_string();
if !f.is_empty() || !t.is_empty() {
entry.1.entry((f, t)).or_default().insert(vias.value(i).to_string());
}
}
}
let mut out: Vec<DepGraphSnapshot> = by_snapshot
.into_iter()
.map(|((snapshot_id, ts_micros), (ws, edge_map))| {
let edges = edge_map
.into_iter()
.map(|((from, to), via)| CrossRepoEdge { from, to, via })
.collect();
let timestamp = chrono::TimeZone::timestamp_micros(&Utc, ts_micros)
.single()
.unwrap_or_else(Utc::now);
DepGraphSnapshot { snapshot_id, workspace_name: ws, timestamp, edges }
})
.collect();
out.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
if let Some(n) = limit {
let drop_n = out.len().saturating_sub(n);
out.drain(..drop_n);
}
Ok(out)
}
/// Orders `repos` dependency-first, considering only edges whose endpoints are both in
/// `repos`. An edge `from -> to` means `from` depends on `to`, so `to` is placed first.
///
/// Uses Kahn's algorithm with deterministic tie-breaking: repos with no in-set dependencies
/// are seeded in lexicographic order, then processed FIFO. Duplicate entries in `repos` are
/// collapsed, so each repo appears exactly once in the result.
///
/// If the selected edges contain a cycle, every repo that can be ordered still comes first,
/// in dependency-first order. The rest follow in their original input order: repos on a
/// cycle, plus any repo that depends on one. This is best-effort and never panics or errors.
pub fn topo_order_from_edges(repos: &[String], edges: &[CrossRepoEdge]) -> Vec<String> {
    use std::collections::{BTreeMap, BTreeSet, VecDeque};
    // Number of not-yet-emitted in-set dependencies per repo; the keys double as the set
    // of distinct repos under consideration.
    let mut indeg: BTreeMap<&str, usize> = repos.iter().map(|r| (r.as_str(), 0)).collect();
    // dependency -> repos depending on it
    let mut dependents: BTreeMap<&str, Vec<&str>> = BTreeMap::new();
    for e in edges {
        let (from, to) = (e.from.as_str(), e.to.as_str());
        if !indeg.contains_key(from) || !indeg.contains_key(to) {
            continue;
        }
        dependents.entry(to).or_default().push(from);
        *indeg.entry(from).or_insert(0) += 1;
    }

    let mut ready: VecDeque<&str> =
        indeg.iter().filter(|(_, d)| **d == 0).map(|(r, _)| *r).collect();
    let mut out: Vec<String> = Vec::with_capacity(indeg.len());
    while let Some(r) = ready.pop_front() {
        out.push(r.to_string());
        if let Some(children) = dependents.get(r) {
            for &child in children {
                if let Some(d) = indeg.get_mut(child) {
                    *d -= 1;
                    if *d == 0 {
                        ready.push_back(child);
                    }
                }
            }
        }
    }

    // Compare against the distinct count, not `repos.len()`, so duplicates in the input
    // are not mistaken for a cycle.
    if out.len() < indeg.len() {
        // Anything never emitted still has unmet dependencies: it is on or behind a cycle.
        let mut pending: BTreeSet<&str> =
            indeg.iter().filter(|(_, d)| **d > 0).map(|(r, _)| *r).collect();
        for r in repos {
            if pending.remove(r.as_str()) {
                out.push(r.clone());
            }
        }
    }
    out
}
/// Fetches column `name` from `batch` and downcasts it to the concrete Arrow array type `T`.
///
/// # Errors
/// Fails when the batch has no column called `name`, or when that column is not a `T`.
fn col<'a, T: 'static>(batch: &'a RecordBatch, name: &str) -> Result<&'a T> {
    let column = match batch.column_by_name(name) {
        Some(found) => found,
        None => return Err(anyhow!("projected batch missing column `{name}`")),
    };
    match column.as_any().downcast_ref::<T>() {
        Some(typed) => Ok(typed),
        None => Err(anyhow!("column `{name}` has unexpected arrow type")),
    }
}
#[cfg(test)]
mod mimir_tests {
    use super::*;

    /// Owned string set built from literals — the shape most graph queries return.
    fn set_of(items: &[&str]) -> BTreeSet<String> {
        items.iter().map(|item| (*item).to_owned()).collect()
    }

    /// Builds a graph straight from hand-written facts and edges, bypassing
    /// `cargo metadata` (the petgraph index stays empty; these tests never order builds).
    fn graph(facts: Vec<RepoFacts>, edges: Vec<CrossRepoEdge>) -> WorkspaceGraph {
        let facts: BTreeMap<String, RepoFacts> =
            facts.into_iter().map(|f| (f.name.clone(), f)).collect();
        WorkspaceGraph { facts, edges, inner: DiGraph::new() }
    }

    fn facts(name: &str, produces: &[&str], consumes: &[&str]) -> RepoFacts {
        RepoFacts {
            name: name.to_owned(),
            root: PathBuf::from("/dev/null"),
            produces: set_of(produces),
            consumes: set_of(consumes),
        }
    }

    fn edge(from: &str, to: &str, via: &[&str]) -> CrossRepoEdge {
        CrossRepoEdge { from: from.to_owned(), to: to.to_owned(), via: set_of(via) }
    }

    /// app -> {liba, libb} -> util, plus third-party `serde` (app) and `libc` (util).
    fn diamond() -> WorkspaceGraph {
        let repos = vec![
            facts("app", &["app_c"], &["a_c", "b_c", "serde"]),
            facts("liba", &["a_c"], &["util_c"]),
            facts("libb", &["b_c"], &["util_c"]),
            facts("util", &["util_c"], &["libc"]),
        ];
        let edges = vec![
            edge("app", "liba", &["a_c"]),
            edge("app", "libb", &["b_c"]),
            edge("liba", "util", &["util_c"]),
            edge("libb", "util", &["util_c"]),
        ];
        graph(repos, edges)
    }

    /// Collects one endpoint of each edge: `to` when `pick_to`, otherwise `from`.
    fn endpoints(edges: Vec<&CrossRepoEdge>, pick_to: bool) -> BTreeSet<String> {
        let mut out = BTreeSet::new();
        for e in edges {
            let side = if pick_to { &e.to } else { &e.from };
            out.insert(side.clone());
        }
        out
    }

    #[test]
    fn dependents_of_is_reverse_of_dependencies() {
        let g = diamond();
        assert_eq!(endpoints(g.dependents_of("util"), false), set_of(&["liba", "libb"]));
        assert_eq!(endpoints(g.dependencies_of("app"), true), set_of(&["liba", "libb"]));
        assert!(g.dependents_of("app").is_empty());
    }

    #[test]
    fn transitive_closures() {
        let g = diamond();
        assert_eq!(g.deps_transitive("app"), set_of(&["liba", "libb", "util"]));
        assert_eq!(g.dependents_transitive("util"), set_of(&["app", "liba", "libb"]));
        assert!(g.deps_transitive("util").is_empty());
        assert!(g.dependents_transitive("app").is_empty());
    }

    #[test]
    fn affected_by_change_is_blast_radius_in_build_order() {
        let g = diamond();
        let affected = g.affected_by_change(&["util".to_string()]);
        let as_set: BTreeSet<String> = affected.iter().cloned().collect();
        assert_eq!(as_set, set_of(&["app", "liba", "libb", "util"]));
        let pos = |n: &str| affected.iter().position(|x| x == n).unwrap();
        assert!(pos("util") < pos("liba"));
        assert!(pos("util") < pos("libb"));
        assert!(pos("liba") < pos("app"));
        assert!(pos("libb") < pos("app"));
    }

    #[test]
    fn dep_path_finds_shortest_route() {
        let g = diamond();
        let p = g.dep_path("app", "util").expect("path exists");
        assert_eq!(p.len(), 3);
        assert_eq!(p.first().unwrap(), "app");
        assert_eq!(p.last().unwrap(), "util");
        assert_eq!(g.dep_path("app", "app"), Some(vec!["app".to_string()]));
        assert_eq!(g.dep_path("util", "app"), None);
        assert_eq!(g.dep_path("app", "ghost"), None);
    }

    #[test]
    fn external_deps_and_users() {
        let g = diamond();
        assert_eq!(g.external_deps("app"), set_of(&["serde"]));
        assert_eq!(g.external_deps("util"), set_of(&["libc"]));
        assert!(g.external_deps("liba").is_empty());
        assert_eq!(g.external_dep_users("serde"), vec!["app".to_string()]);
        assert_eq!(g.external_dep_users("libc"), vec!["util".to_string()]);
        assert_eq!(
            g.external_dep_users("util_c"),
            vec!["liba".to_string(), "libb".to_string()]
        );
    }
}