1use bstr::ByteSlice;
4use std::fmt::{Debug, Display, Write};
5use std::io::{stderr, stdout, Stderr, Stdout, Write as WriteIo};
6use std::mem::take;
7use std::sync::{Arc, Mutex, RwLock};
8use std::time::{Duration, Instant};
9use std::{io, thread};
10
11use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle};
12use itertools::Itertools;
13use lazy_static::lazy_static;
14use tracing::warn;
15
16use crate::core::formatting::Glyphs;
17
18#[allow(missing_docs)]
19#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
20pub enum OperationType {
21 BuildRebasePlan,
22 CalculateDiff,
23 CalculatePatchId,
24 CheckForCycles,
25 ConstrainCommits,
26 DetectDuplicateCommits,
27 EvaluateRevset(Arc<String>),
28 FilterByTouchedPaths,
29 FilterCommits,
30 FindPathToMergeBase,
31 GetMergeBase,
32 GetTouchedPaths,
33 GetUpstreamPatchIds,
34 InitializeRebase,
35 MakeGraph,
36 ProcessEvents,
37 PushCommits,
38 QueryWorkingCopy,
39 ReadingFromCache,
40 RebaseCommits,
41 RepairBranches,
42 RepairCommits,
43 RunGitCommand(Arc<String>),
44 RunTestOnCommit(Arc<String>),
45 RunTests(Arc<String>),
46 SortCommits,
47 SyncCommits,
48 UpdateCommitGraph,
49 UpdateCommits,
50 WalkCommits,
51}
52
53impl Display for OperationType {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 match self {
56 OperationType::BuildRebasePlan => write!(f, "Building rebase plan"),
57 OperationType::CalculateDiff => write!(f, "Computing diffs"),
58 OperationType::CalculatePatchId => write!(f, "Hashing commit contents"),
59 OperationType::CheckForCycles => write!(f, "Checking for cycles"),
60 OperationType::ConstrainCommits => write!(f, "Creating commit constraints"),
61 OperationType::DetectDuplicateCommits => write!(f, "Checking for duplicate commits"),
62 OperationType::EvaluateRevset(revset) => {
63 write!(f, "Evaluating revset: {revset}")
64 }
65 OperationType::FilterByTouchedPaths => {
66 write!(f, "Filtering upstream commits by touched paths")
67 }
68 OperationType::FilterCommits => write!(f, "Filtering commits"),
69 OperationType::FindPathToMergeBase => write!(f, "Finding path to merge-base"),
70 OperationType::GetMergeBase => write!(f, "Calculating merge-bases"),
71 OperationType::GetTouchedPaths => write!(f, "Getting touched paths"),
72 OperationType::GetUpstreamPatchIds => write!(f, "Enumerating patch IDs"),
73 OperationType::InitializeRebase => write!(f, "Initializing rebase"),
74 OperationType::MakeGraph => write!(f, "Examining local history"),
75 OperationType::PushCommits => write!(f, "Pushing branches"),
76 OperationType::ProcessEvents => write!(f, "Processing events"),
77 OperationType::QueryWorkingCopy => write!(f, "Querying the working copy"),
78 OperationType::ReadingFromCache => write!(f, "Reading from cache"),
79 OperationType::RebaseCommits => write!(f, "Rebasing commits"),
80 OperationType::RepairBranches => write!(f, "Checking for broken branches"),
81 OperationType::RepairCommits => write!(f, "Checking for broken commits"),
82 OperationType::RunGitCommand(command) => {
83 write!(f, "Running Git command: {}", &command)
84 }
85 OperationType::RunTests(command) => write!(f, "Running command: {command}"),
86 OperationType::RunTestOnCommit(commit) => write!(f, "Waiting to run on {commit}"),
87 OperationType::SortCommits => write!(f, "Sorting commits"),
88 OperationType::SyncCommits => write!(f, "Syncing commit stacks"),
89 OperationType::UpdateCommits => write!(f, "Updating commits"),
90 OperationType::UpdateCommitGraph => write!(f, "Updating commit graph"),
91 OperationType::WalkCommits => write!(f, "Walking commits"),
92 }
93 }
94}
95
96#[derive(Clone, Debug)]
97enum OutputDest {
98 Stdout,
99 Suppress,
100 BufferForTest {
101 stdout: Arc<Mutex<Vec<u8>>>,
102 stderr: Arc<Mutex<Vec<u8>>>,
103 },
104}
105
106type OperationKey = [OperationType];
111
112#[derive(Debug, Default)]
113struct RootOperation {
114 multi_progress: MultiProgress,
115 children: Vec<OperationState>,
116}
117
118impl RootOperation {
119 pub fn hide_multi_progress(&mut self) {
120 self.multi_progress
121 .set_draw_target(ProgressDrawTarget::hidden());
122 }
123
124 pub fn show_multi_progress(&mut self) {
125 self.multi_progress
126 .set_draw_target(ProgressDrawTarget::stderr());
127 }
128
129 pub fn clear_operations_if_finished(&mut self) {
131 if self
132 .children
133 .iter()
134 .all(|operation_state| operation_state.start_times.is_empty())
135 {
136 if self.multi_progress.clear().is_err() {
137 }
140 self.children.clear();
141 }
142 }
143
144 pub fn get_or_create_child(&mut self, key: &[OperationType]) -> &mut OperationState {
145 match key {
146 [] => panic!("Empty operation key"),
147 [first, rest @ ..] => {
148 let index = match self
149 .children
150 .iter()
151 .find_position(|child| &child.operation_type == first)
152 {
153 Some((child_index, _)) => child_index,
154 None => {
155 self.children.push(OperationState {
156 operation_type: first.clone(),
157 progress_bar: ProgressBar::new_spinner(),
158 has_meter: Default::default(),
159 icon: Default::default(),
160 progress_message: first.to_string(),
161 start_times: Default::default(),
162 elapsed_duration: Default::default(),
163 children: Default::default(),
164 });
165 self.children.len() - 1
166 }
167 };
168 self.children
169 .get_mut(index)
170 .unwrap()
171 .get_or_create_child(rest)
172 }
173 }
174 }
175
176 pub fn get_child(&mut self, key: &[OperationType]) -> Option<&mut OperationState> {
177 match key {
178 [] => panic!("Empty operation key"),
179 [first, rest @ ..] => {
180 let index = self
181 .children
182 .iter()
183 .find_position(|child| &child.operation_type == first);
184 match index {
185 Some((index, _)) => self.children.get_mut(index).unwrap().get_child(rest),
186 None => None,
187 }
188 }
189 }
190 }
191
192 pub fn tick(&mut self) {
195 let operations = {
196 let mut acc = Vec::new();
197 Self::traverse_operations(&mut acc, 0, &self.children);
198 acc
199 };
200 for (nesting_level, operation) in operations {
201 operation.tick(nesting_level);
202 }
203 }
204
205 pub fn refresh_multi_progress(&mut self) {
207 let operations = {
208 let mut acc = Vec::new();
209 Self::traverse_operations(&mut acc, 0, &self.children);
210 acc
211 };
212 if self.multi_progress.clear().is_err() {
213 }
216 for (nesting_level, operation) in operations {
217 operation
220 .progress_bar
221 .set_draw_target(ProgressDrawTarget::hidden());
222
223 self.multi_progress.add(operation.progress_bar.clone());
224
225 operation.tick(nesting_level);
228 }
229 }
230
231 fn traverse_operations<'a>(
232 acc: &mut Vec<(usize, &'a OperationState)>,
233 current_level: usize,
234 operations: &'a [OperationState],
235 ) {
236 for operation in operations {
237 acc.push((current_level, operation));
238 Self::traverse_operations(acc, current_level + 1, &operation.children);
239 }
240 }
241}
242
243pub mod icons {
245 pub const CHECKMARK: &str = "✓";
247
248 pub const EXCLAMATION: &str = "!";
250
251 pub const CROSS: &str = "X";
256}
257
258#[derive(Clone, Copy, Debug)]
260pub enum OperationIcon {
261 InProgress,
263
264 Success,
266
267 Warning,
269
270 Failure,
272}
273
274impl Default for OperationIcon {
275 fn default() -> Self {
276 Self::InProgress
277 }
278}
279
280#[derive(Debug)]
281struct OperationState {
282 operation_type: OperationType,
283 progress_bar: ProgressBar,
284 has_meter: bool,
285 start_times: Vec<Instant>,
286 icon: OperationIcon,
287 progress_message: String,
288 elapsed_duration: Duration,
289 children: Vec<OperationState>,
290}
291
292impl OperationState {
293 pub fn set_progress(&mut self, current: usize, total: usize) {
294 self.has_meter = true;
295
296 if current
297 .try_into()
298 .map(|current: u64| current < self.progress_bar.position())
299 .unwrap_or(false)
300 {
301 self.progress_bar.reset_eta();
304 }
305
306 self.progress_bar.set_position(current.try_into().unwrap());
307 self.progress_bar.set_length(total.try_into().unwrap());
308 }
309
310 pub fn inc_progress(&mut self, increment: usize) {
311 self.progress_bar.inc(increment.try_into().unwrap());
312 }
313
314 pub fn get_or_create_child(&mut self, child_type: &[OperationType]) -> &mut Self {
315 match child_type {
316 [] => self,
317 [first, rest @ ..] => {
318 let index = match self
319 .children
320 .iter()
321 .find_position(|child| &child.operation_type == first)
322 {
323 Some((child_index, _)) => child_index,
324 None => {
325 self.children.push(OperationState {
326 operation_type: first.clone(),
327 progress_bar: ProgressBar::new_spinner(),
328 has_meter: Default::default(),
329 icon: Default::default(),
330 progress_message: first.to_string(),
331 start_times: Default::default(),
332 elapsed_duration: Default::default(),
333 children: Default::default(),
334 });
335 self.children.len() - 1
336 }
337 };
338 self.children
339 .get_mut(index)
340 .unwrap()
341 .get_or_create_child(rest)
342 }
343 }
344 }
345
346 pub fn get_child(&mut self, child_type: &[OperationType]) -> Option<&mut Self> {
347 match child_type {
348 [] => Some(self),
349 [first, rest @ ..] => {
350 let index = self
351 .children
352 .iter()
353 .find_position(|child| &child.operation_type == first);
354 match index {
355 Some((index, _)) => self.children.get_mut(index).unwrap().get_child(rest),
356 None => None,
357 }
358 }
359 }
360 }
361
362 pub fn tick(&self, nesting_level: usize) {
363 lazy_static! {
364 static ref CHECKMARK: String = console::style(icons::CHECKMARK).green().to_string();
365 static ref EXCLAMATION: String = console::style(icons::EXCLAMATION).yellow().to_string();
366 static ref CROSS: String = console::style(icons::CROSS).red().to_string();
367 static ref IN_PROGRESS_SPINNER_STYLE: Arc<Mutex<ProgressStyle>> =
368 Arc::new(Mutex::new(ProgressStyle::default_spinner().template("{prefix}{spinner} {wide_msg}").unwrap()));
369 static ref IN_PROGRESS_BAR_STYLE: Arc<Mutex<ProgressStyle>> =
370 Arc::new(Mutex::new(ProgressStyle::default_bar().template("{prefix}{spinner} {wide_msg} {bar} {pos}/{len} ").unwrap()));
375 static ref WAITING_PROGRESS_STYLE: Arc<Mutex<ProgressStyle>> = Arc::new(Mutex::new(IN_PROGRESS_SPINNER_STYLE
376 .clone().lock().unwrap().clone()
377 .tick_strings(&[" ", " "])
379 ));
380 static ref SUCCESS_PROGRESS_STYLE: Arc<Mutex<ProgressStyle>> = Arc::new(Mutex::new(IN_PROGRESS_SPINNER_STYLE
381 .clone()
382 .lock()
383 .unwrap()
384 .clone()
385 .tick_strings(&[&CHECKMARK, &CHECKMARK])));
387 static ref WARNING_PROGRESS_STYLE: Arc<Mutex<ProgressStyle>> = Arc::new(Mutex::new(IN_PROGRESS_SPINNER_STYLE
388 .clone()
389 .lock()
390 .unwrap()
391 .clone()
392 .tick_strings(&[&CROSS, &CROSS])));
394 static ref FAILURE_PROGRESS_STYLE: Arc<Mutex<ProgressStyle>> = Arc::new(Mutex::new(IN_PROGRESS_SPINNER_STYLE
395 .clone()
396 .lock()
397 .unwrap()
398 .clone()
399 .tick_strings(&[&CROSS, &CROSS])));
401 }
402
403 let elapsed_duration = match self.start_times.iter().min() {
404 None => self.elapsed_duration,
405 Some(start_time) => {
406 let additional_duration = Instant::now().saturating_duration_since(*start_time);
407 self.elapsed_duration + additional_duration
408 }
409 };
410
411 self.progress_bar.set_style(
412 match (self.start_times.as_slice(), self.has_meter, self.icon) {
413 (_, _, OperationIcon::Success) => SUCCESS_PROGRESS_STYLE.lock().unwrap().clone(),
414 (_, _, OperationIcon::Warning) => WARNING_PROGRESS_STYLE.lock().unwrap().clone(),
415 (_, _, OperationIcon::Failure) => FAILURE_PROGRESS_STYLE.lock().unwrap().clone(),
416 ([], _, OperationIcon::InProgress) => {
417 WAITING_PROGRESS_STYLE.lock().unwrap().clone()
418 }
419 ([..], false, OperationIcon::InProgress) => {
420 IN_PROGRESS_SPINNER_STYLE.lock().unwrap().clone()
421 }
422 ([..], true, OperationIcon::InProgress) => {
423 IN_PROGRESS_BAR_STYLE.lock().unwrap().clone()
424 }
425 },
426 );
427 self.progress_bar.set_prefix(" ".repeat(nesting_level));
428 self.progress_bar.set_message(format!(
429 "{} ({:.1}s)",
430 self.progress_message,
431 elapsed_duration.as_secs_f64(),
432 ));
433 self.progress_bar.tick();
434 }
435}
436
437#[derive(Clone)]
440pub struct Effects {
441 glyphs: Glyphs,
442 dest: OutputDest,
443 updater_thread_handle: Arc<RwLock<UpdaterThreadHandle>>,
444 operation_key: Vec<OperationType>,
445 root_operation: Arc<Mutex<RootOperation>>,
446}
447
448impl std::fmt::Debug for Effects {
449 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
450 write!(
451 f,
452 "<Output fancy={}>",
453 self.glyphs.should_write_ansi_escape_codes
454 )
455 }
456}
457
458#[derive(Clone, Debug, Default)]
459struct UpdaterThreadHandle {
460 is_visible: bool,
461}
462
463fn spawn_progress_updater_thread(
464 root_operation: &Arc<Mutex<RootOperation>>,
465) -> Arc<RwLock<UpdaterThreadHandle>> {
466 {
467 let mut root_operation = root_operation.lock().unwrap();
468 root_operation.hide_multi_progress();
469 }
470 let root_operation = Arc::downgrade(root_operation);
471 let handle = Arc::new(RwLock::new(UpdaterThreadHandle { is_visible: false }));
472
473 thread::spawn({
474 let handle = Arc::clone(&handle);
475 move || {
476 thread::sleep(Duration::from_millis(250));
479 {
480 let mut handle = handle.write().unwrap();
481 match root_operation.upgrade() {
482 Some(root_operation) => {
483 let mut root_operation = root_operation.lock().unwrap();
484 root_operation.show_multi_progress();
485 }
486 None => return,
487 }
488 handle.is_visible = true;
489 }
490
491 loop {
492 match root_operation.upgrade() {
495 None => return,
496 Some(root_operation) => {
497 let mut root_operation = root_operation.lock().unwrap();
498 root_operation.tick();
499 }
500 }
501
502 thread::sleep(Duration::from_millis(100));
503 }
504 }
505 });
506
507 handle
508}
509
510impl Effects {
511 pub fn new(glyphs: Glyphs) -> Self {
513 let root_operation = Default::default();
514 let updater_thread_handle = spawn_progress_updater_thread(&root_operation);
515 Effects {
516 glyphs,
517 dest: OutputDest::Stdout,
518 updater_thread_handle,
519 operation_key: Default::default(),
520 root_operation,
521 }
522 }
523
524 pub fn new_suppress_for_test(glyphs: Glyphs) -> Self {
526 Effects {
527 glyphs,
528 dest: OutputDest::Suppress,
529 updater_thread_handle: Default::default(),
530 operation_key: Default::default(),
531 root_operation: Default::default(),
532 }
533 }
534
535 pub fn new_from_buffer_for_test(
537 glyphs: Glyphs,
538 stdout: &Arc<Mutex<Vec<u8>>>,
539 stderr: &Arc<Mutex<Vec<u8>>>,
540 ) -> Self {
541 Effects {
542 glyphs,
543 dest: OutputDest::BufferForTest {
544 stdout: Arc::clone(stdout),
545 stderr: Arc::clone(stderr),
546 },
547 updater_thread_handle: Default::default(),
548 operation_key: Default::default(),
549 root_operation: Default::default(),
550 }
551 }
552
553 pub fn enable_tui_mode(&self) -> Self {
556 let mut root_operation = self.root_operation.lock().unwrap();
557 root_operation.hide_multi_progress();
558 Self {
559 dest: OutputDest::Suppress,
560 ..self.clone()
561 }
562 }
563
564 pub fn suppress(&self) -> Self {
566 Self {
567 dest: OutputDest::Suppress,
568 ..self.clone()
569 }
570 }
571
572 pub fn reverse_order(&self, reverse: bool) -> Self {
575 Self {
576 glyphs: self.glyphs.clone().reverse_order(reverse),
577 ..self.clone()
578 }
579 }
580
581 pub fn start_operation(&self, operation_type: OperationType) -> (Effects, ProgressHandle) {
601 let operation_key = {
602 let mut result = self.operation_key.clone();
603 result.push(operation_type);
604 result
605 };
606 let progress = ProgressHandle {
607 effects: self,
608 operation_key: operation_key.clone(),
609 };
610 match self.dest {
611 OutputDest::Stdout => {}
612 OutputDest::Suppress | OutputDest::BufferForTest { .. } => {
613 return (self.clone(), progress)
614 }
615 }
616
617 let now = Instant::now();
618 let mut root_operation = self.root_operation.lock().unwrap();
619 let operation_state = root_operation.get_or_create_child(&operation_key);
620 operation_state.start_times.push(now);
621 root_operation.refresh_multi_progress();
622
623 let effects = Self {
624 operation_key,
625 ..self.clone()
626 };
627 (effects, progress)
628 }
629
630 fn on_notify_progress(&self, operation_key: &OperationKey, current: usize, total: usize) {
631 match self.dest {
632 OutputDest::Stdout => {}
633 OutputDest::Suppress | OutputDest::BufferForTest { .. } => return,
634 }
635
636 let mut root_operation = self.root_operation.lock().unwrap();
637 let operation_state = root_operation.get_or_create_child(operation_key);
638 operation_state.set_progress(current, total);
639 }
640
641 fn on_notify_progress_inc(&self, operation_key: &OperationKey, increment: usize) {
642 match self.dest {
643 OutputDest::Stdout => {}
644 OutputDest::Suppress | OutputDest::BufferForTest { .. } => return,
645 }
646
647 let mut root_operation = self.root_operation.lock().unwrap();
648 let operation = root_operation.get_child(operation_key);
649 let operation_state = match operation {
650 Some(operation_state) => operation_state,
651 None => return,
652 };
653 operation_state.inc_progress(increment);
654 }
655
656 fn on_set_message(&self, operation_key: &OperationKey, icon: OperationIcon, message: String) {
657 match self.dest {
658 OutputDest::Stdout => {}
659 OutputDest::Suppress | OutputDest::BufferForTest { .. } => return,
660 }
661
662 let mut root_operation = self.root_operation.lock().unwrap();
663 let operation = root_operation.get_child(operation_key);
664 let operation_state = match operation {
665 Some(operation_state) => operation_state,
666 None => return,
667 };
668 operation_state.icon = icon;
669 operation_state.progress_message = message;
670 }
671
672 fn on_drop_progress_handle(&self, operation_key: &OperationKey) {
673 match self.dest {
674 OutputDest::Stdout => {}
675 OutputDest::Suppress | OutputDest::BufferForTest { .. } => return,
676 }
677
678 let now = Instant::now();
679 let mut root_operation = self.root_operation.lock().unwrap();
680
681 let operation = root_operation.get_child(operation_key);
682 let operation_state = match operation {
683 Some(operation_state) => operation_state,
684 None => {
685 drop(root_operation); warn!(?operation_key, "Progress operation not started");
687 return;
688 }
689 };
690
691 let previous_start_time = match operation_state
692 .start_times
693 .iter()
694 .position_max_by_key(|x| *x)
698 {
699 Some(start_time_index) => operation_state.start_times.remove(start_time_index),
700 None => {
701 drop(root_operation); warn!(
703 ?operation_key,
704 "Progress operation ended without matching start call"
705 );
706 return;
707 }
708 };
709
710 operation_state.elapsed_duration += if operation_state.start_times.is_empty() {
713 now.saturating_duration_since(previous_start_time)
714 } else {
715 Duration::ZERO
716 };
717
718 root_operation.clear_operations_if_finished();
719 }
720
721 pub fn get_glyphs(&self) -> &Glyphs {
723 &self.glyphs
724 }
725
726 pub fn get_output_stream(&self) -> OutputStream {
729 OutputStream {
730 dest: self.dest.clone(),
731 buffer: Default::default(),
732 updater_thread_handle: Arc::clone(&self.updater_thread_handle),
733 root_operation: Arc::clone(&self.root_operation),
734 }
735 }
736
737 pub fn get_error_stream(&self) -> ErrorStream {
740 ErrorStream {
741 dest: self.dest.clone(),
742 buffer: Default::default(),
743 updater_thread_handle: Arc::clone(&self.updater_thread_handle),
744 root_operation: Arc::clone(&self.root_operation),
745 }
746 }
747}
748
749trait WriteProgress {
750 type Stream: WriteIo;
751 fn get_stream() -> Self::Stream;
752 fn get_buffer(&mut self) -> &mut String;
753 fn get_root_operation(&self) -> Arc<Mutex<RootOperation>>;
754 fn get_updater_thread_handle(&self) -> Arc<RwLock<UpdaterThreadHandle>>;
755 fn style_output(output: &str) -> String;
756
757 fn flush(&mut self) {
758 let root_operation = self.get_root_operation();
759 let root_operation = root_operation.lock().unwrap();
760
761 let operation = root_operation.children.first();
766 match operation {
767 None => {
768 write!(Self::get_stream(), "{}", take(self.get_buffer())).unwrap();
772 Self::get_stream().flush().unwrap();
773 }
774
775 Some(_operation_state) if !console::user_attended_stderr() => {
776 write!(Self::get_stream(), "{}", take(self.get_buffer())).unwrap();
779 Self::get_stream().flush().unwrap();
780 }
781
782 Some(_operation_state)
783 if !self.get_updater_thread_handle().read().unwrap().is_visible =>
784 {
785 write!(
797 Self::get_stream(),
798 "{}",
799 Self::style_output(&take(self.get_buffer()))
800 )
801 .unwrap();
802 Self::get_stream().flush().unwrap();
803 }
804
805 Some(operation_state) => {
806 *self.get_buffer() = {
810 let mut new_buffer = String::new();
811 let lines = self.get_buffer().split_inclusive('\n');
812 for line in lines {
813 match line.strip_suffix('\n') {
814 Some(line) => operation_state
815 .progress_bar
816 .println(Self::style_output(line)),
817 None => {
818 new_buffer.push_str(line);
820 }
821 }
822 }
823 new_buffer
824 };
825 }
826 };
827 }
828
829 fn drop(&mut self) {
830 let buffer = self.get_buffer();
831 if !buffer.is_empty() {
832 self.flush();
837 }
838 }
839}
840
841pub struct OutputStream {
843 dest: OutputDest,
844 buffer: String,
845 updater_thread_handle: Arc<RwLock<UpdaterThreadHandle>>,
846 root_operation: Arc<Mutex<RootOperation>>,
847}
848
849impl WriteProgress for OutputStream {
850 type Stream = Stdout;
851
852 fn get_stream() -> Self::Stream {
853 stdout()
854 }
855
856 fn get_buffer(&mut self) -> &mut String {
857 &mut self.buffer
858 }
859
860 fn get_root_operation(&self) -> Arc<Mutex<RootOperation>> {
861 Arc::clone(&self.root_operation)
862 }
863
864 fn get_updater_thread_handle(&self) -> Arc<RwLock<UpdaterThreadHandle>> {
865 Arc::clone(&self.updater_thread_handle)
866 }
867
868 fn style_output(output: &str) -> String {
869 let output = console::strip_ansi_codes(output);
870 console::style(output).dim().to_string()
871 }
872}
873
874impl Write for OutputStream {
875 fn write_str(&mut self, s: &str) -> std::fmt::Result {
876 match &self.dest {
877 OutputDest::Stdout => {
878 self.buffer.push_str(s);
879 self.flush();
880 }
881
882 OutputDest::Suppress => {
883 }
885
886 OutputDest::BufferForTest { stdout, stderr: _ } => {
887 let mut buffer = stdout.lock().unwrap();
888 write!(buffer, "{s}").unwrap();
889 }
890 }
891 Ok(())
892 }
893}
894
895impl Drop for OutputStream {
896 fn drop(&mut self) {
897 WriteProgress::drop(self)
898 }
899}
900
901pub struct ErrorStream {
903 dest: OutputDest,
904 buffer: String,
905 updater_thread_handle: Arc<RwLock<UpdaterThreadHandle>>,
906 root_operation: Arc<Mutex<RootOperation>>,
907}
908
909impl WriteProgress for ErrorStream {
910 type Stream = Stderr;
911
912 fn get_stream() -> Self::Stream {
913 stderr()
914 }
915
916 fn get_buffer(&mut self) -> &mut String {
917 &mut self.buffer
918 }
919
920 fn get_root_operation(&self) -> Arc<Mutex<RootOperation>> {
921 Arc::clone(&self.root_operation)
922 }
923
924 fn get_updater_thread_handle(&self) -> Arc<RwLock<UpdaterThreadHandle>> {
925 Arc::clone(&self.updater_thread_handle)
926 }
927
928 fn style_output(output: &str) -> String {
929 let output = console::strip_ansi_codes(output);
930 console::style(output).dim().to_string()
931 }
932}
933
934impl Write for ErrorStream {
935 fn write_str(&mut self, s: &str) -> std::fmt::Result {
936 match &self.dest {
937 OutputDest::Stdout => {
938 self.buffer.push_str(s);
939 WriteProgress::flush(self);
940 }
941
942 OutputDest::Suppress => {
943 }
945
946 OutputDest::BufferForTest { stdout: _, stderr } => {
947 let mut buffer = stderr.lock().unwrap();
948 write!(buffer, "{s}").unwrap();
949 }
950 }
951 Ok(())
952 }
953}
954
955impl io::Write for ErrorStream {
959 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
960 match &self.dest {
961 OutputDest::Stdout => {
962 self.buffer.push_str(buf.to_str_lossy().as_ref());
963 Ok(buf.len())
964 }
965 OutputDest::Suppress => {
966 Ok(buf.len())
968 }
969 OutputDest::BufferForTest { stdout: _, stderr } => {
970 let mut buffer = stderr.lock().unwrap();
971 buffer.write(buf)
972 }
973 }
974 }
975
976 fn flush(&mut self) -> io::Result<()> {
977 WriteProgress::flush(self);
978 Ok(())
979 }
980}
981
982impl Drop for ErrorStream {
983 fn drop(&mut self) {
984 WriteProgress::drop(self);
985 }
986}
987
988#[derive(Debug)]
992pub struct ProgressHandle<'a> {
993 effects: &'a Effects,
994 operation_key: Vec<OperationType>,
995}
996
997impl Drop for ProgressHandle<'_> {
998 fn drop(&mut self) {
999 self.effects.on_drop_progress_handle(&self.operation_key)
1000 }
1001}
1002
1003impl<'a> ProgressHandle<'a> {
1004 pub fn notify_progress(&self, current: usize, total: usize) {
1008 self.effects
1009 .on_notify_progress(&self.operation_key, current, total);
1010 }
1011
1012 pub fn notify_progress_inc(&self, increment: usize) {
1016 self.effects
1017 .on_notify_progress_inc(&self.operation_key, increment);
1018 }
1019
1020 pub fn notify_status(&self, icon: OperationIcon, message: impl Into<String>) {
1022 let message = message.into();
1023 self.effects
1024 .on_set_message(&self.operation_key, icon, message);
1025 }
1026}
1027
1028pub struct ProgressIter<'a, TItem, TInner: Iterator<Item = TItem>> {
1030 index: usize,
1031 inner: TInner,
1032 progress: ProgressHandle<'a>,
1033}
1034
1035impl<TItem, TInner: Iterator<Item = TItem>> Iterator for ProgressIter<'_, TItem, TInner> {
1036 type Item = TItem;
1037
1038 fn next(&mut self) -> Option<Self::Item> {
1039 let (lower, upper) = self.inner.size_hint();
1040 let size_guess = upper.unwrap_or(lower);
1041 self.progress.notify_progress(self.index, size_guess);
1042 self.index += 1;
1043 self.inner.next()
1044 }
1045}
1046
1047pub trait WithProgress<'a, TItem>: Iterator<Item = TItem> {
1049 type Iter: Iterator<Item = TItem> + 'a;
1051
1052 fn with_progress(self, progress: ProgressHandle<'a>) -> Self::Iter;
1054}
1055
1056impl<'a, TItem: 'a, TIter: Iterator<Item = TItem> + 'a> WithProgress<'a, TItem> for TIter {
1057 type Iter = ProgressIter<'a, TItem, TIter>;
1058
1059 fn with_progress(self, progress: ProgressHandle<'a>) -> Self::Iter {
1060 ProgressIter {
1061 index: 0,
1062 inner: self,
1063 progress,
1064 }
1065 }
1066}
1067
1068#[cfg(test)]
1069mod tests {
1070 use super::*;
1071
1072 #[test]
1073 fn test_effects_progress() -> eyre::Result<()> {
1074 let effects = Effects::new(Glyphs::text());
1075 let (effects2, progress2) = effects.start_operation(OperationType::GetMergeBase);
1076
1077 {
1078 let mut root_operation = effects.root_operation.lock().unwrap();
1079 let get_merge_base_operation = root_operation
1080 .get_child(&[OperationType::GetMergeBase])
1081 .unwrap();
1082 assert_eq!(get_merge_base_operation.start_times.len(), 1);
1083 }
1084
1085 std::thread::sleep(Duration::from_millis(1));
1086 let (_effects3, progress3) = effects.start_operation(OperationType::GetMergeBase);
1087 let earlier_start_time = {
1088 let mut root_operation = effects.root_operation.lock().unwrap();
1089 let get_merge_base_operation = root_operation
1090 .get_child(&[OperationType::GetMergeBase])
1091 .ok_or_else(|| eyre::eyre!("Could not find merge-base operation"))?;
1092 assert_eq!(get_merge_base_operation.start_times.len(), 2);
1093 get_merge_base_operation.start_times[0]
1094 };
1095
1096 drop(progress3);
1097 {
1098 let mut root_operation = effects.root_operation.lock().unwrap();
1099 let get_merge_base_operation = root_operation
1100 .get_child(&[OperationType::GetMergeBase])
1101 .unwrap();
1102 assert_eq!(
1105 get_merge_base_operation.start_times,
1106 vec![earlier_start_time]
1107 );
1108 }
1109
1110 let (_effects4, progress4) = effects2.start_operation(OperationType::CalculateDiff);
1112 std::thread::sleep(Duration::from_millis(1));
1113 drop(progress4);
1114 {
1115 let mut root_operation = effects.root_operation.lock().unwrap();
1116 let calculate_diff_operation = root_operation
1119 .get_child(&[OperationType::GetMergeBase, OperationType::CalculateDiff])
1120 .unwrap();
1121 assert!(calculate_diff_operation.start_times.is_empty());
1122 assert!(calculate_diff_operation.elapsed_duration >= Duration::from_millis(1));
1123 }
1124
1125 drop(progress2);
1126 {
1127 let root_operation = effects.root_operation.lock().unwrap();
1128 assert!(root_operation.children.is_empty());
1129 }
1130
1131 Ok(())
1132 }
1133
1134 #[test]
1136 fn test_effects_progress_rewind_panic() -> eyre::Result<()> {
1137 let effects = Effects::new(Glyphs::text());
1138 let (effects, progress) = effects.start_operation(OperationType::GetMergeBase);
1139 let _ = effects;
1140 progress.notify_progress(3, 10);
1141 progress.notify_progress(0, 10);
1143 Ok(())
1144 }
1145}