use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use std::collections::HashMap;
use tokio::sync::Mutex;
use tonic::{transport::Server, Request, Response, Status};
use nornir::config::{self, Loaded};
use nornir::{bench, docs, guard, index, introspect, knowledge, release};
use nornir::warehouse::iceberg::IcebergWarehouse;
use nornir::warehouse::dep_graph::{query_dep_graph_snapshots, topo_order_from_edges};
/// Generated protobuf/gRPC bindings for the `nornir.v1` package, pulled in
/// through `tonic::include_proto!`.
pub mod pb {
tonic::include_proto!("nornir.v1");
}
use pb::bench_server::{Bench as BenchSvcTrait, BenchServer};
use pb::docs_server::{Docs as DocsSvcTrait, DocsServer};
use pb::dwarf_server::{Dwarf as DwarfSvcTrait, DwarfServer};
use pb::guard_server::{Guard as GuardSvcTrait, GuardServer};
use pb::health_server::{Health, HealthServer};
use pb::index_server::{Index as IndexSvcTrait, IndexServer};
use pb::introspect_server::{Introspect as IntrospectSvcTrait, IntrospectServer};
use pb::knowledge_server::{Knowledge as KnowledgeSvcTrait, KnowledgeServer};
use pb::mimir_server::{Mimir as MimirSvcTrait, MimirServer};
use pb::release_server::{Release as ReleaseSvcTrait, ReleaseServer};
use pb::repos_server::{Repos as ReposSvcTrait, ReposServer};
use pb::search_server::{Search as SearchSvcTrait, SearchServer};
use pb::vector_server::{Vector as VectorSvcTrait, VectorServer};
use pb::funnel_server::{Funnel as FunnelSvcTrait, FunnelServer};
use pb::viz_server::{Viz as VizSvcTrait, VizServer};
use pb::warehouse_server::{Warehouse as WarehouseSvcTrait, WarehouseServer};
use pb::workspaces_server::{Workspaces as WorkspacesSvcTrait, WorkspacesServer};
use nornir::funnel::event::{Event as FunnelEvent, NodeStatus, PlanStatus};
use nornir::funnel::ids::{IdeaId, NodeId, PlanId};
use nornir::funnel::store::Store as FunnelStore;
use nornir::funnel::topo::topo_ready;
use pb::{
BenchHistoryResponse, BinaryOnly, CallQuery, DefinedInRequest, DocExport as PbDocExport,
DocsHistoryRequest, DocsHistoryResponse, DocsResponse, Empty, GateAllResult,
GateFailure, GateResult, GuardPath, GuardReport, IndexBlobFrame, IndexStatsResponse, Kv, Kvf,
KnowledgeCall, KnowledgeCallPathQuery, KnowledgeCallQuery, KnowledgeCalls, KnowledgeSymbol, KnowledgeSymbolQuery,
KnowledgeSymbols, NameList, OrderRequest, OrderResponse, PathBetweenRequest, PingResponse,
ProgressEvent, RegisterRepoRequest, RegisterRepoResponse, ReloadResponse, RepoList, RepoOnly,
RepoSummary, SearchHit, SearchRequest, SearchResponse, SnapshotRequest, SnapshotResponse,
SubmitBenchRequest, SubmitBenchResponse, Symbol as PbSymbol, SymbolList, SymbolLookupRequest,
UnregisterRepoRequest, UnregisterRepoResponse, RegisterWorkspaceRequest, WorkspaceFetchReport,
WorkspaceFetchRequest, WorkspaceList, WorkspaceMember, WorkspaceName, WorkspaceRecord,
};
/// State held for one served workspace: its loaded configuration plus the
/// index / warehouse handles and the lazily created graph and funnel store.
struct WorkspaceCtx {
/// Loaded configuration; mutated in place by the Repos register / unregister / reload RPCs.
loaded: Mutex<Loaded>,
/// Search index opened by `open_workspace`.
index: Arc<index::Index>,
/// Iceberg warehouse handle opened by `open_workspace`.
warehouse: Arc<IcebergWarehouse>,
/// Dependency graph cache; `None` until the first `graph()` call fills it.
mimir: Mutex<Option<Arc<GraphCtx>>>,
/// Funnel store; `None` until `ensure_funnel()` opens it.
funnel: Mutex<Option<FunnelStore>>,
}
/// Result of `nornir::mimir::build_graph_at`: the workspace graph together
/// with the workspace name returned alongside it.
struct GraphCtx {
/// The built workspace dependency graph.
graph: nornir::warehouse::dep_graph::WorkspaceGraph,
/// Workspace name as reported by the graph builder.
workspace_name: String,
}
impl WorkspaceCtx {
/// Returns the cached workspace graph, building it on first use.
///
/// The build runs on the blocking pool without holding any lock, so several
/// first callers may build concurrently. The first graph to be stored wins and
/// is handed to every caller (same pattern as `AppState::embedder`), so all
/// callers end up sharing one `Arc` instead of the last writer silently
/// replacing an instance that was already returned to someone else.
///
/// # Errors
/// `Status::internal` when the blocking task fails to join or the graph
/// builder reports an error.
async fn graph(&self) -> Result<Arc<GraphCtx>, Status> {
    if let Some(cached) = self.mimir.lock().await.as_ref() {
        return Ok(cached.clone());
    }
    // Copy the two paths out so the config lock is not held during the build.
    let (workspace_root, config_path) = {
        let l = self.loaded.lock().await;
        (l.workspace_root.clone(), l.config_path.clone())
    };
    let (graph, workspace_name) = tokio::task::spawn_blocking(move || {
        nornir::mimir::build_graph_at(&workspace_root, &config_path)
    })
    .await
    .map_err(internal)?
    .map_err(internal)?;
    let built = Arc::new(GraphCtx { graph, workspace_name });
    // Keep whichever graph reached the cache first; ours is dropped if we lost the race.
    let mut slot = self.mimir.lock().await;
    Ok(slot.get_or_insert(built).clone())
}
/// Resolves the funnel store root from the workspace root and the configured
/// `storage.local_path`, with no explicit override.
async fn funnel_root(&self) -> PathBuf {
    let cfg = self.loaded.lock().await;
    let workspace_root = &cfg.workspace_root;
    let local_path = &cfg.nornir.storage.local_path;
    FunnelStore::resolve_root(workspace_root, local_path, None)
}
/// Makes sure a funnel store suitable for the requested access mode is open.
///
/// An already-open store is reused unless write access is requested and the
/// open store is read-only; in that case (or when nothing is open yet) a new
/// store is opened in the requested mode and replaces the slot.
///
/// # Errors
/// `Status::internal` when opening the store fails.
async fn ensure_funnel(&self, read_only: bool) -> Result<(), Status> {
    let mut slot = self.funnel.lock().await;
    let usable = match slot.as_ref() {
        Some(store) => read_only || !store.is_read_only(),
        None => false,
    };
    if usable {
        return Ok(());
    }
    let root = self.funnel_root().await;
    let opened = if read_only {
        FunnelStore::open_read_only_async(root).await
    } else {
        FunnelStore::open_async(root).await
    };
    *slot = Some(opened.map_err(internal)?);
    Ok(())
}
}
/// Process-wide server state shared by every gRPC service.
struct AppState {
/// Served workspaces keyed by name.
workspaces: HashMap<String, Arc<WorkspaceCtx>>,
/// Name of the workspace used when a request carries no `nornir-workspace` metadata.
default_ws: String,
/// Embedder cache, filled on first `embedder()` call; only compiled with an embed feature.
#[cfg(any(feature = "embed-tract", feature = "embed-ort"))]
embedder: Mutex<Option<Arc<dyn nornir::vector::store::Embedder>>>,
}
impl std::ops::Deref for AppState {
type Target = WorkspaceCtx;
fn deref(&self) -> &WorkspaceCtx {
self.workspaces
.get(&self.default_ws)
.expect("default workspace is always present")
}
}
impl AppState {
/// Picks the workspace a request addresses.
///
/// The name comes from the `nornir-workspace` metadata entry; when the entry
/// is absent or not valid ASCII text the default workspace is used.
///
/// # Errors
/// `Status::not_found` when no workspace with that name is served.
#[allow(dead_code)]
fn ws(&self, md: &tonic::metadata::MetadataMap) -> Result<&Arc<WorkspaceCtx>, Status> {
    let requested = md.get("nornir-workspace").and_then(|v| v.to_str().ok());
    let name = match requested {
        Some(n) => n,
        None => self.default_ws.as_str(),
    };
    match self.workspaces.get(name) {
        Some(ctx) => Ok(ctx),
        None => Err(not_found(format!("workspace `{name}` is not served"))),
    }
}
/// Returns the shared embedder, loading it on the blocking pool the first
/// time it is needed. If several callers load concurrently, the instance that
/// reaches the cache first is the one everybody gets.
///
/// # Errors
/// `Status::internal` when the blocking task fails to join or loading fails.
#[cfg(any(feature = "embed-tract", feature = "embed-ort"))]
async fn embedder(&self) -> Result<Arc<dyn nornir::vector::store::Embedder>, Status> {
    {
        let cached = self.embedder.lock().await;
        if let Some(ready) = cached.as_ref() {
            return Ok(ready.clone());
        }
    }
    let fresh: Arc<dyn nornir::vector::store::Embedder> =
        tokio::task::spawn_blocking(|| nornir::vector::load_embedder().map(Arc::from))
            .await
            .map_err(internal)?
            .map_err(internal)?;
    let mut slot = self.embedder.lock().await;
    let kept = slot.get_or_insert(fresh);
    Ok(kept.clone())
}
}
/// Reference-counted handle to the server state, held by each service struct.
type Shared = Arc<AppState>;
/// Loads a workspace configuration and opens its warehouse and search index.
///
/// * `config_path` — explicit config file; when `None`, the config is
///   discovered starting from the current directory, and if discovery fails
///   the server starts with an empty default config rooted at the current
///   directory.
///
/// Warehouse location: `NORNIR_WAREHOUSE` (relative values are joined onto the
/// workspace root), else the default root when `storage.local_path` is empty,
/// else `<workspace_root>/<local_path>/warehouse`.
///
/// # Errors
/// Fails when the explicit config cannot be loaded, the current directory is
/// unavailable, or the warehouse / index cannot be opened.
async fn open_workspace(config_path: Option<PathBuf>) -> Result<WorkspaceCtx> {
    let loaded = match config_path {
        Some(p) => config::load_explicit(&p)?,
        None => {
            let cwd = std::env::current_dir()?;
            match config::discover(&cwd) {
                Ok(l) => l,
                Err(e) => {
                    eprintln!(
                        "nornir-server: no nornir.toml discovered; starting empty \
                         (use Repos.Register to add projects)"
                    );
                    // Surface the cause: a config that exists but failed to load
                    // would otherwise look identical to "no config at all".
                    eprintln!("nornir-server: config discovery reported: {e:#}");
                    config::Loaded {
                        nornir: config::Nornir::default(),
                        config_path: cwd.join("nornir.toml"),
                        workspace_root: cwd,
                    }
                }
            }
        }
    };
    eprintln!(
        "nornir-server: loaded {} repo(s) from {}",
        loaded.nornir.repo.len(),
        loaded.config_path.display()
    );

    // Read the override once so warehouse and index placement agree.
    let warehouse_env = std::env::var_os("NORNIR_WAREHOUSE");
    let warehouse_root = if let Some(env) = &warehouse_env {
        let p = PathBuf::from(env);
        if p.is_absolute() { p } else { loaded.workspace_root.join(p) }
    } else if loaded.nornir.storage.local_path.is_empty() {
        config::warehouse_default_root()
    } else {
        loaded.workspace_root.join(&loaded.nornir.storage.local_path).join("warehouse")
    };
    // Still best-effort (opening the warehouse below reports the real failure),
    // but leave a trace of why the directory could not be prepared.
    if let Err(e) = std::fs::create_dir_all(&warehouse_root) {
        eprintln!(
            "nornir-server: could not create {}: {e}",
            warehouse_root.display()
        );
    }
    let wr = warehouse_root.clone();
    let warehouse = Arc::new(
        tokio::task::spawn_blocking(move || IcebergWarehouse::open(&wr))
            .await
            .context("spawn warehouse open")?
            .with_context(|| format!("open iceberg warehouse at {}", warehouse_root.display()))?,
    );
    eprintln!("nornir-server: warehouse online at {}", warehouse_root.display());

    let ws_root = loaded.workspace_root.clone();
    // With an explicit warehouse the index cache sits next to it; otherwise it
    // lives under the workspace's local storage directory.
    let index_dir = if warehouse_env.is_some() {
        warehouse_root
            .parent()
            .map(|p| p.join("cache").join("index"))
            .unwrap_or_else(|| warehouse_root.join("cache/index"))
    } else {
        let lp = &loaded.nornir.storage.local_path;
        if lp.is_empty() {
            ws_root.join(".nornir/cache/index")
        } else {
            ws_root.join(lp).join("cache/index")
        }
    };
    let wh_for_open = warehouse.clone();
    let (idx, restored) = tokio::task::spawn_blocking(move || {
        index::Index::open_or_restore_at(&ws_root, &index_dir, &wh_for_open, "_workspace", None)
    })
    .await
    .context("spawn index open")?
    .context("warm Tantivy index")?;
    eprintln!(
        "nornir-server: index online ({})",
        if restored { "rehydrated from iceberg" } else { "loaded from local cache" }
    );
    Ok(WorkspaceCtx {
        loaded: Mutex::new(loaded),
        index: Arc::new(idx),
        warehouse,
        mimir: Mutex::new(None),
        funnel: Mutex::new(None),
    })
}
/// gRPC `Health` service handler.
struct HealthSvc {
    state: Shared,
}

#[tonic::async_trait]
impl Health for HealthSvc {
    /// Liveness probe: reports `"ok"`, the crate version and the number of
    /// repos configured in the default workspace.
    async fn ping(&self, _req: Request<Empty>) -> Result<Response<PingResponse>, Status> {
        let repo_count = {
            let cfg = self.state.loaded.lock().await;
            cfg.nornir.repo.len() as u32
        };
        let reply = PingResponse {
            status: "ok".into(),
            version: env!("CARGO_PKG_VERSION").into(),
            repo_count,
        };
        Ok(Response::new(reply))
    }
}
/// gRPC `Repos` service handler; holds the shared server state.
struct ReposSvc { state: Shared }
/// Wraps any displayable value in an `INVALID_ARGUMENT` status.
fn invalid_arg<E: std::fmt::Display>(msg: E) -> Status {
    Status::invalid_argument(format!("{msg}"))
}

/// Wraps any displayable value in an `INTERNAL` status.
fn internal<E: std::fmt::Display>(msg: E) -> Status {
    Status::internal(format!("{msg}"))
}

