1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
use crate::config::Settings;
use crate::task::Task;
use crate::task::task_helpers::task_needs_permit;
use crate::task::task_output::TaskOutput;
use crate::ui::multi_progress_report::MultiProgressReport;
use crate::ui::progress_report::SingleReport;
use indexmap::IndexMap;
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
/// Shared, insertion-ordered map from a task to its progress reporter.
/// Both the map (behind a mutex) and each reporter are reference-counted,
/// so handles can be cloned cheaply.
type TaskPrMap = Arc<Mutex<IndexMap<Task, Arc<Box<dyn SingleReport>>>>>;
/// A single buffered line of task output, tagged by the stream it belongs to.
///
/// Both variants carry `(prefix, line)`: the prefix handed to the print
/// helper and the text of the line itself.
pub enum KeepOrderLine {
/// A line that is replayed through `print_stdout`.
Stdout(String, String), // (prefix, line)
/// A line that is replayed through `print_stderr`.
Stderr(String, String), // (prefix, line)
}
/// Streaming state for keep-order mode.
///
/// One task at a time is "active" and streams output in real-time.
/// Other tasks buffer their output. When the active task finishes,
/// the buffers of already-finished tasks sitting at the front of the
/// queue are flushed, then the next remaining task is promoted: its
/// buffered output is printed and it streams live from then on.
pub struct KeepOrderState {
/// The task whose output is currently being streamed live;
/// `None` when no task currently holds the live slot.
active: Option<Task>,
/// Buffered output for non-active tasks (insertion order preserved).
/// While no task is active, only the first entry may claim the live slot.
buffers: IndexMap<Task, Vec<KeepOrderLine>>,
/// Tasks that finished while not active and whose buffers still await
/// flushing. Only membership is consulted when flushing; the stored
/// order is not relied upon.
finished: Vec<Task>,
/// Set after flush_all — further output prints directly
done: bool,
}
impl KeepOrderState {
    /// Creates an empty state: nothing registered, nothing active, not done.
    pub fn new() -> Self {
        Self {
            active: None,
            buffers: IndexMap::new(),
            finished: Vec::new(),
            done: false,
        }
    }

    /// Registers `task` for keep-order output.
    ///
    /// Registration order is the order in which buffered output is emitted.
    /// Registering a task that is already known is a no-op (and does not
    /// clone the task).
    pub fn init_task(&mut self, task: &Task) {
        if !self.buffers.contains_key(task) {
            self.buffers.insert(task.clone(), Vec::new());
        }
    }

    /// Whether this task should stream live (is active, or is first in
    /// definition order when no task is active yet).
    fn is_active(&self, task: &Task) -> bool {
        match &self.active {
            Some(active) => active == task,
            // No active task yet — only the first task in definition order may claim it
            None => self.buffers.first().is_some_and(|(first, _)| first == task),
        }
    }

    /// Prints and empties whatever is currently buffered for `task`.
    /// The (now empty) entry stays in place so the task keeps its position.
    fn flush_buffer(&mut self, task: &Task) {
        if let Some(pending) = self.buffers.get_mut(task) {
            Self::print_lines(&std::mem::take(pending));
        }
    }

    /// Marks `task` as the live-streaming task.
    ///
    /// The task is cloned only when the live slot actually changes, not on
    /// every line. On such a change, anything buffered for the task before
    /// it became live is printed first so its lines stay in order.
    fn claim(&mut self, task: &Task) {
        if self.active.as_ref() == Some(task) {
            return;
        }
        self.active = Some(task.clone());
        self.flush_buffer(task);
    }

    /// Called when a stdout line is produced by a task's process.
    ///
    /// The line is printed immediately if the task is (or may become) the
    /// live task, or if `flush_all` has already run; otherwise it is
    /// appended to the task's buffer.
    pub fn on_stdout(&mut self, task: &Task, prefix: String, line: String) {
        if self.done || self.is_active(task) {
            self.claim(task);
            print_stdout(&prefix, &line);
        } else {
            self.buffers
                .entry(task.clone())
                .or_default()
                .push(KeepOrderLine::Stdout(prefix, line));
        }
    }

