hm_exec/cloud/backend.rs
1//! The cloud [`ExecutionBackend`]: archive the worktree, submit the whole build
2//! to Harmont Cloud, and watch it to completion. The server schedules and runs;
3//! this backend is an *observer* (see [`Capabilities::cloud`]).
4
5use harmont_cloud::{
6 HarmontClient, HarmontError,
7 builds::{NewBuild, NewRepoBuild},
8};
9use hm_plugin_protocol::events::{BuildEvent, BuildRef};
10use tokio::sync::mpsc;
11use tokio_util::sync::CancellationToken;
12
13use std::path::Path;
14
15use crate::{
16 BackendError, BackendHandle, BuildOutcome, BuildStatus, Capabilities, ExecutionBackend, Result,
17 RunRequest,
18};
19
/// Soft size threshold (4 MiB) for the compressed source archive. Past it the
/// upload still goes ahead, but the user is first nudged toward a
/// `.gitignore` fix.
const ARCHIVE_WARN_BYTES: u64 = 4 << 20;
23
/// Hard cap for the (compressed) source archive.
///
/// The build request base64-encodes this blob into a single JSON POST, and
/// base64 output is `4 * ceil(n / 3)` bytes (a 4/3 inflation). The backend's
/// JSON body limit is ~8 MB. A 5 MiB archive encodes to ~6.7 MiB, which leaves
/// roughly 1 MB of headroom for the IR and the JSON envelope. A 6 MiB cap would
/// encode to a full 8 MiB, already at or over the limit before the envelope.
/// Near-cap archives would then pass this guard, only to be rejected by the
/// server after a doomed upload. Over the cap we fail fast BEFORE the upload.
const ARCHIVE_CAP_BYTES: u64 = 5 * 1024 * 1024;
30
31/// Submits the whole build to Harmont Cloud and watches it; the server schedules.
32#[derive(Debug)]
33pub struct CloudBackend {
34 client: HarmontClient,
35 /// API base used as the SSE log stream host during `watch_build`.
36 api_base: String,
37 /// Dashboard (SPA) base used to build the human-clickable watch URL. This
38 /// is the `app.` host, NOT `api.` — a link built from `api_base` lands on
39 /// raw JSON. Resolved via [`hm_config::app_url`] at the call site.
40 app_base: String,
41 org: String,
42}
43
44impl CloudBackend {
45 /// Construct a `CloudBackend`.
46 ///
47 /// `client`/`api_base` come from the CLI's resolved cloud settings;
48 /// `app_base` is the dashboard host the watch URL points at (see the field
49 /// docs); `org` is the resolved organization slug.
50 #[must_use]
51 // `api_base` (the API host) and `app_base` (the dashboard host) are two
52 // distinct hosts that must not be confused — the whole point of this fix —
53 // so the near-identical names are deliberate and documented.
54 #[allow(clippy::similar_names)]
55 pub const fn new(
56 client: HarmontClient,
57 api_base: String,
58 app_base: String,
59 org: String,
60 ) -> Self {
61 Self {
62 client,
63 api_base,
64 app_base,
65 org,
66 }
67 }
68}
69
70#[async_trait::async_trait]
71impl ExecutionBackend for CloudBackend {
72 fn name(&self) -> &'static str {
73 "cloud"
74 }
75
76 fn capabilities(&self) -> Capabilities {
77 Capabilities::cloud()
78 }
79
80 // The two submit branches (by-slug vs. repo-identity) plus the
81 // watch/no-watch handling push this just over the 100-line lint; splitting
82 // it would only scatter the linear submit-then-watch flow.
83 #[allow(clippy::too_many_lines)]
84 async fn start(&self, req: RunRequest) -> Result<BackendHandle> {
85 // Archive the worktree (fail fast as a setup error).
86 let source_tgz = crate::local::build_archive_bytes(&req.repo_root)
87 .map_err(|e| BackendError::Local(format!("archiving worktree: {e}")))?;
88
89 // Guard the upload size BEFORE the POST: warn when large, fail fast over
90 // the cap (so the user never waits on a doomed upload), and always show
91 // the size so the upload isn't a silent gulf of evaluation.
92 guard_archive_size(source_tgz.len(), &req.repo_root)?;
93
94 // Submit the build. Normally we address the pipeline by repo identity
95 // (`submit_repo_build` resolves `(repo_name, source_slug)` to the
96 // server's global slug). But when the driver has already resolved or
97 // created the pipeline — for a repo the server hasn't discovered — it
98 // passes the global slug directly, and we submit by slug
99 // (`submit_build`), bypassing repo-identity resolution.
100 let build = if let Some(slug) = req.cloud_pipeline_slug.clone() {
101 self.client
102 .submit_build(NewBuild {
103 org: self.org.clone(),
104 pipeline: slug,
105 branch: req.source.branch.clone(),
106 commit: req.source.commit.clone(),
107 message: req.source.message.clone(),
108 pipeline_ir: req.plan.ir_json.clone(), // verbatim
109 source_tgz,
110 env: req.env.clone().into_iter().collect(),
111 })
112 .await
113 .map_err(map_harmont_err)?
114 } else {
115 let repo_name = req.source.repo_name.clone().ok_or_else(|| {
116 BackendError::Local(
117 "cloud runs need a git remote to identify the pipeline — add an \
118 `origin` remote, or run from a cloned repo"
119 .into(),
120 )
121 })?;
122 self.client
123 .submit_repo_build(NewRepoBuild {
124 org: self.org.clone(),
125 repo_name,
126 source_slug: req.pipeline_slug.clone(),
127 branch: req.source.branch.clone(),
128 commit: req.source.commit.clone(),
129 message: req.source.message.clone(),
130 pipeline_ir: req.plan.ir_json.clone(), // verbatim
131 source_tgz,
132 env: req.env.clone().into_iter().collect(),
133 })
134 .await
135 .map_err(map_harmont_err)?
136 };
137
138 // The server resolved (and returns) the global slug; watch/cancel/log
139 // endpoints are addressed by it, NOT by the source slug we submitted.
140 let pipeline = build.pipeline_slug.clone();
141
142 // Build the dashboard URL from the app host (NOT the API host) and the
143 // SPA route shape `/:orgSlug/pipelines/:slug/builds/:number`. A link
144 // built from `api_base` or without the `pipelines/` segment is
145 // unclickable — it lands on raw JSON or a 404.
146 let watch_url = Some(dashboard_build_url(
147 &self.app_base,
148 &self.org,
149 &pipeline,
150 build.number,
151 ));
152 let build_ref = BuildRef {
153 run_id: uuid::Uuid::new_v4(),
154 number: Some(build.number),
155 org: Some(self.org.clone()),
156 pipeline: pipeline.clone(),
157 };
158
159 let (tx, rx) = mpsc::channel(1024);
160 let cancel = CancellationToken::new();
161
162 // Emit `BuildAccepted` immediately (the CLI prints the watch line from
163 // this). `try_send` can't fail: the receiver is alive and the buffer
164 // is empty.
165 let _ = tx.try_send(BuildEvent::BuildAccepted {
166 build: build_ref.clone(),
167 watch_url: watch_url.clone(),
168 });
169
170 // `--no-watch`: detach. The build was accepted server-side; resolve to
171 // a terminal "submitted" outcome at once. The server keeps running it.
172 if !req.options.watch {
173 let now = chrono::Utc::now();
174 let outcome = BuildOutcome {
175 build: build_ref,
176 status: BuildStatus::Passed,
177 steps: vec![],
178 started_at: now,
179 finished_at: now,
180 watch_url,
181 };
182 let join = tokio::spawn(async move {
183 drop(tx); // close the stream so the renderer terminates
184 Ok(outcome)
185 });
186 return Ok(BackendHandle::spawn(rx, cancel, join));
187 }
188
189 let client = self.client.clone();
190 let api_base = self.api_base.clone();
191 let org = self.org.clone();
192 let number = build.number;
193 let token = cancel.clone();
194 let started = chrono::Utc::now();
195 let join = tokio::spawn(async move {
196 let exit = tokio::select! {
197 biased;
198 () = token.cancelled() => {
199 // Cancel server-side (best-effort) and report Canceled.
200 let _ = client.cancel_build(&org, &pipeline, number).await;
201 return Ok(BuildOutcome {
202 build: build_ref,
203 status: BuildStatus::Canceled,
204 steps: vec![],
205 started_at: started,
206 finished_at: chrono::Utc::now(),
207 watch_url,
208 });
209 }
210 r = crate::cloud::watch::watch_build(&client, &api_base, &org, &pipeline, number, tx) => {
211 r.map_err(|e| BackendError::LogStream(e.to_string()))?
212 }
213 };
214 // Map the terminal exit code `watch_build` reports back to a
215 // verdict. 130 is a server-side cancel (see
216 // `watch::exit_code_for_state`) — report it as Canceled, NOT a
217 // failure, mirroring the local scheduler.
218 let status = match exit {
219 0 => BuildStatus::Passed,
220 130 => BuildStatus::Canceled,
221 _ => BuildStatus::Failed,
222 };
223 Ok(BuildOutcome {
224 build: build_ref,
225 status,
226 // TODO(v1 follow-up): collect per-step summaries from the
227 // `StepEnd` events `watch_build` already emits.
228 steps: vec![],
229 started_at: started,
230 finished_at: chrono::Utc::now(),
231 watch_url,
232 })
233 });
234 Ok(BackendHandle::spawn(rx, cancel, join))
235 }
236}
237
238/// Map the SDK error onto the backend-boundary error (the CLI maps THIS to the
239/// project error doctrine). Exhaustive over [`HarmontError`].
240fn map_harmont_err(e: HarmontError) -> BackendError {
241 match e {
242 HarmontError::Unauthorized => BackendError::Unauthorized,
243 HarmontError::Api {
244 status: _,
245 code,
246 message,
247 } => BackendError::Rejected { code, message },
248 HarmontError::NotFound(w) => BackendError::NotFound(w),
249 HarmontError::Transport(m) => BackendError::Transport(m),
250 HarmontError::Decode(m) | HarmontError::LogStream(m) => BackendError::LogStream(m),
251 }
252}
253
/// Build the human-clickable dashboard URL for a build, matching the SPA route
/// `/:orgSlug/pipelines/:slug/builds/:number`.
///
/// `app_base` is the dashboard host (the `app.` host, not the API host).
/// Trailing `/` characters on it are ignored. `https://app.example` and
/// `https://app.example/` therefore produce the same single-slash URL, rather
/// than a `//` path the SPA router may not match.
fn dashboard_build_url(app_base: &str, org: &str, slug: &str, number: i64) -> String {
    let base = app_base.trim_end_matches('/');
    format!("{base}/{org}/pipelines/{slug}/builds/{number}")
}
260
/// Format `bytes` as mebibytes with one decimal place, e.g. `"1.5 MB"`.
/// The label reads "MB", but the divisor is 1024 × 1024.
fn human_mb(bytes: u64) -> String {
    const BYTES_PER_MIB: f64 = 1_048_576.0;
    // Display-only: any u64 -> f64 rounding on huge values is harmless here.
    #[allow(clippy::cast_precision_loss)]
    let mebibytes = bytes as f64 / BYTES_PER_MIB;
    format!("{mebibytes:.1} MB")
}
267
268/// Guard the source-archive upload: announce its size, warn when large, and
269/// reject (fail fast) when over the cap.
270///
271/// `archive_bytes` is the compressed (`.tar.gz`) length actually shipped.
272/// `repo_root` is only walked for the "biggest paths" hint, and only when the
273/// archive is large enough to warn or reject — so the common (small) case pays
274/// nothing extra.
275fn guard_archive_size(archive_len: usize, repo_root: &Path) -> Result<()> {
276 let bytes = archive_len as u64;
277
278 // Always close the gulf of evaluation: a multi-second silent upload is a
279 // wide gulf. Name the size up front.
280 tracing::info!("uploading source archive ({})", human_mb(bytes));
281
282 if bytes <= ARCHIVE_WARN_BYTES {
283 return Ok(());
284 }
285
286 let largest = crate::local::top_level_sizes(repo_root);
287 let offenders: Vec<(String, u64)> = largest.into_iter().take(3).collect();
288
289 if bytes > ARCHIVE_CAP_BYTES {
290 return Err(BackendError::SourceTooLarge {
291 observed_bytes: bytes,
292 cap_bytes: ARCHIVE_CAP_BYTES,
293 largest_paths: offenders,
294 });
295 }
296
297 // Over the warn threshold but under the cap: nudge toward a .gitignore fix.
298 let hint = offenders
299 .iter()
300 .map(|(name, sz)| format!("{name} ({})", human_mb(*sz)))
301 .collect::<Vec<_>>()
302 .join(", ");
303 tracing::warn!(
304 "source archive is {} (largest: {}). Add big build artifacts to .gitignore to speed up uploads.",
305 human_mb(bytes),
306 if hint.is_empty() {
307 "—".to_string()
308 } else {
309 hint
310 },
311 );
312 Ok(())
313}
314
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::panic, clippy::cast_possible_truncation)]
mod tests {
    use super::*;

