use harmont_cloud::{
HarmontClient, HarmontError,
builds::{NewBuild, NewRepoBuild},
};
use hm_plugin_protocol::events::{BuildEvent, BuildRef};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use std::path::Path;
use crate::{
BackendError, BackendHandle, BuildOutcome, BuildStatus, Capabilities, ExecutionBackend, Result,
RunRequest,
};
/// Archive size (4 MiB) above which `guard_archive_size` logs a warning that
/// lists the largest top-level paths.
const ARCHIVE_WARN_BYTES: u64 = 4 * 1024 * 1024;
/// Archive size (6 MiB) above which `guard_archive_size` refuses the upload
/// with `BackendError::SourceTooLarge`.
const ARCHIVE_CAP_BYTES: u64 = 6 * 1024 * 1024;
/// Execution backend (named `"cloud"`) that submits builds through a
/// `HarmontClient` and optionally watches them until they finish.
#[derive(Debug)]
pub struct CloudBackend {
/// Client used to submit builds, cancel them, and (via `watch_build`) follow them.
client: HarmontClient,
/// Base address handed to `crate::cloud::watch::watch_build` when watching a build.
api_base: String,
/// Prefix of the dashboard URL reported as the build's `watch_url`.
app_base: String,
/// Organization sent with every submission and recorded in the `BuildRef`.
org: String,
}
impl CloudBackend {
    /// Assembles a cloud backend from its four components.
    ///
    /// * `client` — client used for every build submission, watch and cancel.
    /// * `api_base` — base address later passed to the build watcher.
    /// * `app_base` — prefix used when composing the dashboard `watch_url`.
    /// * `org` — organization under which builds are submitted.
    ///
    /// The values are stored as given; no validation or normalization happens here.
    #[must_use]
    // Presumably needed because `api_base` and `app_base` differ by one letter.
    #[allow(clippy::similar_names)]
    pub const fn new(
        client: HarmontClient,
        api_base: String,
        app_base: String,
        org: String,
    ) -> Self {
        Self { client, api_base, app_base, org }
    }
}
#[async_trait::async_trait]
impl ExecutionBackend for CloudBackend {
    /// Identifier of this backend; always the literal `"cloud"`.
    fn name(&self) -> &'static str {
        "cloud"
    }

    /// Capability set for this backend, as produced by `Capabilities::cloud()`.
    fn capabilities(&self) -> Capabilities {
        Capabilities::cloud()
    }

    /// Archives the worktree, submits it as a cloud build and returns a handle to it.
    ///
    /// Flow:
    /// 1. Build the source archive from `req.repo_root` and run it through
    ///    `guard_archive_size`.
    /// 2. Submit the build: against an explicit pipeline when
    ///    `req.cloud_pipeline_slug` is set, otherwise against the repository
    ///    named by `req.source.repo_name`.
    /// 3. Queue a `BuildEvent::BuildAccepted` event carrying the dashboard URL.
    /// 4. If `req.options.watch` is false, finish immediately with a `Passed`
    ///    outcome. Otherwise spawn a task that races cancellation against
    ///    `watch_build` and maps the resulting exit code to a `BuildStatus`.
    ///
    /// # Errors
    ///
    /// * `BackendError::Local` when archiving fails, or when no pipeline slug is
    ///   given and the request carries no repository name.
    /// * Whatever `guard_archive_size` returns for an oversized archive.
    /// * The `map_harmont_err` translation of a failed submission.
    ///
    /// Watch failures surface later, through the spawned task's result, as
    /// `BackendError::LogStream`.
    #[allow(clippy::too_many_lines)]
    async fn start(&self, req: RunRequest) -> Result<BackendHandle> {
        // NOTE(review): this call is synchronous (no `.await`). If it does blocking
        // filesystem work it runs on the async executor thread — consider
        // `tokio::task::spawn_blocking`.
        let source_tgz = crate::local::build_archive_bytes(&req.repo_root)
            .map_err(|e| BackendError::Local(format!("archiving worktree: {e}")))?;
        guard_archive_size(source_tgz.len(), &req.repo_root)?;

        let build = match req.cloud_pipeline_slug.clone() {
            // An explicit cloud pipeline was requested: submit straight to it.
            Some(slug) => {
                let submission = NewBuild {
                    org: self.org.clone(),
                    pipeline: slug,
                    branch: req.source.branch.clone(),
                    commit: req.source.commit.clone(),
                    message: req.source.message.clone(),
                    pipeline_ir: req.plan.ir_json.clone(),
                    source_tgz,
                    env: req.env.clone().into_iter().collect(),
                };
                self.client
                    .submit_build(submission)
                    .await
                    .map_err(map_harmont_err)?
            }
            // No explicit pipeline: identify the target by repository name.
            None => {
                let repo_name = req.source.repo_name.clone().ok_or_else(|| {
                    BackendError::Local(
                        "cloud runs need a git remote to identify the pipeline — add an \
                         `origin` remote, or run from a cloned repo"
                            .into(),
                    )
                })?;
                let submission = NewRepoBuild {
                    org: self.org.clone(),
                    repo_name,
                    source_slug: req.pipeline_slug.clone(),
                    branch: req.source.branch.clone(),
                    commit: req.source.commit.clone(),
                    message: req.source.message.clone(),
                    pipeline_ir: req.plan.ir_json.clone(),
                    source_tgz,
                    env: req.env.clone().into_iter().collect(),
                };
                self.client
                    .submit_repo_build(submission)
                    .await
                    .map_err(map_harmont_err)?
            }
        };

        let pipeline = build.pipeline_slug.clone();
        let dashboard = dashboard_build_url(&self.app_base, &self.org, &pipeline, build.number);
        let watch_url = Some(dashboard);
        let build_ref = BuildRef {
            run_id: uuid::Uuid::new_v4(),
            number: Some(build.number),
            org: Some(self.org.clone()),
            pipeline: pipeline.clone(),
        };

        let (tx, rx) = mpsc::channel(1024);
        let cancel = CancellationToken::new();
        // Best effort: the send result is deliberately ignored.
        let _ = tx.try_send(BuildEvent::BuildAccepted {
            build: build_ref.clone(),
            watch_url: watch_url.clone(),
        });

        if !req.options.watch {
            // Fire-and-forget: the outcome is produced without following the build.
            // NOTE(review): `Passed` here reflects an accepted submission, not an
            // observed build result — confirm that callers expect this.
            let submitted_at = chrono::Utc::now();
            let outcome = BuildOutcome {
                build: build_ref,
                status: BuildStatus::Passed,
                steps: vec![],
                started_at: submitted_at,
                finished_at: submitted_at,
                watch_url,
            };
            let join = tokio::spawn(async move {
                // The sender is dropped inside the task, closing the event channel.
                drop(tx);
                Ok(outcome)
            });
            return Ok(BackendHandle::spawn(rx, cancel, join));
        }

        // Everything the watcher task needs is cloned so the task owns its data.
        let client = self.client.clone();
        let api_base = self.api_base.clone();
        let org = self.org.clone();
        let number = build.number;
        let cancel_signal = cancel.clone();
        let started_at = chrono::Utc::now();

        let join = tokio::spawn(async move {
            let exit_code = tokio::select! {
                // `biased` polls the cancellation branch first.
                biased;
                () = cancel_signal.cancelled() => {
                    // Best effort: a failure to cancel remotely is ignored.
                    let _ = client.cancel_build(&org, &pipeline, number).await;
                    return Ok(BuildOutcome {
                        build: build_ref,
                        status: BuildStatus::Canceled,
                        steps: vec![],
                        started_at,
                        finished_at: chrono::Utc::now(),
                        watch_url,
                    });
                }
                watched = crate::cloud::watch::watch_build(&client, &api_base, &org, &pipeline, number, tx) => {
                    watched.map_err(|e| BackendError::LogStream(e.to_string()))?
                }
            };

            // 0 is success and 130 is treated as a cancellation; anything else failed.
            let status = match exit_code {
                0 => BuildStatus::Passed,
                130 => BuildStatus::Canceled,
                _ => BuildStatus::Failed,
            };

            Ok(BuildOutcome {
                build: build_ref,
                status,
                steps: vec![],
                started_at,
                finished_at: chrono::Utc::now(),
                watch_url,
            })
        });

        Ok(BackendHandle::spawn(rx, cancel, join))
    }
}
/// Translates an error from the Harmont client into this crate's `BackendError`.
///
/// `Decode` and `LogStream` both become `BackendError::LogStream`; an `Api`
/// error becomes `BackendError::Rejected`, keeping its `code` and `message`.
/// The match has no wildcard arm, so a new `HarmontError` variant is a compile
/// error here rather than a silent fallthrough.
fn map_harmont_err(e: HarmontError) -> BackendError {
    match e {
        HarmontError::Unauthorized => BackendError::Unauthorized,
        HarmontError::NotFound(what) => BackendError::NotFound(what),
        HarmontError::Transport(detail) => BackendError::Transport(detail),
        HarmontError::Decode(detail) => BackendError::LogStream(detail),
        HarmontError::LogStream(detail) => BackendError::LogStream(detail),
        // The `status` field is intentionally not carried over.
        HarmontError::Api {
            status: _,
            code,
            message,
        } => BackendError::Rejected { code, message },
    }
}
/// Composes the dashboard URL for one build:
/// `{app_base}/{org}/pipelines/{slug}/builds/{number}`.
///
/// Trailing slashes on `app_base` are removed first, so a base configured as
/// `https://app.harmont.dev/` does not produce `//` at the start of the path.
/// A base without a trailing slash yields exactly the same URL as before.
fn dashboard_build_url(app_base: &str, org: &str, slug: &str, number: i64) -> String {
    let base = app_base.trim_end_matches('/');
    format!("{base}/{org}/pipelines/{slug}/builds/{number}")
}
/// Renders a byte count as a size with one decimal place and an `MB` suffix,
/// where one unit is 1024 × 1024 bytes (e.g. `1.5 MB` for 1536 KiB).
fn human_mb(bytes: u64) -> String {
    const BYTES_PER_UNIT: f64 = 1024.0 * 1024.0;
    // The u64 -> f64 conversion can lose low-order bits for very large values,
    // which is irrelevant for a display value rounded to one decimal.
    #[allow(clippy::cast_precision_loss)]
    let megabytes = bytes as f64 / BYTES_PER_UNIT;
    format!("{megabytes:.1} MB")
}
/// Checks the size of the source archive before it is uploaded.
///
/// The size is always logged at info level first. Then:
/// * at or below `ARCHIVE_WARN_BYTES` — returns `Ok(())` without further work;
/// * above `ARCHIVE_CAP_BYTES` — returns `BackendError::SourceTooLarge`;
/// * in between — logs a warning and returns `Ok(())`.
///
/// Both the error and the warning name up to three top-level paths: the first
/// three entries returned by `crate::local::top_level_sizes(repo_root)`
/// (presumably ordered largest first — confirm against that helper).
///
/// `archive_len` is the archive length in bytes; `repo_root` is only inspected
/// once the warning threshold has been exceeded.
fn guard_archive_size(archive_len: usize, repo_root: &Path) -> Result<()> {
    let bytes = archive_len as u64;
    tracing::info!("uploading source archive ({})", human_mb(bytes));

    // Small archives need neither the directory scan nor a warning.
    if bytes <= ARCHIVE_WARN_BYTES {
        return Ok(());
    }

    let offenders: Vec<(String, u64)> = crate::local::top_level_sizes(repo_root)
        .into_iter()
        .take(3)
        .collect();

    if bytes > ARCHIVE_CAP_BYTES {
        return Err(BackendError::SourceTooLarge {
            observed_bytes: bytes,
            cap_bytes: ARCHIVE_CAP_BYTES,
            largest_paths: offenders,
        });
    }

    // Between the two thresholds: allow the upload, but point at the likely culprits.
    let described: Vec<String> = offenders
        .iter()
        .map(|(name, sz)| format!("{name} ({})", human_mb(*sz)))
        .collect();
    let mut hint = described.join(", ");
    if hint.is_empty() {
        hint = "—".to_string();
    }
    tracing::warn!(
        "source archive is {} (largest: {}). Add big build artifacts to .gitignore to speed up uploads.",
        human_mb(bytes),
        hint,
    );
    Ok(())
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::panic, clippy::cast_possible_truncation)]
mod tests {
    use super::*;

