use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use tokio::sync::Mutex;
use tonic::{transport::Server, Request, Response, Status};
use nornir::config::{self, Loaded};
use nornir::{bench, docs, guard, index, introspect, knowledge, release};
use nornir::warehouse::iceberg::IcebergWarehouse;
use nornir::warehouse::dep_graph::{query_dep_graph_snapshots, topo_order_from_edges};
/// Code generated by tonic for the `nornir.v1` protobuf package
/// (messages plus the per-service server traits used below).
pub mod pb {
    tonic::include_proto!("nornir.v1");
}
use pb::bench_server::{Bench as BenchSvcTrait, BenchServer};
use pb::docs_server::{Docs as DocsSvcTrait, DocsServer};
use pb::guard_server::{Guard as GuardSvcTrait, GuardServer};
use pb::health_server::{Health, HealthServer};
use pb::index_server::{Index as IndexSvcTrait, IndexServer};
use pb::introspect_server::{Introspect as IntrospectSvcTrait, IntrospectServer};
use pb::knowledge_server::{Knowledge as KnowledgeSvcTrait, KnowledgeServer};
use pb::release_server::{Release as ReleaseSvcTrait, ReleaseServer};
use pb::repos_server::{Repos as ReposSvcTrait, ReposServer};
use pb::search_server::{Search as SearchSvcTrait, SearchServer};
use pb::{
BenchHistoryResponse, BinaryOnly, CallQuery, DefinedInRequest, DocExport as PbDocExport,
DocsHistoryRequest, DocsHistoryResponse, DocsResponse, Empty, GateAllResult,
GateFailure, GateResult, GuardPath, GuardReport, IndexBlobFrame, IndexStatsResponse, Kv, Kvf,
KnowledgeCall, KnowledgeCallPathQuery, KnowledgeCallQuery, KnowledgeCalls, KnowledgeSymbol, KnowledgeSymbolQuery,
KnowledgeSymbols, NameList, OrderRequest, OrderResponse, PathBetweenRequest, PingResponse,
ProgressEvent, RegisterRepoRequest, RegisterRepoResponse, ReloadResponse, RepoList, RepoOnly,
RepoSummary, SearchHit, SearchRequest, SearchResponse, SnapshotRequest, SnapshotResponse,
SubmitBenchRequest, SubmitBenchResponse, Symbol as PbSymbol, SymbolList, SymbolLookupRequest,
UnregisterRepoRequest, UnregisterRepoResponse,
};
/// State shared by every service struct in this file.
struct AppState {
    /// Loaded configuration; handlers lock it to read, mutate or replace it.
    loaded: Mutex<Loaded>,
    /// Search index handle (used by the Search and Index services).
    #[allow(dead_code)]
    index: Arc<index::Index>,
    /// Iceberg warehouse handle (used by Release, Bench, Index and Knowledge).
    #[allow(dead_code)]
    warehouse: Arc<IcebergWarehouse>,
}
/// Reference-counted handle to [`AppState`]; the type of the `state` field on
/// every service struct in this file.
type Shared = Arc<AppState>;
/// Implements the `Health` gRPC service.
struct HealthSvc {
    /// Shared application state.
    state: Shared,
}
#[tonic::async_trait]
impl Health for HealthSvc {
    /// Answers with a fixed `"ok"` status, the crate version baked in at
    /// compile time, and the number of repos in the loaded configuration.
    async fn ping(&self, _req: Request<Empty>) -> Result<Response<PingResponse>, Status> {
        let repo_count = {
            let loaded = self.state.loaded.lock().await;
            loaded.nornir.repo.len() as u32
        };
        let reply = PingResponse {
            status: "ok".into(),
            version: env!("CARGO_PKG_VERSION").into(),
            repo_count,
        };
        Ok(Response::new(reply))
    }
}
/// Implements the `Repos` gRPC service (list / register / unregister / reload).
struct ReposSvc {
    /// Shared application state.
    state: Shared,
}
/// Builds an `invalid_argument` status whose message is `msg` rendered via `Display`.
fn invalid_arg<E>(msg: E) -> Status
where
    E: std::fmt::Display,
{
    let text = msg.to_string();
    Status::invalid_argument(text)
}
/// Builds an `internal` status whose message is `msg` rendered via `Display`.
fn internal<E>(msg: E) -> Status
where
    E: std::fmt::Display,
{
    let text = msg.to_string();
    Status::internal(text)
}
/// Builds a `not_found` status whose message is `msg` rendered via `Display`.
fn not_found<E>(msg: E) -> Status
where
    E: std::fmt::Display,
{
    let text = msg.to_string();
    Status::not_found(text)
}
#[tonic::async_trait]
impl ReposSvcTrait for ReposSvc {
/// Returns one `RepoSummary` (name, remote, history, readme) per repo in the
/// loaded configuration.
async fn list(&self, _req: Request<Empty>) -> Result<Response<RepoList>, Status> {
    let loaded = self.state.loaded.lock().await;
    let mut repos = Vec::new();
    for (repo_name, cfg) in loaded.nornir.repo.iter() {
        repos.push(RepoSummary {
            name: repo_name.clone(),
            remote: cfg.remote.clone(),
            history: cfg.history.clone(),
            readme: cfg.readme.clone(),
        });
    }
    Ok(Response::new(RepoList { repos }))
}
async fn register(
&self,
req: Request<RegisterRepoRequest>,
) -> Result<Response<RegisterRepoResponse>, Status> {
let req = req.into_inner();
if req.name.is_empty() {
return Err(invalid_arg("`name` must not be empty"));
}
if req.name.contains(['.', '/', '\\', '[', ']']) {
return Err(invalid_arg(
"`name` must be a bare identifier (no '.', '/', '\\\\', '[', ']')",
));
}
let mut l = self.state.loaded.lock().await;
let cfg_path = l.config_path.clone();
let existing = std::fs::read_to_string(&cfg_path).ok().unwrap_or_default();
let mut doc: toml_edit::DocumentMut = existing
.parse()
.map_err(|e| internal(format!("parse {}: {e}", cfg_path.display())))?;
let repo_parent = doc
.entry("repo")
.or_insert_with(|| {
let mut t = toml_edit::Table::new();
t.set_implicit(true);
toml_edit::Item::Table(t)
})
.as_table_mut()
.ok_or_else(|| Status::failed_precondition("`repo` exists but is not a table"))?;
repo_parent.set_implicit(true);
let entry = repo_parent
.entry(&req.name)
.or_insert_with(|| toml_edit::Item::Table(toml_edit::Table::new()));
let tbl = entry.as_table_mut().ok_or_else(|| {
Status::failed_precondition(format!("`repo.{}` exists but is not a table", req.name))
})?;
if !req.remote.is_empty() { tbl.insert("remote", toml_edit::value(req.remote.as_str())); }
if !req.history.is_empty() { tbl.insert("history", toml_edit::value(req.history.as_str())); }
if !req.readme.is_empty() { tbl.insert("readme", toml_edit::value(req.readme.as_str())); }
if !req.path.is_empty() { tbl.insert("path", toml_edit::value(req.path.as_str())); }
if let Some(parent) = cfg_path.parent() {
std::fs::create_dir_all(parent).map_err(internal)?;
}
std::fs::write(&cfg_path, doc.to_string()).map_err(|e| {
if e.kind() == std::io::ErrorKind::PermissionDenied {
Status::failed_precondition(format!(
"{} is read-only (likely guard-locked); call Guard.Release first",
cfg_path.display(),
))
} else {
internal(e)
}
})?;
let repo = config::Repo {
remote: req.remote.clone(),
history: req.history.clone(),
readme: req.readme.clone(),
publish_order: Vec::new(),
gates: config::Gates::default(),
bench: config::BenchSpec::default(),
};
l.nornir.repo.insert(req.name.clone(), repo);
Ok(Response::new(RegisterRepoResponse {
name: req.name,
config_path: cfg_path.display().to_string(),
repo_count: l.nornir.repo.len() as u32,
}))
}
async fn unregister(
&self,
req: Request<UnregisterRepoRequest>,
) -> Result<Response<UnregisterRepoResponse>, Status> {
let req = req.into_inner();
let mut l = self.state.loaded.lock().await;
let cfg_path = l.config_path.clone();
let removed_mem = l.nornir.repo.remove(&req.name).is_some();
let removed_disk = if cfg_path.exists() {
let text = std::fs::read_to_string(&cfg_path).map_err(internal)?;
let mut doc: toml_edit::DocumentMut = text
.parse()
.map_err(|e| internal(format!("parse {}: {e}", cfg_path.display())))?;
let removed = doc
.get_mut("repo")
.and_then(|i| i.as_table_mut())
.and_then(|t| t.remove(&req.name))
.is_some();
if removed {
std::fs::write(&cfg_path, doc.to_string()).map_err(internal)?;
}
removed
} else {
false
};
if !removed_mem && !removed_disk {
return Err(not_found(format!("repo `{}` not found", req.name)));
}
Ok(Response::new(UnregisterRepoResponse {
name: req.name,
repo_count: l.nornir.repo.len() as u32,
}))
}
/// Re-reads the config file at the currently loaded `config_path` and swaps
/// the result in for the in-memory configuration.
///
/// Fails with `not_found` when the file does not exist and with `internal`
/// when loading it fails.
async fn reload(&self, _req: Request<Empty>) -> Result<Response<ReloadResponse>, Status> {
    let mut loaded = self.state.loaded.lock().await;
    let cfg_path = loaded.config_path.clone();
    if cfg_path.exists() {
        *loaded = config::load_explicit(&cfg_path).map_err(internal)?;
        let reply = ReloadResponse {
            config_path: cfg_path.display().to_string(),
            repo_count: loaded.nornir.repo.len() as u32,
        };
        Ok(Response::new(reply))
    } else {
        Err(not_found(format!("{} does not exist", cfg_path.display())))
    }
}
}
/// Implements the `Guard` gRPC service (status / apply / release).
struct GuardSvc {
    /// Shared application state.
    state: Shared,
}
/// Converts guard path statuses into the protobuf `GuardReport`, counting how
/// many entries are flagged `changed`.
fn to_pb(paths: Vec<guard::PathStatus>) -> GuardReport {
    let mut changed: usize = 0;
    let mut converted = Vec::with_capacity(paths.len());
    for status in paths {
        if status.changed {
            changed += 1;
        }
        converted.push(GuardPath {
            path: status.path.display().to_string(),
            exists: status.exists,
            writable: status.writable,
            changed: status.changed,
        });
    }
    GuardReport {
        paths: converted,
        changed_count: changed as u32,
    }
}
#[tonic::async_trait]
impl GuardSvcTrait for GuardSvc {
async fn status(&self, _req: Request<Empty>) -> Result<Response<GuardReport>, Status> {
let l = self.state.loaded.lock().await;
let report = guard::status(&l.workspace_root, &l.nornir.guard.forbidden);
Ok(Response::new(to_pb(report)))
}
async fn apply(&self, _req: Request<Empty>) -> Result<Response<GuardReport>, Status> {
let l = self.state.loaded.lock().await;
let report = guard::apply(&l.workspace_root, &l.nornir.guard.forbidden).map_err(internal)?;
Ok(Response::new(to_pb(report)))
}
async fn release(&self, _req: Request<Empty>) -> Result<Response<GuardReport>, Status> {
let l = self.state.loaded.lock().await;
let report = guard::release(&l.workspace_root, &l.nornir.guard.forbidden).map_err(internal)?;
Ok(Response::new(to_pb(report)))
}
}
/// Implements the `Release` gRPC service (gates, publish order, progress stream).
struct ReleaseSvc {
    /// Shared application state.
    state: Shared,
}
/// Returns the last bench run recorded in the history file of repo `name`.
///
/// The history file is `<repo dir>/<repo.history>`, or
/// `<repo dir>/bench_history.jsonl` when `history` is empty. Errors when the
/// repo is not configured, the file cannot be read, or it holds no runs.
fn last_run_for(loaded: &Loaded, name: &str) -> anyhow::Result<bench::BenchRun> {
    let Some(repo) = loaded.nornir.repo.get(name) else {
        return Err(anyhow!("repo `{name}` not configured"));
    };
    let file_name: &str = if repo.history.is_empty() {
        "bench_history.jsonl"
    } else {
        repo.history.as_str()
    };
    let path = config::Nornir::repo_dir(&loaded.workspace_root, name).join(file_name);
    let mut runs = bench::history::read_all(&path)?;
    match runs.pop() {
        Some(run) => Ok(run),
        None => Err(anyhow!("no bench runs in {}", path.display())),
    }
}
/// Reads the whole bench history of repo `name`, best effort: an unknown repo
/// or an unreadable history file both yield an empty vector.
fn history_for(loaded: &Loaded, name: &str) -> Vec<bench::BenchRun> {
    match loaded.nornir.repo.get(name) {
        None => Vec::new(),
        Some(repo) => {
            let file_name: &str = if repo.history.is_empty() {
                "bench_history.jsonl"
            } else {
                repo.history.as_str()
            };
            let path = config::Nornir::repo_dir(&loaded.workspace_root, name).join(file_name);
            bench::history::read_all(&path).unwrap_or_default()
        }
    }
}
/// Returns the directory of repo `name`, or a `not_found` status when the repo
/// is not part of the loaded configuration.
fn repo_root_for(loaded: &Loaded, name: &str) -> Result<PathBuf, Status> {
    if loaded.nornir.repo.contains_key(name) {
        Ok(config::Nornir::repo_dir(&loaded.workspace_root, name))
    } else {
        Err(not_found(format!("repo `{name}` not configured")))
    }
}
/// Builds a passing `GateResult` for gate `name`; message and version are
/// empty and `max_drop_pct` is zero.
fn gate_pass(name: &str) -> GateResult {
    let gate = String::from(name);
    let status = String::from("pass");
    GateResult {
        gate,
        status,
        message: String::new(),
        version: String::new(),
        max_drop_pct: 0.0,
    }
}
/// Builds a failing `GateResult` for gate `name`, using the alternate
/// `Display` rendering of `e` as the message.
fn gate_fail<E: std::fmt::Display>(name: &str, e: E) -> GateResult {
    let message = format!("{e:#}");
    GateResult {
        gate: String::from(name),
        status: String::from("fail"),
        message,
        version: String::new(),
        max_drop_pct: 0.0,
    }
}
#[tonic::async_trait]
impl ReleaseSvcTrait for ReleaseSvc {
/// Runs the `no_path_patches` gate on the requested repo.
///
/// An unknown repo is a `not_found` status; a gate failure is reported inside
/// the `GateResult`, not as an RPC error.
async fn gate_path_patches(
    &self,
    req: Request<RepoOnly>,
) -> Result<Response<GateResult>, Status> {
    let repo_name = req.into_inner().repo;
    let loaded = self.state.loaded.lock().await;
    let root = repo_root_for(&loaded, &repo_name)?;
    let verdict = if let Err(e) = release::gate::no_path_patches(&root) {
        gate_fail("no_path_patches", e)
    } else {
        gate_pass("no_path_patches")
    };
    Ok(Response::new(verdict))
}
/// Runs the `nexus_floor` gate against the repo's last recorded bench run.
///
/// A missing bench run yields a failing `GateResult`; when a run exists its
/// version is copied into the result.
async fn gate_nexus_floor(
    &self,
    req: Request<RepoOnly>,
) -> Result<Response<GateResult>, Status> {
    let repo_name = req.into_inner().repo;
    let loaded = self.state.loaded.lock().await;
    // Called only for its `not_found` check; the path itself is unused.
    repo_root_for(&loaded, &repo_name)?;
    let verdict = match last_run_for(&loaded, &repo_name) {
        Err(e) => gate_fail("nexus_floor", e),
        Ok(run) => {
            let mut outcome = if let Err(e) = release::gate::nexus_floor(&run) {
                gate_fail("nexus_floor", e)
            } else {
                gate_pass("nexus_floor")
            };
            outcome.version = run.version;
            outcome
        }
    };
    Ok(Response::new(verdict))
}
/// Runs the `no_regression` gate: compares the last bench run with the repo's
/// history file using the configured maximum regression percentage
/// (10.0 when the configured value is not positive).
///
/// The result carries the run's version and the threshold that was applied.
async fn gate_no_regression(
    &self,
    req: Request<RepoOnly>,
) -> Result<Response<GateResult>, Status> {
    let repo_name = req.into_inner().repo;
    let loaded = self.state.loaded.lock().await;
    let root = repo_root_for(&loaded, &repo_name)?;
    // `repo_root_for` just confirmed the key exists and the lock is still held.
    let repo = loaded.nornir.repo.get(&repo_name).unwrap();

    let run = match last_run_for(&loaded, &repo_name) {
        Ok(run) => run,
        Err(e) => return Ok(Response::new(gate_fail("no_regression", e))),
    };

    let threshold = match repo.gates.max_regression_pct {
        configured if configured > 0.0 => configured,
        _ => 10.0,
    };
    let history_file: &str = if repo.history.is_empty() {
        "bench_history.jsonl"
    } else {
        repo.history.as_str()
    };
    let history_path = root.join(history_file);

    let mut verdict = if let Err(e) = release::gate::no_regression(&run, &history_path, threshold) {
        gate_fail("no_regression", e)
    } else {
        gate_pass("no_regression")
    };
    verdict.version = run.version;
    verdict.max_drop_pct = threshold;
    Ok(Response::new(verdict))
}
/// Runs the `docs_fresh` gate: `docs::render_check_all` over the repo layout,
/// using the last bench run and the full bench history as context.
///
/// A missing bench run yields a failing `GateResult`.
async fn gate_docs_fresh(
    &self,
    req: Request<RepoOnly>,
) -> Result<Response<GateResult>, Status> {
    let repo_name = req.into_inner().repo;
    let loaded = self.state.loaded.lock().await;
    let root = repo_root_for(&loaded, &repo_name)?;
    let verdict = match last_run_for(&loaded, &repo_name) {
        Err(e) => gate_fail("docs_fresh", e),
        Ok(run) => {
            let layout = docs::RepoLayout::new(&root);
            let history = history_for(&loaded, &repo_name);
            let ctx = docs::Ctx::new(&root, &loaded.workspace_root, Some(&run))
                .with_history(&history);
            if let Err(e) = docs::render_check_all(&layout, &ctx) {
                gate_fail("docs_fresh", e)
            } else {
                gate_pass("docs_fresh")
            }
        }
    };
    Ok(Response::new(verdict))
}
/// Runs every gate enabled in the repo's `gates` config and reports which
/// passed and which failed (with the error text).
///
/// Gates run in a fixed order: `no_path_patches`, `nexus_floor`,
/// `no_regression`, `integration_roundtrip`, `docs_fresh`. Gates that need the
/// last bench run fail with the lookup error when no run is available.
async fn gate_all(
    &self,
    req: Request<RepoOnly>,
) -> Result<Response<GateAllResult>, Status> {
    /// Files one gate outcome under `passed` or `failed`.
    fn record(
        passed: &mut Vec<String>,
        failed: &mut Vec<GateFailure>,
        gate: &str,
        outcome: anyhow::Result<()>,
    ) {
        match outcome {
            Ok(()) => passed.push(gate.into()),
            Err(e) => failed.push(GateFailure {
                name: gate.into(),
                error: format!("{e:#}"),
            }),
        }
    }

    let name = req.into_inner().repo;
    let loaded = self.state.loaded.lock().await;
    let root = repo_root_for(&loaded, &name)?;
    // `repo_root_for` just confirmed the key exists and the lock is still held.
    let repo = loaded.nornir.repo.get(&name).unwrap().clone();
    let gates = &repo.gates;
    let mut passed: Vec<String> = Vec::new();
    let mut failed: Vec<GateFailure> = Vec::new();

    if gates.no_path_patches {
        let outcome = release::gate::no_path_patches(&root);
        record(&mut passed, &mut failed, "no_path_patches", outcome);
    }

    // Looked up once and shared by every gate that needs a bench run.
    let last = last_run_for(&loaded, &name);

    if gates.nexus_floor {
        let outcome = match last.as_ref() {
            Ok(run) => release::gate::nexus_floor(run),
            Err(e) => Err(anyhow!("{e:#}")),
        };
        record(&mut passed, &mut failed, "nexus_floor", outcome);
    }

    if gates.no_regression {
        let threshold = match gates.max_regression_pct {
            configured if configured > 0.0 => configured,
            _ => 10.0,
        };
        let history_file: &str = if repo.history.is_empty() {
            "bench_history.jsonl"
        } else {
            repo.history.as_str()
        };
        let history_path = root.join(history_file);
        let outcome = match last.as_ref() {
            Ok(run) => release::gate::no_regression(run, &history_path, threshold),
            Err(e) => Err(anyhow!("{e:#}")),
        };
        record(&mut passed, &mut failed, "no_regression", outcome);
    }

    if !gates.integration_roundtrip.is_empty() {
        let kinds: Vec<&str> = gates
            .integration_roundtrip
            .iter()
            .map(|kind| kind.as_str())
            .collect();
        let outcome = release::gate::integration_roundtrip_via_cargo_test(&root, &kinds);
        record(&mut passed, &mut failed, "integration_roundtrip", outcome);
    }

    if gates.docs_fresh {
        let outcome: anyhow::Result<()> = match last.as_ref() {
            Err(e) => Err(anyhow!("{e:#}")),
            Ok(run) => {
                let layout = docs::RepoLayout::new(&root);
                let history = history_for(&loaded, &name);
                let ctx = docs::Ctx::new(&root, &loaded.workspace_root, Some(run))
                    .with_history(&history);
                docs::render_check_all(&layout, &ctx).map(|_| ())
            }
        };
        record(&mut passed, &mut failed, "docs_fresh", outcome);
    }

    Ok(Response::new(GateAllResult { repo: name, passed, failed }))
}
/// Computes an ordering of the configured repos.
///
/// Dependency-graph snapshots for the workspace (default `workspace_holger`)
/// are queried from the warehouse on a blocking thread. With at least one
/// snapshot, the last one's edges drive `topo_order_from_edges` and the source
/// is `"iceberg"`; otherwise the configured repo names are returned as-is with
/// source `"fallback"`.
async fn order(
    &self,
    req: Request<OrderRequest>,
) -> Result<Response<OrderResponse>, Status> {
    let requested = req.into_inner().workspace;
    let workspace: String = if requested.is_empty() {
        "workspace_holger".into()
    } else {
        requested
    };

    let selected: Vec<_> = {
        let loaded = self.state.loaded.lock().await;
        loaded.nornir.repo.keys().cloned().collect()
    };
    let warehouse = self.state.warehouse.clone();

    let snapshots = tokio::task::spawn_blocking(move || {
        warehouse.block_on(query_dep_graph_snapshots(&warehouse, &workspace, None))
    })
    .await
    .map_err(internal)?
    .map_err(internal)?;

    let reply = if let Some(snap) = snapshots.into_iter().last() {
        OrderResponse {
            order: topo_order_from_edges(&selected, &snap.edges),
            source: "iceberg".into(),
            snapshot_id: snap.snapshot_id.to_string(),
        }
    } else {
        OrderResponse {
            order: selected,
            source: "fallback".into(),
            snapshot_id: String::new(),
        }
    };
    Ok(Response::new(reply))
}
/// Boxed stream of progress events returned by `progress`.
type ProgressStream = std::pin::Pin<
    Box<dyn tokio_stream::Stream<Item = Result<ProgressEvent, Status>> + Send + 'static>,
>;

/// Streams release progress events read from
/// `<workspace_root>/workspace_holger/.nornir/logs`.
///
/// A background task runs `tail_progress` and feeds a bounded channel (64
/// items); if tailing fails, the error is sent as a final `internal` status.
async fn progress(
    &self,
    _req: Request<Empty>,
) -> Result<Response<Self::ProgressStream>, Status> {
    let log_dir = {
        let loaded = self.state.loaded.lock().await;
        loaded.workspace_root.join("workspace_holger/.nornir/logs")
    };
    let (tx, rx) = tokio::sync::mpsc::channel::<Result<ProgressEvent, Status>>(64);
    tokio::spawn(async move {
        let tail_tx = tx.clone();
        if let Err(e) = tail_progress(log_dir, tail_tx).await {
            let _ = tx.send(Err(internal(e))).await;
        }
    });
    let stream: Self::ProgressStream =
        Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx));
    Ok(Response::new(stream))
}
}
/// Finds the most recently modified `release-run-*.events.ndjson` file in
/// `log_dir`.
///
/// Returns `None` when the directory cannot be read or holds no matching file.
/// Entries whose metadata or mtime is unavailable are skipped; iteration stops
/// at the first directory-read error. On equal mtimes the first entry seen wins.
async fn latest_log(log_dir: &std::path::Path) -> Option<PathBuf> {
    let mut dir = tokio::fs::read_dir(log_dir).await.ok()?;
    let mut newest: Option<(std::time::SystemTime, PathBuf)> = None;
    loop {
        let entry = match dir.next_entry().await {
            Ok(Some(entry)) => entry,
            _ => break,
        };
        let file_name = entry.file_name();
        let file_name = file_name.to_string_lossy();
        if !file_name.starts_with("release-run-") || !file_name.ends_with(".events.ndjson") {
            continue;
        }
        let Ok(meta) = entry.metadata().await else { continue };
        let Ok(modified) = meta.modified() else { continue };
        let is_newer = match &newest {
            Some((seen, _)) => *seen < modified,
            None => true,
        };
        if is_newer {
            newest = Some((modified, entry.path()));
        }
    }
    newest.map(|(_, path)| path)
}
/// Follows the newest release event log in `log_dir` and forwards each parsed
/// event to `tx`.
///
/// * Polls every 500 ms until `latest_log` finds a file, giving up quietly if
///   the receiver has gone away.
/// * Reads the file from the beginning, line by line. At end of file it polls
///   every 250 ms for more data until the receiver is dropped.
/// * A line that is not yet newline-terminated is kept in the buffer, so an
///   event the writer is still in the middle of writing is completed by a later
///   read instead of being dropped as two unparsable halves.
/// * Complete lines that do not parse as a `ReleaseEvent` (including blank
///   lines) are skipped.
/// * Returns after forwarding a `RunEnd` event, or as soon as the receiver is
///   closed.
///
/// # Errors
/// Propagates I/O errors from opening or reading the log file.
async fn tail_progress(
    log_dir: PathBuf,
    tx: tokio::sync::mpsc::Sender<Result<ProgressEvent, Status>>,
) -> anyhow::Result<()> {
    use tokio::io::AsyncBufReadExt;

    const DISCOVERY_POLL: std::time::Duration = std::time::Duration::from_millis(500);
    const EOF_POLL: std::time::Duration = std::time::Duration::from_millis(250);

    let path = loop {
        if let Some(p) = latest_log(&log_dir).await {
            break p;
        }
        if tx.is_closed() {
            return Ok(());
        }
        tokio::time::sleep(DISCOVERY_POLL).await;
    };

    // A freshly opened file is already positioned at offset 0.
    let file = tokio::fs::File::open(&path).await?;
    let mut reader = tokio::io::BufReader::new(file);
    // Holds at most one line; may carry an unterminated fragment across reads.
    let mut buf = String::new();
    loop {
        let n = reader.read_line(&mut buf).await?;
        if n == 0 {
            // Nothing new yet: wait for the writer unless nobody is listening.
            if tx.is_closed() {
                return Ok(());
            }
            tokio::time::sleep(EOF_POLL).await;
            continue;
        }

        let terminated = buf.ends_with('\n');
        let parsed = serde_json::from_str::<release::progress::ReleaseEvent>(buf.trim());
        match parsed {
            Ok(ev) => {
                buf.clear();
                let is_end = matches!(ev, release::progress::ReleaseEvent::RunEnd { .. });
                if tx.send(Ok(release_event_to_pb(ev))).await.is_err() {
                    return Ok(());
                }
                if is_end {
                    return Ok(());
                }
            }
            // Unterminated and unparsable: the writer is mid-line. Keep the
            // fragment so the next `read_line` appends the remainder to it.
            Err(_) if !terminated => {}
            // A complete line that is blank or not a known event: skip it.
            Err(_) => buf.clear(),
        }
    }
}
/// Maps a `ReleaseEvent` onto the protobuf `ProgressEvent`, copying the fields
/// each protobuf variant carries and dropping the rest.
fn release_event_to_pb(ev: release::progress::ReleaseEvent) -> ProgressEvent {
    use pb::progress_event::Kind;
    use release::progress::ReleaseEvent as Event;

    let kind = match ev {
        Event::RunStart { run_id, workspace, .. } => {
            Kind::RunStart(pb::RunStart { run_id, workspace })
        }
        Event::RepoStart { repo, sha, .. } => {
            Kind::RepoStart(pb::RepoStart { repo, sha })
        }
        Event::PhaseStart { repo, phase, .. } => {
            Kind::PhaseStart(pb::PhaseStart { repo, phase })
        }
        Event::PhaseEnd { repo, phase, ok, duration_ms, .. } => {
            Kind::PhaseEnd(pb::PhaseEnd { repo, phase, ok, duration_ms })
        }
        Event::BinaryStart { repo, binary, .. } => {
            Kind::BinaryStart(pb::BinaryStart { repo, binary })
        }
        Event::TestPass { repo, binary, name, .. } => {
            Kind::TestPass(pb::TestPass { repo, binary, name })
        }
        Event::TestFail { repo, binary, name, .. } => {
            Kind::TestFail(pb::TestFail { repo, binary, name })
        }
        Event::BinaryDone { repo, binary, passed, failed, .. } => {
            Kind::BinaryDone(pb::BinaryDone { repo, binary, passed, failed })
        }
        Event::RepoEnd { repo, ok, .. } => {
            Kind::RepoEnd(pb::RepoEnd { repo, ok })
        }
        Event::RunEnd { run_id, ok, .. } => {
            Kind::RunEnd(pb::RunEnd { run_id, ok })
        }
    };
    ProgressEvent { kind: Some(kind) }
}
/// Implements the `Bench` gRPC service (history / submit).
struct BenchSvc {
    /// Shared application state.
    state: Shared,
}
/// Converts a stored bench run into its protobuf form.
///
/// Metric values without an `f64` representation are left out. A missing
/// timestamp or test message becomes an empty string; a missing test duration
/// becomes `0.0` with `has_duration` set to `false`.
fn pb_bench_run_from(run: &bench::BenchRun) -> pb::BenchRun {
    let mut results = Vec::new();
    for result in run.results.iter() {
        let mut metrics = Vec::new();
        for (key, value) in result.metrics.iter() {
            if let Some(number) = value.as_f64() {
                metrics.push(Kvf { key: key.clone(), value: number });
            }
        }
        results.push(pb::BenchResult {
            name: result.name.clone(),
            metrics,
        });
    }

    let mut tests = Vec::new();
    for outcome in run.tests.iter() {
        tests.push(pb::TestOutcome {
            name: outcome.name.clone(),
            passed: outcome.passed,
            duration_ms: outcome.duration_ms.unwrap_or(0.0),
            has_duration: outcome.duration_ms.is_some(),
            message: outcome.message.clone().unwrap_or_default(),
        });
    }

    pb::BenchRun {
        date: run.date.clone(),
        timestamp: run.timestamp.clone().unwrap_or_default(),
        version: run.version.clone(),
        machine: run.machine.clone(),
        cores: run.cores,
        results,
        tests,
    }
}
/// Converts a protobuf bench run into the stored form.
///
/// Metric values that `serde_json::Number::from_f64` rejects are left out.
/// An empty timestamp or test message becomes `None`; a test duration is kept
/// only when `has_duration` is set.
fn bench_run_from_pb(p: pb::BenchRun) -> bench::BenchRun {
    let mut results = Vec::new();
    for result in p.results {
        let mut metrics = serde_json::Map::new();
        for metric in result.metrics {
            match serde_json::Number::from_f64(metric.value) {
                Some(number) => {
                    metrics.insert(metric.key, serde_json::Value::Number(number));
                }
                None => {}
            }
        }
        results.push(bench::BenchResult { name: result.name, metrics });
    }

    let mut tests = Vec::new();
    for outcome in p.tests {
        let duration_ms = if outcome.has_duration {
            Some(outcome.duration_ms)
        } else {
            None
        };
        let message = Some(outcome.message).filter(|text| !text.is_empty());
        tests.push(bench::TestOutcome {
            name: outcome.name,
            passed: outcome.passed,
            duration_ms,
            message,
        });
    }

    bench::BenchRun {
        date: p.date,
        timestamp: Some(p.timestamp).filter(|stamp| !stamp.is_empty()),
        version: p.version,
        machine: p.machine,
        cores: p.cores,
        results,
        tests,
    }
}
#[tonic::async_trait]
impl BenchSvcTrait for BenchSvc {
async fn history(
&self,
req: Request<RepoOnly>,
) -> Result<Response<BenchHistoryResponse>, Status> {
let name = req.into_inner().repo;
let l = self.state.loaded.lock().await;
let repo = l
.nornir
.repo
.get(&name)
.ok_or_else(|| not_found(format!("repo `{name}` not configured")))?;
let repo_root = config::Nornir::repo_dir(&l.workspace_root, &name);
let path = repo_root.join(if repo.history.is_empty() {
"bench_history.jsonl"
} else {
&repo.history
});
let runs = if path.exists() {
bench::history::read_all(&path).map_err(internal)?
} else {
Vec::new()
};
Ok(Response::new(BenchHistoryResponse {
repo: name,
runs: runs.iter().map(pb_bench_run_from).collect(),
}))
}
async fn submit(
&self,
req: Request<SubmitBenchRequest>,
) -> Result<Response<SubmitBenchResponse>, Status> {
let req = req.into_inner();
let pb_run = req.run.ok_or_else(|| invalid_arg("`run` is required"))?;
let run = bench_run_from_pb(pb_run);
if run.machine.is_empty() {
return Err(invalid_arg("BenchRun.machine must not be empty"));
}
let l = self.state.loaded.lock().await;
let repo = l
.nornir
.repo
.get(&req.repo)
.ok_or_else(|| not_found(format!("repo `{}` not configured", req.repo)))?;
let repo_root = config::Nornir::repo_dir(&l.workspace_root, &req.repo);
let path = repo_root.join(if repo.history.is_empty() {
"bench_history.jsonl"
} else {
&repo.history
});
drop(l); let run_id = self
.state
.warehouse
.append_bench_run_async(&req.repo, &run)
.await
.map_err(internal)?;
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).map_err(internal)?;
}
bench::history::append(&path, &run).map_err(internal)?;
Ok(Response::new(SubmitBenchResponse { run_id: run_id.to_string() }))
}
}
/// Implements the `Search` gRPC service.
struct SearchSvc {
    /// Shared application state.
    state: Shared,
}
#[tonic::async_trait]
impl SearchSvcTrait for SearchSvc {
    /// Searches the index on a blocking thread.
    ///
    /// An empty `corpus` or `repo` means "no filter"; a `limit` of 0 means 20.
    /// An unrecognised corpus name is `invalid_argument`. Hits are returned
    /// with an empty `id`.
    async fn query(
        &self,
        req: Request<SearchRequest>,
    ) -> Result<Response<SearchResponse>, Status> {
        let req = req.into_inner();

        let mut corpus = None;
        if !req.corpus.is_empty() {
            let parsed = index::Corpus::parse(&req.corpus)
                .ok_or_else(|| invalid_arg(format!("unknown corpus `{}`", req.corpus)))?;
            corpus = Some(parsed);
        }
        let repo_filter = (!req.repo.is_empty()).then(|| req.repo.clone());
        let limit = match req.limit {
            0 => 20,
            n => n as usize,
        };

        let index = self.state.index.clone();
        let query = req.query.clone();
        let found = tokio::task::spawn_blocking(move || {
            index.search(&query, corpus, repo_filter.as_deref(), limit)
        })
        .await
        .map_err(internal)?
        .map_err(internal)?;

        let mut hits = Vec::new();
        for hit in found {
            hits.push(SearchHit {
                id: String::new(),
                corpus: hit.corpus,
                repo: hit.repo,
                path: hit.path,
                score: hit.score,
                snippet: hit.snippet,
                title: hit.title,
            });
        }
        Ok(Response::new(SearchResponse { hits }))
    }
}
/// Implements the `Index` gRPC service (stats / snapshot / upload_snapshot).
struct IndexSvc {
    /// Shared application state.
    state: Shared,
}
#[tonic::async_trait]
impl IndexSvcTrait for IndexSvc {
/// Returns the index's total count and its per-corpus counts (rendered as
/// strings), computed on a blocking thread.
async fn stats(&self, _req: Request<Empty>) -> Result<Response<IndexStatsResponse>, Status> {
    let index = self.state.index.clone();
    let stats = tokio::task::spawn_blocking(move || index.stats())
        .await
        .map_err(internal)?
        .map_err(internal)?;

    let mut by_corpus = Vec::new();
    for (corpus, count) in stats.by_corpus {
        by_corpus.push(Kv { key: corpus, value: count.to_string() });
    }
    let reply = IndexStatsResponse { total: stats.total, by_corpus };
    Ok(Response::new(reply))
}
async fn snapshot(
&self,
req: Request<SnapshotRequest>,
) -> Result<Response<SnapshotResponse>, Status> {
let req = req.into_inner();
if req.git_sha.is_empty() {
return Err(invalid_arg("`git_sha` is required"));
}
let workspace = if req.workspace.is_empty() { "workspace_holger".into() } else { req.workspace };
let repo = if req.repo.is_empty() { "_workspace".into() } else { req.repo };
let branch = if req.branch.is_empty() { "unknown".into() } else { req.branch };
let index_dir = self.state.index.index_dir().to_path_buf();
let wh = self.state.warehouse.clone();
let snap = tokio::task::spawn_blocking(move || {
nornir::index::snapshot::snapshot_to_iceberg(&wh, &workspace, &repo, &req.git_sha, &branch, &index_dir)
})
.await
.map_err(internal)?
.map_err(internal)?;
Ok(Response::new(snapshot_ref_to_pb(snap)))
}
/// Receives an index snapshot as a client stream, stages it in a temporary
/// directory and snapshots that directory into the warehouse.
///
/// Expected frame order: one `meta` frame, then for each file a `file` frame
/// (relative path + declared size) followed by its `chunk` frames.
///
/// Client-supplied paths are untrusted: `rel_path` must be a relative path
/// without `..`, root or drive-prefix components, otherwise `root.join` would
/// write outside the staging directory. A file may not receive more bytes than
/// it declared, and must have received exactly that many when the next file
/// (or the end of the stream) arrives.
///
/// The staging directory is removed when this call returns, on every path.
///
/// # Errors
/// `invalid_argument` for protocol violations (duplicate or missing meta, chunk
/// before file, size mismatch, unsafe path, empty frame, empty `git_sha`);
/// `internal` for filesystem or warehouse failures.
async fn upload_snapshot(
    &self,
    req: Request<tonic::Streaming<IndexBlobFrame>>,
) -> Result<Response<SnapshotResponse>, Status> {
    use pb::index_blob_frame::Body;
    use std::io::Write;
    use std::path::{Component, Path};

    /// Deletes the wrapped directory tree on drop (best effort).
    struct TmpDir(PathBuf);
    impl Drop for TmpDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// (destination, declared size, handle, bytes written so far)
    type OpenFile = (PathBuf, u64, std::fs::File, u64);

    /// True when `rel` cannot escape the directory it is joined onto.
    fn is_contained(rel: &str) -> bool {
        !rel.is_empty()
            && !rel.contains("..")
            && Path::new(rel)
                .components()
                .all(|c| matches!(c, Component::Normal(_) | Component::CurDir))
    }

    /// Verifies that a finished file received exactly its declared size.
    fn ensure_complete(file: Option<OpenFile>) -> Result<(), Status> {
        if let Some((p, declared, _, written)) = file {
            if written != declared {
                return Err(invalid_arg(format!(
                    "file {} declared {} bytes, got {}", p.display(), declared, written
                )));
            }
        }
        Ok(())
    }

    let mut stream = req.into_inner();
    let root = std::env::temp_dir()
        .join(format!("nornir-upload-{}", uuid::Uuid::new_v4()));
    std::fs::create_dir_all(&root).map_err(internal)?;
    let _guard = TmpDir(root.clone());

    let mut meta: Option<pb::IndexBlobMeta> = None;
    let mut cur_file: Option<OpenFile> = None;
    while let Some(frame) = stream.message().await? {
        match frame.body {
            Some(Body::Meta(m)) => {
                if meta.is_some() {
                    return Err(invalid_arg("meta frame seen twice"));
                }
                meta = Some(m);
            }
            Some(Body::File(f)) => {
                if meta.is_none() {
                    return Err(invalid_arg("first frame must carry `meta`"));
                }
                ensure_complete(cur_file.take())?;
                if !is_contained(&f.rel_path) {
                    return Err(invalid_arg(
                        "rel_path must be a non-empty relative path and not contain `..`",
                    ));
                }
                let path = root.join(&f.rel_path);
                if let Some(parent) = path.parent() {
                    std::fs::create_dir_all(parent).map_err(internal)?;
                }
                let fh = std::fs::File::create(&path).map_err(internal)?;
                cur_file = Some((path, f.size_bytes, fh, 0));
            }
            Some(Body::Chunk(bytes)) => {
                let Some((path, declared, fh, written)) = cur_file.as_mut() else {
                    return Err(invalid_arg("chunk before file"));
                };
                // Reject overruns before touching the disk so a client cannot
                // stream more data than it announced.
                let total = written.saturating_add(bytes.len() as u64);
                if total > *declared {
                    return Err(invalid_arg(format!(
                        "file {} declared {} bytes, got at least {}",
                        path.display(), declared, total
                    )));
                }
                fh.write_all(&bytes).map_err(internal)?;
                *written = total;
            }
            None => return Err(invalid_arg("empty frame")),
        }
    }
    ensure_complete(cur_file.take())?;

    let meta = meta.ok_or_else(|| invalid_arg("stream had no `meta` frame"))?;
    if meta.git_sha.is_empty() {
        return Err(invalid_arg("meta.git_sha is required"));
    }
    let workspace = if meta.workspace.is_empty() { "workspace_holger".into() } else { meta.workspace };
    let repo = if meta.repo.is_empty() { "_workspace".into() } else { meta.repo };
    let branch = if meta.branch.is_empty() { "unknown".into() } else { meta.branch };
    let git_sha = meta.git_sha;
    let wh = self.state.warehouse.clone();
    let snap = tokio::task::spawn_blocking(move || {
        nornir::index::snapshot::snapshot_to_iceberg(&wh, &workspace, &repo, &git_sha, &branch, &root)
    })
    .await
    .map_err(internal)?
    .map_err(internal)?;
    Ok(Response::new(snapshot_ref_to_pb(snap)))
}
}
/// Converts a warehouse `SnapshotRef` into the protobuf `SnapshotResponse`;
/// the snapshot id is rendered as a string and the counters are cast to `u64`.
fn snapshot_ref_to_pb(s: nornir::index::snapshot::SnapshotRef) -> SnapshotResponse {
    let snapshot_id = s.snapshot_id.to_string();
    let blob_count = s.blob_count as u64;
    let total_bytes = s.total_bytes as u64;
    SnapshotResponse {
        snapshot_id,
        workspace: s.workspace,
        repo: s.repo,
        git_sha: s.git_sha,
        branch: s.branch,
        schema_hash: s.schema_hash,
        blob_count,
        total_bytes,
    }
}
/// Implements the `Introspect` gRPC service (symbol and call-graph queries on a binary).
struct IntrospectSvc {
    /// Shared application state.
    state: Shared,
}
/// Resolves `binary` to a path: absolute inputs are returned unchanged,
/// anything else is joined onto `workspace_root`.
fn resolve_binary(workspace_root: &std::path::Path, binary: &str) -> PathBuf {
    let candidate = std::path::Path::new(binary);
    match candidate.is_absolute() {
        true => candidate.to_path_buf(),
        false => workspace_root.join(candidate),
    }
}
/// Converts an extracted artifact symbol into its protobuf form; a missing
/// line number or size becomes 0.
fn sym_to_pb(s: &introspect::artifact::Symbol) -> PbSymbol {
    let line = s.line.unwrap_or(0);
    let size_bytes = s.size_bytes.unwrap_or(0);
    PbSymbol {
        name: s.name.clone(),
        name_demangled: s.name_demangled.clone(),
        name_mangled: s.name_mangled.clone(),
        file: s.file.clone(),
        line,
        size_bytes,
        krate: s.krate.clone(),
    }
}
#[tonic::async_trait]
impl IntrospectSvcTrait for IntrospectSvc {
    /// Lists every symbol extracted from the requested binary.
    ///
    /// The binary path is resolved against the workspace root; extraction runs
    /// on a blocking thread and failures map to `internal`.
    async fn symbols(&self, req: Request<BinaryOnly>) -> Result<Response<SymbolList>, Status> {
        let workspace_root = self.state.loaded.lock().await.workspace_root.clone();
        let binary_path = resolve_binary(&workspace_root, &req.into_inner().binary);
        let extracted = tokio::task::spawn_blocking(move || {
            introspect::artifact::extract_symbols(&binary_path, &workspace_root)
        })
        .await
        .map_err(internal)?
        .map_err(internal)?;
        let symbols = extracted.iter().map(sym_to_pb).collect();
        Ok(Response::new(SymbolList { symbols }))
    }

