1use crate::{
2 error,
3 runner::{
4 docker::Docker, sender::StepMessageSender, GlobalSecret, GlobalVolume, JobContext,
5 OrganizationSecret, OrganizationVolume, RepositorySecret, RepositoryVolume, RunnerInner,
6 Secret, Volume,
7 },
8 EnvironmentVariable, StepNumber, StepRunResult, WorkflowState,
9};
10use parking_lot::Mutex;
11use serde::{Deserialize, Serialize};
12use std::{collections::HashMap, path::PathBuf, process::ExitStatus, sync::Arc, time::Duration};
13use tokio::{fs, io::AsyncWriteExt, sync::broadcast::error::RecvError};
14
15#[derive(Serialize, Deserialize, Debug, Clone)]
16pub struct StepSecret {
17 pub key: String,
18}
19
20#[derive(Serialize, Deserialize, Debug, Clone)]
21pub struct StepVolume {
22 pub key: String,
23 pub to: String,
24}
25
26#[derive(Serialize, Deserialize, Debug, Clone)]
27pub struct Step {
28 pub name: Option<String>,
29 pub image: String,
30 pub run: String,
31 pub continue_on_error: bool,
32 pub runner_dir: String,
33 pub working_dir: String,
34 pub cache_dir: String,
35 pub environments: Option<HashMap<String, EnvironmentVariable>>,
36 pub secrets: Vec<StepSecret>,
37 pub volumes: Vec<StepVolume>,
38 pub timeout: Duration,
39 pub security_opts: Option<Vec<String>>,
40}
41
42pub struct StepRunContext {
43 pub number: u64,
44 pub message_sender: StepMessageSender,
45 pub job_context: Arc<JobContext>,
46}
47
48#[derive(Debug)]
49pub enum StepCommandResult {
50 ExitStatus(ExitStatus),
51 Cancelled,
52 Failed(error::Error),
53}
54
55fn get_secret_value(state: &Arc<Mutex<RunnerInner>>, secret_key: &String) -> Option<String> {
56 let state = state.lock();
57 let secret_value = state.secrets.get(secret_key);
58
59 match secret_value {
60 Some(secret) => match secret {
61 Secret::Repository(RepositorySecret { value, .. }) => Some(value.clone()),
62 Secret::Organization(OrganizationSecret { value, .. }) => Some(value.clone()),
63 Secret::Global(GlobalSecret { value, .. }) => Some(value.clone()),
64 },
65 None => None,
66 }
67}
68
69fn get_volume_value(state: &Arc<Mutex<RunnerInner>>, volume_key: &String) -> Option<String> {
70 match state.lock().volumes.get(volume_key) {
71 Some(volume) => match volume {
72 Volume::Repository(RepositoryVolume { path, .. }) => Some(path.clone()),
73 Volume::Organization(OrganizationVolume { path, .. }) => Some(path.clone()),
74 Volume::Global(GlobalVolume { path, .. }) => Some(path.clone()),
75 },
76 None => None,
77 }
78}
79
80impl Step {
81 pub async fn run(&self, context: StepRunContext) -> crate::Result<StepRunResult> {
82 let started_at = chrono::Utc::now();
83 let workflow_id = context.job_context.workflow_run_context.id.clone();
84 let job_id = context.job_context.id.clone();
85 let number = context.number;
86
87 context
88 .message_sender
89 .update_step_state(WorkflowState::InProgress);
90
91 let docker_name = format!("workflow-step-{}_{}_{}", workflow_id, job_id, number);
92
93 let entrypoint_path = self.create_entrypoint_script(&context).await?;
94 let entrypoint_path_string =
95 entrypoint_path
96 .to_str()
97 .ok_or(error::Error::internal_runtime_error(
98 "Failed to convert PathBuf to &str",
99 ))?;
100
101 let docker = self.generate_docker(
102 docker_name.clone(),
103 entrypoint_path_string.to_string(),
104 &context,
105 )?;
106
107 let mut workflow_state_receiver = context
108 .job_context
109 .workflow_run_context
110 .workflow_state
111 .clone();
112
113 let step_command_result = tokio::select! {
114 res = self.run_docker(docker.clone(), &context) => {
115 match res {
116 Ok(result) => {
117 result
118 },
119 Err(err) => {
120 StepCommandResult::Failed(err)
121 }
122 }
123 }
124 _ = workflow_state_receiver.changed() => {
125 let command = workflow_state_receiver.borrow().clone();
126 if command == WorkflowState::Cancelled {
127 log::info!("Cancel command received");
128 docker
129 .kill()
130 .await
131 .map_err(|err| error::Error::internal_runtime_error(
132 format!("Failed to stop docker container: {}", err.to_string())
133 ))?;
134
135 StepCommandResult::Cancelled
136 } else {
137 StepCommandResult::Failed(error::Error::internal_runtime_error(
138 format!("Unexpected command received: {}", command.to_string())
139 ))
140 }
141 }
142 _ = tokio::time::sleep(self.timeout) => {
144 log::info!("Timeout");
145 docker
146 .kill()
147 .await
148 .map_err(|err| error::Error::internal_runtime_error(
149 format!("Failed to stop docker container: {}", err.to_string())
150 ))?;
151
152 StepCommandResult::Cancelled
153 }
154 };
155
156 let ended_at = chrono::Utc::now();
158
159 log::info!("Cleanup files");
160
161 log::info!(
162 "Duration: {}ms",
163 ended_at.timestamp_millis() - started_at.timestamp_millis()
164 );
165
166 let (state, error) = match step_command_result {
168 StepCommandResult::ExitStatus(status) => {
169 if status.success() {
170 (WorkflowState::Succeeded, None)
171 } else {
172 (
173 WorkflowState::Failed,
174 Some(error::Error::failed(status.clone())),
175 )
176 }
177 }
178 StepCommandResult::Cancelled => (WorkflowState::Cancelled, None),
179 StepCommandResult::Failed(err) => {
180 log::error!("Step failed: {}", err.to_string());
181
182 (WorkflowState::Failed, Some(err))
183 }
184 };
185
186 context.message_sender.update_step_state(state.clone());
187
188 Ok(StepRunResult {
189 error,
190 state,
191 started_at: Some(started_at),
192 ended_at: Some(ended_at),
193 })
194 }
195
196 async fn run_docker(
197 &self,
198 docker: Docker,
199 context: &StepRunContext,
200 ) -> crate::Result<StepCommandResult> {
201 let sender = context.message_sender.clone();
202
203 let workflow_state = context
205 .job_context
206 .workflow_run_context
207 .workflow_state
208 .borrow()
209 .clone();
210
211 if workflow_state == WorkflowState::Cancelled {
212 return Ok(StepCommandResult::Cancelled);
213 }
214
215 let mut log_receiver = docker.subscribe_logs();
216
217 let join_handle = tokio::task::spawn(async move {
218 loop {
219 match log_receiver.recv().await {
220 Ok(log) => {
221 sender.send_log(log);
222 }
223 Err(err) => {
224 if err == RecvError::Closed {
225 log::info!("Log receiver closed");
226 break;
227 }
228 log::error!("Failed to receive log: {}", err.to_string());
229 }
230 };
231 }
232 });
233
234 let status = docker.run().await.map_err(|err| {
235 error::Error::internal_runtime_error(format!(
236 "Failed to run docker container: {}",
237 err.to_string()
238 ))
239 })?;
240 join_handle.abort();
241
242 Ok(StepCommandResult::ExitStatus(status))
243 }
244
245 fn generate_docker(
246 &self,
247 docker_name: String,
248 entrypoint_path: String,
249 context: &StepRunContext,
250 ) -> crate::Result<Docker> {
251 let image = self.image.clone();
252
253 let host_user_dir =
254 context
255 .job_context
256 .host_user_dir
257 .to_str()
258 .ok_or(error::Error::internal_runtime_error(
259 "Failed to parse user dir",
260 ))?;
261
262 let container_cache_dir = context
263 .job_context
264 .workflow_run_context
265 .cache_dir
266 .to_str()
267 .ok_or(error::Error::internal_runtime_error(
268 "Failed to parse cache dir",
269 ))?;
270
271 let workspace_dir = self.working_dir.clone();
272 let runner_dir = self.runner_dir.clone();
273 let cache_dir = self.cache_dir.clone();
274
275 let environments = self.get_environments(&context)?;
276
277 let docker_entrypoint_path = format!("{}/entrypoint.sh", runner_dir);
278
279 let mut docker = Docker::new(docker_name)
280 .image(image)
281 .working_dir(workspace_dir.clone())
282 .volume(entrypoint_path, docker_entrypoint_path.clone())
283 .volume(host_user_dir, workspace_dir)
284 .volume(container_cache_dir, cache_dir)
285 .entrypoint(docker_entrypoint_path)
286 .auto_remove(true);
287
288 for (key, value) in environments {
289 docker = docker.environment(key, value);
290 }
291
292 for (from, to) in &context.job_context.working_dir_maps {
293 let from = from.to_str().ok_or(error::Error::internal_runtime_error(
294 "Failed to parse working dir",
295 ))?;
296 docker = docker.volume(from, to);
297 }
298
299 let inner_state = context
300 .job_context
301 .workflow_run_context
302 .runner_inner
303 .clone();
304 for volume in self.volumes.clone() {
305 let from = get_volume_value(&inner_state, &volume.key).ok_or(
306 error::Error::workflow_config_error(format!("Volume `{}` is not defined.", volume.key)),
307 )?;
308 docker = docker.volume(from, volume.to);
309 }
310
311 drop(inner_state);
312
313 if let Some(security_opts) = self.security_opts.clone() {
314 for opt in security_opts {
315 docker = docker.security_opt(opt);
316 }
317 }
318
319 Ok(docker)
320 }
321
322 fn get_environments(&self, context: &StepRunContext) -> crate::Result<HashMap<String, String>> {
323 let mut environments: HashMap<String, String> = HashMap::new();
324
325 if let Some(envs) = context
327 .job_context
328 .workflow_run_context
329 .environments
330 .clone()
331 {
332 for (key, value) in envs {
333 environments.insert(key, value);
334 }
335 }
336
337 if let Some(envs) = self.environments.clone() {
340 for (key, value) in envs {
341 match value {
342 EnvironmentVariable::String(value) => {
343 environments.insert(key, value);
344 }
345 EnvironmentVariable::Number(value) => {
346 environments.insert(key, value.to_string());
347 }
348 EnvironmentVariable::Boolean(value) => {
349 environments.insert(key, value.to_string());
350 }
351 }
352 }
353 }
354
355 let inner_state = context
356 .job_context
357 .workflow_run_context
358 .runner_inner
359 .clone();
360
361 for secret in self.secrets.clone() {
362 let value = get_secret_value(&inner_state, &secret.key).ok_or(
363 error::Error::workflow_config_error(format!("Secret `{}` is not defined.", secret.key)),
364 )?;
365 environments.insert(secret.key, value);
366 }
367
368 drop(inner_state);
369
370 let workspace_dir = self.working_dir.clone();
371 let cache_dir = self.cache_dir.clone();
372 let envs = self.preset_environments(
374 context.number,
375 workspace_dir,
376 cache_dir,
377 &context.job_context,
378 );
379 for (key, value) in envs {
380 environments.insert(key, value);
381 }
382
383 Ok(environments)
384 }
385
386 fn preset_environments(
388 &self,
389 number: StepNumber,
390 workspace: String,
391 cache_dir: String,
392 job_context: &JobContext,
393 ) -> HashMap<String, String> {
394 let mut environments: Vec<(&str, String)> = vec![];
395
396 environments.push(("COODEV_STEP_ID", number.to_string()));
397
398 environments.push(("COODEV_JOB_ID", job_context.id.to_string()));
399
400 environments.push(("COODEV_WORKSPACE", workspace));
401
402 environments.push(("COODEV_CACHE_DIR", cache_dir));
403
404 environments.push(("CI", "COODEV".to_string()));
405
406 let ctx = &job_context.workflow_run_context;
407
408 environments.push(("COODEV_WORKFLOW_ID", ctx.id.to_string()));
409
410 environments.push(("COODEV_REF", ctx.ref_name.to_string()));
411
412 environments.push(("COODEV_REPO_OWNER", ctx.repo_owner.to_string()));
416
417 environments.push(("COODEV_REPO_NAME", ctx.repo_name.to_string()));
418
419 environments.push(("COODEV_DEFAULT_BRANCH", ctx.default_branch.to_string()));
420
421 environments.push((
422 "COODEV_REPOSITORY",
423 format!("{}/{}", ctx.repo_owner, ctx.repo_name),
424 ));
425
426 environments.push(("COODEV_SHA", ctx.sha.to_string()));
427
428 environments.push(("COODEV_COMMIT_MESSAGE", ctx.commit_message.to_string()));
429
430 environments.push(("COODEV_COMMITTER", ctx.committer.to_string()));
431
432 environments.push(("COODEV_COMMITTER_EMAIL", ctx.committer_email.to_string()));
433
434 environments.push(("COODEV_IS_PRIVATE", ctx.is_private.to_string()));
435
436 environments.push(("COODEV_ACCESS_TOKEN", ctx.access_token.to_string()));
437
438 if ctx.pr_number.is_some() {
439 environments.push(("COODEV_PR_NUMBER", ctx.pr_number.unwrap().to_string()));
440 } else {
441 environments.push(("COODEV_PR_NUMBER", "".to_string()));
442 }
443
444 environments
445 .into_iter()
446 .map(|(k, v)| (k.to_string(), v))
447 .collect()
448 }
449
450 async fn create_entrypoint_script(&self, context: &StepRunContext) -> crate::Result<PathBuf> {
451 let entrypoint_name = format!("step-entrypoint-{}.sh", context.number);
452 let entrypoint_path = context.job_context.host_working_dir.join(entrypoint_name);
453
454 let mut file;
455 #[cfg(unix)]
456 {
457 file = fs::OpenOptions::new()
458 .create(true)
459 .write(true)
460 .mode(0o777)
461 .open(&entrypoint_path)
462 .await
463 .map_err(|err| error::Error::io_error(err, "Failed to create entrypoint file"))?;
464 }
465 #[cfg(not(unix))]
466 {
467 file = fs::File::create(&entrypoint_path)
468 .await
469 .map_err(|err| error::Error::io_error(err, "Failed to create entrypoint file"))?;
470 }
471
472 file
473 .write(b"#!/bin/sh\n")
474 .await
475 .map_err(|err| error::Error::io_error(err, "Failed to write entrypoint file"))?;
476 file
477 .write_all(self.run.as_bytes())
478 .await
479 .map_err(|err| error::Error::io_error(err, "Failed to write entrypoint file"))?;
480
481 drop(file);
483
484 Ok(entrypoint_path)
485 }
486}