use crate::{
error,
runner::{
docker::Docker, sender::StepMessageSender, GlobalSecret, GlobalVolume, JobContext,
OrganizationSecret, OrganizationVolume, RepositorySecret, RepositoryVolume, RunnerInner,
Secret, Volume,
},
EnvironmentVariable, StepNumber, StepRunResult, WorkflowState,
};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, path::PathBuf, process::ExitStatus, sync::Arc, time::Duration};
use tokio::{fs, io::AsyncWriteExt, sync::broadcast::error::RecvError};
/// Reference to a secret by key, as declared in a step's configuration.
///
/// The key is resolved against the runner's secret store at run time (see
/// `get_secret_value`); the resolved value is injected into the step
/// container as an environment variable named after the key.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct StepSecret {
    // Secret name in the runner's store; also the env var name in the container.
    pub key: String,
}
/// Reference to a volume by key plus its mount point inside the container.
///
/// The key is resolved to a host path at run time (see `get_volume_value`).
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct StepVolume {
    // Volume name in the runner's volume store.
    pub key: String,
    // Mount destination inside the step container.
    pub to: String,
}
/// A single workflow step: one docker container running a shell script.
///
/// `run` is written to an entrypoint script, mounted into the container at
/// `{runner_dir}/entrypoint.sh`, and executed with `working_dir` as the
/// container's working directory.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Step {
    // Optional display name for the step.
    pub name: Option<String>,
    // Docker image the step runs in.
    pub image: String,
    // Shell commands executed by the generated entrypoint script.
    pub run: String,
    // NOTE(review): not consulted anywhere in this file — presumably the job
    // runner uses it to decide whether a failed step aborts the job; confirm.
    pub continue_on_error: bool,
    // In-container directory where the entrypoint script is mounted.
    pub runner_dir: String,
    // In-container workspace directory (also the docker working dir).
    pub working_dir: String,
    // In-container cache directory (mapped from the workflow cache dir).
    pub cache_dir: String,
    // Step-level environment variables; merged over workflow-level ones.
    pub environments: Option<HashMap<String, EnvironmentVariable>>,
    // Secrets to resolve and expose as environment variables.
    pub secrets: Vec<StepSecret>,
    // Volumes to resolve and mount into the container.
    pub volumes: Vec<StepVolume>,
    // Maximum wall-clock run time; on expiry the container is killed.
    pub timeout: Duration,
    // Extra values passed through to docker's security-opt flags.
    pub security_opts: Option<Vec<String>>,
}
/// Per-run context handed to `Step::run`.
pub struct StepRunContext {
    // Step number within the job; used in container/script names and env vars.
    pub number: u64,
    // Channel for publishing step state changes and log lines.
    pub message_sender: StepMessageSender,
    // Shared job-level context (paths, workflow run context, runner state).
    pub job_context: Arc<JobContext>,
}
/// Raw outcome of running the step container, before it is mapped to a
/// `WorkflowState` in `Step::run`.
#[derive(Debug)]
pub enum StepCommandResult {
    // Container ran to completion; exit status decides success/failure.
    ExitStatus(ExitStatus),
    // Container was killed due to workflow cancellation or step timeout.
    Cancelled,
    // Container could not be run, or another runtime error occurred.
    Failed(error::Error),
}
/// Looks up a secret's value in the shared runner state.
///
/// Every secret scope (repository, organization, global) carries a plain
/// string value, so the scope is flattened away and only the value is
/// returned. Returns `None` when `secret_key` is not defined.
fn get_secret_value(state: &Arc<Mutex<RunnerInner>>, secret_key: &str) -> Option<String> {
    let state = state.lock();
    state.secrets.get(secret_key).map(|secret| match secret {
        Secret::Repository(RepositorySecret { value, .. }) => value.clone(),
        Secret::Organization(OrganizationSecret { value, .. }) => value.clone(),
        Secret::Global(GlobalSecret { value, .. }) => value.clone(),
    })
}
/// Looks up a volume's host path in the shared runner state.
///
/// Like `get_secret_value`, the volume scope is flattened away: all scopes
/// carry a path string. Returns `None` when `volume_key` is not defined.
fn get_volume_value(state: &Arc<Mutex<RunnerInner>>, volume_key: &str) -> Option<String> {
    state.lock().volumes.get(volume_key).map(|volume| match volume {
        Volume::Repository(RepositoryVolume { path, .. }) => path.clone(),
        Volume::Organization(OrganizationVolume { path, .. }) => path.clone(),
        Volume::Global(GlobalVolume { path, .. }) => path.clone(),
    })
}
impl Step {
    /// Runs the step to completion and reports the outcome.
    ///
    /// Orchestration:
    /// 1. Marks the step `InProgress` via the message sender.
    /// 2. Writes the step's entrypoint script to the host working dir.
    /// 3. Builds the docker invocation and starts the container.
    /// 4. Races the container against (a) a workflow cancel signal and
    ///    (b) the step timeout; either of those kills the container.
    /// 5. Maps the result to a `WorkflowState`, publishes it, and returns a
    ///    `StepRunResult` with start/end timestamps.
    ///
    /// # Errors
    /// Propagates entrypoint-script creation failures, docker-build failures,
    /// and failures to kill the container on cancel/timeout.
    pub async fn run(&self, context: StepRunContext) -> crate::Result<StepRunResult> {
        let started_at = chrono::Utc::now();
        let workflow_id = context.job_context.workflow_run_context.id.clone();
        let job_id = context.job_context.id.clone();
        let number = context.number;
        // Announce that the step has started before doing any setup work.
        context
            .message_sender
            .update_step_state(WorkflowState::InProgress);
        // Container name is unique per workflow/job/step so concurrent runs
        // cannot collide.
        let docker_name = format!("workflow-step-{}_{}_{}", workflow_id, job_id, number);
        let entrypoint_path = self.create_entrypoint_script(&context).await?;
        let entrypoint_path_string =
            entrypoint_path
                .to_str()
                .ok_or(error::Error::internal_runtime_error(
                    "Failed to convert PathBuf to &str",
                ))?;
        let docker = self.generate_docker(
            docker_name.clone(),
            entrypoint_path_string.to_string(),
            &context,
        )?;
        let mut workflow_state_receiver = context
            .job_context
            .workflow_run_context
            .workflow_state
            .clone();
        // Race the container against workflow cancellation and the step
        // timeout; whichever arm completes first decides the outcome.
        let step_command_result = tokio::select! {
            res = self.run_docker(docker.clone(), &context) => {
                match res {
                    Ok(result) => {
                        result
                    },
                    Err(err) => {
                        StepCommandResult::Failed(err)
                    }
                }
            }
            _ = workflow_state_receiver.changed() => {
                // A state change is only expected to be a cancellation; any
                // other state here is treated as an internal error.
                let command = workflow_state_receiver.borrow().clone();
                if command == WorkflowState::Cancelled {
                    log::info!("Cancel command received");
                    docker
                        .kill()
                        .await
                        .map_err(|err| error::Error::internal_runtime_error(
                            format!("Failed to stop docker container: {}", err.to_string())
                        ))?;
                    StepCommandResult::Cancelled
                } else {
                    StepCommandResult::Failed(error::Error::internal_runtime_error(
                        format!("Unexpected command received: {}", command.to_string())
                    ))
                }
            }
            _ = tokio::time::sleep(self.timeout) => {
                // Step exceeded its configured timeout: kill the container.
                // NOTE(review): a timeout is reported as `Cancelled`, not
                // `Failed` — confirm this is the intended semantics.
                log::info!("Timeout");
                docker
                    .kill()
                    .await
                    .map_err(|err| error::Error::internal_runtime_error(
                        format!("Failed to stop docker container: {}", err.to_string())
                    ))?;
                StepCommandResult::Cancelled
            }
        };
        let ended_at = chrono::Utc::now();
        // NOTE(review): logs "Cleanup files" but the generated entrypoint
        // script is not removed here — presumably job-level cleanup handles
        // the working dir; confirm.
        log::info!("Cleanup files");
        log::info!(
            "Duration: {}ms",
            ended_at.timestamp_millis() - started_at.timestamp_millis()
        );
        // Translate the raw command result into a workflow state plus an
        // optional error carried back to the caller.
        let (state, error) = match step_command_result {
            StepCommandResult::ExitStatus(status) => {
                if status.success() {
                    (WorkflowState::Succeeded, None)
                } else {
                    (
                        WorkflowState::Failed,
                        Some(error::Error::failed(status.clone())),
                    )
                }
            }
            StepCommandResult::Cancelled => (WorkflowState::Cancelled, None),
            StepCommandResult::Failed(err) => {
                log::error!("Step failed: {}", err.to_string());
                (WorkflowState::Failed, Some(err))
            }
        };
        // Publish the terminal state before returning.
        context.message_sender.update_step_state(state.clone());
        Ok(StepRunResult {
            error,
            state,
            started_at: Some(started_at),
            ended_at: Some(ended_at),
        })
    }
async fn run_docker(
&self,
docker: Docker,
context: &StepRunContext,
) -> crate::Result<StepCommandResult> {
let sender = context.message_sender.clone();
let workflow_state = context
.job_context
.workflow_run_context
.workflow_state
.borrow()
.clone();
if workflow_state == WorkflowState::Cancelled {
return Ok(StepCommandResult::Cancelled);
}
let mut log_receiver = docker.subscribe_logs();
let join_handle = tokio::task::spawn(async move {
loop {
match log_receiver.recv().await {
Ok(log) => {
sender.send_log(log);
}
Err(err) => {
if err == RecvError::Closed {
log::info!("Log receiver closed");
break;
}
log::error!("Failed to receive log: {}", err.to_string());
}
};
}
});
let status = docker.run().await.map_err(|err| {
error::Error::internal_runtime_error(format!(
"Failed to run docker container: {}",
err.to_string()
))
})?;
join_handle.abort();
Ok(StepCommandResult::ExitStatus(status))
}
fn generate_docker(
&self,
docker_name: String,
entrypoint_path: String,
context: &StepRunContext,
) -> crate::Result<Docker> {
let image = self.image.clone();
let host_user_dir =
context
.job_context
.host_user_dir
.to_str()
.ok_or(error::Error::internal_runtime_error(
"Failed to parse user dir",
))?;
let container_cache_dir = context
.job_context
.workflow_run_context
.cache_dir
.to_str()
.ok_or(error::Error::internal_runtime_error(
"Failed to parse cache dir",
))?;
let workspace_dir = self.working_dir.clone();
let runner_dir = self.runner_dir.clone();
let cache_dir = self.cache_dir.clone();
let environments = self.get_environments(&context)?;
let docker_entrypoint_path = format!("{}/entrypoint.sh", runner_dir);
let mut docker = Docker::new(docker_name)
.image(image)
.working_dir(workspace_dir.clone())
.volume(entrypoint_path, docker_entrypoint_path.clone())
.volume(host_user_dir, workspace_dir)
.volume(container_cache_dir, cache_dir)
.entrypoint(docker_entrypoint_path)
.auto_remove(true);
for (key, value) in environments {
docker = docker.environment(key, value);
}
for (from, to) in &context.job_context.working_dir_maps {
let from = from.to_str().ok_or(error::Error::internal_runtime_error(
"Failed to parse working dir",
))?;
docker = docker.volume(from, to);
}
let inner_state = context
.job_context
.workflow_run_context
.runner_inner
.clone();
for volume in self.volumes.clone() {
let from = get_volume_value(&inner_state, &volume.key).ok_or(
error::Error::workflow_config_error(format!("Volume `{}` is not defined.", volume.key)),
)?;
docker = docker.volume(from, volume.to);
}
drop(inner_state);
if let Some(security_opts) = self.security_opts.clone() {
for opt in security_opts {
docker = docker.security_opt(opt);
}
}
Ok(docker)
}
fn get_environments(&self, context: &StepRunContext) -> crate::Result<HashMap<String, String>> {
let mut environments: HashMap<String, String> = HashMap::new();
if let Some(envs) = context
.job_context
.workflow_run_context
.environments
.clone()
{
for (key, value) in envs {
environments.insert(key, value);
}
}
if let Some(envs) = self.environments.clone() {
for (key, value) in envs {
match value {
EnvironmentVariable::String(value) => {
environments.insert(key, value);
}
EnvironmentVariable::Number(value) => {
environments.insert(key, value.to_string());
}
EnvironmentVariable::Boolean(value) => {
environments.insert(key, value.to_string());
}
}
}
}
let inner_state = context
.job_context
.workflow_run_context
.runner_inner
.clone();
for secret in self.secrets.clone() {
let value = get_secret_value(&inner_state, &secret.key).ok_or(
error::Error::workflow_config_error(format!("Secret `{}` is not defined.", secret.key)),
)?;
environments.insert(secret.key, value);
}
drop(inner_state);
let workspace_dir = self.working_dir.clone();
let cache_dir = self.cache_dir.clone();
let envs = self.preset_environments(
context.number,
workspace_dir,
cache_dir,
&context.job_context,
);
for (key, value) in envs {
environments.insert(key, value);
}
Ok(environments)
}
fn preset_environments(
&self,
number: StepNumber,
workspace: String,
cache_dir: String,
job_context: &JobContext,
) -> HashMap<String, String> {
let mut environments: Vec<(&str, String)> = vec![];
environments.push(("COODEV_STEP_ID", number.to_string()));
environments.push(("COODEV_JOB_ID", job_context.id.to_string()));
environments.push(("COODEV_WORKSPACE", workspace));
environments.push(("COODEV_CACHE_DIR", cache_dir));
environments.push(("CI", "COODEV".to_string()));
let ctx = &job_context.workflow_run_context;
environments.push(("COODEV_WORKFLOW_ID", ctx.id.to_string()));
environments.push(("COODEV_REF", ctx.ref_name.to_string()));
environments.push(("COODEV_REPO_OWNER", ctx.repo_owner.to_string()));
environments.push(("COODEV_REPO_NAME", ctx.repo_name.to_string()));
environments.push(("COODEV_DEFAULT_BRANCH", ctx.default_branch.to_string()));
environments.push((
"COODEV_REPOSITORY",
format!("{}/{}", ctx.repo_owner, ctx.repo_name),
));
environments.push(("COODEV_SHA", ctx.sha.to_string()));
environments.push(("COODEV_COMMIT_MESSAGE", ctx.commit_message.to_string()));
environments.push(("COODEV_COMMITTER", ctx.committer.to_string()));
environments.push(("COODEV_COMMITTER_EMAIL", ctx.committer_email.to_string()));
environments.push(("COODEV_IS_PRIVATE", ctx.is_private.to_string()));
environments.push(("COODEV_ACCESS_TOKEN", ctx.access_token.to_string()));
if ctx.pr_number.is_some() {
environments.push(("COODEV_PR_NUMBER", ctx.pr_number.unwrap().to_string()));
} else {
environments.push(("COODEV_PR_NUMBER", "".to_string()));
}
environments
.into_iter()
.map(|(k, v)| (k.to_string(), v))
.collect()
}
async fn create_entrypoint_script(&self, context: &StepRunContext) -> crate::Result<PathBuf> {
let entrypoint_name = format!("step-entrypoint-{}.sh", context.number);
let entrypoint_path = context.job_context.host_working_dir.join(entrypoint_name);
let mut file;
#[cfg(unix)]
{
file = fs::OpenOptions::new()
.create(true)
.write(true)
.mode(0o777)
.open(&entrypoint_path)
.await
.map_err(|err| error::Error::io_error(err, "Failed to create entrypoint file"))?;
}
#[cfg(not(unix))]
{
file = fs::File::create(&entrypoint_path)
.await
.map_err(|err| error::Error::io_error(err, "Failed to create entrypoint file"))?;
}
file
.write(b"#!/bin/sh\n")
.await
.map_err(|err| error::Error::io_error(err, "Failed to write entrypoint file"))?;
file
.write_all(self.run.as_bytes())
.await
.map_err(|err| error::Error::io_error(err, "Failed to write entrypoint file"))?;
drop(file);
Ok(entrypoint_path)
}
}