coodev-runner 0.1.42

A simple runner for coodev
Documentation
use crate::{
  error,
  runner::{
    sender::MessageSender,
    utils::{get_workflow_event_payload, is_workflow_finished},
    workflow::Workflow,
    GithubAuthorization, RunnerInner, WorkflowMessageSender,
  },
  Id, WorkflowAPIEvent, WorkflowEvent, WorkflowRunResult, WorkflowState, WorkflowTriggerEvents,
};
use derive_builder::Builder;
use octocrate::{GithubAPI, GithubApp, GithubPersonalAccessToken};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use tokio::{
  fs,
  sync::{watch, Semaphore},
};

#[derive(Debug, Clone)]
pub enum WorkflowCancelError {
  Finished,
  Cancelled,
}

#[derive(Debug, Clone)]
pub struct WorkflowGithubContext {
  pub default_branch: String,
  pub committer: String,
  pub committer_email: String,
  pub commit_message: String,
  pub is_private: bool,
  pub sha: String,
  pub access_token: String,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WorkflowRunOptions {
  pub run_id: Id,
  pub environments: Option<HashMap<String, String>>,
}

// #[derive(Clone)]
pub struct WorkflowRunContext {
  pub id: Id,
  pub name: Option<String>,
  pub on: Option<WorkflowTriggerEvents>,
  pub event: WorkflowEvent,
  pub environments: Option<HashMap<String, String>>,
  pub working_dir: PathBuf,
  pub cache_dir: PathBuf,
  pub message_sender: WorkflowMessageSender,
  pub workflow_state: watch::Receiver<WorkflowState>,
  pub runner_inner: Arc<Mutex<RunnerInner>>,
  // pub actions: Actions,
  // git info
  pub repo_owner: String,
  pub repo_name: String,
  pub sha: String,
  pub pr_number: Option<u64>,
  pub ref_name: String,
  pub default_branch: String,
  pub committer: String,
  pub committer_email: String,
  pub commit_message: String,
  pub is_private: bool,
  pub access_token: String,
}

#[derive(Clone, Builder)]
pub struct WorkflowRunner {
  workflow: Workflow,
  event: WorkflowEvent,
  runner_working_dir: PathBuf,
  github_authorization: GithubAuthorization,
  #[builder(default = "Arc::new(Semaphore::new(1))")]
  workflow_semaphore: Arc<Semaphore>,
  #[builder(default = "Arc::new(watch::channel(WorkflowState::Pending).0)")]
  workflow_state_sender: Arc<watch::Sender<WorkflowState>>,
  workflow_message_sender: MessageSender,
  runner_inner: Arc<Mutex<RunnerInner>>,
}

impl WorkflowRunner {
  pub fn builder() -> WorkflowRunnerBuilder {
    WorkflowRunnerBuilder::default()
  }

