Skip to main content

hm_exec/cloud/
backend.rs

1//! The cloud [`ExecutionBackend`]: archive the worktree, submit the whole build
2//! to Harmont Cloud, and watch it to completion. The server schedules and runs;
3//! this backend is an *observer* (see [`Capabilities::cloud`]).
4
5use harmont_cloud::{
6    HarmontClient, HarmontError,
7    builds::{NewBuild, NewRepoBuild},
8};
9use hm_plugin_protocol::events::{BuildEvent, BuildRef};
10use tokio::sync::mpsc;
11use tokio_util::sync::CancellationToken;
12
13use std::path::Path;
14
15use crate::{
16    BackendError, BackendHandle, BuildOutcome, BuildStatus, Capabilities, ExecutionBackend, Result,
17    RunRequest,
18};
19
/// Soft warning threshold (4 MiB) for the (compressed) source archive. Above
/// this — and up to [`ARCHIVE_CAP_BYTES`] — we nudge the user toward a
/// `.gitignore` fix but still upload.
const ARCHIVE_WARN_BYTES: u64 = 4 * 1024 * 1024;
23
/// Hard cap for the (compressed) source archive. The build request base64-
/// encodes this blob into a single JSON POST; base64 inflates by ~4/3, and the
/// backend's JSON body limit is ~8 MB, so a 6 MiB raw cap keeps the encoded
/// body (plus the IR + envelope) comfortably under that limit. Over the cap we
/// fail fast BEFORE the upload.
// NOTE(review): 6 MiB * 4/3 is 8 MiB (~8.39 MB) of base64 *before* the IR and
// JSON envelope are added, which is at — not comfortably under — an "~8 MB"
// limit. Confirm the server's exact body limit; if it really is 8 MB/8 MiB,
// archives just under this cap would pass the local guard and still be
// rejected by the server.
const ARCHIVE_CAP_BYTES: u64 = 6 * 1024 * 1024;
30
/// Submits the whole build to Harmont Cloud and watches it; the server schedules.
#[derive(Debug)]
pub struct CloudBackend {
    /// Harmont Cloud SDK client used to submit builds, cancel them, and (via
    /// `watch_build`) follow them to completion.
    client: HarmontClient,
    /// API base used as the SSE log stream host during `watch_build`.
    api_base: String,
    /// Dashboard (SPA) base used to build the human-clickable watch URL. This
    /// is the `app.` host, NOT `api.` — a link built from `api_base` lands on
    /// raw JSON. Resolved via [`hm_config::app_url`] at the call site.
    app_base: String,
    /// Resolved organization slug: sent with every submit/cancel/watch call
    /// and used as the first path segment of the dashboard watch URL.
    org: String,
}
43
impl CloudBackend {
    /// Construct a `CloudBackend`.
    ///
    /// `client`/`api_base` come from the CLI's resolved cloud settings;
    /// `app_base` is the dashboard host the watch URL points at (see the field
    /// docs); `org` is the resolved organization slug.
    ///
    /// The arguments are stored as given; no validation or normalization is
    /// performed here.
    #[must_use]
    // `api_base` (the API host) and `app_base` (the dashboard host) are two
    // distinct hosts that must not be confused: the watch URL has to be built
    // from `app_base`, the log stream from `api_base`. The near-identical
    // names are therefore deliberate and documented, hence the lint allowance.
    #[allow(clippy::similar_names)]
    pub const fn new(
        client: HarmontClient,
        api_base: String,
        app_base: String,
        org: String,
    ) -> Self {
        Self {
            client,
            api_base,
            app_base,
            org,
        }
    }
}
69
#[async_trait::async_trait]
impl ExecutionBackend for CloudBackend {
    /// Stable identifier of this backend: always `"cloud"`.
    fn name(&self) -> &'static str {
        "cloud"
    }

    /// Capability set of the cloud backend (see [`Capabilities::cloud`]).
    fn capabilities(&self) -> Capabilities {
        Capabilities::cloud()
    }

    /// Archive the worktree, submit the build to Harmont Cloud, and return a
    /// handle whose event stream starts with `BuildAccepted`.
    ///
    /// With `req.options.watch` set, a background task follows the build via
    /// `watch_build` and resolves to `Passed` (exit 0), `Canceled` (exit 130
    /// or a local cancel through the handle's token), or `Failed` (any other
    /// exit code). Without it, the handle resolves immediately and the server
    /// keeps running the build.
    ///
    /// # Errors
    ///
    /// * [`BackendError::Local`] if the worktree cannot be archived, or if no
    ///   `cloud_pipeline_slug` was supplied and `req.source.repo_name` is
    ///   absent.
    /// * Whatever `guard_archive_size` returns when the archive is over the
    ///   cap.
    /// * The [`map_harmont_err`] translation of any SDK error raised while
    ///   submitting.
    ///
    /// Errors from `watch_build` are NOT returned here; they surface as
    /// [`BackendError::LogStream`] from the spawned task's result.
    // The two submit branches (by-slug vs. repo-identity) plus the
    // watch/no-watch handling push this just over the 100-line lint; splitting
    // it would only scatter the linear submit-then-watch flow.
    // NOTE(review): `req` is owned, so several of the `req.*.clone()` calls
    // below (notably `ir_json` and `env`) could probably be moves — confirm
    // `RunRequest` permits partial moves before changing them.
    #[allow(clippy::too_many_lines)]
    async fn start(&self, req: RunRequest) -> Result<BackendHandle> {
        // Archive the worktree (fail fast as a setup error).
        let source_tgz = crate::local::build_archive_bytes(&req.repo_root)
            .map_err(|e| BackendError::Local(format!("archiving worktree: {e}")))?;

        // Guard the upload size BEFORE the POST: warn when large, fail fast over
        // the cap (so the user never waits on a doomed upload), and always show
        // the size so the upload isn't a silent gulf of evaluation.
        guard_archive_size(source_tgz.len(), &req.repo_root)?;

        // Submit the build. Normally we address the pipeline by repo identity
        // (`submit_repo_build` resolves `(repo_name, source_slug)` to the
        // server's global slug). But when the driver has already resolved or
        // created the pipeline — for a repo the server hasn't discovered — it
        // passes the global slug directly, and we submit by slug
        // (`submit_build`), bypassing repo-identity resolution.
        let build = if let Some(slug) = req.cloud_pipeline_slug.clone() {
            self.client
                .submit_build(NewBuild {
                    org: self.org.clone(),
                    pipeline: slug,
                    branch: req.source.branch.clone(),
                    commit: req.source.commit.clone(),
                    message: req.source.message.clone(),
                    pipeline_ir: req.plan.ir_json.clone(), // verbatim
                    source_tgz,
                    env: req.env.clone().into_iter().collect(),
                })
                .await
                .map_err(map_harmont_err)?
        } else {
            // Repo-identity submission needs a repo name; without one there is
            // nothing to resolve the pipeline from, so fail with guidance.
            let repo_name = req.source.repo_name.clone().ok_or_else(|| {
                BackendError::Local(
                    "cloud runs need a git remote to identify the pipeline — add an \
                     `origin` remote, or run from a cloned repo"
                        .into(),
                )
            })?;
            self.client
                .submit_repo_build(NewRepoBuild {
                    org: self.org.clone(),
                    repo_name,
                    source_slug: req.pipeline_slug.clone(),
                    branch: req.source.branch.clone(),
                    commit: req.source.commit.clone(),
                    message: req.source.message.clone(),
                    pipeline_ir: req.plan.ir_json.clone(), // verbatim
                    source_tgz,
                    env: req.env.clone().into_iter().collect(),
                })
                .await
                .map_err(map_harmont_err)?
        };

        // The server resolved (and returns) the global slug; watch/cancel/log
        // endpoints are addressed by it, NOT by the source slug we submitted.
        let pipeline = build.pipeline_slug.clone();

        // Build the dashboard URL from the app host (NOT the API host) and the
        // SPA route shape `/:orgSlug/pipelines/:slug/builds/:number`. A link
        // built from `api_base` or without the `pipelines/` segment is
        // unclickable — it lands on raw JSON or a 404.
        let watch_url = Some(dashboard_build_url(
            &self.app_base,
            &self.org,
            &pipeline,
            build.number,
        ));
        // `run_id` is a fresh client-side UUID; `number` and `pipeline` are
        // the values the server returned for the accepted build.
        let build_ref = BuildRef {
            run_id: uuid::Uuid::new_v4(),
            number: Some(build.number),
            org: Some(self.org.clone()),
            pipeline: pipeline.clone(),
        };

        // Event channel handed to the caller (`rx`) and the watcher (`tx`),
        // plus the token the handle uses to request cancellation.
        let (tx, rx) = mpsc::channel(1024);
        let cancel = CancellationToken::new();

        // Emit `BuildAccepted` immediately (the CLI prints the watch line from
        // this). `try_send` can't fail: the receiver is alive and the buffer
        // is empty.
        let _ = tx.try_send(BuildEvent::BuildAccepted {
            build: build_ref.clone(),
            watch_url: watch_url.clone(),
        });

        // `--no-watch`: detach. The build was accepted server-side; resolve to
        // a terminal "submitted" outcome at once. The server keeps running it.
        // NOTE(review): the outcome is reported as `Passed` even though the
        // build has only been accepted — confirm consumers read this as
        // "submitted" rather than as a real verdict.
        if !req.options.watch {
            let now = chrono::Utc::now();
            let outcome = BuildOutcome {
                build: build_ref,
                status: BuildStatus::Passed,
                steps: vec![],
                started_at: now,
                finished_at: now,
                watch_url,
            };
            let join = tokio::spawn(async move {
                drop(tx); // close the stream so the renderer terminates
                Ok(outcome)
            });
            return Ok(BackendHandle::spawn(rx, cancel, join));
        }

        // Own everything the watcher task needs: it is spawned and so cannot
        // borrow from `self`.
        let client = self.client.clone();
        let api_base = self.api_base.clone();
        let org = self.org.clone();
        let number = build.number;
        let token = cancel.clone();
        let started = chrono::Utc::now();
        let join = tokio::spawn(async move {
            let exit = tokio::select! {
                // `biased` polls the branches in declaration order, so a
                // pending cancellation is observed before the watcher.
                biased;
                () = token.cancelled() => {
                    // Cancel server-side (best-effort) and report Canceled.
                    let _ = client.cancel_build(&org, &pipeline, number).await;
                    return Ok(BuildOutcome {
                        build: build_ref,
                        status: BuildStatus::Canceled,
                        steps: vec![],
                        started_at: started,
                        finished_at: chrono::Utc::now(),
                        watch_url,
                    });
                }
                r = crate::cloud::watch::watch_build(&client, &api_base, &org, &pipeline, number, tx) => {
                    r.map_err(|e| BackendError::LogStream(e.to_string()))?
                }
            };
            // Map the terminal exit code `watch_build` reports back to a
            // verdict. 130 is a server-side cancel (see
            // `watch::exit_code_for_state`) — report it as Canceled, NOT a
            // failure, mirroring the local scheduler.
            let status = match exit {
                0 => BuildStatus::Passed,
                130 => BuildStatus::Canceled,
                _ => BuildStatus::Failed,
            };
            Ok(BuildOutcome {
                build: build_ref,
                status,
                // TODO(v1 follow-up): collect per-step summaries from the
                // `StepEnd` events `watch_build` already emits.
                steps: vec![],
                started_at: started,
                finished_at: chrono::Utc::now(),
                watch_url,
            })
        });
        Ok(BackendHandle::spawn(rx, cancel, join))
    }
}
237
238/// Map the SDK error onto the backend-boundary error (the CLI maps THIS to the
239/// project error doctrine). Exhaustive over [`HarmontError`].
240fn map_harmont_err(e: HarmontError) -> BackendError {
241    match e {
242        HarmontError::Unauthorized => BackendError::Unauthorized,
243        HarmontError::Api {
244            status: _,
245            code,
246            message,
247        } => BackendError::Rejected { code, message },
248        HarmontError::NotFound(w) => BackendError::NotFound(w),
249        HarmontError::Transport(m) => BackendError::Transport(m),
250        HarmontError::Decode(m) | HarmontError::LogStream(m) => BackendError::LogStream(m),
251    }
252}
253
/// Build the human-clickable dashboard URL for a build, matching the SPA route
/// `/:orgSlug/pipelines/:slug/builds/:number`.
///
/// `app_base` is the dashboard host (see [`CloudBackend`]'s `app_base` field).
/// Any trailing `/` on it is trimmed, so a configured base such as
/// `https://app.example.com/` yields the same link as
/// `https://app.example.com` instead of one with a `//` in its path.
///
/// `org` and `slug` are inserted verbatim as path segments; `number` is the
/// build number.
fn dashboard_build_url(app_base: &str, org: &str, slug: &str, number: i64) -> String {
    // Normalize the base so exactly one `/` separates host and path.
    let base = app_base.trim_end_matches('/');
    format!("{base}/{org}/pipelines/{slug}/builds/{number}")
}
260
/// Format a byte count for display as `"N.N MB"`.
///
/// The value is divided by 1024 * 1024 (so the unit is really mebibytes,
/// labelled "MB") and printed with exactly one decimal place.
// Display-only: the `u64 -> f64` precision loss cannot matter at one decimal.
#[allow(clippy::cast_precision_loss)]
fn human_mb(bytes: u64) -> String {
    const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;
    let mebibytes = bytes as f64 / BYTES_PER_MIB;
    format!("{mebibytes:.1} MB")
}
267
268/// Guard the source-archive upload: announce its size, warn when large, and
269/// reject (fail fast) when over the cap.
270///
271/// `archive_bytes` is the compressed (`.tar.gz`) length actually shipped.
272/// `repo_root` is only walked for the "biggest paths" hint, and only when the
273/// archive is large enough to warn or reject — so the common (small) case pays
274/// nothing extra.
275fn guard_archive_size(archive_len: usize, repo_root: &Path) -> Result<()> {
276    let bytes = archive_len as u64;
277
278    // Always close the gulf of evaluation: a multi-second silent upload is a
279    // wide gulf. Name the size up front.
280    tracing::info!("uploading source archive ({})", human_mb(bytes));
281
282    if bytes <= ARCHIVE_WARN_BYTES {
283        return Ok(());
284    }
285
286    let largest = crate::local::top_level_sizes(repo_root);
287    let offenders: Vec<(String, u64)> = largest.into_iter().take(3).collect();
288
289    if bytes > ARCHIVE_CAP_BYTES {
290        return Err(BackendError::SourceTooLarge {
291            observed_bytes: bytes,
292            cap_bytes: ARCHIVE_CAP_BYTES,
293            largest_paths: offenders,
294        });
295    }
296
297    // Over the warn threshold but under the cap: nudge toward a .gitignore fix.
298    let hint = offenders
299        .iter()
300        .map(|(name, sz)| format!("{name} ({})", human_mb(*sz)))
301        .collect::<Vec<_>>()
302        .join(", ");
303    tracing::warn!(
304        "source archive is {} (largest: {}). Add big build artifacts to .gitignore to speed up uploads.",
305        human_mb(bytes),
306        if hint.is_empty() {
307            "—".to_string()
308        } else {
309            hint
310        },
311    );
312    Ok(())
313}
314
#[cfg(test)]
// Test-only code: unwrapping, panicking on the wrong variant, and the
// `u64 -> usize` cast of the cap are all acceptable here.
#[allow(clippy::unwrap_used, clippy::panic, clippy::cast_possible_truncation)]
mod tests {
    use super::*;

