1use crate::ui::{Icons, OutputContext, ProgressContext};
11use std::time::{Duration, Instant};
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
15pub enum PipelineStage {
16 WorkspaceAnalysis,
18 Upload,
20 Compilation,
22 ArtifactRetrieval,
24 CacheUpdate,
26}
27
28impl PipelineStage {
29 #[must_use]
31 pub fn label(self) -> &'static str {
32 match self {
33 Self::WorkspaceAnalysis => "Workspace analysis",
34 Self::Upload => "Upload to worker",
35 Self::Compilation => "Remote compilation",
36 Self::ArtifactRetrieval => "Artifact retrieval",
37 Self::CacheUpdate => "Cache update",
38 }
39 }
40
41 #[must_use]
43 pub fn short_label(self) -> &'static str {
44 match self {
45 Self::WorkspaceAnalysis => "Workspace",
46 Self::Upload => "Upload",
47 Self::Compilation => "Compile",
48 Self::ArtifactRetrieval => "Download",
49 Self::CacheUpdate => "Cache",
50 }
51 }
52
53 #[must_use]
55 pub fn all() -> &'static [PipelineStage] {
56 &[
57 Self::WorkspaceAnalysis,
58 Self::Upload,
59 Self::Compilation,
60 Self::ArtifactRetrieval,
61 Self::CacheUpdate,
62 ]
63 }
64
65 #[must_use]
67 pub fn index(self) -> usize {
68 match self {
69 Self::WorkspaceAnalysis => 0,
70 Self::Upload => 1,
71 Self::Compilation => 2,
72 Self::ArtifactRetrieval => 3,
73 Self::CacheUpdate => 4,
74 }
75 }
76}
77
78#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
80pub enum StageStatus {
81 #[default]
83 Pending,
84 InProgress,
86 Completed,
88 Skipped,
90 Failed,
92}
93
94impl StageStatus {
95 fn icon(self, ctx: OutputContext) -> &'static str {
96 match self {
97 Self::Pending => Icons::bullet_hollow(ctx),
98 Self::InProgress => Icons::hourglass(ctx),
99 Self::Completed => Icons::check(ctx),
100 Self::Skipped => Icons::status_disabled(ctx),
101 Self::Failed => Icons::cross(ctx),
102 }
103 }
104}
105
106#[derive(Debug, Clone)]
108struct StageInfo {
109 status: StageStatus,
110 start_time: Option<Instant>,
111 duration: Option<Duration>,
112 detail: Option<String>,
113 skip_reason: Option<String>,
114 error_message: Option<String>,
115}
116
117impl Default for StageInfo {
118 fn default() -> Self {
119 Self {
120 status: StageStatus::Pending,
121 start_time: None,
122 duration: None,
123 detail: None,
124 skip_reason: None,
125 error_message: None,
126 }
127 }
128}
129
130#[derive(Debug)]
158pub struct PipelineProgress {
159 ctx: OutputContext,
160 worker: String,
161 enabled: bool,
162 progress: Option<ProgressContext>,
163 start: Instant,
164 stages: Vec<StageInfo>,
165 current_stage: Option<PipelineStage>,
166 cache_saved_time: Option<Duration>,
167}
168
169impl PipelineProgress {
170 #[must_use]
172 pub fn new(ctx: OutputContext, worker: impl Into<String>, quiet: bool) -> Self {
173 let enabled = !quiet && !ctx.is_machine();
174 let progress = if enabled && matches!(ctx, OutputContext::Interactive) {
175 Some(ProgressContext::new(ctx))
176 } else {
177 None
178 };
179
180 let stages = PipelineStage::all()
181 .iter()
182 .map(|_| StageInfo::default())
183 .collect();
184
185 Self {
186 ctx,
187 worker: worker.into(),
188 enabled,
189 progress,
190 start: Instant::now(),
191 stages,
192 current_stage: None,
193 cache_saved_time: None,
194 }
195 }
196
197 pub fn start_stage(&mut self, stage: PipelineStage) {
199 let idx = stage.index();
200 if idx < self.stages.len() {
201 self.stages[idx].status = StageStatus::InProgress;
202 self.stages[idx].start_time = Some(Instant::now());
203 self.current_stage = Some(stage);
204 }
205 self.render();
206 }
207
208 pub fn set_stage_detail(&mut self, detail: impl Into<String>) {
210 if let Some(stage) = self.current_stage {
211 let idx = stage.index();
212 if idx < self.stages.len() {
213 self.stages[idx].detail = Some(detail.into());
214 }
215 }
216 self.render();
217 }
218
219 pub fn update_detail(&mut self, detail: impl Into<String>) {
221 if let Some(stage) = self.current_stage {
222 let idx = stage.index();
223 if idx < self.stages.len() {
224 self.stages[idx].detail = Some(detail.into());
225 }
226 }
227 self.render();
228 }
229
230 pub fn complete_stage(&mut self) {
232 if let Some(stage) = self.current_stage.take() {
233 let idx = stage.index();
234 if idx < self.stages.len() {
235 let info = &mut self.stages[idx];
236 info.status = StageStatus::Completed;
237 info.duration = info.start_time.map(|start| start.elapsed());
238 }
239 }
240 self.render();
241 }
242
243 pub fn skip_stage(&mut self, stage: PipelineStage, reason: impl Into<String>) {
245 let idx = stage.index();
246 if idx < self.stages.len() {
247 self.stages[idx].status = StageStatus::Skipped;
248 self.stages[idx].skip_reason = Some(reason.into());
249 }
250 if self.current_stage == Some(stage) {
252 self.current_stage = None;
253 }
254 self.render();
255 }
256
257 pub fn fail_stage(&mut self, error: impl Into<String>) {
259 if let Some(stage) = self.current_stage.take() {
260 let idx = stage.index();
261 if idx < self.stages.len() {
262 let info = &mut self.stages[idx];
263 info.status = StageStatus::Failed;
264 info.duration = info.start_time.map(|start| start.elapsed());
265 info.error_message = Some(error.into());
266 }
267 }
268 self.render();
269 }
270
271 pub fn set_cache_saved_time(&mut self, saved: Duration) {
273 self.cache_saved_time = Some(saved);
274 }
275
276 #[must_use]
278 pub fn elapsed(&self) -> Duration {
279 self.start.elapsed()
280 }
281
282 #[must_use]
284 pub fn has_failed(&self) -> bool {
285 self.stages.iter().any(|s| s.status == StageStatus::Failed)
286 }
287
288 #[must_use]
290 pub fn current_stage(&self) -> Option<PipelineStage> {
291 self.current_stage
292 }
293
294 fn estimate_remaining(&self) -> Option<Duration> {
296 let completed: Vec<Duration> = self
298 .stages
299 .iter()
300 .filter_map(|s| {
301 if s.status == StageStatus::Completed {
302 s.duration
303 } else {
304 None
305 }
306 })
307 .collect();
308
309 if completed.is_empty() {
310 return None;
311 }
312
313 let completed_count: u32 = completed.len().try_into().unwrap_or(u32::MAX);
315 let avg_duration: Duration = completed.iter().sum::<Duration>() / completed_count;
316
317 let remaining_count: u32 = self
318 .stages
319 .iter()
320 .filter(|s| matches!(s.status, StageStatus::Pending | StageStatus::InProgress))
321 .count()
322 .try_into()
323 .unwrap_or(u32::MAX);
324
325 if remaining_count == 0 {
326 return None;
327 }
328
329 Some(avg_duration.saturating_mul(remaining_count))
330 }
331
332 fn render(&mut self) {
334 if !self.enabled {
335 return;
336 }
337
338 let elapsed = format_duration(self.start.elapsed());
339 let eta = self
340 .estimate_remaining()
341 .map(|d| format!("~{}", format_duration(d)))
342 .unwrap_or_else(|| "--".to_string());
343
344 let _stages_display: Vec<String> = PipelineStage::all()
346 .iter()
347 .map(|stage| {
348 let idx = stage.index();
349 let info = &self.stages[idx];
350 let icon = info.status.icon(self.ctx);
351 let label = stage.short_label();
352
353 match info.status {
354 StageStatus::Completed => {
355 let dur = info
356 .duration
357 .map(format_duration)
358 .unwrap_or_else(|| "-".to_string());
359 let detail = info
360 .detail
361 .as_ref()
362 .map(|d| format!(" ({d})"))
363 .unwrap_or_default();
364 format!("{icon} {label} {dur}{detail}")
365 }
366 StageStatus::InProgress => {
367 let dur = info
368 .start_time
369 .map(|s| format_duration(s.elapsed()))
370 .unwrap_or_else(|| "-".to_string());
371 let detail = info
372 .detail
373 .as_ref()
374 .map(|d| format!(" ({d})"))
375 .unwrap_or_default();
376 format!("{icon} {label} {dur}{detail}")
377 }
378 StageStatus::Skipped => {
379 let reason = info
380 .skip_reason
381 .as_ref()
382 .map(|r| format!(" ({r})"))
383 .unwrap_or_default();
384 format!("{icon} {label}{reason}")
385 }
386 StageStatus::Failed => {
387 let dur = info
388 .duration
389 .map(format_duration)
390 .unwrap_or_else(|| "-".to_string());
391 format!("{icon} {label} {dur} FAILED")
392 }
393 StageStatus::Pending => {
394 format!("{icon} {label}")
395 }
396 }
397 })
398 .collect();
399
400 let line = format!(
401 "Pipeline [{}/{}] {} | {} | ETA {}",
402 self.count_completed(),
403 PipelineStage::all().len(),
404 self.worker,
405 elapsed,
406 eta
407 );
408
409 if let Some(progress) = &mut self.progress {
410 progress.render(&line);
411 }
412 }
413
414 fn count_completed(&self) -> usize {
415 self.stages
416 .iter()
417 .filter(|s| matches!(s.status, StageStatus::Completed | StageStatus::Skipped))
418 .count()
419 }
420
421 pub fn clear(&self) {
423 if let Some(progress) = &self.progress {
424 progress.clear();
425 }
426 }
427
428 pub fn finish(&mut self) {
430 self.clear();
431
432 if !self.enabled {
433 return;
434 }
435
436 let duration = self.start.elapsed();
437 let icon = if self.has_failed() {
438 Icons::cross(self.ctx)
439 } else {
440 Icons::check(self.ctx)
441 };
442
443 let status = if self.has_failed() {
444 "failed"
445 } else {
446 "completed"
447 };
448 let elapsed = format_duration(duration);
449
450 let mut summary = format!("{icon} Pipeline {status} on {} in {elapsed}", self.worker);
452
453 if let Some(saved) = self.cache_saved_time {
455 let saved_str = format_duration(saved);
456 summary.push_str(&format!(" (cache saved ~{saved_str})"));
457 }
458
459 eprintln!("{summary}");
460
461 for stage in PipelineStage::all() {
463 let idx = stage.index();
464 let info = &self.stages[idx];
465 let icon = info.status.icon(self.ctx);
466 let label = stage.label();
467
468 match info.status {
469 StageStatus::Completed => {
470 let dur = info
471 .duration
472 .map(format_duration)
473 .unwrap_or_else(|| "-".to_string());
474 let detail = info
475 .detail
476 .as_ref()
477 .map(|d| format!(" ({d})"))
478 .unwrap_or_default();
479 eprintln!(" {icon} {label:<22} {dur:>8}{detail}");
480 }
481 StageStatus::Skipped => {
482 let reason = info
483 .skip_reason
484 .as_ref()
485 .map(|r| format!(" ({r})"))
486 .unwrap_or_default();
487 eprintln!(" {icon} {label:<22} skipped{reason}");
488 }
489 StageStatus::Failed => {
490 let dur = info
491 .duration
492 .map(format_duration)
493 .unwrap_or_else(|| "-".to_string());
494 let error = info
495 .error_message
496 .as_ref()
497 .map(|e| format!(" ({e})"))
498 .unwrap_or_default();
499 eprintln!(" {icon} {label:<22} {dur:>8} FAILED{error}");
500 }
501 _ => {}
502 }
503 }
504 }
505
506 pub fn finish_error(&mut self, error: &str) {
508 self.clear();
509
510 if !self.enabled {
511 return;
512 }
513
514 let icon = Icons::cross(self.ctx);
515 let elapsed = format_duration(self.start.elapsed());
516
517 eprintln!(
518 "{icon} Pipeline failed on {} after {elapsed}: {error}",
519 self.worker
520 );
521
522 for stage in PipelineStage::all() {
524 let idx = stage.index();
525 let info = &self.stages[idx];
526 if info.status == StageStatus::Completed || info.status == StageStatus::Failed {
527 let stage_icon = info.status.icon(self.ctx);
528 let label = stage.label();
529 let dur = info
530 .duration
531 .map(format_duration)
532 .unwrap_or_else(|| "-".to_string());
533 eprintln!(" {stage_icon} {label:<22} {dur:>8}");
534 }
535 }
536 }
537}
538
539fn format_duration(duration: Duration) -> String {
540 let total_secs = duration.as_secs();
541 if total_secs == 0 {
542 let ms = duration.as_millis();
543 if ms < 100 {
544 return format!("{ms}ms");
545 }
546 return format!("{:.1}s", duration.as_secs_f64());
547 }
548 if total_secs < 60 {
549 format!("{:.1}s", duration.as_secs_f64())
550 } else if total_secs < 3600 {
551 let mins = total_secs / 60;
552 let secs = total_secs % 60;
553 format!("{mins}:{secs:02}")
554 } else {
555 let hours = total_secs / 3600;
556 let mins = (total_secs % 3600) / 60;
557 let secs = total_secs % 60;
558 format!("{hours}:{mins:02}:{secs:02}")
559 }
560}
561
562#[cfg(test)]
563mod tests {
564 use super::*;
565
566 #[test]
567 fn pipeline_stage_ordering() {
568 let stages = PipelineStage::all();
569 assert_eq!(stages.len(), 5);
570 assert_eq!(stages[0], PipelineStage::WorkspaceAnalysis);
571 assert_eq!(stages[4], PipelineStage::CacheUpdate);
572 }
573
574 #[test]
575 fn pipeline_stage_indices() {
576 assert_eq!(PipelineStage::WorkspaceAnalysis.index(), 0);
577 assert_eq!(PipelineStage::Upload.index(), 1);
578 assert_eq!(PipelineStage::Compilation.index(), 2);
579 assert_eq!(PipelineStage::ArtifactRetrieval.index(), 3);
580 assert_eq!(PipelineStage::CacheUpdate.index(), 4);
581 }
582
583 #[test]
584 fn stage_status_icons_ascii() {
585 let ctx = OutputContext::Plain;
586 assert_eq!(StageStatus::Pending.icon(ctx), "o");
587 assert_eq!(StageStatus::Completed.icon(ctx), "[OK]");
588 assert_eq!(StageStatus::Failed.icon(ctx), "[FAIL]");
589 }
590
591 #[test]
592 fn pipeline_progress_stages() {
593 let ctx = OutputContext::Plain;
594 let mut pipeline = PipelineProgress::new(ctx, "test-worker", true);
595
596 assert!(pipeline.current_stage().is_none());
597 assert!(!pipeline.has_failed());
598
599 pipeline.start_stage(PipelineStage::WorkspaceAnalysis);
600 assert_eq!(
601 pipeline.current_stage(),
602 Some(PipelineStage::WorkspaceAnalysis)
603 );
604
605 pipeline.set_stage_detail("100 files");
606 pipeline.complete_stage();
607 assert!(pipeline.current_stage().is_none());
608
609 assert_eq!(pipeline.count_completed(), 1);
610 }
611
612 #[test]
613 fn pipeline_skip_stage() {
614 let ctx = OutputContext::Plain;
615 let mut pipeline = PipelineProgress::new(ctx, "worker", true);
616
617 pipeline.skip_stage(PipelineStage::Upload, "cache hit");
618 assert_eq!(pipeline.stages[1].status, StageStatus::Skipped);
619 assert_eq!(pipeline.stages[1].skip_reason.as_deref(), Some("cache hit"));
620 }
621
622 #[test]
623 fn pipeline_fail_stage() {
624 let ctx = OutputContext::Plain;
625 let mut pipeline = PipelineProgress::new(ctx, "worker", true);
626
627 pipeline.start_stage(PipelineStage::Compilation);
628 pipeline.fail_stage("build error");
629
630 assert!(pipeline.has_failed());
631 assert_eq!(pipeline.stages[2].status, StageStatus::Failed);
632 assert_eq!(
633 pipeline.stages[2].error_message.as_deref(),
634 Some("build error")
635 );
636 }
637
638 #[test]
639 fn format_duration_milliseconds() {
640 assert_eq!(format_duration(Duration::from_millis(50)), "50ms");
641 assert_eq!(format_duration(Duration::from_millis(250)), "0.2s");
642 }
643
644 #[test]
645 fn format_duration_seconds() {
646 assert_eq!(format_duration(Duration::from_secs(5)), "5.0s");
647 assert_eq!(format_duration(Duration::from_secs(45)), "45.0s");
648 }
649
650 #[test]
651 fn format_duration_minutes() {
652 assert_eq!(format_duration(Duration::from_secs(90)), "1:30");
653 assert_eq!(format_duration(Duration::from_secs(605)), "10:05");
654 }
655
656 #[test]
657 fn format_duration_hours() {
658 assert_eq!(format_duration(Duration::from_secs(3665)), "1:01:05");
659 }
660
661 #[test]
662 fn estimate_remaining_no_completed() {
663 let ctx = OutputContext::Plain;
664 let pipeline = PipelineProgress::new(ctx, "worker", true);
665 assert!(pipeline.estimate_remaining().is_none());
666 }
667
668 #[test]
669 fn count_completed_with_skipped() {
670 let ctx = OutputContext::Plain;
671 let mut pipeline = PipelineProgress::new(ctx, "worker", true);
672
673 pipeline.start_stage(PipelineStage::WorkspaceAnalysis);
674 pipeline.complete_stage();
675 pipeline.skip_stage(PipelineStage::Upload, "cache hit");
676
677 assert_eq!(pipeline.count_completed(), 2);
678 }
679
680 #[test]
685 fn pipeline_stage_labels() {
686 assert_eq!(
687 PipelineStage::WorkspaceAnalysis.label(),
688 "Workspace analysis"
689 );
690 assert_eq!(PipelineStage::Upload.label(), "Upload to worker");
691 assert_eq!(PipelineStage::Compilation.label(), "Remote compilation");
692 assert_eq!(
693 PipelineStage::ArtifactRetrieval.label(),
694 "Artifact retrieval"
695 );
696 assert_eq!(PipelineStage::CacheUpdate.label(), "Cache update");
697 }
698
699 #[test]
700 fn pipeline_stage_short_labels() {
701 assert_eq!(PipelineStage::WorkspaceAnalysis.short_label(), "Workspace");
702 assert_eq!(PipelineStage::Upload.short_label(), "Upload");
703 assert_eq!(PipelineStage::Compilation.short_label(), "Compile");
704 assert_eq!(PipelineStage::ArtifactRetrieval.short_label(), "Download");
705 assert_eq!(PipelineStage::CacheUpdate.short_label(), "Cache");
706 }
707
708 #[test]
709 fn stage_status_in_progress_icon() {
710 let ctx = OutputContext::Plain;
711 assert_eq!(StageStatus::InProgress.icon(ctx), "[...]");
712 }
713
714 #[test]
715 fn stage_status_skipped_icon() {
716 let ctx = OutputContext::Plain;
717 assert_eq!(StageStatus::Skipped.icon(ctx), "[x]");
718 }
719
720 #[test]
721 fn stage_status_default() {
722 assert_eq!(StageStatus::default(), StageStatus::Pending);
723 }
724
725 #[test]
726 fn pipeline_progress_worker_info() {
727 let ctx = OutputContext::Plain;
728 let pipeline = PipelineProgress::new(ctx, "my-worker", false);
729 assert_eq!(pipeline.worker, "my-worker");
730 assert!(pipeline.enabled);
732 }
733
734 #[test]
735 fn pipeline_progress_quiet_mode() {
736 let ctx = OutputContext::Plain;
737 let pipeline = PipelineProgress::new(ctx, "worker", true);
738 assert!(!pipeline.enabled);
740 }
741
742 #[test]
743 fn format_duration_zero() {
744 assert_eq!(format_duration(Duration::ZERO), "0ms");
745 }
746
747 #[test]
748 fn format_duration_exact_minute() {
749 assert_eq!(format_duration(Duration::from_secs(60)), "1:00");
750 }
751
752 #[test]
753 fn format_duration_exact_hour() {
754 assert_eq!(format_duration(Duration::from_secs(3600)), "1:00:00");
755 }
756}