nornir 0.4.3

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
Documentation
//! Thin-client timeline loader: fetch a [`Timeline`] from a running
//! `nornir-server` over the `Viz.Timeline` gRPC instead of opening a local
//! Iceberg warehouse. The server builds the timeline from the warehouse it
//! owns (it holds the redb lock) and returns it as JSON; we deserialize into
//! the same [`Timeline`] the embedded path produces, so the egui app is
//! source-agnostic.

use anyhow::{Context, Result};

use super::live::LiveEvent;
use super::model::Timeline;
use crate::warehouse::iceberg::TablePreview;

/// Generated gRPC/protobuf bindings for the `nornir.v1` package, pulled in via
/// `tonic::include_proto!`. This file uses its `viz_client`, `release_client`
/// and `warehouse_client` modules plus the request/response message types.
mod pb {
    tonic::include_proto!("nornir.v1");
}

/// Build an endpoint url + a `Bearer <token>` metadata value — the shared
/// connect/auth shape for the viz gRPC clients.
///
/// `endpoint` is returned unchanged when it already begins with `http://` or
/// `https://`; otherwise `http://` is prepended, so a bare `host:port` works.
///
/// # Errors
///
/// Fails with the context `parse bearer token` when `Bearer {token}` cannot be
/// parsed into an ASCII metadata value.
fn endpoint_and_bearer(
    endpoint: &str,
    token: &str,
) -> Result<(String, tonic::metadata::MetadataValue<tonic::metadata::Ascii>)> {
    // Match the full scheme prefix rather than just "http": a scheme-less host
    // whose name happens to start with those letters (e.g. `httpd.lan:7878`)
    // must still get `http://` prepended.
    let has_scheme = endpoint.starts_with("http://") || endpoint.starts_with("https://");
    let endpoint = if has_scheme {
        endpoint.to_string()
    } else {
        format!("http://{endpoint}")
    };
    let bearer = format!("Bearer {token}").parse().context("parse bearer token")?;
    Ok((endpoint, bearer))
}

/// Metadata value for the `nornir-workspace` header, taken from the
/// `$NORNIR_WORKSPACE` environment variable. Yields `None` when the variable
/// is unset, unreadable, empty, or does not parse as an ASCII metadata value.
fn workspace_md() -> Option<tonic::metadata::MetadataValue<tonic::metadata::Ascii>> {
    let name = std::env::var("NORNIR_WORKSPACE").ok()?;
    if name.is_empty() {
        return None;
    }
    name.parse().ok()
}

/// Fetch the timeline for `workspace` from `endpoint` (e.g.
/// `http://127.0.0.1:7878`), authenticating with the bearer `token`. Runs the
/// async tonic call on a private current-thread runtime so it's safe to call
/// from the synchronous egui update loop.
pub fn fetch_timeline(endpoint: &str, token: &str, workspace: &str) -> Result<Timeline> {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .context("build tokio runtime for viz client")?;
    rt.block_on(async {
        let endpoint = if endpoint.starts_with("http") {
            endpoint.to_string()
        } else {
            format!("http://{endpoint}")
        };
        let bearer: tonic::metadata::MetadataValue<tonic::metadata::Ascii> =
            format!("Bearer {token}").parse().context("parse bearer token")?;
        let ws_md = workspace_md();
        let channel = tonic::transport::Channel::from_shared(endpoint.clone())
            .with_context(|| format!("invalid server url `{endpoint}`"))?
            .connect()
            .await
            .with_context(|| format!("connect to nornir-server at {endpoint}"))?;
        let mut client = pb::viz_client::VizClient::with_interceptor(
            channel,
            move |mut req: tonic::Request<()>| {
                req.metadata_mut().insert("authorization", bearer.clone());
                if let Some(ws) = &ws_md {
                    req.metadata_mut().insert("nornir-workspace", ws.clone());
                }
                Ok(req)
            },
        );
        let resp = client
            .timeline(pb::VizTimelineRequest { workspace: workspace.to_string() })
            .await
            .context("Viz.Timeline RPC")?
            .into_inner();
        let timeline: Timeline =
            serde_json::from_str(&resp.json).context("decode timeline json from server")?;
        Ok(timeline)
    })
}

/// Subscribe to the server's `Release.Progress` server-stream and pass every
/// message that converts to a [`LiveEvent`] on to `on_event`; messages that do
/// not convert are skipped. Blocks until the stream closes or an error occurs,
/// so call it from a dedicated `std::thread` (see [`super::live`]). The call
/// runs on its own current-thread tokio runtime, keeping it off the egui loop.
pub fn stream_progress(
    endpoint: &str,
    token: &str,
    mut on_event: impl FnMut(LiveEvent),
) -> Result<()> {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .context("build tokio runtime for viz live client")?;
    runtime.block_on(async {
        let (endpoint, auth) = endpoint_and_bearer(endpoint, token)?;
        let channel = tonic::transport::Channel::from_shared(endpoint.clone())
            .with_context(|| format!("invalid server url `{endpoint}`"))?
            .connect()
            .await
            .with_context(|| format!("connect to nornir-server at {endpoint}"))?;
        // Attach the bearer token to every outgoing request.
        let mut release = pb::release_client::ReleaseClient::with_interceptor(
            channel,
            move |mut request: tonic::Request<()>| {
                request.metadata_mut().insert("authorization", auth.clone());
                Ok(request)
            },
        );
        let response = release
            .progress(pb::Empty {})
            .await
            .context("Release.Progress RPC")?;
        let mut events = response.into_inner();
        // `message()` yields `None` once the server closes the stream.
        while let Some(event) = events.message().await.context("progress stream")? {
            if let Some(live) = to_live(event) {
                on_event(live);
            }
        }
        Ok(())
    })
}

