hm_exec/cloud/watch.rs
1//! Watch a cloud build to completion, emitting [`BuildEvent`]s.
2//!
3//! Discovers jobs, streams each job's logs concurrently, and maps cloud job
4//! lifecycle + SSE logs to the shared [`BuildEvent`] vocabulary so the cloud
5//! path renders through the same `hm-render` renderers as a local run.
6//!
7//! A cloud job maps to a pipeline step (keyed by `job.id`); the cloud build
8//! is modeled as a single chain (`chain_idx == 0`, `chain_count == 1`).
9
10use std::collections::{HashMap, HashSet};
11use std::time::{Duration, Instant};
12
13use anyhow::Result;
14use chrono::{DateTime, Utc};
15use futures_util::StreamExt;
16use harmont_cloud::{
17 HarmontClient, HarmontError,
18 logs::{LogEvent, StreamKind},
19 models::{build_is_terminal, job_is_terminal},
20 types::JobState,
21};
22use hm_plugin_protocol::events::{BuildEvent, PlanSummary, StdStream};
23use uuid::Uuid;
24
/// Poll interval for build/job status: how long `watch_build` sleeps between
/// successive rounds of job discovery and build-state checks (1.5 s).
const POLL: Duration = Duration::from_millis(1500);
27
/// Re-mint the log token when its remaining lifetime drops below this margin,
/// so a stream spawned late in a long build starts with a fresh, valid token
/// instead of one that 401s within seconds.
///
/// `watch_build` compares `expires_at - now` against this margin right before
/// spawning each new log stream; streams that are already running keep the
/// token they were started with.
const TOKEN_REFRESH_MARGIN: chrono::Duration = chrono::Duration::minutes(5);
32
33/// Aborts any still-running stream tasks when dropped (covers early-return
34/// error paths so no detached ghost tasks outlive `watch_build`).
35#[derive(Debug)]
36struct AbortGuard(Vec<tokio::task::JoinHandle<()>>);
37impl Drop for AbortGuard {
38 fn drop(&mut self) {
39 for h in &self.0 {
40 h.abort();
41 }
42 }
43}
44
45/// Convert a unix-nanosecond timestamp to a UTC datetime, falling back to
46/// "now" when absent or out of range.
47pub(crate) fn ts_or_now(ts_unix_ns: Option<i64>) -> DateTime<Utc> {
48 ts_unix_ns.map_or_else(Utc::now, DateTime::<Utc>::from_timestamp_nanos)
49}
50
51/// Duration between two optional timestamps, in milliseconds (0 if either is
52/// missing or the interval is negative).
53fn duration_ms(start: Option<DateTime<Utc>>, end: Option<DateTime<Utc>>) -> u64 {
54 match (start, end) {
55 (Some(s), Some(e)) => (e - s).num_milliseconds().max(0).cast_unsigned(),
56 _ => 0,
57 }
58}
59
60/// Whether a job has reached a state where its logs exist (running or already
61/// terminal), and so a log stream should be started for it.
62///
63/// Matching the typed [`JobState`] enum (rather than `to_string()`/`as_str()`
64/// against string literals) makes the set of states exhaustive: when the cloud
65/// adds a new `JobState` variant the compiler forces this decision to be
66/// revisited, and a misspelled state can no longer silently drop a job's logs.
67const fn job_logs_available(state: JobState) -> bool {
68 match state {
69 JobState::Running
70 | JobState::Passed
71 | JobState::Failed
72 | JobState::TimedOut
73 | JobState::Canceling
74 | JobState::Canceled
75 | JobState::TimingOut => true,
76 // No logs yet (not started) or never produced (skipped).
77 JobState::Pending | JobState::Scheduled | JobState::Assigned | JobState::Skipped => false,
78 }
79}
80
/// Translate a terminal build state into the process exit code shared by the
/// renderer and the `hm run` driver.
///
/// - `"passed"` → 0
/// - `"canceled"` → 130 (SIGINT-style cancel, mirrors
///   [`crate::BuildStatus::Canceled`])
/// - anything else (`"failed"` and any unexpected state) → 1
///
/// Kept in lockstep with the backend's state→status map so a server-side
/// cancel is never reported as a failure.
pub(crate) fn exit_code_for_state(state: &str) -> i32 {
    const SUCCESS: i32 = 0;
    const CANCELED: i32 = 130;
    const FAILURE: i32 = 1;

    if state == "passed" {
        SUCCESS
    } else if state == "canceled" {
        CANCELED
    } else {
        // Fail closed on every other (including unknown) state.
        FAILURE
    }
}
93
94/// Watch `build #number` until terminal, emitting [`BuildEvent`]s on `tx`.
95///
96/// `log_base` is the host serving the SSE log stream (the API base in prod).
97/// Returns the terminal exit code via [`exit_code_for_state`]: 0 passed, 130
98/// canceled, 1 otherwise.
99///
100/// # Errors
101/// Returns an error if any SDK call fails (build status poll, job list, or log
102/// token fetch). A dropped receiver (`tx`) is treated as a clean early exit
103/// (`Ok(1)`) — not an error.
104#[allow(clippy::too_many_lines)] // single-responsibility poll loop; split would obscure flow
105pub async fn watch_build(
106 client: &HarmontClient,
107 log_base: &str,
108 org: &str,
109 pipeline: &str,
110 number: i64,
111 tx: tokio::sync::mpsc::Sender<BuildEvent>,
112) -> Result<i32> {
113 // Log tokens carry a ~1h TTL. A long build outlives a single mint, so a
114 // job whose stream starts late in the build would 401 mid-stream. We keep
115 // the minted token (with its `expires_at`) and re-mint before spawning a
116 // new stream once we're within `TOKEN_REFRESH_MARGIN` of expiry, so every
117 // later-starting step gets a valid token. Streams that 401 anyway surface a
118 // one-line notice (see `stream_one`) rather than silently dropping logs.
119 let mut log_token = client.log_token(org, pipeline, number).await?;
120
121 let started = Instant::now();
122 if tx
123 .send(BuildEvent::BuildStart {
124 run_id: Uuid::new_v4(),
125 plan: PlanSummary {
126 // #jobs isn't known until the first list_jobs; 0 is a fine
127 // placeholder (renderers treat it as "not yet known").
128 step_count: 0,
129 chain_count: 1,
130 default_runner: "cloud".to_string(),
131 },
132 started_at: Utc::now(),
133 })
134 .await
135 .is_err()
136 {
137 // Renderer side went away — nothing left to drive.
138 return Ok(1);
139 }
140
141 // Jobs we've started a log stream for.
142 let mut streaming: HashSet<Uuid> = HashSet::new();
143 // Deduplicates the post-drain StepEnd sweep: if `list_jobs` returns the
144 // same job ID more than once we emit only one StepEnd per job.
145 let mut ended: HashSet<Uuid> = HashSet::new();
146 // Stable chain-local index assigned in discovery order.
147 let mut chain_idx: HashMap<Uuid, usize> = HashMap::new();
148 let mut next_idx: usize = 0;
149 let mut guard = AbortGuard(Vec::new());
150
151 let final_state = loop {
152 // Discover jobs; start a log stream for each job that has reached a
153 // state where logs exist (running or already terminal).
154 let jobs = client.list_jobs(org, pipeline, number).await?;
155 for job in &jobs {
156 if job_logs_available(job.state) && streaming.insert(job.id) {
157 let name = job.name.clone().unwrap_or_else(|| "job".to_string());
158 let idx = *chain_idx.entry(job.id).or_insert_with(|| {
159 let i = next_idx;
160 next_idx += 1;
161 i
162 });
163 if tx
164 .send(BuildEvent::StepQueued {
165 step_id: job.id,
166 key: name.clone(),
167 chain_idx: idx,
168 parent_key: None,
169 display_name: name.clone(),
170 })
171 .await
172 .is_err()
173 {
174 return Ok(1);
175 }
176 if tx
177 .send(BuildEvent::StepStart {
178 step_id: job.id,
179 runner: "cloud".to_string(),
180 image: None,
181 })
182 .await
183 .is_err()
184 {
185 return Ok(1);
186 }
187 // Re-mint the token if it's near expiry before this (possibly
188 // late-starting) stream begins. A re-mint failure is
189 // non-fatal: fall back to the existing token and let
190 // `stream_one` surface a notice if the server rejects it.
191 if log_token.expires_at - Utc::now() < TOKEN_REFRESH_MARGIN {
192 match client.log_token(org, pipeline, number).await {
193 Ok(fresh) => log_token = fresh,
194 Err(e) => tracing::warn!("log-token refresh failed: {e}"),
195 }
196 }
197 guard.0.push(tokio::spawn(stream_one(
198 client.clone(),
199 log_base.to_string(),
200 job.id,
201 log_token.token.clone(),
202 tx.clone(),
203 )));
204 }
205 // NOTE: StepEnd is intentionally NOT emitted here. A job's log
206 // stream runs in a spawned task concurrently with this poll loop;
207 // emitting StepEnd now could order it ahead of that job's still-
208 // in-flight StepLog lines. We drain every stream below, then emit
209 // all StepEnds — guaranteeing logs precede the step's terminal mark.
210 }
211
212 let build = client.get_build(org, pipeline, number).await?;
213 if build_is_terminal(&build.state.to_string()) {
214 break build.state.to_string();
215 }
216 // TODO: no overall deadline; a build stuck non-terminal loops forever
217 // (matches `hm cloud build watch`). Consider a --timeout ceiling.
218 tokio::time::sleep(POLL).await;
219 };
220
221 // Drain all log streams (empties the guard so Drop aborts nothing on the
222 // success path).
223 for h in guard.0.drain(..) {
224 let _ = h.await;
225 }
226
227 // Emit StepEnd for any terminal job not yet ended (e.g. a job that went
228 // straight to terminal in the same poll the build did).
229 if let Ok(jobs) = client.list_jobs(org, pipeline, number).await {
230 for job in &jobs {
231 if job_is_terminal(&job.state.to_string())
232 && ended.insert(job.id)
233 && tx.send(step_end(job)).await.is_err()
234 {
235 return Ok(1);
236 }
237 }
238 }
239
240 let code = exit_code_for_state(&final_state);
241 // Best-effort close; ignore a dropped receiver.
242 let _ = tx
243 .send(BuildEvent::BuildEnd {
244 exit_code: code,
245 // Saturate at u64::MAX (~584 million years) rather than panic.
246 duration_ms: u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX),
247 })
248 .await;
249 Ok(code)
250}
251
252/// Build a `StepEnd` event from a (terminal) job's recorded fields.
253fn step_end(job: &harmont_cloud::models::Job) -> BuildEvent {
254 let state = job.state.to_string();
255 let passed = matches!(state.as_str(), "passed" | "skipped");
256 let exit_code = job
257 .exit_code
258 // Saturate exit codes outside [i32::MIN, i32::MAX] rather than panic.
259 .map_or_else(|| i32::from(!passed), |c| i32::try_from(c).unwrap_or(1));
260 BuildEvent::StepEnd {
261 step_id: job.id,
262 exit_code,
263 duration_ms: duration_ms(job.started_at, job.finished_at),
264 snapshot: None,
265 }
266}
267
268/// Stream one job's SSE logs as [`BuildEvent::StepLog`] events.
269///
270/// Emits a `StepLog` per complete line (keyed by `step_id`) to `tx`, until
271/// the job's `done` event. Buffers partial lines and flushes the trailing
272/// remainder. Used by both the multi-job watch loop and the single-job
273/// `hm cloud job log` tail.
274///
275/// Returns `Ok(())` on a clean `done` close. A dropped receiver (`tx.send`
276/// fails) is treated as a clean stop — the caller has gone away, not the job.
277///
278/// **Error semantics are caller-controlled:**
279/// - The multi-job watcher (`stream_one`) swallows the error (best-effort: log
280/// other jobs, keep watching).
281/// - The single-job tail (`hm cloud job log`) propagates it (`?`) so the
282/// command surfaces transport failures to the user.
283///
284/// # Errors
285/// Returns an error on transport or SSE stream failure (the underlying
286/// [`HarmontClient::stream_job_logs`] call or a non-`Done` error event).
287pub async fn stream_job_logs_as_events(
288 client: &HarmontClient,
289 log_base: &str,
290 step_id: Uuid,
291 token: &str,
292 tx: &tokio::sync::mpsc::Sender<BuildEvent>,
293) -> Result<()> {
294 let stream = client.stream_job_logs(log_base, step_id, token).await?;
295 futures_util::pin_mut!(stream);
296 let mut buf = String::new();
297 let mut last_stream = StreamKind::Stdout;
298 while let Some(item) = stream.next().await {
299 match item {
300 Ok(LogEvent::History(chunks)) => {
301 for c in chunks {
302 last_stream = c.stream;
303 if emit(tx, step_id, c.stream, c.ts_unix_ns, &mut buf, &c.content)
304 .await
305 .is_err()
306 {
307 // Receiver dropped — treat as clean stop.
308 return Ok(());
309 }
310 }
311 }
312 Ok(LogEvent::Chunk(c)) => {
313 last_stream = c.stream;
314 if emit(tx, step_id, c.stream, c.ts_unix_ns, &mut buf, &c.content)
315 .await
316 .is_err()
317 {
318 // Receiver dropped — treat as clean stop.
319 return Ok(());
320 }
321 }
322 Ok(LogEvent::Done) => break,
323 Err(e) => return Err(e.into()),
324 }
325 }
326 // Flush any trailing partial line.
327 if !buf.is_empty() {
328 let line = std::mem::take(&mut buf);
329 // Ignore send failure: receiver dropping at flush time is still a
330 // clean stop.
331 let _ = tx
332 .send(BuildEvent::StepLog {
333 step_id,
334 stream: map_stream(last_stream),
335 line,
336 ts: Utc::now(),
337 })
338 .await;
339 }
340 Ok(())
341}
342
343/// Thin wrapper used by the multi-job watch loop. Errors are treated as
344/// best-effort (log stream for this job stops, other jobs continue) — with one
345/// exception: a `401 Unauthorized` (the log token expired mid-build) is
346/// surfaced as a single one-line notice on the step's stream instead of being
347/// dropped silently, so the gulf of evaluation ("why did my logs stop?") stays
348/// narrow. The build-status poll still drives the build to its real verdict.
349async fn stream_one(
350 client: HarmontClient,
351 log_base: String,
352 job_id: Uuid,
353 token: String,
354 tx: tokio::sync::mpsc::Sender<BuildEvent>,
355) {
356 let expired = stream_job_logs_as_events(&client, &log_base, job_id, &token, &tx)
357 .await
358 .err()
359 .and_then(|e| {
360 e.downcast_ref::<HarmontError>()
361 .map(|h| matches!(h, HarmontError::Unauthorized))
362 })
363 .unwrap_or(false);
364 if expired {
365 let _ = tx
366 .send(BuildEvent::StepLog {
367 step_id: job_id,
368 stream: StdStream::Stderr,
369 line: "live logs expired; full logs available via `hm cloud build show`"
370 .to_string(),
371 ts: Utc::now(),
372 })
373 .await;
374 }
375}
376
377/// Map the SDK stream kind onto the renderer's two-way stream: `Meta` folds
378/// into `Stderr` (it's out-of-band, not pipeline stdout).
379pub(crate) const fn map_stream(kind: StreamKind) -> StdStream {
380 match kind {
381 StreamKind::Stdout => StdStream::Stdout,
382 StreamKind::Stderr | StreamKind::Meta => StdStream::Stderr,
383 }
384}
385
386/// Buffer content and emit complete `\n`-terminated lines as `StepLog`
387/// events. Returns `Err(())` if the receiver dropped (caller should stop).
388async fn emit(
389 tx: &tokio::sync::mpsc::Sender<BuildEvent>,
390 job_id: Uuid,
391 kind: StreamKind,
392 ts_unix_ns: Option<i64>,
393 buf: &mut String,
394 content: &str,
395) -> std::result::Result<(), ()> {
396 buf.push_str(content);
397 while let Some(nl) = buf.find('\n') {
398 let raw: String = buf.drain(..=nl).collect();
399 let line = raw.trim_end_matches(['\r', '\n']).to_string();
400 tx.send(BuildEvent::StepLog {
401 step_id: job_id,
402 stream: map_stream(kind),
403 line,
404 ts: ts_or_now(ts_unix_ns),
405 })
406 .await
407 .map_err(|_| ())?;
408 }
409 Ok(())
410}
411
#[cfg(test)]
mod tests {
    use super::{JobState, exit_code_for_state, job_logs_available};

