Skip to main content

mk_lib/schema/
task.rs

1use hashbrown::HashMap;
2use indicatif::{
3  HumanDuration,
4  ProgressBar,
5  ProgressStyle,
6};
7use rand::Rng as _;
8use schemars::JsonSchema;
9use serde::{
10  Deserialize,
11  Serialize,
12};
13
14use std::io::BufRead as _;
15use std::sync::mpsc::{
16  channel,
17  Receiver,
18  Sender,
19};
20use std::thread;
21use std::time::{
22  Duration,
23  Instant,
24};
25
26use super::{
27  contains_output_reference,
28  extract_output_references,
29  interpolate_template_string,
30  is_shell_command,
31  CommandRunner,
32  Precondition,
33  Shell,
34  TaskContext,
35  TaskDependency,
36};
37use crate::cache::{
38  compute_fingerprint,
39  expand_patterns_in_dir,
40  CacheEntry,
41};
42use crate::defaults::default_verbose;
43use crate::run_shell_command;
44use crate::secrets::{
45  load_secret_env,
46  merge_optional_secret_settings,
47  resolve_secret_config,
48  SecretSettings,
49};
50use crate::utils::{
51  deserialize_environment,
52  load_env_files_in_dir,
53  resolve_path,
54};
55
/// Serde default for [`TaskCache::enabled`]: a `cache` block that omits
/// `enabled` turns caching on.
fn default_cache_enabled() -> bool {
  true
}
59
/// Serde default for [`TaskExecution::fail_fast`]: an `execution` block that
/// omits `fail_fast` stops on the first failure.
fn default_fail_fast() -> bool {
  true
}
63
/// How the commands of a task are scheduled.
///
/// Serialized in `snake_case` (`sequential` / `parallel`).
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum ExecutionMode {
  /// Run the commands one after another, in declaration order.
  Sequential,
  /// Run the commands concurrently on worker threads.
  Parallel,
}
70
/// Fine-grained scheduling options for a task's commands.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
pub struct TaskExecution {
  /// Scheduling mode. When set, it takes precedence over the task-level
  /// `parallel` flag.
  #[serde(default)]
  pub mode: Option<ExecutionMode>,

  /// Upper bound on the number of commands running at the same time in
  /// parallel mode. When omitted, all commands may run at once.
  #[serde(default)]
  pub max_parallel: Option<usize>,

  /// In parallel mode, stop scheduling further commands and request
  /// cancellation after the first failure. Defaults to `true`.
  #[serde(default = "default_fail_fast")]
  pub fail_fast: bool,
}
82
/// Caching options for a task.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
pub struct TaskCache {
  /// Whether caching is enabled for the task. Defaults to `true` when the
  /// `cache` block is present but omits this field.
  #[serde(default = "default_cache_enabled")]
  pub enabled: bool,
}
88
/// This struct represents a task that can be executed. A task can contain multiple
/// commands that are executed sequentially by default, or in parallel when
/// `parallel` / `execution.mode` request it. A task can also have preconditions that
/// must be met before the task can be executed.
#[derive(Debug, Default, Deserialize, JsonSchema)]
pub struct TaskArgs {
  /// The commands to run
  pub commands: Vec<CommandRunner>,

  /// The preconditions that must be met before the task can be executed
  #[serde(default)]
  pub preconditions: Vec<Precondition>,

  /// The tasks that must be executed before this task can be executed
  #[serde(default)]
  pub depends_on: Vec<TaskDependency>,

  /// The labels for the task
  #[schemars(with = "std::collections::HashMap<String, String>")]
  #[serde(default)]
  pub labels: HashMap<String, String>,

  /// The description of the task
  #[serde(default)]
  pub description: String,

  /// The environment variables to set before running the task
  #[schemars(with = "std::collections::HashMap<String, String>")]
  #[serde(default, deserialize_with = "deserialize_environment")]
  pub environment: HashMap<String, String>,

  /// The environment files to load before running the task
  #[serde(default)]
  pub env_file: Vec<String>,

  /// Secret paths to load as dotenv-style environment entries before running the task
  #[serde(default)]
  pub secrets_path: Vec<String>,

  /// Canonical task secret settings block.
  #[serde(default)]
  pub secrets: Option<SecretSettings>,

  /// The path to the secret vault
  #[serde(default)]
  pub vault_location: Option<String>,

  /// The path to the private keys used for secret decryption
  #[serde(default)]
  pub keys_location: Option<String>,

  /// The key name to use for secret decryption
  #[serde(default)]
  pub key_name: Option<String>,

  /// GPG key ID or fingerprint for hardware/passphrase-protected keys (delegates to system gpg)
  #[serde(default)]
  pub gpg_key_id: Option<String>,

  /// The shell to use when running the task
  #[serde(default)]
  pub shell: Option<Shell>,

  /// Run the commands in parallel.
  /// Only supported when every command is a local command that is
  /// non-interactive and does not use `retrigger: true`.
  #[serde(default)]
  pub parallel: Option<bool>,

  /// Richer execution configuration. Its `mode`, when set, takes precedence
  /// over `parallel`.
  #[serde(default)]
  pub execution: Option<TaskExecution>,

  /// Task caching configuration
  #[serde(default)]
  pub cache: Option<TaskCache>,

  /// Files or glob patterns that affect task output
  #[serde(default)]
  pub inputs: Vec<String>,

  /// Files produced by the task
  #[serde(default)]
  pub outputs: Vec<String>,

  /// Ignore errors if the task fails
  #[serde(default)]
  pub ignore_errors: Option<bool>,

  /// Show verbose output
  #[serde(default)]
  pub verbose: Option<bool>,

  /// Original legacy secret scalar fields before normalization.
  #[schemars(skip)]
  #[serde(skip)]
  pub(crate) raw_legacy_secret_settings: Option<SecretSettings>,

