hm-exec 0.0.8

Pluggable CI execution backends (local VM + cloud) for the hm CLI.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
//! Watch a cloud build to completion, emitting [`BuildEvent`]s.
//!
//! Discovers jobs, streams each job's logs concurrently, and maps cloud job
//! lifecycle + SSE logs to the shared [`BuildEvent`] vocabulary so the cloud
//! path renders through the same `hm-render` renderers as a local run.
//!
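//! A cloud job maps to a pipeline step (keyed by `job.id`); the cloud build
//! is modeled as a single chain (`chain_count == 1`), and each step's
//! `chain_idx` is its position within that chain, assigned in job-discovery
//! order starting at 0.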

use std::collections::{HashMap, HashSet};
use std::time::{Duration, Instant};

use anyhow::Result;
use chrono::{DateTime, Utc};
use futures_util::StreamExt;
use harmont_cloud::{
    HarmontClient, HarmontError,
    logs::{LogEvent, StreamKind},
    models::{build_is_terminal, job_is_terminal},
    types::JobState,
};
use hm_plugin_protocol::events::{BuildEvent, PlanSummary, StdStream};
use uuid::Uuid;

/// Delay between successive build/job status polls in [`watch_build`]: the
/// loop sleeps this long after each non-terminal build-state check.
const POLL: Duration = Duration::from_millis(1500);

/// Re-mint the log token when its remaining lifetime drops below this margin,
/// so a stream spawned late in a long build starts with a fresh, valid token
/// instead of one that 401s within seconds.
const TOKEN_REFRESH_MARGIN: chrono::Duration = chrono::Duration::minutes(5);

/// Owns the join handles of spawned stream tasks and aborts every one of them
/// when dropped, so an early return from `watch_build` cannot leave detached
/// tasks running in the background.
#[derive(Debug)]
struct AbortGuard(Vec<tokio::task::JoinHandle<()>>);

impl Drop for AbortGuard {
    fn drop(&mut self) {
        let Self(tasks) = self;
        // `abort` only borrows the handle, so the vector keeps its contents.
        for task in tasks {
            task.abort();
        }
    }
}

/// Turn an optional unix-epoch nanosecond timestamp into a UTC datetime.
///
/// `None` yields the current time. `Some(ns)` is converted with
/// [`DateTime::from_timestamp_nanos`], which returns a `DateTime` directly
/// (not an `Option`), so there is no separate out-of-range fallback.
pub(crate) fn ts_or_now(ts_unix_ns: Option<i64>) -> DateTime<Utc> {
    let Some(nanos) = ts_unix_ns else {
        return Utc::now();
    };
    DateTime::<Utc>::from_timestamp_nanos(nanos)
}

/// Elapsed time from `start` to `end` in whole milliseconds.
///
/// Yields 0 when either endpoint is `None` or when `end` precedes `start`.
fn duration_ms(start: Option<DateTime<Utc>>, end: Option<DateTime<Utc>>) -> u64 {
    let (Some(began), Some(finished)) = (start, end) else {
        return 0;
    };
    let elapsed = (finished - began).num_milliseconds();
    // A negative span fails the unsigned conversion and collapses to 0.
    u64::try_from(elapsed).unwrap_or(0)
}

/// Decide whether a log stream should be opened for a job in `state`.
///
/// Logs exist once a job is running or has reached (or is on its way to) a
/// terminal state; they do not exist for jobs that have not started or that
/// were skipped.
///
/// The match is deliberately exhaustive over the typed [`JobState`] enum, with
/// no wildcard arm: a newly added variant becomes a compile error here, so the
/// decision must be revisited instead of silently dropping a job's logs.
const fn job_logs_available(state: JobState) -> bool {
    match state {
        // Not started yet, or never produced any output (skipped).
        JobState::Pending | JobState::Scheduled | JobState::Assigned | JobState::Skipped => false,
        // In flight or winding down.
        JobState::Running | JobState::Canceling | JobState::TimingOut => true,
        // Finished.
        JobState::Passed | JobState::Failed | JobState::TimedOut | JobState::Canceled => true,
    }
}

/// Translate a terminal build state name into a process exit code.
///
/// | state        | exit code |
/// |--------------|-----------|
/// | `"passed"`   | 0         |
/// | `"canceled"` | 130       |
/// | anything else (`"failed"`, unknown states) | 1 |
///
/// Matching is exact and case-sensitive. A cancel is kept distinct from the
/// generic failure code so that a server-side cancel is never reported as a
/// failure.
pub(crate) fn exit_code_for_state(state: &str) -> i32 {
    // States with a dedicated exit code; every other state fails closed.
    const DEDICATED: [(&str, i32); 2] = [("passed", 0), ("canceled", 130)];

    DEDICATED
        .iter()
        .find(|(name, _)| *name == state)
        .map_or(1, |&(_, code)| code)
}

/// Watch `build #number` until terminal, emitting [`BuildEvent`]s on `tx`.
///
/// Each poll iteration lists the build's jobs, announces every job that has
/// newly reached a log-bearing state (`StepQueued` then `StepStart`), spawns a
/// log stream task for it, and then checks the build state. Once the build is
/// terminal, all stream tasks are awaited, one `StepEnd` is emitted per
/// terminal job, and a final `BuildEnd` closes the run.
///
/// `log_base` is the host serving the SSE log stream (the API base in prod).
/// Returns the terminal exit code via [`exit_code_for_state`]: 0 passed, 130
/// canceled, 1 otherwise.
///
/// # Errors
/// Returns an error if the initial log-token fetch or any in-loop status call
/// (job list, build status) fails. A failed log-token *refresh* and a failed
/// post-drain job list are logged and tolerated. A dropped receiver (`tx`) is
/// treated as a clean early exit (`Ok(1)`) — not an error.
///
/// # Cancellation
/// Stream tasks stay owned by an abort-on-drop guard for the whole call, so
/// dropping this future at any await point (including while draining the
/// streams) aborts them rather than detaching them.
#[allow(clippy::too_many_lines)] // single-responsibility poll loop; split would obscure flow
pub async fn watch_build(
    client: &HarmontClient,
    log_base: &str,
    org: &str,
    pipeline: &str,
    number: i64,
    tx: tokio::sync::mpsc::Sender<BuildEvent>,
) -> Result<i32> {
    // Log tokens carry a ~1h TTL. A long build outlives a single mint, so a
    // job whose stream starts late in the build would 401 mid-stream. We keep
    // the minted token (with its `expires_at`) and re-mint before spawning a
    // new stream once we're within `TOKEN_REFRESH_MARGIN` of expiry, so every
    // later-starting step gets a valid token. Streams that 401 anyway surface a
    // one-line notice (see `stream_one`) rather than silently dropping logs.
    let mut log_token = client.log_token(org, pipeline, number).await?;

    let started = Instant::now();
    if tx
        .send(BuildEvent::BuildStart {
            run_id: Uuid::new_v4(),
            plan: PlanSummary {
                // #jobs isn't known until the first list_jobs; 0 is a fine
                // placeholder (renderers treat it as "not yet known").
                step_count: 0,
                chain_count: 1,
                default_runner: "cloud".to_string(),
            },
            started_at: Utc::now(),
        })
        .await
        .is_err()
    {
        // Renderer side went away — nothing left to drive.
        return Ok(1);
    }

    // Jobs we've started a log stream for.
    let mut streaming: HashSet<Uuid> = HashSet::new();
    // Deduplicates the post-drain StepEnd sweep: if `list_jobs` returns the
    // same job ID more than once we emit only one StepEnd per job.
    let mut ended: HashSet<Uuid> = HashSet::new();
    // Stable chain-local index assigned in discovery order.
    let mut chain_idx: HashMap<Uuid, usize> = HashMap::new();
    let mut next_idx: usize = 0;
    let mut guard = AbortGuard(Vec::new());

    let final_state = loop {
        // Discover jobs; start a log stream for each job that has reached a
        // state where logs exist (running or already terminal).
        let jobs = client.list_jobs(org, pipeline, number).await?;
        for job in &jobs {
            if job_logs_available(job.state) && streaming.insert(job.id) {
                let name = job.name.clone().unwrap_or_else(|| "job".to_string());
                let idx = *chain_idx.entry(job.id).or_insert_with(|| {
                    let i = next_idx;
                    next_idx += 1;
                    i
                });
                if tx
                    .send(BuildEvent::StepQueued {
                        step_id: job.id,
                        key: name.clone(),
                        chain_idx: idx,
                        parent_key: None,
                        display_name: name.clone(),
                    })
                    .await
                    .is_err()
                {
                    return Ok(1);
                }
                if tx
                    .send(BuildEvent::StepStart {
                        step_id: job.id,
                        runner: "cloud".to_string(),
                        image: None,
                    })
                    .await
                    .is_err()
                {
                    return Ok(1);
                }
                // Re-mint the token if it's near expiry before this (possibly
                // late-starting) stream begins. A re-mint failure is
                // non-fatal: fall back to the existing token and let
                // `stream_one` surface a notice if the server rejects it.
                if log_token.expires_at - Utc::now() < TOKEN_REFRESH_MARGIN {
                    match client.log_token(org, pipeline, number).await {
                        Ok(fresh) => log_token = fresh,
                        Err(e) => tracing::warn!("log-token refresh failed: {e}"),
                    }
                }
                guard.0.push(tokio::spawn(stream_one(
                    client.clone(),
                    log_base.to_string(),
                    job.id,
                    log_token.token.clone(),
                    tx.clone(),
                )));
            }
            // NOTE: StepEnd is intentionally NOT emitted here. A job's log
            // stream runs in a spawned task concurrently with this poll loop;
            // emitting StepEnd now could order it ahead of that job's still-
            // in-flight StepLog lines. We drain every stream below, then emit
            // all StepEnds — guaranteeing logs precede the step's terminal mark.
        }

        let build = client.get_build(org, pipeline, number).await?;
        // Render the state once; it is both the terminal test input and the
        // loop's result.
        let state = build.state.to_string();
        if build_is_terminal(&state) {
            break state;
        }
        // TODO: no overall deadline; a build stuck non-terminal loops forever
        // (matches `hm cloud build watch`). Consider a --timeout ceiling.
        tokio::time::sleep(POLL).await;
    };

    // Drain all log streams so every StepLog precedes the StepEnd sweep below.
    // The handles are awaited in place rather than moved out of the guard: if
    // this future is dropped mid-drain, the guard still owns every handle and
    // aborts the unfinished tasks, whereas dropping a moved-out `JoinHandle`
    // would only detach its task. On the success path the guard later aborts
    // tasks that have already finished, which has no effect.
    for handle in &mut guard.0 {
        let _ = handle.await;
    }

    // Emit StepEnd for any terminal job not yet ended (e.g. a job that went
    // straight to terminal in the same poll the build did). A failure here is
    // tolerated (the build verdict is already known) but logged, so missing
    // step results are explainable.
    match client.list_jobs(org, pipeline, number).await {
        Ok(jobs) => {
            for job in &jobs {
                if job_is_terminal(&job.state.to_string())
                    && ended.insert(job.id)
                    && tx.send(step_end(job)).await.is_err()
                {
                    return Ok(1);
                }
            }
        }
        Err(e) => tracing::warn!("final job list failed; step results not reported: {e}"),
    }

    let code = exit_code_for_state(&final_state);
    // Best-effort close; ignore a dropped receiver.
    let _ = tx
        .send(BuildEvent::BuildEnd {
            exit_code: code,
            // Saturate at u64::MAX (~584 million years) rather than panic.
            duration_ms: u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX),
        })
        .await;
    Ok(code)
}

/// Build a `StepEnd` event from a (terminal) job's recorded fields.
fn step_end(job: &harmont_cloud::models::Job) -> BuildEvent {
    let state = job.state.to_string();
    let passed = matches!(state.as_str(), "passed" | "skipped");
    let exit_code = job
        .exit_code
        // Saturate exit codes outside [i32::MIN, i32::MAX] rather than panic.
        .map_or_else(|| i32::from(!passed), |c| i32::try_from(c).unwrap_or(1));
    BuildEvent::StepEnd {
        step_id: job.id,
        exit_code,
        duration_ms: duration_ms(job.started_at, job.finished_at),
        snapshot: None,
    }
}

/// Stream one job's SSE logs as [`BuildEvent::StepLog`] events.
///
/// Emits a `StepLog` per complete line (keyed by `step_id`) to `tx`, until
/// the job's `done` event. Buffers partial lines and flushes the trailing
/// remainder. Used by both the multi-job watch loop and the single-job
/// `hm cloud job log` tail.
///
/// Returns `Ok(())` on a clean `done` close. A dropped receiver (`tx.send`
/// fails) is treated as a clean stop — the caller has gone away, not the job.
///
/// **Error semantics are caller-controlled:**
/// - The multi-job watcher (`stream_one`) swallows the error (best-effort: log
///   other jobs, keep watching).
/// - The single-job tail (`hm cloud job log`) propagates it (`?`) so the
///   command surfaces transport failures to the user.
///
/// # Errors
/// Returns an error on transport or SSE stream failure (the underlying
/// [`HarmontClient::stream_job_logs`] call or a non-`Done` error event).
pub async fn stream_job_logs_as_events(
    client: &HarmontClient,
    log_base: &str,
    step_id: Uuid,
    token: &str,
    tx: &tokio::sync::mpsc::Sender<BuildEvent>,
) -> Result<()> {
    let stream = client.stream_job_logs(log_base, step_id, token).await?;
    futures_util::pin_mut!(stream);
    let mut buf = String::new();
    let mut last_stream = StreamKind::Stdout;
    while let Some(item) = stream.next().await {
        match item {
            Ok(LogEvent::History(chunks)) => {
                for c in chunks {
                    last_stream = c.stream;
                    if emit(tx, step_id, c.stream, c.ts_unix_ns, &mut buf, &c.content)
                        .await
                        .is_err()
                    {
                        // Receiver dropped — treat as clean stop.
                        return Ok(());
                    }
                }
            }
            Ok(LogEvent::Chunk(c)) => {
                last_stream = c.stream;
                if emit(tx, step_id, c.stream, c.ts_unix_ns, &mut buf, &c.content)
                    .await
                    .is_err()
                {
                    // Receiver dropped — treat as clean stop.
                    return Ok(());
                }
            }
            Ok(LogEvent::Done) => break,
            Err(e) => return Err(e.into()),
        }
    }
    // Flush any trailing partial line.
    if !buf.is_empty() {
        let line = std::mem::take(&mut buf);
        // Ignore send failure: receiver dropping at flush time is still a
        // clean stop.
        let _ = tx
            .send(BuildEvent::StepLog {
                step_id,
                stream: map_stream(last_stream),
                line,
                ts: Utc::now(),
            })
            .await;
    }
    Ok(())
}

/// Thin wrapper used by the multi-job watch loop. Errors are treated as
/// best-effort (log stream for this job stops, other jobs continue) — with one
/// exception: a `401 Unauthorized` (the log token expired mid-build) is
/// surfaced as a single one-line notice on the step's stream instead of being
/// dropped silently, so the gulf of evaluation ("why did my logs stop?") stays
/// narrow. The build-status poll still drives the build to its real verdict.
async fn stream_one(
    client: HarmontClient,
    log_base: String,
    job_id: Uuid,
    token: String,
    tx: tokio::sync::mpsc::Sender<BuildEvent>,
) {
    let expired = stream_job_logs_as_events(&client, &log_base, job_id, &token, &tx)
        .await
        .err()
        .and_then(|e| {
            e.downcast_ref::<HarmontError>()
                .map(|h| matches!(h, HarmontError::Unauthorized))
        })
        .unwrap_or(false);
    if expired {
        let _ = tx
            .send(BuildEvent::StepLog {
                step_id: job_id,
                stream: StdStream::Stderr,
                line: "live logs expired; full logs available via `hm cloud build show`"
                    .to_string(),
                ts: Utc::now(),
            })
            .await;
    }
}

/// Collapse the SDK's three stream kinds onto the renderer's two streams.
///
/// `Stdout` stays `Stdout`; `Stderr` and the out-of-band `Meta` kind are both
/// reported as `Stderr`. The classification match has no wildcard arm, so a
/// new `StreamKind` variant must be placed explicitly.
pub(crate) const fn map_stream(kind: StreamKind) -> StdStream {
    let to_stderr = match kind {
        StreamKind::Stdout => false,
        StreamKind::Stderr | StreamKind::Meta => true,
    };
    if to_stderr {
        StdStream::Stderr
    } else {
        StdStream::Stdout
    }
}

/// Buffer content and emit complete `\n`-terminated lines as `StepLog`
/// events. Returns `Err(())` if the receiver dropped (caller should stop).
///
/// `content` is appended to `buf`; every complete line now in `buf` is sent
/// (with its terminating `\n` and any trailing `\r` removed), and whatever
/// follows the last newline stays in `buf` for the next call. Each line is
/// stamped with `ts_unix_ns`, or the current time when that is `None`.
///
/// The buffer is scanned in one forward pass and trimmed once at the end,
/// instead of draining from the front after every line, so a chunk carrying
/// many lines costs time linear in its size and one allocation per line.
async fn emit(
    tx: &tokio::sync::mpsc::Sender<BuildEvent>,
    job_id: Uuid,
    kind: StreamKind,
    ts_unix_ns: Option<i64>,
    buf: &mut String,
    content: &str,
) -> std::result::Result<(), ()> {
    buf.push_str(content);
    // Byte offset just past the last line handed to `tx`. It always sits right
    // after a `\n`, hence on a char boundary.
    let mut consumed = 0;
    let mut outcome = Ok(());
    while let Some(offset) = buf[consumed..].find('\n') {
        let end = consumed + offset;
        let line = buf[consumed..end].trim_end_matches('\r').to_string();
        // Count the line as consumed before sending, so a failed send still
        // removes it from the buffer.
        consumed = end + 1;
        let sent = tx
            .send(BuildEvent::StepLog {
                step_id: job_id,
                stream: map_stream(kind),
                line,
                ts: ts_or_now(ts_unix_ns),
            })
            .await;
        if sent.is_err() {
            outcome = Err(());
            break;
        }
    }
    buf.drain(..consumed);
    outcome
}

#[cfg(test)]
mod tests {
    use super::{JobState, exit_code_for_state, job_logs_available};

    /// States for which a log stream should be opened.
    const WITH_LOGS: [JobState; 7] = [
        JobState::Running,
        JobState::Passed,
        JobState::Failed,
        JobState::TimedOut,
        JobState::Canceling,
        JobState::Canceled,
        JobState::TimingOut,
    ];

    /// States for which no log stream should be opened.
    const WITHOUT_LOGS: [JobState; 4] = [
        JobState::Pending,
        JobState::Scheduled,
        JobState::Assigned,
        JobState::Skipped,
    ];

    #[test]
    fn logs_available_for_running_and_terminal_states() {
        for state in WITH_LOGS {
            assert!(job_logs_available(state), "expected logs for {state}");
        }
    }

    #[test]
    fn no_logs_before_start_or_when_skipped() {
        for state in WITHOUT_LOGS {
            assert!(!job_logs_available(state), "expected no logs for {state}");
        }
    }

    #[test]
    fn passed_is_zero_canceled_is_130_else_is_one() {
        let expectations = [
            ("passed", 0),
            // A server-side cancel must NOT collapse to the generic failure code.
            ("canceled", 130),
            ("failed", 1),
            // Unexpected/unknown terminal states fail closed.
            ("timed_out", 1),
        ];
        for (state, want) in expectations {
            assert_eq!(exit_code_for_state(state), want);
        }
    }
}