/// Process-wide allocator: `#[global_allocator]` routes every heap allocation
/// made by this binary through mimalloc.
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use std::collections::HashMap;
use tokio::sync::Mutex;
use tonic::{transport::Server, Request, Response, Status};
use nornir::config::{self, Loaded};
use nornir::{bench, docs, guard, index, introspect, knowledge, release};
use nornir::warehouse::iceberg::IcebergWarehouse;
use nornir::warehouse::dep_graph::{query_dep_graph_snapshots, topo_order_from_edges};
/// Generated protobuf/gRPC bindings for the `nornir.v1` package, pulled in
/// at compile time by `tonic::include_proto!`.
pub mod pb {
tonic::include_proto!("nornir.v1");
}
use pb::bench_server::{Bench as BenchSvcTrait, BenchServer};
use pb::telemetry_server::{Telemetry as TelemetrySvcTrait, TelemetryServer};
use pb::docs_server::{Docs as DocsSvcTrait, DocsServer};
use pb::dwarf_server::{Dwarf as DwarfSvcTrait, DwarfServer};
use pb::guard_server::{Guard as GuardSvcTrait, GuardServer};
use pb::health_server::{Health, HealthServer};
use pb::index_server::{Index as IndexSvcTrait, IndexServer};
use pb::introspect_server::{Introspect as IntrospectSvcTrait, IntrospectServer};
use pb::knowledge_server::{Knowledge as KnowledgeSvcTrait, KnowledgeServer};
use pb::mimir_server::{Mimir as MimirSvcTrait, MimirServer};
use pb::release_server::{Release as ReleaseSvcTrait, ReleaseServer};
use pb::repos_server::{Repos as ReposSvcTrait, ReposServer};
use pb::search_server::{Search as SearchSvcTrait, SearchServer};
use pb::vector_server::{Vector as VectorSvcTrait, VectorServer};
use pb::funnel_server::{Funnel as FunnelSvcTrait, FunnelServer};
use pb::viz_server::{Viz as VizSvcTrait, VizServer};
use pb::warehouse_server::{Warehouse as WarehouseSvcTrait, WarehouseServer};
use pb::workspaces_server::{Workspaces as WorkspacesSvcTrait, WorkspacesServer};
use pb::ops_server::{Ops as OpsSvcTrait, OpsServer};
use nornir::funnel::event::{Event as FunnelEvent, NodeStatus, PlanStatus};
use nornir::funnel::ids::{IdeaId, NodeId, PlanId};
use nornir::funnel::store::Store as FunnelStore;
use nornir::funnel::topo::topo_ready;
use pb::{
BenchHistoryResponse, BinaryOnly, CallQuery, DefinedInRequest, DocExport as PbDocExport,
DocsHistoryRequest, DocsHistoryResponse, DocsResponse, Empty, GateAllResult,
GateFailure, GateResult, GuardPath, GuardReport, IndexBlobFrame, IndexStatsResponse, Kv, Kvf,
KnowledgeCall, KnowledgeCallPathQuery, KnowledgeCallQuery, KnowledgeCalls, KnowledgeSymbol, KnowledgeSymbolQuery,
KnowledgeSymbols, NameList, OrderRequest, OrderResponse, PathBetweenRequest, PingResponse,
ProgressEvent, RegisterRepoRequest, RegisterRepoResponse, ReloadResponse, RepoList, RepoOnly,
RepoSummary, SearchHit, SearchRequest, SearchResponse, SnapshotRequest, SnapshotResponse,
SubmitBenchRequest, SubmitBenchResponse, Symbol as PbSymbol, SymbolList, SymbolLookupRequest,
UnregisterRepoRequest, UnregisterRepoResponse, RegisterWorkspaceRequest, WorkspaceFetchReport,
WorkspaceFetchRequest, WorkspaceList, WorkspaceMember, WorkspaceName, WorkspaceRecord,
OpTarget, RunBenchRequest, RunOpResult, RunReleaseRequest, RunTestMatrixRequest,
ScipIngestRequest,
};
/// Per-workspace server state, shared between the gRPC services as
/// `Arc<WorkspaceCtx>`.
struct WorkspaceCtx {
// Name the workspace is served under (key of `AppState::workspaces`).
name: String,
// Loaded configuration; mutated by the Repos register/unregister/reload handlers.
loaded: Mutex<Loaded>,
index: Arc<index::Index>,
warehouse: Arc<IcebergWarehouse>,
// Dependency-graph cache; `None` until `graph()` builds it for the first time.
mimir: Mutex<Option<Arc<GraphCtx>>>,
// Funnel store; `None` until `ensure_funnel()` opens it.
funnel: Mutex<Option<FunnelStore>>,
jobs: Arc<nornir::jobs::JobStore>,
}
/// A built workspace dependency graph together with the workspace name it was
/// built for; cached in `WorkspaceCtx::mimir`.
struct GraphCtx {
graph: nornir::warehouse::dep_graph::WorkspaceGraph,
workspace_name: String,
}
impl WorkspaceCtx {
/// Returns the workspace dependency graph, building and caching it on first use.
///
/// The graph is first built from the workspace descriptor in the checkout; if
/// that fails, it is rebuilt from the warehouse. Both attempts run on the
/// blocking pool. Any failure is reported as an `internal` status.
async fn graph(&self) -> Result<Arc<GraphCtx>, Status> {
    {
        let cached = self.mimir.lock().await;
        if let Some(existing) = cached.as_ref() {
            return Ok(Arc::clone(existing));
        }
    }

    // Snapshot what the blocking task needs so the config lock is not held
    // while the graph is being built.
    let loaded = self.loaded.lock().await;
    let members: Vec<String> = loaded.nornir.repo.keys().cloned().collect();
    let workspace_root = loaded.workspace_root.clone();
    let config_path = loaded.config_path.clone();
    drop(loaded);

    let ws_name = self.name.clone();
    let warehouse = self.warehouse.clone();
    let build = move || {
        nornir::mimir::build_graph_at(&workspace_root, &config_path).or_else(|descriptor_err| {
            let from_warehouse = nornir::mimir::build_graph_from_warehouse(
                &warehouse,
                &ws_name,
                &members,
                &workspace_root,
            );
            match from_warehouse {
                Ok(Some(g)) => Ok((g, ws_name.clone())),
                Ok(None) => Err(anyhow::anyhow!(
                    "no dependency graph for workspace `{ws_name}`: \
                     no `nornir-workspace.toml` in the checkout ({descriptor_err}) \
                     and no `dep_graph_edges` recorded yet (republish to populate it)"
                )),
                Err(wh_err) => Err(anyhow::anyhow!(
                    "dep-graph descriptor missing ({descriptor_err}) and \
                     warehouse fallback failed: {wh_err:#}"
                )),
            }
        })
    };

    let joined = tokio::task::spawn_blocking(build).await.map_err(internal)?;
    let (graph, workspace_name) = joined.map_err(internal)?;

    let built = Arc::new(GraphCtx { graph, workspace_name });
    let mut slot = self.mimir.lock().await;
    *slot = Some(Arc::clone(&built));
    Ok(built)
}
/// Resolves the funnel store root from the workspace root and the configured
/// local storage path.
async fn funnel_root(&self) -> PathBuf {
    let loaded = self.loaded.lock().await;
    let workspace_root = &loaded.workspace_root;
    let local_path = &loaded.nornir.storage.local_path;
    FunnelStore::resolve_root(workspace_root, local_path, None)
}
/// Makes sure a funnel store is open in a mode that satisfies `read_only`.
///
/// An already-open store is reused unless a writable store is requested and
/// the open one is read-only, in which case it is replaced by a writable one.
async fn ensure_funnel(&self, read_only: bool) -> Result<(), Status> {
    let mut slot = self.funnel.lock().await;
    let usable = match slot.as_ref() {
        Some(store) => read_only || !store.is_read_only(),
        None => false,
    };
    if usable {
        return Ok(());
    }

    let root = self.funnel_root().await;
    let opened = if read_only {
        FunnelStore::open_read_only_async(root).await
    } else {
        FunnelStore::open_async(root).await
    };
    *slot = Some(opened.map_err(internal)?);
    Ok(())
}
}
/// Server-wide state shared by every gRPC service (see `Shared`).
struct AppState {
// Served workspaces keyed by name; guarded by a synchronous `std::sync::RwLock`.
workspaces: std::sync::RwLock<HashMap<String, Arc<WorkspaceCtx>>>,
// Embedder cache, filled on first call to `embedder()`; only compiled when an
// embedding backend feature is enabled.
#[cfg(any(feature = "embed-tract", feature = "embed-ort"))]
embedder: Mutex<Option<Arc<dyn nornir::vector::store::Embedder>>>,
}
impl AppState {
fn get_ws(&self, name: &str) -> Option<Arc<WorkspaceCtx>> {
self.workspaces.read().expect("workspaces lock poisoned").get(name).cloned()
}
/// Adds (or replaces) a served workspace. Panics if the workspaces lock is poisoned.
fn insert_ws(&self, name: String, ctx: Arc<WorkspaceCtx>) {
    let mut served = self.workspaces.write().expect("workspaces lock poisoned");
    served.insert(name, ctx);
}
/// Stops serving a workspace and returns its context if it was served.
/// Panics if the workspaces lock is poisoned.
fn remove_ws(&self, name: &str) -> Option<Arc<WorkspaceCtx>> {
    let mut served = self.workspaces.write().expect("workspaces lock poisoned");
    served.remove(name)
}
/// Returns the names of all served workspaces in sorted order.
/// Panics if the workspaces lock is poisoned.
fn served_names(&self) -> Vec<String> {
    let mut names: Vec<String> = {
        let served = self.workspaces.read().expect("workspaces lock poisoned");
        served.keys().cloned().collect()
    };
    names.sort();
    names
}
/// Resolves the workspace selected by the request's `nornir-workspace` header.
///
/// # Errors
/// Returns `not_found` when the header is absent, empty, or not valid text,
/// and when the named workspace is not served. Both messages list the served
/// workspace names.
fn ws(&self, md: &tonic::metadata::MetadataMap) -> Result<Arc<WorkspaceCtx>, Status> {
    let selected = md
        .get("nornir-workspace")
        .and_then(|v| v.to_str().ok())
        .filter(|s| !s.is_empty());
    let Some(name) = selected else {
        let names = self.served_names();
        return Err(not_found(format!(
            "no workspace selected — set the `nornir-workspace` header \
             (CLI `--workspace <name>`, or pick one in the viz top-bar); served: [{}]",
            names.join(", ")
        )));
    };
    // Look up through `get_ws` so the read guard is released before the error
    // path calls `served_names()`. Taking a second read lock on a
    // `std::sync::RwLock` while the first is still held can deadlock when a
    // writer is queued between the two acquisitions.
    self.get_ws(name).ok_or_else(|| {
        let names = self.served_names();
        not_found(format!(
            "workspace `{name}` is not served (served: [{}])",
            names.join(", ")
        ))
    })
}
/// Returns the shared embedder, loading it on the blocking pool on first use.
///
/// If two callers load concurrently, the first instance stored in the cache
/// wins and is the one returned to both.
#[cfg(any(feature = "embed-tract", feature = "embed-ort"))]
async fn embedder(&self) -> Result<Arc<dyn nornir::vector::store::Embedder>, Status> {
    {
        let cached = self.embedder.lock().await;
        if let Some(existing) = cached.as_ref() {
            return Ok(Arc::clone(existing));
        }
    }
    let fresh: Arc<dyn nornir::vector::store::Embedder> =
        tokio::task::spawn_blocking(|| nornir::vector::load_embedder().map(Arc::from))
            .await
            .map_err(internal)?
            .map_err(internal)?;
    let mut slot = self.embedder.lock().await;
    let stored = slot.get_or_insert(fresh);
    Ok(Arc::clone(stored))
}
}
/// Reference-counted handle to the server-wide state held by every service struct.
type Shared = Arc<AppState>;
/// Reads a CLI option from `args`, falling back to the environment variable `env`.
///
/// Both `flag value` and `flag=value` spellings are accepted and the first
/// occurrence wins. A bare `flag` as the last argument yields `None` without
/// consulting the environment.
fn flag_or_env(args: &[String], flag: &str, env: &str) -> Option<String> {
    let prefix = format!("{flag}=");
    for (idx, arg) in args.iter().enumerate() {
        if arg == flag {
            // Value is the following argument, if there is one.
            return args.get(idx + 1).cloned();
        }
        if let Some(value) = arg.strip_prefix(prefix.as_str()) {
            return Some(value.to_owned());
        }
    }
    std::env::var(env).ok()
}
/// Handler for the `Health` gRPC service.
struct HealthSvc {
// Not read by `ping` at present, hence the dead-code allowance.
#[allow(dead_code)]
state: Shared,
}
#[tonic::async_trait]
impl Health for HealthSvc {
/// Liveness probe: replies `"ok"` with the crate version baked in at compile
/// time. `repo_count` is always reported as 0.
async fn ping(&self, _req: Request<Empty>) -> Result<Response<PingResponse>, Status> {
    let reply = PingResponse {
        status: String::from("ok"),
        version: String::from(env!("CARGO_PKG_VERSION")),
        repo_count: 0,
    };
    Ok(Response::new(reply))
}
}
/// Handler for the `Repos` gRPC service (list / register / unregister / reload).
struct ReposSvc { state: Shared }
/// Builds an `invalid_argument` status whose message is `msg`'s `Display` output.
fn invalid_arg<E>(msg: E) -> Status
where
    E: std::fmt::Display,
{
    let text = msg.to_string();
    Status::invalid_argument(text)
}
/// Builds an `internal` status whose message is `msg`'s `Display` output.
fn internal<E>(msg: E) -> Status
where
    E: std::fmt::Display,
{
    let text = msg.to_string();
    Status::internal(text)
}
/// Builds a `not_found` status whose message is `msg`'s `Display` output.
fn not_found<E>(msg: E) -> Status
where
    E: std::fmt::Display,
{
    let text = msg.to_string();
    Status::not_found(text)
}
#[tonic::async_trait]
impl ReposSvcTrait for ReposSvc {
/// Lists the repos configured in the selected workspace.
async fn list(&self, req: Request<Empty>) -> Result<Response<RepoList>, Status> {
    let wctx = self.state.ws(req.metadata())?;
    let loaded = wctx.loaded.lock().await;
    let mut repos = Vec::new();
    for (repo_name, repo) in loaded.nornir.repo.iter() {
        repos.push(RepoSummary {
            name: repo_name.clone(),
            remote: repo.remote.clone(),
            history: repo.history.clone(),
            readme: repo.readme.clone(),
        });
    }
    Ok(Response::new(RepoList { repos }))
}
async fn register(
&self,
req: Request<RegisterRepoRequest>,
) -> Result<Response<RegisterRepoResponse>, Status> {
let wctx = self.state.ws(req.metadata())?.clone();
let req = req.into_inner();
if req.name.is_empty() {
return Err(invalid_arg("`name` must not be empty"));
}
if req.name.contains(['.', '/', '\\', '[', ']']) {
return Err(invalid_arg(
"`name` must be a bare identifier (no '.', '/', '\\\\', '[', ']')",
));
}
let mut l = wctx.loaded.lock().await;
let cfg_path = l.config_path.clone();
let existing = std::fs::read_to_string(&cfg_path).ok().unwrap_or_default();
let mut doc: toml_edit::DocumentMut = existing
.parse()
.map_err(|e| internal(format!("parse {}: {e}", cfg_path.display())))?;
let repo_parent = doc
.entry("repo")
.or_insert_with(|| {
let mut t = toml_edit::Table::new();
t.set_implicit(true);
toml_edit::Item::Table(t)
})
.as_table_mut()
.ok_or_else(|| Status::failed_precondition("`repo` exists but is not a table"))?;
repo_parent.set_implicit(true);
let entry = repo_parent
.entry(&req.name)
.or_insert_with(|| toml_edit::Item::Table(toml_edit::Table::new()));
let tbl = entry.as_table_mut().ok_or_else(|| {
Status::failed_precondition(format!("`repo.{}` exists but is not a table", req.name))
})?;
if !req.remote.is_empty() { tbl.insert("remote", toml_edit::value(req.remote.as_str())); }
if !req.history.is_empty() { tbl.insert("history", toml_edit::value(req.history.as_str())); }
if !req.readme.is_empty() { tbl.insert("readme", toml_edit::value(req.readme.as_str())); }
if !req.path.is_empty() { tbl.insert("path", toml_edit::value(req.path.as_str())); }
if let Some(parent) = cfg_path.parent() {
std::fs::create_dir_all(parent).map_err(internal)?;
}
std::fs::write(&cfg_path, doc.to_string()).map_err(|e| {
if e.kind() == std::io::ErrorKind::PermissionDenied {
Status::failed_precondition(format!(
"{} is read-only (likely guard-locked); call Guard.Release first",
cfg_path.display(),
))
} else {
internal(e)
}
})?;
let repo = config::Repo {
path: String::new(),
remote: req.remote.clone(),
history: req.history.clone(),
readme: req.readme.clone(),
publish_order: Vec::new(),
gates: config::Gates::default(),
bench: config::BenchSpec::default(),
private: false,
};
l.nornir.repo.insert(req.name.clone(), repo);
Ok(Response::new(RegisterRepoResponse {
name: req.name,
config_path: cfg_path.display().to_string(),
repo_count: l.nornir.repo.len() as u32,
}))
}
async fn unregister(
&self,
req: Request<UnregisterRepoRequest>,
) -> Result<Response<UnregisterRepoResponse>, Status> {
let wctx = self.state.ws(req.metadata())?.clone();
let req = req.into_inner();
let mut l = wctx.loaded.lock().await;
let cfg_path = l.config_path.clone();
let removed_mem = l.nornir.repo.remove(&req.name).is_some();
let removed_disk = if cfg_path.exists() {
let text = std::fs::read_to_string(&cfg_path).map_err(internal)?;
let mut doc: toml_edit::DocumentMut = text
.parse()
.map_err(|e| internal(format!("parse {}: {e}", cfg_path.display())))?;
let removed = doc
.get_mut("repo")
.and_then(|i| i.as_table_mut())
.and_then(|t| t.remove(&req.name))
.is_some();
if removed {
std::fs::write(&cfg_path, doc.to_string()).map_err(internal)?;
}
removed
} else {
false
};
if !removed_mem && !removed_disk {
return Err(not_found(format!("repo `{}` not found", req.name)));
}
Ok(Response::new(UnregisterRepoResponse {
name: req.name,
repo_count: l.nornir.repo.len() as u32,
}))
}
/// Re-reads the workspace config from its file and replaces the in-memory copy.
///
/// Returns `not_found` when the config file does not exist and `internal`
/// when it cannot be loaded.
async fn reload(&self, req: Request<Empty>) -> Result<Response<ReloadResponse>, Status> {
    let wctx = self.state.ws(req.metadata())?;
    let mut loaded = wctx.loaded.lock().await;
    let cfg_path = loaded.config_path.clone();
    if !cfg_path.exists() {
        return Err(not_found(format!("{} does not exist", cfg_path.display())));
    }
    *loaded = config::load_explicit(&cfg_path).map_err(internal)?;
    let repo_count = loaded.nornir.repo.len() as u32;
    Ok(Response::new(ReloadResponse {
        config_path: cfg_path.display().to_string(),
        repo_count,
    }))
}
}
/// Handler for the `Guard` gRPC service (status / apply / release / verify).
struct GuardSvc { state: Shared }
/// Converts guard path statuses into the wire `GuardReport`, counting how many
/// paths are flagged as changed.
fn to_pb(paths: Vec<guard::PathStatus>) -> GuardReport {
    let changed_count = paths.iter().filter(|status| status.changed).count() as u32;
    let mut converted = Vec::with_capacity(paths.len());
    for status in paths {
        converted.push(GuardPath {
            path: status.path.display().to_string(),
            exists: status.exists,
            writable: status.writable,
            changed: status.changed,
        });
    }
    GuardReport { paths: converted, changed_count }
}
#[tonic::async_trait]
impl GuardSvcTrait for GuardSvc {
/// Reports the guard status of the workspace's configured forbidden paths.
async fn status(&self, req: Request<Empty>) -> Result<Response<GuardReport>, Status> {
    let wctx = self.state.ws(req.metadata())?;
    let loaded = wctx.loaded.lock().await;
    let forbidden = &loaded.nornir.guard.forbidden;
    let statuses = guard::status(&loaded.workspace_root, forbidden);
    Ok(Response::new(to_pb(statuses)))
}
/// Runs `guard::apply` on the workspace's configured forbidden paths and
/// returns the resulting report; failures surface as `internal`.
async fn apply(&self, req: Request<Empty>) -> Result<Response<GuardReport>, Status> {
    let wctx = self.state.ws(req.metadata())?;
    let loaded = wctx.loaded.lock().await;
    let forbidden = &loaded.nornir.guard.forbidden;
    let statuses = guard::apply(&loaded.workspace_root, forbidden).map_err(internal)?;
    Ok(Response::new(to_pb(statuses)))
}
/// Runs `guard::release` on the workspace's configured forbidden paths and
/// returns the resulting report; failures surface as `internal`.
async fn release(&self, req: Request<Empty>) -> Result<Response<GuardReport>, Status> {
    let wctx = self.state.ws(req.metadata())?;
    let loaded = wctx.loaded.lock().await;
    let forbidden = &loaded.nornir.guard.forbidden;
    let statuses = guard::release(&loaded.workspace_root, forbidden).map_err(internal)?;
    Ok(Response::new(to_pb(statuses)))
}
/// Checks the workspace against the recorded guard manifest.
///
/// `intact` is true only when every verified entry reports ok; the
/// per-path detail is returned as a JSON string.
async fn verify(&self, req: Request<Empty>) -> Result<Response<pb::GuardVerifyResponse>, Status> {
    let wctx = self.state.ws(req.metadata())?;
    let root = wctx.loaded.lock().await.workspace_root.clone();
    let recorded = guard::read_manifest(&root).map_err(internal)?;
    let report = guard::verify(&root, &recorded);
    let intact = report.iter().all(|entry| entry.ok());
    let paths_json = serde_json::to_string(&report).map_err(internal)?;
    let recorded_at = recorded.recorded_at.to_rfc3339();
    Ok(Response::new(pb::GuardVerifyResponse { intact, recorded_at, paths_json }))
}
}
/// Handler for the `Release` gRPC service (gates, publish order, trace, progress stream).
struct ReleaseSvc { state: Shared }
/// Returns the last bench run recorded in the repo's history file.
///
/// The file is the repo's configured `history` path, or
/// `bench_history.jsonl` when none is configured, resolved against the repo
/// directory. Fails if the repo is not configured, the file cannot be read,
/// or it holds no runs.
fn last_run_for(loaded: &Loaded, name: &str) -> anyhow::Result<bench::BenchRun> {
    let Some(repo) = loaded.nornir.repo.get(name) else {
        return Err(anyhow!("repo `{name}` not configured"));
    };
    let file_name: &str = if repo.history.is_empty() {
        "bench_history.jsonl"
    } else {
        repo.history.as_str()
    };
    let path = config::Nornir::repo_dir(&loaded.workspace_root, name).join(file_name);
    match bench::history::read_all(&path)?.into_iter().last() {
        Some(run) => Ok(run),
        None => Err(anyhow!("no bench runs in {}", path.display())),
    }
}
/// Reads every bench run from the repo's history file, best effort: an
/// unknown repo or an unreadable file yields an empty list.
fn history_for(loaded: &Loaded, name: &str) -> Vec<bench::BenchRun> {
    match loaded.nornir.repo.get(name) {
        None => Vec::new(),
        Some(repo) => {
            let file_name: &str = if repo.history.is_empty() {
                "bench_history.jsonl"
            } else {
                repo.history.as_str()
            };
            let path = config::Nornir::repo_dir(&loaded.workspace_root, name).join(file_name);
            bench::history::read_all(&path).unwrap_or_default()
        }
    }
}
/// Returns the directory of a configured repo, or `not_found` if the repo is
/// not present in the loaded config.
fn repo_root_for(loaded: &Loaded, name: &str) -> Result<PathBuf, Status> {
    let configured = loaded.nornir.repo.contains_key(name);
    if configured {
        Ok(config::Nornir::repo_dir(&loaded.workspace_root, name))
    } else {
        Err(not_found(format!("repo `{name}` not configured")))
    }
}
/// Builds a passing `GateResult` for gate `name`; message and version are
/// empty and `max_drop_pct` is zero.
fn gate_pass(name: &str) -> GateResult {
    let gate = name.into();
    let status = "pass".into();
    GateResult {
        gate,
        status,
        message: String::new(),
        version: String::new(),
        max_drop_pct: 0.0,
    }
}
/// Builds a failing `GateResult` for gate `name`, carrying the alternate
/// (`{:#}`) rendering of `e` as the message.
fn gate_fail<E: std::fmt::Display>(name: &str, e: E) -> GateResult {
    let message = format!("{e:#}");
    let gate = name.into();
    let status = "fail".into();
    GateResult {
        gate,
        status,
        message,
        version: String::new(),
        max_drop_pct: 0.0,
    }
}
#[tonic::async_trait]
impl ReleaseSvcTrait for ReleaseSvc {
/// Runs the `no_path_patches` gate for one repo. A gate failure is reported
/// in the `GateResult`, not as an RPC error.
async fn gate_path_patches(
    &self,
    req: Request<RepoOnly>,
) -> Result<Response<GateResult>, Status> {
    let wctx = self.state.ws(req.metadata())?;
    let name = req.into_inner().repo;
    let loaded = wctx.loaded.lock().await;
    let root = repo_root_for(&loaded, &name)?;
    let result = release::gate::no_path_patches(&root).map_or_else(
        |e| gate_fail("no_path_patches", e),
        |()| gate_pass("no_path_patches"),
    );
    Ok(Response::new(result))
}
/// Runs the `nexus_floor` gate against the repo's latest bench run.
///
/// A missing bench run is reported as a failed gate; the evaluated run's
/// version is attached to the result.
async fn gate_nexus_floor(
    &self,
    req: Request<RepoOnly>,
) -> Result<Response<GateResult>, Status> {
    let wctx = self.state.ws(req.metadata())?;
    let name = req.into_inner().repo;
    let loaded = wctx.loaded.lock().await;
    // Called only for its "repo not configured" check.
    let _ = repo_root_for(&loaded, &name)?;
    let run = match last_run_for(&loaded, &name) {
        Err(e) => return Ok(Response::new(gate_fail("nexus_floor", e))),
        Ok(run) => run,
    };
    let verdict = release::gate::nexus_floor(&run);
    let mut result = verdict.map_or_else(
        |e| gate_fail("nexus_floor", e),
        |()| gate_pass("nexus_floor"),
    );
    result.version = run.version;
    Ok(Response::new(result))
}
/// Runs the `no_regression` gate against the repo's latest bench run.
///
/// The allowed drop is the repo's `max_regression_pct` when positive, else
/// 10.0. The result carries the evaluated version and the threshold used.
async fn gate_no_regression(
    &self,
    req: Request<RepoOnly>,
) -> Result<Response<GateResult>, Status> {
    let wctx = self.state.ws(req.metadata())?;
    let name = req.into_inner().repo;
    let loaded = wctx.loaded.lock().await;
    let root = repo_root_for(&loaded, &name)?;
    // `repo_root_for` has just confirmed the repo is configured.
    let repo = loaded.nornir.repo.get(&name).unwrap();
    let configured_pct = repo.gates.max_regression_pct;
    let pct = if configured_pct > 0.0 { configured_pct } else { 10.0 };
    let history_file: &str = if repo.history.is_empty() {
        "bench_history.jsonl"
    } else {
        repo.history.as_str()
    };
    let history_path = root.join(history_file);
    let run = match last_run_for(&loaded, &name) {
        Err(e) => return Ok(Response::new(gate_fail("no_regression", e))),
        Ok(run) => run,
    };
    let verdict = release::gate::no_regression(&run, &history_path, pct);
    let mut result = verdict.map_or_else(
        |e| gate_fail("no_regression", e),
        |()| gate_pass("no_regression"),
    );
    result.version = run.version;
    result.max_drop_pct = pct;
    Ok(Response::new(result))
}
/// Runs the `docs_fresh` gate: renders the repo's docs in check mode against
/// the latest bench run and the full bench history.
async fn gate_docs_fresh(
    &self,
    req: Request<RepoOnly>,
) -> Result<Response<GateResult>, Status> {
    let wctx = self.state.ws(req.metadata())?;
    let name = req.into_inner().repo;
    let loaded = wctx.loaded.lock().await;
    let root = repo_root_for(&loaded, &name)?;
    let run = match last_run_for(&loaded, &name) {
        Err(e) => return Ok(Response::new(gate_fail("docs_fresh", e))),
        Ok(run) => run,
    };
    let layout = docs::RepoLayout::new(&root);
    let history = history_for(&loaded, &name);
    let ctx = docs::Ctx::new(&root, &loaded.workspace_root, Some(&run)).with_history(&history);
    let result = docs::render_check_all(&layout, &ctx).map_or_else(
        |e| gate_fail("docs_fresh", e),
        |_| gate_pass("docs_fresh"),
    );
    Ok(Response::new(result))
}
/// Runs every gate enabled in the repo's config and reports which passed and
/// which failed.
///
/// The run is recorded as a `RELEASE_RUN` job that is finished with the
/// pass/fail counts. Individual gate failures never fail the RPC; only an
/// unknown repo (`not_found`) or a failed blocking task (`internal`) does.
async fn gate_all(
    &self,
    req: Request<RepoOnly>,
) -> Result<Response<GateAllResult>, Status> {
    let wctx = self.state.ws(req.metadata())?;
    let name = req.into_inner().repo;
    let l = wctx.loaded.lock().await;
    let root = repo_root_for(&l, &name)?;
    let repo = l
        .nornir
        .repo
        .get(&name)
        .cloned()
        .ok_or_else(|| not_found(format!("repo `{name}` not configured")))?;
    let g = &repo.gates;
    let job = wctx.jobs.start(
        nornir::jobs::kind::RELEASE_RUN,
        &name,
        &wctx.name,
        serde_json::json!({ "op": "gate_all" }),
    );
    let mut passed: Vec<String> = Vec::new();
    let mut failed: Vec<GateFailure> = Vec::new();
    // Files one gate outcome under `passed` or `failed`.
    let push = |passed: &mut Vec<String>,
                failed: &mut Vec<GateFailure>,
                n: &str,
                r: anyhow::Result<()>| match r {
        Ok(()) => passed.push(n.into()),
        Err(e) => failed.push(GateFailure {
            name: n.into(),
            error: format!("{e:#}"),
        }),
    };
    if g.no_path_patches {
        push(&mut passed, &mut failed, "no_path_patches",
            release::gate::no_path_patches(&root));
    }
    // Shared by every gate that needs the latest bench run; a lookup failure
    // is reported per gate rather than aborting the whole call.
    let last = last_run_for(&l, &name);
    if g.nexus_floor {
        let r = last.as_ref()
            .map_err(|e| anyhow!("{e:#}"))
            .and_then(|r| release::gate::nexus_floor(r));
        push(&mut passed, &mut failed, "nexus_floor", r);
    }
    if g.no_regression {
        let pct = if g.max_regression_pct > 0.0 { g.max_regression_pct } else { 10.0 };
        let hp = root.join(if repo.history.is_empty() {
            "bench_history.jsonl"
        } else {
            &repo.history
        });
        let r = last.as_ref()
            .map_err(|e| anyhow!("{e:#}"))
            .and_then(|r| release::gate::no_regression(r, &hp, pct));
        push(&mut passed, &mut failed, "no_regression", r);
    }
    if !g.integration_roundtrip.is_empty() {
        let kinds: Vec<&str> = g.integration_roundtrip.iter().map(|s| s.as_str()).collect();
        push(&mut passed, &mut failed, "integration_roundtrip",
            release::gate::integration_roundtrip_via_cargo_test(&root, &kinds));
    }
    if g.docs_fresh {
        let ws_root = l.workspace_root.clone();
        let r: anyhow::Result<()> = (|| {
            let run = last.as_ref().map_err(|e| anyhow!("{e:#}"))?;
            let layout = docs::RepoLayout::new(&root);
            let history = history_for(&l, &name);
            let ctx = docs::Ctx::new(&root, &ws_root, Some(run)).with_history(&history);
            docs::render_check_all(&layout, &ctx).map(|_| ())
        })();
        push(&mut passed, &mut failed, "docs_fresh", r);
    }
    // The remaining gates await blocking-pool tasks. Take the one value still
    // needed from the config and release the lock first, so other requests on
    // this workspace are not stalled behind those tasks.
    let coverage_workspace = g.coverage.then(|| l.workspace_name());
    drop(l);
    if let Some(workspace) = coverage_workspace {
        let wh = wctx.warehouse.clone();
        let root_c = root.clone();
        let repo_name = name.clone();
        let r = tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
            let run_id = nornir::warehouse::test_results::new_run_id();
            let state = nornir::autonom::gate_coverage_for_repo(
                &wh, &workspace, &repo_name, &root_c, Vec::new(), &run_id,
            )?;
            release::gate::coverage_gate(&state.report)
        })
        .await
        .map_err(internal)?;
        push(&mut passed, &mut failed, "coverage", r);
    }
    if g.tests {
        let wh = wctx.warehouse.clone();
        let root_c = root.clone();
        let repo_name = name.clone();
        let r = tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
            use nornir::warehouse::test_results::{query_test_results, TestSelector};
            // Best effort: a failed history query means "no history".
            let history = wh
                .block_on(query_test_results(&wh, &TestSelector::Repo(repo_name.clone())))
                .unwrap_or_default();
            let aspects = nornir::test_matrix::Aspect::DEFAULT.to_vec();
            let fresh = release::gate::run_tests_gate(&root_c, &history, &aspects)?;
            // Persisting the fresh results is best effort and does not affect the gate.
            let _ = nornir::warehouse::test_results::append_test_results_blocking(&wh, &fresh);
            Ok(())
        })
        .await
        .map_err(internal)?;
        push(&mut passed, &mut failed, "tests", r);
    }
    job.finish(
        serde_json::json!({ "passed": passed.len(), "failed": failed.len() }),
        "",
    );
    Ok(Response::new(GateAllResult { repo: name, passed, failed }))
}
/// Computes a publish order for the workspace's configured repos.
///
/// The order is derived from the last dependency-graph snapshot returned by
/// the warehouse (`source = "iceberg"`). With no snapshot, the configured
/// repo names are returned as-is (`source = "fallback"`).
async fn order(
    &self,
    req: Request<OrderRequest>,
) -> Result<Response<OrderResponse>, Status> {
    let wctx = self.state.ws(req.metadata())?;
    let requested = req.into_inner().workspace;
    // NOTE(review): the default below is a hard-coded workspace name rather
    // than the selected workspace — confirm this is intended.
    let ws = if requested.is_empty() {
        String::from("workspace_holger")
    } else {
        requested
    };
    let selected = {
        let loaded = wctx.loaded.lock().await;
        loaded.nornir.repo.keys().cloned().collect::<Vec<_>>()
    };
    let warehouse = wctx.warehouse.clone();
    let query = move || warehouse.block_on(query_dep_graph_snapshots(&warehouse, &ws, None));
    let snapshots = tokio::task::spawn_blocking(query)
        .await
        .map_err(internal)?
        .map_err(internal)?;
    let response = match snapshots.into_iter().last() {
        Some(snap) => OrderResponse {
            order: topo_order_from_edges(&selected, &snap.edges),
            source: "iceberg".into(),
            snapshot_id: snap.snapshot_id.to_string(),
        },
        None => OrderResponse {
            order: selected,
            source: "fallback".into(),
            snapshot_id: String::new(),
        },
    };
    Ok(Response::new(response))
}
/// Runs the regression trace for a repo and returns it serialized as JSON.
///
/// The dependency graph is optional: if it cannot be built, the trace runs
/// without one.
async fn trace(&self, req: Request<pb::TraceQuery>) -> Result<Response<pb::JsonResponse>, Status> {
    let wctx = self.state.ws(req.metadata())?;
    let query = req.into_inner();
    let graph_ctx = wctx.graph().await.ok();
    let graph = graph_ctx.as_ref().map(|g| &g.graph);
    let trace = nornir::release::regression::trace_gate_async(
        &wctx.warehouse,
        &query.workspace,
        &query.repo,
        graph,
    )
    .await
    .map_err(internal)?;
    let json = serde_json::to_string(&trace).map_err(internal)?;
    Ok(Response::new(pb::JsonResponse { json }))
}
/// Stream type returned by `progress`: a boxed, pinned, `Send` stream of
/// progress events or statuses.
type ProgressStream = std::pin::Pin<Box<
dyn tokio_stream::Stream<Item = Result<ProgressEvent, Status>> + Send + 'static,
>>;
/// Streams release progress events by tailing the newest event log.
///
/// Logs are looked up in a `logs` directory next to the warehouse root, or
/// inside it when the root has no parent. A spawned task feeds a bounded
/// channel (capacity 64); a tailing error is sent as the final stream item.
async fn progress(
    &self,
    req: Request<Empty>,
) -> Result<Response<Self::ProgressStream>, Status> {
    let log_dir = {
        let ctx = self.state.ws(req.metadata())?;
        let loaded = ctx.loaded.lock().await;
        let wh = loaded.warehouse_root();
        let dir = match wh.parent() {
            Some(parent) => parent.join("logs"),
            None => wh.join("logs"),
        };
        dir
    };
    let (tx, rx) = tokio::sync::mpsc::channel::<Result<ProgressEvent, Status>>(64);
    let tail_tx = tx.clone();
    tokio::spawn(async move {
        if let Err(e) = tail_progress(log_dir, tail_tx).await {
            let _ = tx.send(Err(internal(e))).await;
        }
    });
    let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
    Ok(Response::new(Box::pin(stream) as Self::ProgressStream))
}
}
/// Finds the most recently modified `release-run-*.events.ndjson` file in
/// `log_dir`.
///
/// Returns `None` when the directory cannot be read or holds no matching
/// file. Entries whose metadata or modification time is unavailable are
/// skipped; on equal timestamps the first entry seen wins.
async fn latest_log(log_dir: &std::path::Path) -> Option<PathBuf> {
    let mut entries = tokio::fs::read_dir(log_dir).await.ok()?;
    let mut newest: Option<(std::time::SystemTime, PathBuf)> = None;
    while let Ok(Some(entry)) = entries.next_entry().await {
        let raw_name = entry.file_name();
        let file_name = raw_name.to_string_lossy();
        let is_event_log =
            file_name.starts_with("release-run-") && file_name.ends_with(".events.ndjson");
        if !is_event_log {
            continue;
        }
        let Ok(meta) = entry.metadata().await else { continue };
        let Ok(modified) = meta.modified() else { continue };
        let is_newer = match &newest {
            Some((seen, _)) => *seen < modified,
            None => true,
        };
        if is_newer {
            newest = Some((modified, entry.path()));
        }
    }
    newest.map(|(_, path)| path)
}
/// Tails the newest release event log and forwards each event to `tx`.
///
/// Waits (polling every 500 ms) until a log file exists, reads it from the
/// start, then polls every 250 ms for new data. Returns `Ok(())` after a
/// `RunEnd` event has been forwarded or once the receiver is gone.
///
/// A line caught at end-of-file before the writer has finished it is kept in
/// the buffer and completed on a later read, rather than being parsed as a
/// fragment and discarded. Complete lines that are blank or not valid events
/// are skipped.
///
/// # Errors
/// Returns an error if the log file cannot be opened or read.
async fn tail_progress(
    log_dir: PathBuf,
    tx: tokio::sync::mpsc::Sender<Result<ProgressEvent, Status>>,
) -> anyhow::Result<()> {
    use tokio::io::AsyncBufReadExt;
    let path = loop {
        if let Some(p) = latest_log(&log_dir).await {
            break p;
        }
        if tx.is_closed() {
            return Ok(());
        }
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
    };
    // A freshly opened file is already positioned at offset 0.
    let f = tokio::fs::File::open(&path).await?;
    let mut reader = tokio::io::BufReader::new(f);
    // Holds the current line; survives loop iterations while it is incomplete.
    let mut pending = String::new();
    loop {
        reader.read_line(&mut pending).await?;
        let complete = pending.ends_with('\n');
        let parsed = serde_json::from_str::<release::progress::ReleaseEvent>(pending.trim());
        match parsed {
            Ok(ev) => {
                pending.clear();
                let is_end = matches!(ev, release::progress::ReleaseEvent::RunEnd { .. });
                let pb_ev = release_event_to_pb(ev);
                if tx.send(Ok(pb_ev)).await.is_err() {
                    return Ok(());
                }
                if is_end {
                    return Ok(());
                }
            }
            // A full line that is blank or not a known event: drop it.
            Err(_) if complete => pending.clear(),
            // At end-of-file with nothing new, or only part of a line: keep
            // what has been read and wait for the writer to append more.
            Err(_) => {
                if tx.is_closed() {
                    return Ok(());
                }
                tokio::time::sleep(std::time::Duration::from_millis(250)).await;
            }
        }
    }
}
/// Maps a release progress event onto its protobuf counterpart, copying the
/// fields the wire message carries and ignoring the rest.
fn release_event_to_pb(ev: release::progress::ReleaseEvent) -> ProgressEvent {
    use pb::progress_event::Kind;
    use release::progress::ReleaseEvent as Ev;
    ProgressEvent {
        kind: Some(match ev {
            Ev::RunStart { run_id, workspace, .. } => {
                Kind::RunStart(pb::RunStart { run_id, workspace })
            }
            Ev::RepoStart { repo, sha, .. } => Kind::RepoStart(pb::RepoStart { repo, sha }),
            Ev::PhaseStart { repo, phase, .. } => {
                Kind::PhaseStart(pb::PhaseStart { repo, phase })
            }
            Ev::PhaseEnd { repo, phase, ok, duration_ms, .. } => {
                Kind::PhaseEnd(pb::PhaseEnd { repo, phase, ok, duration_ms })
            }
            Ev::BinaryStart { repo, binary, .. } => {
                Kind::BinaryStart(pb::BinaryStart { repo, binary })
            }
            Ev::TestPass { repo, binary, name, .. } => {
                Kind::TestPass(pb::TestPass { repo, binary, name })
            }
            Ev::TestFail { repo, binary, name, .. } => {
                Kind::TestFail(pb::TestFail { repo, binary, name })
            }
            Ev::BinaryDone { repo, binary, passed, failed, .. } => {
                Kind::BinaryDone(pb::BinaryDone { repo, binary, passed, failed })
            }
            Ev::RepoEnd { repo, ok, .. } => Kind::RepoEnd(pb::RepoEnd { repo, ok }),
            Ev::RunEnd { run_id, ok, .. } => Kind::RunEnd(pb::RunEnd { run_id, ok }),
        }),
    }
}
/// Handler for the `Bench` gRPC service (history / submit).
struct BenchSvc { state: Shared }
/// Converts a bench run into its protobuf form.
///
/// Metrics with no `f64` representation are omitted. A missing test duration
/// is sent as `0.0` with `has_duration = false`; a missing timestamp or
/// message is sent as an empty string.
fn pb_bench_run_from(run: &bench::BenchRun) -> pb::BenchRun {
    let mut results = Vec::new();
    for result in run.results.iter() {
        let mut metrics = Vec::new();
        for (key, value) in result.metrics.iter() {
            if let Some(number) = value.as_f64() {
                metrics.push(Kvf { key: key.clone(), value: number });
            }
        }
        results.push(pb::BenchResult { name: result.name.clone(), metrics });
    }

    let mut tests = Vec::new();
    for test in run.tests.iter() {
        let (duration_ms, has_duration) = match test.duration_ms {
            Some(ms) => (ms, true),
            None => (0.0, false),
        };
        tests.push(pb::TestOutcome {
            name: test.name.clone(),
            passed: test.passed,
            duration_ms,
            has_duration,
            message: test.message.clone().unwrap_or_default(),
        });
    }

    pb::BenchRun {
        date: run.date.clone(),
        timestamp: run.timestamp.clone().unwrap_or_default(),
        version: run.version.clone(),
        machine: run.machine.clone(),
        cores: run.cores,
        results,
        tests,
    }
}
/// Converts a protobuf bench run back into the domain type.
///
/// Metric values that `serde_json::Number::from_f64` rejects are dropped.
/// An empty timestamp or message becomes `None`, and a duration is kept only
/// when `has_duration` is set.
fn bench_run_from_pb(p: pb::BenchRun) -> bench::BenchRun {
    // Empty strings on the wire stand for "absent".
    let non_empty = |text: String| if text.is_empty() { None } else { Some(text) };

    let mut results = Vec::new();
    for result in p.results {
        let mut metrics = serde_json::Map::new();
        for kv in result.metrics {
            if let Some(number) = serde_json::Number::from_f64(kv.value) {
                metrics.insert(kv.key, serde_json::Value::Number(number));
            }
        }
        results.push(bench::BenchResult { name: result.name, metrics });
    }

    let mut tests = Vec::new();
    for test in p.tests {
        let duration_ms = if test.has_duration { Some(test.duration_ms) } else { None };
        tests.push(bench::TestOutcome {
            name: test.name,
            passed: test.passed,
            duration_ms,
            message: non_empty(test.message),
        });
    }

    bench::BenchRun {
        date: p.date,
        timestamp: non_empty(p.timestamp),
        version: p.version,
        machine: p.machine,
        cores: p.cores,
        results,
        tests,
    }
}
#[tonic::async_trait]
impl BenchSvcTrait for BenchSvc {
async fn history(
&self,
req: Request<RepoOnly>,
) -> Result<Response<BenchHistoryResponse>, Status> {
let wctx = self.state.ws(req.metadata())?.clone();
let name = req.into_inner().repo;
{
let l = wctx.loaded.lock().await;
if !l.nornir.repo.contains_key(&name) {
return Err(not_found(format!("repo `{name}` not configured")));
}
}
let runs = wctx
.warehouse
.query_bench_runs_async(&nornir::warehouse::BenchFilter::for_repo(name.clone()))
.await
.map_err(internal)?;
Ok(Response::new(BenchHistoryResponse {
repo: name,
runs: runs.iter().map(pb_bench_run_from).collect(),
}))
}
/// Records a bench run for a configured repo.
///
/// The run is appended to the warehouse first and then to the repo's history
/// file (its configured `history` path, or `bench_history.jsonl`). The
/// response carries the warehouse run id.
///
/// # Errors
/// `invalid_argument` when `run` is missing or its machine is empty,
/// `not_found` for an unconfigured repo, `internal` for storage failures.
async fn submit(
    &self,
    req: Request<SubmitBenchRequest>,
) -> Result<Response<SubmitBenchResponse>, Status> {
    let wctx = self.state.ws(req.metadata())?;
    let req = req.into_inner();
    let Some(pb_run) = req.run else {
        return Err(invalid_arg("`run` is required"));
    };
    let run = bench_run_from_pb(pb_run);
    if run.machine.is_empty() {
        return Err(invalid_arg("BenchRun.machine must not be empty"));
    }
    // Resolve the history file under the config lock, then release it before
    // any storage work.
    let history_path = {
        let loaded = wctx.loaded.lock().await;
        let repo = loaded
            .nornir
            .repo
            .get(&req.repo)
            .ok_or_else(|| not_found(format!("repo `{}` not configured", req.repo)))?;
        let file_name: &str = if repo.history.is_empty() {
            "bench_history.jsonl"
        } else {
            repo.history.as_str()
        };
        let resolved = config::Nornir::repo_dir(&loaded.workspace_root, &req.repo).join(file_name);
        resolved
    };
    let run_id = wctx
        .warehouse
        .append_bench_run_async(&req.repo, &run)
        .await
        .map_err(internal)?;
    if let Some(parent) = history_path.parent() {
        std::fs::create_dir_all(parent).map_err(internal)?;
    }
    bench::history::append(&history_path, &run).map_err(internal)?;
    Ok(Response::new(SubmitBenchResponse { run_id: run_id.to_string() }))
}
}
/// Handler for the `Telemetry` gRPC service (batch submission of telemetry rows).
struct TelemetrySvc { state: Shared }
#[tonic::async_trait]
impl TelemetrySvcTrait for TelemetrySvc {
/// Appends a batch of MCP call records to the workspace warehouse.
///
/// The append runs on the blocking pool and is skipped for an empty batch.
/// `accepted` is the number of records received.
async fn submit_mcp_calls(
    &self,
    req: Request<pb::SubmitMcpCallsRequest>,
) -> Result<Response<pb::SubmitMcpCallsResponse>, Status> {
    let wctx = self.state.ws(req.metadata())?;
    let mut calls: Vec<nornir::warehouse::iceberg::McpCall> = Vec::new();
    for call in req.into_inner().calls {
        calls.push(nornir::warehouse::iceberg::McpCall {
            ts_micros: call.ts_micros,
            tool: call.tool,
            status: call.status,
            latency_ms: call.latency_ms,
        });
    }
    let accepted = calls.len() as u32;
    if accepted != 0 {
        let warehouse = wctx.warehouse.clone();
        tokio::task::spawn_blocking(move || warehouse.append_mcp_calls(&calls))
            .await
            .map_err(internal)?
            .map_err(internal)?;
    }
    Ok(Response::new(pb::SubmitMcpCallsResponse { accepted }))
}
/// Appends a batch of agent/model bake-off runs to the workspace warehouse.
///
/// An empty `error` string is stored as `None`. The append is skipped for an
/// empty batch; `accepted` is the number of rows received.
async fn submit_bakeoff(
    &self,
    req: Request<pb::SubmitBakeoffRequest>,
) -> Result<Response<pb::SubmitCountResponse>, Status> {
    use nornir::warehouse::agent_model_runs::{append_agent_model_runs, AgentModelRunRow};
    let wctx = self.state.ws(req.metadata())?;
    let mut rows: Vec<AgentModelRunRow> = Vec::new();
    for run in req.into_inner().runs {
        let error = if run.error.is_empty() { None } else { Some(run.error) };
        rows.push(AgentModelRunRow {
            run_id: run.run_id,
            ts_micros: run.ts_micros,
            agent: run.agent,
            model: run.model,
            prompt_id: run.prompt_id,
            prompt: run.prompt,
            output: run.output,
            latency_ms: run.latency_ms,
            tokens_in: run.tokens_in,
            tokens_out: run.tokens_out,
            tokens_per_s: run.tokens_per_s,
            score: run.score,
            ok: run.ok,
            error,
            cost_usd: run.cost_usd,
            mcp_tool_calls: run.mcp_tool_calls,
        });
    }
    let accepted = rows.len() as u32;
    if accepted != 0 {
        append_agent_model_runs(&wctx.warehouse, &rows).await.map_err(internal)?;
    }
    Ok(Response::new(pb::SubmitCountResponse { accepted }))
}
/// Converts the submitted test results into `TestResultRow`s and appends them
/// to the workspace warehouse; an empty batch writes nothing.
///
/// The response's `accepted` is the number of rows in the request.
async fn submit_test_results(
    &self,
    req: Request<pb::SubmitTestResultsRequest>,
) -> Result<Response<pb::SubmitCountResponse>, Status> {
    use nornir::warehouse::test_results::{append_test_results, TestResultRow};

    let wctx = self.state.ws(req.metadata())?.clone();
    let mut rows: Vec<TestResultRow> = Vec::new();
    for result in req.into_inner().rows {
        rows.push(TestResultRow {
            run_id: result.run_id,
            repo: result.repo,
            suite: result.suite,
            test_name: result.test_name,
            status: result.status,
            duration_ms: result.duration_ms,
            ts_micros: result.ts_micros,
            message: result.message,
            aspect: result.aspect,
            metric: result.metric,
        });
    }
    let accepted = rows.len() as u32;
    if rows.is_empty() {
        return Ok(Response::new(pb::SubmitCountResponse { accepted }));
    }
    append_test_results(&wctx.warehouse, &rows)
        .await
        .map_err(internal)?;
    Ok(Response::new(pb::SubmitCountResponse { accepted }))
}
/// Converts the submitted release events into `ReleaseEventRow`s and appends
/// them to the workspace warehouse; an empty batch writes nothing.
///
/// `depends_on` and `elapsed_ms` are stored as `Some(..)` only when the
/// matching `has_*` flag is set on the wire message.
async fn submit_release_events(
    &self,
    req: Request<pb::SubmitReleaseEventsRequest>,
) -> Result<Response<pb::SubmitCountResponse>, Status> {
    use nornir::warehouse::release_events::{append_release_events, ReleaseEventRow};

    let wctx = self.state.ws(req.metadata())?.clone();
    let mut rows: Vec<ReleaseEventRow> = Vec::new();
    for event in req.into_inner().rows {
        let depends_on = if event.has_depends_on { Some(event.depends_on) } else { None };
        let elapsed_ms = if event.has_elapsed_ms { Some(event.elapsed_ms) } else { None };
        rows.push(ReleaseEventRow {
            run_id: event.run_id,
            seq: event.seq,
            ts_micros: event.ts_micros,
            component: event.component,
            repo: event.repo,
            op: event.op,
            phase: event.phase,
            status: event.status,
            detail: event.detail,
            depends_on,
            elapsed_ms,
        });
    }
    let accepted = rows.len() as u32;
    if rows.is_empty() {
        return Ok(Response::new(pb::SubmitCountResponse { accepted }));
    }
    append_release_events(&wctx.warehouse, &rows)
        .await
        .map_err(internal)?;
    Ok(Response::new(pb::SubmitCountResponse { accepted }))
}
/// Parses `jobs_json` as a JSON array of `JobRecord`s and submits each one to
/// the workspace job store, in order.
///
/// A blank (whitespace-only) payload is treated as an empty list. Parsing and
/// submission run on the blocking pool; the first failing record aborts the
/// call, and records submitted before it are not rolled back here.
async fn submit_jobs(
    &self,
    req: Request<pb::SubmitJobsRequest>,
) -> Result<Response<pb::SubmitCountResponse>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let payload = req.into_inner().jobs_json;
    let jobs = wctx.jobs.clone();
    let submit_all = move || -> anyhow::Result<u32> {
        let records: Vec<nornir::jobs::JobRecord> = match payload.trim() {
            "" => Vec::new(),
            _ => serde_json::from_str(&payload)?,
        };
        for record in records.iter() {
            jobs.submit(record)?;
        }
        Ok(records.len() as u32)
    };
    let accepted = tokio::task::spawn_blocking(submit_all)
        .await
        .map_err(internal)?
        .map_err(internal)?;
    Ok(Response::new(pb::SubmitCountResponse { accepted }))
}
}
/// gRPC search service: runs queries against the index of the workspace that
/// the request metadata resolves to.
struct SearchSvc { state: Shared }
#[tonic::async_trait]
impl SearchSvcTrait for SearchSvc {
async fn query(
&self,
req: Request<SearchRequest>,
) -> Result<Response<SearchResponse>, Status> {
let wctx = self.state.ws(req.metadata())?.clone();
let req = req.into_inner();
let corpus = if req.corpus.is_empty() {
None
} else {
Some(
index::Corpus::parse(&req.corpus)
.ok_or_else(|| invalid_arg(format!("unknown corpus `{}`", req.corpus)))?,
)
};
let repo_opt = if req.repo.is_empty() { None } else { Some(req.repo.clone()) };
let limit = if req.limit == 0 { 20 } else { req.limit as usize };
let idx = wctx.index.clone();
let q = req.query.clone();
let hits = tokio::task::spawn_blocking(move || {
idx.search(&q, corpus, repo_opt.as_deref(), limit)
})
.await
.map_err(internal)?
.map_err(internal)?;
let hits = hits
.into_iter()
.map(|h| SearchHit {
id: String::new(),
corpus: h.corpus,
repo: h.repo,
path: h.path,
score: h.score,
snippet: h.snippet,
title: h.title,
})
.collect();
Ok(Response::new(SearchResponse { hits }))
}
}
/// gRPC index service: reports index statistics and records index snapshots
/// (from the server-side index directory or from an uploaded stream) in the
/// workspace warehouse.
struct IndexSvc { state: Shared }
#[tonic::async_trait]
impl IndexSvcTrait for IndexSvc {
/// Returns the total entry count of the workspace index plus a per-corpus
/// breakdown, with each count rendered as a string value.
async fn stats(&self, req: Request<Empty>) -> Result<Response<IndexStatsResponse>, Status> {
    let index_handle = self.state.ws(req.metadata())?.index.clone();
    // `stats()` is a synchronous call, so it is run on the blocking pool.
    let collected = tokio::task::spawn_blocking(move || index_handle.stats())
        .await
        .map_err(internal)?;
    let summary = collected.map_err(internal)?;

    let by_corpus = summary
        .by_corpus
        .into_iter()
        .map(|(corpus, count)| Kv { key: corpus, value: count.to_string() })
        .collect();
    Ok(Response::new(IndexStatsResponse { total: summary.total, by_corpus }))
}
async fn snapshot(
&self,
req: Request<SnapshotRequest>,
) -> Result<Response<SnapshotResponse>, Status> {
let wctx = self.state.ws(req.metadata())?.clone();
let req = req.into_inner();
if req.git_sha.is_empty() {
return Err(invalid_arg("`git_sha` is required"));
}
let workspace = if req.workspace.is_empty() { "workspace_holger".into() } else { req.workspace };
let repo = if req.repo.is_empty() { "_workspace".into() } else { req.repo };
let branch = if req.branch.is_empty() { "unknown".into() } else { req.branch };
let index_dir = wctx.index.index_dir().to_path_buf();
let wh = wctx.warehouse.clone();
let job = wctx.jobs.start(
nornir::jobs::kind::SNAPSHOT,
&repo,
&workspace,
serde_json::json!({ "git_sha": req.git_sha, "branch": branch, "source": "index_snapshot" }),
);
let snap = match tokio::task::spawn_blocking(move || {
nornir::index::snapshot::snapshot_to_iceberg(&wh, &workspace, &repo, &req.git_sha, &branch, &index_dir)
})
.await
.map_err(internal)?
{
Ok(s) => s,
Err(e) => {
let e = anyhow::anyhow!("{e:#}");
job.fail(&e);
return Err(internal(e));
}
};
job.finish(
serde_json::json!({ "blob_count": snap.blob_count, "total_bytes": snap.total_bytes }),
&format!("index_snapshots:{}", snap.snapshot_id),
);
Ok(Response::new(snapshot_ref_to_pb(snap)))
}
async fn upload_snapshot(
&self,
req: Request<tonic::Streaming<IndexBlobFrame>>,
) -> Result<Response<SnapshotResponse>, Status> {
use pb::index_blob_frame::Body;
let wctx = self.state.ws(req.metadata())?.clone();
let mut stream = req.into_inner();
let tmp_root = std::env::temp_dir()
.join(format!("nornir-upload-{}", uuid::Uuid::new_v4()));
std::fs::create_dir_all(&tmp_root).map_err(internal)?;
struct TmpDir(PathBuf);
impl Drop for TmpDir {
fn drop(&mut self) {
let _ = std::fs::remove_dir_all(&self.0);
}
}
let _guard = TmpDir(tmp_root.clone());
let root = tmp_root;
let mut meta: Option<pb::IndexBlobMeta> = None;
let mut cur_file: Option<(PathBuf, u64, std::fs::File, u64)> = None;
while let Some(frame) = stream.message().await? {
match frame.body {
Some(Body::Meta(m)) => {
if meta.is_some() {
return Err(invalid_arg("meta frame seen twice"));
}
meta = Some(m);
}
Some(Body::File(f)) => {
if meta.is_none() {
return Err(invalid_arg("first frame must carry `meta`"));
}
if let Some((p, declared, _, written)) = cur_file.take() {
if written != declared {
return Err(invalid_arg(format!(
"file {} declared {} bytes, got {}", p.display(), declared, written
)));
}
}
if f.rel_path.is_empty() || f.rel_path.contains("..") {
return Err(invalid_arg("rel_path must be non-empty and not contain `..`"));
}
let path = root.join(&f.rel_path);
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).map_err(internal)?;
}
let fh = std::fs::File::create(&path).map_err(internal)?;
cur_file = Some((path, f.size_bytes, fh, 0));
}
Some(Body::Chunk(bytes)) => {
let Some((_, _, fh, written)) = cur_file.as_mut() else {
return Err(invalid_arg("chunk before file"));
};
use std::io::Write;
fh.write_all(&bytes).map_err(internal)?;
*written += bytes.len() as u64;
}
None => return Err(invalid_arg("empty frame")),
}
}
if let Some((p, declared, _, written)) = cur_file.take() {
if written != declared {
return Err(invalid_arg(format!(
"file {} declared {} bytes, got {}", p.display(), declared, written
)));
}
}
let meta = meta.ok_or_else(|| invalid_arg("stream had no `meta` frame"))?;
if meta.git_sha.is_empty() {
return Err(invalid_arg("meta.git_sha is required"));
}
let workspace = if meta.workspace.is_empty() { "workspace_holger".into() } else { meta.workspace };
let repo = if meta.repo.is_empty() { "_workspace".into() } else { meta.repo };
let branch = if meta.branch.is_empty() { "unknown".into() } else { meta.branch };
let wh = wctx.warehouse.clone();
let job = wctx.jobs.start(
nornir::jobs::kind::SNAPSHOT,
&repo,
&workspace,
serde_json::json!({ "git_sha": meta.git_sha, "branch": branch, "source": "upload" }),
);
let snap = match tokio::task::spawn_blocking(move || {
nornir::index::snapshot::snapshot_to_iceberg(&wh, &workspace, &repo, &meta.git_sha, &branch, &root)
})
.await
.map_err(internal)?
{
Ok(s) => s,
Err(e) => {
let e = anyhow::anyhow!("{e:#}");
job.fail(&e);
return Err(internal(e));
}
};
job.finish(
serde_json::json!({ "blob_count": snap.blob_count, "total_bytes": snap.total_bytes }),
&format!("index_snapshots:{}", snap.snapshot_id),
);
Ok(Response::new(snapshot_ref_to_pb(snap)))
}
}
/// Converts a warehouse `SnapshotRef` into its protobuf response form.
///
/// The snapshot id is rendered as a string; the blob count and byte total are
/// cast to `u64`.
fn snapshot_ref_to_pb(s: nornir::index::snapshot::SnapshotRef) -> SnapshotResponse {
    let snapshot_id = s.snapshot_id.to_string();
    let blob_count = s.blob_count as u64;
    let total_bytes = s.total_bytes as u64;
    SnapshotResponse {
        snapshot_id,
        workspace: s.workspace,
        repo: s.repo,
        git_sha: s.git_sha,
        branch: s.branch,
        schema_hash: s.schema_hash,
        blob_count,
        total_bytes,
    }
}
/// gRPC introspection service: extracts symbols and call-graph information
/// from a binary, with relative binary paths resolved against the workspace root.
struct IntrospectSvc { state: Shared }
/// Resolves a client-supplied binary path: absolute paths are returned as
/// given, relative ones are joined onto `workspace_root`.
fn resolve_binary(workspace_root: &std::path::Path, binary: &str) -> PathBuf {
    let candidate = std::path::Path::new(binary);
    match candidate.is_absolute() {
        true => candidate.to_path_buf(),
        false => workspace_root.join(candidate),
    }
}
/// Copies an extracted artifact symbol into its protobuf form.
///
/// A missing source line or size is reported as 0.
fn sym_to_pb(s: &introspect::artifact::Symbol) -> PbSymbol {
    let line = s.line.unwrap_or_default();
    let size_bytes = s.size_bytes.unwrap_or_default();
    PbSymbol {
        name: s.name.clone(),
        name_demangled: s.name_demangled.clone(),
        name_mangled: s.name_mangled.clone(),
        file: s.file.clone(),
        line,
        size_bytes,
        krate: s.krate.clone(),
    }
}
#[tonic::async_trait]
impl IntrospectSvcTrait for IntrospectSvc {
/// Lists every symbol extracted from the requested binary.
///
/// The extraction runs on the blocking pool and is tracked as a
/// `SYMBOL_SCAN` job whose result records the number of symbols found.
async fn symbols(&self, req: Request<BinaryOnly>) -> Result<Response<SymbolList>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let workspace_root = wctx.loaded.lock().await.workspace_root.clone();
    let binary = req.into_inner().binary;
    let binary_path = resolve_binary(&workspace_root, &binary);