/// Wraps any displayable value in a `NOT_FOUND` status.
fn not_found<E: std::fmt::Display>(msg: E) -> Status {
    Status::not_found(format!("{msg}"))
}
#[tonic::async_trait]
impl ReposSvcTrait for ReposSvc {
/// Lists the repos configured in the addressed workspace.
async fn list(&self, req: Request<Empty>) -> Result<Response<RepoList>, Status> {
    let wctx = self.state.ws(req.metadata())?;
    let cfg = wctx.loaded.lock().await;
    let mut repos = Vec::with_capacity(cfg.nornir.repo.len());
    for (name, entry) in cfg.nornir.repo.iter() {
        repos.push(RepoSummary {
            name: name.clone(),
            remote: entry.remote.clone(),
            history: entry.history.clone(),
            readme: entry.readme.clone(),
        });
    }
    Ok(Response::new(RepoList { repos }))
}
async fn register(
&self,
req: Request<RegisterRepoRequest>,
) -> Result<Response<RegisterRepoResponse>, Status> {
let wctx = self.state.ws(req.metadata())?.clone();
let req = req.into_inner();
if req.name.is_empty() {
return Err(invalid_arg("`name` must not be empty"));
}
if req.name.contains(['.', '/', '\\', '[', ']']) {
return Err(invalid_arg(
"`name` must be a bare identifier (no '.', '/', '\\\\', '[', ']')",
));
}
let mut l = wctx.loaded.lock().await;
let cfg_path = l.config_path.clone();
let existing = std::fs::read_to_string(&cfg_path).ok().unwrap_or_default();
let mut doc: toml_edit::DocumentMut = existing
.parse()
.map_err(|e| internal(format!("parse {}: {e}", cfg_path.display())))?;
let repo_parent = doc
.entry("repo")
.or_insert_with(|| {
let mut t = toml_edit::Table::new();
t.set_implicit(true);
toml_edit::Item::Table(t)
})
.as_table_mut()
.ok_or_else(|| Status::failed_precondition("`repo` exists but is not a table"))?;
repo_parent.set_implicit(true);
let entry = repo_parent
.entry(&req.name)
.or_insert_with(|| toml_edit::Item::Table(toml_edit::Table::new()));
let tbl = entry.as_table_mut().ok_or_else(|| {
Status::failed_precondition(format!("`repo.{}` exists but is not a table", req.name))
})?;
if !req.remote.is_empty() { tbl.insert("remote", toml_edit::value(req.remote.as_str())); }
if !req.history.is_empty() { tbl.insert("history", toml_edit::value(req.history.as_str())); }
if !req.readme.is_empty() { tbl.insert("readme", toml_edit::value(req.readme.as_str())); }
if !req.path.is_empty() { tbl.insert("path", toml_edit::value(req.path.as_str())); }
if let Some(parent) = cfg_path.parent() {
std::fs::create_dir_all(parent).map_err(internal)?;
}
std::fs::write(&cfg_path, doc.to_string()).map_err(|e| {
if e.kind() == std::io::ErrorKind::PermissionDenied {
Status::failed_precondition(format!(
"{} is read-only (likely guard-locked); call Guard.Release first",
cfg_path.display(),
))
} else {
internal(e)
}
})?;
let repo = config::Repo {
remote: req.remote.clone(),
history: req.history.clone(),
readme: req.readme.clone(),
publish_order: Vec::new(),
gates: config::Gates::default(),
bench: config::BenchSpec::default(),
};
l.nornir.repo.insert(req.name.clone(), repo);
Ok(Response::new(RegisterRepoResponse {
name: req.name,
config_path: cfg_path.display().to_string(),
repo_count: l.nornir.repo.len() as u32,
}))
}
async fn unregister(
&self,
req: Request<UnregisterRepoRequest>,
) -> Result<Response<UnregisterRepoResponse>, Status> {
let wctx = self.state.ws(req.metadata())?.clone();
let req = req.into_inner();
let mut l = wctx.loaded.lock().await;
let cfg_path = l.config_path.clone();
let removed_mem = l.nornir.repo.remove(&req.name).is_some();
let removed_disk = if cfg_path.exists() {
let text = std::fs::read_to_string(&cfg_path).map_err(internal)?;
let mut doc: toml_edit::DocumentMut = text
.parse()
.map_err(|e| internal(format!("parse {}: {e}", cfg_path.display())))?;
let removed = doc
.get_mut("repo")
.and_then(|i| i.as_table_mut())
.and_then(|t| t.remove(&req.name))
.is_some();
if removed {
std::fs::write(&cfg_path, doc.to_string()).map_err(internal)?;
}
removed
} else {
false
};
if !removed_mem && !removed_disk {
return Err(not_found(format!("repo `{}` not found", req.name)));
}
Ok(Response::new(UnregisterRepoResponse {
name: req.name,
repo_count: l.nornir.repo.len() as u32,
}))
}
/// Re-reads the workspace config file and swaps it in for the in-memory copy.
///
/// # Errors
/// `NOT_FOUND` when the config file does not exist; `INTERNAL` when loading fails.
async fn reload(&self, req: Request<Empty>) -> Result<Response<ReloadResponse>, Status> {
    let wctx = self.state.ws(req.metadata())?;
    let mut cfg = wctx.loaded.lock().await;
    let cfg_path = cfg.config_path.clone();
    if !cfg_path.exists() {
        return Err(not_found(format!("{} does not exist", cfg_path.display())));
    }
    *cfg = config::load_explicit(&cfg_path).map_err(internal)?;
    let repo_count = cfg.nornir.repo.len() as u32;
    Ok(Response::new(ReloadResponse {
        config_path: cfg_path.display().to_string(),
        repo_count,
    }))
}
}
/// gRPC `Guard` service handler; holds the shared server state.
struct GuardSvc { state: Shared }
/// Converts guard path statuses into the wire report, counting how many
/// entries are flagged as changed.
fn to_pb(paths: Vec<guard::PathStatus>) -> GuardReport {
    let mut changed: usize = 0;
    let mut out = Vec::with_capacity(paths.len());
    for status in paths {
        if status.changed {
            changed += 1;
        }
        out.push(GuardPath {
            path: status.path.display().to_string(),
            exists: status.exists,
            writable: status.writable,
            changed: status.changed,
        });
    }
    GuardReport { paths: out, changed_count: changed as u32 }
}
#[tonic::async_trait]
impl GuardSvcTrait for GuardSvc {
/// Reports the guard status of the workspace's forbidden paths.
async fn status(&self, req: Request<Empty>) -> Result<Response<GuardReport>, Status> {
    let wctx = self.state.ws(req.metadata())?;
    let cfg = wctx.loaded.lock().await;
    let statuses = guard::status(&cfg.workspace_root, &cfg.nornir.guard.forbidden);
    Ok(Response::new(to_pb(statuses)))
}
/// Runs `guard::apply` on the workspace's forbidden paths and returns the
/// resulting per-path report.
async fn apply(&self, req: Request<Empty>) -> Result<Response<GuardReport>, Status> {
    let wctx = self.state.ws(req.metadata())?;
    let cfg = wctx.loaded.lock().await;
    let statuses = guard::apply(&cfg.workspace_root, &cfg.nornir.guard.forbidden)
        .map_err(internal)?;
    Ok(Response::new(to_pb(statuses)))
}
/// Runs `guard::release` on the workspace's forbidden paths and returns the
/// resulting per-path report.
async fn release(&self, req: Request<Empty>) -> Result<Response<GuardReport>, Status> {
    let wctx = self.state.ws(req.metadata())?;
    let cfg = wctx.loaded.lock().await;
    let statuses = guard::release(&cfg.workspace_root, &cfg.nornir.guard.forbidden)
        .map_err(internal)?;
    Ok(Response::new(to_pb(statuses)))
}
/// Checks the workspace against its recorded guard manifest.
///
/// `intact` is true only when every verified entry reports ok; the per-path
/// details are returned as a JSON string.
async fn verify(&self, req: Request<Empty>) -> Result<Response<pb::GuardVerifyResponse>, Status> {
    let root = {
        let wctx = self.state.ws(req.metadata())?;
        let cfg = wctx.loaded.lock().await;
        cfg.workspace_root.clone()
    };
    let recorded = guard::read_manifest(&root).map_err(internal)?;
    let report = guard::verify(&root, &recorded);
    let any_broken = report.iter().any(|v| !v.ok());
    let paths_json = serde_json::to_string(&report).map_err(internal)?;
    let reply = pb::GuardVerifyResponse {
        intact: !any_broken,
        recorded_at: recorded.recorded_at.to_rfc3339(),
        paths_json,
    };
    Ok(Response::new(reply))
}
}
/// gRPC `Release` service handler; holds the shared server state.
struct ReleaseSvc { state: Shared }
/// Returns the most recent bench run recorded in the repo's history file
/// (`bench_history.jsonl` unless the repo config names another file).
///
/// # Errors
/// When the repo is not configured, the history cannot be read, or it is empty.
fn last_run_for(loaded: &Loaded, name: &str) -> anyhow::Result<bench::BenchRun> {
    let Some(repo) = loaded.nornir.repo.get(name) else {
        return Err(anyhow!("repo `{name}` not configured"));
    };
    let history_file: &str = if repo.history.is_empty() {
        "bench_history.jsonl"
    } else {
        &repo.history
    };
    let path = config::Nornir::repo_dir(&loaded.workspace_root, name).join(history_file);
    let mut runs = bench::history::read_all(&path)?;
    match runs.pop() {
        Some(latest) => Ok(latest),
        None => Err(anyhow!("no bench runs in {}", path.display())),
    }
}
/// Reads the repo's full bench history; yields an empty list when the repo is
/// not configured or the history cannot be read.
fn history_for(loaded: &Loaded, name: &str) -> Vec<bench::BenchRun> {
    match loaded.nornir.repo.get(name) {
        None => Vec::new(),
        Some(repo) => {
            let history_file: &str = if repo.history.is_empty() {
                "bench_history.jsonl"
            } else {
                &repo.history
            };
            let path = config::Nornir::repo_dir(&loaded.workspace_root, name).join(history_file);
            bench::history::read_all(&path).unwrap_or_default()
        }
    }
}
/// Resolves the on-disk directory of a configured repo.
///
/// # Errors
/// `NOT_FOUND` when the repo is not part of the configuration.
fn repo_root_for(loaded: &Loaded, name: &str) -> Result<PathBuf, Status> {
    if loaded.nornir.repo.contains_key(name) {
        Ok(config::Nornir::repo_dir(&loaded.workspace_root, name))
    } else {
        Err(not_found(format!("repo `{name}` not configured")))
    }
}
/// Builds a passing result for the named gate with all detail fields blank.
fn gate_pass(name: &str) -> GateResult {
    GateResult {
        gate: name.to_owned(),
        status: "pass".to_owned(),
        message: String::new(),
        version: String::new(),
        max_drop_pct: 0.0,
    }
}

/// Builds a failing result for the named gate; the message is the error
/// rendered in alternate (`{:#}`) form.
fn gate_fail<E: std::fmt::Display>(name: &str, e: E) -> GateResult {
    GateResult {
        status: "fail".to_owned(),
        message: format!("{e:#}"),
        ..gate_pass(name)
    }
}
#[tonic::async_trait]
impl ReleaseSvcTrait for ReleaseSvc {
/// Runs the `no_path_patches` gate for one repo.
///
/// A gate failure is reported inside the `GateResult`; only an unknown repo
/// or workspace produces an RPC error.
async fn gate_path_patches(
    &self,
    req: Request<RepoOnly>,
) -> Result<Response<GateResult>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let name = req.into_inner().repo;
    let cfg = wctx.loaded.lock().await;
    let root = repo_root_for(&cfg, &name)?;
    let verdict = if let Err(e) = release::gate::no_path_patches(&root) {
        gate_fail("no_path_patches", e)
    } else {
        gate_pass("no_path_patches")
    };
    Ok(Response::new(verdict))
}
async fn gate_nexus_floor(
&self,
req: Request<RepoOnly>,
) -> Result<Response<GateResult>, Status> {
let wctx = self.state.ws(req.metadata())?.clone();
let name = req.into_inner().repo;
let l = wctx.loaded.lock().await;
let _ = repo_root_for(&l, &name)?;
let run = match last_run_for(&l, &name) {
Ok(r) => r,
Err(e) => return Ok(Response::new(gate_fail("nexus_floor", e))),
};
let mut res = match release::gate::nexus_floor(&run) {
Ok(()) => gate_pass("nexus_floor"),
Err(e) => gate_fail("nexus_floor", e),
};
res.version = run.version;
Ok(Response::new(res))
}
async fn gate_no_regression(
&self,
req: Request<RepoOnly>,
) -> Result<Response<GateResult>, Status> {
let wctx = self.state.ws(req.metadata())?.clone();
let name = req.into_inner().repo;
let l = wctx.loaded.lock().await;
let root = repo_root_for(&l, &name)?;
let repo = l.nornir.repo.get(&name).unwrap();
let pct = if repo.gates.max_regression_pct > 0.0 {
repo.gates.max_regression_pct
} else {
10.0
};
let hp = root.join(if repo.history.is_empty() {
"bench_history.jsonl"
} else {
&repo.history
});
let run = match last_run_for(&l, &name) {
Ok(r) => r,
Err(e) => return Ok(Response::new(gate_fail("no_regression", e))),
};
let mut res = match release::gate::no_regression(&run, &hp, pct) {
Ok(()) => gate_pass("no_regression"),
Err(e) => gate_fail("no_regression", e),
};
res.version = run.version;
res.max_drop_pct = pct;
Ok(Response::new(res))
}
/// Runs the `docs_fresh` gate: asks `docs::render_check_all` to check the
/// repo's docs using the latest bench run and the full history as context.
async fn gate_docs_fresh(
    &self,
    req: Request<RepoOnly>,
) -> Result<Response<GateResult>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let name = req.into_inner().repo;
    let cfg = wctx.loaded.lock().await;
    let root = repo_root_for(&cfg, &name)?;
    let run = match last_run_for(&cfg, &name) {
        Ok(found) => found,
        Err(e) => return Ok(Response::new(gate_fail("docs_fresh", e))),
    };
    let layout = docs::RepoLayout::new(&root);
    let history = history_for(&cfg, &name);
    let ctx = docs::Ctx::new(&root, &cfg.workspace_root, Some(&run)).with_history(&history);
    let verdict = if let Err(e) = docs::render_check_all(&layout, &ctx) {
        gate_fail("docs_fresh", e)
    } else {
        gate_pass("docs_fresh")
    };
    Ok(Response::new(verdict))
}
/// Runs every gate enabled in the repo's config and sorts the gate names
/// into `passed` and `failed` (with the rendered error).
///
/// Gates run in a fixed order: `no_path_patches`, `nexus_floor`,
/// `no_regression`, `integration_roundtrip`, `docs_fresh`. Gates that need
/// the latest bench run fail with the lookup error when there is none.
async fn gate_all(
    &self,
    req: Request<RepoOnly>,
) -> Result<Response<GateAllResult>, Status> {
    /// Files one gate outcome under the matching list.
    fn record(
        passed: &mut Vec<String>,
        failed: &mut Vec<GateFailure>,
        gate: &str,
        outcome: anyhow::Result<()>,
    ) {
        match outcome {
            Ok(()) => passed.push(gate.into()),
            Err(e) => failed.push(GateFailure {
                name: gate.into(),
                error: format!("{e:#}"),
            }),
        }
    }

    let wctx = self.state.ws(req.metadata())?.clone();
    let name = req.into_inner().repo;
    let cfg = wctx.loaded.lock().await;
    let root = repo_root_for(&cfg, &name)?;
    // `repo_root_for` has just confirmed the repo is configured.
    let repo = cfg.nornir.repo.get(&name).unwrap().clone();
    let gates = &repo.gates;
    let mut passed: Vec<String> = Vec::new();
    let mut failed: Vec<GateFailure> = Vec::new();

    if gates.no_path_patches {
        let outcome = release::gate::no_path_patches(&root);
        record(&mut passed, &mut failed, "no_path_patches", outcome);
    }

    // Looked up once and shared by all bench-based gates.
    let last = last_run_for(&cfg, &name);

    if gates.nexus_floor {
        let outcome = match &last {
            Ok(run) => release::gate::nexus_floor(run),
            Err(e) => Err(anyhow!("{e:#}")),
        };
        record(&mut passed, &mut failed, "nexus_floor", outcome);
    }

    if gates.no_regression {
        let pct = if gates.max_regression_pct > 0.0 {
            gates.max_regression_pct
        } else {
            10.0
        };
        let history_file: &str = if repo.history.is_empty() {
            "bench_history.jsonl"
        } else {
            &repo.history
        };
        let hp = root.join(history_file);
        let outcome = match &last {
            Ok(run) => release::gate::no_regression(run, &hp, pct),
            Err(e) => Err(anyhow!("{e:#}")),
        };
        record(&mut passed, &mut failed, "no_regression", outcome);
    }

    if !gates.integration_roundtrip.is_empty() {
        let kinds: Vec<&str> = gates
            .integration_roundtrip
            .iter()
            .map(String::as_str)
            .collect();
        let outcome = release::gate::integration_roundtrip_via_cargo_test(&root, &kinds);
        record(&mut passed, &mut failed, "integration_roundtrip", outcome);
    }

    if gates.docs_fresh {
        let outcome: anyhow::Result<()> = match &last {
            Err(e) => Err(anyhow!("{e:#}")),
            Ok(run) => {
                let layout = docs::RepoLayout::new(&root);
                let history = history_for(&cfg, &name);
                let ctx = docs::Ctx::new(&root, &cfg.workspace_root, Some(run))
                    .with_history(&history);
                docs::render_check_all(&layout, &ctx).map(|_| ())
            }
        };
        record(&mut passed, &mut failed, "docs_fresh", outcome);
    }

    Ok(Response::new(GateAllResult { repo: name, passed, failed }))
}
/// Computes an order over the configured repos.
///
/// Uses the edges of the last dependency-graph snapshot returned by the
/// warehouse for the workspace (`source = "iceberg"`); with no snapshot the
/// configured repo names are returned as-is (`source = "fallback"`). An empty
/// workspace name defaults to `workspace_holger`.
async fn order(
    &self,
    req: Request<OrderRequest>,
) -> Result<Response<OrderResponse>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let requested = req.into_inner().workspace;
    let workspace = if requested.is_empty() {
        String::from("workspace_holger")
    } else {
        requested
    };
    let selected = {
        let cfg = wctx.loaded.lock().await;
        cfg.nornir.repo.keys().cloned().collect::<Vec<_>>()
    };
    let warehouse = wctx.warehouse.clone();
    let query = tokio::task::spawn_blocking(move || {
        warehouse.block_on(query_dep_graph_snapshots(&warehouse, &workspace, None))
    });
    let snapshots = query.await.map_err(internal)?.map_err(internal)?;
    let Some(snap) = snapshots.into_iter().last() else {
        return Ok(Response::new(OrderResponse {
            order: selected,
            source: "fallback".into(),
            snapshot_id: String::new(),
        }));
    };
    let order = topo_order_from_edges(&selected, &snap.edges);
    Ok(Response::new(OrderResponse {
        order,
        source: "iceberg".into(),
        snapshot_id: snap.snapshot_id.to_string(),
    }))
}
async fn trace(&self, req: Request<pb::TraceQuery>) -> Result<Response<pb::JsonResponse>, Status> {
let wctx = self.state.ws(req.metadata())?.clone();
let q = req.into_inner();
let graph = wctx.graph().await.ok();
let trace = nornir::release::regression::trace_gate_async(
&wctx.warehouse,
&q.workspace,
&q.repo,
graph.as_ref().map(|g| &g.graph),
)
.await
.map_err(internal)?;
Ok(Response::new(pb::JsonResponse { json: serde_json::to_string(&trace).map_err(internal)? }))
}
/// Boxed stream of progress events handed back to tonic.
type ProgressStream = std::pin::Pin<Box<
    dyn tokio_stream::Stream<Item = Result<ProgressEvent, Status>> + Send + 'static,
