1use std::collections::HashMap;
67use std::sync::mpsc::{Receiver, Sender, channel};
68use std::sync::{Arc, Mutex};
69use tokio::sync::broadcast;
70
/// Discriminates the kinds of events that flow through this module.
///
/// The value is stored in [`Event::event_type`], is the subscription key of
/// [`EventBus`] and [`AsyncEventBus`], and is the per-type counter key in
/// [`EventStats::events_by_type`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum EventType {
    /// Set by [`Event::content_added`].
    ContentAdded,
    /// Set by [`Event::content_removed`].
    ContentRemoved,
    /// Set by [`Event::content_requested`].
    ContentRequested,
    /// Set by [`Event::proof_generated`].
    ProofGenerated,
    /// Set by [`Event::proof_submitted`].
    ProofSubmitted,
    /// Set by [`Event::peer_connected`].
    PeerConnected,
    /// Set by [`Event::peer_disconnected`].
    PeerDisconnected,
    /// Set by [`Event::reputation_changed`].
    ReputationChanged,
    /// Set by [`Event::quota_exceeded`].
    QuotaExceeded,
    /// Set by [`Event::garbage_collected`].
    GarbageCollected,
    /// Set by [`Event::node_started`].
    NodeStarted,
    /// Set by [`Event::node_stopped`].
    NodeStopped,
}
99
/// A single timestamped event with a typed payload.
///
/// Instances are normally created through the constructor functions on
/// `Event` (for example [`Event::content_added`]), which fill in the
/// timestamp and pick the payload variant that belongs to the event type.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Event {
    /// What kind of event this is; used for routing and statistics.
    pub event_type: EventType,
    /// Value of `crate::utils::current_timestamp_ms()` at construction time
    /// (presumably milliseconds since the Unix epoch — confirm in `utils`).
    pub timestamp_ms: i64,
    /// Data attached to the event.
    pub payload: EventPayload,
}
110
/// Data carried by an [`Event`].
///
/// Several event types share one payload shape; the pairing is decided by
/// the `Event` constructor functions.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum EventPayload {
    /// Used by the content added / removed / requested events.
    Content { cid: String, size_bytes: u64 },
    /// Used by the proof generated / submitted events.
    Proof { proof_id: String, bytes: u64 },
    /// Used by the peer connected / disconnected events.
    Peer { peer_id: String },
    /// Used by the reputation-changed event: the peer and its score before
    /// and after the change.
    Reputation {
        peer_id: String,
        old_score: f64,
        new_score: f64,
    },
    /// Used by the quota-exceeded event: current usage and the limit.
    Quota { used_bytes: u64, max_bytes: u64 },
    /// Used by the garbage-collected event: bytes reclaimed and number of
    /// items removed.
    GarbageCollection {
        freed_bytes: u64,
        items_removed: usize,
    },
    /// Used by the node started / stopped events; carries no data.
    Node,
}
136
137impl Event {
138 #[must_use]
140 #[inline]
141 pub fn content_added(cid: impl Into<String>, size_bytes: u64) -> Self {
142 Self {
143 event_type: EventType::ContentAdded,
144 timestamp_ms: crate::utils::current_timestamp_ms(),
145 payload: EventPayload::Content {
146 cid: cid.into(),
147 size_bytes,
148 },
149 }
150 }
151
152 #[must_use]
154 #[inline]
155 pub fn content_removed(cid: impl Into<String>, size_bytes: u64) -> Self {
156 Self {
157 event_type: EventType::ContentRemoved,
158 timestamp_ms: crate::utils::current_timestamp_ms(),
159 payload: EventPayload::Content {
160 cid: cid.into(),
161 size_bytes,
162 },
163 }
164 }
165
166 #[must_use]
168 #[inline]
169 pub fn content_requested(cid: impl Into<String>, size_bytes: u64) -> Self {
170 Self {
171 event_type: EventType::ContentRequested,
172 timestamp_ms: crate::utils::current_timestamp_ms(),
173 payload: EventPayload::Content {
174 cid: cid.into(),
175 size_bytes,
176 },
177 }
178 }
179
180 #[must_use]
182 #[inline]
183 pub fn proof_generated(proof_id: impl Into<String>, bytes: u64) -> Self {
184 Self {
185 event_type: EventType::ProofGenerated,
186 timestamp_ms: crate::utils::current_timestamp_ms(),
187 payload: EventPayload::Proof {
188 proof_id: proof_id.into(),
189 bytes,
190 },
191 }
192 }
193
194 #[must_use]
196 #[inline]
197 pub fn proof_submitted(proof_id: impl Into<String>, bytes: u64) -> Self {
198 Self {
199 event_type: EventType::ProofSubmitted,
200 timestamp_ms: crate::utils::current_timestamp_ms(),
201 payload: EventPayload::Proof {
202 proof_id: proof_id.into(),
203 bytes,
204 },
205 }
206 }
207
208 #[must_use]
210 #[inline]
211 pub fn peer_connected(peer_id: impl Into<String>) -> Self {
212 Self {
213 event_type: EventType::PeerConnected,
214 timestamp_ms: crate::utils::current_timestamp_ms(),
215 payload: EventPayload::Peer {
216 peer_id: peer_id.into(),
217 },
218 }
219 }
220
221 #[must_use]
223 #[inline]
224 pub fn peer_disconnected(peer_id: impl Into<String>) -> Self {
225 Self {
226 event_type: EventType::PeerDisconnected,
227 timestamp_ms: crate::utils::current_timestamp_ms(),
228 payload: EventPayload::Peer {
229 peer_id: peer_id.into(),
230 },
231 }
232 }
233
234 #[must_use]
236 #[inline]
237 pub fn reputation_changed(peer_id: impl Into<String>, old_score: f64, new_score: f64) -> Self {
238 Self {
239 event_type: EventType::ReputationChanged,
240 timestamp_ms: crate::utils::current_timestamp_ms(),
241 payload: EventPayload::Reputation {
242 peer_id: peer_id.into(),
243 old_score,
244 new_score,
245 },
246 }
247 }
248
249 #[must_use]
251 #[inline]
252 pub fn quota_exceeded(used_bytes: u64, max_bytes: u64) -> Self {
253 Self {
254 event_type: EventType::QuotaExceeded,
255 timestamp_ms: crate::utils::current_timestamp_ms(),
256 payload: EventPayload::Quota {
257 used_bytes,
258 max_bytes,
259 },
260 }
261 }
262
263 #[must_use]
265 #[inline]
266 pub fn garbage_collected(freed_bytes: u64, items_removed: usize) -> Self {
267 Self {
268 event_type: EventType::GarbageCollected,
269 timestamp_ms: crate::utils::current_timestamp_ms(),
270 payload: EventPayload::GarbageCollection {
271 freed_bytes,
272 items_removed,
273 },
274 }
275 }
276
277 #[must_use]
279 #[inline]
280 pub fn node_started() -> Self {
281 Self {
282 event_type: EventType::NodeStarted,
283 timestamp_ms: crate::utils::current_timestamp_ms(),
284 payload: EventPayload::Node,
285 }
286 }
287
288 #[must_use]
290 #[inline]
291 pub fn node_stopped() -> Self {
292 Self {
293 event_type: EventType::NodeStopped,
294 timestamp_ms: crate::utils::current_timestamp_ms(),
295 payload: EventPayload::Node,
296 }
297 }
298}
299
300pub struct EventBus {
302 subscribers: Arc<Mutex<HashMap<EventType, Vec<Sender<Event>>>>>,
303 stats: Arc<Mutex<EventStats>>,
304}
305
306impl EventBus {
307 #[must_use]
309 #[inline]
310 pub fn new() -> Self {
311 Self {
312 subscribers: Arc::new(Mutex::new(HashMap::new())),
313 stats: Arc::new(Mutex::new(EventStats::default())),
314 }
315 }
316
317 #[inline]
319 #[must_use]
320 pub fn subscribe(&self, event_type: EventType) -> Receiver<Event> {
321 let (tx, rx) = channel();
322 let mut subs = self.subscribers.lock().unwrap();
323 subs.entry(event_type).or_default().push(tx);
324 rx
325 }
326
327 pub fn publish(&self, event: Event) {
329 let event_type = event.event_type;
330
331 {
333 let mut stats = self.stats.lock().unwrap();
334 stats.total_events += 1;
335 *stats.events_by_type.entry(event_type).or_insert(0) += 1;
336 }
337
338 let mut subs = self.subscribers.lock().unwrap();
340 if let Some(subscribers) = subs.get_mut(&event_type) {
341 subscribers.retain(|tx| tx.send(event.clone()).is_ok());
343
344 self.stats.lock().unwrap().active_subscribers = subs.values().map(|v| v.len()).sum();
346 }
347 }
348
349 #[must_use]
351 #[inline]
352 pub fn stats(&self) -> EventStats {
353 self.stats.lock().unwrap().clone()
354 }
355
356 #[inline]
358 pub fn reset_stats(&self) {
359 *self.stats.lock().unwrap() = EventStats::default();
360 }
361
362 #[must_use]
364 #[inline]
365 pub fn subscriber_count(&self, event_type: EventType) -> usize {
366 self.subscribers
367 .lock()
368 .unwrap()
369 .get(&event_type)
370 .map(|v| v.len())
371 .unwrap_or(0)
372 }
373
374 #[inline]
376 pub fn clear_subscribers(&self) {
377 self.subscribers.lock().unwrap().clear();
378 self.stats.lock().unwrap().active_subscribers = 0;
379 }
380}
381
382impl Default for EventBus {
383 fn default() -> Self {
384 Self::new()
385 }
386}
387
388#[derive(Debug, Clone, Default)]
390pub struct EventStats {
391 pub total_events: u64,
393 pub events_by_type: HashMap<EventType, u64>,
395 pub active_subscribers: usize,
397}
398
399impl EventStats {
400 #[inline]
402 #[must_use]
403 pub fn most_common_event(&self) -> Option<(EventType, u64)> {
404 self.events_by_type
405 .iter()
406 .max_by_key(|(_, count)| *count)
407 .map(|(t, c)| (*t, *c))
408 }
409
410 #[must_use]
412 #[inline]
413 pub fn event_count(&self, event_type: EventType) -> u64 {
414 self.events_by_type.get(&event_type).copied().unwrap_or(0)
415 }
416}
417
418pub struct AsyncEventBus {
423 broadcasters: Arc<Mutex<HashMap<EventType, broadcast::Sender<Event>>>>,
424 stats: Arc<Mutex<EventStats>>,
425 capacity: usize,
426}
427
428impl AsyncEventBus {
429 #[must_use]
431 pub fn new(capacity: usize) -> Self {
432 Self {
433 broadcasters: Arc::new(Mutex::new(HashMap::new())),
434 stats: Arc::new(Mutex::new(EventStats::default())),
435 capacity,
436 }
437 }
438
439 #[inline]
441 #[must_use]
442 pub fn subscribe(&self, event_type: EventType) -> broadcast::Receiver<Event> {
443 let mut broadcasters = self.broadcasters.lock().unwrap();
444 let tx = broadcasters
445 .entry(event_type)
446 .or_insert_with(|| broadcast::channel(self.capacity).0);
447 tx.subscribe()
448 }
449
450 pub fn publish(&self, event: Event) -> Result<usize, broadcast::error::SendError<Event>> {
452 let event_type = event.event_type;
453
454 {
456 let mut stats = self.stats.lock().unwrap();
457 stats.total_events += 1;
458 *stats.events_by_type.entry(event_type).or_insert(0) += 1;
459 }
460
461 let broadcasters = self.broadcasters.lock().unwrap();
463 if let Some(tx) = broadcasters.get(&event_type) {
464 let receiver_count = tx.receiver_count();
465 let _ = tx.send(event);
466 Ok(receiver_count)
467 } else {
468 Ok(0)
469 }
470 }
471
472 #[must_use]
474 #[inline]
475 pub fn stats(&self) -> EventStats {
476 self.stats.lock().unwrap().clone()
477 }
478
479 #[inline]
481 pub fn reset_stats(&self) {
482 *self.stats.lock().unwrap() = EventStats::default();
483 }
484
485 #[inline]
487 #[must_use]
488 pub fn receiver_count(&self, event_type: EventType) -> usize {
489 self.broadcasters
490 .lock()
491 .unwrap()
492 .get(&event_type)
493 .map(|tx| tx.receiver_count())
494 .unwrap_or(0)
495 }
496}
497
498impl Default for AsyncEventBus {
499 fn default() -> Self {
500 Self::new(100) }
502}
503
504#[derive(Debug, Clone)]
506pub struct EventFilter {
507 pub allowed_types: Option<Vec<EventType>>,
509 pub min_timestamp: Option<i64>,
511 pub payload_filter: Option<PayloadFilter>,
513}
514
515#[derive(Debug, Clone)]
517pub enum PayloadFilter {
518 CidPrefix(String),
520 PeerId(String),
522 MinBytes(u64),
524}
525
526impl EventFilter {
527 #[must_use]
529 pub fn new() -> Self {
530 Self {
531 allowed_types: None,
532 min_timestamp: None,
533 payload_filter: None,
534 }
535 }
536
537 #[must_use]
539 pub fn with_types(mut self, types: Vec<EventType>) -> Self {
540 self.allowed_types = Some(types);
541 self
542 }
543
544 #[must_use]
546 pub fn with_min_timestamp(mut self, timestamp: i64) -> Self {
547 self.min_timestamp = Some(timestamp);
548 self
549 }
550
551 #[must_use]
553 pub fn with_payload_filter(mut self, filter: PayloadFilter) -> Self {
554 self.payload_filter = Some(filter);
555 self
556 }
557
558 #[inline]
560 #[must_use]
561 pub fn matches(&self, event: &Event) -> bool {
562 if let Some(ref allowed) = self.allowed_types {
564 if !allowed.contains(&event.event_type) {
565 return false;
566 }
567 }
568
569 if let Some(min_ts) = self.min_timestamp {
571 if event.timestamp_ms < min_ts {
572 return false;
573 }
574 }
575
576 if let Some(ref pf) = self.payload_filter {
578 let matches = match pf {
579 PayloadFilter::CidPrefix(prefix) => {
580 if let EventPayload::Content { cid, .. } = &event.payload {
581 cid.starts_with(prefix)
582 } else {
583 false
584 }
585 }
586 PayloadFilter::PeerId(peer_id) => match &event.payload {
587 EventPayload::Peer { peer_id: p } => p == peer_id,
588 EventPayload::Reputation { peer_id: p, .. } => p == peer_id,
589 _ => false,
590 },
591 PayloadFilter::MinBytes(min_bytes) => match &event.payload {
592 EventPayload::Content { size_bytes, .. } => size_bytes >= min_bytes,
593 EventPayload::Proof { bytes, .. } => bytes >= min_bytes,
594 _ => false,
595 },
596 };
597 if !matches {
598 return false;
599 }
600 }
601
602 true
603 }
604}
605
606impl Default for EventFilter {
607 fn default() -> Self {
608 Self::new()
609 }
610}
611
612#[derive(Debug, Clone)]
614pub struct EventBatch {
615 pub events: Vec<Event>,
617 pub created_at: i64,
619}
620
621impl EventBatch {
622 #[must_use]
624 pub fn new() -> Self {
625 Self {
626 events: Vec::new(),
627 created_at: crate::utils::current_timestamp_ms(),
628 }
629 }
630
631 #[inline]
633 pub fn add(&mut self, event: Event) {
634 self.events.push(event);
635 }
636
637 #[must_use]
639 #[inline]
640 pub fn len(&self) -> usize {
641 self.events.len()
642 }
643
644 #[must_use]
646 #[inline]
647 pub fn is_empty(&self) -> bool {
648 self.events.is_empty()
649 }
650
651 #[must_use]
653 #[inline]
654 pub fn total_bytes(&self) -> u64 {
655 self.events
656 .iter()
657 .filter_map(|e| match &e.payload {
658 EventPayload::Content { size_bytes, .. } => Some(*size_bytes),
659 EventPayload::Proof { bytes, .. } => Some(*bytes),
660 EventPayload::GarbageCollection { freed_bytes, .. } => Some(*freed_bytes),
661 _ => None,
662 })
663 .sum()
664 }
665
666 #[inline]
668 #[must_use]
669 pub fn filter(&self, filter: &EventFilter) -> Vec<Event> {
670 self.events
671 .iter()
672 .filter(|e| filter.matches(e))
673 .cloned()
674 .collect()
675 }
676}
677
678impl Default for EventBatch {
679 fn default() -> Self {
680 Self::new()
681 }
682}
683
684pub struct EventStore {
689 file_path: std::path::PathBuf,
690 file: Arc<Mutex<Option<std::fs::File>>>,
691 events_written: Arc<Mutex<u64>>,
692}
693
694impl EventStore {
695 pub fn new<P: Into<std::path::PathBuf>>(file_path: P) -> std::io::Result<Self> {
705 let file_path = file_path.into();
706
707 if let Some(parent) = file_path.parent() {
709 std::fs::create_dir_all(parent)?;
710 }
711
712 let file = std::fs::OpenOptions::new()
714 .create(true)
715 .append(true)
716 .open(&file_path)?;
717
718 Ok(Self {
719 file_path,
720 file: Arc::new(Mutex::new(Some(file))),
721 events_written: Arc::new(Mutex::new(0)),
722 })
723 }
724
725 pub fn persist(&self, event: &Event) -> std::io::Result<()> {
738 use std::io::Write;
739
740 let mut file_guard = self.file.lock().unwrap();
741 if let Some(file) = file_guard.as_mut() {
742 let json = serde_json::to_string(event)
744 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
745
746 writeln!(file, "{}", json)?;
748 file.flush()?;
749
750 let mut count = self.events_written.lock().unwrap();
752 *count += 1;
753
754 Ok(())
755 } else {
756 Err(std::io::Error::other("Event store is closed"))
757 }
758 }
759
760 pub fn persist_batch<I>(&self, events: I) -> std::io::Result<usize>
772 where
773 I: IntoIterator<Item = Event>,
774 {
775 use std::io::Write;
776
777 let mut file_guard = self.file.lock().unwrap();
778 if let Some(file) = file_guard.as_mut() {
779 let mut count = 0;
780
781 for event in events {
782 let json = serde_json::to_string(&event)
783 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
784 writeln!(file, "{}", json)?;
785 count += 1;
786 }
787
788 file.flush()?;
789
790 let mut total = self.events_written.lock().unwrap();
792 *total += count as u64;
793
794 Ok(count)
795 } else {
796 Err(std::io::Error::other("Event store is closed"))
797 }
798 }
799
800 #[must_use]
802 #[inline]
803 pub fn events_written(&self) -> u64 {
804 *self.events_written.lock().unwrap()
805 }
806
807 #[must_use]
809 #[inline]
810 pub fn file_path(&self) -> &std::path::Path {
811 &self.file_path
812 }
813
814 pub fn close(&self) -> std::io::Result<()> {
816 use std::io::Write;
817
818 let mut file_guard = self.file.lock().unwrap();
819 if let Some(mut file) = file_guard.take() {
820 file.flush()?;
821 }
822 Ok(())
823 }
824}
825
826pub struct EventReplay {
828 file_path: std::path::PathBuf,
829}
830
831impl EventReplay {
832 #[must_use]
838 pub fn new<P: Into<std::path::PathBuf>>(file_path: P) -> Self {
839 Self {
840 file_path: file_path.into(),
841 }
842 }
843
844 pub fn replay_all(&self) -> std::io::Result<Vec<Event>> {
852 use std::io::{BufRead, BufReader};
853
854 let file = std::fs::File::open(&self.file_path)?;
855 let reader = BufReader::new(file);
856 let mut events = Vec::new();
857
858 for (line_num, line) in reader.lines().enumerate() {
859 let line = line?;
860 if line.trim().is_empty() {
861 continue; }
863
864 let event: Event = serde_json::from_str(&line).map_err(|e| {
865 std::io::Error::new(
866 std::io::ErrorKind::InvalidData,
867 format!("Failed to parse event at line {}: {}", line_num + 1, e),
868 )
869 })?;
870 events.push(event);
871 }
872
873 Ok(events)
874 }
875
876 pub fn replay_filtered(&self, filter: &EventFilter) -> std::io::Result<Vec<Event>> {
888 let all_events = self.replay_all()?;
889 Ok(all_events
890 .into_iter()
891 .filter(|e| filter.matches(e))
892 .collect())
893 }
894
895 pub fn replay_since(&self, since_timestamp_ms: i64) -> std::io::Result<Vec<Event>> {
907 let filter = EventFilter::new().with_min_timestamp(since_timestamp_ms);
908 self.replay_filtered(&filter)
909 }
910
911 pub fn count_events(&self) -> std::io::Result<usize> {
917 use std::io::{BufRead, BufReader};
918
919 let file = std::fs::File::open(&self.file_path)?;
920 let reader = BufReader::new(file);
921 Ok(reader
922 .lines()
923 .filter(|l| l.as_ref().is_ok_and(|line| !line.trim().is_empty()))
924 .count())
925 }
926
927 #[must_use]
929 #[inline]
930 pub fn exists(&self) -> bool {
931 self.file_path.exists()
932 }
933
934 #[must_use]
936 #[inline]
937 pub fn file_path(&self) -> &std::path::Path {
938 &self.file_path
939 }
940}
941
// Unit tests for the event types, both buses, the filter, the batch, and the
// JSONL store/replay pair. The store/replay tests write to fixed file names
// inside the system temp directory and remove them before and after use.
#[cfg(test)]
mod tests {
    use super::*;