    let job = wctx.jobs.start(
        nornir::jobs::kind::SYMBOL_SCAN,
        &binary,
        &wctx.name,
        serde_json::json!({ "op": "symbols" }),
    );
    let scan = tokio::task::spawn_blocking(move || {
        introspect::artifact::extract_symbols(&binary_path, &workspace_root)
    })
    .await
    .map_err(internal)?;

    let syms = match scan {
        Ok(found) => found,
        Err(err) => {
            job.fail(&err);
            return Err(internal(err));
        }
    };
    job.finish(serde_json::json!({ "symbols": syms.len() }), "");

    let symbols = syms.iter().map(sym_to_pb).collect();
    Ok(Response::new(SymbolList { symbols }))
}
/// Extracts the binary's symbols and returns those matching `pattern`,
/// capped at `limit` entries (50 when `limit` is 0).
///
/// Runs on the blocking pool and is tracked as a `SYMBOL_SCAN` job.
async fn symbol_lookup(
    &self,
    req: Request<SymbolLookupRequest>,
) -> Result<Response<SymbolList>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let r = req.into_inner();
    let workspace_root = wctx.loaded.lock().await.workspace_root.clone();
    let binary_path = resolve_binary(&workspace_root, &r.binary);
    let cap = match r.limit {
        0 => 50,
        n => n as usize,
    };
    let pattern = r.pattern;

    let job = wctx.jobs.start(
        nornir::jobs::kind::SYMBOL_SCAN,
        &r.binary,
        &wctx.name,
        serde_json::json!({ "op": "symbol_lookup" }),
    );
    let scan = move || -> anyhow::Result<Vec<introspect::artifact::Symbol>> {
        let all = introspect::artifact::extract_symbols(&binary_path, &workspace_root)?;
        let matched: Vec<introspect::artifact::Symbol> = introspect::artifact::lookup(&all, &pattern)
            .into_iter()
            .take(cap)
            .cloned()
            .collect();
        Ok(matched)
    };
    let outcome = tokio::task::spawn_blocking(scan).await.map_err(internal)?;

    let syms = match outcome {
        Ok(found) => found,
        Err(err) => {
            job.fail(&err);
            return Err(internal(err));
        }
    };
    job.finish(serde_json::json!({ "symbols": syms.len() }), "");

    let symbols = syms.iter().map(sym_to_pb).collect();
    Ok(Response::new(SymbolList { symbols }))
}
/// Extracts the binary's symbols and returns those that
/// `introspect::artifact::defined_in` selects for the given `suffix`.
///
/// Runs on the blocking pool and is tracked as a `SYMBOL_SCAN` job.
async fn defined_in(
    &self,
    req: Request<DefinedInRequest>,
) -> Result<Response<SymbolList>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let r = req.into_inner();
    let workspace_root = wctx.loaded.lock().await.workspace_root.clone();
    let binary_path = resolve_binary(&workspace_root, &r.binary);
    let suffix = r.suffix;

    let job = wctx.jobs.start(
        nornir::jobs::kind::SYMBOL_SCAN,
        &r.binary,
        &wctx.name,
        serde_json::json!({ "op": "defined_in" }),
    );
    let scan = move || -> anyhow::Result<Vec<introspect::artifact::Symbol>> {
        let all = introspect::artifact::extract_symbols(&binary_path, &workspace_root)?;
        let matched: Vec<introspect::artifact::Symbol> =
            introspect::artifact::defined_in(&all, &suffix)
                .into_iter()
                .cloned()
                .collect();
        Ok(matched)
    };
    let outcome = tokio::task::spawn_blocking(scan).await.map_err(internal)?;

    let syms = match outcome {
        Ok(found) => found,
        Err(err) => {
            job.fail(&err);
            return Err(internal(err));
        }
    };
    job.finish(serde_json::json!({ "symbols": syms.len() }), "");

    let symbols = syms.iter().map(sym_to_pb).collect();
    Ok(Response::new(SymbolList { symbols }))
}
/// Names of the functions that call the queried symbol, per the binary's call graph.
async fn callers(&self, req: Request<CallQuery>) -> Result<Response<NameList>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let query = req.into_inner();
    let names = callgraph_query(&wctx, query, true).await?;
    Ok(Response::new(NameList { names }))
}
/// Names of the functions called by the queried symbol, per the binary's call graph.
async fn callees(&self, req: Request<CallQuery>) -> Result<Response<NameList>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let query = req.into_inner();
    let names = callgraph_query(&wctx, query, false).await?;
    Ok(Response::new(NameList { names }))
}
/// Looks up a call path from `from` to `to` in the binary's call graph.
///
/// The graph is extracted on the blocking pool. When no path is found the
/// returned name list is empty.
async fn path_between(
    &self,
    req: Request<PathBetweenRequest>,
) -> Result<Response<NameList>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let r = req.into_inner();
    let workspace_root = wctx.loaded.lock().await.workspace_root.clone();
    let binary_path = resolve_binary(&workspace_root, &r.binary);

    let search = move || -> anyhow::Result<Vec<String>> {
        let edges = introspect::callgraph_dwarf::extract_callgraph(&binary_path, &workspace_root)?;
        let graph = introspect::callgraph_dwarf::Callgraph::from_edges(&edges);
        let route = graph.path_between(&r.from, &r.to).unwrap_or_default();
        Ok(route)
    };
    let names = tokio::task::spawn_blocking(search)
        .await
        .map_err(internal)?
        .map_err(internal)?;
    Ok(Response::new(NameList { names }))
}
}
async fn callgraph_query(wctx: &WorkspaceCtx, r: CallQuery, callers: bool) -> Result<Vec<String>, Status> {
let ws = wctx.loaded.lock().await.workspace_root.clone();
let bin = resolve_binary(&ws, &r.binary);
let ws2 = ws.clone();
tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<String>> {
let edges = introspect::callgraph_dwarf::extract_callgraph(&bin, &ws2)?;
let cg = introspect::callgraph_dwarf::Callgraph::from_edges(&edges);
Ok(if callers { cg.callers_of(&r.name) } else { cg.callees_of(&r.name) })
})
.await
.map_err(internal)?
.map_err(internal)
}
/// gRPC knowledge service: answers symbol and call-edge queries from the
/// knowledge views loaded out of the workspace warehouse.
struct KnowledgeSvc { state: Shared }
/// Copies a knowledge symbol row into its protobuf form; a missing signature
/// becomes the empty string.
fn ksym_to_pb(s: &knowledge::symbols::SymbolRow) -> KnowledgeSymbol {
    let signature = match &s.signature {
        Some(text) => text.clone(),
        None => Default::default(),
    };
    KnowledgeSymbol {
        crate_name: s.crate_name.clone(),
        module_path: s.module_path.clone(),
        item_kind: s.item_kind.clone(),
        item_name: s.item_name.clone(),
        visibility: s.visibility.clone(),
        file: s.file.clone(),
        line: s.line,
        doc_lines: s.doc_lines,
        signature,
    }
}
/// Copies a knowledge call-edge row into its protobuf form, field for field.
fn kcall_to_pb(c: &knowledge::symbols::CallEdgeRow) -> KnowledgeCall {
    let crate_name = c.crate_name.clone();
    let caller_path = c.caller_path.clone();
    let callee_ident = c.callee_ident.clone();
    let call_kind = c.call_kind.clone();
    let file = c.file.clone();
    KnowledgeCall { crate_name, caller_path, callee_ident, call_kind, file, line: c.line }
}
#[tonic::async_trait]
impl KnowledgeSvcTrait for KnowledgeSvc {
/// Looks up symbols matching `arg` in the latest knowledge view of `repo`,
/// capped at `limit` entries (50 when `limit` is 0).
async fn symbol_lookup(
    &self,
    req: Request<KnowledgeSymbolQuery>,
) -> Result<Response<KnowledgeSymbols>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let r = req.into_inner();
    let warehouse = wctx.warehouse.clone();
    let cap = match r.limit {
        0 => 50,
        n => n as usize,
    };