>>;

/// Streams release progress events tailed from the workspace's log directory
/// (`logs` next to the warehouse root, or inside it when the root has no
/// parent).
///
/// The workspace is resolved from the request metadata like every other RPC
/// of this service, so a client addressing a non-default workspace receives
/// that workspace's events; without the header the default workspace is used.
///
/// # Errors
/// `NOT_FOUND` when the addressed workspace is not served. Tailing errors are
/// delivered as a final `INTERNAL` item on the stream.
async fn progress(
    &self,
    req: Request<Empty>,
) -> Result<Response<Self::ProgressStream>, Status> {
    let log_dir = {
        let wctx = self.state.ws(req.metadata())?;
        let loaded = wctx.loaded.lock().await;
        let wh = loaded.warehouse_root();
        wh.parent()
            .map(|p| p.join("logs"))
            .unwrap_or_else(|| wh.join("logs"))
    };
    let (tx, rx) = tokio::sync::mpsc::channel::<Result<ProgressEvent, Status>>(64);
    // The tail task ends on its own once the client goes away or the run ends.
    tokio::spawn(async move {
        if let Err(e) = tail_progress(log_dir, tx.clone()).await {
            let _ = tx.send(Err(internal(e))).await;
        }
    });
    let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
    Ok(Response::new(Box::pin(stream) as Self::ProgressStream))
}
}
/// Finds the most recently modified `release-run-*.events.ndjson` file in
/// `log_dir`.
///
/// Returns `None` when the directory cannot be read or holds no such file.
/// Entries whose metadata or modification time is unavailable are skipped;
/// on equal timestamps the entry seen first is kept.
async fn latest_log(log_dir: &std::path::Path) -> Option<PathBuf> {
    let mut entries = tokio::fs::read_dir(log_dir).await.ok()?;
    let mut newest: Option<(std::time::SystemTime, PathBuf)> = None;
    while let Ok(Some(entry)) = entries.next_entry().await {
        let raw_name = entry.file_name();
        let file_name = raw_name.to_string_lossy();
        let is_event_log =
            file_name.starts_with("release-run-") && file_name.ends_with(".events.ndjson");
        if !is_event_log {
            continue;
        }
        let Ok(meta) = entry.metadata().await else { continue };
        let Ok(modified) = meta.modified() else { continue };
        let is_newer = match &newest {
            Some((seen, _)) => *seen < modified,
            None => true,
        };
        if is_newer {
            newest = Some((modified, entry.path()));
        }
    }
    newest.map(|(_, path)| path)
}
/// Tails the newest release event log in `log_dir` and forwards each parsed
/// event to `tx`.
///
/// Waits (polling every 500 ms) until a log file exists, then reads it from
/// the start and keeps polling for appended lines every 250 ms. Returns when
/// the receiver is gone or after a `RunEnd` event has been forwarded. Blank
/// lines and complete lines that are not valid events are skipped.
///
/// A line fetched while the writer is still in the middle of it (no trailing
/// newline yet, not yet parseable) is kept in the buffer and completed by
/// later reads instead of being thrown away, so events written in several
/// chunks are not lost.
///
/// # Errors
/// I/O errors from opening or reading the log file.
async fn tail_progress(
    log_dir: PathBuf,
    tx: tokio::sync::mpsc::Sender<Result<ProgressEvent, Status>>,
) -> anyhow::Result<()> {
    use tokio::io::AsyncBufReadExt;
    let path = loop {
        if let Some(p) = latest_log(&log_dir).await {
            break p;
        }
        if tx.is_closed() {
            return Ok(());
        }
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
    };
    // A freshly opened file is already positioned at offset 0.
    let f = tokio::fs::File::open(&path).await?;
    let mut reader = tokio::io::BufReader::new(f);
    let mut buf = String::new();
    loop {
        // `read_line` appends, so a retained partial line is extended here.
        let n = reader.read_line(&mut buf).await?;
        if n == 0 {
            if tx.is_closed() {
                return Ok(());
            }
            tokio::time::sleep(std::time::Duration::from_millis(250)).await;
            continue;
        }
        let complete = buf.ends_with('\n');
        // Convert inside the scope that borrows `buf` so it can be cleared after.
        let parsed = {
            let line = buf.trim();
            if line.is_empty() {
                None
            } else {
                serde_json::from_str::<release::progress::ReleaseEvent>(line)
                    .ok()
                    .map(|ev| {
                        let is_end =
                            matches!(ev, release::progress::ReleaseEvent::RunEnd { .. });
                        (release_event_to_pb(ev), is_end)
                    })
            }
        };
        match parsed {
            Some((pb_ev, is_end)) => {
                buf.clear();
                if tx.send(Ok(pb_ev)).await.is_err() {
                    return Ok(());
                }
                if is_end {
                    return Ok(());
                }
            }
            // Blank or malformed full line: nothing to forward.
            None if complete => buf.clear(),
            // Partial line: keep it; the next read hits EOF and sleeps.
            None => {}
        }
    }
}
/// Maps a release progress event onto its protobuf counterpart, copying the
/// fields the wire message carries and dropping the rest.
fn release_event_to_pb(ev: release::progress::ReleaseEvent) -> ProgressEvent {
    use pb::progress_event::Kind;
    use release::progress::ReleaseEvent as Ev;
    ProgressEvent {
        kind: Some(match ev {
            Ev::RunStart { run_id, workspace, .. } => {
                Kind::RunStart(pb::RunStart { run_id, workspace })
            }
            Ev::RepoStart { repo, sha, .. } => Kind::RepoStart(pb::RepoStart { repo, sha }),
            Ev::PhaseStart { repo, phase, .. } => {
                Kind::PhaseStart(pb::PhaseStart { repo, phase })
            }
            Ev::PhaseEnd { repo, phase, ok, duration_ms, .. } => {
                Kind::PhaseEnd(pb::PhaseEnd { repo, phase, ok, duration_ms })
            }
            Ev::BinaryStart { repo, binary, .. } => {
                Kind::BinaryStart(pb::BinaryStart { repo, binary })
            }
            Ev::TestPass { repo, binary, name, .. } => {
                Kind::TestPass(pb::TestPass { repo, binary, name })
            }
            Ev::TestFail { repo, binary, name, .. } => {
                Kind::TestFail(pb::TestFail { repo, binary, name })
            }
            Ev::BinaryDone { repo, binary, passed, failed, .. } => {
                Kind::BinaryDone(pb::BinaryDone { repo, binary, passed, failed })
            }
            Ev::RepoEnd { repo, ok, .. } => Kind::RepoEnd(pb::RepoEnd { repo, ok }),
            Ev::RunEnd { run_id, ok, .. } => Kind::RunEnd(pb::RunEnd { run_id, ok }),
        }),
    }
}
/// gRPC `Bench` service handler; holds the shared server state.
struct BenchSvc { state: Shared }
/// Converts a stored bench run into its protobuf form.
///
/// Metric values that are not representable as `f64` are left out; a missing
/// test duration becomes `0.0` with `has_duration = false`, and a missing
/// timestamp or message becomes an empty string.
fn pb_bench_run_from(run: &bench::BenchRun) -> pb::BenchRun {
    let mut results = Vec::with_capacity(run.results.len());
    for result in &run.results {
        let mut metrics = Vec::new();
        for (key, value) in result.metrics.iter() {
            if let Some(number) = value.as_f64() {
                metrics.push(Kvf { key: key.clone(), value: number });
            }
        }
        results.push(pb::BenchResult { name: result.name.clone(), metrics });
    }

    let mut tests = Vec::with_capacity(run.tests.len());
    for outcome in &run.tests {
        tests.push(pb::TestOutcome {
            name: outcome.name.clone(),
            passed: outcome.passed,
            duration_ms: outcome.duration_ms.unwrap_or(0.0),
            has_duration: outcome.duration_ms.is_some(),
            message: outcome.message.clone().unwrap_or_default(),
        });
    }

    pb::BenchRun {
        date: run.date.clone(),
        timestamp: run.timestamp.clone().unwrap_or_default(),
        version: run.version.clone(),
        machine: run.machine.clone(),
        cores: run.cores,
        results,
        tests,
    }
}
/// Converts a protobuf bench run into the stored form.
///
/// Metric values that `serde_json::Number::from_f64` rejects are dropped.
/// Empty timestamp / message strings map to `None`, and a test duration is
/// kept only when `has_duration` is set.
fn bench_run_from_pb(p: pb::BenchRun) -> bench::BenchRun {
    let mut results = Vec::with_capacity(p.results.len());
    for result in p.results {
        let metrics: serde_json::Map<String, serde_json::Value> = result
            .metrics
            .into_iter()
            .filter_map(|kv| {
                serde_json::Number::from_f64(kv.value)
                    .map(|num| (kv.key, serde_json::Value::Number(num)))
            })
            .collect();
        results.push(bench::BenchResult { name: result.name, metrics });
    }

    let mut tests = Vec::with_capacity(p.tests.len());
    for outcome in p.tests {
        let duration_ms = if outcome.has_duration {
            Some(outcome.duration_ms)
        } else {
            None
        };
        tests.push(bench::TestOutcome {
            name: outcome.name,
            passed: outcome.passed,
            duration_ms,
            message: Some(outcome.message).filter(|m| !m.is_empty()),
        });
    }

    bench::BenchRun {
        date: p.date,
        timestamp: Some(p.timestamp).filter(|t| !t.is_empty()),
        version: p.version,
        machine: p.machine,
        cores: p.cores,
        results,
        tests,
    }
}
#[tonic::async_trait]
impl BenchSvcTrait for BenchSvc {
/// Returns every bench run recorded for a repo; a history file that does not
/// exist yields an empty list.
///
/// # Errors
/// `NOT_FOUND` for an unconfigured repo, `INTERNAL` when the file cannot be read.
async fn history(
    &self,
    req: Request<RepoOnly>,
) -> Result<Response<BenchHistoryResponse>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let name = req.into_inner().repo;
    let cfg = wctx.loaded.lock().await;
    let Some(repo) = cfg.nornir.repo.get(&name) else {
        return Err(not_found(format!("repo `{name}` not configured")));
    };
    let history_file: &str = if repo.history.is_empty() {
        "bench_history.jsonl"
    } else {
        &repo.history
    };
    let path = config::Nornir::repo_dir(&cfg.workspace_root, &name).join(history_file);
    let runs = match path.exists() {
        true => bench::history::read_all(&path).map_err(internal)?,
        false => Vec::new(),
    };
    let runs = runs.iter().map(pb_bench_run_from).collect();
    Ok(Response::new(BenchHistoryResponse { repo: name, runs }))
}
/// Records a submitted bench run: first in the warehouse, then appended to
/// the repo's local history file. Responds with the warehouse run id.
///
/// # Errors
/// `INVALID_ARGUMENT` when `run` is absent or its machine is empty,
/// `NOT_FOUND` for an unconfigured repo, `INTERNAL` for storage failures.
async fn submit(
    &self,
    req: Request<SubmitBenchRequest>,
) -> Result<Response<SubmitBenchResponse>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let req = req.into_inner();
    let Some(pb_run) = req.run else {
        return Err(invalid_arg("`run` is required"));
    };
    let run = bench_run_from_pb(pb_run);
    if run.machine.is_empty() {
        return Err(invalid_arg("BenchRun.machine must not be empty"));
    }
    // Resolve the history path inside a scope so the config lock is released
    // before any storage work starts.
    let path = {
        let cfg = wctx.loaded.lock().await;
        let repo = match cfg.nornir.repo.get(&req.repo) {
            Some(found) => found,
            None => return Err(not_found(format!("repo `{}` not configured", req.repo))),
        };
        let history_file: &str = if repo.history.is_empty() {
            "bench_history.jsonl"
        } else {
            &repo.history
        };
        config::Nornir::repo_dir(&cfg.workspace_root, &req.repo).join(history_file)
    };
    let run_id = wctx
        .warehouse
        .append_bench_run_async(&req.repo, &run)
        .await
        .map_err(internal)?;
    if let Some(parent) = path.parent() {
        std::fs::create_dir_all(parent).map_err(internal)?;
    }
    bench::history::append(&path, &run).map_err(internal)?;
    Ok(Response::new(SubmitBenchResponse { run_id: run_id.to_string() }))
}
}
/// gRPC `Search` service handler.
struct SearchSvc {
    state: Shared,
}

