// void_core/support/events.rs

//! Unified event system for progress reporting and observability.
//!
//! This module provides a unified event system that wraps all event categories
//! used throughout void-core. It enables consistent progress reporting and
//! observability across different subsystems.
//!
//! # Event Categories
//!
//! - [`PipelineEvent`] - seal/unseal operations
//! - [`WorkspaceEvent`] - stage/checkout operations
//! - [`OpsEvent`] - fsck/merge operations
//! - [`P2PEvent`] - networking operations
//!
//! # Usage
//!
//! ```rust,ignore
//! use void_core::support::events::{VoidEvent, VoidObserver, emit};
//!
//! struct MyObserver;
//!
//! impl VoidObserver for MyObserver {
//!     fn on_event(&self, event: &VoidEvent) {
//!         println!("Event: {:?}", event);
//!     }
//! }
//! ```
27
28use std::sync::Arc;
29
30// ============================================================================
31// Unified Event Type
32// ============================================================================
33
34/// Unified event type wrapping all event categories.
35///
36/// This enum provides a single type for all events emitted by void-core,
37/// making it easy to implement observers that handle all event types.
38#[derive(Debug, Clone)]
39pub enum VoidEvent {
40    /// Pipeline events (seal/unseal operations).
41    Pipeline(PipelineEvent),
42    /// Workspace events (stage/checkout operations).
43    Workspace(WorkspaceEvent),
44    /// Ops events (fsck/merge operations).
45    Ops(OpsEvent),
46    /// P2P events (networking operations).
47    P2P(P2PEvent),
48}
49
50// ============================================================================
51// Event Type Enums
52// ============================================================================
53
54/// Events emitted during pipeline operations (seal/unseal).
55#[derive(Debug, Clone)]
56pub enum PipelineEvent {
57    /// A file was discovered during workspace traversal.
58    FileDiscovered {
59        /// Path to the discovered file.
60        path: String,
61        /// Size of the file in bytes.
62        size: u64,
63    },
64    /// A file was processed (compressed, encrypted, or written).
65    FileProcessed {
66        /// Path to the processed file.
67        path: String,
68        /// ID of the shard containing the file.
69        shard_id: u64,
70    },
71    /// A shard was created and stored.
72    ShardCreated {
73        /// CID of the created shard.
74        cid: String,
75        /// Size of the shard in bytes.
76        size: u64,
77        /// Number of files in the shard.
78        file_count: usize,
79    },
80    /// A shard was fetched from storage.
81    ShardFetched {
82        /// CID of the fetched shard.
83        cid: String,
84        /// Source from which the shard was fetched.
85        source: FetchSource,
86    },
87    /// Progress update for long-running operations.
88    Progress {
89        /// Current stage name.
90        stage: String,
91        /// Current progress value.
92        current: u64,
93        /// Total expected value.
94        total: u64,
95    },
96    /// Non-fatal warning during operation.
97    Warning {
98        /// Warning message.
99        message: String,
100    },
101    /// Error occurred (may or may not be recoverable).
102    Error {
103        /// Error message.
104        message: String,
105        /// Whether the error is recoverable.
106        recoverable: bool,
107    },
108}
109
/// Events emitted during workspace operations (stage/checkout).
///
/// Derives `PartialEq`/`Eq` so that events captured by a test observer can be
/// compared directly with `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WorkspaceEvent {
    /// A file was staged for commit.
    FileStaged {
        /// Path to the staged file.
        path: String,
    },
    /// A file was unstaged.
    FileUnstaged {
        /// Path to the unstaged file.
        path: String,
    },
    /// A file was checked out from a commit.
    FileCheckedOut {
        /// Path to the checked-out file.
        path: String,
    },
    /// A file was skipped during checkout.
    FileSkipped {
        /// Path to the skipped file.
        path: String,
        /// Reason the file was skipped.
        reason: String,
    },
    /// Progress update for workspace operations.
    Progress {
        /// Name of the stage currently running.
        stage: String,
        /// Current progress value.
        current: u64,
        /// Total expected value.
        total: u64,
    },
}
145
146/// Events emitted during ops operations (fsck/merge).
147#[derive(Debug, Clone)]
148pub enum OpsEvent {
149    /// An object was checked during fsck.
150    ObjectChecked {
151        /// CID of the checked object.
152        cid: String,
153        /// Type of the object (commit, shard, etc.).
154        object_type: String,
155    },
156    /// An issue was found during fsck.
157    IssueFound {
158        /// CID of the problematic object.
159        cid: String,
160        /// Description of the issue.
161        message: String,
162        /// Severity of the issue.
163        severity: IssueSeverity,
164    },
165    /// Progress update for merge operations.
166    MergeProgress {
167        /// Current stage of the merge.
168        stage: String,
169        /// Number of conflicts resolved.
170        conflicts_resolved: u64,
171        /// Total number of conflicts.
172        conflicts_total: u64,
173    },
174    /// Progress update for ops operations.
175    Progress {
176        /// Current stage name.
177        stage: String,
178        /// Current progress value.
179        current: u64,
180        /// Total expected value.
181        total: u64,
182    },
183}
184
/// Events emitted during P2P networking operations.
///
/// Derives `PartialEq`/`Eq` so that events captured by a test observer can be
/// compared directly with `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum P2PEvent {
    /// A peer was discovered on the network.
    PeerDiscovered {
        /// Peer ID of the discovered peer.
        peer_id: String,
        /// Multiaddresses of the peer.
        addresses: Vec<String>,
    },
    /// Connected to a peer.
    PeerConnected {
        /// Peer ID of the connected peer.
        peer_id: String,
    },
    /// Disconnected from a peer.
    PeerDisconnected {
        /// Peer ID of the disconnected peer.
        peer_id: String,
    },
    /// A peer was identified with additional metadata.
    PeerIdentified {
        /// Peer ID of the identified peer.
        peer_id: String,
        /// Agent version string reported for the peer.
        agent_version: String,
    },
    /// The local node started listening on an address.
    Listening {
        /// Address the node is listening on.
        address: String,
    },
    /// A share was received from a peer.
    ShareReceived {
        /// Peer ID of the sender.
        from_peer: String,
        /// CID of the shared content.
        cid: String,
    },
    /// A share was sent to a peer.
    ShareSent {
        /// Peer ID of the recipient.
        to_peer: String,
        /// CID of the shared content.
        cid: String,
    },
    /// A share operation failed.
    ShareFailed {
        /// Peer ID of the intended recipient, if known.
        to_peer: Option<String>,
        /// Error message.
        error: String,
    },
    /// DHT bootstrap completed.
    BootstrapComplete,
    /// A DHT record was found.
    DhtRecordFound {
        /// Key of the record.
        key: String,
    },
    /// A DHT record was stored.
    DhtRecordStored {
        /// Key of the record.
        key: String,
    },
    /// The NAT status changed.
    NatStatusChanged {
        /// New NAT status.
        status: String,
    },
    /// A hole punch succeeded.
    HolePunchSucceeded {
        /// Peer ID of the remote peer.
        peer_id: String,
    },
    /// An inbox head was ignored due to replay detection (seq already seen).
    InboxReplayIgnored {
        /// Peer ID of the sender.
        sender_peer_id: String,
        /// The sequence number that was replayed.
        seq: u64,
        /// The last good sequence number seen.
        last_good_seq: u64,
    },
    /// A fork was detected in the inbox chain (same seq, different prev).
    InboxForkDetected {
        /// Peer ID of the sender.
        sender_peer_id: String,
        /// The sequence number at which the fork was detected.
        seq: u64,
        /// The expected prev CID.
        expected_prev: Option<String>,
        /// The prev CID actually received.
        actual_prev: Option<String>,
    },
    /// A share send was attempted.
    ShareSendAttempt {
        /// Peer ID of the recipient.
        to_peer: String,
        /// Delivery method being attempted (direct or mailbox).
        method: String,
    },
    /// An envelope fetch was queued for retrieval.
    EnvelopeFetchQueued {
        /// CID of the envelope to fetch.
        cid: String,
        /// Peer ID of the sender.
        from_peer: String,
    },
    /// An envelope fetch failed.
    EnvelopeFetchFailed {
        /// CID of the envelope that failed to fetch.
        cid: String,
        /// Error message.
        error: String,
    },
}
302
303// ============================================================================
304// Supporting Types
305// ============================================================================
306
/// Source from which a shard was fetched.
///
/// Derives `Hash` in addition to equality so the value can be used as a key
/// in hash-based collections (for example, per-source counters).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FetchSource {
    /// Fetched from the local object store.
    Local,
    /// Fetched from the IPFS network.
    Ipfs,
}
315
/// Severity level for issues found during fsck.
///
/// Variants are declared from least to most severe, and the derived
/// `PartialOrd`/`Ord` follow that declaration order
/// (`Info < Warning < Error`). This allows threshold checks such as
/// `severity >= IssueSeverity::Warning`; keep the variant order intact when
/// adding new levels.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum IssueSeverity {
    /// Informational message, not an actual problem.
    Info,
    /// Warning that may indicate a problem.
    Warning,
    /// Error that indicates data corruption or loss.
    Error,
}
326
327// ============================================================================
328// Observer Trait
329// ============================================================================
330
331/// Observer trait for receiving void events.
332///
333/// Implement this trait to receive events from void-core operations.
334/// The observer must be thread-safe (`Send + Sync`) as it may be called
335/// from multiple threads during parallel operations.
336pub trait VoidObserver: Send + Sync {
337    /// Called when an event occurs.
338    ///
339    /// # Arguments
340    ///
341    /// * `event` - The event that occurred.
342    fn on_event(&self, event: &VoidEvent);
343}
344
345// ============================================================================
346// Built-in Observers
347// ============================================================================
348
349/// No-op observer that discards all events.
350///
351/// Used when event handling is not needed or for backwards compatibility
352/// with code that doesn't use events.
353#[derive(Debug, Clone, Copy, Default)]
354pub struct NullObserver;
355
356impl VoidObserver for NullObserver {
357    fn on_event(&self, _event: &VoidEvent) {
358        // Intentionally empty - discards all events
359    }
360}
361
362/// Observer that forwards events to multiple child observers.
363///
364/// Useful for composing multiple observers, such as logging to both
365/// a progress bar and a log file.
366pub struct MultiObserver {
367    observers: Vec<Arc<dyn VoidObserver>>,
368}
369
370impl MultiObserver {
371    /// Create a new multi-observer from a list of child observers.
372    ///
373    /// # Arguments
374    ///
375    /// * `observers` - List of observers to forward events to.
376    pub fn new(observers: Vec<Arc<dyn VoidObserver>>) -> Self {
377        Self { observers }
378    }
379
380    /// Add an observer to the list.
381    ///
382    /// # Arguments
383    ///
384    /// * `observer` - Observer to add.
385    pub fn add(&mut self, observer: Arc<dyn VoidObserver>) {
386        self.observers.push(observer);
387    }
388
389    /// Returns the number of child observers.
390    pub fn len(&self) -> usize {
391        self.observers.len()
392    }
393
394    /// Returns true if there are no child observers.
395    pub fn is_empty(&self) -> bool {
396        self.observers.is_empty()
397    }
398}
399
400impl VoidObserver for MultiObserver {
401    fn on_event(&self, event: &VoidEvent) {
402        for observer in &self.observers {
403            observer.on_event(event);
404        }
405    }
406}
407
408impl std::fmt::Debug for MultiObserver {
409    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
410        f.debug_struct("MultiObserver")
411            .field("observer_count", &self.observers.len())
412            .finish()
413    }
414}
415
/// Observer that records every event it receives, for use in tests.
///
/// Compiled only in test builds or when the `test-utils` feature is enabled.
/// It is meant for verifying that operations emit the expected events.
///
/// All accessors tolerate a poisoned mutex: if another thread panicked while
/// holding the lock, the events recorded so far remain readable. This avoids
/// piling a second "poisoned lock" panic on top of the original failure.
#[cfg(any(test, feature = "test-utils"))]
#[derive(Default)]
pub struct CollectingObserver {
    events: std::sync::Mutex<Vec<VoidEvent>>,
}

