Skip to main content

void_core/pipeline/
events.rs

1//! Pipeline event system for progress reporting and observability.
2
3use std::sync::Arc;
4
5use crate::support::events::{
6    FetchSource as UnifiedFetchSource, LegacyPipelineAdapter,
7    PipelineEvent as UnifiedPipelineEvent, VoidEvent,
8};
9
10/// Events emitted during pipeline operations.
11#[derive(Debug, Clone)]
12pub enum PipelineEvent {
13    /// A file was discovered during workspace traversal.
14    FileDiscovered { path: String, size: u64 },
15    /// A file was processed (compressed, encrypted, or written).
16    FileProcessed { path: String, shard_id: u64 },
17    /// A shard was created and stored.
18    ShardCreated {
19        cid: String,
20        size: u64,
21        file_count: usize,
22    },
23    /// A shard was fetched from storage.
24    ShardFetched { cid: String, source: FetchSource },
25    /// Progress update for long-running operations.
26    Progress {
27        stage: String,
28        current: u64,
29        total: u64,
30    },
31    /// Non-fatal warning.
32    Warning { message: String },
33    /// Error occurred (may or may not be recoverable).
34    Error { message: String, recoverable: bool },
35}
36
/// Backend a shard was fetched from.
///
/// A plain two-state tag: cheap to copy and comparable for equality.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FetchSource {
    /// The shard came from the local object store.
    Local,
    /// The shard came from IPFS.
    Ipfs,
}
45
46/// Observer trait for receiving pipeline events.
47pub trait PipelineObserver: Send + Sync {
48    /// Called when an event occurs.
49    fn on_event(&self, event: &PipelineEvent);
50}
51
/// No-op observer that discards all events.
///
/// Used for backwards compatibility with non-event-aware code.
///
/// As a stateless unit struct it derives the common traits (`Debug`, `Clone`,
/// `Copy`, `Default`) so it can be logged, duplicated, and used wherever a
/// `Default` observer is required.
#[derive(Debug, Clone, Copy, Default)]
pub struct NullObserver;
55
56impl PipelineObserver for NullObserver {
57    fn on_event(&self, _event: &PipelineEvent) {
58        // Intentionally empty
59    }
60}
61
62/// Observer that forwards events to multiple observers.
63pub struct MultiObserver {
64    observers: Vec<Arc<dyn PipelineObserver>>,
65}
66
67impl MultiObserver {
68    /// Create a new multi-observer from a list of observers.
69    pub fn new(observers: Vec<Arc<dyn PipelineObserver>>) -> Self {
70        Self { observers }
71    }
72}
73
74impl PipelineObserver for MultiObserver {
75    fn on_event(&self, event: &PipelineEvent) {
76        for observer in &self.observers {
77            observer.on_event(event);
78        }
79    }
80}
81
/// Observer that collects events into a vector.
///
/// Useful for testing: feed it events, then inspect what was recorded.
/// The vector sits behind a `Mutex` because observers receive events through
/// `&self`.
///
/// `Default` is derived to accompany the zero-argument `new()` constructor
/// (an empty collector either way); `Debug` makes the recorded events visible
/// in assertion failure output.
#[cfg(test)]
#[derive(Debug, Default)]
pub struct CollectingObserver {
    /// Events recorded so far, in arrival order.
    events: std::sync::Mutex<Vec<PipelineEvent>>,
}
88
#[cfg(test)]
impl CollectingObserver {
    /// Creates a collector with nothing recorded yet.
    pub fn new() -> Self {
        let events = std::sync::Mutex::new(Vec::new());
        Self { events }
    }

