use std::collections::{HashMap, HashSet};
use std::time::{Duration, Instant};
use anyhow::Result;
use chrono::{DateTime, Utc};
use futures_util::StreamExt;
use harmont_cloud::{
HarmontClient, HarmontError,
logs::{LogEvent, StreamKind},
models::{build_is_terminal, job_is_terminal},
types::JobState,
};
use hm_plugin_protocol::events::{BuildEvent, PlanSummary, StdStream};
use uuid::Uuid;
/// Pause between two consecutive polls of the build in [`watch_build`].
const POLL: Duration = Duration::from_millis(1500);
/// Minimum remaining lifetime a log token must have when a new log stream is started;
/// [`watch_build`] requests a fresh token once the current one is closer to expiry than this.
const TOKEN_REFRESH_MARGIN: chrono::Duration = chrono::Duration::minutes(5);
/// Holds the join handles of spawned tasks and aborts every held task when the guard is dropped.
#[derive(Debug)]
struct AbortGuard(Vec<tokio::task::JoinHandle<()>>);

impl Drop for AbortGuard {
    /// Calls `abort` on each handle that is still stored in the guard.
    fn drop(&mut self) {
        self.0.iter().for_each(|task| task.abort());
    }
}
/// Converts an optional Unix timestamp in nanoseconds into a UTC datetime.
///
/// Falls back to the current time when no timestamp is supplied.
pub(crate) fn ts_or_now(ts_unix_ns: Option<i64>) -> DateTime<Utc> {
    let Some(nanos) = ts_unix_ns else {
        return Utc::now();
    };
    DateTime::<Utc>::from_timestamp_nanos(nanos)
}
/// Returns the number of whole milliseconds between `start` and `end`.
///
/// Yields `0` when either bound is missing or when `end` lies before `start`.
fn duration_ms(start: Option<DateTime<Utc>>, end: Option<DateTime<Utc>>) -> u64 {
    let (Some(began), Some(finished)) = (start, end) else {
        return 0;
    };
    let elapsed_ms = (finished - began).num_milliseconds();
    // A negative span (end before start) fails the conversion and is clamped to zero.
    u64::try_from(elapsed_ms).unwrap_or(0)
}
/// Reports whether [`watch_build`] should start streaming logs for a job in `state`.
///
/// The match is deliberately exhaustive so that adding a `JobState` variant forces a decision
/// here at compile time.
const fn job_logs_available(state: JobState) -> bool {
    match state {
        // Not started yet, or never going to run.
        JobState::Pending | JobState::Scheduled | JobState::Assigned | JobState::Skipped => false,
        // Running, winding down, or finished.
        JobState::Running
        | JobState::Canceling
        | JobState::TimingOut
        | JobState::Passed
        | JobState::Failed
        | JobState::TimedOut
        | JobState::Canceled => true,
    }
}
/// Maps a final build state name to a process exit code.
///
/// `"passed"` yields `0`, `"canceled"` yields `130`, and every other state yields `1`.
/// The comparison is case-sensitive.
pub(crate) fn exit_code_for_state(state: &str) -> i32 {
    if state == "passed" {
        0
    } else if state == "canceled" {
        130
    } else {
        1
    }
}
/// Follows a cloud build until it reaches a terminal state, reporting progress as
/// [`BuildEvent`]s on `tx`.
///
/// The sequence of events is:
///
/// 1. `BuildStart` is sent once.
/// 2. The build is polled every [`POLL`]. The first time a job is seen in a state for which
///    [`job_logs_available`] is true, `StepQueued` and `StepStart` are sent and a background task
///    is spawned that streams the job's logs as `StepLog` events.
/// 3. When the build state is terminal, all streaming tasks are awaited, a `StepEnd` is sent for
///    every terminal job, and `BuildEnd` is sent.
///
/// Returns the exit code derived from the final build state via [`exit_code_for_state`], or `1`
/// as soon as the receiving side of `tx` is found closed (except for the final `BuildEnd`, whose
/// delivery failure is ignored).
///
/// # Errors
///
/// Returns an error when the initial log token, the build, or the job list cannot be fetched
/// while polling. A failed log-token refresh is only logged, and a failure to list jobs after the
/// build has finished merely suppresses the `StepEnd` events.
// The function is one linear poll loop with shared local state; splitting it would mean
// threading that state through several helpers.
#[allow(clippy::too_many_lines)]
pub async fn watch_build(
    client: &HarmontClient,
    log_base: &str,
    org: &str,
    pipeline: &str,
    number: i64,
    tx: tokio::sync::mpsc::Sender<BuildEvent>,
) -> Result<i32> {
    let mut log_token = client.log_token(org, pipeline, number).await?;
    let started = Instant::now();
    if tx
        .send(BuildEvent::BuildStart {
            run_id: Uuid::new_v4(),
            plan: PlanSummary {
                step_count: 0,
                chain_count: 1,
                default_runner: "cloud".to_string(),
            },
            started_at: Utc::now(),
        })
        .await
        .is_err()
    {
        return Ok(1);
    }

    // Jobs whose log stream has already been started.
    let mut streaming: HashSet<Uuid> = HashSet::new();
    // Jobs for which `StepEnd` has already been sent.
    let mut ended: HashSet<Uuid> = HashSet::new();
    // Display index per job, assigned in order of first appearance.
    let mut chain_idx: HashMap<Uuid, usize> = HashMap::new();
    let mut next_idx: usize = 0;
    // Aborts all still-running streaming tasks if this function exits early or is cancelled.
    let mut guard = AbortGuard(Vec::new());

    let final_state = loop {
        // Read the build state BEFORE listing jobs. The job list inspected in the iteration that
        // observes a terminal build is then at least as fresh as that observation, so a job that
        // became streamable just before the build finished still gets its StepQueued/StepStart
        // and log stream instead of only a StepEnd.
        let build = client.get_build(org, pipeline, number).await?;
        let build_state = build.state.to_string();

        let jobs = client.list_jobs(org, pipeline, number).await?;
        for job in &jobs {
            // `insert` returns false for jobs that are already being streamed.
            if !(job_logs_available(job.state) && streaming.insert(job.id)) {
                continue;
            }
            let name = job.name.clone().unwrap_or_else(|| "job".to_string());
            let idx = *chain_idx.entry(job.id).or_insert_with(|| {
                let i = next_idx;
                next_idx += 1;
                i
            });
            if tx
                .send(BuildEvent::StepQueued {
                    step_id: job.id,
                    key: name.clone(),
                    chain_idx: idx,
                    parent_key: None,
                    display_name: name.clone(),
                })
                .await
                .is_err()
            {
                return Ok(1);
            }
            if tx
                .send(BuildEvent::StepStart {
                    step_id: job.id,
                    runner: "cloud".to_string(),
                    image: None,
                })
                .await
                .is_err()
            {
                return Ok(1);
            }
            // Refresh the token shortly before it expires so a newly started stream does not
            // begin with a nearly-expired credential. A failed refresh is not fatal: the old
            // token is used and the stream reports expiry itself if it is rejected.
            if log_token.expires_at - Utc::now() < TOKEN_REFRESH_MARGIN {
                match client.log_token(org, pipeline, number).await {
                    Ok(fresh) => log_token = fresh,
                    Err(e) => tracing::warn!("log-token refresh failed: {e}"),
                }
            }
            guard.0.push(tokio::spawn(stream_one(
                client.clone(),
                log_base.to_string(),
                job.id,
                log_token.token.clone(),
                tx.clone(),
            )));
        }

        if build_is_terminal(&build_state) {
            break build_state;
        }
        tokio::time::sleep(POLL).await;
    };

    // Await the handles in place rather than draining them out of the guard: a dropped
    // `JoinHandle` detaches its task, so moving the handles out would leave the remaining tasks
    // running if this future were dropped while joining. Kept inside the guard they are still
    // aborted in that case, and aborting an already finished task has no effect.
    for handle in &mut guard.0 {
        let _ = handle.await;
    }

    // Best effort: if the final job list cannot be fetched, StepEnd events are skipped.
    if let Ok(jobs) = client.list_jobs(org, pipeline, number).await {
        for job in &jobs {
            if job_is_terminal(&job.state.to_string())
                && ended.insert(job.id)
                && tx.send(step_end(job)).await.is_err()
            {
                return Ok(1);
            }
        }
    }

    let code = exit_code_for_state(&final_state);
    let _ = tx
        .send(BuildEvent::BuildEnd {
            exit_code: code,
            duration_ms: u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX),
        })
        .await;
    Ok(code)
}
/// Builds the `StepEnd` event for a finished job.
///
/// The job's own exit code is used when present (falling back to `1` if it does not fit in an
/// `i32`). Without one, the code is `0` for the states `"passed"` and `"skipped"` and `1` for
/// everything else.
fn step_end(job: &harmont_cloud::models::Job) -> BuildEvent {
    // Only consulted when the job reports no exit code of its own.
    let code_from_state = || {
        let state = job.state.to_string();
        i32::from(state != "passed" && state != "skipped")
    };
    let exit_code = job
        .exit_code
        .map_or_else(code_from_state, |raw| i32::try_from(raw).unwrap_or(1));
    let elapsed_ms = duration_ms(job.started_at, job.finished_at);
    BuildEvent::StepEnd {
        step_id: job.id,
        exit_code,
        duration_ms: elapsed_ms,
        snapshot: None,
    }
}
pub async fn stream_job_logs_as_events(
client: &HarmontClient,
log_base: &str,
step_id: Uuid,
token: &str,
tx: &tokio::sync::mpsc::Sender<BuildEvent>,
) -> Result<()> {
let stream = client.stream_job_logs(log_base, step_id, token).await?;
futures_util::pin_mut!(stream);
let mut buf = String::new();
let mut last_stream = StreamKind::Stdout;
while let Some(item) = stream.next().await {
match item {
Ok(LogEvent::History(chunks)) => {
for c in chunks {
last_stream = c.stream;
if emit(tx, step_id, c.stream, c.ts_unix_ns, &mut buf, &c.content)
.await
.is_err()
{
return Ok(());
}
}
}
Ok(LogEvent::Chunk(c)) => {
last_stream = c.stream;
if emit(tx, step_id, c.stream, c.ts_unix_ns, &mut buf, &c.content)
.await
.is_err()
{
return Ok(());
}
}
Ok(LogEvent::Done) => break,
Err(e) => return Err(e.into()),
}
}
if !buf.is_empty() {
let line = std::mem::take(&mut buf);
let _ = tx
.send(BuildEvent::StepLog {
step_id,
stream: map_stream(last_stream),
line,
ts: Utc::now(),
})
.await;
}
Ok(())
}
/// Task body spawned by [`watch_build`]: streams the logs of a single job to `tx`.
///
/// If streaming fails with [`HarmontError::Unauthorized`], a notice is sent on the job's stderr
/// telling the user where the full logs can be found. Every other error is silently ignored.
async fn stream_one(
    client: HarmontClient,
    log_base: String,
    job_id: Uuid,
    token: String,
    tx: tokio::sync::mpsc::Sender<BuildEvent>,
) {
    let token_expired = stream_job_logs_as_events(&client, &log_base, job_id, &token, &tx)
        .await
        .is_err_and(|err| {
            matches!(
                err.downcast_ref::<HarmontError>(),
                Some(HarmontError::Unauthorized)
            )
        });
    if !token_expired {
        return;
    }
    let notice = BuildEvent::StepLog {
        step_id: job_id,
        stream: StdStream::Stderr,
        line: "live logs expired; full logs available via `hm cloud build show`".to_string(),
        ts: Utc::now(),
    };
    let _ = tx.send(notice).await;
}
/// Translates a cloud log stream kind into the plugin protocol's stream identifier.
///
/// Only stdout maps to stdout; stderr and meta content are both reported as stderr.
pub(crate) const fn map_stream(kind: StreamKind) -> StdStream {
    match kind {
        StreamKind::Stderr | StreamKind::Meta => StdStream::Stderr,
        StreamKind::Stdout => StdStream::Stdout,
    }
}
/// Appends `content` to `buf` and sends every complete line now in `buf` as a `StepLog` event.
///
/// A line is complete once its terminating `\n` has arrived; the terminator and any `\r`
/// directly before it are stripped. Text after the last newline stays in `buf` for the next
/// call. All lines sent by one call carry the stream derived from `kind` and the timestamp
/// derived from `ts_unix_ns` (the current time if absent).
///
/// Returns `Err(())` when the receiver of `tx` is closed. In that case the contents of `buf`
/// are unspecified; callers are expected to stop streaming.
async fn emit(
    tx: &tokio::sync::mpsc::Sender<BuildEvent>,
    job_id: Uuid,
    kind: StreamKind,
    ts_unix_ns: Option<i64>,
    buf: &mut String,
    content: &str,
) -> std::result::Result<(), ()> {
    buf.push_str(content);
    // Without a newline there is no complete line yet.
    let Some(last_nl) = buf.rfind('\n') else {
        return Ok(());
    };
    // Walk the complete-line prefix in one pass and remove it once at the end. Draining the
    // buffer line by line would shift the remaining text for every line, which is quadratic
    // for large history chunks.
    for raw in buf[..last_nl].split('\n') {
        let line = raw.trim_end_matches('\r').to_string();
        tx.send(BuildEvent::StepLog {
            step_id: job_id,
            stream: map_stream(kind),
            line,
            ts: ts_or_now(ts_unix_ns),
        })
        .await
        .map_err(|_| ())?;
    }
    buf.replace_range(..=last_nl, "");
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::{JobState, exit_code_for_state, job_logs_available};

    /// Logs must be offered for a running job and for every state that follows it.
    #[test]
    fn logs_available_for_running_and_terminal_states() {
        let with_logs = [
            JobState::Running,
            JobState::Passed,
            JobState::Failed,
            JobState::TimedOut,
            JobState::Canceling,
            JobState::Canceled,
            JobState::TimingOut,
        ];
        for state in with_logs {
            assert!(job_logs_available(state), "expected logs for {state}");
        }
    }

    /// Jobs that have not started, or were skipped, must not be streamed.
    #[test]
    fn no_logs_before_start_or_when_skipped() {
        let without_logs = [
            JobState::Pending,
            JobState::Scheduled,
            JobState::Assigned,
            JobState::Skipped,
        ];
        for state in without_logs {
            assert!(!job_logs_available(state), "expected no logs for {state}");
        }
    }

    /// Pins the mapping from final build state to exit code.
    #[test]
    fn passed_is_zero_canceled_is_130_else_is_one() {
        let expectations = [
            ("passed", 0),
            ("canceled", 130),
            ("failed", 1),
            ("timed_out", 1),
        ];
        for (state, expected) in expectations {
            assert_eq!(exit_code_for_state(state), expected);
        }
    }
}