#[tonic::async_trait]
impl SearchSvcTrait for SearchSvc {
    /// Runs a query against the workspace index.
    ///
    /// Empty `corpus` / `repo` mean "no filter"; a `limit` of 0 means 20.
    /// The search itself runs on the blocking pool.
    ///
    /// # Errors
    /// `INVALID_ARGUMENT` for an unknown corpus name, `INTERNAL` when the
    /// search fails.
    async fn query(
        &self,
        req: Request<SearchRequest>,
    ) -> Result<Response<SearchResponse>, Status> {
        let wctx = self.state.ws(req.metadata())?.clone();
        let req = req.into_inner();
        let corpus = match req.corpus.is_empty() {
            true => None,
            false => match index::Corpus::parse(&req.corpus) {
                Some(parsed) => Some(parsed),
                None => {
                    return Err(invalid_arg(format!("unknown corpus `{}`", req.corpus)));
                }
            },
        };
        let repo_filter = (!req.repo.is_empty()).then(|| req.repo.clone());
        let limit = match req.limit {
            0 => 20,
            n => n as usize,
        };
        let idx = wctx.index.clone();
        let text = req.query.clone();
        let search = tokio::task::spawn_blocking(move || {
            idx.search(&text, corpus, repo_filter.as_deref(), limit)
        });
        let found = search.await.map_err(internal)?.map_err(internal)?;
        let mut hits = Vec::new();
        for hit in found {
            hits.push(SearchHit {
                id: String::new(),
                corpus: hit.corpus,
                repo: hit.repo,
                path: hit.path,
                score: hit.score,
                snippet: hit.snippet,
                title: hit.title,
            });
        }
        Ok(Response::new(SearchResponse { hits }))
    }
}
/// gRPC `Index` service handler; holds the shared server state.
struct IndexSvc { state: Shared }
#[tonic::async_trait]
impl IndexSvcTrait for IndexSvc {
/// Reports the index's total count and its per-corpus breakdown (values
/// rendered as strings). The stats are gathered on the blocking pool.
async fn stats(&self, req: Request<Empty>) -> Result<Response<IndexStatsResponse>, Status> {
    let idx = self.state.ws(req.metadata())?.index.clone();
    let gathered = tokio::task::spawn_blocking(move || idx.stats());
    let stats = gathered.await.map_err(internal)?.map_err(internal)?;
    let mut by_corpus = Vec::new();
    for (corpus, count) in stats.by_corpus {
        by_corpus.push(Kv { key: corpus, value: count.to_string() });
    }
    Ok(Response::new(IndexStatsResponse { total: stats.total, by_corpus }))
}
/// Snapshots the workspace's local index directory into the warehouse.
///
/// Empty request fields fall back to `workspace_holger` (workspace),
/// `_workspace` (repo) and `unknown` (branch); `git_sha` is mandatory.
///
/// # Errors
/// `INVALID_ARGUMENT` when `git_sha` is empty, `INTERNAL` when the snapshot fails.
async fn snapshot(
    &self,
    req: Request<SnapshotRequest>,
) -> Result<Response<SnapshotResponse>, Status> {
    /// Substitutes `fallback` for an empty value.
    fn or_default(value: String, fallback: &str) -> String {
        if value.is_empty() { fallback.into() } else { value }
    }

    let wctx = self.state.ws(req.metadata())?.clone();
    let req = req.into_inner();
    if req.git_sha.is_empty() {
        return Err(invalid_arg("`git_sha` is required"));
    }
    let git_sha = req.git_sha;
    let workspace = or_default(req.workspace, "workspace_holger");
    let repo = or_default(req.repo, "_workspace");
    let branch = or_default(req.branch, "unknown");
    let index_dir = wctx.index.index_dir().to_path_buf();
    let warehouse = wctx.warehouse.clone();
    let job = tokio::task::spawn_blocking(move || {
        nornir::index::snapshot::snapshot_to_iceberg(
            &warehouse, &workspace, &repo, &git_sha, &branch, &index_dir,
        )
    });
    let snap = job.await.map_err(internal)?.map_err(internal)?;
    Ok(Response::new(snapshot_ref_to_pb(snap)))
}
/// Receives an index directory as a client stream, stages it in a private
/// temp directory and snapshots that directory into the warehouse.
///
/// Frame protocol, as enforced below:
/// * exactly one `meta` frame, which must arrive before the first `file` frame;
/// * each `file` frame opens a new file (relative to the staging root) and
///   declares its size; the following `chunk` frames must add up to exactly
///   that size;
/// * a frame without a body is rejected.
///
/// # Errors
/// `invalid_argument` for protocol violations, for a `rel_path` that is empty,
/// absolute or contains `..`, and for a missing `meta.git_sha`. Filesystem and
/// warehouse failures are mapped through `internal`.
async fn upload_snapshot(
    &self,
    req: Request<tonic::Streaming<IndexBlobFrame>>,
) -> Result<Response<SnapshotResponse>, Status> {
    use pb::index_blob_frame::Body;
    use std::io::Write;

    /// Removes the staging directory (best effort) once the handler is done.
    struct TmpDir(PathBuf);
    impl Drop for TmpDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// True when joining `rel` onto a directory cannot leave that directory.
    ///
    /// `Path::join` with an absolute path *replaces* the base, so rooted or
    /// prefixed paths must be refused as well as anything containing `..`.
    fn is_contained_rel_path(rel: &str) -> bool {
        use std::path::{Component, Path};
        !rel.is_empty()
            && !rel.contains("..")
            && Path::new(rel)
                .components()
                .all(|c| matches!(c, Component::Normal(_) | Component::CurDir))
    }

    /// Checks that a finished file received exactly its declared byte count.
    /// The file handle is dropped (closed) when this returns.
    fn check_complete(done: Option<(PathBuf, u64, std::fs::File, u64)>) -> Result<(), Status> {
        match done {
            Some((path, declared, _, written)) if written != declared => {
                Err(invalid_arg(format!(
                    "file {} declared {} bytes, got {}", path.display(), declared, written
                )))
            }
            _ => Ok(()),
        }
    }

    let wctx = self.state.ws(req.metadata())?.clone();
    let mut stream = req.into_inner();
    let root = std::env::temp_dir()
        .join(format!("nornir-upload-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&root).map_err(internal)?;
    let _guard = TmpDir(root.clone());

    let mut meta: Option<pb::IndexBlobMeta> = None;
    // (path, declared size, handle, bytes written so far) of the file in flight.
    let mut cur_file: Option<(PathBuf, u64, std::fs::File, u64)> = None;
    while let Some(frame) = stream.message().await? {
        match frame.body {
            Some(Body::Meta(m)) => {
                if meta.is_some() {
                    return Err(invalid_arg("meta frame seen twice"));
                }
                meta = Some(m);
            }
            Some(Body::File(f)) => {
                if meta.is_none() {
                    return Err(invalid_arg("first frame must carry `meta`"));
                }
                // Close out the previous file before starting the next one.
                check_complete(cur_file.take())?;
                if !is_contained_rel_path(&f.rel_path) {
                    return Err(invalid_arg(
                        "rel_path must be a non-empty relative path without `..`",
                    ));
                }
                let path = root.join(&f.rel_path);
                if let Some(parent) = path.parent() {
                    std::fs::create_dir_all(parent).map_err(internal)?;
                }
                let fh = std::fs::File::create(&path).map_err(internal)?;
                cur_file = Some((path, f.size_bytes, fh, 0));
            }
            Some(Body::Chunk(bytes)) => {
                let Some((path, declared, fh, written)) = cur_file.as_mut() else {
                    return Err(invalid_arg("chunk before file"));
                };
                // Refuse to write past the declared size so a client cannot
                // stream unbounded data into a single file.
                let received = (*written).saturating_add(bytes.len() as u64);
                if received > *declared {
                    return Err(invalid_arg(format!(
                        "file {} declared {} bytes, got {}", path.display(), declared, received
                    )));
                }
                fh.write_all(&bytes).map_err(internal)?;
                *written = received;
            }
            None => return Err(invalid_arg("empty frame")),
        }
    }
    check_complete(cur_file.take())?;

    let meta = meta.ok_or_else(|| invalid_arg("stream had no `meta` frame"))?;
    if meta.git_sha.is_empty() {
        return Err(invalid_arg("meta.git_sha is required"));
    }
    let workspace = if meta.workspace.is_empty() { "workspace_holger".into() } else { meta.workspace };
    let repo = if meta.repo.is_empty() { "_workspace".into() } else { meta.repo };
    let branch = if meta.branch.is_empty() { "unknown".into() } else { meta.branch };
    let wh = wctx.warehouse.clone();
    let snap = tokio::task::spawn_blocking(move || {
        nornir::index::snapshot::snapshot_to_iceberg(&wh, &workspace, &repo, &meta.git_sha, &branch, &root)
    })
    .await
    .map_err(internal)?
    .map_err(internal)?;
    Ok(Response::new(snapshot_ref_to_pb(snap)))
}
}
/// Converts a warehouse `SnapshotRef` into its protobuf response form,
/// stringifying the snapshot id and widening the counters to `u64`.
fn snapshot_ref_to_pb(s: nornir::index::snapshot::SnapshotRef) -> SnapshotResponse {
    let snapshot_id = s.snapshot_id.to_string();
    let blob_count = s.blob_count as u64;
    let total_bytes = s.total_bytes as u64;
    SnapshotResponse {
        snapshot_id,
        workspace: s.workspace,
        repo: s.repo,
        git_sha: s.git_sha,
        branch: s.branch,
        schema_hash: s.schema_hash,
        blob_count,
        total_bytes,
    }
}
/// Handler for the `Introspect` gRPC service; `state` is used to resolve the
/// per-request workspace context from the request metadata.
struct IntrospectSvc { state: Shared }
/// Resolves a client-supplied binary path: absolute paths are returned as-is,
/// relative ones are interpreted relative to `workspace_root`.
fn resolve_binary(workspace_root: &std::path::Path, binary: &str) -> PathBuf {
    let candidate = std::path::Path::new(binary);
    match candidate.is_absolute() {
        true => candidate.to_path_buf(),
        false => workspace_root.join(candidate),
    }
}
/// Copies an extracted artifact symbol into its protobuf form. A missing
/// `line` or `size_bytes` is encoded as `0`.
fn sym_to_pb(s: &introspect::artifact::Symbol) -> PbSymbol {
    let line = s.line.unwrap_or(0);
    let size_bytes = s.size_bytes.unwrap_or(0);
    PbSymbol {
        name: s.name.clone(),
        name_demangled: s.name_demangled.clone(),
        name_mangled: s.name_mangled.clone(),
        file: s.file.clone(),
        line,
        size_bytes,
        krate: s.krate.clone(),
    }
}
#[tonic::async_trait]
impl IntrospectSvcTrait for IntrospectSvc {
/// Lists every symbol extracted from the requested binary.
async fn symbols(&self, req: Request<BinaryOnly>) -> Result<Response<SymbolList>, Status> {
    let workspace_root = self.state.ws(req.metadata())?.loaded.lock().await.workspace_root.clone();
    let binary = resolve_binary(&workspace_root, &req.into_inner().binary);
    // Symbol extraction is synchronous; keep it off the async executor.
    let job = tokio::task::spawn_blocking(move || {
        introspect::artifact::extract_symbols(&binary, &workspace_root)
    });
    let syms = job.await.map_err(internal)?.map_err(internal)?;
    let symbols = syms.iter().map(sym_to_pb).collect();
    Ok(Response::new(SymbolList { symbols }))
}
/// Looks up symbols in the requested binary that match `pattern`, returning
/// at most `limit` of them (50 when `limit` is 0).
async fn symbol_lookup(
    &self,
    req: Request<SymbolLookupRequest>,
) -> Result<Response<SymbolList>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let query = req.into_inner();
    let workspace_root = wctx.loaded.lock().await.workspace_root.clone();
    let binary = resolve_binary(&workspace_root, &query.binary);
    let limit = match query.limit {
        0 => 50,
        n => n as usize,
    };
    let pattern = query.pattern;
    let job = tokio::task::spawn_blocking(
        move || -> anyhow::Result<Vec<introspect::artifact::Symbol>> {
            let all = introspect::artifact::extract_symbols(&binary, &workspace_root)?;
            let matched = introspect::artifact::lookup(&all, &pattern)
                .into_iter()
                .take(limit)
                .cloned()
                .collect();
            Ok(matched)
        },
    );
    let syms = job.await.map_err(internal)?.map_err(internal)?;
    let symbols = syms.iter().map(sym_to_pb).collect();
    Ok(Response::new(SymbolList { symbols }))
}
/// Returns the symbols of the requested binary selected by
/// `introspect::artifact::defined_in` for the given `suffix`.
async fn defined_in(
    &self,
    req: Request<DefinedInRequest>,
) -> Result<Response<SymbolList>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let query = req.into_inner();
    let workspace_root = wctx.loaded.lock().await.workspace_root.clone();
    let binary = resolve_binary(&workspace_root, &query.binary);
    let suffix = query.suffix;
    let job = tokio::task::spawn_blocking(
        move || -> anyhow::Result<Vec<introspect::artifact::Symbol>> {
            let all = introspect::artifact::extract_symbols(&binary, &workspace_root)?;
            let matched = introspect::artifact::defined_in(&all, &suffix)
                .into_iter()
                .cloned()
                .collect();
            Ok(matched)
        },
    );
    let syms = job.await.map_err(internal)?.map_err(internal)?;
    let symbols = syms.iter().map(sym_to_pb).collect();
    Ok(Response::new(SymbolList { symbols }))
}
/// Names of the functions that call `name` in the requested binary.
async fn callers(&self, req: Request<CallQuery>) -> Result<Response<NameList>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let names = callgraph_query(&wctx, req.into_inner(), true).await?;
    Ok(Response::new(NameList { names }))
}
/// Names of the functions called by `name` in the requested binary.
async fn callees(&self, req: Request<CallQuery>) -> Result<Response<NameList>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let names = callgraph_query(&wctx, req.into_inner(), false).await?;
    Ok(Response::new(NameList { names }))
}
/// Returns a call path from `from` to `to` in the requested binary's
/// callgraph, or an empty list when the graph reports none.
async fn path_between(
    &self,
    req: Request<PathBetweenRequest>,
) -> Result<Response<NameList>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let query = req.into_inner();
    let workspace_root = wctx.loaded.lock().await.workspace_root.clone();
    let binary = resolve_binary(&workspace_root, &query.binary);
    let job = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<String>> {
        let edges = introspect::callgraph_dwarf::extract_callgraph(&binary, &workspace_root)?;
        let graph = introspect::callgraph_dwarf::Callgraph::from_edges(&edges);
        let path = graph.path_between(&query.from, &query.to).unwrap_or_default();
        Ok(path)
    });
    let names = job.await.map_err(internal)?.map_err(internal)?;
    Ok(Response::new(NameList { names }))
}
}
/// Shared implementation of `Introspect.Callers` / `Introspect.Callees`.
///
/// Extracts the callgraph of `r.binary` on the blocking pool and returns the
/// callers of `r.name` when `callers` is true, its callees otherwise.
async fn callgraph_query(wctx: &WorkspaceCtx, r: CallQuery, callers: bool) -> Result<Vec<String>, Status> {
    let workspace_root = wctx.loaded.lock().await.workspace_root.clone();
    let binary = resolve_binary(&workspace_root, &r.binary);
    let job = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<String>> {
        let edges = introspect::callgraph_dwarf::extract_callgraph(&binary, &workspace_root)?;
        let graph = introspect::callgraph_dwarf::Callgraph::from_edges(&edges);
        let names = match callers {
            true => graph.callers_of(&r.name),
            false => graph.callees_of(&r.name),
        };
        Ok(names)
    });
    job.await.map_err(internal)?.map_err(internal)
}
/// Handler for the `Knowledge` gRPC service; `state` is used to resolve the
/// per-request workspace context from the request metadata.
struct KnowledgeSvc { state: Shared }
/// Copies a knowledge symbol row into its protobuf form; a missing
/// `signature` becomes the type's default value.
fn ksym_to_pb(s: &knowledge::symbols::SymbolRow) -> KnowledgeSymbol {
    let signature = s.signature.clone().unwrap_or_default();
    KnowledgeSymbol {
        crate_name: s.crate_name.clone(),
        module_path: s.module_path.clone(),
        item_kind: s.item_kind.clone(),
        item_name: s.item_name.clone(),
        visibility: s.visibility.clone(),
        file: s.file.clone(),
        line: s.line,
        doc_lines: s.doc_lines,
        signature,
    }
}
/// Copies a knowledge call-edge row into its protobuf form, field by field.
fn kcall_to_pb(c: &knowledge::symbols::CallEdgeRow) -> KnowledgeCall {
    let knowledge::symbols::CallEdgeRow {
        crate_name,
        caller_path,
        callee_ident,
        call_kind,
        file,
        line,
        ..
    } = c;
    KnowledgeCall {
        crate_name: crate_name.clone(),
        caller_path: caller_path.clone(),
        callee_ident: callee_ident.clone(),
        call_kind: call_kind.clone(),
        file: file.clone(),
        line: *line,
    }
}
#[tonic::async_trait]
impl KnowledgeSvcTrait for KnowledgeSvc {
/// Looks up symbols matching `arg` in the latest knowledge view of `repo`,
/// returning at most `limit` rows (50 when `limit` is 0).
async fn symbol_lookup(
    &self,
    req: Request<KnowledgeSymbolQuery>,
) -> Result<Response<KnowledgeSymbols>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let query = req.into_inner();
    let wh = wctx.warehouse.clone();
    let limit = match query.limit {
        0 => 50,
        n => n as usize,
    };
    let job = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<KnowledgeSymbol>> {
        let view = knowledge::query::load_latest(&wh, &query.repo)?;
        let rows = view
            .symbol_lookup(&query.arg, limit)
            .iter()
            .map(|s| ksym_to_pb(s))
            .collect();
        Ok(rows)
    });
    let symbols = job.await.map_err(internal)?.map_err(internal)?;
    Ok(Response::new(KnowledgeSymbols { symbols }))
}
/// Returns the symbols the latest knowledge view of `repo` reports for
/// `defined_in(arg)`, truncated to `limit` rows (100 when `limit` is 0).
async fn defined_in(
    &self,
    req: Request<KnowledgeSymbolQuery>,
) -> Result<Response<KnowledgeSymbols>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let query = req.into_inner();
    let wh = wctx.warehouse.clone();
    let limit = match query.limit {
        0 => 100,
        n => n as usize,
    };
    let job = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<KnowledgeSymbol>> {
        let view = knowledge::query::load_latest(&wh, &query.repo)?;
        let rows = view
            .defined_in(&query.arg)
            .iter()
            .take(limit)
            .map(|s| ksym_to_pb(s))
            .collect();
        Ok(rows)
    });
    let symbols = job.await.map_err(internal)?.map_err(internal)?;
    Ok(Response::new(KnowledgeSymbols { symbols }))
}
/// Call edges whose callee is `name`, from the latest knowledge view.
async fn callers(
    &self,
    req: Request<KnowledgeCallQuery>,
) -> Result<Response<KnowledgeCalls>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let calls = knowledge_call_query(&wctx, req.into_inner(), true).await?;
    Ok(Response::new(KnowledgeCalls { calls }))
}
/// Call edges whose caller is `name`, from the latest knowledge view.
async fn callees(
    &self,
    req: Request<KnowledgeCallQuery>,
) -> Result<Response<KnowledgeCalls>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let calls = knowledge_call_query(&wctx, req.into_inner(), false).await?;
    Ok(Response::new(KnowledgeCalls { calls }))
}
/// Returns a call path from `from` to `to` in the latest knowledge view of
/// `repo`, or an empty list when the view reports none.
async fn call_path(
    &self,
    req: Request<KnowledgeCallPathQuery>,
) -> Result<Response<NameList>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let query = req.into_inner();
    let wh = wctx.warehouse.clone();
    let job = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<String>> {
        let view = knowledge::query::load_latest(&wh, &query.repo)?;
        let path = view.call_path(&query.from, &query.to).unwrap_or_default();
        Ok(path)
    });
    let names = job.await.map_err(internal)?.map_err(internal)?;
    Ok(Response::new(NameList { names }))
}
}
/// Shared implementation of `Knowledge.Callers` / `Knowledge.Callees`.
///
/// Loads the latest knowledge view of `r.repo` on the blocking pool and
/// returns up to `r.limit` edges (100 when `limit` is 0): the callers of
/// `r.name` when `callers` is true, its callees otherwise.
async fn knowledge_call_query(
    wctx: &WorkspaceCtx,
    r: KnowledgeCallQuery,
    callers: bool,
) -> Result<Vec<KnowledgeCall>, Status> {
    let wh = wctx.warehouse.clone();
    let limit = match r.limit {
        0 => 100,
        n => n as usize,
    };
    let job = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<KnowledgeCall>> {
        let view = knowledge::query::load_latest(&wh, &r.repo)?;
        let hits = match callers {
            true => view.callers_of(&r.name),
            false => view.callees_of(&r.name),
        };
        let edges = hits.iter().take(limit).map(|c| kcall_to_pb(c)).collect();
        Ok(edges)
    });
    job.await.map_err(internal)?.map_err(internal)
}
/// Handler for the `Docs` gRPC service; `state` is used to resolve the
/// per-request workspace context from the request metadata.
struct DocsSvc { state: Shared }
#[tonic::async_trait]
impl DocsSvcTrait for DocsSvc {
async fn init(&self, req: Request<RepoOnly>) -> Result<Response<DocsResponse>, Status> {
let wctx = self.state.ws(req.metadata())?.clone();
let name = req.into_inner().repo;
let l = wctx.loaded.lock().await;
let root = repo_root_for(&l, &name)?;
let layout = docs::RepoLayout::new(&root);
let srcs = docs::init_repo(&layout).map_err(internal)?;
Ok(Response::new(DocsResponse {
repo: name,
status: "ok".into(),
artifacts: srcs.into_iter().map(|p| p.display().to_string()).collect(),
detail: layout.nornir_dir().display().to_string(),
}))
}
async fn render(&self, req: Request<RepoOnly>) -> Result<Response<DocsResponse>, Status> {
let wctx = self.state.ws(req.metadata())?.clone();
let name = req.into_inner().repo;
let l = wctx.loaded.lock().await;
let root = repo_root_for(&l, &name)?;
let layout = docs::RepoLayout::new(&root);
let last = last_run_for(&l, &name).ok();
let history = history_for(&l, &name);
let ctx = docs::Ctx::new(&root, &l.workspace_root, last.as_ref()).with_history(&history);
let reports = docs::render_all(&layout, &ctx).map_err(internal)?;
Ok(Response::new(DocsResponse {
repo: name,
status: "ok".into(),
artifacts: reports.iter().map(|r| r.output.display().to_string()).collect(),
detail: format!("{} doc(s) rendered", reports.len()),
}))
}
async fn check(&self, req: Request<RepoOnly>) -> Result<Response<DocsResponse>, Status> {
let wctx = self.state.ws(req.metadata())?.clone();
let name = req.into_inner().repo;
let l = wctx.loaded.lock().await;
let root = repo_root_for(&l, &name)?;
let layout = docs::RepoLayout::new(&root);
let last = last_run_for(&l, &name).ok();
let history = history_for(&l, &name);
let ctx = docs::Ctx::new(&root, &l.workspace_root, last.as_ref()).with_history(&history);
let (status, detail) = match docs::render_check_all(&layout, &ctx) {
Ok(_) => ("ok".to_string(), String::new()),
Err(e) => ("drift".to_string(), format!("{e:#}")),
};
Ok(Response::new(DocsResponse {
repo: name,
status,
artifacts: Vec::new(),
detail,
}))
}
/// Lists recorded doc exports for `repo`, optionally filtered by doc name,
/// version and format. Empty filter fields mean "no filter"; `limit`
/// defaults to 50 when it is 0.
async fn history(
    &self,
    req: Request<DocsHistoryRequest>,
) -> Result<Response<DocsHistoryResponse>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let r = req.into_inner();
    // Empty strings on the wire stand for "not set".
    let non_empty = |s: String| (!s.is_empty()).then_some(s);
    let limit = match r.limit {
        0 => 50,
        n => n as usize,
    };
    let repo = r.repo;
    let filter = docs::ExportFilter {
        doc_name: non_empty(r.doc),
        version: non_empty(r.version),
        format: non_empty(r.format),
        limit: Some(limit),
    };
    let wh = wctx.warehouse.clone();
    let job = tokio::task::spawn_blocking(move || docs::list_doc_exports(&wh, &repo, &filter));
    let rows = job.await.map_err(internal)?.map_err(internal)?;
    let mut entries = Vec::new();
    for e in rows {
        entries.push(PbDocExport {
            doc: e.doc_name,
            version: e.version,
            format: e.format,
            path: e.export_id,
            exported_at: e.generated_at,
            // Negative lengths are clamped to zero before the unsigned cast.
            size_bytes: e.byte_len.max(0) as u64,
        });
    }
    Ok(Response::new(DocsHistoryResponse { entries }))
}
/// `Docs.Export`: delegates to `render_export` with doc name `"README"` when
/// the `docs-export` feature is compiled in; otherwise answers
/// `unimplemented` with a rebuild hint.
async fn export(
&self,
req: Request<pb::DocsExportRequest>,
) -> Result<Response<pb::DocsExportResponse>, Status> {
// Feature disabled: `req` is deliberately unused.
#[cfg(not(feature = "docs-export"))]
{
let _ = req;
Err(Status::unimplemented(
"nornir-server was built without the `docs-export` feature; \
rebuild with `--features server,docs-export` to use Docs.Export",
))
}
// Feature enabled: exactly one of the two cfg blocks is compiled.
#[cfg(feature = "docs-export")]
{
self.render_export(req, "README").await
}
}
/// `Docs.Book`: delegates to `render_export` with doc name `"book"` when the
/// `docs-export` feature is compiled in; otherwise answers `unimplemented`
/// with a rebuild hint.
async fn book(
&self,
req: Request<pb::DocsExportRequest>,
) -> Result<Response<pb::DocsExportResponse>, Status> {
// Feature disabled: `req` is deliberately unused.
#[cfg(not(feature = "docs-export"))]
{
let _ = req;
Err(Status::unimplemented(
"nornir-server was built without the `docs-export` feature; \
rebuild with `--features server,docs-export` to use Docs.Book",
))
}
// Feature enabled: exactly one of the two cfg blocks is compiled.
#[cfg(feature = "docs-export")]
{
self.render_export(req, "book").await
}
}
}
#[cfg(feature = "docs-export")]
impl DocsSvc {
/// Shared implementation of `Docs.Export` (`doc == "README"`) and
/// `Docs.Book` (`doc == "book"`).
///
/// Steps, in order: render all docs, build the export bytes in the requested
/// format (`"pdf"` when the request leaves `format` empty), write them to the
/// layout's export path, and record the export in the warehouse.
async fn render_export(
&self,
req: Request<pb::DocsExportRequest>,
doc: &'static str,
) -> Result<Response<pb::DocsExportResponse>, Status> {
let wctx = self.state.ws(req.metadata())?.clone();
let r = req.into_inner();
let repo_name = r.repo.clone();
let fmt_str = if r.format.is_empty() { "pdf".to_string() } else { r.format.clone() };
// Snapshot everything needed from the loaded config, then release the lock
// before the blocking work starts.
let (root, workspace_root, last, history) = {
let l = wctx.loaded.lock().await;
let root = repo_root_for(&l, &repo_name)?;
let last = last_run_for(&l, &repo_name).ok();
let history = history_for(&l, &repo_name);
(root, l.workspace_root.clone(), last, history)
};
let wh = wctx.warehouse.clone();
let fmt_for_closure = fmt_str.clone();
let (out_path, nbytes, sources, record) =
tokio::task::spawn_blocking(move || -> anyhow::Result<_> {
let layout = docs::RepoLayout::new(&root);
let ctx = docs::Ctx::new(&root, &workspace_root, last.as_ref())
.with_history(&history);
// Re-render first; the result is discarded, only failure matters here.
docs::render_all(&layout, &ctx)?;
let format = docs::DocFormat::parse(&fmt_for_closure)?;
let ext = format.extension();
// Only the book build reports its source files; a plain export returns none.
let (bytes, sources): (Vec<u8>, Vec<String>) = if doc == "book" {
let (bytes, srcs) = docs::build_book(&root, &ctx, format)?;
(bytes, srcs.iter().map(|p| p.display().to_string()).collect())
} else {
(docs::export_repo(&root, format)?, Vec::new())
};
let version = docs::resolve_version(&root);
let out_path = layout.export_path(doc, ext);
if let Some(parent) = out_path.parent() {
std::fs::create_dir_all(parent)?;
}
std::fs::write(&out_path, &bytes)?;
// The workspace name is the last path component of the workspace root,
// falling back to "_workspace" when it is missing or not valid UTF-8.
let workspace = workspace_root
.file_name()
.and_then(|x| x.to_str())
.unwrap_or("_workspace")
.to_string();
// A repo without a resolvable HEAD is recorded with sha "unknown".
let git_sha =
nornir::gitio::head_sha(&root).unwrap_or_else(|_| "unknown".to_string());
let record = docs::record_doc_export(
&wh, &workspace, &repo_name, doc, &version, ext, &git_sha, &bytes,
)?;
Ok((out_path, bytes.len(), sources, record))
})
.await
.map_err(internal)?
.map_err(internal)?;
Ok(Response::new(pb::DocsExportResponse {
repo: r.repo,
format: fmt_str,
bytes: nbytes as u64,
out: out_path.display().to_string(),
sha256: record.sha256,
git_sha: record.git_sha,
export_id: record.export_id,
sources,
}))
}
}
/// Handler for the `Mimir` gRPC service (workspace dependency-graph queries).
struct MimirSvc { state: Shared }
impl MimirSvc {
    /// Runs `f` against the workspace's dependency graph and wraps the
    /// resulting JSON value, serialised to a string, in a `JsonResponse`.
    ///
    /// Errors from `f` and from serialisation are both mapped via `internal`.
    async fn json<F>(&self, md: &tonic::metadata::MetadataMap, f: F) -> Result<Response<pb::JsonResponse>, Status>
    where
        F: FnOnce(&nornir::warehouse::dep_graph::WorkspaceGraph) -> anyhow::Result<serde_json::Value>,
    {
        let snapshot = self.state.ws(md)?.graph().await?;
        let computed = f(&snapshot.graph).map_err(internal)?;
        let serialised = serde_json::to_string(&computed).map_err(internal)?;
        Ok(Response::new(pb::JsonResponse { json: serialised }))
    }
}
#[tonic::async_trait]
impl MimirSvcTrait for MimirSvc {
/// JSON result of `nornir::mimir::deps_of` for `repo` (optionally transitive).
async fn deps_of(&self, req: Request<pb::DepQuery>) -> Result<Response<pb::JsonResponse>, Status> {
    let md = req.metadata().clone();
    let query = req.into_inner();
    self.json(&md, |graph| nornir::mimir::deps_of(graph, &query.repo, query.transitive))
        .await
}
/// JSON result of `nornir::mimir::dependents_of` for `repo` (optionally transitive).
async fn dependents_of(&self, req: Request<pb::DepQuery>) -> Result<Response<pb::JsonResponse>, Status> {
    let md = req.metadata().clone();
    let query = req.into_inner();
    self.json(&md, |graph| nornir::mimir::dependents_of(graph, &query.repo, query.transitive))
        .await
}
/// JSON result of `nornir::mimir::affected_by_change` for the given repos.
async fn affected_by_change(&self, req: Request<pb::AffectedQuery>) -> Result<Response<pb::JsonResponse>, Status> {
    let md = req.metadata().clone();
    let query = req.into_inner();
    self.json(&md, |graph| nornir::mimir::affected_by_change(graph, &query.repos))
        .await
}
/// JSON result of `nornir::mimir::build_order` for the workspace graph.
async fn build_order(&self, req: Request<Empty>) -> Result<Response<pb::JsonResponse>, Status> {
    let metadata = req.metadata().clone();
    let response = self.json(&metadata, nornir::mimir::build_order).await;
    response
}
/// JSON result of `nornir::mimir::dep_path` between `from` and `to`.
async fn dep_path(&self, req: Request<pb::DepPathQuery>) -> Result<Response<pb::JsonResponse>, Status> {
    let md = req.metadata().clone();
    let query = req.into_inner();
    self.json(&md, |graph| nornir::mimir::dep_path(graph, &query.from, &query.to))
        .await
}
/// JSON result of `nornir::mimir::external_dep_users` for the crate `krate`.
async fn external_dep_users(&self, req: Request<pb::CrateQuery>) -> Result<Response<pb::JsonResponse>, Status> {
    let md = req.metadata().clone();
    let query = req.into_inner();
    self.json(&md, |graph| {
        let users = nornir::mimir::external_dep_users(graph, &query.krate);
        Ok(users)
    })
    .await
}
/// Returns the output of `nornir::mimir::mermaid` for the workspace graph.
/// The string is placed in the `json` field as-is, without JSON encoding.
async fn mermaid(&self, req: Request<Empty>) -> Result<Response<pb::JsonResponse>, Status> {
    let snapshot = self.state.ws(req.metadata())?.graph().await?;
    let diagram = nornir::mimir::mermaid(&snapshot.graph);
    Ok(Response::new(pb::JsonResponse { json: diagram }))
}
/// JSON overview of `repo`, computed by `nornir::mimir::repo_overview` from
/// the workspace graph and the warehouse on the blocking pool.
async fn repo_overview(&self, req: Request<RepoOnly>) -> Result<Response<pb::JsonResponse>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let repo = req.into_inner().repo;
    let g = wctx.graph().await?;
    let wh = wctx.warehouse.clone();
    let job = tokio::task::spawn_blocking(move || {
        nornir::mimir::repo_overview(&g.graph, &wh, &repo)
    });
    let value = job.await.map_err(internal)?.map_err(internal)?;
    let json = serde_json::to_string(&value).map_err(internal)?;
    Ok(Response::new(pb::JsonResponse { json }))
}
/// Scans the components of `repo` for known vulnerabilities and summarises
/// their licenses.
///
/// Vulnerability lookups are cached in the warehouse: a component whose cached
/// finding is younger than seven days is served from the cache, the rest are
/// queried via `nornir::security::query_vulns` and written back (best effort).
/// The JSON response carries `repo`, `components`, `vulns`, the twelve most
/// common `licenses`, and `cache_hits` / `cache_misses`.
async fn security_scan(&self, req: Request<RepoOnly>) -> Result<Response<pb::JsonResponse>, Status> {
let wctx = self.state.ws(req.metadata())?.clone();
let repo = req.into_inner().repo;
// NOTE(review): `repo` comes straight from the request and is joined onto the
// workspace root without validation — confirm callers cannot pass `..` or an
// absolute path here.
let repo_dir = wctx.loaded.lock().await.workspace_root.join(&repo);
let wh = wctx.warehouse.clone();
let value = tokio::task::spawn_blocking(move || -> anyhow::Result<serde_json::Value> {
use std::collections::HashMap;
let (repo_name, comps) = nornir::security::components(&repo_dir)?;
// A cache read failure degrades to an empty cache (everything is a miss).
let cache = wh.query_vuln_findings().unwrap_or_default();
// Cache entries are keyed by (name, version).
let key = |c: &nornir::security::Component| (c.name.clone(), c.version.clone());
let now = chrono::Utc::now().timestamp_micros();
// Seven days, in microseconds.
const TTL_MICROS: i64 = 7 * 24 * 3600 * 1_000_000;
// Fresh = present in the cache with a timestamp (third tuple field) inside the TTL.
let is_fresh = |c: &nornir::security::Component| {
cache.get(&key(c)).map(|(_, _, ts)| now - *ts < TTL_MICROS).unwrap_or(false)
};
let misses: Vec<nornir::security::Component> =
comps.iter().filter(|c| !is_fresh(c)).cloned().collect();
let fresh = nornir::security::query_vulns(&misses)?;
let fresh_map: HashMap<(String, String), &nornir::security::Vuln> =
fresh.iter().map(|v| ((v.crate_name.clone(), v.version.clone()), v)).collect();
// One row per miss, including components with no findings (empty ids and
// summary), so that clean components are cached too.
let rows: Vec<nornir::warehouse::iceberg::VulnFinding> = misses
.iter()
.map(|c| {
let (ids, summary) =
fresh_map.get(&key(c)).map(|v| (v.ids.clone(), v.summary.clone())).unwrap_or_default();
nornir::warehouse::iceberg::VulnFinding {
crate_name: c.name.clone(),
version: c.version.clone(),
ids,
summary,
checked_at_micros: now,
}
})
.collect();
// Best effort: a failed cache write must not fail the scan.
let _ = wh.append_vuln_findings(&rows);
// Report only components that have at least one vulnerability id, taking the
// ids from the cache when fresh and from the new query otherwise.
let mut vulns = Vec::new();
for c in &comps {
let ids: Vec<String> = if is_fresh(c) {
cache.get(&key(c)).map(|(i, _, _)| i.clone()).unwrap_or_default()
} else {
fresh_map.get(&key(c)).map(|v| v.ids.clone()).unwrap_or_default()
};
if !ids.is_empty() {
vulns.push(serde_json::json!({"crate": c.name, "version": c.version, "ids": ids}));
}
}
// License histogram, ordered by descending count, then by license name.
let mut lic: std::collections::BTreeMap<String, usize> = Default::default();
for c in &comps {
*lic.entry(c.license.clone()).or_default() += 1;
}
let mut lic: Vec<_> = lic.into_iter().collect();
lic.sort_by(|a, b| b.1.cmp(&a.1).then(a.0.cmp(&b.0)));
Ok(serde_json::json!({
"repo": repo_name,
"components": comps.len(),
"vulns": vulns,
"licenses": lic.into_iter().take(12).map(|(l, n)| serde_json::json!([l, n])).collect::<Vec<_>>(),
"cache_hits": comps.len().saturating_sub(misses.len()),
"cache_misses": misses.len(),
}))
})
.await
.map_err(internal)?
.map_err(internal)?;
Ok(Response::new(pb::JsonResponse { json: serde_json::to_string(&value).map_err(internal)? }))
}
/// JSON-serialised result of `nornir::change::detect` for the workspace.
async fn changed_since_last_release(&self, req: Request<Empty>) -> Result<Response<pb::JsonResponse>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let snapshot = wctx.graph().await?;
    let detection =
        nornir::change::detect(&wctx.warehouse, &snapshot.graph, &snapshot.workspace_name).await;
    let change = detection.map_err(internal)?;
    let json = serde_json::to_string(&change).map_err(internal)?;
    Ok(Response::new(pb::JsonResponse { json }))
}
}
/// Handler for the `Dwarf` gRPC service (queries over persisted DWARF facts).
struct DwarfSvc { state: Shared }
impl DwarfSvc {
    /// Loads the DWARF facts of `repo` (at `sha`, or the loader's default when
    /// `sha` is empty) on the blocking pool and returns whatever JSON string
    /// `f` produces from them.
    ///
    /// Facts are loaded into `<parent of warehouse root>/cache/dwarf/<repo>`;
    /// when the warehouse root has no parent, the root itself is the base.
    // NOTE(review): `repo` is request-controlled and is joined into the cache
    // path without validation — confirm it cannot escape the cache directory.
    async fn query<F>(
        &self,
        md: &tonic::metadata::MetadataMap,
        repo: String,
        sha: String,
        f: F,
    ) -> Result<Response<pb::JsonResponse>, Status>
    where
        F: FnOnce(&introspect::persist::DwarfFacts) -> anyhow::Result<String> + Send + 'static,
    {
        let wctx = self.state.ws(md)?.clone();
        let wh = wctx.warehouse.clone();
        let warehouse_root = wctx.loaded.lock().await.warehouse_root();
        let job = tokio::task::spawn_blocking(move || -> anyhow::Result<String> {
            let base = warehouse_root.parent().unwrap_or(warehouse_root.as_path());
            let cache_dir = base.join("cache/dwarf").join(&repo);
            let pinned_sha = (!sha.is_empty()).then_some(sha.as_str());
            let facts = introspect::persist::load_dwarf(&wh, &repo, pinned_sha, &cache_dir)?;
            f(&facts)
        });
        let json = job.await.map_err(internal)?.map_err(internal)?;
        Ok(Response::new(pb::JsonResponse { json }))
    }
}
#[tonic::async_trait]
impl DwarfSvcTrait for DwarfSvc {
/// JSON list of DWARF symbols matching `arg`, capped at `limit` (50 when 0).
async fn symbol_lookup(&self, req: Request<pb::DwarfQuery>) -> Result<Response<pb::JsonResponse>, Status> {
    let md = req.metadata().clone();
    let pb::DwarfQuery { repo, sha, arg, limit, .. } = req.into_inner();
    let limit = match limit {
        0 => 50,
        n => n as usize,
    };
    self.query(&md, repo, sha, move |facts| {
        let hits: Vec<_> = facts.lookup(&arg).into_iter().take(limit).collect();
        Ok(serde_json::to_string(&hits)?)
    })
    .await
}
/// JSON list of DWARF symbols reported by `defined_in(arg)`, capped at
/// `limit` (100 when 0).
async fn defined_in(&self, req: Request<pb::DwarfQuery>) -> Result<Response<pb::JsonResponse>, Status> {
    let md = req.metadata().clone();
    let pb::DwarfQuery { repo, sha, arg, limit, .. } = req.into_inner();
    let limit = match limit {
        0 => 100,
        n => n as usize,
    };
    self.query(&md, repo, sha, move |facts| {
        let hits: Vec<_> = facts.defined_in(&arg).into_iter().take(limit).collect();
        Ok(serde_json::to_string(&hits)?)
    })
    .await
}
/// JSON-serialised result of `callers_of(arg)` over the persisted DWARF facts.
async fn callers(&self, req: Request<pb::DwarfQuery>) -> Result<Response<pb::JsonResponse>, Status> {
    let md = req.metadata().clone();
    let pb::DwarfQuery { repo, sha, arg, .. } = req.into_inner();
    self.query(&md, repo, sha, move |facts| {
        let json = serde_json::to_string(&facts.callers_of(&arg))?;
        Ok(json)
    })
    .await
}
/// JSON-serialised result of `callees_of(arg)` over the persisted DWARF facts.
async fn callees(&self, req: Request<pb::DwarfQuery>) -> Result<Response<pb::JsonResponse>, Status> {
    let md = req.metadata().clone();
    let pb::DwarfQuery { repo, sha, arg, .. } = req.into_inner();
    self.query(&md, repo, sha, move |facts| {
        let json = serde_json::to_string(&facts.callees_of(&arg))?;
        Ok(json)
    })
    .await
}
/// JSON-serialised result of `call_path(from, to)` over the persisted DWARF facts.
async fn call_path(&self, req: Request<pb::DwarfPathQuery>) -> Result<Response<pb::JsonResponse>, Status> {
    let md = req.metadata().clone();
    let pb::DwarfPathQuery { repo, sha, from, to, .. } = req.into_inner();
    self.query(&md, repo, sha, move |facts| {
        let json = serde_json::to_string(&facts.call_path(&from, &to))?;
        Ok(json)
    })
    .await
}
}
/// Handler for the `Vector` gRPC service.
struct VectorSvc {
// `state` is only read when an embedder feature is compiled in, so the
// dead-code lint is silenced for builds without one.
#[cfg_attr(not(any(feature = "embed-tract", feature = "embed-ort")), allow(dead_code))]
state: Shared,
}
#[tonic::async_trait]
impl VectorSvcTrait for VectorSvc {
/// `Vector.Search`: embeds `query` and returns the nearest stored chunks of
/// `repo` as JSON (`{"repo": …, "hits": [{score, file, start_line, end_line}]}`).
///
/// `limit` defaults to 10 when 0; an empty `sha` means no sha filter is
/// passed to the store. Without an embedder feature compiled in the call
/// answers `unimplemented`.
async fn search(
    &self,
    req: Request<pb::VectorSearchRequest>,
) -> Result<Response<pb::JsonResponse>, Status> {
    #[cfg(not(any(feature = "embed-tract", feature = "embed-ort")))]
    {
        let _ = req;
        Err(Status::unimplemented(
            "nornir-server was built without an embedder; rebuild with \
            `--features server,embed-tract` (CPU) or `--features server,embed-ort` (GPU) \
            to use Vector.Search",
        ))
    }
    #[cfg(any(feature = "embed-tract", feature = "embed-ort"))]
    {
        let wctx = self.state.ws(req.metadata())?.clone();
        let embedder = self.state.embedder().await?;
        let r = req.into_inner();
        let limit = match r.limit {
            0 => 10,
            n => n as usize,
        };
        let wh = wctx.warehouse.clone();
        // The closure needs owned copies; `r.repo` is reused in the response.
        let (repo, query, sha) = (r.repo.clone(), r.query.clone(), r.sha.clone());
        let job = tokio::task::spawn_blocking(move || -> anyhow::Result<_> {
            let mp = embedder.profile().id();
            let q = embedder.embed(std::slice::from_ref(&query))?;
            let sha = (!sha.is_empty()).then_some(sha.as_str());
            nornir::vector::store::search(&wh, &repo, sha, &mp, &q[0], limit)
        });
        let hits = job.await.map_err(internal)?.map_err(internal)?;
        let mut out = Vec::new();
        for (score, o) in hits.iter() {
            out.push(serde_json::json!({
                "score": score,
                "file": o.file,
                "start_line": o.start_line,
                "end_line": o.end_line,
            }));
        }
        let payload = serde_json::json!({ "repo": r.repo, "hits": out });
        let json = serde_json::to_string(&payload).map_err(internal)?;
        Ok(Response::new(pb::JsonResponse { json }))
    }
}
}
/// Handler for the `Funnel` gRPC service (ideas → plans → nodes).
struct FunnelSvc { state: Shared }
impl FunnelSvc {
    /// Resolves the request's workspace and calls `ensure_funnel(false)` on it
    /// before handing the context back to the caller.
    async fn ctx(&self, md: &tonic::metadata::MetadataMap) -> Result<Arc<WorkspaceCtx>, Status> {
        let workspace = self.state.ws(md)?.clone();
        workspace.ensure_funnel(false).await?;
        Ok(workspace)
    }
}
#[tonic::async_trait]
impl FunnelSvcTrait for FunnelSvc {
async fn submit_idea(
&self,
req: Request<pb::SubmitIdeaRequest>,
) -> Result<Response<pb::IdRef>, Status> {
let wctx = self.ctx(req.metadata()).await?;
let r = req.into_inner();
let mut fg = wctx.funnel.lock().await;
let store = fg.as_mut().unwrap();
let id = IdeaId::seq(store.funnel.next_idea);
let source = if r.source.is_empty() { "grpc".into() } else { r.source };
store
.record_async(FunnelEvent::IdeaSubmitted {
id: id.clone(),
source,
text: r.text,
refs: Vec::new(),
ts: chrono::Utc::now(),
})
.await
.map_err(internal)?;
Ok(Response::new(pb::IdRef { id: id.as_str().to_string() }))
}
async fn create_plan(
&self,
req: Request<pb::CreatePlanRequest>,
) -> Result<Response<pb::IdRef>, Status> {
let wctx = self.ctx(req.metadata()).await?;
let r = req.into_inner();
let mut fg = wctx.funnel.lock().await;
let store = fg.as_mut().unwrap();
let plan_id = PlanId::seq(store.funnel.next_plan);
store
.record_async(FunnelEvent::PlanCreated {
id: plan_id.clone(),
idea_id: IdeaId::new(r.idea_id),
summary: r.summary,
planner: "grpc".into(),
ts: chrono::Utc::now(),
})
.await
.map_err(internal)?;
store
.record_async(FunnelEvent::PlanStatusChanged {
plan_id: plan_id.clone(),
status: PlanStatus::Active,
why: None,
ts: chrono::Utc::now(),
})
.await
.map_err(internal)?;
Ok(Response::new(pb::IdRef { id: plan_id.as_str().to_string() }))
}
async fn add_node(
&self,
req: Request<pb::AddNodeRequest>,
) -> Result<Response<pb::IdRef>, Status> {
let wctx = self.ctx(req.metadata()).await?;
let r = req.into_inner();
let mut fg = wctx.funnel.lock().await;
let store = fg.as_mut().unwrap();
let plan_id = PlanId::new(r.plan_id);
let node_id = NodeId::seq(store.funnel.next_node);
let mut params = serde_json::Map::new();
if !r.title.is_empty() {
params.insert("title".into(), serde_json::Value::String(r.title));
}
store
.record_async(FunnelEvent::NodeAdded {
plan_id: plan_id.clone(),
node_id: node_id.clone(),
kind: r.kind,
params,
targets: r.targets,
prompt_excerpt: (!r.prompt.is_empty()).then_some(r.prompt),
ts: chrono::Utc::now(),
})
.await
.map_err(internal)?;
for from in &r.needs {
store
.record_async(FunnelEvent::EdgeAdded {
plan_id: plan_id.clone(),
from_node: NodeId::new(from.clone()),
to_node: node_id.clone(),
ts: chrono::Utc::now(),
})
.await
.map_err(internal)?;
}
store.funnel.promote_ready();
Ok(Response::new(pb::IdRef { id: node_id.as_str().to_string() }))
}
async fn link(&self, req: Request<pb::LinkRequest>) -> Result<Response<Empty>, Status> {
let wctx = self.ctx(req.metadata()).await?;
let r = req.into_inner();
let mut fg = wctx.funnel.lock().await;
let store = fg.as_mut().unwrap();
store
.record_async(FunnelEvent::EdgeAdded {
plan_id: PlanId::new(r.plan_id),
from_node: NodeId::new(r.from),
to_node: NodeId::new(r.to),
ts: chrono::Utc::now(),
})
.await
.map_err(internal)?;
store.funnel.promote_ready();
Ok(Response::new(Empty {}))
}
async fn set_status(
&self,
req: Request<pb::SetStatusRequest>,
) -> Result<Response<Empty>, Status> {
let wctx = self.ctx(req.metadata()).await?;
let r = req.into_inner();
let status = match r.status.as_str() {
"ready" => NodeStatus::Ready,
"active" | "in_progress" => NodeStatus::InProgress,
"blocked" => NodeStatus::Blocked,
"done" => NodeStatus::Done,
"failed" | "abandoned" => NodeStatus::Failed,
other => {
return Err(invalid_arg(format!(
"unknown status {other:?}; expected ready|active|blocked|done|failed"
)))
}
};
let mut fg = wctx.funnel.lock().await;
let store = fg.as_mut().unwrap();
store
.record_async(FunnelEvent::NodeStatusChanged {
plan_id: PlanId::new(r.plan_id),
node_id: NodeId::new(r.node_id),
status,
why: (!r.why.is_empty()).then_some(r.why),
ts: chrono::Utc::now(),
})
.await
.map_err(internal)?;
store.funnel.promote_ready();
Ok(Response::new(Empty {}))
}
async fn next(&self, req: Request<Empty>) -> Result<Response<pb::NextStepList>, Status> {
let wctx = self.ctx(req.metadata()).await?;
let mut fg = wctx.funnel.lock().await;
let store = fg.as_mut().unwrap();
let steps = topo_ready(&mut store.funnel)
.into_iter()
.map(|s| pb::NextStep {
plan_id: s.plan_id.as_str().to_string(),
node_id: s.node_id.as_str().to_string(),
kind: s.kind,
summary: s.summary,
prompt: s.prompt_excerpt.unwrap_or_default(),
targets: s.targets,
})
.collect();
Ok(Response::new(pb::NextStepList { steps }))
}
/// Dumps the whole funnel as a tree: ideas → their plans → the plans' nodes.
///
/// Uses `ensure_funnel(true)` rather than `ctx`, and answers with an empty
/// dump when no store is present afterwards. Each node lists as `deps` the
/// source ids of the plan edges that point at it; statuses are rendered with
/// their `Debug` representation.
async fn show(&self, req: Request<Empty>) -> Result<Response<pb::FunnelDump>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    wctx.ensure_funnel(true).await?;
    let guard = wctx.funnel.lock().await;
    let funnel = match guard.as_ref() {
        Some(store) => &store.funnel,
        None => return Ok(Response::new(pb::FunnelDump { ideas: Vec::new() })),
    };

