Skip to main content

mk_lib/schema/
task.rs

1use hashbrown::HashMap;
2use indicatif::{
3  HumanDuration,
4  ProgressBar,
5  ProgressStyle,
6};
7use rand::Rng as _;
8use serde::{
9  Deserialize,
10  Serialize,
11};
12
13use std::io::BufRead as _;
14use std::sync::mpsc::{
15  channel,
16  Receiver,
17  Sender,
18};
19use std::thread;
20use std::time::{
21  Duration,
22  Instant,
23};
24
25use super::{
26  contains_output_reference,
27  extract_output_references,
28  interpolate_template_string,
29  is_shell_command,
30  CommandRunner,
31  Precondition,
32  Shell,
33  TaskContext,
34  TaskDependency,
35};
36use crate::cache::{
37  compute_fingerprint,
38  expand_patterns_in_dir,
39  CacheEntry,
40};
41use crate::defaults::default_verbose;
42use crate::run_shell_command;
43use crate::secrets::load_secret_env;
44use crate::utils::{
45  deserialize_environment,
46  load_env_files_in_dir,
47  resolve_path,
48};
49
50fn default_cache_enabled() -> bool {
51  true
52}
53
54fn default_fail_fast() -> bool {
55  true
56}
57
58#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
59#[serde(rename_all = "snake_case")]
60pub enum ExecutionMode {
61  Sequential,
62  Parallel,
63}
64
65#[derive(Debug, Clone, Deserialize, Serialize)]
66pub struct TaskExecution {
67  #[serde(default)]
68  pub mode: Option<ExecutionMode>,
69
70  #[serde(default)]
71  pub max_parallel: Option<usize>,
72
73  #[serde(default = "default_fail_fast")]
74  pub fail_fast: bool,
75}
76
77#[derive(Debug, Clone, Deserialize, Serialize)]
78pub struct TaskCache {
79  #[serde(default = "default_cache_enabled")]
80  pub enabled: bool,
81}
82
83/// This struct represents a task that can be executed. A task can contain multiple
84/// commands that are executed sequentially. A task can also have preconditions that
85/// must be met before the task can be executed.
86#[derive(Debug, Default, Deserialize)]
87pub struct TaskArgs {
88  /// The commands to run
89  pub commands: Vec<CommandRunner>,
90
91  /// The preconditions that must be met before the task can be executed
92  #[serde(default)]
93  pub preconditions: Vec<Precondition>,
94
95  /// The tasks that must be executed before this task can be executed
96  #[serde(default)]
97  pub depends_on: Vec<TaskDependency>,
98
99  /// The labels for the task
100  #[serde(default)]
101  pub labels: HashMap<String, String>,
102
103  /// The description of the task
104  #[serde(default)]
105  pub description: String,
106
107  /// The environment variables to set before running the task
108  #[serde(default, deserialize_with = "deserialize_environment")]
109  pub environment: HashMap<String, String>,
110
111  /// The environment files to load before running the task
112  #[serde(default)]
113  pub env_file: Vec<String>,
114
115  /// Secret paths to load as dotenv-style environment entries before running the task
116  #[serde(default)]
117  pub secrets_path: Vec<String>,
118
119  /// The path to the secret vault
120  #[serde(default)]
121  pub vault_location: Option<String>,
122
123  /// The path to the private keys used for secret decryption
124  #[serde(default)]
125  pub keys_location: Option<String>,
126
127  /// The key name to use for secret decryption
128  #[serde(default)]
129  pub key_name: Option<String>,
130
131  /// GPG key ID or fingerprint for hardware/passphrase-protected keys (delegates to system gpg)
132  #[serde(default)]
133  pub gpg_key_id: Option<String>,
134
135  /// The shell to use when running the task
136  #[serde(default)]
137  pub shell: Option<Shell>,
138
139  /// Run the commands in parallel
140  /// It should only work if the task are local_run commands
141  #[serde(default)]
142  pub parallel: Option<bool>,
143
144  /// Richer execution configuration
145  #[serde(default)]
146  pub execution: Option<TaskExecution>,
147
148  /// Task caching configuration
149  #[serde(default)]
150  pub cache: Option<TaskCache>,
151
152  /// Files or glob patterns that affect task output
153  #[serde(default)]
154  pub inputs: Vec<String>,
155
156  /// Files produced by the task
157  #[serde(default)]
158  pub outputs: Vec<String>,
159
160  /// Ignore errors if the task fails
161  #[serde(default)]
162  pub ignore_errors: Option<bool>,
163
164  /// Show verbose output
165  #[serde(default)]
166  pub verbose: Option<bool>,
167}
168
169#[derive(Debug, Deserialize)]
170#[serde(untagged)]
171pub enum Task {
172  String(String),
173  Task(Box<TaskArgs>),
174}
175
176#[derive(Debug)]
177pub struct CommandResult {
178  index: usize,
179  success: bool,
180  message: String,
181}
182
183impl Task {
184  pub fn run(&self, context: &mut TaskContext) -> anyhow::Result<()> {
185    match self {
186      Task::String(command) => self.execute(context, command),
187      Task::Task(args) => args.run(context),
188    }
189  }
190
191  fn execute(&self, context: &mut TaskContext, command: &str) -> anyhow::Result<()> {
192    assert!(!command.is_empty());
193
194    TaskArgs {
195      commands: vec![CommandRunner::CommandRun(command.to_string())],
196      ..Default::default()
197    }
198    .run(context)
199  }
200}
201
202impl TaskArgs {
203  pub fn run(&self, context: &mut TaskContext) -> anyhow::Result<()> {
204    assert!(!self.commands.is_empty());
205
206    // Validate parallel execution requirements early
207    self.validate_parallel_commands()?;
208
209    let started = Instant::now();
210    let tick_interval = Duration::from_millis(80);
211
212    if let Some(shell) = &self.shell {
213      context.set_shell(shell);
214    }
215
216    if let Some(ignore_errors) = &self.ignore_errors {
217      context.set_ignore_errors(*ignore_errors);
218    }
219
220    if let Some(verbose) = &self.verbose {
221      context.set_verbose(*verbose);
222    }
223
224    if !context.is_nested {
225      if let Some(vault_location) = &context.task_root.vault_location {
226        context.set_secret_vault_location(vault_location.clone());
227      }
228
229      if let Some(keys_location) = &context.task_root.keys_location {
230        context.set_secret_keys_location(keys_location.clone());
231      }
232
233      if let Some(key_name) = &context.task_root.key_name {
234        context.set_secret_key_name(key_name.clone());
235      }
236
237      if let Some(gpg_key_id) = &context.task_root.gpg_key_id {
238        context.set_secret_gpg_key_id(gpg_key_id.clone());
239      }
240    }
241
242    if let Some(vault_location) = &self.vault_location {
243      context.set_secret_vault_location(vault_location.clone());
244    }
245
246    if let Some(keys_location) = &self.keys_location {
247      context.set_secret_keys_location(keys_location.clone());
248    }
249
250    if let Some(key_name) = &self.key_name {
251      context.set_secret_key_name(key_name.clone());
252    }
253
254    if let Some(gpg_key_id) = &self.gpg_key_id {
255      context.set_secret_gpg_key_id(gpg_key_id.clone());
256    }
257
258    // Load environment variables from root and task environments and env files.
259    if !context.is_nested {
260      let config_base_dir = self.config_base_dir(context);
261      let root_env = context.task_root.environment.clone();
262      let root_env_files = load_env_files_in_dir(&context.task_root.env_file, &config_base_dir)?;
263      let root_secret_env = load_secret_env(
264        &context.task_root.secrets_path,
265        &config_base_dir,
266        context.secret_vault_location.as_deref(),
267        context.secret_keys_location.as_deref(),
268        context.secret_key_name.as_deref(),
269        context.secret_gpg_key_id.as_deref(),
270      )?;
271      context.extend_env_vars(root_env);
272      context.extend_env_vars(root_env_files);
273      context.extend_env_vars(root_secret_env);
274    }
275
276    // Load environment variables from the task environment and env files field
277    let defined_env = self.load_static_env(context)?;
278    let additional_env = self.load_env_file(context)?;
279    let secret_env = self.load_secret_env(context)?;
280
281    context.extend_env_vars(defined_env);
282    context.extend_env_vars(additional_env);
283    context.extend_env_vars(secret_env);
284
285    if self.should_skip_from_cache(context)? {
286      context.emit_event(&serde_json::json!({
287        "event": "task_skipped",
288        "task": context.current_task_name.clone().unwrap_or_else(|| "<task>".to_string()),
289        "reason": "cache_hit",
290      }))?;
291      return Ok(());
292    }
293
294    let mut rng = rand::thread_rng();
295    // Spinners can be found here:
296    // https://github.com/sindresorhus/cli-spinners/blob/main/spinners.json
297    let pb_style =
298      ProgressStyle::with_template("{spinner:.green} [{prefix:.bold.dim}] {wide_msg:.cyan/blue} ")?
299        .tick_chars("⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏⦿");
300
301    let depends_on_pb = context.multi.add(ProgressBar::new(self.depends_on.len() as u64));
302
303    if !self.depends_on.is_empty() {
304      depends_on_pb.set_style(pb_style.clone());
305      depends_on_pb.set_message("Running task dependencies...");
306      depends_on_pb.enable_steady_tick(tick_interval);
307      for (i, dependency) in self.depends_on.iter().enumerate() {
308        thread::sleep(Duration::from_millis(rng.gen_range(40..300)));
309        depends_on_pb.set_prefix(format!("{}/{}", i + 1, self.depends_on.len()));
310        dependency.run(context)?;
311        depends_on_pb.inc(1);
312      }
313
314      let message = format!("Dependencies completed in {}.", HumanDuration(started.elapsed()));
315      if context.is_nested {
316        depends_on_pb.finish_and_clear();
317      } else {
318        depends_on_pb.finish_with_message(message);
319      }
320    }
321
322    let precondition_pb = context
323      .multi
324      .add(ProgressBar::new(self.preconditions.len() as u64));
325
326    if !self.preconditions.is_empty() {
327      precondition_pb.set_style(pb_style.clone());
328      precondition_pb.set_message("Running task precondition...");
329      precondition_pb.enable_steady_tick(tick_interval);
330      for (i, precondition) in self.preconditions.iter().enumerate() {
331        thread::sleep(Duration::from_millis(rng.gen_range(40..300)));
332        precondition_pb.set_prefix(format!("{}/{}", i + 1, self.preconditions.len()));
333        precondition.execute(context)?;
334        precondition_pb.inc(1);
335      }
336
337      let message = format!("Preconditions completed in {}.", HumanDuration(started.elapsed()));
338      if context.is_nested {
339        precondition_pb.finish_and_clear();
340      } else {
341        precondition_pb.finish_with_message(message);
342      }
343    }
344
345    if self.is_parallel() {
346      self.execute_commands_parallel(context)?;
347    } else {
348      let command_pb = context.multi.add(ProgressBar::new(self.commands.len() as u64));
349      command_pb.set_style(pb_style);
350      command_pb.set_message("Running task command...");
351      command_pb.enable_steady_tick(tick_interval);
352      for (i, command) in self.commands.iter().enumerate() {
353        thread::sleep(Duration::from_millis(rng.gen_range(100..400)));
354        command_pb.set_prefix(format!("{}/{}", i + 1, self.commands.len()));
355        self.refresh_output_env(context)?;
356        command.execute(context)?;
357        command_pb.inc(1);
358      }
359
360      let message = format!("Commands completed in {}.", HumanDuration(started.elapsed()));
361      if context.is_nested {
362        command_pb.finish_and_clear();
363      } else {
364        command_pb.finish_with_message(message);
365      }
366    }
367
368    self.update_cache(context)?;
369
370    Ok(())
371  }
372
373  /// Validate if the task can be run in parallel
374  fn validate_parallel_commands(&self) -> anyhow::Result<()> {
375    if !self.is_parallel() {
376      return Ok(());
377    }
378
379    for command in &self.commands {
380      match command {
381        CommandRunner::LocalRun(local_run) if local_run.is_parallel_safe() => continue,
382        CommandRunner::LocalRun(local_run) if local_run.interactive_enabled() => {
383          return Err(anyhow::anyhow!(
384            "Interactive local commands cannot be run in parallel"
385          ))
386        },
387        CommandRunner::LocalRun(_) => {
388          return Err(anyhow::anyhow!(
389            "Local commands with `retrigger: true` cannot be run in parallel"
390          ))
391        },
392        _ => {
393          return Err(anyhow::anyhow!(
394            "Parallel execution is only supported for non-interactive local commands"
395          ))
396        },
397      }
398    }
399    Ok(())
400  }
401
402  /// Execute the commands in parallel
403  fn execute_commands_parallel(&self, context: &TaskContext) -> anyhow::Result<()> {
404    let (tx, rx): (Sender<CommandResult>, Receiver<CommandResult>) = channel();
405    let mut handles = vec![];
406    let command_count = self.commands.len();
407    let max_parallel = self.max_parallel().min(command_count.max(1));
408    let fail_fast = self.fail_fast();
409    let command_pb = context.multi.add(ProgressBar::new(command_count as u64));
410    command_pb.set_style(ProgressStyle::with_template(
411      "{spinner:.green} [{prefix:.bold.dim}] {wide_msg:.cyan/blue} ",
412    )?);
413    command_pb.set_prefix("?/?");
414    command_pb.set_message("Running task commands in parallel...");
415    command_pb.enable_steady_tick(Duration::from_millis(80));
416    let mut failures = Vec::new();
417
418    // Clone all commands upfront to avoid borrowing issues
419    let commands: Vec<_> = self.commands.to_vec();
420
421    // Track results in order
422    let mut completed = 0;
423
424    let mut iter = commands.into_iter().enumerate();
425    let mut running = 0usize;
426    let mut stop_scheduling = false;
427
428    while completed < command_count {
429      while !stop_scheduling && running < max_parallel {
430        let Some((i, command)) = iter.next() else {
431          break;
432        };
433
434        let tx = tx.clone();
435        let context = context.clone();
436
437        let handle = thread::spawn(move || {
438          let result = match command.execute(&context) {
439            Ok(_) => CommandResult {
440              index: i,
441              success: true,
442              message: format!("Command {} completed successfully", i + 1),
443            },
444            Err(e) => CommandResult {
445              index: i,
446              success: false,
447              message: format!("Command {} failed: {}", i + 1, e),
448            },
449          };
450          tx.send(result).unwrap();
451        });
452
453        handles.push(handle);
454        running += 1;
455      }
456
457      if running == 0 {
458        break;
459      }
460
461      match rx.recv() {
462        Ok(result) => {
463          running -= 1;
464          let index = result.index;
465          if !result.success && !context.ignore_errors() {
466            failures.push(result.message);
467            if fail_fast {
468              stop_scheduling = true;
469            }
470          }
471
472          completed += 1;
473          command_pb.set_prefix(format!("{}/{}", completed, command_count));
474          command_pb.inc(1);
475
476          command_pb.set_message(format!(
477            "Running task commands in parallel (completed {})",
478            index + 1
479          ));
480        },
481        Err(e) => {
482          command_pb.finish_with_message("Error receiving command results");
483          return Err(anyhow::anyhow!("Channel error: {}", e));
484        },
485      }
486    }
487
488    // Wait for all threads to complete
489    for handle in handles {
490      handle.join().unwrap();
491    }
492
493    if !failures.is_empty() {
494      command_pb.finish_with_message("Some commands failed");
495
496      // Sort failures by command index for clearer error reporting
497      failures.sort();
498      return Err(anyhow::anyhow!("Failed commands:\n{}", failures.join("\n")));
499    }
500
501    let message = "Commands completed in parallel";
502    if context.is_nested {
503      command_pb.finish_and_clear();
504    } else {
505      command_pb.finish_with_message(message);
506    }
507
508    Ok(())
509  }
510
511  fn load_static_env(&self, context: &TaskContext) -> anyhow::Result<HashMap<String, String>> {
512    let mut local_env: HashMap<String, String> = HashMap::new();
513    for (key, value) in &self.environment {
514      if contains_output_reference(value) {
515        continue;
516      }
517      let value = self.get_env_value(context, value)?;
518      local_env.insert(key.clone(), value);
519    }
520
521    Ok(local_env)
522  }
523
524  fn refresh_output_env(&self, context: &mut TaskContext) -> anyhow::Result<()> {
525    let mut updated_env = HashMap::new();
526
527    for (key, value) in &self.environment {
528      let output_refs = extract_output_references(value);
529      if output_refs.is_empty() {
530        continue;
531      }
532
533      let mut all_outputs_ready = true;
534      for output_name in &output_refs {
535        if !context.has_task_output(output_name)? {
536          all_outputs_ready = false;
537          break;
538        }
539      }
540      if !all_outputs_ready {
541        continue;
542      }
543
544      updated_env.insert(key.clone(), self.get_env_value(context, value)?);
545    }
546
547    context.extend_env_vars(updated_env);
548    Ok(())
549  }
550
551  fn load_env_file(&self, context: &TaskContext) -> anyhow::Result<HashMap<String, String>> {
552    load_env_files_in_dir(&self.env_file, &self.config_base_dir(context))
553  }
554
555  fn load_secret_env(&self, context: &TaskContext) -> anyhow::Result<HashMap<String, String>> {
556    load_secret_env(
557      &self.secrets_path,
558      &self.config_base_dir(context),
559      context.secret_vault_location.as_deref(),
560      context.secret_keys_location.as_deref(),
561      context.secret_key_name.as_deref(),
562      context.secret_gpg_key_id.as_deref(),
563    )
564  }
565
566  fn get_env_value(&self, context: &TaskContext, value_in: &str) -> anyhow::Result<String> {
567    if is_shell_command(value_in)? {
568      let verbose = self.verbose();
569      let mut cmd = self
570        .shell
571        .as_ref()
572        .map(|shell| shell.proc())
573        .unwrap_or_else(|| context.shell().proc());
574      let output = run_shell_command!(value_in, cmd, verbose);
575      Ok(output)
576    } else if super::is_template_command(value_in)? || contains_output_reference(value_in) {
577      Ok(interpolate_template_string(value_in, context)?)
578    } else {
579      Ok(value_in.to_string())
580    }
581  }
582
583  fn verbose(&self) -> bool {
584    self.verbose.unwrap_or(default_verbose())
585  }
586
587  pub(crate) fn execution_mode(&self) -> ExecutionMode {
588    self
589      .execution
590      .as_ref()
591      .and_then(|execution| execution.mode.clone())
592      .or_else(|| {
593        self.parallel.map(|parallel| {
594          if parallel {
595            ExecutionMode::Parallel
596          } else {
597            ExecutionMode::Sequential
598          }
599        })
600      })
601      .unwrap_or(ExecutionMode::Sequential)
602  }
603
604  pub(crate) fn is_parallel(&self) -> bool {
605    self.execution_mode().is_parallel()
606  }
607
608  pub(crate) fn max_parallel(&self) -> usize {
609    self
610      .execution
611      .as_ref()
612      .and_then(|execution| execution.max_parallel)
613      .unwrap_or(self.commands.len().max(1))
614  }
615
616  pub(crate) fn fail_fast(&self) -> bool {
617    self
618      .execution
619      .as_ref()
620      .map(|execution| execution.fail_fast)
621      .unwrap_or(true)
622  }
623
624  fn cache_enabled(&self) -> bool {
625    self.cache.as_ref().map(|cache| cache.enabled).unwrap_or(false)
626  }
627
628  fn should_skip_from_cache(&self, context: &TaskContext) -> anyhow::Result<bool> {
629    if context.force || !self.cache_enabled() || self.outputs.is_empty() {
630      return Ok(false);
631    }
632
633    let resolved_outputs = self.resolve_output_paths(context)?;
634    let outputs_exist = self
635      .resolve_output_paths(context)?
636      .iter()
637      .all(|output| output.exists());
638    if !outputs_exist {
639      return Ok(false);
640    }
641
642    let env_vars = sorted_env_vars(&context.env_vars);
643    let inputs = self.resolve_input_paths(context)?;
644    let mut env_files = self.resolve_env_file_paths(context);
645    env_files.extend(self.resolve_secret_paths(context));
646    env_files.sort();
647    env_files.dedup();
648    let fingerprint = compute_fingerprint(
649      &context
650        .current_task_name
651        .clone()
652        .unwrap_or_else(|| "<task>".to_string()),
653      &stable_task_debug(self),
654      &env_vars,
655      &inputs,
656      &env_files,
657      &resolved_outputs,
658    )?;
659
660    let store = context
661      .cache_store
662      .lock()
663      .map_err(|e| anyhow::anyhow!("Failed to lock cache store - {}", e))?;
664    let Some(entry) = store.tasks.get(&fingerprint_task_key(context, self)) else {
665      return Ok(false);
666    };
667
668    Ok(entry.fingerprint == fingerprint)
669  }
670
671  fn update_cache(&self, context: &TaskContext) -> anyhow::Result<()> {
672    if !self.cache_enabled() || self.outputs.is_empty() {
673      return Ok(());
674    }
675
676    let env_vars = sorted_env_vars(&context.env_vars);
677    let inputs = self.resolve_input_paths(context)?;
678    let mut env_files = self.resolve_env_file_paths(context);
679    env_files.extend(self.resolve_secret_paths(context));
680    env_files.sort();
681    env_files.dedup();
682    let resolved_outputs = self.resolve_output_paths(context)?;
683    let fingerprint = compute_fingerprint(
684      &context
685        .current_task_name
686        .clone()
687        .unwrap_or_else(|| "<task>".to_string()),
688      &stable_task_debug(self),
689      &env_vars,
690      &inputs,
691      &env_files,
692      &resolved_outputs,
693    )?;
694    let key = fingerprint_task_key(context, self);
695
696    {
697      let mut store = context
698        .cache_store
699        .lock()
700        .map_err(|e| anyhow::anyhow!("Failed to lock cache store - {}", e))?;
701      store.tasks.insert(
702        key,
703        CacheEntry {
704          fingerprint,
705          outputs: resolved_outputs
706            .iter()
707            .map(|path| path.to_string_lossy().into_owned())
708            .collect(),
709          updated_at: chrono::Utc::now().to_rfc3339(),
710        },
711      );
712      store.save_in_dir(&context.task_root.cache_base_dir())?;
713    }
714
715    Ok(())
716  }
717
718  pub(crate) fn config_base_dir_from_root(&self, root: &super::TaskRoot) -> std::path::PathBuf {
719    root.config_base_dir()
720  }
721
722  pub(crate) fn task_base_dir_from_root(&self, root: &super::TaskRoot) -> std::path::PathBuf {
723    let config_base_dir = self.config_base_dir_from_root(root);
724    let mut work_dirs = self
725      .commands
726      .iter()
727      .filter_map(|command| match command {
728        CommandRunner::LocalRun(local_run) => local_run.work_dir.as_ref(),
729        _ => None,
730      })
731      .map(|work_dir| resolve_path(&config_base_dir, work_dir))
732      .collect::<Vec<_>>();
733
734    work_dirs.sort();
735    work_dirs.dedup();
736
737    if work_dirs.len() == 1 {
738      work_dirs.remove(0)
739    } else {
740      config_base_dir
741    }
742  }
743
744  fn config_base_dir(&self, context: &TaskContext) -> std::path::PathBuf {
745    self.config_base_dir_from_root(&context.task_root)
746  }
747
748  fn task_base_dir(&self, context: &TaskContext) -> std::path::PathBuf {
749    self.task_base_dir_from_root(&context.task_root)
750  }
751
752  fn resolve_input_paths(&self, context: &TaskContext) -> anyhow::Result<Vec<std::path::PathBuf>> {
753    expand_patterns_in_dir(&self.task_base_dir(context), &self.inputs)
754  }
755
756  fn resolve_output_paths(&self, context: &TaskContext) -> anyhow::Result<Vec<std::path::PathBuf>> {
757    let base_dir = self.task_base_dir(context);
758    Ok(
759      self
760        .outputs
761        .iter()
762        .map(|output| resolve_path(&base_dir, output))
763        .collect(),
764    )
765  }
766
767  fn resolve_env_file_paths(&self, context: &TaskContext) -> Vec<std::path::PathBuf> {
768    let config_base_dir = self.config_base_dir(context);
769    let mut env_files = context
770      .task_root
771      .env_file
772      .iter()
773      .chain(self.env_file.iter())
774      .map(|env_file| resolve_path(&config_base_dir, env_file))
775      .collect::<Vec<_>>();
776    env_files.sort();
777    env_files.dedup();
778    env_files
779  }
780
781  fn resolve_secret_paths(&self, context: &TaskContext) -> Vec<std::path::PathBuf> {
782    let config_base_dir = self.config_base_dir(context);
783    let vault_location = context
784      .secret_vault_location
785      .as_deref()
786      .map(|path| resolve_path(&config_base_dir, path))
787      .unwrap_or_else(|| resolve_path(&config_base_dir, "./.mk/vault"));
788    let mut secret_paths = context
789      .task_root
790      .secrets_path
791      .iter()
792      .chain(self.secrets_path.iter())
793      .map(|secret_path| vault_location.join(secret_path))
794      .collect::<Vec<_>>();
795    secret_paths.sort();
796    secret_paths.dedup();
797    secret_paths
798  }
799}
800
801impl ExecutionMode {
802  fn is_parallel(&self) -> bool {
803    matches!(self, ExecutionMode::Parallel)
804  }
805}
806
807fn sorted_env_vars(env_vars: &HashMap<String, String>) -> Vec<(String, String)> {
808  let mut pairs: Vec<_> = env_vars
809    .iter()
810    .map(|(key, value)| (key.clone(), value.clone()))
811    .collect();
812  pairs.sort();
813  pairs
814}
815
816fn fingerprint_task_key(context: &TaskContext, task: &TaskArgs) -> String {
817  context.current_task_name.clone().unwrap_or_else(|| {
818    if !task.description.is_empty() {
819      task.description.clone()
820    } else {
821      format!("{:?}", task.commands)
822    }
823  })
824}
825
826fn stable_task_debug(task: &TaskArgs) -> String {
827  let mut labels: Vec<_> = task
828    .labels
829    .iter()
830    .map(|(key, value)| (key.clone(), value.clone()))
831    .collect();
832  labels.sort();
833
834  let mut environment: Vec<_> = task
835    .environment
836    .iter()
837    .map(|(key, value)| (key.clone(), value.clone()))
838    .collect();
839  environment.sort();
840
841  let mut secrets_path = task.secrets_path.clone();
842  secrets_path.sort();
843
844  format!(
845    "commands={:?};preconditions={:?};depends_on={:?};labels={:?};description={:?};environment={:?};env_file={:?};secrets_path={:?};vault_location={:?};keys_location={:?};key_name={:?};shell={:?};execution_mode={:?};max_parallel={:?};fail_fast={};cache_enabled={};inputs={:?};outputs={:?};ignore_errors={:?};verbose={:?}",
846    task.commands,
847    task.preconditions,
848    task.depends_on,
849    labels,
850    task.description,
851    environment,
852    task.env_file,
853    secrets_path,
854    task.vault_location,
855    task.keys_location,
856    task.key_name,
857    task.shell,
858    task.execution_mode(),
859    task.execution.as_ref().and_then(|execution| execution.max_parallel),
860    task.fail_fast(),
861    task.cache_enabled(),
862    task.inputs,
863    task.outputs,
864    task.ignore_errors,
865    task.verbose
866  )
867}
868
869#[cfg(test)]
870mod test {
871  use super::*;
872
873  #[test]
874  fn test_task_1() -> anyhow::Result<()> {
875    {
876      let yaml = "
877        commands:
878          - command: echo \"Hello, World!\"
879            ignore_errors: false
880            verbose: false
881        depends_on:
882          - name: task1
883        description: This is a task
884        environment:
885          FOO: bar
886        env_file:
887          - test.env
888          - test2.env
889      ";
890
891      let task = serde_yaml::from_str::<Task>(yaml)?;
892
893      if let Task::Task(task) = &task {
894        if let CommandRunner::LocalRun(local_run) = &task.commands[0] {
895          assert_eq!(local_run.command, "echo \"Hello, World!\"");
896          assert_eq!(local_run.work_dir, None);
897          assert_eq!(local_run.ignore_errors, Some(false));
898          assert_eq!(local_run.verbose, Some(false));
899        }
900
901        if let TaskDependency::TaskDependency(args) = &task.depends_on[0] {
902          assert_eq!(args.name, "task1");
903        }
904
905        assert_eq!(task.labels.len(), 0);
906        assert_eq!(task.description, "This is a task");
907        assert_eq!(task.environment.len(), 1);
908        assert_eq!(task.env_file.len(), 2);
909      } else {
910        panic!("Expected Task::Task");
911      }
912
913      Ok(())
914    }
915  }
916
917  #[test]
918  fn test_task_2() -> anyhow::Result<()> {
919    {
920      let yaml = "
921        commands:
922          - command: echo 'Hello, World!'
923            ignore_errors: false
924            verbose: false
925        description: This is a task
926        environment:
927          FOO: bar
928          BAR: foo
929      ";
930
931      let task = serde_yaml::from_str::<Task>(yaml)?;
932
933      if let Task::Task(task) = &task {
934        if let CommandRunner::LocalRun(local_run) = &task.commands[0] {
935          assert_eq!(local_run.command, "echo 'Hello, World!'");
936          assert_eq!(local_run.work_dir, None);
937          assert_eq!(local_run.ignore_errors, Some(false));
938          assert_eq!(local_run.verbose, Some(false));
939        }
940
941        assert_eq!(task.description, "This is a task");
942        assert_eq!(task.depends_on.len(), 0);
943        assert_eq!(task.labels.len(), 0);
944        assert_eq!(task.env_file.len(), 0);
945        assert_eq!(task.environment.len(), 2);
946      } else {
947        panic!("Expected Task::Task");
948      }
949
950      Ok(())
951    }
952  }
953
954  #[test]
955  fn test_task_3() -> anyhow::Result<()> {
956    {
957      let yaml = "
958        commands:
959          - command: echo 'Hello, World!'
960      ";
961
962      let task = serde_yaml::from_str::<Task>(yaml)?;
963
964      if let Task::Task(task) = &task {
965        if let CommandRunner::LocalRun(local_run) = &task.commands[0] {
966          assert_eq!(local_run.command, "echo 'Hello, World!'");
967          assert_eq!(local_run.work_dir, None);
968          assert_eq!(local_run.ignore_errors, None);
969          assert_eq!(local_run.verbose, None);
970        }
971
972        assert_eq!(task.description.len(), 0);
973        assert_eq!(task.depends_on.len(), 0);
974        assert_eq!(task.labels.len(), 0);
975        assert_eq!(task.env_file.len(), 0);
976        assert_eq!(task.environment.len(), 0);
977      } else {
978        panic!("Expected Task::Task");
979      }
980
981      Ok(())
982    }
983  }
984
985  #[test]
986  fn test_task_4() -> anyhow::Result<()> {
987    {
988      let yaml = "
989        commands:
990          - container_command:
991              - echo
992              - Hello, World!
993            image: docker.io/library/hello-world:latest
994      ";
995
996      let task = serde_yaml::from_str::<Task>(yaml)?;
997
998      if let Task::Task(task) = &task {
999        if let CommandRunner::ContainerRun(container_run) = &task.commands[0] {
1000          assert_eq!(container_run.container_command.len(), 2);
1001          assert_eq!(container_run.container_command[0], "echo");
1002          assert_eq!(container_run.container_command[1], "Hello, World!");
1003          assert_eq!(container_run.image, "docker.io/library/hello-world:latest");
1004          assert_eq!(container_run.mounted_paths, Vec::<String>::new());
1005          assert_eq!(container_run.ignore_errors, None);
1006          assert_eq!(container_run.verbose, None);
1007        }
1008
1009        assert_eq!(task.description.len(), 0);
1010        assert_eq!(task.depends_on.len(), 0);
1011        assert_eq!(task.labels.len(), 0);
1012        assert_eq!(task.env_file.len(), 0);
1013        assert_eq!(task.environment.len(), 0);
1014      } else {
1015        panic!("Expected Task::Task");
1016      }
1017
1018      Ok(())
1019    }
1020  }
1021
1022  #[test]
1023  fn test_task_5() -> anyhow::Result<()> {
1024    {
1025      let yaml = "
1026        commands:
1027          - container_command:
1028              - echo
1029              - Hello, World!
1030            image: docker.io/library/hello-world:latest
1031            mounted_paths:
1032              - /tmp
1033              - /var/tmp
1034      ";
1035
1036      let task = serde_yaml::from_str::<Task>(yaml)?;
1037
1038      if let Task::Task(task) = &task {
1039        if let CommandRunner::ContainerRun(container_run) = &task.commands[0] {
1040          assert_eq!(container_run.container_command.len(), 2);
1041          assert_eq!(container_run.container_command[0], "echo");
1042          assert_eq!(container_run.container_command[1], "Hello, World!");
1043          assert_eq!(container_run.image, "docker.io/library/hello-world:latest");
1044          assert_eq!(container_run.mounted_paths, vec!["/tmp", "/var/tmp"]);
1045          assert_eq!(container_run.ignore_errors, None);
1046          assert_eq!(container_run.verbose, None);
1047        }
1048
1049        assert_eq!(task.description.len(), 0);
1050        assert_eq!(task.depends_on.len(), 0);
1051        assert_eq!(task.labels.len(), 0);
1052        assert_eq!(task.env_file.len(), 0);
1053        assert_eq!(task.environment.len(), 0);
1054      } else {
1055        panic!("Expected Task::Task");
1056      }
1057
1058      Ok(())
1059    }
1060  }
1061
1062  #[test]
1063  fn test_task_6() -> anyhow::Result<()> {
1064    {
1065      let yaml = "
1066        commands:
1067          - container_command:
1068              - echo
1069              - Hello, World!
1070            image: docker.io/library/hello-world:latest
1071            mounted_paths:
1072              - /tmp
1073              - /var/tmp
1074            ignore_errors: true
1075      ";
1076
1077      let task = serde_yaml::from_str::<Task>(yaml)?;
1078
1079      if let Task::Task(task) = &task {
1080        if let CommandRunner::ContainerRun(container_run) = &task.commands[0] {
1081          assert_eq!(container_run.container_command.len(), 2);
1082          assert_eq!(container_run.container_command[0], "echo");
1083          assert_eq!(container_run.container_command[1], "Hello, World!");
1084          assert_eq!(container_run.image, "docker.io/library/hello-world:latest");
1085          assert_eq!(container_run.mounted_paths, vec!["/tmp", "/var/tmp"]);
1086          assert_eq!(container_run.ignore_errors, Some(true));
1087          assert_eq!(container_run.verbose, None);
1088        }
1089
1090        assert_eq!(task.description.len(), 0);
1091        assert_eq!(task.depends_on.len(), 0);
1092        assert_eq!(task.labels.len(), 0);
1093        assert_eq!(task.env_file.len(), 0);
1094        assert_eq!(task.environment.len(), 0);
1095      } else {
1096        panic!("Expected Task::Task");
1097      }
1098
1099      Ok(())
1100    }
1101  }
1102
1103  #[test]
1104  fn test_task_7() -> anyhow::Result<()> {
1105    {
1106      let yaml = "
1107        commands:
1108          - container_command:
1109              - echo
1110              - Hello, World!
1111            image: docker.io/library/hello-world:latest
1112            verbose: false
1113      ";
1114
1115      let task = serde_yaml::from_str::<Task>(yaml)?;
1116
1117      if let Task::Task(task) = &task {
1118        if let CommandRunner::ContainerRun(container_run) = &task.commands[0] {
1119          assert_eq!(container_run.container_command.len(), 2);
1120          assert_eq!(container_run.container_command[0], "echo");
1121          assert_eq!(container_run.container_command[1], "Hello, World!");
1122          assert_eq!(container_run.image, "docker.io/library/hello-world:latest");
1123          assert_eq!(container_run.mounted_paths, Vec::<String>::new());
1124          assert_eq!(container_run.ignore_errors, None);
1125          assert_eq!(container_run.verbose, Some(false));
1126        }
1127
1128        assert_eq!(task.description.len(), 0);
1129        assert_eq!(task.depends_on.len(), 0);
1130        assert_eq!(task.labels.len(), 0);
1131        assert_eq!(task.env_file.len(), 0);
1132        assert_eq!(task.environment.len(), 0);
1133      } else {
1134        panic!("Expected Task::Task");
1135      }
1136
1137      Ok(())
1138    }
1139  }
1140
1141  #[test]
1142  fn test_task_8() -> anyhow::Result<()> {
1143    {
1144      let yaml = "
1145        commands:
1146          - task: task1
1147      ";
1148
1149      let task = serde_yaml::from_str::<Task>(yaml)?;
1150
1151      if let Task::Task(task) = &task {
1152        if let CommandRunner::TaskRun(task_run) = &task.commands[0] {
1153          assert_eq!(task_run.task, "task1");
1154          assert_eq!(task_run.ignore_errors, None);
1155          assert_eq!(task_run.verbose, None);
1156        }
1157
1158        assert_eq!(task.description.len(), 0);
1159        assert_eq!(task.depends_on.len(), 0);
1160        assert_eq!(task.labels.len(), 0);
1161        assert_eq!(task.env_file.len(), 0);
1162        assert_eq!(task.environment.len(), 0);
1163      } else {
1164        panic!("Expected Task::Task");
1165      }
1166
1167      Ok(())
1168    }
1169  }
1170
1171  #[test]
1172  fn test_task_9() -> anyhow::Result<()> {
1173    {
1174      let yaml = "
1175        commands:
1176          - task: task1
1177            verbose: true
1178      ";
1179
1180      let task = serde_yaml::from_str::<Task>(yaml)?;
1181
1182      if let Task::Task(task) = &task {
1183        if let CommandRunner::TaskRun(task_run) = &task.commands[0] {
1184          assert_eq!(task_run.task, "task1");
1185          assert_eq!(task_run.ignore_errors, None);
1186          assert_eq!(task_run.verbose, Some(true));
1187        }
1188
1189        assert_eq!(task.description.len(), 0);
1190        assert_eq!(task.depends_on.len(), 0);
1191        assert_eq!(task.labels.len(), 0);
1192        assert_eq!(task.env_file.len(), 0);
1193        assert_eq!(task.environment.len(), 0);
1194      } else {
1195        panic!("Expected Task::Task");
1196      }
1197
1198      Ok(())
1199    }
1200  }
1201
1202  #[test]
1203  fn test_task_10() -> anyhow::Result<()> {
1204    {
1205      let yaml = "
1206        commands:
1207          - task: task1
1208            ignore_errors: true
1209      ";
1210
1211      let task = serde_yaml::from_str::<Task>(yaml)?;
1212
1213      if let Task::Task(task) = &task {
1214        if let CommandRunner::TaskRun(task_run) = &task.commands[0] {
1215          assert_eq!(task_run.task, "task1");
1216          assert_eq!(task_run.ignore_errors, Some(true));
1217          assert_eq!(task_run.verbose, None);
1218        }
1219
1220        assert_eq!(task.description.len(), 0);
1221        assert_eq!(task.depends_on.len(), 0);
1222        assert_eq!(task.labels.len(), 0);
1223        assert_eq!(task.env_file.len(), 0);
1224        assert_eq!(task.environment.len(), 0);
1225      } else {
1226        panic!("Expected Task::Task");
1227      }
1228
1229      Ok(())
1230    }
1231  }
1232
1233  #[test]
1234  fn test_task_11() -> anyhow::Result<()> {
1235    {
1236      let yaml = "
1237        echo 'Hello, World!'
1238      ";
1239
1240      let task = serde_yaml::from_str::<Task>(yaml)?;
1241
1242      if let Task::String(task) = &task {
1243        assert_eq!(task, "echo 'Hello, World!'");
1244      } else {
1245        panic!("Expected Task::String");
1246      }
1247
1248      Ok(())
1249    }
1250  }
1251
1252  #[test]
1253  fn test_task_12() -> anyhow::Result<()> {
1254    {
1255      let yaml = "
1256        'true'
1257      ";
1258
1259      let task = serde_yaml::from_str::<Task>(yaml)?;
1260
1261      if let Task::String(task) = &task {
1262        assert_eq!(task, "true");
1263      } else {
1264        panic!("Expected Task::String");
1265      }
1266
1267      Ok(())
1268    }
1269  }
1270
1271  #[test]
1272  fn test_task_13() -> anyhow::Result<()> {
1273    {
1274      let yaml = "
1275        commands: []
1276        environment:
1277          FOO: bar
1278          BAR: foo
1279          KEY: 42
1280          PIS: 3.14
1281      ";
1282
1283      let task = serde_yaml::from_str::<Task>(yaml)?;
1284
1285      if let Task::Task(task) = &task {
1286        assert_eq!(task.environment.len(), 4);
1287        assert_eq!(task.environment.get("FOO").unwrap(), "bar");
1288        assert_eq!(task.environment.get("BAR").unwrap(), "foo");
1289        assert_eq!(task.environment.get("KEY").unwrap(), "42");
1290      } else {
1291        panic!("Expected Task::Task");
1292      }
1293
1294      Ok(())
1295    }
1296  }
1297
1298  #[test]
1299  fn test_task_14() -> anyhow::Result<()> {
1300    let yaml = "
1301      commands: []
1302      secrets_path:
1303        - app/common
1304      vault_location: ./.mk/vault
1305      keys_location: ./.mk/keys
1306      key_name: team
1307      environment:
1308        SECRET_VALUE: ${{ secrets.app/password }}
1309    ";
1310
1311    let task = serde_yaml::from_str::<Task>(yaml)?;
1312
1313    if let Task::Task(task) = &task {
1314      assert_eq!(task.secrets_path, vec!["app/common"]);
1315      assert_eq!(task.vault_location.as_deref(), Some("./.mk/vault"));
1316      assert_eq!(task.keys_location.as_deref(), Some("./.mk/keys"));
1317      assert_eq!(task.key_name.as_deref(), Some("team"));
1318      assert_eq!(
1319        task.environment.get("SECRET_VALUE").map(String::as_str),
1320        Some("${{ secrets.app/password }}")
1321      );
1322    } else {
1323      panic!("Expected Task::Task");
1324    }
1325
1326    Ok(())
1327  }
1328
1329  #[test]
1330  fn test_parallel_interactive_rejected() -> anyhow::Result<()> {
1331    let yaml = r#"
1332          commands:
1333            - command: "echo hello"
1334              interactive: true
1335            - command: "echo world"
1336          parallel: true
1337      "#;
1338
1339    let task = serde_yaml::from_str::<Task>(yaml)?;
1340    let mut context = TaskContext::empty();
1341
1342    if let Task::Task(task) = task {
1343      let result = task.run(&mut context);
1344      assert!(result.is_err());
1345      assert!(result
1346        .unwrap_err()
1347        .to_string()
1348        .contains("Interactive local commands cannot be run in parallel"));
1349    }
1350
1351    Ok(())
1352  }
1353
1354  #[test]
1355  fn test_parallel_non_interactive_accepted() -> anyhow::Result<()> {
1356    let yaml = r#"
1357          commands:
1358            - command: "echo hello"
1359              interactive: false
1360            - command: "echo world"
1361          parallel: true
1362      "#;
1363
1364    let task = serde_yaml::from_str::<Task>(yaml)?;
1365    let mut context = TaskContext::empty();
1366
1367    if let Task::Task(task) = task {
1368      let result = task.run(&mut context);
1369      assert!(result.is_ok());
1370    }
1371
1372    Ok(())
1373  }
1374
1375  #[test]
1376  fn test_parallel_retrigger_rejected() -> anyhow::Result<()> {
1377    let yaml = r#"
1378          commands:
1379            - command: "go run ."
1380              retrigger: true
1381          parallel: true
1382      "#;
1383
1384    let task = serde_yaml::from_str::<Task>(yaml)?;
1385    let mut context = TaskContext::empty();
1386
1387    if let Task::Task(task) = task {
1388      let result = task.run(&mut context);
1389      assert!(result.is_err());
1390      assert!(result
1391        .unwrap_err()
1392        .to_string()
1393        .contains("Local commands with `retrigger: true` cannot be run in parallel"));
1394    }
1395
1396    Ok(())
1397  }
1398}