1use harmont_cloud::{HarmontClient, HarmontError, builds::NewBuild};
6use hm_plugin_protocol::events::{BuildEvent, BuildRef};
7use tokio::sync::mpsc;
8use tokio_util::sync::CancellationToken;
9
10use std::path::Path;
11
12use crate::{
13 BackendError, BackendHandle, BuildOutcome, BuildStatus, Capabilities, ExecutionBackend, Result,
14 RunRequest,
15};
16
17const ARCHIVE_WARN_BYTES: u64 = 4 * 1024 * 1024;
20
21const ARCHIVE_CAP_BYTES: u64 = 6 * 1024 * 1024;
27
28#[derive(Debug)]
30pub struct CloudBackend {
31 client: HarmontClient,
32 api_base: String,
34 app_base: String,
38 org: String,
39}
40
41impl CloudBackend {
42 #[must_use]
48 #[allow(clippy::similar_names)]
52 pub const fn new(
53 client: HarmontClient,
54 api_base: String,
55 app_base: String,
56 org: String,
57 ) -> Self {
58 Self {
59 client,
60 api_base,
61 app_base,
62 org,
63 }
64 }
65}
66
67#[async_trait::async_trait]
68impl ExecutionBackend for CloudBackend {
69 fn name(&self) -> &'static str {
70 "cloud"
71 }
72
73 fn capabilities(&self) -> Capabilities {
74 Capabilities::cloud()
75 }
76
77 async fn start(&self, req: RunRequest) -> Result<BackendHandle> {
78 let source_tgz = crate::local::build_archive_bytes(&req.repo_root)
80 .map_err(|e| BackendError::Local(format!("archiving worktree: {e}")))?;
81
82 guard_archive_size(source_tgz.len(), &req.repo_root)?;
86
87 let build = self
90 .client
91 .submit_build(NewBuild {
92 org: self.org.clone(),
93 pipeline: req.pipeline_slug.clone(),
94 branch: req.source.branch.clone(),
95 commit: req.source.commit.clone(),
96 message: req.source.message.clone(),
97 pipeline_ir: req.plan.ir_json.clone(), source_tgz,
99 env: req.env.clone().into_iter().collect(),
100 })
101 .await
102 .map_err(map_harmont_err)?;
103
104 let watch_url = Some(dashboard_build_url(
109 &self.app_base,
110 &self.org,
111 &req.pipeline_slug,
112 build.number,
113 ));
114 let build_ref = BuildRef {
115 run_id: uuid::Uuid::new_v4(),
116 number: Some(build.number),
117 org: Some(self.org.clone()),
118 pipeline: req.pipeline_slug.clone(),
119 };
120
121 let (tx, rx) = mpsc::channel(1024);
122 let cancel = CancellationToken::new();
123
124 let _ = tx.try_send(BuildEvent::BuildAccepted {
128 build: build_ref.clone(),
129 watch_url: watch_url.clone(),
130 });
131
132 if !req.options.watch {
135 let now = chrono::Utc::now();
136 let outcome = BuildOutcome {
137 build: build_ref,
138 status: BuildStatus::Passed,
139 steps: vec![],
140 started_at: now,
141 finished_at: now,
142 watch_url,
143 };
144 let join = tokio::spawn(async move {
145 drop(tx); Ok(outcome)
147 });
148 return Ok(BackendHandle::spawn(rx, cancel, join));
149 }
150
151 let client = self.client.clone();
152 let api_base = self.api_base.clone();
153 let org = self.org.clone();
154 let pipeline = req.pipeline_slug.clone();
155 let number = build.number;
156 let token = cancel.clone();
157 let started = chrono::Utc::now();
158 let join = tokio::spawn(async move {
159 let exit = tokio::select! {
160 biased;
161 () = token.cancelled() => {
162 let _ = client.cancel_build(&org, &pipeline, number).await;
164 return Ok(BuildOutcome {
165 build: build_ref,
166 status: BuildStatus::Canceled,
167 steps: vec![],
168 started_at: started,
169 finished_at: chrono::Utc::now(),
170 watch_url,
171 });
172 }
173 r = crate::cloud::watch::watch_build(&client, &api_base, &org, &pipeline, number, tx) => {
174 r.map_err(|e| BackendError::LogStream(e.to_string()))?
175 }
176 };
177 let status = match exit {
182 0 => BuildStatus::Passed,
183 130 => BuildStatus::Canceled,
184 _ => BuildStatus::Failed,
185 };
186 Ok(BuildOutcome {
187 build: build_ref,
188 status,
189 steps: vec![],
192 started_at: started,
193 finished_at: chrono::Utc::now(),
194 watch_url,
195 })
196 });
197 Ok(BackendHandle::spawn(rx, cancel, join))
198 }
199}
200
201fn map_harmont_err(e: HarmontError) -> BackendError {
204 match e {
205 HarmontError::Unauthorized => BackendError::Unauthorized,
206 HarmontError::Api {
207 status: _,
208 code,
209 message,
210 } => BackendError::Rejected { code, message },
211 HarmontError::NotFound(w) => BackendError::NotFound(w),
212 HarmontError::Transport(m) => BackendError::Transport(m),
213 HarmontError::Decode(m) | HarmontError::LogStream(m) => BackendError::LogStream(m),
214 }
215}
216
217fn dashboard_build_url(app_base: &str, org: &str, slug: &str, number: i64) -> String {
221 format!("{app_base}/{org}/pipelines/{slug}/builds/{number}")
222}
223
224fn human_mb(bytes: u64) -> String {
226 #[allow(clippy::cast_precision_loss)] let mb = bytes as f64 / (1024.0 * 1024.0);
228 format!("{mb:.1} MB")
229}
230
231fn guard_archive_size(archive_len: usize, repo_root: &Path) -> Result<()> {
239 let bytes = archive_len as u64;
240
241 tracing::info!("uploading source archive ({})", human_mb(bytes));
244
245 if bytes <= ARCHIVE_WARN_BYTES {
246 return Ok(());
247 }
248
249 let largest = crate::local::top_level_sizes(repo_root);
250 let offenders: Vec<(String, u64)> = largest.into_iter().take(3).collect();
251
252 if bytes > ARCHIVE_CAP_BYTES {
253 return Err(BackendError::SourceTooLarge {
254 observed_bytes: bytes,
255 cap_bytes: ARCHIVE_CAP_BYTES,
256 largest_paths: offenders,
257 });
258 }
259
260 let hint = offenders
262 .iter()
263 .map(|(name, sz)| format!("{name} ({})", human_mb(*sz)))
264 .collect::<Vec<_>>()
265 .join(", ");
266 tracing::warn!(
267 "source archive is {} (largest: {}). Add big build artifacts to .gitignore to speed up uploads.",
268 human_mb(bytes),
269 if hint.is_empty() {
270 "—".to_string()
271 } else {
272 hint
273 },
274 );
275 Ok(())
276}
277
278#[cfg(test)]
279#[allow(clippy::unwrap_used, clippy::panic, clippy::cast_possible_truncation)]
280mod tests {
281 use super::*;
282
283 #[test]
284 fn watch_url_uses_app_host_and_pipelines_path() {
285 assert_eq!(
287 dashboard_build_url("https://app.harmont.dev", "acme", "web", 42),
288 "https://app.harmont.dev/acme/pipelines/web/builds/42"
289 );
290 }
291
292 #[test]
293 fn archive_under_warn_passes() {
294 guard_archive_size(1024, std::path::Path::new("/nonexistent")).unwrap();
297 }
298
299 #[test]
300 fn archive_over_cap_fails_with_source_too_large() {
301 let tmp = tempfile::tempdir().unwrap();
302 let err = guard_archive_size(ARCHIVE_CAP_BYTES as usize + 1, tmp.path()).unwrap_err();
303 match err {
304 BackendError::SourceTooLarge {
305 observed_bytes,
306 cap_bytes,
307 ..
308 } => {
309 assert_eq!(observed_bytes, ARCHIVE_CAP_BYTES + 1);
310 assert_eq!(cap_bytes, ARCHIVE_CAP_BYTES);
311 }
312 other => panic!("expected SourceTooLarge, got {other:?}"),
313 }
314 }
315
316 #[test]
317 fn human_mb_formats_one_decimal() {
318 assert_eq!(human_mb(6 * 1024 * 1024), "6.0 MB");
319 assert_eq!(human_mb(1536 * 1024), "1.5 MB");
320 }
321}