    let lookup = move || -> anyhow::Result<Vec<KnowledgeSymbol>> {
        let view = knowledge::query::load_latest(&warehouse, &r.repo)?;
        let found: Vec<KnowledgeSymbol> = view
            .symbol_lookup(&r.arg, cap)
            .iter()
            .map(|s| ksym_to_pb(s))
            .collect();
        Ok(found)
    };
    let symbols = tokio::task::spawn_blocking(lookup)
        .await
        .map_err(internal)?
        .map_err(internal)?;
    Ok(Response::new(KnowledgeSymbols { symbols }))
}
/// Returns the symbols that the latest knowledge view of `repo` reports for
/// `defined_in(arg)`, capped at `limit` entries (100 when `limit` is 0).
async fn defined_in(
    &self,
    req: Request<KnowledgeSymbolQuery>,
) -> Result<Response<KnowledgeSymbols>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let r = req.into_inner();
    let warehouse = wctx.warehouse.clone();
    let cap = match r.limit {
        0 => 100,
        n => n as usize,
    };

    let lookup = move || -> anyhow::Result<Vec<KnowledgeSymbol>> {
        let view = knowledge::query::load_latest(&warehouse, &r.repo)?;
        let found: Vec<KnowledgeSymbol> = view
            .defined_in(&r.arg)
            .iter()
            .take(cap)
            .map(|s| ksym_to_pb(s))
            .collect();
        Ok(found)
    };
    let symbols = tokio::task::spawn_blocking(lookup)
        .await
        .map_err(internal)?
        .map_err(internal)?;
    Ok(Response::new(KnowledgeSymbols { symbols }))
}
/// Call edges whose callee is the queried name, from the preferred knowledge view.
async fn callers(
    &self,
    req: Request<KnowledgeCallQuery>,
) -> Result<Response<KnowledgeCalls>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let query = req.into_inner();
    let calls = knowledge_call_query(&wctx, query, true).await?;
    Ok(Response::new(KnowledgeCalls { calls }))
}
/// Call edges whose caller is the queried name, from the preferred knowledge view.
async fn callees(
    &self,
    req: Request<KnowledgeCallQuery>,
) -> Result<Response<KnowledgeCalls>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let query = req.into_inner();
    let calls = knowledge_call_query(&wctx, query, false).await?;
    Ok(Response::new(KnowledgeCalls { calls }))
}
/// Looks up a call path from `from` to `to` in the preferred knowledge view of
/// `repo`; the name list is empty when the view reports no path.
///
/// Logs which source the view was loaded from to stderr.
async fn call_path(
    &self,
    req: Request<KnowledgeCallPathQuery>,
) -> Result<Response<NameList>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let r = req.into_inner();
    let warehouse = wctx.warehouse.clone();

    let search = move || -> anyhow::Result<Vec<String>> {
        let (view, source) = knowledge::query::load_preferred(&warehouse, &r.repo)?;
        eprintln!("knowledge.call_path: repo={} source={source}", r.repo);
        let route = view.call_path(&r.from, &r.to).unwrap_or_default();
        Ok(route)
    };
    let names = tokio::task::spawn_blocking(search)
        .await
        .map_err(internal)?
        .map_err(internal)?;
    Ok(Response::new(NameList { names }))
}
}
async fn knowledge_call_query(
wctx: &WorkspaceCtx,
r: KnowledgeCallQuery,
callers: bool,
) -> Result<Vec<KnowledgeCall>, Status> {
let wh = wctx.warehouse.clone();
let limit = if r.limit == 0 { 100 } else { r.limit as usize };
tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<KnowledgeCall>> {
let (view, source) = knowledge::query::load_preferred(&wh, &r.repo)?;
eprintln!("knowledge.{}: repo={} source={source}", if callers { "callers" } else { "callees" }, r.repo);
let hits = if callers { view.callers_of(&r.name) } else { view.callees_of(&r.name) };
Ok(hits.iter().take(limit).map(|c| kcall_to_pb(c)).collect())
})
.await
.map_err(internal)?
.map_err(internal)
}
/// Prepares the path-dependencies of `repo` under `scan_root` before a docs
/// render, and fails if any sibling dependency is still unresolved afterwards.
///
/// Logs a line to stderr when at least one dependency was cloned or linked.
///
/// # Errors
/// Returns an error naming every unresolved sibling (`dep (path)`), with
/// remediation hints.
fn prepare_docs_path_deps(repo: &std::path::Path, scan_root: &std::path::Path) -> anyhow::Result<()> {
    let (cloned, linked) = nornir::security::prepare_path_deps(repo, scan_root, &|_| None);
    if cloned + linked > 0 {
        eprintln!(
            "nornir-docs: prepared path-deps for {} ({cloned} cloned, {linked} linked)",
            repo.display()
        );
    }

    let unresolved = nornir::security::unresolved_path_dep_siblings(repo, scan_root);
    if unresolved.is_empty() {
        return Ok(());
    }
    let mut names: Vec<String> = Vec::new();
    for (dep, path) in unresolved.iter() {
        names.push(format!("{dep} ({path})"));
    }
    Err(anyhow::anyhow!(
        "docs render for {repo}: unresolved path-dep sibling(s) {sibs} — their \
         Cargo.toml is not on disk beside the repo. Check the sibling repo(s) \
         out under {root}, or build nornir-server with the `net-scan` feature \
         so it clones them automatically (set NORNIR_DEP_CLONE_BASE for a \
         non-default git host).",
        repo = repo.display(),
        sibs = names.join(", "),
        root = scan_root.display(),
    ))
}
/// gRPC docs service: initialises, renders, checks and exports repository
/// documentation for the workspace that the request metadata resolves to.
struct DocsSvc { state: Shared }
#[tonic::async_trait]
impl DocsSvcTrait for DocsSvc {
async fn init(&self, req: Request<RepoOnly>) -> Result<Response<DocsResponse>, Status> {
let wctx = self.state.ws(req.metadata())?.clone();
let name = req.into_inner().repo;
let l = wctx.loaded.lock().await;
let root = repo_root_for(&l, &name)?;
let layout = docs::RepoLayout::new(&root);
let srcs = docs::init_repo(&layout).map_err(internal)?;
Ok(Response::new(DocsResponse {
repo: name,
status: "ok".into(),
artifacts: srcs.into_iter().map(|p| p.display().to_string()).collect(),
detail: layout.nornir_dir().display().to_string(),
}))
}
async fn render(&self, req: Request<RepoOnly>) -> Result<Response<DocsResponse>, Status> {
let wctx = self.state.ws(req.metadata())?.clone();
let name = req.into_inner().repo;
let l = wctx.loaded.lock().await;
let root = repo_root_for(&l, &name)?;
let layout = docs::RepoLayout::new(&root);
let last = last_run_for(&l, &name).ok();
let history = history_for(&l, &name);
let ctx = docs::Ctx::new(&root, &l.workspace_root, last.as_ref()).with_history(&history);
let job = wctx.jobs.start(
nornir::jobs::kind::DOCS_RENDER,
&name,
&wctx.name,
serde_json::Value::Null,
);
if let Err(e) = prepare_docs_path_deps(&root, &l.workspace_root) {
job.fail(&e);
return Err(internal(e));
}
let reports = match docs::render_all(&layout, &ctx) {
Ok(r) => r,
Err(e) => {
job.fail(&e);
return Err(internal(e));
}
};
job.finish(serde_json::json!({ "docs": reports.len() }), "");
Ok(Response::new(DocsResponse {
repo: name,
status: "ok".into(),
artifacts: reports.iter().map(|r| r.output.display().to_string()).collect(),
detail: format!("{} doc(s) rendered", reports.len()),
}))
}
async fn check(&self, req: Request<RepoOnly>) -> Result<Response<DocsResponse>, Status> {
let wctx = self.state.ws(req.metadata())?.clone();
let name = req.into_inner().repo;
let l = wctx.loaded.lock().await;
let root = repo_root_for(&l, &name)?;
let layout = docs::RepoLayout::new(&root);
let last = last_run_for(&l, &name).ok();
let history = history_for(&l, &name);
let ctx = docs::Ctx::new(&root, &l.workspace_root, last.as_ref()).with_history(&history);
let (status, detail) = match docs::render_check_all(&layout, &ctx) {
Ok(_) => ("ok".to_string(), String::new()),
Err(e) => ("drift".to_string(), format!("{e:#}")),
};
Ok(Response::new(DocsResponse {
repo: name,
status,
artifacts: Vec::new(),
detail,
}))
}
async fn history(
&self,
req: Request<DocsHistoryRequest>,
) -> Result<Response<DocsHistoryResponse>, Status> {
let wctx = self.state.ws(req.metadata())?.clone();
let r = req.into_inner();
let repo = r.repo.clone();
let filter = docs::ExportFilter {
doc_name: if r.doc.is_empty() { None } else { Some(r.doc) },
version: if r.version.is_empty() { None } else { Some(r.version) },
format: if r.format.is_empty() { None } else { Some(r.format) },
limit: Some(if r.limit == 0 { 50 } else { r.limit as usize }),
};
let wh = wctx.warehouse.clone();
let rows = tokio::task::spawn_blocking(move || docs::list_doc_exports(&wh, &repo, &filter))
.await
.map_err(internal)?
.map_err(internal)?;
let entries = rows
.into_iter()
.map(|e| PbDocExport {
doc: e.doc_name,
version: e.version,
format: e.format,
path: e.export_id,
exported_at: e.generated_at,
size_bytes: e.byte_len.max(0) as u64,
})
.collect();
Ok(Response::new(DocsHistoryResponse { entries }))
}
/// Exports the repo's `README` doc via `render_export`.
///
/// Only functional when the server is built with the `docs-export` feature;
/// otherwise the call returns `unimplemented` with a rebuild hint.
async fn export(
&self,
req: Request<pb::DocsExportRequest>,
) -> Result<Response<pb::DocsExportResponse>, Status> {
// Feature disabled: consume `req` so it is not reported as unused, then refuse.
#[cfg(not(feature = "docs-export"))]
{
let _ = req;
Err(Status::unimplemented(
"nornir-server was built without the `docs-export` feature; \
rebuild with `--features server,docs-export` to use Docs.Export",
))
}
// Feature enabled: delegate to the shared export path.
#[cfg(feature = "docs-export")]
{
self.render_export(req, "README").await
}
}
/// Builds and exports the repo's `book` doc via `render_export`.
///
/// Only functional when the server is built with the `docs-export` feature;
/// otherwise the call returns `unimplemented` with a rebuild hint.
async fn book(
&self,
req: Request<pb::DocsExportRequest>,
) -> Result<Response<pb::DocsExportResponse>, Status> {
// Feature disabled: consume `req` so it is not reported as unused, then refuse.
#[cfg(not(feature = "docs-export"))]
{
let _ = req;
Err(Status::unimplemented(
"nornir-server was built without the `docs-export` feature; \
rebuild with `--features server,docs-export` to use Docs.Book",
))
}
// Feature enabled: delegate to the shared export path.
#[cfg(feature = "docs-export")]
{
self.render_export(req, "book").await
}
}
}
// Shared implementation behind `Docs.Export` and `Docs.Book`; compiled only
// with the `docs-export` feature.
#[cfg(feature = "docs-export")]
impl DocsSvc {
/// Renders the repo's docs, exports `doc` (`"README"` or `"book"`) in the
/// requested format, writes the result to the layout's export path and
/// records the export in the warehouse.
///
/// An empty `format` in the request defaults to `"pdf"`. The work is tracked
/// as a `DOCS_BOOK` job when `doc == "book"`, otherwise as `DOCS_EXPORT`.
/// Any failure inside the blocking task marks the job failed and is returned
/// as an internal error.
async fn render_export(
&self,
req: Request<pb::DocsExportRequest>,
doc: &'static str,
) -> Result<Response<pb::DocsExportResponse>, Status> {
let wctx = self.state.ws(req.metadata())?.clone();
let r = req.into_inner();
let repo_name = r.repo.clone();
let fmt_str = if r.format.is_empty() { "pdf".to_string() } else { r.format.clone() };
// Snapshot everything needed from the workspace config inside this scope so
// the lock guard is dropped before the blocking work starts.
let (root, workspace_root, last, history) = {
let l = wctx.loaded.lock().await;
let root = repo_root_for(&l, &repo_name)?;
let last = last_run_for(&l, &repo_name).ok();
let history = history_for(&l, &repo_name);
(root, l.workspace_root.clone(), last, history)
};
let wh = wctx.warehouse.clone();
// `fmt_str` is still needed after the closure (job metadata, response), so
// the closure gets its own copy.
let fmt_for_closure = fmt_str.clone();
let job_kind = if doc == "book" {
nornir::jobs::kind::DOCS_BOOK
} else {
nornir::jobs::kind::DOCS_EXPORT
};
let job = wctx.jobs.start(
job_kind,
&repo_name,
&wctx.name,
serde_json::json!({ "doc": doc, "format": fmt_str }),
);
let blocking = tokio::task::spawn_blocking(move || -> anyhow::Result<_> {
let layout = docs::RepoLayout::new(&root);
let ctx = docs::Ctx::new(&root, &workspace_root, last.as_ref())
.with_history(&history);
// Path-deps are prepared and the docs re-rendered before exporting.
prepare_docs_path_deps(&root, &workspace_root)?;
docs::render_all(&layout, &ctx)?;
let format = docs::DocFormat::parse(&fmt_for_closure)?;
let ext = format.extension();
// Only the book build reports its source files; a plain export has none.
let (bytes, sources): (Vec<u8>, Vec<String>) = if doc == "book" {
let (bytes, srcs) = docs::build_book(&root, &ctx, format)?;
(bytes, srcs.iter().map(|p| p.display().to_string()).collect())
} else {
(docs::export_repo(&root, format)?, Vec::new())
};
let version = docs::resolve_version(&root);
let out_path = layout.export_path(doc, ext);
if let Some(parent) = out_path.parent() {
std::fs::create_dir_all(parent)?;
}
std::fs::write(&out_path, &bytes)?;
// The workspace name is the last component of the workspace root, with
// "_workspace" as the fallback when that is unavailable or not UTF-8.
let workspace = workspace_root
.file_name()
.and_then(|x| x.to_str())
.unwrap_or("_workspace")
.to_string();
// A repo whose HEAD cannot be read is recorded with git sha "unknown".
let git_sha =
nornir::gitio::head_sha(&root).unwrap_or_else(|_| "unknown".to_string());
let record = docs::record_doc_export(
&wh, &workspace, &repo_name, doc, &version, ext, &git_sha, &bytes,
)?;
Ok((out_path, bytes.len(), sources, record))
})
.await
.map_err(internal)?;
let (out_path, nbytes, sources, record) = match blocking {
Ok(v) => v,
Err(e) => {
job.fail(&e);
return Err(internal(e));
}
};
job.finish(
serde_json::json!({ "bytes": nbytes, "format": fmt_str }),
&format!("doc_exports:{}", record.export_id),
);
Ok(Response::new(pb::DocsExportResponse {
repo: r.repo,
format: fmt_str,
bytes: nbytes as u64,
out: out_path.display().to_string(),
sha256: record.sha256,
git_sha: record.git_sha,
export_id: record.export_id,
sources,
}))
}
}
/// gRPC Mimir service: answers dependency-graph queries for a workspace and
/// returns the results as JSON strings.
struct MimirSvc { state: Shared }
impl MimirSvc {
async fn json<F>(&self, md: &tonic::metadata::MetadataMap, f: F) -> Result<Response<pb::JsonResponse>, Status>
where
F: FnOnce(&nornir::warehouse::dep_graph::WorkspaceGraph) -> anyhow::Result<serde_json::Value>,
{
let g = self.state.ws(md)?.graph().await?;
let value = f(&g.graph).map_err(internal)?;
let json = serde_json::to_string(&value).map_err(internal)?;
Ok(Response::new(pb::JsonResponse { json }))
}
}
#[tonic::async_trait]
impl MimirSvcTrait for MimirSvc {
/// JSON result of `nornir::mimir::deps_of` for `repo`, honouring `transitive`.
async fn deps_of(&self, req: Request<pb::DepQuery>) -> Result<Response<pb::JsonResponse>, Status> {
    let md = req.metadata().clone();
    let query = req.into_inner();
    self.json(&md, |graph| nornir::mimir::deps_of(graph, &query.repo, query.transitive))
        .await
}
/// JSON result of `nornir::mimir::dependents_of` for `repo`, honouring `transitive`.
async fn dependents_of(&self, req: Request<pb::DepQuery>) -> Result<Response<pb::JsonResponse>, Status> {
    let md = req.metadata().clone();
    let query = req.into_inner();
    self.json(&md, |graph| nornir::mimir::dependents_of(graph, &query.repo, query.transitive))
        .await
}
/// JSON result of `nornir::mimir::affected_by_change` for the given `repos`.
async fn affected_by_change(&self, req: Request<pb::AffectedQuery>) -> Result<Response<pb::JsonResponse>, Status> {
    let md = req.metadata().clone();
    let query = req.into_inner();
    self.json(&md, |graph| nornir::mimir::affected_by_change(graph, &query.repos))
        .await
}
/// JSON result of `nornir::mimir::build_order` for the workspace graph.
async fn build_order(&self, req: Request<Empty>) -> Result<Response<pb::JsonResponse>, Status> {
    let md = req.metadata().clone();
    self.json(&md, |graph| nornir::mimir::build_order(graph)).await
}
/// JSON result of `nornir::mimir::dep_path` between `from` and `to`.
async fn dep_path(&self, req: Request<pb::DepPathQuery>) -> Result<Response<pb::JsonResponse>, Status> {
    let md = req.metadata().clone();
    let query = req.into_inner();
    self.json(&md, |graph| nornir::mimir::dep_path(graph, &query.from, &query.to))
        .await
}
/// JSON result of `nornir::mimir::external_dep_users` for the crate `krate`.
async fn external_dep_users(&self, req: Request<pb::CrateQuery>) -> Result<Response<pb::JsonResponse>, Status> {
    let md = req.metadata().clone();
    let query = req.into_inner();
    self.json(&md, |graph| {
        let users = nornir::mimir::external_dep_users(graph, &query.krate);
        Ok(users)
    })
    .await
}
/// Returns the output of `nornir::mimir::svg` for the workspace graph,
/// carried verbatim in the response's `json` field.
async fn svg(&self, req: Request<Empty>) -> Result<Response<pb::JsonResponse>, Status> {
    let loaded_graph = self.state.ws(req.metadata())?.graph().await?;
    let rendered = nornir::mimir::svg(&loaded_graph.graph);
    Ok(Response::new(pb::JsonResponse { json: rendered }))
}
/// JSON overview of `repo`, computed by `nornir::mimir::repo_overview` from
/// the workspace graph and the warehouse on the blocking pool.
async fn repo_overview(&self, req: Request<RepoOnly>) -> Result<Response<pb::JsonResponse>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let repo = req.into_inner().repo;
    let loaded_graph = wctx.graph().await?;
    let warehouse = wctx.warehouse.clone();