    // A fresh bus reports zeroed statistics.
    #[test]
    fn test_event_bus_creation() {
        let bus = EventBus::new();
        let stats = bus.stats();
        assert_eq!(stats.total_events, 0);
        assert_eq!(stats.active_subscribers, 0);
    }

    // A subscriber receives an event published for its type.
    #[test]
    fn test_subscribe_and_publish() {
        let bus = EventBus::new();
        let rx = bus.subscribe(EventType::ContentAdded);

        bus.publish(Event::content_added("QmTest", 1024));

        let event = rx.try_recv().unwrap();
        assert_eq!(event.event_type, EventType::ContentAdded);
    }

    // Every subscriber of a type gets its own copy of the event.
    #[test]
    fn test_multiple_subscribers() {
        let bus = EventBus::new();
        let rx1 = bus.subscribe(EventType::ContentAdded);
        let rx2 = bus.subscribe(EventType::ContentAdded);

        assert_eq!(bus.subscriber_count(EventType::ContentAdded), 2);

        bus.publish(Event::content_added("QmTest", 1024));

        assert!(rx1.try_recv().is_ok());
        assert!(rx2.try_recv().is_ok());
    }

    // Events are routed only to subscribers of the matching type.
    #[test]
    fn test_event_type_filtering() {
        let bus = EventBus::new();
        let rx_content = bus.subscribe(EventType::ContentAdded);
        let rx_peer = bus.subscribe(EventType::PeerConnected);

        bus.publish(Event::content_added("QmTest", 1024));

        assert!(rx_content.try_recv().is_ok());
        // The peer subscriber must not see a content event.
        assert!(rx_peer.try_recv().is_err());
    }

