coodev_runner/runner/
step.rs

1use crate::{
2  error,
3  runner::{
4    docker::Docker, sender::StepMessageSender, GlobalSecret, GlobalVolume, JobContext,
5    OrganizationSecret, OrganizationVolume, RepositorySecret, RepositoryVolume, RunnerInner,
6    Secret, Volume,
7  },
8  EnvironmentVariable, StepNumber, StepRunResult, WorkflowState,
9};
10use parking_lot::Mutex;
11use serde::{Deserialize, Serialize};
12use std::{collections::HashMap, path::PathBuf, process::ExitStatus, sync::Arc, time::Duration};
13use tokio::{fs, io::AsyncWriteExt, sync::broadcast::error::RecvError};
14
15#[derive(Serialize, Deserialize, Debug, Clone)]
16pub struct StepSecret {
17  pub key: String,
18}
19
20#[derive(Serialize, Deserialize, Debug, Clone)]
21pub struct StepVolume {
22  pub key: String,
23  pub to: String,
24}
25
26#[derive(Serialize, Deserialize, Debug, Clone)]
27pub struct Step {
28  pub name: Option<String>,
29  pub image: String,
30  pub run: String,
31  pub continue_on_error: bool,
32  pub runner_dir: String,
33  pub working_dir: String,
34  pub cache_dir: String,
35  pub environments: Option<HashMap<String, EnvironmentVariable>>,
36  pub secrets: Vec<StepSecret>,
37  pub volumes: Vec<StepVolume>,
38  pub timeout: Duration,
39  pub security_opts: Option<Vec<String>>,
40}
41
42pub struct StepRunContext {
43  pub number: u64,
44  pub message_sender: StepMessageSender,
45  pub job_context: Arc<JobContext>,
46}
47
48#[derive(Debug)]
49pub enum StepCommandResult {
50  ExitStatus(ExitStatus),
51  Cancelled,
52  Failed(error::Error),
53}
54
55fn get_secret_value(state: &Arc<Mutex<RunnerInner>>, secret_key: &String) -> Option<String> {
56  let state = state.lock();
57  let secret_value = state.secrets.get(secret_key);
58
59  match secret_value {
60    Some(secret) => match secret {
61      Secret::Repository(RepositorySecret { value, .. }) => Some(value.clone()),
62      Secret::Organization(OrganizationSecret { value, .. }) => Some(value.clone()),
63      Secret::Global(GlobalSecret { value, .. }) => Some(value.clone()),
64    },
65    None => None,
66  }
67}
68
69fn get_volume_value(state: &Arc<Mutex<RunnerInner>>, volume_key: &String) -> Option<String> {
70  match state.lock().volumes.get(volume_key) {
71    Some(volume) => match volume {
72      Volume::Repository(RepositoryVolume { path, .. }) => Some(path.clone()),
73      Volume::Organization(OrganizationVolume { path, .. }) => Some(path.clone()),
74      Volume::Global(GlobalVolume { path, .. }) => Some(path.clone()),
75    },
76    None => None,
77  }
78}
79
80impl Step {
81  pub async fn run(&self, context: StepRunContext) -> crate::Result<StepRunResult> {
82    let started_at = chrono::Utc::now();
83    let workflow_id = context.job_context.workflow_run_context.id.clone();
84    let job_id = context.job_context.id.clone();
85    let number = context.number;
86
87    context
88      .message_sender
89      .update_step_state(WorkflowState::InProgress);
90
91    let docker_name = format!("workflow-step-{}_{}_{}", workflow_id, job_id, number);
92
93    let entrypoint_path = self.create_entrypoint_script(&context).await?;
94    let entrypoint_path_string =
95      entrypoint_path
96        .to_str()
97        .ok_or(error::Error::internal_runtime_error(
98          "Failed to convert PathBuf to &str",
99        ))?;
100
101    let docker = self.generate_docker(
102      docker_name.clone(),
103      entrypoint_path_string.to_string(),
104      &context,
105    )?;
106
107    let mut workflow_state_receiver = context
108      .job_context
109      .workflow_run_context
110      .workflow_state
111      .clone();
112
113    let step_command_result = tokio::select! {
114      res = self.run_docker(docker.clone(), &context) => {
115        match res {
116          Ok(result) => {
117            result
118          },
119          Err(err) => {
120            StepCommandResult::Failed(err)
121          }
122        }
123      }
124      _ = workflow_state_receiver.changed() => {
125        let command = workflow_state_receiver.borrow().clone();
126        if command == WorkflowState::Cancelled {
127          log::info!("Cancel command received");
128          docker
129            .kill()
130            .await
131            .map_err(|err| error::Error::internal_runtime_error(
132              format!("Failed to stop docker container: {}", err.to_string())
133             ))?;
134
135          StepCommandResult::Cancelled
136        } else {
137          StepCommandResult::Failed(error::Error::internal_runtime_error(
138            format!("Unexpected command received: {}", command.to_string())
139          ))
140        }
141      }
142      // Timeout
143      _ = tokio::time::sleep(self.timeout) => {
144        log::info!("Timeout");
145        docker
146          .kill()
147          .await
148          .map_err(|err| error::Error::internal_runtime_error(
149           format!("Failed to stop docker container: {}", err.to_string())
150          ))?;
151
152          StepCommandResult::Cancelled
153      }
154    };
155
156    // End time
157    let ended_at = chrono::Utc::now();
158
159    log::info!("Cleanup files");
160
161    log::info!(
162      "Duration: {}ms",
163      ended_at.timestamp_millis() - started_at.timestamp_millis()
164    );
165
166    // Convert command result to StepResult
167    let (state, error) = match step_command_result {
168      StepCommandResult::ExitStatus(status) => {
169        if status.success() {
170          (WorkflowState::Succeeded, None)
171        } else {
172          (
173            WorkflowState::Failed,
174            Some(error::Error::failed(status.clone())),
175          )
176        }
177      }
178      StepCommandResult::Cancelled => (WorkflowState::Cancelled, None),
179      StepCommandResult::Failed(err) => {
180        log::error!("Step failed: {}", err.to_string());
181
182        (WorkflowState::Failed, Some(err))
183      }
184    };
185
186    context.message_sender.update_step_state(state.clone());
187
188    Ok(StepRunResult {
189      error,
190      state,
191      started_at: Some(started_at),
192      ended_at: Some(ended_at),
193    })
194  }
195
196  async fn run_docker(
197    &self,
198    docker: Docker,
199    context: &StepRunContext,
200  ) -> crate::Result<StepCommandResult> {
201    let sender = context.message_sender.clone();
202
203    // Is workflow canceled?
204    let workflow_state = context
205      .job_context
206      .workflow_run_context
207      .workflow_state
208      .borrow()
209      .clone();
210
211    if workflow_state == WorkflowState::Cancelled {
212      return Ok(StepCommandResult::Cancelled);
213    }
214
215    let mut log_receiver = docker.subscribe_logs();
216
217    let join_handle = tokio::task::spawn(async move {
218      loop {
219        match log_receiver.recv().await {
220          Ok(log) => {
221            sender.send_log(log);
222          }
223          Err(err) => {
224            if err == RecvError::Closed {
225              log::info!("Log receiver closed");
226              break;
227            }
228            log::error!("Failed to receive log: {}", err.to_string());
229          }
230        };
231      }
232    });
233
234    let status = docker.run().await.map_err(|err| {
235      error::Error::internal_runtime_error(format!(
236        "Failed to run docker container: {}",
237        err.to_string()
238      ))
239    })?;
240    join_handle.abort();
241
242    Ok(StepCommandResult::ExitStatus(status))
243  }
244
245  fn generate_docker(
246    &self,
247    docker_name: String,
248    entrypoint_path: String,
249    context: &StepRunContext,
250  ) -> crate::Result<Docker> {
251    let image = self.image.clone();
252
253    let host_user_dir =
254      context
255        .job_context
256        .host_user_dir
257        .to_str()
258        .ok_or(error::Error::internal_runtime_error(
259          "Failed to parse user dir",
260        ))?;
261
262    let container_cache_dir = context
263      .job_context
264      .workflow_run_context
265      .cache_dir
266      .to_str()
267      .ok_or(error::Error::internal_runtime_error(
268        "Failed to parse cache dir",
269      ))?;
270
271    let workspace_dir = self.working_dir.clone();
272    let runner_dir = self.runner_dir.clone();
273    let cache_dir = self.cache_dir.clone();
274
275    let environments = self.get_environments(&context)?;
276
277    let docker_entrypoint_path = format!("{}/entrypoint.sh", runner_dir);
278
279    let mut docker = Docker::new(docker_name)
280      .image(image)
281      .working_dir(workspace_dir.clone())
282      .volume(entrypoint_path, docker_entrypoint_path.clone())
283      .volume(host_user_dir, workspace_dir)
284      .volume(container_cache_dir, cache_dir)
285      .entrypoint(docker_entrypoint_path)
286      .auto_remove(true);
287
288    for (key, value) in environments {
289      docker = docker.environment(key, value);
290    }
291
292    for (from, to) in &context.job_context.working_dir_maps {
293      let from = from.to_str().ok_or(error::Error::internal_runtime_error(
294        "Failed to parse working dir",
295      ))?;
296      docker = docker.volume(from, to);
297    }
298
299    let inner_state = context
300      .job_context
301      .workflow_run_context
302      .runner_inner
303      .clone();
304    for volume in self.volumes.clone() {
305      let from = get_volume_value(&inner_state, &volume.key).ok_or(
306        error::Error::workflow_config_error(format!("Volume `{}` is not defined.", volume.key)),
307      )?;
308      docker = docker.volume(from, volume.to);
309    }
310
311    drop(inner_state);
312
313    if let Some(security_opts) = self.security_opts.clone() {
314      for opt in security_opts {
315        docker = docker.security_opt(opt);
316      }
317    }
318
319    Ok(docker)
320  }
321
322  fn get_environments(&self, context: &StepRunContext) -> crate::Result<HashMap<String, String>> {
323    let mut environments: HashMap<String, String> = HashMap::new();
324
325    // environments options from `workflow.run`
326    if let Some(envs) = context
327      .job_context
328      .workflow_run_context
329      .environments
330      .clone()
331    {
332      for (key, value) in envs {
333        environments.insert(key, value);
334      }
335    }
336
337    // environments from job config
338    // TODO: Add tests
339    if let Some(envs) = self.environments.clone() {
340      for (key, value) in envs {
341        match value {
342          EnvironmentVariable::String(value) => {
343            environments.insert(key, value);
344          }
345          EnvironmentVariable::Number(value) => {
346            environments.insert(key, value.to_string());
347          }
348          EnvironmentVariable::Boolean(value) => {
349            environments.insert(key, value.to_string());
350          }
351        }
352      }
353    }
354
355    let inner_state = context
356      .job_context
357      .workflow_run_context
358      .runner_inner
359      .clone();
360
361    for secret in self.secrets.clone() {
362      let value = get_secret_value(&inner_state, &secret.key).ok_or(
363        error::Error::workflow_config_error(format!("Secret `{}` is not defined.", secret.key)),
364      )?;
365      environments.insert(secret.key, value);
366    }
367
368    drop(inner_state);
369
370    let workspace_dir = self.working_dir.clone();
371    let cache_dir = self.cache_dir.clone();
372    // Coodev preset environments
373    let envs = self.preset_environments(
374      context.number,
375      workspace_dir,
376      cache_dir,
377      &context.job_context,
378    );
379    for (key, value) in envs {
380      environments.insert(key, value);
381    }
382
383    Ok(environments)
384  }
385
386  // preset environment variables
387  fn preset_environments(
388    &self,
389    number: StepNumber,
390    workspace: String,
391    cache_dir: String,
392    job_context: &JobContext,
393  ) -> HashMap<String, String> {
394    let mut environments: Vec<(&str, String)> = vec![];
395
396    environments.push(("COODEV_STEP_ID", number.to_string()));
397
398    environments.push(("COODEV_JOB_ID", job_context.id.to_string()));
399
400    environments.push(("COODEV_WORKSPACE", workspace));
401
402    environments.push(("COODEV_CACHE_DIR", cache_dir));
403
404    environments.push(("CI", "COODEV".to_string()));
405
406    let ctx = &job_context.workflow_run_context;
407
408    environments.push(("COODEV_WORKFLOW_ID", ctx.id.to_string()));
409
410    environments.push(("COODEV_REF", ctx.ref_name.to_string()));
411
412    // TODO: event name
413    // environments.push(("COODEV_EVENT_NAME", event.to_string()));
414
415    environments.push(("COODEV_REPO_OWNER", ctx.repo_owner.to_string()));
416
417    environments.push(("COODEV_REPO_NAME", ctx.repo_name.to_string()));
418
419    environments.push(("COODEV_DEFAULT_BRANCH", ctx.default_branch.to_string()));
420
421    environments.push((
422      "COODEV_REPOSITORY",
423      format!("{}/{}", ctx.repo_owner, ctx.repo_name),
424    ));
425
426    environments.push(("COODEV_SHA", ctx.sha.to_string()));
427
428    environments.push(("COODEV_COMMIT_MESSAGE", ctx.commit_message.to_string()));
429
430    environments.push(("COODEV_COMMITTER", ctx.committer.to_string()));
431
432    environments.push(("COODEV_COMMITTER_EMAIL", ctx.committer_email.to_string()));
433
434    environments.push(("COODEV_IS_PRIVATE", ctx.is_private.to_string()));
435
436    environments.push(("COODEV_ACCESS_TOKEN", ctx.access_token.to_string()));
437
438    if ctx.pr_number.is_some() {
439      environments.push(("COODEV_PR_NUMBER", ctx.pr_number.unwrap().to_string()));
440    } else {
441      environments.push(("COODEV_PR_NUMBER", "".to_string()));
442    }
443
444    environments
445      .into_iter()
446      .map(|(k, v)| (k.to_string(), v))
447      .collect()
448  }
449
450  async fn create_entrypoint_script(&self, context: &StepRunContext) -> crate::Result<PathBuf> {
451    let entrypoint_name = format!("step-entrypoint-{}.sh", context.number);
452    let entrypoint_path = context.job_context.host_working_dir.join(entrypoint_name);
453
454    let mut file;
455    #[cfg(unix)]
456    {
457      file = fs::OpenOptions::new()
458        .create(true)
459        .write(true)
460        .mode(0o777)
461        .open(&entrypoint_path)
462        .await
463        .map_err(|err| error::Error::io_error(err, "Failed to create entrypoint file"))?;
464    }
465    #[cfg(not(unix))]
466    {
467      file = fs::File::create(&entrypoint_path)
468        .await
469        .map_err(|err| error::Error::io_error(err, "Failed to create entrypoint file"))?;
470    }
471
472    file
473      .write(b"#!/bin/sh\n")
474      .await
475      .map_err(|err| error::Error::io_error(err, "Failed to write entrypoint file"))?;
476    file
477      .write_all(self.run.as_bytes())
478      .await
479      .map_err(|err| error::Error::io_error(err, "Failed to write entrypoint file"))?;
480
481    // Fix Text file busy
482    drop(file);
483
484    Ok(entrypoint_path)
485  }
486}