use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use axum::{
extract::{Query, State},
http::{HeaderMap, StatusCode},
response::{sse::{Event, KeepAlive, Sse}, IntoResponse, Json},
routing::{get, post},
Router,
};
use futures::stream::Stream;
use serde::Deserialize;
use serde_json::{json, Value};
use tokio::sync::Mutex;
use nornir::config::{self, Loaded};
use nornir::{bench, docs, guard, index, introspect, release};
use nornir::warehouse::iceberg::IcebergWarehouse;
/// Process-wide state handed to every handler through axum's `State` extractor.
struct AppState {
/// Loaded configuration, guarded by an async mutex; handlers lock it to read
/// `workspace_root` and the `nornir` settings.
loaded: Mutex<Loaded>,
/// Bearer token that `auth` checks against the `Authorization` header.
token: String,
/// Search index opened (or restored) once in `main`.
index: Arc<index::Index>,
/// Iceberg warehouse handle opened once in `main`.
warehouse: Arc<IcebergWarehouse>,
}
/// Reference-counted handle to [`AppState`]; this is the router's state type.
type Shared = Arc<AppState>;
/// Server entry point.
///
/// Start-up order: install tracing, read and validate `NORNIR_SERVER_TOKEN`,
/// load the configuration (explicit `NORNIR_CONFIG` path or discovery from the
/// current directory), open the Iceberg warehouse, open or restore the search
/// index, build the router, then bind `NORNIR_SERVER_ADDR`
/// (default `127.0.0.1:7878`) and serve.
///
/// # Errors
/// Returns an error when the token is missing or shorter than 16 bytes, when
/// configuration, warehouse or index cannot be opened, when the listen address
/// does not parse, or when binding/serving fails.
#[tokio::main]
async fn main() -> Result<()> {
    // Logs go to stderr; the filter comes from the environment when present.
    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| "nornir_server=info,tower_http=info".into()),
        )
        .with_writer(std::io::stderr)
        .init();

    let token = std::env::var("NORNIR_SERVER_TOKEN").map_err(|_| {
        anyhow!(
            "NORNIR_SERVER_TOKEN env var required. Future: mTLS with CN=<username>; \
             see `nornir-server --help` and crate docs."
        )
    })?;
    if token.len() < 16 {
        return Err(anyhow!("NORNIR_SERVER_TOKEN must be ≥ 16 chars"));
    }

    let config_path = std::env::var_os("NORNIR_CONFIG").map(PathBuf::from);
    let loaded = match config_path {
        Some(p) => config::load_explicit(&p)?,
        None => config::discover(&std::env::current_dir()?)?,
    };
    eprintln!(
        "nornir-server: loaded {} from {}",
        loaded.nornir.repo.len(),
        loaded.config_path.display()
    );

    let warehouse_root = if loaded.nornir.storage.local_path.is_empty() {
        loaded.workspace_root.join("workspace_holger/.nornir/warehouse")
    } else {
        loaded
            .workspace_root
            .join(&loaded.nornir.storage.local_path)
            .join("warehouse")
    };

    // Opening the warehouse is synchronous, so keep it off the async workers.
    let warehouse_root_for_open = warehouse_root.clone();
    let warehouse = tokio::task::spawn_blocking(move || {
        IcebergWarehouse::open(&warehouse_root_for_open)
    })
    .await
    .context("spawn warehouse open")?
    .with_context(|| format!("open iceberg warehouse at {}", warehouse_root.display()))?;
    let warehouse = Arc::new(warehouse);
    eprintln!("nornir-server: warehouse online at {}", warehouse_root.display());

    let ws_root = loaded.workspace_root.clone();
    let wh_for_open = warehouse.clone();
    let (idx, restored) = tokio::task::spawn_blocking(move || {
        index::Index::open_or_restore(&ws_root, &wh_for_open, "_workspace", None)
    })
    .await
    .context("spawn index open")?
    .context("warm Tantivy index")?;
    eprintln!(
        "nornir-server: index online ({})",
        if restored { "rehydrated from iceberg" } else { "loaded from local cache" }
    );
    let index = Arc::new(idx);

    let state: Shared = Arc::new(AppState {
        loaded: Mutex::new(loaded),
        token,
        index,
        warehouse,
    });

    let app = Router::new()
        .route("/healthz", get(healthz))
        .route("/repos/list", post(repos_list))
        .route("/guard/status", post(guard_status))
        .route("/guard/apply", post(guard_apply))
        .route("/guard/release", post(guard_release))
        .route("/bench/history", post(bench_history))
        .route("/release/gate/path-patches", post(gate_path_patches))
        .route("/release/gate/nexus-floor", post(gate_nexus_floor))
        .route("/release/gate/no-regression", post(gate_no_regression))
        .route("/release/gate/docs-fresh", post(gate_docs_fresh))
        .route("/release/gate/all", post(gate_all))
        .route("/release/order", post(release_order))
        .route("/release/progress", get(release_progress_sse))
        .route("/search", post(search))
        .route("/index/stats", post(index_stats))
        .route("/index/snapshot", post(index_snapshot))
        .route("/introspect/symbols", post(introspect_symbols))
        .route("/introspect/symbol-lookup", post(introspect_symbol_lookup))
        .route("/introspect/defined-in", post(introspect_defined_in))
        .route("/introspect/callers", post(introspect_callers))
        .route("/introspect/callees", post(introspect_callees))
        .route("/introspect/path-between", post(introspect_path_between))
        .route("/docs/init", post(docs_init))
        .route("/docs/render", post(docs_render))
        .route("/docs/check", post(docs_check))
        .route("/docs/history", post(docs_history))
        .layer(tower_http::trace::TraceLayer::new_for_http())
        .with_state(state);

    let addr: SocketAddr = std::env::var("NORNIR_SERVER_ADDR")
        .unwrap_or_else(|_| "127.0.0.1:7878".into())
        .parse()
        .context("parse NORNIR_SERVER_ADDR")?;
    eprintln!("nornir-server: listening on {addr}");

    // Name the address in the error so "address in use" failures are actionable.
    let listener = tokio::net::TcpListener::bind(addr)
        .await
        .with_context(|| format!("bind {addr}"))?;
    axum::serve(listener, app).await.context("serve HTTP")?;
    Ok(())
}
struct ApiErr(StatusCode, String);
impl IntoResponse for ApiErr {
fn into_response(self) -> axum::response::Response {
(self.0, Json(json!({"error": self.1}))).into_response()
}
}
/// Wraps any displayable error as a `500 Internal Server Error` [`ApiErr`],
/// using the error's `Display` output as the message.
fn ise<E>(e: E) -> ApiErr
where
    E: std::fmt::Display,
{
    let message = e.to_string();
    ApiErr(StatusCode::INTERNAL_SERVER_ERROR, message)
}
/// Checks the `Authorization: Bearer <token>` header against `token`.
///
/// # Errors
/// Returns `401 Unauthorized` when the header is absent, is not valid UTF-8,
/// lacks the `Bearer ` prefix, carries an empty credential, or does not match.
fn auth(headers: &HeaderMap, token: &str) -> Result<(), ApiErr> {
    /// Byte-wise equality whose running time does not depend on where the
    /// first mismatch occurs (only on the lengths), so a remote caller cannot
    /// recover the token prefix by timing responses.
    fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
        a.len() == b.len() && a.iter().zip(b).fold(0u8, |acc, (x, y)| acc | (x ^ y)) == 0
    }

    let supplied = headers
        .get("authorization")
        .and_then(|v| v.to_str().ok())
        .and_then(|h| h.strip_prefix("Bearer "))
        .unwrap_or("");
    if supplied.is_empty() || !constant_time_eq(supplied.as_bytes(), token.as_bytes()) {
        return Err(ApiErr(
            StatusCode::UNAUTHORIZED,
            "missing or invalid bearer token".into(),
        ));
    }
    Ok(())
}
/// Request body for endpoints that take no parameters (an empty JSON object).
#[derive(Deserialize)]
struct EmptyReq {}
/// Request body naming a single repository.
#[derive(Deserialize)]
struct RepoReq {
/// Repository name; handlers use it as a key into the configured repo table.
repo: String,
}
/// Request body for `/search`.
#[derive(Deserialize)]
struct SearchReq {
/// Query string passed to the index.
query: String,
/// Optional corpus name; missing or empty means no corpus filter.
#[serde(default)]
corpus: Option<String>,
/// Optional repository filter.
#[serde(default)]
repo: Option<String>,
/// Maximum number of hits; the handler falls back to 20 when absent.
#[serde(default)]
limit: Option<usize>,
}
/// Request body naming a binary artifact to inspect.
#[derive(Deserialize)]
struct BinaryReq {
/// Path to the binary; resolved by `resolve_binary` before use.
binary: PathBuf,
}
/// Request body for `/introspect/symbol-lookup`.
#[derive(Deserialize)]
struct LookupReq {
/// Path to the binary; resolved by `resolve_binary` before use.
binary: PathBuf,
/// Pattern handed to `introspect::artifact::lookup`.
pattern: String,
/// Maximum number of matches returned; the handler falls back to 50 when absent.
#[serde(default)]
limit: Option<usize>,
}
/// Request body for `/introspect/defined-in`.
#[derive(Deserialize)]
struct DefinedInReq {
/// Path to the binary; resolved by `resolve_binary` before use.
binary: PathBuf,
/// Suffix handed to `introspect::artifact::defined_in`.
suffix: String,
}
/// Request body for the `/introspect/callers` and `/introspect/callees` endpoints.
#[derive(Deserialize)]
struct CallQueryReq {
/// Path to the binary; resolved by `resolve_binary` before use.
binary: PathBuf,
/// Function name whose callers or callees are requested.
name: String,
}
/// Request body for `/introspect/path-between`.
#[derive(Deserialize)]
struct PathReq {
/// Path to the binary; resolved by `resolve_binary` before use.
binary: PathBuf,
/// Name of the starting function.
from: String,
/// Name of the target function.
to: String,
}
/// Liveness probe: always answers with the literal body `ok`; no token needed.
async fn healthz() -> &'static str {
    const BODY: &str = "ok";
    BODY
}
/// `POST /repos/list` — returns every configured repository with its
/// `remote`, `history` and `readme` settings under the `repos` key.
async fn repos_list(
    State(state): State<Shared>,
    headers: HeaderMap,
    _: Json<EmptyReq>,
) -> Result<Json<Value>, ApiErr> {
    auth(&headers, &state.token)?;
    let cfg = state.loaded.lock().await;
    let mut repos: Vec<Value> = Vec::new();
    for (name, entry) in cfg.nornir.repo.iter() {
        repos.push(json!({
            "name": name,
            "remote": entry.remote,
            "history": entry.history,
            "readme": entry.readme,
        }));
    }
    let body = json!({"repos": repos});
    Ok(Json(body))
}
async fn guard_status(
State(s): State<Shared>,
headers: HeaderMap,
_: Json<EmptyReq>,
) -> Result<Json<Value>, ApiErr> {
auth(&headers, &s.token)?;
let l = s.loaded.lock().await;
let report = guard::status(&l.workspace_root, &l.nornir.guard.forbidden);
Ok(Json(serde_json::to_value(&report).map_err(ise)?))
}
/// `POST /guard/apply` — runs `guard::apply` over the configured forbidden
/// paths and reports how many entries were changed (`locked`) plus the full
/// per-path report.
async fn guard_apply(
    State(state): State<Shared>,
    headers: HeaderMap,
    _: Json<EmptyReq>,
) -> Result<Json<Value>, ApiErr> {
    auth(&headers, &state.token)?;
    let cfg = state.loaded.lock().await;
    let paths = guard::apply(&cfg.workspace_root, &cfg.nornir.guard.forbidden).map_err(ise)?;
    let mut locked: usize = 0;
    for entry in paths.iter() {
        if entry.changed {
            locked += 1;
        }
    }
    let body = json!({"locked": locked, "paths": paths});
    Ok(Json(body))
}
/// `POST /guard/release` — runs `guard::release` over the configured forbidden
/// paths and reports how many entries were changed (`unlocked`) plus the full
/// per-path report.
async fn guard_release(
    State(state): State<Shared>,
    headers: HeaderMap,
    _: Json<EmptyReq>,
) -> Result<Json<Value>, ApiErr> {
    auth(&headers, &state.token)?;
    let cfg = state.loaded.lock().await;
    let paths = guard::release(&cfg.workspace_root, &cfg.nornir.guard.forbidden).map_err(ise)?;
    let mut unlocked: usize = 0;
    for entry in paths.iter() {
        if entry.changed {
            unlocked += 1;
        }
    }
    let body = json!({"unlocked": unlocked, "paths": paths});
    Ok(Json(body))
}
/// `POST /bench/history` — returns every recorded bench run for a repository.
///
/// The history file is the repo's configured `history` path, or
/// `bench_history.jsonl` when that setting is empty. Unknown repos yield 404.
async fn bench_history(
    State(state): State<Shared>,
    headers: HeaderMap,
    Json(req): Json<RepoReq>,
) -> Result<Json<Value>, ApiErr> {
    auth(&headers, &state.token)?;
    let cfg = state.loaded.lock().await;
    let Some(repo) = cfg.nornir.repo.get(&req.repo) else {
        return Err(ApiErr(
            StatusCode::NOT_FOUND,
            format!("repo `{}` not configured", req.repo),
        ));
    };
    let repo_root = config::Nornir::repo_dir(&cfg.workspace_root, &req.repo);
    let history_file: &str = if repo.history.is_empty() {
        "bench_history.jsonl"
    } else {
        &repo.history
    };
    let history_path = repo_root.join(history_file);
    let runs = bench::history::read_all(&history_path).map_err(ise)?;
    Ok(Json(json!({"runs": runs})))
}
/// Loads the most recent bench run for `repo_name`.
///
/// Returns the repository root together with the final entry of its history
/// file (the configured `history` path, or `bench_history.jsonl` when empty).
///
/// # Errors
/// 404 when the repo is not configured or the history holds no runs; 500 when
/// the history cannot be read.
fn last_run(l: &Loaded, repo_name: &str) -> Result<(PathBuf, bench::BenchRun), ApiErr> {
    let Some(repo) = l.nornir.repo.get(repo_name) else {
        return Err(ApiErr(
            StatusCode::NOT_FOUND,
            format!("repo `{repo_name}` not configured"),
        ));
    };
    let root = config::Nornir::repo_dir(&l.workspace_root, repo_name);
    let history_file: &str = if repo.history.is_empty() {
        "bench_history.jsonl"
    } else {
        &repo.history
    };
    let path = root.join(history_file);
    let runs = bench::history::read_all(&path).map_err(ise)?;
    match runs.into_iter().last() {
        Some(run) => Ok((root, run)),
        None => Err(ApiErr(
            StatusCode::NOT_FOUND,
            format!("no bench runs in {}", path.display()),
        )),
    }
}
async fn gate_path_patches(
State(s): State<Shared>,
headers: HeaderMap,
Json(req): Json<RepoReq>,
) -> Result<Json<Value>, ApiErr> {
auth(&headers, &s.token)?;
let l = s.loaded.lock().await;
let root = config::Nornir::repo_dir(&l.workspace_root, &req.repo);
release::gate::no_path_patches(&root).map_err(ise)?;
Ok(Json(json!({"status": "pass", "gate": "no_path_patches"})))
}
async fn gate_nexus_floor(
State(s): State<Shared>,
headers: HeaderMap,
Json(req): Json<RepoReq>,
) -> Result<Json<Value>, ApiErr> {
auth(&headers, &s.token)?;
let l = s.loaded.lock().await;
let (_, run) = last_run(&l, &req.repo)?;
release::gate::nexus_floor(&run).map_err(ise)?;
Ok(Json(json!({"status": "pass", "gate": "nexus_floor", "version": run.version})))
}
/// `POST /release/gate/no-regression` — checks the latest bench run against
/// the repository's history.
///
/// The allowed drop is the repo's `max_regression_pct` when positive,
/// otherwise 10.0; the value used is echoed as `max_drop_pct`.
async fn gate_no_regression(
    State(state): State<Shared>,
    headers: HeaderMap,
    Json(req): Json<RepoReq>,
) -> Result<Json<Value>, ApiErr> {
    auth(&headers, &state.token)?;
    let cfg = state.loaded.lock().await;
    let Some(repo) = cfg.nornir.repo.get(&req.repo) else {
        return Err(ApiErr(
            StatusCode::NOT_FOUND,
            format!("repo `{}` not configured", req.repo),
        ));
    };
    let (repo_root, latest) = last_run(&cfg, &req.repo)?;
    let configured = repo.gates.max_regression_pct;
    let pct = if configured > 0.0 { configured } else { 10.0 };
    let history_file: &str = if repo.history.is_empty() {
        "bench_history.jsonl"
    } else {
        &repo.history
    };
    let history_path = repo_root.join(history_file);
    release::gate::no_regression(&latest, &history_path, pct).map_err(ise)?;
    let body = json!({"status": "pass", "gate": "no_regression", "max_drop_pct": pct});
    Ok(Json(body))
}
async fn gate_docs_fresh(
State(s): State<Shared>,
headers: HeaderMap,
Json(req): Json<RepoReq>,
) -> Result<Json<Value>, ApiErr> {
auth(&headers, &s.token)?;
let l = s.loaded.lock().await;
let (root, run) = last_run(&l, &req.repo)?;
let layout = docs::RepoLayout::new(&root);
let ctx = docs::Ctx { repo_root: &root, workspace_root: &l.workspace_root, run: Some(&run) };
docs::render_check_all(&layout, &ctx).map_err(ise)?;
Ok(Json(json!({"status": "pass", "gate": "docs_fresh"})))
}
async fn gate_all(
State(s): State<Shared>,
headers: HeaderMap,
Json(req): Json<RepoReq>,
) -> Result<Json<Value>, ApiErr> {
auth(&headers, &s.token)?;
let l = s.loaded.lock().await;
let repo = l.nornir.repo.get(&req.repo).ok_or_else(|| ApiErr(StatusCode::NOT_FOUND, format!("repo `{}` not configured", req.repo)))?;
let root = config::Nornir::repo_dir(&l.workspace_root, &req.repo);
let g = &repo.gates;
let mut passed: Vec<String> = Vec::new();
let mut failed: Vec<Value> = Vec::new();
macro_rules! push {
($n:expr, $r:expr) => {
match $r { Ok(()) => passed.push($n.into()), Err(e) => failed.push(json!({"name": $n, "error": format!("{e:#}")})) }
};
}
if g.no_path_patches { push!("no_path_patches", release::gate::no_path_patches(&root)); }
let last_pair = last_run(&l, &req.repo);
let last_run_val: Option<bench::BenchRun> = last_pair.as_ref().ok().map(|(_, r)| r.clone());
if g.nexus_floor {
let r = last_run_val.as_ref().ok_or_else(|| anyhow::anyhow!("no bench runs")).and_then(|r| release::gate::nexus_floor(r));
push!("nexus_floor", r);
}
if g.no_regression {
let pct = if g.max_regression_pct > 0.0 { g.max_regression_pct } else { 10.0 };
let hp = root.join(if repo.history.is_empty() { "bench_history.jsonl" } else { &repo.history });
let r = last_run_val.as_ref().ok_or_else(|| anyhow::anyhow!("no bench runs")).and_then(|r| release::gate::no_regression(r, &hp, pct));
push!("no_regression", r);
}
if !g.integration_roundtrip.is_empty() {
let kinds: Vec<&str> = g.integration_roundtrip.iter().map(|s| s.as_str()).collect();
push!("integration_roundtrip",
release::gate::integration_roundtrip_via_cargo_test(&root, &kinds));
}
if g.docs_fresh {
let r: anyhow::Result<()> = (|| {
let run = last_run_val.as_ref().ok_or_else(|| anyhow::anyhow!("no bench runs"))?;
let layout = docs::RepoLayout::new(&root);
let ctx = docs::Ctx { repo_root: &root, workspace_root: &l.workspace_root, run: Some(run) };
docs::render_check_all(&layout, &ctx)
})();
push!("docs_fresh", r);
}
Ok(Json(json!({"repo": req.repo, "passed": passed, "failed": failed})))
}
/// `POST /search` — queries the shared index.
///
/// A missing or empty `corpus` means no corpus filter; an unrecognized corpus
/// name is a 400. `limit` falls back to 20.
async fn search(
    State(state): State<Shared>,
    headers: HeaderMap,
    Json(req): Json<SearchReq>,
) -> Result<Json<Value>, ApiErr> {
    auth(&headers, &state.token)?;
    let corpus = match req.corpus.as_deref().filter(|name| !name.is_empty()) {
        None => None,
        Some(name) => match index::Corpus::parse(name) {
            Some(parsed) => Some(parsed),
            None => {
                return Err(ApiErr(
                    StatusCode::BAD_REQUEST,
                    format!("unknown corpus `{name}`"),
                ));
            }
        },
    };
    let limit = req.limit.unwrap_or(20);
    let repo_filter = req.repo.as_deref();
    let hits = state
        .index
        .search(&req.query, corpus, repo_filter, limit)
        .map_err(ise)?;
    Ok(Json(json!({"hits": hits})))
}
async fn index_stats(
State(s): State<Shared>,
headers: HeaderMap,
_: Json<EmptyReq>,
) -> Result<Json<Value>, ApiErr> {
auth(&headers, &s.token)?;
let stats = s.index.stats().map_err(ise)?;
Ok(Json(json!({
"total": stats.total,
"by_corpus": stats.by_corpus,
})))
}
/// Request body for `/index/snapshot`.
#[derive(Deserialize)]
struct SnapshotReq {
/// Repository label for the snapshot; the handler uses `_workspace` when absent.
#[serde(default)] repo: Option<String>,
/// Workspace label; the handler uses `workspace_holger` when absent.
#[serde(default)] workspace: Option<String>,
/// Git commit the snapshot is recorded against (required).
git_sha: String,
/// Branch label; the handler uses `unknown` when absent.
#[serde(default)] branch: Option<String>,
}
/// `POST /index/snapshot` — snapshots the on-disk index cache
/// (`<workspace_root>/.nornir/cache/index`) into the Iceberg warehouse and
/// returns the snapshot's metadata.
///
/// The config lock is released before the blocking snapshot call, which runs
/// on the blocking thread pool.
async fn index_snapshot(
    State(state): State<Shared>,
    headers: HeaderMap,
    Json(req): Json<SnapshotReq>,
) -> Result<Json<Value>, ApiErr> {
    auth(&headers, &state.token)?;
    let index_dir = {
        let cfg = state.loaded.lock().await;
        cfg.workspace_root.join(".nornir/cache/index")
    };
    // Fill in the documented defaults, taking ownership for the worker thread.
    let workspace = req.workspace.unwrap_or_else(|| "workspace_holger".to_string());
    let repo = req.repo.unwrap_or_else(|| "_workspace".to_string());
    let branch = req.branch.unwrap_or_else(|| "unknown".to_string());
    let sha = req.git_sha;
    let wh = state.warehouse.clone();

    let joined = tokio::task::spawn_blocking(move || {
        nornir::index::snapshot::snapshot_to_iceberg(
            &wh, &workspace, &repo, &sha, &branch, &index_dir,
        )
    })
    .await;
    let snap = joined.map_err(ise)?.map_err(ise)?;

    let body = json!({
        "snapshot_id": snap.snapshot_id,
        "workspace": snap.workspace,
        "repo": snap.repo,
        "git_sha": snap.git_sha,
        "branch": snap.branch,
        "schema_hash": snap.schema_hash,
        "blob_count": snap.blob_count,
        "total_bytes": snap.total_bytes,
    });
    Ok(Json(body))
}
/// Request body for `/release/order`.
#[derive(Deserialize)]
struct ReleaseOrderReq {
/// Workspace whose dependency-graph snapshots are queried; the handler uses
/// `workspace_holger` when absent.
#[serde(default)] workspace: Option<String>,
}
/// `POST /release/order` — returns the order in which repositories should be
/// released.
///
/// When the warehouse holds at least one dependency-graph snapshot for the
/// workspace, the last one returned drives a topological ordering
/// (`source: "iceberg"`); otherwise the configured repo names are returned
/// as-is (`source: "fallback"`, `snapshot_id: null`).
async fn release_order(
    State(state): State<Shared>,
    headers: HeaderMap,
    Json(req): Json<ReleaseOrderReq>,
) -> Result<Json<Value>, ApiErr> {
    use nornir::warehouse::dep_graph::{query_dep_graph_snapshots, topo_order_from_edges};
    auth(&headers, &state.token)?;
    let all: Vec<String> = {
        let cfg = state.loaded.lock().await;
        cfg.nornir.repo.keys().cloned().collect()
    };
    let workspace = match req.workspace {
        Some(name) => name,
        None => "workspace_holger".to_string(),
    };
    let wh = state.warehouse.clone();

    let joined = tokio::task::spawn_blocking(move || -> anyhow::Result<_> {
        let snaps = wh.block_on(query_dep_graph_snapshots(&wh, &workspace, None))?;
        let latest = snaps.into_iter().last();
        let outcome = if let Some(snap) = latest {
            (
                topo_order_from_edges(&all, &snap.edges),
                "iceberg",
                Some(snap.snapshot_id.to_string()),
            )
        } else {
            (all, "fallback", None)
        };
        Ok(outcome)
    })
    .await;
    let (order, source, snapshot_id) = joined.map_err(ise)?.map_err(ise)?;

    let body = json!({
        "order": order,
        "source": source,
        "snapshot_id": snapshot_id,
    });
    Ok(Json(body))
}
/// Chooses the path used to open a requested binary.
///
/// `p` is returned unchanged when it is absolute or already exists relative to
/// the process's current directory; otherwise it is joined onto
/// `workspace_root`.
fn resolve_binary(workspace_root: &std::path::Path, p: &std::path::Path) -> PathBuf {
    let usable_as_given = p.is_absolute() || p.exists();
    if !usable_as_given {
        return workspace_root.join(p);
    }
    p.to_path_buf()
}
async fn introspect_symbols(
State(s): State<Shared>,
headers: HeaderMap,
Json(req): Json<BinaryReq>,
) -> Result<Json<Value>, ApiErr> {
auth(&headers, &s.token)?;
let l = s.loaded.lock().await;
let bin = resolve_binary(&l.workspace_root, &req.binary);
let syms = introspect::artifact::extract_symbols(&bin, &l.workspace_root).map_err(ise)?;
Ok(Json(json!({"symbols": syms})))
}
async fn introspect_symbol_lookup(
State(s): State<Shared>,
headers: HeaderMap,
Json(req): Json<LookupReq>,
) -> Result<Json<Value>, ApiErr> {
auth(&headers, &s.token)?;
let l = s.loaded.lock().await;
let bin = resolve_binary(&l.workspace_root, &req.binary);
let syms = introspect::artifact::extract_symbols(&bin, &l.workspace_root).map_err(ise)?;
let mut hits: Vec<_> = introspect::artifact::lookup(&syms, &req.pattern).into_iter().cloned().collect();
hits.truncate(req.limit.unwrap_or(50));
Ok(Json(json!({"matches": hits})))
}
async fn introspect_defined_in(
State(s): State<Shared>,
headers: HeaderMap,
Json(req): Json<DefinedInReq>,
) -> Result<Json<Value>, ApiErr> {
auth(&headers, &s.token)?;
let l = s.loaded.lock().await;
let bin = resolve_binary(&l.workspace_root, &req.binary);
let syms = introspect::artifact::extract_symbols(&bin, &l.workspace_root).map_err(ise)?;
let hits: Vec<_> = introspect::artifact::defined_in(&syms, &req.suffix).into_iter().cloned().collect();
Ok(Json(json!({"matches": hits})))
}
async fn introspect_callers(
State(s): State<Shared>,
headers: HeaderMap,
Json(req): Json<CallQueryReq>,
) -> Result<Json<Value>, ApiErr> {
auth(&headers, &s.token)?;
let l = s.loaded.lock().await;
let bin = resolve_binary(&l.workspace_root, &req.binary);
let edges = introspect::callgraph_dwarf::extract_callgraph(&bin, &l.workspace_root).map_err(ise)?;
let cg = introspect::callgraph_dwarf::Callgraph::from_edges(&edges);
Ok(Json(json!({"callers": cg.callers_of(&req.name)})))
}
async fn introspect_callees(
State(s): State<Shared>,
headers: HeaderMap,
Json(req): Json<CallQueryReq>,
) -> Result<Json<Value>, ApiErr> {
auth(&headers, &s.token)?;
let l = s.loaded.lock().await;
let bin = resolve_binary(&l.workspace_root, &req.binary);
let edges = introspect::callgraph_dwarf::extract_callgraph(&bin, &l.workspace_root).map_err(ise)?;
let cg = introspect::callgraph_dwarf::Callgraph::from_edges(&edges);
Ok(Json(json!({"callees": cg.callees_of(&req.name)})))
}
async fn introspect_path_between(
State(s): State<Shared>,
headers: HeaderMap,
Json(req): Json<PathReq>,
) -> Result<Json<Value>, ApiErr> {
auth(&headers, &s.token)?;
let l = s.loaded.lock().await;
let bin = resolve_binary(&l.workspace_root, &req.binary);
let edges = introspect::callgraph_dwarf::extract_callgraph(&bin, &l.workspace_root).map_err(ise)?;
let cg = introspect::callgraph_dwarf::Callgraph::from_edges(&edges);
Ok(Json(json!({"path": cg.path_between(&req.from, &req.to)})))
}
async fn docs_init(
State(s): State<Shared>,
headers: HeaderMap,
Json(req): Json<RepoReq>,
) -> Result<Json<Value>, ApiErr> {
auth(&headers, &s.token)?;
let l = s.loaded.lock().await;
let _ = l.nornir.repo.get(&req.repo).ok_or_else(|| ApiErr(StatusCode::NOT_FOUND, format!("repo `{}` not configured", req.repo)))?;
let root = config::Nornir::repo_dir(&l.workspace_root, &req.repo);
let layout = docs::RepoLayout::new(&root);
let srcs = docs::init_repo(&layout).map_err(ise)?;
Ok(Json(json!({
"repo": req.repo,
"nornir_dir": layout.nornir_dir(),
"sources": srcs,
})))
}
/// `POST /docs/render` — renders all docs for a configured repository and
/// returns one report per output (`output`, `bytes`, `changed`, `sections`).
///
/// The latest bench run, if any, is supplied as rendering context; a history
/// file that cannot be read is treated as having no runs.
async fn docs_render(
    State(state): State<Shared>,
    headers: HeaderMap,
    Json(req): Json<RepoReq>,
) -> Result<Json<Value>, ApiErr> {
    auth(&headers, &state.token)?;
    let cfg = state.loaded.lock().await;
    let Some(repo) = cfg.nornir.repo.get(&req.repo) else {
        return Err(ApiErr(
            StatusCode::NOT_FOUND,
            format!("repo `{}` not configured", req.repo),
        ));
    };
    let repo_root = config::Nornir::repo_dir(&cfg.workspace_root, &req.repo);
    let history_file: &str = if repo.history.is_empty() {
        "bench_history.jsonl"
    } else {
        &repo.history
    };
    let history_path = repo_root.join(history_file);
    let runs = bench::history::read_all(&history_path).unwrap_or_default();
    let layout = docs::RepoLayout::new(&repo_root);
    let ctx = docs::Ctx {
        repo_root: &repo_root,
        workspace_root: &cfg.workspace_root,
        run: runs.last(),
    };
    let reports = docs::render_all(&layout, &ctx).map_err(ise)?;

    let mut rendered: Vec<Value> = Vec::new();
    for report in reports.iter() {
        rendered.push(json!({
            "output": report.output,
            "bytes": report.bytes,
            "changed": report.changed,
            "sections": report.sections,
        }));
    }
    Ok(Json(json!({
        "repo": req.repo,
        "reports": rendered,
    })))
}
async fn docs_check(
State(s): State<Shared>,
headers: HeaderMap,
Json(req): Json<RepoReq>,
) -> Result<Json<Value>, ApiErr> {
auth(&headers, &s.token)?;
let l = s.loaded.lock().await;
let repo = l.nornir.repo.get(&req.repo).ok_or_else(|| ApiErr(StatusCode::NOT_FOUND, format!("repo `{}` not configured", req.repo)))?;
let root = config::Nornir::repo_dir(&l.workspace_root, &req.repo);
let history = root.join(if repo.history.is_empty() { "bench_history.jsonl" } else { &repo.history });
let runs = bench::history::read_all(&history).unwrap_or_default();
let last = runs.last();
let layout = docs::RepoLayout::new(&root);
let ctx = docs::Ctx { repo_root: &root, workspace_root: &l.workspace_root, run: last };
docs::render_check_all(&layout, &ctx).map_err(ise)?;
Ok(Json(json!({"status": "pass", "repo": req.repo})))
}
/// Request body for `/docs/history`.
#[derive(Deserialize)]
struct DocsHistoryReq {
/// Repository name; must be present in the configured repo table.
repo: String,
/// Optional value for the export filter's `doc_name`.
#[serde(default)]
doc: Option<String>,
/// Optional value for the export filter's `version`.
#[serde(default)]
version: Option<String>,
/// Optional value for the export filter's `format`.
#[serde(default)]
format: Option<String>,
/// Maximum number of rows; the handler falls back to 50 when absent.
#[serde(default)]
limit: Option<usize>,
}
/// `POST /docs/history` — lists rows from the repository's docs warehouse,
/// filtered by the optional `doc`, `version` and `format` fields and capped at
/// `limit` rows (default 50).
async fn docs_history(
    State(state): State<Shared>,
    headers: HeaderMap,
    Json(req): Json<DocsHistoryReq>,
) -> Result<Json<Value>, ApiErr> {
    auth(&headers, &state.token)?;
    let cfg = state.loaded.lock().await;
    if cfg.nornir.repo.get(&req.repo).is_none() {
        return Err(ApiErr(
            StatusCode::NOT_FOUND,
            format!("repo `{}` not configured", req.repo),
        ));
    }
    let repo_root = config::Nornir::repo_dir(&cfg.workspace_root, &req.repo);
    let layout = docs::RepoLayout::new(&repo_root);
    let store = docs::DocsWarehouse::open(&layout).map_err(ise)?;
    let row_cap = req.limit.unwrap_or(50);
    let filter = docs::ExportFilter {
        doc_name: req.doc,
        version: req.version,
        format: req.format,
        limit: Some(row_cap),
    };
    let rows = store.list(&filter).map_err(ise)?;
    let body = json!({
        "repo": req.repo,
        "root": store.root(),
        "rows": rows,
    });
    Ok(Json(body))
}
/// Query string for `GET /release/progress`.
#[derive(Deserialize, Default)]
struct ProgressQuery {
/// Run identifier; selects `release-run-<run_id>.events.ndjson`. When absent
/// the handler picks the most recently modified events file.
run_id: Option<String>,
}
async fn release_progress_sse(
State(state): State<Shared>,
Query(q): Query<ProgressQuery>,
) -> Result<Sse<impl Stream<Item = std::result::Result<Event, std::io::Error>>>, ApiErr> {
use tokio::io::{AsyncBufReadExt, AsyncSeekExt, BufReader, SeekFrom};
use tokio::time::{sleep, Duration};
let workspace_root = {
let loaded = state.loaded.lock().await;
loaded.workspace_root.clone()
};
let log_dir = workspace_root.join("workspace_holger/.nornir/logs");
let path = if let Some(rid) = q.run_id {
log_dir.join(format!("release-run-{rid}.events.ndjson"))
} else {
let mut newest: Option<(std::time::SystemTime, std::path::PathBuf)> = None;
let mut rd = tokio::fs::read_dir(&log_dir)
.await
.map_err(|e| ApiErr(StatusCode::NOT_FOUND, format!("read_dir {}: {e}", log_dir.display())))?;
while let Ok(Some(entry)) = rd.next_entry().await {
let p = entry.path();
if p.file_name()
.and_then(|s| s.to_str())
.map(|s| s.starts_with("release-run-") && s.ends_with(".events.ndjson"))
.unwrap_or(false)
{
if let Ok(meta) = entry.metadata().await {
if let Ok(mtime) = meta.modified() {
if newest.as_ref().map_or(true, |(t, _)| mtime > *t) {
newest = Some((mtime, p));
}
}
}
}
}
newest
.map(|(_, p)| p)
.ok_or_else(|| ApiErr(StatusCode::NOT_FOUND, format!("no events ndjson in {}", log_dir.display())))?
};
let stream = async_stream::try_stream! {
for _ in 0..40 {
if tokio::fs::metadata(&path).await.is_ok() { break; }
sleep(Duration::from_millis(100)).await;
}
let file = tokio::fs::File::open(&path).await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::NotFound, format!("open {}: {e}", path.display())))?;
let mut reader = BufReader::new(file);
reader.seek(SeekFrom::Start(0)).await?;
let mut line = String::new();
loop {
line.clear();
let n = reader.read_line(&mut line).await?;
if n == 0 {
sleep(Duration::from_millis(250)).await;
continue;
}
let trimmed = line.trim_end_matches(['\r', '\n']);
if trimmed.is_empty() { continue; }
let is_end = trimmed.contains("\"run_end\"");
yield Event::default().data(trimmed.to_string());
if is_end {
break;
}
}
};
Ok(Sse::new(stream).keep_alive(KeepAlive::default()))
}