    /// Returns up to `limit` symbols matching `pattern` (a `limit` of 0 means 50).
    async fn symbol_lookup(
        &self,
        req: Request<SymbolLookupRequest>,
    ) -> Result<Response<SymbolList>, Status> {
        let SymbolLookupRequest { binary, pattern, limit, .. } = req.into_inner();
        let workspace_root = self.state.loaded.lock().await.workspace_root.clone();
        let binary_path = resolve_binary(&workspace_root, &binary);
        let limit = match limit {
            0 => 50,
            n => n as usize,
        };
        let matched = tokio::task::spawn_blocking(
            move || -> anyhow::Result<Vec<introspect::artifact::Symbol>> {
                let all = introspect::artifact::extract_symbols(&binary_path, &workspace_root)?;
                let hits = introspect::artifact::lookup(&all, &pattern)
                    .into_iter()
                    .take(limit)
                    .cloned()
                    .collect();
                Ok(hits)
            },
        )
        .await
        .map_err(internal)?
        .map_err(internal)?;
        let symbols = matched.iter().map(sym_to_pb).collect();
        Ok(Response::new(SymbolList { symbols }))
    }

    /// Returns the symbols selected by `introspect::artifact::defined_in` for `suffix`.
    async fn defined_in(
        &self,
        req: Request<DefinedInRequest>,
    ) -> Result<Response<SymbolList>, Status> {
        let DefinedInRequest { binary, suffix, .. } = req.into_inner();
        let workspace_root = self.state.loaded.lock().await.workspace_root.clone();
        let binary_path = resolve_binary(&workspace_root, &binary);
        let matched = tokio::task::spawn_blocking(
            move || -> anyhow::Result<Vec<introspect::artifact::Symbol>> {
                let all = introspect::artifact::extract_symbols(&binary_path, &workspace_root)?;
                let hits = introspect::artifact::defined_in(&all, &suffix)
                    .into_iter()
                    .cloned()
                    .collect();
                Ok(hits)
            },
        )
        .await
        .map_err(internal)?
        .map_err(internal)?;
        let symbols = matched.iter().map(sym_to_pb).collect();
        Ok(Response::new(SymbolList { symbols }))
    }