    let computed = tokio::task::spawn_blocking(move || {
        nornir::mimir::repo_overview(&loaded_graph.graph, &warehouse, &repo)
    })
    .await
    .map_err(internal)?;
    let overview = computed.map_err(internal)?;

    let json = serde_json::to_string(&overview).map_err(internal)?;
    Ok(Response::new(pb::JsonResponse { json }))
}
/// Scans the components of `repo` for known vulnerabilities and summarises
/// its licenses, returning the report as a JSON string.
///
/// Vulnerability lookups are cached in the warehouse: a component whose cached
/// finding is younger than 7 days is answered from the cache, every other
/// component is queried via `nornir::security::query_vulns` and its result is
/// written back. The work runs on the blocking pool and is tracked as a
/// `SECURITY_SCAN` job.
async fn security_scan(&self, req: Request<RepoOnly>) -> Result<Response<pb::JsonResponse>, Status> {
let wctx = self.state.ws(req.metadata())?.clone();
let repo = req.into_inner().repo;
// The repo directory is `repo` joined onto the workspace root; its parent
// is the root under which path-dep siblings are prepared.
let repo_dir = wctx.loaded.lock().await.workspace_root.join(&repo);
let scan_root = repo_dir.parent().map(|p| p.to_path_buf());
let wh = wctx.warehouse.clone();
let job = wctx.jobs.start(
nornir::jobs::kind::SECURITY_SCAN,
&repo,
&wctx.name,
serde_json::Value::Null,
);
let value = match tokio::task::spawn_blocking(move || -> anyhow::Result<serde_json::Value> {
use std::collections::HashMap;
if let Some(root) = &scan_root {
let (cloned, linked) =
nornir::security::prepare_path_deps(&repo_dir, root, &|_| None);
if cloned + linked > 0 {
eprintln!(
"nornir-security: prepared path-deps for scan of {} ({cloned} cloned, {linked} linked)",
repo_dir.display()
);
}
}
let (repo_name, comps) = nornir::security::components(&repo_dir)?;
// A cache that cannot be read is treated as empty, so every component
// becomes a miss.
let cache = wh.query_vuln_findings().unwrap_or_default();
// Cache entries are keyed by (crate name, version).
let key = |c: &nornir::security::Component| (c.name.clone(), c.version.clone());
let now = chrono::Utc::now().timestamp_micros();
// Cached findings stay valid for 7 days.
const TTL_MICROS: i64 = 7 * 24 * 3600 * 1_000_000;
let is_fresh = |c: &nornir::security::Component| {
cache.get(&key(c)).map(|(_, _, ts)| now - *ts < TTL_MICROS).unwrap_or(false)
};
let misses: Vec<nornir::security::Component> =
comps.iter().filter(|c| !is_fresh(c)).cloned().collect();
let fresh = nornir::security::query_vulns(&misses)?;
let fresh_map: HashMap<(String, String), &nornir::security::Vuln> =
fresh.iter().map(|v| ((v.crate_name.clone(), v.version.clone()), v)).collect();
// One row per miss, including misses with no vulnerability (empty ids and
// summary), all stamped with `now`.
let rows: Vec<nornir::warehouse::iceberg::VulnFinding> = misses
.iter()
.map(|c| {
let (ids, summary) =
fresh_map.get(&key(c)).map(|v| (v.ids.clone(), v.summary.clone())).unwrap_or_default();
nornir::warehouse::iceberg::VulnFinding {
crate_name: c.name.clone(),
version: c.version.clone(),
ids,
summary,
checked_at_micros: now,
}
})
.collect();
// Writing the cache back is best-effort: a failure here is ignored and does
// not fail the scan.
let _ = wh.append_vuln_findings(&rows);
// Report only components that have at least one vulnerability id, taken
// from the cache for fresh entries and from the new query otherwise.
let mut vulns = Vec::new();
for c in &comps {
let ids: Vec<String> = if is_fresh(c) {
cache.get(&key(c)).map(|(i, _, _)| i.clone()).unwrap_or_default()
} else {
fresh_map.get(&key(c)).map(|v| v.ids.clone()).unwrap_or_default()
};
if !ids.is_empty() {
vulns.push(serde_json::json!({"crate": c.name, "version": c.version, "ids": ids}));
}
}
// License histogram: count per license string, ordered by count descending
// and then by license name; only the first 12 entries are reported.
let mut lic: std::collections::BTreeMap<String, usize> = Default::default();
for c in &comps {
*lic.entry(c.license.clone()).or_default() += 1;
}
let mut lic: Vec<_> = lic.into_iter().collect();
lic.sort_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
Ok(serde_json::json!({
"repo": repo_name,
"components": comps.len(),
"vulns": vulns,
"licenses": lic.into_iter().take(12).map(|(l, n)| serde_json::json!([l, n])).collect::<Vec<_>>(),
"cache_hits": comps.len().saturating_sub(misses.len()),
"cache_misses": misses.len(),
}))
})
.await
.map_err(internal)?
{
Ok(v) => v,
Err(e) => {
job.fail(&e);
return Err(internal(e));
}
};
// The job result carries the component count and the number of vulnerable
// components from the report.
job.finish(
serde_json::json!({
"components": value.get("components").cloned().unwrap_or(serde_json::Value::Null),
"vulns": value.get("vulns").and_then(|v| v.as_array()).map(|a| a.len()).unwrap_or(0),
}),
"",
);
Ok(Response::new(pb::JsonResponse { json: serde_json::to_string(&value).map_err(internal)? }))
}
/// JSON result of `nornir::change::detect` for the workspace, given its
/// warehouse, dependency graph and workspace name.
async fn changed_since_last_release(&self, req: Request<Empty>) -> Result<Response<pb::JsonResponse>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let loaded_graph = wctx.graph().await?;
    let detected =
        nornir::change::detect(&wctx.warehouse, &loaded_graph.graph, &loaded_graph.workspace_name)
            .await;
    let change = detected.map_err(internal)?;
    let json = serde_json::to_string(&change).map_err(internal)?;
    Ok(Response::new(pb::JsonResponse { json }))
}
}
/// gRPC DWARF service: answers symbol and call queries from DWARF facts loaded
/// out of the workspace warehouse, returning JSON strings.
struct DwarfSvc { state: Shared }
impl DwarfSvc {
    /// Loads the DWARF facts of `repo` and runs `f` on them, on the blocking pool.
    ///
    /// An empty `sha` is passed to the loader as `None`. The facts are loaded
    /// into `cache/dwarf/<repo>` next to the warehouse root (under the root
    /// itself when it has no parent). The string produced by `f` becomes the
    /// response's `json` field.
    async fn query<F>(
        &self,
        md: &tonic::metadata::MetadataMap,
        repo: String,
        sha: String,
        f: F,
    ) -> Result<Response<pb::JsonResponse>, Status>
    where
        F: FnOnce(&introspect::persist::DwarfFacts) -> anyhow::Result<String> + Send + 'static,
    {
        let wctx = self.state.ws(md)?.clone();
        let warehouse = wctx.warehouse.clone();
        let warehouse_root = wctx.loaded.lock().await.warehouse_root();

        let run = move || -> anyhow::Result<String> {
            let base = match warehouse_root.parent() {
                Some(parent) => parent,
                None => warehouse_root.as_path(),
            };
            let cache_dir = base.join("cache/dwarf").join(&repo);
            let pinned_sha = Some(sha.as_str()).filter(|s| !s.is_empty());
            let facts = introspect::persist::load_dwarf(&warehouse, &repo, pinned_sha, &cache_dir)?;
            f(&facts)
        };
        let json = tokio::task::spawn_blocking(run)
            .await
            .map_err(internal)?
            .map_err(internal)?;
        Ok(Response::new(pb::JsonResponse { json }))
    }
}
#[tonic::async_trait]
impl DwarfSvcTrait for DwarfSvc {
    /// Looks up symbols matching `arg`; a `limit` of 0 means 50 results.
    async fn symbol_lookup(
        &self,
        req: Request<pb::DwarfQuery>,
    ) -> Result<Response<pb::JsonResponse>, Status> {
        let md = req.metadata().clone();
        let q = req.into_inner();
        let cap = match q.limit {
            0 => 50,
            n => n as usize,
        };
        let (repo, sha, arg) = (q.repo, q.sha, q.arg);
        self.query(&md, repo, sha, move |facts| {
            let hits: Vec<_> = facts.lookup(&arg).into_iter().take(cap).collect();
            Ok(serde_json::to_string(&hits)?)
        })
        .await
    }

