Skip to main content

mk_lib/schema/
task.rs

1use hashbrown::HashMap;
2use indicatif::{
3  HumanDuration,
4  ProgressBar,
5  ProgressStyle,
6};
7use rand::Rng as _;
8use schemars::JsonSchema;
9use serde::{
10  Deserialize,
11  Serialize,
12};
13
14use std::io::BufRead as _;
15use std::sync::mpsc::{
16  channel,
17  Receiver,
18  Sender,
19};
20use std::thread;
21use std::time::{
22  Duration,
23  Instant,
24};
25
26use super::{
27  contains_output_reference,
28  extract_output_references,
29  interpolate_template_string,
30  is_shell_command,
31  CommandRunner,
32  Precondition,
33  Shell,
34  TaskContext,
35  TaskDependency,
36};
37use crate::cache::{
38  compute_fingerprint,
39  expand_patterns_in_dir,
40  CacheEntry,
41};
42use crate::defaults::default_verbose;
43use crate::run_shell_command;
44use crate::secrets::load_secret_env;
45use crate::utils::{
46  deserialize_environment,
47  load_env_files_in_dir,
48  resolve_path,
49};
50
// Serde default for `TaskCache::enabled`: a `cache` section that omits
// `enabled` is treated as enabled.
fn default_cache_enabled() -> bool {
  true
}
54
// Serde default for `TaskExecution::fail_fast`: an `execution` section that
// omits `fail_fast` gets `true`.
fn default_fail_fast() -> bool {
  true
}
58
// How the commands of a task are scheduled. Values are (de)serialized in
// snake_case, i.e. `sequential` and `parallel`.
//
// NOTE(review): plain `//` comments are used on purpose — `///` doc comments on a
// `JsonSchema` type presumably end up as descriptions in the generated schema;
// confirm before converting them.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum ExecutionMode {
  // Commands run one after another.
  Sequential,
  // Commands run concurrently (see `TaskArgs::execute_commands_parallel`).
  Parallel,
}
65
// Richer execution settings for a task (the `execution` section).
//
// NOTE(review): plain `//` comments are used on purpose — `///` doc comments on a
// `JsonSchema` type presumably end up as descriptions in the generated schema;
// confirm before converting them.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
pub struct TaskExecution {
  // Scheduling mode. When set it takes precedence over the task-level
  // `parallel` flag (see `TaskArgs::execution_mode`).
  #[serde(default)]
  pub mode: Option<ExecutionMode>,

  // Upper bound on the number of commands running at the same time. When
  // unset, `TaskArgs::max_parallel` falls back to the number of commands.
  #[serde(default)]
  pub max_parallel: Option<usize>,

  // When true, a parallel run stops scheduling further commands after the
  // first recorded failure. Defaults to true.
  #[serde(default = "default_fail_fast")]
  pub fail_fast: bool,
}
77
// Cache settings for a task (the `cache` section).
//
// NOTE(review): plain `//` comments are used on purpose — `///` doc comments on a
// `JsonSchema` type presumably end up as descriptions in the generated schema;
// confirm before converting them.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
pub struct TaskCache {
  // Defaults to true when the `cache` section is present. A task without a
  // `cache` section is not cached at all (see `TaskArgs::cache_enabled`).
  #[serde(default = "default_cache_enabled")]
  pub enabled: bool,
}
83
// NOTE(review): the `///` comments on this type are kept verbatim because they
// presumably feed the generated JSON schema (the type derives `JsonSchema`);
// maintainer-only notes are therefore written as plain `//` comments.
/// This struct represents a task that can be executed. A task can contain multiple
/// commands that are executed sequentially. A task can also have preconditions that
/// must be met before the task can be executed.
#[derive(Debug, Default, Deserialize, JsonSchema)]
pub struct TaskArgs {
  /// The commands to run
  // `TaskArgs::run` asserts that this list is not empty.
  pub commands: Vec<CommandRunner>,

  /// The preconditions that must be met before the task can be executed
  #[serde(default)]
  pub preconditions: Vec<Precondition>,

  /// The tasks that must be executed before this task can be executed
  #[serde(default)]
  pub depends_on: Vec<TaskDependency>,

  /// The labels for the task
  #[schemars(with = "std::collections::HashMap<String, String>")]
  #[serde(default)]
  pub labels: HashMap<String, String>,

  /// The description of the task
  #[serde(default)]
  pub description: String,

  /// The environment variables to set before running the task
  // Values that reference task outputs are resolved lazily by
  // `refresh_output_env`; all others are resolved by `load_static_env`.
  #[schemars(with = "std::collections::HashMap<String, String>")]
  #[serde(default, deserialize_with = "deserialize_environment")]
  pub environment: HashMap<String, String>,

  /// The environment files to load before running the task
  // Resolved relative to the config base directory (see `load_env_file`).
  #[serde(default)]
  pub env_file: Vec<String>,

  /// Secret paths to load as dotenv-style environment entries before running the task
  #[serde(default)]
  pub secrets_path: Vec<String>,

  /// The path to the secret vault
  // Task-level secret settings are applied after the root-level ones in `run`.
  #[serde(default)]
  pub vault_location: Option<String>,

  /// The path to the private keys used for secret decryption
  #[serde(default)]
  pub keys_location: Option<String>,

  /// The key name to use for secret decryption
  #[serde(default)]
  pub key_name: Option<String>,

  /// GPG key ID or fingerprint for hardware/passphrase-protected keys (delegates to system gpg)
  #[serde(default)]
  pub gpg_key_id: Option<String>,

  /// The shell to use when running the task
  #[serde(default)]
  pub shell: Option<Shell>,

  /// Run the commands in parallel
  /// It should only work if the task are local_run commands
  // Legacy switch: only consulted when `execution.mode` is unset
  // (see `execution_mode`).
  #[serde(default)]
  pub parallel: Option<bool>,

  /// Richer execution configuration
  #[serde(default)]
  pub execution: Option<TaskExecution>,

  /// Task caching configuration
  // Caching only takes effect when this section is present, `enabled` is true
  // and `outputs` is not empty (see `should_skip_from_cache` / `update_cache`).
  #[serde(default)]
  pub cache: Option<TaskCache>,

  /// Files or glob patterns that affect task output
  // Expanded relative to the task base directory (see `resolve_input_paths`).
  #[serde(default)]
  pub inputs: Vec<String>,

