1use hashbrown::HashMap;
2use indicatif::{
3 HumanDuration,
4 ProgressBar,
5 ProgressStyle,
6};
7use rand::Rng as _;
8use schemars::JsonSchema;
9use serde::{
10 Deserialize,
11 Serialize,
12};
13
14use std::io::BufRead as _;
15use std::sync::mpsc::{
16 channel,
17 Receiver,
18 Sender,
19};
20use std::thread;
21use std::time::{
22 Duration,
23 Instant,
24};
25
26use super::{
27 contains_output_reference,
28 extract_output_references,
29 interpolate_template_string,
30 is_shell_command,
31 CommandRunner,
32 Precondition,
33 Shell,
34 TaskContext,
35 TaskDependency,
36};
37use crate::cache::{
38 compute_fingerprint,
39 expand_patterns_in_dir,
40 CacheEntry,
41};
42use crate::defaults::default_verbose;
43use crate::run_shell_command;
44use crate::secrets::load_secret_env;
45use crate::utils::{
46 deserialize_environment,
47 load_env_files_in_dir,
48 resolve_path,
49};
50
/// Serde default for [`TaskCache::enabled`]: a `cache` section that does not
/// spell out `enabled` turns caching on.
fn default_cache_enabled() -> bool {
  true
}
54
/// Serde default for [`TaskExecution::fail_fast`]: an `execution` section that
/// does not spell out `fail_fast` stops scheduling after the first failure.
fn default_fail_fast() -> bool {
  true
}
58
/// How the commands of a task are scheduled.
///
/// Serialized in `snake_case`, i.e. `sequential` or `parallel`.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum ExecutionMode {
  /// Commands run one after another, in declaration order.
  Sequential,
  /// Commands run concurrently on worker threads.
  Parallel,
}
65
/// Scheduling options for the commands of a task (the `execution` section).
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
pub struct TaskExecution {
  /// Explicit execution mode. When absent, `TaskArgs::execution_mode` falls
  /// back to the task's `parallel` flag and finally to sequential execution.
  #[serde(default)]
  pub mode: Option<ExecutionMode>,

  /// Upper bound on the number of commands running at the same time in
  /// parallel mode. When absent, `TaskArgs::max_parallel` uses the number of
  /// commands (at least 1).
  #[serde(default)]
  pub max_parallel: Option<usize>,

  /// When `true` (the default), no further commands are scheduled in parallel
  /// mode once a command has failed.
  #[serde(default = "default_fail_fast")]
  pub fail_fast: bool,
}
77
/// Cache options for a task (the `cache` section).
///
/// Note that a task without any `cache` section is treated as not cached by
/// `TaskArgs::cache_enabled`; the default below only applies once the section
/// exists.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
pub struct TaskCache {
  /// Whether caching is active for the task. Defaults to `true` when the
  /// `cache` section is present but `enabled` is omitted.
  #[serde(default = "default_cache_enabled")]
  pub enabled: bool,
}
83
/// Full (mapping) form of a task definition.
///
/// Only `commands` is required; every other field falls back to its serde
/// default when omitted.
#[derive(Debug, Default, Deserialize, JsonSchema)]
pub struct TaskArgs {
  /// Commands executed by the task. `run` asserts that this list is not empty.
  pub commands: Vec<CommandRunner>,

  /// Preconditions executed after the dependencies and before the commands.
  #[serde(default)]
  pub preconditions: Vec<Precondition>,

  /// Dependencies that are run, in order, before preconditions and commands.
  #[serde(default)]
  pub depends_on: Vec<TaskDependency>,

  /// Free-form labels. In this file they only contribute to the cache
  /// fingerprint.
  #[schemars(with = "std::collections::HashMap<String, String>")]
  #[serde(default)]
  pub labels: HashMap<String, String>,

  /// Human-readable description. Also used as the cache key when the context
  /// carries no current task name.
  #[serde(default)]
  pub description: String,

  /// Environment variables defined inline. Values are resolved through
  /// `get_env_value` (shell command, template, or literal); values that
  /// reference task outputs are resolved later, once those outputs exist.
  #[schemars(with = "std::collections::HashMap<String, String>")]
  #[serde(default, deserialize_with = "deserialize_environment")]
  pub environment: HashMap<String, String>,

  /// Env files to load, resolved against the configuration base directory.
  #[serde(default)]
  pub env_file: Vec<String>,

  /// Secret paths whose contents are loaded into the environment via
  /// `load_secret_env`.
  #[serde(default)]
  pub secrets_path: Vec<String>,

  /// Task-level override of the secret vault location stored in the context.
  #[serde(default)]
  pub vault_location: Option<String>,

  /// Task-level override of the secret keys location stored in the context.
  #[serde(default)]
  pub keys_location: Option<String>,

  /// Task-level override of the secret key name stored in the context.
  #[serde(default)]
  pub key_name: Option<String>,

  /// Task-level override of the GPG key id stored in the context.
  #[serde(default)]
  pub gpg_key_id: Option<String>,

  /// Shell override. Applied to the context in `run` and preferred over the
  /// context shell when resolving shell-command environment values.
  #[serde(default)]
  pub shell: Option<Shell>,

  /// Boolean shorthand for the execution mode. Only consulted when
  /// `execution.mode` is not set.
  #[serde(default)]
  pub parallel: Option<bool>,

  /// Detailed scheduling options (mode, concurrency limit, fail-fast).
  #[serde(default)]
  pub execution: Option<TaskExecution>,

  /// Cache options. Caching is only considered when this section is present,
  /// enabled, and `outputs` is not empty.
  #[serde(default)]
  pub cache: Option<TaskCache>,

  /// Input patterns, expanded relative to the task base directory and fed
  /// into the cache fingerprint.
  #[serde(default)]
  pub inputs: Vec<String>,

  /// Output paths, resolved relative to the task base directory. All of them
  /// must exist for a cache hit.
  #[serde(default)]
  pub outputs: Vec<String>,

  /// When set, overrides the context's ignore-errors flag for this run.
  #[serde(default)]
  pub ignore_errors: Option<bool>,