    /// Lists what is defined in `arg`; a `limit` of 0 means 100 results.
    async fn defined_in(
        &self,
        req: Request<pb::DwarfQuery>,
    ) -> Result<Response<pb::JsonResponse>, Status> {
        let md = req.metadata().clone();
        let q = req.into_inner();
        let cap = match q.limit {
            0 => 100,
            n => n as usize,
        };
        let (repo, sha, arg) = (q.repo, q.sha, q.arg);
        self.query(&md, repo, sha, move |facts| {
            let hits: Vec<_> = facts.defined_in(&arg).into_iter().take(cap).collect();
            Ok(serde_json::to_string(&hits)?)
        })
        .await
    }

    /// Returns the callers of `arg` as JSON (no limit applied).
    async fn callers(
        &self,
        req: Request<pb::DwarfQuery>,
    ) -> Result<Response<pb::JsonResponse>, Status> {
        let md = req.metadata().clone();
        let q = req.into_inner();
        let (repo, sha, arg) = (q.repo, q.sha, q.arg);
        self.query(&md, repo, sha, move |facts| {
            Ok(serde_json::to_string(&facts.callers_of(&arg))?)
        })
        .await
    }

    /// Returns the callees of `arg` as JSON (no limit applied).
    async fn callees(
        &self,
        req: Request<pb::DwarfQuery>,
    ) -> Result<Response<pb::JsonResponse>, Status> {
        let md = req.metadata().clone();
        let q = req.into_inner();
        let (repo, sha, arg) = (q.repo, q.sha, q.arg);
        self.query(&md, repo, sha, move |facts| {
            Ok(serde_json::to_string(&facts.callees_of(&arg))?)
        })
        .await
    }

    /// Returns the result of `call_path(from, to)` as JSON.
    async fn call_path(
        &self,
        req: Request<pb::DwarfPathQuery>,
    ) -> Result<Response<pb::JsonResponse>, Status> {
        let md = req.metadata().clone();
        let q = req.into_inner();
        let (repo, sha, from, to) = (q.repo, q.sha, q.from, q.to);
        self.query(&md, repo, sha, move |facts| {
            Ok(serde_json::to_string(&facts.call_path(&from, &to))?)
        })
        .await
    }
}
/// gRPC service state for the `Vector` endpoints.
struct VectorSvc {
    // `state` is only read when an embedder feature is compiled in, hence the
    // conditional `dead_code` allowance.
    #[cfg_attr(not(any(feature = "embed-tract", feature = "embed-ort")), allow(dead_code))]
    state: Shared,
}

#[tonic::async_trait]
impl VectorSvcTrait for VectorSvc {
    /// Embeds the query text and searches the vector store of `repo`.
    ///
    /// Without an embedder feature the call always answers `unimplemented`.
    /// With one, a `limit` of 0 means 10 hits and an empty `sha` is passed on
    /// as `None`. The response JSON is `{ "repo": ..., "hits": [...] }` where
    /// each hit carries `score`, `file`, `start_line` and `end_line`.
    ///
    /// # Errors
    /// Embedding or search failures (including an embedder that yields no
    /// vector for the query) are mapped through `internal`.
    async fn search(
        &self,
        req: Request<pb::VectorSearchRequest>,
    ) -> Result<Response<pb::JsonResponse>, Status> {
        #[cfg(not(any(feature = "embed-tract", feature = "embed-ort")))]
        {
            let _ = req;
            Err(Status::unimplemented(
                "nornir-server was built without an embedder; rebuild with \
                 `--features server,embed-tract` (CPU) or `--features server,embed-ort` (GPU) \
                 to use Vector.Search",
            ))
        }
        #[cfg(any(feature = "embed-tract", feature = "embed-ort"))]
        {
            let wctx = self.state.ws(req.metadata())?.clone();
            let embedder = self.state.embedder().await?;
            let r = req.into_inner();
            let limit = if r.limit == 0 { 10 } else { r.limit as usize };
            // `repo` is needed both inside the blocking task and in the
            // response; `query` and `sha` are only needed inside the task.
            let (repo_label, query, sha) = (r.repo, r.query, r.sha);
            let repo = repo_label.clone();
            let wh = wctx.warehouse.clone();
            let hits = tokio::task::spawn_blocking(move || -> anyhow::Result<_> {
                let mp = embedder.profile().id();
                let embedded = embedder.embed(std::slice::from_ref(&query))?;
                // Checked access: an empty batch becomes an error, not a panic.
                let query_vec = embedded
                    .first()
                    .ok_or_else(|| anyhow!("embedder returned no vector for the query"))?;
                let sha = (!sha.is_empty()).then_some(sha.as_str());
                nornir::vector::store::search(&wh, &repo, sha, &mp, query_vec, limit)
            })
            .await
            .map_err(internal)?
            .map_err(internal)?;
            let out: Vec<_> = hits
                .iter()
                .map(|(score, o)| {
                    serde_json::json!({
                        "score": score,
                        "file": o.file,
                        "start_line": o.start_line,
                        "end_line": o.end_line,
                    })
                })
                .collect();
            let json = serde_json::to_string(&serde_json::json!({ "repo": repo_label, "hits": out }))
                .map_err(internal)?;
            Ok(Response::new(pb::JsonResponse { json }))
        }
    }
}
/// gRPC service state for the `Funnel` endpoints; holds the shared server state.
struct FunnelSvc {
    state: Shared,
}

impl FunnelSvc {
    /// Resolves the caller's workspace from `md` and calls
    /// `ensure_funnel(false)` on it before handing the context back.
    async fn ctx(&self, md: &tonic::metadata::MetadataMap) -> Result<Arc<WorkspaceCtx>, Status> {
        let workspace = self.state.ws(md)?.clone();
        workspace.ensure_funnel(false).await?;
        Ok(workspace)
    }
}
#[tonic::async_trait]
impl FunnelSvcTrait for FunnelSvc {
/// Records a new idea in the workspace funnel and returns its id.
///
/// The id is derived from the store's `next_idea` counter; an empty `source`
/// is recorded as `"grpc"`.
async fn submit_idea(
    &self,
    req: Request<pb::SubmitIdeaRequest>,
) -> Result<Response<pb::IdRef>, Status> {
    let wctx = self.ctx(req.metadata()).await?;
    let msg = req.into_inner();
    let mut guard = wctx.funnel.lock().await;
    let store = guard.as_mut().unwrap();

    let idea_id = IdeaId::seq(store.funnel.next_idea);
    let source = if msg.source.is_empty() {
        String::from("grpc")
    } else {
        msg.source
    };
    let event = FunnelEvent::IdeaSubmitted {
        id: idea_id.clone(),
        source,
        text: msg.text,
        refs: Vec::new(),
        item_kind: nornir::funnel::ItemKind::parse(&msg.item_kind),
        ts: chrono::Utc::now(),
    };
    store.record_async(event).await.map_err(internal)?;

    let id = idea_id.as_str().to_string();
    Ok(Response::new(pb::IdRef { id }))
}
/// Lists funnel history items, optionally filtered by kind and status.
///
/// An empty `kind` means "any kind"; a `limit` of 0 means "no limit".
/// The funnel is opened with `ensure_funnel(true)`. If no store is present
/// afterwards the result is an empty list, matching how `show` treats the
/// same situation, rather than a panic.
async fn history(
    &self,
    req: Request<pb::FunnelHistoryRequest>,
) -> Result<Response<pb::FunnelHistoryResponse>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    wctx.ensure_funnel(true).await?;
    let r = req.into_inner();
    let fg = wctx.funnel.lock().await;
    // No store yet means there is no history to report.
    let Some(store) = fg.as_ref() else {
        return Ok(Response::new(pb::FunnelHistoryResponse { items: Vec::new() }));
    };
    let kind = if r.kind.is_empty() {
        None
    } else {
        Some(nornir::funnel::ItemKind::parse(&r.kind))
    };
    let status = nornir::funnel::HistoryStatus::parse(&r.status);
    let mut items = nornir::funnel::history(&store.funnel, kind, status);
    if r.limit > 0 {
        items.truncate(r.limit as usize);
    }
    let items = items
        .into_iter()
        .map(|it| pb::FunnelHistoryItem {
            id: it.id,
            item_kind: it.item_kind.as_str().to_string(),
            text: it.text,
            source: it.source,
            submitted_at: it.submitted_at,
            status: it.status.as_str().to_string(),
            plan_ids: it.plan_ids,
        })
        .collect();
    Ok(Response::new(pb::FunnelHistoryResponse { items }))
}
/// Creates a plan for an existing idea and immediately marks it active.
///
/// Two events are recorded in order: `PlanCreated` (planner `"grpc"`) and
/// `PlanStatusChanged` to `Active`. Returns the new plan id.
async fn create_plan(
    &self,
    req: Request<pb::CreatePlanRequest>,
) -> Result<Response<pb::IdRef>, Status> {
    let wctx = self.ctx(req.metadata()).await?;
    let msg = req.into_inner();
    let mut guard = wctx.funnel.lock().await;
    let store = guard.as_mut().unwrap();

    let new_plan = PlanId::seq(store.funnel.next_plan);

    let created = FunnelEvent::PlanCreated {
        id: new_plan.clone(),
        idea_id: IdeaId::new(msg.idea_id),
        summary: msg.summary,
        planner: "grpc".into(),
        ts: chrono::Utc::now(),
    };
    store.record_async(created).await.map_err(internal)?;

    let activated = FunnelEvent::PlanStatusChanged {
        plan_id: new_plan.clone(),
        status: PlanStatus::Active,
        why: None,
        ts: chrono::Utc::now(),
    };
    store.record_async(activated).await.map_err(internal)?;

    let id = new_plan.as_str().to_string();
    Ok(Response::new(pb::IdRef { id }))
}
/// Adds a node to a plan, wires one incoming edge per entry in `needs`,
/// then calls `promote_ready` on the funnel.
///
/// A non-empty `title` is stored under the `"title"` key of the node's
/// params; an empty `prompt` is recorded as no prompt excerpt.
async fn add_node(
    &self,
    req: Request<pb::AddNodeRequest>,
) -> Result<Response<pb::IdRef>, Status> {
    let wctx = self.ctx(req.metadata()).await?;
    let msg = req.into_inner();
    let mut guard = wctx.funnel.lock().await;
    let store = guard.as_mut().unwrap();

    let plan = PlanId::new(msg.plan_id);
    let new_node = NodeId::seq(store.funnel.next_node);

    let mut params = serde_json::Map::new();
    if !msg.title.is_empty() {
        params.insert("title".into(), serde_json::Value::String(msg.title));
    }
    let prompt_excerpt = if msg.prompt.is_empty() {
        None
    } else {
        Some(msg.prompt)
    };

    let added = FunnelEvent::NodeAdded {
        plan_id: plan.clone(),
        node_id: new_node.clone(),
        kind: msg.kind,
        params,
        targets: msg.targets,
        prompt_excerpt,
        ts: chrono::Utc::now(),
    };
    store.record_async(added).await.map_err(internal)?;

    // Each prerequisite becomes an edge pointing at the new node.
    for prerequisite in msg.needs.iter() {
        let edge = FunnelEvent::EdgeAdded {
            plan_id: plan.clone(),
            from_node: NodeId::new(prerequisite.clone()),
            to_node: new_node.clone(),
            ts: chrono::Utc::now(),
        };
        store.record_async(edge).await.map_err(internal)?;
    }

    store.funnel.promote_ready();
    let id = new_node.as_str().to_string();
    Ok(Response::new(pb::IdRef { id }))
}
/// Records an edge `from -> to` inside a plan and then calls `promote_ready`.
async fn link(&self, req: Request<pb::LinkRequest>) -> Result<Response<Empty>, Status> {
    let wctx = self.ctx(req.metadata()).await?;
    let msg = req.into_inner();
    let mut guard = wctx.funnel.lock().await;
    let store = guard.as_mut().unwrap();

    let edge = FunnelEvent::EdgeAdded {
        plan_id: PlanId::new(msg.plan_id),
        from_node: NodeId::new(msg.from),
        to_node: NodeId::new(msg.to),
        ts: chrono::Utc::now(),
    };
    store.record_async(edge).await.map_err(internal)?;

    store.funnel.promote_ready();
    Ok(Response::new(Empty {}))
}
/// Changes the status of a plan node and then calls `promote_ready`.
///
/// Accepted status strings: `ready`, `active`/`in_progress`, `blocked`,
/// `done`, `failed`/`abandoned`. Anything else is rejected as an invalid
/// argument before the funnel lock is taken. An empty `why` is recorded as
/// no reason.
async fn set_status(
    &self,
    req: Request<pb::SetStatusRequest>,
) -> Result<Response<Empty>, Status> {
    let wctx = self.ctx(req.metadata()).await?;
    let msg = req.into_inner();

    let parsed = match msg.status.as_str() {
        "ready" => Some(NodeStatus::Ready),
        "active" | "in_progress" => Some(NodeStatus::InProgress),
        "blocked" => Some(NodeStatus::Blocked),
        "done" => Some(NodeStatus::Done),
        "failed" | "abandoned" => Some(NodeStatus::Failed),
        _ => None,
    };
    let Some(status) = parsed else {
        let other = msg.status.as_str();
        return Err(invalid_arg(format!(
            "unknown status {other:?}; expected ready|active|blocked|done|failed"
        )));
    };

    let mut guard = wctx.funnel.lock().await;
    let store = guard.as_mut().unwrap();
    let why = if msg.why.is_empty() { None } else { Some(msg.why) };
    let event = FunnelEvent::NodeStatusChanged {
        plan_id: PlanId::new(msg.plan_id),
        node_id: NodeId::new(msg.node_id),
        status,
        why,
        ts: chrono::Utc::now(),
    };
    store.record_async(event).await.map_err(internal)?;

    store.funnel.promote_ready();
    Ok(Response::new(Empty {}))
}
/// Returns the steps produced by `topo_ready` for the workspace funnel,
/// converted to their protobuf form. A missing prompt excerpt becomes an
/// empty string.
async fn next(&self, req: Request<Empty>) -> Result<Response<pb::NextStepList>, Status> {
    let wctx = self.ctx(req.metadata()).await?;
    let mut guard = wctx.funnel.lock().await;
    let store = guard.as_mut().unwrap();

    let mut steps = Vec::new();
    for ready in topo_ready(&mut store.funnel) {
        steps.push(pb::NextStep {
            plan_id: ready.plan_id.as_str().to_string(),
            node_id: ready.node_id.as_str().to_string(),
            kind: ready.kind,
            summary: ready.summary,
            prompt: ready.prompt_excerpt.unwrap_or_default(),
            targets: ready.targets,
        });
    }
    Ok(Response::new(pb::NextStepList { steps }))
}
/// Dumps the whole funnel as ideas -> plans -> nodes.
///
/// The funnel is opened with `ensure_funnel(true)`; when no store is present
/// afterwards an empty dump is returned. Each node lists as `deps` the
/// source ids of the plan edges that point at it, and node/plan statuses are
/// rendered with their `Debug` formatting.
async fn show(&self, req: Request<Empty>) -> Result<Response<pb::FunnelDump>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    wctx.ensure_funnel(true).await?;
    let guard = wctx.funnel.lock().await;
    let funnel = match guard.as_ref() {
        Some(store) => &store.funnel,
        None => return Ok(Response::new(pb::FunnelDump { ideas: Vec::new() })),
    };

    let mut ideas = Vec::new();
    for idea in funnel.ideas.values() {
        let mut plans = Vec::new();
        for plan in funnel.plans.values().filter(|p| p.idea_id == idea.id) {
            let nodes = plan
                .nodes
                .values()
                .map(|node| {
                    let title = node
                        .params
                        .get("title")
                        .and_then(|v| v.as_str())
                        .unwrap_or("")
                        .to_string();
                    // Incoming edges of this node, reported by their source id.
                    let deps = plan
                        .edges
                        .iter()
                        .filter(|(_, to)| to == &node.id)
                        .map(|(from, _)| from.as_str().to_string())
                        .collect();
                    pb::FunnelDumpNode {
                        id: node.id.as_str().to_string(),
                        kind: node.kind.clone(),
                        title,
                        status: format!("{:?}", node.status),
                        deps,
                    }
                })
                .collect();
            plans.push(pb::FunnelDumpPlan {
                id: plan.id.as_str().to_string(),
                idea_id: plan.idea_id.as_str().to_string(),
                summary: plan.summary.clone(),
                status: format!("{:?}", plan.status),
                nodes,
            });
        }
        ideas.push(pb::FunnelDumpIdea {
            id: idea.id.as_str().to_string(),
            text: idea.text.clone(),
            source: idea.source.clone(),
            plans,
            item_kind: idea.item_kind.as_str().to_string(),
        });
    }
    Ok(Response::new(pb::FunnelDump { ideas }))
}
}
/// gRPC service state for the `Viz` endpoints; holds the shared server state.
struct VizSvc { state: Shared }
#[tonic::async_trait]
impl VizSvcTrait for VizSvc {
/// Builds the timeline for the workspace named in the request and returns it
/// as JSON. The work runs on a blocking thread, using the warehouse root as
/// the optional root argument of `build_timeline`.
async fn timeline(
    &self,
    req: Request<pb::VizTimelineRequest>,
) -> Result<Response<pb::JsonResponse>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let workspace = req.into_inner().workspace;
    let warehouse = wctx.warehouse.clone();