  /// Original `secrets` block before normalization.
  #[schemars(skip)]
  #[serde(skip)]
  pub(crate) raw_secrets: Option<SecretSettings>,
}
190
#[derive(Debug, Deserialize, JsonSchema)]
#[serde(untagged)]
/// A task definition: either a command string shorthand or a full task object.
pub enum Task {
  /// Shorthand: a single command given as a plain string.
  String(String),
  /// Full task object with commands and optional settings.
  Task(Box<TaskArgs>),
}
198
/// Outcome of one command run by the parallel executor, sent from the worker
/// thread back to the scheduling loop.
#[derive(Debug)]
pub struct CommandResult {
  /// Zero-based position of the command in the task's `commands` list.
  index: usize,
  /// `true` when the command finished without error.
  success: bool,
  /// Human-readable summary of the outcome.
  message: String,
}
205
206impl Task {
207  pub fn run(&self, context: &mut TaskContext) -> anyhow::Result<()> {
208    match self {
209      Task::String(command) => self.execute(context, command),
210      Task::Task(args) => args.run(context),
211    }
212  }
213
214  fn execute(&self, context: &mut TaskContext, command: &str) -> anyhow::Result<()> {
215    assert!(!command.is_empty());
216
217    TaskArgs {
218      commands: vec![CommandRunner::CommandRun(command.to_string())],
219      ..Default::default()
220    }
221    .run(context)
222  }
223}
224
225impl TaskArgs {
226  pub(crate) fn normalized_secret_settings(&self) -> Option<SecretSettings> {
227    merge_optional_secret_settings(
228      Some(SecretSettings::from_legacy(
229        self.vault_location.clone(),
230        self.keys_location.clone(),
231        self.key_name.clone(),
232        self.gpg_key_id.clone(),
233        self.secrets_path.clone(),
234      )),
235      self.secrets.clone(),
236    )
237    .filter(|settings| !settings.is_empty())
238  }
239
240  pub(crate) fn validation_legacy_secret_settings(&self) -> SecretSettings {
241    self.raw_legacy_secret_settings.clone().unwrap_or_else(|| {
242      SecretSettings::from_legacy(
243        self.vault_location.clone(),
244        self.keys_location.clone(),
245        self.key_name.clone(),
246        self.gpg_key_id.clone(),
247        self.secrets_path.clone(),
248      )
249    })
250  }
251
252  pub(crate) fn normalize_secret_settings(&mut self) {
253    if self.raw_legacy_secret_settings.is_none() {
254      self.raw_legacy_secret_settings = Some(SecretSettings::from_legacy(
255        self.vault_location.clone(),
256        self.keys_location.clone(),
257        self.key_name.clone(),
258        self.gpg_key_id.clone(),
259        self.secrets_path.clone(),
260      ));
261    }
262    if self.raw_secrets.is_none() {
263      self.raw_secrets = self.secrets.clone();
264    }
265
266    self.secrets = self.normalized_secret_settings();
267    if let Some(secrets) = &self.secrets {
268      self.vault_location = secrets.vault_location.clone();
269      self.keys_location = secrets.keys_location.clone();
270      self.key_name = secrets.key_name.clone();
271      self.gpg_key_id = secrets.gpg_key_id.clone();
272      self.secrets_path = secrets.secrets_path.clone().unwrap_or_default();
273    }
274  }
275
276  pub fn run(&self, context: &mut TaskContext) -> anyhow::Result<()> {
277    assert!(!self.commands.is_empty());
278
279    // Validate parallel execution requirements early
280    self.validate_parallel_commands()?;
281
282    let started = Instant::now();
283    let tick_interval = Duration::from_millis(80);
284
285    if let Some(shell) = &self.shell {
286      context.set_shell(shell);
287    }
288
289    if let Some(ignore_errors) = &self.ignore_errors {
290      context.set_ignore_errors(*ignore_errors);
291    }
292
293    if let Some(verbose) = &self.verbose {
294      context.set_verbose(*verbose);
295    }
296
297    let config_base_dir = self.config_base_dir(context);
298    let task_secret_config = resolve_secret_config(
299      &config_base_dir,
300      None,
301      self.secrets.as_ref(),
302      context.task_root.secrets.as_ref(),
303    );
304    context.set_secret_config(task_secret_config.clone());
305
306    // Load environment variables from root and task environments and env files.
307    if !context.is_nested {
308      let root_env = context.task_root.environment.clone();
309      let root_env_files = load_env_files_in_dir(&context.task_root.env_file, &config_base_dir)?;
310      context.extend_env_vars(root_env);
311      context.extend_env_vars(root_env_files);
312    }
313
314    // Load environment variables from the task environment and env files field
315    let defined_env = self.load_static_env(context)?;
316    let additional_env = self.load_env_file(context)?;
317    let secret_env = self.load_secret_env(context)?;
318
319    context.extend_env_vars(defined_env);
320    context.extend_env_vars(additional_env);
321    context.extend_env_vars(secret_env);
322
323    let mut rng = rand::thread_rng();
324    // Spinners can be found here:
325    // https://github.com/sindresorhus/cli-spinners/blob/main/spinners.json
326    let pb_style =
327      ProgressStyle::with_template("{spinner:.green} [{prefix:.bold.dim}] {wide_msg:.cyan/blue} ")?
328        .tick_chars("⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏⦿");
329
330    let depends_on_pb = context.multi.add(ProgressBar::new(self.depends_on.len() as u64));
331
332    if !self.depends_on.is_empty() {
333      depends_on_pb.set_style(pb_style.clone());
334      depends_on_pb.set_message("Running task dependencies...");
335      depends_on_pb.enable_steady_tick(tick_interval);
336      for (i, dependency) in self.depends_on.iter().enumerate() {
337        thread::sleep(Duration::from_millis(rng.gen_range(40..300)));
338        depends_on_pb.set_prefix(format!("{}/{}", i + 1, self.depends_on.len()));
339        dependency.run(context)?;
340        depends_on_pb.inc(1);
341      }
342
343      let message = format!("Dependencies completed in {}.", HumanDuration(started.elapsed()));
344      if context.is_nested {
345        depends_on_pb.finish_and_clear();
346      } else {
347        depends_on_pb.finish_with_message(message);
348      }
349    }
350
351    let precondition_pb = context
352      .multi
353      .add(ProgressBar::new(self.preconditions.len() as u64));
354
355    if !self.preconditions.is_empty() {
356      precondition_pb.set_style(pb_style.clone());
357      precondition_pb.set_message("Running task precondition...");
358      precondition_pb.enable_steady_tick(tick_interval);
359      for (i, precondition) in self.preconditions.iter().enumerate() {
360        thread::sleep(Duration::from_millis(rng.gen_range(40..300)));
361        precondition_pb.set_prefix(format!("{}/{}", i + 1, self.preconditions.len()));
362        precondition.execute(context)?;
363        precondition_pb.inc(1);
364      }
365
366      let message = format!("Preconditions completed in {}.", HumanDuration(started.elapsed()));
367      if context.is_nested {
368        precondition_pb.finish_and_clear();
369      } else {
370        precondition_pb.finish_with_message(message);
371      }
372    }
373
374    if self.should_skip_from_cache(context)? {
375      context.emit_event(&serde_json::json!({
376        "event": "task_skipped",
377        "task": context.current_task_name.clone().unwrap_or_else(|| "<task>".to_string()),
378        "reason": "cache_hit",
379      }))?;
380      return Ok(());
381    }
382
383    if self.is_parallel() {
384      self.execute_commands_parallel(context)?;
385    } else {
386      let command_pb = context.multi.add(ProgressBar::new(self.commands.len() as u64));
387      command_pb.set_style(pb_style);
388      command_pb.set_message("Running task command...");
389      command_pb.enable_steady_tick(tick_interval);
390      for (i, command) in self.commands.iter().enumerate() {
391        thread::sleep(Duration::from_millis(rng.gen_range(100..400)));
392        command_pb.set_prefix(format!("{}/{}", i + 1, self.commands.len()));
393        self.refresh_output_env(context)?;
394        command.execute(context)?;
395        command_pb.inc(1);
396      }
397
398      let message = format!("Commands completed in {}.", HumanDuration(started.elapsed()));
399      if context.is_nested {
400        command_pb.finish_and_clear();
401      } else {
402        command_pb.finish_with_message(message);
403      }
404    }
405
406    self.update_cache(context)?;
407
408    Ok(())
409  }
410
411  /// Validate if the task can be run in parallel
412  fn validate_parallel_commands(&self) -> anyhow::Result<()> {
413    if !self.is_parallel() {
414      return Ok(());
415    }
416
417    for command in &self.commands {
418      match command {
419        CommandRunner::LocalRun(local_run) if local_run.is_parallel_safe() => continue,
420        CommandRunner::LocalRun(local_run) if local_run.interactive_enabled() => {
421          return Err(anyhow::anyhow!(
422            "Interactive local commands cannot be run in parallel"
423          ))
424        },
425        CommandRunner::LocalRun(_) => {
426          return Err(anyhow::anyhow!(
427            "Local commands with `retrigger: true` cannot be run in parallel"
428          ))
429        },
430        _ => {
431          return Err(anyhow::anyhow!(
432            "Parallel execution is only supported for non-interactive local commands"
433          ))
434        },
435      }
436    }
437    Ok(())
438  }
439
440  /// Execute the commands in parallel
441  fn execute_commands_parallel(&self, context: &TaskContext) -> anyhow::Result<()> {
442    context.clear_cancellation();
443    let (tx, rx): (Sender<CommandResult>, Receiver<CommandResult>) = channel();
444    let mut handles = vec![];
445    let command_count = self.commands.len();
446    let max_parallel = self.max_parallel().min(command_count.max(1));
447    let fail_fast = self.fail_fast();
448    let command_pb = context.multi.add(ProgressBar::new(command_count as u64));
449    command_pb.set_style(ProgressStyle::with_template(
450      "{spinner:.green} [{prefix:.bold.dim}] {wide_msg:.cyan/blue} ",
451    )?);
452    command_pb.set_prefix("?/?");
453    command_pb.set_message("Running task commands in parallel...");
454    command_pb.enable_steady_tick(Duration::from_millis(80));
455    let mut failures = Vec::new();
456
457    // Clone all commands upfront to avoid borrowing issues
458    let commands: Vec<_> = self.commands.to_vec();
459
460    // Track results in order
461    let mut completed = 0;
462
463    let mut iter = commands.into_iter().enumerate();
464    let mut running = 0usize;
465    let mut stop_scheduling = false;
466
467    while completed < command_count {
468      while !stop_scheduling && running < max_parallel {
469        let Some((i, command)) = iter.next() else {
470          break;
471        };
472
473        let tx = tx.clone();
474        let context = context.clone();
475
476        let handle = thread::spawn(move || {
477          let result = match command.execute_cancellable(&context) {
478            Ok(_) => CommandResult {
479              index: i,
480              success: true,
481              message: format!("Command {} completed successfully", i + 1),
482            },
483            Err(e) => CommandResult {
484              index: i,
485              success: false,
486              message: format!("Command {} failed: {}", i + 1, e),
487            },
488          };
489          tx.send(result).unwrap();
490        });
491
492        handles.push(handle);
493        running += 1;
494      }
495
496      if running == 0 {
497        break;
498      }
499
500      match rx.recv() {
501        Ok(result) => {
502          running -= 1;
503          let index = result.index;
504          if !result.success && !context.ignore_errors() {
505            failures.push(result.message);
506            if fail_fast {
507              stop_scheduling = true;
508              context.request_cancellation();
509            }
510          }
511
512          completed += 1;
513          command_pb.set_prefix(format!("{}/{}", completed, command_count));
514          command_pb.inc(1);
515
516          command_pb.set_message(format!(
517            "Running task commands in parallel (completed {})",
518            index + 1
519          ));
520        },
521        Err(e) => {
522          command_pb.finish_with_message("Error receiving command results");
523          return Err(anyhow::anyhow!("Channel error: {}", e));
524        },
525      }
526    }
527
528    // Wait for all threads to complete
529    for handle in handles {
530      handle.join().unwrap();
531    }
532
533    context.clear_cancellation();
534
535    if !failures.is_empty() {
536      command_pb.finish_with_message("Some commands failed");
537
538      // Sort failures by command index for clearer error reporting
539      failures.sort();
540      return Err(anyhow::anyhow!("Failed commands:\n{}", failures.join("\n")));
541    }
542
543    let message = "Commands completed in parallel";
544    if context.is_nested {
545      command_pb.finish_and_clear();
546    } else {
547      command_pb.finish_with_message(message);
548    }
549
550    Ok(())
551  }
552
553  fn load_static_env(&self, context: &TaskContext) -> anyhow::Result<HashMap<String, String>> {
554    let mut local_env: HashMap<String, String> = HashMap::new();
555    for (key, value) in &self.environment {
556      if contains_output_reference(value) {
557        continue;
558      }
559      let value = self.get_env_value(context, value)?;
560      local_env.insert(key.clone(), value);
561    }
562
563    Ok(local_env)
564  }
565
566  fn refresh_output_env(&self, context: &mut TaskContext) -> anyhow::Result<()> {
567    let mut updated_env = HashMap::new();
568
569    for (key, value) in &self.environment {
570      let output_refs = extract_output_references(value);
571      if output_refs.is_empty() {
572        continue;
573      }
574
575      let mut all_outputs_ready = true;
576      for output_name in &output_refs {
577        if !context.has_task_output(output_name)? {
578          all_outputs_ready = false;
579          break;
580        }
581      }
582      if !all_outputs_ready {
583        continue;
584      }
585
586      updated_env.insert(key.clone(), self.get_env_value(context, value)?);
587    }
588
589    context.extend_env_vars(updated_env);
590    Ok(())
591  }
592
593  fn load_env_file(&self, context: &TaskContext) -> anyhow::Result<HashMap<String, String>> {
594    load_env_files_in_dir(&self.env_file, &self.config_base_dir(context))
595  }
596
597  fn load_secret_env(&self, context: &TaskContext) -> anyhow::Result<HashMap<String, String>> {
598    let secret_config = context
599      .secret_config
600      .as_ref()
601      .ok_or_else(|| anyhow::anyhow!("Secret config missing from task context"))?;
602    load_secret_env(secret_config)
603  }
604
605  fn get_env_value(&self, context: &TaskContext, value_in: &str) -> anyhow::Result<String> {
606    if is_shell_command(value_in)? {
607      let verbose = self.verbose();
608      let mut cmd = self
609        .shell
610        .as_ref()
611        .map(|shell| shell.proc())
612        .unwrap_or_else(|| context.shell().proc());
613      let output = run_shell_command!(value_in, cmd, verbose);
614      Ok(output)
615    } else if super::is_template_command(value_in)? || contains_output_reference(value_in) {
616      Ok(interpolate_template_string(value_in, context)?)
617    } else {
618      Ok(value_in.to_string())
619    }
620  }
621
622  fn verbose(&self) -> bool {
623    self.verbose.unwrap_or(default_verbose())
624  }
625
626  pub(crate) fn execution_mode(&self) -> ExecutionMode {
627    self
628      .execution
629      .as_ref()
630      .and_then(|execution| execution.mode.clone())
631      .or_else(|| {
632        self.parallel.map(|parallel| {
633          if parallel {
634            ExecutionMode::Parallel
635          } else {
636            ExecutionMode::Sequential
637          }
638        })
639      })
640      .unwrap_or(ExecutionMode::Sequential)
641  }
642
643  pub(crate) fn is_parallel(&self) -> bool {
644    self.execution_mode().is_parallel()
645  }
646
647  pub(crate) fn max_parallel(&self) -> usize {
648    self
649      .execution
650      .as_ref()
651      .and_then(|execution| execution.max_parallel)
652      .unwrap_or(self.commands.len().max(1))
653  }
654
655  pub(crate) fn fail_fast(&self) -> bool {
656    self
657      .execution
658      .as_ref()
659      .map(|execution| execution.fail_fast)
660      .unwrap_or(true)
661  }
662
663  fn cache_enabled(&self) -> bool {
664    self.cache.as_ref().map(|cache| cache.enabled).unwrap_or(false)
665  }
666
667  fn should_skip_from_cache(&self, context: &TaskContext) -> anyhow::Result<bool> {
668    if context.force || !self.cache_enabled() || self.outputs.is_empty() {
669      return Ok(false);
670    }
671
672    let resolved_outputs = self.resolve_output_paths(context)?;
673    let outputs_exist = resolved_outputs.iter().all(|output| output.exists());
674    if !outputs_exist {
675      return Ok(false);
676    }
677
678    let env_vars = sorted_env_vars(&context.env_vars);
679    let inputs = self.resolve_input_paths(context)?;
680    let mut env_files = self.resolve_env_file_paths(context);
681    env_files.extend(self.resolve_secret_paths(context));
682    env_files.sort();
683    env_files.dedup();
684    let fingerprint = compute_fingerprint(
685      &context
686        .current_task_name
687        .clone()
688        .unwrap_or_else(|| "<task>".to_string()),
689      &stable_task_debug(self),
690      &env_vars,
691      &inputs,
692      &env_files,
693      &resolved_outputs,
694    )?;
695
696    let store = context
697      .cache_store
698      .lock()
699      .map_err(|e| anyhow::anyhow!("Failed to lock cache store - {}", e))?;
700    let Some(entry) = store.tasks.get(&fingerprint_task_key(context, self)) else {
701      return Ok(false);
702    };
703
704    Ok(entry.fingerprint == fingerprint)
705  }
706
707  fn update_cache(&self, context: &TaskContext) -> anyhow::Result<()> {
708    if !self.cache_enabled() || self.outputs.is_empty() {
709      return Ok(());
710    }
711
712    let env_vars = sorted_env_vars(&context.env_vars);
713    let inputs = self.resolve_input_paths(context)?;
714    let mut env_files = self.resolve_env_file_paths(context);
715    env_files.extend(self.resolve_secret_paths(context));
716    env_files.sort();
717    env_files.dedup();
718    let resolved_outputs = self.resolve_output_paths(context)?;
719    let fingerprint = compute_fingerprint(
720      &context
721        .current_task_name
722        .clone()
723        .unwrap_or_else(|| "<task>".to_string()),
724      &stable_task_debug(self),
725      &env_vars,
726      &inputs,
727      &env_files,
728      &resolved_outputs,
729    )?;
730    let key = fingerprint_task_key(context, self);
731
732    {
733      let mut store = context
734        .cache_store
735        .lock()
736        .map_err(|e| anyhow::anyhow!("Failed to lock cache store - {}", e))?;
737      store.tasks.insert(
738        key,
739        CacheEntry {
740          fingerprint,
741          outputs: resolved_outputs
742            .iter()
743            .map(|path| path.to_string_lossy().into_owned())
744            .collect(),
745          updated_at: chrono::Utc::now().to_rfc3339(),
746        },
747      );
748      store.save_in_dir(&context.task_root.cache_base_dir())?;
749    }
750
751    Ok(())
752  }
753
754  pub(crate) fn config_base_dir_from_root(&self, root: &super::TaskRoot) -> std::path::PathBuf {
755    root.config_base_dir()
756  }
757
758  pub(crate) fn task_base_dir_from_root(&self, root: &super::TaskRoot) -> std::path::PathBuf {
759    let config_base_dir = self.config_base_dir_from_root(root);
760    let mut work_dirs = self
761      .commands
762      .iter()
763      .filter_map(|command| match command {
764        CommandRunner::LocalRun(local_run) => local_run.work_dir.as_ref(),
765        _ => None,
766      })
767      .map(|work_dir| resolve_path(&config_base_dir, work_dir))
768      .collect::<Vec<_>>();
769
770    work_dirs.sort();
771    work_dirs.dedup();
772
773    if work_dirs.len() == 1 {
774      work_dirs.remove(0)
775    } else {
776      config_base_dir
777    }
778  }
779
780  fn config_base_dir(&self, context: &TaskContext) -> std::path::PathBuf {
781    self.config_base_dir_from_root(&context.task_root)
782  }
783
784  fn task_base_dir(&self, context: &TaskContext) -> std::path::PathBuf {
785    self.task_base_dir_from_root(&context.task_root)
786  }
787
788  fn resolve_input_paths(&self, context: &TaskContext) -> anyhow::Result<Vec<std::path::PathBuf>> {
789    expand_patterns_in_dir(&self.task_base_dir(context), &self.inputs)
790  }
791
792  fn resolve_output_paths(&self, context: &TaskContext) -> anyhow::Result<Vec<std::path::PathBuf>> {
793    let base_dir = self.task_base_dir(context);
794    Ok(
795      self
796        .outputs
797        .iter()
798        .map(|output| resolve_path(&base_dir, output))
799        .collect(),
800    )
801  }
802
803  fn resolve_env_file_paths(&self, context: &TaskContext) -> Vec<std::path::PathBuf> {
804    let config_base_dir = self.config_base_dir(context);
805    let mut env_files = context
806      .task_root
807      .env_file
808      .iter()
809      .chain(self.env_file.iter())
810      .map(|env_file| resolve_path(&config_base_dir, env_file))
811      .collect::<Vec<_>>();
812    env_files.sort();
813    env_files.dedup();
814    env_files
815  }
816
817  fn resolve_secret_paths(&self, context: &TaskContext) -> Vec<std::path::PathBuf> {
818    let vault_location = context
819      .secret_config
820      .as_ref()
821      .map(|config| config.vault_location.clone())
822      .unwrap_or_else(|| resolve_path(&self.config_base_dir(context), "./.mk/vault"));
823    let mut secret_paths = context
824      .task_root
825      .secrets
826      .as_ref()
827      .and_then(|settings| settings.secrets_path.as_ref())
828      .into_iter()
829      .flatten()
830      .chain(
831        context
832          .secret_config
833          .as_ref()
834          .map(|config| config.secrets_path.iter())
835          .into_iter()
836          .flatten(),
837      )
838      .map(|secret_path| vault_location.join(secret_path))
839      .collect::<Vec<_>>();
840    secret_paths.sort();
841    secret_paths.dedup();
842    secret_paths
843  }
844}
845
846impl ExecutionMode {
847  fn is_parallel(&self) -> bool {
848    matches!(self, ExecutionMode::Parallel)
849  }
850}
851
852fn sorted_env_vars(env_vars: &HashMap<String, String>) -> Vec<(String, String)> {
853  let mut pairs: Vec<_> = env_vars
854    .iter()
855    .map(|(key, value)| (key.clone(), value.clone()))
856    .collect();
857  pairs.sort();
858  pairs
859}
860
861fn fingerprint_task_key(context: &TaskContext, task: &TaskArgs) -> String {
862  context.current_task_name.clone().unwrap_or_else(|| {
863    if !task.description.is_empty() {
864      task.description.clone()
865    } else {
866      format!("{:?}", task.commands)
867    }
868  })
869}
870
871fn stable_task_debug(task: &TaskArgs) -> String {
872  let mut labels: Vec<_> = task
873    .labels
874    .iter()
875    .map(|(key, value)| (key.clone(), value.clone()))
876    .collect();
877  labels.sort();
878
879  let mut environment: Vec<_> = task
880    .environment
881    .iter()
882    .map(|(key, value)| (key.clone(), value.clone()))
883    .collect();
884  environment.sort();
885
886  let mut secrets_path = task.secrets_path.clone();
887  secrets_path.sort();
888
889  format!(
890    "commands={:?};preconditions={:?};depends_on={:?};labels={:?};description={:?};environment={:?};env_file={:?};secrets_path={:?};vault_location={:?};keys_location={:?};key_name={:?};shell={:?};execution_mode={:?};max_parallel={:?};fail_fast={};cache_enabled={};inputs={:?};outputs={:?};ignore_errors={:?};verbose={:?}",
891    task.commands,
892    task.preconditions,
893    task.depends_on,
894    labels,
895    task.description,
896    environment,
897    task.env_file,
898    secrets_path,
899    task.vault_location,
900    task.keys_location,
901    task.key_name,
902    task.shell,
903    task.execution_mode(),
904    task.execution.as_ref().and_then(|execution| execution.max_parallel),
905    task.fail_fast(),
906    task.cache_enabled(),
907    task.inputs,
908    task.outputs,
909    task.ignore_errors,
910    task.verbose
911  )
912}
913
914#[cfg(test)]
915mod test {
916  use super::*;
917
918  #[test]
919  fn test_task_1() -> anyhow::Result<()> {
920    {
921      let yaml = "
922        commands:
923          - command: echo \"Hello, World!\"
924            ignore_errors: false
925            verbose: false
926        depends_on:
927          - name: task1
928        description: This is a task
929        environment:
930          FOO: bar
931        env_file:
932          - test.env
933          - test2.env
934      ";
935
936      let task = serde_yaml::from_str::<Task>(yaml)?;
937
938      if let Task::Task(task) = &task {
939        if let CommandRunner::LocalRun(local_run) = &task.commands[0] {
940          assert_eq!(local_run.command, "echo \"Hello, World!\"");
941          assert_eq!(local_run.work_dir, None);
942          assert_eq!(local_run.ignore_errors, Some(false));
943          assert_eq!(local_run.verbose, Some(false));
944        }
945
946        if let TaskDependency::TaskDependency(args) = &task.depends_on[0] {
947          assert_eq!(args.name, "task1");
948        }
949
950        assert_eq!(task.labels.len(), 0);
951        assert_eq!(task.description, "This is a task");
952        assert_eq!(task.environment.len(), 1);
953        assert_eq!(task.env_file.len(), 2);
954      } else {
955        panic!("Expected Task::Task");
956      }
957
958      Ok(())
959    }
960  }
961
962  #[test]
963  fn test_task_2() -> anyhow::Result<()> {
964    {
965      let yaml = "
966        commands:
967          - command: echo 'Hello, World!'
968            ignore_errors: false
969            verbose: false
970        description: This is a task
971        environment:
972          FOO: bar
973          BAR: foo
974      ";
975
976      let task = serde_yaml::from_str::<Task>(yaml)?;
977
978      if let Task::Task(task) = &task {
979        if let CommandRunner::LocalRun(local_run) = &task.commands[0] {
980          assert_eq!(local_run.command, "echo 'Hello, World!'");
981          assert_eq!(local_run.work_dir, None);
982          assert_eq!(local_run.ignore_errors, Some(false));
983          assert_eq!(local_run.verbose, Some(false));
984        }
985
986        assert_eq!(task.description, "This is a task");
987        assert_eq!(task.depends_on.len(), 0);
988        assert_eq!(task.labels.len(), 0);
989        assert_eq!(task.env_file.len(), 0);
990        assert_eq!(task.environment.len(), 2);
991      } else {
992        panic!("Expected Task::Task");
993      }
994
995      Ok(())
996    }
997  }
998
999  #[test]
1000  fn test_task_3() -> anyhow::Result<()> {
1001    {
1002      let yaml = "
1003        commands:
1004          - command: echo 'Hello, World!'
1005      ";
1006
1007      let task = serde_yaml::from_str::<Task>(yaml)?;
1008
1009      if let Task::Task(task) = &task {
1010        if let CommandRunner::LocalRun(local_run) = &task.commands[0] {
1011          assert_eq!(local_run.command, "echo 'Hello, World!'");
1012          assert_eq!(local_run.work_dir, None);
1013          assert_eq!(local_run.ignore_errors, None);
1014          assert_eq!(local_run.verbose, None);
1015        }
1016
1017        assert_eq!(task.description.len(), 0);
1018        assert_eq!(task.depends_on.len(), 0);
1019        assert_eq!(task.labels.len(), 0);
1020        assert_eq!(task.env_file.len(), 0);
1021        assert_eq!(task.environment.len(), 0);
1022      } else {
1023        panic!("Expected Task::Task");
1024      }
1025
1026      Ok(())
1027    }
1028  }
1029
1030  #[test]
1031  fn test_task_4() -> anyhow::Result<()> {
1032    {
1033      let yaml = "
1034        commands:
1035          - container_command:
1036              - echo
1037              - Hello, World!
1038            image: docker.io/library/hello-world:latest
1039      ";
1040
1041      let task = serde_yaml::from_str::<Task>(yaml)?;
1042
1043      if let Task::Task(task) = &task {
1044        if let CommandRunner::ContainerRun(container_run) = &task.commands[0] {
1045          assert_eq!(container_run.container_command.len(), 2);
1046          assert_eq!(container_run.container_command[0], "echo");
1047          assert_eq!(container_run.container_command[1], "Hello, World!");
1048          assert_eq!(container_run.image, "docker.io/library/hello-world:latest");
1049          assert_eq!(container_run.mounted_paths, Vec::<String>::new());
1050          assert_eq!(container_run.ignore_errors, None);
1051          assert_eq!(container_run.verbose, None);
1052        }
1053
1054        assert_eq!(task.description.len(), 0);
1055        assert_eq!(task.depends_on.len(), 0);
1056        assert_eq!(task.labels.len(), 0);
1057        assert_eq!(task.env_file.len(), 0);
1058        assert_eq!(task.environment.len(), 0);
1059      } else {
1060        panic!("Expected Task::Task");
1061      }
1062
1063      Ok(())
1064    }
1065  }
1066
1067  #[test]
1068  fn test_task_5() -> anyhow::Result<()> {
1069    {
1070      let yaml = "
1071        commands:
1072          - container_command:
1073              - echo
1074              - Hello, World!
1075            image: docker.io/library/hello-world:latest
1076            mounted_paths:
1077              - /tmp
1078              - /var/tmp
1079      ";
1080
1081      let task = serde_yaml::from_str::<Task>(yaml)?;
1082
1083      if let Task::Task(task) = &task {
1084        if let CommandRunner::ContainerRun(container_run) = &task.commands[0] {
1085          assert_eq!(container_run.container_command.len(), 2);
1086          assert_eq!(container_run.container_command[0], "echo");
1087          assert_eq!(container_run.container_command[1], "Hello, World!");
1088          assert_eq!(container_run.image, "docker.io/library/hello-world:latest");
1089          assert_eq!(container_run.mounted_paths, vec!["/tmp", "/var/tmp"]);
1090          assert_eq!(container_run.ignore_errors, None);
1091          assert_eq!(container_run.verbose, None);
1092        }
1093
1094        assert_eq!(task.description.len(), 0);
1095        assert_eq!(task.depends_on.len(), 0);
1096        assert_eq!(task.labels.len(), 0);
1097        assert_eq!(task.env_file.len(), 0);
1098        assert_eq!(task.environment.len(), 0);
1099      } else {
1100        panic!("Expected Task::Task");
1101      }
1102
1103      Ok(())
1104    }
1105  }
1106
1107  #[test]
1108  fn test_task_6() -> anyhow::Result<()> {
1109    {
1110      let yaml = "
1111        commands:
1112          - container_command:
1113              - echo
1114              - Hello, World!
1115            image: docker.io/library/hello-world:latest
1116            mounted_paths:
1117              - /tmp
1118              - /var/tmp
1119            ignore_errors: true
1120      ";
1121
1122      let task = serde_yaml::from_str::<Task>(yaml)?;
1123
1124      if let Task::Task(task) = &task {
1125        if let CommandRunner::ContainerRun(container_run) = &task.commands[0] {
1126          assert_eq!(container_run.container_command.len(), 2);
1127          assert_eq!(container_run.container_command[0], "echo");
1128          assert_eq!(container_run.container_command[1], "Hello, World!");
1129          assert_eq!(container_run.image, "docker.io/library/hello-world:latest");
1130          assert_eq!(container_run.mounted_paths, vec!["/tmp", "/var/tmp"]);
1131          assert_eq!(container_run.ignore_errors, Some(true));
1132          assert_eq!(container_run.verbose, None);
1133        }
1134
1135        assert_eq!(task.description.len(), 0);
1136        assert_eq!(task.depends_on.len(), 0);
1137        assert_eq!(task.labels.len(), 0);
1138        assert_eq!(task.env_file.len(), 0);
1139        assert_eq!(task.environment.len(), 0);
1140      } else {
1141        panic!("Expected Task::Task");
1142      }
1143
1144      Ok(())
1145    }
1146  }
1147
1148  #[test]
1149  fn test_task_7() -> anyhow::Result<()> {
1150    {
1151      let yaml = "
1152        commands:
1153          - container_command:
1154              - echo
1155              - Hello, World!
1156            image: docker.io/library/hello-world:latest
1157            verbose: false
1158      ";
1159
1160      let task = serde_yaml::from_str::<Task>(yaml)?;
1161
1162      if let Task::Task(task) = &task {
1163        if let CommandRunner::ContainerRun(container_run) = &task.commands[0] {
1164          assert_eq!(container_run.container_command.len(), 2);
1165          assert_eq!(container_run.container_command[0], "echo");
1166          assert_eq!(container_run.container_command[1], "Hello, World!");
1167          assert_eq!(container_run.image, "docker.io/library/hello-world:latest");
1168          assert_eq!(container_run.mounted_paths, Vec::<String>::new());
1169          assert_eq!(container_run.ignore_errors, None);
1170          assert_eq!(container_run.verbose, Some(false));
1171        }
1172
1173        assert_eq!(task.description.len(), 0);
1174        assert_eq!(task.depends_on.len(), 0);
1175        assert_eq!(task.labels.len(), 0);
1176        assert_eq!(task.env_file.len(), 0);
1177        assert_eq!(task.environment.len(), 0);
1178      } else {
1179        panic!("Expected Task::Task");
1180      }
1181
1182      Ok(())
1183    }
1184  }
1185
1186  #[test]
1187  fn test_task_8() -> anyhow::Result<()> {
1188    {
1189      let yaml = "
1190        commands:
1191          - task: task1
1192      ";
1193
1194      let task = serde_yaml::from_str::<Task>(yaml)?;
1195
1196      if let Task::Task(task) = &task {
1197        if let CommandRunner::TaskRun(task_run) = &task.commands[0] {
1198          assert_eq!(task_run.task, "task1");
1199          assert_eq!(task_run.ignore_errors, None);
1200          assert_eq!(task_run.verbose, None);
1201        }
1202
1203        assert_eq!(task.description.len(), 0);
1204        assert_eq!(task.depends_on.len(), 0);
1205        assert_eq!(task.labels.len(), 0);
1206        assert_eq!(task.env_file.len(), 0);
1207        assert_eq!(task.environment.len(), 0);
1208      } else {
1209        panic!("Expected Task::Task");
1210      }
1211
1212      Ok(())
1213    }
1214  }
1215
1216  #[test]
1217  fn test_task_9() -> anyhow::Result<()> {
1218    {
1219      let yaml = "
1220        commands:
1221          - task: task1
1222            verbose: true
1223      ";
1224
1225      let task = serde_yaml::from_str::<Task>(yaml)?;
1226
1227      if let Task::Task(task) = &task {
1228        if let CommandRunner::TaskRun(task_run) = &task.commands[0] {
1229          assert_eq!(task_run.task, "task1");
1230          assert_eq!(task_run.ignore_errors, None);
1231          assert_eq!(task_run.verbose, Some(true));
1232        }
1233
1234        assert_eq!(task.description.len(), 0);
1235        assert_eq!(task.depends_on.len(), 0);
1236        assert_eq!(task.labels.len(), 0);
1237        assert_eq!(task.env_file.len(), 0);
1238        assert_eq!(task.environment.len(), 0);
1239      } else {
1240        panic!("Expected Task::Task");
1241      }
1242
1243      Ok(())
1244    }
1245  }
1246
1247  #[test]
1248  fn test_task_10() -> anyhow::Result<()> {
1249    {
1250      let yaml = "
1251        commands:
1252          - task: task1
1253            ignore_errors: true
1254      ";
1255
1256      let task = serde_yaml::from_str::<Task>(yaml)?;
1257
1258      if let Task::Task(task) = &task {
1259        if let CommandRunner::TaskRun(task_run) = &task.commands[0] {
1260          assert_eq!(task_run.task, "task1");
1261          assert_eq!(task_run.ignore_errors, Some(true));
1262          assert_eq!(task_run.verbose, None);
1263        }
1264
1265        assert_eq!(task.description.len(), 0);
1266        assert_eq!(task.depends_on.len(), 0);
1267        assert_eq!(task.labels.len(), 0);
1268        assert_eq!(task.env_file.len(), 0);
1269        assert_eq!(task.environment.len(), 0);
1270      } else {
1271        panic!("Expected Task::Task");
1272      }
1273
1274      Ok(())
1275    }
1276  }
1277
1278  #[test]
1279  fn test_task_11() -> anyhow::Result<()> {
1280    {
1281      let yaml = "
1282        echo 'Hello, World!'
1283      ";
1284
1285      let task = serde_yaml::from_str::<Task>(yaml)?;
1286
1287      if let Task::String(task) = &task {
1288        assert_eq!(task, "echo 'Hello, World!'");
1289      } else {
1290        panic!("Expected Task::String");
1291      }
1292
1293      Ok(())
1294    }
1295  }
1296
1297  #[test]
1298  fn test_task_12() -> anyhow::Result<()> {
1299    {
1300      let yaml = "
1301        'true'
1302      ";
1303
1304      let task = serde_yaml::from_str::<Task>(yaml)?;
1305
1306      if let Task::String(task) = &task {
1307        assert_eq!(task, "true");
1308      } else {
1309        panic!("Expected Task::String");
1310      }
1311
1312      Ok(())
1313    }
1314  }
1315
1316  #[test]
1317  fn test_task_13() -> anyhow::Result<()> {
1318    {
1319      let yaml = "
1320        commands: []
1321        environment:
1322          FOO: bar
1323          BAR: foo
1324          KEY: 42
1325          PIS: 3.14
1326      ";
1327
1328      let task = serde_yaml::from_str::<Task>(yaml)?;
1329
1330      if let Task::Task(task) = &task {
1331        assert_eq!(task.environment.len(), 4);
1332        assert_eq!(task.environment.get("FOO").unwrap(), "bar");
1333        assert_eq!(task.environment.get("BAR").unwrap(), "foo");
1334        assert_eq!(task.environment.get("KEY").unwrap(), "42");
1335      } else {
1336        panic!("Expected Task::Task");
1337      }
1338
1339      Ok(())
1340    }
1341  }
1342
1343  #[test]
1344  fn test_task_14() -> anyhow::Result<()> {
1345    let yaml = "
1346      commands: []
1347      secrets_path:
1348        - app/common
1349      vault_location: ./.mk/vault
1350      keys_location: ./.mk/keys
1351      key_name: team
1352      environment:
1353        SECRET_VALUE: ${{ secrets.app/password }}
1354    ";
1355
1356    let task = serde_yaml::from_str::<Task>(yaml)?;
1357
1358    if let Task::Task(task) = &task {
1359      assert_eq!(task.secrets_path, vec!["app/common"]);
1360      assert_eq!(task.vault_location.as_deref(), Some("./.mk/vault"));
1361      assert_eq!(task.keys_location.as_deref(), Some("./.mk/keys"));
1362      assert_eq!(task.key_name.as_deref(), Some("team"));
1363      assert_eq!(
1364        task.environment.get("SECRET_VALUE").map(String::as_str),
1365        Some("${{ secrets.app/password }}")
1366      );
1367    } else {
1368      panic!("Expected Task::Task");
1369    }
1370
1371    Ok(())
1372  }
1373
1374  #[test]
1375  fn test_parallel_interactive_rejected() -> anyhow::Result<()> {
1376    let yaml = r#"
1377          commands:
1378            - command: "echo hello"
1379              interactive: true
1380            - command: "echo world"
1381          parallel: true
1382      "#;
1383
1384    let task = serde_yaml::from_str::<Task>(yaml)?;
1385    let mut context = TaskContext::empty();
1386
1387    if let Task::Task(task) = task {
1388      let result = task.run(&mut context);
1389      assert!(result.is_err());
1390      assert!(result
1391        .unwrap_err()
1392        .to_string()
1393        .contains("Interactive local commands cannot be run in parallel"));
1394    }
1395
1396    Ok(())
1397  }
1398
1399  #[test]
1400  fn test_parallel_non_interactive_accepted() -> anyhow::Result<()> {
1401    let yaml = r#"
1402          commands:
1403            - command: "echo hello"
1404              interactive: false
1405            - command: "echo world"
1406          parallel: true
1407      "#;
1408
1409    let task = serde_yaml::from_str::<Task>(yaml)?;
1410    let mut context = TaskContext::empty();
1411
1412    if let Task::Task(task) = task {
1413      let result = task.run(&mut context);
1414      assert!(result.is_ok());
1415    }
1416
1417    Ok(())
1418  }
1419
1420  #[test]
1421  fn test_parallel_retrigger_rejected() -> anyhow::Result<()> {
1422    let yaml = r#"
1423          commands:
1424            - command: "go run ."
1425              retrigger: true
1426          parallel: true
1427      "#;
1428
1429    let task = serde_yaml::from_str::<Task>(yaml)?;
1430    let mut context = TaskContext::empty();
1431
1432    if let Task::Task(task) = task {
1433      let result = task.run(&mut context);
1434      assert!(result.is_err());
1435      assert!(result
1436        .unwrap_err()
1437        .to_string()
1438        .contains("Local commands with `retrigger: true` cannot be run in parallel"));
1439    }
1440
1441    Ok(())
1442  }
1443}