use std::path::{Path, PathBuf};
use anyhow::{anyhow, bail, Context, Result};
use clap::{Args, Parser, Subcommand};
use nornir::bench;
use nornir::config::{self, Loaded};
use nornir::docs;
use nornir::guard;
use nornir::index;
use nornir::introspect;
use nornir::release;
use nornir::warehouse;
#[derive(Parser)]
#[command(name = "nornir", version, about = "Companion to cargo: release, bench, docs, guard.")]
struct Cli {
#[arg(long, global = true)]
config: Option<PathBuf>,
#[arg(long, global = true)]
skip_heavy: bool,
#[command(subcommand)]
cmd: Cmd,
}
#[derive(Subcommand)]
enum Cmd {
Guard {
#[command(subcommand)]
op: GuardOp,
},
Bench {
#[command(subcommand)]
op: BenchOp,
},
Release {
#[command(subcommand)]
op: ReleaseOp,
},
Test {
#[command(subcommand)]
op: TestOp,
},
Docs {
#[command(subcommand)]
op: DocsOp,
},
Introspect {
#[command(subcommand)]
op: IntrospectOp,
},
Warehouse {
#[command(subcommand)]
op: WarehouseOp,
},
Index {
#[command(subcommand)]
op: IndexOp,
},
#[cfg(feature = "robot")]
Robot {
#[command(subcommand)]
op: RobotOp,
},
#[cfg(feature = "vector")]
Vector {
#[command(subcommand)]
op: VectorOp,
},
Knowledge {
#[command(subcommand)]
op: KnowledgeOp,
},
Map(RepoArg),
Funnel {
#[command(subcommand)]
op: FunnelOp,
},
Repos,
Root,
Serve(ServeArgs),
Viz(VizArgs),
Install {
#[command(subcommand)]
op: InstallOp,
},
Key {
#[command(subcommand)]
op: KeyOp,
},
Workspace {
#[command(subcommand)]
op: WorkspaceOp,
},
Security {
#[command(subcommand)]
cmd: SecurityCmd,
},
Mimir {
#[command(subcommand)]
op: MimirOp,
},
Diagram {
#[command(subcommand)]
op: DiagramOp,
},
Bakeoff {
#[command(subcommand)]
op: BakeoffOp,
},
}
#[derive(Subcommand)]
enum BakeoffOp {
Run(BakeoffRunArgs),
Leaderboard(BakeoffLeaderboardArgs),
Demo,
}
#[derive(Args)]
struct BakeoffRunArgs {
#[arg(long, visible_alias = "prompt")]
task: String,
#[arg(long, default_value = "default", visible_alias = "prompt-id")]
task_id: String,
#[arg(long, value_delimiter = ',')]
agents: Vec<String>,
#[arg(long, value_delimiter = ',')]
models: Vec<String>,
#[arg(long)]
host: Option<String>,
#[arg(long)]
mock: bool,
#[arg(long)]
json: bool,
#[arg(long)]
matrix: bool,
}
#[derive(Args)]
struct BakeoffLeaderboardArgs {
#[arg(default_value = "")]
reference: String,
#[arg(long)]
json: bool,
#[arg(long)]
matrix: bool,
}
#[derive(Subcommand)]
enum MimirOp {
DepsOf {
repo: String,
#[arg(long)]
transitive: bool,
},
DependentsOf {
repo: String,
#[arg(long)]
transitive: bool,
},
Affected {
repos: Vec<String>,
},
BuildOrder,
DepPath { from: String, to: String },
ExternalUsers { krate: String },
Mermaid,
Overview { repo: String },
Changed,
}
#[derive(Subcommand)]
enum DiagramOp {
Timeline(DiagramArgs),
LaneSummary(DiagramArgs),
Depgraph(DiagramArgs),
SnapshotEdges(DiagramArgs),
GateMatrix(DiagramArgs),
ReleaseVersions(DiagramArgs),
BenchHistory {
#[command(flatten)]
args: DiagramArgs,
#[arg(long)]
limit: Option<usize>,
},
BenchCompare(DiagramArgs),
}
#[derive(Args)]
struct DiagramArgs {
#[arg(long)]
workspace: Option<String>,
}
#[derive(Subcommand)]
enum SecurityCmd {
Scan {
#[arg(default_value = ".")]
repo: std::path::PathBuf,
#[arg(long)]
out: Option<std::path::PathBuf>,
},
UpdateDb,
}
#[derive(Subcommand)]
enum WorkspaceOp {
Ls,
Show { name: String },
Register {
name: String,
#[arg(long)]
descriptor: String,
#[arg(long)]
monitored: bool,
#[arg(long)]
external: bool,
#[arg(long)]
poll: Option<String>,
},
Add {
name: String,
#[arg(long)]
descriptor: String,
#[arg(long)]
monitored: bool,
#[arg(long)]
external: bool,
#[arg(long)]
poll: Option<String>,
},
Rm { name: String },
Fetch { name: String },
}
#[derive(Subcommand)]
enum KeyOp {
Show,
Path,
Generate,
}
#[derive(Args)]
struct ServeArgs {
#[arg(long)]
config: Option<PathBuf>,
#[arg(long)]
addr: Option<String>,
#[arg(long)]
token: Option<String>,
#[arg(long)]
server_bin: Option<PathBuf>,
}
#[derive(Args)]
struct VizArgs {
#[arg(long)]
server: Option<String>,
#[arg(long)]
token: Option<String>,
#[arg(long)]
workspace: Option<String>,
#[arg(long)]
warehouse: Option<PathBuf>,
#[arg(long)]
viz_bin: Option<PathBuf>,
}
#[derive(Subcommand)]
enum InstallOp {
Systemd(SystemdArgs),
Uninstall(UninstallArgs),
}
#[derive(Args)]
struct UninstallArgs {
#[arg(long)]
purge: bool,
#[arg(long, default_value = "nornir")]
user: String,
}
#[derive(Args)]
struct SystemdArgs {
#[arg(long)]
config: Option<PathBuf>,
#[arg(long, default_value = "127.0.0.1:7878")]
addr: String,
#[arg(long, default_value = "nornir")]
user: String,
#[arg(long)]
server_bin: Option<PathBuf>,
#[arg(long)]
root: Option<PathBuf>,
#[arg(long = "monitor")]
monitor: Vec<String>,
#[arg(long)]
poll: Option<String>,
}
#[derive(Subcommand)]
enum FunnelOp {
Submit {
text: Option<String>,
#[arg(long)]
file: Option<std::path::PathBuf>,
#[arg(long)]
source: Option<String>,
},
Plan { idea_id: String, summary: String },
Node {
plan_id: String,
kind: String,
#[arg(long)]
title: Option<String>,
#[arg(long = "target")]
targets: Vec<String>,
#[arg(long = "needs")]
needs: Vec<String>,
#[arg(long)]
prompt: Option<String>,
},
Link { plan_id: String, from: String, to: String },
Next,
Status {
plan_id: String,
node_id: String,
status: String,
#[arg(long)]
why: Option<String>,
},
Show,
Demo {
#[arg(long, default_value_t = 1)]
size: usize,
},
Nuke {
#[arg(long)]
yes: bool,
},
}
#[derive(Subcommand)]
enum KnowledgeOp {
Scan {
repo: String,
#[arg(long = "no-save")]
no_save: bool,
},
Query {
repo: String,
kind: String,
arg: String,
#[arg(long)]
to: Option<String>,
#[arg(long, default_value_t = 50)]
limit: usize,
},
}
#[derive(Subcommand)]
enum GuardOp {
Status,
Apply,
Verify,
Release,
}
#[derive(Subcommand)]
enum BenchOp {
HistoryShow(HistoryShowArgs),
Run(RepoArg),
}
#[derive(Subcommand)]
enum TestOp {
Run(TestRunArgs),
All(TestAllArgs),
History(TestHistoryArgs),
}
#[derive(Args)]
struct TestRunArgs {
repo: String,
#[arg(long)]
aspects: Option<String>,
}
#[derive(Args)]
struct TestAllArgs {
#[arg(long)]
aspects: Option<String>,
}
#[derive(Args)]
struct TestHistoryArgs {
repo: String,
#[arg(long)]
json: bool,
#[arg(long, default_value_t = 0)]
limit: usize,
}
#[derive(Subcommand)]
enum ReleaseOp {
GatePathPatches(RepoArg),
GatePathDepVersions(RepoArg),
GateCrateMetadata(RepoArg),
GateLinksConflicts(RepoArg),
GateNexusFloor(RepoArg),
GateNoRegression(RepoArg),
GateDocsFresh(RepoArg),
GateRoundtrip(RepoArg),
GateAll(RepoArg),
Gate {
name: String,
repo: String,
},
Trace {
repo: String,
#[arg(long, default_value = "")]
workspace: String,
#[arg(long)]
json: bool,
},
Events {
selector: String,
#[arg(long)]
json: bool,
},
Run(ReleaseRunArgs),
Publish(PublishArgs),
WaitForIndex {
krate: String,
version: String,
#[arg(long, default_value_t = 300)]
timeout_secs: u64,
},
TarballStats {
repo: String,
krate: String,
},
StripPatchBlocks(RepoArg),
Changelog {
repo: String,
range: String,
},
ImpactedCrates {
repo: String,
#[arg(long, default_value = "main")]
base: String,
},
YankCascade {
repo: String,
#[arg(long)]
undo: bool,
#[arg(long)]
dry_run: bool,
},
MirrorToHolger {
repo: String,
krate: String,
version: String,
#[arg(long)]
holger_base_url: String,
#[arg(long)]
token: Option<String>,
},
BumpVersion {
repo: String,
pkg: String,
new_version: String,
#[arg(long)]
bump_consumers: bool,
#[arg(long)]
dry_run: bool,
},
}
#[derive(Subcommand)]
enum DocsOp {
Init(RepoArg),
Render(RepoArg),
Check(RepoArg),
#[cfg(feature = "docs-export")]
Export(DocsExportArgs),
#[cfg(feature = "docs-export")]
Book(DocsBookArgs),
History(DocsHistoryArgs),
Index(DocsIndexArgs),
Search(DocsSearchArgs),
}
#[derive(Args)]
struct DocsIndexArgs {
#[command(flatten)]
repo: RepoArg,
#[arg(long)]
skip_snapshot: bool,
}
#[derive(Args)]
struct DocsSearchArgs {
#[command(flatten)]
repo: RepoArg,
query: String,
#[arg(long)]
sha: Option<String>,
#[arg(long, default_value_t = 10)]
limit: usize,
}
#[derive(Args)]
struct DocsHistoryArgs {
#[command(flatten)]
repo: RepoArg,
#[arg(long)]
doc: Option<String>,
#[arg(long)]
version: Option<String>,
#[arg(long)]
format: Option<String>,
#[arg(long, default_value_t = 20)]
limit: usize,
}
#[cfg(feature = "docs-export")]
#[derive(Args)]
struct DocsExportArgs {
#[command(flatten)]
repo: RepoArg,
#[arg(long, default_value = "pdf")]
format: String,
#[arg(long)]
out: Option<PathBuf>,
}
#[cfg(feature = "docs-export")]
#[derive(Args)]
struct DocsBookArgs {
#[command(flatten)]
repo: RepoArg,
#[arg(long, default_value = "pdf")]
format: String,
#[arg(long)]
out: Option<PathBuf>,
}
#[derive(Args)]
struct DocsFileArgs {
#[command(flatten)]
repo: RepoArg,
}
#[allow(dead_code)] type _DocsFileArgsRef = DocsFileArgs;
#[derive(Subcommand)]
enum IntrospectOp {
Depgraph(RepoArg),
Symbols(DwarfSymbolsArgs),
SymbolLookup(SymbolLookupArgs),
DefinedIn(DefinedInArgs),
Callgraph(SymbolsArgs),
CallgraphLlvm(CallgraphLlvmArgs),
Callers(CallQueryArgs),
Callees(CallQueryArgs),
PathBetween(PathArgs),
}
#[derive(Args)]
struct CallQueryArgs {
binary: PathBuf,
name: String,
}
#[derive(Args)]
struct PathArgs {
binary: PathBuf,
from: String,
to: String,
}
#[derive(Args)]
struct HistoryShowArgs {
repo: String,
#[arg(long)]
json: bool,
#[arg(long, default_value_t = 0)]
limit: usize,
}
#[derive(Args)]
struct SymbolsArgs {
binary: PathBuf,
}
#[derive(Args)]
struct DwarfSymbolsArgs {
binary: PathBuf,
#[arg(long = "no-save")]
no_save: bool,
#[arg(long)]
repo: Option<String>,
}
#[derive(Args)]
struct CallgraphLlvmArgs {
path: PathBuf,
#[arg(long)]
crates: Option<String>,
}
#[derive(Args)]
struct SymbolLookupArgs {
binary: PathBuf,
pattern: String,
#[arg(long, default_value_t = 20)]
limit: usize,
}
#[derive(Args)]
struct DefinedInArgs {
binary: PathBuf,
file: String,
#[arg(long, default_value_t = 50)]
limit: usize,
}
#[derive(Subcommand)]
enum WarehouseOp {
ImportJsonl(RepoArg),
Query(WarehouseQueryArgs),
}
#[derive(Args)]
struct WarehouseQueryArgs {
repo: String,
#[arg(long)]
machine: Option<String>,
#[arg(long)]
last: Option<usize>,
}
#[derive(Subcommand)]
enum IndexOp {
Build(IndexBuildArgs),
Search(IndexSearchArgs),
Stats,
Snapshot(IndexSnapshotArgs),
Restore(IndexRestoreArgs),
Upload(IndexSnapshotArgs),
}
#[derive(Args)]
struct IndexBuildArgs {
#[arg(long)]
clean: bool,
#[arg(long = "no-snapshot")]
no_snapshot: bool,
}
#[derive(Args)]
struct IndexSnapshotArgs {
#[arg(long)]
repo: Option<String>,
#[arg(long, default_value = "workspace_holger")]
workspace: String,
}
#[derive(Args)]
struct IndexRestoreArgs {
#[arg(long)]
repo: Option<String>,
#[arg(long)]
sha: Option<String>,
#[arg(long)]
into: Option<PathBuf>,
}
#[derive(Args)]
struct IndexSearchArgs {
query: String,
#[arg(long)]
corpus: Option<String>,
#[arg(long)]
repo: Option<String>,
#[arg(long, default_value_t = 10)]
limit: usize,
}
#[cfg(feature = "robot")]
#[derive(Subcommand)]
enum RobotOp {
Run(RobotRunArgs),
}
#[cfg(feature = "robot")]
#[derive(Args)]
struct RobotRunArgs {
repo: String,
#[arg(long, default_value = "robot/scenarios.toml")]
scenarios: PathBuf,
#[arg(long, default_value = "http://127.0.0.1:3000")]
base_url: String,
#[arg(long, default_value = "http://127.0.0.1:9515")]
webdriver: String,
#[arg(long)]
dry_run: bool,
}
#[cfg(feature = "vector")]
#[derive(Subcommand)]
enum VectorOp {
Index(VectorIndexArgs),
Search(VectorSearchArgs),
Stats,
Doctor,
SetupCuda(SetupCudaArgs),
}
#[cfg(feature = "vector")]
#[derive(Args)]
struct SetupCudaArgs {
#[arg(long, default_value = "/opt/nornir/cuda")]
target: PathBuf,
}
#[cfg(feature = "vector")]
#[derive(Args)]
struct VectorIndexArgs {
repo: String,
}
#[cfg(feature = "vector")]
#[derive(Args)]
struct VectorSearchArgs {
query: String,
#[arg(long)]
repo: Option<String>,
#[arg(long)]
sha: Option<String>,
#[arg(long, default_value = "semantic")]
mode: String,
#[arg(long, default_value_t = 10)]
limit: usize,
}
#[derive(Args)]
struct RepoArg {
repo: String,
}
#[derive(Args)]
struct PublishArgs {
repo: String,
#[arg(long)]
dry_run: bool,
}
#[derive(Args)]
struct ReleaseRunArgs {
#[arg(long)]
repo: Option<String>,
#[arg(long, default_value = "workspace_holger")]
workspace: String,
#[arg(long)]
skip_tests: bool,
#[arg(long)]
skip_bench: bool,
#[arg(long)]
skip_gates: bool,
#[arg(long)]
skip_render_docs: bool,
#[arg(long)]
skip_snapshot: bool,
#[arg(long)]
skip_push: bool,
#[arg(long)]
skip_publish: bool,
#[arg(long)]
dry_run_publish: bool,
#[arg(long)]
recapture: bool,
}
fn main() -> Result<()> {
let cli = Cli::parse();
if cli.skip_heavy {
unsafe { std::env::set_var("NORNIR_SKIP_HEAVY", "1") };
}
match &cli.cmd {
Cmd::Serve(a) => return run_serve(a),
Cmd::Viz(a) => return run_viz(a, cli.config.as_deref()),
Cmd::Install { op } => return run_install(op),
Cmd::Key { op } => return run_key(op),
Cmd::Workspace { op } => return run_workspace(op),
Cmd::Security { cmd } => return run_security(cmd),
Cmd::Root => {
println!("{}", nornir::config::nornir_home().display());
return Ok(());
}
_ => {}
}
let loaded = load_config(cli.config.as_deref())?;
match cli.cmd {
Cmd::Serve(_)
| Cmd::Viz(_)
| Cmd::Install { .. }
| Cmd::Key { .. }
| Cmd::Workspace { .. }
| Cmd::Security { .. }
| Cmd::Root => {
unreachable!("handled before load_config")
}
Cmd::Repos => {
if let Some(server) = server_target() {
for name in list_repos_remote(&server)? {
println!("{name}");
}
} else {
for name in loaded.nornir.repo.keys() {
println!("{name}");
}
}
}
Cmd::Guard { op } => run_guard(op, &loaded)?,
Cmd::Bench { op } => run_bench(op, &loaded)?,
Cmd::Release { op } => run_release(op, &loaded)?,
Cmd::Test { op } => run_test(op, &loaded)?,
Cmd::Docs { op } => run_docs(op, &loaded)?,
Cmd::Introspect { op } => run_introspect(op, &loaded)?,
Cmd::Warehouse { op } => run_warehouse(op, &loaded)?,
Cmd::Index { op } => run_index(op, &loaded)?,
#[cfg(feature = "vector")]
Cmd::Vector { op } => run_vector(op, &loaded)?,
#[cfg(feature = "robot")]
Cmd::Robot { op } => run_robot(op, &loaded)?,
Cmd::Knowledge { op } => run_knowledge(op, &loaded)?,
Cmd::Map(a) => run_map(&a.repo, &loaded)?,
Cmd::Funnel { op } => run_funnel(op, &loaded)?,
Cmd::Mimir { op } => run_mimir(op, &loaded)?,
Cmd::Diagram { op } => run_diagram(op, &loaded)?,
Cmd::Bakeoff { op } => run_bakeoff_cmd(op, &loaded)?,
}
Ok(())
}
fn run_bakeoff_cmd(op: BakeoffOp, loaded: &Loaded) -> Result<()> {
use nornir::warehouse::agent_model_runs::{
default_agents, default_models, demo_matrix_rows, new_run_id,
query_agent_model_runs, render_leaderboard, render_matrix, rows_to_json,
run_bakeoff_matrix, short_run, BakeoffSelector, MockCaller, ModelAnswer, ModelCaller,
OllamaCaller,
};
use nornir::warehouse::iceberg::IcebergWarehouse;
match op {
BakeoffOp::Run(a) => {
let agents = if a.agents.is_empty() { default_agents() } else { a.agents.clone() };
let models = if a.models.is_empty() { default_models() } else { a.models.clone() };
if models.is_empty() {
bail!("no models to run — pass --models a,b,c or configure the registry");
}
let run_id = new_run_id();
let ts_micros = chrono::Utc::now().timestamp_micros();
let caller: Box<dyn ModelCaller> = if a.mock {
let mut mock = MockCaller::new();
for (ai, agent) in agents.iter().enumerate() {
let hosted = agent != "local-llm" && agent != "-" && agent != "local";
for (mi, m) in models.iter().enumerate() {
let tout = 16 + (mi as i64) + (ai as i64) * 2;
let lat = 100.0 + (mi as f64) * 50.0 + (ai as f64) * 30.0;
mock = mock.with_cell(
agent.clone(),
m.clone(),
ModelAnswer {
output: format!("[mock answer from {agent}/{m}]"),
latency_ms: lat,
tokens_in: 12,
tokens_out: tout,
tokens_per_s: tout as f64 / (lat / 1000.0),
score: (1.0 - (mi as f64) * 0.1 + if hosted { 0.05 } else { 0.0 })
.min(1.0),
cost_usd: if hosted { 0.01 * (mi as f64 + 1.0) } else { 0.0 },
mcp_tool_calls: if hosted { 3 + mi as i64 } else { 0 },
},
);
}
}
Box::new(mock)
} else {
Box::new(OllamaCaller::new(a.host.clone()))
};
println!(
"▶ bake-off `{}` matrix: {} agent(s) [{}] × {} model(s) [{}] = {} cells",
a.task_id,
agents.len(),
agents.join(", "),
models.len(),
models.join(", "),
agents.len() * models.len(),
);
let rows = run_bakeoff_matrix(
caller.as_ref(),
&run_id,
ts_micros,
&a.task_id,
&a.task,
&agents,
&models,
);
persist_bakeoff(&loaded, &rows)?;
println!("✓ recorded run {} ({} rows)\n", short_run(&run_id), rows.len());
if a.json {
println!("{}", rows_to_json(&rows));
} else if a.matrix {
print!("{}", render_matrix(&rows));
} else {
print!("{}", render_leaderboard(&rows));
}
}
BakeoffOp::Demo => {
let run_id = new_run_id();
let ts_micros = chrono::Utc::now().timestamp_micros();
let rows = demo_matrix_rows(&run_id, ts_micros);
persist_bakeoff(&loaded, &rows)?;
println!(
"✓ seeded demo bake-off matrix: run {} ({} cells, 3 agents × 3 models)\n",
short_run(&run_id),
rows.len()
);
print!("{}", render_matrix(&rows));
println!(
"\nopen the 🏆 Leaderboard tab in `nornir viz`, or: nornir bakeoff leaderboard --matrix"
);
}
BakeoffOp::Leaderboard(a) => {
let warehouse_root = loaded.warehouse_root();
let wh = IcebergWarehouse::open_read_only(&warehouse_root).with_context(|| {
format!("open iceberg warehouse at {}", warehouse_root.display())
})?;
let rows = if a.reference.is_empty() {
let all = wh.block_on(query_agent_model_runs(&wh, &BakeoffSelector::All))?;
match all.iter().max_by_key(|r| r.ts_micros).map(|r| r.run_id.clone()) {
Some(run_id) => all.into_iter().filter(|r| r.run_id == run_id).collect(),
None => Vec::new(),
}
} else {
let by_run = wh.block_on(query_agent_model_runs(
&wh,
&BakeoffSelector::Run(a.reference.clone()),
))?;
if by_run.is_empty() {
wh.block_on(query_agent_model_runs(
&wh,
&BakeoffSelector::Model(a.reference.clone()),
))?
} else {
by_run
}
};
if a.json {
println!("{}", rows_to_json(&rows));
} else if rows.is_empty() {
println!("no bake-off runs recorded yet — run `nornir bakeoff run --task …` or `nornir bakeoff demo` first");
} else if a.matrix {
print!("{}", render_matrix(&rows));
} else {
print!("{}", render_leaderboard(&rows));
}
}
}
Ok(())
}
fn run_funnel(op: FunnelOp, loaded: &Loaded) -> Result<()> {
use chrono::Utc;
use nornir::funnel::{self, Event, IdeaId, NodeId, NodeStatus, PlanId, PlanStatus};
let root = funnel_root(loaded);
match &op {
FunnelOp::Demo { size } => {
let manifest = funnel::demo::run_demo(&root, *size)?;
println!(
"funnel demo: created {} fake repo(s) under {}",
manifest.repos.len(),
manifest
.container
.as_ref()
.map(|c| c.display().to_string())
.unwrap_or_default(),
);
for r in &manifest.repos {
println!(" {}", r.display());
}
println!("warehouse funnel DAG injected at {}", root.display());
return Ok(());
}
FunnelOp::Nuke { yes } => {
let plan = funnel::demo::nuke_plan(&root);
let Some(manifest) = plan else {
println!("funnel nuke: no demo manifest found — nothing to delete");
return Ok(());
};
if !*yes {
println!("funnel nuke would delete (disk only — warehouse funnel data preserved):");
for r in &manifest.repos {
println!(" {}", r.display());
}
if let Some(c) = &manifest.container {
println!(" {} (container, if empty)", c.display());
}
println!("re-run with --yes to confirm");
return Ok(());
}
let removed = funnel::demo::nuke_disk(&root)?;
println!("funnel nuke: removed {} fake repo(s) on disk", removed.len());
for r in &removed {
println!(" {}", r.display());
}
println!("warehouse funnel data preserved (plan still visible in `funnel show`)");
return Ok(());
}
_ => {}
}
let mut store = if matches!(op, FunnelOp::Show) {
funnel::Store::open_read_only(&root)?
} else {
funnel::Store::open(&root)?
};
match op {
FunnelOp::Submit { text, file, source } => {
let text = match (file, text) {
(Some(path), _) => std::fs::read_to_string(&path)
.with_context(|| format!("read krav/prompt file {}", path.display()))?,
(None, Some(t)) => t,
(None, None) => {
use std::io::Read;
let mut buf = String::new();
std::io::stdin().read_to_string(&mut buf).context("read idea text from stdin")?;
buf
}
};
let text = text.trim().to_string();
if text.is_empty() {
anyhow::bail!("empty idea text — pass a krav/prompt as an arg, --file, or via stdin");
}
let id = IdeaId::seq(store.funnel.next_idea);
store.record(Event::IdeaSubmitted {
id: id.clone(),
source: source.unwrap_or_else(|| "cli".into()),
text,
refs: Vec::new(),
ts: Utc::now(),
})?;
println!("{}", id.as_str());
}
FunnelOp::Plan { idea_id, summary } => {
let plan_id = PlanId::seq(store.funnel.next_plan);
store.record(Event::PlanCreated {
id: plan_id.clone(),
idea_id: IdeaId::new(idea_id),
summary,
planner: "cli".into(),
ts: Utc::now(),
})?;
store.record(Event::PlanStatusChanged {
plan_id: plan_id.clone(),
status: PlanStatus::Active,
why: None,
ts: Utc::now(),
})?;
println!("{}", plan_id.as_str());
}
FunnelOp::Node { plan_id, kind, title, targets, needs, prompt } => {
let plan_id = PlanId::new(plan_id);
let node_id = NodeId::seq(store.funnel.next_node);
let mut params = serde_json::Map::new();
if let Some(t) = title {
params.insert("title".into(), serde_json::Value::String(t));
}
store.record(Event::NodeAdded {
plan_id: plan_id.clone(),
node_id: node_id.clone(),
kind,
params,
targets,
prompt_excerpt: prompt,
ts: Utc::now(),
})?;
for from in &needs {
store.record(Event::EdgeAdded {
plan_id: plan_id.clone(),
from_node: NodeId::new(from.clone()),
to_node: node_id.clone(),
ts: Utc::now(),
})?;
}
store.funnel.promote_ready();
println!("{}", node_id.as_str());
}
FunnelOp::Link { plan_id, from, to } => {
store.record(Event::EdgeAdded {
plan_id: PlanId::new(plan_id),
from_node: NodeId::new(from),
to_node: NodeId::new(to),
ts: Utc::now(),
})?;
store.funnel.promote_ready();
println!("ok");
}
FunnelOp::Next => {
store.funnel.promote_ready();
let next = funnel::topo_ready(&mut store.funnel);
println!("{}", serde_json::to_string_pretty(&next)?);
}
FunnelOp::Status { plan_id, node_id, status, why } => {
let st = match status.as_str() {
"ready" => NodeStatus::Ready,
"active" | "in_progress" => NodeStatus::InProgress,
"blocked" => NodeStatus::Blocked,
"done" => NodeStatus::Done,
"failed" | "abandoned" => NodeStatus::Failed,
other => bail!("unknown status {other:?}; expected ready|active|blocked|done|failed"),
};
store.record(Event::NodeStatusChanged {
plan_id: PlanId::new(plan_id),
node_id: NodeId::new(node_id),
status: st,
why,
ts: Utc::now(),
})?;
store.funnel.promote_ready();
println!("ok");
}
FunnelOp::Show => {
let f = &store.funnel;
println!("ideas: {}, plans: {}", f.ideas.len(), f.plans.len());
for (iid, idea) in &f.ideas {
println!(" {} [{}] {}", iid.as_str(), idea.source, idea.text);
}
for (pid, plan) in &f.plans {
println!(
" {} (idea {}) [{:?}] {} — {} nodes, {} edges",
pid.as_str(),
plan.idea_id.as_str(),
plan.status,
plan.summary,
plan.nodes.len(),
plan.edges.len(),
);
for (nid, n) in &plan.nodes {
let title = n.params.get("title").and_then(|v| v.as_str()).unwrap_or("");
println!(" {} [{:?}] {} {}", nid.as_str(), n.status, n.kind, title);
}
}
}
FunnelOp::Demo { .. } | FunnelOp::Nuke { .. } => unreachable!(),
}
Ok(())
}
fn run_map(repo: &str, loaded: &Loaded) -> Result<()> {
let _ = repo_or_err(loaded, repo)?; let warehouse_root = iceberg_warehouse_root(loaded);
let wh = nornir::warehouse::iceberg::IcebergWarehouse::open(&warehouse_root)
.with_context(|| format!("open warehouse at {}", warehouse_root.display()))?;
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, repo);
let res = nornir::knowledge::scan_all(&repo_root, repo)?;
wh.append_symbol_scan(&res.symbols)?;
wh.append_git_heat_scan(&res.git)?;
println!(
"scan: symbols={} calls={} files={} (symbol_snapshot={} git_snapshot={})",
res.symbols.symbols.len(),
res.symbols.calls.len(),
res.git.files.len(),
res.symbols.snapshot_id,
res.git.snapshot_id,
);
let index_dir = cache_index_dir(loaded);
let repos: Vec<String> = loaded.nornir.repo.keys().cloned().collect();
let idx = index::Index::open_at(&loaded.workspace_root, &index_dir)?.with_repo_scope(repos);
let stats = idx.build()?;
println!(
"index: scanned={} added={} updated={} unchanged={}",
stats.scanned, stats.added, stats.updated, stats.skipped_unchanged,
);
let (sha, branch) =
read_git_head(&repo_root).unwrap_or_else(|_| ("unknown".into(), "unknown".into()));
let workspace = workspace_name(loaded);
let snap = nornir::index::snapshot::snapshot_to_iceberg(
&wh, &workspace, repo, &sha, &branch, &index_dir,
)?;
println!(
"✓ mapped {} — snapshot {} ({} blob(s), {} bytes, sha={})",
repo,
snap.snapshot_id,
snap.blob_count,
snap.total_bytes,
&snap.git_sha[..snap.git_sha.len().min(12)],
);
map_vectors(&wh, &workspace, repo, &repo_root, &sha, &branch)?;
Ok(())
}
#[cfg(all(feature = "vector", any(feature = "embed-tract", feature = "embed-ort")))]
fn map_vectors(
wh: &nornir::warehouse::iceberg::IcebergWarehouse,
workspace: &str,
repo: &str,
repo_root: &std::path::Path,
sha: &str,
branch: &str,
) -> Result<()> {
use nornir::vector::store;
let files = store::collect_rust_sources(repo_root);
if files.is_empty() {
return Ok(());
}
let embedder = load_vector_embedder()?;
let opts = nornir::vector::chunk::ChunkOptions::default();
let snap = store::index_repo(
wh,
&store::RepoRef { workspace, repo, git_sha: sha, branch, complete: true },
&files,
&opts,
&*embedder,
)?;
println!(
"vectors: snapshot {} — {} occurrences, {} new vector(s) embedded ({})",
snap.snapshot_id, snap.occurrences, snap.new_vectors, vector_backend_and_model(),
);
Ok(())
}
#[cfg(not(all(feature = "vector", any(feature = "embed-tract", feature = "embed-ort"))))]
fn map_vectors(
_wh: &nornir::warehouse::iceberg::IcebergWarehouse,
_workspace: &str,
_repo: &str,
_repo_root: &std::path::Path,
_sha: &str,
_branch: &str,
) -> Result<()> {
println!("vectors: skipped (build without embedder; `--features embed-tract` or `embed-ort` to enable)");
Ok(())
}
fn capture_dwarf(
wh: &nornir::warehouse::iceberg::IcebergWarehouse,
workspace: &str,
repo: &str,
repo_root: &std::path::Path,
sha: &str,
branch: &str,
workspace_root: &std::path::Path,
warehouse_root: &std::path::Path,
) -> Option<String> {
use cargo_metadata::MetadataCommand;
let meta = match MetadataCommand::new().current_dir(repo_root).no_deps().exec() {
Ok(m) => m,
Err(e) => {
println!(" 🔬 dwarf: ⚠ cargo metadata failed: {e}");
return None;
}
};
let mut bins: Vec<String> = Vec::new();
for p in &meta.packages {
for t in &p.targets {
if t.is_bin() && !bins.contains(&t.name) {
bins.push(t.name.clone());
}
}
}
if bins.is_empty() {
return None;
}
let target_dir = meta.target_directory.as_std_path();
let mut first: Option<String> = None;
for bin in &bins {
let cands = [target_dir.join("release").join(bin), target_dir.join("debug").join(bin)];
let Some(path) = cands.iter().find(|p| p.is_file()) else {
continue; };
let syms = match nornir::introspect::artifact::extract_symbols(path, workspace_root) {
Ok(s) if !s.is_empty() => s,
Ok(_) => continue, Err(e) => {
println!(" 🔬 dwarf: ⚠ {bin}: {e:#}");
continue;
}
};
let calls = nornir::introspect::callgraph_dwarf::extract_callgraph(path, workspace_root)
.unwrap_or_default();
let facts = nornir::introspect::persist::DwarfFacts { symbols: syms, calls };
let cache_dir = warehouse_root
.parent()
.unwrap_or(warehouse_root)
.join("cache/dwarf")
.join(repo)
.join(bin);
match nornir::introspect::persist::snapshot_facts(
wh, workspace, repo, sha, branch, &facts, &cache_dir,
) {
Ok(snap) => {
println!(
" 🔬 dwarf: {bin} → {} symbol(s), {} edge(s), snapshot {}",
facts.symbols.len(),
facts.calls.len(),
snap.snapshot_id,
);
first.get_or_insert(snap.snapshot_id.to_string());
}
Err(e) => println!(" 🔬 dwarf: ⚠ {bin}: snapshot failed: {e:#}"),
}
}
first
}
fn run_knowledge(op: KnowledgeOp, loaded: &Loaded) -> Result<()> {
match op {
KnowledgeOp::Scan { repo, no_save } => {
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, &repo);
let res = nornir::knowledge::scan_all(&repo_root, &repo)?;
println!(
"ok: symbols={} calls={} features={} files={}",
res.symbols.symbols.len(),
res.symbols.calls.len(),
res.symbols.features.len(),
res.git.files.len(),
);
let mut top = res.git.files.iter().collect::<Vec<_>>();
top.sort_by_key(|r| -r.commits_total);
for r in top.iter().take(10) {
println!(
" {:60} commits={} 30d={} authors={}",
r.file, r.commits_total, r.commits_30d, r.authors_total
);
}
if !no_save {
let warehouse_root = loaded.warehouse_root();
let wh = nornir::warehouse::iceberg::IcebergWarehouse::open(&warehouse_root)
.with_context(|| format!("open warehouse at {}", warehouse_root.display()))?;
wh.append_symbol_scan(&res.symbols)?;
wh.append_git_heat_scan(&res.git)?;
println!("persisted: symbol_snapshot={} git_snapshot={}",
res.symbols.snapshot_id, res.git.snapshot_id);
}
}
KnowledgeOp::Query { repo, kind, arg, to, limit } => {
if let Some(server) = server_target() {
return knowledge_query_remote(&server, &repo, &kind, &arg, to.as_deref(), limit);
}
let warehouse_root = loaded.warehouse_root();
let wh = nornir::warehouse::iceberg::IcebergWarehouse::open(&warehouse_root)
.with_context(|| format!("open warehouse at {}", warehouse_root.display()))?;
let view = nornir::knowledge::query::load_latest(&wh, &repo)?;
match kind.as_str() {
"callers" => {
let hits = view.callers_of(&arg);
println!("callers of `{arg}` ({}):", hits.len());
for c in hits.iter().take(limit) {
println!(" {} → {} [{}:{}]", c.caller_path, c.callee_ident, c.file, c.line);
}
}
"callees" => {
let hits = view.callees_of(&arg);
println!("callees of `{arg}` ({}):", hits.len());
for c in hits.iter().take(limit) {
println!(" {} → {} [{}:{}]", c.caller_path, c.callee_ident, c.file, c.line);
}
}
"defined-in" => {
let hits = view.defined_in(&arg);
println!("symbols defined in `*{arg}` ({}):", hits.len());
for s in hits.iter().take(limit) {
println!(" {:8} {}::{} [{}:{}]", s.item_kind, s.module_path, s.item_name, s.file, s.line);
}
}
"lookup" => {
let hits = view.symbol_lookup(&arg, limit);
println!("symbols matching `{arg}` ({} shown):", hits.len());
for s in &hits {
let sig = s.signature.as_deref().unwrap_or("");
println!(" {:8} {} [{}:{}] {}", s.item_kind, s.item_name, s.file, s.line, sig);
}
}
"path" => {
let to = to.as_deref()
.context("`path` query needs a target: pass --to <function>")?;
match view.call_path(&arg, to) {
Some(p) => {
println!("call path `{arg}` → `{to}` ({} hops):", p.len().saturating_sub(1));
println!(" {}", p.join(" → "));
}
None => println!("no call path from `{arg}` to `{to}`"),
}
}
other => bail!("unknown query kind `{other}` (want: callers|callees|defined-in|lookup|path)"),
}
}
}
Ok(())
}
fn load_config(explicit: Option<&Path>) -> Result<Loaded> {
if let Some(p) = explicit {
return config::load_explicit(p);
}
if let Some(p) = std::env::var_os("NORNIR_CONFIG") {
return config::load_explicit(Path::new(&p));
}
let cwd = std::env::current_dir()?;
config::discover(&cwd)
}
fn sibling_bin(name: &str) -> Result<PathBuf> {
let exe = std::env::current_exe().context("locate current executable")?;
Ok(exe.parent().unwrap_or_else(|| Path::new(".")).join(name))
}
#[cfg(unix)]
fn install_binary(src: &Path, dst: &Path) -> Result<()> {
use std::os::unix::fs::PermissionsExt;
let tmp = dst.with_extension("new");
std::fs::copy(src, &tmp)
.with_context(|| format!("copy {} -> {}", src.display(), tmp.display()))?;
std::fs::set_permissions(&tmp, std::fs::Permissions::from_mode(0o755))?;
std::fs::rename(&tmp, dst)
.with_context(|| format!("install {} -> {}", src.display(), dst.display()))?;
Ok(())
}
#[cfg(unix)]
fn run_serve(a: &ServeArgs) -> Result<()> {
use std::os::unix::process::CommandExt;
let bin = match &a.server_bin {
Some(p) => p.clone(),
None => sibling_bin("nornir-server")?,
};
if !bin.exists() {
bail!(
"nornir-server not found at {} — build it with `cargo build --features server`, \
or pass --server-bin",
bin.display()
);
}
let mut cmd = std::process::Command::new(&bin);
if let Some(c) = &a.config {
cmd.env("NORNIR_CONFIG", c);
}
if let Some(ad) = &a.addr {
cmd.env("NORNIR_SERVER_ADDR", ad);
}
if let Some(t) = &a.token {
cmd.env("NORNIR_SERVER_TOKEN", t);
}
eprintln!("nornir: exec {}", bin.display());
Err(anyhow!("exec {}: {}", bin.display(), cmd.exec())) }
#[cfg(not(unix))]
fn run_serve(_a: &ServeArgs) -> Result<()> {
bail!("`nornir serve` is only supported on unix")
}
#[cfg(unix)]
#[derive(Debug, PartialEq, Eq)]
enum VizMode {
Thin,
Fat,
EmptyLocal,
}
fn viz_mode(
server_env_set: bool,
warehouse_set: bool,
has_config: bool,
server_reachable: bool,
) -> VizMode {
if server_env_set {
VizMode::Thin
} else if warehouse_set || has_config {
VizMode::Fat
} else if server_reachable {
VizMode::Thin
} else {
VizMode::EmptyLocal
}
}
#[cfg(unix)]
fn viz_local_server_reachable() -> bool {
let addr: std::net::SocketAddr = "127.0.0.1:7878".parse().unwrap();
std::net::TcpStream::connect_timeout(&addr, std::time::Duration::from_millis(300)).is_ok()
}
#[cfg(test)]
mod viz_mode_tests {
use super::{viz_mode, VizMode};
#[test]
fn viz_mode_covers_all_start_permutations() {
assert_eq!(viz_mode(true, false, false, false), VizMode::Thin);
assert_eq!(viz_mode(true, true, true, true), VizMode::Thin);
assert_eq!(viz_mode(false, true, false, true), VizMode::Fat);
assert_eq!(viz_mode(false, false, true, true), VizMode::Fat);
assert_eq!(viz_mode(false, false, true, false), VizMode::Fat);
assert_eq!(viz_mode(false, false, false, true), VizMode::Thin);
assert_eq!(viz_mode(false, false, false, false), VizMode::EmptyLocal);
}
}
fn run_viz(a: &VizArgs, config: Option<&Path>) -> Result<()> {
use std::os::unix::process::CommandExt;
let bin = match &a.viz_bin {
Some(p) => p.clone(),
None => {
let sib = sibling_bin("urdr-threads")?;
if sib.exists() {
sib
} else {
PathBuf::from("urdr-threads") }
}
};
let mut cmd = std::process::Command::new(&bin);
if let Some(s) = &a.server {
let url = if s.starts_with("http://") || s.starts_with("https://") {
s.clone()
} else {
format!("http://{s}")
};
cmd.env("NORNIR_SERVER", &url);
if let Some(t) = &a.token {
cmd.env("NORNIR_SERVER_TOKEN", t);
} else if std::env::var_os("NORNIR_SERVER_TOKEN").is_none() {
if let Ok(tok) = std::fs::read_to_string(nornir::config::nornir_home().join("token")) {
let tok = tok.trim();
if !tok.is_empty() {
cmd.env("NORNIR_SERVER_TOKEN", tok);
}
}
}
eprintln!("nornir viz: --server {url} → thin client");
}
let server_env_set = a.server.is_some() || std::env::var_os("NORNIR_SERVER").is_some();
let warehouse_set = a.warehouse.is_some();
if !server_env_set && !warehouse_set {
let has_config = config.is_some()
|| std::env::current_dir()
.ok()
.and_then(|cwd| nornir::config::discover(&cwd).ok())
.is_some();
let server_reachable = !has_config && viz_local_server_reachable();
match viz_mode(false, false, has_config, server_reachable) {
VizMode::Thin => {
cmd.env("NORNIR_SERVER", "http://127.0.0.1:7878");
if std::env::var_os("NORNIR_SERVER_TOKEN").is_none() {
if let Ok(tok) =
std::fs::read_to_string(nornir::config::nornir_home().join("token"))
{
let tok = tok.trim();
if !tok.is_empty() {
cmd.env("NORNIR_SERVER_TOKEN", tok);
}
}
}
eprintln!(
"nornir viz: no local workspace config → thin client → http://127.0.0.1:7878"
);
}
VizMode::EmptyLocal => {
eprintln!(
"nornir viz: no workspace config and no server on :7878 — opening an empty \
local view. `cd` into a `workspace_<name>/` (fat) or start the server (thin)."
);
}
VizMode::Fat => {}
}
}
if let Some(c) = config {
cmd.arg("--config").arg(c);
}
if let Some(w) = &a.workspace {
cmd.arg("--workspace").arg(w);
}
if let Some(wh) = &a.warehouse {
cmd.arg("--warehouse").arg(wh);
}
eprintln!("nornir: exec {}", bin.display());
Err(anyhow!(
"exec {}: {} — is the viz built? (`cargo build --features viz`, or \
`cargo install nornir --features viz`)",
bin.display(),
cmd.exec()
))
}
#[cfg(not(unix))]
fn run_viz(_a: &VizArgs, _config: Option<&Path>) -> Result<()> {
bail!("`nornir viz` is only supported on unix")
}
fn run_install(op: &InstallOp) -> Result<()> {
match op {
InstallOp::Systemd(a) => run_install_systemd(a),
InstallOp::Uninstall(a) => run_uninstall(a),
}
}
#[cfg(unix)]
fn run_uninstall(a: &UninstallArgs) -> Result<()> {
if unsafe { geteuid() } != 0 {
bail!("`nornir install uninstall` must run as root. Re-run with sudo.");
}
let _ = std::process::Command::new("systemctl").args(["disable", "--now", "nornir.service"]).status();
let _ = std::process::Command::new("systemctl").args(["reset-failed", "nornir.service"]).status();
let removed = |p: &str| {
let existed = Path::new(p).exists();
let _ = std::fs::remove_file(p);
existed
};
let unit = removed("/etc/systemd/system/nornir.service");
let _ = std::fs::remove_dir_all("/etc/nornir");
systemctl(&["daemon-reload"])?;
println!("✓ uninstalled nornir systemd service");
println!(" unit : {}", if unit { "removed" } else { "absent" });
println!(" env : /etc/nornir removed");
if a.purge {
for bin in ["nornir-server", "nornir", "urdr-threads"] {
let _ = std::fs::remove_file(format!("/usr/local/bin/{bin}"));
}
let _ = std::fs::remove_dir_all(NORNIR_HOME);
let _ = std::process::Command::new("userdel").arg(&a.user).status();
println!(" purged : /usr/local/bin/{{nornir,nornir-server,urdr-threads}}, {NORNIR_HOME}, user `{}`", a.user);
} else {
println!(" kept : {NORNIR_HOME} (data + key), /usr/local/bin binaries, user `{}`", a.user);
println!(" (use --purge to remove those too)");
}
Ok(())
}
#[cfg(not(unix))]
fn run_uninstall(_a: &UninstallArgs) -> Result<()> {
bail!("`nornir install uninstall` is linux-only")
}
const NORNIR_HOME: &str = "/home/nornir";
fn nornir_ssh_dir() -> PathBuf {
if let Some(d) = std::env::var_os("NORNIR_SSH_DIR") {
return PathBuf::from(d);
}
#[cfg(unix)]
{
let sys = Path::new(NORNIR_HOME).join(".ssh");
if sys.exists() || unsafe { geteuid() } == 0 {
return sys;
}
}
if let Some(home) = std::env::var_os("HOME") {
return Path::new(&home).join(".nornir/ssh");
}
PathBuf::from(".nornir/ssh")
}
fn generate_keypair(dir: &Path) -> Result<bool> {
let priv_path = dir.join("id_ed25519");
let pub_path = dir.join("id_ed25519.pub");
if priv_path.exists() {
return Ok(false);
}
std::fs::create_dir_all(dir).with_context(|| format!("create {}", dir.display()))?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
std::fs::set_permissions(dir, std::fs::Permissions::from_mode(0o700)).ok();
}
let host = std::fs::read_to_string("/etc/hostname")
.ok()
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.unwrap_or_else(|| "localhost".to_string());
let mut key = ssh_key::PrivateKey::random(&mut rand_core::OsRng, ssh_key::Algorithm::Ed25519)
.context("generate ed25519 key")?;
key.set_comment(format!("nornir@{host}"));
let openssh = key
.to_openssh(ssh_key::LineEnding::LF)
.context("encode private key (OpenSSH)")?;
std::fs::write(&priv_path, openssh.as_bytes())
.with_context(|| format!("write {}", priv_path.display()))?;
let pubtxt = key
.public_key()
.to_openssh()
.context("encode public key (OpenSSH)")?;
std::fs::write(&pub_path, format!("{pubtxt}\n"))
.with_context(|| format!("write {}", pub_path.display()))?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
std::fs::set_permissions(&priv_path, std::fs::Permissions::from_mode(0o600))?;
std::fs::set_permissions(&pub_path, std::fs::Permissions::from_mode(0o644))?;
}
Ok(true)
}
fn run_key(op: &KeyOp) -> Result<()> {
let dir = nornir_ssh_dir();
let pub_path = dir.join("id_ed25519.pub");
match op {
KeyOp::Path => println!("{}", pub_path.display()),
KeyOp::Generate => {
let created = generate_keypair(&dir)?;
println!(
"{} {}",
if created { "✓ generated" } else { "• already present:" },
pub_path.display()
);
print!("{}", std::fs::read_to_string(&pub_path)?);
}
KeyOp::Show => {
if !pub_path.exists() {
bail!(
"no public key at {} — run `nornir key generate` (or \
`nornir install systemd`, which creates one)",
pub_path.display()
);
}
print!("{}", std::fs::read_to_string(&pub_path)?);
}
}
Ok(())
}
fn server_root() -> PathBuf {
nornir::config::registry_root()
}
fn run_security(cmd: &SecurityCmd) -> Result<()> {
match cmd {
SecurityCmd::Scan { repo, out } => {
#[cfg(any(feature = "mcp", feature = "server"))]
if let Some(server) = server_target() {
use pb_client::mimir_client::MimirClient;
use pb_client::RepoOnly;
let repo_name = repo
.file_name()
.map(|s| s.to_string_lossy().into_owned())
.unwrap_or_else(|| repo.display().to_string());
let json = on_server(&server, move |channel, bearer| async move {
let mut c = MimirClient::with_interceptor(channel, auth_interceptor(bearer));
let r = c
.security_scan(RepoOnly { repo: repo_name })
.await
.map_err(|e| anyhow!("Mimir.SecurityScan RPC failed: {e}"))?;
Ok(r.into_inner().json)
})?;
let v: serde_json::Value = serde_json::from_str(&json)?;
println!("repo {}", v["repo"].as_str().unwrap_or(""));
println!("components {} (deep, server-side)", v["components"]);
println!("cache {} hit / {} miss", v["cache_hits"], v["cache_misses"]);
let vulns = v["vulns"].as_array().cloned().unwrap_or_default();
println!("vulnerabilities {} across {} crate(s)", vulns.len(), vulns.len());
for x in &vulns {
let ids = x["ids"]
.as_array()
.map(|a| a.iter().filter_map(|s| s.as_str()).collect::<Vec<_>>().join(", "))
.unwrap_or_default();
println!(" ⚠ {} {}: {}", x["crate"].as_str().unwrap_or(""), x["version"].as_str().unwrap_or(""), ids);
}
std::process::exit(vulns.len().min(120) as i32);
}
let rep = nornir::security::scan(repo)?;
println!("repo {}", rep.repo);
println!("components {} crates (deep, incl. transitive)", rep.components.len());
println!("vulnerabilities {} across {} crate(s)", rep.vuln_count(), rep.vulns.len());
for v in &rep.vulns {
println!(" ⚠ {} {}: {}", v.crate_name, v.version, v.ids.join(", "));
}
let top: Vec<String> =
rep.license_tally().into_iter().take(6).map(|(k, n)| format!("{k}×{n}")).collect();
println!("licenses {}", top.join(", "));
if let Some(o) = out {
std::fs::write(o, serde_json::to_string_pretty(&rep.to_cyclonedx())?)?;
println!("CycloneDX SBOM {}", o.display());
}
std::process::exit(rep.vulns.len().min(120) as i32);
}
SecurityCmd::UpdateDb => {
let path = nornir::security::update_advisory_db()?;
let n = std::fs::read_dir(path.join("crates")).map(|d| d.flatten().count()).unwrap_or(0);
println!(
"advisory-db ready at {} ({n} crate dirs) — offline (airgap) vuln matching enabled",
path.display()
);
Ok(())
}
}
}
fn run_workspace(op: &WorkspaceOp) -> Result<()> {
#[cfg(any(feature = "mcp", feature = "server"))]
if let Some(server) = server_target() {
return run_workspace_remote(&server, op);
}
use nornir::registry::{Mode, Registry, Workspace};
let reg = Registry::open(&server_root())?;
match op {
WorkspaceOp::Ls => {
let all = reg.list()?;
if all.is_empty() {
println!("(no workspaces registered — `nornir workspace register …`)");
return Ok(());
}
println!("{:<20} {:<10} {:>7} {}", "NAME", "MODE", "MEMBERS", "LAST SYNCED");
for ws in all {
let last = ws
.members
.iter()
.map(|m| m.last_synced.as_str())
.filter(|s| !s.is_empty())
.max()
.unwrap_or("never");
println!(
"{:<20} {:<10} {:>7} {}",
ws.name,
ws.mode.as_str(),
ws.members.len(),
last
);
}
}
WorkspaceOp::Show { name } => {
let ws = reg
.get(name)?
.ok_or_else(|| anyhow!("no workspace `{name}` in the registry"))?;
println!("{}", serde_json::to_string_pretty(&ws)?);
}
WorkspaceOp::Register {
name,
descriptor,
monitored,
external,
poll,
} => {
if *monitored && *external {
bail!("--monitored and --external are mutually exclusive");
}
let mode = if *monitored {
Mode::Monitored
} else if *external {
Mode::External
} else {
Mode::Pushed
};
let created_at = reg.get(name)?.map(|w| w.created_at);
let ws = Workspace::new(
name.clone(),
descriptor.clone(),
mode,
poll.clone().unwrap_or_default(),
created_at,
);
reg.upsert(&ws)?;
println!(
"✓ registered `{}` ({}, {} member(s)) → {}/{}/{{git,builds}}",
ws.name,
ws.mode.as_str(),
ws.members.len(),
server_root().display(),
ws.name
);
}
WorkspaceOp::Add {
name,
descriptor,
monitored,
external,
poll,
} => {
if *monitored && *external {
bail!("--monitored and --external are mutually exclusive");
}
let mode = if *monitored {
Mode::Monitored
} else if *external {
Mode::External
} else {
Mode::Pushed
};
let created_at = reg.get(name)?.map(|w| w.created_at);
let ws = Workspace::new(
name.clone(),
descriptor.clone(),
mode,
poll.clone().unwrap_or_default(),
created_at,
);
let n = ws.members.len();
reg.upsert(&ws)?;
println!("① registered `{name}` ({n} member(s))");
println!("② fetching {n} member(s)…");
nornir::monitor::fetch_workspace(®, &server_root(), name)
.with_context(|| format!("fetch members of `{name}`"))?;
println!("③ building warehouse…");
let snap = nornir::monitor::republish(®, &server_root(), name, None)
.with_context(|| format!("build `{name}` warehouse"))?;
println!(
"✓ added `{name}` — populated now (snapshot {}) at {}/{name}/builds",
&snap[..snap.len().min(12)],
server_root().display(),
);
}
WorkspaceOp::Rm { name } => {
if reg.remove(name)? {
println!("✓ removed `{name}`");
} else {
bail!("no workspace `{name}` in the registry");
}
}
WorkspaceOp::Fetch { name } => {
let report = nornir::monitor::fetch_workspace(®, &server_root(), name)?;
let ws = reg
.get(name)?
.ok_or_else(|| anyhow!("no workspace `{name}` in the registry"))?;
for m in &ws.members {
if m.remote.is_empty() {
continue;
}
let sha = if m.last_seen_sha.is_empty() {
"—".to_string()
} else {
m.last_seen_sha[..m.last_seen_sha.len().min(12)].to_string()
};
let state = if report.changed.contains(&m.name) {
"(changed)"
} else if m.sync_state.starts_with("error") {
"(error)"
} else {
"(unchanged)"
};
println!(" {:<16} {:<12} {}", m.name, sha, state);
}
for (m, e) in &report.errors {
eprintln!(" {m}: {e}");
}
println!(
"✓ fetched `{}` — {} of {} member(s) changed",
ws.name,
report.changed.len(),
report.fetched
);
}
}
Ok(())
}
#[cfg(unix)]
fn passwd_ids(user: &str) -> Option<(u32, u32)> {
let passwd = std::fs::read_to_string("/etc/passwd").ok()?;
for line in passwd.lines() {
let mut f = line.split(':');
if f.next() == Some(user) {
let _ = f.next(); let uid = f.next()?.parse().ok()?;
let gid = f.next()?.parse().ok()?;
return Some((uid, gid));
}
}
None
}
#[cfg(unix)]
extern "C" {
fn chown(path: *const std::os::raw::c_char, owner: u32, group: u32) -> i32;
}
#[cfg(unix)]
fn chown_recursive(path: &Path, uid: u32, gid: u32) -> Result<()> {
use std::os::unix::ffi::OsStrExt;
let c = std::ffi::CString::new(path.as_os_str().as_bytes())
.with_context(|| format!("path has interior NUL: {}", path.display()))?;
if unsafe { chown(c.as_ptr(), uid, gid) } != 0 {
bail!("chown {} failed: {}", path.display(), std::io::Error::last_os_error());
}
if path.is_dir() {
for entry in std::fs::read_dir(path).with_context(|| format!("read {}", path.display()))? {
chown_recursive(&entry?.path(), uid, gid)?;
}
}
Ok(())
}
#[cfg(unix)]
extern "C" {
fn geteuid() -> u32;
}
#[cfg(unix)]
fn run_install_systemd(a: &SystemdArgs) -> Result<()> {
use std::os::unix::fs::PermissionsExt;
if unsafe { geteuid() } != 0 {
bail!(
"`nornir install systemd` must run as root — it creates the `{}` system user \
and writes /etc/systemd/system. Re-run with sudo.",
a.user
);
}
let server_src = match &a.server_bin {
Some(p) => p.clone(),
None => sibling_bin("nornir-server")?,
};
if !server_src.exists() {
bail!(
"nornir-server not found at {} — build with `cargo build --features server`, \
or pass --server-bin",
server_src.display()
);
}
let bindir = Path::new("/usr/local/bin");
std::fs::create_dir_all(bindir).ok();
let server_bin = bindir.join("nornir-server");
install_binary(&server_src, &server_bin)
.with_context(|| format!("install nornir-server to {}", server_bin.display()))?;
let mut installed = vec![server_bin.clone()];
if let Ok(cli) = std::env::current_exe() {
let dst = bindir.join("nornir");
if install_binary(&cli, &dst).is_ok() {
installed.push(dst);
}
}
if let Some(viz) = server_src.parent().map(|p| p.join("urdr-threads")) {
if viz.exists() {
let dst = bindir.join("urdr-threads");
if install_binary(&viz, &dst).is_ok() {
installed.push(dst);
}
}
}
let config: Option<PathBuf> = match &a.config {
Some(p) => Some(
p.canonicalize()
.with_context(|| format!("config not found: {}", p.display()))?,
),
None => None,
};
if config.is_none() && a.monitor.is_empty() && a.root.is_none() {
bail!("nothing to serve: pass --monitor <name>=<descriptor> (self-sync) and/or --config <nornir.toml> (served workspace)");
}
let unit_path = Path::new("/etc/systemd/system/nornir.service");
let env_path = Path::new("/etc/nornir/nornir.env");
let updating = unit_path.exists();
ensure_system_user(&a.user)?;
let ssh_dir = Path::new(NORNIR_HOME).join(".ssh");
std::fs::create_dir_all(NORNIR_HOME).context("create nornir state home")?;
let key_generated = generate_keypair(&ssh_dir)?;
let pub_path = ssh_dir.join("id_ed25519.pub");
let monitoring = !a.monitor.is_empty() || a.root.is_some();
let (effective_config, default_warehouse, monitor_entries) = if monitoring {
let staged_config = config.as_ref().map(|c| {
let dst = Path::new(NORNIR_HOME).join("server.toml");
let _ = std::fs::copy(c, &dst);
dst
});
let dw = nornir::config::nornir_home_from(Path::new(NORNIR_HOME)).join("warehouse");
std::fs::create_dir_all(&dw).ok();
let desc_dir = Path::new(NORNIR_HOME).join("descriptors");
std::fs::create_dir_all(&desc_dir).ok();
let mut entries = Vec::new();
for entry in &a.monitor {
let (name, descriptor) = entry
.split_once('=')
.ok_or_else(|| anyhow!("--monitor `{entry}` must be name=descriptor"))?;
let dp = Path::new(descriptor);
if dp.is_file() {
let staged = desc_dir.join(format!("{name}.toml"));
std::fs::copy(dp, &staged).with_context(|| {
format!("stage descriptor {} -> {}", dp.display(), staged.display())
})?;
entries.push(format!("{name}={}", staged.display()));
} else {
entries.push(entry.clone()); }
}
(staged_config, Some(dw), entries)
} else {
(config.clone(), None, Vec::new())
};
std::fs::create_dir_all("/etc/nornir").context("create /etc/nornir")?;
let existing_token = std::fs::read_to_string(env_path).ok().and_then(|s| {
s.lines()
.find_map(|l| l.strip_prefix("NORNIR_SERVER_TOKEN="))
.map(|t| t.trim().to_string())
.filter(|t| !t.is_empty())
});
let token_reused = existing_token.is_some();
let token = existing_token
.unwrap_or_else(|| format!("{}{}", uuid::Uuid::new_v4().simple(), uuid::Uuid::new_v4().simple()));
let mut env_body =
format!("NORNIR_SERVER_TOKEN={token}\nNORNIR_SERVER_ADDR={}\n", a.addr);
if let Some(cfg) = &effective_config {
env_body.push_str(&format!("NORNIR_CONFIG={}\n", cfg.display()));
}
if let Some(dw) = &default_warehouse {
env_body.push_str(&format!("NORNIR_WAREHOUSE={}\n", dw.display()));
let funnel = nornir::config::nornir_home_from(Path::new(NORNIR_HOME)).join("funnel");
std::fs::create_dir_all(&funnel).ok();
}
let monitor_root = if monitoring {
if a.root.is_some() {
eprintln!(
"note: --root is ignored — the registry root is home-derived \
(<home>/.nornir/workspaces). Set the service user's $HOME instead."
);
}
let root = nornir::config::nornir_home_from(Path::new(NORNIR_HOME)).join("workspaces");
std::fs::create_dir_all(&root).with_context(|| format!("create {}", root.display()))?;
if !monitor_entries.is_empty() {
env_body.push_str(&format!("NORNIR_MONITOR={}\n", monitor_entries.join(",")));
}
env_body.push_str(&format!("NORNIR_POLL={}\n", a.poll.as_deref().unwrap_or("60s")));
Some(root)
} else {
None
};
if let Some((uid, gid)) = passwd_ids(&a.user) {
chown_recursive(Path::new(NORNIR_HOME), uid, gid).context("chown nornir home")?;
if let Some(root) = &monitor_root {
if !root.starts_with(NORNIR_HOME) {
chown_recursive(root, uid, gid).ok();
}
}
}
std::fs::write(env_path, env_body).context("write /etc/nornir/nornir.env")?;
std::fs::set_permissions(env_path, std::fs::Permissions::from_mode(0o600))?;
let unit = format!(
"[Unit]\n\
Description=nornir server\n\
After=network-online.target\n\
Wants=network-online.target\n\n\
[Service]\n\
Type=simple\n\
User={user}\n\
Group={user}\n\
WorkingDirectory={home}\n\
Environment=HOME={home}\n\
EnvironmentFile=/etc/nornir/nornir.env\n\
ExecStart={bin}\n\
Restart=on-failure\n\
RestartSec=2\n\
NoNewPrivileges=true\n\n\
[Install]\n\
WantedBy=multi-user.target\n",
user = a.user,
home = NORNIR_HOME,
bin = server_bin.display(),
);
std::fs::write(unit_path, &unit).context("write unit file")?;
systemctl(&["daemon-reload"])?;
let _ = std::process::Command::new("systemctl")
.args(["reset-failed", "nornir.service"])
.status();
if updating {
systemctl(&["try-restart", "nornir.service"])?;
}
println!(
"✓ {} nornir systemd service",
if updating { "updated" } else { "installed" }
);
println!(" user : {} (system)", a.user);
println!(
" env file : /etc/nornir/nornir.env (mode 600; token {})",
if token_reused { "preserved" } else { "generated" }
);
println!(" unit : /etc/systemd/system/nornir.service");
println!(
" binaries : {}",
installed
.iter()
.map(|p| p.display().to_string())
.collect::<Vec<_>>()
.join(", ")
);
println!(
" config : {}",
config.as_ref().map(|c| c.display().to_string()).unwrap_or_else(|| "(none — monitor-only)".into())
);
if let Some(root) = &monitor_root {
println!(" monitor : root {} (poll {})", root.display(), a.poll.as_deref().unwrap_or("60s"));
if !a.monitor.is_empty() {
println!(" workspaces: {}", a.monitor.join(", "));
}
}
println!(
" ssh key : {} ({})",
pub_path.display(),
if key_generated { "generated" } else { "preserved" }
);
if let Ok(txt) = std::fs::read_to_string(&pub_path) {
println!();
println!(" add this public key as a deploy key on each monitored repo:");
println!(" {}", txt.trim());
}
println!();
if updating {
println!(" (running service was restarted to apply the update)");
println!(" status : systemctl status nornir · logs: journalctl -u nornir -f");
} else {
println!(" enable + start: systemctl enable --now nornir");
println!(" follow logs : journalctl -u nornir -f");
}
Ok(())
}
#[cfg(not(unix))]
fn run_install_systemd(_a: &SystemdArgs) -> Result<()> {
bail!("`nornir install systemd` is linux-only")
}
#[cfg(unix)]
fn ensure_system_user(user: &str) -> Result<()> {
let passwd = std::fs::read_to_string("/etc/passwd").unwrap_or_default();
let exists = passwd.lines().any(|l| l.split(':').next() == Some(user));
if !exists {
let status = std::process::Command::new("useradd")
.args([
"--system",
"--home-dir",
NORNIR_HOME,
"--shell",
"/usr/sbin/nologin",
user,
])
.status()
.context("run `useradd` (deploy command — must be on PATH)")?;
if !status.success() {
bail!("useradd failed for `{user}` (exit {:?})", status.code());
}
println!(" created system user `{user}` (home {NORNIR_HOME})");
} else {
println!(" user `{user}` already exists — keeping it");
}
std::fs::create_dir_all(NORNIR_HOME).context("create nornir home")?;
if let Some((uid, gid)) = passwd_ids(user) {
chown_recursive(Path::new(NORNIR_HOME), uid, gid).ok();
}
Ok(())
}
#[cfg(unix)]
fn systemctl(args: &[&str]) -> Result<()> {
let status = std::process::Command::new("systemctl")
.args(args)
.status()
.context("run `systemctl`")?;
if !status.success() {
bail!("`systemctl {}` failed (exit {:?})", args.join(" "), status.code());
}
Ok(())
}
fn run_guard(op: GuardOp, loaded: &Loaded) -> Result<()> {
if let Some(server) = server_target() {
match op {
GuardOp::Status => return guard_remote(&server, "status"),
GuardOp::Apply => return guard_remote(&server, "apply"),
GuardOp::Release => return guard_remote(&server, "release"),
GuardOp::Verify => {}
}
}
let forbidden = &loaded.nornir.guard.forbidden;
if let GuardOp::Verify = op {
let recorded = guard::read_manifest(&loaded.workspace_root)?;
let report = guard::verify(&loaded.workspace_root, &recorded);
println!("recorded_at: {}", recorded.recorded_at);
for v in &report {
if v.ok() {
println!("{:<8} {}", "ok", v.rel);
} else {
println!("{:<8} {} {:?}", "DRIFT", v.rel, v.drift);
}
}
guard::intact(&loaded.workspace_root, &recorded)?;
return Ok(());
}
let report = match op {
GuardOp::Status => guard::status(&loaded.workspace_root, forbidden),
GuardOp::Apply => guard::apply_and_record(&loaded.workspace_root, forbidden)?,
GuardOp::Verify => unreachable!("handled above"),
GuardOp::Release => {
if std::env::var_os("NORNIR_GUARD_UNLOCK_TOKEN").is_none() {
anyhow::bail!(
"guard release (unlock) requires the human-held NORNIR_GUARD_UNLOCK_TOKEN \
env var; refusing to chmod +w protected paths"
);
}
guard::release(&loaded.workspace_root, forbidden)?
}
};
println!("{:<10} {:<10} {:<10} path", "exists", "writable", "changed");
for s in &report {
println!(
"{:<10} {:<10} {:<10} {}",
yn(s.exists), yn(s.writable), yn(s.changed), s.path.display()
);
}
Ok(())
}
fn run_mimir(op: MimirOp, loaded: &Loaded) -> Result<()> {
use nornir::mimir;
#[cfg(any(feature = "mcp", feature = "server"))]
if let Some(server) = server_target() {
return run_mimir_remote(&server, &op);
}
let (g, _ws) = mimir::build_graph(loaded)?;
let print = |v: serde_json::Value| -> Result<()> {
println!("{}", serde_json::to_string_pretty(&v)?);
Ok(())
};
match op {
MimirOp::DepsOf { repo, transitive } => print(mimir::deps_of(&g, &repo, transitive)?)?,
MimirOp::DependentsOf { repo, transitive } => {
print(mimir::dependents_of(&g, &repo, transitive)?)?
}
MimirOp::Affected { repos } => print(mimir::affected_by_change(&g, &repos)?)?,
MimirOp::BuildOrder => print(mimir::build_order(&g)?)?,
MimirOp::DepPath { from, to } => print(mimir::dep_path(&g, &from, &to)?)?,
MimirOp::ExternalUsers { krate } => print(mimir::external_dep_users(&g, &krate))?,
MimirOp::Mermaid => println!("{}", mimir::mermaid(&g)),
MimirOp::Overview { repo } => {
let wh = nornir::warehouse::iceberg::IcebergWarehouse::open(&iceberg_warehouse_root(loaded))?;
print(mimir::repo_overview(&g, &wh, &repo)?)?;
}
MimirOp::Changed => {
bail!(
"`mimir changed` needs the server's release lineage — set NORNIR_SERVER \
to run it against a nornir-server (or use `release trace`/`release impacted-crates` locally)"
);
}
}
Ok(())
}
fn run_diagram(op: DiagramOp, loaded: &Loaded) -> Result<()> {
use nornir::viz::diagram;
let ws_of = |a: &DiagramArgs| -> String {
a.workspace
.clone()
.or_else(|| std::env::var("NORNIR_WORKSPACE").ok().filter(|s| !s.is_empty()))
.unwrap_or_else(|| "workspace_holger".to_string())
};
let workspace = match &op {
DiagramOp::Timeline(a)
| DiagramOp::LaneSummary(a)
| DiagramOp::Depgraph(a)
| DiagramOp::SnapshotEdges(a)
| DiagramOp::GateMatrix(a)
| DiagramOp::ReleaseVersions(a)
| DiagramOp::BenchCompare(a) => ws_of(a),
DiagramOp::BenchHistory { args, .. } => ws_of(args),
};
let tl = load_diagram_timeline(loaded, &workspace)?;
let out = match op {
DiagramOp::Timeline(_) => diagram::timeline_mermaid(&tl),
DiagramOp::LaneSummary(_) => diagram::lane_summary_table(&tl),
DiagramOp::Depgraph(_) => diagram::depgraph_mermaid(&tl),
DiagramOp::SnapshotEdges(_) => diagram::snapshot_edge_list(&tl),
DiagramOp::GateMatrix(_) => diagram::gate_matrix_table(&tl),
DiagramOp::ReleaseVersions(_) => diagram::release_versions_table(&tl),
DiagramOp::BenchHistory { limit, .. } => diagram::bench_history_table(&tl, limit),
DiagramOp::BenchCompare(_) => diagram::bench_compare_table(&tl),
};
println!("{out}");
Ok(())
}
fn load_diagram_timeline(loaded: &Loaded, workspace: &str) -> Result<nornir::viz::Timeline> {
#[cfg(any(feature = "mcp", feature = "server"))]
if let Some(server) = server_target() {
return diagram_timeline_remote(&server, workspace);
}
nornir::viz::load_timeline(&iceberg_warehouse_root(loaded), workspace)
}
#[cfg(any(feature = "mcp", feature = "server"))]
mod pb_client {
tonic::include_proto!("nornir.v1");
}
#[cfg(any(feature = "mcp", feature = "server"))]
fn on_server<T, F, Fut>(server: &str, f: F) -> Result<T>
where
F: FnOnce(tonic::transport::Channel, tonic::metadata::MetadataValue<tonic::metadata::Ascii>) -> Fut,
Fut: std::future::Future<Output = Result<T>>,
{
let token = std::env::var("NORNIR_SERVER_TOKEN")
.context("NORNIR_SERVER is set; NORNIR_SERVER_TOKEN is required for the bearer auth")?;
let endpoint = if server.starts_with("http") { server.to_string() } else { format!("http://{server}") };
let bearer: tonic::metadata::MetadataValue<tonic::metadata::Ascii> =
format!("Bearer {token}").parse().context("build bearer metadata")?;
let rt = tokio::runtime::Runtime::new().context("tokio runtime for server call")?;
rt.block_on(async move {
let channel = tonic::transport::Channel::from_shared(endpoint.clone())
.with_context(|| format!("invalid NORNIR_SERVER url `{endpoint}`"))?
.connect()
.await
.with_context(|| format!("connect to nornir-server at {endpoint}"))?;
f(channel, bearer).await
})
}
fn server_target() -> Option<String> {
std::env::var("NORNIR_SERVER").ok().filter(|s| !s.is_empty())
}
#[cfg(any(feature = "mcp", feature = "server"))]
fn client_workspace() -> Option<&'static str> {
use std::sync::OnceLock;
static WS: OnceLock<Option<String>> = OnceLock::new();
WS.get_or_init(|| {
if let Ok(w) = std::env::var("NORNIR_WORKSPACE") {
if !w.is_empty() {
return Some(w);
}
}
let cwd = std::env::current_dir().ok()?;
config::discover(&cwd).ok().and_then(|l| {
l.workspace_root.file_name().and_then(|s| s.to_str()).map(String::from)
})
})
.as_deref()
}
#[cfg(any(feature = "mcp", feature = "server"))]
fn auth_interceptor(
bearer: tonic::metadata::MetadataValue<tonic::metadata::Ascii>,
) -> impl FnMut(tonic::Request<()>) -> std::result::Result<tonic::Request<()>, tonic::Status> + Clone {
move |mut req: tonic::Request<()>| {
req.metadata_mut().insert("authorization", bearer.clone());
if let Some(ws) = client_workspace() {
if let Ok(v) = ws.parse() {
req.metadata_mut().insert("nornir-workspace", v);
}
}
Ok(req)
}
}
#[cfg(any(feature = "mcp", feature = "server"))]
fn submit_bench_remote(server: &str, repo: &str, run: &bench::BenchRun) -> Result<String> {
use pb_client::bench_client::BenchClient;
use pb_client::{BenchResult as PbResult, BenchRun as PbRun, Kvf, SubmitBenchRequest, TestOutcome as PbTest};
let repo = repo.to_string();
let pb_run = PbRun {
date: run.date.clone(),
timestamp: run.timestamp.clone().unwrap_or_default(),
version: run.version.clone(),
machine: run.machine.clone(),
cores: run.cores,
results: run.results.iter().map(|r| PbResult {
name: r.name.clone(),
metrics: r.metrics.iter()
.filter_map(|(k, v)| v.as_f64().map(|f| Kvf { key: k.clone(), value: f }))
.collect(),
}).collect(),
tests: run.tests.iter().map(|t| PbTest {
name: t.name.clone(),
passed: t.passed,
duration_ms: t.duration_ms.unwrap_or(0.0),
has_duration: t.duration_ms.is_some(),
message: t.message.clone().unwrap_or_default(),
}).collect(),
};
on_server(server, move |channel, bearer| async move {
let mut client = BenchClient::with_interceptor(channel, auth_interceptor(bearer));
let resp = client.submit(SubmitBenchRequest { repo, run: Some(pb_run) }).await
.map_err(|e| anyhow!("Bench.Submit RPC failed: {e}"))?;
Ok(resp.into_inner().run_id)
})
}
#[cfg(any(feature = "mcp", feature = "server"))]
fn submit_bakeoff_remote(
server: &str,
rows: &[nornir::warehouse::agent_model_runs::AgentModelRunRow],
) -> Result<u32> {
use pb_client::telemetry_client::TelemetryClient;
use pb_client::{AgentModelRunPb, SubmitBakeoffRequest};
let runs: Vec<AgentModelRunPb> = rows
.iter()
.map(|r| AgentModelRunPb {
run_id: r.run_id.clone(),
ts_micros: r.ts_micros,
agent: r.agent.clone(),
model: r.model.clone(),
prompt_id: r.prompt_id.clone(),
prompt: r.prompt.clone(),
output: r.output.clone(),
latency_ms: r.latency_ms,
tokens_in: r.tokens_in,
tokens_out: r.tokens_out,
tokens_per_s: r.tokens_per_s,
score: r.score,
ok: r.ok,
error: r.error.clone().unwrap_or_default(),
cost_usd: r.cost_usd,
mcp_tool_calls: r.mcp_tool_calls,
})
.collect();
on_server(server, move |channel, bearer| async move {
let mut client = TelemetryClient::with_interceptor(channel, auth_interceptor(bearer));
let resp = client.submit_bakeoff(SubmitBakeoffRequest { runs }).await
.map_err(|e| anyhow!("Telemetry.SubmitBakeoff RPC failed: {e}"))?;
Ok(resp.into_inner().accepted)
})
}
#[cfg(any(feature = "mcp", feature = "server"))]
fn submit_test_remote(
server: &str,
rows: &[nornir::warehouse::test_results::TestResultRow],
) -> Result<u32> {
use pb_client::telemetry_client::TelemetryClient;
use pb_client::{SubmitTestResultsRequest, TestResultPb};
let pb_rows: Vec<TestResultPb> = rows
.iter()
.map(|r| TestResultPb {
run_id: r.run_id.clone(),
repo: r.repo.clone(),
suite: r.suite.clone(),
test_name: r.test_name.clone(),
status: r.status.clone(),
duration_ms: r.duration_ms,
ts_micros: r.ts_micros,
message: r.message.clone(),
aspect: r.aspect.clone(),
metric: r.metric,
})
.collect();
on_server(server, move |channel, bearer| async move {
let mut client = TelemetryClient::with_interceptor(channel, auth_interceptor(bearer));
let resp = client.submit_test_results(SubmitTestResultsRequest { rows: pb_rows }).await
.map_err(|e| anyhow!("Telemetry.SubmitTestResults RPC failed: {e}"))?;
Ok(resp.into_inner().accepted)
})
}
fn persist_bakeoff(
loaded: &Loaded,
rows: &[nornir::warehouse::agent_model_runs::AgentModelRunRow],
) -> Result<()> {
use nornir::warehouse::agent_model_runs::append_agent_model_runs;
use nornir::warehouse::iceberg::IcebergWarehouse;
if let Some(server) = server_target() {
let n = submit_bakeoff_remote(&server, rows)?;
println!(" → submitted {n} bake-off row(s) to {server}");
} else {
let warehouse_root = loaded.warehouse_root();
let wh = IcebergWarehouse::open(&warehouse_root)
.with_context(|| format!("open iceberg warehouse at {}", warehouse_root.display()))?;
wh.block_on(append_agent_model_runs(&wh, rows))?;
}
Ok(())
}
#[cfg(any(feature = "mcp", feature = "server"))]
fn list_repos_remote(server: &str) -> Result<Vec<String>> {
use pb_client::repos_client::ReposClient;
use pb_client::Empty;
on_server(server, move |channel, bearer| async move {
let mut client = ReposClient::with_interceptor(channel, auth_interceptor(bearer));
let resp = client.list(Empty {}).await.map_err(|e| anyhow!("Repos.List RPC failed: {e}"))?;
Ok(resp.into_inner().repos.into_iter().map(|r| r.name).collect())
})
}
#[cfg(any(feature = "mcp", feature = "server"))]
fn run_workspace_remote(server: &str, op: &WorkspaceOp) -> Result<()> {
use pb_client::workspaces_client::WorkspacesClient;
use pb_client::{Empty, RegisterWorkspaceRequest, WorkspaceFetchRequest, WorkspaceName};
match op {
WorkspaceOp::Ls => {
let list = on_server(server, |channel, bearer| async move {
let mut c = WorkspacesClient::with_interceptor(channel, auth_interceptor(bearer));
let resp = c
.list(Empty {})
.await
.map_err(|e| anyhow!("Workspaces.List RPC failed: {e}"))?;
Ok(resp.into_inner().workspaces)
})?;
if list.is_empty() {
println!("(no workspaces registered on {server})");
return Ok(());
}
println!("{:<20} {:<10} {:>7} {}", "NAME", "MODE", "MEMBERS", "LAST SYNCED");
for ws in list {
let last = ws
.members
.iter()
.map(|m| m.last_synced.as_str())
.filter(|s| !s.is_empty())
.max()
.unwrap_or("never");
println!("{:<20} {:<10} {:>7} {}", ws.name, ws.mode, ws.members.len(), last);
}
}
WorkspaceOp::Show { name } => {
let name = name.clone();
let ws = on_server(server, move |channel, bearer| async move {
let mut c = WorkspacesClient::with_interceptor(channel, auth_interceptor(bearer));
let resp = c
.get(WorkspaceName { name })
.await
.map_err(|e| anyhow!("Workspaces.Get RPC failed: {e}"))?;
Ok(resp.into_inner())
})?;
println!("name: {}", ws.name);
println!("mode: {}", ws.mode);
println!("descriptor: {}", ws.descriptor);
println!("poll: {}", ws.poll);
println!("current_snapshot: {}", ws.current_snapshot);
println!("created_at: {}", ws.created_at);
println!("updated_at: {}", ws.updated_at);
for m in ws.members {
println!(
" - {:<16} remote={} ref={} sha={} synced={} state={}",
m.name, m.remote, m.git_ref, m.last_seen_sha, m.last_synced, m.sync_state
);
}
}
WorkspaceOp::Register { name, descriptor, monitored, external, poll } => {
if *monitored && *external {
bail!("--monitored and --external are mutually exclusive");
}
let mode = if *monitored {
"monitored"
} else if *external {
"external"
} else {
"pushed"
}
.to_string();
let (name, descriptor, poll) =
(name.clone(), descriptor.clone(), poll.clone().unwrap_or_default());
let ws = on_server(server, move |channel, bearer| async move {
let mut c = WorkspacesClient::with_interceptor(channel, auth_interceptor(bearer));
let resp = c
.register(RegisterWorkspaceRequest { name, descriptor, mode, poll })
.await
.map_err(|e| anyhow!("Workspaces.Register RPC failed: {e}"))?;
Ok(resp.into_inner())
})?;
println!(
"✓ registered `{}` ({}, {} member(s)) on {server}",
ws.name,
ws.mode,
ws.members.len()
);
}
WorkspaceOp::Add {
name,
descriptor,
monitored,
external,
poll,
} => {
run_workspace_remote(
server,
&WorkspaceOp::Register {
name: name.clone(),
descriptor: descriptor.clone(),
monitored: *monitored,
external: *external,
poll: poll.clone(),
},
)?;
run_workspace_remote(server, &WorkspaceOp::Fetch { name: name.clone() })?;
println!(
"✓ added `{name}` on {server} — fetched; the server republishes its \
warehouse from the change."
);
}
WorkspaceOp::Rm { name } => {
let name = name.clone();
on_server(server, move |channel, bearer| async move {
let mut c = WorkspacesClient::with_interceptor(channel, auth_interceptor(bearer));
c.remove(WorkspaceName { name })
.await
.map_err(|e| anyhow!("Workspaces.Remove RPC failed: {e}"))?;
Ok(())
})?;
println!("✓ removed");
}
WorkspaceOp::Fetch { name } => {
let name = name.clone();
let rep = on_server(server, move |channel, bearer| async move {
let mut c = WorkspacesClient::with_interceptor(channel, auth_interceptor(bearer));
let resp = c
.fetch(WorkspaceFetchRequest { name, force: false })
.await
.map_err(|e| anyhow!("Workspaces.Fetch RPC failed: {e}"))?;
Ok(resp.into_inner())
})?;
for e in &rep.errors {
eprintln!(" {e}");
}
println!(
"✓ fetched `{}` — {} of {} member(s) changed",
rep.workspace,
rep.changed.len(),
rep.fetched
);
}
}
Ok(())
}
#[cfg(any(feature = "mcp", feature = "server"))]
fn knowledge_query_remote(
server: &str,
repo: &str,
kind: &str,
arg: &str,
to: Option<&str>,
limit: usize,
) -> Result<()> {
use pb_client::knowledge_client::KnowledgeClient;
use pb_client::{KnowledgeCallPathQuery, KnowledgeCallQuery, KnowledgeSymbolQuery};
let (repo, kind, arg) = (repo.to_string(), kind.to_string(), arg.to_string());
let to = to.map(|s| s.to_string());
let lim = limit as u32;
on_server(server, move |channel, bearer| async move {
let mut c = KnowledgeClient::with_interceptor(channel, auth_interceptor(bearer));
match kind.as_str() {
"callers" | "callees" => {
let q = KnowledgeCallQuery { repo, name: arg.clone(), limit: lim };
let calls = if kind == "callers" {
c.callers(q).await
} else {
c.callees(q).await
}
.map_err(|e| anyhow!("Knowledge.{kind} RPC failed: {e}"))?
.into_inner()
.calls;
println!("{kind} of `{arg}` ({}):", calls.len());
for c in &calls {
println!(" {} → {} [{}:{}]", c.caller_path, c.callee_ident, c.file, c.line);
}
}
"defined-in" | "lookup" => {
let q = KnowledgeSymbolQuery { repo, arg: arg.clone(), limit: lim };
let syms = if kind == "lookup" {
c.symbol_lookup(q).await
} else {
c.defined_in(q).await
}
.map_err(|e| anyhow!("Knowledge.{kind} RPC failed: {e}"))?
.into_inner()
.symbols;
println!("symbols ({}) for `{arg}`:", syms.len());
for s in &syms {
println!(" {:8} {}::{} [{}:{}]", s.item_kind, s.module_path, s.item_name, s.file, s.line);
}
}
"path" => {
let to = to.ok_or_else(|| anyhow!("`path` query needs a target: pass --to <fn>"))?;
let names = c
.call_path(KnowledgeCallPathQuery { repo, from: arg.clone(), to: to.clone() })
.await
.map_err(|e| anyhow!("Knowledge.CallPath RPC failed: {e}"))?
.into_inner()
.names;
if names.is_empty() {
println!("no call path from `{arg}` to `{to}`");
} else {
println!("call path `{arg}` → `{to}` ({} hops):", names.len().saturating_sub(1));
println!(" {}", names.join(" → "));
}
}
other => bail!("unknown knowledge kind `{other}`"),
}
Ok(())
})
}
#[cfg(any(feature = "mcp", feature = "server"))]
fn search_remote(
server: &str,
query: &str,
corpus: Option<&str>,
repo: Option<&str>,
limit: usize,
) -> Result<()> {
use pb_client::search_client::SearchClient;
use pb_client::SearchRequest;
let req = SearchRequest {
query: query.to_string(),
corpus: corpus.unwrap_or("").to_string(),
repo: repo.unwrap_or("").to_string(),
limit: limit as u32,
};
on_server(server, move |channel, bearer| async move {
let mut c = SearchClient::with_interceptor(channel, auth_interceptor(bearer));
let hits = c.query(req).await.map_err(|e| anyhow!("Search.Query RPC failed: {e}"))?.into_inner().hits;
for h in &hits {
println!(
"{:>6.2} [{}/{}] {}\n {}",
h.score,
h.corpus,
if h.repo.is_empty() { "-" } else { &h.repo },
h.path,
h.snippet
);
}
Ok(())
})
}
#[cfg(any(feature = "mcp", feature = "server"))]
fn guard_remote(server: &str, op: &str) -> Result<()> {
use pb_client::guard_client::GuardClient;
use pb_client::Empty;
let op = op.to_string();
on_server(server, move |channel, bearer| async move {
let mut c = GuardClient::with_interceptor(channel, auth_interceptor(bearer));
let report = match op.as_str() {
"status" => c.status(Empty {}).await,
"apply" => c.apply(Empty {}).await,
"release" => c.release(Empty {}).await,
other => return Err(anyhow!("guard `{other}` is not server-routable")),
}
.map_err(|e| anyhow!("Guard.{op} RPC failed: {e}"))?
.into_inner();
println!("{:<10} {:<10} {:<10} path", "exists", "writable", "changed");
for p in &report.paths {
println!("{:<10} {:<10} {:<10} {}", yn(p.exists), yn(p.writable), yn(p.changed), p.path);
}
Ok(())
})
}
#[cfg(any(feature = "mcp", feature = "server"))]
fn index_stats_remote(server: &str) -> Result<()> {
use pb_client::index_client::IndexClient;
use pb_client::Empty;
on_server(server, move |channel, bearer| async move {
let mut c = IndexClient::with_interceptor(channel, auth_interceptor(bearer));
let s = c.stats(Empty {}).await.map_err(|e| anyhow!("Index.Stats RPC failed: {e}"))?.into_inner();
println!("total: {}", s.total);
for kv in &s.by_corpus {
println!(" {:<14} {}", kv.key, kv.value);
}
Ok(())
})
}
#[cfg(any(feature = "mcp", feature = "server"))]
fn gate_remote(server: &str, name: &str, repo: &str) -> Result<()> {
use pb_client::release_client::ReleaseClient;
use pb_client::RepoOnly;
let (name, repo) = (name.to_string(), repo.to_string());
on_server(server, move |channel, bearer| async move {
let mut c = ReleaseClient::with_interceptor(channel, auth_interceptor(bearer));
let req = || RepoOnly { repo: repo.clone() };
if name == "all" {
let res = c.gate_all(req()).await.map_err(|e| anyhow!("Release.GateAll RPC failed: {e}"))?.into_inner();
println!("=== gate-all: {} ===", res.repo);
for p in &res.passed {
println!(" ✓ {p}");
}
for f in &res.failed {
println!(" ✗ {}: {}", f.name, f.error);
}
if !res.failed.is_empty() {
bail!("{} gate(s) failed", res.failed.len());
}
println!("{} gate(s) passed", res.passed.len());
return Ok(());
}
let res = match name.as_str() {
"path_patches" => c.gate_path_patches(req()).await,
"nexus_floor" => c.gate_nexus_floor(req()).await,
"no_regression" => c.gate_no_regression(req()).await,
"docs_fresh" => c.gate_docs_fresh(req()).await,
other => return Err(anyhow!(
"gate `{other}` is not exposed by the server (available: \
path_patches, nexus_floor, no_regression, docs_fresh, all)"
)),
}
.map_err(|e| anyhow!("Release.{name} RPC failed: {e}"))?
.into_inner();
if res.status == "pass" {
let v = if res.version.is_empty() { String::new() } else { format!(" (v{})", res.version) };
println!("✓ {} pass{v}", res.gate);
Ok(())
} else {
bail!("✗ {} fail: {}", res.gate, res.message)
}
})
}
#[cfg(any(feature = "mcp", feature = "server"))]
fn introspect_symbols_remote(server: &str, binary: &str, kind: &str, arg: &str, limit: usize) -> Result<()> {
use pb_client::introspect_client::IntrospectClient;
use pb_client::{BinaryOnly, DefinedInRequest, SymbolLookupRequest};
let (binary, kind, arg) = (binary.to_string(), kind.to_string(), arg.to_string());
let lim = limit as u32;
on_server(server, move |channel, bearer| async move {
let mut c = IntrospectClient::with_interceptor(channel, auth_interceptor(bearer));
let syms = match kind.as_str() {
"symbols" => c.symbols(BinaryOnly { binary }).await,
"lookup" => c.symbol_lookup(SymbolLookupRequest { binary, pattern: arg, limit: lim }).await,
"defined-in" => c.defined_in(DefinedInRequest { binary, suffix: arg }).await,
other => return Err(anyhow!("introspect `{other}` unsupported over the server")),
}
.map_err(|e| anyhow!("Introspect.{kind} RPC failed: {e}"))?
.into_inner()
.symbols;
for s in &syms {
if kind == "symbols" {
println!("{}\t{}:{}\t{}", s.name_demangled, s.file, s.line, s.krate);
} else {
println!("{}:{} {}", s.file, s.line, s.name_demangled);
}
}
eprintln!("# {} symbol(s)", syms.len());
Ok(())
})
}
#[cfg(any(feature = "mcp", feature = "server"))]
fn introspect_calls_remote(server: &str, binary: &str, kind: &str, name: &str, to: Option<&str>) -> Result<()> {
use pb_client::introspect_client::IntrospectClient;
use pb_client::{CallQuery, PathBetweenRequest};
let (binary, kind, name) = (binary.to_string(), kind.to_string(), name.to_string());
let to = to.map(|s| s.to_string());
on_server(server, move |channel, bearer| async move {
let mut c = IntrospectClient::with_interceptor(channel, auth_interceptor(bearer));
let names = match kind.as_str() {
"callers" => c.callers(CallQuery { binary, name: name.clone() }).await,
"callees" => c.callees(CallQuery { binary, name: name.clone() }).await,
"path" => {
let to = to.ok_or_else(|| anyhow!("`path` needs --to <fn>"))?;
c.path_between(PathBetweenRequest { binary, from: name.clone(), to }).await
}
other => return Err(anyhow!("introspect `{other}` unsupported over the server")),
}
.map_err(|e| anyhow!("Introspect.{kind} RPC failed: {e}"))?
.into_inner()
.names;
if kind == "path" {
if names.is_empty() {
println!("no call path from `{name}`");
} else {
println!("{}", names.join(" → "));
}
} else {
println!("{kind} of `{name}` ({}):", names.len());
for n in &names {
println!(" {n}");
}
}
Ok(())
})
}
#[cfg(any(feature = "mcp", feature = "server"))]
fn docs_remote(server: &str, op: &str, repo: &str) -> Result<()> {
use pb_client::docs_client::DocsClient;
use pb_client::RepoOnly;
let (op, repo) = (op.to_string(), repo.to_string());
on_server(server, move |channel, bearer| async move {
let mut c = DocsClient::with_interceptor(channel, auth_interceptor(bearer));
let req = RepoOnly { repo };
let resp = match op.as_str() {
"init" => c.init(req).await,
"render" => c.render(req).await,
"check" => c.check(req).await,
other => return Err(anyhow!("docs `{other}` is not server-routable")),
}
.map_err(|e| anyhow!("Docs.{op} RPC failed: {e}"))?
.into_inner();
println!("{}: {}", resp.status, resp.detail);
for a in &resp.artifacts {
println!(" {a}");
}
if resp.status == "drift" {
bail!("docs drift on {}", resp.repo);
}
Ok(())
})
}
#[cfg(any(feature = "mcp", feature = "server"))]
fn docs_history_remote(
server: &str,
repo: &str,
doc: Option<&str>,
version: Option<&str>,
format: Option<&str>,
limit: usize,
) -> Result<()> {
use pb_client::docs_client::DocsClient;
use pb_client::DocsHistoryRequest;
let req = DocsHistoryRequest {
repo: repo.to_string(),
doc: doc.unwrap_or("").to_string(),
version: version.unwrap_or("").to_string(),
format: format.unwrap_or("").to_string(),
limit: limit as u32,
};
on_server(server, move |channel, bearer| async move {
let mut c = DocsClient::with_interceptor(channel, auth_interceptor(bearer));
let entries = c.history(req).await.map_err(|e| anyhow!("Docs.History RPC failed: {e}"))?.into_inner().entries;
if entries.is_empty() {
println!("no exports historized");
return Ok(());
}
println!("{:<20} {:<8} {:<6} {:>9} {}", "exported_at", "doc", "format", "bytes", "path");
for e in &entries {
println!("{:<20} {:<8} {:<6} {:>9} {}", e.exported_at, e.doc, e.format, e.size_bytes, e.path);
}
Ok(())
})
}
#[cfg(any(feature = "mcp", feature = "server"))]
fn index_snapshot_remote(server: &str, repo: &str, workspace: &str, git_sha: &str, branch: &str) -> Result<()> {
use pb_client::index_client::IndexClient;
use pb_client::SnapshotRequest;
let req = SnapshotRequest {
repo: repo.to_string(),
workspace: workspace.to_string(),
git_sha: git_sha.to_string(),
branch: branch.to_string(),
};
on_server(server, move |channel, bearer| async move {
let mut c = IndexClient::with_interceptor(channel, auth_interceptor(bearer));
let s = c.snapshot(req).await.map_err(|e| anyhow!("Index.Snapshot RPC failed: {e}"))?.into_inner();
println!(
"✓ snapshot {} ({}, {} blob(s), {} bytes, sha={})",
s.snapshot_id, s.repo, s.blob_count, s.total_bytes, &s.git_sha[..s.git_sha.len().min(12)]
);
Ok(())
})
}
#[cfg(any(feature = "mcp", feature = "server"))]
fn index_upload_remote(
server: &str,
index_dir: &std::path::Path,
repo: &str,
workspace: &str,
git_sha: &str,
branch: &str,
) -> Result<()> {
use pb_client::index_client::IndexClient;
use pb_client::{index_blob_frame::Body, IndexBlobFile, IndexBlobFrame, IndexBlobMeta};
const CHUNK: usize = 1024 * 1024;
if !index_dir.is_dir() {
bail!("no index at {} — run `nornir index build` first", index_dir.display());
}
let mut frames: Vec<IndexBlobFrame> = vec![IndexBlobFrame {
body: Some(Body::Meta(IndexBlobMeta {
workspace: workspace.to_string(),
repo: repo.to_string(),
git_sha: git_sha.to_string(),
branch: branch.to_string(),
})),
}];
let mut file_count = 0u64;
let mut total_bytes = 0u64;
for entry in walkdir::WalkDir::new(index_dir).sort_by_file_name() {
let entry = entry.with_context(|| format!("walk index dir {}", index_dir.display()))?;
if !entry.file_type().is_file() {
continue;
}
let rel = entry
.path()
.strip_prefix(index_dir)
.map_err(|e| anyhow!("strip_prefix {}: {e}", entry.path().display()))?
.to_string_lossy()
.replace('\\', "/"); let bytes = std::fs::read(entry.path())
.with_context(|| format!("read {}", entry.path().display()))?;
frames.push(IndexBlobFrame {
body: Some(Body::File(IndexBlobFile { rel_path: rel, size_bytes: bytes.len() as u64 })),
});
for chunk in bytes.chunks(CHUNK) {
frames.push(IndexBlobFrame { body: Some(Body::Chunk(chunk.to_vec())) });
}
file_count += 1;
total_bytes += bytes.len() as u64;
}
if file_count == 0 {
bail!("index dir {} has no files to upload", index_dir.display());
}
println!(
"↥ uploading index {} ({} file(s), {} bytes) to {server} …",
index_dir.display(),
file_count,
total_bytes,
);
on_server(server, move |channel, bearer| async move {
let mut c = IndexClient::with_interceptor(channel, auth_interceptor(bearer));
let stream = futures::stream::iter(frames);
let s = c
.upload_snapshot(tonic::Request::new(stream))
.await
.map_err(|e| anyhow!("Index.UploadSnapshot RPC failed: {e}"))?
.into_inner();
println!(
"✓ uploaded → snapshot {} ({}, {} blob(s), {} bytes, sha={})",
s.snapshot_id, s.repo, s.blob_count, s.total_bytes, &s.git_sha[..s.git_sha.len().min(12)],
);
Ok(())
})
}
#[cfg(any(feature = "mcp", feature = "server"))]
enum MimirOpRemote {
DepsOf { repo: String, transitive: bool },
DependentsOf { repo: String, transitive: bool },
Affected { repos: Vec<String> },
BuildOrder,
DepPath { from: String, to: String },
ExternalUsers { krate: String },
Mermaid,
Overview { repo: String },
Changed,
}
impl MimirOp {
#[cfg(any(feature = "mcp", feature = "server"))]
fn clone_for_remote(&self) -> MimirOpRemote {
match self {
MimirOp::DepsOf { repo, transitive } => {
MimirOpRemote::DepsOf { repo: repo.clone(), transitive: *transitive }
}
MimirOp::DependentsOf { repo, transitive } => {
MimirOpRemote::DependentsOf { repo: repo.clone(), transitive: *transitive }
}
MimirOp::Affected { repos } => MimirOpRemote::Affected { repos: repos.clone() },
MimirOp::BuildOrder => MimirOpRemote::BuildOrder,
MimirOp::DepPath { from, to } => {
MimirOpRemote::DepPath { from: from.clone(), to: to.clone() }
}
MimirOp::ExternalUsers { krate } => {
MimirOpRemote::ExternalUsers { krate: krate.clone() }
}
MimirOp::Mermaid => MimirOpRemote::Mermaid,
MimirOp::Overview { repo } => MimirOpRemote::Overview { repo: repo.clone() },
MimirOp::Changed => MimirOpRemote::Changed,
}
}
}
#[cfg(any(feature = "mcp", feature = "server"))]
fn run_mimir_remote(server: &str, op: &MimirOp) -> Result<()> {
use pb_client::mimir_client::MimirClient;
use pb_client::{AffectedQuery, CrateQuery, DepPathQuery, DepQuery, Empty, RepoOnly};
let op = op.clone_for_remote();
let json = on_server(server, move |channel, bearer| async move {
let mut c = MimirClient::with_interceptor(channel, auth_interceptor(bearer));
let r = match op {
MimirOpRemote::DepsOf { repo, transitive } => {
c.deps_of(DepQuery { repo, transitive }).await
}
MimirOpRemote::DependentsOf { repo, transitive } => {
c.dependents_of(DepQuery { repo, transitive }).await
}
MimirOpRemote::Affected { repos } => c.affected_by_change(AffectedQuery { repos }).await,
MimirOpRemote::BuildOrder => c.build_order(Empty {}).await,
MimirOpRemote::DepPath { from, to } => c.dep_path(DepPathQuery { from, to }).await,
MimirOpRemote::ExternalUsers { krate } => c.external_dep_users(CrateQuery { krate }).await,
MimirOpRemote::Mermaid => c.mermaid(Empty {}).await,
MimirOpRemote::Overview { repo } => c.repo_overview(RepoOnly { repo }).await,
MimirOpRemote::Changed => c.changed_since_last_release(Empty {}).await,
};
Ok(r.map_err(|e| anyhow!("Mimir RPC failed: {e}"))?.into_inner().json)
})?;
match serde_json::from_str::<serde_json::Value>(&json) {
Ok(v) => println!("{}", serde_json::to_string_pretty(&v)?),
Err(_) => println!("{json}"),
}
Ok(())
}
#[cfg(any(feature = "mcp", feature = "server"))]
fn diagram_timeline_remote(server: &str, workspace: &str) -> Result<nornir::viz::Timeline> {
use pb_client::viz_client::VizClient;
use pb_client::VizTimelineRequest;
let workspace = workspace.to_string();
let json = on_server(server, move |channel, bearer| async move {
let mut c = VizClient::with_interceptor(channel, auth_interceptor(bearer));
let r = c
.timeline(VizTimelineRequest { workspace })
.await
.map_err(|e| anyhow!("Viz.Timeline RPC failed: {e}"))?;
Ok(r.into_inner().json)
})?;
serde_json::from_str(&json).context("deserialize Timeline from Viz.Timeline")
}
#[cfg(not(any(feature = "mcp", feature = "server")))]
fn no_client() -> anyhow::Error {
anyhow!(
"NORNIR_SERVER is set but this `nornir` was built without a client; \
rebuild with `--features server` (or `mcp`) to talk to a nornir-server"
)
}
#[cfg(not(any(feature = "mcp", feature = "server")))]
fn submit_bench_remote(_server: &str, _repo: &str, _run: &bench::BenchRun) -> Result<String> {
Err(no_client())
}
#[cfg(not(any(feature = "mcp", feature = "server")))]
fn list_repos_remote(_server: &str) -> Result<Vec<String>> {
Err(no_client())
}
#[cfg(not(any(feature = "mcp", feature = "server")))]
fn knowledge_query_remote(
_server: &str, _repo: &str, _kind: &str, _arg: &str, _to: Option<&str>, _limit: usize,
) -> Result<()> {
Err(no_client())
}
#[cfg(not(any(feature = "mcp", feature = "server")))]
fn search_remote(
_server: &str, _query: &str, _corpus: Option<&str>, _repo: Option<&str>, _limit: usize,
) -> Result<()> {
Err(no_client())
}
#[cfg(not(any(feature = "mcp", feature = "server")))]
fn guard_remote(_server: &str, _op: &str) -> Result<()> {
Err(no_client())
}
#[cfg(not(any(feature = "mcp", feature = "server")))]
fn index_stats_remote(_server: &str) -> Result<()> {
Err(no_client())
}
#[cfg(not(any(feature = "mcp", feature = "server")))]
fn gate_remote(_server: &str, _name: &str, _repo: &str) -> Result<()> {
Err(no_client())
}
#[cfg(not(any(feature = "mcp", feature = "server")))]
fn introspect_symbols_remote(_s: &str, _b: &str, _k: &str, _a: &str, _l: usize) -> Result<()> {
Err(no_client())
}
#[cfg(not(any(feature = "mcp", feature = "server")))]
fn introspect_calls_remote(_s: &str, _b: &str, _k: &str, _n: &str, _t: Option<&str>) -> Result<()> {
Err(no_client())
}
#[cfg(not(any(feature = "mcp", feature = "server")))]
fn docs_remote(_s: &str, _op: &str, _repo: &str) -> Result<()> {
Err(no_client())
}
#[cfg(not(any(feature = "mcp", feature = "server")))]
fn docs_history_remote(
_s: &str, _r: &str, _d: Option<&str>, _v: Option<&str>, _f: Option<&str>, _l: usize,
) -> Result<()> {
Err(no_client())
}
#[cfg(not(any(feature = "mcp", feature = "server")))]
fn index_snapshot_remote(_s: &str, _r: &str, _w: &str, _sha: &str, _b: &str) -> Result<()> {
Err(no_client())
}
#[cfg(not(any(feature = "mcp", feature = "server")))]
fn index_upload_remote(
_s: &str, _dir: &std::path::Path, _r: &str, _w: &str, _sha: &str, _b: &str,
) -> Result<()> {
Err(no_client())
}
fn run_bench(op: BenchOp, loaded: &Loaded) -> Result<()> {
match op {
BenchOp::HistoryShow(a) => {
let repo = repo_or_err(loaded, &a.repo)?;
let path = config::repo_dir_resolved(&loaded.workspace_root, &a.repo)
.join(if repo.history.is_empty() { "bench_history.jsonl" } else { &repo.history });
let mut runs = bench::history::read_all(&path)?;
if a.json {
for r in &runs {
println!("{}", serde_json::to_string(r)?);
}
return Ok(());
}
runs.sort_by(|x, y| {
let kx = x.timestamp.as_deref().unwrap_or(&x.date);
let ky = y.timestamp.as_deref().unwrap_or(&y.date);
ky.cmp(kx)
});
if a.limit > 0 {
runs.truncate(a.limit);
}
if runs.is_empty() {
println!("no bench runs in {}", path.display());
return Ok(());
}
println!("{} — {} run(s) [{}]", a.repo, runs.len(), path.display());
for r in &runs {
let date = if r.date.is_empty() { "-" } else { r.date.as_str() };
let machine = if r.machine.is_empty() { "-" } else { r.machine.as_str() };
println!("\nv{} · {} · {} cores · {}", r.version, machine, r.cores, date);
let mut scalars: Vec<String> = Vec::new();
for res in &r.results {
for (k, v) in &res.metrics {
if let Some(f) = v.as_f64() {
scalars.push(format!("{}.{k}={f:.2}", res.name));
}
}
}
scalars.sort();
if scalars.is_empty() {
println!(" metrics: (none)");
} else {
println!(" metrics: {}", scalars.join(" "));
}
if r.tests.is_empty() {
println!(" tests: (none recorded)");
} else {
let passed = r.tests.iter().filter(|t| t.passed).count();
let failed = r.tests.len() - passed;
println!(" tests: {passed} passed, {failed} failed");
for t in r.tests.iter().filter(|t| !t.passed) {
let msg = t.message.as_deref().unwrap_or("");
println!(" ✗ {} {}", t.name, msg);
}
}
}
}
BenchOp::Run(a) => {
let _repo = repo_or_err(loaded, &a.repo)?;
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, &a.repo);
println!("⚡ running nornir-bench in {}", repo_root.display());
let mut run = match release::pipeline::run_bench_example(&repo_root)? {
None => {
println!(
"skipped: neither {}/examples/nornir-bench.rs nor xtask/examples/nornir-bench.rs exists",
repo_root.display(),
);
return Ok(());
}
Some(r) => r,
};
if run.machine.trim().is_empty() {
run.machine = std::env::var("NORNIR_MACHINE").unwrap_or_else(|_| {
std::fs::read_to_string("/etc/hostname")
.ok()
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.unwrap_or_else(|| "unknown".into())
});
}
let id = if let Some(server) = server_target() {
submit_bench_remote(&server, &a.repo, &run)?
} else {
let wh = warehouse::open(&loaded.nornir.storage, &loaded.workspace_root)?;
wh.append_bench_run(&a.repo, &run)?.to_string()
};
println!(
"✓ {} result(s), {} test(s) persisted into bench_runs as {}",
run.results.len(),
run.tests.len(),
id,
);
for r in &run.results {
let mut kv: Vec<String> = r
.metrics
.iter()
.filter_map(|(k, v)| v.as_f64().map(|f| format!("{k}={f:.2}")))
.collect();
kv.sort();
println!(" • {:30} {}", r.name, kv.join(" "));
}
for t in run.tests.iter().filter(|t| !t.passed) {
println!(" ✗ test {}{}", t.name, t.message.as_deref().map(|m| format!(": {m}")).unwrap_or_default());
}
}
}
Ok(())
}
fn run_test(op: TestOp, loaded: &Loaded) -> Result<()> {
use nornir::warehouse::iceberg::IcebergWarehouse;
use nornir::warehouse::test_results::{
query_test_results, render_matrix, rows_to_json, summarize_runs, TestSelector,
};
match op {
TestOp::Run(a) => {
let _repo = repo_or_err(loaded, &a.repo)?;
let aspects = parse_aspects_arg(a.aspects.as_deref())?;
run_test_one(loaded, &a.repo, &aspects)?;
}
TestOp::All(a) => {
let aspects = parse_aspects_arg(a.aspects.as_deref())?;
let repos: Vec<String> = loaded.nornir.repo.keys().cloned().collect();
if repos.is_empty() {
println!("no repos configured in nornir.toml");
return Ok(());
}
let aspect_labels: Vec<&str> = aspects.iter().map(|a| a.label()).collect();
println!(
"▶ full aspect matrix [{}] across {} repo(s): {}",
aspect_labels.join(","),
repos.len(),
repos.join(", "),
);
let mut grid: Vec<(String, Vec<(nornir::test_matrix::Aspect, String)>)> = Vec::new();
let mut any_red = false;
for repo in &repos {
match run_test_one(loaded, repo, &aspects) {
Ok((green, cells)) => {
any_red |= !green;
grid.push((repo.clone(), cells));
}
Err(e) => {
eprintln!(" ✗ {repo}: {e:#}");
any_red = true;
grid.push((repo.clone(), Vec::new()));
}
}
}
print!("{}", render_aspect_grid(&grid, &aspects));
if any_red {
anyhow::bail!("test matrix had red runs (see above)");
}
}
TestOp::History(a) => {
let warehouse_root = loaded.warehouse_root();
let wh = IcebergWarehouse::open_read_only(&warehouse_root)
.with_context(|| format!("open iceberg warehouse at {}", warehouse_root.display()))?;
let by_repo =
wh.block_on(query_test_results(&wh, &TestSelector::Repo(a.repo.clone())))?;
let mut rows = if by_repo.is_empty() {
wh.block_on(query_test_results(&wh, &TestSelector::Run(a.repo.clone())))?
} else {
by_repo
};
if a.limit > 0 {
let keep: std::collections::HashSet<String> = summarize_runs(&rows)
.into_iter()
.take(a.limit)
.map(|s| s.run_id)
.collect();
rows.retain(|r| keep.contains(&r.run_id));
}
if a.json {
println!("{}", rows_to_json(&rows));
} else if rows.is_empty() {
println!(
"no test runs for `{}` yet — run `nornir test {}` first",
a.repo, a.repo
);
} else {
print!("{}", render_matrix(&rows));
}
}
}
Ok(())
}
fn parse_aspects_arg(arg: Option<&str>) -> Result<Vec<nornir::test_matrix::Aspect>> {
use nornir::test_matrix::{parse_aspect, Aspect};
let Some(s) = arg.map(str::trim).filter(|s| !s.is_empty()) else {
return Ok(Aspect::DEFAULT.to_vec());
};
let mut out = Vec::new();
for tok in s.split(',').map(str::trim).filter(|t| !t.is_empty()) {
match parse_aspect(tok) {
Some(a) => {
if !out.contains(&a) {
out.push(a);
}
}
None => anyhow::bail!(
"unknown aspect `{tok}` — valid: build,unit,doctest,clippy,fmt,audit,\
bench-smoke,coverage,feature-powerset,msrv,examples"
),
}
}
if out.is_empty() {
anyhow::bail!("--aspects given but no valid aspect parsed");
}
Ok(out)
}
fn run_test_one(
loaded: &Loaded,
repo: &str,
aspects: &[nornir::test_matrix::Aspect],
) -> Result<(bool, Vec<(nornir::test_matrix::Aspect, String)>)> {
use nornir::test_matrix::{outcome_to_rows, run_aspect, stall_secs, TestSink};
use nornir::warehouse::iceberg::IcebergWarehouse;
use nornir::warehouse::test_results::{new_run_id, status, IcebergTestSink, TestResultRow};
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, repo);
let stall = stall_secs();
let labels: Vec<&str> = aspects.iter().map(|a| a.label()).collect();
println!(
"⚡ {repo}: aspects [{}] in {} (stall-watchdog {})",
labels.join(","),
repo_root.display(),
if stall == 0 { "off".to_string() } else { format!("{stall}s") },
);
let run_id = new_run_id();
let ts_micros = chrono::Utc::now().timestamp_micros();
let mut rows: Vec<TestResultRow> = Vec::new();
let mut cells: Vec<(nornir::test_matrix::Aspect, String)> = Vec::new();
for &aspect in aspects {
let out = run_aspect(&repo_root, aspect);
cells.push((aspect, out.status.clone()));
let chip = match out.status.as_str() {
status::PASS => "✓",
status::SKIP => "○",
_ => "✗",
};
let metric_note = if out.metric != 0.0 { format!(" [{:.2}]", out.metric) } else { String::new() };
println!(" {chip} {} — {}{}", aspect.label(), out.message, metric_note);
rows.extend(outcome_to_rows(&out, &run_id, repo, ts_micros));
}
if rows.is_empty() {
println!(" (no rows produced for {repo})");
return Ok((true, cells));
}
let warehouse_root = loaded.warehouse_root();
let wh = IcebergWarehouse::open(&warehouse_root)
.with_context(|| format!("open iceberg warehouse at {}", warehouse_root.display()))?;
let sink = IcebergTestSink::new(&wh);
sink.append(&rows)?;
let red = rows.iter().any(|r| status::is_red(&r.status));
let mark = if red { "✗" } else { "✓" };
let passed = rows.iter().filter(|r| r.status == status::PASS).count();
let failed = rows.iter().filter(|r| r.status == status::FAIL).count();
let stalled = rows.iter().filter(|r| r.status == status::STALLED).count();
let skipped = rows.iter().filter(|r| r.status == status::SKIP).count();
println!(
"{mark} {repo}: {passed} passed · {failed} failed · {stalled} stalled · {skipped} skipped ({} rows → run {})",
rows.len(),
&run_id[..8.min(run_id.len())],
);
for r in rows.iter().filter(|r| status::is_red(&r.status)) {
let detail = if r.message.is_empty() { String::new() } else { format!(" — {}", r.message) };
println!(" ✗ [{}] {}::{} {}{}", r.aspect, r.suite, r.test_name, r.status, detail);
}
Ok((!red, cells))
}
fn render_aspect_grid(
grid: &[(String, Vec<(nornir::test_matrix::Aspect, String)>)],
aspects: &[nornir::test_matrix::Aspect],
) -> String {
use nornir::warehouse::test_results::status;
if grid.is_empty() {
return String::new();
}
let repo_w = grid.iter().map(|(r, _)| r.len()).max().unwrap_or(4).max(4);
let headers: Vec<String> = aspects.iter().map(|a| short_aspect(a.label())).collect();
let col_w = headers.iter().map(|h| h.len()).max().unwrap_or(3).max(3);
let mut out = String::from("\n");
out.push_str(&format!("{:<repo_w$}", "repo", repo_w = repo_w));
for h in &headers {
out.push_str(&format!(" {:>col_w$}", h, col_w = col_w));
}
out.push('\n');
for (repo, cells) in grid {
out.push_str(&format!("{:<repo_w$}", repo, repo_w = repo_w));
for a in aspects {
let chip = cells
.iter()
.find(|(asp, _)| asp == a)
.map(|(_, st)| match st.as_str() {
status::PASS => "✓",
status::SKIP => "○",
_ if status::is_red(st) => "✗",
_ => "·",
})
.unwrap_or("·");
out.push_str(&format!(" {:>col_w$}", chip, col_w = col_w));
}
out.push('\n');
}
out.push_str("\nlegend: ✓ pass · ✗ red (fail/stalled) · ○ skip (tool absent) · · not run\n");
out
}
fn short_aspect(label: &str) -> String {
match label {
"build" => "bld",
"unit" => "unt",
"doctest" => "doc",
"clippy" => "clp",
"fmt" => "fmt",
"audit" => "aud",
"bench-smoke" => "bch",
"coverage" => "cov",
"feature-powerset" => "pwr",
"msrv" => "msr",
"examples" => "exa",
other => other,
}
.to_string()
}
fn run_release(op: ReleaseOp, loaded: &Loaded) -> Result<()> {
match op {
ReleaseOp::GatePathPatches(a) => {
let _ = repo_or_err(loaded, &a.repo)?;
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, &a.repo);
release::gate::no_path_patches(&repo_root)?;
println!("ok: no [patch.crates-io] znippy entries in {}", repo_root.display());
}
ReleaseOp::GatePathDepVersions(a) => {
let _ = repo_or_err(loaded, &a.repo)?;
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, &a.repo);
let findings = release::gate::path_dep_audit(&repo_root)?;
let bad: Vec<_> = findings.iter().filter(|f| !f.ok()).collect();
for f in &findings {
let tag = if f.ok() { "ok " } else { "MISS" };
println!(" {tag} {} → {} (path={}, version={})",
f.manifest.display(), f.dep_name, f.dep_path,
f.version_req.as_deref().unwrap_or("<none>"));
}
if !bad.is_empty() {
return Err(anyhow!("{} path-dep(s) missing version=", bad.len()));
}
println!("ok: all {} path-dep(s) carry a version=", findings.len());
}
ReleaseOp::GateCrateMetadata(a) => {
let _ = repo_or_err(loaded, &a.repo)?;
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, &a.repo);
let checks = release::gate::crate_metadata_check(&repo_root)?;
let bad: Vec<_> = checks.iter().filter(|c| !c.ok()).collect();
for c in &checks {
let tag = if c.ok() { "ok " } else { "MISS" };
println!(" {tag} {}@{} readme={} license={} repo={} desc={}",
c.crate_name, c.version,
c.has_readme, c.has_license, c.has_repository, c.has_description);
}
if !bad.is_empty() {
return Err(anyhow!("{} crate(s) missing publish-required metadata", bad.len()));
}
println!("ok: all {} crate(s) carry readme+license+repository+description", checks.len());
}
ReleaseOp::GateLinksConflicts(a) => {
let _ = repo_or_err(loaded, &a.repo)?;
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, &a.repo);
let decls = release::gate::links_declarations_scan(&repo_root)?;
let conflicts = release::gate::detect_links_conflicts(&decls);
println!("scanned {} links= declarations", decls.len());
if !conflicts.is_empty() {
for c in &conflicts {
println!(" CONFLICT links={}: {:?}", c.links_value, c.crates);
}
return Err(anyhow!("{} links= conflict(s) detected", conflicts.len()));
}
println!("ok: no links= conflicts");
}
ReleaseOp::GateNexusFloor(a) => {
let _ = repo_or_err(loaded, &a.repo)?;
let run = last_run(loaded, &a.repo)?;
release::gate::nexus_floor(&run)?;
println!("ok: nexus_floor on v{}", run.version);
}
ReleaseOp::GateNoRegression(a) => {
let repo = repo_or_err(loaded, &a.repo)?;
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, &a.repo);
let history = history_path(&repo_root, repo);
let run = last_run(loaded, &a.repo)?;
let pct = if repo.gates.max_regression_pct > 0.0 { repo.gates.max_regression_pct } else { 10.0 };
release::gate::no_regression(&run, &history, pct)?;
println!("ok: no_regression ≤{:.1}% on v{}", pct, run.version);
}
ReleaseOp::GateDocsFresh(a) => {
let _ = repo_or_err(loaded, &a.repo)?;
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, &a.repo);
let run = last_run(loaded, &a.repo).ok();
let history = bench_history_runs(loaded, &a.repo);
let ctx = docs::Ctx::new(&repo_root, &loaded.workspace_root, run.as_ref())
.with_history(&history);
let readme = repo_root.join("README.md");
if readme.exists() {
let _ = docs::check_file(&readme, &ctx)?;
}
docs::assemble_and_check(&repo_root, run.as_ref().ok_or_else(|| anyhow!("no bench runs"))?)?;
println!("ok: docs_fresh on {}", repo_root.display());
}
ReleaseOp::GateRoundtrip(a) => {
let repo = repo_or_err(loaded, &a.repo)?;
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, &a.repo);
if repo.gates.integration_roundtrip.is_empty() {
println!("ok: roundtrip not configured for {}", a.repo);
return Ok(());
}
let kinds: Vec<&str> = repo.gates.integration_roundtrip.iter().map(|s| s.as_str()).collect();
release::gate::integration_roundtrip_via_cargo_test(&repo_root, &kinds)?;
println!("ok: roundtrip {:?}", kinds);
}
ReleaseOp::GateAll(a) => {
run_gate_all(loaded, &a.repo)?;
}
ReleaseOp::Gate { name, repo } => {
if let Some(server) = server_target() {
return gate_remote(&server, &name, &repo);
}
let a = RepoArg { repo };
let op = match name.as_str() {
"path_patches" | "no_path_patches" => ReleaseOp::GatePathPatches(a),
"path_dep_versions" => ReleaseOp::GatePathDepVersions(a),
"crate_metadata" => ReleaseOp::GateCrateMetadata(a),
"links_conflicts" => ReleaseOp::GateLinksConflicts(a),
"nexus_floor" => ReleaseOp::GateNexusFloor(a),
"no_regression" => ReleaseOp::GateNoRegression(a),
"docs_fresh" => ReleaseOp::GateDocsFresh(a),
"roundtrip" | "integration_roundtrip" => ReleaseOp::GateRoundtrip(a),
"all" => ReleaseOp::GateAll(a),
"guard_intact" => {
let _ = repo_or_err(loaded, &a.repo)?;
let recorded = guard::read_manifest(&loaded.workspace_root)?;
guard::intact(&loaded.workspace_root, &recorded)?;
println!("ok: guard_intact — every [guard].forbidden path matches the manifest");
return Ok(());
}
other => {
bail!(
"unknown gate `{other}`; known: path_patches, path_dep_versions, \
crate_metadata, links_conflicts, nexus_floor, no_regression, \
docs_fresh, roundtrip, guard_intact, all"
);
}
};
return run_release(op, loaded);
}
ReleaseOp::Run(a) => {
run_release_run(loaded, &a)?;
}
ReleaseOp::Trace { repo, workspace, json } => {
let warehouse_root = loaded.warehouse_root();
let wh = nornir::warehouse::iceberg::IcebergWarehouse::open_read_only(&warehouse_root)
.with_context(|| format!("open iceberg warehouse at {}", warehouse_root.display()))?;
let graph = wh
.block_on(nornir::warehouse::dep_graph::query_dep_graph_snapshots(&wh, &repo, None))
.ok()
.and_then(|snaps| snaps.into_iter().last())
.map(|snap| {
nornir::warehouse::dep_graph::WorkspaceGraph::from_query_parts(
Default::default(),
snap.edges,
)
});
let trace = wh.block_on(nornir::release::regression::trace_gate_async(
&wh, &workspace, &repo, graph.as_ref(),
))?;
if json {
println!("{}", serde_json::to_string_pretty(&trace)?);
} else {
print_regression_trace(&trace);
}
}
ReleaseOp::Events { selector, json } => {
use nornir::warehouse::release_events::{
query_release_events, render_topo, rows_to_json, EventSelector,
};
let warehouse_root = loaded.warehouse_root();
let wh = nornir::warehouse::iceberg::IcebergWarehouse::open_read_only(&warehouse_root)
.with_context(|| format!("open iceberg warehouse at {}", warehouse_root.display()))?;
let by_run = wh
.block_on(query_release_events(&wh, &EventSelector::Run(selector.clone())))?;
let rows = if by_run.is_empty() {
wh.block_on(query_release_events(&wh, &EventSelector::Repo(selector.clone())))?
} else {
by_run
};
if json {
println!("{}", rows_to_json(&rows));
} else if rows.is_empty() {
println!(
"no release events for `{selector}` (not a known run_id or repo, or no \
`nornir release run` has recorded events yet)"
);
} else {
print!("{}", render_topo(&rows));
}
}
ReleaseOp::Publish(a) => {
let repo = repo_or_err(loaded, &a.repo)?;
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, &a.repo);
let outcomes =
release::publish::publish_all(&repo_root, &repo.publish_order, a.dry_run)?;
for (k, o) in &outcomes {
println!(" {k:40} {o:?}");
}
}
ReleaseOp::WaitForIndex { krate, version, timeout_secs } => {
let waited_ms = release::publish::wait_for_index(
&krate, &version,
std::time::Duration::from_secs(timeout_secs),
)?;
println!("ok: {krate}@{version} visible on crates.io after {waited_ms} ms");
}
ReleaseOp::TarballStats { repo, krate } => {
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, &repo);
let s = release::publish::tarball_stats(&repo_root, &krate)?;
let warn = if s.tarball_bytes > release::publish::DEFAULT_TARBALL_BYTES_THRESHOLD {
" WARN: above 5 MB threshold"
} else { "" };
let lf = s.largest_file.as_deref().unwrap_or("(none)");
let lb = s.largest_file_bytes.unwrap_or(0);
println!(
"ok: {}@{} tarball {} bytes, {} files, largest={lf} ({lb} bytes){warn}",
s.crate_name, s.version, s.tarball_bytes, s.file_count
);
}
ReleaseOp::StripPatchBlocks(a) => {
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, &a.repo);
let (files, blocks) = release::cargo::strip_patch_crates_io_recursive(&repo_root)?;
println!("ok: stripped {blocks} [patch.crates-io] block(s) across {files} Cargo.toml(s)");
}
ReleaseOp::Changelog { repo, range } => {
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, &repo);
let md = release::cargo::changelog_markdown(&repo_root, &range)?;
print!("{md}");
}
ReleaseOp::ImpactedCrates { repo, base } => {
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, &repo);
let changed = release::cargo::changed_crates_since(&repo_root, &base)?;
if changed.is_empty() {
println!("ok: no changed crates since {base}");
return Ok(());
}
let warehouse_root = loaded.warehouse_root();
let wh = nornir::warehouse::iceberg::IcebergWarehouse::open_read_only(&warehouse_root)
.with_context(|| format!("open iceberg warehouse at {}", warehouse_root.display()))?;
let snapshots = wh.block_on(
nornir::warehouse::dep_graph::query_dep_graph_snapshots(&wh, &repo, None),
)?;
let mut edges: Vec<(String, String)> = Vec::new();
if let Some(snap) = snapshots.last() {
for e in &snap.edges {
for via in &e.via {
edges.push((e.from.clone(), via.clone()));
edges.push((via.clone(), e.to.clone()));
}
}
}
let impacted = release::cargo::impacted_crates(&changed, &edges);
for c in &impacted {
println!("{c}");
}
eprintln!("# {} changed → {} impacted", changed.len(), impacted.len());
}
ReleaseOp::YankCascade { repo, undo, dry_run } => {
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, &repo);
let list_path = repo_root.join(".nornir/yank-cascade.txt");
let txt = std::fs::read_to_string(&list_path)
.with_context(|| format!("read {}", list_path.display()))?;
let mut order = Vec::new();
for line in txt.lines() {
let l = line.trim();
if l.is_empty() || l.starts_with('#') { continue; }
let mut sp = l.split_whitespace();
let n = sp.next().ok_or_else(|| anyhow::anyhow!("bad line: {l}"))?;
let v = sp.next().ok_or_else(|| anyhow::anyhow!("missing version: {l}"))?;
order.push((n.to_string(), v.to_string()));
}
let results = release::cargo::yank_cascade(&repo_root, &order, undo, dry_run)?;
for (k, v, s) in &results {
println!(" {k:40} {v:12} {s:?}");
}
}
ReleaseOp::MirrorToHolger { repo, krate, version, holger_base_url, token } => {
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, &repo);
let tok = token.or_else(|| std::env::var("HOLGER_TOKEN").ok());
let bytes = release::cargo::mirror_to_holger(
&repo_root, &krate, &version, &holger_base_url, tok.as_deref(),
)?;
println!("ok: mirrored {krate}@{version} ({bytes} bytes) to {holger_base_url}");
}
ReleaseOp::BumpVersion { repo, pkg, new_version, bump_consumers, dry_run } => {
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, &repo);
let plan = release::cargo::plan_version_bump(
&repo_root, &pkg, &new_version, bump_consumers,
)?;
if plan.edits.is_empty() {
println!("ok: no references to `{pkg}` found under {}", repo_root.display());
return Ok(());
}
for e in &plan.edits {
let rel = e.cargo_toml.strip_prefix(&repo_root).unwrap_or(&e.cargo_toml);
println!(
" {:50} {:?} {} {}→{}",
rel.display(), e.location, e.pkg, e.old_version, e.new_version
);
}
if dry_run {
println!("dry-run: {} edit(s) across {} file(s)",
plan.edits.len(),
plan.edits.iter().map(|e| &e.cargo_toml).collect::<std::collections::BTreeSet<_>>().len()
);
} else {
let n = release::cargo::apply_bump_plan(&plan)?;
println!("ok: applied {} edit(s) across {n} file(s)", plan.edits.len());
}
}
}
Ok(())
}
fn history_path(repo_root: &Path, repo: &config::Repo) -> PathBuf {
repo_root.join(if repo.history.is_empty() { "bench_history.jsonl" } else { &repo.history })
}
fn bench_history_runs(loaded: &Loaded, repo_name: &str) -> Vec<bench::BenchRun> {
warehouse::open_read_only(&loaded.nornir.storage, &loaded.workspace_root)
.ok()
.and_then(|wh| wh.query_bench_runs(&warehouse::BenchFilter::for_repo(repo_name)).ok())
.unwrap_or_default()
}
fn last_run(loaded: &Loaded, name: &str) -> Result<bench::BenchRun> {
let wh = warehouse::open_read_only(&loaded.nornir.storage, &loaded.workspace_root)?;
let filter = warehouse::BenchFilter::for_repo(name);
if let Ok(mut runs) = wh.query_bench_runs(&filter) {
if let Some(latest) = runs.pop() {
return Ok(latest);
}
}
let repo = repo_or_err(loaded, name)?;
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, name);
let path = history_path(&repo_root, repo);
let runs = bench::history::read_all(&path)?;
runs.into_iter()
.last()
.ok_or_else(|| anyhow!("no bench runs in iceberg or {}", path.display()))
}
fn run_gate_all(loaded: &Loaded, repo_name: &str) -> Result<()> {
let repo = repo_or_err(loaded, repo_name)?;
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, repo_name);
let g = &repo.gates;
let mut passed: Vec<String> = Vec::new();
let mut failed: Vec<(String, String)> = Vec::new();
macro_rules! push {
($n:expr, $r:expr) => {
match $r {
Ok(()) => passed.push($n.into()),
Err(e) => failed.push(($n.into(), format!("{e:#}"))),
}
};
}
if g.no_path_patches {
push!("no_path_patches", release::gate::no_path_patches(&repo_root));
}
let last = last_run(loaded, repo_name);
if g.nexus_floor {
push!("nexus_floor", last.as_ref().map_err(|e| anyhow!("{e:#}")).and_then(|r| release::gate::nexus_floor(r)));
}
if g.no_regression {
let pct = if g.max_regression_pct > 0.0 { g.max_regression_pct } else { 10.0 };
let hp = history_path(&repo_root, repo);
push!("no_regression",
last.as_ref().map_err(|e| anyhow!("{e:#}")).and_then(|r| release::gate::no_regression(r, &hp, pct)));
}
if !g.integration_roundtrip.is_empty() {
let kinds: Vec<&str> = g.integration_roundtrip.iter().map(|s| s.as_str()).collect();
push!("integration_roundtrip",
release::gate::integration_roundtrip_via_cargo_test(&repo_root, &kinds));
}
if g.docs_fresh {
let history = bench_history_runs(loaded, repo_name);
let r: Result<()> = (|| {
let run = last.as_ref().map_err(|e| anyhow!("{e:#}"))?;
let ctx = docs::Ctx::new(&repo_root, &loaded.workspace_root, Some(run))
.with_history(&history);
let readme = repo_root.join("README.md");
if readme.exists() { docs::check_file(&readme, &ctx)?; }
docs::assemble_and_check(&repo_root, run)
})();
push!("docs_fresh", r);
}
println!("=== gate-all: {repo_name} ===");
for n in &passed { println!(" ✓ {n}"); }
for (n, e) in &failed { println!(" ✗ {n}: {e}"); }
if !failed.is_empty() {
bail!("{} gate(s) failed", failed.len());
}
println!("{} gate(s) passed", passed.len());
Ok(())
}
fn print_regression_trace(t: &nornir::release::regression::Trace) {
let short = |s: &str| s.chars().take(12).collect::<String>();
println!("regression trace · repo `{}`", t.repo);
if t.frames.is_empty() {
println!(" (no recorded releases for this repo)");
return;
}
match (&t.last_good, &t.first_bad) {
(Some(g), None) => {
println!(" ✓ green — last good {} ({})", short(&g.git_sha), g.gate_status);
}
(Some(g), Some(b)) => {
println!(" ✓ last good : {} ({})", short(&g.git_sha), g.gate_status);
println!(" ✗ first bad : {} ({})", short(&b.git_sha), b.gate_status);
println!(" ⇒ bisect the commits in {}..{}", short(&g.git_sha), short(&b.git_sha));
}
(None, Some(b)) => {
println!(" ✗ never green on record; first bad {} ({})", short(&b.git_sha), b.gate_status);
}
(None, None) => {}
}
if !t.suspect_shas.is_empty() {
let s: Vec<String> = t.suspect_shas.iter().map(|x| short(x)).collect();
println!(" suspect release SHA(s): {}", s.join(", "));
}
if !t.suspects.is_empty() {
println!(" ranked suspects (nearest dep first):");
for s in &t.suspects {
let tag = if s.dep_distance == 0 {
"self".to_string()
} else {
format!("dep+{}", s.dep_distance)
};
let g = s.last_good_sha.as_deref().map(short).unwrap_or_else(|| "—".into());
let b = s.first_bad_sha.as_deref().map(short).unwrap_or_else(|| "—".into());
println!(" [{tag}] {} {g} → {b}", s.repo);
}
}
println!(" timeline (oldest→newest):");
for f in &t.frames {
println!(" {} {} {}", if f.good { "✓" } else { "✗" }, short(&f.git_sha), f.gate_status);
}
}
fn run_release_run(loaded: &Loaded, args: &ReleaseRunArgs) -> Result<()> {
use nornir::release::progress::{now, ProgressWriter, ReleaseEvent};
use nornir::warehouse::dep_graph::{query_dep_graph_snapshots, topo_order_from_edges};
use nornir::warehouse::iceberg::IcebergWarehouse;
let log_dir = loaded.workspace_root.join("workspace_holger/.nornir/logs");
let run_id = chrono::Utc::now().timestamp().to_string();
let progress = ProgressWriter::open(&log_dir, &run_id)
.with_context(|| format!("open progress writer in {}", log_dir.display()))
.ok();
if let Some(p) = &progress {
println!("📡 live events: {}", p.path().display());
p.emit(&ReleaseEvent::RunStart {
ts: now(),
run_id: run_id.clone(),
workspace: args.workspace.clone(),
});
}
let progress_end = progress.clone();
let run_id_for_end = run_id.clone();
let all_repos: Vec<String> = loaded.nornir.repo.keys().cloned().collect();
let selected: Vec<String> = if let Some(name) = &args.repo {
if !loaded.nornir.repo.contains_key(name) {
bail!("unknown repo `{name}`; configured: {:?}", all_repos);
}
vec![name.clone()]
} else {
all_repos.clone()
};
let warehouse_root = loaded.warehouse_root();
let wh = IcebergWarehouse::open_read_only(&warehouse_root)
.with_context(|| format!("open iceberg warehouse at {}", warehouse_root.display()))?;
let snapshots = wh
.block_on(query_dep_graph_snapshots(&wh, &args.workspace, None))
.with_context(|| format!("query dep_graph_edges for workspace `{}`", args.workspace))?;
let latest = snapshots.into_iter().last();
let order: Vec<String> = match (&latest, args.repo.is_some()) {
(_, true) => selected.clone(), (Some(snap), false) => {
println!(
"📚 using dep-graph snapshot {} ({}, {} edge(s))",
snap.snapshot_id,
snap.timestamp.to_rfc3339(),
snap.edges.len()
);
topo_order_from_edges(&selected, &snap.edges)
}
(None, false) => {
println!(
"⚠ no dep_graph_edges rows for workspace `{}` — falling back to nornir.toml repo order",
args.workspace
);
selected.clone()
}
};
println!("▶ release run order: {}", order.join(" → "));
use nornir::warehouse::release_events::{
phase as ev_phase, status as ev_status, ReleaseEventEmitter,
};
let events = ReleaseEventEmitter::new(run_id.clone());
let dep_edges: Vec<nornir::warehouse::dep_graph::CrossRepoEdge> =
latest.as_ref().map(|s| s.edges.clone()).unwrap_or_default();
let deps_of = |repo: &str| -> Vec<String> {
let mut d: Vec<String> = dep_edges
.iter()
.filter(|e| e.from == repo && e.to != repo)
.map(|e| e.to.clone())
.collect();
d.sort();
d.dedup();
d
};
events.emit(
&wh, &args.workspace, "", "run", ev_phase::START, ev_status::RUNNING,
&format!("order: {}", order.join(" → ")), None, None,
);
let release_id = uuid::Uuid::new_v4();
let dep_graph_snapshot_id = latest
.as_ref()
.map(|s| s.snapshot_id)
.unwrap_or_else(uuid::Uuid::nil);
let mut lineage: Vec<release::pipeline::RepoReleaseRecord> = Vec::new();
for (idx, name) in order.iter().enumerate() {
let mut tests_passed = 0u32;
let mut tests_failed = 0u32;
let mut tantivy_pin: Option<String> = None;
let mut dwarf_pin: Option<String> = None;
let repo_cfg = repo_or_err(loaded, name)?;
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, name);
println!(
"\n━━━ [{}/{}] {name} @ {} ━━━",
idx + 1,
order.len(),
repo_root.display(),
);
if let Some(p) = &progress {
use nornir::release::progress::{now, ReleaseEvent};
p.emit(&ReleaseEvent::RepoStart {
ts: now(),
repo: name.clone(),
sha: String::new(),
});
p.emit(&ReleaseEvent::PhaseStart {
ts: now(),
repo: name.clone(),
phase: "test".to_string(),
});
}
let deps = deps_of(name);
if !args.skip_tests {
let t0 = std::time::Instant::now();
events.emit(
&wh, name, name, "test", ev_phase::START, ev_status::RUNNING, "",
Some(deps.clone()), None,
);
let (passed, failed, ok) =
release::pipeline::run_cargo_test(&repo_root, Some(name), progress.as_ref())
.with_context(|| format!("cargo test for `{name}`"))?;
tests_passed = passed;
tests_failed = failed;
println!(" tests: {passed} passed, {failed} failed");
events.emit(
&wh, name, name, "test", ev_phase::END,
if ok { ev_status::OK } else { ev_status::FAIL },
&format!("{passed} passed, {failed} failed"),
Some(deps.clone()),
Some(t0.elapsed().as_millis() as i64),
);
if let Some(p) = &progress {
use nornir::release::progress::{now, ReleaseEvent};
p.emit(&ReleaseEvent::PhaseEnd {
ts: now(),
repo: name.clone(),
phase: "test".to_string(),
ok,
duration_ms: t0.elapsed().as_millis() as u64,
});
p.emit(&ReleaseEvent::RepoEnd {
ts: now(),
repo: name.clone(),
ok,
});
}
if !ok {
if let Some(p) = &progress {
use nornir::release::progress::{now, ReleaseEvent};
p.emit(&ReleaseEvent::RunEnd {
ts: now(),
run_id: run_id.clone(),
ok: false,
});
}
events.emit(
&wh, &args.workspace, "", "run", ev_phase::END, ev_status::FAIL,
&format!("aborted at `{name}` test phase"), None, None,
);
bail!("`{name}` test phase failed — aborting release run");
}
} else {
events.emit(
&wh, name, name, "test", ev_phase::SKIP, ev_status::WARN, "--skip-tests",
Some(deps.clone()), None,
);
println!(" tests: skipped (--skip-tests)");
}
if !args.skip_bench {
println!(" ⚡ running nornir-bench in {}", repo_root.display());
let b0 = std::time::Instant::now();
events.emit(
&wh, name, name, "bench", ev_phase::START, ev_status::RUNNING, "",
Some(deps.clone()), None,
);
match release::pipeline::run_bench_example(&repo_root)
.with_context(|| format!("run nornir-bench for `{name}`"))?
{
None => {
println!(" bench: skipped (no examples/nornir-bench.rs)");
events.emit(
&wh, name, name, "bench", ev_phase::END, ev_status::WARN,
"no examples/nornir-bench.rs", Some(deps.clone()),
Some(b0.elapsed().as_millis() as i64),
);
}
Some(mut run) => {
if run.machine.trim().is_empty() {
run.machine = std::env::var("NORNIR_MACHINE").unwrap_or_else(|_| {
std::fs::read_to_string("/etc/hostname")
.ok()
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.unwrap_or_else(|| "unknown".into())
});
}
let n_failed = run.tests.iter().filter(|t| !t.passed).count();
let id = wh
.block_on(wh.append_bench_run_async(name, &run))
.with_context(|| format!("persist bench run for `{name}`"))?;
println!(
" bench: {} result(s), {} test fail(s), persisted as {}",
run.results.len(),
n_failed,
id
);
for r in &run.results {
let mut kv: Vec<String> = r
.metrics
.iter()
.filter_map(|(k, v)| v.as_f64().map(|f| format!("{k}={f:.2}")))
.collect();
kv.sort();
println!(" • {:30} {}", r.name, kv.join(" "));
}
if n_failed > 0 {
for t in run.tests.iter().filter(|t| !t.passed) {
println!(
" ✗ test {}{}",
t.name,
t.message
.as_deref()
.map(|m| format!(": {m}"))
.unwrap_or_default()
);
}
events.emit(
&wh, name, name, "bench", ev_phase::END, ev_status::FAIL,
&format!("{n_failed} failing bench test(s)"), Some(deps.clone()),
Some(b0.elapsed().as_millis() as i64),
);
events.emit(
&wh, &args.workspace, "", "run", ev_phase::END, ev_status::FAIL,
&format!("aborted at `{name}` bench phase"), None, None,
);
bail!("`{name}` bench reported {n_failed} failing test(s) — aborting");
}
events.emit(
&wh, name, name, "bench", ev_phase::END, ev_status::OK,
&format!("{} result(s)", run.results.len()), Some(deps.clone()),
Some(b0.elapsed().as_millis() as i64),
);
}
}
} else {
println!(" bench: skipped (--skip-bench)");
}
if !args.skip_render_docs {
match last_run(loaded, name) {
Ok(run) => {
let ctx = docs::Ctx::new(&repo_root, &loaded.workspace_root, Some(&run));
let readme = repo_root.join("README.md");
if readme.exists() {
match docs::assemble_file(&readme, &ctx) {
Ok(_) => println!(
" docs: rendered README.md from iceberg bench v{}",
run.version
),
Err(e) => println!(" docs: ⚠ render skipped: {e:#}"),
}
} else {
println!(" docs: README.md absent — skipping render");
}
}
Err(e) => println!(" docs: ⚠ no bench row to render from: {e:#}"),
}
} else {
println!(" docs: skipped (--skip-render-docs)");
}
if !args.skip_gates {
let _ = repo_cfg; let g0 = std::time::Instant::now();
events.emit(
&wh, name, name, "gate", ev_phase::START, ev_status::RUNNING, "gate-all",
Some(deps.clone()), None,
);
let gate_res = run_gate_all(loaded, name)
.with_context(|| format!("gate-all for `{name}`"));
events.emit(
&wh, name, name, "gate", ev_phase::END,
if gate_res.is_ok() { ev_status::OK } else { ev_status::FAIL },
if gate_res.is_ok() { "all gates passed" } else { "gate failed" },
Some(deps.clone()),
Some(g0.elapsed().as_millis() as i64),
);
gate_res?;
} else {
println!(" gates: skipped (--skip-gates)");
}
if !args.skip_snapshot {
let (sha, branch) =
read_git_head(&repo_root).unwrap_or_else(|_| ("unknown".into(), "unknown".into()));
match nornir::knowledge::scan_all(&repo_root, name) {
Ok(res) => {
let s = wh.append_symbol_scan(&res.symbols);
let g = wh.append_git_heat_scan(&res.git);
match (s, g) {
(Ok(_), Ok(_)) => println!(
" 📖 knowledge: symbols={} calls={} (symbol_snapshot={} git_snapshot={})",
res.symbols.symbols.len(), res.symbols.calls.len(),
res.symbols.snapshot_id, res.git.snapshot_id,
),
(s, g) => println!(" 📖 knowledge: ⚠ partial: {:?} {:?}", s.err(), g.err()),
}
}
Err(e) => println!(" 📖 knowledge: ⚠ scan skipped: {e:#}"),
}
let existing_index = wh.block_on(wh.index_snapshot_id_for(name, &sha)).ok().flatten();
let index_dir = loaded.workspace_root.join(".nornir/cache/index");
if let (Some(id), false) = (&existing_index, args.recapture) {
println!(" ⏃ urðr: snapshot exists for sha={} → reuse {id} (--recapture to force)",
&sha[..sha.len().min(12)]);
tantivy_pin = Some(id.clone());
} else if !index_dir.exists() {
println!(" ⏃ urðr: workspace index absent at {} — skipping snapshot", index_dir.display());
} else {
match nornir::index::snapshot::snapshot_to_iceberg(
&wh,
&args.workspace,
name,
&sha,
&branch,
&index_dir,
) {
Ok(snap) => {
println!(
" ⏃ urðr: snapshot {} pinned ({} blob(s), {} bytes, sha={})",
snap.snapshot_id,
snap.blob_count,
snap.total_bytes,
&snap.git_sha[..snap.git_sha.len().min(12)],
);
tantivy_pin = Some(snap.snapshot_id.to_string());
}
Err(e) => println!(" ⏃ urðr: ⚠ snapshot skipped: {e:#}"),
}
}
if let Err(e) = map_vectors(&wh, &args.workspace, name, &repo_root, &sha, &branch) {
println!(" 🧬 vectors: ⚠ skipped: {e:#}");
}
let existing_dwarf = wh.block_on(wh.dwarf_snapshot_id_for(name, &sha)).ok().flatten();
if let (Some(id), false) = (&existing_dwarf, args.recapture) {
println!(" 🔬 dwarf: snapshot exists for sha → reuse {id} (--recapture to force)");
dwarf_pin = Some(id.clone());
} else {
dwarf_pin = capture_dwarf(
&wh, &args.workspace, name, &repo_root, &sha, &branch,
&loaded.workspace_root, &warehouse_root,
);
}
}
if !args.skip_push {
use nornir::release::progress::{now, ReleaseEvent};
let t0 = std::time::Instant::now();
if let Some(p) = &progress {
p.emit(&ReleaseEvent::PhaseStart {
ts: now(),
repo: name.clone(),
phase: "push".to_string(),
});
}
let msg = format!("release({name}): regenerate docs from bench run");
let push_ok = match release::publish::commit_release(&repo_root, &msg) {
Ok(Some(sha)) => {
println!(" 📝 committed release docs: {}", &sha[..sha.len().min(12)]);
match release::publish::push(&repo_root, false) {
Ok(true) => { println!(" ⤴ pushed to origin"); true }
Ok(false) => true,
Err(e) => { println!(" ⚠ push step failed (continuing): {e:#}"); false }
}
}
Ok(None) => {
println!(" 📝 docs unchanged — nothing to commit");
true
}
Err(e) => {
println!(" ⚠ commit failed (continuing): {e:#}");
false
}
};
if let Some(p) = &progress {
p.emit(&ReleaseEvent::PhaseEnd {
ts: now(),
repo: name.clone(),
phase: "push".to_string(),
ok: push_ok,
duration_ms: t0.elapsed().as_millis() as u64,
});
}
} else {
println!(" push: skipped (--skip-push)");
}
if !args.skip_publish && !repo_cfg.publish_order.is_empty() {
use nornir::release::progress::{now, ReleaseEvent};
let t0 = std::time::Instant::now();
if let Some(p) = &progress {
p.emit(&ReleaseEvent::PhaseStart {
ts: now(),
repo: name.clone(),
phase: "publish".to_string(),
});
}
events.emit(
&wh, name, name, "publish", ev_phase::START, ev_status::RUNNING,
if args.dry_run_publish { "dry-run" } else { "" },
Some(deps.clone()), None,
);
let pub_ok = match release::publish::publish_all(
&repo_root,
&repo_cfg.publish_order,
args.dry_run_publish,
) {
Ok(outcomes) => {
for (k, o) in &outcomes {
println!(" 📦 {k:40} {o:?}");
}
true
}
Err(e) => {
println!(" ✗ publish failed: {e:#}");
false
}
};
if let Some(p) = &progress {
p.emit(&ReleaseEvent::PhaseEnd {
ts: now(),
repo: name.clone(),
phase: "publish".to_string(),
ok: pub_ok,
duration_ms: t0.elapsed().as_millis() as u64,
});
}
events.emit(
&wh, name, name, "publish", ev_phase::END,
if pub_ok { ev_status::OK } else { ev_status::FAIL },
if pub_ok { "published" } else { "publish failed" },
Some(deps.clone()),
Some(t0.elapsed().as_millis() as i64),
);
if !pub_ok {
if let Some(p) = &progress_end {
p.emit(&ReleaseEvent::RunEnd {
ts: now(),
run_id: run_id_for_end.clone(),
ok: false,
});
}
events.emit(
&wh, &args.workspace, "", "run", ev_phase::END, ev_status::FAIL,
&format!("aborted at `{name}` publish phase"), None, None,
);
bail!("`{name}` publish phase failed — aborting release run");
}
} else if repo_cfg.publish_order.is_empty() {
println!(" publish: no publish_order configured for `{name}` — skipping");
} else {
println!(" publish: skipped (--skip-publish)");
}
let git = release::pipeline::read_git_state(&repo_root).unwrap_or_else(|_| {
let (sha, branch) =
read_git_head(&repo_root).unwrap_or_else(|_| ("unknown".into(), "unknown".into()));
release::pipeline::RepoGitState { sha, branch, dirty: false }
});
lineage.push(release::pipeline::RepoReleaseRecord {
repo: name.clone(),
build_order_idx: idx,
git,
gate_status: "succeeded".to_string(),
tests_passed,
tests_failed,
published_versions: Vec::new(),
tantivy_snapshot_id: tantivy_pin,
dwarf_snapshot_id: dwarf_pin,
});
}
if !lineage.is_empty() {
match wh.block_on(release::pipeline::persist_lineage(
&wh,
release_id,
&args.workspace,
&dep_graph_snapshot_id,
&lineage,
false,
)) {
Ok(()) => println!(
"📌 release lineage: {} repo row(s) pinned (release {})",
lineage.len(),
release_id,
),
Err(e) => println!("⚠ release lineage write failed: {e:#}"),
}
}
events.emit(
&wh, &args.workspace, "", "run", ev_phase::END, ev_status::OK,
&format!("{} repo(s) green", order.len()), None, None,
);
println!("\n🎉 all {} repo(s) green: {}", order.len(), order.join(", "));
if let Some(p) = &progress_end {
p.emit(&ReleaseEvent::RunEnd {
ts: now(),
run_id: run_id_for_end,
ok: true,
});
}
Ok(())
}
fn run_docs(op: DocsOp, loaded: &Loaded) -> Result<()> {
if let Some(server) = server_target() {
match &op {
DocsOp::Init(a) => return docs_remote(&server, "init", &a.repo),
DocsOp::Render(a) => return docs_remote(&server, "render", &a.repo),
DocsOp::Check(a) => return docs_remote(&server, "check", &a.repo),
DocsOp::History(a) => {
return docs_history_remote(
&server, &a.repo.repo, a.doc.as_deref(), a.version.as_deref(),
a.format.as_deref(), a.limit,
)
}
_ => {}
}
}
match op {
DocsOp::Init(a) => run_docs_init(loaded, &a)?,
DocsOp::Render(a) => run_docs_render(loaded, &a)?,
DocsOp::Check(a) => run_docs_check(loaded, &a)?,
#[cfg(feature = "docs-export")]
DocsOp::Export(a) => run_docs_export(loaded, a)?,
#[cfg(feature = "docs-export")]
DocsOp::Book(a) => run_docs_book(loaded, a)?,
DocsOp::History(a) => run_docs_history(loaded, &a)?,
DocsOp::Index(a) => run_docs_index(loaded, &a)?,
DocsOp::Search(a) => run_docs_search(loaded, &a)?,
}
Ok(())
}
fn docs_ctx_for<'a>(
loaded: &'a Loaded,
repo_name: &str,
) -> Result<(docs::RepoLayout, PathBuf, Option<nornir::bench::BenchRun>, Vec<nornir::bench::BenchRun>)>
{
repo_or_err(loaded, repo_name)?; let repo_root = config::repo_dir_resolved(&loaded.workspace_root, repo_name);
let last = last_run(loaded, repo_name).ok();
let history = bench_history_runs(loaded, repo_name);
Ok((docs::RepoLayout::new(&repo_root), repo_root, last, history))
}
fn run_docs_init(loaded: &Loaded, a: &RepoArg) -> Result<()> {
let (layout, _, _, _) = docs_ctx_for(loaded, &a.repo)?;
let srcs = docs::init_repo(&layout)?;
println!("initialised {}", layout.nornir_dir().display());
for s in &srcs {
println!(" source: {}", s.display());
}
println!("next: `nornir docs render {}`", a.repo);
Ok(())
}
fn run_docs_render(loaded: &Loaded, a: &RepoArg) -> Result<()> {
let (layout, repo_root, last, history) = docs_ctx_for(loaded, &a.repo)?;
let ctx = docs::Ctx::new(&repo_root, &loaded.workspace_root, last.as_ref())
.with_history(&history);
let cfg = docs::DocsRenderCfg::load(&layout)?;
if cfg.wants_markdown() {
let reports = docs::render_all(&layout, &ctx)?;
if reports.is_empty() {
if !cfg.wants_pdf() {
println!(
"no sources under {} — run `nornir docs init {}` first",
layout.nornir_dir().display(),
a.repo
);
return Ok(());
}
}
for r in &reports {
let verb = if r.changed { "wrote" } else { "unchanged" };
println!(
"{verb}: {} ({} bytes, {} section{})",
r.output.display(),
r.bytes,
r.sections.len(),
if r.sections.len() == 1 { "" } else { "s" }
);
}
let src_reports = docs::render_sources_in_place(&layout, &ctx)?;
for r in &src_reports {
let verb = if r.changed { "wrote" } else { "unchanged" };
println!(
"{verb}: {} ({} section{})",
r.path.display(),
r.sections.len(),
if r.sections.len() == 1 { "" } else { "s" }
);
}
}
if cfg.wants_pdf() {
#[cfg(feature = "docs-export")]
{
render_book_pdf(&layout, &repo_root, &ctx)?;
}
#[cfg(not(feature = "docs-export"))]
{
anyhow::bail!(
"{} requests `pdf` output, but this nornir was built without the \
`docs-export` feature. Reinstall with `cargo install nornir --features cli` \
(or `--features docs-export`).",
docs::DocsRenderCfg::path(&layout).display()
);
}
}
Ok(())
}
#[cfg(feature = "docs-export")]
fn render_book_pdf(
layout: &docs::RepoLayout,
repo_root: &std::path::Path,
ctx: &docs::Ctx,
) -> Result<()> {
let format = docs::DocFormat::parse("pdf")?;
let (bytes, sources) = docs::build_book(repo_root, ctx, format)?;
let out = layout.export_path("book", format.extension());
if let Some(parent) = out.parent() {
std::fs::create_dir_all(parent)?;
}
std::fs::write(&out, &bytes)?;
println!(
"wrote {} ({} bytes) from {} source(s):",
out.display(),
bytes.len(),
sources.len()
);
for s in &sources {
let shown = s.strip_prefix(repo_root).unwrap_or(s);
println!(" + {}", shown.display());
}
Ok(())
}
fn run_docs_check(loaded: &Loaded, a: &RepoArg) -> Result<()> {
let (layout, repo_root, last, history) = docs_ctx_for(loaded, &a.repo)?;
let ctx = docs::Ctx::new(&repo_root, &loaded.workspace_root, last.as_ref())
.with_history(&history);
docs::render_check_all(&layout, &ctx)?;
println!("ok: every doc in {} matches its source", a.repo);
Ok(())
}
#[cfg(feature = "docs-export")]
fn run_docs_export(loaded: &Loaded, a: DocsExportArgs) -> Result<()> {
let (layout, repo_root, last, history) = docs_ctx_for(loaded, &a.repo.repo)?;
let ctx = docs::Ctx::new(&repo_root, &loaded.workspace_root, last.as_ref())
.with_history(&history);
let _ = docs::render_all(&layout, &ctx)?;
let format = docs::DocFormat::parse(&a.format)?;
let bytes = docs::export_repo(&repo_root, format)?;
let cargo = std::fs::read_to_string(repo_root.join("Cargo.toml")).unwrap_or_default();
let parsed: toml::Value = toml::from_str(&cargo).unwrap_or(toml::Value::Table(Default::default()));
let pkg = parsed.get("package");
let version = pkg
.and_then(|p| p.get("version"))
.and_then(|v| v.as_str())
.unwrap_or("0.0.0")
.to_string();
let ext = format.extension();
let out = layout.export_path("README", ext);
if let Some(parent) = out.parent() {
std::fs::create_dir_all(parent)?;
}
std::fs::write(&out, &bytes)?;
let workspace = workspace_name(loaded);
let (git_sha, _) = read_git_head(&repo_root).unwrap_or_else(|_| ("unknown".into(), "unknown".into()));
let wh = nornir::warehouse::iceberg::IcebergWarehouse::open(&iceberg_warehouse_root(loaded))?;
let record = docs::record_doc_export(
&wh, &workspace, &a.repo.repo, "README", &version, ext, &git_sha, &bytes,
)?;
if let Some(out_path) = a.out {
if let Some(parent) = out_path.parent() {
std::fs::create_dir_all(parent)?;
}
std::fs::write(&out_path, &bytes)?;
println!("wrote {} ({} bytes, format={})", out_path.display(), bytes.len(), a.format);
}
println!("wrote {} ({} bytes, format={})", out.display(), bytes.len(), a.format);
println!(
"historized README {} ({} bytes, sha256 {}…, git {})",
record.version,
record.byte_len,
&record.sha256[..12.min(record.sha256.len())],
&record.git_sha[..record.git_sha.len().min(12)],
);
Ok(())
}
#[cfg(feature = "docs-export")]
fn run_docs_book(loaded: &Loaded, a: DocsBookArgs) -> Result<()> {
let (layout, repo_root, last, history) = docs_ctx_for(loaded, &a.repo.repo)?;
let ctx = docs::Ctx::new(&repo_root, &loaded.workspace_root, last.as_ref())
.with_history(&history);
let _ = docs::render_all(&layout, &ctx)?;
let format = docs::DocFormat::parse(&a.format)?;
let (bytes, sources) = docs::build_book(&repo_root, &ctx, format)?;
println!("assembled {} source(s) into the book:", sources.len());
for s in &sources {
let shown = s.strip_prefix(&repo_root).unwrap_or(s);
println!(" + {}", shown.display());
}
let version = docs::resolve_version(&repo_root);
let ext = format.extension();
let out = layout.export_path("book", ext);
if let Some(parent) = out.parent() {
std::fs::create_dir_all(parent)?;
}
std::fs::write(&out, &bytes)?;
println!("wrote {} ({} bytes, format={})", out.display(), bytes.len(), a.format);
if let Some(extra) = a.out {
if let Some(parent) = extra.parent() {
std::fs::create_dir_all(parent)?;
}
std::fs::write(&extra, &bytes)?;
println!("wrote {} ({} bytes, format={})", extra.display(), bytes.len(), a.format);
}
let workspace = workspace_name(loaded);
let (git_sha, _) = read_git_head(&repo_root).unwrap_or_else(|_| ("unknown".into(), "unknown".into()));
let wh = nornir::warehouse::iceberg::IcebergWarehouse::open(&iceberg_warehouse_root(loaded))?;
let record = docs::record_doc_export(
&wh, &workspace, &a.repo.repo, "book", &version, ext, &git_sha, &bytes,
)?;
println!(
"historized book {} ({} bytes, sha256 {}…, git {})",
record.version,
record.byte_len,
&record.sha256[..12.min(record.sha256.len())],
&record.git_sha[..record.git_sha.len().min(12)],
);
Ok(())
}
fn run_docs_history(loaded: &Loaded, a: &DocsHistoryArgs) -> Result<()> {
let _ = repo_or_err(loaded, &a.repo.repo)?;
let wh = nornir::warehouse::iceberg::IcebergWarehouse::open(&iceberg_warehouse_root(loaded))?;
let filter = docs::ExportFilter {
doc_name: a.doc.clone(),
version: a.version.clone(),
format: a.format.clone(),
limit: Some(a.limit),
};
let rows = docs::list_doc_exports(&wh, &a.repo.repo, &filter)?;
if rows.is_empty() {
println!("no exports historized for {}", a.repo.repo);
return Ok(());
}
println!(
"{:<19} {:<8} {:<7} {:>9} {:<12} {:<12} {}",
"generated_at", "doc", "format", "bytes", "sha256", "git", "export_id"
);
for r in &rows {
let stamp = r.generated_at.get(0..19).unwrap_or(&r.generated_at);
println!(
"{:<19} {:<8} {:<7} {:>9} {:<12} {:<12} {}",
stamp,
r.doc_name,
r.format,
r.byte_len,
&r.sha256[..12.min(r.sha256.len())],
&r.git_sha[..12.min(r.git_sha.len())],
r.export_id,
);
}
println!("({} row{})", rows.len(), if rows.len() == 1 { "" } else { "s" });
Ok(())
}
fn run_docs_index(loaded: &Loaded, a: &DocsIndexArgs) -> Result<()> {
let (layout, repo_root, last, history) = docs_ctx_for(loaded, &a.repo.repo)?;
let ctx = docs::Ctx::new(&repo_root, &loaded.workspace_root, last.as_ref())
.with_history(&history);
let _ = docs::render_all(&layout, &ctx)?;
let (stats, dir) = docs::build_docs_index(&repo_root, &a.repo.repo)?;
println!(
"docs-index {} :: scanned={} added={} updated={} unchanged={} too_large={} errors={}",
dir.display(),
stats.scanned,
stats.added,
stats.updated,
stats.skipped_unchanged,
stats.skipped_too_large,
stats.errors,
);
if !a.skip_snapshot {
let workspace = workspace_name(loaded);
let (sha, branch) =
read_git_head(&repo_root).unwrap_or_else(|_| ("unknown".into(), "unknown".into()));
let wh = nornir::warehouse::iceberg::IcebergWarehouse::open(&iceberg_warehouse_root(loaded))?;
match docs::snapshot_docs_index(&wh, &workspace, &a.repo.repo, &sha, &branch, &repo_root) {
Ok(snap) => println!(
"✓ historized docs-index {} ({} blob(s), {} bytes, sha={})",
snap.snapshot_id,
snap.blob_count,
snap.total_bytes,
&snap.git_sha[..snap.git_sha.len().min(12)],
),
Err(e) => eprintln!("docs-index snapshot skipped: {e}"),
}
}
Ok(())
}
fn run_docs_search(loaded: &Loaded, a: &DocsSearchArgs) -> Result<()> {
let _ = repo_or_err(loaded, &a.repo.repo)?;
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, &a.repo.repo);
let wh = nornir::warehouse::iceberg::IcebergWarehouse::open(&iceberg_warehouse_root(loaded))?;
let hits = docs::search_docs(
&repo_root,
&wh,
&a.repo.repo,
a.sha.as_deref(),
&a.query,
a.limit,
)?;
for h in &hits {
println!("{:>6.2} {}\n {}", h.score, h.path, h.snippet);
}
if hits.is_empty() {
eprintln!("# no hits");
}
Ok(())
}
fn run_introspect(op: IntrospectOp, loaded: &Loaded) -> Result<()> {
if let Some(server) = server_target() {
let bin = |p: &std::path::Path| p.to_string_lossy().to_string();
match &op {
IntrospectOp::Symbols(a) => return introspect_symbols_remote(&server, &bin(&a.binary), "symbols", "", 0),
IntrospectOp::SymbolLookup(a) => return introspect_symbols_remote(&server, &bin(&a.binary), "lookup", &a.pattern, a.limit),
IntrospectOp::DefinedIn(a) => return introspect_symbols_remote(&server, &bin(&a.binary), "defined-in", &a.file, a.limit),
IntrospectOp::Callers(a) => return introspect_calls_remote(&server, &bin(&a.binary), "callers", &a.name, None),
IntrospectOp::Callees(a) => return introspect_calls_remote(&server, &bin(&a.binary), "callees", &a.name, None),
IntrospectOp::PathBetween(a) => return introspect_calls_remote(&server, &bin(&a.binary), "path", &a.from, Some(&a.to)),
_ => {}
}
}
match op {
IntrospectOp::Depgraph(a) => {
let _ = repo_or_err(loaded, &a.repo)?;
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, &a.repo);
let g = introspect::depgraph::extract(&repo_root)?;
print!("{}", g.to_mermaid());
}
IntrospectOp::Symbols(a) => {
let syms = introspect::artifact::extract_symbols(&a.binary, &loaded.workspace_root)?;
for s in &syms {
println!("{}", serde_json::to_string(s)?);
}
eprintln!("# {} symbols", syms.len());
if !a.no_save {
let repo = a.repo.as_deref().unwrap_or("_workspace");
let git_root = match &a.repo {
Some(name) => {
let _ = repo_or_err(loaded, name)?; config::repo_dir_resolved(&loaded.workspace_root, name)
}
None => loaded.workspace_root.clone(),
};
let warehouse_root = iceberg_warehouse_root(loaded);
let wh = nornir::warehouse::iceberg::IcebergWarehouse::open(&warehouse_root)
.with_context(|| format!("open warehouse at {}", warehouse_root.display()))?;
let (sha, branch) = read_git_head(&git_root)
.unwrap_or_else(|_| ("unknown".into(), "unknown".into()));
let cache_dir = warehouse_root
.parent()
.unwrap_or(warehouse_root.as_path())
.join("cache/dwarf")
.join(repo);
let calls =
introspect::callgraph_dwarf::extract_callgraph(&a.binary, &loaded.workspace_root)?;
let facts = introspect::persist::DwarfFacts { symbols: syms, calls };
let snap = introspect::persist::snapshot_facts(
&wh, &workspace_name(loaded), repo, &sha, &branch, &facts, &cache_dir,
)?;
eprintln!(
"# persisted {} symbols + {} call edges → dwarf snapshot {} (repo={}, sha={})",
facts.symbols.len(),
facts.calls.len(),
snap.snapshot_id,
repo,
&snap.git_sha[..snap.git_sha.len().min(12)],
);
}
}
IntrospectOp::SymbolLookup(a) => {
let syms = introspect::artifact::extract_symbols(&a.binary, &loaded.workspace_root)?;
let hits = introspect::artifact::lookup(&syms, &a.pattern);
for s in hits.iter().take(a.limit) {
println!(
"{}:{} {}",
s.file,
s.line.map(|n| n.to_string()).unwrap_or_else(|| "?".into()),
s.name_demangled
);
}
eprintln!("# {} matches (showing {})", hits.len(), hits.len().min(a.limit));
}
IntrospectOp::DefinedIn(a) => {
let syms = introspect::artifact::extract_symbols(&a.binary, &loaded.workspace_root)?;
let hits = introspect::artifact::defined_in(&syms, &a.file);
for s in hits.iter().take(a.limit) {
println!(
"{}:{} {}",
s.file,
s.line.map(|n| n.to_string()).unwrap_or_else(|| "?".into()),
s.name_demangled
);
}
eprintln!("# {} matches (showing {})", hits.len(), hits.len().min(a.limit));
}
IntrospectOp::Callgraph(a) => {
let edges = introspect::callgraph_dwarf::extract_callgraph(&a.binary, &loaded.workspace_root)?;
for e in &edges {
println!("{}", serde_json::to_string(e)?);
}
eprintln!("# {} inline edges", edges.len());
}
IntrospectOp::CallgraphLlvm(a) => {
let crates_owned: Option<Vec<String>> = a.crates
.as_ref()
.map(|s| s.split(',').map(|s| s.trim().to_string()).filter(|s| !s.is_empty()).collect());
let crates_ref: Option<Vec<&str>> = crates_owned.as_ref().map(|v| v.iter().map(|s| s.as_str()).collect());
let edges = if a.path.is_dir() {
introspect::callgraph_llvm::extract_from_dir(&a.path, crates_ref.as_deref())?
} else {
introspect::callgraph_llvm::extract_from_files(&[a.path.clone()], crates_ref.as_deref())?
};
for e in &edges {
println!("{}", serde_json::to_string(e)?);
}
eprintln!("# {} direct edges", edges.len());
}
IntrospectOp::Callers(a) => {
let edges = introspect::callgraph_dwarf::extract_callgraph(&a.binary, &loaded.workspace_root)?;
let cg = introspect::callgraph_dwarf::Callgraph::from_edges(&edges);
for n in cg.callers_of(&a.name) { println!("{n}"); }
}
IntrospectOp::Callees(a) => {
let edges = introspect::callgraph_dwarf::extract_callgraph(&a.binary, &loaded.workspace_root)?;
let cg = introspect::callgraph_dwarf::Callgraph::from_edges(&edges);
for n in cg.callees_of(&a.name) { println!("{n}"); }
}
IntrospectOp::PathBetween(a) => {
let edges = introspect::callgraph_dwarf::extract_callgraph(&a.binary, &loaded.workspace_root)?;
let cg = introspect::callgraph_dwarf::Callgraph::from_edges(&edges);
match cg.path_between(&a.from, &a.to) {
Some(path) => for n in path { println!("{n}"); },
None => eprintln!("no path from {} to {}", a.from, a.to),
}
}
}
Ok(())
}
fn run_warehouse(op: WarehouseOp, loaded: &Loaded) -> Result<()> {
let wh = warehouse::open(&loaded.nornir.storage, &loaded.workspace_root)?;
match op {
WarehouseOp::ImportJsonl(a) => {
let repo = repo_or_err(loaded, &a.repo)?;
let path = config::repo_dir_resolved(&loaded.workspace_root, &a.repo)
.join(if repo.history.is_empty() { "bench_history.jsonl" } else { &repo.history });
let runs = bench::history::read_all(&path)?;
let mut imported = 0usize;
for r in &runs {
if r.machine.trim().is_empty() {
eprintln!("skipping run dated {}: no machine", r.date);
continue;
}
wh.append_bench_run(&a.repo, r)?;
imported += 1;
}
println!("imported {imported}/{} runs from {}", runs.len(), path.display());
}
WarehouseOp::Query(args) => {
let filter = warehouse::BenchFilter {
repo: Some(args.repo),
machine: args.machine,
limit: args.last,
};
for r in wh.query_bench_runs(&filter)? {
println!("{}", serde_json::to_string(&r)?);
}
}
}
Ok(())
}
fn run_index(op: IndexOp, loaded: &Loaded) -> Result<()> {
let open_for_read = || -> Result<index::Index> {
let wh = nornir::warehouse::iceberg::IcebergWarehouse::open(
&iceberg_warehouse_root(loaded),
)?;
let (idx, _restored) = index::Index::open_or_restore_at(
&loaded.workspace_root,
&cache_index_dir(loaded),
&wh,
"_workspace",
None,
)?;
Ok(idx)
};
match op {
IndexOp::Build(a) => {
let index_dir = cache_index_dir(loaded);
if a.clean && index_dir.exists() {
std::fs::remove_dir_all(&index_dir)
.with_context(|| format!("clean index dir {}", index_dir.display()))?;
println!("cleaned index dir {} (rebuilding from scratch)", index_dir.display());
}
let repos: Vec<String> = loaded.nornir.repo.keys().cloned().collect();
let idx = index::Index::open_at(&loaded.workspace_root, &index_dir)?
.with_repo_scope(repos);
let stats = idx.build()?;
println!(
"scanned={} added={} updated={} unchanged={} too_large={} errors={}",
stats.scanned,
stats.added,
stats.updated,
stats.skipped_unchanged,
stats.skipped_too_large,
stats.errors,
);
if !a.no_snapshot {
let (snap_dir, repo_label, sha, branch) = resolve_index_target(loaded, None)?;
let wh = nornir::warehouse::iceberg::IcebergWarehouse::open(
&iceberg_warehouse_root(loaded),
)?;
match nornir::index::snapshot::snapshot_to_iceberg(
&wh, &workspace_name(loaded), &repo_label, &sha, &branch, &snap_dir,
) {
Ok(snap) => println!(
"✓ snapshot {} ({} blob(s), {} bytes, sha={})",
snap.snapshot_id,
snap.blob_count,
snap.total_bytes,
&snap.git_sha[..snap.git_sha.len().min(12)],
),
Err(e) => eprintln!("⚠ index snapshot skipped: {e:#}"),
}
}
}
IndexOp::Search(a) => {
if let Some(server) = server_target() {
return search_remote(&server, &a.query, a.corpus.as_deref(), a.repo.as_deref(), a.limit);
}
let idx = open_for_read()?;
let corpus = match a.corpus.as_deref() {
None => None,
Some(s) => Some(
index::Corpus::parse(s)
.ok_or_else(|| anyhow!("unknown corpus: {s}"))?,
),
};
let hits = idx.search(&a.query, corpus, a.repo.as_deref(), a.limit)?;
for h in &hits {
println!(
"{:>6.2} [{}/{}] {}\n {}",
h.score,
h.corpus,
if h.repo.is_empty() { "-" } else { &h.repo },
h.path,
h.snippet
);
}
}
IndexOp::Stats => {
if let Some(server) = server_target() {
return index_stats_remote(&server);
}
let idx = open_for_read()?;
let s = idx.stats()?;
println!("total: {}", s.total);
for (c, n) in &s.by_corpus {
println!(" {:<14} {}", c, n);
}
}
IndexOp::Snapshot(a) => {
let (index_dir, repo_label, sha, branch) = resolve_index_target(loaded, a.repo.as_deref())?;
if let Some(server) = server_target() {
return index_snapshot_remote(&server, &repo_label, &a.workspace, &sha, &branch);
}
let wh = nornir::warehouse::iceberg::IcebergWarehouse::open(
&iceberg_warehouse_root(loaded),
)?;
let snap = nornir::index::snapshot::snapshot_to_iceberg(
&wh,
&a.workspace,
&repo_label,
&sha,
&branch,
&index_dir,
)?;
println!(
"✓ snapshot {} ({}, {} blob(s), {} bytes, sha={})",
snap.snapshot_id,
snap.repo,
snap.blob_count,
snap.total_bytes,
&snap.git_sha[..snap.git_sha.len().min(12)],
);
}
IndexOp::Restore(a) => {
let (default_dir, repo_label, _, _) = resolve_index_target(loaded, a.repo.as_deref())?;
let into = a.into.unwrap_or(default_dir);
let wh = nornir::warehouse::iceberg::IcebergWarehouse::open(
&iceberg_warehouse_root(loaded),
)?;
let snap = nornir::index::snapshot::restore_from_iceberg(
&wh,
&repo_label,
a.sha.as_deref(),
&into,
)?;
println!(
"✓ restored snapshot {} into {} ({} blob(s), {} bytes, sha={})",
snap.snapshot_id,
into.display(),
snap.blob_count,
snap.total_bytes,
&snap.git_sha[..snap.git_sha.len().min(12)],
);
}
IndexOp::Upload(a) => {
let (index_dir, repo_label, sha, branch) = resolve_index_target(loaded, a.repo.as_deref())?;
let Some(server) = server_target() else {
bail!(
"`index upload` pushes a local index to a running server, but NORNIR_SERVER \
is not set. For the embedded warehouse use `nornir index snapshot`."
);
};
return index_upload_remote(&server, &index_dir, &repo_label, &a.workspace, &sha, &branch);
}
}
Ok(())
}
#[cfg(feature = "vector")]
fn run_vector(op: VectorOp, loaded: &Loaded) -> Result<()> {
use nornir::vector::store;
let wh = || -> Result<nornir::warehouse::iceberg::IcebergWarehouse> {
nornir::warehouse::iceberg::IcebergWarehouse::open(&iceberg_warehouse_root(loaded))
};
match op {
VectorOp::Index(a) => {
let _ = repo_or_err(loaded, &a.repo)?;
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, &a.repo);
let (sha, branch) = read_git_head(&repo_root)
.unwrap_or_else(|_| ("unknown".into(), "unknown".into()));
let files = store::collect_rust_sources(&repo_root);
if files.is_empty() {
println!("no .rs files under {}", repo_root.display());
return Ok(());
}
let embedder = load_vector_embedder()?;
println!(
"vectorizing {} ({} files) @ {} with {} …",
a.repo,
files.len(),
&sha[..sha.len().min(12)],
vector_backend_and_model(),
);
let opts = nornir::vector::chunk::ChunkOptions::default();
let workspace = workspace_name(loaded);
let snap = store::index_repo(
&wh()?,
&store::RepoRef {
workspace: &workspace,
repo: &a.repo,
git_sha: &sha,
branch: &branch,
complete: true,
},
&files,
&opts,
&*embedder,
)?;
println!(
"✓ snapshot {} — {} occurrences, {} new vector(s) embedded",
snap.snapshot_id, snap.occurrences, snap.new_vectors
);
}
VectorOp::Search(a) => run_vector_search(loaded, &a)?,
VectorOp::Stats => {
let counts = store::warehouse_stats(&wh()?)?;
println!("embeddings: {}", counts.embeddings);
println!("index snapshots: {}", counts.snapshots);
println!("manifest occurrences:{}", counts.occurrences);
}
VectorOp::Doctor => run_vector_doctor()?,
VectorOp::SetupCuda(a) => run_vector_setup_cuda(&a)?,
}
Ok(())
}
#[cfg(feature = "vector")]
fn run_vector_setup_cuda(a: &SetupCudaArgs) -> Result<()> {
#[cfg(feature = "embed-ort")]
{
let (copied, missing) = nornir::vector::cuda::setup(&a.target)?;
println!("✓ CUDA runtime pinned into {}", a.target.display());
println!(" copied : {}", if copied.is_empty() { "(none)".into() } else { copied.join(", ") });
if !missing.is_empty() {
println!(" MISSING: {}", missing.join(", "));
println!(
" → no local source found for these. Install them once (e.g. \
`pip download nvidia-cudnn-cu12 nvidia-cublas-cu12 nvidia-cuda-runtime-cu12` \
into a venv, or the CUDA toolkit) and re-run; downloading from \
NVIDIA's redist CDN directly is a planned follow-up. See .nornir/vector.md."
);
}
println!(" verify : nornir vector doctor");
Ok(())
}
#[cfg(not(feature = "embed-ort"))]
{
let _ = a;
bail!(
"setup-cuda needs an embed-ort build (`cargo install nornir --features cli,embed-ort`); \
this build has no CUDA path to set up"
);
}
}
#[cfg(feature = "robot")]
fn run_robot(op: RobotOp, loaded: &Loaded) -> Result<()> {
use nornir::robot;
match op {
RobotOp::Run(a) => {
let scenarios = robot::parse_scenarios(&a.scenarios)?;
let order = robot::topo_order(&scenarios)?;
if a.dry_run {
println!("plan-tests-DAG ({} scenario(s)):", scenarios.len());
for &i in &order {
let s = &scenarios[i];
println!(
" {} [{} {}] needs=[{}] · {} step(s)",
s.name, s.tier, s.area, s.needs.join(", "), s.steps.len()
);
}
return Ok(());
}
let base = std::env::var("BASE_URL").ok().filter(|s| !s.is_empty()).unwrap_or(a.base_url);
let repo_root = config::repo_dir_resolved(&loaded.workspace_root, &a.repo);
let sha = read_git_head(&repo_root).map(|(s, _)| s).unwrap_or_else(|_| "unknown".into());
let report = robot::run_scenarios(&scenarios, &a.webdriver, &base, &a.repo, &sha)?;
println!(
"robot: {} total · {} passed · {} failed · {} skipped",
report.total, report.passed, report.failed, report.skipped
);
let hist = a.scenarios.parent().unwrap_or(std::path::Path::new(".")).join("robot_history.jsonl");
robot::append_history(&hist, &report)?;
println!("report appended to {}", hist.display());
if report.failed > 0 {
bail!("{} scenario(s) failed", report.failed);
}
}
}
Ok(())
}
#[cfg(feature = "vector")]
fn run_vector_doctor() -> Result<()> {
#[cfg(feature = "embed-ort")]
{
use nornir::vector::store::Embedder;
use std::io::Write;
let (_ready, report) = nornir::vector::cuda::preflight();
print!("{report}");
print!("\n loading model + test embed … ");
std::io::stdout().flush().ok();
match nornir::vector::embed_ort::OrtEmbedder::load() {
Ok(e) => {
let t = std::time::Instant::now();
match e.embed(&["fn main() {}".to_string()]) {
Ok(v) => println!(
"ok — {} dims in {} ms (cuda_ready={})",
v.first().map(|x| x.len()).unwrap_or(0),
t.elapsed().as_millis(),
e.cuda_ready(),
),
Err(err) => println!("embed failed: {err:#}"),
}
}
Err(err) => println!("model load failed: {err:#}"),
}
Ok(())
}
#[cfg(all(feature = "embed-tract", not(feature = "embed-ort")))]
{
println!(
"This build uses embed-tract (pure-Rust CPU): embeddings run on the CPU, \
there is no CUDA/GPU path to check."
);
Ok(())
}
#[cfg(not(any(feature = "embed-ort", feature = "embed-tract")))]
{
println!(
"This build has `vector` but no embedder. Rebuild with --features embed-ort \
(GPU/CUDA → CPU fallback) or --features embed-tract (pure-Rust CPU)."
);
Ok(())
}
}
#[cfg(feature = "vector")]
fn run_vector_search(loaded: &Loaded, a: &VectorSearchArgs) -> Result<()> {
let mode = a.mode.to_ascii_lowercase();
match mode.as_str() {
"lexical" => {
let idx = {
let wh = nornir::warehouse::iceberg::IcebergWarehouse::open(
&iceberg_warehouse_root(loaded),
)?;
let (i, _) = index::Index::open_or_restore_at(
&loaded.workspace_root,
&cache_index_dir(loaded),
&wh,
"_workspace",
None,
)?;
i
};
let hits = idx.search(&a.query, None, a.repo.as_deref(), a.limit)?;
for h in &hits {
println!(
"{:>6.2} [{}/{}] {}",
h.score,
h.corpus,
if h.repo.is_empty() { "-" } else { &h.repo },
h.path
);
}
}
"semantic" | "hybrid" => run_vector_semantic(loaded, a, mode == "hybrid")?,
other => bail!("unknown --mode `{other}` (expected lexical|semantic|hybrid)"),
}
Ok(())
}
#[cfg(all(feature = "vector", any(feature = "embed-tract", feature = "embed-ort")))]
fn run_vector_semantic(loaded: &Loaded, a: &VectorSearchArgs, hybrid: bool) -> Result<()> {
use nornir::vector::store;
let repo = a
.repo
.as_deref()
.ok_or_else(|| anyhow!("semantic/hybrid search needs --repo (embeddings are per-repo)"))?;
let wh =
nornir::warehouse::iceberg::IcebergWarehouse::open(&iceberg_warehouse_root(loaded))?;
let embedder = load_vector_embedder()?;
let mp = embedder.profile().id();
let q = embedder.embed(std::slice::from_ref(&a.query))?;
let sem = store::search(&wh, repo, a.sha.as_deref(), &mp, &q[0], a.limit)?;
if !hybrid {
for (score, occ) in &sem {
println!("{:>6.3} {}:{}-{}", score, occ.file, occ.start_line, occ.end_line);
}
return Ok(());
}
let (i, _) = index::Index::open_or_restore_at(
&loaded.workspace_root,
&cache_index_dir(loaded),
&wh,
"_workspace",
None,
)?;
let lex = i.search(&a.query, None, Some(repo), a.limit.max(10))?;
let mut score: std::collections::HashMap<String, f32> = std::collections::HashMap::new();
const K: f32 = 60.0; for (rank, (_s, occ)) in sem.iter().enumerate() {
*score.entry(occ.file.clone()).or_default() += 1.0 / (K + rank as f32 + 1.0);
}
for (rank, h) in lex.iter().enumerate() {
*score.entry(h.path.clone()).or_default() += 1.0 / (K + rank as f32 + 1.0);
}
let mut ranked: Vec<(String, f32)> = score.into_iter().collect();
ranked.sort_by(|x, y| y.1.total_cmp(&x.1));
ranked.truncate(a.limit);
for (path, s) in &ranked {
println!("{:>6.4} {}", s, path);
}
Ok(())
}
#[cfg(all(feature = "vector", not(any(feature = "embed-tract", feature = "embed-ort"))))]
fn run_vector_semantic(_loaded: &Loaded, _a: &VectorSearchArgs, _hybrid: bool) -> Result<()> {
bail!(
"semantic/hybrid search needs an embedder: rebuild nornir with \
`--features embed-tract` (CPU) or `--features embed-ort` (GPU)"
)
}
#[cfg(all(feature = "vector", any(feature = "embed-tract", feature = "embed-ort")))]
fn vector_backend_name() -> &'static str {
nornir::vector::embedder_backend()
}
#[cfg(all(feature = "vector", not(any(feature = "embed-tract", feature = "embed-ort"))))]
fn vector_backend_name() -> &'static str {
"none (no embedder feature)"
}
#[cfg(feature = "vector")]
fn vector_backend_and_model() -> String {
format!(
"{}, model {}",
vector_backend_name(),
nornir::vector::selected_model_desc()
)
}
#[cfg(all(feature = "vector", any(feature = "embed-tract", feature = "embed-ort")))]
fn load_vector_embedder() -> Result<Box<dyn nornir::vector::store::Embedder>> {
nornir::vector::load_embedder()
}
#[cfg(all(feature = "vector", not(any(feature = "embed-tract", feature = "embed-ort"))))]
fn load_vector_embedder() -> Result<Box<dyn nornir::vector::store::Embedder>> {
bail!(
"this nornir was built without an embedder: rebuild with \
`--features embed-tract` (CPU) or `--features embed-ort` (GPU)"
)
}
fn resolve_index_target(
loaded: &Loaded,
repo: Option<&str>,
) -> Result<(PathBuf, String, String, String)> {
let dir = cache_index_dir(loaded);
let (label, git_root) = match repo {
Some(name) => {
let _ = repo_or_err(loaded, name)?;
(
name.to_string(),
config::repo_dir_resolved(&loaded.workspace_root, name),
)
}
None => (
"_workspace".to_string(),
loaded.workspace_root.join("workspace_holger"),
),
};
let (sha, branch) = read_git_head(&git_root).unwrap_or_else(|_| ("unknown".into(), "unknown".into()));
Ok((dir, label, sha, branch))
}
fn iceberg_warehouse_root(loaded: &Loaded) -> PathBuf {
loaded.warehouse_root()
}
fn funnel_root(loaded: &Loaded) -> PathBuf {
if loaded.nornir.storage.local_path.is_empty() {
loaded.funnel_root()
} else {
nornir::funnel::Store::resolve_root(
&loaded.workspace_root,
&loaded.nornir.storage.local_path,
None,
)
}
}
fn cache_index_dir(loaded: &Loaded) -> PathBuf {
let storage = &loaded.nornir.storage;
if storage.local_path.is_empty() {
loaded.workspace_root.join(".nornir/cache/index")
} else {
loaded.workspace_root.join(&storage.local_path).join("cache/index")
}
}
#[allow(dead_code)]
fn workspace_name(loaded: &Loaded) -> String {
loaded
.workspace_root
.file_name()
.and_then(|s| s.to_str())
.unwrap_or("_workspace")
.to_string()
}
fn read_git_head(repo_root: &Path) -> Result<(String, String)> {
nornir::gitio::head_sha_and_branch(repo_root)
.with_context(|| format!("read git HEAD in {}", repo_root.display()))
}
fn repo_or_err<'a>(loaded: &'a Loaded, name: &str) -> Result<&'a config::Repo> {
loaded
.nornir
.repo
.get(name)
.with_context(|| format!("no [repo.{name}] in {}", loaded.config_path.display()))
}
fn yn(b: bool) -> &'static str {
if b { "yes" } else { "no" }
}