1use hashbrown::HashMap;
2use indicatif::{
3 HumanDuration,
4 ProgressBar,
5 ProgressStyle,
6};
7use rand::Rng as _;
8use schemars::JsonSchema;
9use serde::{
10 Deserialize,
11 Serialize,
12};
13
14use std::io::BufRead as _;
15use std::sync::mpsc::{
16 channel,
17 Receiver,
18 Sender,
19};
20use std::thread;
21use std::time::{
22 Duration,
23 Instant,
24};
25
26use super::{
27 contains_output_reference,
28 extract_output_references,
29 interpolate_template_string,
30 is_shell_command,
31 CommandRunner,
32 Precondition,
33 Shell,
34 TaskContext,
35 TaskDependency,
36};
37use crate::cache::{
38 compute_fingerprint,
39 expand_patterns_in_dir,
40 CacheEntry,
41};
42use crate::defaults::default_verbose;
43use crate::run_shell_command;
44use crate::secrets::{
45 load_secret_env,
46 merge_optional_secret_settings,
47 resolve_secret_config,
48 SecretSettings,
49};
50use crate::utils::{
51 deserialize_environment,
52 load_env_files_in_dir,
53 resolve_path,
54};
55
56fn default_cache_enabled() -> bool {
57 true
58}
59
60fn default_fail_fast() -> bool {
61 true
62}
63
64#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema)]
65#[serde(rename_all = "snake_case")]
66pub enum ExecutionMode {
67 Sequential,
68 Parallel,
69}
70
71#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
72pub struct TaskExecution {
73 #[serde(default)]
74 pub mode: Option<ExecutionMode>,
75
76 #[serde(default)]
77 pub max_parallel: Option<usize>,
78
79 #[serde(default = "default_fail_fast")]
80 pub fail_fast: bool,
81}
82
83#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
84pub struct TaskCache {
85 #[serde(default = "default_cache_enabled")]
86 pub enabled: bool,
87}
88
89#[derive(Debug, Default, Deserialize, JsonSchema)]
93pub struct TaskArgs {
94 pub commands: Vec<CommandRunner>,
96
97 #[serde(default)]
99 pub preconditions: Vec<Precondition>,
100
101 #[serde(default)]
103 pub depends_on: Vec<TaskDependency>,
104
105 #[schemars(with = "std::collections::HashMap<String, String>")]
107 #[serde(default)]
108 pub labels: HashMap<String, String>,
109
110 #[serde(default)]
112 pub description: String,
113
114 #[schemars(with = "std::collections::HashMap<String, String>")]
116 #[serde(default, deserialize_with = "deserialize_environment")]
117 pub environment: HashMap<String, String>,
118
119 #[serde(default)]
121 pub env_file: Vec<String>,
122
123 #[serde(default)]
125 pub secrets_path: Vec<String>,
126
127 #[serde(default)]
129 pub secrets: Option<SecretSettings>,
130
131 #[serde(default)]
133 pub vault_location: Option<String>,
134
135 #[serde(default)]
137 pub keys_location: Option<String>,
138
139 #[serde(default)]
141 pub key_name: Option<String>,
142
143 #[serde(default)]
145 pub gpg_key_id: Option<String>,
146
147 #[serde(default)]
149 pub shell: Option<Shell>,
150
151 #[serde(default)]
154 pub parallel: Option<bool>,
155
156 #[serde(default)]
158 pub execution: Option<TaskExecution>,
159
160 #[serde(default)]
162 pub cache: Option<TaskCache>,
163
164 #[serde(default)]
166 pub inputs: Vec<String>,
167
168 #[serde(default)]
170 pub outputs: Vec<String>,
171
172 #[serde(default)]
174 pub ignore_errors: Option<bool>,
175
176 #[serde(default)]
178 pub verbose: Option<bool>,
179
180 #[schemars(skip)]
182 #[serde(skip)]
183 pub(crate) raw_legacy_secret_settings: Option<SecretSettings>,
184
185 #[schemars(skip)]
187 #[serde(skip)]
188 pub(crate) raw_secrets: Option<SecretSettings>,
189}
190
191#[derive(Debug, Deserialize, JsonSchema)]
192#[serde(untagged)]
193pub enum Task {
195 String(String),
196 Task(Box<TaskArgs>),
197}
198
199#[derive(Debug)]
200pub struct CommandResult {
201 index: usize,
202 success: bool,
203 message: String,
204}
205
206impl Task {
207 pub fn run(&self, context: &mut TaskContext) -> anyhow::Result<()> {
208 match self {
209 Task::String(command) => self.execute(context, command),
210 Task::Task(args) => args.run(context),
211 }
212 }
213
214 fn execute(&self, context: &mut TaskContext, command: &str) -> anyhow::Result<()> {
215 assert!(!command.is_empty());
216
217 TaskArgs {
218 commands: vec![CommandRunner::CommandRun(command.to_string())],
219 ..Default::default()
220 }
221 .run(context)
222 }
223}
224
225impl TaskArgs {
226 pub(crate) fn normalized_secret_settings(&self) -> Option<SecretSettings> {
227 merge_optional_secret_settings(
228 Some(SecretSettings::from_legacy(
229 self.vault_location.clone(),
230 self.keys_location.clone(),
231 self.key_name.clone(),
232 self.gpg_key_id.clone(),
233 self.secrets_path.clone(),
234 )),
235 self.secrets.clone(),
236 )
237 .filter(|settings| !settings.is_empty())
238 }
239
240 pub(crate) fn validation_legacy_secret_settings(&self) -> SecretSettings {
241 self.raw_legacy_secret_settings.clone().unwrap_or_else(|| {
242 SecretSettings::from_legacy(
243 self.vault_location.clone(),
244 self.keys_location.clone(),
245 self.key_name.clone(),
246 self.gpg_key_id.clone(),
247 self.secrets_path.clone(),
248 )
249 })
250 }
251
252 pub(crate) fn normalize_secret_settings(&mut self) {
253 if self.raw_legacy_secret_settings.is_none() {
254 self.raw_legacy_secret_settings = Some(SecretSettings::from_legacy(
255 self.vault_location.clone(),
256 self.keys_location.clone(),
257 self.key_name.clone(),
258 self.gpg_key_id.clone(),
259 self.secrets_path.clone(),
260 ));
261 }
262 if self.raw_secrets.is_none() {
263 self.raw_secrets = self.secrets.clone();
264 }
265
266 self.secrets = self.normalized_secret_settings();
267 if let Some(secrets) = &self.secrets {
268 self.vault_location = secrets.vault_location.clone();
269 self.keys_location = secrets.keys_location.clone();
270 self.key_name = secrets.key_name.clone();
271 self.gpg_key_id = secrets.gpg_key_id.clone();
272 self.secrets_path = secrets.secrets_path.clone().unwrap_or_default();
273 }
274 }
275
276 pub fn run(&self, context: &mut TaskContext) -> anyhow::Result<()> {
277 assert!(!self.commands.is_empty());
278
279 self.validate_parallel_commands()?;
281
282 let started = Instant::now();
283 let tick_interval = Duration::from_millis(80);
284
285 if let Some(shell) = &self.shell {
286 context.set_shell(shell);
287 }
288
289 if let Some(ignore_errors) = &self.ignore_errors {
290 context.set_ignore_errors(*ignore_errors);
291 }
292
293 if let Some(verbose) = &self.verbose {
294 context.set_verbose(*verbose);
295 }
296
297 let config_base_dir = self.config_base_dir(context);
298 let task_secret_config = resolve_secret_config(
299 &config_base_dir,
300 None,
301 self.secrets.as_ref(),
302 context.task_root.secrets.as_ref(),
303 );
304 context.set_secret_config(task_secret_config.clone());
305
306 if !context.is_nested {
308 let root_env = context.task_root.environment.clone();
309 let root_env_files = load_env_files_in_dir(&context.task_root.env_file, &config_base_dir)?;
310 context.extend_env_vars(root_env);
311 context.extend_env_vars(root_env_files);
312 }
313
314 let defined_env = self.load_static_env(context)?;
316 let additional_env = self.load_env_file(context)?;
317 let secret_env = self.load_secret_env(context)?;
318
319 context.extend_env_vars(defined_env);
320 context.extend_env_vars(additional_env);
321 context.extend_env_vars(secret_env);
322
323 let mut rng = rand::thread_rng();
324 let pb_style =
327 ProgressStyle::with_template("{spinner:.green} [{prefix:.bold.dim}] {wide_msg:.cyan/blue} ")?
328 .tick_chars("⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏⦿");
329
330 let depends_on_pb = context.multi.add(ProgressBar::new(self.depends_on.len() as u64));
331
332 if !self.depends_on.is_empty() {
333 depends_on_pb.set_style(pb_style.clone());
334 depends_on_pb.set_message("Running task dependencies...");
335 depends_on_pb.enable_steady_tick(tick_interval);
336 for (i, dependency) in self.depends_on.iter().enumerate() {
337 thread::sleep(Duration::from_millis(rng.gen_range(40..300)));
338 depends_on_pb.set_prefix(format!("{}/{}", i + 1, self.depends_on.len()));
339 dependency.run(context)?;
340 depends_on_pb.inc(1);
341 }
342
343 let message = format!("Dependencies completed in {}.", HumanDuration(started.elapsed()));
344 if context.is_nested {
345 depends_on_pb.finish_and_clear();
346 } else {
347 depends_on_pb.finish_with_message(message);
348 }
349 }
350
351 let precondition_pb = context
352 .multi
353 .add(ProgressBar::new(self.preconditions.len() as u64));
354
355 if !self.preconditions.is_empty() {
356 precondition_pb.set_style(pb_style.clone());
357 precondition_pb.set_message("Running task precondition...");
358 precondition_pb.enable_steady_tick(tick_interval);
359 for (i, precondition) in self.preconditions.iter().enumerate() {
360 thread::sleep(Duration::from_millis(rng.gen_range(40..300)));
361 precondition_pb.set_prefix(format!("{}/{}", i + 1, self.preconditions.len()));
362 precondition.execute(context)?;
363 precondition_pb.inc(1);
364 }
365
366 let message = format!("Preconditions completed in {}.", HumanDuration(started.elapsed()));
367 if context.is_nested {
368 precondition_pb.finish_and_clear();
369 } else {
370 precondition_pb.finish_with_message(message);
371 }
372 }
373
374 if self.should_skip_from_cache(context)? {
375 context.emit_event(&serde_json::json!({
376 "event": "task_skipped",
377 "task": context.current_task_name.clone().unwrap_or_else(|| "<task>".to_string()),
378 "reason": "cache_hit",
379 }))?;
380 return Ok(());
381 }
382
383 if self.is_parallel() {
384 self.execute_commands_parallel(context)?;
385 } else {
386 let command_pb = context.multi.add(ProgressBar::new(self.commands.len() as u64));
387 command_pb.set_style(pb_style);
388 command_pb.set_message("Running task command...");
389 command_pb.enable_steady_tick(tick_interval);
390 for (i, command) in self.commands.iter().enumerate() {
391 thread::sleep(Duration::from_millis(rng.gen_range(100..400)));
392 command_pb.set_prefix(format!("{}/{}", i + 1, self.commands.len()));
393 self.refresh_output_env(context)?;
394 command.execute(context)?;
395 command_pb.inc(1);
396 }
397
398 let message = format!("Commands completed in {}.", HumanDuration(started.elapsed()));
399 if context.is_nested {
400 command_pb.finish_and_clear();
401 } else {
402 command_pb.finish_with_message(message);
403 }
404 }
405
406 self.update_cache(context)?;
407
408 Ok(())
409 }
410
411 fn validate_parallel_commands(&self) -> anyhow::Result<()> {
413 if !self.is_parallel() {
414 return Ok(());
415 }
416
417 for command in &self.commands {
418 match command {
419 CommandRunner::LocalRun(local_run) if local_run.is_parallel_safe() => continue,
420 CommandRunner::LocalRun(local_run) if local_run.interactive_enabled() => {
421 return Err(anyhow::anyhow!(
422 "Interactive local commands cannot be run in parallel"
423 ))
424 },
425 CommandRunner::LocalRun(_) => {
426 return Err(anyhow::anyhow!(
427 "Local commands with `retrigger: true` cannot be run in parallel"
428 ))
429 },
430 _ => {
431 return Err(anyhow::anyhow!(
432 "Parallel execution is only supported for non-interactive local commands"
433 ))
434 },
435 }
436 }
437 Ok(())
438 }
439
440 fn execute_commands_parallel(&self, context: &TaskContext) -> anyhow::Result<()> {
442 context.clear_cancellation();
443 let (tx, rx): (Sender<CommandResult>, Receiver<CommandResult>) = channel();
444 let mut handles = vec![];
445 let command_count = self.commands.len();
446 let max_parallel = self.max_parallel().min(command_count.max(1));
447 let fail_fast = self.fail_fast();
448 let command_pb = context.multi.add(ProgressBar::new(command_count as u64));
449 command_pb.set_style(ProgressStyle::with_template(
450 "{spinner:.green} [{prefix:.bold.dim}] {wide_msg:.cyan/blue} ",
451 )?);
452 command_pb.set_prefix("?/?");
453 command_pb.set_message("Running task commands in parallel...");
454 command_pb.enable_steady_tick(Duration::from_millis(80));
455 let mut failures = Vec::new();
456
457 let commands: Vec<_> = self.commands.to_vec();
459
460 let mut completed = 0;
462
463 let mut iter = commands.into_iter().enumerate();
464 let mut running = 0usize;
465 let mut stop_scheduling = false;
466
467 while completed < command_count {
468 while !stop_scheduling && running < max_parallel {
469 let Some((i, command)) = iter.next() else {
470 break;
471 };
472
473 let tx = tx.clone();
474 let context = context.clone();
475
476 let handle = thread::spawn(move || {
477 let result = match command.execute_cancellable(&context) {
478 Ok(_) => CommandResult {
479 index: i,
480 success: true,
481 message: format!("Command {} completed successfully", i + 1),
482 },
483 Err(e) => CommandResult {
484 index: i,
485 success: false,
486 message: format!("Command {} failed: {}", i + 1, e),
487 },
488 };
489 tx.send(result).unwrap();
490 });
491
492 handles.push(handle);
493 running += 1;
494 }
495
496 if running == 0 {
497 break;
498 }
499
500 match rx.recv() {
501 Ok(result) => {
502 running -= 1;
503 let index = result.index;
504 if !result.success && !context.ignore_errors() {
505 failures.push(result.message);
506 if fail_fast {
507 stop_scheduling = true;
508 context.request_cancellation();
509 }
510 }
511
512 completed += 1;
513 command_pb.set_prefix(format!("{}/{}", completed, command_count));
514 command_pb.inc(1);
515
516 command_pb.set_message(format!(
517 "Running task commands in parallel (completed {})",
518 index + 1
519 ));
520 },
521 Err(e) => {
522 command_pb.finish_with_message("Error receiving command results");
523 return Err(anyhow::anyhow!("Channel error: {}", e));
524 },
525 }
526 }
527
528 for handle in handles {
530 handle.join().unwrap();
531 }
532
533 context.clear_cancellation();
534
535 if !failures.is_empty() {
536 command_pb.finish_with_message("Some commands failed");
537
538 failures.sort();
540 return Err(anyhow::anyhow!("Failed commands:\n{}", failures.join("\n")));
541 }
542
543 let message = "Commands completed in parallel";
544 if context.is_nested {
545 command_pb.finish_and_clear();
546 } else {
547 command_pb.finish_with_message(message);
548 }
549
550 Ok(())
551 }
552
553 fn load_static_env(&self, context: &TaskContext) -> anyhow::Result<HashMap<String, String>> {
554 let mut local_env: HashMap<String, String> = HashMap::new();
555 for (key, value) in &self.environment {
556 if contains_output_reference(value) {
557 continue;
558 }
559 let value = self.get_env_value(context, value)?;
560 local_env.insert(key.clone(), value);
561 }
562
563 Ok(local_env)
564 }
565
566 fn refresh_output_env(&self, context: &mut TaskContext) -> anyhow::Result<()> {
567 let mut updated_env = HashMap::new();
568
569 for (key, value) in &self.environment {
570 let output_refs = extract_output_references(value);
571 if output_refs.is_empty() {
572 continue;
573 }
574
575 let mut all_outputs_ready = true;
576 for output_name in &output_refs {
577 if !context.has_task_output(output_name)? {
578 all_outputs_ready = false;
579 break;
580 }
581 }
582 if !all_outputs_ready {
583 continue;
584 }
585
586 updated_env.insert(key.clone(), self.get_env_value(context, value)?);
587 }
588
589 context.extend_env_vars(updated_env);
590 Ok(())
591 }
592
593 fn load_env_file(&self, context: &TaskContext) -> anyhow::Result<HashMap<String, String>> {
594 load_env_files_in_dir(&self.env_file, &self.config_base_dir(context))
595 }
596
597 fn load_secret_env(&self, context: &TaskContext) -> anyhow::Result<HashMap<String, String>> {
598 let secret_config = context
599 .secret_config
600 .as_ref()
601 .ok_or_else(|| anyhow::anyhow!("Secret config missing from task context"))?;
602 load_secret_env(secret_config)
603 }
604
605 fn get_env_value(&self, context: &TaskContext, value_in: &str) -> anyhow::Result<String> {
606 if is_shell_command(value_in)? {
607 let verbose = self.verbose();
608 let mut cmd = self
609 .shell
610 .as_ref()
611 .map(|shell| shell.proc())
612 .unwrap_or_else(|| context.shell().proc());
613 let output = run_shell_command!(value_in, cmd, verbose);
614 Ok(output)
615 } else if super::is_template_command(value_in)? || contains_output_reference(value_in) {
616 Ok(interpolate_template_string(value_in, context)?)
617 } else {
618 Ok(value_in.to_string())
619 }
620 }
621
622 fn verbose(&self) -> bool {
623 self.verbose.unwrap_or(default_verbose())
624 }
625
626 pub(crate) fn execution_mode(&self) -> ExecutionMode {
627 self
628 .execution
629 .as_ref()
630 .and_then(|execution| execution.mode.clone())
631 .or_else(|| {
632 self.parallel.map(|parallel| {
633 if parallel {
634 ExecutionMode::Parallel
635 } else {
636 ExecutionMode::Sequential
637 }
638 })
639 })
640 .unwrap_or(ExecutionMode::Sequential)
641 }
642
643 pub(crate) fn is_parallel(&self) -> bool {
644 self.execution_mode().is_parallel()
645 }
646
647 pub(crate) fn max_parallel(&self) -> usize {
648 self
649 .execution
650 .as_ref()
651 .and_then(|execution| execution.max_parallel)
652 .unwrap_or(self.commands.len().max(1))
653 }
654
655 pub(crate) fn fail_fast(&self) -> bool {
656 self
657 .execution
658 .as_ref()
659 .map(|execution| execution.fail_fast)
660 .unwrap_or(true)
661 }
662
663 fn cache_enabled(&self) -> bool {
664 self.cache.as_ref().map(|cache| cache.enabled).unwrap_or(false)
665 }
666
667 fn should_skip_from_cache(&self, context: &TaskContext) -> anyhow::Result<bool> {
668 if context.force || !self.cache_enabled() || self.outputs.is_empty() {
669 return Ok(false);
670 }
671
672 let resolved_outputs = self.resolve_output_paths(context)?;
673 let outputs_exist = resolved_outputs.iter().all(|output| output.exists());
674 if !outputs_exist {
675 return Ok(false);
676 }
677
678 let env_vars = sorted_env_vars(&context.env_vars);
679 let inputs = self.resolve_input_paths(context)?;
680 let mut env_files = self.resolve_env_file_paths(context);
681 env_files.extend(self.resolve_secret_paths(context));
682 env_files.sort();
683 env_files.dedup();
684 let fingerprint = compute_fingerprint(
685 &context
686 .current_task_name
687 .clone()
688 .unwrap_or_else(|| "<task>".to_string()),
689 &stable_task_debug(self),
690 &env_vars,
691 &inputs,
692 &env_files,
693 &resolved_outputs,
694 )?;
695
696 let store = context
697 .cache_store
698 .lock()
699 .map_err(|e| anyhow::anyhow!("Failed to lock cache store - {}", e))?;
700 let Some(entry) = store.tasks.get(&fingerprint_task_key(context, self)) else {
701 return Ok(false);
702 };
703
704 Ok(entry.fingerprint == fingerprint)
705 }
706
707 fn update_cache(&self, context: &TaskContext) -> anyhow::Result<()> {
708 if !self.cache_enabled() || self.outputs.is_empty() {
709 return Ok(());
710 }
711
712 let env_vars = sorted_env_vars(&context.env_vars);
713 let inputs = self.resolve_input_paths(context)?;
714 let mut env_files = self.resolve_env_file_paths(context);
715 env_files.extend(self.resolve_secret_paths(context));
716 env_files.sort();
717 env_files.dedup();
718 let resolved_outputs = self.resolve_output_paths(context)?;
719 let fingerprint = compute_fingerprint(
720 &context
721 .current_task_name
722 .clone()
723 .unwrap_or_else(|| "<task>".to_string()),
724 &stable_task_debug(self),
725 &env_vars,
726 &inputs,
727 &env_files,
728 &resolved_outputs,
729 )?;
730 let key = fingerprint_task_key(context, self);
731
732 {
733 let mut store = context
734 .cache_store
735 .lock()
736 .map_err(|e| anyhow::anyhow!("Failed to lock cache store - {}", e))?;
737 store.tasks.insert(
738 key,
739 CacheEntry {
740 fingerprint,
741 outputs: resolved_outputs
742 .iter()
743 .map(|path| path.to_string_lossy().into_owned())
744 .collect(),
745 updated_at: chrono::Utc::now().to_rfc3339(),
746 },
747 );
748 store.save_in_dir(&context.task_root.cache_base_dir())?;
749 }
750
751 Ok(())
752 }
753
754 pub(crate) fn config_base_dir_from_root(&self, root: &super::TaskRoot) -> std::path::PathBuf {
755 root.config_base_dir()
756 }
757
758 pub(crate) fn task_base_dir_from_root(&self, root: &super::TaskRoot) -> std::path::PathBuf {
759 let config_base_dir = self.config_base_dir_from_root(root);
760 let mut work_dirs = self
761 .commands
762 .iter()
763 .filter_map(|command| match command {
764 CommandRunner::LocalRun(local_run) => local_run.work_dir.as_ref(),
765 _ => None,
766 })
767 .map(|work_dir| resolve_path(&config_base_dir, work_dir))
768 .collect::<Vec<_>>();
769
770 work_dirs.sort();
771 work_dirs.dedup();
772
773 if work_dirs.len() == 1 {
774 work_dirs.remove(0)
775 } else {
776 config_base_dir
777 }
778 }
779
780 fn config_base_dir(&self, context: &TaskContext) -> std::path::PathBuf {
781 self.config_base_dir_from_root(&context.task_root)
782 }
783
784 fn task_base_dir(&self, context: &TaskContext) -> std::path::PathBuf {
785 self.task_base_dir_from_root(&context.task_root)
786 }
787
788 fn resolve_input_paths(&self, context: &TaskContext) -> anyhow::Result<Vec<std::path::PathBuf>> {
789 expand_patterns_in_dir(&self.task_base_dir(context), &self.inputs)
790 }
791
792 fn resolve_output_paths(&self, context: &TaskContext) -> anyhow::Result<Vec<std::path::PathBuf>> {
793 let base_dir = self.task_base_dir(context);
794 Ok(
795 self
796 .outputs
797 .iter()
798 .map(|output| resolve_path(&base_dir, output))
799 .collect(),
800 )
801 }
802
803 fn resolve_env_file_paths(&self, context: &TaskContext) -> Vec<std::path::PathBuf> {
804 let config_base_dir = self.config_base_dir(context);
805 let mut env_files = context
806 .task_root
807 .env_file
808 .iter()
809 .chain(self.env_file.iter())
810 .map(|env_file| resolve_path(&config_base_dir, env_file))
811 .collect::<Vec<_>>();
812 env_files.sort();
813 env_files.dedup();
814 env_files
815 }
816
817 fn resolve_secret_paths(&self, context: &TaskContext) -> Vec<std::path::PathBuf> {
818 let vault_location = context
819 .secret_config
820 .as_ref()
821 .map(|config| config.vault_location.clone())
822 .unwrap_or_else(|| resolve_path(&self.config_base_dir(context), "./.mk/vault"));
823 let mut secret_paths = context
824 .task_root
825 .secrets
826 .as_ref()
827 .and_then(|settings| settings.secrets_path.as_ref())
828 .into_iter()
829 .flatten()
830 .chain(
831 context
832 .secret_config
833 .as_ref()
834 .map(|config| config.secrets_path.iter())
835 .into_iter()
836 .flatten(),
837 )
838 .map(|secret_path| vault_location.join(secret_path))
839 .collect::<Vec<_>>();
840 secret_paths.sort();
841 secret_paths.dedup();
842 secret_paths
843 }
844}
845
846impl ExecutionMode {
847 fn is_parallel(&self) -> bool {
848 matches!(self, ExecutionMode::Parallel)
849 }
850}
851
852fn sorted_env_vars(env_vars: &HashMap<String, String>) -> Vec<(String, String)> {
853 let mut pairs: Vec<_> = env_vars
854 .iter()
855 .map(|(key, value)| (key.clone(), value.clone()))
856 .collect();
857 pairs.sort();
858 pairs
859}
860
861fn fingerprint_task_key(context: &TaskContext, task: &TaskArgs) -> String {
862 context.current_task_name.clone().unwrap_or_else(|| {
863 if !task.description.is_empty() {
864 task.description.clone()
865 } else {
866 format!("{:?}", task.commands)
867 }
868 })
869}
870
871fn stable_task_debug(task: &TaskArgs) -> String {
872 let mut labels: Vec<_> = task
873 .labels
874 .iter()
875 .map(|(key, value)| (key.clone(), value.clone()))
876 .collect();
877 labels.sort();
878
879 let mut environment: Vec<_> = task
880 .environment
881 .iter()
882 .map(|(key, value)| (key.clone(), value.clone()))
883 .collect();
884 environment.sort();
885
886 let mut secrets_path = task.secrets_path.clone();
887 secrets_path.sort();
888
889 format!(
890 "commands={:?};preconditions={:?};depends_on={:?};labels={:?};description={:?};environment={:?};env_file={:?};secrets_path={:?};vault_location={:?};keys_location={:?};key_name={:?};shell={:?};execution_mode={:?};max_parallel={:?};fail_fast={};cache_enabled={};inputs={:?};outputs={:?};ignore_errors={:?};verbose={:?}",
891 task.commands,
892 task.preconditions,
893 task.depends_on,
894 labels,
895 task.description,
896 environment,
897 task.env_file,
898 secrets_path,
899 task.vault_location,
900 task.keys_location,
901 task.key_name,
902 task.shell,
903 task.execution_mode(),
904 task.execution.as_ref().and_then(|execution| execution.max_parallel),
905 task.fail_fast(),
906 task.cache_enabled(),
907 task.inputs,
908 task.outputs,
909 task.ignore_errors,
910 task.verbose
911 )
912}
913
914#[cfg(test)]
915mod test {
916 use super::*;
917
918 #[test]
919 fn test_task_1() -> anyhow::Result<()> {
920 {
921 let yaml = "
922 commands:
923 - command: echo \"Hello, World!\"
924 ignore_errors: false
925 verbose: false
926 depends_on:
927 - name: task1
928 description: This is a task
929 environment:
930 FOO: bar
931 env_file:
932 - test.env
933 - test2.env
934 ";
935
936 let task = serde_yaml::from_str::<Task>(yaml)?;
937
938 if let Task::Task(task) = &task {
939 if let CommandRunner::LocalRun(local_run) = &task.commands[0] {
940 assert_eq!(local_run.command, "echo \"Hello, World!\"");
941 assert_eq!(local_run.work_dir, None);
942 assert_eq!(local_run.ignore_errors, Some(false));
943 assert_eq!(local_run.verbose, Some(false));
944 }
945
946 if let TaskDependency::TaskDependency(args) = &task.depends_on[0] {
947 assert_eq!(args.name, "task1");
948 }
949
950 assert_eq!(task.labels.len(), 0);
951 assert_eq!(task.description, "This is a task");
952 assert_eq!(task.environment.len(), 1);
953 assert_eq!(task.env_file.len(), 2);
954 } else {
955 panic!("Expected Task::Task");
956 }
957
958 Ok(())
959 }
960 }
961
962 #[test]
963 fn test_task_2() -> anyhow::Result<()> {
964 {
965 let yaml = "
966 commands:
967 - command: echo 'Hello, World!'
968 ignore_errors: false
969 verbose: false
970 description: This is a task
971 environment:
972 FOO: bar
973 BAR: foo
974 ";
975
976 let task = serde_yaml::from_str::<Task>(yaml)?;
977
978 if let Task::Task(task) = &task {
979 if let CommandRunner::LocalRun(local_run) = &task.commands[0] {
980 assert_eq!(local_run.command, "echo 'Hello, World!'");
981 assert_eq!(local_run.work_dir, None);
982 assert_eq!(local_run.ignore_errors, Some(false));
983 assert_eq!(local_run.verbose, Some(false));
984 }
985
986 assert_eq!(task.description, "This is a task");
987 assert_eq!(task.depends_on.len(), 0);
988 assert_eq!(task.labels.len(), 0);
989 assert_eq!(task.env_file.len(), 0);
990 assert_eq!(task.environment.len(), 2);
991 } else {
992 panic!("Expected Task::Task");
993 }
994
995 Ok(())
996 }
997 }
998
999 #[test]
1000 fn test_task_3() -> anyhow::Result<()> {
1001 {
1002 let yaml = "
1003 commands:
1004 - command: echo 'Hello, World!'
1005 ";
1006
1007 let task = serde_yaml::from_str::<Task>(yaml)?;
1008
1009 if let Task::Task(task) = &task {
1010 if let CommandRunner::LocalRun(local_run) = &task.commands[0] {
1011 assert_eq!(local_run.command, "echo 'Hello, World!'");
1012 assert_eq!(local_run.work_dir, None);
1013 assert_eq!(local_run.ignore_errors, None);
1014 assert_eq!(local_run.verbose, None);
1015 }
1016
1017 assert_eq!(task.description.len(), 0);
1018 assert_eq!(task.depends_on.len(), 0);
1019 assert_eq!(task.labels.len(), 0);
1020 assert_eq!(task.env_file.len(), 0);
1021 assert_eq!(task.environment.len(), 0);
1022 } else {
1023 panic!("Expected Task::Task");
1024 }
1025
1026 Ok(())
1027 }
1028 }
1029
1030 #[test]
1031 fn test_task_4() -> anyhow::Result<()> {
1032 {
1033 let yaml = "
1034 commands:
1035 - container_command:
1036 - echo
1037 - Hello, World!
1038 image: docker.io/library/hello-world:latest
1039 ";
1040
1041 let task = serde_yaml::from_str::<Task>(yaml)?;
1042
1043 if let Task::Task(task) = &task {
1044 if let CommandRunner::ContainerRun(container_run) = &task.commands[0] {
1045 assert_eq!(container_run.container_command.len(), 2);
1046 assert_eq!(container_run.container_command[0], "echo");
1047 assert_eq!(container_run.container_command[1], "Hello, World!");
1048 assert_eq!(container_run.image, "docker.io/library/hello-world:latest");
1049 assert_eq!(container_run.mounted_paths, Vec::<String>::new());
1050 assert_eq!(container_run.ignore_errors, None);
1051 assert_eq!(container_run.verbose, None);
1052 }
1053
1054 assert_eq!(task.description.len(), 0);
1055 assert_eq!(task.depends_on.len(), 0);
1056 assert_eq!(task.labels.len(), 0);
1057 assert_eq!(task.env_file.len(), 0);
1058 assert_eq!(task.environment.len(), 0);
1059 } else {
1060 panic!("Expected Task::Task");
1061 }
1062
1063 Ok(())
1064 }
1065 }
1066
1067 #[test]
1068 fn test_task_5() -> anyhow::Result<()> {
1069 {
1070 let yaml = "
1071 commands:
1072 - container_command:
1073 - echo
1074 - Hello, World!
1075 image: docker.io/library/hello-world:latest
1076 mounted_paths:
1077 - /tmp
1078 - /var/tmp
1079 ";
1080
1081 let task = serde_yaml::from_str::<Task>(yaml)?;
1082
1083 if let Task::Task(task) = &task {
1084 if let CommandRunner::ContainerRun(container_run) = &task.commands[0] {
1085 assert_eq!(container_run.container_command.len(), 2);
1086 assert_eq!(container_run.container_command[0], "echo");
1087 assert_eq!(container_run.container_command[1], "Hello, World!");
1088 assert_eq!(container_run.image, "docker.io/library/hello-world:latest");
1089 assert_eq!(container_run.mounted_paths, vec!["/tmp", "/var/tmp"]);
1090 assert_eq!(container_run.ignore_errors, None);
1091 assert_eq!(container_run.verbose, None);
1092 }
1093
1094 assert_eq!(task.description.len(), 0);
1095 assert_eq!(task.depends_on.len(), 0);
1096 assert_eq!(task.labels.len(), 0);
1097 assert_eq!(task.env_file.len(), 0);
1098 assert_eq!(task.environment.len(), 0);
1099 } else {
1100 panic!("Expected Task::Task");
1101 }
1102
1103 Ok(())
1104 }
1105 }
1106
1107 #[test]
1108 fn test_task_6() -> anyhow::Result<()> {
1109 {
1110 let yaml = "
1111 commands:
1112 - container_command:
1113 - echo
1114 - Hello, World!
1115 image: docker.io/library/hello-world:latest
1116 mounted_paths:
1117 - /tmp
1118 - /var/tmp
1119 ignore_errors: true
1120 ";
1121
1122 let task = serde_yaml::from_str::<Task>(yaml)?;
1123
1124 if let Task::Task(task) = &task {
1125 if let CommandRunner::ContainerRun(container_run) = &task.commands[0] {
1126 assert_eq!(container_run.container_command.len(), 2);
1127 assert_eq!(container_run.container_command[0], "echo");
1128 assert_eq!(container_run.container_command[1], "Hello, World!");
1129 assert_eq!(container_run.image, "docker.io/library/hello-world:latest");
1130 assert_eq!(container_run.mounted_paths, vec!["/tmp", "/var/tmp"]);
1131 assert_eq!(container_run.ignore_errors, Some(true));
1132 assert_eq!(container_run.verbose, None);
1133 }
1134
1135 assert_eq!(task.description.len(), 0);
1136 assert_eq!(task.depends_on.len(), 0);
1137 assert_eq!(task.labels.len(), 0);
1138 assert_eq!(task.env_file.len(), 0);
1139 assert_eq!(task.environment.len(), 0);
1140 } else {
1141 panic!("Expected Task::Task");
1142 }
1143
1144 Ok(())
1145 }
1146 }
1147
1148 #[test]
1149 fn test_task_7() -> anyhow::Result<()> {
1150 {
1151 let yaml = "
1152 commands:
1153 - container_command:
1154 - echo
1155 - Hello, World!
1156 image: docker.io/library/hello-world:latest
1157 verbose: false
1158 ";
1159
1160 let task = serde_yaml::from_str::<Task>(yaml)?;
1161
1162 if let Task::Task(task) = &task {
1163 if let CommandRunner::ContainerRun(container_run) = &task.commands[0] {
1164 assert_eq!(container_run.container_command.len(), 2);
1165 assert_eq!(container_run.container_command[0], "echo");
1166 assert_eq!(container_run.container_command[1], "Hello, World!");
1167 assert_eq!(container_run.image, "docker.io/library/hello-world:latest");
1168 assert_eq!(container_run.mounted_paths, Vec::<String>::new());
1169 assert_eq!(container_run.ignore_errors, None);
1170 assert_eq!(container_run.verbose, Some(false));
1171 }
1172
1173 assert_eq!(task.description.len(), 0);
1174 assert_eq!(task.depends_on.len(), 0);
1175 assert_eq!(task.labels.len(), 0);
1176 assert_eq!(task.env_file.len(), 0);
1177 assert_eq!(task.environment.len(), 0);
1178 } else {
1179 panic!("Expected Task::Task");
1180 }
1181
1182 Ok(())
1183 }
1184 }
1185
1186 #[test]
1187 fn test_task_8() -> anyhow::Result<()> {
1188 {
1189 let yaml = "
1190 commands:
1191 - task: task1
1192 ";
1193
1194 let task = serde_yaml::from_str::<Task>(yaml)?;
1195
1196 if let Task::Task(task) = &task {
1197 if let CommandRunner::TaskRun(task_run) = &task.commands[0] {
1198 assert_eq!(task_run.task, "task1");
1199 assert_eq!(task_run.ignore_errors, None);
1200 assert_eq!(task_run.verbose, None);
1201 }
1202
1203 assert_eq!(task.description.len(), 0);
1204 assert_eq!(task.depends_on.len(), 0);
1205 assert_eq!(task.labels.len(), 0);
1206 assert_eq!(task.env_file.len(), 0);
1207 assert_eq!(task.environment.len(), 0);
1208 } else {
1209 panic!("Expected Task::Task");
1210 }
1211
1212 Ok(())
1213 }
1214 }
1215
1216 #[test]
1217 fn test_task_9() -> anyhow::Result<()> {
1218 {
1219 let yaml = "
1220 commands:
1221 - task: task1
1222 verbose: true
1223 ";
1224
1225 let task = serde_yaml::from_str::<Task>(yaml)?;
1226
1227 if let Task::Task(task) = &task {
1228 if let CommandRunner::TaskRun(task_run) = &task.commands[0] {
1229 assert_eq!(task_run.task, "task1");
1230 assert_eq!(task_run.ignore_errors, None);
1231 assert_eq!(task_run.verbose, Some(true));
1232 }
1233
1234 assert_eq!(task.description.len(), 0);
1235 assert_eq!(task.depends_on.len(), 0);
1236 assert_eq!(task.labels.len(), 0);
1237 assert_eq!(task.env_file.len(), 0);
1238 assert_eq!(task.environment.len(), 0);
1239 } else {
1240 panic!("Expected Task::Task");
1241 }
1242
1243 Ok(())
1244 }
1245 }
1246
1247 #[test]
1248 fn test_task_10() -> anyhow::Result<()> {
1249 {
1250 let yaml = "
1251 commands:
1252 - task: task1
1253 ignore_errors: true
1254 ";
1255
1256 let task = serde_yaml::from_str::<Task>(yaml)?;
1257
1258 if let Task::Task(task) = &task {
1259 if let CommandRunner::TaskRun(task_run) = &task.commands[0] {
1260 assert_eq!(task_run.task, "task1");
1261 assert_eq!(task_run.ignore_errors, Some(true));
1262 assert_eq!(task_run.verbose, None);
1263 }
1264
1265 assert_eq!(task.description.len(), 0);
1266 assert_eq!(task.depends_on.len(), 0);
1267 assert_eq!(task.labels.len(), 0);
1268 assert_eq!(task.env_file.len(), 0);
1269 assert_eq!(task.environment.len(), 0);
1270 } else {
1271 panic!("Expected Task::Task");
1272 }
1273
1274 Ok(())
1275 }
1276 }
1277
1278 #[test]
1279 fn test_task_11() -> anyhow::Result<()> {
1280 {
1281 let yaml = "
1282 echo 'Hello, World!'
1283 ";
1284
1285 let task = serde_yaml::from_str::<Task>(yaml)?;
1286
1287 if let Task::String(task) = &task {
1288 assert_eq!(task, "echo 'Hello, World!'");
1289 } else {
1290 panic!("Expected Task::String");
1291 }
1292
1293 Ok(())
1294 }
1295 }
1296
1297 #[test]
1298 fn test_task_12() -> anyhow::Result<()> {
1299 {
1300 let yaml = "
1301 'true'
1302 ";
1303
1304 let task = serde_yaml::from_str::<Task>(yaml)?;
1305
1306 if let Task::String(task) = &task {
1307 assert_eq!(task, "true");
1308 } else {
1309 panic!("Expected Task::String");
1310 }
1311
1312 Ok(())
1313 }
1314 }
1315
1316 #[test]
1317 fn test_task_13() -> anyhow::Result<()> {
1318 {
1319 let yaml = "
1320 commands: []
1321 environment:
1322 FOO: bar
1323 BAR: foo
1324 KEY: 42
1325 PIS: 3.14
1326 ";
1327
1328 let task = serde_yaml::from_str::<Task>(yaml)?;
1329
1330 if let Task::Task(task) = &task {
1331 assert_eq!(task.environment.len(), 4);
1332 assert_eq!(task.environment.get("FOO").unwrap(), "bar");
1333 assert_eq!(task.environment.get("BAR").unwrap(), "foo");
1334 assert_eq!(task.environment.get("KEY").unwrap(), "42");
1335 } else {
1336 panic!("Expected Task::Task");
1337 }
1338
1339 Ok(())
1340 }
1341 }
1342
1343 #[test]
1344 fn test_task_14() -> anyhow::Result<()> {
1345 let yaml = "
1346 commands: []
1347 secrets_path:
1348 - app/common
1349 vault_location: ./.mk/vault
1350 keys_location: ./.mk/keys
1351 key_name: team
1352 environment:
1353 SECRET_VALUE: ${{ secrets.app/password }}
1354 ";
1355
1356 let task = serde_yaml::from_str::<Task>(yaml)?;
1357
1358 if let Task::Task(task) = &task {
1359 assert_eq!(task.secrets_path, vec!["app/common"]);
1360 assert_eq!(task.vault_location.as_deref(), Some("./.mk/vault"));
1361 assert_eq!(task.keys_location.as_deref(), Some("./.mk/keys"));
1362 assert_eq!(task.key_name.as_deref(), Some("team"));
1363 assert_eq!(
1364 task.environment.get("SECRET_VALUE").map(String::as_str),
1365 Some("${{ secrets.app/password }}")
1366 );
1367 } else {
1368 panic!("Expected Task::Task");
1369 }
1370
1371 Ok(())
1372 }
1373
1374 #[test]
1375 fn test_parallel_interactive_rejected() -> anyhow::Result<()> {
1376 let yaml = r#"
1377 commands:
1378 - command: "echo hello"
1379 interactive: true
1380 - command: "echo world"
1381 parallel: true
1382 "#;
1383
1384 let task = serde_yaml::from_str::<Task>(yaml)?;
1385 let mut context = TaskContext::empty();
1386
1387 if let Task::Task(task) = task {
1388 let result = task.run(&mut context);
1389 assert!(result.is_err());
1390 assert!(result
1391 .unwrap_err()
1392 .to_string()
1393 .contains("Interactive local commands cannot be run in parallel"));
1394 }
1395
1396 Ok(())
1397 }
1398
1399 #[test]
1400 fn test_parallel_non_interactive_accepted() -> anyhow::Result<()> {
1401 let yaml = r#"
1402 commands:
1403 - command: "echo hello"
1404 interactive: false
1405 - command: "echo world"
1406 parallel: true
1407 "#;
1408
1409 let task = serde_yaml::from_str::<Task>(yaml)?;
1410 let mut context = TaskContext::empty();
1411
1412 if let Task::Task(task) = task {
1413 let result = task.run(&mut context);
1414 assert!(result.is_ok());
1415 }
1416
1417 Ok(())
1418 }
1419
1420 #[test]
1421 fn test_parallel_retrigger_rejected() -> anyhow::Result<()> {
1422 let yaml = r#"
1423 commands:
1424 - command: "go run ."
1425 retrigger: true
1426 parallel: true
1427 "#;
1428
1429 let task = serde_yaml::from_str::<Task>(yaml)?;
1430 let mut context = TaskContext::empty();
1431
1432 if let Task::Task(task) = task {
1433 let result = task.run(&mut context);
1434 assert!(result.is_err());
1435 assert!(result
1436 .unwrap_err()
1437 .to_string()
1438 .contains("Local commands with `retrigger: true` cannot be run in parallel"));
1439 }
1440
1441 Ok(())
1442 }
1443}