Skip to main content

hm_exec/cloud/
watch.rs

1//! Watch a cloud build to completion, emitting [`BuildEvent`]s.
2//!
3//! Discovers jobs, streams each job's logs concurrently, and maps cloud job
4//! lifecycle + SSE logs to the shared [`BuildEvent`] vocabulary so the cloud
5//! path renders through the same `hm-render` renderers as a local run.
6//!
7//! A cloud job maps to a pipeline step (keyed by `job.id`); the cloud build
8//! is modeled as a single chain (`chain_idx == 0`, `chain_count == 1`).
9
10use std::collections::{HashMap, HashSet};
11use std::time::{Duration, Instant};
12
13use anyhow::Result;
14use chrono::{DateTime, Utc};
15use futures_util::StreamExt;
16use harmont_cloud::{
17    HarmontClient, HarmontError,
18    logs::{LogEvent, StreamKind},
19    models::{build_is_terminal, job_is_terminal},
20    types::JobState,
21};
22use hm_plugin_protocol::events::{BuildEvent, PlanSummary, StdStream};
23use uuid::Uuid;
24
/// How long the watch loop sleeps between successive build/job status polls.
const POLL: Duration = Duration::from_millis(1_500);
27
/// Re-mint the log token when its remaining lifetime drops below this margin,
/// so a stream spawned late in a long build starts with a fresh, valid token
/// instead of one that 401s within seconds.
///
/// Compared against `expires_at - now` in `watch_build` immediately before
/// each log stream is spawned.
const TOKEN_REFRESH_MARGIN: chrono::Duration = chrono::Duration::minutes(5);
32
33/// Aborts any still-running stream tasks when dropped (covers early-return
34/// error paths so no detached ghost tasks outlive `watch_build`).
35#[derive(Debug)]
36struct AbortGuard(Vec<tokio::task::JoinHandle<()>>);
37impl Drop for AbortGuard {
38    fn drop(&mut self) {
39        for h in &self.0 {
40            h.abort();
41        }
42    }
43}
44
45/// Convert a unix-nanosecond timestamp to a UTC datetime, falling back to
46/// "now" when absent or out of range.
47pub(crate) fn ts_or_now(ts_unix_ns: Option<i64>) -> DateTime<Utc> {
48    ts_unix_ns.map_or_else(Utc::now, DateTime::<Utc>::from_timestamp_nanos)
49}
50
51/// Duration between two optional timestamps, in milliseconds (0 if either is
52/// missing or the interval is negative).
53fn duration_ms(start: Option<DateTime<Utc>>, end: Option<DateTime<Utc>>) -> u64 {
54    match (start, end) {
55        (Some(s), Some(e)) => (e - s).num_milliseconds().max(0).cast_unsigned(),
56        _ => 0,
57    }
58}
59
60/// Whether a job has reached a state where its logs exist (running or already
61/// terminal), and so a log stream should be started for it.
62///
63/// Matching the typed [`JobState`] enum (rather than `to_string()`/`as_str()`
64/// against string literals) makes the set of states exhaustive: when the cloud
65/// adds a new `JobState` variant the compiler forces this decision to be
66/// revisited, and a misspelled state can no longer silently drop a job's logs.
67const fn job_logs_available(state: JobState) -> bool {
68    match state {
69        JobState::Running
70        | JobState::Passed
71        | JobState::Failed
72        | JobState::TimedOut
73        | JobState::Canceling
74        | JobState::Canceled
75        | JobState::TimingOut => true,
76        // No logs yet (not started) or never produced (skipped).
77        JobState::Pending | JobState::Scheduled | JobState::Assigned | JobState::Skipped => false,
78    }
79}
80
/// Translate a terminal build state into the process exit code used by the
/// renderer and the `hm run` driver.
///
/// `passed` yields 0 and `canceled` yields 130 (SIGINT-cancel, mirrors
/// [`crate::BuildStatus::Canceled`]); `failed` and every other or unexpected
/// state yield 1. This stays in lockstep with the backend's state→status map
/// so a server-side cancel is never reported as a failure.
pub(crate) fn exit_code_for_state(state: &str) -> i32 {
    // States with a dedicated exit code; anything not listed fails closed.
    const KNOWN: [(&str, i32); 2] = [("passed", 0), ("canceled", 130)];
    KNOWN
        .iter()
        .find(|(name, _)| *name == state)
        .map_or(1, |&(_, code)| code)
}
93
94/// Watch `build #number` until terminal, emitting [`BuildEvent`]s on `tx`.
95///
96/// `log_base` is the host serving the SSE log stream (the API base in prod).
97/// Returns the terminal exit code via [`exit_code_for_state`]: 0 passed, 130
98/// canceled, 1 otherwise.
99///
100/// # Errors
101/// Returns an error if any SDK call fails (build status poll, job list, or log
102/// token fetch). A dropped receiver (`tx`) is treated as a clean early exit
103/// (`Ok(1)`) — not an error.
104#[allow(clippy::too_many_lines)] // single-responsibility poll loop; split would obscure flow
105pub async fn watch_build(
106    client: &HarmontClient,
107    log_base: &str,
108    org: &str,
109    pipeline: &str,
110    number: i64,
111    tx: tokio::sync::mpsc::Sender<BuildEvent>,
112) -> Result<i32> {
113    // Log tokens carry a ~1h TTL. A long build outlives a single mint, so a
114    // job whose stream starts late in the build would 401 mid-stream. We keep
115    // the minted token (with its `expires_at`) and re-mint before spawning a
116    // new stream once we're within `TOKEN_REFRESH_MARGIN` of expiry, so every
117    // later-starting step gets a valid token. Streams that 401 anyway surface a
118    // one-line notice (see `stream_one`) rather than silently dropping logs.
119    let mut log_token = client.log_token(org, pipeline, number).await?;
120
121    let started = Instant::now();
122    if tx
123        .send(BuildEvent::BuildStart {
124            run_id: Uuid::new_v4(),
125            plan: PlanSummary {
126                // #jobs isn't known until the first list_jobs; 0 is a fine
127                // placeholder (renderers treat it as "not yet known").
128                step_count: 0,
129                chain_count: 1,
130                default_runner: "cloud".to_string(),
131            },
132            started_at: Utc::now(),
133        })
134        .await
135        .is_err()
136    {
137        // Renderer side went away — nothing left to drive.
138        return Ok(1);
139    }
140
141    // Jobs we've started a log stream for.
142    let mut streaming: HashSet<Uuid> = HashSet::new();
143    // Deduplicates the post-drain StepEnd sweep: if `list_jobs` returns the
144    // same job ID more than once we emit only one StepEnd per job.
145    let mut ended: HashSet<Uuid> = HashSet::new();
146    // Stable chain-local index assigned in discovery order.
147    let mut chain_idx: HashMap<Uuid, usize> = HashMap::new();
148    let mut next_idx: usize = 0;
149    let mut guard = AbortGuard(Vec::new());
150
151    let final_state = loop {
152        // Discover jobs; start a log stream for each job that has reached a
153        // state where logs exist (running or already terminal).
154        let jobs = client.list_jobs(org, pipeline, number).await?;
155        for job in &jobs {
156            if job_logs_available(job.state) && streaming.insert(job.id) {
157                let name = job.name.clone().unwrap_or_else(|| "job".to_string());
158                let idx = *chain_idx.entry(job.id).or_insert_with(|| {
159                    let i = next_idx;
160                    next_idx += 1;
161                    i
162                });
163                if tx
164                    .send(BuildEvent::StepQueued {
165                        step_id: job.id,
166                        key: name.clone(),
167                        chain_idx: idx,
168                        parent_key: None,
169                        display_name: name.clone(),
170                    })
171                    .await
172                    .is_err()
173                {
174                    return Ok(1);
175                }
176                if tx
177                    .send(BuildEvent::StepStart {
178                        step_id: job.id,
179                        runner: "cloud".to_string(),
180                        image: None,
181                    })
182                    .await
183                    .is_err()
184                {
185                    return Ok(1);
186                }
187                // Re-mint the token if it's near expiry before this (possibly
188                // late-starting) stream begins. A re-mint failure is
189                // non-fatal: fall back to the existing token and let
190                // `stream_one` surface a notice if the server rejects it.
191                if log_token.expires_at - Utc::now() < TOKEN_REFRESH_MARGIN {
192                    match client.log_token(org, pipeline, number).await {
193                        Ok(fresh) => log_token = fresh,
194                        Err(e) => tracing::warn!("log-token refresh failed: {e}"),
195                    }
196                }
197                guard.0.push(tokio::spawn(stream_one(
198                    client.clone(),
199                    log_base.to_string(),
200                    job.id,
201                    log_token.token.clone(),
202                    tx.clone(),
203                )));
204            }
205            // NOTE: StepEnd is intentionally NOT emitted here. A job's log
206            // stream runs in a spawned task concurrently with this poll loop;
207            // emitting StepEnd now could order it ahead of that job's still-
208            // in-flight StepLog lines. We drain every stream below, then emit
209            // all StepEnds — guaranteeing logs precede the step's terminal mark.
210        }
211
212        let build = client.get_build(org, pipeline, number).await?;
213        if build_is_terminal(&build.state.to_string()) {
214            break build.state.to_string();
215        }
216        // TODO: no overall deadline; a build stuck non-terminal loops forever
217        // (matches `hm cloud build watch`). Consider a --timeout ceiling.
218        tokio::time::sleep(POLL).await;
219    };
220
221    // Drain all log streams (empties the guard so Drop aborts nothing on the
222    // success path).
223    for h in guard.0.drain(..) {
224        let _ = h.await;
225    }
226
227    // Emit StepEnd for any terminal job not yet ended (e.g. a job that went
228    // straight to terminal in the same poll the build did).
229    if let Ok(jobs) = client.list_jobs(org, pipeline, number).await {
230        for job in &jobs {
231            if job_is_terminal(&job.state.to_string())
232                && ended.insert(job.id)
233                && tx.send(step_end(job)).await.is_err()
234            {
235                return Ok(1);
236            }
237        }
238    }
239
240    let code = exit_code_for_state(&final_state);
241    // Best-effort close; ignore a dropped receiver.
242    let _ = tx
243        .send(BuildEvent::BuildEnd {
244            exit_code: code,
245            // Saturate at u64::MAX (~584 million years) rather than panic.
246            duration_ms: u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX),
247        })
248        .await;
249    Ok(code)
250}
251
252/// Build a `StepEnd` event from a (terminal) job's recorded fields.
253fn step_end(job: &harmont_cloud::models::Job) -> BuildEvent {
254    let state = job.state.to_string();
255    let passed = matches!(state.as_str(), "passed" | "skipped");
256    let exit_code = job
257        .exit_code
258        // Saturate exit codes outside [i32::MIN, i32::MAX] rather than panic.
259        .map_or_else(|| i32::from(!passed), |c| i32::try_from(c).unwrap_or(1));
260    BuildEvent::StepEnd {
261        step_id: job.id,
262        exit_code,
263        duration_ms: duration_ms(job.started_at, job.finished_at),
264        snapshot: None,
265    }
266}
267
268/// Stream one job's SSE logs as [`BuildEvent::StepLog`] events.
269///
270/// Emits a `StepLog` per complete line (keyed by `step_id`) to `tx`, until
271/// the job's `done` event. Buffers partial lines and flushes the trailing
272/// remainder. Used by both the multi-job watch loop and the single-job
273/// `hm cloud job log` tail.
274///
275/// Returns `Ok(())` on a clean `done` close. A dropped receiver (`tx.send`
276/// fails) is treated as a clean stop — the caller has gone away, not the job.
277///
278/// **Error semantics are caller-controlled:**
279/// - The multi-job watcher (`stream_one`) swallows the error (best-effort: log
280///   other jobs, keep watching).
281/// - The single-job tail (`hm cloud job log`) propagates it (`?`) so the
282///   command surfaces transport failures to the user.
283///
284/// # Errors
285/// Returns an error on transport or SSE stream failure (the underlying
286/// [`HarmontClient::stream_job_logs`] call or a non-`Done` error event).
287pub async fn stream_job_logs_as_events(
288    client: &HarmontClient,
289    log_base: &str,
290    step_id: Uuid,
291    token: &str,
292    tx: &tokio::sync::mpsc::Sender<BuildEvent>,
293) -> Result<()> {
294    let stream = client.stream_job_logs(log_base, step_id, token).await?;
295    futures_util::pin_mut!(stream);
296    let mut buf = String::new();
297    let mut last_stream = StreamKind::Stdout;
298    while let Some(item) = stream.next().await {
299        match item {
300            Ok(LogEvent::History(chunks)) => {
301                for c in chunks {
302                    last_stream = c.stream;
303                    if emit(tx, step_id, c.stream, c.ts_unix_ns, &mut buf, &c.content)
304                        .await
305                        .is_err()
306                    {
307                        // Receiver dropped — treat as clean stop.
308                        return Ok(());
309                    }
310                }
311            }
312            Ok(LogEvent::Chunk(c)) => {
313                last_stream = c.stream;
314                if emit(tx, step_id, c.stream, c.ts_unix_ns, &mut buf, &c.content)
315                    .await
316                    .is_err()
317                {
318                    // Receiver dropped — treat as clean stop.
319                    return Ok(());
320                }
321            }
322            Ok(LogEvent::Done) => break,
323            Err(e) => return Err(e.into()),
324        }
325    }
326    // Flush any trailing partial line.
327    if !buf.is_empty() {
328        let line = std::mem::take(&mut buf);
329        // Ignore send failure: receiver dropping at flush time is still a
330        // clean stop.
331        let _ = tx
332            .send(BuildEvent::StepLog {
333                step_id,
334                stream: map_stream(last_stream),
335                line,
336                ts: Utc::now(),
337            })
338            .await;
339    }
340    Ok(())
341}
342
343/// Thin wrapper used by the multi-job watch loop. Errors are treated as
344/// best-effort (log stream for this job stops, other jobs continue) — with one
345/// exception: a `401 Unauthorized` (the log token expired mid-build) is
346/// surfaced as a single one-line notice on the step's stream instead of being
347/// dropped silently, so the gulf of evaluation ("why did my logs stop?") stays
348/// narrow. The build-status poll still drives the build to its real verdict.
349async fn stream_one(
350    client: HarmontClient,
351    log_base: String,
352    job_id: Uuid,
353    token: String,
354    tx: tokio::sync::mpsc::Sender<BuildEvent>,
355) {
356    let expired = stream_job_logs_as_events(&client, &log_base, job_id, &token, &tx)
357        .await
358        .err()
359        .and_then(|e| {
360            e.downcast_ref::<HarmontError>()
361                .map(|h| matches!(h, HarmontError::Unauthorized))
362        })
363        .unwrap_or(false);
364    if expired {
365        let _ = tx
366            .send(BuildEvent::StepLog {
367                step_id: job_id,
368                stream: StdStream::Stderr,
369                line: "live logs expired; full logs available via `hm cloud build show`"
370                    .to_string(),
371                ts: Utc::now(),
372            })
373            .await;
374    }
375}
376
377/// Map the SDK stream kind onto the renderer's two-way stream: `Meta` folds
378/// into `Stderr` (it's out-of-band, not pipeline stdout).
379pub(crate) const fn map_stream(kind: StreamKind) -> StdStream {
380    match kind {
381        StreamKind::Stdout => StdStream::Stdout,
382        StreamKind::Stderr | StreamKind::Meta => StdStream::Stderr,
383    }
384}
385
386/// Buffer content and emit complete `\n`-terminated lines as `StepLog`
387/// events. Returns `Err(())` if the receiver dropped (caller should stop).
388async fn emit(
389    tx: &tokio::sync::mpsc::Sender<BuildEvent>,
390    job_id: Uuid,
391    kind: StreamKind,
392    ts_unix_ns: Option<i64>,
393    buf: &mut String,
394    content: &str,
395) -> std::result::Result<(), ()> {
396    buf.push_str(content);
397    while let Some(nl) = buf.find('\n') {
398        let raw: String = buf.drain(..=nl).collect();
399        let line = raw.trim_end_matches(['\r', '\n']).to_string();
400        tx.send(BuildEvent::StepLog {
401            step_id: job_id,
402            stream: map_stream(kind),
403            line,
404            ts: ts_or_now(ts_unix_ns),
405        })
406        .await
407        .map_err(|_| ())?;
408    }
409    Ok(())
410}
411
#[cfg(test)]
mod tests {
    use super::{JobState, exit_code_for_state, job_logs_available};