    let mut ideas = Vec::new();
    for idea in funnel.ideas.values() {
        let mut plans = Vec::new();
        for plan in funnel.plans.values() {
            // Only plans that belong to the current idea.
            if !(plan.idea_id == idea.id) {
                continue;
            }
            let mut nodes = Vec::new();
            for node in plan.nodes.values() {
                let title = match node.params.get("title").and_then(|v| v.as_str()) {
                    Some(text) => text.to_string(),
                    None => "".to_string(),
                };
                // Incoming edges of this node, reported by their source id.
                let deps = plan
                    .edges
                    .iter()
                    .filter(|(_, to)| to == &node.id)
                    .map(|(from, _)| from.as_str().to_string())
                    .collect();
                nodes.push(pb::FunnelDumpNode {
                    id: node.id.as_str().to_string(),
                    kind: node.kind.clone(),
                    title,
                    status: format!("{:?}", node.status),
                    deps,
                });
            }
            plans.push(pb::FunnelDumpPlan {
                id: plan.id.as_str().to_string(),
                idea_id: plan.idea_id.as_str().to_string(),
                summary: plan.summary.clone(),
                status: format!("{:?}", plan.status),
                nodes,
            });
        }
        ideas.push(pb::FunnelDumpIdea {
            id: idea.id.as_str().to_string(),
            text: idea.text.clone(),
            source: idea.source.clone(),
            plans,
        });
    }
    Ok(Response::new(pb::FunnelDump { ideas }))
}
}
/// Handler for the `Viz` gRPC service; `state` is used to resolve the
/// per-request workspace context from the request metadata.
struct VizSvc { state: Shared }
#[tonic::async_trait]
impl VizSvcTrait for VizSvc {
/// JSON timeline for `workspace`, built by `nornir::viz::build_timeline`
/// from the warehouse (whose root is passed as the optional third argument).
async fn timeline(
    &self,
    req: Request<pb::VizTimelineRequest>,
) -> Result<Response<pb::JsonResponse>, Status> {
    let wctx = self.state.ws(req.metadata())?.clone();
    let workspace = req.into_inner().workspace;
    let wh = wctx.warehouse.clone();
    let job = tokio::task::spawn_blocking(move || -> anyhow::Result<String> {
        let warehouse_root = wh.root().to_path_buf();
        let timeline = nornir::viz::build_timeline(&wh, &workspace, Some(&warehouse_root))?;
        let rendered = serde_json::to_string(&timeline)?;
        Ok(rendered)
    });
    let json = job.await.map_err(internal)?.map_err(internal)?;
    Ok(Response::new(pb::JsonResponse { json }))
}
async fn release_events(
&self,
req: Request<pb::VizTimelineRequest>,
) -> Result<Response<pb::JsonResponse>, Status> {
use nornir::warehouse::release_events::{query_release_events, EventSelector};
let wctx = self.state.ws(req.metadata())?.clone();
let wh = wctx.warehouse.clone();
let json = tokio::task::spawn_blocking(move || -> anyhow::Result<String> {
let rows = wh.block_on(query_release_events(&wh, &EventSelector::All))?;
Ok(serde_json::to_string(&rows)?)
})
.await
.map_err(internal)?
.map_err(internal)?;
Ok(Response::new(pb::JsonResponse { json }))
}
async fn bakeoff_results(
&self,
req: Request<pb::VizTimelineRequest>,
) -> Result<Response<pb::JsonResponse>, Status> {
use nornir::warehouse::agent_model_runs::{query_agent_model_runs, BakeoffSelector};
let wctx = self.state.ws(req.metadata())?.clone();
let wh = wctx.warehouse.clone();
let json = tokio::task::spawn_blocking(move || -> anyhow::Result<String> {
let rows = wh.block_on(query_agent_model_runs(&wh, &BakeoffSelector::All))?;
Ok(serde_json::to_string(&rows)?)
})
.await
.map_err(internal)?
.map_err(internal)?;
Ok(Response::new(pb::JsonResponse { json }))
}
async fn knowledge(
&self,
req: Request<pb::VizTimelineRequest>,
) -> Result<Response<pb::JsonResponse>, Status> {
let wctx = self.state.ws(req.metadata())?.clone();
let (workspace_root, members): (PathBuf, Vec<String>) = {
let l = wctx.loaded.lock().await;
(l.workspace_root.clone(), l.nornir.repo.keys().cloned().collect())
};
let json = tokio::task::spawn_blocking(move || -> anyhow::Result<String> {
use rayon::prelude::*;
let scans: Vec<(String, Result<knowledge::ScanResult, String>)> = members
.par_iter()
.map(|repo| {
let repo_root = config::repo_dir_resolved(&workspace_root, repo);
let scan = knowledge::scan_all(&repo_root, repo).map_err(|e| format!("{e:#}"));
(repo.clone(), scan)
})
.collect();
Ok(nornir::viz::knowledge::scan_summary_json(&scans).to_string())
})
.await
.map_err(internal)?
.map_err(internal)?;
Ok(Response::new(pb::JsonResponse { json }))
}
}
/// gRPC `Warehouse` service: table listing and row previews for the
/// warehouse of the workspace resolved from the request metadata.
struct WarehouseSvc {
    state: Shared,
}