    /// Called when a stderr line is produced by a task's process,
    /// or when metadata (command echo, timing) is emitted for a task.
    ///
    /// Routing is identical to [`Self::on_stdout`], but on the stderr stream.
    pub fn on_stderr(&mut self, task: &Task, prefix: String, line: String) {
        if self.done || self.is_active(task) {
            self.claim(task);
            print_stderr(&prefix, &line);
        } else {
            self.buffers
                .entry(task.clone())
                .or_default()
                .push(KeepOrderLine::Stderr(prefix, line));
        }
    }

    /// Called when a task finishes execution.
    ///
    /// Tasks without a buffer entry are not keep-order tasks and are ignored.
    /// If the finishing task holds (or may claim) the live slot, the slot is
    /// released, finished tasks at the front are flushed and the next task
    /// is promoted. Otherwise the task is remembered for later flushing.
    pub fn on_task_finished(&mut self, task: &Task) {
        if !self.buffers.contains_key(task) {
            return; // Not a keep-order task
        }
        if self.is_active(task) {
            self.active = None;
            // A task can reach this point with lines still buffered (e.g. it
            // emitted output before it was allowed to stream and never
            // printed live afterwards). Print them rather than dropping them.
            if let Some(leftover) = self.buffers.shift_remove(task) {
                Self::print_lines(&leftover);
            }
            self.flush_finished();
            self.promote_next();
        } else {
            // Non-active task finished — remember it for later flushing
            self.finished.push(task.clone());
        }
    }

    /// Flush contiguous finished tasks from the front of the buffer.
    /// Stops at the first non-finished task to preserve definition order.
    ///
    /// Finished tasks that sit behind a still-running task stay in
    /// `finished`, in their original completion order.
    fn flush_finished(&mut self) {
        while let Some((head, _)) = self.buffers.first() {
            let waiting = self.finished.len();
            // Drop every record of `head` from the finished list; if nothing
            // was dropped, `head` is still running and blocks the queue.
            self.finished.retain(|t| t != head);
            if self.finished.len() == waiting {
                break;
            }
            if let Some((_, lines)) = self.buffers.shift_remove_index(0) {
                Self::print_lines(&lines);
            }
        }
    }

    /// Promote the next buffered (still-running) task to active and
    /// flush its current buffer so it can stream live going forward.
    fn promote_next(&mut self) {
        let Some((next, _)) = self.buffers.first() else {
            return;
        };
        let next = next.clone();
        self.flush_buffer(&next);
        self.active = Some(next);
    }

    /// Replays buffered lines on the stream each one was captured from.
    fn print_lines(lines: &[KeepOrderLine]) {
        for entry in lines {
            match entry {
                KeepOrderLine::Stdout(prefix, line) => print_stdout(prefix, line),
                KeepOrderLine::Stderr(prefix, line) => print_stderr(prefix, line),
            }
        }
    }