/// Ask the server for the names of its warehouse tables via the
/// `Warehouse.Tables` RPC — the remote counterpart to
/// `IcebergWarehouse::table_names`. Requests carry the bearer `token` and,
/// when available, the `nornir-workspace` header.
pub fn fetch_tables(endpoint: &str, token: &str) -> Result<Vec<String>> {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .context("build tokio runtime for viz client")?;
    runtime.block_on(async {
        let (endpoint, auth) = endpoint_and_bearer(endpoint, token)?;
        let channel = tonic::transport::Channel::from_shared(endpoint.clone())
            .with_context(|| format!("invalid server url `{endpoint}`"))?
            .connect()
            .await
            .with_context(|| format!("connect to nornir-server at {endpoint}"))?;
        let workspace = workspace_md();
        let mut warehouse = pb::warehouse_client::WarehouseClient::with_interceptor(
            channel,
            move |mut request: tonic::Request<()>| {
                let headers = request.metadata_mut();
                headers.insert("authorization", auth.clone());
                if let Some(ws) = workspace.clone() {
                    headers.insert("nornir-workspace", ws);
                }
                Ok(request)
            },
        );
        let listing = warehouse
            .tables(pb::Empty {})
            .await
            .context("Warehouse.Tables RPC")?;
        Ok(listing.into_inner().names)
    })
}

/// Fetch a displayable preview of one warehouse table via the `Warehouse.Scan`
/// RPC — the remote counterpart to `IcebergWarehouse::scan_preview`. A `limit`
/// of 0 asks for the server default. The response's columns and per-row cells
/// are repackaged into a [`TablePreview`].
pub fn scan_table(endpoint: &str, token: &str, table: &str, limit: u32) -> Result<TablePreview> {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .context("build tokio runtime for viz client")?;
    runtime.block_on(async {
        let (endpoint, auth) = endpoint_and_bearer(endpoint, token)?;
        let channel = tonic::transport::Channel::from_shared(endpoint.clone())
            .with_context(|| format!("invalid server url `{endpoint}`"))?
            .connect()
            .await
            .with_context(|| format!("connect to nornir-server at {endpoint}"))?;
        let workspace = workspace_md();
        let mut warehouse = pb::warehouse_client::WarehouseClient::with_interceptor(
            channel,
            move |mut request: tonic::Request<()>| {
                let headers = request.metadata_mut();
                headers.insert("authorization", auth.clone());
                if let Some(ws) = workspace.clone() {
                    headers.insert("nornir-workspace", ws);
                }
                Ok(request)
            },
        );
        let query = pb::WarehouseScanRequest { table: table.to_owned(), limit };
        let scanned = warehouse
            .scan(query)
            .await
            .context("Warehouse.Scan RPC")?
            .into_inner();
        Ok(TablePreview {
            columns: scanned.columns,
            rows: scanned.rows.into_iter().map(|row| row.cells).collect(),
        })
    })
}

/// Map a proto `ProgressEvent` onto the viz [`LiveEvent`] enum, copying each
/// variant's fields across one-to-one. An event whose `kind` oneof is unset
/// yields `None`.
fn to_live(ev: pb::ProgressEvent) -> Option<LiveEvent> {
    use pb::progress_event::Kind;

    // `?` bails out with `None` when the oneof carries no variant.
    let live = match ev.kind? {
        Kind::RunStart(run) => LiveEvent::RunStart {
            run_id: run.run_id,
            workspace: run.workspace,
        },
        Kind::RepoStart(repo) => LiveEvent::RepoStart { repo: repo.repo, sha: repo.sha },
        Kind::PhaseStart(phase) => LiveEvent::PhaseStart {
            repo: phase.repo,
            phase: phase.phase,
        },
        Kind::PhaseEnd(phase) => LiveEvent::PhaseEnd {
            repo: phase.repo,
            phase: phase.phase,
            ok: phase.ok,
            duration_ms: phase.duration_ms,
        },
        Kind::BinaryStart(bin) => LiveEvent::BinaryStart { repo: bin.repo, binary: bin.binary },
        Kind::TestPass(test) => LiveEvent::TestPass {
            repo: test.repo,
            binary: test.binary,
            name: test.name,
        },
        Kind::TestFail(test) => LiveEvent::TestFail {
            repo: test.repo,
            binary: test.binary,
            name: test.name,
        },
        Kind::BinaryDone(bin) => LiveEvent::BinaryDone {
            repo: bin.repo,
            binary: bin.binary,
            passed: bin.passed,
            failed: bin.failed,
        },
        Kind::RepoEnd(repo) => LiveEvent::RepoEnd { repo: repo.repo, ok: repo.ok },
        Kind::RunEnd(run) => LiveEvent::RunEnd { run_id: run.run_id, ok: run.ok },
    };
    Some(live)
}