Skip to main content

mk_lib/schema/
task.rs

1use hashbrown::HashMap;
2use indicatif::{
3  HumanDuration,
4  ProgressBar,
5  ProgressStyle,
6};
7use rand::Rng as _;
8use serde::{
9  Deserialize,
10  Serialize,
11};
12
13use std::io::BufRead as _;
14use std::sync::mpsc::{
15  channel,
16  Receiver,
17  Sender,
18};
19use std::thread;
20use std::time::{
21  Duration,
22  Instant,
23};
24
25use super::{
26  contains_output_reference,
27  extract_output_references,
28  interpolate_template_string,
29  is_shell_command,
30  CommandRunner,
31  Precondition,
32  Shell,
33  TaskContext,
34  TaskDependency,
35};
36use crate::cache::{
37  compute_fingerprint,
38  expand_patterns_in_dir,
39  CacheEntry,
40};
41use crate::defaults::default_verbose;
42use crate::run_shell_command;
43use crate::secrets::load_secret_env;
44use crate::utils::{
45  deserialize_environment,
46  load_env_files_in_dir,
47  resolve_path,
48};
49
/// Serde default for [`TaskCache::enabled`]: a `cache` section that omits
/// `enabled` is treated as enabled.
fn default_cache_enabled() -> bool {
  true
}
53
/// Serde default for [`TaskExecution::fail_fast`]: an `execution` section that
/// omits `fail_fast` is treated as fail-fast.
fn default_fail_fast() -> bool {
  true
}
57
/// How the commands of a task are scheduled.
///
/// (De)serialized in `snake_case`, i.e. as `sequential` or `parallel`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ExecutionMode {
  /// Commands run one after another, in declaration order.
  Sequential,
  /// Commands run concurrently on worker threads.
  Parallel,
}
64
/// Execution settings of a task (the `execution` section of a task definition).
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TaskExecution {
  /// Scheduling mode. When set, it takes precedence over the task-level
  /// `parallel` flag.
  #[serde(default)]
  pub mode: Option<ExecutionMode>,

  /// Upper bound on the number of commands running at the same time in
  /// parallel mode. When unset, the number of commands is used.
  #[serde(default)]
  pub max_parallel: Option<usize>,

  /// In parallel mode, stop starting new commands after the first failure.
  /// Defaults to `true`.
  #[serde(default = "default_fail_fast")]
  pub fail_fast: bool,
}
76
/// Caching settings of a task (the `cache` section of a task definition).
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TaskCache {
  /// Whether the task may be skipped on a cache hit. Defaults to `true` when
  /// the `cache` section is present; a task without a `cache` section is not
  /// cached at all.
  #[serde(default = "default_cache_enabled")]
  pub enabled: bool,
}
82
/// A task definition: the commands to execute together with the dependencies,
/// preconditions, environment and caching configuration that apply to them.
///
/// Commands run sequentially unless parallel execution is requested through
/// `parallel` or `execution.mode`. Dependencies and preconditions are run
/// before the commands.
#[derive(Debug, Default, Deserialize)]
pub struct TaskArgs {
  /// The commands to run
  pub commands: Vec<CommandRunner>,

  /// The preconditions that must be met before the task can be executed
  #[serde(default)]
  pub preconditions: Vec<Precondition>,

  /// The tasks that must be executed before this task can be executed
  #[serde(default)]
  pub depends_on: Vec<TaskDependency>,

  /// The labels for the task
  #[serde(default)]
  pub labels: HashMap<String, String>,

  /// The description of the task
  #[serde(default)]
  pub description: String,

  /// The environment variables to set before running the task
  #[serde(default, deserialize_with = "deserialize_environment")]
  pub environment: HashMap<String, String>,

  /// The environment files to load before running the task
  #[serde(default)]
  pub env_file: Vec<String>,

  /// Secret paths to load as dotenv-style environment entries before running the task
  #[serde(default)]
  pub secrets_path: Vec<String>,

  /// The path to the secret vault
  #[serde(default)]
  pub vault_location: Option<String>,

  /// The path to the private keys used for secret decryption
  #[serde(default)]
  pub keys_location: Option<String>,

  /// The key name to use for secret decryption
  #[serde(default)]
  pub key_name: Option<String>,

  /// The shell to use when running the task
  #[serde(default)]
  pub shell: Option<Shell>,

  /// Run the commands in parallel.
  /// Only supported when every command is a non-interactive local command;
  /// `execution.mode`, when set, takes precedence over this flag.
  #[serde(default)]
  pub parallel: Option<bool>,

  /// Richer execution configuration
  #[serde(default)]
  pub execution: Option<TaskExecution>,

  /// Task caching configuration
  #[serde(default)]
  pub cache: Option<TaskCache>,

  /// Files or glob patterns that affect task output
  #[serde(default)]
  pub inputs: Vec<String>,

  /// Files produced by the task
  #[serde(default)]
  pub outputs: Vec<String>,

  /// Ignore errors if the task fails
  #[serde(default)]
  pub ignore_errors: Option<bool>,