    /// Names returned by the call graph's `callers_of` for the queried name.
    async fn callers(&self, req: Request<CallQuery>) -> Result<Response<NameList>, Status> {
        let names = callgraph_query(&self.state, req.into_inner(), true).await?;
        Ok(Response::new(NameList { names }))
    }

    /// Names returned by the call graph's `callees_of` for the queried name.
    async fn callees(&self, req: Request<CallQuery>) -> Result<Response<NameList>, Status> {
        let names = callgraph_query(&self.state, req.into_inner(), false).await?;
        Ok(Response::new(NameList { names }))
    }

    /// Returns the call path between `from` and `to` in the binary's call
    /// graph, or an empty list when `path_between` finds none.
    async fn path_between(
        &self,
        req: Request<PathBetweenRequest>,
    ) -> Result<Response<NameList>, Status> {
        let PathBetweenRequest { binary, from, to, .. } = req.into_inner();
        let workspace_root = self.state.loaded.lock().await.workspace_root.clone();
        let binary_path = resolve_binary(&workspace_root, &binary);
        let names = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<String>> {
            let edges =
                introspect::callgraph_dwarf::extract_callgraph(&binary_path, &workspace_root)?;
            let graph = introspect::callgraph_dwarf::Callgraph::from_edges(&edges);
            Ok(graph.path_between(&from, &to).unwrap_or_default())
        })
        .await
        .map_err(internal)?
        .map_err(internal)?;
        Ok(Response::new(NameList { names }))
    }
}
/// Shared implementation of the Introspect `callers` / `callees` RPCs.
///
/// Extracts the call graph of the requested binary on a blocking thread and
/// returns `callers_of(name)` when `callers` is true, `callees_of(name)`
/// otherwise. Failures map to `internal`.
async fn callgraph_query(state: &Shared, r: CallQuery, callers: bool) -> Result<Vec<String>, Status> {
    let workspace_root = state.loaded.lock().await.workspace_root.clone();
    let binary_path = resolve_binary(&workspace_root, &r.binary);
    let joined = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<String>> {
        let edges = introspect::callgraph_dwarf::extract_callgraph(&binary_path, &workspace_root)?;
        let graph = introspect::callgraph_dwarf::Callgraph::from_edges(&edges);
        let names = match callers {
            true => graph.callers_of(&r.name),
            false => graph.callees_of(&r.name),
        };
        Ok(names)
    })
    .await;
    joined.map_err(internal)?.map_err(internal)
}
/// Implements the `Knowledge` gRPC service (symbol and call queries backed by the warehouse).
struct KnowledgeSvc {
    /// Shared application state.
    state: Shared,
}
/// Converts a knowledge symbol row into its protobuf form; a missing
/// signature becomes an empty string.
fn ksym_to_pb(s: &knowledge::symbols::SymbolRow) -> KnowledgeSymbol {
    let signature = match s.signature.clone() {
        Some(text) => text,
        None => String::new(),
    };
    KnowledgeSymbol {
        crate_name: s.crate_name.clone(),
        module_path: s.module_path.clone(),
        item_kind: s.item_kind.clone(),
        item_name: s.item_name.clone(),
        visibility: s.visibility.clone(),
        file: s.file.clone(),
        line: s.line,
        doc_lines: s.doc_lines,
        signature,
    }
}
/// Converts a knowledge call-edge row into its protobuf form, field for field.
fn kcall_to_pb(c: &knowledge::symbols::CallEdgeRow) -> KnowledgeCall {
    let crate_name = c.crate_name.clone();
    let caller_path = c.caller_path.clone();
    let callee_ident = c.callee_ident.clone();
    let call_kind = c.call_kind.clone();
    let file = c.file.clone();
    KnowledgeCall {
        crate_name,
        caller_path,
        callee_ident,
        call_kind,
        file,
        line: c.line,
    }
}
/// Knowledge API handlers. Each one loads the latest knowledge view for the
/// requested repo from the warehouse on a blocking thread and maps any failure
/// (join or query) to a `Status` through `internal`.
#[tonic::async_trait]
impl KnowledgeSvcTrait for KnowledgeSvc {
    /// Looks up symbols matching `arg`; a `limit` of 0 means 50.
    async fn symbol_lookup(
        &self,
        req: Request<KnowledgeSymbolQuery>,
    ) -> Result<Response<KnowledgeSymbols>, Status> {
        let query = req.into_inner();
        let warehouse = self.state.warehouse.clone();
        let cap = match query.limit {
            0 => 50,
            n => n as usize,
        };

        let task = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<KnowledgeSymbol>> {
            let view = knowledge::query::load_latest(&warehouse, &query.repo)?;
            let hits = view.symbol_lookup(&query.arg, cap);
            Ok(hits.iter().map(|row| ksym_to_pb(row)).collect())
        });
        let joined = task.await.map_err(internal)?;
        let symbols = joined.map_err(internal)?;