    // Constructor helpers set the expected event type.
    #[test]
    fn test_event_creation_helpers() {
        let event = Event::content_added("QmTest", 1024);
        assert_eq!(event.event_type, EventType::ContentAdded);

        let event = Event::peer_connected("peer1");
        assert_eq!(event.event_type, EventType::PeerConnected);

        let event = Event::proof_generated("proof1", 2048);
        assert_eq!(event.event_type, EventType::ProofGenerated);
    }

    // Publishing updates the total and per-type counters, even without subscribers.
    #[test]
    fn test_statistics_tracking() {
        let bus = EventBus::new();

        bus.publish(Event::content_added("QmTest1", 1024));
        bus.publish(Event::content_added("QmTest2", 2048));
        bus.publish(Event::peer_connected("peer1"));

        let stats = bus.stats();
        assert_eq!(stats.total_events, 3);
        assert_eq!(stats.event_count(EventType::ContentAdded), 2);
        assert_eq!(stats.event_count(EventType::PeerConnected), 1);
    }

    // The type with the highest count is reported as most common.
    #[test]
    fn test_most_common_event() {
        let bus = EventBus::new();

        bus.publish(Event::content_added("QmTest1", 1024));
        bus.publish(Event::content_added("QmTest2", 2048));
        bus.publish(Event::peer_connected("peer1"));

        let stats = bus.stats();
        let (event_type, count) = stats.most_common_event().unwrap();
        assert_eq!(event_type, EventType::ContentAdded);
        assert_eq!(count, 2);
    }

