branchless/core/
effects.rs

1//! Wrappers around various side effects.
2
3use bstr::ByteSlice;
4use std::fmt::{Debug, Display, Write};
5use std::io::{stderr, stdout, Stderr, Stdout, Write as WriteIo};
6use std::mem::take;
7use std::sync::{Arc, Mutex, RwLock};
8use std::time::{Duration, Instant};
9use std::{io, thread};
10
11use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle};
12use itertools::Itertools;
13use lazy_static::lazy_static;
14use tracing::warn;
15
16use crate::core::formatting::Glyphs;
17
/// The kinds of operations whose progress can be reported to the user.
///
/// The human-readable label shown for each variant is produced by the
/// `Display` implementation for this type. Variants carrying an `Arc<String>`
/// interpolate that string into their label.
///
/// Note that `PartialOrd`/`Ord` are derived, so the relative ordering of
/// values follows the declaration order of the variants below.
#[allow(missing_docs)]
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum OperationType {
    BuildRebasePlan,
    CalculateDiff,
    CalculatePatchId,
    CheckForCycles,
    ConstrainCommits,
    DetectDuplicateCommits,
    // Payload: the revset text, displayed after "Evaluating revset: ".
    EvaluateRevset(Arc<String>),
    FilterByTouchedPaths,
    FilterCommits,
    FindPathToMergeBase,
    GetMergeBase,
    GetTouchedPaths,
    GetUpstreamPatchIds,
    InitializeRebase,
    MakeGraph,
    ProcessEvents,
    PushCommits,
    QueryWorkingCopy,
    ReadingFromCache,
    RebaseCommits,
    RepairBranches,
    RepairCommits,
    // Payload: the command text, displayed after "Running Git command: ".
    RunGitCommand(Arc<String>),
    // Payload: a description of the commit, displayed after "Waiting to run on ".
    RunTestOnCommit(Arc<String>),
    // Payload: the command text, displayed after "Running command: ".
    RunTests(Arc<String>),
    SortCommits,
    SyncCommits,
    UpdateCommitGraph,
    UpdateCommits,
    WalkCommits,
}
52
53impl Display for OperationType {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        match self {
56            OperationType::BuildRebasePlan => write!(f, "Building rebase plan"),
57            OperationType::CalculateDiff => write!(f, "Computing diffs"),
58            OperationType::CalculatePatchId => write!(f, "Hashing commit contents"),
59            OperationType::CheckForCycles => write!(f, "Checking for cycles"),
60            OperationType::ConstrainCommits => write!(f, "Creating commit constraints"),
61            OperationType::DetectDuplicateCommits => write!(f, "Checking for duplicate commits"),
62            OperationType::EvaluateRevset(revset) => {
63                write!(f, "Evaluating revset: {revset}")
64            }
65            OperationType::FilterByTouchedPaths => {
66                write!(f, "Filtering upstream commits by touched paths")
67            }
68            OperationType::FilterCommits => write!(f, "Filtering commits"),
69            OperationType::FindPathToMergeBase => write!(f, "Finding path to merge-base"),
70            OperationType::GetMergeBase => write!(f, "Calculating merge-bases"),
71            OperationType::GetTouchedPaths => write!(f, "Getting touched paths"),
72            OperationType::GetUpstreamPatchIds => write!(f, "Enumerating patch IDs"),
73            OperationType::InitializeRebase => write!(f, "Initializing rebase"),
74            OperationType::MakeGraph => write!(f, "Examining local history"),
75            OperationType::PushCommits => write!(f, "Pushing branches"),
76            OperationType::ProcessEvents => write!(f, "Processing events"),
77            OperationType::QueryWorkingCopy => write!(f, "Querying the working copy"),
78            OperationType::ReadingFromCache => write!(f, "Reading from cache"),
79            OperationType::RebaseCommits => write!(f, "Rebasing commits"),
80            OperationType::RepairBranches => write!(f, "Checking for broken branches"),
81            OperationType::RepairCommits => write!(f, "Checking for broken commits"),
82            OperationType::RunGitCommand(command) => {
83                write!(f, "Running Git command: {}", &command)
84            }
85            OperationType::RunTests(command) => write!(f, "Running command: {command}"),
86            OperationType::RunTestOnCommit(commit) => write!(f, "Waiting to run on {commit}"),
87            OperationType::SortCommits => write!(f, "Sorting commits"),
88            OperationType::SyncCommits => write!(f, "Syncing commit stacks"),
89            OperationType::UpdateCommits => write!(f, "Updating commits"),
90            OperationType::UpdateCommitGraph => write!(f, "Updating commit graph"),
91            OperationType::WalkCommits => write!(f, "Walking commits"),
92        }
93    }
94}
95
/// Where output produced through an `Effects` value is sent.
///
/// Progress reporting is only performed for the `Stdout` destination; the
/// progress-related methods on `Effects` return early for the other variants.
#[derive(Clone, Debug)]
enum OutputDest {
    /// Regular output, as selected by `Effects::new`.
    Stdout,
    /// Output is suppressed.
    Suppress,
    /// Output is captured into shared in-memory buffers so that tests can
    /// inspect it.
    BufferForTest {
        /// Buffer associated with standard output.
        stdout: Arc<Mutex<Vec<u8>>>,
        /// Buffer associated with standard error.
        stderr: Arc<Mutex<Vec<u8>>>,
    },
}
105
/// An index into the recursive hierarchy of progress bars. For example, the key
/// `[OperationType::GetMergeBase, OperationType::WalkCommits]` refers to the
/// "walk commits" operation which is nested under the "get merge-base"
/// operation.
///
/// This is an unsized slice type, so it is only ever used behind a reference
/// (`&OperationKey`).
type OperationKey = [OperationType];
111
/// The root of the hierarchy of operations whose progress is being tracked.
#[derive(Debug, Default)]
struct RootOperation {
    /// The container which the progress bars of all operations are added to
    /// for rendering.
    multi_progress: MultiProgress,
    /// The top-level operations. Nested operations are stored in each
    /// operation's own `children`.
    children: Vec<OperationState>,
}
117
118impl RootOperation {
119    pub fn hide_multi_progress(&mut self) {
120        self.multi_progress
121            .set_draw_target(ProgressDrawTarget::hidden());
122    }
123
124    pub fn show_multi_progress(&mut self) {
125        self.multi_progress
126            .set_draw_target(ProgressDrawTarget::stderr());
127    }
128
129    /// If all operations are no longer in progress, clear the multi-progress bar.
130    pub fn clear_operations_if_finished(&mut self) {
131        if self
132            .children
133            .iter()
134            .all(|operation_state| operation_state.start_times.is_empty())
135        {
136            if self.multi_progress.clear().is_err() {
137                // Ignore error. Assume that the draw target is no longer available
138                // to write to.
139            }
140            self.children.clear();
141        }
142    }
143
144    pub fn get_or_create_child(&mut self, key: &[OperationType]) -> &mut OperationState {
145        match key {
146            [] => panic!("Empty operation key"),
147            [first, rest @ ..] => {
148                let index = match self
149                    .children
150                    .iter()
151                    .find_position(|child| &child.operation_type == first)
152                {
153                    Some((child_index, _)) => child_index,
154                    None => {
155                        self.children.push(OperationState {
156                            operation_type: first.clone(),
157                            progress_bar: ProgressBar::new_spinner(),
158                            has_meter: Default::default(),
159                            icon: Default::default(),
160                            progress_message: first.to_string(),
161                            start_times: Default::default(),
162                            elapsed_duration: Default::default(),
163                            children: Default::default(),
164                        });
165                        self.children.len() - 1
166                    }
167                };
168                self.children
169                    .get_mut(index)
170                    .unwrap()
171                    .get_or_create_child(rest)
172            }
173        }
174    }
175
176    pub fn get_child(&mut self, key: &[OperationType]) -> Option<&mut OperationState> {
177        match key {
178            [] => panic!("Empty operation key"),
179            [first, rest @ ..] => {
180                let index = self
181                    .children
182                    .iter()
183                    .find_position(|child| &child.operation_type == first);
184                match index {
185                    Some((index, _)) => self.children.get_mut(index).unwrap().get_child(rest),
186                    None => None,
187                }
188            }
189        }
190    }
191
192    /// Re-render all operation progress bars. This does not change their
193    /// ordering like [`refresh_multi_progress`] does.
194    pub fn tick(&mut self) {
195        let operations = {
196            let mut acc = Vec::new();
197            Self::traverse_operations(&mut acc, 0, &self.children);
198            acc
199        };
200        for (nesting_level, operation) in operations {
201            operation.tick(nesting_level);
202        }
203    }
204
205    /// Update the ordering of progress bars in the multi-progress. This should be called after
206    pub fn refresh_multi_progress(&mut self) {
207        let operations = {
208            let mut acc = Vec::new();
209            Self::traverse_operations(&mut acc, 0, &self.children);
210            acc
211        };
212        if self.multi_progress.clear().is_err() {
213            // Ignore the error and assume that the multi-progress is now dead,
214            // so it doesn't need to be updated.
215        }
216        for (nesting_level, operation) in operations {
217            // Avoid deadlock inside the progress bar library when we call
218            // `add`, which sets the draw target again.
219            operation
220                .progress_bar
221                .set_draw_target(ProgressDrawTarget::hidden());
222
223            self.multi_progress.add(operation.progress_bar.clone());
224
225            // Re-render only after it's been added to the multi-progress, so
226            // that the draw target has been set.
227            operation.tick(nesting_level);
228        }
229    }
230
231    fn traverse_operations<'a>(
232        acc: &mut Vec<(usize, &'a OperationState)>,
233        current_level: usize,
234        operations: &'a [OperationState],
235    ) {
236        for operation in operations {
237            acc.push((current_level, operation));
238            Self::traverse_operations(acc, current_level + 1, &operation.children);
239        }
240    }
241}
242
/// The string values associated with [`OperationIcon`]s.
pub mod icons {
    /// Used to indicate success.
    pub const CHECKMARK: &str = "✓";

