use crate::{
error,
runner::{
sender::MessageSender,
utils::{get_workflow_event_payload, is_workflow_finished},
workflow::Workflow,
GithubAuthorization, RunnerInner, WorkflowMessageSender,
},
Id, WorkflowAPIEvent, WorkflowEvent, WorkflowRunResult, WorkflowState, WorkflowTriggerEvents,
};
use derive_builder::Builder;
use octocrate::{GithubAPI, GithubApp, GithubPersonalAccessToken};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use tokio::{
fs,
sync::{watch, Semaphore},
};
/// Reasons why `WorkflowRunner::cancel` refuses a cancellation request.
#[derive(Debug, Clone)]
pub enum WorkflowCancelError {
/// The recorded workflow state was already `Succeeded`, `Failed` or `Skipped`.
Finished,
/// The recorded workflow state was already `Cancelled`.
Cancelled,
}
/// Repository and commit metadata fetched from GitHub for a single workflow run.
///
/// Built by `WorkflowRunner::get_github_context`.
// NOTE(review): this type derives `Debug` and carries `access_token`, so
// formatting it with `{:?}` prints the token — confirm that is acceptable.
#[derive(Debug, Clone)]
pub struct WorkflowGithubContext {
/// Default branch reported for the repository.
pub default_branch: String,
/// Name of the commit's author.
pub committer: String,
/// Email of the commit's author.
pub committer_email: String,
/// Message of the commit.
pub commit_message: String,
/// Whether the repository is private.
pub is_private: bool,
/// SHA of the commit as returned by the GitHub API.
pub sha: String,
/// Token used for GitHub access: the personal access token itself, or an
/// installation access token when authorizing as a GitHub App.
pub access_token: String,
}
/// Per-run options passed to `WorkflowRunner::run`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WorkflowRunOptions {
/// Identifier of this run; it scopes the run's messages and names the
/// run's working directory.
pub run_id: Id,
/// Optional key/value pairs forwarded unchanged to the run context.
pub environments: Option<HashMap<String, String>>,
}
/// Everything a workflow receives when it is executed.
///
/// Assembled by `WorkflowRunner::run` from the runner's configuration, the
/// triggering event's payload and the metadata fetched from GitHub.
pub struct WorkflowRunContext {
/// Identifier of this run (taken from `WorkflowRunOptions::run_id`).
pub id: Id,
/// Name of the workflow, copied from the workflow definition.
pub name: Option<String>,
/// Trigger configuration, copied from the workflow definition.
pub on: Option<WorkflowTriggerEvents>,
/// The event this run was started for.
pub event: WorkflowEvent,
/// Optional key/value pairs taken from `WorkflowRunOptions::environments`.
pub environments: Option<HashMap<String, String>>,
/// Directory dedicated to this run:
/// `<runner working dir>/<repo owner>/<repo name>/<run id>`.
pub working_dir: PathBuf,
/// Cache directory of the repository:
/// `<runner working dir>/<repo owner>/<repo name>/cache`.
pub cache_dir: PathBuf,
/// Message sender bound to this run's id.
pub message_sender: WorkflowMessageSender,
/// Receiver on the runner's state channel; `WorkflowRunner::cancel`
/// publishes `Cancelled` on that channel.
pub workflow_state: watch::Receiver<WorkflowState>,
/// Shared runner state handed over from the `WorkflowRunner`.
pub runner_inner: Arc<Mutex<RunnerInner>>,
/// Repository owner, from the event payload.
pub repo_owner: String,
/// Repository name, from the event payload.
pub repo_name: String,
/// Commit SHA, from the event payload.
pub sha: String,
/// Pull request number from the event payload, if any.
pub pr_number: Option<u64>,
/// Git ref name, from the event payload.
pub ref_name: String,
/// Default branch of the repository, fetched from GitHub.
pub default_branch: String,
/// Name of the commit's author, fetched from GitHub.
pub committer: String,
/// Email of the commit's author, fetched from GitHub.
pub committer_email: String,
/// Message of the commit, fetched from GitHub.
pub commit_message: String,
/// Whether the repository is private, fetched from GitHub.
pub is_private: bool,
/// Token used for GitHub access during this run.
pub access_token: String,
}
/// Runs one workflow for one triggering event and allows that run to be
/// cancelled.
///
/// Constructed through the generated `WorkflowRunnerBuilder`
/// (see `WorkflowRunner::builder`).
#[derive(Clone, Builder)]
pub struct WorkflowRunner {
/// The workflow definition to execute.
workflow: Workflow,
/// The event that triggered this workflow.
event: WorkflowEvent,
/// Root directory under which per-repository run and cache directories
/// are created.
runner_working_dir: PathBuf,
/// Credentials used to talk to GitHub (personal access token or GitHub App).
github_authorization: GithubAuthorization,
/// Semaphore that must be acquired before the workflow starts; the builder
/// default is a fresh semaphore with a single permit.
#[builder(default = "Arc::new(Semaphore::new(1))")]
workflow_semaphore: Arc<Semaphore>,
/// Sending half of the state channel used for cancellation; the builder
/// default is a fresh channel whose initial value is `Pending`.
#[builder(default = "Arc::new(watch::channel(WorkflowState::Pending).0)")]
workflow_state_sender: Arc<watch::Sender<WorkflowState>>,
/// Sender that is wrapped into a run-scoped `WorkflowMessageSender` by `run`.
workflow_message_sender: MessageSender,
/// Shared runner state forwarded to the run context.
runner_inner: Arc<Mutex<RunnerInner>>,
}
impl WorkflowRunner {
pub fn builder() -> WorkflowRunnerBuilder {
WorkflowRunnerBuilder::default()
}
pub async fn run(&self, options: WorkflowRunOptions) -> crate::Result<WorkflowRunResult> {
let workflow_run_id = options.run_id.clone();
let event_payload = get_workflow_event_payload(self.event.clone())?;
let workflow_message_sender = WorkflowMessageSender::new(
workflow_run_id.clone(),
self.workflow_message_sender.clone(),
);
workflow_message_sender.update_workflow_state(WorkflowState::Queued);
let workflow_semaphore = self.workflow_semaphore.clone();
let mut workflow_state_receiver = self.workflow_state_sender.subscribe();
let permit = tokio::select! {
res = workflow_semaphore.acquire() => {
let permit = res.map_err(|e|
error::Error::internal_runtime_error(format!("Failed to acquire workflow semaphore: {}", e))
)?;
Some(permit)
},
_ = workflow_state_receiver.changed() => {
None
}
};
let workflow_state = workflow_state_receiver.borrow().clone();
if permit.is_none() || workflow_state == WorkflowState::Cancelled {
log::info!("Workflow {} is cancelled before running", workflow_run_id);
workflow_message_sender.update_workflow_state(WorkflowState::Cancelled);
return Ok(WorkflowRunResult {
state: WorkflowState::Cancelled,
started_at: None,
ended_at: None,
jobs: HashMap::new(),
});
}
let github_context;
match self.get_github_context(&event_payload).await {
Ok(ctx) => {
github_context = ctx;
}
Err(e) => {
log::error!("Failed to get github context: {}", e);
workflow_message_sender.update_workflow_state(WorkflowState::Failed);
return Ok(WorkflowRunResult {
state: WorkflowState::Failed,
started_at: None,
ended_at: None,
jobs: HashMap::new(),
});
}
}
let workflow_dir = self
.runner_working_dir
.join(&event_payload.repo_owner)
.join(&event_payload.repo_name);
let cache_dir = workflow_dir.join("cache");
let working_dir = workflow_dir.join(&workflow_run_id);
let WorkflowRunner {
workflow, event, ..
} = self.clone();
let WorkflowAPIEvent {
repo_owner,
repo_name,
pr_number,
sha,
ref_name,
..
} = event_payload;
let context = WorkflowRunContext {
id: workflow_run_id.clone(),
name: workflow.name.clone(),
on: workflow.on.clone(),
event,
repo_owner,
repo_name,
ref_name,
environments: options.environments,
runner_inner: self.runner_inner.clone(),
message_sender: workflow_message_sender.clone(),
workflow_state: workflow_state_receiver.clone(),
working_dir: working_dir.clone(),
cache_dir: cache_dir.clone(),
pr_number,
sha,
default_branch: github_context.default_branch,
committer: github_context.committer,
committer_email: github_context.committer_email,
commit_message: github_context.commit_message,
is_private: github_context.is_private,
access_token: github_context.access_token,
};
fs::create_dir_all(&working_dir).await.map_err(|err| {
error::Error::io_error(
err,
format!(
"Failed to create workflow directory: {}",
working_dir.display()
),
)
})?;
fs::create_dir_all(&cache_dir).await.map_err(|err| {
error::Error::io_error(
err,
format!("Failed to create cache directory: {}", cache_dir.display()),
)
})?;
let res = workflow.run(context).await?;
log::info!(
"Workflow finished: {} {}",
workflow_run_id,
working_dir.display()
);
if let Err(err) = fs::remove_dir_all(&working_dir).await {
log::error!("Failed to remove workflow directory: {}", err);
}
Ok(res)
}
/// Requests cancellation of the workflow tracked by this runner.
///
/// `Cancelled` is published on the state channel. When the state recorded
/// before the request was `Pending`, the call returns right away; otherwise it
/// waits for state updates until one satisfies `is_workflow_finished` or the
/// channel is closed.
///
/// # Errors
///
/// * `WorkflowCancelError::Cancelled` — the recorded state is already
///   `Cancelled`.
/// * `WorkflowCancelError::Finished` — the recorded state is `Succeeded`,
///   `Failed` or `Skipped`.
pub async fn cancel(&self) -> Result<(), WorkflowCancelError> {
    // Take a snapshot; the borrow guard is released at the end of this
    // statement, i.e. before `send_replace` below touches the channel.
    let state_before = self.workflow_state_sender.borrow().clone();

    if state_before == WorkflowState::Cancelled {
        return Err(WorkflowCancelError::Cancelled);
    }

    let terminal_states = [
        WorkflowState::Succeeded,
        WorkflowState::Failed,
        WorkflowState::Skipped,
    ];
    if terminal_states.contains(&state_before) {
        return Err(WorkflowCancelError::Finished);
    }

    self
        .workflow_state_sender
        .send_replace(WorkflowState::Cancelled);

    if state_before != WorkflowState::Pending {
        // Subscribed after the send above, so only later updates are observed.
        let mut state_updates = self.workflow_state_sender.subscribe();
        while state_updates.changed().await.is_ok() {
            let latest_state = state_updates.borrow().clone();
            if is_workflow_finished(&latest_state) {
                break;
            }
        }
    }

    Ok(())
}
pub fn get_workflow(&self) -> Workflow {
self.workflow.clone()
}
async fn get_github_api(
&self,
repo_owner: &String,
repo_name: &String,
) -> crate::Result<(GithubAPI, String)> {
let github_api = match &self.github_authorization {
GithubAuthorization::PersonalAccessToken(token) => {
let access_token = GithubPersonalAccessToken::new(token);
(GithubAPI::with_token(access_token), token.clone())
}
GithubAuthorization::GithubApp {
app_id,
private_key,
} => {
let github_app = GithubApp::builder()
.app_id(app_id.to_string())
.private_key(private_key)
.build()
.map_err(|err| error::Error::github_error(err, "Failed to create github app"))?;
let installation = github_app
.get_repository_installation(repo_owner, repo_name)
.await
.map_err(|err| {
error::Error::github_error(err, "Failed to get repository installation")
})?;
let access_token = github_app
.get_installation_access_token(installation.id)
.await
.map_err(|err| {
error::Error::github_error(err, "Failed to get installation access token")
})?;
let github_api = github_app.get_api(installation.id).await.map_err(|err| {
error::Error::github_error(err, "Failed to get github api for installation")
})?;
(github_api, access_token.token)
}
};
Ok(github_api)
}
async fn get_github_context(
&self,
event: &WorkflowAPIEvent,
) -> crate::Result<WorkflowGithubContext> {
let (github_api, access_token) = self
.get_github_api(&event.repo_owner, &event.repo_name)
.await?;
let repository = github_api
.repositories
.get_repository(&event.repo_owner, &event.repo_name)
.send()
.await
.map_err(|err| {
error::Error::github_error(
err,
format!(
"Failed to get repository {}/{}",
&event.repo_owner, &event.repo_name
),
)
})?;
let commit = github_api
.commits
.get_commit(&event.repo_owner, &event.repo_name, &event.sha)
.send()
.await
.map_err(|err| {
error::Error::github_error(
err,
format!(
"Failed to get commit {}/{}@{}",
&event.repo_owner, &event.repo_name, &event.sha
),
)
})?;
let commit_content = commit.commit.unwrap();
let commit_author = commit_content.author;
Ok(WorkflowGithubContext {
default_branch: repository.default_branch.unwrap(),
is_private: repository.private,
committer: commit_author.name,
committer_email: commit_author.email,
commit_message: commit_content.message,
sha: commit.sha,
access_token,
})
}
}
// Tests for `WorkflowRunner`.
#[cfg(test)]
mod tests {
use crate::{utils::test::create_workflow_runner, WorkflowAPIEvent};
// Checks that `get_github_context` returns the expected metadata for a fixed
// commit of the `panghu-huang/octocrate` repository.
// NOTE(review): `get_github_context` sends requests through the GitHub API
// client, so this test presumably needs network access and whatever
// credentials `create_workflow_runner` configures — confirm before running
// it in an isolated environment.
#[tokio::test]
async fn test_github_context() -> anyhow::Result<()> {
// The workflow definition is only needed to construct the runner.
let workflow_runner = create_workflow_runner(
r#"
jobs:
test:
image: ubuntu
steps:
- name: Test step
run: echo "Hello world"
"#,
)?;
// Event payload pointing at a specific commit of the repository.
let event = WorkflowAPIEvent {
repo_owner: "panghu-huang".to_string(),
repo_name: "octocrate".to_string(),
ref_name: "refs/heads/main".to_string(),
pr_number: None,
sha: "95409faeae0e81635075091f56888e4bb5fc1a76".to_string(),
};
let github_context = workflow_runner.get_github_context(&event).await?;
// Expected values are tied to the repository and commit selected above.
assert_eq!(github_context.default_branch, "main");
assert_eq!(github_context.committer, "wokeyi");
assert_eq!(github_context.committer_email, "wokeyifrontend@gmail.com");
assert_eq!(github_context.is_private, false);
assert_eq!(github_context.commit_message, "release octocrate 0.1.0");
Ok(())
}
}