    // Resetting statistics clears the publish counter.
    #[test]
    fn test_reset_stats() {
        let bus = EventBus::new();
        bus.publish(Event::content_added("QmTest", 1024));

        assert_eq!(bus.stats().total_events, 1);

        bus.reset_stats();
        assert_eq!(bus.stats().total_events, 0);
    }

    // Clearing subscribers removes every registered sender.
    #[test]
    fn test_clear_subscribers() {
        let bus = EventBus::new();
        let _rx1 = bus.subscribe(EventType::ContentAdded);
        let _rx2 = bus.subscribe(EventType::ContentAdded);

        assert_eq!(bus.subscriber_count(EventType::ContentAdded), 2);

        bus.clear_subscribers();
        assert_eq!(bus.subscriber_count(EventType::ContentAdded), 0);
    }

    // The reputation constructor stores the peer and both scores.
    #[test]
    fn test_reputation_changed_event() {
        let event = Event::reputation_changed("peer1", 0.5, 0.8);
        assert_eq!(event.event_type, EventType::ReputationChanged);

        if let EventPayload::Reputation {
            peer_id,
            old_score,
            new_score,
        } = event.payload
        {
            assert_eq!(peer_id, "peer1");
            assert_eq!(old_score, 0.5);
            assert_eq!(new_score, 0.8);
        } else {
            panic!("Wrong payload type");
        }
    }