    /// Safety-net: flush any remaining output (called at the very end).
    /// After this, any further output prints directly.
    ///
    /// Every remaining buffer is printed in definition order, whether or
    /// not its task was recorded as finished, and all bookkeeping is reset.
    pub fn flush_all(&mut self) {
        self.active = None;
        self.finished.clear();
        for (_, lines) in self.buffers.drain(..) {
            Self::print_lines(&lines);
        }
        self.done = true;
    }
}
/// Emits one prefixed line through `prefix_println!`.
///
/// When `console::colors_enabled()` reports colors as on, an ANSI reset
/// sequence (`ESC[0m`) is appended to the line — presumably so styling left
/// open by the task's output cannot leak into later lines.
fn print_stdout(prefix: &str, line: &str) {
    if !console::colors_enabled() {
        prefix_println!(prefix, "{line}");
        return;
    }
    prefix_println!(prefix, "{line}\x1b[0m");
}
/// Emits one prefixed line through `prefix_eprintln!`.
///
/// When `console::colors_enabled_stderr()` reports colors as on, an ANSI
/// reset sequence (`ESC[0m`) is appended to the line — presumably so styling
/// left open by the task's output cannot leak into later lines.
fn print_stderr(prefix: &str, line: &str) {
    if !console::colors_enabled_stderr() {
        prefix_eprintln!(prefix, "{line}");
        return;
    }
    prefix_eprintln!(prefix, "{line}\x1b[0m");
}
/// Configuration for OutputHandler
///
/// Every field is copied verbatim into the handler by `OutputHandler::new`.
pub struct OutputHandlerConfig {
/// Explicit output mode. When set, `OutputHandler::output` returns it
/// ahead of the silent/quiet/raw checks (a task that is fully silent
/// still wins).
pub output: Option<TaskOutput>,
/// Requests silent output regardless of task or settings.
pub silent: bool,
/// Requests quiet output regardless of task or settings.
pub quiet: bool,
/// Requests raw I/O; also makes `OutputHandler::jobs` report 1.
pub raw: bool,
/// When no output mode is configured in settings, selects interleaved
/// output instead of prefixed output.
pub is_linear: bool,
/// Job count override; the settings value is used when `None`.
pub jobs: Option<usize>,
}
/// Handles task output routing, formatting, and display
///
/// The three public fields are reference-counted, so clones of a handler
/// share the same underlying state.
pub struct OutputHandler {
/// Streaming/buffering state used for keep-order output.
pub keep_order_state: Arc<Mutex<KeepOrderState>>,
/// Per-task progress reporters, created lazily by `get_or_init_task_pr`.
pub task_prs: TaskPrMap,
/// NOTE(review): not read or written in this part of the file; presumably
/// a keyed map of timestamped output lines — confirm against callers.
pub timed_outputs: Arc<Mutex<IndexMap<String, (SystemTime, String)>>>,
// Configuration from CLI args
/// Explicit output mode, if any (see `output()`).
output: Option<TaskOutput>,
/// Handler-level silent flag.
silent: bool,
/// Handler-level quiet flag.
quiet: bool,
/// Handler-level raw flag; also forces `jobs()` to 1.
raw: bool,
/// Prefer interleaved output when settings configure no output mode.
is_linear: bool,
/// Job count override; settings are consulted when `None`.
jobs: Option<usize>,
}
impl Clone for OutputHandler {
    /// Returns a handler that shares this one's keep-order state, progress
    /// reporters and timed outputs (only the reference counts are bumped)
    /// and carries a copy of the plain configuration values.
    fn clone(&self) -> Self {
        let Self {
            keep_order_state,
            task_prs,
            timed_outputs,
            output,
            silent,
            quiet,
            raw,
            is_linear,
            jobs,
        } = self;
        Self {
            keep_order_state: Arc::clone(keep_order_state),
            task_prs: Arc::clone(task_prs),
            timed_outputs: Arc::clone(timed_outputs),
            output: *output,
            silent: *silent,
            quiet: *quiet,
            raw: *raw,
            is_linear: *is_linear,
            jobs: *jobs,
        }
    }
}
impl OutputHandler {
    /// Returns the progress reporter for `task` in Replacing mode, creating
    /// and registering one on first use.
    ///
    /// The reporter map stays locked for the whole lookup-or-insert.
    /// Panics if that mutex is poisoned.
    pub fn get_or_init_task_pr(&self, task: &Task) -> Arc<Box<dyn SingleReport>> {
        let mut reporters = self.task_prs.lock().unwrap();
        if let Some(existing) = reporters.get(task) {
            return Arc::clone(existing);
        }
        // First request for this task: create a reporter labelled with the
        // task's styled prefix and remember it for later calls.
        let created = Arc::new(MultiProgressReport::get().add(&task.estyled_prefix()));
        reporters.insert(task.clone(), Arc::clone(&created));
        created
    }
}
impl OutputHandler {
pub fn new(config: OutputHandlerConfig) -> Self {
Self {
keep_order_state: Arc::new(Mutex::new(KeepOrderState::new())),
task_prs: Arc::new(Mutex::new(IndexMap::new())),
timed_outputs: Arc::new(Mutex::new(IndexMap::new())),
output: config.output,
silent: config.silent,
quiet: config.quiet,
raw: config.raw,
is_linear: config.is_linear,
jobs: config.jobs,
}
}
/// Initialize output handling for a task
pub fn init_task(&mut self, task: &Task) {
match self.output(Some(task)) {
TaskOutput::KeepOrder => {
// Only add tasks that produce output (not orchestrator-only tasks)
if task_needs_permit(task) {
self.keep_order_state.lock().unwrap().init_task(task);
}
}
TaskOutput::Replacing => {
self.get_or_init_task_pr(task);
}
_ => {}
}
}
/// Determine the output mode for a task
pub fn output(&self, task: Option<&Task>) -> TaskOutput {
// Check for full silent mode (both streams)
// Only Silent::Bool(true) means completely silent, not Silent::Stdout or Silent::Stderr
if let Some(task_ref) = task
&& matches!(task_ref.silent, crate::task::Silent::Bool(true))
{
return TaskOutput::Silent;
}
// Check global output settings
if let Some(o) = self.output {
return o;
} else if let Some(task_ref) = task {
// Fall through to other checks if silent is Off
if self.silent_bool() {
return TaskOutput::Silent;
}
if self.quiet(Some(task_ref)) {
return TaskOutput::Quiet;
}
} else if self.silent_bool() {
return TaskOutput::Silent;
} else if self.quiet(task) {
return TaskOutput::Quiet;
}
if let Some(output) = Settings::get().task.output {
// Silent/quiet from config override raw (output suppression takes precedence)
// Other modes (prefix, etc.) allow raw to take precedence for stdin/stdout
if output.is_silent() || output.is_quiet() {
output
} else if self.raw(task) {
TaskOutput::Interleave
} else {
output
}
} else if self.raw(task) || self.jobs() == 1 || self.is_linear {
TaskOutput::Interleave
} else {
TaskOutput::Prefix
}
}
/// Print error/metadata message for a task.
/// For keep-order mode, routes through the streaming state so messages
/// stay ordered with the task's stdout/stderr.
pub fn eprint(&self, task: &Task, prefix: &str, line: &str) {
match self.output(Some(task)) {
TaskOutput::KeepOrder => {
self.keep_order_state.lock().unwrap().on_stderr(
task,
prefix.to_string(),
line.to_string(),
);
}
TaskOutput::Replacing => {
let pr = self.get_or_init_task_pr(task);
pr.set_message(format!("{prefix} {line}"));
}
_ => {
prefix_eprintln!(prefix, "{line}");
}
}
}
fn silent_bool(&self) -> bool {
self.silent
|| Settings::get().silent
|| self.output.is_some_and(|o| o.is_silent())
|| Settings::get().task.output.is_some_and(|o| o.is_silent())
}
pub fn silent(&self, task: Option<&Task>) -> bool {
self.silent_bool() || task.is_some_and(|t| t.silent.is_silent())
}
pub fn quiet(&self, task: Option<&Task>) -> bool {
self.quiet
|| Settings::get().quiet
|| self.output.is_some_and(|o| o.is_quiet())
|| Settings::get().task.output.is_some_and(|o| o.is_quiet())
|| task.is_some_and(|t| t.quiet)
|| self.silent(task)
}
pub fn raw(&self, task: Option<&Task>) -> bool {
// Interactive tasks are treated as raw for I/O (stdin/stdout/stderr inherit).
// This means CmdLineRunner will also acquire its internal RAW_LOCK — that's
// intentional and harmless since TASK_RUNTIME_LOCK already provides exclusivity.
self.raw || Settings::get().raw || task.is_some_and(|t| t.raw || t.interactive)
}
pub fn jobs(&self) -> usize {
if self.raw {
1
} else {
self.jobs.unwrap_or(Settings::get().jobs)
}
}
}