  pub async fn run(&self, options: WorkflowRunOptions) -> crate::Result<WorkflowRunResult> {
    let workflow_run_id = options.run_id.clone();
    let event_payload = get_workflow_event_payload(self.event.clone())?;

    let workflow_message_sender = WorkflowMessageSender::new(
      workflow_run_id.clone(),
      self.workflow_message_sender.clone(),
    );

    workflow_message_sender.update_workflow_state(WorkflowState::Queued);

    let workflow_semaphore = self.workflow_semaphore.clone();

    let mut workflow_state_receiver = self.workflow_state_sender.subscribe();

    let permit = tokio::select! {
      res = workflow_semaphore.acquire() => {
        let permit = res.map_err(|e|
          error::Error::internal_runtime_error(format!("Failed to acquire workflow semaphore: {}", e))

        )?;

        Some(permit)
      },
      _ = workflow_state_receiver.changed() => {
        None
      }
    };
    let workflow_state = workflow_state_receiver.borrow().clone();

    if permit.is_none() || workflow_state == WorkflowState::Cancelled {
      log::info!("Workflow {} is cancelled before running", workflow_run_id);
      workflow_message_sender.update_workflow_state(WorkflowState::Cancelled);

      return Ok(WorkflowRunResult {
        state: WorkflowState::Cancelled,
        started_at: None,
        ended_at: None,
        jobs: HashMap::new(),
      });
    }

    let github_context;

    match self.get_github_context(&event_payload).await {
      Ok(ctx) => {
        github_context = ctx;
      }
      Err(e) => {
        log::error!("Failed to get github context: {}", e);
        workflow_message_sender.update_workflow_state(WorkflowState::Failed);

        return Ok(WorkflowRunResult {
          state: WorkflowState::Failed,
          started_at: None,
          ended_at: None,
          jobs: HashMap::new(),
        });
      }
    }

    // Workflow and cache directories will not be deleted after the workflow is finished
    let workflow_dir = self
      .runner_working_dir
      .join(&event_payload.repo_owner)
      .join(&event_payload.repo_name);

    let cache_dir = workflow_dir.join("cache");
    let working_dir = workflow_dir.join(&workflow_run_id);

    let WorkflowRunner {
      workflow, event, ..
    } = self.clone();

    let WorkflowAPIEvent {
      repo_owner,
      repo_name,
      pr_number,
      sha,
      ref_name,
      ..
    } = event_payload;

    let context = WorkflowRunContext {
      id: workflow_run_id.clone(),
      name: workflow.name.clone(),
      on: workflow.on.clone(),
      event,
      repo_owner,
      repo_name,
      ref_name,
      environments: options.environments,
      runner_inner: self.runner_inner.clone(),
      message_sender: workflow_message_sender.clone(),
      workflow_state: workflow_state_receiver.clone(),
      working_dir: working_dir.clone(),
      cache_dir: cache_dir.clone(),
      pr_number,
      sha,
      default_branch: github_context.default_branch,
      committer: github_context.committer,
      committer_email: github_context.committer_email,
      commit_message: github_context.commit_message,
      is_private: github_context.is_private,
      access_token: github_context.access_token,
    };

    fs::create_dir_all(&working_dir).await.map_err(|err| {
      error::Error::io_error(
        err,
        format!(
          "Failed to create workflow directory: {}",
          working_dir.display()
        ),
      )
    })?;
    fs::create_dir_all(&cache_dir).await.map_err(|err| {
      error::Error::io_error(
        err,
        format!("Failed to create cache directory: {}", cache_dir.display()),
      )
    })?;

    let res = workflow.run(context).await?;

    log::info!(
      "Workflow finished: {} {}",
      workflow_run_id,
      working_dir.display()
    );

    if let Err(err) = fs::remove_dir_all(&working_dir).await {
      log::error!("Failed to remove workflow directory: {}", err);
    }

    Ok(res)
  }

  pub async fn cancel(&self) -> Result<(), WorkflowCancelError> {
    // log::info!("Cancelling workflow: {}", self.workflow.id);
    let workflow_state = self.workflow_state_sender.borrow().clone();
    if workflow_state == WorkflowState::Cancelled {
      // log::info!("Workflow {} is already cancelled", self.workflow.id);
      return Err(WorkflowCancelError::Cancelled);
    }
    if workflow_state == WorkflowState::Succeeded
      || workflow_state == WorkflowState::Failed
      || workflow_state == WorkflowState::Skipped
    {
      // log::info!("Workflow {} is already finished", self.workflow.id);
      return Err(WorkflowCancelError::Finished);
    }
    self
      .workflow_state_sender
      .send_replace(WorkflowState::Cancelled);

    if workflow_state == WorkflowState::Pending {
      // log::info!("Workflow {} is not running", self.workflow.id);
      return Ok(());
    }

    let mut workflow_state_receiver = self.workflow_state_sender.subscribe();

    while let Ok(_) = workflow_state_receiver.changed().await {
      let workflow_state = workflow_state_receiver.borrow();

      if is_workflow_finished(&workflow_state.clone()) {
        break;
      }
    }

    Ok(())
  }