    // The quota constructor sets the expected event type.
    #[test]
    fn test_quota_exceeded_event() {
        let event = Event::quota_exceeded(1000, 500);
        assert_eq!(event.event_type, EventType::QuotaExceeded);
    }

    // The garbage-collection constructor stores both figures.
    #[test]
    fn test_garbage_collected_event() {
        let event = Event::garbage_collected(1024 * 1024, 5);
        assert_eq!(event.event_type, EventType::GarbageCollected);

        if let EventPayload::GarbageCollection {
            freed_bytes,
            items_removed,
        } = event.payload
        {
            assert_eq!(freed_bytes, 1024 * 1024);
            assert_eq!(items_removed, 5);
        } else {
            panic!("Wrong payload type");
        }
    }

    // Node lifecycle constructors set the expected event types.
    #[test]
    fn test_node_lifecycle_events() {
        let started = Event::node_started();
        assert_eq!(started.event_type, EventType::NodeStarted);

        let stopped = Event::node_stopped();
        assert_eq!(stopped.event_type, EventType::NodeStopped);
    }

    // An async subscriber receives a published event.
    #[tokio::test]
    async fn test_async_event_bus() {
        let bus = AsyncEventBus::new(10);
        let mut rx = bus.subscribe(EventType::ContentAdded);

        let event = Event::content_added("QmTest", 1024);
        let result = bus.publish(event.clone());
        assert!(result.is_ok());

        let received = rx.recv().await.unwrap();
        assert_eq!(received.event_type, EventType::ContentAdded);
    }