#[cfg(any(test, feature = "test-utils"))]
impl CollectingObserver {
    /// Create a new collecting observer with no recorded events.
    pub fn new() -> Self {
        Self::default()
    }

    /// Locks the event list, recovering the guard if the mutex is poisoned.
    ///
    /// The guarded value is a plain `Vec` that is only pushed to, cloned,
    /// cleared or measured, so reading it after a panic elsewhere is safe.
    fn guard(&self) -> std::sync::MutexGuard<'_, Vec<VoidEvent>> {
        self.events
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Get a copy of all collected events, in the order they were received.
    pub fn events(&self) -> Vec<VoidEvent> {
        self.guard().clone()
    }

    /// Clear all collected events.
    pub fn clear(&self) {
        self.guard().clear();
    }

    /// Returns the number of collected events.
    pub fn len(&self) -> usize {
        self.guard().len()
    }

    /// Returns true if no events have been collected.
    pub fn is_empty(&self) -> bool {
        self.guard().is_empty()
    }
}

#[cfg(any(test, feature = "test-utils"))]
impl VoidObserver for CollectingObserver {
    /// Stores a clone of `event` at the end of the recorded list.
    fn on_event(&self, event: &VoidEvent) {
        // Clone before taking the lock so it is held only for the push.
        let recorded = event.clone();
        self.guard().push(recorded);
    }
}