#[tonic::async_trait]
impl WarehouseSvcTrait for WarehouseSvc {
    /// Returns the warehouse's table names, fetched on the blocking pool.
    async fn tables(
        &self,
        req: Request<pb::Empty>,
    ) -> Result<Response<pb::WarehouseTables>, Status> {
        let ctx = self.state.ws(req.metadata())?.clone();
        let warehouse = ctx.warehouse.clone();
        let listing = tokio::task::spawn_blocking(move || warehouse.table_names()).await;
        let names = listing.map_err(internal)?.map_err(internal)?;
        Ok(Response::new(pb::WarehouseTables { names }))
    }

    /// Returns a preview of `table` with at most `limit` rows; a `limit` of
    /// zero selects the default of 500.
    async fn scan(
        &self,
        req: Request<pb::WarehouseScanRequest>,
    ) -> Result<Response<pb::WarehouseScan>, Status> {
        const DEFAULT_ROW_LIMIT: usize = 500;

        let ctx = self.state.ws(req.metadata())?.clone();
        let warehouse = ctx.warehouse.clone();
        let request = req.into_inner();
        let row_limit = match request.limit {
            0 => DEFAULT_ROW_LIMIT,
            n => n as usize,
        };
        let table = request.table;

        let scanned =
            tokio::task::spawn_blocking(move || warehouse.scan_preview(&table, row_limit)).await;
        let preview = scanned.map_err(internal)?.map_err(internal)?;

        let mut rows = Vec::new();
        for cells in preview.rows {
            rows.push(pb::WarehouseRow { cells });
        }
        let columns = preview.columns;
        Ok(Response::new(pb::WarehouseScan { columns, rows }))
    }
}
/// Builds a tonic interceptor that lets a request through only when its
/// `authorization` metadata is `Bearer <value>` and `<value>` equals `token`.
///
/// A missing header, a header that is not valid text, a header without the
/// `Bearer ` prefix, an empty value and a mismatching value are all rejected
/// with the same `Status::unauthenticated` message.
///
/// The comparison visits every byte instead of stopping at the first
/// difference, so the time taken does not depend on how long a matching
/// prefix the caller supplied. Only a length mismatch returns early.
fn auth_interceptor(token: Arc<String>) -> impl Fn(Request<()>) -> Result<Request<()>, Status> + Clone {
    // Byte-wise equality that accumulates differences rather than
    // short-circuiting on the first mismatch.
    fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
        if a.len() != b.len() {
            return false;
        }
        a.iter().zip(b).fold(0u8, |diff, (x, y)| diff | (x ^ y)) == 0
    }

    move |req: Request<()>| -> Result<Request<()>, Status> {
        let supplied = req
            .metadata()
            .get("authorization")
            .and_then(|v| v.to_str().ok())
            .and_then(|s| s.strip_prefix("Bearer "))
            .unwrap_or("");
        // The explicit emptiness check keeps an empty credential rejected
        // regardless of what `token` holds.
        if supplied.is_empty() || !constant_time_eq(supplied.as_bytes(), token.as_bytes()) {
            return Err(Status::unauthenticated("missing or invalid bearer token"));
        }
        Ok(req)
    }
}
/// State behind the gRPC `Workspaces` service.
struct WorkspacesSvc {
    /// Workspace registry; when `None`, `reg` answers `unavailable`.
    registry: Option<Arc<nornir::registry::Registry>>,
    /// Root path handed to the monitor fetch and republish calls.
    root: PathBuf,
    state: Shared,
}