    // Every async receiver of a type sees the published event.
    #[tokio::test]
    async fn test_async_event_bus_multiple_receivers() {
        let bus = AsyncEventBus::new(10);
        let mut rx1 = bus.subscribe(EventType::ContentAdded);
        let mut rx2 = bus.subscribe(EventType::ContentAdded);

        assert_eq!(bus.receiver_count(EventType::ContentAdded), 2);

        let event = Event::content_added("QmTest", 1024);
        let _ = bus.publish(event);

        assert!(rx1.recv().await.is_ok());
        assert!(rx2.recv().await.is_ok());
    }

    // The async bus counts published events.
    #[tokio::test]
    async fn test_async_event_bus_stats() {
        let bus = AsyncEventBus::new(10);
        let _rx = bus.subscribe(EventType::ContentAdded);

        let _ = bus.publish(Event::content_added("QmTest1", 1024));
        let _ = bus.publish(Event::content_added("QmTest2", 2048));

        let stats = bus.stats();
        assert_eq!(stats.total_events, 2);
        assert_eq!(stats.event_count(EventType::ContentAdded), 2);
    }

    // A type list accepts listed types and rejects others.
    #[test]
    fn test_event_filter_type() {
        let filter =
            EventFilter::new().with_types(vec![EventType::ContentAdded, EventType::ContentRemoved]);

        let event1 = Event::content_added("QmTest", 1024);
        assert!(filter.matches(&event1));

        let event2 = Event::peer_connected("peer1");
        assert!(!filter.matches(&event2));
    }

