use anyhow::{Context, Result};
use super::live::LiveEvent;
use super::model::Timeline;
use crate::warehouse::iceberg::TablePreview;
mod pb {
tonic::include_proto!("nornir.v1");
}
fn endpoint_and_bearer(
endpoint: &str,
token: &str,
) -> Result<(String, tonic::metadata::MetadataValue<tonic::metadata::Ascii>)> {
let endpoint = if endpoint.starts_with("http") {
endpoint.to_string()
} else {
format!("http://{endpoint}")
};
let bearer = format!("Bearer {token}").parse().context("parse bearer token")?;
Ok((endpoint, bearer))
}
fn ws_header(workspace: &str) -> Option<tonic::metadata::MetadataValue<tonic::metadata::Ascii>> {
(!workspace.is_empty()).then(|| workspace.parse().ok()).flatten()
}
pub fn list_workspaces(endpoint: &str, token: &str) -> Result<Vec<String>> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.context("build tokio runtime for viz client")?;
rt.block_on(async {
let (endpoint, bearer) = endpoint_and_bearer(endpoint, token)?;
let channel = tonic::transport::Channel::from_shared(endpoint.clone())
.with_context(|| format!("invalid server url `{endpoint}`"))?
.connect()
.await
.with_context(|| format!("connect to nornir-server at {endpoint}"))?;
let mut client = pb::workspaces_client::WorkspacesClient::with_interceptor(
channel,
move |mut req: tonic::Request<()>| {
req.metadata_mut().insert("authorization", bearer.clone());
Ok(req)
},
);
let resp = client.list(pb::Empty {}).await.context("Workspaces.List RPC")?.into_inner();
Ok(resp.workspaces.into_iter().map(|w| w.name).collect())
})
}
pub fn fetch_timeline(endpoint: &str, token: &str, workspace: &str) -> Result<Timeline> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.context("build tokio runtime for viz client")?;
rt.block_on(async {
let endpoint = if endpoint.starts_with("http") {
endpoint.to_string()
} else {
format!("http://{endpoint}")
};
let bearer: tonic::metadata::MetadataValue<tonic::metadata::Ascii> =
format!("Bearer {token}").parse().context("parse bearer token")?;
let ws_md = ws_header(workspace);
let channel = tonic::transport::Channel::from_shared(endpoint.clone())
.with_context(|| format!("invalid server url `{endpoint}`"))?
.connect()
.await
.with_context(|| format!("connect to nornir-server at {endpoint}"))?;
let mut client = pb::viz_client::VizClient::with_interceptor(
channel,
move |mut req: tonic::Request<()>| {
req.metadata_mut().insert("authorization", bearer.clone());
if let Some(ws) = &ws_md {
req.metadata_mut().insert("nornir-workspace", ws.clone());
}
Ok(req)
},
);
let resp = client
.timeline(pb::VizTimelineRequest { workspace: workspace.to_string() })
.await
.context("Viz.Timeline RPC")?
.into_inner();
let timeline: Timeline =
serde_json::from_str(&resp.json).context("decode timeline json from server")?;
Ok(timeline)
})
}
pub fn fetch_release_events(
endpoint: &str,
token: &str,
workspace: &str,
) -> Result<Vec<crate::warehouse::release_events::ReleaseEventRow>> {
call(endpoint, token, workspace, |channel, auth| async move {
let mut c = pb::viz_client::VizClient::with_interceptor(channel, auth);
let r = c
.release_events(pb::VizTimelineRequest { workspace: String::new() })
.await
.context("Viz.ReleaseEvents RPC")?
.into_inner();
let rows = serde_json::from_str(&r.json).context("decode release_events json from server")?;
Ok(rows)
})
}
pub fn fetch_test_results(
endpoint: &str,
token: &str,
workspace: &str,
) -> Result<Vec<crate::warehouse::test_results::TestResultRow>> {
call(endpoint, token, workspace, |channel, auth| async move {
let mut c = pb::viz_client::VizClient::with_interceptor(channel, auth);
let r = c
.test_results(pb::VizTimelineRequest { workspace: String::new() })
.await
.context("Viz.TestResults RPC")?
.into_inner();
let rows = serde_json::from_str(&r.json).context("decode test_results json from server")?;
Ok(rows)
})
}
pub fn fetch_bench_live(
endpoint: &str,
token: &str,
workspace: &str,
) -> Result<(Vec<crate::warehouse::iceberg::BenchTelemetryRow>, Vec<crate::bench::BenchRun>)> {
call(endpoint, token, workspace, |channel, auth| async move {
let mut c = pb::viz_client::VizClient::with_interceptor(channel, auth);
let r = c
.bench_telemetry(pb::VizTimelineRequest { workspace: String::new() })
.await
.context("Viz.BenchTelemetry RPC")?
.into_inner();
let v: serde_json::Value =
serde_json::from_str(&r.json).context("decode bench json from server")?;
let telemetry = serde_json::from_value(v.get("telemetry").cloned().unwrap_or_default())
.context("decode bench_telemetry rows")?;
let runs = serde_json::from_value(v.get("runs").cloned().unwrap_or_default())
.context("decode bench_runs rows")?;
Ok((telemetry, runs))
})
}
pub fn fetch_bakeoff_results(
endpoint: &str,
token: &str,
workspace: &str,
) -> Result<Vec<crate::warehouse::agent_model_runs::AgentModelRunRow>> {
call(endpoint, token, workspace, |channel, auth| async move {
let mut c = pb::viz_client::VizClient::with_interceptor(channel, auth);
let r = c
.bakeoff_results(pb::VizTimelineRequest { workspace: String::new() })
.await
.context("Viz.BakeoffResults RPC")?
.into_inner();
let rows = serde_json::from_str(&r.json).context("decode agent_model_runs json from server")?;
Ok(rows)
})
}
pub fn submit_bakeoff(
endpoint: &str,
token: &str,
workspace: &str,
rows: &[crate::warehouse::agent_model_runs::AgentModelRunRow],
) -> Result<u32> {
let runs: Vec<pb::AgentModelRunPb> = rows
.iter()
.map(|r| pb::AgentModelRunPb {
run_id: r.run_id.clone(),
ts_micros: r.ts_micros,
agent: r.agent.clone(),
model: r.model.clone(),
prompt_id: r.prompt_id.clone(),
prompt: r.prompt.clone(),
output: r.output.clone(),
latency_ms: r.latency_ms,
tokens_in: r.tokens_in,
tokens_out: r.tokens_out,
tokens_per_s: r.tokens_per_s,
score: r.score,
ok: r.ok,
error: r.error.clone().unwrap_or_default(),
cost_usd: r.cost_usd,
mcp_tool_calls: r.mcp_tool_calls,
})
.collect();
call(endpoint, token, workspace, |channel, auth| async move {
let mut c = pb::telemetry_client::TelemetryClient::with_interceptor(channel, auth);
let r = c
.submit_bakeoff(pb::SubmitBakeoffRequest { runs })
.await
.context("Telemetry.SubmitBakeoff RPC")?
.into_inner();
Ok(r.accepted)
})
}
pub fn submit_test_results(
endpoint: &str,
token: &str,
workspace: &str,
rows: &[crate::warehouse::test_results::TestResultRow],
) -> Result<u32> {
let pb_rows: Vec<pb::TestResultPb> = rows
.iter()
.map(|r| pb::TestResultPb {
run_id: r.run_id.clone(),
repo: r.repo.clone(),
suite: r.suite.clone(),
test_name: r.test_name.clone(),
status: r.status.clone(),
duration_ms: r.duration_ms,
ts_micros: r.ts_micros,
message: r.message.clone(),
aspect: r.aspect.clone(),
metric: r.metric,
})
.collect();
call(endpoint, token, workspace, |channel, auth| async move {
let mut c = pb::telemetry_client::TelemetryClient::with_interceptor(channel, auth);
let r = c
.submit_test_results(pb::SubmitTestResultsRequest { rows: pb_rows })
.await
.context("Telemetry.SubmitTestResults RPC")?
.into_inner();
Ok(r.accepted)
})
}
pub fn submit_release_events(
endpoint: &str,
token: &str,
workspace: &str,
rows: &[crate::warehouse::release_events::ReleaseEventRow],
) -> Result<u32> {
let pb_rows: Vec<pb::ReleaseEventPb> = rows
.iter()
.map(|r| pb::ReleaseEventPb {
run_id: r.run_id.clone(),
seq: r.seq,
ts_micros: r.ts_micros,
component: r.component.clone(),
repo: r.repo.clone(),
op: r.op.clone(),
phase: r.phase.clone(),
status: r.status.clone(),
detail: r.detail.clone(),
has_depends_on: r.depends_on.is_some(),
depends_on: r.depends_on.clone().unwrap_or_default(),
has_elapsed_ms: r.elapsed_ms.is_some(),
elapsed_ms: r.elapsed_ms.unwrap_or_default(),
})
.collect();
call(endpoint, token, workspace, |channel, auth| async move {
let mut c = pb::telemetry_client::TelemetryClient::with_interceptor(channel, auth);
let r = c
.submit_release_events(pb::SubmitReleaseEventsRequest { rows: pb_rows })
.await
.context("Telemetry.SubmitReleaseEvents RPC")?
.into_inner();
Ok(r.accepted)
})
}
pub fn knowledge_summary(
endpoint: &str,
token: &str,
workspace: &str,
) -> Result<super::knowledge::KnowledgeSummary> {
call(endpoint, token, workspace, |channel, auth| async move {
let mut c = pb::viz_client::VizClient::with_interceptor(channel, auth);
let r = c
.knowledge(pb::VizTimelineRequest { workspace: String::new() })
.await
.context("Viz.Knowledge RPC")?
.into_inner();
let summary = serde_json::from_str(&r.json).context("decode knowledge summary json from server")?;
Ok(summary)
})
}
pub fn stream_progress(
endpoint: &str,
token: &str,
mut on_event: impl FnMut(LiveEvent),
) -> Result<()> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.context("build tokio runtime for viz live client")?;
rt.block_on(async {
let (endpoint, bearer) = endpoint_and_bearer(endpoint, token)?;
let channel = tonic::transport::Channel::from_shared(endpoint.clone())
.with_context(|| format!("invalid server url `{endpoint}`"))?
.connect()
.await
.with_context(|| format!("connect to nornir-server at {endpoint}"))?;
let mut client = pb::release_client::ReleaseClient::with_interceptor(
channel,
move |mut req: tonic::Request<()>| {
req.metadata_mut().insert("authorization", bearer.clone());
Ok(req)
},
);
let mut stream = client
.progress(pb::Empty {})
.await
.context("Release.Progress RPC")?
.into_inner();
while let Some(ev) = stream.message().await.context("progress stream")? {
if let Some(live) = to_live(ev) {
on_event(live);
}
}
Ok(())
})
}
pub fn fetch_tables(endpoint: &str, token: &str, workspace: &str) -> Result<Vec<String>> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.context("build tokio runtime for viz client")?;
rt.block_on(async {
let (endpoint, bearer) = endpoint_and_bearer(endpoint, token)?;
let channel = tonic::transport::Channel::from_shared(endpoint.clone())
.with_context(|| format!("invalid server url `{endpoint}`"))?
.connect()
.await
.with_context(|| format!("connect to nornir-server at {endpoint}"))?;
let ws_md = ws_header(workspace);
let mut client = pb::warehouse_client::WarehouseClient::with_interceptor(
channel,
move |mut req: tonic::Request<()>| {
req.metadata_mut().insert("authorization", bearer.clone());
if let Some(ws) = &ws_md {
req.metadata_mut().insert("nornir-workspace", ws.clone());
}
Ok(req)
},
);
let resp = client.tables(pb::Empty {}).await.context("Warehouse.Tables RPC")?.into_inner();
Ok(resp.names)
})
}
pub fn scan_table(endpoint: &str, token: &str, table: &str, limit: u32, workspace: &str) -> Result<TablePreview> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.context("build tokio runtime for viz client")?;
rt.block_on(async {
let (endpoint, bearer) = endpoint_and_bearer(endpoint, token)?;
let channel = tonic::transport::Channel::from_shared(endpoint.clone())
.with_context(|| format!("invalid server url `{endpoint}`"))?
.connect()
.await
.with_context(|| format!("connect to nornir-server at {endpoint}"))?;
let ws_md = ws_header(workspace);
let mut client = pb::warehouse_client::WarehouseClient::with_interceptor(
channel,
move |mut req: tonic::Request<()>| {
req.metadata_mut().insert("authorization", bearer.clone());
if let Some(ws) = &ws_md {
req.metadata_mut().insert("nornir-workspace", ws.clone());
}
Ok(req)
},
);
let resp = client
.scan(pb::WarehouseScanRequest { table: table.to_string(), limit })
.await
.context("Warehouse.Scan RPC")?
.into_inner();
Ok(TablePreview {
columns: resp.columns,
rows: resp.rows.into_iter().map(|r| r.cells).collect(),
})
})
}
#[derive(Clone)]
pub(crate) struct Auth {
bearer: tonic::metadata::MetadataValue<tonic::metadata::Ascii>,
ws: Option<tonic::metadata::MetadataValue<tonic::metadata::Ascii>>,
}
impl tonic::service::Interceptor for Auth {
fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
req.metadata_mut().insert("authorization", self.bearer.clone());
if let Some(ws) = &self.ws {
req.metadata_mut().insert("nornir-workspace", ws.clone());
}
Ok(req)
}
}
fn call<T, F, Fut>(endpoint: &str, token: &str, workspace: &str, f: F) -> Result<T>
where
F: FnOnce(tonic::transport::Channel, Auth) -> Fut,
Fut: std::future::Future<Output = Result<T>>,
{
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.context("build tokio runtime for viz client")?;
rt.block_on(async {
let (endpoint, bearer) = endpoint_and_bearer(endpoint, token)?;
let channel = tonic::transport::Channel::from_shared(endpoint.clone())
.with_context(|| format!("invalid server url `{endpoint}`"))?
.connect()
.await
.with_context(|| format!("connect to nornir-server at {endpoint}"))?;
let auth = Auth { bearer, ws: ws_header(workspace) };
f(channel, auth).await
})
}
#[derive(Clone, Debug, Default)]
pub struct WorkspaceInfo {
pub name: String,
pub mode: String,
pub poll: String,
pub current_snapshot: String,
pub updated_at: String,
pub members: Vec<(String, String)>,
pub freshness: Vec<(String, bool, String)>,
}
#[derive(Clone, Debug)]
pub struct Hit {
pub corpus: String,
pub repo: String,
pub path: String,
pub score: f32,
pub title: String,
pub snippet: String,
}
#[derive(Clone, Debug)]
pub struct KnownSym {
pub crate_name: String,
pub item_kind: String,
pub item_name: String,
pub visibility: String,
pub file: String,
pub line: u32,
pub signature: String,
}
#[derive(Clone, Debug, Default)]
pub struct GateReport {
pub repo: String,
pub passed: Vec<String>,
pub failed: Vec<(String, String)>,
}
#[derive(Clone, Debug)]
pub struct BenchPoint {
pub date: String,
pub version: String,
pub metric: String,
pub value: f64,
}
#[derive(Clone, Debug, Default)]
pub struct ServerInfo {
pub status: String,
pub version: String,
pub repo_count: u32,
}
pub fn ping(endpoint: &str, token: &str) -> Result<ServerInfo> {
call(endpoint, token, "", |channel, auth| async move {
let mut c = pb::health_client::HealthClient::with_interceptor(channel, auth);
let r = c.ping(pb::Empty {}).await.context("Health.Ping RPC")?.into_inner();
Ok(ServerInfo {
status: r.status,
version: r.version,
repo_count: r.repo_count,
})
})
}
pub fn get_workspace(endpoint: &str, token: &str, name: &str) -> Result<WorkspaceInfo> {
call(endpoint, token, "", |channel, auth| async move {
let mut c = pb::workspaces_client::WorkspacesClient::with_interceptor(channel, auth);
let r = c
.get(pb::WorkspaceName { name: name.to_string() })
.await
.context("Workspaces.Get RPC")?
.into_inner();
Ok(WorkspaceInfo {
name: r.name,
mode: r.mode,
poll: r.poll,
current_snapshot: r.current_snapshot,
updated_at: r.updated_at,
freshness: r
.members
.iter()
.map(|m| (m.name.clone(), m.worktree_dirty, m.worktree_digest.clone()))
.collect(),
members: r
.members
.into_iter()
.map(|m| {
let sha = match m.last_seen_sha.char_indices().nth(8) {
Some((byte_idx, _)) => &m.last_seen_sha[..byte_idx],
None => &m.last_seen_sha,
};
let fresh = if m.worktree_dirty { " ⚠ uncommitted" } else { "" };
(m.name, format!("{} @ {sha} [{}]{fresh}", m.remote, m.sync_state))
})
.collect(),
})
})
}
pub fn fetch_workspace(
endpoint: &str,
token: &str,
name: &str,
force: bool,
) -> Result<(u32, Vec<String>, Vec<String>, String)> {
call(endpoint, token, "", |channel, auth| async move {
let mut c = pb::workspaces_client::WorkspacesClient::with_interceptor(channel, auth);
let r = c
.fetch(pb::WorkspaceFetchRequest { name: name.to_string(), force })
.await
.context("Workspaces.Fetch RPC")?
.into_inner();
Ok((r.fetched, r.changed, r.errors, r.snapshot))
})
}
pub fn search(
endpoint: &str,
token: &str,
query: &str,
corpus: &str,
repo: &str,
limit: u32,
workspace: &str,
) -> Result<Vec<Hit>> {
call(endpoint, token, workspace, |channel, auth| async move {
let mut c = pb::search_client::SearchClient::with_interceptor(channel, auth);
let r = c
.query(pb::SearchRequest {
query: query.to_string(),
corpus: corpus.to_string(),
repo: repo.to_string(),
limit,
})
.await
.context("Search.Query RPC")?
.into_inner();
Ok(r.hits
.into_iter()
.map(|h| Hit {
corpus: h.corpus,
repo: h.repo,
path: h.path,
score: h.score,
title: h.title,
snippet: h.snippet,
})
.collect())
})
}
pub fn knowledge_lookup(
endpoint: &str,
token: &str,
repo: &str,
arg: &str,
limit: u32,
workspace: &str,
) -> Result<Vec<KnownSym>> {
call(endpoint, token, workspace, |channel, auth| async move {
let mut c = pb::knowledge_client::KnowledgeClient::with_interceptor(channel, auth);
let r = c
.symbol_lookup(pb::KnowledgeSymbolQuery {
repo: repo.to_string(),
arg: arg.to_string(),
limit,
})
.await
.context("Knowledge.SymbolLookup RPC")?
.into_inner();
Ok(r.symbols
.into_iter()
.map(|s| KnownSym {
crate_name: s.crate_name,
item_kind: s.item_kind,
item_name: s.item_name,
visibility: s.visibility,
file: s.file,
line: s.line,
signature: s.signature,
})
.collect())
})
}
pub fn gate_all(endpoint: &str, token: &str, repo: &str, workspace: &str) -> Result<GateReport> {
call(endpoint, token, workspace, |channel, auth| async move {
let mut c = pb::release_client::ReleaseClient::with_interceptor(channel, auth);
let r = c
.gate_all(pb::RepoOnly { repo: repo.to_string() })
.await
.context("Release.GateAll RPC")?
.into_inner();
Ok(GateReport {
repo: r.repo,
passed: r.passed,
failed: r.failed.into_iter().map(|f| (f.name, f.error)).collect(),
})
})
}
pub fn security_scan(endpoint: &str, token: &str, repo: &str, workspace: &str) -> Result<String> {
call(endpoint, token, workspace, |channel, auth| async move {
let mut c = pb::mimir_client::MimirClient::with_interceptor(channel, auth);
let r = c
.security_scan(pb::RepoOnly { repo: repo.to_string() })
.await
.context("Mimir.SecurityScan RPC")?
.into_inner();
Ok(r.json)
})
}
pub fn mimir_deps_of(
endpoint: &str,
token: &str,
repo: &str,
workspace: &str,
transitive: bool,
) -> Result<String> {
call(endpoint, token, workspace, |channel, auth| async move {
let mut c = pb::mimir_client::MimirClient::with_interceptor(channel, auth);
let r = c
.deps_of(pb::DepQuery { repo: repo.to_string(), transitive })
.await
.context("Mimir.DepsOf RPC")?
.into_inner();
Ok(r.json)
})
}
pub fn trace(
endpoint: &str,
token: &str,
repo: &str,
workspace: &str,
) -> Result<String> {
let ws = workspace.to_string();
call(endpoint, token, workspace, |channel, auth| async move {
let mut c = pb::release_client::ReleaseClient::with_interceptor(channel, auth);
let r = c
.trace(pb::TraceQuery { repo: repo.to_string(), workspace: ws })
.await
.context("Release.Trace RPC")?
.into_inner();
Ok(r.json)
})
}
pub fn bench_history(
endpoint: &str,
token: &str,
repo: &str,
workspace: &str,
) -> Result<Vec<BenchPoint>> {
call(endpoint, token, workspace, |channel, auth| async move {
let mut c = pb::bench_client::BenchClient::with_interceptor(channel, auth);
let r = c
.history(pb::RepoOnly { repo: repo.to_string() })
.await
.context("Bench.History RPC")?
.into_inner();
let mut pts = Vec::new();
for run in r.runs {
for res in run.results {
for kvf in res.metrics {
pts.push(BenchPoint {
date: run.date.clone(),
version: run.version.clone(),
metric: format!("{}::{}", res.name, kvf.key),
value: kvf.value,
});
}
}
}
Ok(pts)
})
}
#[derive(Clone, Debug)]
pub struct VecHit {
pub score: f64,
pub file: String,
pub start_line: u64,
pub end_line: u64,
}
pub fn vector_search(
endpoint: &str,
token: &str,
repo: &str,
query: &str,
limit: u32,
workspace: &str,
) -> Result<Vec<VecHit>> {
call(endpoint, token, workspace, |channel, auth| async move {
let mut c = pb::vector_client::VectorClient::with_interceptor(channel, auth);
let r = c
.search(pb::VectorSearchRequest {
repo: repo.to_string(),
query: query.to_string(),
sha: String::new(),
limit,
})
.await
.context("Vector.Search RPC")?
.into_inner();
let v: serde_json::Value =
serde_json::from_str(&r.json).context("decode vector hits json")?;
let hits = v
.get("hits")
.and_then(|h| h.as_array())
.map(|arr| {
arr.iter()
.map(|h| VecHit {
score: h.get("score").and_then(|x| x.as_f64()).unwrap_or(0.0),
file: h.get("file").and_then(|x| x.as_str()).unwrap_or("").to_string(),
start_line: h.get("start_line").and_then(|x| x.as_u64()).unwrap_or(0),
end_line: h.get("end_line").and_then(|x| x.as_u64()).unwrap_or(0),
})
.collect()
})
.unwrap_or_default();
Ok(hits)
})
}
pub fn index_stats(endpoint: &str, token: &str, workspace: &str) -> Result<(u64, Vec<(String, String)>)> {
call(endpoint, token, workspace, |channel, auth| async move {
let mut c = pb::index_client::IndexClient::with_interceptor(channel, auth);
let r = c.stats(pb::Empty {}).await.context("Index.Stats RPC")?.into_inner();
Ok((r.total, r.by_corpus.into_iter().map(|kv| (kv.key, kv.value)).collect()))
})
}
pub fn knowledge_calls(
endpoint: &str,
token: &str,
repo: &str,
name: &str,
callers: bool,
limit: u32,
workspace: &str,
) -> Result<Vec<(String, String, String, u32)>> {
call(endpoint, token, workspace, |channel, auth| async move {
let mut c = pb::knowledge_client::KnowledgeClient::with_interceptor(channel, auth);
let q = pb::KnowledgeCallQuery { repo: repo.to_string(), name: name.to_string(), limit };
let calls = if callers {
c.callers(q).await.context("Knowledge.Callers RPC")?
} else {
c.callees(q).await.context("Knowledge.Callees RPC")?
}
.into_inner()
.calls;
Ok(calls.into_iter().map(|k| (k.caller_path, k.callee_ident, k.file, k.line)).collect())
})
}
pub fn knowledge_call_path(
endpoint: &str,
token: &str,
repo: &str,
from: &str,
to: &str,
workspace: &str,
) -> Result<Vec<String>> {
call(endpoint, token, workspace, |channel, auth| async move {
let mut c = pb::knowledge_client::KnowledgeClient::with_interceptor(channel, auth);
let r = c
.call_path(pb::KnowledgeCallPathQuery {
repo: repo.to_string(),
from: from.to_string(),
to: to.to_string(),
})
.await
.context("Knowledge.CallPath RPC")?
.into_inner();
Ok(r.names)
})
}
pub fn funnel_show(
endpoint: &str,
token: &str,
workspace: &str,
) -> Result<super::funnel_view::FunnelView> {
use super::funnel_view::{FunnelView, NodeStat, NodeView, PlanView};
call(endpoint, token, workspace, |channel, auth| async move {
let mut c = pb::funnel_client::FunnelClient::with_interceptor(channel, auth);
let dump = c.show(pb::Empty {}).await.context("Funnel.Show RPC")?.into_inner();
let mut plans = Vec::new();
for idea in dump.ideas {
for plan in idea.plans {
let nodes = plan
.nodes
.into_iter()
.map(|n| NodeView {
id: n.id,
kind: n.kind,
title: n.title,
status: NodeStat::parse(&n.status),
targets: Vec::new(), deps: n.deps,
})
.collect();
plans.push(PlanView {
id: plan.id,
summary: plan.summary,
status: plan.status.to_ascii_lowercase(),
idea_text: idea.text.clone(),
nodes,
});
}
}
plans.sort_by(|a, b| a.id.cmp(&b.id));
Ok(FunnelView { plans })
})
}
pub fn funnel_submit(
endpoint: &str,
token: &str,
workspace: &str,
text: &str,
item_kind: &str,
source: &str,
) -> Result<String> {
call(endpoint, token, workspace, |channel, auth| async move {
let mut c = pb::funnel_client::FunnelClient::with_interceptor(channel, auth);
let r = c
.submit_idea(pb::SubmitIdeaRequest {
text: text.to_string(),
source: source.to_string(),
item_kind: item_kind.to_string(),
})
.await
.context("Funnel.SubmitIdea RPC")?
.into_inner();
Ok(r.id)
})
}
pub fn funnel_history(
endpoint: &str,
token: &str,
workspace: &str,
kind: &str,
status: &str,
) -> Result<Vec<super::funnel_view::HistoryItemView>> {
use super::funnel_view::HistoryItemView;
call(endpoint, token, workspace, |channel, auth| async move {
let mut c = pb::funnel_client::FunnelClient::with_interceptor(channel, auth);
let resp = c
.history(pb::FunnelHistoryRequest {
kind: kind.to_string(),
status: status.to_string(),
limit: 0,
})
.await
.context("Funnel.History RPC")?
.into_inner();
Ok(resp
.items
.into_iter()
.map(|it| HistoryItemView {
id: it.id,
item_kind: it.item_kind,
text: it.text,
source: it.source,
submitted_at: it.submitted_at,
status: it.status,
plan_ids: it.plan_ids,
})
.collect())
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
#[ignore]
fn live_list_workspaces() {
let ep = std::env::var("NORNIR_SERVER").expect("set NORNIR_SERVER");
let tok = std::env::var("NORNIR_SERVER_TOKEN").expect("set NORNIR_SERVER_TOKEN");
let ws = list_workspaces(&ep, &tok).expect("list_workspaces should succeed");
eprintln!("live workspaces: {ws:?}");
assert!(!ws.is_empty(), "server returned no workspaces");
}
#[test]
fn release_events_wire_roundtrip() {
use crate::warehouse::release_events::{status, ReleaseEventRow};
let rows = vec![
ReleaseEventRow {
run_id: "run-42".into(),
seq: 0,
ts_micros: 1_000,
component: "znippy".into(),
repo: "znippy".into(),
op: "test".into(),
phase: "start".into(),
status: status::RUNNING.into(),
detail: String::new(),
depends_on: Some(vec![]),
elapsed_ms: None,
},
ReleaseEventRow {
run_id: "run-42".into(),
seq: 1,
ts_micros: 2_000,
component: "holger".into(),
repo: "holger".into(),
op: "gate".into(),
phase: "end".into(),
status: status::OK.into(),
detail: "all gates green".into(),
depends_on: Some(vec!["znippy".into()]),
elapsed_ms: Some(1234),
},
];
let json = serde_json::to_string(&rows).unwrap();
let back: Vec<ReleaseEventRow> = serde_json::from_str(&json).unwrap();
assert_eq!(back, rows, "release_events wire round-trip must be lossless");
assert_eq!(back[1].depends_on.as_deref(), Some(&["znippy".to_string()][..]));
assert_eq!(back[1].elapsed_ms, Some(1234));
assert_eq!(back[1].detail, "all gates green");
}
#[test]
fn test_results_wire_roundtrip() {
use crate::warehouse::test_results::{status, TestResultRow};
let rows = vec![
TestResultRow {
run_id: "run-t1".into(),
ts_micros: 1_000,
repo: "znippy".into(),
suite: "unit".into(),
test_name: "compress_roundtrip".into(),
status: status::PASS.into(),
duration_ms: 12.0,
message: String::new(),
aspect: "unit".into(),
metric: 0.0,
},
TestResultRow {
run_id: "run-t1".into(),
ts_micros: 2_000,
repo: "znippy".into(),
suite: "clippy".into(),
test_name: "clippy::all".into(),
status: status::FAIL.into(),
duration_ms: 88.0,
message: "error[E0277]: ...".into(),
aspect: "clippy".into(),
metric: 3.0,
},
];
let json = serde_json::to_string(&rows).unwrap();
let back: Vec<TestResultRow> = serde_json::from_str(&json).unwrap();
assert_eq!(back, rows, "test_results wire round-trip must be lossless");
assert_eq!(back[0].status, status::PASS);
assert_eq!(back[0].duration_ms, 12.0);
assert_eq!(back[1].status, status::FAIL);
assert_eq!(back[1].message, "error[E0277]: ...");
assert_eq!(back[1].metric, 3.0);
}
#[test]
fn bench_telemetry_wire_roundtrip() {
use crate::bench::{BenchRun, TestOutcome};
use crate::warehouse::iceberg::BenchTelemetryRow;
let telemetry = vec![BenchTelemetryRow {
run_id: "run-b1".into(),
repo: "znippy".into(),
bench: "znippy.compress".into(),
n_cores: 8,
cpu_pct_avg: 51.5,
cpu_pct_max: 92.0,
cores_busy_avg: 6.5,
cores_busy_max: 8,
mem_peak_mb: 256.0,
mem_pct_max: 30.0,
elapsed_ms: 1234.0,
}];
let runs = vec![BenchRun {
date: "2026-06-14".into(),
timestamp: None,
version: "1.0".into(),
machine: "host".into(),
cores: 8,
results: Vec::new(),
tests: vec![TestOutcome {
name: "compress".into(),
passed: false,
duration_ms: Some(12.0),
message: None,
}],
}];
let json = serde_json::json!({ "telemetry": telemetry, "runs": runs }).to_string();
let v: serde_json::Value = serde_json::from_str(&json).unwrap();
let back_telem: Vec<BenchTelemetryRow> =
serde_json::from_value(v.get("telemetry").cloned().unwrap()).unwrap();
let back_runs: Vec<BenchRun> =
serde_json::from_value(v.get("runs").cloned().unwrap()).unwrap();
assert_eq!(back_telem, telemetry, "bench_telemetry wire round-trip must be lossless");
assert_eq!(back_telem[0].bench, "znippy.compress");
assert_eq!(back_telem[0].n_cores, 8);
assert_eq!(back_telem[0].cores_busy_max, 8);
assert!((back_telem[0].cores_busy_avg - 6.5).abs() < 1e-9);
assert!((back_telem[0].mem_peak_mb - 256.0).abs() < 1e-9);
assert_eq!(back_runs.len(), 1);
assert_eq!(back_runs[0].date, "2026-06-14");
assert_eq!(back_runs[0].cores, 8);
assert_eq!(back_runs[0].tests.len(), 1);
assert_eq!(back_runs[0].tests[0].name, "compress");
assert!(!back_runs[0].tests[0].passed, "failed test survives the wire");
}
#[test]
fn bakeoff_results_wire_roundtrip() {
use crate::warehouse::agent_model_runs::AgentModelRunRow;
let rows = vec![AgentModelRunRow {
run_id: "bake-1".into(),
ts_micros: 9_000,
agent: "claude".into(),
model: "mistral".into(),
prompt_id: "p-sum".into(),
prompt: "summarize".into(),
output: "ok".into(),
latency_ms: 120.5,
tokens_in: 10,
tokens_out: 42,
tokens_per_s: 88.3,
score: 0.91,
ok: true,
error: None,
cost_usd: 0.0123,
mcp_tool_calls: 5,
}];
let json = serde_json::to_string(&rows).unwrap();
let back: Vec<AgentModelRunRow> = serde_json::from_str(&json).unwrap();
assert_eq!(back, rows, "bakeoff wire round-trip must be lossless");
assert_eq!(back[0].tokens_per_s, 88.3);
assert_eq!(back[0].score, 0.91);
assert_eq!(back[0].agent, "claude");
assert_eq!(back[0].cost_usd, 0.0123);
assert_eq!(back[0].mcp_tool_calls, 5);
let legacy = r#"[{"run_id":"r","ts_micros":1,"model":"m","prompt_id":"p","prompt":"q","output":"o","latency_ms":1.0,"tokens_in":1,"tokens_out":1,"tokens_per_s":1.0,"score":0.5,"ok":true,"error":null}]"#;
let decoded: Vec<AgentModelRunRow> = serde_json::from_str(legacy).unwrap();
assert_eq!(decoded[0].agent, "-", "legacy row defaults agent to AGENT_UNSET");
assert_eq!(decoded[0].cost_usd, 0.0);
assert_eq!(decoded[0].mcp_tool_calls, 0);
}
#[test]
fn knowledge_summary_wire_roundtrip() {
use super::super::knowledge::{Bubble, KnowledgeSummary, RepoScanSummary};
let summary = KnowledgeSummary {
repos: vec![RepoScanSummary {
repo: "znippy".into(),
ok: true,
symbols: 120,
calls: 88,
features: 3,
git_files: 40,
error: None,
}],
crates: vec![Bubble {
repo: "znippy".into(),
krate: "znippy".into(),
symbols: 120,
calls: 88,
files: 40,
gates: 3,
heat_30d: 7,
}],
};
let json = serde_json::to_string(&summary).unwrap();
let back: KnowledgeSummary = serde_json::from_str(&json).unwrap();
assert_eq!(back.crates.len(), 1);
assert_eq!(back.crates[0].symbols, 120);
assert_eq!(back.repos[0].calls, 88);
assert!(back.repos[0].ok);
}
#[test]
#[ignore]
fn live_release_events() {
let ep = std::env::var("NORNIR_SERVER").expect("set NORNIR_SERVER");
let tok = std::env::var("NORNIR_SERVER_TOKEN").expect("set NORNIR_SERVER_TOKEN");
let ws = std::env::var("NORNIR_WS").unwrap_or_else(|_| "holger".into());
let rows = fetch_release_events(&ep, &tok, &ws).expect("fetch_release_events");
eprintln!("live release_events ws={ws}: {} rows", rows.len());
}
#[test]
#[ignore]
fn live_bakeoff_results() {
let ep = std::env::var("NORNIR_SERVER").expect("set NORNIR_SERVER");
let tok = std::env::var("NORNIR_SERVER_TOKEN").expect("set NORNIR_SERVER_TOKEN");
let ws = std::env::var("NORNIR_WS").unwrap_or_else(|_| "holger".into());
let rows = fetch_bakeoff_results(&ep, &tok, &ws).expect("fetch_bakeoff_results");
eprintln!("live agent_model_runs ws={ws}: {} rows", rows.len());
}
#[test]
#[ignore]
fn live_knowledge_summary() {
let ep = std::env::var("NORNIR_SERVER").expect("set NORNIR_SERVER");
let tok = std::env::var("NORNIR_SERVER_TOKEN").expect("set NORNIR_SERVER_TOKEN");
let ws = std::env::var("NORNIR_WS").unwrap_or_else(|_| "holger".into());
let s = knowledge_summary(&ep, &tok, &ws).expect("knowledge_summary");
eprintln!("live knowledge ws={ws}: {} crates", s.crates.len());
assert!(!s.crates.is_empty(), "remote knowledge map empty for {ws}");
}
#[test]
#[ignore]
fn live_funnel_intake_and_history() {
let ep = std::env::var("NORNIR_SERVER").expect("set NORNIR_SERVER");
let tok = std::env::var("NORNIR_SERVER_TOKEN").expect("set NORNIR_SERVER_TOKEN");
let ws = std::env::var("NORNIR_WS").unwrap_or_else(|_| "znippy".into());
let marker = format!("funnel-intake live test {}", chrono::Utc::now().to_rfc3339());
let id = funnel_submit(&ep, &tok, &ws, &marker, "error", "live-test")
.expect("Funnel.SubmitIdea(error) must succeed");
eprintln!("submitted error id={id} ws={ws}");
let errs = funnel_history(&ep, &tok, &ws, "error", "").expect("Funnel.History must succeed");
let mine = errs.iter().find(|it| it.id == id).expect("submitted error in history");
assert_eq!(mine.item_kind, "error");
assert_eq!(mine.text, marker);
eprintln!("history error_count={} (found mine, kind={})", errs.len(), mine.item_kind);
}
#[test]
#[ignore]
fn live_funnel_show() {
let ep = std::env::var("NORNIR_SERVER").expect("set NORNIR_SERVER");
let tok = std::env::var("NORNIR_SERVER_TOKEN").expect("set NORNIR_SERVER_TOKEN");
let ws = std::env::var("NORNIR_WS").unwrap_or_else(|_| "holger".into());
match funnel_show(&ep, &tok, &ws) {
Ok(v) => eprintln!("funnel_show ws={ws} OK: {} plans", v.plans.len()),
Err(e) => {
eprintln!("funnel_show ws={ws} ERR: {e:#}");
panic!("funnel_show failed: {e:#}");
}
}
}
}
fn to_live(ev: pb::ProgressEvent) -> Option<LiveEvent> {
use pb::progress_event::Kind;
Some(match ev.kind? {
Kind::RunStart(x) => LiveEvent::RunStart { run_id: x.run_id, workspace: x.workspace },
Kind::RepoStart(x) => LiveEvent::RepoStart { repo: x.repo, sha: x.sha },
Kind::PhaseStart(x) => LiveEvent::PhaseStart { repo: x.repo, phase: x.phase },
Kind::PhaseEnd(x) => LiveEvent::PhaseEnd {
repo: x.repo,
phase: x.phase,
ok: x.ok,
duration_ms: x.duration_ms,
},
Kind::BinaryStart(x) => LiveEvent::BinaryStart { repo: x.repo, binary: x.binary },
Kind::TestPass(x) => LiveEvent::TestPass { repo: x.repo, binary: x.binary, name: x.name },
Kind::TestFail(x) => LiveEvent::TestFail { repo: x.repo, binary: x.binary, name: x.name },
Kind::BinaryDone(x) => LiveEvent::BinaryDone {
repo: x.repo,
binary: x.binary,
passed: x.passed,
failed: x.failed,
},
Kind::RepoEnd(x) => LiveEvent::RepoEnd { repo: x.repo, ok: x.ok },
Kind::RunEnd(x) => LiveEvent::RunEnd { run_id: x.run_id, ok: x.ok },
})
}