1use std::sync::Arc;
2
3#[derive(Debug, Clone)]
4pub struct ProgressEvent {
5 pub operation: String,
6 pub phase: ProgressPhase,
7 pub current: u64,
8 pub total: Option<u64>,
9 pub message: String,
10}
11
12#[derive(Debug, Clone, PartialEq, Eq)]
13pub enum ProgressPhase {
14 Started,
15 Progress,
16 Completed,
17 Failed(String),
18}
19
20pub trait ProgressSink: Send + Sync {
21 fn emit(&self, event: ProgressEvent);
22}
23
24pub struct NoopProgress;
25
26impl ProgressSink for NoopProgress {
27 fn emit(&self, _event: ProgressEvent) {}
28}
29
30pub struct ChannelProgress {
31 tx: tokio::sync::mpsc::UnboundedSender<ProgressEvent>,
32}
33
34impl ChannelProgress {
35 pub fn new() -> (Self, tokio::sync::mpsc::UnboundedReceiver<ProgressEvent>) {
36 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
37 (Self { tx }, rx)
38 }
39}
40
41impl ProgressSink for ChannelProgress {
42 fn emit(&self, event: ProgressEvent) {
43 let _ = self.tx.send(event);
44 }
45}
46
47pub struct ProgressEmitter {
48 sink: Option<Arc<dyn ProgressSink>>,
49}
50
51impl ProgressEmitter {
52 pub fn new(sink: Option<Arc<dyn ProgressSink>>) -> Self {
53 Self { sink }
54 }
55
56 pub fn started(&self, operation: &str, message: &str) {
57 self.emit(ProgressEvent {
58 operation: operation.to_string(),
59 phase: ProgressPhase::Started,
60 current: 0,
61 total: None,
62 message: message.to_string(),
63 });
64 }
65
66 pub fn progress(&self, operation: &str, current: u64, total: Option<u64>, message: &str) {
67 self.emit(ProgressEvent {
68 operation: operation.to_string(),
69 phase: ProgressPhase::Progress,
70 current,
71 total,
72 message: message.to_string(),
73 });
74 }
75
76 pub fn completed(&self, operation: &str, message: &str) {
77 self.emit(ProgressEvent {
78 operation: operation.to_string(),
79 phase: ProgressPhase::Completed,
80 current: 0,
81 total: None,
82 message: message.to_string(),
83 });
84 }
85
86 pub fn failed(&self, operation: &str, error: &str) {
87 self.emit(ProgressEvent {
88 operation: operation.to_string(),
89 phase: ProgressPhase::Failed(error.to_string()),
90 current: 0,
91 total: None,
92 message: error.to_string(),
93 });
94 }
95
96 fn emit(&self, event: ProgressEvent) {
97 if let Some(ref sink) = self.sink {
98 sink.emit(event);
99 }
100 }
101}
102
103#[cfg(test)]
104mod tests {
105 use super::*;
106
107 #[test]
108 fn test_noop_does_not_panic() {
109 let noop = NoopProgress;
110 noop.emit(ProgressEvent {
111 operation: "test".to_string(),
112 phase: ProgressPhase::Started,
113 current: 0,
114 total: None,
115 message: "test".to_string(),
116 });
117 }
118
119 #[test]
120 fn test_channel_receives_events() {
121 let (progress, mut rx) = ChannelProgress::new();
122 progress.emit(ProgressEvent {
123 operation: "index".to_string(),
124 phase: ProgressPhase::Started,
125 current: 0,
126 total: Some(100),
127 message: "starting".to_string(),
128 });
129
130 let event = rx.try_recv().unwrap();
131 assert_eq!(event.operation, "index");
132 assert_eq!(event.phase, ProgressPhase::Started);
133 assert_eq!(event.total, Some(100));
134 }
135
136 #[test]
137 fn test_emitter_with_no_sink() {
138 let emitter = ProgressEmitter::new(None);
139 emitter.started("test", "starting");
140 emitter.progress("test", 50, Some(100), "halfway");
141 emitter.completed("test", "done");
142 }
143
144 #[test]
145 fn test_emitter_with_channel() {
146 let (progress, mut rx) = ChannelProgress::new();
147 let emitter = ProgressEmitter::new(Some(Arc::new(progress)));
148
149 emitter.started("build", "compiling");
150 emitter.progress("build", 5, Some(10), "file 5");
151 emitter.completed("build", "success");
152
153 let e1 = rx.try_recv().unwrap();
154 assert_eq!(e1.phase, ProgressPhase::Started);
155 let e2 = rx.try_recv().unwrap();
156 assert_eq!(e2.current, 5);
157 let e3 = rx.try_recv().unwrap();
158 assert_eq!(e3.phase, ProgressPhase::Completed);
159 }
160
161 #[test]
162 fn test_emitter_failed() {
163 let (progress, mut rx) = ChannelProgress::new();
164 let emitter = ProgressEmitter::new(Some(Arc::new(progress)));
165
166 emitter.failed("build", "compile error");
167
168 let event = rx.try_recv().unwrap();
169 assert!(matches!(event.phase, ProgressPhase::Failed(ref e) if e == "compile error"));
170 }
171
172 #[test]
173 fn test_channel_dropped_receiver_does_not_panic() {
174 let (progress, rx) = ChannelProgress::new();
175 drop(rx);
176 progress.emit(ProgressEvent {
177 operation: "test".to_string(),
178 phase: ProgressPhase::Started,
179 current: 0,
180 total: None,
181 message: "should not panic".to_string(),
182 });
183 }
184}