mod command;
mod docker;
mod job;
mod plugins;
mod runner_builder;
mod secret;
mod sender;
mod step;
mod utils;
mod volumes;
mod workflow;
mod workflow_runner;
pub use self::{
job::{Job, JobContext},
plugins::{Plugin, PluginManager},
runner_builder::RunnerBuilder,
secret::{GlobalSecret, OrganizationSecret, RepositorySecret, Secret},
sender::{EventHandler, JobMessageSender, MessageSender, WorkflowMessageSender},
step::{Step, StepRunContext, StepSecret, StepVolume},
utils::{get_workflow_event_payload, should_skip_workflow},
volumes::{GlobalVolume, OrganizationVolume, RepositoryVolume, Volume},
workflow::Workflow,
workflow_runner::{WorkflowRunContext, WorkflowRunOptions, WorkflowRunner},
};
use crate::{
actions::{Action, Actions},
constants::COODEV_CONTAINER_IMAGE,
error,
user_config::{UserCommandStep, UserJob, UserStep, UserWorkflow},
WorkflowEvent, WorkflowMessage,
};
use derive_builder::Builder;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, env, path::PathBuf, sync::Arc};
use tokio::sync::Semaphore;
/// GitHub credentials held by a [`Runner`] and handed to every
/// `WorkflowRunner` it creates.
///
/// NOTE(review): `Debug` is derived, so formatting this value with `{:?}`
/// prints the token / private key verbatim. Avoid logging it, or consider a
/// redacting `Debug` impl.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum GithubAuthorization {
/// A personal access token, stored as a plain string.
PersonalAccessToken(String),
/// GitHub App credentials: the numeric app id and the app's private key.
GithubApp { app_id: u64, private_key: String },
}
/// Kind of ref a workflow refers to: a branch or a tag.
///
/// Serialized in `snake_case`, i.e. as `branch` / `tag`.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum WorkflowRefType {
/// The ref is a branch.
Branch,
/// The ref is a tag.
Tag,
}
/// Input for [`Runner::create_workflow_runner`] and [`Runner::parse_workflow`].
///
/// `derive_builder` additionally generates a builder type for this struct.
#[derive(Serialize, Deserialize, Debug, Clone, Builder)]
pub struct CreateWorkflowOptions {
/// Raw user workflow configuration text; parsed with `UserWorkflow::from_str`.
pub config: String,
/// Event the workflow is created for. Its payload supplies the repository
/// owner/name used to resolve scoped secrets and volumes.
pub event: WorkflowEvent,
}
/// Mutable registry state of a [`Runner`], kept behind `Arc<Mutex<..>>` so
/// that runner clones and the workflow runners they create share it.
pub struct RunnerInner {
// Actions registered by name; looked up when a step `uses` an action.
actions: Actions,
// Registered secrets, keyed by `KEY@owner/repo`, `KEY@owner` or plain `KEY`
// (key part upper-cased) depending on the secret's scope.
secrets: HashMap<String, Secret>,
// Registered volumes, keyed with the same scheme as `secrets`.
volumes: HashMap<String, Volume>,
// Plugins notified when a workflow runner is initialised.
plugin_manager: PluginManager,
// Holds the event handlers registered through `Runner::on_event`.
workflow_message_sender: MessageSender,
}
/// Entry point for registering actions, secrets, volumes, plugins and event
/// handlers, and for turning workflow configuration into `WorkflowRunner`s.
///
/// Cloning is cheap with respect to the registries: `inner` and
/// `parallel_semaphore` are `Arc`s, so clones share the same state.
#[derive(Clone)]
pub struct Runner {
// Credentials forwarded to every workflow runner.
github_authorization: GithubAuthorization,
// Optional base directory; when `None`, `<home>/coodev-runner` is used.
working_dir: Option<PathBuf>,
// Shared, lock-protected registries (actions, secrets, volumes, plugins, handlers).
inner: Arc<Mutex<RunnerInner>>,
// Semaphore shared with every workflow runner created from this runner.
parallel_semaphore: Arc<Semaphore>,
}
impl Runner {
pub fn builder() -> RunnerBuilder {
RunnerBuilder::default()
}
/// Registers `action` under `name` so that steps can refer to it.
///
/// Returns `&self` to allow chaining further registrations.
pub fn register_action<T>(&self, name: impl Into<String>, action: T) -> &Self
where
    T: Action + 'static,
{
    let action_name: String = name.into();
    let boxed_action: Box<dyn Action> = Box::new(action);
    self.inner.lock().actions.register(action_name, boxed_action);
    self
}
/// Registers a secret under a scope-qualified key.
///
/// The key part is upper-cased and the scope is appended after `@`:
///
/// * repository secret   -> `KEY@<repository>`
/// * organization secret -> `KEY@<organization>`
/// * global secret       -> `KEY`
///
/// A secret registered later under the same scoped key replaces the earlier
/// one. Returns `&self` so registrations can be chained.
pub fn register_secret(&self, secret: Secret) -> &Self {
    // Match by reference: only the key/scope are needed to build the map key,
    // so there is no reason to clone the whole secret (including its value).
    let scoped_key = match &secret {
        Secret::Repository(RepositorySecret {
            key, repository, ..
        }) => format!("{}@{}", key.to_uppercase(), repository),
        Secret::Organization(OrganizationSecret {
            key, organization, ..
        }) => format!("{}@{}", key.to_uppercase(), organization),
        Secret::Global(GlobalSecret { key, .. }) => key.to_uppercase(),
    };
    self.inner.lock().secrets.insert(scoped_key, secret);
    self
}
/// Registers a volume under a scope-qualified key.
///
/// The key part is upper-cased and the scope is appended after `@`:
///
/// * repository volume   -> `KEY@<repository>`
/// * organization volume -> `KEY@<organization>`
/// * global volume       -> `KEY`
///
/// A volume registered later under the same scoped key replaces the earlier
/// one. Returns `&self` so registrations can be chained.
pub fn register_volume(&self, volume: Volume) -> &Self {
    // Match by reference: building the map key does not require a clone of
    // the volume itself.
    let scoped_key = match &volume {
        Volume::Repository(RepositoryVolume {
            key, repository, ..
        }) => format!("{}@{}", key.to_uppercase(), repository),
        Volume::Organization(OrganizationVolume {
            key, organization, ..
        }) => format!("{}@{}", key.to_uppercase(), organization),
        Volume::Global(GlobalVolume { key, .. }) => key.to_uppercase(),
    };
    self.inner.lock().volumes.insert(scoped_key, volume);
    self
}
/// Initialises `plugin` against this runner and adds it to the plugin manager.
///
/// Returns `&self` to allow chaining.
pub fn register_plugin<T>(&self, plugin: T) -> &Self
where
    T: Plugin + 'static,
{
    // `on_init` is invoked before the registry lock is taken, so the hook is
    // free to call back into this runner's registration methods.
    plugin.on_init(self);
    let boxed_plugin: Box<dyn Plugin> = Box::new(plugin);
    self.inner.lock().plugin_manager.register(boxed_plugin);
    self
}
/// Registers a callback on the runner's workflow message sender.
///
/// The handler receives a `&WorkflowMessage`. Returns `&self` to allow
/// chaining.
pub fn on_event<F>(&self, event_handler: F) -> &Self
where
    F: Fn(&WorkflowMessage) + Send + Sync + 'static,
{
    let handler: EventHandler = Box::new(event_handler);
    self.inner.lock().workflow_message_sender.register(handler);
    self
}
/// Parses `options.config` into a normalized workflow and builds a
/// `WorkflowRunner` for it, bound to `options.event`.
///
/// # Errors
///
/// Propagates any error from [`Runner::parse_workflow`] or
/// [`Runner::create_workflow_runner_from_normalized_workflow`].
pub fn create_workflow_runner(
    &self,
    options: CreateWorkflowOptions,
) -> crate::Result<WorkflowRunner> {
    // The event has to be copied out first: `options` is consumed by parsing.
    let triggering_event = options.event.clone();
    self.parse_workflow(options).and_then(|normalized| {
        self.create_workflow_runner_from_normalized_workflow(normalized, triggering_event)
    })
}
pub fn create_workflow_runner_from_normalized_workflow(
&self,
workflow: Workflow,
event: WorkflowEvent,
) -> crate::Result<WorkflowRunner> {
let runner_working_dir = match self.working_dir {
Some(ref path) => path.clone(),
#[allow(deprecated)]
None => env::home_dir()
.ok_or(
error::Error::workflow_config_error(
"Failed to get working directory. Please set working directory by Runner::builder().working_dir()"
)
)?
.join("coodev-runner"),
};
let workflow_runner = WorkflowRunner::builder()
.workflow(workflow)
.workflow_semaphore(self.parallel_semaphore.clone())
.workflow_message_sender(self.inner.lock().workflow_message_sender.clone())
.runner_working_dir(runner_working_dir)
.github_authorization(self.github_authorization.clone())
.runner_inner(self.inner.clone())
.event(event)
.build()
.map_err(|err| error::Error::internal_runtime_error(err.to_string()))?;
self
.inner
.lock()
.plugin_manager
.on_workflow_runner_init(&workflow_runner);
Ok(workflow_runner)
}
/// Parses the user configuration in `options.config` and normalizes every
/// job into the internal [`Workflow`] representation.
///
/// # Errors
///
/// Returns the parse error from `UserWorkflow::from_str`, or the first error
/// produced while normalizing a job.
pub fn parse_workflow(&self, options: CreateWorkflowOptions) -> crate::Result<Workflow> {
    let parsed = UserWorkflow::from_str(&options.config)?;

    // Collecting into `Result` stops at the first job that fails to normalize.
    let jobs = parsed
        .jobs
        .into_iter()
        .map(|(job_id, user_job)| {
            self
                .normalize_job(user_job, &options)
                .map(|normalized| (job_id, normalized))
        })
        .collect::<crate::Result<HashMap<_, _>>>()?;

    Ok(Workflow {
        name: parsed.name,
        on: parsed.on,
        jobs,
    })
}
/// Converts a user-declared job into a [`Job`].
///
/// A job without an explicit image falls back to `COODEV_CONTAINER_IMAGE`;
/// the resulting image is also handed to the step normalization as the
/// default step image.
fn normalize_job(&self, job: UserJob, options: &CreateWorkflowOptions) -> crate::Result<Job> {
    let image = match job.image {
        Some(custom_image) => custom_image,
        None => COODEV_CONTAINER_IMAGE.to_string(),
    };
    let steps = self.normalize_steps(job.steps, image.clone(), options)?;

    let normalized = Job {
        name: job.name,
        image,
        steps,
        depends_on: job.depends_on,
        working_dirs: job.working_dirs,
    };
    Ok(normalized)
}
fn normalize_steps(
&self,
steps: Vec<UserStep>,
job_image: String,
options: &CreateWorkflowOptions,
) -> crate::Result<Vec<Step>> {
let event_payload = get_workflow_event_payload(options.event.clone())?;
let mut normalized_steps = vec![];
let steps = self.normalize_user_steps(steps)?;
for step in steps.iter() {
if let UserStep::Command(command_step) = step.clone() {
let UserCommandStep {
name,
image,
run,
continue_on_error,
environments,
volumes,
secrets,
timeout,
security_opts,
} = command_step;
let runner_dir = "/home/runner".to_string();
let cache_dir = "/home/runner/cache".to_string();
let inner_state = self.inner.lock();
let mut step_volumes: Vec<StepVolume> = vec![];
if let Some(volumes) = volumes {
for volume in volumes {
let idx = volume.find(':').ok_or(error::Error::workflow_config_error(
"Invalid volume format. The format should be `volume_key:to`",
))?;
let (volume_key, volume_to) = volume.split_at(idx);
let uppercase_key = volume_key.to_uppercase();
let repository = format!("{}/{}", event_payload.repo_owner, event_payload.repo_name);
let repo_volume_key = format!("{}@{}", uppercase_key, repository);
if inner_state.volumes.contains_key(&repo_volume_key) {
step_volumes.push(StepVolume {
key: repo_volume_key,
to: volume_to[1..].to_string(),
});
continue;
}
let org_volume_key = format!("{}@{}", uppercase_key, event_payload.repo_owner);
if inner_state.volumes.contains_key(&org_volume_key) {
step_volumes.push(StepVolume {
key: org_volume_key,
to: volume_to[1..].to_string(),
});
continue;
}
if inner_state.volumes.contains_key(&uppercase_key) {
step_volumes.push(StepVolume {
key: uppercase_key,
to: volume_to[1..].to_string(),
});
continue;
}
return Err(error::Error::workflow_config_error(format!(
"Volume `{}` is not defined.",
volume_key
)));
}
}
let mut step_secrets: Vec<StepSecret> = vec![];
if let Some(secret_keys) = secrets {
for secret_key in secret_keys {
let uppercase_key = secret_key.to_uppercase();
let repository = format!("{}/{}", event_payload.repo_owner, event_payload.repo_name);
let repo_secret_key = format!("{}@{}", uppercase_key, repository);
if inner_state.secrets.contains_key(&repo_secret_key) {
step_secrets.push(StepSecret {
key: repo_secret_key,
});
continue;
}
let org_secret_key = format!("{}@{}", uppercase_key, event_payload.repo_owner);
if inner_state.secrets.contains_key(&org_secret_key) {
step_secrets.push(StepSecret {
key: org_secret_key,
});
continue;
}
if inner_state.secrets.contains_key(&uppercase_key) {
step_secrets.push(StepSecret { key: uppercase_key });
continue;
}
return Err(error::Error::workflow_config_error(format!(
"Secret `{}` is not defined.",
secret_key
)));
}
}
let timeout = timeout.unwrap_or("60m".to_string());
let timeout = humantime::parse_duration(&timeout).map_err(|err| {
log::error!("Invalid timeout format: {}", err);
error::Error::workflow_config_error(
"Invalid timeout format. The format should like `60m` or `1h`.",
)
})?;
let normalized_step = Step {
name,
image: image.unwrap_or(job_image.clone()),
run,
continue_on_error: continue_on_error.unwrap_or(false),
environments,
runner_dir,
working_dir: format!("/home/runner/work/{}", event_payload.repo_name),
cache_dir,
volumes: step_volumes,
secrets: step_secrets,
timeout,
security_opts,
};
normalized_steps.push(normalized_step);
} else {
return Err(error::Error::unsupported_feature(
"Currently, only `command` step is supported.",
));
}
}
Ok(normalized_steps)
}
fn normalize_user_steps(&self, user_steps: Vec<UserStep>) -> crate::Result<Vec<UserStep>> {
let mut pre_steps = vec![];
let mut steps = vec![];
let mut post_steps = vec![];
for step in user_steps {
if let UserStep::Action(user_action_step) = &step {
let action_name = &user_action_step.uses;
let action = self.inner.lock().actions.get(action_name).ok_or_else(|| {
error::Error::workflow_config_error(format!("Action `{}` is not found", action_name))
})?;
let action_steps = action.normalize(user_action_step.clone())?;
if let Some(pre) = action_steps.pre {
pre_steps.push(pre);
}
if let Some(post) = action_steps.post {
post_steps.insert(0, post)
}
steps.push(action_steps.run);
continue;
}
steps.push(step.clone());
}
let steps: Vec<UserStep> = vec![]
.into_iter()
.chain(pre_steps.into_iter())
.chain(steps.into_iter())
.chain(post_steps.into_iter())
.collect();
Ok(steps)
}
}
#[cfg(test)]
mod tests {
use crate::{
runner::{plugins::Plugin, GithubAuthorization, Runner, Secret, Volume, WorkflowRunOptions},
utils::test::{
create_runner, create_workflow_options, create_workflow_runner, enable_logger,
get_default_run_options, load_config, run_workflow,
},
WorkflowState,
};
use std::path::PathBuf;
use tokio::fs;
/// Runs a workflow with two jobs (`test`, `test2`) that each echo a message
/// and expects the run to finish as `Succeeded`.
///
/// Ignored by default; presumably because it needs a container runtime —
/// TODO confirm.
#[tokio::test]
#[ignore]
async fn test() -> anyhow::Result<()> {
    let config = r#"
name: Test workflow
on:
push:
branches:
- master
jobs:
test:
image: ubuntu
steps:
- name: Test step
run: echo "Hello world"
test2:
image: ubuntu
steps:
- name: Test step
environments:
TEST: "test"
run: |
pwd
echo "Hello world2 ${TEST}"
"#;
    let outcome = run_workflow(config).await?;
    assert_eq!(outcome.state, WorkflowState::Succeeded);
    Ok(())
}
/// Declares `/root/.cache` under `working-directories`, writes a file there
/// in one step and reads it back in the next; expects `Succeeded`.
#[tokio::test]
#[ignore]
async fn test_working_directories() -> anyhow::Result<()> {
    let config = r#"
jobs:
test:
working-directories:
- /root/.cache
steps:
- name: Test create file
run: |
mkdir -p /root/.cache
echo "Hello world" > /root/.cache/test.txt
- name: Test step
run: |
contents=$(cat /root/.cache/test.txt)
if [ "$contents" != "Hello world" ]; then
exit 1
fi
echo "Contents: $contents"
"#;
    let outcome = run_workflow(config).await?;
    assert_eq!(outcome.state, WorkflowState::Succeeded);
    Ok(())
}
/// A step with `timeout: 5s` that sleeps for one minute; the workflow is
/// expected to end in the `Cancelled` state.
#[tokio::test]
#[ignore]
async fn test_timeout() -> anyhow::Result<()> {
    let config = r#"
jobs:
test:
steps:
- name: Timeout step
timeout: 5s
run: sleep 1m
"#;
    let outcome = run_workflow(config).await?;
    assert_eq!(outcome.state, WorkflowState::Cancelled);
    Ok(())
}
/// The first step exits with status 1 and a second step is marked
/// `allow-failure: true`; the workflow is expected to end as `Failed`.
#[tokio::test]
#[ignore]
async fn test_continue_on_error() -> anyhow::Result<()> {
    let config = r#"
jobs:
test:
image: ubuntu
steps:
- name: Throw error
run: exit 1
- name: Test step
allow-failure: true
run: echo "\e[31mHello world\e[0m"
"#;
    let outcome = run_workflow(config).await?;
    assert_eq!(outcome.state, WorkflowState::Failed);
    Ok(())
}
/// Builds a trivial Dockerfile with the rootless BuildKit image, passing
/// custom `security-opts` and environment flags; expects `Succeeded`.
#[tokio::test]
#[ignore]
async fn test_buildkit() -> anyhow::Result<()> {
    let config = r#"
jobs:
test:
image: ubuntu
steps:
- name: Build Docker image
image: moby/buildkit:rootless
environments:
BUILDKITD_FLAGS: --oci-worker-no-process-sandbox
security-opts:
- seccomp=unconfined
- apparmor=unconfined
run: |
echo "FROM alpine" > Dockerfile
buildctl-daemonless.sh build \
--frontend dockerfile.v0 \
--local context=. \
--local dockerfile=. \
--output type=image
"#;
    let outcome = run_workflow(config).await?;
    assert_eq!(outcome.state, WorkflowState::Succeeded);
    Ok(())
}
/// Registers a global, a repository-scoped and an owner-scoped secret, then
/// runs a step that requests all three (two of them by lower-case name) and
/// expects the workflow to succeed.
#[tokio::test]
#[ignore]
async fn test_secret() -> anyhow::Result<()> {
    let runner = create_runner()?;

    // Global secret.
    runner.register_secret(
        Secret::new("TEST_SECRET")
            .value("test_secret_value")
            .build()?,
    );
    // Repository-scoped secret.
    runner.register_secret(
        Secret::new("lowercase_secret")
            .value("test_lowercase_secret_value")
            .owner("panghu-huang")
            .repository("octocrate")
            .build()?,
    );
    // Owner-scoped secret.
    runner.register_secret(
        Secret::new("owner_lowercase_secret")
            .value("test_owner_lowercase_secret_value")
            .owner("panghu-huang")
            .build()?,
    );

    let config = r#"
name: Test workflow
on:
push:
branches:
- master
jobs:
test:
image: ubuntu
steps:
- name: Test step
secrets:
- TEST_SECRET
- lowercase_secret
- owner_lowercase_secret
run: |
echo "TEST_SECRET is $TEST_SECRET"
echo "LOWERCASE_SECRET is $LOWERCASE_SECRET"
echo "OWNER_LOWERCASE_SECRET is $OWNER_LOWERCASE_SECRET"
"#;
    let workflow_runner = runner.create_workflow_runner(create_workflow_options(config))?;
    let outcome = workflow_runner.run(get_default_run_options()).await?;
    assert_eq!(outcome.state, WorkflowState::Succeeded);
    Ok(())
}
/// A secret registered only for another owner (`coodevjs`) must not be
/// resolvable: creating the workflow runner has to fail with a
/// "Secret ... is not defined." configuration error.
///
/// `lowercase_secret` is still resolvable because it is also registered
/// globally, so the error must name `owner_lowercase_secret`.
#[tokio::test]
async fn test_out_of_scope_secret() -> anyhow::Result<()> {
    let runner = create_runner()?;
    runner
        .register_secret(
            Secret::new("lowercase_secret")
                .value("global_secret")
                .build()?,
        )
        .register_secret(
            Secret::new("lowercase_secret")
                .value("test_lowercase_secret_value")
                .owner("panghu-huang")
                .repository("github-api")
                .build()?,
        )
        .register_secret(
            Secret::new("owner_lowercase_secret")
                .value("test_owner_lowercase_secret_value")
                .owner("coodevjs")
                .build()?,
        );
    let res = runner.create_workflow_runner(create_workflow_options(
        r#"
name: Test workflow
jobs:
test:
image: ubuntu
steps:
- name: Test step
secrets:
- lowercase_secret
- owner_lowercase_secret
run: |
echo "LOWERCASE_SECRET is $LOWERCASE_SECRET"
echo "OWNER_LOWERCASE_SECRET is $OWNER_LOWERCASE_SECRET"
"#,
    ));
    match res {
        Err(err) => assert_eq!(
            err.to_string(),
            "Failed to parse user config: Secret `owner_lowercase_secret` is not defined."
        ),
        // Fail with a message that says what went wrong instead of the
        // uninformative `assert!(false)`.
        Ok(_) => panic!(
            "expected `create_workflow_runner` to reject the out-of-scope secret `owner_lowercase_secret`"
        ),
    }
    Ok(())
}
#[tokio::test]
#[ignore]
async fn test_volume() -> anyhow::Result<()> {
let runner = create_runner()?;
#[allow(deprecated)]
let home_dir = std::env::home_dir().unwrap();
let filepath = PathBuf::from(home_dir).join("test.txt");
let filepath = filepath.to_str().ok_or(anyhow::anyhow!(
"Failed to convert path to string: {:?}",
filepath
))?;
fs::write(filepath, "test volume content").await?;
runner.register_volume(
Volume::new("test_volume")
.path(filepath)
.owner("panghu-huang")
.repository("octocrate")
.build()?,
);
let workflow_runner = runner.create_workflow_runner(create_workflow_options(
r#"
name: Test workflow
jobs:
test:
image: ubuntu
steps:
- name: Test step
volumes:
- test_volume:/home/runner/work/octocrate/test.txt
run: |
echo "Before"
pwd
ls
echo "File content: $(cat ./test.txt)"
echo "After"
"#,
))?;
let res = workflow_runner.run(get_default_run_options()).await?;
fs::remove_file(filepath).await?;
assert_eq!(res.state, WorkflowState::Succeeded);
Ok(())
}
/// Three jobs where `test3` declares `depends-on: [test, test2]`; the whole
/// workflow is expected to finish as `Succeeded`.
#[tokio::test]
#[ignore]
async fn depends_on() -> anyhow::Result<()> {
    let config = r#"
name: Test workflow
jobs:
test:
image: ubuntu
steps:
- name: Test step
run: |
sleep 5s
echo "Hello world"
test2:
image: ubuntu
steps:
- name: Test step
environments:
TEST: "test"
run: echo "Hello world2 ${TEST}"
test3:
image: ubuntu
depends-on:
- test
- test2
steps:
- name: Test step
environments:
TEST: "test"
run: echo "Hello world3 ${TEST}"
"#;
    let outcome = run_workflow(config).await?;
    assert_eq!(outcome.state, WorkflowState::Succeeded);
    Ok(())
}
/// Starts a two-job workflow and cancels it from a spawned task after five
/// seconds; the run is expected to report `Cancelled`.
#[tokio::test]
#[ignore]
async fn cancel() -> anyhow::Result<()> {
let workflow_runner = create_workflow_runner(
r#"
name: Test workflow
jobs:
test:
image: ubuntu
steps:
- name: Test step
run: |
sleep 5s
echo "Hello world"
test2:
image: ubuntu
steps:
- name: Test step
environments:
TEST: "test"
run: |
sleep 5s
echo "Hello world2 ${TEST}"
"#,
)?;
// A clone of the workflow runner is moved into the task that cancels it.
let cloned = workflow_runner.clone();
let handle = tokio::spawn(async move {
// Wait before cancelling so the workflow has had time to start.
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
cloned.cancel().await.unwrap();
log::info!("Workflow cancelled");
});
let res = workflow_runner.run(get_default_run_options()).await?;
// Surface a panic of the cancelling task (e.g. a failed `cancel`).
handle.await?;
assert_eq!(res.state, WorkflowState::Cancelled);
Ok(())
}
/// Creates ten workflow runners from one `Runner` built with
/// `parallel_size(3)`, runs them all concurrently and expects every run to
/// succeed.
///
/// Requires the GitHub App credentials returned by `load_config`.
#[tokio::test]
#[ignore]
async fn queue() -> anyhow::Result<()> {
enable_logger();
let config = load_config()?;
let github_authorization = GithubAuthorization::GithubApp {
app_id: config.github_app_id,
private_key: config.github_app_private_key,
};
let runner = Runner::builder()
.github_authorization(github_authorization)
.parallel_size(3 as usize)
.build()?;
let mut handles = vec![];
for idx in 0..10 {
// Earlier workflows sleep longer (10s down to 1s).
let sleep = 10 - idx;
let workflow_runner = runner.create_workflow_runner(create_workflow_options(format!(
r#"
name: Test workflow
jobs:
test:
image: ubuntu
steps:
- name: Test step
run: |
echo "Running {idx}"
sleep {sleep}s
echo "Finished {idx}"
"#
)))?;
// Each workflow runs in its own task, with the loop index as run id.
let handle = tokio::spawn(async move {
let options = WorkflowRunOptions {
run_id: idx.to_string(),
environments: None,
};
workflow_runner.run(options).await.unwrap()
});
handles.push(handle);
}
// Await every task; a panicked task surfaces here as a join error.
for handle in handles {
let res = handle.await?;
assert_eq!(res.state, WorkflowState::Succeeded);
}
Ok(())
}
/// Registers a minimal plugin whose `on_init` only logs, checking that
/// plugin registration goes through without error.
#[tokio::test]
#[ignore]
async fn test_plugin() -> anyhow::Result<()> {
    struct TestPlugin;

    impl Plugin for TestPlugin {
        fn on_init(&self, _runner: &Runner) {
            log::info!("TestPlugin init");
        }
    }

    enable_logger();
    let runner = create_runner()?;
    runner.register_plugin(TestPlugin);
    Ok(())
}
/// Registers an event handler that logs every workflow message, then runs a
/// one-step workflow. Only checks that the run completes without an error;
/// the resulting state is not asserted.
#[tokio::test]
#[ignore]
async fn test_on_event() -> anyhow::Result<()> {
    enable_logger();
    let runner = create_runner()?;
    runner.on_event(|event| {
        log::info!("Event: {:?}", event);
    });
    // The configuration has no placeholders, so it is passed as a plain
    // literal rather than through `format!`.
    let workflow_runner = runner.create_workflow_runner(create_workflow_options(
        r#"
name: Test workflow
jobs:
test:
image: ubuntu
steps:
- name: Test step
run: |
echo "Running"
"#,
    ))?;
    let _res = workflow_runner.run(get_default_run_options()).await?;
    Ok(())
}
}