    let task = tokio::task::spawn_blocking(move || -> anyhow::Result<String> {
        let root = warehouse.root().to_path_buf();
        let built = nornir::viz::build_timeline(&warehouse, &workspace, Some(&root))?;
        let rendered = serde_json::to_string(&built)?;
        Ok(rendered)
    });
    let json = task.await.map_err(internal)?.map_err(internal)?;
    Ok(Response::new(pb::JsonResponse { json }))
}
/// Returns every release event in the workspace warehouse as a JSON array.
async fn release_events(
    &self,
    req: Request<pb::VizTimelineRequest>,
) -> Result<Response<pb::JsonResponse>, Status> {
    use nornir::warehouse::release_events::{query_release_events, EventSelector};

    let wctx = self.state.ws(req.metadata())?.clone();
    let warehouse = wctx.warehouse.clone();

    let task = tokio::task::spawn_blocking(move || -> anyhow::Result<String> {
        let selector = EventSelector::All;
        let events = warehouse.block_on(query_release_events(&warehouse, &selector))?;
        let rendered = serde_json::to_string(&events)?;
        Ok(rendered)
    });
    let json = task.await.map_err(internal)?.map_err(internal)?;
    Ok(Response::new(pb::JsonResponse { json }))
}
/// Returns every agent-model run in the workspace warehouse as a JSON array.
async fn bakeoff_results(
    &self,
    req: Request<pb::VizTimelineRequest>,
) -> Result<Response<pb::JsonResponse>, Status> {
    use nornir::warehouse::agent_model_runs::{query_agent_model_runs, BakeoffSelector};

    let wctx = self.state.ws(req.metadata())?.clone();
    let warehouse = wctx.warehouse.clone();

    let task = tokio::task::spawn_blocking(move || -> anyhow::Result<String> {
        let selector = BakeoffSelector::All;
        let runs = warehouse.block_on(query_agent_model_runs(&warehouse, &selector))?;
        let rendered = serde_json::to_string(&runs)?;
        Ok(rendered)
    });
    let json = task.await.map_err(internal)?.map_err(internal)?;
    Ok(Response::new(pb::JsonResponse { json }))
}
/// Returns every test result row in the workspace warehouse as a JSON array.
async fn test_results(
    &self,
    req: Request<pb::VizTimelineRequest>,
) -> Result<Response<pb::JsonResponse>, Status> {
    use nornir::warehouse::test_results::{query_test_results, TestSelector};

    let wctx = self.state.ws(req.metadata())?.clone();
    let warehouse = wctx.warehouse.clone();

    let task = tokio::task::spawn_blocking(move || -> anyhow::Result<String> {
        let selector = TestSelector::All;
        let results = warehouse.block_on(query_test_results(&warehouse, &selector))?;
        let rendered = serde_json::to_string(&results)?;
        Ok(rendered)
    });
    let json = task.await.map_err(internal)?.map_err(internal)?;
    Ok(Response::new(pb::JsonResponse { json }))
}
/// Returns `{ "inventory": [...], "results": [...] }` for the workspace.
///
/// The inventory is gathered per configured repo; a repo whose inventory
/// query fails is skipped. A failure of the results query fails the call.
async fn test_matrix(
    &self,
    req: Request<pb::VizTimelineRequest>,
) -> Result<Response<pb::JsonResponse>, Status> {
    use nornir::warehouse::test_inventory::query_test_inventory;
    use nornir::warehouse::test_results::{query_test_results, TestSelector};

    let wctx = self.state.ws(req.metadata())?.clone();
    let warehouse = wctx.warehouse.clone();
    // Snapshot the repo names so the config lock is released before blocking.
    let repo_names: Vec<String> = {
        let loaded = wctx.loaded.lock().await;
        loaded.nornir.repo.keys().cloned().collect()
    };

    let task = tokio::task::spawn_blocking(move || -> anyhow::Result<String> {
        let inventory: Vec<_> = repo_names
            .iter()
            .filter_map(|repo| query_test_inventory(&warehouse, repo).ok())
            .flatten()
            .collect();
        let results = warehouse.block_on(query_test_results(&warehouse, &TestSelector::All))?;
        let payload = serde_json::json!({ "inventory": inventory, "results": results });
        Ok(payload.to_string())
    });
    let json = task.await.map_err(internal)?.map_err(internal)?;
    Ok(Response::new(pb::JsonResponse { json }))
}
/// Returns `{ "telemetry": [...], "runs": [...] }` from the warehouse's
/// bench telemetry (unfiltered) and bench runs (default filter).
async fn bench_telemetry(
    &self,
    req: Request<pb::VizTimelineRequest>,
) -> Result<Response<pb::JsonResponse>, Status> {
    use nornir::warehouse::{BenchFilter, Warehouse};

    let wctx = self.state.ws(req.metadata())?.clone();
    let warehouse = wctx.warehouse.clone();

    let task = tokio::task::spawn_blocking(move || -> anyhow::Result<String> {
        let telemetry = warehouse.query_bench_telemetry(None)?;
        let runs = warehouse.query_bench_runs(&BenchFilter::default())?;
        let payload = serde_json::json!({ "telemetry": telemetry, "runs": runs });
        Ok(payload.to_string())
    });
    let json = task.await.map_err(internal)?.map_err(internal)?;
    Ok(Response::new(pb::JsonResponse { json }))
}
/// Returns the knowledge summary computed from the warehouse for all repos
/// configured in the workspace, serialized as JSON.
async fn knowledge(
    &self,
    req: Request<pb::VizTimelineRequest>,
) -> Result<Response<pb::JsonResponse>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    // Snapshot the repo names so the config lock is released before blocking.
    let repo_names: Vec<String> = {
        let loaded = wctx.loaded.lock().await;
        loaded.nornir.repo.keys().cloned().collect()
    };
    let warehouse = wctx.warehouse.clone();

    let task = tokio::task::spawn_blocking(move || -> anyhow::Result<String> {
        let summary = nornir::viz::knowledge::summary_from_warehouse(&warehouse, &repo_names);
        let as_value = serde_json::to_value(&summary)?;
        Ok(as_value.to_string())
    });
    let json = task.await.map_err(internal)?.map_err(internal)?;
    Ok(Response::new(pb::JsonResponse { json }))
}
/// Returns the clone events recorded for this workspace as a JSON array,
/// keeping at most the first 2,000 rows returned by the query.
async fn clone_events(
    &self,
    req: Request<pb::VizTimelineRequest>,
) -> Result<Response<pb::JsonResponse>, Status> {
    use nornir::warehouse::clone_events::{query_clone_events, CloneSelector};

    let wctx = self.state.ws(req.metadata())?.clone();
    let warehouse = wctx.warehouse.clone();
    let workspace_name = wctx.name.clone();

    let task = tokio::task::spawn_blocking(move || -> anyhow::Result<String> {
        /// Upper bound on the rows shipped to the visualiser.
        const VIZ_CLONE_EVENTS_CAP: usize = 2_000;

        let selector = CloneSelector::Workspace(workspace_name);
        let mut events = warehouse.block_on(query_clone_events(&warehouse, &selector))?;
        events.truncate(VIZ_CLONE_EVENTS_CAP);
        let rendered = serde_json::to_string(&events)?;
        Ok(rendered)
    });
    let json = task.await.map_err(internal)?.map_err(internal)?;
    Ok(Response::new(pb::JsonResponse { json }))
}
/// Returns the merged architecture view of the workspace as one JSON object
/// with the keys `graph`, `members_with_data`, `errors`, `coverage`,
/// `symbols`, `calls` and `metro_fuel_source`.
///
/// If loading the preferred merged knowledge view fails, `symbols` and
/// `calls` are empty and `metro_fuel_source` is the empty string.
async fn architecture(
    &self,
    req: Request<pb::VizTimelineRequest>,
) -> Result<Response<pb::JsonResponse>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let workspace = wctx.name.clone();
    // Snapshot the repo names so the config lock is released before blocking.
    let members: Vec<String> = {
        let loaded = wctx.loaded.lock().await;
        loaded.nornir.repo.keys().cloned().collect()
    };
    let warehouse = wctx.warehouse.clone();

    let task = tokio::task::spawn_blocking(move || -> anyhow::Result<String> {
        let (graph, members_with_data, errors) =
            nornir::arch::warehouse::merged_arch_from_warehouse(&warehouse, &members);
        let coverage =
            nornir::arch::warehouse::coverage_for_graph(&warehouse, &workspace, &graph);

        let loaded_view = nornir::knowledge::query::load_preferred_merged(&warehouse, &members);
        let (merged, metro_fuel_source) = match loaded_view {
            Ok(pair) => pair,
            // Best effort: fall back to an empty view instead of failing.
            Err(_) => (
                nornir::knowledge::query::KnowledgeView { symbols: vec![], calls: vec![] },
                "",
            ),
        };

        let payload = serde_json::json!({
            "graph": graph,
            "members_with_data": members_with_data,
            "errors": errors,
            "coverage": coverage,
            "symbols": merged.symbols,
            "calls": merged.calls,
            "metro_fuel_source": metro_fuel_source,
        });
        Ok(payload.to_string())
    });
    let json = task.await.map_err(internal)?.map_err(internal)?;
    Ok(Response::new(pb::JsonResponse { json }))
}
/// Lists jobs from the workspace job store as a JSON array.
///
/// With an empty `kind` the jobs of this workspace are selected; otherwise
/// jobs of that kind. A `limit` of 0 means 200 rows.
async fn jobs(
    &self,
    req: Request<pb::JobsRequest>,
) -> Result<Response<pb::JsonResponse>, Status> {
    use nornir::jobs::JobSelector;

    let wctx = self.state.ws(req.metadata())?.clone();
    let params = req.into_inner();
    let job_store = wctx.jobs.clone();
    let workspace = wctx.name.clone();
    let max_rows = match params.limit {
        0 => 200,
        n => n as usize,
    };
    let kind = params.kind;

    let task = tokio::task::spawn_blocking(move || -> anyhow::Result<String> {
        let selector = match kind.is_empty() {
            true => JobSelector::Workspace(workspace),
            false => JobSelector::Kind(kind),
        };
        let mut listed = job_store.list(&selector)?;
        listed.truncate(max_rows);
        let rendered = serde_json::to_string(&listed)?;
        Ok(rendered)
    });
    let json = task.await.map_err(internal)?.map_err(internal)?;
    Ok(Response::new(pb::JsonResponse { json }))
}
/// Returns coverage data for one repo as `{ "rows", "fn_rows", "history" }`.
///
/// The repo name is taken from the request's `workspace` field; when it is
/// empty all three arrays are empty. `history` holds the line percentage of
/// every `"overall"`-scoped row, one value per timestamp (the last row wins
/// on a duplicate timestamp), in ascending timestamp order.
async fn coverage(
    &self,
    req: Request<pb::VizTimelineRequest>,
) -> Result<Response<pb::JsonResponse>, Status> {
    use std::collections::BTreeMap;

    let wctx = self.state.ws(req.metadata())?.clone();
    let warehouse = wctx.warehouse.clone();
    let repo = req.into_inner().workspace;

    let task = tokio::task::spawn_blocking(move || -> anyhow::Result<String> {
        if repo.is_empty() {
            let empty = serde_json::json!({
                "rows": [], "fn_rows": [], "history": [],
            });
            return Ok(empty.to_string());
        }

        let (rows, fn_rows) =
            warehouse.block_on(nornir::warehouse::coverage::latest(&warehouse, &repo))?;
        let all = warehouse.block_on(nornir::warehouse::coverage::query_coverage(&warehouse, &repo))?;

        // Keyed by timestamp so the history comes out in chronological order.
        let overall_by_ts: BTreeMap<i64, f64> = all
            .iter()
            .filter(|row| row.scope == "overall")
            .map(|row| (row.ts_micros, row.line_pct()))
            .collect();
        let history: Vec<f64> = overall_by_ts.into_values().collect();

        let payload = serde_json::json!({
            "rows": rows, "fn_rows": fn_rows, "history": history,
        });
        Ok(payload.to_string())
    });
    let json = task.await.map_err(internal)?.map_err(internal)?;
    Ok(Response::new(pb::JsonResponse { json }))
}
/// Returns the embedded holger registry as JSON.
///
/// Without the `embed-holger` feature the answer is always `"[]"`. With it,
/// the data directory comes from `NORNIR_HOLGER_DATA` (default:
/// `<nornir home>/holger-cache`) and the endpoints from `NORNIR_HOLGER_GRPC`
/// / `NORNIR_HOLGER_HTTP` (defaults `127.0.0.1:18443` / `127.0.0.1:18464`).
async fn holger_registry(
    &self,
    _req: Request<pb::VizTimelineRequest>,
) -> Result<Response<pb::JsonResponse>, Status> {
    #[cfg(feature = "embed-holger")]
    let json = tokio::task::spawn_blocking(|| -> anyhow::Result<String> {
        let data_dir = match std::env::var_os("NORNIR_HOLGER_DATA") {
            Some(dir) => std::path::PathBuf::from(dir),
            None => nornir::config::nornir_home().join("holger-cache"),
        };
        let grpc_addr =
            std::env::var("NORNIR_HOLGER_GRPC").unwrap_or_else(|_| "127.0.0.1:18443".into());
        let http_addr =
            std::env::var("NORNIR_HOLGER_HTTP").unwrap_or_else(|_| "127.0.0.1:18464".into());
        let registry =
            nornir::holger_embed::read_registry(&data_dir, &grpc_addr, &http_addr, 100_000)?;
        Ok(serde_json::to_string(&registry)?)
    })
    .await
    .map_err(internal)?
    .map_err(internal)?;

    #[cfg(not(feature = "embed-holger"))]
    let json = "[]".to_string();

    Ok(Response::new(pb::JsonResponse { json }))
}
}
/// gRPC service state for the `Warehouse` endpoints; holds the shared server
/// state.
struct WarehouseSvc {
    state: Shared,
}

#[tonic::async_trait]
impl WarehouseSvcTrait for WarehouseSvc {
    /// Lists the table names of the workspace warehouse.
    async fn tables(
        &self,
        req: Request<pb::Empty>,
    ) -> Result<Response<pb::WarehouseTables>, Status> {
        let wctx = self.state.ws(req.metadata())?.clone();
        let warehouse = wctx.warehouse.clone();

        let task = tokio::task::spawn_blocking(move || warehouse.table_names());
        let names = task.await.map_err(internal)?.map_err(internal)?;
        Ok(Response::new(pb::WarehouseTables { names }))
    }

    /// Returns a preview (columns plus rows of cells) of one warehouse table.
    /// A `limit` of 0 means 500 rows.
    async fn scan(
        &self,
        req: Request<pb::WarehouseScanRequest>,
    ) -> Result<Response<pb::WarehouseScan>, Status> {
        let wctx = self.state.ws(req.metadata())?.clone();
        let warehouse = wctx.warehouse.clone();
        let params = req.into_inner();
        let max_rows = match params.limit {
            0 => 500,
            n => n as usize,
        };
        let table = params.table;

        let task = tokio::task::spawn_blocking(move || warehouse.scan_preview(&table, max_rows));
        let preview = task.await.map_err(internal)?.map_err(internal)?;

        let mut rows = Vec::new();
        for cells in preview.rows {
            rows.push(pb::WarehouseRow { cells });
        }
        Ok(Response::new(pb::WarehouseScan { columns: preview.columns, rows }))
    }
}
/// Builds a tonic interceptor that only lets through requests carrying
/// `authorization: Bearer <token>` with the expected token.
///
/// A missing header, a non-ASCII header value, a missing `Bearer ` prefix,
/// an empty token or a mismatching token are all answered with
/// `unauthenticated`. The token comparison runs in time that depends only on
/// the token lengths, not on where the first differing byte is, so response
/// timing cannot be used to guess the token byte by byte.
fn auth_interceptor(token: Arc<String>) -> impl Fn(Request<()>) -> Result<Request<()>, Status> + Clone {
    /// Compares two byte strings without short-circuiting on the first
    /// mismatch. Only the length check may return early.
    fn bearer_token_matches(supplied: &[u8], expected: &[u8]) -> bool {
        if supplied.len() != expected.len() {
            return false;
        }
        let diff = supplied
            .iter()
            .zip(expected.iter())
            .fold(0u8, |acc, (a, b)| acc | (a ^ b));
        diff == 0
    }

    move |req: Request<()>| -> Result<Request<()>, Status> {
        let supplied = req
            .metadata()
            .get("authorization")
            .and_then(|v| v.to_str().ok())
            .and_then(|s| s.strip_prefix("Bearer "))
            .unwrap_or("");
        // An empty supplied token is always rejected, even if the configured
        // token happens to be empty.
        if supplied.is_empty() || !bearer_token_matches(supplied.as_bytes(), token.as_bytes()) {
            return Err(Status::unauthenticated("missing or invalid bearer token"));
        }
        Ok(req)
    }
}
/// gRPC service state for the `Workspaces` endpoints.
struct WorkspacesSvc {
    /// Workspace registry; `None` when the registry is not enabled.
    registry: Option<Arc<nornir::registry::Registry>>,
    /// Root directory handed to the descriptor, fetch and republish helpers.
    root: PathBuf,
    /// Shared server state (served workspace handles).
    state: Shared,
}

