use anyhow::{Context, Result};
use super::live::LiveEvent;
use super::model::Timeline;
use crate::warehouse::iceberg::TablePreview;
mod pb {
tonic::include_proto!("nornir.v1");
}
fn endpoint_and_bearer(
endpoint: &str,
token: &str,
) -> Result<(String, tonic::metadata::MetadataValue<tonic::metadata::Ascii>)> {
let endpoint = if endpoint.starts_with("http") {
endpoint.to_string()
} else {
format!("http://{endpoint}")
};
let bearer = format!("Bearer {token}").parse().context("parse bearer token")?;
Ok((endpoint, bearer))
}
fn workspace_md() -> Option<tonic::metadata::MetadataValue<tonic::metadata::Ascii>> {
std::env::var("NORNIR_WORKSPACE")
.ok()
.filter(|s| !s.is_empty())
.and_then(|w| w.parse().ok())
}
pub fn fetch_timeline(endpoint: &str, token: &str, workspace: &str) -> Result<Timeline> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.context("build tokio runtime for viz client")?;
rt.block_on(async {
let endpoint = if endpoint.starts_with("http") {
endpoint.to_string()
} else {
format!("http://{endpoint}")
};
let bearer: tonic::metadata::MetadataValue<tonic::metadata::Ascii> =
format!("Bearer {token}").parse().context("parse bearer token")?;
let ws_md = workspace_md();
let channel = tonic::transport::Channel::from_shared(endpoint.clone())
.with_context(|| format!("invalid server url `{endpoint}`"))?
.connect()
.await
.with_context(|| format!("connect to nornir-server at {endpoint}"))?;
let mut client = pb::viz_client::VizClient::with_interceptor(
channel,
move |mut req: tonic::Request<()>| {
req.metadata_mut().insert("authorization", bearer.clone());
if let Some(ws) = &ws_md {
req.metadata_mut().insert("nornir-workspace", ws.clone());
}
Ok(req)
},
);
let resp = client
.timeline(pb::VizTimelineRequest { workspace: workspace.to_string() })
.await
.context("Viz.Timeline RPC")?
.into_inner();
let timeline: Timeline =
serde_json::from_str(&resp.json).context("decode timeline json from server")?;
Ok(timeline)
})
}
pub fn stream_progress(
endpoint: &str,
token: &str,
mut on_event: impl FnMut(LiveEvent),
) -> Result<()> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.context("build tokio runtime for viz live client")?;
rt.block_on(async {
let (endpoint, bearer) = endpoint_and_bearer(endpoint, token)?;
let channel = tonic::transport::Channel::from_shared(endpoint.clone())
.with_context(|| format!("invalid server url `{endpoint}`"))?
.connect()
.await
.with_context(|| format!("connect to nornir-server at {endpoint}"))?;
let mut client = pb::release_client::ReleaseClient::with_interceptor(
channel,
move |mut req: tonic::Request<()>| {
req.metadata_mut().insert("authorization", bearer.clone());
Ok(req)
},
);
let mut stream = client
.progress(pb::Empty {})
.await
.context("Release.Progress RPC")?
.into_inner();
while let Some(ev) = stream.message().await.context("progress stream")? {
if let Some(live) = to_live(ev) {
on_event(live);
}
}
Ok(())
})
}
pub fn fetch_tables(endpoint: &str, token: &str) -> Result<Vec<String>> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.context("build tokio runtime for viz client")?;
rt.block_on(async {
let (endpoint, bearer) = endpoint_and_bearer(endpoint, token)?;
let channel = tonic::transport::Channel::from_shared(endpoint.clone())
.with_context(|| format!("invalid server url `{endpoint}`"))?
.connect()
.await
.with_context(|| format!("connect to nornir-server at {endpoint}"))?;
let ws_md = workspace_md();
let mut client = pb::warehouse_client::WarehouseClient::with_interceptor(
channel,
move |mut req: tonic::Request<()>| {
req.metadata_mut().insert("authorization", bearer.clone());
if let Some(ws) = &ws_md {
req.metadata_mut().insert("nornir-workspace", ws.clone());
}
Ok(req)
},
);
let resp = client.tables(pb::Empty {}).await.context("Warehouse.Tables RPC")?.into_inner();
Ok(resp.names)
})
}
pub fn scan_table(endpoint: &str, token: &str, table: &str, limit: u32) -> Result<TablePreview> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.context("build tokio runtime for viz client")?;
rt.block_on(async {
let (endpoint, bearer) = endpoint_and_bearer(endpoint, token)?;
let channel = tonic::transport::Channel::from_shared(endpoint.clone())
.with_context(|| format!("invalid server url `{endpoint}`"))?
.connect()
.await
.with_context(|| format!("connect to nornir-server at {endpoint}"))?;
let ws_md = workspace_md();
let mut client = pb::warehouse_client::WarehouseClient::with_interceptor(
channel,
move |mut req: tonic::Request<()>| {
req.metadata_mut().insert("authorization", bearer.clone());
if let Some(ws) = &ws_md {
req.metadata_mut().insert("nornir-workspace", ws.clone());
}
Ok(req)
},
);
let resp = client
.scan(pb::WarehouseScanRequest { table: table.to_string(), limit })
.await
.context("Warehouse.Scan RPC")?
.into_inner();
Ok(TablePreview {
columns: resp.columns,
rows: resp.rows.into_iter().map(|r| r.cells).collect(),
})
})
}
fn to_live(ev: pb::ProgressEvent) -> Option<LiveEvent> {
use pb::progress_event::Kind;
Some(match ev.kind? {
Kind::RunStart(x) => LiveEvent::RunStart { run_id: x.run_id, workspace: x.workspace },
Kind::RepoStart(x) => LiveEvent::RepoStart { repo: x.repo, sha: x.sha },
Kind::PhaseStart(x) => LiveEvent::PhaseStart { repo: x.repo, phase: x.phase },
Kind::PhaseEnd(x) => LiveEvent::PhaseEnd {
repo: x.repo,
phase: x.phase,
ok: x.ok,
duration_ms: x.duration_ms,
},
Kind::BinaryStart(x) => LiveEvent::BinaryStart { repo: x.repo, binary: x.binary },
Kind::TestPass(x) => LiveEvent::TestPass { repo: x.repo, binary: x.binary, name: x.name },
Kind::TestFail(x) => LiveEvent::TestFail { repo: x.repo, binary: x.binary, name: x.name },
Kind::BinaryDone(x) => LiveEvent::BinaryDone {
repo: x.repo,
binary: x.binary,
passed: x.passed,
failed: x.failed,
},
Kind::RepoEnd(x) => LiveEvent::RepoEnd { repo: x.repo, ok: x.ok },
Kind::RunEnd(x) => LiveEvent::RunEnd { run_id: x.run_id, ok: x.ok },
})
}