    // A minimum timestamp rejects older events and accepts newer ones.
    #[test]
    fn test_event_filter_timestamp() {
        let now = crate::utils::current_timestamp_ms();
        let filter = EventFilter::new().with_min_timestamp(now);

        let mut old_event = Event::content_added("QmTest", 1024);
        old_event.timestamp_ms = now - 1000;
        assert!(!filter.matches(&old_event));

        // Created after `now` was sampled, so its timestamp is expected to be >= `now`.
        let new_event = Event::content_added("QmTest", 1024);
        assert!(filter.matches(&new_event));
    }

    // The CID prefix condition compares against the start of the CID.
    #[test]
    fn test_event_filter_cid_prefix() {
        let filter =
            EventFilter::new().with_payload_filter(PayloadFilter::CidPrefix("Qm".to_string()));

        let event1 = Event::content_added("QmTest123", 1024);
        assert!(filter.matches(&event1));

        let event2 = Event::content_added("Bafytest", 1024);
        assert!(!filter.matches(&event2));
    }

    // The peer-id condition applies to both peer and reputation payloads.
    #[test]
    fn test_event_filter_peer_id() {
        let filter =
            EventFilter::new().with_payload_filter(PayloadFilter::PeerId("peer1".to_string()));

        let event1 = Event::peer_connected("peer1");
        assert!(filter.matches(&event1));

        let event2 = Event::peer_connected("peer2");
        assert!(!filter.matches(&event2));

        let event3 = Event::reputation_changed("peer1", 0.5, 0.8);
        assert!(filter.matches(&event3));
    }

    // The minimum-bytes condition applies to both content and proof payloads.
    #[test]
    fn test_event_filter_min_bytes() {
        let filter = EventFilter::new().with_payload_filter(PayloadFilter::MinBytes(2048));

        let event1 = Event::content_added("QmTest", 4096);
        assert!(filter.matches(&event1));

        let event2 = Event::content_added("QmTest", 1024);
        assert!(!filter.matches(&event2));

        let event3 = Event::proof_generated("proof1", 3072);
        assert!(filter.matches(&event3));
    }

    // Basic batch bookkeeping: emptiness, length and byte total.
    #[test]
    fn test_event_batch() {
        let mut batch = EventBatch::new();
        assert!(batch.is_empty());

        batch.add(Event::content_added("QmTest1", 1024));
        batch.add(Event::content_added("QmTest2", 2048));
        batch.add(Event::peer_connected("peer1"));

        assert_eq!(batch.len(), 3);
        assert!(!batch.is_empty());
        assert_eq!(batch.total_bytes(), 3072);
    }

    // Filtering a batch keeps only the matching events.
    #[test]
    fn test_event_batch_filter() {
        let mut batch = EventBatch::new();
        batch.add(Event::content_added("QmTest1", 1024));
        batch.add(Event::content_added("QmTest2", 2048));
        batch.add(Event::peer_connected("peer1"));

        let filter = EventFilter::new().with_types(vec![EventType::ContentAdded]);
        let filtered = batch.filter(&filter);

        assert_eq!(filtered.len(), 2);
    }

    // The byte total covers content, proof and garbage-collection payloads.
    #[test]
    fn test_event_batch_total_bytes() {
        let mut batch = EventBatch::new();
        batch.add(Event::content_added("QmTest", 1024));
        batch.add(Event::proof_generated("proof1", 2048));
        batch.add(Event::garbage_collected(512, 3));
        // Peer payloads carry no byte count and contribute nothing.
        batch.add(Event::peer_connected("peer1"));
        // 1024 + 2048 + 512
        assert_eq!(batch.total_bytes(), 3584);
    }

    // A new store starts with a zero counter and remembers its path.
    #[test]
    fn test_event_store_creation() {
        let temp_dir = std::env::temp_dir();
        let store_path = temp_dir.join("test_event_store_creation.jsonl");

        // Remove leftovers from an earlier run.
        let _ = std::fs::remove_file(&store_path);

        let store = EventStore::new(&store_path).unwrap();
        assert_eq!(store.events_written(), 0);
        assert_eq!(store.file_path(), store_path.as_path());

        // Clean up.
        let _ = std::fs::remove_file(&store_path);
    }

    // Persisting one event writes it to the file and bumps the counter.
    #[test]
    fn test_event_store_persist() {
        let temp_dir = std::env::temp_dir();
        let store_path = temp_dir.join("test_event_store_persist.jsonl");

        // Remove leftovers from an earlier run.
        let _ = std::fs::remove_file(&store_path);

        let store = EventStore::new(&store_path).unwrap();
        let event = Event::content_added("QmTest123", 1024);

        store.persist(&event).unwrap();
        assert_eq!(store.events_written(), 1);

        store.close().unwrap();

        // The serialized event must be present in the file.
        let content = std::fs::read_to_string(&store_path).unwrap();
        assert!(!content.is_empty());
        assert!(content.contains("QmTest123"));

        // Clean up.
        let _ = std::fs::remove_file(&store_path);
    }