  /// Show verbose output
  #[serde(default)]
  pub verbose: Option<bool>,
}
164
/// A task as written in the configuration.
///
/// The enum is untagged: a plain string is read as a single command, any other
/// value is read as a full [`TaskArgs`] definition.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub enum Task {
  /// Shorthand form: the string is the one command to run.
  String(String),
  /// Full form with all task options (boxed to keep the enum small).
  Task(Box<TaskArgs>),
}
171
/// Outcome of a single command, sent from a worker thread back to the
/// scheduler in parallel execution.
#[derive(Debug)]
pub struct CommandResult {
  /// Zero-based position of the command in the task's command list.
  index: usize,
  /// Whether the command finished without error.
  success: bool,
  /// Human-readable summary of the outcome.
  message: String,
}
178
179impl Task {
180  pub fn run(&self, context: &mut TaskContext) -> anyhow::Result<()> {
181    match self {
182      Task::String(command) => self.execute(context, command),
183      Task::Task(args) => args.run(context),
184    }
185  }
186
187  fn execute(&self, context: &mut TaskContext, command: &str) -> anyhow::Result<()> {
188    assert!(!command.is_empty());
189
190    TaskArgs {
191      commands: vec![CommandRunner::CommandRun(command.to_string())],
192      ..Default::default()
193    }
194    .run(context)
195  }
196}
197
198impl TaskArgs {
199  pub fn run(&self, context: &mut TaskContext) -> anyhow::Result<()> {
200    assert!(!self.commands.is_empty());
201
202    // Validate parallel execution requirements early
203    self.validate_parallel_commands()?;
204
205    let started = Instant::now();
206    let tick_interval = Duration::from_millis(80);
207
208    if let Some(shell) = &self.shell {
209      context.set_shell(shell);
210    }
211
212    if let Some(ignore_errors) = &self.ignore_errors {
213      context.set_ignore_errors(*ignore_errors);
214    }
215
216    if let Some(verbose) = &self.verbose {
217      context.set_verbose(*verbose);
218    }
219
220    if !context.is_nested {
221      if let Some(vault_location) = &context.task_root.vault_location {
222        context.set_secret_vault_location(vault_location.clone());
223      }
224
225      if let Some(keys_location) = &context.task_root.keys_location {
226        context.set_secret_keys_location(keys_location.clone());
227      }
228
229      if let Some(key_name) = &context.task_root.key_name {
230        context.set_secret_key_name(key_name.clone());
231      }
232    }
233
234    if let Some(vault_location) = &self.vault_location {
235      context.set_secret_vault_location(vault_location.clone());
236    }
237
238    if let Some(keys_location) = &self.keys_location {
239      context.set_secret_keys_location(keys_location.clone());
240    }
241
242    if let Some(key_name) = &self.key_name {
243      context.set_secret_key_name(key_name.clone());
244    }
245
246    // Load environment variables from root and task environments and env files.
247    if !context.is_nested {
248      let config_base_dir = self.config_base_dir(context);
249      let root_env = context.task_root.environment.clone();
250      let root_env_files = load_env_files_in_dir(&context.task_root.env_file, &config_base_dir)?;
251      let root_secret_env = load_secret_env(
252        &context.task_root.secrets_path,
253        &config_base_dir,
254        context.secret_vault_location.as_deref(),
255        context.secret_keys_location.as_deref(),
256        context.secret_key_name.as_deref(),
257      )?;
258      context.extend_env_vars(root_env);
259      context.extend_env_vars(root_env_files);
260      context.extend_env_vars(root_secret_env);
261    }
262
263    // Load environment variables from the task environment and env files field
264    let defined_env = self.load_static_env(context)?;
265    let additional_env = self.load_env_file(context)?;
266    let secret_env = self.load_secret_env(context)?;
267
268    context.extend_env_vars(defined_env);
269    context.extend_env_vars(additional_env);
270    context.extend_env_vars(secret_env);
271
272    if self.should_skip_from_cache(context)? {
273      context.emit_event(&serde_json::json!({
274        "event": "task_skipped",
275        "task": context.current_task_name.clone().unwrap_or_else(|| "<task>".to_string()),
276        "reason": "cache_hit",
277      }))?;
278      return Ok(());
279    }
280
281    let mut rng = rand::thread_rng();
282    // Spinners can be found here:
283    // https://github.com/sindresorhus/cli-spinners/blob/main/spinners.json
284    let pb_style =
285      ProgressStyle::with_template("{spinner:.green} [{prefix:.bold.dim}] {wide_msg:.cyan/blue} ")?
286        .tick_chars("⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏⦿");
287
288    let depends_on_pb = context.multi.add(ProgressBar::new(self.depends_on.len() as u64));
289
290    if !self.depends_on.is_empty() {
291      depends_on_pb.set_style(pb_style.clone());
292      depends_on_pb.set_message("Running task dependencies...");
293      depends_on_pb.enable_steady_tick(tick_interval);
294      for (i, dependency) in self.depends_on.iter().enumerate() {
295        thread::sleep(Duration::from_millis(rng.gen_range(40..300)));
296        depends_on_pb.set_prefix(format!("{}/{}", i + 1, self.depends_on.len()));
297        dependency.run(context)?;
298        depends_on_pb.inc(1);
299      }
300
301      let message = format!("Dependencies completed in {}.", HumanDuration(started.elapsed()));
302      if context.is_nested {
303        depends_on_pb.finish_and_clear();
304      } else {
305        depends_on_pb.finish_with_message(message);
306      }
307    }
308
309    let precondition_pb = context
310      .multi
311      .add(ProgressBar::new(self.preconditions.len() as u64));
312
313    if !self.preconditions.is_empty() {
314      precondition_pb.set_style(pb_style.clone());
315      precondition_pb.set_message("Running task precondition...");
316      precondition_pb.enable_steady_tick(tick_interval);
317      for (i, precondition) in self.preconditions.iter().enumerate() {
318        thread::sleep(Duration::from_millis(rng.gen_range(40..300)));
319        precondition_pb.set_prefix(format!("{}/{}", i + 1, self.preconditions.len()));
320        precondition.execute(context)?;
321        precondition_pb.inc(1);
322      }
323
324      let message = format!("Preconditions completed in {}.", HumanDuration(started.elapsed()));
325      if context.is_nested {
326        precondition_pb.finish_and_clear();
327      } else {
328        precondition_pb.finish_with_message(message);
329      }
330    }
331
332    if self.is_parallel() {
333      self.execute_commands_parallel(context)?;
334    } else {
335      let command_pb = context.multi.add(ProgressBar::new(self.commands.len() as u64));
336      command_pb.set_style(pb_style);
337      command_pb.set_message("Running task command...");
338      command_pb.enable_steady_tick(tick_interval);
339      for (i, command) in self.commands.iter().enumerate() {
340        thread::sleep(Duration::from_millis(rng.gen_range(100..400)));
341        command_pb.set_prefix(format!("{}/{}", i + 1, self.commands.len()));
342        self.refresh_output_env(context)?;
343        command.execute(context)?;
344        command_pb.inc(1);
345      }
346
347      let message = format!("Commands completed in {}.", HumanDuration(started.elapsed()));
348      if context.is_nested {
349        command_pb.finish_and_clear();
350      } else {
351        command_pb.finish_with_message(message);
352      }
353    }
354
355    self.update_cache(context)?;
356
357    Ok(())
358  }
359
360  /// Validate if the task can be run in parallel
361  fn validate_parallel_commands(&self) -> anyhow::Result<()> {
362    if !self.is_parallel() {
363      return Ok(());
364    }
365
366    for command in &self.commands {
367      match command {
368        CommandRunner::LocalRun(local_run) if local_run.is_parallel_safe() => continue,
369        CommandRunner::LocalRun(local_run) if local_run.interactive_enabled() => {
370          return Err(anyhow::anyhow!(
371            "Interactive local commands cannot be run in parallel"
372          ))
373        },
374        CommandRunner::LocalRun(_) => {
375          return Err(anyhow::anyhow!(
376            "Local commands with `retrigger: true` cannot be run in parallel"
377          ))
378        },
379        _ => {
380          return Err(anyhow::anyhow!(
381            "Parallel execution is only supported for non-interactive local commands"
382          ))
383        },
384      }
385    }
386    Ok(())
387  }
388
389  /// Execute the commands in parallel
390  fn execute_commands_parallel(&self, context: &TaskContext) -> anyhow::Result<()> {
391    let (tx, rx): (Sender<CommandResult>, Receiver<CommandResult>) = channel();
392    let mut handles = vec![];
393    let command_count = self.commands.len();
394    let max_parallel = self.max_parallel().min(command_count.max(1));
395    let fail_fast = self.fail_fast();
396    let command_pb = context.multi.add(ProgressBar::new(command_count as u64));
397    command_pb.set_style(ProgressStyle::with_template(
398      "{spinner:.green} [{prefix:.bold.dim}] {wide_msg:.cyan/blue} ",
399    )?);
400    command_pb.set_prefix("?/?");
401    command_pb.set_message("Running task commands in parallel...");
402    command_pb.enable_steady_tick(Duration::from_millis(80));
403    let mut failures = Vec::new();
404
405    // Clone all commands upfront to avoid borrowing issues
406    let commands: Vec<_> = self.commands.to_vec();
407
408    // Track results in order
409    let mut completed = 0;
410
411    let mut iter = commands.into_iter().enumerate();
412    let mut running = 0usize;
413    let mut stop_scheduling = false;
414
415    while completed < command_count {
416      while !stop_scheduling && running < max_parallel {
417        let Some((i, command)) = iter.next() else {
418          break;
419        };
420
421        let tx = tx.clone();
422        let context = context.clone();
423
424        let handle = thread::spawn(move || {
425          let result = match command.execute(&context) {
426            Ok(_) => CommandResult {
427              index: i,
428              success: true,
429              message: format!("Command {} completed successfully", i + 1),
430            },
431            Err(e) => CommandResult {
432              index: i,
433              success: false,
434              message: format!("Command {} failed: {}", i + 1, e),
435            },
436          };
437          tx.send(result).unwrap();
438        });
439
440        handles.push(handle);
441        running += 1;
442      }
443
444      if running == 0 {
445        break;
446      }
447
448      match rx.recv() {
449        Ok(result) => {
450          running -= 1;
451          let index = result.index;
452          if !result.success && !context.ignore_errors() {
453            failures.push(result.message);
454            if fail_fast {
455              stop_scheduling = true;
456            }
457          }
458
459          completed += 1;
460          command_pb.set_prefix(format!("{}/{}", completed, command_count));
461          command_pb.inc(1);
462
463          command_pb.set_message(format!(
464            "Running task commands in parallel (completed {})",
465            index + 1
466          ));
467        },
468        Err(e) => {
469          command_pb.finish_with_message("Error receiving command results");
470          return Err(anyhow::anyhow!("Channel error: {}", e));
471        },
472      }
473    }
474
475    // Wait for all threads to complete
476    for handle in handles {
477      handle.join().unwrap();
478    }
479
480    if !failures.is_empty() {
481      command_pb.finish_with_message("Some commands failed");
482
483      // Sort failures by command index for clearer error reporting
484      failures.sort();
485      return Err(anyhow::anyhow!("Failed commands:\n{}", failures.join("\n")));
486    }
487
488    let message = "Commands completed in parallel";
489    if context.is_nested {
490      command_pb.finish_and_clear();
491    } else {
492      command_pb.finish_with_message(message);
493    }
494
495    Ok(())
496  }
497
498  fn load_static_env(&self, context: &TaskContext) -> anyhow::Result<HashMap<String, String>> {
499    let mut local_env: HashMap<String, String> = HashMap::new();
500    for (key, value) in &self.environment {
501      if contains_output_reference(value) {
502        continue;
503      }
504      let value = self.get_env_value(context, value)?;
505      local_env.insert(key.clone(), value);
506    }
507
508    Ok(local_env)
509  }
510
511  fn refresh_output_env(&self, context: &mut TaskContext) -> anyhow::Result<()> {
512    let mut updated_env = HashMap::new();
513
514    for (key, value) in &self.environment {
515      let output_refs = extract_output_references(value);
516      if output_refs.is_empty() {
517        continue;
518      }
519
520      let mut all_outputs_ready = true;
521      for output_name in &output_refs {
522        if !context.has_task_output(output_name)? {
523          all_outputs_ready = false;
524          break;
525        }
526      }
527      if !all_outputs_ready {
528        continue;
529      }
530
531      updated_env.insert(key.clone(), self.get_env_value(context, value)?);
532    }
533
534    context.extend_env_vars(updated_env);
535    Ok(())
536  }
537
538  fn load_env_file(&self, context: &TaskContext) -> anyhow::Result<HashMap<String, String>> {
539    load_env_files_in_dir(&self.env_file, &self.config_base_dir(context))
540  }
541
542  fn load_secret_env(&self, context: &TaskContext) -> anyhow::Result<HashMap<String, String>> {
543    load_secret_env(
544      &self.secrets_path,
545      &self.config_base_dir(context),
546      context.secret_vault_location.as_deref(),
547      context.secret_keys_location.as_deref(),
548      context.secret_key_name.as_deref(),
549    )
550  }
551
552  fn get_env_value(&self, context: &TaskContext, value_in: &str) -> anyhow::Result<String> {
553    if is_shell_command(value_in)? {
554      let verbose = self.verbose();
555      let mut cmd = self
556        .shell
557        .as_ref()
558        .map(|shell| shell.proc())
559        .unwrap_or_else(|| context.shell().proc());
560      let output = run_shell_command!(value_in, cmd, verbose);
561      Ok(output)
562    } else if super::is_template_command(value_in)? || contains_output_reference(value_in) {
563      Ok(interpolate_template_string(value_in, context)?)
564    } else {
565      Ok(value_in.to_string())
566    }
567  }
568
569  fn verbose(&self) -> bool {
570    self.verbose.unwrap_or(default_verbose())
571  }
572
573  pub(crate) fn execution_mode(&self) -> ExecutionMode {
574    self
575      .execution
576      .as_ref()
577      .and_then(|execution| execution.mode.clone())
578      .or_else(|| {
579        self.parallel.map(|parallel| {
580          if parallel {
581            ExecutionMode::Parallel
582          } else {
583            ExecutionMode::Sequential
584          }
585        })
586      })
587      .unwrap_or(ExecutionMode::Sequential)
588  }
589
590  pub(crate) fn is_parallel(&self) -> bool {
591    self.execution_mode().is_parallel()
592  }
593
594  pub(crate) fn max_parallel(&self) -> usize {
595    self
596      .execution
597      .as_ref()
598      .and_then(|execution| execution.max_parallel)
599      .unwrap_or(self.commands.len().max(1))
600  }
601
602  pub(crate) fn fail_fast(&self) -> bool {
603    self
604      .execution
605      .as_ref()
606      .map(|execution| execution.fail_fast)
607      .unwrap_or(true)
608  }
609
610  fn cache_enabled(&self) -> bool {
611    self.cache.as_ref().map(|cache| cache.enabled).unwrap_or(false)
612  }
613
614  fn should_skip_from_cache(&self, context: &TaskContext) -> anyhow::Result<bool> {
615    if context.force || !self.cache_enabled() || self.outputs.is_empty() {
616      return Ok(false);
617    }
618
619    let resolved_outputs = self.resolve_output_paths(context)?;
620    let outputs_exist = self
621      .resolve_output_paths(context)?
622      .iter()
623      .all(|output| output.exists());
624    if !outputs_exist {
625      return Ok(false);
626    }
627
628    let env_vars = sorted_env_vars(&context.env_vars);
629    let inputs = self.resolve_input_paths(context)?;
630    let mut env_files = self.resolve_env_file_paths(context);
631    env_files.extend(self.resolve_secret_paths(context));
632    env_files.sort();
633    env_files.dedup();
634    let fingerprint = compute_fingerprint(
635      &context
636        .current_task_name
637        .clone()
638        .unwrap_or_else(|| "<task>".to_string()),
639      &stable_task_debug(self),
640      &env_vars,
641      &inputs,
642      &env_files,
643      &resolved_outputs,
644    )?;
645
646    let store = context
647      .cache_store
648      .lock()
649      .map_err(|e| anyhow::anyhow!("Failed to lock cache store - {}", e))?;
650    let Some(entry) = store.tasks.get(&fingerprint_task_key(context, self)) else {
651      return Ok(false);
652    };
653
654    Ok(entry.fingerprint == fingerprint)
655  }
656
657  fn update_cache(&self, context: &TaskContext) -> anyhow::Result<()> {
658    if !self.cache_enabled() || self.outputs.is_empty() {
659      return Ok(());
660    }
661
662    let env_vars = sorted_env_vars(&context.env_vars);
663    let inputs = self.resolve_input_paths(context)?;
664    let mut env_files = self.resolve_env_file_paths(context);
665    env_files.extend(self.resolve_secret_paths(context));
666    env_files.sort();
667    env_files.dedup();
668    let resolved_outputs = self.resolve_output_paths(context)?;
669    let fingerprint = compute_fingerprint(
670      &context
671        .current_task_name
672        .clone()
673        .unwrap_or_else(|| "<task>".to_string()),
674      &stable_task_debug(self),
675      &env_vars,
676      &inputs,
677      &env_files,
678      &resolved_outputs,
679    )?;
680    let key = fingerprint_task_key(context, self);
681
682    {
683      let mut store = context
684        .cache_store
685        .lock()
686        .map_err(|e| anyhow::anyhow!("Failed to lock cache store - {}", e))?;
687      store.tasks.insert(
688        key,
689        CacheEntry {
690          fingerprint,
691          outputs: resolved_outputs
692            .iter()
693            .map(|path| path.to_string_lossy().into_owned())
694            .collect(),
695          updated_at: chrono::Utc::now().to_rfc3339(),
696        },
697      );
698      store.save_in_dir(&context.task_root.cache_base_dir())?;
699    }
700
701    Ok(())
702  }
703
  /// Returns the configuration base directory as reported by `root`.
  pub(crate) fn config_base_dir_from_root(&self, root: &super::TaskRoot) -> std::path::PathBuf {
    root.config_base_dir()
  }
707
708  pub(crate) fn task_base_dir_from_root(&self, root: &super::TaskRoot) -> std::path::PathBuf {
709    let config_base_dir = self.config_base_dir_from_root(root);
710    let mut work_dirs = self
711      .commands
712      .iter()
713      .filter_map(|command| match command {
714        CommandRunner::LocalRun(local_run) => local_run.work_dir.as_ref(),
715        _ => None,
716      })
717      .map(|work_dir| resolve_path(&config_base_dir, work_dir))
718      .collect::<Vec<_>>();
719
720    work_dirs.sort();
721    work_dirs.dedup();
722
723    if work_dirs.len() == 1 {
724      work_dirs.remove(0)
725    } else {
726      config_base_dir
727    }
728  }
729
730  fn config_base_dir(&self, context: &TaskContext) -> std::path::PathBuf {
731    self.config_base_dir_from_root(&context.task_root)
732  }
733
734  fn task_base_dir(&self, context: &TaskContext) -> std::path::PathBuf {
735    self.task_base_dir_from_root(&context.task_root)
736  }
737
738  fn resolve_input_paths(&self, context: &TaskContext) -> anyhow::Result<Vec<std::path::PathBuf>> {
739    expand_patterns_in_dir(&self.task_base_dir(context), &self.inputs)
740  }
741
742  fn resolve_output_paths(&self, context: &TaskContext) -> anyhow::Result<Vec<std::path::PathBuf>> {
743    let base_dir = self.task_base_dir(context);
744    Ok(
745      self
746        .outputs
747        .iter()
748        .map(|output| resolve_path(&base_dir, output))
749        .collect(),
750    )
751  }
752
753  fn resolve_env_file_paths(&self, context: &TaskContext) -> Vec<std::path::PathBuf> {
754    let config_base_dir = self.config_base_dir(context);
755    let mut env_files = context
756      .task_root
757      .env_file
758      .iter()
759      .chain(self.env_file.iter())
760      .map(|env_file| resolve_path(&config_base_dir, env_file))
761      .collect::<Vec<_>>();
762    env_files.sort();
763    env_files.dedup();
764    env_files
765  }
766
767  fn resolve_secret_paths(&self, context: &TaskContext) -> Vec<std::path::PathBuf> {
768    let config_base_dir = self.config_base_dir(context);
769    let vault_location = context
770      .secret_vault_location
771      .as_deref()
772      .map(|path| resolve_path(&config_base_dir, path))
773      .unwrap_or_else(|| resolve_path(&config_base_dir, "./.mk/vault"));
774    let mut secret_paths = context
775      .task_root
776      .secrets_path
777      .iter()
778      .chain(self.secrets_path.iter())
779      .map(|secret_path| vault_location.join(secret_path))
780      .collect::<Vec<_>>();
781    secret_paths.sort();
782    secret_paths.dedup();
783    secret_paths
784  }
785}
786
787impl ExecutionMode {
788  fn is_parallel(&self) -> bool {
789    matches!(self, ExecutionMode::Parallel)
790  }
791}
792
/// Copies the environment into a list of `(name, value)` pairs sorted in
/// ascending order, so that the result does not depend on map iteration order.
fn sorted_env_vars(env_vars: &HashMap<String, String>) -> Vec<(String, String)> {
  let mut entries = Vec::with_capacity(env_vars.len());
  for (name, value) in env_vars {
    entries.push((name.clone(), value.clone()));
  }
  entries.sort();
  entries
}
801
802fn fingerprint_task_key(context: &TaskContext, task: &TaskArgs) -> String {
803  context.current_task_name.clone().unwrap_or_else(|| {
804    if !task.description.is_empty() {
805      task.description.clone()
806    } else {
807      format!("{:?}", task.commands)
808    }
809  })
810}
811
812fn stable_task_debug(task: &TaskArgs) -> String {
813  let mut labels: Vec<_> = task
814    .labels
815    .iter()
816    .map(|(key, value)| (key.clone(), value.clone()))
817    .collect();
818  labels.sort();
819
820  let mut environment: Vec<_> = task
821    .environment
822    .iter()
823    .map(|(key, value)| (key.clone(), value.clone()))
824    .collect();
825  environment.sort();
826
827  let mut secrets_path = task.secrets_path.clone();
828  secrets_path.sort();
829
830  format!(
831    "commands={:?};preconditions={:?};depends_on={:?};labels={:?};description={:?};environment={:?};env_file={:?};secrets_path={:?};vault_location={:?};keys_location={:?};key_name={:?};shell={:?};execution_mode={:?};max_parallel={:?};fail_fast={};cache_enabled={};inputs={:?};outputs={:?};ignore_errors={:?};verbose={:?}",
832    task.commands,
833    task.preconditions,
834    task.depends_on,
835    labels,
836    task.description,
837    environment,
838    task.env_file,
839    secrets_path,
840    task.vault_location,
841    task.keys_location,
842    task.key_name,
843    task.shell,
844    task.execution_mode(),
845    task.execution.as_ref().and_then(|execution| execution.max_parallel),
846    task.fail_fast(),
847    task.cache_enabled(),
848    task.inputs,
849    task.outputs,
850    task.ignore_errors,
851    task.verbose
852  )
853}
854
855#[cfg(test)]
856mod test {
857  use super::*;
858
859  #[test]
860  fn test_task_1() -> anyhow::Result<()> {
861    {
862      let yaml = "
863        commands:
864          - command: echo \"Hello, World!\"
865            ignore_errors: false
866            verbose: false
867        depends_on:
868          - name: task1
869        description: This is a task
870        environment:
871          FOO: bar
872        env_file:
873          - test.env
874          - test2.env
875      ";
876
877      let task = serde_yaml::from_str::<Task>(yaml)?;
878
879      if let Task::Task(task) = &task {
880        if let CommandRunner::LocalRun(local_run) = &task.commands[0] {
881          assert_eq!(local_run.command, "echo \"Hello, World!\"");
882          assert_eq!(local_run.work_dir, None);
883          assert_eq!(local_run.ignore_errors, Some(false));
884          assert_eq!(local_run.verbose, Some(false));
885        }
886
887        if let TaskDependency::TaskDependency(args) = &task.depends_on[0] {
888          assert_eq!(args.name, "task1");
889        }
890
891        assert_eq!(task.labels.len(), 0);
892        assert_eq!(task.description, "This is a task");
893        assert_eq!(task.environment.len(), 1);
894        assert_eq!(task.env_file.len(), 2);
895      } else {
896        panic!("Expected Task::Task");
897      }
898
899      Ok(())
900    }
901  }
902
903  #[test]
904  fn test_task_2() -> anyhow::Result<()> {
905    {
906      let yaml = "
907        commands:
908          - command: echo 'Hello, World!'
909            ignore_errors: false
910            verbose: false
911        description: This is a task
912        environment:
913          FOO: bar
914          BAR: foo
915      ";
916
917      let task = serde_yaml::from_str::<Task>(yaml)?;
918
919      if let Task::Task(task) = &task {
920        if let CommandRunner::LocalRun(local_run) = &task.commands[0] {
921          assert_eq!(local_run.command, "echo 'Hello, World!'");
922          assert_eq!(local_run.work_dir, None);
923          assert_eq!(local_run.ignore_errors, Some(false));
924          assert_eq!(local_run.verbose, Some(false));
925        }
926
927        assert_eq!(task.description, "This is a task");
928        assert_eq!(task.depends_on.len(), 0);
929        assert_eq!(task.labels.len(), 0);
930        assert_eq!(task.env_file.len(), 0);
931        assert_eq!(task.environment.len(), 2);
932      } else {
933        panic!("Expected Task::Task");
934      }
935
936      Ok(())
937    }
938  }
939
940  #[test]
941  fn test_task_3() -> anyhow::Result<()> {
942    {
943      let yaml = "
944        commands:
945          - command: echo 'Hello, World!'
946      ";
947
948      let task = serde_yaml::from_str::<Task>(yaml)?;
949
950      if let Task::Task(task) = &task {
951        if let CommandRunner::LocalRun(local_run) = &task.commands[0] {
952          assert_eq!(local_run.command, "echo 'Hello, World!'");
953          assert_eq!(local_run.work_dir, None);
954          assert_eq!(local_run.ignore_errors, None);
955          assert_eq!(local_run.verbose, None);
956        }
957
958        assert_eq!(task.description.len(), 0);
959        assert_eq!(task.depends_on.len(), 0);
960        assert_eq!(task.labels.len(), 0);
961        assert_eq!(task.env_file.len(), 0);
962        assert_eq!(task.environment.len(), 0);
963      } else {
964        panic!("Expected Task::Task");
965      }
966
967      Ok(())
968    }
969  }
970
971  #[test]
972  fn test_task_4() -> anyhow::Result<()> {
973    {
974      let yaml = "
975        commands:
976          - container_command:
977              - echo
978              - Hello, World!
979            image: docker.io/library/hello-world:latest
980      ";
981
982      let task = serde_yaml::from_str::<Task>(yaml)?;
983
984      if let Task::Task(task) = &task {
985        if let CommandRunner::ContainerRun(container_run) = &task.commands[0] {
986          assert_eq!(container_run.container_command.len(), 2);
987          assert_eq!(container_run.container_command[0], "echo");
988          assert_eq!(container_run.container_command[1], "Hello, World!");
989          assert_eq!(container_run.image, "docker.io/library/hello-world:latest");
990          assert_eq!(container_run.mounted_paths, Vec::<String>::new());
991          assert_eq!(container_run.ignore_errors, None);
992          assert_eq!(container_run.verbose, None);
993        }
994
995        assert_eq!(task.description.len(), 0);
996        assert_eq!(task.depends_on.len(), 0);
997        assert_eq!(task.labels.len(), 0);
998        assert_eq!(task.env_file.len(), 0);
999        assert_eq!(task.environment.len(), 0);
1000      } else {
1001        panic!("Expected Task::Task");
1002      }
1003
1004      Ok(())
1005    }
1006  }
1007
1008  #[test]
1009  fn test_task_5() -> anyhow::Result<()> {
1010    {
1011      let yaml = "
1012        commands:
1013          - container_command:
1014              - echo
1015              - Hello, World!
1016            image: docker.io/library/hello-world:latest
1017            mounted_paths:
1018              - /tmp
1019              - /var/tmp
1020      ";
1021
1022      let task = serde_yaml::from_str::<Task>(yaml)?;
1023
1024      if let Task::Task(task) = &task {
1025        if let CommandRunner::ContainerRun(container_run) = &task.commands[0] {
1026          assert_eq!(container_run.container_command.len(), 2);
1027          assert_eq!(container_run.container_command[0], "echo");
1028          assert_eq!(container_run.container_command[1], "Hello, World!");
1029          assert_eq!(container_run.image, "docker.io/library/hello-world:latest");
1030          assert_eq!(container_run.mounted_paths, vec!["/tmp", "/var/tmp"]);
1031          assert_eq!(container_run.ignore_errors, None);
1032          assert_eq!(container_run.verbose, None);
1033        }
1034
1035        assert_eq!(task.description.len(), 0);
1036        assert_eq!(task.depends_on.len(), 0);
1037        assert_eq!(task.labels.len(), 0);
1038        assert_eq!(task.env_file.len(), 0);
1039        assert_eq!(task.environment.len(), 0);
1040      } else {
1041        panic!("Expected Task::Task");
1042      }
1043
1044      Ok(())
1045    }
1046  }
1047
1048  #[test]
1049  fn test_task_6() -> anyhow::Result<()> {
1050    {
1051      let yaml = "
1052        commands:
1053          - container_command:
1054              - echo
1055              - Hello, World!
1056            image: docker.io/library/hello-world:latest
1057            mounted_paths:
1058              - /tmp
1059              - /var/tmp
1060            ignore_errors: true
1061      ";
1062
1063      let task = serde_yaml::from_str::<Task>(yaml)?;
1064
1065      if let Task::Task(task) = &task {
1066        if let CommandRunner::ContainerRun(container_run) = &task.commands[0] {
1067          assert_eq!(container_run.container_command.len(), 2);
1068          assert_eq!(container_run.container_command[0], "echo");
1069          assert_eq!(container_run.container_command[1], "Hello, World!");
1070          assert_eq!(container_run.image, "docker.io/library/hello-world:latest");
1071          assert_eq!(container_run.mounted_paths, vec!["/tmp", "/var/tmp"]);
1072          assert_eq!(container_run.ignore_errors, Some(true));
1073          assert_eq!(container_run.verbose, None);
1074        }
1075
1076        assert_eq!(task.description.len(), 0);
1077        assert_eq!(task.depends_on.len(), 0);
1078        assert_eq!(task.labels.len(), 0);
1079        assert_eq!(task.env_file.len(), 0);
1080        assert_eq!(task.environment.len(), 0);
1081      } else {
1082        panic!("Expected Task::Task");
1083      }
1084
1085      Ok(())
1086    }
1087  }
1088
1089  #[test]
1090  fn test_task_7() -> anyhow::Result<()> {
1091    {
1092      let yaml = "
1093        commands:
1094          - container_command:
1095              - echo
1096              - Hello, World!
1097            image: docker.io/library/hello-world:latest
1098            verbose: false
1099      ";
1100
1101      let task = serde_yaml::from_str::<Task>(yaml)?;
1102
1103      if let Task::Task(task) = &task {
1104        if let CommandRunner::ContainerRun(container_run) = &task.commands[0] {
1105          assert_eq!(container_run.container_command.len(), 2);
1106          assert_eq!(container_run.container_command[0], "echo");
1107          assert_eq!(container_run.container_command[1], "Hello, World!");
1108          assert_eq!(container_run.image, "docker.io/library/hello-world:latest");
1109          assert_eq!(container_run.mounted_paths, Vec::<String>::new());
1110          assert_eq!(container_run.ignore_errors, None);
1111          assert_eq!(container_run.verbose, Some(false));
1112        }
1113
1114        assert_eq!(task.description.len(), 0);
1115        assert_eq!(task.depends_on.len(), 0);
1116        assert_eq!(task.labels.len(), 0);
1117        assert_eq!(task.env_file.len(), 0);
1118        assert_eq!(task.environment.len(), 0);
1119      } else {
1120        panic!("Expected Task::Task");
1121      }
1122
1123      Ok(())
1124    }
1125  }
1126
1127  #[test]
1128  fn test_task_8() -> anyhow::Result<()> {
1129    {
1130      let yaml = "
1131        commands:
1132          - task: task1
1133      ";
1134
1135      let task = serde_yaml::from_str::<Task>(yaml)?;
1136
1137      if let Task::Task(task) = &task {
1138        if let CommandRunner::TaskRun(task_run) = &task.commands[0] {
1139          assert_eq!(task_run.task, "task1");
1140          assert_eq!(task_run.ignore_errors, None);
1141          assert_eq!(task_run.verbose, None);
1142        }
1143
1144        assert_eq!(task.description.len(), 0);
1145        assert_eq!(task.depends_on.len(), 0);
1146        assert_eq!(task.labels.len(), 0);
1147        assert_eq!(task.env_file.len(), 0);
1148        assert_eq!(task.environment.len(), 0);
1149      } else {
1150        panic!("Expected Task::Task");
1151      }
1152
1153      Ok(())
1154    }
1155  }
1156
1157  #[test]
1158  fn test_task_9() -> anyhow::Result<()> {
1159    {
1160      let yaml = "
1161        commands:
1162          - task: task1
1163            verbose: true
1164      ";
1165
1166      let task = serde_yaml::from_str::<Task>(yaml)?;
1167
1168      if let Task::Task(task) = &task {
1169        if let CommandRunner::TaskRun(task_run) = &task.commands[0] {
1170          assert_eq!(task_run.task, "task1");
1171          assert_eq!(task_run.ignore_errors, None);
1172          assert_eq!(task_run.verbose, Some(true));
1173        }
1174
1175        assert_eq!(task.description.len(), 0);
1176        assert_eq!(task.depends_on.len(), 0);
1177        assert_eq!(task.labels.len(), 0);
1178        assert_eq!(task.env_file.len(), 0);
1179        assert_eq!(task.environment.len(), 0);
1180      } else {
1181        panic!("Expected Task::Task");
1182      }
1183
1184      Ok(())
1185    }
1186  }
1187
1188  #[test]
1189  fn test_task_10() -> anyhow::Result<()> {
1190    {
1191      let yaml = "
1192        commands:
1193          - task: task1
1194            ignore_errors: true
1195      ";
1196
1197      let task = serde_yaml::from_str::<Task>(yaml)?;
1198
1199      if let Task::Task(task) = &task {
1200        if let CommandRunner::TaskRun(task_run) = &task.commands[0] {
1201          assert_eq!(task_run.task, "task1");
1202          assert_eq!(task_run.ignore_errors, Some(true));
1203          assert_eq!(task_run.verbose, None);
1204        }
1205
1206        assert_eq!(task.description.len(), 0);
1207        assert_eq!(task.depends_on.len(), 0);
1208        assert_eq!(task.labels.len(), 0);
1209        assert_eq!(task.env_file.len(), 0);
1210        assert_eq!(task.environment.len(), 0);
1211      } else {
1212        panic!("Expected Task::Task");
1213      }
1214
1215      Ok(())
1216    }
1217  }
1218
1219  #[test]
1220  fn test_task_11() -> anyhow::Result<()> {
1221    {
1222      let yaml = "
1223        echo 'Hello, World!'
1224      ";
1225
1226      let task = serde_yaml::from_str::<Task>(yaml)?;
1227
1228      if let Task::String(task) = &task {
1229        assert_eq!(task, "echo 'Hello, World!'");
1230      } else {
1231        panic!("Expected Task::String");
1232      }
1233
1234      Ok(())
1235    }
1236  }
1237
1238  #[test]
1239  fn test_task_12() -> anyhow::Result<()> {
1240    {
1241      let yaml = "
1242        'true'
1243      ";
1244
1245      let task = serde_yaml::from_str::<Task>(yaml)?;
1246
1247      if let Task::String(task) = &task {
1248        assert_eq!(task, "true");
1249      } else {
1250        panic!("Expected Task::String");
1251      }
1252
1253      Ok(())
1254    }
1255  }
1256
1257  #[test]
1258  fn test_task_13() -> anyhow::Result<()> {
1259    {
1260      let yaml = "
1261        commands: []
1262        environment:
1263          FOO: bar
1264          BAR: foo
1265          KEY: 42
1266          PIS: 3.14
1267      ";
1268
1269      let task = serde_yaml::from_str::<Task>(yaml)?;
1270
1271      if let Task::Task(task) = &task {
1272        assert_eq!(task.environment.len(), 4);
1273        assert_eq!(task.environment.get("FOO").unwrap(), "bar");
1274        assert_eq!(task.environment.get("BAR").unwrap(), "foo");
1275        assert_eq!(task.environment.get("KEY").unwrap(), "42");
1276      } else {
1277        panic!("Expected Task::Task");
1278      }
1279
1280      Ok(())
1281    }
1282  }
1283
1284  #[test]
1285  fn test_task_14() -> anyhow::Result<()> {
1286    let yaml = "
1287      commands: []
1288      secrets_path:
1289        - app/common
1290      vault_location: ./.mk/vault
1291      keys_location: ./.mk/keys
1292      key_name: team
1293      environment:
1294        SECRET_VALUE: ${{ secrets.app/password }}
1295    ";
1296
1297    let task = serde_yaml::from_str::<Task>(yaml)?;
1298
1299    if let Task::Task(task) = &task {
1300      assert_eq!(task.secrets_path, vec!["app/common"]);
1301      assert_eq!(task.vault_location.as_deref(), Some("./.mk/vault"));
1302      assert_eq!(task.keys_location.as_deref(), Some("./.mk/keys"));
1303      assert_eq!(task.key_name.as_deref(), Some("team"));
1304      assert_eq!(
1305        task.environment.get("SECRET_VALUE").map(String::as_str),
1306        Some("${{ secrets.app/password }}")
1307      );
1308    } else {
1309      panic!("Expected Task::Task");
1310    }
1311
1312    Ok(())
1313  }
1314
1315  #[test]
1316  fn test_parallel_interactive_rejected() -> anyhow::Result<()> {
1317    let yaml = r#"
1318          commands:
1319            - command: "echo hello"
1320              interactive: true
1321            - command: "echo world"
1322          parallel: true
1323      "#;
1324
1325    let task = serde_yaml::from_str::<Task>(yaml)?;
1326    let mut context = TaskContext::empty();
1327
1328    if let Task::Task(task) = task {
1329      let result = task.run(&mut context);
1330      assert!(result.is_err());
1331      assert!(result
1332        .unwrap_err()
1333        .to_string()
1334        .contains("Interactive local commands cannot be run in parallel"));
1335    }
1336
1337    Ok(())
1338  }
1339
1340  #[test]
1341  fn test_parallel_non_interactive_accepted() -> anyhow::Result<()> {
1342    let yaml = r#"
1343          commands:
1344            - command: "echo hello"
1345              interactive: false
1346            - command: "echo world"
1347          parallel: true
1348      "#;
1349
1350    let task = serde_yaml::from_str::<Task>(yaml)?;
1351    let mut context = TaskContext::empty();
1352
1353    if let Task::Task(task) = task {
1354      let result = task.run(&mut context);
1355      assert!(result.is_ok());
1356    }
1357
1358    Ok(())
1359  }
1360
1361  #[test]
1362  fn test_parallel_retrigger_rejected() -> anyhow::Result<()> {
1363    let yaml = r#"
1364          commands:
1365            - command: "go run ."
1366              retrigger: true
1367          parallel: true
1368      "#;
1369
1370    let task = serde_yaml::from_str::<Task>(yaml)?;
1371    let mut context = TaskContext::empty();
1372
1373    if let Task::Task(task) = task {
1374      let result = task.run(&mut context);
1375      assert!(result.is_err());
1376      assert!(result
1377        .unwrap_err()
1378        .to_string()
1379        .contains("Local commands with `retrigger: true` cannot be run in parallel"));
1380    }
1381
1382    Ok(())
1383  }
1384}