Skip to main content

rch_common/ui/progress/
pipeline.rs

1//! PipelineProgress - Multi-stage operation visualization.
2//!
3//! Renders a tree-like display of pipeline stages with:
4//! - Stage-by-stage progress with checkmarks for completed stages
5//! - Current stage highlighted with timing
6//! - Stage timing breakdown (completed stages show duration)
7//! - Overall elapsed time and ETA
8//! - Error indication at failed stage with subsequent stages grayed
9
10use crate::ui::{Icons, OutputContext, ProgressContext};
11use std::time::{Duration, Instant};
12
/// The steps of an RCH compile run, declared in the order they execute.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum PipelineStage {
    /// Analysing the workspace (file enumeration).
    WorkspaceAnalysis,
    /// Sending the workspace to the worker (rsync).
    Upload,
    /// Compiling on the remote worker (cargo).
    Compilation,
    /// Fetching build artifacts back (rsync download).
    ArtifactRetrieval,
    /// Updating the cache (optional step).
    CacheUpdate,
}
27
28impl PipelineStage {
29    /// Get the display label for this stage.
30    #[must_use]
31    pub fn label(self) -> &'static str {
32        match self {
33            Self::WorkspaceAnalysis => "Workspace analysis",
34            Self::Upload => "Upload to worker",
35            Self::Compilation => "Remote compilation",
36            Self::ArtifactRetrieval => "Artifact retrieval",
37            Self::CacheUpdate => "Cache update",
38        }
39    }
40
41    /// Get the short label for compact display.
42    #[must_use]
43    pub fn short_label(self) -> &'static str {
44        match self {
45            Self::WorkspaceAnalysis => "Workspace",
46            Self::Upload => "Upload",
47            Self::Compilation => "Compile",
48            Self::ArtifactRetrieval => "Download",
49            Self::CacheUpdate => "Cache",
50        }
51    }
52
53    /// Get all stages in order.
54    #[must_use]
55    pub fn all() -> &'static [PipelineStage] {
56        &[
57            Self::WorkspaceAnalysis,
58            Self::Upload,
59            Self::Compilation,
60            Self::ArtifactRetrieval,
61            Self::CacheUpdate,
62        ]
63    }
64
65    /// Get the index of this stage (0-based).
66    #[must_use]
67    pub fn index(self) -> usize {
68        match self {
69            Self::WorkspaceAnalysis => 0,
70            Self::Upload => 1,
71            Self::Compilation => 2,
72            Self::ArtifactRetrieval => 3,
73            Self::CacheUpdate => 4,
74        }
75    }
76}
77
/// Lifecycle state of one pipeline stage.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum StageStatus {
    /// The stage has not been started yet (the default state).
    #[default]
    Pending,
    /// The stage is running right now.
    InProgress,
    /// The stage finished successfully.
    Completed,
    /// The stage was skipped (e.g., cache hit).
    Skipped,
    /// The stage ended with an error.
    Failed,
}
93
94impl StageStatus {
95    fn icon(self, ctx: OutputContext) -> &'static str {
96        match self {
97            Self::Pending => Icons::bullet_hollow(ctx),
98            Self::InProgress => Icons::hourglass(ctx),
99            Self::Completed => Icons::check(ctx),
100            Self::Skipped => Icons::status_disabled(ctx),
101            Self::Failed => Icons::cross(ctx),
102        }
103    }
104}
105
106/// Information about a single stage's execution.
107#[derive(Debug, Clone)]
108struct StageInfo {
109    status: StageStatus,
110    start_time: Option<Instant>,
111    duration: Option<Duration>,
112    detail: Option<String>,
113    skip_reason: Option<String>,
114    error_message: Option<String>,
115}
116
117impl Default for StageInfo {
118    fn default() -> Self {
119        Self {
120            status: StageStatus::Pending,
121            start_time: None,
122            duration: None,
123            detail: None,
124            skip_reason: None,
125            error_message: None,
126        }
127    }
128}
129
130/// Progress display for multi-stage pipeline operations.
131///
132/// Tracks and renders progress through the RCH compile pipeline:
133/// 1. Workspace analysis (file enumeration)
134/// 2. Upload to worker (rsync)
135/// 3. Remote compilation (cargo)
136/// 4. Artifact retrieval (rsync)
137/// 5. Cache update (optional)
138///
139/// # Example
140///
141/// ```ignore
142/// use rch_common::ui::{OutputContext, PipelineProgress, PipelineStage};
143///
144/// let ctx = OutputContext::detect();
145/// let mut pipeline = PipelineProgress::new(ctx, "worker1", false);
146///
147/// pipeline.start_stage(PipelineStage::WorkspaceAnalysis);
148/// pipeline.set_stage_detail("425 files");
149/// pipeline.complete_stage();
150///
151/// pipeline.start_stage(PipelineStage::Upload);
152/// pipeline.set_stage_detail("78.1 MB");
153/// pipeline.complete_stage();
154///
155/// pipeline.finish();
156/// ```
157#[derive(Debug)]
158pub struct PipelineProgress {
159    ctx: OutputContext,
160    worker: String,
161    enabled: bool,
162    progress: Option<ProgressContext>,
163    start: Instant,
164    stages: Vec<StageInfo>,
165    current_stage: Option<PipelineStage>,
166    cache_saved_time: Option<Duration>,
167}
168
169impl PipelineProgress {
170    /// Create a new pipeline progress display.
171    #[must_use]
172    pub fn new(ctx: OutputContext, worker: impl Into<String>, quiet: bool) -> Self {
173        let enabled = !quiet && !ctx.is_machine();
174        let progress = if enabled && matches!(ctx, OutputContext::Interactive) {
175            Some(ProgressContext::new(ctx))
176        } else {
177            None
178        };
179
180        let stages = PipelineStage::all()
181            .iter()
182            .map(|_| StageInfo::default())
183            .collect();
184
185        Self {
186            ctx,
187            worker: worker.into(),
188            enabled,
189            progress,
190            start: Instant::now(),
191            stages,
192            current_stage: None,
193            cache_saved_time: None,
194        }
195    }
196
197    /// Start a new pipeline stage.
198    pub fn start_stage(&mut self, stage: PipelineStage) {
199        let idx = stage.index();
200        if idx < self.stages.len() {
201            self.stages[idx].status = StageStatus::InProgress;
202            self.stages[idx].start_time = Some(Instant::now());
203            self.current_stage = Some(stage);
204        }
205        self.render();
206    }
207
208    /// Set detail text for the current stage (e.g., "425 files", "78.1 MB").
209    pub fn set_stage_detail(&mut self, detail: impl Into<String>) {
210        if let Some(stage) = self.current_stage {
211            let idx = stage.index();
212            if idx < self.stages.len() {
213                self.stages[idx].detail = Some(detail.into());
214            }
215        }
216        self.render();
217    }
218
219    /// Update the current stage detail (rate-limited render).
220    pub fn update_detail(&mut self, detail: impl Into<String>) {
221        if let Some(stage) = self.current_stage {
222            let idx = stage.index();
223            if idx < self.stages.len() {
224                self.stages[idx].detail = Some(detail.into());
225            }
226        }
227        self.render();
228    }
229
230    /// Complete the current stage successfully.
231    pub fn complete_stage(&mut self) {
232        if let Some(stage) = self.current_stage.take() {
233            let idx = stage.index();
234            if idx < self.stages.len() {
235                let info = &mut self.stages[idx];
236                info.status = StageStatus::Completed;
237                info.duration = info.start_time.map(|start| start.elapsed());
238            }
239        }
240        self.render();
241    }
242
243    /// Skip a stage with a reason.
244    pub fn skip_stage(&mut self, stage: PipelineStage, reason: impl Into<String>) {
245        let idx = stage.index();
246        if idx < self.stages.len() {
247            self.stages[idx].status = StageStatus::Skipped;
248            self.stages[idx].skip_reason = Some(reason.into());
249        }
250        // Clear current if it matches
251        if self.current_stage == Some(stage) {
252            self.current_stage = None;
253        }
254        self.render();
255    }
256
257    /// Mark the current stage as failed.
258    pub fn fail_stage(&mut self, error: impl Into<String>) {
259        if let Some(stage) = self.current_stage.take() {
260            let idx = stage.index();
261            if idx < self.stages.len() {
262                let info = &mut self.stages[idx];
263                info.status = StageStatus::Failed;
264                info.duration = info.start_time.map(|start| start.elapsed());
265                info.error_message = Some(error.into());
266            }
267        }
268        self.render();
269    }
270
271    /// Set time saved due to cache hit.
272    pub fn set_cache_saved_time(&mut self, saved: Duration) {
273        self.cache_saved_time = Some(saved);
274    }
275
276    /// Get the total elapsed time.
277    #[must_use]
278    pub fn elapsed(&self) -> Duration {
279        self.start.elapsed()
280    }
281
282    /// Check if any stage has failed.
283    #[must_use]
284    pub fn has_failed(&self) -> bool {
285        self.stages.iter().any(|s| s.status == StageStatus::Failed)
286    }
287
288    /// Get the current stage if any.
289    #[must_use]
290    pub fn current_stage(&self) -> Option<PipelineStage> {
291        self.current_stage
292    }
293
294    /// Calculate estimated time remaining based on completed stages.
295    fn estimate_remaining(&self) -> Option<Duration> {
296        // Simple estimation: average completed stage time * remaining stages
297        let completed: Vec<Duration> = self
298            .stages
299            .iter()
300            .filter_map(|s| {
301                if s.status == StageStatus::Completed {
302                    s.duration
303                } else {
304                    None
305                }
306            })
307            .collect();
308
309        if completed.is_empty() {
310            return None;
311        }
312
313        // Safe: completed is non-empty so len() >= 1; try_into maps any usize >= 1 to u32 >= 1.
314        let completed_count: u32 = completed.len().try_into().unwrap_or(u32::MAX);
315        let avg_duration: Duration = completed.iter().sum::<Duration>() / completed_count;
316
317        let remaining_count: u32 = self
318            .stages
319            .iter()
320            .filter(|s| matches!(s.status, StageStatus::Pending | StageStatus::InProgress))
321            .count()
322            .try_into()
323            .unwrap_or(u32::MAX);
324
325        if remaining_count == 0 {
326            return None;
327        }
328
329        Some(avg_duration.saturating_mul(remaining_count))
330    }
331
332    /// Render the pipeline progress display.
333    fn render(&mut self) {
334        if !self.enabled {
335            return;
336        }
337
338        let elapsed = format_duration(self.start.elapsed());
339        let eta = self
340            .estimate_remaining()
341            .map(|d| format!("~{}", format_duration(d)))
342            .unwrap_or_else(|| "--".to_string());
343
344        // Build compact single-line display (for future multi-line rendering)
345        let _stages_display: Vec<String> = PipelineStage::all()
346            .iter()
347            .map(|stage| {
348                let idx = stage.index();
349                let info = &self.stages[idx];
350                let icon = info.status.icon(self.ctx);
351                let label = stage.short_label();
352
353                match info.status {
354                    StageStatus::Completed => {
355                        let dur = info
356                            .duration
357                            .map(format_duration)
358                            .unwrap_or_else(|| "-".to_string());
359                        let detail = info
360                            .detail
361                            .as_ref()
362                            .map(|d| format!(" ({d})"))
363                            .unwrap_or_default();
364                        format!("{icon} {label} {dur}{detail}")
365                    }
366                    StageStatus::InProgress => {
367                        let dur = info
368                            .start_time
369                            .map(|s| format_duration(s.elapsed()))
370                            .unwrap_or_else(|| "-".to_string());
371                        let detail = info
372                            .detail
373                            .as_ref()
374                            .map(|d| format!(" ({d})"))
375                            .unwrap_or_default();
376                        format!("{icon} {label} {dur}{detail}")
377                    }
378                    StageStatus::Skipped => {
379                        let reason = info
380                            .skip_reason
381                            .as_ref()
382                            .map(|r| format!(" ({r})"))
383                            .unwrap_or_default();
384                        format!("{icon} {label}{reason}")
385                    }
386                    StageStatus::Failed => {
387                        let dur = info
388                            .duration
389                            .map(format_duration)
390                            .unwrap_or_else(|| "-".to_string());
391                        format!("{icon} {label} {dur} FAILED")
392                    }
393                    StageStatus::Pending => {
394                        format!("{icon} {label}")
395                    }
396                }
397            })
398            .collect();
399
400        let line = format!(
401            "Pipeline [{}/{}] {} | {} | ETA {}",
402            self.count_completed(),
403            PipelineStage::all().len(),
404            self.worker,
405            elapsed,
406            eta
407        );
408
409        if let Some(progress) = &mut self.progress {
410            progress.render(&line);
411        }
412    }
413
414    fn count_completed(&self) -> usize {
415        self.stages
416            .iter()
417            .filter(|s| matches!(s.status, StageStatus::Completed | StageStatus::Skipped))
418            .count()
419    }
420
421    /// Clear progress display.
422    pub fn clear(&self) {
423        if let Some(progress) = &self.progress {
424            progress.clear();
425        }
426    }
427
428    /// Finish the pipeline and print summary.
429    pub fn finish(&mut self) {
430        self.clear();
431
432        if !self.enabled {
433            return;
434        }
435
436        let duration = self.start.elapsed();
437        let icon = if self.has_failed() {
438            Icons::cross(self.ctx)
439        } else {
440            Icons::check(self.ctx)
441        };
442
443        let status = if self.has_failed() {
444            "failed"
445        } else {
446            "completed"
447        };
448        let elapsed = format_duration(duration);
449
450        // Build summary line
451        let mut summary = format!("{icon} Pipeline {status} on {} in {elapsed}", self.worker);
452
453        // Add cache savings if applicable
454        if let Some(saved) = self.cache_saved_time {
455            let saved_str = format_duration(saved);
456            summary.push_str(&format!(" (cache saved ~{saved_str})"));
457        }
458
459        eprintln!("{summary}");
460
461        // Print stage breakdown if verbose
462        for stage in PipelineStage::all() {
463            let idx = stage.index();
464            let info = &self.stages[idx];
465            let icon = info.status.icon(self.ctx);
466            let label = stage.label();
467
468            match info.status {
469                StageStatus::Completed => {
470                    let dur = info
471                        .duration
472                        .map(format_duration)
473                        .unwrap_or_else(|| "-".to_string());
474                    let detail = info
475                        .detail
476                        .as_ref()
477                        .map(|d| format!("  ({d})"))
478                        .unwrap_or_default();
479                    eprintln!("   {icon} {label:<22} {dur:>8}{detail}");
480                }
481                StageStatus::Skipped => {
482                    let reason = info
483                        .skip_reason
484                        .as_ref()
485                        .map(|r| format!("  ({r})"))
486                        .unwrap_or_default();
487                    eprintln!("   {icon} {label:<22} skipped{reason}");
488                }
489                StageStatus::Failed => {
490                    let dur = info
491                        .duration
492                        .map(format_duration)
493                        .unwrap_or_else(|| "-".to_string());
494                    let error = info
495                        .error_message
496                        .as_ref()
497                        .map(|e| format!("  ({e})"))
498                        .unwrap_or_default();
499                    eprintln!("   {icon} {label:<22} {dur:>8} FAILED{error}");
500                }
501                _ => {}
502            }
503        }
504    }
505
506    /// Finish with error summary.
507    pub fn finish_error(&mut self, error: &str) {
508        self.clear();
509
510        if !self.enabled {
511            return;
512        }
513
514        let icon = Icons::cross(self.ctx);
515        let elapsed = format_duration(self.start.elapsed());
516
517        eprintln!(
518            "{icon} Pipeline failed on {} after {elapsed}: {error}",
519            self.worker
520        );
521
522        // Show completed stages for debugging
523        for stage in PipelineStage::all() {
524            let idx = stage.index();
525            let info = &self.stages[idx];
526            if info.status == StageStatus::Completed || info.status == StageStatus::Failed {
527                let stage_icon = info.status.icon(self.ctx);
528                let label = stage.label();
529                let dur = info
530                    .duration
531                    .map(format_duration)
532                    .unwrap_or_else(|| "-".to_string());
533                eprintln!("   {stage_icon} {label:<22} {dur:>8}");
534            }
535        }
536    }
537}
538
/// Format a duration for compact, human-readable display.
///
/// - under 100 ms: whole milliseconds, e.g. `50ms`
/// - under one minute: seconds with one decimal, e.g. `5.0s`
/// - under one hour: `M:SS`, e.g. `1:30`
/// - otherwise: `H:MM:SS`, e.g. `1:01:05`
///
/// Durations just below one minute that would round up to `60.0s` are
/// shown as `1:00`, so the display never shows sixty seconds in the
/// seconds format.
fn format_duration(duration: Duration) -> String {
    let total_secs = duration.as_secs();

    if total_secs < 60 {
        let millis = duration.as_millis();
        if millis < 100 {
            return format!("{millis}ms");
        }
        let text = format!("{:.1}s", duration.as_secs_f64());
        // One-decimal rounding can carry into a full minute (e.g. 59.96s).
        if text == "60.0s" {
            return "1:00".to_string();
        }
        return text;
    }

    let hours = total_secs / 3600;
    let mins = (total_secs % 3600) / 60;
    let secs = total_secs % 60;
    if hours == 0 {
        format!("{mins}:{secs:02}")
    } else {
        format!("{hours}:{mins:02}:{secs:02}")
    }
}
561
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a pipeline in quiet mode so that nothing is rendered or printed.
    fn quiet_pipeline(worker: &str) -> PipelineProgress {
        PipelineProgress::new(OutputContext::Plain, worker, true)
    }

    // -------------------------------------------------------------------------
    // PipelineStage ordering and indices
    // -------------------------------------------------------------------------

    #[test]
    fn pipeline_stage_ordering() {
        let all = PipelineStage::all();
        assert_eq!(all.len(), 5);
        assert_eq!(all[0], PipelineStage::WorkspaceAnalysis);
        assert_eq!(all[4], PipelineStage::CacheUpdate);
    }

    #[test]
    fn pipeline_stage_indices() {
        let expected = [
            (PipelineStage::WorkspaceAnalysis, 0),
            (PipelineStage::Upload, 1),
            (PipelineStage::Compilation, 2),
            (PipelineStage::ArtifactRetrieval, 3),
            (PipelineStage::CacheUpdate, 4),
        ];
        for (stage, index) in expected {
            assert_eq!(stage.index(), index);
        }
    }

    // -------------------------------------------------------------------------
    // StageStatus icons in plain output
    // -------------------------------------------------------------------------

    #[test]
    fn stage_status_icons_ascii() {
        let plain = OutputContext::Plain;
        assert_eq!(StageStatus::Pending.icon(plain), "o");
        assert_eq!(StageStatus::Completed.icon(plain), "[OK]");
        assert_eq!(StageStatus::Failed.icon(plain), "[FAIL]");
    }

    // -------------------------------------------------------------------------
    // Stage transitions
    // -------------------------------------------------------------------------

    #[test]
    fn pipeline_progress_stages() {
        let mut pipeline = quiet_pipeline("test-worker");

        assert_eq!(pipeline.current_stage(), None);
        assert!(!pipeline.has_failed());

        pipeline.start_stage(PipelineStage::WorkspaceAnalysis);
        assert_eq!(
            pipeline.current_stage(),
            Some(PipelineStage::WorkspaceAnalysis)
        );

        pipeline.set_stage_detail("100 files");
        pipeline.complete_stage();
        assert_eq!(pipeline.current_stage(), None);

        assert_eq!(pipeline.count_completed(), 1);
    }

    #[test]
    fn pipeline_skip_stage() {
        let mut pipeline = quiet_pipeline("worker");

        pipeline.skip_stage(PipelineStage::Upload, "cache hit");

        let upload = &pipeline.stages[1];
        assert_eq!(upload.status, StageStatus::Skipped);
        assert_eq!(upload.skip_reason.as_deref(), Some("cache hit"));
    }

    #[test]
    fn pipeline_fail_stage() {
        let mut pipeline = quiet_pipeline("worker");

        pipeline.start_stage(PipelineStage::Compilation);
        pipeline.fail_stage("build error");

        assert!(pipeline.has_failed());
        let compilation = &pipeline.stages[2];
        assert_eq!(compilation.status, StageStatus::Failed);
        assert_eq!(compilation.error_message.as_deref(), Some("build error"));
    }

    // -------------------------------------------------------------------------
    // format_duration
    // -------------------------------------------------------------------------

    #[test]
    fn format_duration_milliseconds() {
        assert_eq!(format_duration(Duration::from_millis(50)), "50ms");
        assert_eq!(format_duration(Duration::from_millis(250)), "0.2s");
    }

    #[test]
    fn format_duration_seconds() {
        assert_eq!(format_duration(Duration::from_secs(5)), "5.0s");
        assert_eq!(format_duration(Duration::from_secs(45)), "45.0s");
    }

    #[test]
    fn format_duration_minutes() {
        assert_eq!(format_duration(Duration::from_secs(90)), "1:30");
        assert_eq!(format_duration(Duration::from_secs(605)), "10:05");
    }

    #[test]
    fn format_duration_hours() {
        assert_eq!(format_duration(Duration::from_secs(3665)), "1:01:05");
    }

    // -------------------------------------------------------------------------
    // Estimation and completion counting
    // -------------------------------------------------------------------------

    #[test]
    fn estimate_remaining_no_completed() {
        let pipeline = quiet_pipeline("worker");
        assert_eq!(pipeline.estimate_remaining(), None);
    }

    #[test]
    fn count_completed_with_skipped() {
        let mut pipeline = quiet_pipeline("worker");

        pipeline.start_stage(PipelineStage::WorkspaceAnalysis);
        pipeline.complete_stage();
        pipeline.skip_stage(PipelineStage::Upload, "cache hit");

        // A skipped stage counts as done alongside the completed one.
        assert_eq!(pipeline.count_completed(), 2);
    }

    // -------------------------------------------------------------------------
    // PipelineStage label tests
    // -------------------------------------------------------------------------

    #[test]
    fn pipeline_stage_labels() {
        let expected = [
            (PipelineStage::WorkspaceAnalysis, "Workspace analysis"),
            (PipelineStage::Upload, "Upload to worker"),
            (PipelineStage::Compilation, "Remote compilation"),
            (PipelineStage::ArtifactRetrieval, "Artifact retrieval"),
            (PipelineStage::CacheUpdate, "Cache update"),
        ];
        for (stage, label) in expected {
            assert_eq!(stage.label(), label);
        }
    }

    #[test]
    fn pipeline_stage_short_labels() {
        let expected = [
            (PipelineStage::WorkspaceAnalysis, "Workspace"),
            (PipelineStage::Upload, "Upload"),
            (PipelineStage::Compilation, "Compile"),
            (PipelineStage::ArtifactRetrieval, "Download"),
            (PipelineStage::CacheUpdate, "Cache"),
        ];
        for (stage, label) in expected {
            assert_eq!(stage.short_label(), label);
        }
    }

    #[test]
    fn stage_status_in_progress_icon() {
        assert_eq!(StageStatus::InProgress.icon(OutputContext::Plain), "[...]");
    }

    #[test]
    fn stage_status_skipped_icon() {
        assert_eq!(StageStatus::Skipped.icon(OutputContext::Plain), "[x]");
    }

    #[test]
    fn stage_status_default() {
        assert_eq!(StageStatus::default(), StageStatus::Pending);
    }

    // -------------------------------------------------------------------------
    // Construction flags
    // -------------------------------------------------------------------------

    #[test]
    fn pipeline_progress_worker_info() {
        let pipeline = PipelineProgress::new(OutputContext::Plain, "my-worker", false);
        assert_eq!(pipeline.worker, "my-worker");
        // Not quiet and not a machine context, so output is enabled.
        assert!(pipeline.enabled);
    }

    #[test]
    fn pipeline_progress_quiet_mode() {
        let pipeline = quiet_pipeline("worker");
        // Quiet mode turns output off.
        assert!(!pipeline.enabled);
    }

    // -------------------------------------------------------------------------
    // format_duration boundaries
    // -------------------------------------------------------------------------

    #[test]
    fn format_duration_zero() {
        assert_eq!(format_duration(Duration::ZERO), "0ms");
    }

    #[test]
    fn format_duration_exact_minute() {
        assert_eq!(format_duration(Duration::from_secs(60)), "1:00");
    }

    #[test]
    fn format_duration_exact_hour() {
        assert_eq!(format_duration(Duration::from_secs(3600)), "1:00:00");
    }
}