  /// Files produced by the task
  // Resolved relative to the task base directory (see `resolve_output_paths`).
  #[serde(default)]
  pub outputs: Vec<String>,

  /// Ignore errors if the task fails
  #[serde(default)]
  pub ignore_errors: Option<bool>,

  /// Show verbose output
  #[serde(default)]
  pub verbose: Option<bool>,
}
171
// `untagged`: there is no discriminator field in the input; serde tries the
// variants in declaration order, so a plain string becomes `Task::String` and
// a mapping becomes `Task::Task`.
#[derive(Debug, Deserialize, JsonSchema)]
#[serde(untagged)]
/// A task definition: either a command string shorthand or a full task object.
pub enum Task {
  // Shorthand: a single command string, expanded by `Task::execute` into a
  // one-command `TaskArgs`.
  String(String),
  // Full task object; boxed, which keeps the enum itself small.
  Task(Box<TaskArgs>),
}
179
/// Outcome of one command executed by a worker thread during a parallel run.
///
/// Workers send one value per command over a channel to the scheduling loop in
/// `TaskArgs::execute_commands_parallel`.
#[derive(Debug)]
pub struct CommandResult {
  /// Zero-based position of the command in the task's `commands` list.
  index: usize,
  /// Whether the command's `execute` call returned `Ok`.
  success: bool,
  /// Human-readable outcome; for failures it includes the error text.
  message: String,
}
186
187impl Task {
188  pub fn run(&self, context: &mut TaskContext) -> anyhow::Result<()> {
189    match self {
190      Task::String(command) => self.execute(context, command),
191      Task::Task(args) => args.run(context),
192    }
193  }
194
195  fn execute(&self, context: &mut TaskContext, command: &str) -> anyhow::Result<()> {
196    assert!(!command.is_empty());
197
198    TaskArgs {
199      commands: vec![CommandRunner::CommandRun(command.to_string())],
200      ..Default::default()
201    }
202    .run(context)
203  }
204}
205
206impl TaskArgs {
  /// Runs the task: configures `context`, loads the environment, then executes
  /// dependencies, preconditions and commands, and finally updates the cache.
  ///
  /// Steps, in order:
  /// 1. Validate that a parallel task only contains parallel-safe commands.
  /// 2. Apply the task's `shell`, `ignore_errors` and `verbose` overrides.
  /// 3. Apply secret settings: root-level values first (top-level tasks only),
  ///    then the task-level values.
  /// 4. Load environment variables: root environment, env files and secrets
  ///    (top-level tasks only), then the task's own.
  /// 5. Return early, emitting a `task_skipped` event, on a cache hit.
  /// 6. Run `depends_on`, then `preconditions`, then the commands.
  /// 7. Record the new cache entry.
  ///
  /// # Errors
  ///
  /// Returns the first error produced by any of the steps above.
  ///
  /// # Panics
  ///
  /// Panics if `commands` is empty.
  pub fn run(&self, context: &mut TaskContext) -> anyhow::Result<()> {
    assert!(!self.commands.is_empty());

    // Validate parallel execution requirements early
    self.validate_parallel_commands()?;

    // `started` is taken once, so every "completed in" message below reports
    // the time elapsed since the start of this call, not per phase.
    let started = Instant::now();
    let tick_interval = Duration::from_millis(80);

    if let Some(shell) = &self.shell {
      context.set_shell(shell);
    }

    if let Some(ignore_errors) = &self.ignore_errors {
      context.set_ignore_errors(*ignore_errors);
    }

    if let Some(verbose) = &self.verbose {
      context.set_verbose(*verbose);
    }

    // Root-level secret settings are only applied for top-level tasks.
    if !context.is_nested {
      if let Some(vault_location) = &context.task_root.vault_location {
        context.set_secret_vault_location(vault_location.clone());
      }

      if let Some(keys_location) = &context.task_root.keys_location {
        context.set_secret_keys_location(keys_location.clone());
      }

      if let Some(key_name) = &context.task_root.key_name {
        context.set_secret_key_name(key_name.clone());
      }

      if let Some(gpg_key_id) = &context.task_root.gpg_key_id {
        context.set_secret_gpg_key_id(gpg_key_id.clone());
      }
    }

    // Task-level secret settings are applied last, after any root-level ones.
    if let Some(vault_location) = &self.vault_location {
      context.set_secret_vault_location(vault_location.clone());
    }

    if let Some(keys_location) = &self.keys_location {
      context.set_secret_keys_location(keys_location.clone());
    }

    if let Some(key_name) = &self.key_name {
      context.set_secret_key_name(key_name.clone());
    }

    if let Some(gpg_key_id) = &self.gpg_key_id {
      context.set_secret_gpg_key_id(gpg_key_id.clone());
    }

    // Load environment variables from root and task environments and env files.
    if !context.is_nested {
      let config_base_dir = self.config_base_dir(context);
      let root_env = context.task_root.environment.clone();
      let root_env_files = load_env_files_in_dir(&context.task_root.env_file, &config_base_dir)?;
      let root_secret_env = load_secret_env(
        &context.task_root.secrets_path,
        &config_base_dir,
        context.secret_vault_location.as_deref(),
        context.secret_keys_location.as_deref(),
        context.secret_key_name.as_deref(),
        context.secret_gpg_key_id.as_deref(),
      )?;
      // NOTE(review): presumably later `extend_env_vars` calls override earlier
      // ones for duplicate keys — confirm against `TaskContext`.
      context.extend_env_vars(root_env);
      context.extend_env_vars(root_env_files);
      context.extend_env_vars(root_secret_env);
    }

    // Load environment variables from the task environment and env files field
    let defined_env = self.load_static_env(context)?;
    let additional_env = self.load_env_file(context)?;
    let secret_env = self.load_secret_env(context)?;

    context.extend_env_vars(defined_env);
    context.extend_env_vars(additional_env);
    context.extend_env_vars(secret_env);

    // The cache check runs after the environment is loaded because the
    // fingerprint includes `context.env_vars`.
    if self.should_skip_from_cache(context)? {
      context.emit_event(&serde_json::json!({
        "event": "task_skipped",
        "task": context.current_task_name.clone().unwrap_or_else(|| "<task>".to_string()),
        "reason": "cache_hit",
      }))?;
      return Ok(());
    }

    let mut rng = rand::thread_rng();
    // Spinners can be found here:
    // https://github.com/sindresorhus/cli-spinners/blob/main/spinners.json
    let pb_style =
      ProgressStyle::with_template("{spinner:.green} [{prefix:.bold.dim}] {wide_msg:.cyan/blue} ")?
        .tick_chars("⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏⦿");

    // NOTE(review): this bar is registered even when there are no dependencies
    // and is then never styled or finished.
    let depends_on_pb = context.multi.add(ProgressBar::new(self.depends_on.len() as u64));

    if !self.depends_on.is_empty() {
      depends_on_pb.set_style(pb_style.clone());
      depends_on_pb.set_message("Running task dependencies...");
      depends_on_pb.enable_steady_tick(tick_interval);
      for (i, dependency) in self.depends_on.iter().enumerate() {
        // Random 40-299 ms pause before each dependency.
        // NOTE(review): presumably cosmetic (keeps the spinner visible) — confirm.
        thread::sleep(Duration::from_millis(rng.gen_range(40..300)));
        depends_on_pb.set_prefix(format!("{}/{}", i + 1, self.depends_on.len()));
        dependency.run(context)?;
        depends_on_pb.inc(1);
      }

      let message = format!("Dependencies completed in {}.", HumanDuration(started.elapsed()));
      // Nested tasks clear their bars instead of leaving a summary line.
      if context.is_nested {
        depends_on_pb.finish_and_clear();
      } else {
        depends_on_pb.finish_with_message(message);
      }
    }

    // NOTE(review): as above, registered even when there are no preconditions.
    let precondition_pb = context
      .multi
      .add(ProgressBar::new(self.preconditions.len() as u64));

    if !self.preconditions.is_empty() {
      precondition_pb.set_style(pb_style.clone());
      precondition_pb.set_message("Running task precondition...");
      precondition_pb.enable_steady_tick(tick_interval);
      for (i, precondition) in self.preconditions.iter().enumerate() {
        // Random 40-299 ms pause before each precondition.
        thread::sleep(Duration::from_millis(rng.gen_range(40..300)));
        precondition_pb.set_prefix(format!("{}/{}", i + 1, self.preconditions.len()));
        precondition.execute(context)?;
        precondition_pb.inc(1);
      }

      let message = format!("Preconditions completed in {}.", HumanDuration(started.elapsed()));
      if context.is_nested {
        precondition_pb.finish_and_clear();
      } else {
        precondition_pb.finish_with_message(message);
      }
    }

    if self.is_parallel() {
      self.execute_commands_parallel(context)?;
    } else {
      let command_pb = context.multi.add(ProgressBar::new(self.commands.len() as u64));
      command_pb.set_style(pb_style);
      command_pb.set_message("Running task command...");
      command_pb.enable_steady_tick(tick_interval);
      for (i, command) in self.commands.iter().enumerate() {
        // Random 100-399 ms pause before each command.
        thread::sleep(Duration::from_millis(rng.gen_range(100..400)));
        command_pb.set_prefix(format!("{}/{}", i + 1, self.commands.len()));
        // Re-resolve environment entries that reference task outputs, since an
        // earlier command may have produced them.
        self.refresh_output_env(context)?;
        command.execute(context)?;
        command_pb.inc(1);
      }

      let message = format!("Commands completed in {}.", HumanDuration(started.elapsed()));
      if context.is_nested {
        command_pb.finish_and_clear();
      } else {
        command_pb.finish_with_message(message);
      }
    }

    // Only reached when every step above succeeded.
    self.update_cache(context)?;

    Ok(())
  }
376
377  /// Validate if the task can be run in parallel
378  fn validate_parallel_commands(&self) -> anyhow::Result<()> {
379    if !self.is_parallel() {
380      return Ok(());
381    }
382
383    for command in &self.commands {
384      match command {
385        CommandRunner::LocalRun(local_run) if local_run.is_parallel_safe() => continue,
386        CommandRunner::LocalRun(local_run) if local_run.interactive_enabled() => {
387          return Err(anyhow::anyhow!(
388            "Interactive local commands cannot be run in parallel"
389          ))
390        },
391        CommandRunner::LocalRun(_) => {
392          return Err(anyhow::anyhow!(
393            "Local commands with `retrigger: true` cannot be run in parallel"
394          ))
395        },
396        _ => {
397          return Err(anyhow::anyhow!(
398            "Parallel execution is only supported for non-interactive local commands"
399          ))
400        },
401      }
402    }
403    Ok(())
404  }
405
406  /// Execute the commands in parallel
407  fn execute_commands_parallel(&self, context: &TaskContext) -> anyhow::Result<()> {
408    let (tx, rx): (Sender<CommandResult>, Receiver<CommandResult>) = channel();
409    let mut handles = vec![];
410    let command_count = self.commands.len();
411    let max_parallel = self.max_parallel().min(command_count.max(1));
412    let fail_fast = self.fail_fast();
413    let command_pb = context.multi.add(ProgressBar::new(command_count as u64));
414    command_pb.set_style(ProgressStyle::with_template(
415      "{spinner:.green} [{prefix:.bold.dim}] {wide_msg:.cyan/blue} ",
416    )?);
417    command_pb.set_prefix("?/?");
418    command_pb.set_message("Running task commands in parallel...");
419    command_pb.enable_steady_tick(Duration::from_millis(80));
420    let mut failures = Vec::new();
421
422    // Clone all commands upfront to avoid borrowing issues
423    let commands: Vec<_> = self.commands.to_vec();
424
425    // Track results in order
426    let mut completed = 0;
427
428    let mut iter = commands.into_iter().enumerate();
429    let mut running = 0usize;
430    let mut stop_scheduling = false;
431
432    while completed < command_count {
433      while !stop_scheduling && running < max_parallel {
434        let Some((i, command)) = iter.next() else {
435          break;
436        };
437
438        let tx = tx.clone();
439        let context = context.clone();
440
441        let handle = thread::spawn(move || {
442          let result = match command.execute(&context) {
443            Ok(_) => CommandResult {
444              index: i,
445              success: true,
446              message: format!("Command {} completed successfully", i + 1),
447            },
448            Err(e) => CommandResult {
449              index: i,
450              success: false,
451              message: format!("Command {} failed: {}", i + 1, e),
452            },
453          };
454          tx.send(result).unwrap();
455        });
456
457        handles.push(handle);
458        running += 1;
459      }
460
461      if running == 0 {
462        break;
463      }
464
465      match rx.recv() {
466        Ok(result) => {
467          running -= 1;
468          let index = result.index;
469          if !result.success && !context.ignore_errors() {
470            failures.push(result.message);
471            if fail_fast {
472              stop_scheduling = true;
473            }
474          }
475
476          completed += 1;
477          command_pb.set_prefix(format!("{}/{}", completed, command_count));
478          command_pb.inc(1);
479
480          command_pb.set_message(format!(
481            "Running task commands in parallel (completed {})",
482            index + 1
483          ));
484        },
485        Err(e) => {
486          command_pb.finish_with_message("Error receiving command results");
487          return Err(anyhow::anyhow!("Channel error: {}", e));
488        },
489      }
490    }
491
492    // Wait for all threads to complete
493    for handle in handles {
494      handle.join().unwrap();
495    }
496
497    if !failures.is_empty() {
498      command_pb.finish_with_message("Some commands failed");
499
500      // Sort failures by command index for clearer error reporting
501      failures.sort();
502      return Err(anyhow::anyhow!("Failed commands:\n{}", failures.join("\n")));
503    }
504
505    let message = "Commands completed in parallel";
506    if context.is_nested {
507      command_pb.finish_and_clear();
508    } else {
509      command_pb.finish_with_message(message);
510    }
511
512    Ok(())
513  }
514
515  fn load_static_env(&self, context: &TaskContext) -> anyhow::Result<HashMap<String, String>> {
516    let mut local_env: HashMap<String, String> = HashMap::new();
517    for (key, value) in &self.environment {
518      if contains_output_reference(value) {
519        continue;
520      }
521      let value = self.get_env_value(context, value)?;
522      local_env.insert(key.clone(), value);
523    }
524
525    Ok(local_env)
526  }
527
528  fn refresh_output_env(&self, context: &mut TaskContext) -> anyhow::Result<()> {
529    let mut updated_env = HashMap::new();
530
531    for (key, value) in &self.environment {
532      let output_refs = extract_output_references(value);
533      if output_refs.is_empty() {
534        continue;
535      }
536
537      let mut all_outputs_ready = true;
538      for output_name in &output_refs {
539        if !context.has_task_output(output_name)? {
540          all_outputs_ready = false;
541          break;
542        }
543      }
544      if !all_outputs_ready {
545        continue;
546      }
547
548      updated_env.insert(key.clone(), self.get_env_value(context, value)?);
549    }
550
551    context.extend_env_vars(updated_env);
552    Ok(())
553  }
554
  /// Loads the task's `env_file` entries, resolved against the config base
  /// directory, via `load_env_files_in_dir`.
  fn load_env_file(&self, context: &TaskContext) -> anyhow::Result<HashMap<String, String>> {
    load_env_files_in_dir(&self.env_file, &self.config_base_dir(context))
  }
558
559  fn load_secret_env(&self, context: &TaskContext) -> anyhow::Result<HashMap<String, String>> {
560    load_secret_env(
561      &self.secrets_path,
562      &self.config_base_dir(context),
563      context.secret_vault_location.as_deref(),
564      context.secret_keys_location.as_deref(),
565      context.secret_key_name.as_deref(),
566      context.secret_gpg_key_id.as_deref(),
567    )
568  }
569
  /// Resolves one raw `environment` value to the string that is exported.
  ///
  /// * If `is_shell_command` accepts the value, it is run through the
  ///   `run_shell_command!` macro and the macro's output is returned.
  /// * If it is a template command or contains an output reference, it is
  ///   interpolated with `interpolate_template_string`.
  /// * Otherwise it is returned unchanged.
  fn get_env_value(&self, context: &TaskContext, value_in: &str) -> anyhow::Result<String> {
    if is_shell_command(value_in)? {
      // Uses the task-level verbosity (`self.verbose()`), not the context's.
      let verbose = self.verbose();
      // The task's own shell wins; otherwise fall back to the context's shell.
      let mut cmd = self
        .shell
        .as_ref()
        .map(|shell| shell.proc())
        .unwrap_or_else(|| context.shell().proc());
      // NOTE(review): the macro is defined elsewhere; how it uses `cmd` and
      // `verbose`, and how it reports failures, is not visible here.
      let output = run_shell_command!(value_in, cmd, verbose);
      Ok(output)
    } else if super::is_template_command(value_in)? || contains_output_reference(value_in) {
      Ok(interpolate_template_string(value_in, context)?)
    } else {
      Ok(value_in.to_string())
    }
  }
586
587  fn verbose(&self) -> bool {
588    self.verbose.unwrap_or(default_verbose())
589  }
590
591  pub(crate) fn execution_mode(&self) -> ExecutionMode {
592    self
593      .execution
594      .as_ref()
595      .and_then(|execution| execution.mode.clone())
596      .or_else(|| {
597        self.parallel.map(|parallel| {
598          if parallel {
599            ExecutionMode::Parallel
600          } else {
601            ExecutionMode::Sequential
602          }
603        })
604      })
605      .unwrap_or(ExecutionMode::Sequential)
606  }
607
608  pub(crate) fn is_parallel(&self) -> bool {
609    self.execution_mode().is_parallel()
610  }
611
612  pub(crate) fn max_parallel(&self) -> usize {
613    self
614      .execution
615      .as_ref()
616      .and_then(|execution| execution.max_parallel)
617      .unwrap_or(self.commands.len().max(1))
618  }
619
620  pub(crate) fn fail_fast(&self) -> bool {
621    self
622      .execution
623      .as_ref()
624      .map(|execution| execution.fail_fast)
625      .unwrap_or(true)
626  }
627
628  fn cache_enabled(&self) -> bool {
629    self.cache.as_ref().map(|cache| cache.enabled).unwrap_or(false)
630  }
631
632  fn should_skip_from_cache(&self, context: &TaskContext) -> anyhow::Result<bool> {
633    if context.force || !self.cache_enabled() || self.outputs.is_empty() {
634      return Ok(false);
635    }
636
637    let resolved_outputs = self.resolve_output_paths(context)?;
638    let outputs_exist = self
639      .resolve_output_paths(context)?
640      .iter()
641      .all(|output| output.exists());
642    if !outputs_exist {
643      return Ok(false);
644    }
645
646    let env_vars = sorted_env_vars(&context.env_vars);
647    let inputs = self.resolve_input_paths(context)?;
648    let mut env_files = self.resolve_env_file_paths(context);
649    env_files.extend(self.resolve_secret_paths(context));
650    env_files.sort();
651    env_files.dedup();
652    let fingerprint = compute_fingerprint(
653      &context
654        .current_task_name
655        .clone()
656        .unwrap_or_else(|| "<task>".to_string()),
657      &stable_task_debug(self),
658      &env_vars,
659      &inputs,
660      &env_files,
661      &resolved_outputs,
662    )?;
663
664    let store = context
665      .cache_store
666      .lock()
667      .map_err(|e| anyhow::anyhow!("Failed to lock cache store - {}", e))?;
668    let Some(entry) = store.tasks.get(&fingerprint_task_key(context, self)) else {
669      return Ok(false);
670    };
671
672    Ok(entry.fingerprint == fingerprint)
673  }
674
675  fn update_cache(&self, context: &TaskContext) -> anyhow::Result<()> {
676    if !self.cache_enabled() || self.outputs.is_empty() {
677      return Ok(());
678    }
679
680    let env_vars = sorted_env_vars(&context.env_vars);
681    let inputs = self.resolve_input_paths(context)?;
682    let mut env_files = self.resolve_env_file_paths(context);
683    env_files.extend(self.resolve_secret_paths(context));
684    env_files.sort();
685    env_files.dedup();
686    let resolved_outputs = self.resolve_output_paths(context)?;
687    let fingerprint = compute_fingerprint(
688      &context
689        .current_task_name
690        .clone()
691        .unwrap_or_else(|| "<task>".to_string()),
692      &stable_task_debug(self),
693      &env_vars,
694      &inputs,
695      &env_files,
696      &resolved_outputs,
697    )?;
698    let key = fingerprint_task_key(context, self);
699
700    {
701      let mut store = context
702        .cache_store
703        .lock()
704        .map_err(|e| anyhow::anyhow!("Failed to lock cache store - {}", e))?;
705      store.tasks.insert(
706        key,
707        CacheEntry {
708          fingerprint,
709          outputs: resolved_outputs
710            .iter()
711            .map(|path| path.to_string_lossy().into_owned())
712            .collect(),
713          updated_at: chrono::Utc::now().to_rfc3339(),
714        },
715      );
716      store.save_in_dir(&context.task_root.cache_base_dir())?;
717    }
718
719    Ok(())
720  }
721
  /// Returns the config base directory reported by `root`.
  ///
  /// The task itself does not influence the result; `self` is unused.
  pub(crate) fn config_base_dir_from_root(&self, root: &super::TaskRoot) -> std::path::PathBuf {
    root.config_base_dir()
  }
725
726  pub(crate) fn task_base_dir_from_root(&self, root: &super::TaskRoot) -> std::path::PathBuf {
727    let config_base_dir = self.config_base_dir_from_root(root);
728    let mut work_dirs = self
729      .commands
730      .iter()
731      .filter_map(|command| match command {
732        CommandRunner::LocalRun(local_run) => local_run.work_dir.as_ref(),
733        _ => None,
734      })
735      .map(|work_dir| resolve_path(&config_base_dir, work_dir))
736      .collect::<Vec<_>>();
737
738    work_dirs.sort();
739    work_dirs.dedup();
740
741    if work_dirs.len() == 1 {
742      work_dirs.remove(0)
743    } else {
744      config_base_dir
745    }
746  }
747
  /// Returns the config base directory for the task root held by `context`.
  fn config_base_dir(&self, context: &TaskContext) -> std::path::PathBuf {
    self.config_base_dir_from_root(&context.task_root)
  }
751
  /// Returns the task base directory for the task root held by `context`
  /// (see `task_base_dir_from_root`).
  fn task_base_dir(&self, context: &TaskContext) -> std::path::PathBuf {
    self.task_base_dir_from_root(&context.task_root)
  }
755
  /// Expands the task's `inputs` patterns relative to the task base directory
  /// via `expand_patterns_in_dir`.
  fn resolve_input_paths(&self, context: &TaskContext) -> anyhow::Result<Vec<std::path::PathBuf>> {
    expand_patterns_in_dir(&self.task_base_dir(context), &self.inputs)
  }
759
760  fn resolve_output_paths(&self, context: &TaskContext) -> anyhow::Result<Vec<std::path::PathBuf>> {
761    let base_dir = self.task_base_dir(context);
762    Ok(
763      self
764        .outputs
765        .iter()
766        .map(|output| resolve_path(&base_dir, output))
767        .collect(),
768    )
769  }
770
771  fn resolve_env_file_paths(&self, context: &TaskContext) -> Vec<std::path::PathBuf> {
772    let config_base_dir = self.config_base_dir(context);
773    let mut env_files = context
774      .task_root
775      .env_file
776      .iter()
777      .chain(self.env_file.iter())
778      .map(|env_file| resolve_path(&config_base_dir, env_file))
779      .collect::<Vec<_>>();
780    env_files.sort();
781    env_files.dedup();
782    env_files
783  }
784
785  fn resolve_secret_paths(&self, context: &TaskContext) -> Vec<std::path::PathBuf> {
786    let config_base_dir = self.config_base_dir(context);
787    let vault_location = context
788      .secret_vault_location
789      .as_deref()
790      .map(|path| resolve_path(&config_base_dir, path))
791      .unwrap_or_else(|| resolve_path(&config_base_dir, "./.mk/vault"));
792    let mut secret_paths = context
793      .task_root
794      .secrets_path
795      .iter()
796      .chain(self.secrets_path.iter())
797      .map(|secret_path| vault_location.join(secret_path))
798      .collect::<Vec<_>>();
799    secret_paths.sort();
800    secret_paths.dedup();
801    secret_paths
802  }
803}
804
805impl ExecutionMode {
806  fn is_parallel(&self) -> bool {
807    matches!(self, ExecutionMode::Parallel)
808  }
809}
810
/// Copies the environment into a list of `(name, value)` pairs sorted by
/// name, so that the fingerprint does not depend on hash-map iteration order.
fn sorted_env_vars(env_vars: &HashMap<String, String>) -> Vec<(String, String)> {
  let mut pairs = Vec::with_capacity(env_vars.len());
  for (name, value) in env_vars {
    pairs.push((name.clone(), value.clone()));
  }
  pairs.sort();
  pairs
}
819
820fn fingerprint_task_key(context: &TaskContext, task: &TaskArgs) -> String {
821  context.current_task_name.clone().unwrap_or_else(|| {
822    if !task.description.is_empty() {
823      task.description.clone()
824    } else {
825      format!("{:?}", task.commands)
826    }
827  })
828}
829
830fn stable_task_debug(task: &TaskArgs) -> String {
831  let mut labels: Vec<_> = task
832    .labels
833    .iter()
834    .map(|(key, value)| (key.clone(), value.clone()))
835    .collect();
836  labels.sort();
837
838  let mut environment: Vec<_> = task
839    .environment
840    .iter()
841    .map(|(key, value)| (key.clone(), value.clone()))
842    .collect();
843  environment.sort();
844
845  let mut secrets_path = task.secrets_path.clone();
846  secrets_path.sort();
847
848  format!(
849    "commands={:?};preconditions={:?};depends_on={:?};labels={:?};description={:?};environment={:?};env_file={:?};secrets_path={:?};vault_location={:?};keys_location={:?};key_name={:?};shell={:?};execution_mode={:?};max_parallel={:?};fail_fast={};cache_enabled={};inputs={:?};outputs={:?};ignore_errors={:?};verbose={:?}",
850    task.commands,
851    task.preconditions,
852    task.depends_on,
853    labels,
854    task.description,
855    environment,
856    task.env_file,
857    secrets_path,
858    task.vault_location,
859    task.keys_location,
860    task.key_name,
861    task.shell,
862    task.execution_mode(),
863    task.execution.as_ref().and_then(|execution| execution.max_parallel),
864    task.fail_fast(),
865    task.cache_enabled(),
866    task.inputs,
867    task.outputs,
868    task.ignore_errors,
869    task.verbose
870  )
871}
872
873#[cfg(test)]
874mod test {
875  use super::*;
876
877  #[test]
878  fn test_task_1() -> anyhow::Result<()> {
879    {
880      let yaml = "
881        commands:
882          - command: echo \"Hello, World!\"
883            ignore_errors: false
884            verbose: false
885        depends_on:
886          - name: task1
887        description: This is a task
888        environment:
889          FOO: bar
890        env_file:
891          - test.env
892          - test2.env
893      ";
894
895      let task = serde_yaml::from_str::<Task>(yaml)?;
896
897      if let Task::Task(task) = &task {
898        if let CommandRunner::LocalRun(local_run) = &task.commands[0] {
899          assert_eq!(local_run.command, "echo \"Hello, World!\"");
900          assert_eq!(local_run.work_dir, None);
901          assert_eq!(local_run.ignore_errors, Some(false));
902          assert_eq!(local_run.verbose, Some(false));
903        }
904
905        if let TaskDependency::TaskDependency(args) = &task.depends_on[0] {
906          assert_eq!(args.name, "task1");
907        }
908
909        assert_eq!(task.labels.len(), 0);
910        assert_eq!(task.description, "This is a task");
911        assert_eq!(task.environment.len(), 1);
912        assert_eq!(task.env_file.len(), 2);
913      } else {
914        panic!("Expected Task::Task");
915      }
916
917      Ok(())
918    }
919  }
920
921  #[test]
922  fn test_task_2() -> anyhow::Result<()> {
923    {
924      let yaml = "
925        commands:
926          - command: echo 'Hello, World!'
927            ignore_errors: false
928            verbose: false
929        description: This is a task
930        environment:
931          FOO: bar
932          BAR: foo
933      ";
934
935      let task = serde_yaml::from_str::<Task>(yaml)?;
936
937      if let Task::Task(task) = &task {
938        if let CommandRunner::LocalRun(local_run) = &task.commands[0] {
939          assert_eq!(local_run.command, "echo 'Hello, World!'");
940          assert_eq!(local_run.work_dir, None);
941          assert_eq!(local_run.ignore_errors, Some(false));
942          assert_eq!(local_run.verbose, Some(false));
943        }
944
945        assert_eq!(task.description, "This is a task");
946        assert_eq!(task.depends_on.len(), 0);
947        assert_eq!(task.labels.len(), 0);
948        assert_eq!(task.env_file.len(), 0);
949        assert_eq!(task.environment.len(), 2);
950      } else {
951        panic!("Expected Task::Task");
952      }
953
954      Ok(())
955    }
956  }
957
958  #[test]
959  fn test_task_3() -> anyhow::Result<()> {
960    {
961      let yaml = "
962        commands:
963          - command: echo 'Hello, World!'
964      ";
965
966      let task = serde_yaml::from_str::<Task>(yaml)?;
967
968      if let Task::Task(task) = &task {
969        if let CommandRunner::LocalRun(local_run) = &task.commands[0] {
970          assert_eq!(local_run.command, "echo 'Hello, World!'");
971          assert_eq!(local_run.work_dir, None);
972          assert_eq!(local_run.ignore_errors, None);
973          assert_eq!(local_run.verbose, None);
974        }
975
976        assert_eq!(task.description.len(), 0);
977        assert_eq!(task.depends_on.len(), 0);
978        assert_eq!(task.labels.len(), 0);
979        assert_eq!(task.env_file.len(), 0);
980        assert_eq!(task.environment.len(), 0);
981      } else {
982        panic!("Expected Task::Task");
983      }
984
985      Ok(())
986    }
987  }
988
989  #[test]
990  fn test_task_4() -> anyhow::Result<()> {
991    {
992      let yaml = "
993        commands:
994          - container_command:
995              - echo
996              - Hello, World!
997            image: docker.io/library/hello-world:latest
998      ";
999
1000      let task = serde_yaml::from_str::<Task>(yaml)?;
1001
1002      if let Task::Task(task) = &task {
1003        if let CommandRunner::ContainerRun(container_run) = &task.commands[0] {
1004          assert_eq!(container_run.container_command.len(), 2);
1005          assert_eq!(container_run.container_command[0], "echo");
1006          assert_eq!(container_run.container_command[1], "Hello, World!");
1007          assert_eq!(container_run.image, "docker.io/library/hello-world:latest");
1008          assert_eq!(container_run.mounted_paths, Vec::<String>::new());
1009          assert_eq!(container_run.ignore_errors, None);
1010          assert_eq!(container_run.verbose, None);
1011        }
1012
1013        assert_eq!(task.description.len(), 0);
1014        assert_eq!(task.depends_on.len(), 0);
1015        assert_eq!(task.labels.len(), 0);
1016        assert_eq!(task.env_file.len(), 0);
1017        assert_eq!(task.environment.len(), 0);
1018      } else {
1019        panic!("Expected Task::Task");
1020      }
1021
1022      Ok(())
1023    }
1024  }
1025
1026  #[test]
1027  fn test_task_5() -> anyhow::Result<()> {
1028    {
1029      let yaml = "
1030        commands:
1031          - container_command:
1032              - echo
1033              - Hello, World!
1034            image: docker.io/library/hello-world:latest
1035            mounted_paths:
1036              - /tmp
1037              - /var/tmp
1038      ";
1039
1040      let task = serde_yaml::from_str::<Task>(yaml)?;
1041
1042      if let Task::Task(task) = &task {
1043        if let CommandRunner::ContainerRun(container_run) = &task.commands[0] {
1044          assert_eq!(container_run.container_command.len(), 2);
1045          assert_eq!(container_run.container_command[0], "echo");
1046          assert_eq!(container_run.container_command[1], "Hello, World!");
1047          assert_eq!(container_run.image, "docker.io/library/hello-world:latest");
1048          assert_eq!(container_run.mounted_paths, vec!["/tmp", "/var/tmp"]);
1049          assert_eq!(container_run.ignore_errors, None);
1050          assert_eq!(container_run.verbose, None);
1051        }
1052
1053        assert_eq!(task.description.len(), 0);
1054        assert_eq!(task.depends_on.len(), 0);
1055        assert_eq!(task.labels.len(), 0);
1056        assert_eq!(task.env_file.len(), 0);
1057        assert_eq!(task.environment.len(), 0);
1058      } else {
1059        panic!("Expected Task::Task");
1060      }
1061
1062      Ok(())
1063    }
1064  }
1065
1066  #[test]
1067  fn test_task_6() -> anyhow::Result<()> {
1068    {
1069      let yaml = "
1070        commands:
1071          - container_command:
1072              - echo
1073              - Hello, World!
1074            image: docker.io/library/hello-world:latest
1075            mounted_paths:
1076              - /tmp
1077              - /var/tmp
1078            ignore_errors: true
1079      ";
1080
1081      let task = serde_yaml::from_str::<Task>(yaml)?;
1082
1083      if let Task::Task(task) = &task {
1084        if let CommandRunner::ContainerRun(container_run) = &task.commands[0] {
1085          assert_eq!(container_run.container_command.len(), 2);
1086          assert_eq!(container_run.container_command[0], "echo");
1087          assert_eq!(container_run.container_command[1], "Hello, World!");
1088          assert_eq!(container_run.image, "docker.io/library/hello-world:latest");
1089          assert_eq!(container_run.mounted_paths, vec!["/tmp", "/var/tmp"]);
1090          assert_eq!(container_run.ignore_errors, Some(true));
1091          assert_eq!(container_run.verbose, None);
1092        }
1093
1094        assert_eq!(task.description.len(), 0);
1095        assert_eq!(task.depends_on.len(), 0);
1096        assert_eq!(task.labels.len(), 0);
1097        assert_eq!(task.env_file.len(), 0);
1098        assert_eq!(task.environment.len(), 0);
1099      } else {
1100        panic!("Expected Task::Task");
1101      }
1102
1103      Ok(())
1104    }
1105  }
1106
1107  #[test]
1108  fn test_task_7() -> anyhow::Result<()> {
1109    {
1110      let yaml = "
1111        commands:
1112          - container_command:
1113              - echo
1114              - Hello, World!
1115            image: docker.io/library/hello-world:latest
1116            verbose: false
1117      ";
1118
1119      let task = serde_yaml::from_str::<Task>(yaml)?;
1120
1121      if let Task::Task(task) = &task {
1122        if let CommandRunner::ContainerRun(container_run) = &task.commands[0] {
1123          assert_eq!(container_run.container_command.len(), 2);
1124          assert_eq!(container_run.container_command[0], "echo");
1125          assert_eq!(container_run.container_command[1], "Hello, World!");
1126          assert_eq!(container_run.image, "docker.io/library/hello-world:latest");
1127          assert_eq!(container_run.mounted_paths, Vec::<String>::new());
1128          assert_eq!(container_run.ignore_errors, None);
1129          assert_eq!(container_run.verbose, Some(false));
1130        }
1131
1132        assert_eq!(task.description.len(), 0);
1133        assert_eq!(task.depends_on.len(), 0);
1134        assert_eq!(task.labels.len(), 0);
1135        assert_eq!(task.env_file.len(), 0);
1136        assert_eq!(task.environment.len(), 0);
1137      } else {
1138        panic!("Expected Task::Task");
1139      }
1140
1141      Ok(())
1142    }
1143  }
1144
1145  #[test]
1146  fn test_task_8() -> anyhow::Result<()> {
1147    {
1148      let yaml = "
1149        commands:
1150          - task: task1
1151      ";
1152
1153      let task = serde_yaml::from_str::<Task>(yaml)?;
1154
1155      if let Task::Task(task) = &task {
1156        if let CommandRunner::TaskRun(task_run) = &task.commands[0] {
1157          assert_eq!(task_run.task, "task1");
1158          assert_eq!(task_run.ignore_errors, None);
1159          assert_eq!(task_run.verbose, None);
1160        }
1161
1162        assert_eq!(task.description.len(), 0);
1163        assert_eq!(task.depends_on.len(), 0);
1164        assert_eq!(task.labels.len(), 0);
1165        assert_eq!(task.env_file.len(), 0);
1166        assert_eq!(task.environment.len(), 0);
1167      } else {
1168        panic!("Expected Task::Task");
1169      }
1170
1171      Ok(())
1172    }
1173  }
1174
1175  #[test]
1176  fn test_task_9() -> anyhow::Result<()> {
1177    {
1178      let yaml = "
1179        commands:
1180          - task: task1
1181            verbose: true
1182      ";
1183
1184      let task = serde_yaml::from_str::<Task>(yaml)?;
1185
1186      if let Task::Task(task) = &task {
1187        if let CommandRunner::TaskRun(task_run) = &task.commands[0] {
1188          assert_eq!(task_run.task, "task1");
1189          assert_eq!(task_run.ignore_errors, None);
1190          assert_eq!(task_run.verbose, Some(true));
1191        }
1192
1193        assert_eq!(task.description.len(), 0);
1194        assert_eq!(task.depends_on.len(), 0);
1195        assert_eq!(task.labels.len(), 0);
1196        assert_eq!(task.env_file.len(), 0);
1197        assert_eq!(task.environment.len(), 0);
1198      } else {
1199        panic!("Expected Task::Task");
1200      }
1201
1202      Ok(())
1203    }
1204  }
1205
1206  #[test]
1207  fn test_task_10() -> anyhow::Result<()> {
1208    {
1209      let yaml = "
1210        commands:
1211          - task: task1
1212            ignore_errors: true
1213      ";
1214
1215      let task = serde_yaml::from_str::<Task>(yaml)?;
1216
1217      if let Task::Task(task) = &task {
1218        if let CommandRunner::TaskRun(task_run) = &task.commands[0] {
1219          assert_eq!(task_run.task, "task1");
1220          assert_eq!(task_run.ignore_errors, Some(true));
1221          assert_eq!(task_run.verbose, None);
1222        }
1223
1224        assert_eq!(task.description.len(), 0);
1225        assert_eq!(task.depends_on.len(), 0);
1226        assert_eq!(task.labels.len(), 0);
1227        assert_eq!(task.env_file.len(), 0);
1228        assert_eq!(task.environment.len(), 0);
1229      } else {
1230        panic!("Expected Task::Task");
1231      }
1232
1233      Ok(())
1234    }
1235  }
1236
1237  #[test]
1238  fn test_task_11() -> anyhow::Result<()> {
1239    {
1240      let yaml = "
1241        echo 'Hello, World!'
1242      ";
1243
1244      let task = serde_yaml::from_str::<Task>(yaml)?;
1245
1246      if let Task::String(task) = &task {
1247        assert_eq!(task, "echo 'Hello, World!'");
1248      } else {
1249        panic!("Expected Task::String");
1250      }
1251
1252      Ok(())
1253    }
1254  }
1255
1256  #[test]
1257  fn test_task_12() -> anyhow::Result<()> {
1258    {
1259      let yaml = "
1260        'true'
1261      ";
1262
1263      let task = serde_yaml::from_str::<Task>(yaml)?;
1264
1265      if let Task::String(task) = &task {
1266        assert_eq!(task, "true");
1267      } else {
1268        panic!("Expected Task::String");
1269      }
1270
1271      Ok(())
1272    }
1273  }
1274
1275  #[test]
1276  fn test_task_13() -> anyhow::Result<()> {
1277    {
1278      let yaml = "
1279        commands: []
1280        environment:
1281          FOO: bar
1282          BAR: foo
1283          KEY: 42
1284          PIS: 3.14
1285      ";
1286
1287      let task = serde_yaml::from_str::<Task>(yaml)?;
1288
1289      if let Task::Task(task) = &task {
1290        assert_eq!(task.environment.len(), 4);
1291        assert_eq!(task.environment.get("FOO").unwrap(), "bar");
1292        assert_eq!(task.environment.get("BAR").unwrap(), "foo");
1293        assert_eq!(task.environment.get("KEY").unwrap(), "42");
1294      } else {
1295        panic!("Expected Task::Task");
1296      }
1297
1298      Ok(())
1299    }
1300  }
1301
1302  #[test]
1303  fn test_task_14() -> anyhow::Result<()> {
1304    let yaml = "
1305      commands: []
1306      secrets_path:
1307        - app/common
1308      vault_location: ./.mk/vault
1309      keys_location: ./.mk/keys
1310      key_name: team
1311      environment:
1312        SECRET_VALUE: ${{ secrets.app/password }}
1313    ";
1314
1315    let task = serde_yaml::from_str::<Task>(yaml)?;
1316
1317    if let Task::Task(task) = &task {
1318      assert_eq!(task.secrets_path, vec!["app/common"]);
1319      assert_eq!(task.vault_location.as_deref(), Some("./.mk/vault"));
1320      assert_eq!(task.keys_location.as_deref(), Some("./.mk/keys"));
1321      assert_eq!(task.key_name.as_deref(), Some("team"));
1322      assert_eq!(
1323        task.environment.get("SECRET_VALUE").map(String::as_str),
1324        Some("${{ secrets.app/password }}")
1325      );
1326    } else {
1327      panic!("Expected Task::Task");
1328    }
1329
1330    Ok(())
1331  }
1332
1333  #[test]
1334  fn test_parallel_interactive_rejected() -> anyhow::Result<()> {
1335    let yaml = r#"
1336          commands:
1337            - command: "echo hello"
1338              interactive: true
1339            - command: "echo world"
1340          parallel: true
1341      "#;
1342
1343    let task = serde_yaml::from_str::<Task>(yaml)?;
1344    let mut context = TaskContext::empty();
1345
1346    if let Task::Task(task) = task {
1347      let result = task.run(&mut context);
1348      assert!(result.is_err());
1349      assert!(result
1350        .unwrap_err()
1351        .to_string()
1352        .contains("Interactive local commands cannot be run in parallel"));
1353    }
1354
1355    Ok(())
1356  }
1357
1358  #[test]
1359  fn test_parallel_non_interactive_accepted() -> anyhow::Result<()> {
1360    let yaml = r#"
1361          commands:
1362            - command: "echo hello"
1363              interactive: false
1364            - command: "echo world"
1365          parallel: true
1366      "#;
1367
1368    let task = serde_yaml::from_str::<Task>(yaml)?;
1369    let mut context = TaskContext::empty();
1370
1371    if let Task::Task(task) = task {
1372      let result = task.run(&mut context);
1373      assert!(result.is_ok());
1374    }
1375
1376    Ok(())
1377  }
1378
1379  #[test]
1380  fn test_parallel_retrigger_rejected() -> anyhow::Result<()> {
1381    let yaml = r#"
1382          commands:
1383            - command: "go run ."
1384              retrigger: true
1385          parallel: true
1386      "#;
1387
1388    let task = serde_yaml::from_str::<Task>(yaml)?;
1389    let mut context = TaskContext::empty();
1390
1391    if let Task::Task(task) = task {
1392      let result = task.run(&mut context);
1393      assert!(result.is_err());
1394      assert!(result
1395        .unwrap_err()
1396        .to_string()
1397        .contains("Local commands with `retrigger: true` cannot be run in parallel"));
1398    }
1399
1400    Ok(())
1401  }
1402}