1use std::sync::Arc;
4
5use crate::support::events::{
6 FetchSource as UnifiedFetchSource, LegacyPipelineAdapter,
7 PipelineEvent as UnifiedPipelineEvent, VoidEvent,
8};
9
10#[derive(Debug, Clone)]
12pub enum PipelineEvent {
13 FileDiscovered { path: String, size: u64 },
15 FileProcessed { path: String, shard_id: u64 },
17 ShardCreated {
19 cid: String,
20 size: u64,
21 file_count: usize,
22 },
23 ShardFetched { cid: String, source: FetchSource },
25 Progress {
27 stage: String,
28 current: u64,
29 total: u64,
30 },
31 Warning { message: String },
33 Error { message: String, recoverable: bool },
35}
36
37#[derive(Debug, Clone, Copy, PartialEq, Eq)]
39pub enum FetchSource {
40 Local,
42 Ipfs,
44}
45
46pub trait PipelineObserver: Send + Sync {
48 fn on_event(&self, event: &PipelineEvent);
50}
51
52pub struct NullObserver;
55
56impl PipelineObserver for NullObserver {
57 fn on_event(&self, _event: &PipelineEvent) {
58 }
60}
61
62pub struct MultiObserver {
64 observers: Vec<Arc<dyn PipelineObserver>>,
65}
66
67impl MultiObserver {
68 pub fn new(observers: Vec<Arc<dyn PipelineObserver>>) -> Self {
70 Self { observers }
71 }
72}
73
74impl PipelineObserver for MultiObserver {
75 fn on_event(&self, event: &PipelineEvent) {
76 for observer in &self.observers {
77 observer.on_event(event);
78 }
79 }
80}
81
82#[cfg(test)]
85pub struct CollectingObserver {
86 events: std::sync::Mutex<Vec<PipelineEvent>>,
87}
88
89#[cfg(test)]
90impl CollectingObserver {
91 pub fn new() -> Self {
92 Self {
93 events: std::sync::Mutex::new(Vec::new()),
94 }
95 }
96
97 pub fn events(&self) -> Vec<PipelineEvent> {
98 self.events.lock().unwrap_or_else(|e| e.into_inner()).clone()
99 }
100}
101
102#[cfg(test)]
103impl PipelineObserver for CollectingObserver {
104 fn on_event(&self, event: &PipelineEvent) {
105 self.events.lock().unwrap_or_else(|e| e.into_inner()).push(event.clone());
106 }
107}
108
109fn convert_fetch_source(source: FetchSource) -> UnifiedFetchSource {
115 match source {
116 FetchSource::Local => UnifiedFetchSource::Local,
117 FetchSource::Ipfs => UnifiedFetchSource::Ipfs,
118 }
119}
120
121fn convert_pipeline_event(event: &PipelineEvent) -> UnifiedPipelineEvent {
123 match event {
124 PipelineEvent::FileDiscovered { path, size } => UnifiedPipelineEvent::FileDiscovered {
125 path: path.clone(),
126 size: *size,
127 },
128 PipelineEvent::FileProcessed { path, shard_id } => UnifiedPipelineEvent::FileProcessed {
129 path: path.clone(),
130 shard_id: *shard_id,
131 },
132 PipelineEvent::ShardCreated {
133 cid,
134 size,
135 file_count,
136 } => UnifiedPipelineEvent::ShardCreated {
137 cid: cid.clone(),
138 size: *size,
139 file_count: *file_count,
140 },
141 PipelineEvent::ShardFetched { cid, source } => UnifiedPipelineEvent::ShardFetched {
142 cid: cid.clone(),
143 source: convert_fetch_source(*source),
144 },
145 PipelineEvent::Progress {
146 stage,
147 current,
148 total,
149 } => UnifiedPipelineEvent::Progress {
150 stage: stage.clone(),
151 current: *current,
152 total: *total,
153 },
154 PipelineEvent::Warning { message } => UnifiedPipelineEvent::Warning {
155 message: message.clone(),
156 },
157 PipelineEvent::Error {
158 message,
159 recoverable,
160 } => UnifiedPipelineEvent::Error {
161 message: message.clone(),
162 recoverable: *recoverable,
163 },
164 }
165}
166
167impl PipelineObserver for LegacyPipelineAdapter {
168 fn on_event(&self, event: &PipelineEvent) {
169 let unified_event = convert_pipeline_event(event);
170 self.inner().on_event(&VoidEvent::Pipeline(unified_event));
171 }
172}
173
174#[cfg(test)]
175mod tests {
176 use super::*;
177
178 #[test]
179 fn null_observer_accepts_events() {
180 let observer = NullObserver;
181 observer.on_event(&PipelineEvent::Progress {
182 stage: "test".into(),
183 current: 1,
184 total: 10,
185 });
186 }
188
189 #[test]
190 fn multi_observer_forwards_to_all() {
191 let collector1 = Arc::new(CollectingObserver::new());
192 let collector2 = Arc::new(CollectingObserver::new());
193
194 let multi = MultiObserver::new(vec![
195 collector1.clone() as Arc<dyn PipelineObserver>,
196 collector2.clone() as Arc<dyn PipelineObserver>,
197 ]);
198
199 multi.on_event(&PipelineEvent::FileDiscovered {
200 path: "test.rs".into(),
201 size: 100,
202 });
203
204 assert_eq!(collector1.events().len(), 1);
205 assert_eq!(collector2.events().len(), 1);
206 }
207
208 #[test]
209 fn collecting_observer_stores_events() {
210 let collector = CollectingObserver::new();
211
212 collector.on_event(&PipelineEvent::Progress {
213 stage: "seal".into(),
214 current: 5,
215 total: 10,
216 });
217 collector.on_event(&PipelineEvent::ShardCreated {
218 cid: "bafk...".into(),
219 size: 1024,
220 file_count: 3,
221 });
222
223 let events = collector.events();
224 assert_eq!(events.len(), 2);
225 }
226
227 #[test]
228 fn legacy_adapter_implements_pipeline_observer() {
229 use crate::support::events::{CollectingObserver as UnifiedCollector, VoidObserver};
230
231 let unified_collector = Arc::new(UnifiedCollector::new());
233
234 let adapter =
236 LegacyPipelineAdapter::new(unified_collector.clone() as Arc<dyn VoidObserver>);
237
238 let observer: &dyn PipelineObserver = &adapter;
240
241 observer.on_event(&PipelineEvent::FileDiscovered {
243 path: "test.rs".into(),
244 size: 1024,
245 });
246 observer.on_event(&PipelineEvent::ShardCreated {
247 cid: "bafk...".into(),
248 size: 2048,
249 file_count: 5,
250 });
251
252 let events = unified_collector.events();
254 assert_eq!(events.len(), 2);
255
256 match &events[0] {
258 VoidEvent::Pipeline(UnifiedPipelineEvent::FileDiscovered { path, size }) => {
259 assert_eq!(path, "test.rs");
260 assert_eq!(*size, 1024);
261 }
262 _ => panic!("Expected Pipeline(FileDiscovered), got {:?}", events[0]),
263 }
264
265 match &events[1] {
267 VoidEvent::Pipeline(UnifiedPipelineEvent::ShardCreated {
268 cid,
269 size,
270 file_count,
271 }) => {
272 assert_eq!(cid, "bafk...");
273 assert_eq!(*size, 2048);
274 assert_eq!(*file_count, 5);
275 }
276 _ => panic!("Expected Pipeline(ShardCreated), got {:?}", events[1]),
277 }
278 }
279}