    /// Used to indicate a warning.
    pub const EXCLAMATION: &str = "!";

    /// Used to indicate failure.
    ///
    /// We can't use "✗️" in interactive progress meters because some terminals
    /// think that its width is greater than 1, which seems to cause rendering
    /// issues because we use 1 as its width.
    pub const CROSS: &str = "X";
}
257
/// An icon denoting the status of an operation.
///
/// The default value is [`OperationIcon::InProgress`].
#[derive(Clone, Copy, Debug)]
pub enum OperationIcon {
    /// A suitable waiting icon should be rendered.
    InProgress,

    /// The operation was a success.
    Success,

    /// The operation produced a warning.
    Warning,

    /// The operation was a failure.
    Failure,
}
273
274impl Default for OperationIcon {
275    fn default() -> Self {
276        Self::InProgress
277    }
278}
279
/// The progress-tracking state for a single operation in the hierarchy.
#[derive(Debug)]
struct OperationState {
    /// The kind of operation; used to find this node among its siblings.
    operation_type: OperationType,
    /// The progress bar used to render this operation.
    progress_bar: ProgressBar,
    /// Whether a position/length has been reported for this operation, in
    /// which case it is rendered with a bar rather than only a spinner.
    has_meter: bool,
    /// The start time of each currently-running invocation of this operation.
    /// Empty when the operation is not running.
    start_times: Vec<Instant>,
    /// The status icon to render for this operation.
    icon: OperationIcon,
    /// The message rendered next to the icon.
    progress_message: String,
    /// Wall-clock time accumulated by invocations which have already finished.
    elapsed_duration: Duration,
    /// Operations nested under this one.
    children: Vec<OperationState>,
}
291
292impl OperationState {
293    pub fn set_progress(&mut self, current: usize, total: usize) {
294        self.has_meter = true;
295
296        if current
297            .try_into()
298            .map(|current: u64| current < self.progress_bar.position())
299            .unwrap_or(false)
300        {
301            // Workaround for issue fixed by
302            // <https://github.com/console-rs/indicatif/pull/403>.
303            self.progress_bar.reset_eta();
304        }
305
306        self.progress_bar.set_position(current.try_into().unwrap());
307        self.progress_bar.set_length(total.try_into().unwrap());
308    }
309
310    pub fn inc_progress(&mut self, increment: usize) {
311        self.progress_bar.inc(increment.try_into().unwrap());
312    }
313
314    pub fn get_or_create_child(&mut self, child_type: &[OperationType]) -> &mut Self {
315        match child_type {
316            [] => self,
317            [first, rest @ ..] => {
318                let index = match self
319                    .children
320                    .iter()
321                    .find_position(|child| &child.operation_type == first)
322                {
323                    Some((child_index, _)) => child_index,
324                    None => {
325                        self.children.push(OperationState {
326                            operation_type: first.clone(),
327                            progress_bar: ProgressBar::new_spinner(),
328                            has_meter: Default::default(),
329                            icon: Default::default(),
330                            progress_message: first.to_string(),
331                            start_times: Default::default(),
332                            elapsed_duration: Default::default(),
333                            children: Default::default(),
334                        });
335                        self.children.len() - 1
336                    }
337                };
338                self.children
339                    .get_mut(index)
340                    .unwrap()
341                    .get_or_create_child(rest)
342            }
343        }
344    }
345
346    pub fn get_child(&mut self, child_type: &[OperationType]) -> Option<&mut Self> {
347        match child_type {
348            [] => Some(self),
349            [first, rest @ ..] => {
350                let index = self
351                    .children
352                    .iter()
353                    .find_position(|child| &child.operation_type == first);
354                match index {
355                    Some((index, _)) => self.children.get_mut(index).unwrap().get_child(rest),
356                    None => None,
357                }
358            }
359        }
360    }
361
362    pub fn tick(&self, nesting_level: usize) {
363        lazy_static! {
364            static ref CHECKMARK: String = console::style(icons::CHECKMARK).green().to_string();
365            static ref EXCLAMATION: String = console::style(icons::EXCLAMATION).yellow().to_string();
366            static ref CROSS: String = console::style(icons::CROSS).red().to_string();
367            static ref IN_PROGRESS_SPINNER_STYLE: Arc<Mutex<ProgressStyle>> =
368                Arc::new(Mutex::new(ProgressStyle::default_spinner().template("{prefix}{spinner} {wide_msg}").unwrap()));
369            static ref IN_PROGRESS_BAR_STYLE: Arc<Mutex<ProgressStyle>> =
370                // indicatif places the cursor at the end of the line, which may
371                // be visible in the terminal, so we add a space at the end of
372                // the line so that the length number isn't overlapped by the
373                // cursor.
374                Arc::new(Mutex::new(ProgressStyle::default_bar().template("{prefix}{spinner} {wide_msg} {bar} {pos}/{len} ").unwrap()));
375            static ref WAITING_PROGRESS_STYLE: Arc<Mutex<ProgressStyle>> = Arc::new(Mutex::new(IN_PROGRESS_SPINNER_STYLE
376                .clone().lock().unwrap().clone()
377                // Requires at least two tick values, so just pass the same one twice.
378                .tick_strings(&[" ", " "])
379            ));
380            static ref SUCCESS_PROGRESS_STYLE: Arc<Mutex<ProgressStyle>> = Arc::new(Mutex::new(IN_PROGRESS_SPINNER_STYLE
381                .clone()
382                .lock()
383                .unwrap()
384                .clone()
385                // Requires at least two tick values, so just pass the same one twice.
386                .tick_strings(&[&CHECKMARK, &CHECKMARK])));
387            static ref WARNING_PROGRESS_STYLE: Arc<Mutex<ProgressStyle>> = Arc::new(Mutex::new(IN_PROGRESS_SPINNER_STYLE
388                .clone()
389                .lock()
390                .unwrap()
391                .clone()
392                // Requires at least two tick values, so just pass the same one twice.
393                .tick_strings(&[&CROSS, &CROSS])));
394            static ref FAILURE_PROGRESS_STYLE: Arc<Mutex<ProgressStyle>> = Arc::new(Mutex::new(IN_PROGRESS_SPINNER_STYLE
395                .clone()
396                .lock()
397                .unwrap()
398                .clone()
399                // Requires at least two tick values, so just pass the same one twice.
400                .tick_strings(&[&CROSS, &CROSS])));
401        }
402
403        let elapsed_duration = match self.start_times.iter().min() {
404            None => self.elapsed_duration,
405            Some(start_time) => {
406                let additional_duration = Instant::now().saturating_duration_since(*start_time);
407                self.elapsed_duration + additional_duration
408            }
409        };
410
411        self.progress_bar.set_style(
412            match (self.start_times.as_slice(), self.has_meter, self.icon) {
413                (_, _, OperationIcon::Success) => SUCCESS_PROGRESS_STYLE.lock().unwrap().clone(),
414                (_, _, OperationIcon::Warning) => WARNING_PROGRESS_STYLE.lock().unwrap().clone(),
415                (_, _, OperationIcon::Failure) => FAILURE_PROGRESS_STYLE.lock().unwrap().clone(),
416                ([], _, OperationIcon::InProgress) => {
417                    WAITING_PROGRESS_STYLE.lock().unwrap().clone()
418                }
419                ([..], false, OperationIcon::InProgress) => {
420                    IN_PROGRESS_SPINNER_STYLE.lock().unwrap().clone()
421                }
422                ([..], true, OperationIcon::InProgress) => {
423                    IN_PROGRESS_BAR_STYLE.lock().unwrap().clone()
424                }
425            },
426        );
427        self.progress_bar.set_prefix("  ".repeat(nesting_level));
428        self.progress_bar.set_message(format!(
429            "{} ({:.1}s)",
430            self.progress_message,
431            elapsed_duration.as_secs_f64(),
432        ));
433        self.progress_bar.tick();
434    }
435}
436
/// Wrapper around side-effectful operations, such as output and progress
/// indicators.
///
/// Cloning an `Effects` is cheap with respect to the shared state: clones share
/// the same operation hierarchy and updater-thread handle via `Arc`.
#[derive(Clone)]
pub struct Effects {
    /// The glyphs used when rendering output.
    glyphs: Glyphs,
    /// Where output is sent. Progress is only tracked for `OutputDest::Stdout`.
    dest: OutputDest,
    /// Shared state recording whether the progress bars have become visible.
    updater_thread_handle: Arc<RwLock<UpdaterThreadHandle>>,
    /// The key of the operation which this `Effects` is nested under. Empty for
    /// a top-level `Effects`; `start_operation` appends to it for the
    /// `Effects` it returns.
    operation_key: Vec<OperationType>,
    /// The shared hierarchy of operations being tracked.
    root_operation: Arc<Mutex<RootOperation>>,
}
447
448impl std::fmt::Debug for Effects {
449    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
450        write!(
451            f,
452            "<Output fancy={}>",
453            self.glyphs.should_write_ansi_escape_codes
454        )
455    }
456}
457
/// State shared with the thread which periodically re-renders the progress
/// bars.
#[derive(Clone, Debug, Default)]
struct UpdaterThreadHandle {
    /// Whether the progress bars have been made visible yet. This starts out
    /// as `false` and is set by the updater thread once its initial delay has
    /// passed.
    is_visible: bool,
}
462
463fn spawn_progress_updater_thread(
464    root_operation: &Arc<Mutex<RootOperation>>,
465) -> Arc<RwLock<UpdaterThreadHandle>> {
466    {
467        let mut root_operation = root_operation.lock().unwrap();
468        root_operation.hide_multi_progress();
469    }
470    let root_operation = Arc::downgrade(root_operation);
471    let handle = Arc::new(RwLock::new(UpdaterThreadHandle { is_visible: false }));
472
473    thread::spawn({
474        let handle = Arc::clone(&handle);
475        move || {
476            // Don't start displaying progress immediately, since if the operation
477            // finishes quickly, then it will flicker annoyingly.
478            thread::sleep(Duration::from_millis(250));
479            {
480                let mut handle = handle.write().unwrap();
481                match root_operation.upgrade() {
482                    Some(root_operation) => {
483                        let mut root_operation = root_operation.lock().unwrap();
484                        root_operation.show_multi_progress();
485                    }
486                    None => return,
487                }
488                handle.is_visible = true;
489            }
490
491            loop {
492                // Drop the `Arc` after this block, before the sleep, to make sure
493                // that progress bars aren't kept alive longer than they should be.
494                match root_operation.upgrade() {
495                    None => return,
496                    Some(root_operation) => {
497                        let mut root_operation = root_operation.lock().unwrap();
498                        root_operation.tick();
499                    }
500                }
501
502                thread::sleep(Duration::from_millis(100));
503            }
504        }
505    });
506
507    handle
508}
509
510impl Effects {
511    /// Constructor. Writes to stdout.
512    pub fn new(glyphs: Glyphs) -> Self {
513        let root_operation = Default::default();
514        let updater_thread_handle = spawn_progress_updater_thread(&root_operation);
515        Effects {
516            glyphs,
517            dest: OutputDest::Stdout,
518            updater_thread_handle,
519            operation_key: Default::default(),
520            root_operation,
521        }
522    }
523
524    /// Constructor. Suppresses all output.
525    pub fn new_suppress_for_test(glyphs: Glyphs) -> Self {
526        Effects {
527            glyphs,
528            dest: OutputDest::Suppress,
529            updater_thread_handle: Default::default(),
530            operation_key: Default::default(),
531            root_operation: Default::default(),
532        }
533    }
534
535    /// Constructor. Writes to the provided buffer.
536    pub fn new_from_buffer_for_test(
537        glyphs: Glyphs,
538        stdout: &Arc<Mutex<Vec<u8>>>,
539        stderr: &Arc<Mutex<Vec<u8>>>,
540    ) -> Self {
541        Effects {
542            glyphs,
543            dest: OutputDest::BufferForTest {
544                stdout: Arc::clone(stdout),
545                stderr: Arc::clone(stderr),
546            },
547            updater_thread_handle: Default::default(),
548            operation_key: Default::default(),
549            root_operation: Default::default(),
550        }
551    }
552
553    /// Send output to an appropriate place when using a terminal user interface
554    /// (TUI), such as for `git undo`.
555    pub fn enable_tui_mode(&self) -> Self {
556        let mut root_operation = self.root_operation.lock().unwrap();
557        root_operation.hide_multi_progress();
558        Self {
559            dest: OutputDest::Suppress,
560            ..self.clone()
561        }
562    }
563
564    /// Suppress output sent to the returned `Effects`.
565    pub fn suppress(&self) -> Self {
566        Self {
567            dest: OutputDest::Suppress,
568            ..self.clone()
569        }
570    }
571
572    /// Apply transformations to the returned `Effects` to support emitting
573    /// graphical output in the opposite of its usual order.
574    pub fn reverse_order(&self, reverse: bool) -> Self {
575        Self {
576            glyphs: self.glyphs.clone().reverse_order(reverse),
577            ..self.clone()
578        }
579    }
580
581    /// Start reporting progress for the specified operation type.
582    ///
583    /// A progress spinner is shown until the returned `ProgressHandle` is
584    /// dropped, at which point the spinner transitions to a "complete" message.
585    ///
586    /// Progress spinners are nested hierarchically. If this function is called
587    /// while another `ProgressHandle` is still alive, then the returned
588    /// `ProgressHandle` will be a child of the first handle.
589    ///
590    /// None of the progress indicators are cleared from the screen until *all*
591    /// of the operations have completed. Furthermore, their internal timer is
592    /// not reset while they are still on the screen.
593    ///
594    /// If you finish and then start the same operation type again while it is
595    /// still displayed, then it will transition back into the "progress" state,
596    /// and its displayed duration will start increasing from its previous
597    /// value, rather than from zero. The practical implication is that you can
598    /// see the aggregate time it took to carry out sibling operations, i.e. the
599    /// same operation called multiple times in a loop.
600    pub fn start_operation(&self, operation_type: OperationType) -> (Effects, ProgressHandle) {
601        let operation_key = {
602            let mut result = self.operation_key.clone();
603            result.push(operation_type);
604            result
605        };
606        let progress = ProgressHandle {
607            effects: self,
608            operation_key: operation_key.clone(),
609        };
610        match self.dest {
611            OutputDest::Stdout => {}
612            OutputDest::Suppress | OutputDest::BufferForTest { .. } => {
613                return (self.clone(), progress)
614            }
615        }
616
617        let now = Instant::now();
618        let mut root_operation = self.root_operation.lock().unwrap();
619        let operation_state = root_operation.get_or_create_child(&operation_key);
620        operation_state.start_times.push(now);
621        root_operation.refresh_multi_progress();
622
623        let effects = Self {
624            operation_key,
625            ..self.clone()
626        };
627        (effects, progress)
628    }
629
630    fn on_notify_progress(&self, operation_key: &OperationKey, current: usize, total: usize) {
631        match self.dest {
632            OutputDest::Stdout => {}
633            OutputDest::Suppress | OutputDest::BufferForTest { .. } => return,
634        }
635
636        let mut root_operation = self.root_operation.lock().unwrap();
637        let operation_state = root_operation.get_or_create_child(operation_key);
638        operation_state.set_progress(current, total);
639    }
640
641    fn on_notify_progress_inc(&self, operation_key: &OperationKey, increment: usize) {
642        match self.dest {
643            OutputDest::Stdout => {}
644            OutputDest::Suppress | OutputDest::BufferForTest { .. } => return,
645        }
646
647        let mut root_operation = self.root_operation.lock().unwrap();
648        let operation = root_operation.get_child(operation_key);
649        let operation_state = match operation {
650            Some(operation_state) => operation_state,
651            None => return,
652        };
653        operation_state.inc_progress(increment);
654    }
655
656    fn on_set_message(&self, operation_key: &OperationKey, icon: OperationIcon, message: String) {
657        match self.dest {
658            OutputDest::Stdout => {}
659            OutputDest::Suppress | OutputDest::BufferForTest { .. } => return,
660        }
661
662        let mut root_operation = self.root_operation.lock().unwrap();
663        let operation = root_operation.get_child(operation_key);
664        let operation_state = match operation {
665            Some(operation_state) => operation_state,
666            None => return,
667        };
668        operation_state.icon = icon;
669        operation_state.progress_message = message;
670    }
671
672    fn on_drop_progress_handle(&self, operation_key: &OperationKey) {
673        match self.dest {
674            OutputDest::Stdout => {}
675            OutputDest::Suppress | OutputDest::BufferForTest { .. } => return,
676        }
677
678        let now = Instant::now();
679        let mut root_operation = self.root_operation.lock().unwrap();
680
681        let operation = root_operation.get_child(operation_key);
682        let operation_state = match operation {
683            Some(operation_state) => operation_state,
684            None => {
685                drop(root_operation); // Avoid potential deadlock.
686                warn!(?operation_key, "Progress operation not started");
687                return;
688            }
689        };
690
691        let previous_start_time = match operation_state
692            .start_times
693            .iter()
694            // Remove a maximum element each time. This means that the last
695            // element removed will be the minimum of all elements seen over
696            // time.
697            .position_max_by_key(|x| *x)
698        {
699            Some(start_time_index) => operation_state.start_times.remove(start_time_index),
700            None => {
701                drop(root_operation); // Avoid potential deadlock.
702                warn!(
703                    ?operation_key,
704                    "Progress operation ended without matching start call"
705                );
706                return;
707            }
708        };
709
710        // In the event of multiple concurrent operations, we only want to add
711        // the wall-clock time to the elapsed duration.
712        operation_state.elapsed_duration += if operation_state.start_times.is_empty() {
713            now.saturating_duration_since(previous_start_time)
714        } else {
715            Duration::ZERO
716        };
717
718        root_operation.clear_operations_if_finished();
719    }
720
721    /// Get the set of glyphs associated with the output.
722    pub fn get_glyphs(&self) -> &Glyphs {
723        &self.glyphs
724    }
725
726    /// Create a stream that can be written to. The output might go to stdout or
727    /// be rendered specially in the terminal.
728    pub fn get_output_stream(&self) -> OutputStream {
729        OutputStream {
730            dest: self.dest.clone(),
731            buffer: Default::default(),
732            updater_thread_handle: Arc::clone(&self.updater_thread_handle),
733            root_operation: Arc::clone(&self.root_operation),
734        }
735    }
736
737    /// Create a stream that error output can be written to, rather than regular
738    /// output.
739    pub fn get_error_stream(&self) -> ErrorStream {
740        ErrorStream {
741            dest: self.dest.clone(),
742            buffer: Default::default(),
743            updater_thread_handle: Arc::clone(&self.updater_thread_handle),
744            root_operation: Arc::clone(&self.root_operation),
745        }
746    }
747}
748
749trait WriteProgress {
750    type Stream: WriteIo;
751    fn get_stream() -> Self::Stream;
752    fn get_buffer(&mut self) -> &mut String;
753    fn get_root_operation(&self) -> Arc<Mutex<RootOperation>>;
754    fn get_updater_thread_handle(&self) -> Arc<RwLock<UpdaterThreadHandle>>;
755    fn style_output(output: &str) -> String;
756
757    fn flush(&mut self) {
758        let root_operation = self.get_root_operation();
759        let root_operation = root_operation.lock().unwrap();
760
761        // Get an arbitrary progress meter. It turns out that when a
762        // `ProgressBar` is included in a `MultiProgress`, it doesn't matter
763        // which of them we call `println` on. The output will be printed above
764        // the `MultiProgress` regardless.
765        let operation = root_operation.children.first();
766        match operation {
767            None => {
768                // There's no progress meters, so we can write directly to
769                // stdout. Note that we don't style output here; instead, we
770                // pass through exactly what we got from the caller.
771                write!(Self::get_stream(), "{}", take(self.get_buffer())).unwrap();
772                Self::get_stream().flush().unwrap();
773            }
774
775            Some(_operation_state) if !console::user_attended_stderr() => {
776                // The progress meters will be hidden, and any `println`
777                // calls on them will be ignored.
778                write!(Self::get_stream(), "{}", take(self.get_buffer())).unwrap();
779                Self::get_stream().flush().unwrap();
780            }
781
782            Some(_operation_state)
783                if !self.get_updater_thread_handle().read().unwrap().is_visible =>
784            {
785                // An operation has started, but we haven't started rendering
786                // the progress bars yet, because it hasn't been long enough.
787                // We'll write directly to the output stream, but make sure to
788                // style the output.
789                //
790                // Note that we can't use `ProgressBar::is_hidden` to detect if
791                // we should enter this case. The `ProgressBar`'s draw target
792                // will be set to the parent `MultiProgress`. The parent
793                // `MultiProgress` is the object with the hidden output, but
794                // it's not possible to get the draw target for the
795                // `MultiProgress` at present.
796                write!(
797                    Self::get_stream(),
798                    "{}",
799                    Self::style_output(&take(self.get_buffer()))
800                )
801                .unwrap();
802                Self::get_stream().flush().unwrap();
803            }
804
805            Some(operation_state) => {
806                // Use `Progress::println` to render output above the progress
807                // meters. We rely on buffering output because we can only print
808                // full lines at a time.
809                *self.get_buffer() = {
810                    let mut new_buffer = String::new();
811                    let lines = self.get_buffer().split_inclusive('\n');
812                    for line in lines {
813                        match line.strip_suffix('\n') {
814                            Some(line) => operation_state
815                                .progress_bar
816                                .println(Self::style_output(line)),
817                            None => {
818                                // This should only happen for the last element.
819                                new_buffer.push_str(line);
820                            }
821                        }
822                    }
823                    new_buffer
824                };
825            }
826        };
827    }
828
829    fn drop(&mut self) {
830        let buffer = self.get_buffer();
831        if !buffer.is_empty() {
832            // NB: this only flushes completely-written lines, which is not
833            // correct. We should flush the entire buffer contents, and possibly
834            // force a newline at the end in the case of a progress meter being
835            // visible.
836            self.flush();
837        }
838    }
839}
840
/// A handle to stdout that doesn't overwrite interactive progress
/// notifications.
pub struct OutputStream {
    /// Where written text is routed: the real stream, nowhere, or an
    /// in-memory test buffer.
    dest: OutputDest,
    /// Text accepted for the real stream that has not been emitted yet.
    buffer: String,
    /// Shared updater-thread state; its `is_visible` flag is consulted when
    /// flushing.
    updater_thread_handle: Arc<RwLock<UpdaterThreadHandle>>,
    /// Shared root of the operation tree; a child's progress bar is used to
    /// print lines while a progress meter is shown.
    root_operation: Arc<Mutex<RootOperation>>,
}
848
849impl WriteProgress for OutputStream {
850    type Stream = Stdout;
851
852    fn get_stream() -> Self::Stream {
853        stdout()
854    }
855
856    fn get_buffer(&mut self) -> &mut String {
857        &mut self.buffer
858    }
859
860    fn get_root_operation(&self) -> Arc<Mutex<RootOperation>> {
861        Arc::clone(&self.root_operation)
862    }
863
864    fn get_updater_thread_handle(&self) -> Arc<RwLock<UpdaterThreadHandle>> {
865        Arc::clone(&self.updater_thread_handle)
866    }
867
868    fn style_output(output: &str) -> String {
869        let output = console::strip_ansi_codes(output);
870        console::style(output).dim().to_string()
871    }
872}
873
874impl Write for OutputStream {
875    fn write_str(&mut self, s: &str) -> std::fmt::Result {
876        match &self.dest {
877            OutputDest::Stdout => {
878                self.buffer.push_str(s);
879                self.flush();
880            }
881
882            OutputDest::Suppress => {
883                // Do nothing.
884            }
885
886            OutputDest::BufferForTest { stdout, stderr: _ } => {
887                let mut buffer = stdout.lock().unwrap();
888                write!(buffer, "{s}").unwrap();
889            }
890        }
891        Ok(())
892    }
893}
894
895impl Drop for OutputStream {
896    fn drop(&mut self) {
897        WriteProgress::drop(self)
898    }
899}
900
/// A handle to stderr that doesn't overwrite interactive progress
/// notifications.
pub struct ErrorStream {
    /// Where written text is routed: the real stream, nowhere, or an
    /// in-memory test buffer.
    dest: OutputDest,
    /// Text accepted for the real stream that has not been emitted yet.
    buffer: String,
    /// Shared updater-thread state; its `is_visible` flag is consulted when
    /// flushing.
    updater_thread_handle: Arc<RwLock<UpdaterThreadHandle>>,
    /// Shared root of the operation tree; a child's progress bar is used to
    /// print lines while a progress meter is shown.
    root_operation: Arc<Mutex<RootOperation>>,
}
908
909impl WriteProgress for ErrorStream {
910    type Stream = Stderr;
911
912    fn get_stream() -> Self::Stream {
913        stderr()
914    }
915
916    fn get_buffer(&mut self) -> &mut String {
917        &mut self.buffer
918    }
919
920    fn get_root_operation(&self) -> Arc<Mutex<RootOperation>> {
921        Arc::clone(&self.root_operation)
922    }
923
924    fn get_updater_thread_handle(&self) -> Arc<RwLock<UpdaterThreadHandle>> {
925        Arc::clone(&self.updater_thread_handle)
926    }
927
928    fn style_output(output: &str) -> String {
929        let output = console::strip_ansi_codes(output);
930        console::style(output).dim().to_string()
931    }
932}
933
934impl Write for ErrorStream {
935    fn write_str(&mut self, s: &str) -> std::fmt::Result {
936        match &self.dest {
937            OutputDest::Stdout => {
938                self.buffer.push_str(s);
939                WriteProgress::flush(self);
940            }
941
942            OutputDest::Suppress => {
943                // Do nothing.
944            }
945
946            OutputDest::BufferForTest { stdout: _, stderr } => {
947                let mut buffer = stderr.lock().unwrap();
948                write!(buffer, "{s}").unwrap();
949            }
950        }
951        Ok(())
952    }
953}
954
955/// You probably don't want this. This implementation is only for `tracing`'s `fmt_layer`, because
956/// it needs a writer of type `io::Write`, but `Effects` normally uses its implementation of
957/// `fmt::Write`.
958impl io::Write for ErrorStream {
959    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
960        match &self.dest {
961            OutputDest::Stdout => {
962                self.buffer.push_str(buf.to_str_lossy().as_ref());
963                Ok(buf.len())
964            }
965            OutputDest::Suppress => {
966                // Do nothing.
967                Ok(buf.len())
968            }
969            OutputDest::BufferForTest { stdout: _, stderr } => {
970                let mut buffer = stderr.lock().unwrap();
971                buffer.write(buf)
972            }
973        }
974    }
975
976    fn flush(&mut self) -> io::Result<()> {
977        WriteProgress::flush(self);
978        Ok(())
979    }
980}
981
982impl Drop for ErrorStream {
983    fn drop(&mut self) {
984        WriteProgress::drop(self);
985    }
986}
987
/// A handle to an operation in progress. This object should be kept live while
/// the operation is underway, and a timing entry for it will be displayed in
/// the interactive progress display.
///
/// Dropping the handle notifies the owning `Effects` that the operation has
/// ended.
#[derive(Debug)]
pub struct ProgressHandle<'a> {
    /// The `Effects` instance that receives every notification from this
    /// handle.
    effects: &'a Effects,
    /// Identifies the operation this handle refers to; passed along with each
    /// notification.
    operation_key: Vec<OperationType>,
}
996
997impl Drop for ProgressHandle<'_> {
998    fn drop(&mut self) {
999        self.effects.on_drop_progress_handle(&self.operation_key)
1000    }
1001}
1002
1003impl<'a> ProgressHandle<'a> {
1004    /// Notify the progress meter that the current operation has `total`
1005    /// discrete units of work, and it's currently `current` units of the way
1006    /// through the operation.
1007    pub fn notify_progress(&self, current: usize, total: usize) {
1008        self.effects
1009            .on_notify_progress(&self.operation_key, current, total);
1010    }
1011
1012    /// Notify the progress meter that additional progress has taken place.
1013    /// Should be used only after a call to `notify_progress` to indicate how
1014    /// much total work there is.
1015    pub fn notify_progress_inc(&self, increment: usize) {
1016        self.effects
1017            .on_notify_progress_inc(&self.operation_key, increment);
1018    }
1019
1020    /// Update the message for this progress meter.
1021    pub fn notify_status(&self, icon: OperationIcon, message: impl Into<String>) {
1022        let message = message.into();
1023        self.effects
1024            .on_set_message(&self.operation_key, icon, message);
1025    }
1026}
1027
/// A wrapper around an iterator that reports progress as it iterates.
pub struct ProgressIter<'a, TItem, TInner: Iterator<Item = TItem>> {
    /// Counter advanced on each call to `next`; reported as the current
    /// position.
    index: usize,
    /// The wrapped iterator that actually produces the items.
    inner: TInner,
    /// The handle that progress notifications are sent through.
    progress: ProgressHandle<'a>,
}
1034
1035impl<TItem, TInner: Iterator<Item = TItem>> Iterator for ProgressIter<'_, TItem, TInner> {
1036    type Item = TItem;
1037
1038    fn next(&mut self) -> Option<Self::Item> {
1039        let (lower, upper) = self.inner.size_hint();
1040        let size_guess = upper.unwrap_or(lower);
1041        self.progress.notify_progress(self.index, size_guess);
1042        self.index += 1;
1043        self.inner.next()
1044    }
1045}
1046
/// Extension trait for iterators that adds a `with_progress` method.
///
/// `'a` is the lifetime of the `ProgressHandle` that the returned iterator
/// holds on to, and `TItem` is the item type, which wrapping leaves unchanged.
pub trait WithProgress<'a, TItem>: Iterator<Item = TItem> {
    /// The type of the iterator returned by `with_progress`.
    type Iter: Iterator<Item = TItem> + 'a;

    /// Wrap the iterator into an iterator that reports progress as it consumes items.
    ///
    /// Takes ownership of both the iterator and `progress`.
    fn with_progress(self, progress: ProgressHandle<'a>) -> Self::Iter;
}
1055
1056impl<'a, TItem: 'a, TIter: Iterator<Item = TItem> + 'a> WithProgress<'a, TItem> for TIter {
1057    type Iter = ProgressIter<'a, TItem, TIter>;
1058
1059    fn with_progress(self, progress: ProgressHandle<'a>) -> Self::Iter {
1060        ProgressIter {
1061            index: 0,
1062            inner: self,
1063            progress,
1064        }
1065    }
1066}
1067
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the bookkeeping of operation start times and elapsed durations
    /// as progress handles are created, nested, and dropped.
    #[test]
    fn test_effects_progress() -> eyre::Result<()> {
        let effects = Effects::new(Glyphs::text());
        let (effects2, progress2) = effects.start_operation(OperationType::GetMergeBase);

        // Starting the operation once records exactly one start time. The
        // lock guard is scoped so that it is released before the next
        // operation starts.
        {
            let mut root_operation = effects.root_operation.lock().unwrap();
            let get_merge_base_operation = root_operation
                .get_child(&[OperationType::GetMergeBase])
                .unwrap();
            assert_eq!(get_merge_base_operation.start_times.len(), 1);
        }

        // Sleep briefly before starting the same operation type a second time
        // from the root; it is tracked under the same entry, which now has two
        // start times.
        std::thread::sleep(Duration::from_millis(1));
        let (_effects3, progress3) = effects.start_operation(OperationType::GetMergeBase);
        let earlier_start_time = {
            let mut root_operation = effects.root_operation.lock().unwrap();
            let get_merge_base_operation = root_operation
                .get_child(&[OperationType::GetMergeBase])
                .ok_or_else(|| eyre::eyre!("Could not find merge-base operation"))?;
            assert_eq!(get_merge_base_operation.start_times.len(), 2);
            get_merge_base_operation.start_times[0]
        };

        // Dropping the handle that was started later must leave the earlier
        // start time in place.
        drop(progress3);
        {
            let mut root_operation = effects.root_operation.lock().unwrap();
            let get_merge_base_operation = root_operation
                .get_child(&[OperationType::GetMergeBase])
                .unwrap();
            // Ensure that we try to keep the earliest times in the list, to
            // accurately gauge the wall-clock time.
            assert_eq!(
                get_merge_base_operation.start_times,
                vec![earlier_start_time]
            );
        }

        // Nest an operation.
        let (_effects4, progress4) = effects2.start_operation(OperationType::CalculateDiff);
        std::thread::sleep(Duration::from_millis(1));
        drop(progress4);
        {
            let mut root_operation = effects.root_operation.lock().unwrap();
            // The operation should still be present until the root-level
            // operation has finished, even if it's not currently in progress.
            let calculate_diff_operation = root_operation
                .get_child(&[OperationType::GetMergeBase, OperationType::CalculateDiff])
                .unwrap();
            assert!(calculate_diff_operation.start_times.is_empty());
            assert!(calculate_diff_operation.elapsed_duration >= Duration::from_millis(1));
        }

        // Once the last top-level handle is dropped, the root has no children
        // left.
        drop(progress2);
        {
            let root_operation = effects.root_operation.lock().unwrap();
            assert!(root_operation.children.is_empty());
        }

        Ok(())
    }

    /// Test for the issue fixed by <https://github.com/console-rs/indicatif/pull/403>.
    ///
    /// Reporting a position lower than a previously reported one must not
    /// panic.
    #[test]
    fn test_effects_progress_rewind_panic() -> eyre::Result<()> {
        let effects = Effects::new(Glyphs::text());
        let (effects, progress) = effects.start_operation(OperationType::GetMergeBase);
        // The `_` pattern does not bind, so this does not move or drop
        // `effects`; it only marks the variable as used.
        let _ = effects;
        progress.notify_progress(3, 10);
        // Should not panic.
        progress.notify_progress(0, 10);
        Ok(())
    }
}