        Ok(Response::new(KnowledgeSymbols { symbols }))
    }

    /// Lists symbols defined in `arg`, truncated to `limit` (0 means 100).
    async fn defined_in(
        &self,
        req: Request<KnowledgeSymbolQuery>,
    ) -> Result<Response<KnowledgeSymbols>, Status> {
        let query = req.into_inner();
        let warehouse = self.state.warehouse.clone();
        let cap = match query.limit {
            0 => 100,
            n => n as usize,
        };

        let task = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<KnowledgeSymbol>> {
            let view = knowledge::query::load_latest(&warehouse, &query.repo)?;
            let hits = view.defined_in(&query.arg);
            Ok(hits.iter().take(cap).map(|row| ksym_to_pb(row)).collect())
        });
        let joined = task.await.map_err(internal)?;
        let symbols = joined.map_err(internal)?;

        Ok(Response::new(KnowledgeSymbols { symbols }))
    }

    /// Returns the call edges whose callee is the requested name.
    async fn callers(
        &self,
        req: Request<KnowledgeCallQuery>,
    ) -> Result<Response<KnowledgeCalls>, Status> {
        let calls = knowledge_call_query(&self.state, req.into_inner(), true).await?;
        Ok(Response::new(KnowledgeCalls { calls }))
    }

    /// Returns the call edges whose caller is the requested name.
    async fn callees(
        &self,
        req: Request<KnowledgeCallQuery>,
    ) -> Result<Response<KnowledgeCalls>, Status> {
        let calls = knowledge_call_query(&self.state, req.into_inner(), false).await?;
        Ok(Response::new(KnowledgeCalls { calls }))
    }

    /// Returns a call path from `from` to `to`, or an empty list when the
    /// view reports none.
    async fn call_path(
        &self,
        req: Request<KnowledgeCallPathQuery>,
    ) -> Result<Response<NameList>, Status> {
        let query = req.into_inner();
        let warehouse = self.state.warehouse.clone();

        let task = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<String>> {
            let view = knowledge::query::load_latest(&warehouse, &query.repo)?;
            let path = view.call_path(&query.from, &query.to);
            Ok(path.unwrap_or_default())
        });
        let joined = task.await.map_err(internal)?;
        let names = joined.map_err(internal)?;

        Ok(Response::new(NameList { names }))
    }
}
/// Shared implementation of the Knowledge `callers` / `callees` RPCs.
///
/// Loads the latest knowledge view for `r.repo` on a blocking thread, selects
/// callers of `r.name` when `callers` is true (callees otherwise), and returns
/// at most `r.limit` edges; a limit of 0 means 100.
async fn knowledge_call_query(
    state: &Shared,
    r: KnowledgeCallQuery,
    callers: bool,
) -> Result<Vec<KnowledgeCall>, Status> {
    let warehouse = state.warehouse.clone();
    let cap = match r.limit {
        0 => 100,
        n => n as usize,
    };

    let task = tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<KnowledgeCall>> {
        let view = knowledge::query::load_latest(&warehouse, &r.repo)?;
        let edges = if callers {
            view.callers_of(&r.name)
        } else {
            view.callees_of(&r.name)
        };
        Ok(edges.iter().take(cap).map(|edge| kcall_to_pb(edge)).collect())
    });

    // Outer error: the blocking task failed to join; inner error: the query failed.
    let joined = task.await.map_err(internal)?;
    joined.map_err(internal)
}
/// Service object backing the `Docs` gRPC API.
///
/// Holds the shared application state that its handlers read from.
struct DocsSvc { state: Shared }
/// Docs API handlers: scaffold, render, drift-check, and export history.
#[tonic::async_trait]
impl DocsSvcTrait for DocsSvc {
    /// Initialises the docs layout for a repo and reports the created sources.
    async fn init(&self, req: Request<RepoOnly>) -> Result<Response<DocsResponse>, Status> {
        let repo = req.into_inner().repo;
        let loaded = self.state.loaded.lock().await;
        let root = repo_root_for(&loaded, &repo)?;
        let layout = docs::RepoLayout::new(&root);

        let created = docs::init_repo(&layout).map_err(internal)?;
        let artifacts: Vec<String> = created
            .into_iter()
            .map(|path| path.display().to_string())
            .collect();
        let detail = layout.nornir_dir().display().to_string();

        Ok(Response::new(DocsResponse {
            repo,
            status: String::from("ok"),
            artifacts,
            detail,
        }))
    }