    #[test]
    fn watch_url_uses_app_host_and_pipelines_path() {
        // The host hm_config::app_url(DEFAULT_API_URL) resolves to:
        // https://app.harmont.dev.
        let url = dashboard_build_url("https://app.harmont.dev", "acme", "web", 42);
        assert_eq!(url, "https://app.harmont.dev/acme/pipelines/web/builds/42");
    }

    #[test]
    fn archive_under_warn_passes() {
        // Well under the warn threshold means no tree walk and no error, so a
        // repo_root that doesn't exist is irrelevant.
        let missing_root = std::path::Path::new("/nonexistent");
        guard_archive_size(1024, missing_root).unwrap();
    }

    #[test]
    fn archive_over_cap_fails_with_source_too_large() {
        let workdir = tempfile::tempdir().unwrap();
        let one_past_cap = ARCHIVE_CAP_BYTES as usize + 1;
        match guard_archive_size(one_past_cap, workdir.path()) {
            Err(BackendError::SourceTooLarge {
                observed_bytes,
                cap_bytes,
                ..
            }) => {
                assert_eq!(observed_bytes, ARCHIVE_CAP_BYTES + 1);
                assert_eq!(cap_bytes, ARCHIVE_CAP_BYTES);
            }
            other => panic!("expected SourceTooLarge, got {other:?}"),
        }
    }

    #[test]
    fn human_mb_formats_one_decimal() {
        let cases = [(6 * 1024 * 1024_u64, "6.0 MB"), (1536 * 1024_u64, "1.5 MB")];
        for (bytes, expected) in cases {
            assert_eq!(human_mb(bytes), expected);
        }
    }
}