impl WorkspacesSvc {
    /// Returns the registry, or an `unavailable` status when the server has
    /// none configured.
    fn reg(&self) -> Result<&Arc<nornir::registry::Registry>, Status> {
        match &self.registry {
            Some(registry) => Ok(registry),
            None => Err(Status::unavailable(
                "workspace registry not enabled on this server \
                 (no <home>/.nornir/workspaces registry root)",
            )),
        }
    }
}
fn resolve_descriptor_content(root: &Path, name: &str, descriptor: &str) -> Result<String> {
const TOML: &str = "nornir-workspace.toml";
let is_git_url = nornir::gitio::is_ssh_url(descriptor)
|| descriptor.starts_with("http://")
|| descriptor.starts_with("https://")
|| descriptor.ends_with(".git");
if is_git_url {
let dest = root.join(name).join("workspace");
std::fs::create_dir_all(&dest).ok();
nornir::gitio::clone_or_fetch(descriptor, &dest, None)
.with_context(|| format!("clone workspace repo `{descriptor}`"))?;
let toml = dest.join(TOML);
std::fs::read_to_string(&toml)
.with_context(|| format!("read {TOML} from cloned workspace repo at {}", dest.display()))
} else {
let p = Path::new(descriptor);
let toml = if p.is_dir() { p.join(TOML) } else { p.to_path_buf() };
std::fs::read_to_string(&toml)
.with_context(|| format!("read local descriptor {}", toml.display()))
}
}
/// Converts a registry workspace into its protobuf record, field by field.
/// The mode is rendered through `as_str()`; members are converted in order.
fn ws_to_pb(w: nornir::registry::Workspace) -> WorkspaceRecord {
    let mode = w.mode.as_str().to_string();

    let mut members = Vec::with_capacity(w.members.len());
    for member in w.members {
        members.push(WorkspaceMember {
            name: member.name,
            remote: member.remote,
            git_ref: member.git_ref,
            last_seen_sha: member.last_seen_sha,
            last_synced: member.last_synced,
            sync_state: member.sync_state,
            worktree_digest: member.worktree_digest,
            worktree_dirty: member.worktree_dirty,
        });
    }

    WorkspaceRecord {
        name: w.name,
        mode,
        descriptor: w.descriptor,
        poll: w.poll,
        current_snapshot: w.current_snapshot,
        members,
        created_at: w.created_at,
        updated_at: w.updated_at,
    }
}
#[tonic::async_trait]
impl WorkspacesSvcTrait for WorkspacesSvc {
/// Lists every workspace in the registry.
async fn list(&self, _req: Request<Empty>) -> Result<Response<WorkspaceList>, Status> {
    let registry = self.reg()?;
    let stored = registry.list().map_err(internal)?;
    let workspaces = stored.into_iter().map(ws_to_pb).collect();
    Ok(Response::new(WorkspaceList { workspaces }))
}
/// Fetches one workspace by name; answers `not_found` when it is absent.
async fn get(
    &self,
    req: Request<WorkspaceName>,
) -> Result<Response<WorkspaceRecord>, Status> {
    let name = req.into_inner().name;
    let lookup = self.reg()?.get(&name).map_err(internal)?;
    match lookup {
        Some(workspace) => Ok(Response::new(ws_to_pb(workspace))),
        None => Err(not_found(format!("no workspace `{name}`"))),
    }
}
/// Registers (or re-registers) a workspace and, for eager monitored
/// workspaces, fetches its members and builds its warehouse before replying.
///
/// Steps, in order:
/// 1. Reject an empty `name`.
/// 2. Take the descriptor text from `descriptor_content` when it is
///    non-blank, otherwise resolve `descriptor` on a blocking thread.
/// 3. Build the workspace record (keeping the `created_at` of an existing
///    entry) and reject an eager monitored workspace with zero members.
/// 4. Upsert into the registry; for a monitored workspace not yet served,
///    try to open a served handle (a failure is only logged).
/// 5. Unless `lazy`, run the eager fetch + build for a monitored workspace on
///    a blocking thread, recording a parent job with one child per member
///    and one for the build. A failure here is only logged.
/// 6. Reply with the registry's current record, falling back to the record
///    built in step 3.
async fn register(
    &self,
    req: Request<RegisterWorkspaceRequest>,
) -> Result<Response<WorkspaceRecord>, Status> {
    let reg = self.reg()?.clone();
    let req = req.into_inner();
    if req.name.is_empty() {
        return Err(invalid_arg("`name` must not be empty"));
    }
    let created = reg.get(&req.name).map_err(internal)?.map(|w| w.created_at);
    let mode = nornir::registry::Mode::parse(&req.mode);
    let name = req.name.clone();
    let content = if !req.descriptor_content.trim().is_empty() {
        req.descriptor_content.clone()
    } else {
        let root = self.root.clone();
        let nm = name.clone();
        let desc = req.descriptor.clone();
        let desc_err = req.descriptor.clone();
        tokio::task::spawn_blocking(move || resolve_descriptor_content(&root, &nm, &desc))
            .await
            .map_err(internal)?
            .map_err(|e| invalid_arg(format!("descriptor `{desc_err}`: {e:#}")))?
    };
    let ws = nornir::registry::Workspace::from_content(
        req.name,
        req.descriptor,
        content,
        mode,
        req.poll,
        created,
    );
    if !req.lazy
        && mode == nornir::registry::Mode::Monitored
        && ws.members.is_empty()
    {
        return Err(invalid_arg(format!(
            "monitored workspace `{}` resolved 0 members from descriptor `{}`: it must be \
             a server-readable nornir-workspace.toml with [repos.*] entries — place the \
             descriptor on the server, or pass lazy=true to defer population to the poll loop",
            ws.name, ws.descriptor
        )));
    }
    reg.upsert(&ws).map_err(internal)?;

    // Start serving the workspace right away if it is not served yet.
    if mode == nornir::registry::Mode::Monitored && self.state.get_ws(&ws.name).is_none() {
        let members: Vec<String> = ws.members.iter().map(|m| m.name.clone()).collect();
        match open_monitored_workspace(&ws.name, &self.root, members).await {
            Ok(ctx) => {
                eprintln!(
                    "nornir-server: now serving monitored workspace `{}` (registered at runtime)",
                    ws.name
                );
                self.state.insert_ws(ws.name.clone(), Arc::new(ctx));
            }
            Err(e) => eprintln!(
                "nornir-server: open served handle for `{}` failed (poll loop will retry): {e:#}",
                ws.name
            ),
        }
    }

    if !req.lazy && mode == nornir::registry::Mode::Monitored {
        let reg2 = reg.clone();
        let root = self.root.clone();
        let state = self.state.clone();
        let log_name = ws.name.clone();
        let build = tokio::task::spawn_blocking(move || -> Result<()> {
            // Jobs are only recorded when the workspace has a served handle.
            let jobs = state.get_ws(&name).map(|ctx| ctx.jobs.clone());
            let parent = jobs.as_ref().map(|j| {
                j.start(
                    nornir::jobs::kind::WORKSPACE_POPULATE,
                    &name,
                    &name,
                    serde_json::json!({ "trigger": "register-eager" }),
                )
            });
            let parent_id = parent.as_ref().map(|p| p.job_id().to_string());

            // Phase 1: fetch every member, recording one child job each.
            let fetch_res = nornir::monitor::fetch_workspace_with_progress(
                &reg2,
                &root,
                &name,
                |mo: &nornir::monitor::MemberOutcome| {
                    if let (Some(jobs), Some(pid)) = (jobs.as_ref(), parent_id.as_ref()) {
                        let child = jobs.start_child(
                            pid,
                            nornir::jobs::kind::WORKSPACE_CLONE,
                            &mo.member,
                            &name,
                            serde_json::json!({ "remote": mo.remote, "op": mo.op }),
                        );
                        let detail = serde_json::json!({
                            "member": mo.member,
                            "status": mo.status,
                            "detail": mo.detail,
                            "elapsed_ms": mo.elapsed_ms,
                        });
                        if mo.status == "ok" {
                            child.finish(detail, &format!("clone_events:{name}"));
                        } else {
                            child.fail_with_detail(detail);
                        }
                    }
                },
            )
            .with_context(|| format!("eager fetch members of `{name}`"));
            let rep = match fetch_res {
                Ok(r) => r,
                Err(e) => {
                    if let Some(p) = parent {
                        p.fail(&e);
                    }
                    return Err(e);
                }
            };
            if let Some(ctx) = state.get_ws(&name) {
                nornir::warehouse::clone_events::record_fetch_report(&ctx.warehouse, &rep);
            }

            // Phase 2: build the warehouse, recorded as a child job.
            let build_job = match (jobs.as_ref(), parent_id.as_ref()) {
                (Some(jobs), Some(pid)) => Some(jobs.start_child(
                    pid,
                    nornir::jobs::kind::WORKSPACE_REPUBLISH,
                    &name,
                    &name,
                    serde_json::json!({ "phase": "build", "trigger": "register-eager" }),
                )),
                _ => None,
            };
            let built = if state.get_ws(&name).is_some() {
                republish_served(&state, &reg2, &root, &name, None)
                    .with_context(|| format!("eager build `{name}` warehouse (served handle)"))
                    .map(|snap| snap.unwrap_or_default())
            } else {
                nornir::monitor::republish(&reg2, &root, &name, None)
                    .with_context(|| format!("eager build `{name}` warehouse (own handle)"))
            };
            match built {
                Ok(snap) => {
                    if let Some(job) = build_job {
                        job.finish(
                            serde_json::json!({ "snapshot": snap }),
                            &format!("clone_events:{name}"),
                        );
                    }
                    if let Some(p) = parent {
                        p.finish(
                            serde_json::json!({
                                "fetched": rep.fetched,
                                "changed": rep.changed.len(),
                                "errors": rep.errors.len(),
                                "snapshot": snap,
                            }),
                            &format!("clone_events:{name}"),
                        );
                    }
                    Ok(())
                }
                Err(e) => {
                    if let Some(job) = build_job {
                        job.fail(&e);
                    }
                    if let Some(p) = parent {
                        p.fail(&e);
                    }
                    Err(e)
                }
            }
        })
        .await;
        // Registration itself succeeded; an eager-build failure is logged only.
        match build {
            Ok(Ok(())) => {}
            Ok(Err(e)) => eprintln!(
                "nornir-monitor: eager build `{log_name}` failed (poll loop will retry): {e:#}"
            ),
            Err(e) => eprintln!(
                "nornir-monitor: eager build `{log_name}` task panicked (poll loop will retry): {e}"
            ),
        }
    }

    // Re-read so the reply reflects whatever the eager build wrote back.
    let populated = reg
        .get(&ws.name)
        .map_err(internal)?
        .unwrap_or(ws);
    Ok(Response::new(ws_to_pb(populated)))
}
/// Removes a workspace from the registry (passing `purge` through) and stops
/// serving it; answers `not_found` when the registry had no such workspace.
async fn remove(&self, req: Request<WorkspaceName>) -> Result<Response<Empty>, Status> {
    let params = req.into_inner();
    let name = params.name;
    let purge = params.purge;

    let removed = self.reg()?.remove(&name, purge).map_err(internal)?;
    if !removed {
        return Err(not_found(format!("no workspace `{name}`")));
    }
    if self.state.remove_ws(&name).is_some() {
        eprintln!("nornir-server: stopped serving workspace `{name}` (removed at runtime)");
    }
    Ok(Response::new(Empty {}))
}
/// Fetches all members of a workspace and, unless `background` is set,
/// republishes the served warehouse when needed.
///
/// A republish happens when `force` is set, when the fetch reported changed
/// members, or when `workspace_needs_build` says so; only the changed
/// members are passed on unless `force` is set or nothing changed. The reply
/// carries the fetch report plus the resulting snapshot (an empty string
/// when no republish ran).
///
/// When the workspace has a served handle, the call is tracked as a
/// `WORKSPACE_FETCH` job that is finished on success and failed on error.
async fn fetch(
    &self,
    req: Request<WorkspaceFetchRequest>,
) -> Result<Response<WorkspaceFetchReport>, Status> {
    let reg = self.reg()?.clone();
    let root = self.root.clone();
    let state = self.state.clone();
    let inner = req.into_inner();
    let (name, force, background) = (inner.name, inner.force, inner.background);
    let (rep, snapshot) = tokio::task::spawn_blocking(move || -> Result<_> {
        let job = state.get_ws(&name).map(|ctx| {
            ctx.jobs.start(
                nornir::jobs::kind::WORKSPACE_FETCH,
                &name,
                &name,
                serde_json::json!({ "force": force, "background": background }),
            )
        });
        // Run the fallible part in its own scope so every early `?` exit can
        // still be recorded on the job below.
        let outcome = (|| -> Result<_> {
            let rep = nornir::monitor::fetch_workspace(&reg, &root, &name)?;
            if let Some(ctx) = state.get_ws(&name) {
                nornir::warehouse::clone_events::record_fetch_report(&ctx.warehouse, &rep);
            }
            let snapshot = if !background
                && (force || !rep.changed.is_empty() || workspace_needs_build(&reg, &name))
            {
                let changed: Option<&[String]> =
                    if force || rep.changed.is_empty() { None } else { Some(&rep.changed) };
                republish_served(&state, &reg, &root, &name, changed)?.unwrap_or_default()
            } else {
                String::new()
            };
            Ok((rep, snapshot))
        })();
        match outcome {
            Ok((rep, snapshot)) => {
                if let Some(job) = job {
                    job.finish(
                        serde_json::json!({ "fetched": rep.fetched, "changed": rep.changed.len(), "snapshot": snapshot }),
                        &format!("clone_events:{name}"),
                    );
                }
                Ok((rep, snapshot))
            }
            Err(e) => {
                if let Some(job) = job {
                    job.fail(&e);
                }
                Err(e)
            }
        }
    })
    .await
    .map_err(internal)?
    .map_err(internal)?;
    Ok(Response::new(WorkspaceFetchReport {
        workspace: rep.workspace,
        fetched: rep.fetched as u32,
        changed: rep.changed,
        errors: rep
            .errors
            .into_iter()
            .map(|(m, e)| format!("{m}: {e}"))
            .collect(),
        snapshot,
    }))
}
/// Returns one roster row per registered workspace as a JSON array.
///
/// `last_synced` is the greatest non-empty `last_synced` string among the
/// members, or `"never"`. The populate verdict is computed from the clone
/// events of the served workspace; a workspace without a served handle, or
/// whose event query fails, is judged on an empty event list.
async fn populate_status(
    &self,
    _req: Request<Empty>,
) -> Result<Response<pb::JsonResponse>, Status> {
    use nornir::registry::RosterRow;
    use nornir::warehouse::clone_events::{
        query_clone_events, workspace_populate_verdict, CloneSelector,
    };

    let registered = self.reg()?.list().map_err(internal)?;
    let state = self.state.clone();

    let task = tokio::task::spawn_blocking(move || -> anyhow::Result<String> {
        let mut roster: Vec<RosterRow> = Vec::with_capacity(registered.len());
        for ws in registered {
            let newest_sync = ws
                .members
                .iter()
                .map(|m| m.last_synced.as_str())
                .filter(|stamp| !stamp.is_empty())
                .max();
            let last_synced = newest_sync.unwrap_or("never").to_string();

            let events = state
                .get_ws(&ws.name)
                .map(|ctx| {
                    let warehouse = ctx.warehouse.clone();
                    let selector = CloneSelector::Workspace(ws.name.clone());
                    warehouse
                        .block_on(query_clone_events(&warehouse, &selector))
                        .unwrap_or_default()
                })
                .unwrap_or_default();
            let verdict = workspace_populate_verdict(&events);

            roster.push(RosterRow {
                name: ws.name,
                mode: ws.mode.as_str().to_string(),
                members: ws.members.len(),
                last_synced,
                populate: verdict.state,
                failing_member: verdict.failing_member,
                last_error: verdict.last_error,
            });
        }
        Ok(serde_json::to_string(&roster)?)
    });
    let json = task.await.map_err(internal)?.map_err(internal)?;
    Ok(Response::new(pb::JsonResponse { json }))
}
}
/// gRPC service state for the `Ops` endpoints; holds the shared server state.
struct OpsSvc { state: Shared }
/// Runs the requested test aspects for one repo and collects result rows.
///
/// Rows for the listed tests come first (skipped when listing fails),
/// followed by the rows of each aspect in the given order.
///
/// Returns `(ok, message, rows)`, where `ok` is true when no row has a red
/// status and `message` summarises passed / red / total row counts.
fn run_test_matrix_one(
    repo: &str,
    repo_root: &std::path::Path,
    aspects: &[nornir::test_matrix::Aspect],
    run_id: &str,
    ts_micros: i64,
) -> (bool, String, Vec<nornir::warehouse::test_results::TestResultRow>) {
    use nornir::test_matrix::{detect_runner, list_tests, outcome_to_rows, run_aspect};
    use nornir::warehouse::test_results::{listed_rows, status as test_status};

    let mut rows = Vec::new();

    let runner = detect_runner();
    if let Ok(names) = list_tests(repo_root, runner) {
        rows.extend(listed_rows(&names, run_id, repo, ts_micros));
    }
    for &aspect in aspects {
        let outcome = run_aspect(repo_root, aspect);
        rows.extend(outcome_to_rows(&outcome, run_id, repo, ts_micros));
    }

    // Tally both counters in one pass over the rows.
    let mut passed = 0usize;
    let mut failed = 0usize;
    for row in &rows {
        if row.status == test_status::PASS {
            passed += 1;
        }
        if test_status::is_red(&row.status) {
            failed += 1;
        }
    }

    let msg = format!("{passed} passed · {failed} red ({} rows)", rows.len());
    (failed == 0, msg, rows)
}
#[tonic::async_trait]
impl OpsSvcTrait for OpsSvc {
async fn run_test_matrix(
&self,
req: Request<RunTestMatrixRequest>,
) -> Result<Response<RunOpResult>, Status> {
let wctx = self.state.ws(req.metadata())?.clone();
let inner = req.into_inner();
let aspects: Vec<nornir::test_matrix::Aspect> = {
use nornir::test_matrix::{parse_aspect, Aspect};
let s = inner.aspects.trim();
if s.is_empty() {
Aspect::DEFAULT.to_vec()
} else {
let mut out = Vec::new();
for tok in s.split(',').map(str::trim).filter(|t| !t.is_empty()) {
match parse_aspect(tok) {
Some(a) if !out.contains(&a) => out.push(a),
Some(_) => {}
None => return Err(invalid_arg(format!("unknown aspect `{tok}`"))),
}
}
out
}
};
let (repos, roots): (Vec<String>, Vec<PathBuf>) = {
let l = wctx.loaded.lock().await;
let names: Vec<String> = if inner.repo.is_empty() {
l.nornir.repo.keys().cloned().collect()
} else {
if !l.nornir.repo.contains_key(&inner.repo) {
return Err(not_found(format!("repo `{}` not configured", inner.repo)));
}
vec![inner.repo.clone()]
};
let roots = names
.iter()
.map(|n| config::Nornir::repo_dir(&l.workspace_root, n))
.collect();
(names, roots)
};
if repos.is_empty() {
return Err(invalid_arg("no repos configured in this workspace"));
}
let target = if inner.repo.is_empty() { "all".to_string() } else { inner.repo.clone() };
let job = wctx.jobs.start(
nornir::jobs::kind::TEST_MATRIX,
&target,
&wctx.name,
serde_json::json!({ "repos": repos.len(), "aspects": aspects.len() }),
);
let run_id = nornir::warehouse::test_results::new_run_id();
let ts = chrono::Utc::now().timestamp_micros();
let run_id2 = run_id.clone();
let features = inner.features.trim().to_string();
let (targets, all_rows) = tokio::task::spawn_blocking(move || {
if !features.is_empty() {
unsafe { std::env::set_var("NORNIR_TEST_FEATURES", &features) };
}
let mut targets = Vec::new();
let mut all_rows = Vec::new();
for (repo, root) in repos.iter().zip(roots.iter()) {
let (green, msg, rows) = run_test_matrix_one(repo, root, &aspects, &run_id2, ts);
targets.push(OpTarget {
name: repo.clone(),
status: if green { "pass" } else { "fail" }.into(),
message: if green { String::new() } else { msg },
});
all_rows.extend(rows);
}
(targets, all_rows)
})
.await
.map_err(internal)?;
use nornir::warehouse::test_results::append_test_results;
for repo in repos_in(&all_rows) {
let rows: Vec<_> = all_rows.iter().filter(|r| r.repo == repo).cloned().collect();
append_test_results(&wctx.warehouse, &rows).await.map_err(internal)?;
}
let ok = targets.iter().all(|t| t.status != "fail");
let summary = format!(
"test matrix: {}/{} repo(s) green ({} rows → run {})",
targets.iter().filter(|t| t.status == "pass").count(),
targets.len(),
all_rows.len(),
&run_id[..8.min(run_id.len())],
);
job.finish(serde_json::json!({ "ok": ok, "rows": all_rows.len() }), &format!("test_results:{run_id}"));
Ok(Response::new(RunOpResult { ok, summary, targets, run_id }))
}
async fn run_bench(
&self,
req: Request<RunBenchRequest>,
) -> Result<Response<RunOpResult>, Status> {
let wctx = self.state.ws(req.metadata())?.clone();
let inner = req.into_inner();
let (repo, root) = {
let l = wctx.loaded.lock().await;
let repo = if inner.repo.is_empty() {
l.nornir.repo.keys().next().cloned().ok_or_else(|| {
invalid_arg("no repos configured in this workspace")
})?
} else {
if !l.nornir.repo.contains_key(&inner.repo) {
return Err(not_found(format!("repo `{}` not configured", inner.repo)));
}
inner.repo.clone()
};
(repo.clone(), config::Nornir::repo_dir(&l.workspace_root, &repo))
};
let job = wctx
.jobs
.start_scheduled(nornir::jobs::kind::BENCH_RUN, &repo, &wctx.name, serde_json::Value::Null)
.await;
let root2 = root.clone();
let run = tokio::task::spawn_blocking(move || release::pipeline::run_bench_example(&root2))
.await
.map_err(internal)?
.map_err(internal)?;
let Some(mut run) = run else {
job.finish(serde_json::json!({ "skipped": "no bench example" }), "");
return Ok(Response::new(RunOpResult {
ok: true,
summary: format!("bench skipped: no examples/nornir-bench.rs in {}", root.display()),
targets: vec![OpTarget { name: repo, status: "skip".into(), message: "no bench example".into() }],
run_id: String::new(),
}));
};
if run.machine.trim().is_empty() {
run.machine = std::env::var("NORNIR_MACHINE").unwrap_or_else(|_| "server".into());
}
let run_id = wctx.warehouse.append_bench_run_async(&repo, &run).await.map_err(internal)?;
let summary = format!(
"bench {repo}: {} result(s), {} test(s) → run {}",
run.results.len(),
run.tests.len(),
&run_id.to_string()[..8.min(run_id.to_string().len())],
);
let ok = run.tests.iter().all(|t| t.passed);
job.finish(
serde_json::json!({ "ok": ok, "results": run.results.len(), "tests": run.tests.len() }),
&format!("bench_runs:{run_id}"),
);
Ok(Response::new(RunOpResult {
ok,
summary,
targets: vec![OpTarget {
name: repo,
status: if ok { "pass" } else { "fail" }.into(),
message: String::new(),
}],
run_id: run_id.to_string(),
}))
}
async fn run_release(
&self,
req: Request<RunReleaseRequest>,
) -> Result<Response<RunOpResult>, Status> {
use nornir::warehouse::release_events::{phase, status};
let wctx = self.state.ws(req.metadata())?.clone();
let inner = req.into_inner();
let repos: Vec<String> = {
let l = wctx.loaded.lock().await;
if inner.repo.is_empty() {
l.nornir.repo.keys().cloned().collect()
} else {
if !l.nornir.repo.contains_key(&inner.repo) {
return Err(not_found(format!("repo `{}` not configured", inner.repo)));
}
vec![inner.repo.clone()]
}
};
if repos.is_empty() {
return Err(invalid_arg("no repos configured in this workspace"));
}
let target = if inner.repo.is_empty() { "all".to_string() } else { inner.repo.clone() };
let job = wctx.jobs.start(
nornir::jobs::kind::RELEASE_RUN,
&target,
&wctx.name,
serde_json::json!({ "repos": repos.len() }),
);
let run_id = format!("release-{}", chrono::Utc::now().timestamp_micros());
let mut targets = Vec::new();
let mut events = Vec::new();
let ts0 = chrono::Utc::now().timestamp_micros();
for (i, repo) in repos.iter().enumerate() {
let (root, repo_cfg) = {
let l = wctx.loaded.lock().await;
(
config::Nornir::repo_dir(&l.workspace_root, repo),
l.nornir.repo.get(repo).cloned(),
)
};
let repo_cfg = repo_cfg.ok_or_else(|| not_found(format!("repo `{repo}`")))?;
let root2 = root.clone();
let res = tokio::task::spawn_blocking(move || release::gate::no_path_patches(&root2))
.await
.map_err(internal)?;
let _ = &repo_cfg;
let (status_str, detail) = match res {
Ok(()) => (status::OK, String::new()),
Err(e) => (status::FAIL, format!("{e:#}")),
};
targets.push(OpTarget {
name: repo.clone(),
status: if status_str == status::OK { "pass" } else { "fail" }.into(),
message: detail.clone(),
});
events.push(nornir::warehouse::release_events::ReleaseEventRow {
run_id: run_id.clone(),
seq: i as i64,
ts_micros: ts0 + i as i64,
component: repo.clone(),
repo: repo.clone(),
op: "release_gate".into(),
phase: phase::END.into(),
status: status_str.into(),
detail,
depends_on: None,
elapsed_ms: None,
});
}
use nornir::warehouse::release_events::append_release_events;
append_release_events(&wctx.warehouse, &events).await.map_err(internal)?;
let ok = targets.iter().all(|t| t.status != "fail");
let summary = format!(
"release gate: {}/{} repo(s) ready ({} events → run {})",
targets.iter().filter(|t| t.status == "pass").count(),
targets.len(),
events.len(),
&run_id,
);
job.finish(serde_json::json!({ "ok": ok, "events": events.len() }), &format!("release_events:{run_id}"));
Ok(Response::new(RunOpResult { ok, summary, targets, run_id }))
}
async fn run_arch(
&self,
req: Request<pb::RunArchRequest>,
) -> Result<Response<RunOpResult>, Status> {
let wctx = self.state.ws(req.metadata())?.clone();
let workspace = wctx.name.clone();
let inner = req.into_inner();
let (repos, roots): (Vec<String>, Vec<PathBuf>) = {
let l = wctx.loaded.lock().await;
let names: Vec<String> = if inner.repo.is_empty() {
l.nornir.repo.keys().cloned().collect()
} else {
if !l.nornir.repo.contains_key(&inner.repo) {
return Err(not_found(format!("repo `{}` not configured", inner.repo)));
}
vec![inner.repo.clone()]
};
let roots = names
.iter()
.map(|n| config::Nornir::repo_dir(&l.workspace_root, n))
.collect();
(names, roots)
};
if repos.is_empty() {
return Err(invalid_arg("no repos configured in this workspace"));
}
let target = if inner.repo.is_empty() { "all".to_string() } else { inner.repo.clone() };
let job = wctx.jobs.start(
nornir::jobs::kind::ARCH_GENERATE,
&target,
&workspace,
serde_json::json!({ "repos": repos.len() }),
);
let repos2 = repos.clone();
let built = tokio::task::spawn_blocking(
move || -> Vec<(String, Result<(nornir::arch::ArchGraph, String, String), String>)> {
let mut out = Vec::new();
for (repo, root) in repos2.iter().zip(roots.iter()) {
let res = (|| {
let scan = nornir::knowledge::scan_all(root, repo)
.map_err(|e| format!("scan {repo}: {e:#}"))?;
let access = nornir::warehouse::access_scan::scan_repo(root)
.map_err(|e| format!("access scan {repo}: {e:#}"))?;
let graph = nornir::arch::generate(
&scan.symbols.symbols,
&scan.symbols.calls,
&access,
);
let svg = graph.to_svg();
let git_sha =
nornir::gitio::head_sha(root).unwrap_or_else(|_| "unknown".into());
Ok((graph, svg, git_sha))
})();
out.push((repo.clone(), res));
}
out
},
)
.await
.map_err(internal)?;
let mut targets = Vec::new();
let mut last_id = String::new();
for (repo, res) in built {
match res {
Err(e) => targets.push(OpTarget {
name: repo,
status: "fail".into(),
message: e,
}),
Ok((graph, svg, git_sha)) => {
match nornir::arch::warehouse::record_arch_wiring_async(
&wctx.warehouse,
&workspace,
&repo,
&git_sha,
&graph,
&svg,
)
.await
{
Ok(rec) => {
last_id = rec.wiring_id.clone();
targets.push(OpTarget {
name: repo,
status: "pass".into(),
message: format!(
"nodes={} edges={} git={}",
rec.node_count,
rec.edge_count,
&rec.git_sha[..12.min(rec.git_sha.len())],
),
});
}
Err(e) => {
let message = format!("historize {repo}: {e:#}");
targets.push(OpTarget { name: repo, status: "fail".into(), message });
}
}
}
}
}
let ok = targets.iter().all(|t| t.status != "fail");
let summary = format!(
"arch: {}/{} repo(s) historized",
targets.iter().filter(|t| t.status == "pass").count(),
targets.len(),
);
job.finish(serde_json::json!({ "ok": ok }), &format!("architecture_wiring:{last_id}"));
Ok(Response::new(RunOpResult { ok, summary, targets, run_id: last_id }))
}
/// Ingests SCIP occurrences for one repo and, unless `no_save` is set,
/// persists them to the warehouse.
///
/// Only available when the server is built with the `scip` feature; without
/// it the RPC returns `unimplemented`. `in_process` additionally requires the
/// `ra-ingest` feature. When no `index_path` is given, `rust-analyzer scip`
/// is spawned and writes `index.scip` into the repo directory.
///
/// Once the ledger job has been started it is finished on every exit path:
/// join failure, scan failure and persist failure all close it with
/// `{ "ok": false }`.
// Without the `scip` feature `wctx` and `inner` are bound but never read.
#[allow(unused_variables)]
async fn scip_ingest(
    &self,
    req: Request<ScipIngestRequest>,
) -> Result<Response<RunOpResult>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let inner = req.into_inner();
    #[cfg(feature = "scip")]
    {
        let (repo, root) = {
            let l = wctx.loaded.lock().await;
            let repo = if inner.repo.is_empty() {
                l.nornir.repo.keys().next().cloned().ok_or_else(|| {
                    invalid_arg("no repos configured in this workspace")
                })?
            } else {
                if !l.nornir.repo.contains_key(&inner.repo) {
                    return Err(not_found(format!("repo `{}` not configured", inner.repo)));
                }
                inner.repo.clone()
            };
            (repo.clone(), l.nornir.repo_dir_for(&l.workspace_root, &repo))
        };
        #[cfg(not(feature = "ra-ingest"))]
        if inner.in_process {
            return Err(invalid_arg(
                "--in-process requires the server be built with the `ra-ingest` feature \
                 (redeploy nornir-server with `--features ra-ingest`)",
            ));
        }
        let (sha, _branch) = nornir::gitio::head_sha_and_branch(&root)
            .map_err(|e| internal(anyhow!("read git HEAD in {}: {e:#}", root.display())))?;
        let snapshot_id = uuid::Uuid::new_v4();
        let no_save = inner.no_save;
        let in_process = inner.in_process;
        let index = if inner.index_path.is_empty() { None } else { Some(inner.index_path.clone()) };
        let job = wctx
            .jobs
            .start_scheduled(
                nornir::jobs::kind::SCIP_INGEST,
                &repo,
                &wctx.name,
                serde_json::json!({ "in_process": in_process, "no_save": no_save }),
            )
            .await;
        let root2 = root.clone();
        let repo2 = repo.clone();
        let sha2 = sha.clone();
        let joined = tokio::task::spawn_blocking(move || -> Result<nornir::knowledge::scip::ScipScan> {
            #[cfg(feature = "ra-ingest")]
            if in_process {
                return nornir::knowledge::ra_ingest::ingest_in_process(
                    &root2, &repo2, &sha2, snapshot_id, chrono::Utc::now(),
                );
            }
            // Use the caller-supplied index, or generate one with rust-analyzer.
            let index_path = if let Some(p) = index {
                std::path::PathBuf::from(p)
            } else {
                let out = root2.join("index.scip");
                let status = std::process::Command::new("rust-analyzer")
                    .arg("scip")
                    .arg(&root2)
                    .arg("--output")
                    .arg(&out)
                    .status()
                    .context("spawning rust-analyzer (install: `rustup component add rust-analyzer`)")?;
                if !status.success() {
                    anyhow::bail!("rust-analyzer scip failed ({status})");
                }
                out
            };
            nornir::knowledge::scip::ingest_index_file(
                &index_path, &repo2, &sha2, snapshot_id, chrono::Utc::now(),
            )
        })
        .await;
        let scan = match joined {
            Ok(Ok(s)) => s,
            Ok(Err(e)) => {
                job.finish(serde_json::json!({ "ok": false }), "");
                return Err(internal(e));
            }
            Err(e) => {
                // The blocking task panicked or was cancelled.
                job.finish(serde_json::json!({ "ok": false }), "");
                return Err(internal(e));
            }
        };
        let defs = scan.rows.iter().filter(|r| r.is_definition).count();
        let refs = scan.rows.len() - defs;
        let total = scan.rows.len();
        let mut summary = format!("ingested {total} occurrence(s) ({defs} def, {refs} ref) @ {sha}");
        let run_id = if !no_save {
            if let Err(e) = wctx.warehouse.append_scip_scan_async(&scan).await {
                job.finish(serde_json::json!({ "ok": false }), "");
                return Err(internal(e));
            }
            summary.push_str(&format!(
                "; persisted: scip_occurrences snapshot={snapshot_id} repo={repo} sha={sha}"
            ));
            snapshot_id.to_string()
        } else {
            String::new()
        };
        job.finish(
            serde_json::json!({ "ok": true, "occurrences": total, "defs": defs, "refs": refs, "saved": !no_save }),
            &if no_save { String::new() } else { format!("scip_occurrences:{snapshot_id}") },
        );
        Ok(Response::new(RunOpResult {
            ok: true,
            summary,
            targets: vec![OpTarget {
                name: repo,
                status: "pass".into(),
                message: String::new(),
            }],
            run_id,
        }))
    }
    #[cfg(not(feature = "scip"))]
    {
        Err(Status::unimplemented(
            "Ops.ScipIngest requires the server be built with the `scip` feature \
             (redeploy nornir-server with `--features scip` or `--features ra-ingest`)",
        ))
    }
}
}
/// Returns the distinct repo names found in `rows`, in order of first
/// appearance.
fn repos_in(rows: &[nornir::warehouse::test_results::TestResultRow]) -> Vec<String> {
    rows.iter().fold(Vec::<String>::new(), |mut distinct, row| {
        if !distinct.contains(&row.repo) {
            distinct.push(row.repo.clone());
        }
        distinct
    })
}
/// Opens the serving context for a monitored workspace rooted at
/// `<root>/<name>`.
///
/// Layout used: `<root>/<name>/git` is the workspace root for member
/// checkouts, `<root>/<name>/builds` holds the warehouse and the job ledger,
/// and `<root>/<name>/builds/cache/index` holds the index. Every entry of
/// `members` is registered as a repo with a default configuration.
///
/// # Errors
/// Fails when the warehouse, the index, or the job ledger cannot be opened.
/// Failing to create the two directories is logged but not fatal; the opens
/// that follow decide whether the workspace is usable.
async fn open_monitored_workspace(
    name: &str,
    root: &std::path::Path,
    members: Vec<String>,
) -> Result<WorkspaceCtx> {
    let git_dir = root.join(name).join("git");
    let builds = root.join(name).join("builds");
    // Best-effort, but no longer silent: a permission problem here is the
    // usual root cause of the open failures below.
    for dir in [&git_dir, &builds] {
        if let Err(e) = std::fs::create_dir_all(dir) {
            eprintln!("nornir-server: create dir {} failed: {e} (continuing)", dir.display());
        }
    }
    let wh_root = builds.clone();
    let warehouse = Arc::new(
        tokio::task::spawn_blocking(move || IcebergWarehouse::open(&wh_root))
            .await
            .context("spawn warehouse open")?
            .with_context(|| format!("open warehouse {}", builds.display()))?,
    );
    let index_dir = builds.join("cache").join("index");
    let git_for_idx = git_dir.clone();
    let wh_for_open = warehouse.clone();
    let (idx, _restored) = tokio::task::spawn_blocking(move || {
        index::Index::open_or_restore_at(&git_for_idx, &index_dir, &wh_for_open, "_workspace", None)
    })
    .await
    .context("spawn index open")?
    .context("warm index for monitored workspace")?;
    // Build an in-memory config: one default repo entry per member.
    let mut nornir = config::Nornir::default();
    for m in &members {
        nornir.repo.insert(m.clone(), config::Repo::default());
    }
    let loaded = config::Loaded {
        nornir,
        config_path: builds.join("nornir.toml"),
        workspace_root: git_dir,
    };
    let jobs = Arc::new(
        nornir::jobs::JobStore::open(warehouse.root())
            .with_context(|| format!("open job ledger at {}", warehouse.root().display()))?,
    );
    Ok(WorkspaceCtx {
        name: name.to_string(),
        loaded: Mutex::new(loaded),
        index: Arc::new(idx),
        warehouse,
        mimir: Mutex::new(None),
        funnel: Mutex::new(None),
        jobs,
    })
}
/// Spawns the background task that runs `server_sweep` once per `tick`.
///
/// Each sweep runs on a blocking thread. Ticks missed while a sweep is still
/// running are skipped, not replayed. Sweep errors and panics are logged to
/// stderr and the loop keeps going; the task is detached and never returns.
fn spawn_server_poll_loop(
    state: Shared,
    reg: Arc<nornir::registry::Registry>,
    root: PathBuf,
    tick: std::time::Duration,
) {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(tick);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        loop {
            interval.tick().await;
            // Fresh clones per iteration: the blocking closure takes ownership.
            let (state, reg, root) = (state.clone(), reg.clone(), root.clone());
            match tokio::task::spawn_blocking(move || server_sweep(&state, &reg, &root)).await {
                Ok(Ok(())) => {}
                Ok(Err(e)) => eprintln!("nornir-monitor: sweep error: {e:#}"),
                Err(e) => eprintln!("nornir-monitor: sweep task panicked: {e}"),
            }
        }
    });
}
/// Performs one monitor pass over every `Mode::Monitored` workspace in the
/// registry.
///
/// For each workspace: fetch, log per-member fetch errors, record the sweep
/// report in the workspace warehouse (when the workspace is served), then
/// republish if any member changed or the registry row has no current
/// snapshot. A republish is tracked as a ledger job when the workspace is
/// served. Fetch and republish failures are logged and do not stop the sweep;
/// only a failure to list the registry is returned as an error.
fn server_sweep(
    state: &AppState,
    reg: &nornir::registry::Registry,
    root: &std::path::Path,
) -> Result<()> {
    use nornir::registry::Mode;
    let monitored = reg.list()?.into_iter().filter(|ws| ws.mode == Mode::Monitored);
    for ws in monitored {
        let rep = match nornir::monitor::fetch_workspace(reg, root, &ws.name) {
            Err(e) => {
                eprintln!("nornir-monitor: {} fetch failed: {e:#}", ws.name);
                continue;
            }
            Ok(report) => report,
        };
        for (m, e) in &rep.errors {
            eprintln!("nornir-monitor: {}::{m} fetch error: {e}", ws.name);
        }
        if let Some(ctx) = state.get_ws(&ws.name) {
            nornir::warehouse::clone_events::record_sweep_report(&ctx.warehouse, &rep);
        }

        // Republish when something changed, or when no snapshot exists yet.
        let needs_build = workspace_needs_build(reg, &ws.name);
        let has_changes = !rep.changed.is_empty();
        if !has_changes && !needs_build {
            continue;
        }
        let why = if has_changes {
            format!("changed [{}]", rep.changed.join(", "))
        } else {
            "empty warehouse".to_string()
        };
        eprintln!("nornir-monitor: {} {why} → republish", ws.name);
        let changed: Option<&[String]> = has_changes.then(|| &rep.changed[..]);

        // The job is only opened for served workspaces; unserved ones have no ledger.
        let job = state.get_ws(&ws.name).map(|ctx| {
            ctx.jobs.start(
                nornir::jobs::kind::WORKSPACE_REPUBLISH,
                &ws.name,
                &ws.name,
                serde_json::json!({ "why": why }),
            )
        });
        let outcome = republish_served(state, reg, root, &ws.name, changed);
        match outcome {
            Ok(Some(snap)) => {
                eprintln!("nornir-monitor: {} republished → snapshot {snap}", ws.name);
                if let Some(job) = job {
                    job.finish(serde_json::json!({ "snapshot": snap }), &format!("clone_events:{}", ws.name));
                }
            }
            Ok(None) => {
                if let Some(job) = job {
                    job.finish(serde_json::json!({ "snapshot": null }), "");
                }
            }
            Err(e) => {
                if let Some(job) = job {
                    job.fail(&e);
                }
                eprintln!("nornir-monitor: {} republish failed: {e:#}", ws.name);
            }
        }
    }
    Ok(())
}
/// Reports whether the registry row for `ws_name` exists and has a blank
/// `current_snapshot`. A lookup error or a missing row counts as "no build
/// needed".
fn workspace_needs_build(reg: &nornir::registry::Registry, ws_name: &str) -> bool {
    match reg.get(ws_name) {
        Ok(Some(row)) => row.current_snapshot.trim().is_empty(),
        Ok(None) | Err(_) => false,
    }
}
/// Republishes a served workspace and stores the new snapshot id in its
/// registry row.
///
/// Returns `Ok(None)` when `ws_name` is not currently served, otherwise
/// `Ok(Some(snapshot))`.
///
/// `deep_scan` is taken from the descriptor content stored on the registry
/// row. Only when that content is blank is the descriptor file path loaded
/// instead. A descriptor that cannot be parsed, or a missing row, means
/// `deep_scan = false`.
///
/// # Errors
/// Propagates registry lookup and republish failures. A failure to write the
/// snapshot id back to the registry is logged but does not fail the call,
/// since the republish itself already succeeded.
fn republish_served(
    state: &AppState,
    reg: &nornir::registry::Registry,
    root: &std::path::Path,
    ws_name: &str,
    changed: Option<&[String]>,
) -> Result<Option<String>> {
    use nornir::workspace::WorkspaceDescriptor;
    let Some(ctx) = state.get_ws(ws_name) else {
        eprintln!("nornir-monitor: {ws_name} not served; skipping republish");
        return Ok(None);
    };
    let rec = reg.get(ws_name)?;
    let members: Vec<String> = rec
        .as_ref()
        .map(|w| w.members.iter().map(|m| m.name.clone()).collect())
        .unwrap_or_default();
    let git_dir = root.join(ws_name).join("git");
    let index_dir = root.join(ws_name).join("builds").join("cache").join("index");
    // Rows registered from content carry a placeholder path, so loading the
    // file alone would always yield `false` for them.
    let deep_scan = rec
        .as_ref()
        .map(|w| {
            if w.descriptor_content.trim().is_empty() {
                WorkspaceDescriptor::load(std::path::Path::new(&w.descriptor))
                    .map(|d| d.workspace.deep_scan)
                    .unwrap_or(false)
            } else {
                WorkspaceDescriptor::from_content(&w.descriptor_content)
                    .map(|d| d.workspace.deep_scan)
                    .unwrap_or(false)
            }
        })
        .unwrap_or(false);
    let snap = nornir::monitor::republish_with(
        &ctx.warehouse,
        &ctx.index,
        &git_dir,
        &index_dir,
        &members,
        ws_name,
        deep_scan,
        changed,
    )?;
    // Re-read the row so only the snapshot fields are overwritten.
    if let Ok(Some(mut r)) = reg.get(ws_name) {
        r.current_snapshot = snap.clone();
        r.updated_at = chrono::Utc::now().to_rfc3339();
        if let Err(e) = reg.upsert(&r) {
            // Left unrecorded, the blank snapshot makes the next sweep rebuild.
            eprintln!("nornir-monitor: {ws_name} registry update failed: {e:#}");
        }
    }
    Ok(Some(snap))
}
/// Prepends `$HOME/.cargo/bin` to this process's `PATH` when that directory
/// exists and is not already listed.
///
/// Does nothing when `HOME` is unset, when the directory is missing, or when
/// the combined path list cannot be joined. An empty or unset `PATH` becomes
/// exactly the cargo bin directory, with no trailing empty entry.
fn ensure_cargo_bin_on_path() {
    let Some(home) = std::env::var_os("HOME") else { return };
    let cargo_bin = std::path::Path::new(&home).join(".cargo").join("bin");
    if !cargo_bin.is_dir() {
        return;
    }
    let current = std::env::var_os("PATH").unwrap_or_default();
    if std::env::split_paths(&current).any(|p| p == cargo_bin) {
        return;
    }
    let mut paths = vec![cargo_bin];
    // Splitting an empty PATH yields one empty entry, which would put the
    // current directory on the search path; skip it.
    if !current.is_empty() {
        paths.extend(std::env::split_paths(&current));
    }
    if let Ok(joined) = std::env::join_paths(paths) {
        // NOTE(review): `set_var` is only sound while no other thread touches
        // the environment. This helper is intended to run once during startup
        // — confirm nothing reads the environment concurrently at that point.
        unsafe { std::env::set_var("PATH", joined) };
    }
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "nornir_server=info,tonic=info".into()),
)
.with_writer(std::io::stderr)
.init();
ensure_cargo_bin_on_path();
let args: Vec<String> = std::env::args().collect();
let token_flag = flag_or_env(&args, "--token", "NORNIR_SERVER_TOKEN");
let token = match token_flag {
Some(t) => t,
None => {
let path = config::nornir_home().join("token");
std::fs::read_to_string(&path)
.map(|s| s.trim().to_string())
.map_err(|e| {
anyhow!(
"no bearer token: pass --token, set NORNIR_SERVER_TOKEN, or write one \
(≥16 chars) to {} ({e}). Future: mTLS with CN=<username>.",
path.display()
)
})?
}
};
if token.len() < 16 {
return Err(anyhow!(
"bearer token must be ≥ 16 chars (--token / NORNIR_SERVER_TOKEN / <home>/.nornir/token)"
));
}
let mut workspaces: HashMap<String, Arc<WorkspaceCtx>> = HashMap::new();
let registry_root = config::registry_root();
let registry: Option<Arc<nornir::registry::Registry>> =
match nornir::registry::Registry::open(®istry_root) {
Ok(r) => Some(Arc::new(r)),
Err(e) => {
eprintln!("nornir-server: registry open failed ({e:#}); monitored role off");
None
}
};
if let Some(reg) = ®istry {
for ws in reg.list().unwrap_or_default() {
if ws.mode != nornir::registry::Mode::Monitored || workspaces.contains_key(&ws.name) {
continue;
}
let members: Vec<String> = ws.members.iter().map(|m| m.name.clone()).collect();
match open_monitored_workspace(&ws.name, ®istry_root, members)
.await
{
Ok(ctx) => {
eprintln!(
"nornir-server: serving monitored workspace `{}` (builds/ warehouse)",
ws.name
);
workspaces.insert(ws.name.clone(), Arc::new(ctx));
}
Err(e) => eprintln!("nornir-server: open monitored `{}` failed: {e:#}", ws.name),
}
}
}
if workspaces.is_empty() {
eprintln!(
"nornir-server: serving 0 workspace(s) (empty registry — register one with \
`nornir workspace register <name> --descriptor <toml>`)"
);
} else {
eprintln!("nornir-server: serving {} workspace(s)", workspaces.len());
}
let state: Shared = Arc::new(AppState {
workspaces: std::sync::RwLock::new(workspaces),
#[cfg(any(feature = "embed-tract", feature = "embed-ort"))]
embedder: Mutex::new(None),
});
let addr_str = flag_or_env(&args, "--addr", "NORNIR_SERVER_ADDR")
.unwrap_or_else(|| "127.0.0.1:7878".into());
let addr: SocketAddr =
addr_str.parse().with_context(|| format!("parse server bind addr {addr_str:?} (--addr / NORNIR_SERVER_ADDR)"))?;
eprintln!("nornir-server: listening on {addr}");
if let Some(reg) = registry.clone() {
let tick = std::time::Duration::from_secs(60);
eprintln!(
"nornir-monitor: poll loop every {}s over registry at {}",
tick.as_secs(),
registry_root.display()
);
spawn_server_poll_loop(state.clone(), reg, registry_root.clone(), tick);
}
let token_arc = Arc::new(token);
let health = HealthServer::with_interceptor(
HealthSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let repos = ReposServer::with_interceptor(
ReposSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let guard_svc = GuardServer::with_interceptor(
GuardSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let release_svc = ReleaseServer::with_interceptor(
ReleaseSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let bench_svc = BenchServer::with_interceptor(
BenchSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let telemetry_svc = TelemetryServer::with_interceptor(
TelemetrySvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let search_svc = SearchServer::with_interceptor(
SearchSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let index_svc = IndexServer::with_interceptor(
IndexSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let introspect_svc = IntrospectServer::with_interceptor(
IntrospectSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let knowledge_svc = KnowledgeServer::with_interceptor(
KnowledgeSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let docs_svc = DocsServer::with_interceptor(
DocsSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let mimir_svc = MimirServer::with_interceptor(
MimirSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let dwarf_svc = DwarfServer::with_interceptor(
DwarfSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let vector_svc = VectorServer::with_interceptor(
VectorSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let funnel_svc = FunnelServer::with_interceptor(
FunnelSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let viz_svc = tonic::service::interceptor::InterceptedService::new(
VizServer::new(VizSvc { state: state.clone() })
.max_encoding_message_size(64 << 20)
.max_decoding_message_size(64 << 20),
auth_interceptor(token_arc.clone()),
);
let warehouse_svc = WarehouseServer::with_interceptor(
WarehouseSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let workspaces_svc = WorkspacesServer::with_interceptor(
WorkspacesSvc {
registry: registry.clone(),
root: registry_root.clone(),
state: state.clone(),
},
auth_interceptor(token_arc.clone()),
);
let ops_svc = OpsServer::with_interceptor(
OpsSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
#[cfg(feature = "embed-holger")]
{
let enabled = std::env::var_os("NORNIR_HOLGER").is_some()
|| std::env::var_os("NORNIR_HOLGER_HTTP").is_some();
if enabled {
let data = std::env::var_os("NORNIR_HOLGER_DATA")
.map(PathBuf::from)
.unwrap_or_else(|| config::nornir_home().join("holger-cache"));
let grpc = std::env::var("NORNIR_HOLGER_GRPC").unwrap_or_else(|_| "127.0.0.1:18443".into());
let http = std::env::var("NORNIR_HOLGER_HTTP").unwrap_or_else(|_| "127.0.0.1:18464".into());
match nornir::holger_embed::start_in_current_runtime(&data, &grpc, &http) {
Ok(()) => eprintln!(
"nornir-server: embedded holger up — /cache + /sparring on http://{http} (grpc {grpc}, data {})",
data.display()
),
Err(e) => eprintln!("nornir-server: embedded holger failed to start (non-fatal): {e:#}"),
}
}
}
Server::builder()
.add_service(health)
.add_service(repos)
.add_service(guard_svc)
.add_service(release_svc)
.add_service(bench_svc)
.add_service(telemetry_svc)
.add_service(search_svc)
.add_service(index_svc)
.add_service(introspect_svc)
.add_service(knowledge_svc)
.add_service(docs_svc)
.add_service(mimir_svc)
.add_service(dwarf_svc)
.add_service(vector_svc)
.add_service(funnel_svc)
.add_service(viz_svc)
.add_service(warehouse_svc)
.add_service(workspaces_svc)
.add_service(ops_svc)
.serve(addr)
.await
.context("tonic serve")?;
Ok(())
}
#[cfg(test)]
mod descriptor_in_redb_tests {
    use nornir::registry::{Mode, Registry, Workspace};
    use nornir::workspace::WorkspaceDescriptor;

    /// A workspace built from descriptor *content* resolves its members from
    /// that content, keeps the content verbatim, and round-trips it through
    /// the registry so `deep_scan` can be read back without a descriptor file.
    #[test]
    fn descriptor_content_is_stored_in_redb_and_drives_members_and_deep_scan() {
        let content = "[workspace]\nname = \"nordisk\"\ndeep_scan = true\n\n\
                       [repos.skade]\ngit = \"https://x/skade.git\"\n\
                       [repos.znippy]\ngit = \"https://x/znippy.git\"\n";
        let ws = Workspace::from_content(
            "nordisk".into(),
            "<shipped>".into(),
            content.into(),
            Mode::Monitored,
            "60s".into(),
            None,
        );

        // Members and their remotes come straight from the content.
        let names: Vec<&str> = ws.members.iter().map(|m| m.name.as_str()).collect();
        assert!(
            ["skade", "znippy"].iter().all(|wanted| names.contains(wanted)),
            "members resolved from content: {names:?}"
        );
        let skade = ws.members.iter().find(|m| m.name == "skade").unwrap();
        assert_eq!(skade.remote, "https://x/skade.git");
        assert_eq!(ws.descriptor_content, content);

        // Round-trip through a fresh on-disk registry.
        let dir = std::env::temp_dir().join(format!("nornir-desc-redb-{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&dir);
        let reg = Registry::open(&dir).unwrap();
        reg.upsert(&ws).unwrap();
        let got = reg.get("nordisk").unwrap().expect("row present");
        assert_eq!(got.descriptor_content, content, "descriptor content round-trips via redb");
        assert_eq!(got.members.len(), 2);

        let parsed = WorkspaceDescriptor::from_content(&got.descriptor_content).unwrap();
        let deep = parsed.workspace.deep_scan;
        assert!(deep, "deep_scan read back from the registry row's content, not a file");
        let _ = std::fs::remove_dir_all(&dir);
    }
}
#[cfg(test)]
mod empty_registry_tests {
    use nornir::registry::{Mode, Registry, Workspace};
    use std::collections::HashMap;
    use std::sync::Arc;

    /// Unique scratch directory per call (process id plus a nanosecond timestamp).
    fn tmp_root() -> std::path::PathBuf {
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        std::env::temp_dir().join(format!("nornir-empty-reg-{}-{}", std::process::id(), nanos))
    }

    /// Builds the name → context map the way server start-up does: only
    /// `Mode::Monitored` rows are kept. `what` is the panic message used when
    /// listing fails.
    fn served_from(reg: &Registry, what: &str) -> HashMap<String, Arc<()>> {
        reg.list()
            .expect(what)
            .into_iter()
            .filter(|ws| ws.mode == Mode::Monitored)
            .map(|ws| (ws.name.clone(), Arc::new(())))
            .collect()
    }

    /// A fresh registry lists nothing and yields an empty served map; after
    /// one upsert exactly that workspace is listed and served.
    #[test]
    fn empty_registry_is_an_acceptable_state_then_register_appears() {
        let root = tmp_root();
        let reg = Registry::open(&root).expect("open fresh registry");
        let listed = reg.list().expect("list empty registry");
        assert_eq!(listed.len(), 0, "fresh registry must list 0 workspaces");

        let workspaces = served_from(&reg, "list for init");
        assert!(workspaces.is_empty(), "empty registry ⇒ empty served map");

        let ws = Workspace::new(
            "nordisk".to_string(),
            "/tmp/does-not-need-to-exist/nornir-workspace.toml".to_string(),
            Mode::Monitored,
            "60s".to_string(),
            None,
        );
        reg.upsert(&ws).expect("upsert one workspace");

        let after = reg.list().expect("list after upsert");
        assert_eq!(after.len(), 1, "after register the registry lists 1 workspace");
        assert_eq!(after[0].name, "nordisk");
        assert_eq!(after[0].mode, Mode::Monitored);

        let workspaces2 = served_from(&reg, "list for init 2");
        assert_eq!(workspaces2.len(), 1, "register ⇒ one served workspace");
        assert!(
            !workspaces2.is_empty(),
            "serving a workspace must not imply any default selection — the served \
             set is non-empty but there is no `default_ws` to point anywhere"
        );
        let _ = std::fs::remove_dir_all(&root);
    }
}
#[cfg(test)]
mod no_default_route_tests {
    use super::*;

    /// Application state that serves no workspace at all.
    fn empty_state() -> AppState {
        AppState {
            workspaces: std::sync::RwLock::new(HashMap::new()),
            #[cfg(any(feature = "embed-tract", feature = "embed-ort"))]
            embedder: Mutex::new(None),
        }
    }

    /// Asserts that resolving a workspace from `md` fails with `NotFound` and
    /// a "no workspace selected" message.
    fn expect_no_selection_err(state: &AppState, md: &tonic::metadata::MetadataMap) {
        let Err(err) = state.ws(md) else {
            panic!("expected an error — the server must never auto-pick a workspace");
        };
        assert_eq!(err.code(), tonic::Code::NotFound);
        assert!(
            err.message().contains("no workspace selected"),
            "unexpected message: {}",
            err.message()
        );
    }

    /// No `nornir-workspace` header at all.
    #[test]
    fn ws_with_no_header_errors_no_default() {
        let md = tonic::metadata::MetadataMap::new();
        expect_no_selection_err(&empty_state(), &md);
    }

    /// A `nornir-workspace` header that is present but empty.
    #[test]
    fn ws_with_empty_header_errors_no_default() {
        let mut md = tonic::metadata::MetadataMap::new();
        md.insert("nornir-workspace", "".parse().unwrap());
        expect_no_selection_err(&empty_state(), &md);
    }
}