1use crate::{
2 error,
3 runner::{
4 sender::MessageSender,
5 utils::{get_workflow_event_payload, is_workflow_finished},
6 workflow::Workflow,
7 GithubAuthorization, RunnerInner, WorkflowMessageSender,
8 },
9 Id, WorkflowAPIEvent, WorkflowEvent, WorkflowRunResult, WorkflowState, WorkflowTriggerEvents,
10};
11use derive_builder::Builder;
12use octocrate::{GithubAPI, GithubApp, GithubPersonalAccessToken};
13use parking_lot::Mutex;
14use serde::{Deserialize, Serialize};
15use std::{collections::HashMap, path::PathBuf, sync::Arc};
16use tokio::{
17 fs,
18 sync::{watch, Semaphore},
19};
20
21#[derive(Debug, Clone)]
22pub enum WorkflowCancelError {
23 Finished,
24 Cancelled,
25}
26
27#[derive(Debug, Clone)]
28pub struct WorkflowGithubContext {
29 pub default_branch: String,
30 pub committer: String,
31 pub committer_email: String,
32 pub commit_message: String,
33 pub is_private: bool,
34 pub sha: String,
35 pub access_token: String,
36}
37
38#[derive(Serialize, Deserialize, Debug, Clone)]
39pub struct WorkflowRunOptions {
40 pub run_id: Id,
41 pub environments: Option<HashMap<String, String>>,
42}
43
44pub struct WorkflowRunContext {
46 pub id: Id,
47 pub name: Option<String>,
48 pub on: Option<WorkflowTriggerEvents>,
49 pub event: WorkflowEvent,
50 pub environments: Option<HashMap<String, String>>,
51 pub working_dir: PathBuf,
52 pub cache_dir: PathBuf,
53 pub message_sender: WorkflowMessageSender,
54 pub workflow_state: watch::Receiver<WorkflowState>,
55 pub runner_inner: Arc<Mutex<RunnerInner>>,
56 pub repo_owner: String,
59 pub repo_name: String,
60 pub sha: String,
61 pub pr_number: Option<u64>,
62 pub ref_name: String,
63 pub default_branch: String,
64 pub committer: String,
65 pub committer_email: String,
66 pub commit_message: String,
67 pub is_private: bool,
68 pub access_token: String,
69}
70
71#[derive(Clone, Builder)]
72pub struct WorkflowRunner {
73 workflow: Workflow,
74 event: WorkflowEvent,
75 runner_working_dir: PathBuf,
76 github_authorization: GithubAuthorization,
77 #[builder(default = "Arc::new(Semaphore::new(1))")]
78 workflow_semaphore: Arc<Semaphore>,
79 #[builder(default = "Arc::new(watch::channel(WorkflowState::Pending).0)")]
80 workflow_state_sender: Arc<watch::Sender<WorkflowState>>,
81 workflow_message_sender: MessageSender,
82 runner_inner: Arc<Mutex<RunnerInner>>,
83}
84
85impl WorkflowRunner {
86 pub fn builder() -> WorkflowRunnerBuilder {
87 WorkflowRunnerBuilder::default()
88 }
89
90 pub async fn run(&self, options: WorkflowRunOptions) -> crate::Result<WorkflowRunResult> {
91 let workflow_run_id = options.run_id.clone();
92 let event_payload = get_workflow_event_payload(self.event.clone())?;
93
94 let workflow_message_sender = WorkflowMessageSender::new(
95 workflow_run_id.clone(),
96 self.workflow_message_sender.clone(),
97 );
98
99 workflow_message_sender.update_workflow_state(WorkflowState::Queued);
100
101 let workflow_semaphore = self.workflow_semaphore.clone();
102
103 let mut workflow_state_receiver = self.workflow_state_sender.subscribe();
104
105 let permit = tokio::select! {
106 res = workflow_semaphore.acquire() => {
107 let permit = res.map_err(|e|
108 error::Error::internal_runtime_error(format!("Failed to acquire workflow semaphore: {}", e))
109 )?;
110
111 Some(permit)
112 },
113 _ = workflow_state_receiver.changed() => {
114 None
115 }
116 };
117 let workflow_state = workflow_state_receiver.borrow().clone();
118
119 if permit.is_none() || workflow_state == WorkflowState::Cancelled {
120 log::info!("Workflow {} is cancelled before running", workflow_run_id);
121 workflow_message_sender.update_workflow_state(WorkflowState::Cancelled);
122
123 return Ok(WorkflowRunResult {
124 state: WorkflowState::Cancelled,
125 started_at: None,
126 ended_at: None,
127 jobs: HashMap::new(),
128 });
129 }
130
131 let github_context;
132
133 match self.get_github_context(&event_payload).await {
134 Ok(ctx) => {
135 github_context = ctx;
136 }
137 Err(e) => {
138 log::error!("Failed to get github context: {}", e);
139 workflow_message_sender.update_workflow_state(WorkflowState::Failed);
140
141 return Ok(WorkflowRunResult {
142 state: WorkflowState::Failed,
143 started_at: None,
144 ended_at: None,
145 jobs: HashMap::new(),
146 });
147 }
148 }
149
150 let workflow_dir = self
152 .runner_working_dir
153 .join(&event_payload.repo_owner)
154 .join(&event_payload.repo_name);
155
156 let cache_dir = workflow_dir.join("cache");
157 let working_dir = workflow_dir.join(&workflow_run_id);
158
159 let WorkflowRunner {
160 workflow, event, ..
161 } = self.clone();
162
163 let WorkflowAPIEvent {
164 repo_owner,
165 repo_name,
166 pr_number,
167 sha,
168 ref_name,
169 ..
170 } = event_payload;
171
172 let context = WorkflowRunContext {
173 id: workflow_run_id.clone(),
174 name: workflow.name.clone(),
175 on: workflow.on.clone(),
176 event,
177 repo_owner,
178 repo_name,
179 ref_name,
180 environments: options.environments,
181 runner_inner: self.runner_inner.clone(),
182 message_sender: workflow_message_sender.clone(),
183 workflow_state: workflow_state_receiver.clone(),
184 working_dir: working_dir.clone(),
185 cache_dir: cache_dir.clone(),
186 pr_number,
187 sha,
188 default_branch: github_context.default_branch,
189 committer: github_context.committer,
190 committer_email: github_context.committer_email,
191 commit_message: github_context.commit_message,
192 is_private: github_context.is_private,
193 access_token: github_context.access_token,
194 };
195
196 fs::create_dir_all(&working_dir).await.map_err(|err| {
197 error::Error::io_error(
198 err,
199 format!(
200 "Failed to create workflow directory: {}",
201 working_dir.display()
202 ),
203 )
204 })?;
205 fs::create_dir_all(&cache_dir).await.map_err(|err| {
206 error::Error::io_error(
207 err,
208 format!("Failed to create cache directory: {}", cache_dir.display()),
209 )
210 })?;
211
212 let res = workflow.run(context).await?;
213
214 log::info!(
215 "Workflow finished: {} {}",
216 workflow_run_id,
217 working_dir.display()
218 );
219
220 if let Err(err) = fs::remove_dir_all(&working_dir).await {
221 log::error!("Failed to remove workflow directory: {}", err);
222 }
223
224 Ok(res)
225 }
226
227 pub async fn cancel(&self) -> Result<(), WorkflowCancelError> {
228 let workflow_state = self.workflow_state_sender.borrow().clone();
230 if workflow_state == WorkflowState::Cancelled {
231 return Err(WorkflowCancelError::Cancelled);
233 }
234 if workflow_state == WorkflowState::Succeeded
235 || workflow_state == WorkflowState::Failed
236 || workflow_state == WorkflowState::Skipped
237 {
238 return Err(WorkflowCancelError::Finished);
240 }
241 self
242 .workflow_state_sender
243 .send_replace(WorkflowState::Cancelled);
244
245 if workflow_state == WorkflowState::Pending {
246 return Ok(());
248 }
249
250 let mut workflow_state_receiver = self.workflow_state_sender.subscribe();
251
252 while let Ok(_) = workflow_state_receiver.changed().await {
253 let workflow_state = workflow_state_receiver.borrow();
254
255 if is_workflow_finished(&workflow_state.clone()) {
256 break;
257 }
258 }
259
260 Ok(())
261 }
262
263 pub fn get_workflow(&self) -> Workflow {
264 self.workflow.clone()
265 }
266
267 async fn get_github_api(
268 &self,
269 repo_owner: &String,
270 repo_name: &String,
271 ) -> crate::Result<(GithubAPI, String)> {
272 let github_api = match &self.github_authorization {
273 GithubAuthorization::PersonalAccessToken(token) => {
274 let access_token = GithubPersonalAccessToken::new(token);
275 (GithubAPI::with_token(access_token), token.clone())
276 }
277 GithubAuthorization::GithubApp {
278 app_id,
279 private_key,
280 } => {
281 let github_app = GithubApp::builder()
282 .app_id(app_id.to_string())
283 .private_key(private_key)
284 .build()
285 .map_err(|err| error::Error::github_error(err, "Failed to create github app"))?;
286
287 let installation = github_app
288 .get_repository_installation(repo_owner, repo_name)
289 .await
290 .map_err(|err| {
291 error::Error::github_error(err, "Failed to get repository installation")
292 })?;
293
294 let access_token = github_app
295 .get_installation_access_token(installation.id)
296 .await
297 .map_err(|err| {
298 error::Error::github_error(err, "Failed to get installation access token")
299 })?;
300
301 let github_api = github_app.get_api(installation.id).await.map_err(|err| {
302 error::Error::github_error(err, "Failed to get github api for installation")
303 })?;
304
305 (github_api, access_token.token)
306 }
307 };
308
309 Ok(github_api)
310 }
311
312 async fn get_github_context(
313 &self,
314 event: &WorkflowAPIEvent,
315 ) -> crate::Result<WorkflowGithubContext> {
316 let (github_api, access_token) = self
317 .get_github_api(&event.repo_owner, &event.repo_name)
318 .await?;
319
320 let repository = github_api
321 .repositories
322 .get_repository(&event.repo_owner, &event.repo_name)
323 .send()
324 .await
325 .map_err(|err| {
326 error::Error::github_error(
327 err,
328 format!(
329 "Failed to get repository {}/{}",
330 &event.repo_owner, &event.repo_name
331 ),
332 )
333 })?;
334
335 let commit = github_api
336 .commits
337 .get_commit(&event.repo_owner, &event.repo_name, &event.sha)
338 .send()
339 .await
340 .map_err(|err| {
341 error::Error::github_error(
342 err,
343 format!(
344 "Failed to get commit {}/{}@{}",
345 &event.repo_owner, &event.repo_name, &event.sha
346 ),
347 )
348 })?;
349
350 let commit_content = commit.commit.unwrap();
351 let commit_author = commit_content.author;
352
353 Ok(WorkflowGithubContext {
354 default_branch: repository.default_branch.unwrap(),
355 is_private: repository.private,
356 committer: commit_author.name,
357 committer_email: commit_author.email,
358 commit_message: commit_content.message,
359 sha: commit.sha,
360 access_token,
361 })
362 }
363
364 }
366
367#[cfg(test)]
368mod tests {
369 use crate::{utils::test::create_workflow_runner, WorkflowAPIEvent};
370
371 #[tokio::test]
372 async fn test_github_context() -> anyhow::Result<()> {
373 let workflow_runner = create_workflow_runner(
374 r#"
375 jobs:
376 test:
377 image: ubuntu
378 steps:
379 - name: Test step
380 run: echo "Hello world"
381 "#,
382 )?;
383
384 let event = WorkflowAPIEvent {
385 repo_owner: "panghu-huang".to_string(),
386 repo_name: "octocrate".to_string(),
387 ref_name: "refs/heads/main".to_string(),
388 pr_number: None,
389 sha: "95409faeae0e81635075091f56888e4bb5fc1a76".to_string(),
390 };
391
392 let github_context = workflow_runner.get_github_context(&event).await?;
393
394 assert_eq!(github_context.default_branch, "main");
395 assert_eq!(github_context.committer, "wokeyi");
396 assert_eq!(github_context.committer_email, "wokeyifrontend@gmail.com");
397 assert_eq!(github_context.is_private, false);
398 assert_eq!(github_context.commit_message, "release octocrate 0.1.0");
399
400 Ok(())
401 }
402}