    /// Running, terminating, and terminal states all warrant a log stream.
    #[test]
    fn logs_available_for_running_and_terminal_states() {
        let with_logs = [
            JobState::Running,
            JobState::Passed,
            JobState::Failed,
            JobState::TimedOut,
            JobState::Canceling,
            JobState::Canceled,
            JobState::TimingOut,
        ];
        for state in with_logs {
            assert!(job_logs_available(state), "expected logs for {state}");
        }
    }

    /// Jobs that have not started, or were skipped, never get a log stream.
    #[test]
    fn no_logs_before_start_or_when_skipped() {
        let without_logs = [
            JobState::Pending,
            JobState::Scheduled,
            JobState::Assigned,
            JobState::Skipped,
        ];
        for state in without_logs {
            assert!(!job_logs_available(state), "expected no logs for {state}");
        }
    }

    /// Exit-code mapping, one row per state.
    #[test]
    fn passed_is_zero_canceled_is_130_else_is_one() {
        let expectations = [
            ("passed", 0),
            // A server-side cancel must NOT collapse to the generic failure code.
            ("canceled", 130),
            ("failed", 1),
            // Unexpected/unknown terminal states fail closed.
            ("timed_out", 1),
        ];
        for (state, expected) in expectations {
            assert_eq!(exit_code_for_state(state), expected);
        }
    }
}