    /// Renders every doc for a repo and reports the output paths.
    async fn render(&self, req: Request<RepoOnly>) -> Result<Response<DocsResponse>, Status> {
        let repo = req.into_inner().repo;
        let loaded = self.state.loaded.lock().await;
        let root = repo_root_for(&loaded, &repo)?;
        let layout = docs::RepoLayout::new(&root);

        // A missing last run is tolerated: the context simply gets `None`.
        let last_run = last_run_for(&loaded, &repo).ok();
        let history = history_for(&loaded, &repo);
        let ctx = docs::Ctx::new(&root, &loaded.workspace_root, last_run.as_ref())
            .with_history(&history);

        let reports = docs::render_all(&layout, &ctx).map_err(internal)?;
        let artifacts: Vec<String> = reports
            .iter()
            .map(|report| report.output.display().to_string())
            .collect();
        let detail = format!("{} doc(s) rendered", reports.len());

        Ok(Response::new(DocsResponse {
            repo,
            status: String::from("ok"),
            artifacts,
            detail,
        }))
    }

    /// Runs the render check; any error is reported as status `"drift"` with
    /// the error chain in `detail` rather than as an RPC failure.
    async fn check(&self, req: Request<RepoOnly>) -> Result<Response<DocsResponse>, Status> {
        let repo = req.into_inner().repo;
        let loaded = self.state.loaded.lock().await;
        let root = repo_root_for(&loaded, &repo)?;
        let layout = docs::RepoLayout::new(&root);

        let last_run = last_run_for(&loaded, &repo).ok();
        let history = history_for(&loaded, &repo);
        let ctx = docs::Ctx::new(&root, &loaded.workspace_root, last_run.as_ref())
            .with_history(&history);

        let failure = docs::render_check_all(&layout, &ctx)
            .err()
            .map(|e| format!("{e:#}"));
        let (status, detail) = match failure {
            None => ("ok", String::new()),
            Some(message) => ("drift", message),
        };

        Ok(Response::new(DocsResponse {
            repo,
            status: status.to_string(),
            artifacts: Vec::new(),
            detail,
        }))
    }

