1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
use crate::config::Settings;
use crate::config::env_directive::EnvDirective;
use crate::task::task_fetcher::TaskFetcher;
use crate::task::{Task, dep_has_usage_ref, parse_usage_values_from_task};
use crate::{config::Config, task::task_list::resolve_depends};
use itertools::Itertools;
use petgraph::Direction;
use petgraph::graph::DiGraph;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tokio::sync::mpsc;
/// Unique key for a task instance, including name, args, and env vars
pub type TaskKey = (String, Vec<String>, Vec<(String, String)>);
#[derive(Debug)]
pub struct Deps {
    /// dependency graph: an edge a -> b means "a waits on b"
    pub graph: DiGraph<Task, ()>,
    sent: HashSet<TaskKey>, // tasks that have already started so should not run again
    removed: HashSet<TaskKey>, // tasks that have already finished to track if we are in an infinite loop
    executed: HashSet<TaskKey>, // tasks that actually began executing (not just scheduled)
    ran: HashSet<TaskKey>, // tasks that actually ran their commands (not skipped due to fresh sources)
    dep_edges: HashMap<TaskKey, HashSet<TaskKey>>, // maps each task to its direct dependency task keys
    post_dep_parents: HashMap<TaskKey, HashSet<TaskKey>>, // maps each post-dep to its parent tasks
    // channel used to stream runnable tasks to the subscriber; a `None`
    // message signals that all tasks have finished
    tx: mpsc::UnboundedSender<Option<Task>>,
    // not clone, notify waiters via tx None
}
/// Extract a hashable key from a task, including env vars set via dependencies
pub fn task_key(task: &Task) -> TaskKey {
    // Only simple key=value env directives participate in deduplication.
    // This ensures tasks with the same name/args but different env are
    // treated as distinct instances.
    let mut env_key: Vec<(String, String)> = Vec::new();
    for directive in &task.env.0 {
        if let EnvDirective::Val(k, v, _) = directive {
            env_key.push((k.clone(), v.clone()));
        }
    }
    // sort so the key is order-independent (same result as itertools::sorted)
    env_key.sort();
    (task.name.clone(), task.args.clone(), env_key)
}
/// manages a dependency graph of tasks so `mise run` knows what to run next
impl Deps {
/// Build the dependency graph for `tasks`, resolving pre-deps, post-deps,
/// and wait-fors transitively via a work stack.
///
/// Remote file-based tasks are fetched first so they have local paths, and
/// dependency templates referencing `{{usage.*}}` are re-rendered before
/// dependencies are resolved.
pub async fn new(config: &Arc<Config>, tasks: Vec<Task>) -> eyre::Result<Self> {
    let mut graph = DiGraph::new();
    // maps each TaskKey to its node index so a unique task instance gets
    // exactly one node even when reached via multiple dependency paths
    let mut indexes = HashMap::new();
    let mut stack = vec![];
    let mut seen = HashSet::new();
    let mut post_dep_parents: HashMap<TaskKey, HashSet<TaskKey>> = HashMap::new();
    let mut dep_edges: HashMap<TaskKey, HashSet<TaskKey>> = HashMap::new();
    // insert-or-fetch the graph node for a task
    let mut add_idx = |task: &Task, graph: &mut DiGraph<Task, ()>| {
        *indexes
            .entry(task_key(task))
            .or_insert_with(|| graph.add_node(task.clone()))
    };
    // first we add all tasks to the graph, create a stack of work for this function, and
    // store the index of each task in the graph
    for t in &tasks {
        stack.push(t.clone());
        add_idx(t, &mut graph);
    }
    let all_tasks_to_run = resolve_depends(config, tasks).await?;
    let no_cache = Settings::get().task.remote_no_cache.unwrap_or(false);
    let fetcher = TaskFetcher::new(no_cache);
    while let Some(mut a) = stack.pop() {
        if seen.contains(&a) {
            // prevent infinite loop
            continue;
        }
        // Fetch remote task files so file-based tasks have local paths
        // before we try to parse their usage specs or execute them.
        if a.file
            .as_ref()
            .is_some_and(|f| TaskFetcher::is_remote_source(&f.to_string_lossy()))
        {
            let mut tasks_to_fetch = vec![a];
            fetcher.fetch_tasks(&mut tasks_to_fetch).await?;
            a = tasks_to_fetch.into_iter().next().unwrap();
        }
        // Re-render dependency templates with usage values (including defaults)
        // so {{usage.*}} resolves.
        let has_usage_deps = |raw: &Option<Vec<_>>| {
            raw.as_ref()
                .is_some_and(|r| r.iter().any(dep_has_usage_ref))
        };
        if has_usage_deps(&a.depends_raw)
            || has_usage_deps(&a.depends_post_raw)
            || has_usage_deps(&a.wait_for_raw)
        {
            let usage_values = parse_usage_values_from_task(config, &a).await?;
            if !usage_values.is_empty() {
                a.render_depends_with_usage(config, &usage_values).await?;
            }
        }
        let a_idx = add_idx(&a, &mut graph);
        // Update the graph node with the fetched version of the task
        // (add_idx may have returned an existing index with an unfetched task)
        graph[a_idx] = a.clone();
        let (pre, post) = a.resolve_depends(config, &all_tasks_to_run).await?;
        for b in pre {
            // pre-dep: edge a -> b, i.e. a waits on b
            let b_idx = add_idx(&b, &mut graph);
            graph.update_edge(a_idx, b_idx, ());
            dep_edges
                .entry(task_key(&a))
                .or_default()
                .insert(task_key(&b));
            stack.push(b.clone());
        }
        for b in post {
            // post-dep: reversed edge b -> a, i.e. b waits on a
            let b_idx = add_idx(&b, &mut graph);
            graph.update_edge(b_idx, a_idx, ());
            post_dep_parents
                .entry(task_key(&b))
                .or_default()
                .insert(task_key(&a));
            stack.push(b.clone());
        }
        seen.insert(a);
    }
    // placeholder channel; subscribe() swaps in a live sender later
    let (tx, _) = mpsc::unbounded_channel();
    let sent = HashSet::new();
    let removed = HashSet::new();
    let executed = HashSet::new();
    let ran = HashSet::new();
    Ok(Self {
        graph,
        tx,
        sent,
        removed,
        executed,
        ran,
        dep_edges,
        post_dep_parents,
    })
}
/// Create a sub-graph that prunes tasks already completed by the caller.
/// `completed` is a snapshot of task keys that have finished in the parent
/// graph — these are removed from the sub-graph so they don't run again.
pub async fn new_pruned(
config: &Arc<Config>,
tasks: Vec<Task>,
completed: &HashSet<TaskKey>,
) -> eyre::Result<Self> {
let mut deps = Self::new(config, tasks).await?;
let mut to_remove = vec![];
for idx in deps.graph.node_indices() {
let key = task_key(&deps.graph[idx]);
if completed.contains(&key) {
to_remove.push(idx);
}
}
// Remove in reverse index order so petgraph swap-remove
// doesn't invalidate indices we haven't processed yet
to_remove.sort_unstable_by(|a, b| b.cmp(a));
for idx in to_remove {
deps.graph.remove_node(idx);
}
deps.mark_ambiguous_prefixes();
Ok(deps)
}
/// main method to emit tasks that no longer have dependencies being waited on
///
/// Sends each newly-runnable leaf task exactly once over `tx`; when the
/// graph is empty, sends `None` to close the stream. Panics if no leaf can
/// be emitted yet the graph is non-empty (a cycle/stall that would hang).
fn emit_leaves(&mut self) {
    let leaves = leaves(&self.graph);
    let leaves_is_empty = leaves.is_empty();
    for task in leaves {
        let key = task_key(&task);
        // insert() is false if already scheduled — each instance runs once
        if self.sent.insert(key.clone()) {
            trace!("Scheduling task {0}", task.name);
            if let Err(e) = self.tx.send(Some(task)) {
                trace!("Error sending task: {e:?}");
                // receiver gone: un-mark so a future subscriber can get it
                self.sent.remove(&key);
            }
        }
    }
    if self.is_empty() {
        trace!("All tasks finished");
        if let Err(e) = self.tx.send(None) {
            trace!("Error closing task stream: {e:?}");
        }
    } else if leaves_is_empty && self.sent.len() == self.removed.len() {
        // BUGFIX: message previously read "Infinitive loop" — typo for "Infinite"
        panic!(
            "Infinite loop detected, all tasks are finished but the graph isn't empty {0} {1:#?}",
            self.all().map(|t| t.name.clone()).join(", "),
            self.graph
        )
    }
}
/// listened to by `mise run` which gets a stream of tasks to run
pub fn subscribe(&mut self) -> mpsc::UnboundedReceiver<Option<Task>> {
    // swap in a fresh channel, then immediately emit the current leaves so
    // the new subscriber has work to start with
    let (sender, receiver) = mpsc::unbounded_channel();
    self.tx = sender;
    self.emit_leaves();
    receiver
}
/// True once every task node has been removed (all tasks handled).
pub fn is_empty(&self) -> bool {
    0 == self.graph.node_count()
}
/// Snapshot of task keys that have completed (removed from the graph).
/// Used by `new_pruned` so sub-graphs skip tasks the parent already ran.
/// Only includes confirmed-complete tasks, not in-flight ones, to
/// preserve dependency ordering in the sub-graph.
pub fn handled_task_keys(&self) -> HashSet<TaskKey> {
    self.removed.iter().cloned().collect()
}
/// Check if a post-dep task should actually run: it must be a post-dependency
/// AND its parent must have actually started executing (not just been scheduled).
/// Returns false for non-post-dep tasks or post-deps whose parent was never executed.
pub fn is_runnable_post_dep(&self, task: &Task) -> bool {
    self.post_dep_parents
        .get(&task_key(task))
        .is_some_and(|parents| parents.iter().any(|pk| self.executed.contains(pk)))
}
/// Mark a task as having actually started execution.
/// This is distinct from being scheduled (sent) — a task may be scheduled as a
/// graph leaf but then skipped because an earlier task failed.
pub fn mark_executed(&mut self, task: &Task) {
    let key = task_key(task);
    self.executed.insert(key);
}
/// Mark a task as having actually run its commands (not skipped due to fresh sources).
/// Used to invalidate dependent tasks' source freshness checks.
pub fn mark_ran(&mut self, task: &Task) {
    let key = task_key(task);
    self.ran.insert(key);
}
/// Check if any direct dependency of the given task actually ran (not skipped).
pub fn any_dep_ran(&self, task: &Task) -> bool {
    match self.dep_edges.get(&task_key(task)) {
        Some(dep_keys) => dep_keys.iter().any(|dep| self.ran.contains(dep)),
        None => false,
    }
}
/// Remove multiple tasks from the graph in a batch, emitting leaves only once at the end.
/// This prevents intermediate emit_leaves from scheduling tasks that will be removed later.
pub fn remove_batch(&mut self, tasks: &[Task]) {
    for task in tasks {
        let Some(idx) = self.node_idx(task) else {
            continue;
        };
        self.graph.remove_node(idx);
        self.removed.insert(task_key(task));
    }
    self.emit_leaves();
}
// use contracts::{ensures, requires};
// #[requires(self.graph.node_count() > 0)]
// #[ensures(self.graph.node_count() == old(self.graph.node_count()) - 1)]
/// Remove a single finished task from the graph and emit any tasks that
/// became runnable as a result. No-op if the task is not in the graph.
pub fn remove(&mut self, task: &Task) {
    let Some(idx) = self.node_idx(task) else {
        return;
    };
    self.graph.remove_node(idx);
    self.removed.insert(task_key(task));
    self.emit_leaves();
}
/// Linear scan for the graph node holding a task equal to `task`.
fn node_idx(&self, task: &Task) -> Option<petgraph::graph::NodeIndex> {
    for idx in self.graph.node_indices() {
        if &self.graph[idx] == task {
            return Some(idx);
        }
    }
    None
}
/// Iterate over every task still present in the graph.
pub fn all(&self) -> impl Iterator<Item = &Task> {
    let graph = &self.graph;
    graph.node_indices().map(move |idx| &graph[idx])
}
/// Mark tasks that share a display_name so their prefix includes args
/// for disambiguation (e.g. `[test-docker 4.1]` vs `[test-docker 4.2]`).
pub fn mark_ambiguous_prefixes(&mut self) {
    // bucket node indices by display_name
    let mut by_name: HashMap<String, Vec<petgraph::graph::NodeIndex>> = HashMap::new();
    for idx in self.graph.node_indices() {
        by_name
            .entry(self.graph[idx].display_name.clone())
            .or_default()
            .push(idx);
    }
    // any bucket with 2+ entries is ambiguous; flag every member
    for indices in by_name.into_values() {
        if indices.len() < 2 {
            continue;
        }
        for idx in indices {
            self.graph[idx].show_args_in_prefix = true;
        }
    }
}
/// Whether the graph is a single chain: at every step of peeling leaves
/// there is at most one runnable task (no parallelism possible).
pub fn is_linear(&self) -> bool {
    // work on a clone so we can destructively peel leaves
    let mut graph = self.graph.clone();
    // pop dependencies off, if we get multiple dependencies at once it's not linear
    loop {
        let leaves = leaves(&graph);
        if leaves.is_empty() {
            return true;
        } else if leaves.len() > 1 {
            return false;
        } else {
            // BUGFIX: iterate the working clone's indices, not self.graph's.
            // remove_node swap-removes, so after removals the clone has fewer
            // nodes than self.graph and indices drawn from self.graph can be
            // out of bounds / point at the wrong node in the clone.
            let idx = graph
                .node_indices()
                .find(|&idx| graph[idx] == leaves[0])
                .unwrap();
            graph.remove_node(idx);
        }
    }
}
}
/// Collect the tasks with no outgoing edges — i.e. tasks whose
/// dependencies have all been removed, so they are ready to run.
fn leaves(graph: &DiGraph<Task, ()>) -> Vec<Task> {
    let mut ready = Vec::new();
    for idx in graph.externals(Direction::Outgoing) {
        ready.push(graph[idx].clone());
    }
    ready
}