chie_core/
events.rs

1//! Event bus for internal pub/sub communication.
2//!
3//! This module provides a lightweight event system for decoupled communication
4//! between different parts of the CHIE node. It supports both sync and async
5//! event buses, event filtering, and event batching.
6//!
7//! # Features
8//!
//! - **Sync Event Bus**: Standard library `mpsc` channels, one dedicated channel per subscriber
10//! - **Async Event Bus**: Tokio broadcast channels for async/await code
11//! - **Event Filtering**: Filter events by type, timestamp, or payload
12//! - **Event Batching**: Efficient bulk event processing
13//! - **Statistics**: Track event counts and subscriber metrics
14//!
15//! # Example (Sync)
16//!
17//! ```rust
18//! use chie_core::events::{EventBus, Event, EventType};
19//!
20//! let bus = EventBus::new();
21//!
22//! // Subscribe to content events
23//! let rx = bus.subscribe(EventType::ContentAdded);
24//!
25//! // Publish an event
26//! bus.publish(Event::content_added("QmExample123", 1024 * 1024));
27//!
28//! // Receive events (non-blocking)
29//! if let Ok(event) = rx.try_recv() {
30//!     println!("Received: {:?}", event);
31//! }
32//! ```
33//!
34//! # Example (Async)
35//!
36//! ```rust
37//! use chie_core::events::{AsyncEventBus, Event, EventType};
38//!
39//! # async fn example() {
40//! let bus = AsyncEventBus::new(100);
41//! let mut rx = bus.subscribe(EventType::ContentAdded);
42//!
43//! // Publish an event
44//! let _ = bus.publish(Event::content_added("QmExample123", 1024 * 1024));
45//!
46//! // Receive events (async)
47//! if let Ok(event) = rx.recv().await {
48//!     println!("Received: {:?}", event);
49//! }
50//! # }
51//! ```
52//!
53//! # Example (Filtering)
54//!
55//! ```rust
56//! use chie_core::events::{EventFilter, Event, EventType, PayloadFilter};
57//!
58//! let filter = EventFilter::new()
59//!     .with_types(vec![EventType::ContentAdded])
60//!     .with_payload_filter(PayloadFilter::MinBytes(1024 * 1024));
61//!
62//! let event = Event::content_added("QmExample123", 2 * 1024 * 1024);
63//! assert!(filter.matches(&event));
64//! ```
65
66use std::collections::HashMap;
67use std::sync::mpsc::{Receiver, Sender, channel};
68use std::sync::{Arc, Mutex};
69use tokio::sync::broadcast;
70
71/// Event types in the CHIE system.
72#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
73pub enum EventType {
74    /// Content was added to storage.
75    ContentAdded,
76    /// Content was removed from storage.
77    ContentRemoved,
78    /// Content was requested.
79    ContentRequested,
80    /// Bandwidth proof was generated.
81    ProofGenerated,
82    /// Bandwidth proof was submitted.
83    ProofSubmitted,
84    /// Peer connected.
85    PeerConnected,
86    /// Peer disconnected.
87    PeerDisconnected,
88    /// Peer reputation changed.
89    ReputationChanged,
90    /// Storage quota exceeded.
91    QuotaExceeded,
92    /// Garbage collection completed.
93    GarbageCollected,
94    /// Node started.
95    NodeStarted,
96    /// Node stopped.
97    NodeStopped,
98}
99
100/// Event data.
101#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
102pub struct Event {
103    /// Event type.
104    pub event_type: EventType,
105    /// Timestamp in milliseconds.
106    pub timestamp_ms: i64,
107    /// Event payload.
108    pub payload: EventPayload,
109}
110
111/// Event payload data.
112#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
113pub enum EventPayload {
114    /// Content event (CID, size in bytes).
115    Content { cid: String, size_bytes: u64 },
116    /// Proof event (proof ID, bytes transferred).
117    Proof { proof_id: String, bytes: u64 },
118    /// Peer event (peer ID).
119    Peer { peer_id: String },
120    /// Reputation event (peer ID, old score, new score).
121    Reputation {
122        peer_id: String,
123        old_score: f64,
124        new_score: f64,
125    },
126    /// Quota event (used bytes, max bytes).
127    Quota { used_bytes: u64, max_bytes: u64 },
128    /// Garbage collection event (freed bytes, items removed).
129    GarbageCollection {
130        freed_bytes: u64,
131        items_removed: usize,
132    },
133    /// Node lifecycle event.
134    Node,
135}
136
137impl Event {
138    /// Create a content added event.
139    #[must_use]
140    #[inline]
141    pub fn content_added(cid: impl Into<String>, size_bytes: u64) -> Self {
142        Self {
143            event_type: EventType::ContentAdded,
144            timestamp_ms: crate::utils::current_timestamp_ms(),
145            payload: EventPayload::Content {
146                cid: cid.into(),
147                size_bytes,
148            },
149        }
150    }
151
152    /// Create a content removed event.
153    #[must_use]
154    #[inline]
155    pub fn content_removed(cid: impl Into<String>, size_bytes: u64) -> Self {
156        Self {
157            event_type: EventType::ContentRemoved,
158            timestamp_ms: crate::utils::current_timestamp_ms(),
159            payload: EventPayload::Content {
160                cid: cid.into(),
161                size_bytes,
162            },
163        }
164    }
165
166    /// Create a content requested event.
167    #[must_use]
168    #[inline]
169    pub fn content_requested(cid: impl Into<String>, size_bytes: u64) -> Self {
170        Self {
171            event_type: EventType::ContentRequested,
172            timestamp_ms: crate::utils::current_timestamp_ms(),
173            payload: EventPayload::Content {
174                cid: cid.into(),
175                size_bytes,
176            },
177        }
178    }
179
180    /// Create a proof generated event.
181    #[must_use]
182    #[inline]
183    pub fn proof_generated(proof_id: impl Into<String>, bytes: u64) -> Self {
184        Self {
185            event_type: EventType::ProofGenerated,
186            timestamp_ms: crate::utils::current_timestamp_ms(),
187            payload: EventPayload::Proof {
188                proof_id: proof_id.into(),
189                bytes,
190            },
191        }
192    }
193
194    /// Create a proof submitted event.
195    #[must_use]
196    #[inline]
197    pub fn proof_submitted(proof_id: impl Into<String>, bytes: u64) -> Self {
198        Self {
199            event_type: EventType::ProofSubmitted,
200            timestamp_ms: crate::utils::current_timestamp_ms(),
201            payload: EventPayload::Proof {
202                proof_id: proof_id.into(),
203                bytes,
204            },
205        }
206    }
207
208    /// Create a peer connected event.
209    #[must_use]
210    #[inline]
211    pub fn peer_connected(peer_id: impl Into<String>) -> Self {
212        Self {
213            event_type: EventType::PeerConnected,
214            timestamp_ms: crate::utils::current_timestamp_ms(),
215            payload: EventPayload::Peer {
216                peer_id: peer_id.into(),
217            },
218        }
219    }
220
221    /// Create a peer disconnected event.
222    #[must_use]
223    #[inline]
224    pub fn peer_disconnected(peer_id: impl Into<String>) -> Self {
225        Self {
226            event_type: EventType::PeerDisconnected,
227            timestamp_ms: crate::utils::current_timestamp_ms(),
228            payload: EventPayload::Peer {
229                peer_id: peer_id.into(),
230            },
231        }
232    }
233
234    /// Create a reputation changed event.
235    #[must_use]
236    #[inline]
237    pub fn reputation_changed(peer_id: impl Into<String>, old_score: f64, new_score: f64) -> Self {
238        Self {
239            event_type: EventType::ReputationChanged,
240            timestamp_ms: crate::utils::current_timestamp_ms(),
241            payload: EventPayload::Reputation {
242                peer_id: peer_id.into(),
243                old_score,
244                new_score,
245            },
246        }
247    }
248
249    /// Create a quota exceeded event.
250    #[must_use]
251    #[inline]
252    pub fn quota_exceeded(used_bytes: u64, max_bytes: u64) -> Self {
253        Self {
254            event_type: EventType::QuotaExceeded,
255            timestamp_ms: crate::utils::current_timestamp_ms(),
256            payload: EventPayload::Quota {
257                used_bytes,
258                max_bytes,
259            },
260        }
261    }
262
263    /// Create a garbage collected event.
264    #[must_use]
265    #[inline]
266    pub fn garbage_collected(freed_bytes: u64, items_removed: usize) -> Self {
267        Self {
268            event_type: EventType::GarbageCollected,
269            timestamp_ms: crate::utils::current_timestamp_ms(),
270            payload: EventPayload::GarbageCollection {
271                freed_bytes,
272                items_removed,
273            },
274        }
275    }
276
277    /// Create a node started event.
278    #[must_use]
279    #[inline]
280    pub fn node_started() -> Self {
281        Self {
282            event_type: EventType::NodeStarted,
283            timestamp_ms: crate::utils::current_timestamp_ms(),
284            payload: EventPayload::Node,
285        }
286    }
287
288    /// Create a node stopped event.
289    #[must_use]
290    #[inline]
291    pub fn node_stopped() -> Self {
292        Self {
293            event_type: EventType::NodeStopped,
294            timestamp_ms: crate::utils::current_timestamp_ms(),
295            payload: EventPayload::Node,
296        }
297    }
298}
299
300/// Event bus for pub/sub communication.
301pub struct EventBus {
302    subscribers: Arc<Mutex<HashMap<EventType, Vec<Sender<Event>>>>>,
303    stats: Arc<Mutex<EventStats>>,
304}
305
306impl EventBus {
307    /// Create a new event bus.
308    #[must_use]
309    #[inline]
310    pub fn new() -> Self {
311        Self {
312            subscribers: Arc::new(Mutex::new(HashMap::new())),
313            stats: Arc::new(Mutex::new(EventStats::default())),
314        }
315    }
316
317    /// Subscribe to events of a specific type.
318    #[inline]
319    #[must_use]
320    pub fn subscribe(&self, event_type: EventType) -> Receiver<Event> {
321        let (tx, rx) = channel();
322        let mut subs = self.subscribers.lock().unwrap();
323        subs.entry(event_type).or_default().push(tx);
324        rx
325    }
326
327    /// Publish an event to all subscribers.
328    pub fn publish(&self, event: Event) {
329        let event_type = event.event_type;
330
331        // Update statistics
332        {
333            let mut stats = self.stats.lock().unwrap();
334            stats.total_events += 1;
335            *stats.events_by_type.entry(event_type).or_insert(0) += 1;
336        }
337
338        // Send to subscribers
339        let mut subs = self.subscribers.lock().unwrap();
340        if let Some(subscribers) = subs.get_mut(&event_type) {
341            // Remove disconnected subscribers
342            subscribers.retain(|tx| tx.send(event.clone()).is_ok());
343
344            // Update subscriber count
345            self.stats.lock().unwrap().active_subscribers = subs.values().map(|v| v.len()).sum();
346        }
347    }
348
349    /// Get event bus statistics.
350    #[must_use]
351    #[inline]
352    pub fn stats(&self) -> EventStats {
353        self.stats.lock().unwrap().clone()
354    }
355
356    /// Reset statistics.
357    #[inline]
358    pub fn reset_stats(&self) {
359        *self.stats.lock().unwrap() = EventStats::default();
360    }
361
362    /// Get number of subscribers for an event type.
363    #[must_use]
364    #[inline]
365    pub fn subscriber_count(&self, event_type: EventType) -> usize {
366        self.subscribers
367            .lock()
368            .unwrap()
369            .get(&event_type)
370            .map(|v| v.len())
371            .unwrap_or(0)
372    }
373
374    /// Clear all subscribers.
375    #[inline]
376    pub fn clear_subscribers(&self) {
377        self.subscribers.lock().unwrap().clear();
378        self.stats.lock().unwrap().active_subscribers = 0;
379    }
380}
381
382impl Default for EventBus {
383    fn default() -> Self {
384        Self::new()
385    }
386}
387
388/// Event bus statistics.
389#[derive(Debug, Clone, Default)]
390pub struct EventStats {
391    /// Total events published.
392    pub total_events: u64,
393    /// Events by type.
394    pub events_by_type: HashMap<EventType, u64>,
395    /// Active subscribers.
396    pub active_subscribers: usize,
397}
398
399impl EventStats {
400    /// Get the most common event type.
401    #[inline]
402    #[must_use]
403    pub fn most_common_event(&self) -> Option<(EventType, u64)> {
404        self.events_by_type
405            .iter()
406            .max_by_key(|(_, count)| *count)
407            .map(|(t, c)| (*t, *c))
408    }
409
410    /// Get event count for a specific type.
411    #[must_use]
412    #[inline]
413    pub fn event_count(&self, event_type: EventType) -> u64 {
414        self.events_by_type.get(&event_type).copied().unwrap_or(0)
415    }
416}
417
418/// Async event bus using tokio broadcast channels.
419///
420/// This provides a high-performance async-friendly event bus suitable
421/// for use in async/await code with better backpressure handling.
422pub struct AsyncEventBus {
423    broadcasters: Arc<Mutex<HashMap<EventType, broadcast::Sender<Event>>>>,
424    stats: Arc<Mutex<EventStats>>,
425    capacity: usize,
426}
427
428impl AsyncEventBus {
429    /// Create a new async event bus with specified channel capacity.
430    #[must_use]
431    pub fn new(capacity: usize) -> Self {
432        Self {
433            broadcasters: Arc::new(Mutex::new(HashMap::new())),
434            stats: Arc::new(Mutex::new(EventStats::default())),
435            capacity,
436        }
437    }
438
439    /// Subscribe to events of a specific type (async).
440    #[inline]
441    #[must_use]
442    pub fn subscribe(&self, event_type: EventType) -> broadcast::Receiver<Event> {
443        let mut broadcasters = self.broadcasters.lock().unwrap();
444        let tx = broadcasters
445            .entry(event_type)
446            .or_insert_with(|| broadcast::channel(self.capacity).0);
447        tx.subscribe()
448    }
449
450    /// Publish an event to all async subscribers.
451    pub fn publish(&self, event: Event) -> Result<usize, broadcast::error::SendError<Event>> {
452        let event_type = event.event_type;
453
454        // Update statistics
455        {
456            let mut stats = self.stats.lock().unwrap();
457            stats.total_events += 1;
458            *stats.events_by_type.entry(event_type).or_insert(0) += 1;
459        }
460
461        // Send to subscribers
462        let broadcasters = self.broadcasters.lock().unwrap();
463        if let Some(tx) = broadcasters.get(&event_type) {
464            let receiver_count = tx.receiver_count();
465            let _ = tx.send(event);
466            Ok(receiver_count)
467        } else {
468            Ok(0)
469        }
470    }
471
472    /// Get event bus statistics.
473    #[must_use]
474    #[inline]
475    pub fn stats(&self) -> EventStats {
476        self.stats.lock().unwrap().clone()
477    }
478
479    /// Reset statistics.
480    #[inline]
481    pub fn reset_stats(&self) {
482        *self.stats.lock().unwrap() = EventStats::default();
483    }
484
485    /// Get number of active receivers for an event type.
486    #[inline]
487    #[must_use]
488    pub fn receiver_count(&self, event_type: EventType) -> usize {
489        self.broadcasters
490            .lock()
491            .unwrap()
492            .get(&event_type)
493            .map(|tx| tx.receiver_count())
494            .unwrap_or(0)
495    }
496}
497
498impl Default for AsyncEventBus {
499    fn default() -> Self {
500        Self::new(100) // Default capacity
501    }
502}
503
504/// Event filter for selective event processing.
505#[derive(Debug, Clone)]
506pub struct EventFilter {
507    /// Allowed event types (None = all allowed).
508    pub allowed_types: Option<Vec<EventType>>,
509    /// Minimum timestamp (milliseconds).
510    pub min_timestamp: Option<i64>,
511    /// Filter by payload content.
512    pub payload_filter: Option<PayloadFilter>,
513}
514
/// Condition an event payload must satisfy to pass an event filter.
#[derive(Clone, Debug)]
pub enum PayloadFilter {
    /// Match payloads whose CID starts with this prefix.
    CidPrefix(String),
    /// Match payloads that refer to this peer ID.
    PeerId(String),
    /// Match payloads whose byte count is at least this value.
    MinBytes(u64),
}
525
526impl EventFilter {
527    /// Create a new empty filter (allows all events).
528    #[must_use]
529    pub fn new() -> Self {
530        Self {
531            allowed_types: None,
532            min_timestamp: None,
533            payload_filter: None,
534        }
535    }
536
537    /// Set allowed event types.
538    #[must_use]
539    pub fn with_types(mut self, types: Vec<EventType>) -> Self {
540        self.allowed_types = Some(types);
541        self
542    }
543
544    /// Set minimum timestamp.
545    #[must_use]
546    pub fn with_min_timestamp(mut self, timestamp: i64) -> Self {
547        self.min_timestamp = Some(timestamp);
548        self
549    }
550
551    /// Set payload filter.
552    #[must_use]
553    pub fn with_payload_filter(mut self, filter: PayloadFilter) -> Self {
554        self.payload_filter = Some(filter);
555        self
556    }
557
558    /// Check if an event matches this filter.
559    #[inline]
560    #[must_use]
561    pub fn matches(&self, event: &Event) -> bool {
562        // Check event type
563        if let Some(ref allowed) = self.allowed_types {
564            if !allowed.contains(&event.event_type) {
565                return false;
566            }
567        }
568
569        // Check timestamp
570        if let Some(min_ts) = self.min_timestamp {
571            if event.timestamp_ms < min_ts {
572                return false;
573            }
574        }
575
576        // Check payload filter
577        if let Some(ref pf) = self.payload_filter {
578            let matches = match pf {
579                PayloadFilter::CidPrefix(prefix) => {
580                    if let EventPayload::Content { cid, .. } = &event.payload {
581                        cid.starts_with(prefix)
582                    } else {
583                        false
584                    }
585                }
586                PayloadFilter::PeerId(peer_id) => match &event.payload {
587                    EventPayload::Peer { peer_id: p } => p == peer_id,
588                    EventPayload::Reputation { peer_id: p, .. } => p == peer_id,
589                    _ => false,
590                },
591                PayloadFilter::MinBytes(min_bytes) => match &event.payload {
592                    EventPayload::Content { size_bytes, .. } => size_bytes >= min_bytes,
593                    EventPayload::Proof { bytes, .. } => bytes >= min_bytes,
594                    _ => false,
595                },
596            };
597            if !matches {
598                return false;
599            }
600        }
601
602        true
603    }
604}
605
606impl Default for EventFilter {
607    fn default() -> Self {
608        Self::new()
609    }
610}
611
612/// Event batch for efficient bulk processing.
613#[derive(Debug, Clone)]
614pub struct EventBatch {
615    /// Events in this batch.
616    pub events: Vec<Event>,
617    /// Batch creation timestamp.
618    pub created_at: i64,
619}
620
621impl EventBatch {
622    /// Create a new event batch.
623    #[must_use]
624    pub fn new() -> Self {
625        Self {
626            events: Vec::new(),
627            created_at: crate::utils::current_timestamp_ms(),
628        }
629    }
630
631    /// Add an event to the batch.
632    #[inline]
633    pub fn add(&mut self, event: Event) {
634        self.events.push(event);
635    }
636
637    /// Get the number of events in the batch.
638    #[must_use]
639    #[inline]
640    pub fn len(&self) -> usize {
641        self.events.len()
642    }
643
644    /// Check if the batch is empty.
645    #[must_use]
646    #[inline]
647    pub fn is_empty(&self) -> bool {
648        self.events.is_empty()
649    }
650
651    /// Get total bytes across all events in the batch.
652    #[must_use]
653    #[inline]
654    pub fn total_bytes(&self) -> u64 {
655        self.events
656            .iter()
657            .filter_map(|e| match &e.payload {
658                EventPayload::Content { size_bytes, .. } => Some(*size_bytes),
659                EventPayload::Proof { bytes, .. } => Some(*bytes),
660                EventPayload::GarbageCollection { freed_bytes, .. } => Some(*freed_bytes),
661                _ => None,
662            })
663            .sum()
664    }
665
666    /// Filter events in the batch.
667    #[inline]
668    #[must_use]
669    pub fn filter(&self, filter: &EventFilter) -> Vec<Event> {
670        self.events
671            .iter()
672            .filter(|e| filter.matches(e))
673            .cloned()
674            .collect()
675    }
676}
677
678impl Default for EventBatch {
679    fn default() -> Self {
680        Self::new()
681    }
682}
683
/// Append-only event log on disk.
///
/// Events are stored in JSON Lines format (one JSON object per line), which
/// suits append-only writes and sequential replay.
pub struct EventStore {
    /// Location of the log file.
    file_path: std::path::PathBuf,
    /// Open handle to the log file; `None` once the store has been closed.
    file: Arc<Mutex<Option<std::fs::File>>>,
    /// Number of events written through this store instance.
    events_written: Arc<Mutex<u64>>,
}
693
694impl EventStore {
695    /// Create a new event store at the specified file path.
696    ///
697    /// # Arguments
698    ///
699    /// * `file_path` - Path where events will be stored (JSON Lines format)
700    ///
701    /// # Returns
702    ///
703    /// Returns `Ok(EventStore)` if the file can be created/opened, or an error otherwise.
704    pub fn new<P: Into<std::path::PathBuf>>(file_path: P) -> std::io::Result<Self> {
705        let file_path = file_path.into();
706
707        // Create parent directory if it doesn't exist
708        if let Some(parent) = file_path.parent() {
709            std::fs::create_dir_all(parent)?;
710        }
711
712        // Open file in append mode
713        let file = std::fs::OpenOptions::new()
714            .create(true)
715            .append(true)
716            .open(&file_path)?;
717
718        Ok(Self {
719            file_path,
720            file: Arc::new(Mutex::new(Some(file))),
721            events_written: Arc::new(Mutex::new(0)),
722        })
723    }
724
725    /// Persist an event to disk.
726    ///
727    /// Events are written in JSON Lines format (one JSON object per line)
728    /// for efficient append-only writes.
729    ///
730    /// # Arguments
731    ///
732    /// * `event` - The event to persist
733    ///
734    /// # Returns
735    ///
736    /// Returns `Ok(())` if the event was successfully written, or an error otherwise.
737    pub fn persist(&self, event: &Event) -> std::io::Result<()> {
738        use std::io::Write;
739
740        let mut file_guard = self.file.lock().unwrap();
741        if let Some(file) = file_guard.as_mut() {
742            // Serialize event to JSON
743            let json = serde_json::to_string(event)
744                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
745
746            // Write JSON line
747            writeln!(file, "{}", json)?;
748            file.flush()?;
749
750            // Update counter
751            let mut count = self.events_written.lock().unwrap();
752            *count += 1;
753
754            Ok(())
755        } else {
756            Err(std::io::Error::other("Event store is closed"))
757        }
758    }
759
760    /// Persist multiple events in a batch.
761    ///
762    /// More efficient than calling `persist()` multiple times individually.
763    ///
764    /// # Arguments
765    ///
766    /// * `events` - Iterator of events to persist
767    ///
768    /// # Returns
769    ///
770    /// Returns `Ok(count)` with the number of events written, or an error if any write fails.
771    pub fn persist_batch<I>(&self, events: I) -> std::io::Result<usize>
772    where
773        I: IntoIterator<Item = Event>,
774    {
775        use std::io::Write;
776
777        let mut file_guard = self.file.lock().unwrap();
778        if let Some(file) = file_guard.as_mut() {
779            let mut count = 0;
780
781            for event in events {
782                let json = serde_json::to_string(&event)
783                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
784                writeln!(file, "{}", json)?;
785                count += 1;
786            }
787
788            file.flush()?;
789
790            // Update counter
791            let mut total = self.events_written.lock().unwrap();
792            *total += count as u64;
793
794            Ok(count)
795        } else {
796            Err(std::io::Error::other("Event store is closed"))
797        }
798    }
799
800    /// Get the number of events written to this store.
801    #[must_use]
802    #[inline]
803    pub fn events_written(&self) -> u64 {
804        *self.events_written.lock().unwrap()
805    }
806
807    /// Get the file path of this event store.
808    #[must_use]
809    #[inline]
810    pub fn file_path(&self) -> &std::path::Path {
811        &self.file_path
812    }
813
814    /// Close the event store, flushing any remaining data.
815    pub fn close(&self) -> std::io::Result<()> {
816        use std::io::Write;
817
818        let mut file_guard = self.file.lock().unwrap();
819        if let Some(mut file) = file_guard.take() {
820            file.flush()?;
821        }
822        Ok(())
823    }
824}
825
/// Reader for events persisted in a JSON Lines event log.
pub struct EventReplay {
    /// Location of the log file to read.
    file_path: std::path::PathBuf,
}
830
831impl EventReplay {
832    /// Create a new event replay from the specified file path.
833    ///
834    /// # Arguments
835    ///
836    /// * `file_path` - Path to the event store file (JSON Lines format)
837    #[must_use]
838    pub fn new<P: Into<std::path::PathBuf>>(file_path: P) -> Self {
839        Self {
840            file_path: file_path.into(),
841        }
842    }
843
844    /// Replay all events from the store.
845    ///
846    /// Reads the entire event log and returns all events in chronological order.
847    ///
848    /// # Returns
849    ///
850    /// Returns `Ok(Vec<Event>)` with all events, or an error if reading fails.
851    pub fn replay_all(&self) -> std::io::Result<Vec<Event>> {
852        use std::io::{BufRead, BufReader};
853
854        let file = std::fs::File::open(&self.file_path)?;
855        let reader = BufReader::new(file);
856        let mut events = Vec::new();
857
858        for (line_num, line) in reader.lines().enumerate() {
859            let line = line?;
860            if line.trim().is_empty() {
861                continue; // Skip empty lines
862            }
863
864            let event: Event = serde_json::from_str(&line).map_err(|e| {
865                std::io::Error::new(
866                    std::io::ErrorKind::InvalidData,
867                    format!("Failed to parse event at line {}: {}", line_num + 1, e),
868                )
869            })?;
870            events.push(event);
871        }
872
873        Ok(events)
874    }
875
876    /// Replay events matching a specific filter.
877    ///
878    /// Only returns events that match the provided filter criteria.
879    ///
880    /// # Arguments
881    ///
882    /// * `filter` - Event filter to apply during replay
883    ///
884    /// # Returns
885    ///
886    /// Returns `Ok(Vec<Event>)` with matching events, or an error if reading fails.
887    pub fn replay_filtered(&self, filter: &EventFilter) -> std::io::Result<Vec<Event>> {
888        let all_events = self.replay_all()?;
889        Ok(all_events
890            .into_iter()
891            .filter(|e| filter.matches(e))
892            .collect())
893    }
894
895    /// Replay events since a specific timestamp.
896    ///
897    /// Returns only events that occurred after the specified timestamp.
898    ///
899    /// # Arguments
900    ///
901    /// * `since_timestamp_ms` - Timestamp in milliseconds
902    ///
903    /// # Returns
904    ///
905    /// Returns `Ok(Vec<Event>)` with events since the timestamp, or an error if reading fails.
906    pub fn replay_since(&self, since_timestamp_ms: i64) -> std::io::Result<Vec<Event>> {
907        let filter = EventFilter::new().with_min_timestamp(since_timestamp_ms);
908        self.replay_filtered(&filter)
909    }
910
911    /// Count the total number of events in the store.
912    ///
913    /// # Returns
914    ///
915    /// Returns `Ok(count)` with the number of events, or an error if reading fails.
916    pub fn count_events(&self) -> std::io::Result<usize> {
917        use std::io::{BufRead, BufReader};
918
919        let file = std::fs::File::open(&self.file_path)?;
920        let reader = BufReader::new(file);
921        Ok(reader
922            .lines()
923            .filter(|l| l.as_ref().is_ok_and(|line| !line.trim().is_empty()))
924            .count())
925    }
926
927    /// Check if the event store file exists.
928    #[must_use]
929    #[inline]
930    pub fn exists(&self) -> bool {
931        self.file_path.exists()
932    }
933
934    /// Get the file path of the event store.
935    #[must_use]
936    #[inline]
937    pub fn file_path(&self) -> &std::path::Path {
938        &self.file_path
939    }
940}
941
942#[cfg(test)]
943mod tests {
944    use super::*;
945
946    #[test]
947    fn test_event_bus_creation() {
948        let bus = EventBus::new();
949        let stats = bus.stats();
950        assert_eq!(stats.total_events, 0);
951        assert_eq!(stats.active_subscribers, 0);
952    }
953
954    #[test]
955    fn test_subscribe_and_publish() {
956        let bus = EventBus::new();
957        let rx = bus.subscribe(EventType::ContentAdded);
958
959        bus.publish(Event::content_added("QmTest", 1024));
960
961        let event = rx.try_recv().unwrap();
962        assert_eq!(event.event_type, EventType::ContentAdded);
963    }
964
965    #[test]
966    fn test_multiple_subscribers() {
967        let bus = EventBus::new();
968        let rx1 = bus.subscribe(EventType::ContentAdded);
969        let rx2 = bus.subscribe(EventType::ContentAdded);
970
971        assert_eq!(bus.subscriber_count(EventType::ContentAdded), 2);
972
973        bus.publish(Event::content_added("QmTest", 1024));
974
975        assert!(rx1.try_recv().is_ok());
976        assert!(rx2.try_recv().is_ok());
977    }
978
979    #[test]
980    fn test_event_type_filtering() {
981        let bus = EventBus::new();
982        let rx_content = bus.subscribe(EventType::ContentAdded);
983        let rx_peer = bus.subscribe(EventType::PeerConnected);
984
985        bus.publish(Event::content_added("QmTest", 1024));
986
987        assert!(rx_content.try_recv().is_ok());
988        assert!(rx_peer.try_recv().is_err()); // Should not receive
989    }
990
991    #[test]
992    fn test_event_creation_helpers() {
993        let event = Event::content_added("QmTest", 1024);
994        assert_eq!(event.event_type, EventType::ContentAdded);
995
996        let event = Event::peer_connected("peer1");
997        assert_eq!(event.event_type, EventType::PeerConnected);
998
999        let event = Event::proof_generated("proof1", 2048);
1000        assert_eq!(event.event_type, EventType::ProofGenerated);
1001    }
1002
1003    #[test]
1004    fn test_statistics_tracking() {
1005        let bus = EventBus::new();
1006
1007        bus.publish(Event::content_added("QmTest1", 1024));
1008        bus.publish(Event::content_added("QmTest2", 2048));
1009        bus.publish(Event::peer_connected("peer1"));
1010
1011        let stats = bus.stats();
1012        assert_eq!(stats.total_events, 3);
1013        assert_eq!(stats.event_count(EventType::ContentAdded), 2);
1014        assert_eq!(stats.event_count(EventType::PeerConnected), 1);
1015    }
1016
1017    #[test]
1018    fn test_most_common_event() {
1019        let bus = EventBus::new();
1020
1021        bus.publish(Event::content_added("QmTest1", 1024));
1022        bus.publish(Event::content_added("QmTest2", 2048));
1023        bus.publish(Event::peer_connected("peer1"));
1024
1025        let stats = bus.stats();
1026        let (event_type, count) = stats.most_common_event().unwrap();
1027        assert_eq!(event_type, EventType::ContentAdded);
1028        assert_eq!(count, 2);
1029    }
1030
1031    #[test]
1032    fn test_reset_stats() {
1033        let bus = EventBus::new();
1034        bus.publish(Event::content_added("QmTest", 1024));
1035
1036        assert_eq!(bus.stats().total_events, 1);
1037
1038        bus.reset_stats();
1039        assert_eq!(bus.stats().total_events, 0);
1040    }
1041
1042    #[test]
1043    fn test_clear_subscribers() {
1044        let bus = EventBus::new();
1045        let _rx1 = bus.subscribe(EventType::ContentAdded);
1046        let _rx2 = bus.subscribe(EventType::ContentAdded);
1047
1048        assert_eq!(bus.subscriber_count(EventType::ContentAdded), 2);
1049
1050        bus.clear_subscribers();
1051        assert_eq!(bus.subscriber_count(EventType::ContentAdded), 0);
1052    }
1053
1054    #[test]
1055    fn test_reputation_changed_event() {
1056        let event = Event::reputation_changed("peer1", 0.5, 0.8);
1057        assert_eq!(event.event_type, EventType::ReputationChanged);
1058
1059        if let EventPayload::Reputation {
1060            peer_id,
1061            old_score,
1062            new_score,
1063        } = event.payload
1064        {
1065            assert_eq!(peer_id, "peer1");
1066            assert_eq!(old_score, 0.5);
1067            assert_eq!(new_score, 0.8);
1068        } else {
1069            panic!("Wrong payload type");
1070        }
1071    }
1072
1073    #[test]
1074    fn test_quota_exceeded_event() {
1075        let event = Event::quota_exceeded(1000, 500);
1076        assert_eq!(event.event_type, EventType::QuotaExceeded);
1077    }
1078
1079    #[test]
1080    fn test_garbage_collected_event() {
1081        let event = Event::garbage_collected(1024 * 1024, 5);
1082        assert_eq!(event.event_type, EventType::GarbageCollected);
1083
1084        if let EventPayload::GarbageCollection {
1085            freed_bytes,
1086            items_removed,
1087        } = event.payload
1088        {
1089            assert_eq!(freed_bytes, 1024 * 1024);
1090            assert_eq!(items_removed, 5);
1091        } else {
1092            panic!("Wrong payload type");
1093        }
1094    }
1095
1096    #[test]
1097    fn test_node_lifecycle_events() {
1098        let started = Event::node_started();
1099        assert_eq!(started.event_type, EventType::NodeStarted);
1100
1101        let stopped = Event::node_stopped();
1102        assert_eq!(stopped.event_type, EventType::NodeStopped);
1103    }
1104
1105    #[tokio::test]
1106    async fn test_async_event_bus() {
1107        let bus = AsyncEventBus::new(10);
1108        let mut rx = bus.subscribe(EventType::ContentAdded);
1109
1110        let event = Event::content_added("QmTest", 1024);
1111        let result = bus.publish(event.clone());
1112        assert!(result.is_ok());
1113
1114        let received = rx.recv().await.unwrap();
1115        assert_eq!(received.event_type, EventType::ContentAdded);
1116    }
1117
1118    #[tokio::test]
1119    async fn test_async_event_bus_multiple_receivers() {
1120        let bus = AsyncEventBus::new(10);
1121        let mut rx1 = bus.subscribe(EventType::ContentAdded);
1122        let mut rx2 = bus.subscribe(EventType::ContentAdded);
1123
1124        assert_eq!(bus.receiver_count(EventType::ContentAdded), 2);
1125
1126        let event = Event::content_added("QmTest", 1024);
1127        let _ = bus.publish(event);
1128
1129        assert!(rx1.recv().await.is_ok());
1130        assert!(rx2.recv().await.is_ok());
1131    }
1132
1133    #[tokio::test]
1134    async fn test_async_event_bus_stats() {
1135        let bus = AsyncEventBus::new(10);
1136        let _rx = bus.subscribe(EventType::ContentAdded);
1137
1138        let _ = bus.publish(Event::content_added("QmTest1", 1024));
1139        let _ = bus.publish(Event::content_added("QmTest2", 2048));
1140
1141        let stats = bus.stats();
1142        assert_eq!(stats.total_events, 2);
1143        assert_eq!(stats.event_count(EventType::ContentAdded), 2);
1144    }
1145
1146    #[test]
1147    fn test_event_filter_type() {
1148        let filter =
1149            EventFilter::new().with_types(vec![EventType::ContentAdded, EventType::ContentRemoved]);
1150
1151        let event1 = Event::content_added("QmTest", 1024);
1152        assert!(filter.matches(&event1));
1153
1154        let event2 = Event::peer_connected("peer1");
1155        assert!(!filter.matches(&event2));
1156    }
1157
1158    #[test]
1159    fn test_event_filter_timestamp() {
1160        let now = crate::utils::current_timestamp_ms();
1161        let filter = EventFilter::new().with_min_timestamp(now);
1162
1163        let mut old_event = Event::content_added("QmTest", 1024);
1164        old_event.timestamp_ms = now - 1000;
1165        assert!(!filter.matches(&old_event));
1166
1167        let new_event = Event::content_added("QmTest", 1024);
1168        assert!(filter.matches(&new_event));
1169    }
1170
1171    #[test]
1172    fn test_event_filter_cid_prefix() {
1173        let filter =
1174            EventFilter::new().with_payload_filter(PayloadFilter::CidPrefix("Qm".to_string()));
1175
1176        let event1 = Event::content_added("QmTest123", 1024);
1177        assert!(filter.matches(&event1));
1178
1179        let event2 = Event::content_added("Bafytest", 1024);
1180        assert!(!filter.matches(&event2));
1181    }
1182
1183    #[test]
1184    fn test_event_filter_peer_id() {
1185        let filter =
1186            EventFilter::new().with_payload_filter(PayloadFilter::PeerId("peer1".to_string()));
1187
1188        let event1 = Event::peer_connected("peer1");
1189        assert!(filter.matches(&event1));
1190
1191        let event2 = Event::peer_connected("peer2");
1192        assert!(!filter.matches(&event2));
1193
1194        let event3 = Event::reputation_changed("peer1", 0.5, 0.8);
1195        assert!(filter.matches(&event3));
1196    }
1197
1198    #[test]
1199    fn test_event_filter_min_bytes() {
1200        let filter = EventFilter::new().with_payload_filter(PayloadFilter::MinBytes(2048));
1201
1202        let event1 = Event::content_added("QmTest", 4096);
1203        assert!(filter.matches(&event1));
1204
1205        let event2 = Event::content_added("QmTest", 1024);
1206        assert!(!filter.matches(&event2));
1207
1208        let event3 = Event::proof_generated("proof1", 3072);
1209        assert!(filter.matches(&event3));
1210    }
1211
1212    #[test]
1213    fn test_event_batch() {
1214        let mut batch = EventBatch::new();
1215        assert!(batch.is_empty());
1216
1217        batch.add(Event::content_added("QmTest1", 1024));
1218        batch.add(Event::content_added("QmTest2", 2048));
1219        batch.add(Event::peer_connected("peer1"));
1220
1221        assert_eq!(batch.len(), 3);
1222        assert!(!batch.is_empty());
1223        assert_eq!(batch.total_bytes(), 3072);
1224    }
1225
1226    #[test]
1227    fn test_event_batch_filter() {
1228        let mut batch = EventBatch::new();
1229        batch.add(Event::content_added("QmTest1", 1024));
1230        batch.add(Event::content_added("QmTest2", 2048));
1231        batch.add(Event::peer_connected("peer1"));
1232
1233        let filter = EventFilter::new().with_types(vec![EventType::ContentAdded]);
1234        let filtered = batch.filter(&filter);
1235
1236        assert_eq!(filtered.len(), 2);
1237    }
1238
1239    #[test]
1240    fn test_event_batch_total_bytes() {
1241        let mut batch = EventBatch::new();
1242        batch.add(Event::content_added("QmTest", 1024));
1243        batch.add(Event::proof_generated("proof1", 2048));
1244        batch.add(Event::garbage_collected(512, 3));
1245        batch.add(Event::peer_connected("peer1")); // No bytes
1246
1247        assert_eq!(batch.total_bytes(), 3584); // 1024 + 2048 + 512
1248    }
1249
1250    #[test]
1251    fn test_event_store_creation() {
1252        let temp_dir = std::env::temp_dir();
1253        let store_path = temp_dir.join("test_event_store_creation.jsonl");
1254
1255        // Clean up any existing file
1256        let _ = std::fs::remove_file(&store_path);
1257
1258        let store = EventStore::new(&store_path).unwrap();
1259        assert_eq!(store.events_written(), 0);
1260        assert_eq!(store.file_path(), store_path.as_path());
1261
1262        // Clean up
1263        let _ = std::fs::remove_file(&store_path);
1264    }
1265
1266    #[test]
1267    fn test_event_store_persist() {
1268        let temp_dir = std::env::temp_dir();
1269        let store_path = temp_dir.join("test_event_store_persist.jsonl");
1270
1271        // Clean up any existing file
1272        let _ = std::fs::remove_file(&store_path);
1273
1274        let store = EventStore::new(&store_path).unwrap();
1275        let event = Event::content_added("QmTest123", 1024);
1276
1277        store.persist(&event).unwrap();
1278        assert_eq!(store.events_written(), 1);
1279
1280        store.close().unwrap();
1281
1282        // Verify file exists and has content
1283        let content = std::fs::read_to_string(&store_path).unwrap();
1284        assert!(!content.is_empty());
1285        assert!(content.contains("QmTest123"));
1286
1287        // Clean up
1288        let _ = std::fs::remove_file(&store_path);
1289    }
1290
1291    #[test]
1292    fn test_event_store_persist_batch() {
1293        let temp_dir = std::env::temp_dir();
1294        let store_path = temp_dir.join("test_event_store_persist_batch.jsonl");
1295
1296        // Clean up any existing file
1297        let _ = std::fs::remove_file(&store_path);
1298
1299        let store = EventStore::new(&store_path).unwrap();
1300        let events = vec![
1301            Event::content_added("QmTest1", 1024),
1302            Event::content_added("QmTest2", 2048),
1303            Event::peer_connected("peer1"),
1304        ];
1305
1306        let count = store.persist_batch(events).unwrap();
1307        assert_eq!(count, 3);
1308        assert_eq!(store.events_written(), 3);
1309
1310        store.close().unwrap();
1311
1312        // Clean up
1313        let _ = std::fs::remove_file(&store_path);
1314    }
1315
1316    #[test]
1317    fn test_event_replay_all() {
1318        let temp_dir = std::env::temp_dir();
1319        let store_path = temp_dir.join("test_event_replay_all.jsonl");
1320
1321        // Clean up any existing file
1322        let _ = std::fs::remove_file(&store_path);
1323
1324        // Write some events
1325        let store = EventStore::new(&store_path).unwrap();
1326        let events = vec![
1327            Event::content_added("QmTest1", 1024),
1328            Event::content_added("QmTest2", 2048),
1329            Event::peer_connected("peer1"),
1330        ];
1331        store.persist_batch(events).unwrap();
1332        store.close().unwrap();
1333
1334        // Replay events
1335        let replay = EventReplay::new(&store_path);
1336        assert!(replay.exists());
1337
1338        let replayed = replay.replay_all().unwrap();
1339        assert_eq!(replayed.len(), 3);
1340        assert_eq!(replayed[0].event_type, EventType::ContentAdded);
1341        assert_eq!(replayed[1].event_type, EventType::ContentAdded);
1342        assert_eq!(replayed[2].event_type, EventType::PeerConnected);
1343
1344        // Clean up
1345        let _ = std::fs::remove_file(&store_path);
1346    }
1347
1348    #[test]
1349    fn test_event_replay_filtered() {
1350        let temp_dir = std::env::temp_dir();
1351        let store_path = temp_dir.join("test_event_replay_filtered.jsonl");
1352
1353        // Clean up any existing file
1354        let _ = std::fs::remove_file(&store_path);
1355
1356        // Write some events
1357        let store = EventStore::new(&store_path).unwrap();
1358        let events = vec![
1359            Event::content_added("QmTest1", 1024),
1360            Event::content_added("QmTest2", 2048),
1361            Event::peer_connected("peer1"),
1362            Event::proof_generated("proof1", 512),
1363        ];
1364        store.persist_batch(events).unwrap();
1365        store.close().unwrap();
1366
1367        // Replay with filter
1368        let replay = EventReplay::new(&store_path);
1369        let filter = EventFilter::new().with_types(vec![EventType::ContentAdded]);
1370        let filtered = replay.replay_filtered(&filter).unwrap();
1371
1372        assert_eq!(filtered.len(), 2);
1373        assert!(
1374            filtered
1375                .iter()
1376                .all(|e| e.event_type == EventType::ContentAdded)
1377        );
1378
1379        // Clean up
1380        let _ = std::fs::remove_file(&store_path);
1381    }
1382
1383    #[test]
1384    fn test_event_replay_since() {
1385        let temp_dir = std::env::temp_dir();
1386        let store_path = temp_dir.join("test_event_replay_since.jsonl");
1387
1388        // Clean up any existing file
1389        let _ = std::fs::remove_file(&store_path);
1390
1391        // Write some events with known timestamps
1392        let store = EventStore::new(&store_path).unwrap();
1393        let now = crate::utils::current_timestamp_ms();
1394
1395        let mut old_event = Event::content_added("QmOld", 1024);
1396        old_event.timestamp_ms = now - 10000;
1397
1398        let mut new_event = Event::content_added("QmNew", 2048);
1399        new_event.timestamp_ms = now + 1000;
1400
1401        store.persist(&old_event).unwrap();
1402        store.persist(&new_event).unwrap();
1403        store.close().unwrap();
1404
1405        // Replay events since timestamp
1406        let replay = EventReplay::new(&store_path);
1407        let recent = replay.replay_since(now).unwrap();
1408
1409        assert_eq!(recent.len(), 1);
1410        if let EventPayload::Content { cid, .. } = &recent[0].payload {
1411            assert_eq!(cid, "QmNew");
1412        } else {
1413            panic!("Expected Content payload");
1414        }
1415
1416        // Clean up
1417        let _ = std::fs::remove_file(&store_path);
1418    }
1419
1420    #[test]
1421    fn test_event_replay_count() {
1422        let temp_dir = std::env::temp_dir();
1423        let store_path = temp_dir.join("test_event_replay_count.jsonl");
1424
1425        // Clean up any existing file
1426        let _ = std::fs::remove_file(&store_path);
1427
1428        // Write some events
1429        let store = EventStore::new(&store_path).unwrap();
1430        let events = vec![
1431            Event::content_added("QmTest1", 1024),
1432            Event::content_added("QmTest2", 2048),
1433            Event::peer_connected("peer1"),
1434        ];
1435        store.persist_batch(events).unwrap();
1436        store.close().unwrap();
1437
1438        // Count events
1439        let replay = EventReplay::new(&store_path);
1440        let count = replay.count_events().unwrap();
1441        assert_eq!(count, 3);
1442
1443        // Clean up
1444        let _ = std::fs::remove_file(&store_path);
1445    }
1446}