  pub fn get_workflow(&self) -> Workflow {
    self.workflow.clone()
  }

  async fn get_github_api(
    &self,
    repo_owner: &String,
    repo_name: &String,
  ) -> crate::Result<(GithubAPI, String)> {
    let github_api = match &self.github_authorization {
      GithubAuthorization::PersonalAccessToken(token) => {
        let access_token = GithubPersonalAccessToken::new(token);
        (GithubAPI::with_token(access_token), token.clone())
      }
      GithubAuthorization::GithubApp {
        app_id,
        private_key,
      } => {
        let github_app = GithubApp::builder()
          .app_id(app_id.to_string())
          .private_key(private_key)
          .build()
          .map_err(|err| error::Error::github_error(err, "Failed to create github app"))?;

        let installation = github_app
          .get_repository_installation(repo_owner, repo_name)
          .await
          .map_err(|err| {
            error::Error::github_error(err, "Failed to get repository installation")
          })?;

        let access_token = github_app
          .get_installation_access_token(installation.id)
          .await
          .map_err(|err| {
            error::Error::github_error(err, "Failed to get installation access token")
          })?;

        let github_api = github_app.get_api(installation.id).await.map_err(|err| {
          error::Error::github_error(err, "Failed to get github api for installation")
        })?;

        (github_api, access_token.token)
      }
    };

    Ok(github_api)
  }

  async fn get_github_context(
    &self,
    event: &WorkflowAPIEvent,
  ) -> crate::Result<WorkflowGithubContext> {
    let (github_api, access_token) = self
      .get_github_api(&event.repo_owner, &event.repo_name)
      .await?;

    let repository = github_api
      .repositories
      .get_repository(&event.repo_owner, &event.repo_name)
      .send()
      .await
      .map_err(|err| {
        error::Error::github_error(
          err,
          format!(
            "Failed to get repository {}/{}",
            &event.repo_owner, &event.repo_name
          ),
        )
      })?;

    let commit = github_api
      .commits
      .get_commit(&event.repo_owner, &event.repo_name, &event.sha)
      .send()
      .await
      .map_err(|err| {
        error::Error::github_error(
          err,
          format!(
            "Failed to get commit {}/{}@{}",
            &event.repo_owner, &event.repo_name, &event.sha
          ),
        )
      })?;

    let commit_content = commit.commit.unwrap();
    let commit_author = commit_content.author;

    Ok(WorkflowGithubContext {
      default_branch: repository.default_branch.unwrap(),
      is_private: repository.private,
      committer: commit_author.name,
      committer_email: commit_author.email,
      commit_message: commit_content.message,
      sha: commit.sha,
      access_token,
    })
  }

  // async fn get_event_changed_files
}

#[cfg(test)]
mod tests {
  use crate::{utils::test::create_workflow_runner, WorkflowAPIEvent};

  #[tokio::test]
  async fn test_github_context() -> anyhow::Result<()> {
    let workflow_runner = create_workflow_runner(
      r#"
      jobs:
        test:
          image: ubuntu
          steps:
            - name: Test step
              run: echo "Hello world"
      "#,
    )?;

    let event = WorkflowAPIEvent {
      repo_owner: "panghu-huang".to_string(),
      repo_name: "octocrate".to_string(),
      ref_name: "refs/heads/main".to_string(),
      pr_number: None,
      sha: "95409faeae0e81635075091f56888e4bb5fc1a76".to_string(),
    };

    let github_context = workflow_runner.get_github_context(&event).await?;

    assert_eq!(github_context.default_branch, "main");
    assert_eq!(github_context.committer, "wokeyi");
    assert_eq!(github_context.committer_email, "wokeyifrontend@gmail.com");
    assert_eq!(github_context.is_private, false);
    assert_eq!(github_context.commit_message, "release octocrate 0.1.0");

    Ok(())
  }
}