impl WorkspacesSvc {
    /// Borrows the registry, or fails with `Status::unavailable` when this
    /// server was started without one.
    fn reg(&self) -> Result<&Arc<nornir::registry::Registry>, Status> {
        match &self.registry {
            Some(registry) => Ok(registry),
            None => Err(Status::unavailable(
                "workspace registry not enabled on this server \
                 (no <home>/.nornir/workspaces registry root)",
            )),
        }
    }
}
/// Converts a registry `Workspace` into its protobuf `WorkspaceRecord`,
/// carrying every member over as a `WorkspaceMember`.
fn ws_to_pb(w: nornir::registry::Workspace) -> WorkspaceRecord {
    let nornir::registry::Workspace {
        name,
        mode,
        descriptor,
        poll,
        current_snapshot,
        members,
        created_at,
        updated_at,
        ..
    } = w;

    let mut pb_members = Vec::new();
    for member in members {
        pb_members.push(WorkspaceMember {
            name: member.name,
            remote: member.remote,
            git_ref: member.git_ref,
            last_seen_sha: member.last_seen_sha,
            last_synced: member.last_synced,
            sync_state: member.sync_state,
        });
    }

    WorkspaceRecord {
        name,
        // The mode travels over the wire as its string form.
        mode: mode.as_str().to_string(),
        descriptor,
        poll,
        current_snapshot,
        members: pb_members,
        created_at,
        updated_at,
    }
}
#[tonic::async_trait]
impl WorkspacesSvcTrait for WorkspacesSvc {
    /// Lists every workspace held by the registry.
    async fn list(&self, _req: Request<Empty>) -> Result<Response<WorkspaceList>, Status> {
        let all = self.reg()?.list().map_err(internal)?;
        Ok(Response::new(WorkspaceList {
            workspaces: all.into_iter().map(ws_to_pb).collect(),
        }))
    }

    /// Looks up one workspace by name; answers `not_found` when the registry
    /// has no such record.
    async fn get(
        &self,
        req: Request<WorkspaceName>,
    ) -> Result<Response<WorkspaceRecord>, Status> {
        let name = req.into_inner().name;
        let w = self
            .reg()?
            .get(&name)
            .map_err(internal)?
            .ok_or_else(|| not_found(format!("no workspace `{name}`")))?;
        Ok(Response::new(ws_to_pb(w)))
    }

    /// Creates or replaces a workspace record and returns it.
    ///
    /// An empty `name` is rejected with `invalid_arg`. When a record with the
    /// same name already exists, its `created_at` is handed to
    /// `Workspace::new` (presumably so the original creation time survives
    /// the upsert — confirm against `Workspace::new`).
    async fn register(
        &self,
        req: Request<RegisterWorkspaceRequest>,
    ) -> Result<Response<WorkspaceRecord>, Status> {
        let reg = self.reg()?.clone();
        let req = req.into_inner();
        if req.name.is_empty() {
            return Err(invalid_arg("`name` must not be empty"));
        }
        let created = reg.get(&req.name).map_err(internal)?.map(|w| w.created_at);
        let ws = nornir::registry::Workspace::new(
            req.name,
            req.descriptor,
            nornir::registry::Mode::parse(&req.mode),
            req.poll,
            created,
        );
        reg.upsert(&ws).map_err(internal)?;
        Ok(Response::new(ws_to_pb(ws)))
    }

    /// Removes a workspace by name; answers `not_found` when the registry
    /// reports that nothing was removed.
    async fn remove(&self, req: Request<WorkspaceName>) -> Result<Response<Empty>, Status> {
        let name = req.into_inner().name;
        if self.reg()?.remove(&name).map_err(internal)? {
            Ok(Response::new(Empty {}))
        } else {
            Err(not_found(format!("no workspace `{name}`")))
        }
    }