    /// The watch link targets the dashboard host and includes the
    /// `pipelines/` segment of the SPA route.
    #[test]
    fn watch_url_uses_app_host_and_pipelines_path() {
        // Mirrors hm_config::app_url(DEFAULT_API_URL) -> https://app.harmont.dev.
        let url = dashboard_build_url("https://app.harmont.dev", "acme", "web", 42);
        assert_eq!(url, "https://app.harmont.dev/acme/pipelines/web/builds/42");
    }

    /// An archive well under the warn threshold succeeds without ever
    /// touching `repo_root`, so a nonexistent path is fine.
    #[test]
    fn archive_under_warn_passes() {
        let missing_root = std::path::Path::new("/nonexistent");
        guard_archive_size(1024, missing_root).unwrap();
    }

    /// One byte over the cap is rejected with `SourceTooLarge`, reporting
    /// both the observed size and the cap.
    #[test]
    fn archive_over_cap_fails_with_source_too_large() {
        let repo = tempfile::tempdir().unwrap();
        let over_cap = ARCHIVE_CAP_BYTES as usize + 1;
        let err = guard_archive_size(over_cap, repo.path()).unwrap_err();
        match err {
            BackendError::SourceTooLarge {
                observed_bytes,
                cap_bytes,
                ..
            } => {
                assert_eq!(cap_bytes, ARCHIVE_CAP_BYTES);
                assert_eq!(observed_bytes, ARCHIVE_CAP_BYTES + 1);
            }
            other => panic!("expected SourceTooLarge, got {other:?}"),
        }
    }

    /// Sizes render in mebibytes with exactly one decimal place.
    #[test]
    fn human_mb_formats_one_decimal() {
        let cases = [(6 * 1024 * 1024, "6.0 MB"), (1536 * 1024, "1.5 MB")];
        for (bytes, expected) in cases {
            assert_eq!(human_mb(bytes), expected);
        }
    }
}