coodev_runner/runner/
workflow_runner.rs

1use crate::{
2  error,
3  runner::{
4    sender::MessageSender,
5    utils::{get_workflow_event_payload, is_workflow_finished},
6    workflow::Workflow,
7    GithubAuthorization, RunnerInner, WorkflowMessageSender,
8  },
9  Id, WorkflowAPIEvent, WorkflowEvent, WorkflowRunResult, WorkflowState, WorkflowTriggerEvents,
10};
11use derive_builder::Builder;
12use octocrate::{GithubAPI, GithubApp, GithubPersonalAccessToken};
13use parking_lot::Mutex;
14use serde::{Deserialize, Serialize};
15use std::{collections::HashMap, path::PathBuf, sync::Arc};
16use tokio::{
17  fs,
18  sync::{watch, Semaphore},
19};
20
21#[derive(Debug, Clone)]
22pub enum WorkflowCancelError {
23  Finished,
24  Cancelled,
25}
26
27#[derive(Debug, Clone)]
28pub struct WorkflowGithubContext {
29  pub default_branch: String,
30  pub committer: String,
31  pub committer_email: String,
32  pub commit_message: String,
33  pub is_private: bool,
34  pub sha: String,
35  pub access_token: String,
36}
37
38#[derive(Serialize, Deserialize, Debug, Clone)]
39pub struct WorkflowRunOptions {
40  pub run_id: Id,
41  pub environments: Option<HashMap<String, String>>,
42}
43
44// #[derive(Clone)]
45pub struct WorkflowRunContext {
46  pub id: Id,
47  pub name: Option<String>,
48  pub on: Option<WorkflowTriggerEvents>,
49  pub event: WorkflowEvent,
50  pub environments: Option<HashMap<String, String>>,
51  pub working_dir: PathBuf,
52  pub cache_dir: PathBuf,
53  pub message_sender: WorkflowMessageSender,
54  pub workflow_state: watch::Receiver<WorkflowState>,
55  pub runner_inner: Arc<Mutex<RunnerInner>>,
56  // pub actions: Actions,
57  // git info
58  pub repo_owner: String,
59  pub repo_name: String,
60  pub sha: String,
61  pub pr_number: Option<u64>,
62  pub ref_name: String,
63  pub default_branch: String,
64  pub committer: String,
65  pub committer_email: String,
66  pub commit_message: String,
67  pub is_private: bool,
68  pub access_token: String,
69}
70
71#[derive(Clone, Builder)]
72pub struct WorkflowRunner {
73  workflow: Workflow,
74  event: WorkflowEvent,
75  runner_working_dir: PathBuf,
76  github_authorization: GithubAuthorization,
77  #[builder(default = "Arc::new(Semaphore::new(1))")]
78  workflow_semaphore: Arc<Semaphore>,
79  #[builder(default = "Arc::new(watch::channel(WorkflowState::Pending).0)")]
80  workflow_state_sender: Arc<watch::Sender<WorkflowState>>,
81  workflow_message_sender: MessageSender,
82  runner_inner: Arc<Mutex<RunnerInner>>,
83}
84
85impl WorkflowRunner {
86  pub fn builder() -> WorkflowRunnerBuilder {
87    WorkflowRunnerBuilder::default()
88  }
89
90  pub async fn run(&self, options: WorkflowRunOptions) -> crate::Result<WorkflowRunResult> {
91    let workflow_run_id = options.run_id.clone();
92    let event_payload = get_workflow_event_payload(self.event.clone())?;
93
94    let workflow_message_sender = WorkflowMessageSender::new(
95      workflow_run_id.clone(),
96      self.workflow_message_sender.clone(),
97    );
98
99    workflow_message_sender.update_workflow_state(WorkflowState::Queued);
100
101    let workflow_semaphore = self.workflow_semaphore.clone();
102
103    let mut workflow_state_receiver = self.workflow_state_sender.subscribe();
104
105    let permit = tokio::select! {
106      res = workflow_semaphore.acquire() => {
107        let permit = res.map_err(|e|
108          error::Error::internal_runtime_error(format!("Failed to acquire workflow semaphore: {}", e))
109        )?;
110
111        Some(permit)
112      },
113      _ = workflow_state_receiver.changed() => {
114        None
115      }
116    };
117    let workflow_state = workflow_state_receiver.borrow().clone();
118
119    if permit.is_none() || workflow_state == WorkflowState::Cancelled {
120      log::info!("Workflow {} is cancelled before running", workflow_run_id);
121      workflow_message_sender.update_workflow_state(WorkflowState::Cancelled);
122
123      return Ok(WorkflowRunResult {
124        state: WorkflowState::Cancelled,
125        started_at: None,
126        ended_at: None,
127        jobs: HashMap::new(),
128      });
129    }
130
131    let github_context;
132
133    match self.get_github_context(&event_payload).await {
134      Ok(ctx) => {
135        github_context = ctx;
136      }
137      Err(e) => {
138        log::error!("Failed to get github context: {}", e);
139        workflow_message_sender.update_workflow_state(WorkflowState::Failed);
140
141        return Ok(WorkflowRunResult {
142          state: WorkflowState::Failed,
143          started_at: None,
144          ended_at: None,
145          jobs: HashMap::new(),
146        });
147      }
148    }
149
150    // Workflow and cache directories will not be deleted after the workflow is finished
151    let workflow_dir = self
152      .runner_working_dir
153      .join(&event_payload.repo_owner)
154      .join(&event_payload.repo_name);
155
156    let cache_dir = workflow_dir.join("cache");
157    let working_dir = workflow_dir.join(&workflow_run_id);
158
159    let WorkflowRunner {
160      workflow, event, ..
161    } = self.clone();
162
163    let WorkflowAPIEvent {
164      repo_owner,
165      repo_name,
166      pr_number,
167      sha,
168      ref_name,
169      ..
170    } = event_payload;
171
172    let context = WorkflowRunContext {
173      id: workflow_run_id.clone(),
174      name: workflow.name.clone(),
175      on: workflow.on.clone(),
176      event,
177      repo_owner,
178      repo_name,
179      ref_name,
180      environments: options.environments,
181      runner_inner: self.runner_inner.clone(),
182      message_sender: workflow_message_sender.clone(),
183      workflow_state: workflow_state_receiver.clone(),
184      working_dir: working_dir.clone(),
185      cache_dir: cache_dir.clone(),
186      pr_number,
187      sha,
188      default_branch: github_context.default_branch,
189      committer: github_context.committer,
190      committer_email: github_context.committer_email,
191      commit_message: github_context.commit_message,
192      is_private: github_context.is_private,
193      access_token: github_context.access_token,
194    };
195
196    fs::create_dir_all(&working_dir).await.map_err(|err| {
197      error::Error::io_error(
198        err,
199        format!(
200          "Failed to create workflow directory: {}",
201          working_dir.display()
202        ),
203      )
204    })?;
205    fs::create_dir_all(&cache_dir).await.map_err(|err| {
206      error::Error::io_error(
207        err,
208        format!("Failed to create cache directory: {}", cache_dir.display()),
209      )
210    })?;
211
212    let res = workflow.run(context).await?;
213
214    log::info!(
215      "Workflow finished: {} {}",
216      workflow_run_id,
217      working_dir.display()
218    );
219
220    if let Err(err) = fs::remove_dir_all(&working_dir).await {
221      log::error!("Failed to remove workflow directory: {}", err);
222    }
223
224    Ok(res)
225  }
226
227  pub async fn cancel(&self) -> Result<(), WorkflowCancelError> {
228    // log::info!("Cancelling workflow: {}", self.workflow.id);
229    let workflow_state = self.workflow_state_sender.borrow().clone();
230    if workflow_state == WorkflowState::Cancelled {
231      // log::info!("Workflow {} is already cancelled", self.workflow.id);
232      return Err(WorkflowCancelError::Cancelled);
233    }
234    if workflow_state == WorkflowState::Succeeded
235      || workflow_state == WorkflowState::Failed
236      || workflow_state == WorkflowState::Skipped
237    {
238      // log::info!("Workflow {} is already finished", self.workflow.id);
239      return Err(WorkflowCancelError::Finished);
240    }
241    self
242      .workflow_state_sender
243      .send_replace(WorkflowState::Cancelled);
244
245    if workflow_state == WorkflowState::Pending {
246      // log::info!("Workflow {} is not running", self.workflow.id);
247      return Ok(());
248    }
249
250    let mut workflow_state_receiver = self.workflow_state_sender.subscribe();
251
252    while let Ok(_) = workflow_state_receiver.changed().await {
253      let workflow_state = workflow_state_receiver.borrow();
254
255      if is_workflow_finished(&workflow_state.clone()) {
256        break;
257      }
258    }
259
260    Ok(())
261  }
262
263  pub fn get_workflow(&self) -> Workflow {
264    self.workflow.clone()
265  }
266
267  async fn get_github_api(
268    &self,
269    repo_owner: &String,
270    repo_name: &String,
271  ) -> crate::Result<(GithubAPI, String)> {
272    let github_api = match &self.github_authorization {
273      GithubAuthorization::PersonalAccessToken(token) => {
274        let access_token = GithubPersonalAccessToken::new(token);
275        (GithubAPI::with_token(access_token), token.clone())
276      }
277      GithubAuthorization::GithubApp {
278        app_id,
279        private_key,
280      } => {
281        let github_app = GithubApp::builder()
282          .app_id(app_id.to_string())
283          .private_key(private_key)
284          .build()
285          .map_err(|err| error::Error::github_error(err, "Failed to create github app"))?;
286
287        let installation = github_app
288          .get_repository_installation(repo_owner, repo_name)
289          .await
290          .map_err(|err| {
291            error::Error::github_error(err, "Failed to get repository installation")
292          })?;
293
294        let access_token = github_app
295          .get_installation_access_token(installation.id)
296          .await
297          .map_err(|err| {
298            error::Error::github_error(err, "Failed to get installation access token")
299          })?;
300
301        let github_api = github_app.get_api(installation.id).await.map_err(|err| {
302          error::Error::github_error(err, "Failed to get github api for installation")
303        })?;
304
305        (github_api, access_token.token)
306      }
307    };
308
309    Ok(github_api)
310  }
311
312  async fn get_github_context(
313    &self,
314    event: &WorkflowAPIEvent,
315  ) -> crate::Result<WorkflowGithubContext> {
316    let (github_api, access_token) = self
317      .get_github_api(&event.repo_owner, &event.repo_name)
318      .await?;
319
320    let repository = github_api
321      .repositories
322      .get_repository(&event.repo_owner, &event.repo_name)
323      .send()
324      .await
325      .map_err(|err| {
326        error::Error::github_error(
327          err,
328          format!(
329            "Failed to get repository {}/{}",
330            &event.repo_owner, &event.repo_name
331          ),
332        )
333      })?;
334
335    let commit = github_api
336      .commits
337      .get_commit(&event.repo_owner, &event.repo_name, &event.sha)
338      .send()
339      .await
340      .map_err(|err| {
341        error::Error::github_error(
342          err,
343          format!(
344            "Failed to get commit {}/{}@{}",
345            &event.repo_owner, &event.repo_name, &event.sha
346          ),
347        )
348      })?;
349
350    let commit_content = commit.commit.unwrap();
351    let commit_author = commit_content.author;
352
353    Ok(WorkflowGithubContext {
354      default_branch: repository.default_branch.unwrap(),
355      is_private: repository.private,
356      committer: commit_author.name,
357      committer_email: commit_author.email,
358      commit_message: commit_content.message,
359      sha: commit.sha,
360      access_token,
361    })
362  }
363
364  // async fn get_event_changed_files
365}
366
367#[cfg(test)]
368mod tests {
369  use crate::{utils::test::create_workflow_runner, WorkflowAPIEvent};
370
371  #[tokio::test]
372  async fn test_github_context() -> anyhow::Result<()> {
373    let workflow_runner = create_workflow_runner(
374      r#"
375      jobs:
376        test:
377          image: ubuntu
378          steps:
379            - name: Test step
380              run: echo "Hello world"
381      "#,
382    )?;
383
384    let event = WorkflowAPIEvent {
385      repo_owner: "panghu-huang".to_string(),
386      repo_name: "octocrate".to_string(),
387      ref_name: "refs/heads/main".to_string(),
388      pr_number: None,
389      sha: "95409faeae0e81635075091f56888e4bb5fc1a76".to_string(),
390    };
391
392    let github_context = workflow_runner.get_github_context(&event).await?;
393
394    assert_eq!(github_context.default_branch, "main");
395    assert_eq!(github_context.committer, "wokeyi");
396    assert_eq!(github_context.committer_email, "wokeyifrontend@gmail.com");
397    assert_eq!(github_context.is_private, false);
398    assert_eq!(github_context.commit_message, "release octocrate 0.1.0");
399
400    Ok(())
401  }
402}