    /// Returns a cloned snapshot of every event recorded so far.
    ///
    /// A poisoned lock is recovered via `into_inner` instead of panicking, so
    /// the recorded events stay readable even after another thread panicked
    /// while holding the lock.
    pub fn events(&self) -> Vec<PipelineEvent> {
        let recorded = match self.events.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        recorded.clone()
    }
}
101
#[cfg(test)]
impl PipelineObserver for CollectingObserver {
    /// Appends a clone of `event` to the recorded list.
    ///
    /// A poisoned lock is recovered rather than treated as an error, so
    /// recording keeps working after a panic elsewhere.
    fn on_event(&self, event: &PipelineEvent) {
        let mut recorded = match self.events.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        recorded.push(event.clone());
    }
}
108
109// ============================================================================
110// Legacy Adapter Implementation
111// ============================================================================
112
113/// Convert legacy FetchSource to unified FetchSource.
114fn convert_fetch_source(source: FetchSource) -> UnifiedFetchSource {
115    match source {
116        FetchSource::Local => UnifiedFetchSource::Local,
117        FetchSource::Ipfs => UnifiedFetchSource::Ipfs,
118    }
119}
120
121/// Convert legacy PipelineEvent to unified PipelineEvent.
122fn convert_pipeline_event(event: &PipelineEvent) -> UnifiedPipelineEvent {
123    match event {
124        PipelineEvent::FileDiscovered { path, size } => UnifiedPipelineEvent::FileDiscovered {
125            path: path.clone(),
126            size: *size,
127        },
128        PipelineEvent::FileProcessed { path, shard_id } => UnifiedPipelineEvent::FileProcessed {
129            path: path.clone(),
130            shard_id: *shard_id,
131        },
132        PipelineEvent::ShardCreated {
133            cid,
134            size,
135            file_count,
136        } => UnifiedPipelineEvent::ShardCreated {
137            cid: cid.clone(),
138            size: *size,
139            file_count: *file_count,
140        },
141        PipelineEvent::ShardFetched { cid, source } => UnifiedPipelineEvent::ShardFetched {
142            cid: cid.clone(),
143            source: convert_fetch_source(*source),
144        },
145        PipelineEvent::Progress {
146            stage,
147            current,
148            total,
149        } => UnifiedPipelineEvent::Progress {
150            stage: stage.clone(),
151            current: *current,
152            total: *total,
153        },
154        PipelineEvent::Warning { message } => UnifiedPipelineEvent::Warning {
155            message: message.clone(),
156        },
157        PipelineEvent::Error {
158            message,
159            recoverable,
160        } => UnifiedPipelineEvent::Error {
161            message: message.clone(),
162            recoverable: *recoverable,
163        },
164    }
165}
166
167impl PipelineObserver for LegacyPipelineAdapter {
168    fn on_event(&self, event: &PipelineEvent) {
169        let unified_event = convert_pipeline_event(event);
170        self.inner().on_event(&VoidEvent::Pipeline(unified_event));
171    }
172}
173
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: the no-op observer takes an event without panicking.
    #[test]
    fn null_observer_accepts_events() {
        let progress = PipelineEvent::Progress {
            stage: "test".into(),
            current: 1,
            total: 10,
        };

        // Returning from the call without a panic is the whole assertion.
        NullObserver.on_event(&progress);
    }

    /// One event sent through a `MultiObserver` must reach every wrapped observer.
    #[test]
    fn multi_observer_forwards_to_all() {
        let first = Arc::new(CollectingObserver::new());
        let second = Arc::new(CollectingObserver::new());

        let sinks = vec![
            first.clone() as Arc<dyn PipelineObserver>,
            second.clone() as Arc<dyn PipelineObserver>,
        ];
        let fan_out = MultiObserver::new(sinks);

        let discovered = PipelineEvent::FileDiscovered {
            path: "test.rs".into(),
            size: 100,
        };
        fan_out.on_event(&discovered);

        assert_eq!(first.events().len(), 1);
        assert_eq!(second.events().len(), 1);
    }

    /// The collector keeps every event it is handed.
    #[test]
    fn collecting_observer_stores_events() {
        let progress = PipelineEvent::Progress {
            stage: "seal".into(),
            current: 5,
            total: 10,
        };
        let shard = PipelineEvent::ShardCreated {
            cid: "bafk...".into(),
            size: 1024,
            file_count: 3,
        };

        let sink = CollectingObserver::new();
        sink.on_event(&progress);
        sink.on_event(&shard);

        let recorded = sink.events();
        assert_eq!(recorded.len(), 2);
    }

    /// Legacy events pushed through `LegacyPipelineAdapter` must arrive at the
    /// unified observer as `VoidEvent::Pipeline` with their fields intact.
    #[test]
    fn legacy_adapter_implements_pipeline_observer() {
        use crate::support::events::{CollectingObserver as UnifiedCollector, VoidObserver};

        // Unified-side collector that captures whatever the adapter forwards.
        let unified_sink = Arc::new(UnifiedCollector::new());
        let adapter = LegacyPipelineAdapter::new(unified_sink.clone() as Arc<dyn VoidObserver>);

        // Drive the adapter strictly through the legacy trait object.
        let legacy_view: &dyn PipelineObserver = &adapter;
        legacy_view.on_event(&PipelineEvent::FileDiscovered {
            path: "test.rs".into(),
            size: 1024,
        });
        legacy_view.on_event(&PipelineEvent::ShardCreated {
            cid: "bafk...".into(),
            size: 2048,
            file_count: 5,
        });

        // Both events must have crossed over to the unified collector.
        let events = unified_sink.events();
        assert_eq!(events.len(), 2);

        // First event: FileDiscovered, converted field-for-field.
        match &events[0] {
            VoidEvent::Pipeline(UnifiedPipelineEvent::FileDiscovered { path, size }) => {
                assert_eq!(path, "test.rs");
                assert_eq!(*size, 1024);
            }
            other => panic!("Expected Pipeline(FileDiscovered), got {:?}", other),
        }

        // Second event: ShardCreated, converted field-for-field.
        match &events[1] {
            VoidEvent::Pipeline(UnifiedPipelineEvent::ShardCreated {
                cid,
                size,
                file_count,
            }) => {
                assert_eq!(cid, "bafk...");
                assert_eq!(*size, 2048);
                assert_eq!(*file_count, 5);
            }
            other => panic!("Expected Pipeline(ShardCreated), got {:?}", other),
        }
    }
}