#[cfg(any(test, feature = "test-utils"))]
impl std::fmt::Debug for CollectingObserver {
    /// Prints only the number of recorded events, not the events themselves.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CollectingObserver")
            .field("event_count", &self.len())
            .finish()
    }
}
477
478// ============================================================================
479// Helper Functions
480// ============================================================================
481
482/// Emit a void event to an optional observer.
483///
484/// This helper function handles the common pattern of emitting events
485/// to an optional observer, doing nothing if the observer is None.
486///
487/// # Arguments
488///
489/// * `observer` - Optional observer to emit to.
490/// * `event` - Event to emit.
491///
492/// # Example
493///
494/// ```rust,ignore
495/// use void_core::support::events::{VoidEvent, VoidObserver, emit, PipelineEvent};
496///
497/// fn process_file(observer: &Option<Arc<dyn VoidObserver>>, path: &str) {
498///     emit(observer, VoidEvent::Pipeline(PipelineEvent::FileDiscovered {
499///         path: path.to_string(),
500///         size: 1024,
501///     }));
502/// }
503/// ```
504pub fn emit(observer: &Option<Arc<dyn VoidObserver>>, event: VoidEvent) {
505    if let Some(obs) = observer {
506        obs.on_event(&event);
507    }
508}
509
510/// Emit a workspace event to an optional observer.
511///
512/// Convenience function that wraps a WorkspaceEvent in a VoidEvent.
513///
514/// # Arguments
515///
516/// * `observer` - Optional observer to emit to.
517/// * `event` - Workspace event to emit.
518pub fn emit_workspace(observer: &Option<Arc<dyn VoidObserver>>, event: WorkspaceEvent) {
519    emit(observer, VoidEvent::Workspace(event));
520}
521
522/// Emit an ops event to an optional observer.
523///
524/// Convenience function that wraps an OpsEvent in a VoidEvent.
525///
526/// # Arguments
527///
528/// * `observer` - Optional observer to emit to.
529/// * `event` - Ops event to emit.
530pub fn emit_ops(observer: &Option<Arc<dyn VoidObserver>>, event: OpsEvent) {
531    emit(observer, VoidEvent::Ops(event));
532}
533
534/// Emit a pipeline event to an optional observer.
535///
536/// Convenience function that wraps a PipelineEvent in a VoidEvent.
537///
538/// # Arguments
539///
540/// * `observer` - Optional observer to emit to.
541/// * `event` - Pipeline event to emit.
542pub fn emit_pipeline(observer: &Option<Arc<dyn VoidObserver>>, event: PipelineEvent) {
543    emit(observer, VoidEvent::Pipeline(event));
544}
545
546/// Emit a P2P event to an optional observer.
547///
548/// Convenience function that wraps a P2PEvent in a VoidEvent.
549///
550/// # Arguments
551///
552/// * `observer` - Optional observer to emit to.
553/// * `event` - P2P event to emit.
554pub fn emit_p2p(observer: &Option<Arc<dyn VoidObserver>>, event: P2PEvent) {
555    emit(observer, VoidEvent::P2P(event));
556}
557
558// ============================================================================
559// Legacy Adapter
560// ============================================================================
561
562/// Adapter for legacy PipelineObserver compatibility.
563///
564/// This struct wraps a VoidObserver and can be used where the legacy
565/// PipelineObserver trait is expected. The actual PipelineObserver impl
566/// is in pipeline/events.rs to avoid circular dependencies.
567pub struct LegacyPipelineAdapter {
568    observer: Arc<dyn VoidObserver>,
569}
570
571impl LegacyPipelineAdapter {
572    /// Create a new legacy adapter wrapping a VoidObserver.
573    ///
574    /// # Arguments
575    ///
576    /// * `observer` - The VoidObserver to wrap.
577    pub fn new(observer: Arc<dyn VoidObserver>) -> Self {
578        Self { observer }
579    }
580
581    /// Get a reference to the inner observer.
582    pub fn inner(&self) -> &Arc<dyn VoidObserver> {
583        &self.observer
584    }
585
586    /// Emit a pipeline event through the wrapped observer.
587    ///
588    /// This method converts the legacy PipelineEvent to the unified
589    /// event system and forwards it to the wrapped observer.
590    pub fn emit(&self, event: PipelineEvent) {
591        self.observer.on_event(&VoidEvent::Pipeline(event));
592    }
593}
594
595impl std::fmt::Debug for LegacyPipelineAdapter {
596    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
597        f.debug_struct("LegacyPipelineAdapter").finish()
598    }
599}