    /// Lists recorded doc exports for a repo. Empty `doc` / `version` /
    /// `format` strings mean "no filter"; a `limit` of 0 means 50.
    async fn history(
        &self,
        req: Request<DocsHistoryRequest>,
    ) -> Result<Response<DocsHistoryResponse>, Status> {
        let DocsHistoryRequest {
            repo,
            doc,
            version,
            format: export_format,
            limit,
            ..
        } = req.into_inner();

        // Empty request strings translate to an absent filter field.
        let non_empty = |text: String| Some(text).filter(|t| !t.is_empty());
        let filter = docs::ExportFilter {
            doc_name: non_empty(doc),
            version: non_empty(version),
            format: non_empty(export_format),
            limit: Some(match limit {
                0 => 50,
                n => n as usize,
            }),
        };

        let warehouse = self.state.warehouse.clone();
        let task = tokio::task::spawn_blocking(move || {
            docs::list_doc_exports(&warehouse, &repo, &filter)
        });
        let joined = task.await.map_err(internal)?;
        let rows = joined.map_err(internal)?;

        Ok(Response::new(DocsHistoryResponse {
            entries: rows
                .into_iter()
                .map(|row| PbDocExport {
                    doc: row.doc_name,
                    version: row.version,
                    format: row.format,
                    path: row.export_id,
                    exported_at: row.generated_at,
                    // Negative lengths are clamped to zero before the cast.
                    size_bytes: row.byte_len.max(0) as u64,
                })
                .collect(),
        }))
    }
}
/// Builds a tonic interceptor that enforces a static bearer token.
///
/// The returned closure reads the `authorization` metadata entry, expects the
/// form `Bearer <token>`, and rejects the request with
/// `Status::unauthenticated` when the header is missing, not valid text, lacks
/// the `Bearer ` prefix, carries an empty token, or carries a token different
/// from `token`. Matching requests are passed through unchanged.
///
/// The token comparison inspects every byte regardless of where the first
/// mismatch occurs, so response timing does not reveal how long a matching
/// prefix the caller guessed.
fn auth_interceptor(token: Arc<String>) -> impl Fn(Request<()>) -> Result<Request<()>, Status> + Clone {
    // Equality check whose running time depends only on the input lengths,
    // never on the position of the first differing byte.
    fn tokens_match(supplied: &[u8], expected: &[u8]) -> bool {
        if supplied.len() != expected.len() {
            return false;
        }
        supplied
            .iter()
            .zip(expected)
            .fold(0u8, |diff, (a, b)| diff | (a ^ b))
            == 0
    }

    move |req: Request<()>| -> Result<Request<()>, Status> {
        let supplied = req
            .metadata()
            .get("authorization")
            .and_then(|v| v.to_str().ok())
            .and_then(|s| s.strip_prefix("Bearer "))
            .unwrap_or("");
        // The explicit emptiness check keeps an empty credential rejected even
        // if the configured token were ever empty.
        if supplied.is_empty() || !tokens_match(supplied.as_bytes(), token.as_bytes()) {
            return Err(Status::unauthenticated("missing or invalid bearer token"));
        }
        Ok(req)
    }
}
/// Server entry point.
///
/// Startup sequence:
/// 1. Initialise tracing (env filter, falling back to
///    `nornir_server=info,tonic=info`), writing to stderr.
/// 2. Require `NORNIR_SERVER_TOKEN` (at least 16 bytes long).
/// 3. Load config from `NORNIR_CONFIG`, or discover it from the current
///    directory; if discovery fails, start with an empty config rooted there.
/// 4. Open the Iceberg warehouse (`NORNIR_WAREHOUSE`, else derived from the
///    configured storage path) and the search index on blocking threads.
/// 5. Serve every gRPC service behind the bearer-token interceptor on
///    `NORNIR_SERVER_ADDR` (default `127.0.0.1:7878`).
///
/// # Errors
/// Returns an error when the token is missing or too short, an explicit config
/// fails to load, the warehouse or index cannot be opened, the listen address
/// does not parse, or the server terminates with a transport error.
#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| "nornir_server=info,tonic=info".into()),
        )
        .with_writer(std::io::stderr)
        .init();

    let token = std::env::var("NORNIR_SERVER_TOKEN").map_err(|_| {
        anyhow!(
            "NORNIR_SERVER_TOKEN env var required. Future: mTLS with CN=<username>; \
             see `nornir-server --help` and crate docs."
        )
    })?;
    // `len()` counts bytes, so this is a lower bound on the token's byte length.
    if token.len() < 16 {
        return Err(anyhow!("NORNIR_SERVER_TOKEN must be ≥ 16 chars"));
    }

    let loaded = match std::env::var_os("NORNIR_CONFIG").map(PathBuf::from) {
        Some(explicit) => config::load_explicit(&explicit)?,
        None => {
            let cwd = std::env::current_dir()?;
            match config::discover(&cwd) {
                Ok(found) => found,
                Err(e) => {
                    eprintln!(
                        "nornir-server: no nornir.toml discovered; starting empty \
                         (use Repos.Register to add projects)"
                    );
                    // Surface the cause so a discovery failure other than a
                    // missing file is not mistaken for one.
                    eprintln!("nornir-server: config discovery reported: {e:#}");
                    config::Loaded {
                        nornir: config::Nornir::default(),
                        config_path: cwd.join("nornir.toml"),
                        workspace_root: cwd,
                    }
                }
            }
        }
    };
    eprintln!(
        "nornir-server: loaded {} repo(s) from {}",
        loaded.nornir.repo.len(),
        loaded.config_path.display()
    );

    // Warehouse location precedence: env override (relative paths resolve
    // against the workspace root), then the built-in default when no storage
    // path is configured, then `<storage.local_path>/warehouse`.
    let warehouse_root = if let Some(env) = std::env::var_os("NORNIR_WAREHOUSE") {
        let p = PathBuf::from(env);
        if p.is_absolute() { p } else { loaded.workspace_root.join(p) }
    } else if loaded.nornir.storage.local_path.is_empty() {
        // NOTE(review): this default differs from the index default below
        // (`.nornir/cache/index`) by a `workspace_holger/` prefix — confirm it
        // is intentional.
        loaded.workspace_root.join("workspace_holger/.nornir/warehouse")
    } else {
        loaded.workspace_root.join(&loaded.nornir.storage.local_path).join("warehouse")
    };
    // Best effort: opening the warehouse below is the authoritative check, but
    // a failure here is reported instead of being dropped silently.
    if let Err(e) = std::fs::create_dir_all(&warehouse_root) {
        eprintln!(
            "nornir-server: warning: could not create warehouse dir {}: {e}",
            warehouse_root.display()
        );
    }

    let warehouse_root_for_open = warehouse_root.clone();
    let warehouse = tokio::task::spawn_blocking(move || {
        IcebergWarehouse::open(&warehouse_root_for_open)
    })
    .await
    .context("spawn warehouse open")?
    .with_context(|| format!("open iceberg warehouse at {}", warehouse_root.display()))?;
    let warehouse = Arc::new(warehouse);
    eprintln!("nornir-server: warehouse online at {}", warehouse_root.display());

    let ws_root = loaded.workspace_root.clone();
    let index_dir = {
        let lp = &loaded.nornir.storage.local_path;
        if lp.is_empty() {
            ws_root.join(".nornir/cache/index")
        } else {
            ws_root.join(lp).join("cache/index")
        }
    };
    let wh_for_open = warehouse.clone();
    let (idx, restored) = tokio::task::spawn_blocking(move || {
        index::Index::open_or_restore_at(&ws_root, &index_dir, &wh_for_open, "_workspace", None)
    })
    .await
    .context("spawn index open")?
    .context("warm Tantivy index")?;
    eprintln!(
        "nornir-server: index online ({})",
        if restored { "rehydrated from iceberg" } else { "loaded from local cache" }
    );

    let state: Shared = Arc::new(AppState {
        loaded: Mutex::new(loaded),
        index: Arc::new(idx),
        warehouse,
    });

    let addr: SocketAddr = std::env::var("NORNIR_SERVER_ADDR")
        .unwrap_or_else(|_| "127.0.0.1:7878".into())
        .parse()
        .context("parse NORNIR_SERVER_ADDR")?;
    eprintln!("nornir-server: listening on {addr}");

    // One interceptor is built and cloned per service; every clone shares the
    // same token.
    let auth = auth_interceptor(Arc::new(token));
    let health = HealthServer::with_interceptor(HealthSvc { state: state.clone() }, auth.clone());
    let repos = ReposServer::with_interceptor(ReposSvc { state: state.clone() }, auth.clone());
    let guard_svc = GuardServer::with_interceptor(GuardSvc { state: state.clone() }, auth.clone());
    let release_svc =
        ReleaseServer::with_interceptor(ReleaseSvc { state: state.clone() }, auth.clone());
    let bench_svc = BenchServer::with_interceptor(BenchSvc { state: state.clone() }, auth.clone());
    let search_svc =
        SearchServer::with_interceptor(SearchSvc { state: state.clone() }, auth.clone());
    let index_svc = IndexServer::with_interceptor(IndexSvc { state: state.clone() }, auth.clone());
    let introspect_svc =
        IntrospectServer::with_interceptor(IntrospectSvc { state: state.clone() }, auth.clone());
    let knowledge_svc =
        KnowledgeServer::with_interceptor(KnowledgeSvc { state: state.clone() }, auth.clone());
    let docs_svc = DocsServer::with_interceptor(DocsSvc { state: state.clone() }, auth);

    Server::builder()
        .add_service(health)
        .add_service(repos)
        .add_service(guard_svc)
        .add_service(release_svc)
        .add_service(bench_svc)
        .add_service(search_svc)
        .add_service(index_svc)
        .add_service(introspect_svc)
        .add_service(knowledge_svc)
        .add_service(docs_svc)
        .serve(addr)
        .await
        .context("tonic serve")?;
    Ok(())
}