    /// Fetches a workspace's members and republishes the served workspace
    /// when needed.
    ///
    /// A republish happens when `force` is set, when the fetch reported
    /// changed members, or when `workspace_needs_build` is true. The list of
    /// changed members is passed along only for a non-forced run that
    /// actually has changes; otherwise `None` is passed. `snapshot` in the
    /// report is empty when nothing was republished.
    async fn fetch(
        &self,
        req: Request<WorkspaceFetchRequest>,
    ) -> Result<Response<WorkspaceFetchReport>, Status> {
        let reg = self.reg()?.clone();
        let root = self.root.clone();
        let state = self.state.clone();
        let inner = req.into_inner();
        let (name, force) = (inner.name, inner.force);
        // Fetching and republishing are blocking work; keep them off the
        // async executor threads.
        let (rep, snapshot) = tokio::task::spawn_blocking(move || -> Result<_> {
            let rep = nornir::monitor::fetch_workspace(&reg, &root, &name)?;
            let snapshot = if force || !rep.changed.is_empty() || workspace_needs_build(&reg, &name) {
                let changed: Option<&[String]> =
                    if force || rep.changed.is_empty() { None } else { Some(&rep.changed) };
                republish_served(&state, &reg, &root, &name, changed)?.unwrap_or_default()
            } else {
                String::new()
            };
            Ok((rep, snapshot))
        })
        .await
        .map_err(internal)?
        .map_err(internal)?;
        Ok(Response::new(WorkspaceFetchReport {
            workspace: rep.workspace,
            // Saturate instead of silently wrapping if the count ever
            // exceeded what the wire field can hold.
            fetched: u32::try_from(rep.fetched).unwrap_or(u32::MAX),
            changed: rep.changed,
            errors: rep
                .errors
                .into_iter()
                .map(|(m, e)| format!("{m}: {e}"))
                .collect(),
            snapshot,
        }))
    }
}
/// Opens the serving context for a monitored workspace laid out under
/// `root/<name>/`: sources in `git/`, warehouse in `builds/`, and the index
/// cache in `builds/cache/index`.
///
/// `members` become the repo entries (with default settings) of a synthetic
/// config whose `workspace_root` is the `git/` directory.
///
/// # Errors
///
/// Fails when the warehouse cannot be opened or the index cannot be opened
/// or restored. A directory that cannot be created is logged to stderr and
/// does not abort by itself; the later open steps decide the outcome.
async fn open_monitored_workspace(
    name: &str,
    root: &std::path::Path,
    members: Vec<String>,
) -> Result<WorkspaceCtx> {
    let ws_dir = root.join(name);
    let git_dir = ws_dir.join("git");
    let builds = ws_dir.join("builds");
    // Best-effort, but no longer silent: a creation failure here is usually
    // the root cause of a less specific error further down.
    for dir in [&git_dir, &builds] {
        if let Err(e) = std::fs::create_dir_all(dir) {
            eprintln!("nornir-server: create dir {} failed: {e}", dir.display());
        }
    }
    let wh_root = builds.clone();
    let warehouse = Arc::new(
        tokio::task::spawn_blocking(move || IcebergWarehouse::open(&wh_root))
            .await
            .context("spawn warehouse open")?
            .with_context(|| format!("open warehouse {}", builds.display()))?,
    );
    let index_dir = builds.join("cache").join("index");
    let git_for_idx = git_dir.clone();
    let wh_for_open = warehouse.clone();
    let (idx, _restored) = tokio::task::spawn_blocking(move || {
        index::Index::open_or_restore_at(&git_for_idx, &index_dir, &wh_for_open, "_workspace", None)
    })
    .await
    .context("spawn index open")?
    .context("warm index for monitored workspace")?;
    // Monitored workspaces have no config file on disk; synthesise one that
    // lists each member as a repo.
    let mut nornir = config::Nornir::default();
    for m in &members {
        nornir.repo.insert(m.clone(), config::Repo::default());
    }
    let loaded = config::Loaded {
        nornir,
        config_path: builds.join("nornir.toml"),
        workspace_root: git_dir,
    };
    Ok(WorkspaceCtx {
        loaded: Mutex::new(loaded),
        index: Arc::new(idx),
        warehouse,
        mimir: Mutex::new(None),
        funnel: Mutex::new(None),
    })
}
/// Spawns a detached background task that runs `server_sweep` once per
/// `tick`.
///
/// Each sweep runs on the blocking thread pool. Missed ticks are skipped
/// rather than replayed. A failing or panicking sweep is logged to stderr
/// and the loop continues with the next tick.
fn spawn_server_poll_loop(
    state: Shared,
    reg: Arc<nornir::registry::Registry>,
    root: PathBuf,
    tick: std::time::Duration,
) {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(tick);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        loop {
            interval.tick().await;
            // Each sweep gets its own handles because the blocking closure
            // must own what it captures.
            let (state, reg, root) = (state.clone(), reg.clone(), root.clone());
            match tokio::task::spawn_blocking(move || server_sweep(&state, &reg, &root)).await {
                Ok(Ok(())) => {}
                Ok(Err(e)) => eprintln!("nornir-monitor: sweep error: {e:#}"),
                Err(e) => eprintln!("nornir-monitor: sweep task panicked: {e}"),
            }
        }
    });
}
/// Performs one monitoring pass over every `Mode::Monitored` workspace in
/// the registry: fetch, then republish when members changed or
/// `workspace_needs_build` is true.
///
/// Per-workspace fetch and republish failures are logged to stderr and do
/// not stop the pass; only a failure to list the registry is returned.
fn server_sweep(
    state: &AppState,
    reg: &nornir::registry::Registry,
    root: &std::path::Path,
) -> Result<()> {
    use nornir::registry::Mode;

    let monitored = reg.list()?.into_iter().filter(|ws| ws.mode == Mode::Monitored);
    for ws in monitored {
        let rep = match nornir::monitor::fetch_workspace(reg, root, &ws.name) {
            Err(e) => {
                eprintln!("nornir-monitor: {} fetch failed: {e:#}", ws.name);
                continue;
            }
            Ok(report) => report,
        };
        rep.errors
            .iter()
            .for_each(|(m, e)| eprintln!("nornir-monitor: {}::{m} fetch error: {e}", ws.name));

        let needs_build = workspace_needs_build(reg, &ws.name);
        let has_changes = !rep.changed.is_empty();
        if !has_changes && !needs_build {
            continue;
        }

        let why = if has_changes {
            format!("changed [{}]", rep.changed.join(", "))
        } else {
            "empty warehouse".to_string()
        };
        eprintln!("nornir-monitor: {} {why} → republish", ws.name);

        // Without changed members no member list is passed to the republish.
        let changed: Option<&[String]> = if has_changes {
            Some(rep.changed.as_slice())
        } else {
            None
        };
        match republish_served(state, reg, root, &ws.name, changed) {
            Err(e) => eprintln!("nornir-monitor: {} republish failed: {e:#}", ws.name),
            Ok(None) => {}
            Ok(Some(snap)) => {
                eprintln!("nornir-monitor: {} republished → snapshot {snap}", ws.name)
            }
        }
    }
    Ok(())
}
/// Reports whether the registry record for `ws_name` exists and has a blank
/// (empty or whitespace-only) `current_snapshot`.
///
/// A registry read error and a missing record both yield `false`.
fn workspace_needs_build(reg: &nornir::registry::Registry, ws_name: &str) -> bool {
    matches!(
        reg.get(ws_name),
        Ok(Some(record)) if record.current_snapshot.trim().is_empty()
    )
}
/// Republishes a workspace that this server is serving and records the new
/// snapshot in the registry.
///
/// Returns `Ok(None)` (after logging) when `ws_name` is not among the served
/// workspaces, and `Ok(Some(snapshot))` after a successful republish.
/// `changed` is forwarded unchanged to `nornir::monitor::republish_with`.
///
/// `deep_scan` is read from the workspace descriptor; when the descriptor
/// cannot be loaded it is treated as `false`.
///
/// # Errors
///
/// Fails when the registry cannot be read before the republish or when the
/// republish itself fails. Writing the snapshot back to the registry is
/// best-effort: a failure there is logged to stderr and the snapshot is
/// still returned.
fn republish_served(
    state: &AppState,
    reg: &nornir::registry::Registry,
    root: &std::path::Path,
    ws_name: &str,
    changed: Option<&[String]>,
) -> Result<Option<String>> {
    let Some(ctx) = state.workspaces.get(ws_name) else {
        eprintln!("nornir-monitor: {ws_name} not served; skipping republish");
        return Ok(None);
    };
    let rec = reg.get(ws_name)?;
    let members: Vec<String> = rec
        .as_ref()
        .map(|w| w.members.iter().map(|m| m.name.clone()).collect())
        .unwrap_or_default();
    let descriptor = rec.as_ref().map(|w| w.descriptor.clone()).unwrap_or_default();
    let git_dir = root.join(ws_name).join("git");
    let index_dir = root.join(ws_name).join("builds").join("cache").join("index");
    let deep_scan = nornir::workspace::WorkspaceDescriptor::load(std::path::Path::new(&descriptor))
        .map(|d| d.workspace.deep_scan)
        .unwrap_or(false);
    let snap = nornir::monitor::republish_with(
        &ctx.warehouse,
        &ctx.index,
        &git_dir,
        &index_dir,
        &members,
        ws_name,
        deep_scan,
        changed,
    )?;
    // Re-read the record so the write-back starts from the current registry
    // state. If the write-back is lost, `current_snapshot` keeps its old
    // value and `workspace_needs_build` may keep reporting true, so make the
    // failure visible instead of discarding it.
    match reg.get(ws_name) {
        Ok(Some(mut r)) => {
            r.current_snapshot = snap.clone();
            r.updated_at = chrono::Utc::now().to_rfc3339();
            if let Err(e) = reg.upsert(&r) {
                eprintln!("nornir-monitor: {ws_name} snapshot write-back failed: {e:#}");
            }
        }
        Ok(None) => {}
        Err(e) => eprintln!("nornir-monitor: {ws_name} registry re-read failed: {e:#}"),
    }
    Ok(Some(snap))
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "nornir_server=info,tonic=info".into()),
)
.with_writer(std::io::stderr)
.init();
let token = std::env::var("NORNIR_SERVER_TOKEN").map_err(|_| {
anyhow!(
"NORNIR_SERVER_TOKEN env var required. Future: mTLS with CN=<username>; \
see `nornir-server --help` and crate docs."
)
})?;
if token.len() < 16 {
return Err(anyhow!("NORNIR_SERVER_TOKEN must be ≥ 16 chars"));
}
let mut workspaces: HashMap<String, Arc<WorkspaceCtx>> = HashMap::new();
let mut default_ws: String;
if let Some(spec) = std::env::var_os("NORNIR_WORKSPACES") {
let spec = spec.to_string_lossy().into_owned();
let mut first: Option<String> = None;
for entry in spec.split(',').map(str::trim).filter(|s| !s.is_empty()) {
let (name, path) = entry
.split_once('=')
.ok_or_else(|| anyhow!("NORNIR_WORKSPACES entry `{entry}` must be name=path"))?;
eprintln!("nornir-server: opening workspace `{name}` …");
let ctx = Arc::new(
open_workspace(Some(PathBuf::from(path.trim())))
.await
.with_context(|| format!("open workspace `{name}`"))?,
);
let dir_name = ctx
.loaded
.lock()
.await
.workspace_root
.file_name()
.and_then(|s| s.to_str())
.map(String::from);
workspaces.insert(name.to_string(), ctx.clone());
if let Some(dn) = dir_name {
workspaces.entry(dn).or_insert_with(|| ctx.clone());
}
first.get_or_insert_with(|| name.to_string());
}
default_ws = first.ok_or_else(|| anyhow!("NORNIR_WORKSPACES is empty"))?;
} else {
let ctx = open_workspace(std::env::var_os("NORNIR_CONFIG").map(PathBuf::from)).await?;
let name = ctx
.loaded
.lock()
.await
.workspace_root
.file_name()
.and_then(|s| s.to_str())
.unwrap_or("default")
.to_string();
default_ws = name.clone();
workspaces.insert(name, Arc::new(ctx));
}
let registry_root = config::registry_root();
let registry: Option<Arc<nornir::registry::Registry>> =
if registry_root.exists() {
match nornir::registry::Registry::open(®istry_root) {
Ok(r) => Some(Arc::new(r)),
Err(e) => {
eprintln!("nornir-server: registry open failed ({e:#}); monitored role off");
None
}
}
} else {
None
};
if let Some(reg) = ®istry {
if let Ok(spec) = std::env::var("NORNIR_MONITOR") {
let poll = std::env::var("NORNIR_POLL").unwrap_or_default();
for entry in spec.split(',').map(str::trim).filter(|s| !s.is_empty()) {
let Some((name, descriptor)) = entry.split_once('=') else {
eprintln!("nornir-server: NORNIR_MONITOR entry `{entry}` must be name=descriptor");
continue;
};
let created = reg.get(name).ok().flatten().map(|w| w.created_at);
let ws = nornir::registry::Workspace::new(
name.to_string(),
descriptor.to_string(),
nornir::registry::Mode::Monitored,
poll.clone(),
created,
);
match reg.upsert(&ws) {
Ok(()) => eprintln!(
"nornir-server: monitoring `{name}` ({} member(s)) from {descriptor}",
ws.members.len()
),
Err(e) => eprintln!("nornir-server: register `{name}` failed: {e:#}"),
}
}
}
for ws in reg.list().unwrap_or_default() {
if ws.mode != nornir::registry::Mode::Monitored || workspaces.contains_key(&ws.name) {
continue;
}
let members: Vec<String> = ws.members.iter().map(|m| m.name.clone()).collect();
match open_monitored_workspace(&ws.name, ®istry_root, members).await {
Ok(ctx) => {
eprintln!(
"nornir-server: serving monitored workspace `{}` (builds/ warehouse)",
ws.name
);
if default_ws.is_empty() {
default_ws = ws.name.clone();
}
workspaces.insert(ws.name.clone(), Arc::new(ctx));
}
Err(e) => eprintln!("nornir-server: open monitored `{}` failed: {e:#}", ws.name),
}
}
}
eprintln!(
"nornir-server: serving {} workspace(s); default `{}`",
workspaces.len(),
default_ws
);
let state: Shared = Arc::new(AppState {
workspaces,
default_ws,
#[cfg(any(feature = "embed-tract", feature = "embed-ort"))]
embedder: Mutex::new(None),
});
let addr: SocketAddr = std::env::var("NORNIR_SERVER_ADDR")
.unwrap_or_else(|_| "127.0.0.1:7878".into())
.parse()
.context("parse NORNIR_SERVER_ADDR")?;
eprintln!("nornir-server: listening on {addr}");
if let Some(reg) = registry.clone() {
let tick = nornir::monitor::parse_interval(
&std::env::var("NORNIR_POLL").unwrap_or_default(),
std::time::Duration::from_secs(60),
);
eprintln!(
"nornir-monitor: poll loop every {}s over registry at {}",
tick.as_secs(),
registry_root.display()
);
spawn_server_poll_loop(state.clone(), reg, registry_root.clone(), tick);
}
let token_arc = Arc::new(token);
let health = HealthServer::with_interceptor(
HealthSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let repos = ReposServer::with_interceptor(
ReposSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let guard_svc = GuardServer::with_interceptor(
GuardSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let release_svc = ReleaseServer::with_interceptor(
ReleaseSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let bench_svc = BenchServer::with_interceptor(
BenchSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let search_svc = SearchServer::with_interceptor(
SearchSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let index_svc = IndexServer::with_interceptor(
IndexSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let introspect_svc = IntrospectServer::with_interceptor(
IntrospectSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let knowledge_svc = KnowledgeServer::with_interceptor(
KnowledgeSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let docs_svc = DocsServer::with_interceptor(
DocsSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let mimir_svc = MimirServer::with_interceptor(
MimirSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let dwarf_svc = DwarfServer::with_interceptor(
DwarfSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let vector_svc = VectorServer::with_interceptor(
VectorSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let funnel_svc = FunnelServer::with_interceptor(
FunnelSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let viz_svc = VizServer::with_interceptor(
VizSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let warehouse_svc = WarehouseServer::with_interceptor(
WarehouseSvc { state: state.clone() },
auth_interceptor(token_arc.clone()),
);
let workspaces_svc = WorkspacesServer::with_interceptor(
WorkspacesSvc {
registry: registry.clone(),
root: registry_root.clone(),
state: state.clone(),
},
auth_interceptor(token_arc.clone()),
);
Server::builder()
.add_service(health)
.add_service(repos)
.add_service(guard_svc)
.add_service(release_svc)
.add_service(bench_svc)
.add_service(search_svc)
.add_service(index_svc)
.add_service(introspect_svc)
.add_service(knowledge_svc)
.add_service(docs_svc)
.add_service(mimir_svc)
.add_service(dwarf_svc)
.add_service(vector_svc)
.add_service(funnel_svc)
.add_service(viz_svc)
.add_service(warehouse_svc)
.add_service(workspaces_svc)
.serve(addr)
.await
.context("tonic serve")?;
Ok(())
}