  /// When set, overrides the context's verbose flag for this run.
  #[serde(default)]
  pub verbose: Option<bool>,
}
171
/// A task as written in the configuration.
///
/// The enum is `untagged`: a plain string deserializes to [`Task::String`],
/// a mapping deserializes to [`Task::Task`].
#[derive(Debug, Deserialize, JsonSchema)]
#[serde(untagged)]
pub enum Task {
  /// Shorthand form: a single command given as a string.
  String(String),
  /// Full form with commands, dependencies, environment and so on.
  Task(Box<TaskArgs>),
}
179
/// Outcome of one command run by a parallel worker thread, sent back to the
/// scheduling loop over a channel.
#[derive(Debug)]
pub struct CommandResult {
  // Zero-based position of the command in the task's `commands` list.
  index: usize,
  // Whether the command finished without returning an error.
  success: bool,
  // Human-readable summary; carries the error text on failure.
  message: String,
}
186
187impl Task {
188 pub fn run(&self, context: &mut TaskContext) -> anyhow::Result<()> {
189 match self {
190 Task::String(command) => self.execute(context, command),
191 Task::Task(args) => args.run(context),
192 }
193 }
194
195 fn execute(&self, context: &mut TaskContext, command: &str) -> anyhow::Result<()> {
196 assert!(!command.is_empty());
197
198 TaskArgs {
199 commands: vec![CommandRunner::CommandRun(command.to_string())],
200 ..Default::default()
201 }
202 .run(context)
203 }
204}
205
206impl TaskArgs {
/// Runs the task end to end.
///
/// Steps, in order:
/// 1. validate the parallel configuration,
/// 2. apply task-level overrides (shell, flags, secret settings) to `context`,
/// 3. load environment variables (root level first when not nested, then the
///    task's own),
/// 4. return early with a `task_skipped` event on a cache hit,
/// 5. run dependencies, then preconditions, then the commands,
/// 6. record the cache entry.
///
/// # Errors
///
/// Returns the first error produced by validation, environment loading, a
/// dependency, a precondition, a command, or the cache update.
///
/// # Panics
///
/// Panics if `commands` is empty.
pub fn run(&self, context: &mut TaskContext) -> anyhow::Result<()> {
  assert!(!self.commands.is_empty());

  self.validate_parallel_commands()?;

  // One timer for the whole run; every "completed in" message below reports
  // the time elapsed since this point.
  let started = Instant::now();
  let tick_interval = Duration::from_millis(80);

  // Task-level overrides of the context's shell and flags.
  if let Some(shell) = &self.shell {
    context.set_shell(shell);
  }

  if let Some(ignore_errors) = &self.ignore_errors {
    context.set_ignore_errors(*ignore_errors);
  }

  if let Some(verbose) = &self.verbose {
    context.set_verbose(*verbose);
  }

  // Root-level secret settings are applied only for a non-nested run...
  if !context.is_nested {
    if let Some(vault_location) = &context.task_root.vault_location {
      context.set_secret_vault_location(vault_location.clone());
    }

    if let Some(keys_location) = &context.task_root.keys_location {
      context.set_secret_keys_location(keys_location.clone());
    }

    if let Some(key_name) = &context.task_root.key_name {
      context.set_secret_key_name(key_name.clone());
    }

    if let Some(gpg_key_id) = &context.task_root.gpg_key_id {
      context.set_secret_gpg_key_id(gpg_key_id.clone());
    }
  }

  // ...and are then overridden by whatever the task itself specifies.
  if let Some(vault_location) = &self.vault_location {
    context.set_secret_vault_location(vault_location.clone());
  }

  if let Some(keys_location) = &self.keys_location {
    context.set_secret_keys_location(keys_location.clone());
  }

  if let Some(key_name) = &self.key_name {
    context.set_secret_key_name(key_name.clone());
  }

  if let Some(gpg_key_id) = &self.gpg_key_id {
    context.set_secret_gpg_key_id(gpg_key_id.clone());
  }

  // Root-level environment (inline values, env files, secrets) is loaded only
  // for a non-nested run. The secret settings applied above are used here.
  // NOTE(review): presumably later `extend_env_vars` calls win on key
  // collisions - confirm against `TaskContext::extend_env_vars`.
  if !context.is_nested {
    let config_base_dir = self.config_base_dir(context);
    let root_env = context.task_root.environment.clone();
    let root_env_files = load_env_files_in_dir(&context.task_root.env_file, &config_base_dir)?;
    let root_secret_env = load_secret_env(
      &context.task_root.secrets_path,
      &config_base_dir,
      context.secret_vault_location.as_deref(),
      context.secret_keys_location.as_deref(),
      context.secret_key_name.as_deref(),
      context.secret_gpg_key_id.as_deref(),
    )?;
    context.extend_env_vars(root_env);
    context.extend_env_vars(root_env_files);
    context.extend_env_vars(root_secret_env);
  }

  // Task-level environment. All three sources are loaded before any of them
  // is merged into the context. Values referencing task outputs are skipped
  // here and filled in later by `refresh_output_env`.
  let defined_env = self.load_static_env(context)?;
  let additional_env = self.load_env_file(context)?;
  let secret_env = self.load_secret_env(context)?;

  context.extend_env_vars(defined_env);
  context.extend_env_vars(additional_env);
  context.extend_env_vars(secret_env);

  // Cache hit: report it and skip dependencies, preconditions and commands.
  if self.should_skip_from_cache(context)? {
    context.emit_event(&serde_json::json!({
      "event": "task_skipped",
      "task": context.current_task_name.clone().unwrap_or_else(|| "<task>".to_string()),
      "reason": "cache_hit",
    }))?;
    return Ok(());
  }

  let mut rng = rand::thread_rng();
  let pb_style =
    ProgressStyle::with_template("{spinner:.green} [{prefix:.bold.dim}] {wide_msg:.cyan/blue} ")?
      .tick_chars("⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏⦿");

  // The bar is registered even when there are no dependencies; it is only
  // styled and ticked inside the branch below.
  let depends_on_pb = context.multi.add(ProgressBar::new(self.depends_on.len() as u64));

  if !self.depends_on.is_empty() {
    depends_on_pb.set_style(pb_style.clone());
    depends_on_pb.set_message("Running task dependencies...");
    depends_on_pb.enable_steady_tick(tick_interval);
    for (i, dependency) in self.depends_on.iter().enumerate() {
      // NOTE(review): the randomized pause looks purely cosmetic (gives the
      // spinner time to render) - confirm before changing.
      thread::sleep(Duration::from_millis(rng.gen_range(40..300)));
      depends_on_pb.set_prefix(format!("{}/{}", i + 1, self.depends_on.len()));
      dependency.run(context)?;
      depends_on_pb.inc(1);
    }

    let message = format!("Dependencies completed in {}.", HumanDuration(started.elapsed()));
    // Nested runs clear their bars instead of leaving a summary line behind.
    if context.is_nested {
      depends_on_pb.finish_and_clear();
    } else {
      depends_on_pb.finish_with_message(message);
    }
  }

  let precondition_pb = context
    .multi
    .add(ProgressBar::new(self.preconditions.len() as u64));

  if !self.preconditions.is_empty() {
    precondition_pb.set_style(pb_style.clone());
    precondition_pb.set_message("Running task precondition...");
    precondition_pb.enable_steady_tick(tick_interval);
    for (i, precondition) in self.preconditions.iter().enumerate() {
      thread::sleep(Duration::from_millis(rng.gen_range(40..300)));
      precondition_pb.set_prefix(format!("{}/{}", i + 1, self.preconditions.len()));
      precondition.execute(context)?;
      precondition_pb.inc(1);
    }

    let message = format!("Preconditions completed in {}.", HumanDuration(started.elapsed()));
    if context.is_nested {
      precondition_pb.finish_and_clear();
    } else {
      precondition_pb.finish_with_message(message);
    }
  }

  if self.is_parallel() {
    self.execute_commands_parallel(context)?;
  } else {
    let command_pb = context.multi.add(ProgressBar::new(self.commands.len() as u64));
    command_pb.set_style(pb_style);
    command_pb.set_message("Running task command...");
    command_pb.enable_steady_tick(tick_interval);
    for (i, command) in self.commands.iter().enumerate() {
      thread::sleep(Duration::from_millis(rng.gen_range(100..400)));
      command_pb.set_prefix(format!("{}/{}", i + 1, self.commands.len()));
      // Re-resolve output-referencing variables before every command, so a
      // command can use outputs that became available earlier in the run.
      self.refresh_output_env(context)?;
      command.execute(context)?;
      command_pb.inc(1);
    }

    let message = format!("Commands completed in {}.", HumanDuration(started.elapsed()));
    if context.is_nested {
      command_pb.finish_and_clear();
    } else {
      command_pb.finish_with_message(message);
    }
  }

  // Only reached when everything above succeeded.
  self.update_cache(context)?;

  Ok(())
}
376
377 fn validate_parallel_commands(&self) -> anyhow::Result<()> {
379 if !self.is_parallel() {
380 return Ok(());
381 }
382
383 for command in &self.commands {
384 match command {
385 CommandRunner::LocalRun(local_run) if local_run.is_parallel_safe() => continue,
386 CommandRunner::LocalRun(local_run) if local_run.interactive_enabled() => {
387 return Err(anyhow::anyhow!(
388 "Interactive local commands cannot be run in parallel"
389 ))
390 },
391 CommandRunner::LocalRun(_) => {
392 return Err(anyhow::anyhow!(
393 "Local commands with `retrigger: true` cannot be run in parallel"
394 ))
395 },
396 _ => {
397 return Err(anyhow::anyhow!(
398 "Parallel execution is only supported for non-interactive local commands"
399 ))
400 },
401 }
402 }
403 Ok(())
404 }
405
406 fn execute_commands_parallel(&self, context: &TaskContext) -> anyhow::Result<()> {
408 let (tx, rx): (Sender<CommandResult>, Receiver<CommandResult>) = channel();
409 let mut handles = vec![];
410 let command_count = self.commands.len();
411 let max_parallel = self.max_parallel().min(command_count.max(1));
412 let fail_fast = self.fail_fast();
413 let command_pb = context.multi.add(ProgressBar::new(command_count as u64));
414 command_pb.set_style(ProgressStyle::with_template(
415 "{spinner:.green} [{prefix:.bold.dim}] {wide_msg:.cyan/blue} ",
416 )?);
417 command_pb.set_prefix("?/?");
418 command_pb.set_message("Running task commands in parallel...");
419 command_pb.enable_steady_tick(Duration::from_millis(80));
420 let mut failures = Vec::new();
421
422 let commands: Vec<_> = self.commands.to_vec();
424
425 let mut completed = 0;
427
428 let mut iter = commands.into_iter().enumerate();
429 let mut running = 0usize;
430 let mut stop_scheduling = false;
431
432 while completed < command_count {
433 while !stop_scheduling && running < max_parallel {
434 let Some((i, command)) = iter.next() else {
435 break;
436 };
437
438 let tx = tx.clone();
439 let context = context.clone();
440
441 let handle = thread::spawn(move || {
442 let result = match command.execute(&context) {
443 Ok(_) => CommandResult {
444 index: i,
445 success: true,
446 message: format!("Command {} completed successfully", i + 1),
447 },
448 Err(e) => CommandResult {
449 index: i,
450 success: false,
451 message: format!("Command {} failed: {}", i + 1, e),
452 },
453 };
454 tx.send(result).unwrap();
455 });
456
457 handles.push(handle);
458 running += 1;
459 }
460
461 if running == 0 {
462 break;
463 }
464
465 match rx.recv() {
466 Ok(result) => {
467 running -= 1;
468 let index = result.index;
469 if !result.success && !context.ignore_errors() {
470 failures.push(result.message);
471 if fail_fast {
472 stop_scheduling = true;
473 }
474 }
475
476 completed += 1;
477 command_pb.set_prefix(format!("{}/{}", completed, command_count));
478 command_pb.inc(1);
479
480 command_pb.set_message(format!(
481 "Running task commands in parallel (completed {})",
482 index + 1
483 ));
484 },
485 Err(e) => {
486 command_pb.finish_with_message("Error receiving command results");
487 return Err(anyhow::anyhow!("Channel error: {}", e));
488 },
489 }
490 }
491
492 for handle in handles {
494 handle.join().unwrap();
495 }
496
497 if !failures.is_empty() {
498 command_pb.finish_with_message("Some commands failed");
499
500 failures.sort();
502 return Err(anyhow::anyhow!("Failed commands:\n{}", failures.join("\n")));
503 }
504
505 let message = "Commands completed in parallel";
506 if context.is_nested {
507 command_pb.finish_and_clear();
508 } else {
509 command_pb.finish_with_message(message);
510 }
511
512 Ok(())
513 }
514
515 fn load_static_env(&self, context: &TaskContext) -> anyhow::Result<HashMap<String, String>> {
516 let mut local_env: HashMap<String, String> = HashMap::new();
517 for (key, value) in &self.environment {
518 if contains_output_reference(value) {
519 continue;
520 }
521 let value = self.get_env_value(context, value)?;
522 local_env.insert(key.clone(), value);
523 }
524
525 Ok(local_env)
526 }
527
528 fn refresh_output_env(&self, context: &mut TaskContext) -> anyhow::Result<()> {
529 let mut updated_env = HashMap::new();
530
531 for (key, value) in &self.environment {
532 let output_refs = extract_output_references(value);
533 if output_refs.is_empty() {
534 continue;
535 }
536
537 let mut all_outputs_ready = true;
538 for output_name in &output_refs {
539 if !context.has_task_output(output_name)? {
540 all_outputs_ready = false;
541 break;
542 }
543 }
544 if !all_outputs_ready {
545 continue;
546 }
547
548 updated_env.insert(key.clone(), self.get_env_value(context, value)?);
549 }
550
551 context.extend_env_vars(updated_env);
552 Ok(())
553 }
554
555 fn load_env_file(&self, context: &TaskContext) -> anyhow::Result<HashMap<String, String>> {
556 load_env_files_in_dir(&self.env_file, &self.config_base_dir(context))
557 }
558
559 fn load_secret_env(&self, context: &TaskContext) -> anyhow::Result<HashMap<String, String>> {
560 load_secret_env(
561 &self.secrets_path,
562 &self.config_base_dir(context),
563 context.secret_vault_location.as_deref(),
564 context.secret_keys_location.as_deref(),
565 context.secret_key_name.as_deref(),
566 context.secret_gpg_key_id.as_deref(),
567 )
568 }
569
570 fn get_env_value(&self, context: &TaskContext, value_in: &str) -> anyhow::Result<String> {
571 if is_shell_command(value_in)? {
572 let verbose = self.verbose();
573 let mut cmd = self
574 .shell
575 .as_ref()
576 .map(|shell| shell.proc())
577 .unwrap_or_else(|| context.shell().proc());
578 let output = run_shell_command!(value_in, cmd, verbose);
579 Ok(output)
580 } else if super::is_template_command(value_in)? || contains_output_reference(value_in) {
581 Ok(interpolate_template_string(value_in, context)?)
582 } else {
583 Ok(value_in.to_string())
584 }
585 }
586
587 fn verbose(&self) -> bool {
588 self.verbose.unwrap_or(default_verbose())
589 }
590
591 pub(crate) fn execution_mode(&self) -> ExecutionMode {
592 self
593 .execution
594 .as_ref()
595 .and_then(|execution| execution.mode.clone())
596 .or_else(|| {
597 self.parallel.map(|parallel| {
598 if parallel {
599 ExecutionMode::Parallel
600 } else {
601 ExecutionMode::Sequential
602 }
603 })
604 })
605 .unwrap_or(ExecutionMode::Sequential)
606 }
607
608 pub(crate) fn is_parallel(&self) -> bool {
609 self.execution_mode().is_parallel()
610 }
611
612 pub(crate) fn max_parallel(&self) -> usize {
613 self
614 .execution
615 .as_ref()
616 .and_then(|execution| execution.max_parallel)
617 .unwrap_or(self.commands.len().max(1))
618 }
619
620 pub(crate) fn fail_fast(&self) -> bool {
621 self
622 .execution
623 .as_ref()
624 .map(|execution| execution.fail_fast)
625 .unwrap_or(true)
626 }
627
628 fn cache_enabled(&self) -> bool {
629 self.cache.as_ref().map(|cache| cache.enabled).unwrap_or(false)
630 }
631
632 fn should_skip_from_cache(&self, context: &TaskContext) -> anyhow::Result<bool> {
633 if context.force || !self.cache_enabled() || self.outputs.is_empty() {
634 return Ok(false);
635 }
636
637 let resolved_outputs = self.resolve_output_paths(context)?;
638 let outputs_exist = self
639 .resolve_output_paths(context)?
640 .iter()
641 .all(|output| output.exists());
642 if !outputs_exist {
643 return Ok(false);
644 }
645
646 let env_vars = sorted_env_vars(&context.env_vars);
647 let inputs = self.resolve_input_paths(context)?;
648 let mut env_files = self.resolve_env_file_paths(context);
649 env_files.extend(self.resolve_secret_paths(context));
650 env_files.sort();
651 env_files.dedup();
652 let fingerprint = compute_fingerprint(
653 &context
654 .current_task_name
655 .clone()
656 .unwrap_or_else(|| "<task>".to_string()),
657 &stable_task_debug(self),
658 &env_vars,
659 &inputs,
660 &env_files,
661 &resolved_outputs,
662 )?;
663
664 let store = context
665 .cache_store
666 .lock()
667 .map_err(|e| anyhow::anyhow!("Failed to lock cache store - {}", e))?;
668 let Some(entry) = store.tasks.get(&fingerprint_task_key(context, self)) else {
669 return Ok(false);
670 };
671
672 Ok(entry.fingerprint == fingerprint)
673 }
674
675 fn update_cache(&self, context: &TaskContext) -> anyhow::Result<()> {
676 if !self.cache_enabled() || self.outputs.is_empty() {
677 return Ok(());
678 }
679
680 let env_vars = sorted_env_vars(&context.env_vars);
681 let inputs = self.resolve_input_paths(context)?;
682 let mut env_files = self.resolve_env_file_paths(context);
683 env_files.extend(self.resolve_secret_paths(context));
684 env_files.sort();
685 env_files.dedup();
686 let resolved_outputs = self.resolve_output_paths(context)?;
687 let fingerprint = compute_fingerprint(
688 &context
689 .current_task_name
690 .clone()
691 .unwrap_or_else(|| "<task>".to_string()),
692 &stable_task_debug(self),
693 &env_vars,
694 &inputs,
695 &env_files,
696 &resolved_outputs,
697 )?;
698 let key = fingerprint_task_key(context, self);
699
700 {
701 let mut store = context
702 .cache_store
703 .lock()
704 .map_err(|e| anyhow::anyhow!("Failed to lock cache store - {}", e))?;
705 store.tasks.insert(
706 key,
707 CacheEntry {
708 fingerprint,
709 outputs: resolved_outputs
710 .iter()
711 .map(|path| path.to_string_lossy().into_owned())
712 .collect(),
713 updated_at: chrono::Utc::now().to_rfc3339(),
714 },
715 );
716 store.save_in_dir(&context.task_root.cache_base_dir())?;
717 }
718
719 Ok(())
720 }
721
722 pub(crate) fn config_base_dir_from_root(&self, root: &super::TaskRoot) -> std::path::PathBuf {
723 root.config_base_dir()
724 }
725
726 pub(crate) fn task_base_dir_from_root(&self, root: &super::TaskRoot) -> std::path::PathBuf {
727 let config_base_dir = self.config_base_dir_from_root(root);
728 let mut work_dirs = self
729 .commands
730 .iter()
731 .filter_map(|command| match command {
732 CommandRunner::LocalRun(local_run) => local_run.work_dir.as_ref(),
733 _ => None,
734 })
735 .map(|work_dir| resolve_path(&config_base_dir, work_dir))
736 .collect::<Vec<_>>();
737
738 work_dirs.sort();
739 work_dirs.dedup();
740
741 if work_dirs.len() == 1 {
742 work_dirs.remove(0)
743 } else {
744 config_base_dir
745 }
746 }
747
748 fn config_base_dir(&self, context: &TaskContext) -> std::path::PathBuf {
749 self.config_base_dir_from_root(&context.task_root)
750 }
751
752 fn task_base_dir(&self, context: &TaskContext) -> std::path::PathBuf {
753 self.task_base_dir_from_root(&context.task_root)
754 }
755
756 fn resolve_input_paths(&self, context: &TaskContext) -> anyhow::Result<Vec<std::path::PathBuf>> {
757 expand_patterns_in_dir(&self.task_base_dir(context), &self.inputs)
758 }
759
760 fn resolve_output_paths(&self, context: &TaskContext) -> anyhow::Result<Vec<std::path::PathBuf>> {
761 let base_dir = self.task_base_dir(context);
762 Ok(
763 self
764 .outputs
765 .iter()
766 .map(|output| resolve_path(&base_dir, output))
767 .collect(),
768 )
769 }
770
771 fn resolve_env_file_paths(&self, context: &TaskContext) -> Vec<std::path::PathBuf> {
772 let config_base_dir = self.config_base_dir(context);
773 let mut env_files = context
774 .task_root
775 .env_file
776 .iter()
777 .chain(self.env_file.iter())
778 .map(|env_file| resolve_path(&config_base_dir, env_file))
779 .collect::<Vec<_>>();
780 env_files.sort();
781 env_files.dedup();
782 env_files
783 }
784
785 fn resolve_secret_paths(&self, context: &TaskContext) -> Vec<std::path::PathBuf> {
786 let config_base_dir = self.config_base_dir(context);
787 let vault_location = context
788 .secret_vault_location
789 .as_deref()
790 .map(|path| resolve_path(&config_base_dir, path))
791 .unwrap_or_else(|| resolve_path(&config_base_dir, "./.mk/vault"));
792 let mut secret_paths = context
793 .task_root
794 .secrets_path
795 .iter()
796 .chain(self.secrets_path.iter())
797 .map(|secret_path| vault_location.join(secret_path))
798 .collect::<Vec<_>>();
799 secret_paths.sort();
800 secret_paths.dedup();
801 secret_paths
802 }
803}
804
805impl ExecutionMode {
806 fn is_parallel(&self) -> bool {
807 matches!(self, ExecutionMode::Parallel)
808 }
809}
810
/// Copies the environment into a list of `(key, value)` pairs in ascending
/// order, so that it can be fingerprinted independently of map iteration
/// order.
fn sorted_env_vars(env_vars: &HashMap<String, String>) -> Vec<(String, String)> {
  let mut pairs = Vec::with_capacity(env_vars.len());
  for (key, value) in env_vars {
    pairs.push((key.to_owned(), value.to_owned()));
  }
  pairs.sort();
  pairs
}
819
820fn fingerprint_task_key(context: &TaskContext, task: &TaskArgs) -> String {
821 context.current_task_name.clone().unwrap_or_else(|| {
822 if !task.description.is_empty() {
823 task.description.clone()
824 } else {
825 format!("{:?}", task.commands)
826 }
827 })
828}
829
830fn stable_task_debug(task: &TaskArgs) -> String {
831 let mut labels: Vec<_> = task
832 .labels
833 .iter()
834 .map(|(key, value)| (key.clone(), value.clone()))
835 .collect();
836 labels.sort();
837
838 let mut environment: Vec<_> = task
839 .environment
840 .iter()
841 .map(|(key, value)| (key.clone(), value.clone()))
842 .collect();
843 environment.sort();
844
845 let mut secrets_path = task.secrets_path.clone();
846 secrets_path.sort();
847
848 format!(
849 "commands={:?};preconditions={:?};depends_on={:?};labels={:?};description={:?};environment={:?};env_file={:?};secrets_path={:?};vault_location={:?};keys_location={:?};key_name={:?};shell={:?};execution_mode={:?};max_parallel={:?};fail_fast={};cache_enabled={};inputs={:?};outputs={:?};ignore_errors={:?};verbose={:?}",
850 task.commands,
851 task.preconditions,
852 task.depends_on,
853 labels,
854 task.description,
855 environment,
856 task.env_file,
857 secrets_path,
858 task.vault_location,
859 task.keys_location,
860 task.key_name,
861 task.shell,
862 task.execution_mode(),
863 task.execution.as_ref().and_then(|execution| execution.max_parallel),
864 task.fail_fast(),
865 task.cache_enabled(),
866 task.inputs,
867 task.outputs,
868 task.ignore_errors,
869 task.verbose
870 )
871}
872
873#[cfg(test)]
874mod test {
875 use super::*;
876
877 #[test]
878 fn test_task_1() -> anyhow::Result<()> {
879 {
880 let yaml = "
881 commands:
882 - command: echo \"Hello, World!\"
883 ignore_errors: false
884 verbose: false
885 depends_on:
886 - name: task1
887 description: This is a task
888 environment:
889 FOO: bar
890 env_file:
891 - test.env
892 - test2.env
893 ";
894
895 let task = serde_yaml::from_str::<Task>(yaml)?;
896
897 if let Task::Task(task) = &task {
898 if let CommandRunner::LocalRun(local_run) = &task.commands[0] {
899 assert_eq!(local_run.command, "echo \"Hello, World!\"");
900 assert_eq!(local_run.work_dir, None);
901 assert_eq!(local_run.ignore_errors, Some(false));
902 assert_eq!(local_run.verbose, Some(false));
903 }
904
905 if let TaskDependency::TaskDependency(args) = &task.depends_on[0] {
906 assert_eq!(args.name, "task1");
907 }
908
909 assert_eq!(task.labels.len(), 0);
910 assert_eq!(task.description, "This is a task");
911 assert_eq!(task.environment.len(), 1);
912 assert_eq!(task.env_file.len(), 2);
913 } else {
914 panic!("Expected Task::Task");
915 }
916
917 Ok(())
918 }
919 }
920
921 #[test]
922 fn test_task_2() -> anyhow::Result<()> {
923 {
924 let yaml = "
925 commands:
926 - command: echo 'Hello, World!'
927 ignore_errors: false
928 verbose: false
929 description: This is a task
930 environment:
931 FOO: bar
932 BAR: foo
933 ";
934
935 let task = serde_yaml::from_str::<Task>(yaml)?;
936
937 if let Task::Task(task) = &task {
938 if let CommandRunner::LocalRun(local_run) = &task.commands[0] {
939 assert_eq!(local_run.command, "echo 'Hello, World!'");
940 assert_eq!(local_run.work_dir, None);
941 assert_eq!(local_run.ignore_errors, Some(false));
942 assert_eq!(local_run.verbose, Some(false));
943 }
944
945 assert_eq!(task.description, "This is a task");
946 assert_eq!(task.depends_on.len(), 0);
947 assert_eq!(task.labels.len(), 0);
948 assert_eq!(task.env_file.len(), 0);
949 assert_eq!(task.environment.len(), 2);
950 } else {
951 panic!("Expected Task::Task");
952 }
953
954 Ok(())
955 }
956 }
957
958 #[test]
959 fn test_task_3() -> anyhow::Result<()> {
960 {
961 let yaml = "
962 commands:
963 - command: echo 'Hello, World!'
964 ";
965
966 let task = serde_yaml::from_str::<Task>(yaml)?;
967
968 if let Task::Task(task) = &task {
969 if let CommandRunner::LocalRun(local_run) = &task.commands[0] {
970 assert_eq!(local_run.command, "echo 'Hello, World!'");
971 assert_eq!(local_run.work_dir, None);
972 assert_eq!(local_run.ignore_errors, None);
973 assert_eq!(local_run.verbose, None);
974 }
975
976 assert_eq!(task.description.len(), 0);
977 assert_eq!(task.depends_on.len(), 0);
978 assert_eq!(task.labels.len(), 0);
979 assert_eq!(task.env_file.len(), 0);
980 assert_eq!(task.environment.len(), 0);
981 } else {
982 panic!("Expected Task::Task");
983 }
984
985 Ok(())
986 }
987 }
988
989 #[test]
990 fn test_task_4() -> anyhow::Result<()> {
991 {
992 let yaml = "
993 commands:
994 - container_command:
995 - echo
996 - Hello, World!
997 image: docker.io/library/hello-world:latest
998 ";
999
1000 let task = serde_yaml::from_str::<Task>(yaml)?;
1001
1002 if let Task::Task(task) = &task {
1003 if let CommandRunner::ContainerRun(container_run) = &task.commands[0] {
1004 assert_eq!(container_run.container_command.len(), 2);
1005 assert_eq!(container_run.container_command[0], "echo");
1006 assert_eq!(container_run.container_command[1], "Hello, World!");
1007 assert_eq!(container_run.image, "docker.io/library/hello-world:latest");
1008 assert_eq!(container_run.mounted_paths, Vec::<String>::new());
1009 assert_eq!(container_run.ignore_errors, None);
1010 assert_eq!(container_run.verbose, None);
1011 }
1012
1013 assert_eq!(task.description.len(), 0);
1014 assert_eq!(task.depends_on.len(), 0);
1015 assert_eq!(task.labels.len(), 0);
1016 assert_eq!(task.env_file.len(), 0);
1017 assert_eq!(task.environment.len(), 0);
1018 } else {
1019 panic!("Expected Task::Task");
1020 }
1021
1022 Ok(())
1023 }
1024 }
1025
1026 #[test]
1027 fn test_task_5() -> anyhow::Result<()> {
1028 {
1029 let yaml = "
1030 commands:
1031 - container_command:
1032 - echo
1033 - Hello, World!
1034 image: docker.io/library/hello-world:latest
1035 mounted_paths:
1036 - /tmp
1037 - /var/tmp
1038 ";
1039
1040 let task = serde_yaml::from_str::<Task>(yaml)?;
1041
1042 if let Task::Task(task) = &task {
1043 if let CommandRunner::ContainerRun(container_run) = &task.commands[0] {
1044 assert_eq!(container_run.container_command.len(), 2);
1045 assert_eq!(container_run.container_command[0], "echo");
1046 assert_eq!(container_run.container_command[1], "Hello, World!");
1047 assert_eq!(container_run.image, "docker.io/library/hello-world:latest");
1048 assert_eq!(container_run.mounted_paths, vec!["/tmp", "/var/tmp"]);
1049 assert_eq!(container_run.ignore_errors, None);
1050 assert_eq!(container_run.verbose, None);
1051 }
1052
1053 assert_eq!(task.description.len(), 0);
1054 assert_eq!(task.depends_on.len(), 0);
1055 assert_eq!(task.labels.len(), 0);
1056 assert_eq!(task.env_file.len(), 0);
1057 assert_eq!(task.environment.len(), 0);
1058 } else {
1059 panic!("Expected Task::Task");
1060 }
1061
1062 Ok(())
1063 }
1064 }
1065
1066 #[test]
1067 fn test_task_6() -> anyhow::Result<()> {
1068 {
1069 let yaml = "
1070 commands:
1071 - container_command:
1072 - echo
1073 - Hello, World!
1074 image: docker.io/library/hello-world:latest
1075 mounted_paths:
1076 - /tmp
1077 - /var/tmp
1078 ignore_errors: true
1079 ";
1080
1081 let task = serde_yaml::from_str::<Task>(yaml)?;
1082
1083 if let Task::Task(task) = &task {
1084 if let CommandRunner::ContainerRun(container_run) = &task.commands[0] {
1085 assert_eq!(container_run.container_command.len(), 2);
1086 assert_eq!(container_run.container_command[0], "echo");
1087 assert_eq!(container_run.container_command[1], "Hello, World!");
1088 assert_eq!(container_run.image, "docker.io/library/hello-world:latest");
1089 assert_eq!(container_run.mounted_paths, vec!["/tmp", "/var/tmp"]);
1090 assert_eq!(container_run.ignore_errors, Some(true));
1091 assert_eq!(container_run.verbose, None);
1092 }
1093
1094 assert_eq!(task.description.len(), 0);
1095 assert_eq!(task.depends_on.len(), 0);
1096 assert_eq!(task.labels.len(), 0);
1097 assert_eq!(task.env_file.len(), 0);
1098 assert_eq!(task.environment.len(), 0);
1099 } else {
1100 panic!("Expected Task::Task");
1101 }
1102
1103 Ok(())
1104 }
1105 }
1106
1107 #[test]
1108 fn test_task_7() -> anyhow::Result<()> {
1109 {
1110 let yaml = "
1111 commands:
1112 - container_command:
1113 - echo
1114 - Hello, World!
1115 image: docker.io/library/hello-world:latest
1116 verbose: false
1117 ";
1118
1119 let task = serde_yaml::from_str::<Task>(yaml)?;
1120
1121 if let Task::Task(task) = &task {
1122 if let CommandRunner::ContainerRun(container_run) = &task.commands[0] {
1123 assert_eq!(container_run.container_command.len(), 2);
1124 assert_eq!(container_run.container_command[0], "echo");
1125 assert_eq!(container_run.container_command[1], "Hello, World!");
1126 assert_eq!(container_run.image, "docker.io/library/hello-world:latest");
1127 assert_eq!(container_run.mounted_paths, Vec::<String>::new());
1128 assert_eq!(container_run.ignore_errors, None);
1129 assert_eq!(container_run.verbose, Some(false));
1130 }
1131
1132 assert_eq!(task.description.len(), 0);
1133 assert_eq!(task.depends_on.len(), 0);
1134 assert_eq!(task.labels.len(), 0);
1135 assert_eq!(task.env_file.len(), 0);
1136 assert_eq!(task.environment.len(), 0);
1137 } else {
1138 panic!("Expected Task::Task");
1139 }
1140
1141 Ok(())
1142 }
1143 }
1144
1145 #[test]
1146 fn test_task_8() -> anyhow::Result<()> {
1147 {
1148 let yaml = "
1149 commands:
1150 - task: task1
1151 ";
1152
1153 let task = serde_yaml::from_str::<Task>(yaml)?;
1154
1155 if let Task::Task(task) = &task {
1156 if let CommandRunner::TaskRun(task_run) = &task.commands[0] {
1157 assert_eq!(task_run.task, "task1");
1158 assert_eq!(task_run.ignore_errors, None);
1159 assert_eq!(task_run.verbose, None);
1160 }
1161
1162 assert_eq!(task.description.len(), 0);
1163 assert_eq!(task.depends_on.len(), 0);
1164 assert_eq!(task.labels.len(), 0);
1165 assert_eq!(task.env_file.len(), 0);
1166 assert_eq!(task.environment.len(), 0);
1167 } else {
1168 panic!("Expected Task::Task");
1169 }
1170
1171 Ok(())
1172 }
1173 }
1174
/// A `task:` command entry with `verbose: true` must deserialize into
/// `CommandRunner::TaskRun` carrying `verbose == Some(true)` while
/// `ignore_errors` stays unset.
#[test]
fn test_task_9() -> anyhow::Result<()> {
    let yaml = "
        commands:
          - task: task1
            verbose: true
    ";

    let parsed = serde_yaml::from_str::<Task>(yaml)?;

    let task = match &parsed {
        Task::Task(task) => task,
        _ => panic!("Expected Task::Task"),
    };

    // Fail loudly on any other command variant; a plain `if let` without an
    // `else` would let the test pass without checking anything.
    let task_run = match &task.commands[0] {
        CommandRunner::TaskRun(task_run) => task_run,
        _ => panic!("Expected CommandRunner::TaskRun"),
    };
    assert_eq!(task_run.task, "task1");
    assert_eq!(task_run.ignore_errors, None);
    assert_eq!(task_run.verbose, Some(true));

    assert_eq!(task.description.len(), 0);
    assert_eq!(task.depends_on.len(), 0);
    assert_eq!(task.labels.len(), 0);
    assert_eq!(task.env_file.len(), 0);
    assert_eq!(task.environment.len(), 0);

    Ok(())
}
1205
/// A `task:` command entry with `ignore_errors: true` must deserialize into
/// `CommandRunner::TaskRun` carrying `ignore_errors == Some(true)` while
/// `verbose` stays unset.
#[test]
fn test_task_10() -> anyhow::Result<()> {
    let yaml = "
        commands:
          - task: task1
            ignore_errors: true
    ";

    let parsed = serde_yaml::from_str::<Task>(yaml)?;

    let task = match &parsed {
        Task::Task(task) => task,
        _ => panic!("Expected Task::Task"),
    };

    // Fail loudly on any other command variant; a plain `if let` without an
    // `else` would let the test pass without checking anything.
    let task_run = match &task.commands[0] {
        CommandRunner::TaskRun(task_run) => task_run,
        _ => panic!("Expected CommandRunner::TaskRun"),
    };
    assert_eq!(task_run.task, "task1");
    assert_eq!(task_run.ignore_errors, Some(true));
    assert_eq!(task_run.verbose, None);

    assert_eq!(task.description.len(), 0);
    assert_eq!(task.depends_on.len(), 0);
    assert_eq!(task.labels.len(), 0);
    assert_eq!(task.env_file.len(), 0);
    assert_eq!(task.environment.len(), 0);

    Ok(())
}
1236
/// A YAML document that is just a plain scalar must deserialize into
/// `Task::String` holding the command text.
#[test]
fn test_task_11() -> anyhow::Result<()> {
    let source = "
        echo 'Hello, World!'
    ";

    let parsed = serde_yaml::from_str::<Task>(source)?;

    match &parsed {
        Task::String(command) => assert_eq!(command, "echo 'Hello, World!'"),
        _ => panic!("Expected Task::String"),
    }

    Ok(())
}
1255
/// A quoted `'true'` scalar must deserialize into `Task::String` with the
/// text `true`.
#[test]
fn test_task_12() -> anyhow::Result<()> {
    let source = "
        'true'
    ";

    let parsed = serde_yaml::from_str::<Task>(source)?;

    match &parsed {
        Task::String(command) => assert_eq!(command, "true"),
        _ => panic!("Expected Task::String"),
    }

    Ok(())
}
1274
/// Every `environment` entry must be kept, and the integer value must be
/// readable back as the string `"42"`.
#[test]
fn test_task_13() -> anyhow::Result<()> {
    let source = "
        commands: []
        environment:
          FOO: bar
          BAR: foo
          KEY: 42
          PIS: 3.14
    ";

    let parsed = serde_yaml::from_str::<Task>(source)?;

    let task = match &parsed {
        Task::Task(task) => task,
        _ => panic!("Expected Task::Task"),
    };

    let env = &task.environment;
    assert_eq!(env.len(), 4);
    assert_eq!(env.get("FOO").unwrap(), "bar");
    assert_eq!(env.get("BAR").unwrap(), "foo");
    assert_eq!(env.get("KEY").unwrap(), "42");

    Ok(())
}
1301
/// The secret-related task fields must round-trip from YAML, and a
/// `${{ secrets.* }}` placeholder in `environment` must be stored verbatim
/// at deserialization time.
#[test]
fn test_task_14() -> anyhow::Result<()> {
    let source = "
        commands: []
        secrets_path:
          - app/common
        vault_location: ./.mk/vault
        keys_location: ./.mk/keys
        key_name: team
        environment:
          SECRET_VALUE: ${{ secrets.app/password }}
    ";

    let parsed = serde_yaml::from_str::<Task>(source)?;

    let task = match &parsed {
        Task::Task(task) => task,
        _ => panic!("Expected Task::Task"),
    };

    assert_eq!(task.secrets_path, vec!["app/common"]);
    assert_eq!(task.vault_location.as_deref(), Some("./.mk/vault"));
    assert_eq!(task.keys_location.as_deref(), Some("./.mk/keys"));
    assert_eq!(task.key_name.as_deref(), Some("team"));

    let secret_value = task.environment.get("SECRET_VALUE").map(String::as_str);
    assert_eq!(secret_value, Some("${{ secrets.app/password }}"));

    Ok(())
}
1332
/// Running a parallel task that contains an interactive local command must
/// fail with the "Interactive local commands cannot be run in parallel"
/// error.
#[test]
fn test_parallel_interactive_rejected() -> anyhow::Result<()> {
    let yaml = r#"
        commands:
          - command: "echo hello"
            interactive: true
          - command: "echo world"
        parallel: true
    "#;

    // Panic on any other variant so the test cannot pass vacuously.
    let task = match serde_yaml::from_str::<Task>(yaml)? {
        Task::Task(task) => task,
        _ => panic!("Expected Task::Task"),
    };
    let mut context = TaskContext::empty();

    let message = task
        .run(&mut context)
        .expect_err("parallel task with an interactive command should be rejected")
        .to_string();
    assert!(
        message.contains("Interactive local commands cannot be run in parallel"),
        "unexpected error message: {}",
        message
    );

    Ok(())
}
1357
/// Running a parallel task whose local commands are all non-interactive must
/// succeed.
#[test]
fn test_parallel_non_interactive_accepted() -> anyhow::Result<()> {
    let yaml = r#"
        commands:
          - command: "echo hello"
            interactive: false
          - command: "echo world"
        parallel: true
    "#;

    // Panic on any other variant so the test cannot pass vacuously.
    let task = match serde_yaml::from_str::<Task>(yaml)? {
        Task::Task(task) => task,
        _ => panic!("Expected Task::Task"),
    };
    let mut context = TaskContext::empty();

    let result = task.run(&mut context);
    assert!(
        result.is_ok(),
        "expected the parallel run to succeed, got: {:?}",
        result.err()
    );

    Ok(())
}
1378
/// Running a parallel task that contains a local command with
/// `retrigger: true` must fail with the matching validation error.
#[test]
fn test_parallel_retrigger_rejected() -> anyhow::Result<()> {
    let yaml = r#"
        commands:
          - command: "go run ."
            retrigger: true
        parallel: true
    "#;

    // Panic on any other variant so the test cannot pass vacuously.
    let task = match serde_yaml::from_str::<Task>(yaml)? {
        Task::Task(task) => task,
        _ => panic!("Expected Task::Task"),
    };
    let mut context = TaskContext::empty();

    let message = task
        .run(&mut context)
        .expect_err("parallel task with a retrigger command should be rejected")
        .to_string();
    assert!(
        message.contains("Local commands with `retrigger: true` cannot be run in parallel"),
        "unexpected error message: {}",
        message
    );

    Ok(())
}
1402}