1use crate::{
6 LineDisplayStyles,
7 line_display_shared::{
8 HEADER_WIDTH, LineDisplayFormatter, LineDisplayOutput,
9 LineDisplayShared,
10 },
11 utils::ProgressRatioDisplay,
12};
13use chrono::{DateTime, Utc};
14use libsw::TokioSw;
15use owo_colors::OwoColorize;
16use oxide_update_engine_types::{
17 buffer::{
18 EventBuffer, ExecutionStatus, ExecutionTerminalInfo, TerminalKind,
19 },
20 errors::UnknownReportKey,
21 events::EventReport,
22 spec::EngineSpec,
23};
24use std::{borrow::Borrow, collections::BTreeMap, fmt, time::Duration};
25use swrite::{SWrite, swrite};
26use unicode_width::UnicodeWidthStr;
27
28#[derive(Debug)]
34pub struct GroupDisplay<K, W, S: EngineSpec> {
35 log: slog::Logger,
38 writer: W,
39 max_width: usize,
40 start_sw: TokioSw,
42 start_time: Option<DateTime<Utc>>,
43 single_states: BTreeMap<K, SingleState<S>>,
44 formatter: LineDisplayFormatter,
45 stats: GroupDisplayStats,
46}
47
48impl<K: Eq + Ord, W: std::io::Write, S: EngineSpec> GroupDisplay<K, W, S> {
49 pub fn new<Str>(
54 log: &slog::Logger,
55 keys_and_prefixes: impl IntoIterator<Item = (K, Str)>,
56 writer: W,
57 ) -> Self
58 where
59 Str: Into<String>,
60 {
61 let mut max_width = 0;
64 let keys_and_prefixes: Vec<_> = keys_and_prefixes
65 .into_iter()
66 .map(|(k, prefix)| {
67 let prefix = prefix.into();
68 max_width =
69 max_width.max(UnicodeWidthStr::width(prefix.as_str()));
70 (k, prefix)
71 })
72 .collect();
73 let single_states: BTreeMap<_, _> = keys_and_prefixes
74 .into_iter()
75 .map(|(k, prefix)| (k, SingleState::new(prefix, max_width)))
76 .collect();
77
78 let not_started = single_states.len();
79 Self {
80 log: log.new(slog::o!("component" => "GroupDisplay")),
81 writer,
82 max_width,
83 start_sw: TokioSw::new(),
86 start_time: None,
87 single_states,
88 formatter: LineDisplayFormatter::new(),
89 stats: GroupDisplayStats::new(not_started),
90 }
91 }
92
93 pub fn new_with_display(
96 log: &slog::Logger,
97 keys: impl IntoIterator<Item = K>,
98 writer: W,
99 ) -> Self
100 where
101 K: fmt::Display,
102 {
103 Self::new(
104 log,
105 keys.into_iter().map(|k| {
106 let prefix = k.to_string();
107 (k, prefix)
108 }),
109 writer,
110 )
111 }
112
113 #[inline]
115 pub fn set_styles(&mut self, styles: LineDisplayStyles) {
116 self.formatter.set_styles(styles);
117 }
118
119 #[inline]
125 pub fn set_start_time(&mut self, start_time: DateTime<Utc>) {
126 self.start_time = Some(start_time);
127 }
128
129 #[inline]
131 pub fn set_progress_interval(&mut self, interval: Duration) {
132 self.formatter.set_progress_interval(interval);
133 }
134
135 pub fn contains_key<Q>(&self, key: &Q) -> bool
138 where
139 K: Borrow<Q>,
140 Q: Ord,
141 {
142 self.single_states.contains_key(key)
143 }
144
145 pub fn add_event_report<Q>(
151 &mut self,
152 key: &Q,
153 event_report: EventReport<S>,
154 ) -> Result<(), UnknownReportKey>
155 where
156 K: Borrow<Q>,
157 Q: Ord,
158 {
159 if let Some(state) = self.single_states.get_mut(key) {
160 let result = state.add_event_report(event_report);
161 if let Some(root_total_elapsed) = result.root_total_elapsed
163 && self.start_sw.elapsed() < root_total_elapsed
164 {
165 self.start_sw =
166 TokioSw::with_elapsed_started(root_total_elapsed);
167 }
168
169 self.stats.apply_result(result);
170
171 if result.before != result.after {
172 slog::debug!(
173 self.log,
174 "add_event_report caused state transition";
175 "prefix" => &state.prefix,
176 "before" => %result.before,
177 "after" => %result.after,
178 "current_stats" => ?self.stats,
179 "root_total_elapsed" => ?result.root_total_elapsed,
180 );
181 } else {
182 slog::trace!(
183 self.log,
184 "add_event_report called, state did not change";
185 "prefix" => &state.prefix,
186 "state" => %result.before,
187 "current_stats" => ?self.stats,
188 "root_total_elapsed" => ?result.root_total_elapsed,
189 );
190 }
191
192 Ok(())
193 } else {
194 Err(UnknownReportKey {})
195 }
196 }
197
198 pub fn write_stats(&mut self, header: &str) -> std::io::Result<()> {
200 let prefix = " ".repeat(self.max_width);
202 let mut line = self.formatter.start_line(
203 &prefix,
204 self.start_time,
205 Some(self.start_sw.elapsed()),
206 );
207 self.stats.format_line(&mut line, header, &self.formatter);
208 writeln!(self.writer, "{line}")
209 }
210
211 pub fn write_events(&mut self) -> std::io::Result<()> {
213 let mut out = LineDisplayOutput::new();
214 for state in self.single_states.values_mut() {
215 state.format_events(&self.formatter, &mut out);
216 }
217 for line in out.iter() {
218 writeln!(self.writer, "{line}")?;
219 }
220 Ok(())
221 }
222
223 pub fn stats(&self) -> &GroupDisplayStats {
225 &self.stats
226 }
227}
228
229#[derive(Clone, Copy, Debug, Eq, PartialEq)]
230pub struct GroupDisplayStats {
231 pub total: usize,
233
234 pub not_started: usize,
236
237 pub running: usize,
239
240 pub completed: usize,
242
243 pub failed: usize,
245
246 pub aborted: usize,
248
249 pub overwritten: usize,
255}
256
257impl GroupDisplayStats {
258 fn new(total: usize) -> Self {
259 Self {
260 total,
261 not_started: total,
262 completed: 0,
263 failed: 0,
264 aborted: 0,
265 overwritten: 0,
266 running: 0,
267 }
268 }
269
270 pub fn terminal_count(&self) -> usize {
272 self.completed + self.failed + self.aborted + self.overwritten
273 }
274
275 pub fn is_terminal(&self) -> bool {
277 self.not_started == 0 && self.running == 0
278 }
279
280 pub fn has_failures(&self) -> bool {
282 self.failed > 0 || self.aborted > 0 || self.overwritten > 0
283 }
284
285 fn apply_result(&mut self, result: AddEventReportResult) {
286 if result.before == result.after {
287 return;
289 }
290
291 match result.before {
292 SingleStateTag::NotStarted => self.not_started -= 1,
293 SingleStateTag::Running => self.running -= 1,
294 SingleStateTag::Terminal(TerminalKind::Completed) => {
295 self.completed -= 1
296 }
297 SingleStateTag::Terminal(TerminalKind::Failed) => self.failed -= 1,
298 SingleStateTag::Terminal(TerminalKind::Aborted) => {
299 self.aborted -= 1
300 }
301 SingleStateTag::Overwritten => self.overwritten -= 1,
302 }
303
304 match result.after {
305 SingleStateTag::NotStarted => self.not_started += 1,
306 SingleStateTag::Running => self.running += 1,
307 SingleStateTag::Terminal(TerminalKind::Completed) => {
308 self.completed += 1
309 }
310 SingleStateTag::Terminal(TerminalKind::Failed) => self.failed += 1,
311 SingleStateTag::Terminal(TerminalKind::Aborted) => {
312 self.aborted += 1
313 }
314 SingleStateTag::Overwritten => self.overwritten += 1,
315 }
316 }
317
318 fn format_line(
319 &self,
320 line: &mut String,
321 header: &str,
322 formatter: &LineDisplayFormatter,
323 ) {
324 let header_style = if self.has_failures() {
325 formatter.styles().error_style
326 } else {
327 formatter.styles().progress_style
328 };
329
330 swrite!(line, "{:>HEADER_WIDTH$} ", header.style(header_style));
331 swrite!(
332 line,
333 "{}: {} running, {} {}",
334 ProgressRatioDisplay::current_and_total(
335 self.terminal_count(),
336 self.total
337 ),
338 self.running.style(formatter.styles().meta_style),
339 self.completed.style(formatter.styles().meta_style),
340 "completed".style(formatter.styles().progress_style),
341 );
342 if self.failed > 0 {
343 swrite!(
344 line,
345 ", {} {}",
346 self.failed.style(formatter.styles().meta_style),
347 "failed".style(formatter.styles().error_style),
348 );
349 }
350 if self.aborted > 0 {
351 swrite!(
352 line,
353 ", {} {}",
354 self.aborted.style(formatter.styles().meta_style),
355 "aborted".style(formatter.styles().error_style),
356 );
357 }
358 if self.overwritten > 0 {
359 swrite!(
360 line,
361 ", {} {}",
362 self.overwritten.style(formatter.styles().meta_style),
363 "overwritten".style(formatter.styles().error_style),
364 );
365 }
366 }
367}
368
369#[derive(Debug)]
370struct SingleState<S: EngineSpec> {
371 shared: LineDisplayShared,
372 kind: SingleStateKind<S>,
373 prefix: String,
374}
375
376impl<S: EngineSpec> SingleState<S> {
377 fn new(prefix: String, max_width: usize) -> Self {
378 let prefix = format!("{:>max_width$}", prefix);
380 Self {
381 shared: LineDisplayShared::default(),
382 kind: SingleStateKind::NotStarted { displayed: false },
383 prefix,
384 }
385 }
386
387 fn add_event_report(
389 &mut self,
390 event_report: EventReport<S>,
391 ) -> AddEventReportResult {
392 match &mut self.kind {
393 SingleStateKind::NotStarted { .. } => {
394 let before = SingleStateTag::NotStarted;
396 let mut event_buffer = EventBuffer::default();
397 let (after, root_total_elapsed) =
398 match Self::apply_report(&mut event_buffer, event_report) {
399 ApplyReportResult::NotStarted => {
400 (SingleStateTag::NotStarted, None)
403 }
404 ApplyReportResult::Running(root_total_elapsed) => {
405 self.kind =
406 SingleStateKind::Running { event_buffer };
407 (SingleStateTag::Running, Some(root_total_elapsed))
408 }
409 ApplyReportResult::Terminal(info) => {
410 let terminal_kind = info.kind;
411 let root_total_elapsed = info.root_total_elapsed;
412
413 self.kind = SingleStateKind::Terminal {
414 info,
415 pending_event_buffer: Some(event_buffer),
416 };
417 (
418 SingleStateTag::Terminal(terminal_kind),
419 root_total_elapsed,
420 )
421 }
422 ApplyReportResult::Overwritten => {
423 self.kind = SingleStateKind::Overwritten {
424 displayed: false,
425 };
426 (SingleStateTag::Overwritten, None)
427 }
428 };
429
430 AddEventReportResult { before, after, root_total_elapsed }
431 }
432 SingleStateKind::Running { event_buffer } => {
433 let before = SingleStateTag::Running;
435 let (after, root_total_elapsed) = match Self::apply_report(
436 event_buffer,
437 event_report,
438 ) {
439 ApplyReportResult::NotStarted => {
440 unreachable!(
445 "illegal state transition from Running to NotStarted"
446 )
447 }
448 ApplyReportResult::Running(root_total_elapsed) => {
449 (SingleStateTag::Running, Some(root_total_elapsed))
450 }
451 ApplyReportResult::Terminal(info) => {
452 let terminal_kind = info.kind;
453 let root_total_elapsed = info.root_total_elapsed;
454
455 let event_buffer = std::mem::replace(
458 event_buffer,
459 EventBuffer::new(0),
460 );
461
462 self.kind = SingleStateKind::Terminal {
463 info,
464 pending_event_buffer: Some(event_buffer),
465 };
466 (
467 SingleStateTag::Terminal(terminal_kind),
468 root_total_elapsed,
469 )
470 }
471 ApplyReportResult::Overwritten => {
472 self.kind =
473 SingleStateKind::Overwritten { displayed: false };
474 (SingleStateTag::Overwritten, None)
475 }
476 };
477 AddEventReportResult { before, after, root_total_elapsed }
478 }
479 SingleStateKind::Terminal { info, .. } => {
480 AddEventReportResult::unchanged(
483 SingleStateTag::Terminal(info.kind),
484 info.root_total_elapsed,
485 )
486 }
487 SingleStateKind::Overwritten { .. } => {
488 AddEventReportResult::unchanged(
491 SingleStateTag::Overwritten,
492 None,
493 )
494 }
495 }
496 }
497
498 fn apply_report(
500 event_buffer: &mut EventBuffer<S>,
501 event_report: EventReport<S>,
502 ) -> ApplyReportResult {
503 if let Some(root_execution_id) = event_buffer.root_execution_id()
504 && event_report.root_execution_id != Some(root_execution_id)
505 {
506 return ApplyReportResult::Overwritten;
510 }
511
512 event_buffer.add_event_report(event_report);
513 match event_buffer.root_execution_summary() {
514 Some(summary) => match summary.execution_status {
515 ExecutionStatus::NotStarted => ApplyReportResult::NotStarted,
516 ExecutionStatus::Running { root_total_elapsed, .. } => {
517 ApplyReportResult::Running(root_total_elapsed)
518 }
519 ExecutionStatus::Terminal(info) => {
520 ApplyReportResult::Terminal(info)
521 }
522 },
523 None => {
524 ApplyReportResult::NotStarted
526 }
527 }
528 }
529
530 fn format_events(
531 &mut self,
532 formatter: &LineDisplayFormatter,
533 out: &mut LineDisplayOutput,
534 ) {
535 let mut cx = self.shared.with_context(&self.prefix, formatter);
536 match &mut self.kind {
537 SingleStateKind::NotStarted { displayed } => {
538 if !*displayed {
539 let line =
540 cx.format_generic("Update not started, waiting...");
541 out.add_line(line);
542 *displayed = true;
543 }
544 }
545 SingleStateKind::Running { event_buffer } => {
546 cx.format_event_buffer(event_buffer, out);
547 }
548 SingleStateKind::Terminal { info, pending_event_buffer } => {
549 if let Some(event_buffer) = pending_event_buffer.take() {
552 cx.format_event_buffer(&event_buffer, out);
553 let line = cx.format_terminal_info(info);
555 out.add_line(line);
556 }
557
558 }
560 SingleStateKind::Overwritten { displayed } => {
561 if !*displayed {
562 let line = cx.format_generic(
563 "Update overwritten (a different update was started): \
564 assuming failure",
565 );
566 out.add_line(line);
567 *displayed = true;
568 }
569 }
570 }
571 }
572}
573
574#[derive(Debug)]
575enum SingleStateKind<S: EngineSpec> {
576 NotStarted {
577 displayed: bool,
578 },
579 Running {
580 event_buffer: EventBuffer<S>,
581 },
582 Terminal {
583 info: ExecutionTerminalInfo,
584 pending_event_buffer: Option<EventBuffer<S>>,
587 },
588 Overwritten {
589 displayed: bool,
590 },
591}
592
593#[derive(Clone, Copy, Debug, Eq, PartialEq)]
594struct AddEventReportResult {
595 before: SingleStateTag,
596 after: SingleStateTag,
597 root_total_elapsed: Option<Duration>,
598}
599
600impl AddEventReportResult {
601 fn unchanged(
602 tag: SingleStateTag,
603 root_total_elapsed: Option<Duration>,
604 ) -> Self {
605 Self { before: tag, after: tag, root_total_elapsed }
606 }
607}
608
609#[derive(Copy, Clone, Debug, Eq, PartialEq)]
610enum SingleStateTag {
611 NotStarted,
612 Running,
613 Terminal(TerminalKind),
614 Overwritten,
615}
616
617impl fmt::Display for SingleStateTag {
618 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
619 match self {
620 Self::NotStarted => write!(f, "not started"),
621 Self::Running => write!(f, "running"),
622 Self::Terminal(kind) => write!(f, "{kind}"),
623 Self::Overwritten => write!(f, "overwritten"),
624 }
625 }
626}
627
628#[derive(Clone, Debug)]
629enum ApplyReportResult {
630 NotStarted,
631 Running(Duration),
632 Terminal(ExecutionTerminalInfo),
633 Overwritten,
634}
635
636#[cfg(test)]
637mod tests {
638 use super::*;
639 use oxide_update_engine_test_utils::{
640 GenerateTestEventsKind, generate_test_events,
641 };
642 use oxide_update_engine_types::{buffer::EventBuffer, events::EventReport};
643
644 #[tokio::test]
645 async fn test_stats() {
646 let log = slog::Logger::root(slog::Discard, slog::o!());
647 let generated_completed =
649 generate_test_events(&log, GenerateTestEventsKind::Completed).await;
650 let generated_failed =
651 generate_test_events(&log, GenerateTestEventsKind::Failed).await;
652 let generated_aborted =
653 generate_test_events(&log, GenerateTestEventsKind::Aborted).await;
654
655 let mut group_display = GroupDisplay::new_with_display(
657 &log,
658 vec![
659 GroupDisplayKey::Completed,
660 GroupDisplayKey::Failed,
661 GroupDisplayKey::Aborted,
662 GroupDisplayKey::Overwritten,
663 ],
664 std::io::stdout(),
665 );
666
667 let mut expected_stats = GroupDisplayStats {
668 total: 4,
669 not_started: 4,
670 running: 0,
671 completed: 0,
672 failed: 0,
673 aborted: 0,
674 overwritten: 0,
675 };
676 assert_eq!(group_display.stats(), &expected_stats);
677 assert!(!expected_stats.is_terminal());
678 assert!(!expected_stats.has_failures());
679
680 group_display
684 .add_event_report(
685 &GroupDisplayKey::Completed,
686 EventReport::default(),
687 )
688 .unwrap();
689 assert_eq!(group_display.stats(), &expected_stats);
690
691 {
694 expected_stats.not_started -= 1;
695 expected_stats.running += 1;
696
697 let n = generated_completed.len();
698
699 let mut buffer = EventBuffer::default();
700 let mut last_seen = None;
701
702 for (i, event) in
703 generated_completed.clone().into_iter().enumerate()
704 {
705 buffer.add_event(event);
706 let report = buffer.generate_report_since(&mut last_seen);
707 group_display
708 .add_event_report(&GroupDisplayKey::Completed, report)
709 .unwrap();
710 if i == n - 1 {
711 expected_stats.running -= 1;
714 expected_stats.completed += 1;
715 } else {
716 }
718 assert_eq!(group_display.stats(), &expected_stats);
719 assert!(!expected_stats.is_terminal());
720 assert!(!expected_stats.has_failures());
721 }
722 }
723
724 {
727 expected_stats.not_started -= 1;
728 expected_stats.running += 1;
729
730 let n = generated_failed.len();
731
732 let mut buffer = EventBuffer::default();
733 for (i, event) in generated_failed.clone().into_iter().enumerate() {
734 buffer.add_event(event);
735 let report = buffer.generate_report();
736 group_display
737 .add_event_report(&GroupDisplayKey::Failed, report)
738 .unwrap();
739 if i == n - 1 {
740 expected_stats.running -= 1;
742 expected_stats.failed += 1;
743 assert!(expected_stats.has_failures());
744 } else {
745 assert!(!expected_stats.has_failures());
747 }
748 assert_eq!(group_display.stats(), &expected_stats);
749 }
750 }
751
752 {
754 expected_stats.not_started -= 1;
755 expected_stats.running += 1;
756
757 let mut buffer = EventBuffer::default();
758 for event in generated_aborted {
759 buffer.add_event(event);
760 }
761 let report = buffer.generate_report();
762 group_display
763 .add_event_report(&GroupDisplayKey::Aborted, report)
764 .unwrap();
765 expected_stats.running -= 1;
767 expected_stats.aborted += 1;
768 assert_eq!(group_display.stats(), &expected_stats);
769
770 let mut buffer = EventBuffer::default();
774 buffer.add_event(generated_failed.first().unwrap().clone());
775 let report = buffer.generate_report();
776 group_display
777 .add_event_report(&GroupDisplayKey::Aborted, report)
778 .unwrap();
779 assert_eq!(group_display.stats(), &expected_stats);
780 }
781
782 {
786 expected_stats.not_started -= 1;
787 expected_stats.running += 1;
788
789 let mut buffer = EventBuffer::default();
790 let n = generated_completed.len() / 2;
791 for event in generated_completed.into_iter().take(n) {
792 buffer.add_event(event);
793 }
794 let report = buffer.generate_report();
795 group_display
796 .add_event_report(&GroupDisplayKey::Overwritten, report)
797 .unwrap();
798 assert_eq!(group_display.stats(), &expected_stats);
799
800 let mut buffer = EventBuffer::default();
803 buffer.add_event(generated_failed.first().unwrap().clone());
804 let report = buffer.generate_report();
805 group_display
806 .add_event_report(&GroupDisplayKey::Overwritten, report)
807 .unwrap();
808 expected_stats.running -= 1;
811 expected_stats.overwritten += 1;
812 }
813
814 assert!(expected_stats.has_failures());
815 assert!(expected_stats.is_terminal());
816 }
817
818 #[derive(Debug, Eq, PartialEq, Ord, PartialOrd)]
819 enum GroupDisplayKey {
820 Completed,
821 Failed,
822 Aborted,
823 Overwritten,
824 }
825
826 impl fmt::Display for GroupDisplayKey {
827 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
828 match self {
829 Self::Completed => write!(f, "completed"),
830 Self::Failed => write!(f, "failed"),
831 Self::Aborted => write!(f, "aborted"),
832 Self::Overwritten => write!(f, "overwritten"),
833 }
834 }
835 }
836}