    /// States in which a job is running, winding down, or finished.
    const STATES_WITH_LOGS: [JobState; 7] = [
        JobState::Running,
        JobState::Passed,
        JobState::Failed,
        JobState::TimedOut,
        JobState::Canceling,
        JobState::Canceled,
        JobState::TimingOut,
    ];

    /// States in which a job has not started yet, or was skipped.
    const STATES_WITHOUT_LOGS: [JobState; 4] = [
        JobState::Pending,
        JobState::Scheduled,
        JobState::Assigned,
        JobState::Skipped,
    ];

    #[test]
    fn logs_available_for_running_and_terminal_states() {
        for state in STATES_WITH_LOGS {
            assert!(job_logs_available(state), "expected logs for {state}");
        }
    }

    #[test]
    fn no_logs_before_start_or_when_skipped() {
        for state in STATES_WITHOUT_LOGS {
            assert!(!job_logs_available(state), "expected no logs for {state}");
        }
    }

    #[test]
    fn passed_is_zero_canceled_is_130_else_is_one() {
        let expectations = [
            ("passed", 0),
            // A server-side cancel must NOT collapse to the generic failure code.
            ("canceled", 130),
            ("failed", 1),
            // Unexpected/unknown terminal states fail closed.
            ("timed_out", 1),
        ];
        for (state, code) in expectations {
            assert_eq!(exit_code_for_state(state), code);
        }
    }
}
452}