    // Persisting a batch reports and records the number of events written.
    #[test]
    fn test_event_store_persist_batch() {
        let temp_dir = std::env::temp_dir();
        let store_path = temp_dir.join("test_event_store_persist_batch.jsonl");

        // Remove leftovers from an earlier run.
        let _ = std::fs::remove_file(&store_path);

        let store = EventStore::new(&store_path).unwrap();
        let events = vec![
            Event::content_added("QmTest1", 1024),
            Event::content_added("QmTest2", 2048),
            Event::peer_connected("peer1"),
        ];

        let count = store.persist_batch(events).unwrap();
        assert_eq!(count, 3);
        assert_eq!(store.events_written(), 3);

        store.close().unwrap();

        // Clean up.
        let _ = std::fs::remove_file(&store_path);
    }

    // Replaying returns all persisted events in file order.
    #[test]
    fn test_event_replay_all() {
        let temp_dir = std::env::temp_dir();
        let store_path = temp_dir.join("test_event_replay_all.jsonl");

        // Remove leftovers from an earlier run.
        let _ = std::fs::remove_file(&store_path);

        let store = EventStore::new(&store_path).unwrap();
        // Write the events that will be replayed.
        let events = vec![
            Event::content_added("QmTest1", 1024),
            Event::content_added("QmTest2", 2048),
            Event::peer_connected("peer1"),
        ];
        store.persist_batch(events).unwrap();
        store.close().unwrap();

        let replay = EventReplay::new(&store_path);
        // The log file must exist after persisting.
        assert!(replay.exists());

        let replayed = replay.replay_all().unwrap();
        assert_eq!(replayed.len(), 3);
        assert_eq!(replayed[0].event_type, EventType::ContentAdded);
        assert_eq!(replayed[1].event_type, EventType::ContentAdded);
        assert_eq!(replayed[2].event_type, EventType::PeerConnected);

        // Clean up.
        let _ = std::fs::remove_file(&store_path);
    }

    // Filtered replay returns only events accepted by the filter.
    #[test]
    fn test_event_replay_filtered() {
        let temp_dir = std::env::temp_dir();
        let store_path = temp_dir.join("test_event_replay_filtered.jsonl");

        // Remove leftovers from an earlier run.
        let _ = std::fs::remove_file(&store_path);

        let store = EventStore::new(&store_path).unwrap();
        // Write a mix of event types.
        let events = vec![
            Event::content_added("QmTest1", 1024),
            Event::content_added("QmTest2", 2048),
            Event::peer_connected("peer1"),
            Event::proof_generated("proof1", 512),
        ];
        store.persist_batch(events).unwrap();
        store.close().unwrap();

        let replay = EventReplay::new(&store_path);
        // Keep only the content-added events.
        let filter = EventFilter::new().with_types(vec![EventType::ContentAdded]);
        let filtered = replay.replay_filtered(&filter).unwrap();

        assert_eq!(filtered.len(), 2);
        assert!(
            filtered
                .iter()
                .all(|e| e.event_type == EventType::ContentAdded)
        );

        // Clean up.
        let _ = std::fs::remove_file(&store_path);
    }

    // Replay-since returns only events at or after the given timestamp.
    #[test]
    fn test_event_replay_since() {
        let temp_dir = std::env::temp_dir();
        let store_path = temp_dir.join("test_event_replay_since.jsonl");

        // Remove leftovers from an earlier run.
        let _ = std::fs::remove_file(&store_path);

        let store = EventStore::new(&store_path).unwrap();
        // One event well before `now` and one after it.
        let now = crate::utils::current_timestamp_ms();

        let mut old_event = Event::content_added("QmOld", 1024);
        old_event.timestamp_ms = now - 10000;

        let mut new_event = Event::content_added("QmNew", 2048);
        new_event.timestamp_ms = now + 1000;

        store.persist(&old_event).unwrap();
        store.persist(&new_event).unwrap();
        store.close().unwrap();

        let replay = EventReplay::new(&store_path);
        // Only the newer event satisfies the bound.
        let recent = replay.replay_since(now).unwrap();

        assert_eq!(recent.len(), 1);
        if let EventPayload::Content { cid, .. } = &recent[0].payload {
            assert_eq!(cid, "QmNew");
        } else {
            panic!("Expected Content payload");
        }

        // Clean up.
        let _ = std::fs::remove_file(&store_path);
    }

    // Counting returns the number of persisted events.
    #[test]
    fn test_event_replay_count() {
        let temp_dir = std::env::temp_dir();
        let store_path = temp_dir.join("test_event_replay_count.jsonl");

        // Remove leftovers from an earlier run.
        let _ = std::fs::remove_file(&store_path);

        let store = EventStore::new(&store_path).unwrap();
        // Write three events.
        let events = vec![
            Event::content_added("QmTest1", 1024),
            Event::content_added("QmTest2", 2048),
            Event::peer_connected("peer1"),
        ];
        store.persist_batch(events).unwrap();
        store.close().unwrap();

        let replay = EventReplay::new(&store_path);
        // One non-blank line per event.
        let count = replay.count_events().unwrap();
        assert_eq!(count, 3);

        // Clean up.
        let _ = std::fs::remove_file(&store_path);
    }
}