    /// The dashboard link is rooted at the app host and uses the `/pipelines/` path.
    #[test]
    fn watch_url_uses_app_host_and_pipelines_path() {
        let url = dashboard_build_url("https://app.harmont.dev", "acme", "web", 42);
        assert_eq!(url, "https://app.harmont.dev/acme/pipelines/web/builds/42");
    }

    /// Below the warning threshold the repo root is never inspected, so a
    /// non-existent path is acceptable.
    #[test]
    fn archive_under_warn_passes() {
        let missing_root = std::path::Path::new("/nonexistent");
        guard_archive_size(1024, missing_root).unwrap();
    }

    /// One byte over the cap is rejected, and the error reports both the
    /// observed size and the cap.
    #[test]
    fn archive_over_cap_fails_with_source_too_large() {
        let tmp = tempfile::tempdir().unwrap();
        let oversized = ARCHIVE_CAP_BYTES as usize + 1;
        let err = guard_archive_size(oversized, tmp.path()).unwrap_err();
        if let BackendError::SourceTooLarge {
            observed_bytes,
            cap_bytes,
            ..
        } = err
        {
            assert_eq!(observed_bytes, ARCHIVE_CAP_BYTES + 1);
            assert_eq!(cap_bytes, ARCHIVE_CAP_BYTES);
        } else {
            panic!("expected SourceTooLarge, got {err:?}");
        }
    }

    /// Sizes are rendered with exactly one decimal place.
    #[test]
    fn human_mb_formats_one_decimal() {
        let six_units = human_mb(6 * 1024 * 1024);
        let one_and_a_half = human_mb(1536 * 1024);
        assert_eq!(six_units, "6.0 MB");
        assert_eq!(one_and_a_half, "1.5 MB");
    }
}