use std::collections::HashMap;
use std::sync::mpsc::{Receiver, Sender, channel};
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
/// Identifies the kind of occurrence an [`Event`] reports.
///
/// Both event buses in this file key their subscriber maps by this type, and
/// [`EventFilter`] can restrict matching to a set of these values.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum EventType {
/// Set by [`Event::content_added`].
ContentAdded,
/// Set by [`Event::content_removed`].
ContentRemoved,
/// Set by [`Event::content_requested`].
ContentRequested,
/// Set by [`Event::proof_generated`].
ProofGenerated,
/// Set by [`Event::proof_submitted`].
ProofSubmitted,
/// Set by [`Event::peer_connected`].
PeerConnected,
/// Set by [`Event::peer_disconnected`].
PeerDisconnected,
/// Set by [`Event::reputation_changed`].
ReputationChanged,
/// Set by [`Event::quota_exceeded`].
QuotaExceeded,
/// Set by [`Event::garbage_collected`].
GarbageCollected,
/// Set by [`Event::node_started`].
NodeStarted,
/// Set by [`Event::node_stopped`].
NodeStopped,
}
/// A single timestamped occurrence: its kind plus kind-specific data.
///
/// Serializable with serde; `EventStore` writes one JSON-encoded `Event` per line.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Event {
/// Kind of event; the buses route on this value.
pub event_type: EventType,
/// The constructors fill this from `crate::utils::current_timestamp_ms()`
/// (presumably milliseconds since the Unix epoch — confirm against `utils`).
pub timestamp_ms: i64,
/// Data attached to the event.
pub payload: EventPayload,
}
/// Kind-specific data carried by an [`Event`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum EventPayload {
/// Used by the content added/removed/requested constructors.
Content { cid: String, size_bytes: u64 },
/// Used by the proof generated/submitted constructors.
Proof { proof_id: String, bytes: u64 },
/// Used by the peer connected/disconnected constructors.
Peer { peer_id: String },
/// Used by [`Event::reputation_changed`]: the peer and its score before and after.
Reputation {
peer_id: String,
old_score: f64,
new_score: f64,
},
/// Used by [`Event::quota_exceeded`].
Quota { used_bytes: u64, max_bytes: u64 },
/// Used by [`Event::garbage_collected`].
GarbageCollection {
freed_bytes: u64,
items_removed: usize,
},
/// Used by the node started/stopped constructors; carries no data.
Node,
}
impl Event {
#[must_use]
#[inline]
pub fn content_added(cid: impl Into<String>, size_bytes: u64) -> Self {
Self {
event_type: EventType::ContentAdded,
timestamp_ms: crate::utils::current_timestamp_ms(),
payload: EventPayload::Content {
cid: cid.into(),
size_bytes,
},
}
}
#[must_use]
#[inline]
pub fn content_removed(cid: impl Into<String>, size_bytes: u64) -> Self {
Self {
event_type: EventType::ContentRemoved,
timestamp_ms: crate::utils::current_timestamp_ms(),
payload: EventPayload::Content {
cid: cid.into(),
size_bytes,
},
}
}
#[must_use]
#[inline]
pub fn content_requested(cid: impl Into<String>, size_bytes: u64) -> Self {
Self {
event_type: EventType::ContentRequested,
timestamp_ms: crate::utils::current_timestamp_ms(),
payload: EventPayload::Content {
cid: cid.into(),
size_bytes,
},
}
}
#[must_use]
#[inline]
pub fn proof_generated(proof_id: impl Into<String>, bytes: u64) -> Self {
Self {
event_type: EventType::ProofGenerated,
timestamp_ms: crate::utils::current_timestamp_ms(),
payload: EventPayload::Proof {
proof_id: proof_id.into(),
bytes,
},
}
}
#[must_use]
#[inline]
pub fn proof_submitted(proof_id: impl Into<String>, bytes: u64) -> Self {
Self {
event_type: EventType::ProofSubmitted,
timestamp_ms: crate::utils::current_timestamp_ms(),
payload: EventPayload::Proof {
proof_id: proof_id.into(),
bytes,
},
}
}
#[must_use]
#[inline]
pub fn peer_connected(peer_id: impl Into<String>) -> Self {
Self {
event_type: EventType::PeerConnected,
timestamp_ms: crate::utils::current_timestamp_ms(),
payload: EventPayload::Peer {
peer_id: peer_id.into(),
},
}
}
#[must_use]
#[inline]
pub fn peer_disconnected(peer_id: impl Into<String>) -> Self {
Self {
event_type: EventType::PeerDisconnected,
timestamp_ms: crate::utils::current_timestamp_ms(),
payload: EventPayload::Peer {
peer_id: peer_id.into(),
},
}
}
#[must_use]
#[inline]
pub fn reputation_changed(peer_id: impl Into<String>, old_score: f64, new_score: f64) -> Self {
Self {
event_type: EventType::ReputationChanged,
timestamp_ms: crate::utils::current_timestamp_ms(),
payload: EventPayload::Reputation {
peer_id: peer_id.into(),
old_score,
new_score,
},
}
}
#[must_use]
#[inline]
pub fn quota_exceeded(used_bytes: u64, max_bytes: u64) -> Self {
Self {
event_type: EventType::QuotaExceeded,
timestamp_ms: crate::utils::current_timestamp_ms(),
payload: EventPayload::Quota {
used_bytes,
max_bytes,
},
}
}
#[must_use]
#[inline]
pub fn garbage_collected(freed_bytes: u64, items_removed: usize) -> Self {
Self {
event_type: EventType::GarbageCollected,
timestamp_ms: crate::utils::current_timestamp_ms(),
payload: EventPayload::GarbageCollection {
freed_bytes,
items_removed,
},
}
}
#[must_use]
#[inline]
pub fn node_started() -> Self {
Self {
event_type: EventType::NodeStarted,
timestamp_ms: crate::utils::current_timestamp_ms(),
payload: EventPayload::Node,
}
}
#[must_use]
#[inline]
pub fn node_stopped() -> Self {
Self {
event_type: EventType::NodeStopped,
timestamp_ms: crate::utils::current_timestamp_ms(),
payload: EventPayload::Node,
}
}
}
pub struct EventBus {
subscribers: Arc<Mutex<HashMap<EventType, Vec<Sender<Event>>>>>,
stats: Arc<Mutex<EventStats>>,
}
impl EventBus {
#[must_use]
#[inline]
pub fn new() -> Self {
Self {
subscribers: Arc::new(Mutex::new(HashMap::new())),
stats: Arc::new(Mutex::new(EventStats::default())),
}
}
#[inline]
#[must_use]
pub fn subscribe(&self, event_type: EventType) -> Receiver<Event> {
let (tx, rx) = channel();
let mut subs = self.subscribers.lock().unwrap();
subs.entry(event_type).or_default().push(tx);
rx
}
pub fn publish(&self, event: Event) {
let event_type = event.event_type;
{
let mut stats = self.stats.lock().unwrap();
stats.total_events += 1;
*stats.events_by_type.entry(event_type).or_insert(0) += 1;
}
let mut subs = self.subscribers.lock().unwrap();
if let Some(subscribers) = subs.get_mut(&event_type) {
subscribers.retain(|tx| tx.send(event.clone()).is_ok());
self.stats.lock().unwrap().active_subscribers = subs.values().map(|v| v.len()).sum();
}
}
#[must_use]
#[inline]
pub fn stats(&self) -> EventStats {
self.stats.lock().unwrap().clone()
}
#[inline]
pub fn reset_stats(&self) {
*self.stats.lock().unwrap() = EventStats::default();
}
#[must_use]
#[inline]
pub fn subscriber_count(&self, event_type: EventType) -> usize {
self.subscribers
.lock()
.unwrap()
.get(&event_type)
.map(|v| v.len())
.unwrap_or(0)
}
#[inline]
pub fn clear_subscribers(&self) {
self.subscribers.lock().unwrap().clear();
self.stats.lock().unwrap().active_subscribers = 0;
}
}
impl Default for EventBus {
fn default() -> Self {
Self::new()
}
}
/// Counters maintained by the event buses.
#[derive(Debug, Clone, Default)]
pub struct EventStats {
    /// Number of events published since creation or the last reset.
    pub total_events: u64,
    /// Per-type breakdown of `total_events`.
    pub events_by_type: HashMap<EventType, u64>,
    /// Number of subscribers registered when the bus last refreshed this field.
    pub active_subscribers: usize,
}

impl EventStats {
    /// Returns the event type with the highest count together with that count,
    /// or `None` when nothing has been recorded.
    ///
    /// Ties are resolved in favour of the variant declared first in
    /// [`EventType`], so the result does not depend on `HashMap` iteration
    /// order.
    #[inline]
    #[must_use]
    pub fn most_common_event(&self) -> Option<(EventType, u64)> {
        self.events_by_type
            .iter()
            .map(|(&event_type, &count)| (event_type, count))
            // Highest count wins; among equal counts the smallest discriminant
            // (earliest declaration) wins, hence the `Reverse`.
            .max_by_key(|&(event_type, count)| (count, std::cmp::Reverse(event_type as usize)))
    }

    /// Number of recorded events of `event_type`; `0` if none were seen.
    #[must_use]
    #[inline]
    pub fn event_count(&self, event_type: EventType) -> u64 {
        self.events_by_type.get(&event_type).copied().unwrap_or(0)
    }
}
/// Publish/subscribe bus backed by one `tokio::sync::broadcast` channel per
/// [`EventType`].
///
/// # Panics
///
/// Every method panics if one of the internal mutexes has been poisoned.
pub struct AsyncEventBus {
    /// Broadcast sender for each event type that has been subscribed to.
    broadcasters: Arc<Mutex<HashMap<EventType, broadcast::Sender<Event>>>>,
    /// Event counters; `active_subscribers` is filled in by [`AsyncEventBus::stats`].
    stats: Arc<Mutex<EventStats>>,
    /// Capacity used for every broadcast channel this bus creates.
    capacity: usize,
}

impl AsyncEventBus {
    /// Channel capacity used by the `Default` implementation.
    const DEFAULT_CAPACITY: usize = 100;

    /// Creates a bus whose per-type channels hold up to `capacity` events.
    ///
    /// `broadcast::channel` panics for a capacity of `0` or above
    /// `usize::MAX / 2`; that panic would otherwise fire inside
    /// [`AsyncEventBus::subscribe`] while the internal lock is held and poison
    /// the bus. The value is therefore clamped into the accepted range here.
    #[must_use]
    pub fn new(capacity: usize) -> Self {
        Self {
            broadcasters: Arc::new(Mutex::new(HashMap::new())),
            stats: Arc::new(Mutex::new(EventStats::default())),
            capacity: capacity.clamp(1, usize::MAX / 2),
        }
    }

    /// Returns a receiver for `event_type`, creating the channel on first use.
    #[inline]
    #[must_use]
    pub fn subscribe(&self, event_type: EventType) -> broadcast::Receiver<Event> {
        let mut broadcasters = self.broadcasters.lock().unwrap();
        let tx = broadcasters
            .entry(event_type)
            .or_insert_with(|| broadcast::channel(self.capacity).0);
        tx.subscribe()
    }

    /// Counts `event` in the statistics and broadcasts it to the receivers of
    /// its type.
    ///
    /// Returns the number of receivers the event was sent to, `0` when nobody
    /// is subscribed.
    ///
    /// # Errors
    ///
    /// Never returns `Err` at present: sending without receivers is reported
    /// as `Ok(0)`. The `Result` is kept for interface compatibility.
    pub fn publish(&self, event: Event) -> Result<usize, broadcast::error::SendError<Event>> {
        let event_type = event.event_type;
        {
            let mut stats = self.stats.lock().unwrap();
            stats.total_events += 1;
            *stats.events_by_type.entry(event_type).or_insert(0) += 1;
        }
        let broadcasters = self.broadcasters.lock().unwrap();
        // `send` itself reports how many receivers exist and only fails when
        // there are none, so its result is the delivery count.
        let delivered = broadcasters
            .get(&event_type)
            .map_or(0, |tx| tx.send(event).unwrap_or(0));
        Ok(delivered)
    }

    /// Returns a snapshot of the statistics.
    ///
    /// `active_subscribers` is computed on the spot as the sum of live
    /// receivers over all channels; receivers unsubscribe by being dropped, so
    /// a stored gauge could not stay accurate.
    #[must_use]
    #[inline]
    pub fn stats(&self) -> EventStats {
        // The two locks are taken one after the other, never nested.
        let mut snapshot = self.stats.lock().unwrap().clone();
        snapshot.active_subscribers = self
            .broadcasters
            .lock()
            .unwrap()
            .values()
            .map(|tx| tx.receiver_count())
            .sum();
        snapshot
    }

    /// Resets the event counters to zero.
    #[inline]
    pub fn reset_stats(&self) {
        *self.stats.lock().unwrap() = EventStats::default();
    }

    /// Number of live receivers for `event_type`.
    #[inline]
    #[must_use]
    pub fn receiver_count(&self, event_type: EventType) -> usize {
        self.broadcasters
            .lock()
            .unwrap()
            .get(&event_type)
            .map_or(0, |tx| tx.receiver_count())
    }
}

impl Default for AsyncEventBus {
    /// Creates a bus with a per-type capacity of 100 events.
    fn default() -> Self {
        Self::new(Self::DEFAULT_CAPACITY)
    }
}
/// A set of optional criteria for selecting [`Event`]s.
///
/// `matches` requires every criterion that is `Some` to hold; a `None`
/// criterion places no restriction on the event.
#[derive(Debug, Clone)]
pub struct EventFilter {
/// When set, the event's type must be contained in this list.
pub allowed_types: Option<Vec<EventType>>,
/// When set, the event's `timestamp_ms` must be greater than or equal to this value.
pub min_timestamp: Option<i64>,
/// When set, the event's payload must satisfy this condition.
pub payload_filter: Option<PayloadFilter>,
}
/// Payload-level condition used by [`EventFilter`].
///
/// An event whose payload variant is not covered by the chosen condition does
/// not match.
#[derive(Debug, Clone)]
pub enum PayloadFilter {
/// Matches `Content` payloads whose `cid` starts with the given string.
CidPrefix(String),
/// Matches `Peer` and `Reputation` payloads whose `peer_id` equals the given string.
PeerId(String),
/// Matches `Content` payloads (`size_bytes`) and `Proof` payloads (`bytes`)
/// whose value is at least the given number.
MinBytes(u64),
}
impl EventFilter {
#[must_use]
pub fn new() -> Self {
Self {
allowed_types: None,
min_timestamp: None,
payload_filter: None,
}
}
#[must_use]
pub fn with_types(mut self, types: Vec<EventType>) -> Self {
self.allowed_types = Some(types);
self
}
#[must_use]
pub fn with_min_timestamp(mut self, timestamp: i64) -> Self {
self.min_timestamp = Some(timestamp);
self
}
#[must_use]
pub fn with_payload_filter(mut self, filter: PayloadFilter) -> Self {
self.payload_filter = Some(filter);
self
}
#[inline]
#[must_use]
pub fn matches(&self, event: &Event) -> bool {
if let Some(ref allowed) = self.allowed_types {
if !allowed.contains(&event.event_type) {
return false;
}
}
if let Some(min_ts) = self.min_timestamp {
if event.timestamp_ms < min_ts {
return false;
}
}
if let Some(ref pf) = self.payload_filter {
let matches = match pf {
PayloadFilter::CidPrefix(prefix) => {
if let EventPayload::Content { cid, .. } = &event.payload {
cid.starts_with(prefix)
} else {
false
}
}
PayloadFilter::PeerId(peer_id) => match &event.payload {
EventPayload::Peer { peer_id: p } => p == peer_id,
EventPayload::Reputation { peer_id: p, .. } => p == peer_id,
_ => false,
},
PayloadFilter::MinBytes(min_bytes) => match &event.payload {
EventPayload::Content { size_bytes, .. } => size_bytes >= min_bytes,
EventPayload::Proof { bytes, .. } => bytes >= min_bytes,
_ => false,
},
};
if !matches {
return false;
}
}
true
}
}
impl Default for EventFilter {
fn default() -> Self {
Self::new()
}
}
/// An ordered collection of [`Event`]s together with the time the collection
/// was created.
#[derive(Debug, Clone)]
pub struct EventBatch {
    /// Events in insertion order.
    pub events: Vec<Event>,
    /// Value of `crate::utils::current_timestamp_ms()` when the batch was built.
    pub created_at: i64,
}

impl EventBatch {
    /// Creates an empty batch stamped with the current time.
    #[must_use]
    pub fn new() -> Self {
        let events = Vec::new();
        let created_at = crate::utils::current_timestamp_ms();
        Self { events, created_at }
    }

    /// Appends `event` to the end of the batch.
    #[inline]
    pub fn add(&mut self, event: Event) {
        self.events.push(event);
    }

    /// Number of events in the batch.
    #[must_use]
    #[inline]
    pub fn len(&self) -> usize {
        self.events.len()
    }

    /// Returns `true` when the batch holds no events.
    #[must_use]
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.events.is_empty()
    }

    /// Sums the byte figure of every event that carries one.
    ///
    /// `Content` contributes `size_bytes`, `Proof` contributes `bytes` and
    /// `GarbageCollection` contributes `freed_bytes`; all other payloads are
    /// skipped.
    #[must_use]
    #[inline]
    pub fn total_bytes(&self) -> u64 {
        let mut total: u64 = 0;
        for event in &self.events {
            match &event.payload {
                EventPayload::Content { size_bytes: n, .. }
                | EventPayload::Proof { bytes: n, .. }
                | EventPayload::GarbageCollection { freed_bytes: n, .. } => total += *n,
                _ => {}
            }
        }
        total
    }

    /// Returns clones of the events accepted by `filter`, preserving order.
    #[inline]
    #[must_use]
    pub fn filter(&self, filter: &EventFilter) -> Vec<Event> {
        let mut selected = Vec::new();
        for event in &self.events {
            if filter.matches(event) {
                selected.push(event.clone());
            }
        }
        selected
    }
}

impl Default for EventBatch {
    /// Equivalent to [`EventBatch::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Append-only event log that writes one JSON-encoded [`Event`] per line.
///
/// # Panics
///
/// Methods that touch the file or the counter panic if the corresponding
/// internal mutex has been poisoned.
pub struct EventStore {
    /// Location of the log file.
    file_path: std::path::PathBuf,
    /// Open handle; `None` once [`EventStore::close`] has been called.
    file: Arc<Mutex<Option<std::fs::File>>>,
    /// Number of events successfully written through this store instance.
    events_written: Arc<Mutex<u64>>,
}

impl EventStore {
    /// Opens `file_path` for appending, creating the file and any missing
    /// parent directories.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error if the directories or the file cannot
    /// be created or opened.
    pub fn new<P: Into<std::path::PathBuf>>(file_path: P) -> std::io::Result<Self> {
        let file_path = file_path.into();
        if let Some(parent) = file_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&file_path)?;
        Ok(Self {
            file_path,
            file: Arc::new(Mutex::new(Some(file))),
            events_written: Arc::new(Mutex::new(0)),
        })
    }

    /// Serializes `event` to a JSON line (`InvalidData` on failure).
    fn encode_line(event: &Event) -> std::io::Result<String> {
        let mut line = serde_json::to_string(event)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        line.push('\n');
        Ok(line)
    }

    /// Appends `event` to the log as a single JSON line.
    ///
    /// The line, including its terminating newline, is handed to the file in
    /// one `write_all` call rather than through formatted writes.
    ///
    /// # Errors
    ///
    /// Fails if the store is closed, the event cannot be serialized
    /// (`InvalidData`), or the write fails.
    pub fn persist(&self, event: &Event) -> std::io::Result<()> {
        use std::io::Write;
        let mut file_guard = self.file.lock().unwrap();
        let file = file_guard
            .as_mut()
            .ok_or_else(|| std::io::Error::other("Event store is closed"))?;
        let line = Self::encode_line(event)?;
        file.write_all(line.as_bytes())?;
        file.flush()?;
        *self.events_written.lock().unwrap() += 1;
        Ok(())
    }

    /// Appends every event in `events` and returns how many were written.
    ///
    /// All events are serialized into one in-memory buffer before anything is
    /// written, so a serialization failure part-way through leaves the file
    /// untouched instead of containing a partial batch that the counter does
    /// not account for. The whole batch is held in memory while encoding.
    ///
    /// # Errors
    ///
    /// Fails if the store is closed, any event cannot be serialized
    /// (`InvalidData`), or the write fails.
    pub fn persist_batch<I>(&self, events: I) -> std::io::Result<usize>
    where
        I: IntoIterator<Item = Event>,
    {
        use std::io::Write;
        let mut file_guard = self.file.lock().unwrap();
        let file = file_guard
            .as_mut()
            .ok_or_else(|| std::io::Error::other("Event store is closed"))?;

        let mut buffer = String::new();
        let mut count = 0usize;
        for event in events {
            buffer.push_str(&Self::encode_line(&event)?);
            count += 1;
        }

        file.write_all(buffer.as_bytes())?;
        file.flush()?;
        // usize -> u64 is lossless on every supported target.
        *self.events_written.lock().unwrap() += count as u64;
        Ok(count)
    }

    /// Number of events written through this store since it was created.
    #[must_use]
    #[inline]
    pub fn events_written(&self) -> u64 {
        *self.events_written.lock().unwrap()
    }

    /// Path of the underlying log file.
    #[must_use]
    #[inline]
    pub fn file_path(&self) -> &std::path::Path {
        &self.file_path
    }

    /// Flushes and releases the file handle.
    ///
    /// Subsequent `persist` / `persist_batch` calls fail with
    /// "Event store is closed". Closing an already closed store is a no-op.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from the final flush, if any.
    pub fn close(&self) -> std::io::Result<()> {
        use std::io::Write;
        let mut file_guard = self.file.lock().unwrap();
        if let Some(mut file) = file_guard.take() {
            file.flush()?;
        }
        Ok(())
    }
}
/// Reader for a line-delimited JSON event log such as the one written by
/// `EventStore`.
pub struct EventReplay {
    /// Location of the log file to read.
    file_path: std::path::PathBuf,
}

impl EventReplay {
    /// Creates a reader for `file_path`; the file is not opened until a
    /// replay or count is requested.
    #[must_use]
    pub fn new<P: Into<std::path::PathBuf>>(file_path: P) -> Self {
        Self {
            file_path: file_path.into(),
        }
    }

    /// Parses the log line by line and collects the events accepted by `keep`.
    ///
    /// Blank lines are skipped. Only accepted events are retained, so a
    /// selective predicate does not require holding the whole log in memory.
    fn replay_where<F>(&self, mut keep: F) -> std::io::Result<Vec<Event>>
    where
        F: FnMut(&Event) -> bool,
    {
        use std::io::{BufRead, BufReader};
        let reader = BufReader::new(std::fs::File::open(&self.file_path)?);
        let mut events = Vec::new();
        for (line_num, line) in reader.lines().enumerate() {
            let line = line?;
            if line.trim().is_empty() {
                continue;
            }
            let event: Event = serde_json::from_str(&line).map_err(|e| {
                std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    format!("Failed to parse event at line {}: {}", line_num + 1, e),
                )
            })?;
            if keep(&event) {
                events.push(event);
            }
        }
        Ok(events)
    }

    /// Reads every event in the log, in file order.
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be opened or read, or if a non-blank line is
    /// not a valid event (`InvalidData`, with the 1-based line number in the
    /// message).
    pub fn replay_all(&self) -> std::io::Result<Vec<Event>> {
        self.replay_where(|_| true)
    }

    /// Reads the events accepted by `filter`, in file order.
    ///
    /// Every line is still parsed, so a malformed line anywhere in the file is
    /// reported even if the filter would have rejected it.
    ///
    /// # Errors
    ///
    /// Same conditions as [`EventReplay::replay_all`].
    pub fn replay_filtered(&self, filter: &EventFilter) -> std::io::Result<Vec<Event>> {
        self.replay_where(|event| filter.matches(event))
    }

    /// Reads the events whose `timestamp_ms` is at least `since_timestamp_ms`.
    ///
    /// # Errors
    ///
    /// Same conditions as [`EventReplay::replay_all`].
    pub fn replay_since(&self, since_timestamp_ms: i64) -> std::io::Result<Vec<Event>> {
        let filter = EventFilter::new().with_min_timestamp(since_timestamp_ms);
        self.replay_filtered(&filter)
    }

    /// Counts the non-blank lines in the log without parsing them.
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be opened or if reading any line fails
    /// (including invalid UTF-8). Read errors are propagated rather than
    /// skipped: a reader that keeps failing would otherwise make the line
    /// iterator yield errors forever and the count would never finish.
    pub fn count_events(&self) -> std::io::Result<usize> {
        use std::io::{BufRead, BufReader};
        let reader = BufReader::new(std::fs::File::open(&self.file_path)?);
        let mut count = 0;
        for line in reader.lines() {
            if !line?.trim().is_empty() {
                count += 1;
            }
        }
        Ok(count)
    }

    /// Returns `true` if the log file currently exists.
    #[must_use]
    #[inline]
    pub fn exists(&self) -> bool {
        self.file_path.exists()
    }

    /// Path of the log file this reader targets.
    #[must_use]
    #[inline]
    pub fn file_path(&self) -> &std::path::Path {
        &self.file_path
    }
}
// Unit tests for the event types, both buses, filters, batches and the
// file-backed store/replay pair. File-based tests write to fixed names under
// `std::env::temp_dir()` and remove the file before and after use.
#[cfg(test)]
mod tests {
use super::*;
// A fresh bus reports zero events and zero subscribers.
#[test]
fn test_event_bus_creation() {
let bus = EventBus::new();
let stats = bus.stats();
assert_eq!(stats.total_events, 0);
assert_eq!(stats.active_subscribers, 0);
}
// A subscriber receives an event published for its type.
#[test]
fn test_subscribe_and_publish() {
let bus = EventBus::new();
let rx = bus.subscribe(EventType::ContentAdded);
bus.publish(Event::content_added("QmTest", 1024));
let event = rx.try_recv().unwrap();
assert_eq!(event.event_type, EventType::ContentAdded);
}
// Every subscriber of a type gets its own copy of the event.
#[test]
fn test_multiple_subscribers() {
let bus = EventBus::new();
let rx1 = bus.subscribe(EventType::ContentAdded);
let rx2 = bus.subscribe(EventType::ContentAdded);
assert_eq!(bus.subscriber_count(EventType::ContentAdded), 2);
bus.publish(Event::content_added("QmTest", 1024));
assert!(rx1.try_recv().is_ok());
assert!(rx2.try_recv().is_ok());
}
// Events are only routed to subscribers of the matching type.
#[test]
fn test_event_type_filtering() {
let bus = EventBus::new();
let rx_content = bus.subscribe(EventType::ContentAdded);
let rx_peer = bus.subscribe(EventType::PeerConnected);
bus.publish(Event::content_added("QmTest", 1024));
assert!(rx_content.try_recv().is_ok());
assert!(rx_peer.try_recv().is_err()); }
// Constructors set the expected event type.
#[test]
fn test_event_creation_helpers() {
let event = Event::content_added("QmTest", 1024);
assert_eq!(event.event_type, EventType::ContentAdded);
let event = Event::peer_connected("peer1");
assert_eq!(event.event_type, EventType::PeerConnected);
let event = Event::proof_generated("proof1", 2048);
assert_eq!(event.event_type, EventType::ProofGenerated);
}
// Publishing updates the total and per-type counters, even without subscribers.
#[test]
fn test_statistics_tracking() {
let bus = EventBus::new();
bus.publish(Event::content_added("QmTest1", 1024));
bus.publish(Event::content_added("QmTest2", 2048));
bus.publish(Event::peer_connected("peer1"));
let stats = bus.stats();
assert_eq!(stats.total_events, 3);
assert_eq!(stats.event_count(EventType::ContentAdded), 2);
assert_eq!(stats.event_count(EventType::PeerConnected), 1);
}
// `most_common_event` returns the type with the highest count.
#[test]
fn test_most_common_event() {
let bus = EventBus::new();
bus.publish(Event::content_added("QmTest1", 1024));
bus.publish(Event::content_added("QmTest2", 2048));
bus.publish(Event::peer_connected("peer1"));
let stats = bus.stats();
let (event_type, count) = stats.most_common_event().unwrap();
assert_eq!(event_type, EventType::ContentAdded);
assert_eq!(count, 2);
}
// `reset_stats` brings the event total back to zero.
#[test]
fn test_reset_stats() {
let bus = EventBus::new();
bus.publish(Event::content_added("QmTest", 1024));
assert_eq!(bus.stats().total_events, 1);
bus.reset_stats();
assert_eq!(bus.stats().total_events, 0);
}
// `clear_subscribers` removes all registered subscribers.
#[test]
fn test_clear_subscribers() {
let bus = EventBus::new();
let _rx1 = bus.subscribe(EventType::ContentAdded);
let _rx2 = bus.subscribe(EventType::ContentAdded);
assert_eq!(bus.subscriber_count(EventType::ContentAdded), 2);
bus.clear_subscribers();
assert_eq!(bus.subscriber_count(EventType::ContentAdded), 0);
}
// The reputation constructor stores the peer id and both scores.
#[test]
fn test_reputation_changed_event() {
let event = Event::reputation_changed("peer1", 0.5, 0.8);
assert_eq!(event.event_type, EventType::ReputationChanged);
if let EventPayload::Reputation {
peer_id,
old_score,
new_score,
} = event.payload
{
assert_eq!(peer_id, "peer1");
assert_eq!(old_score, 0.5);
assert_eq!(new_score, 0.8);
} else {
panic!("Wrong payload type");
}
}
// The quota constructor sets the `QuotaExceeded` type.
#[test]
fn test_quota_exceeded_event() {
let event = Event::quota_exceeded(1000, 500);
assert_eq!(event.event_type, EventType::QuotaExceeded);
}
// The garbage-collection constructor stores freed bytes and item count.
#[test]
fn test_garbage_collected_event() {
let event = Event::garbage_collected(1024 * 1024, 5);
assert_eq!(event.event_type, EventType::GarbageCollected);
if let EventPayload::GarbageCollection {
freed_bytes,
items_removed,
} = event.payload
{
assert_eq!(freed_bytes, 1024 * 1024);
assert_eq!(items_removed, 5);
} else {
panic!("Wrong payload type");
}
}
// Node start/stop constructors set their respective types.
#[test]
fn test_node_lifecycle_events() {
let started = Event::node_started();
assert_eq!(started.event_type, EventType::NodeStarted);
let stopped = Event::node_stopped();
assert_eq!(stopped.event_type, EventType::NodeStopped);
}
// An async subscriber receives a published event.
#[tokio::test]
async fn test_async_event_bus() {
let bus = AsyncEventBus::new(10);
let mut rx = bus.subscribe(EventType::ContentAdded);
let event = Event::content_added("QmTest", 1024);
let result = bus.publish(event.clone());
assert!(result.is_ok());
let received = rx.recv().await.unwrap();
assert_eq!(received.event_type, EventType::ContentAdded);
}
// All async receivers of a type see the same event.
#[tokio::test]
async fn test_async_event_bus_multiple_receivers() {
let bus = AsyncEventBus::new(10);
let mut rx1 = bus.subscribe(EventType::ContentAdded);
let mut rx2 = bus.subscribe(EventType::ContentAdded);
assert_eq!(bus.receiver_count(EventType::ContentAdded), 2);
let event = Event::content_added("QmTest", 1024);
let _ = bus.publish(event);
assert!(rx1.recv().await.is_ok());
assert!(rx2.recv().await.is_ok());
}
// The async bus counts published events in total and per type.
#[tokio::test]
async fn test_async_event_bus_stats() {
let bus = AsyncEventBus::new(10);
let _rx = bus.subscribe(EventType::ContentAdded);
let _ = bus.publish(Event::content_added("QmTest1", 1024));
let _ = bus.publish(Event::content_added("QmTest2", 2048));
let stats = bus.stats();
assert_eq!(stats.total_events, 2);
assert_eq!(stats.event_count(EventType::ContentAdded), 2);
}
// A type filter accepts listed types and rejects the rest.
#[test]
fn test_event_filter_type() {
let filter =
EventFilter::new().with_types(vec![EventType::ContentAdded, EventType::ContentRemoved]);
let event1 = Event::content_added("QmTest", 1024);
assert!(filter.matches(&event1));
let event2 = Event::peer_connected("peer1");
assert!(!filter.matches(&event2));
}
// A minimum-timestamp filter rejects older events and accepts newer ones.
#[test]
fn test_event_filter_timestamp() {
let now = crate::utils::current_timestamp_ms();
let filter = EventFilter::new().with_min_timestamp(now);
let mut old_event = Event::content_added("QmTest", 1024);
old_event.timestamp_ms = now - 1000;
assert!(!filter.matches(&old_event));
let new_event = Event::content_added("QmTest", 1024);
assert!(filter.matches(&new_event));
}
// A CID-prefix filter matches on the start of the content id.
#[test]
fn test_event_filter_cid_prefix() {
let filter =
EventFilter::new().with_payload_filter(PayloadFilter::CidPrefix("Qm".to_string()));
let event1 = Event::content_added("QmTest123", 1024);
assert!(filter.matches(&event1));
let event2 = Event::content_added("Bafytest", 1024);
assert!(!filter.matches(&event2));
}
// A peer-id filter matches both peer and reputation payloads.
#[test]
fn test_event_filter_peer_id() {
let filter =
EventFilter::new().with_payload_filter(PayloadFilter::PeerId("peer1".to_string()));
let event1 = Event::peer_connected("peer1");
assert!(filter.matches(&event1));
let event2 = Event::peer_connected("peer2");
assert!(!filter.matches(&event2));
let event3 = Event::reputation_changed("peer1", 0.5, 0.8);
assert!(filter.matches(&event3));
}
// A minimum-bytes filter applies to content and proof payloads.
#[test]
fn test_event_filter_min_bytes() {
let filter = EventFilter::new().with_payload_filter(PayloadFilter::MinBytes(2048));
let event1 = Event::content_added("QmTest", 4096);
assert!(filter.matches(&event1));
let event2 = Event::content_added("QmTest", 1024);
assert!(!filter.matches(&event2));
let event3 = Event::proof_generated("proof1", 3072);
assert!(filter.matches(&event3));
}
// Basic batch operations: add, len, is_empty, total_bytes.
#[test]
fn test_event_batch() {
let mut batch = EventBatch::new();
assert!(batch.is_empty());
batch.add(Event::content_added("QmTest1", 1024));
batch.add(Event::content_added("QmTest2", 2048));
batch.add(Event::peer_connected("peer1"));
assert_eq!(batch.len(), 3);
assert!(!batch.is_empty());
assert_eq!(batch.total_bytes(), 3072);
}
// Filtering a batch returns only the matching events.
#[test]
fn test_event_batch_filter() {
let mut batch = EventBatch::new();
batch.add(Event::content_added("QmTest1", 1024));
batch.add(Event::content_added("QmTest2", 2048));
batch.add(Event::peer_connected("peer1"));
let filter = EventFilter::new().with_types(vec![EventType::ContentAdded]);
let filtered = batch.filter(&filter);
assert_eq!(filtered.len(), 2);
}
// total_bytes sums content, proof and garbage-collection figures (1024 + 2048 + 512).
#[test]
fn test_event_batch_total_bytes() {
let mut batch = EventBatch::new();
batch.add(Event::content_added("QmTest", 1024));
batch.add(Event::proof_generated("proof1", 2048));
batch.add(Event::garbage_collected(512, 3));
batch.add(Event::peer_connected("peer1"));
assert_eq!(batch.total_bytes(), 3584); }
// A new store starts with a zero counter and reports its path.
#[test]
fn test_event_store_creation() {
let temp_dir = std::env::temp_dir();
let store_path = temp_dir.join("test_event_store_creation.jsonl");
let _ = std::fs::remove_file(&store_path);
let store = EventStore::new(&store_path).unwrap();
assert_eq!(store.events_written(), 0);
assert_eq!(store.file_path(), store_path.as_path());
let _ = std::fs::remove_file(&store_path);
}
// Persisting one event writes it to the file and bumps the counter.
#[test]
fn test_event_store_persist() {
let temp_dir = std::env::temp_dir();
let store_path = temp_dir.join("test_event_store_persist.jsonl");
let _ = std::fs::remove_file(&store_path);
let store = EventStore::new(&store_path).unwrap();
let event = Event::content_added("QmTest123", 1024);
store.persist(&event).unwrap();
assert_eq!(store.events_written(), 1);
store.close().unwrap();
let content = std::fs::read_to_string(&store_path).unwrap();
assert!(!content.is_empty());
assert!(content.contains("QmTest123"));
let _ = std::fs::remove_file(&store_path);
}
// Persisting a batch reports and records the number of events written.
#[test]
fn test_event_store_persist_batch() {
let temp_dir = std::env::temp_dir();
let store_path = temp_dir.join("test_event_store_persist_batch.jsonl");
let _ = std::fs::remove_file(&store_path);
let store = EventStore::new(&store_path).unwrap();
let events = vec![
Event::content_added("QmTest1", 1024),
Event::content_added("QmTest2", 2048),
Event::peer_connected("peer1"),
];
let count = store.persist_batch(events).unwrap();
assert_eq!(count, 3);
assert_eq!(store.events_written(), 3);
store.close().unwrap();
let _ = std::fs::remove_file(&store_path);
}
// Replaying returns all persisted events in their original order.
#[test]
fn test_event_replay_all() {
let temp_dir = std::env::temp_dir();
let store_path = temp_dir.join("test_event_replay_all.jsonl");
let _ = std::fs::remove_file(&store_path);
let store = EventStore::new(&store_path).unwrap();
let events = vec![
Event::content_added("QmTest1", 1024),
Event::content_added("QmTest2", 2048),
Event::peer_connected("peer1"),
];
store.persist_batch(events).unwrap();
store.close().unwrap();
let replay = EventReplay::new(&store_path);
assert!(replay.exists());
let replayed = replay.replay_all().unwrap();
assert_eq!(replayed.len(), 3);
assert_eq!(replayed[0].event_type, EventType::ContentAdded);
assert_eq!(replayed[1].event_type, EventType::ContentAdded);
assert_eq!(replayed[2].event_type, EventType::PeerConnected);
let _ = std::fs::remove_file(&store_path);
}
// Filtered replay returns only events accepted by the filter.
#[test]
fn test_event_replay_filtered() {
let temp_dir = std::env::temp_dir();
let store_path = temp_dir.join("test_event_replay_filtered.jsonl");
let _ = std::fs::remove_file(&store_path);
let store = EventStore::new(&store_path).unwrap();
let events = vec![
Event::content_added("QmTest1", 1024),
Event::content_added("QmTest2", 2048),
Event::peer_connected("peer1"),
Event::proof_generated("proof1", 512),
];
store.persist_batch(events).unwrap();
store.close().unwrap();
let replay = EventReplay::new(&store_path);
let filter = EventFilter::new().with_types(vec![EventType::ContentAdded]);
let filtered = replay.replay_filtered(&filter).unwrap();
assert_eq!(filtered.len(), 2);
assert!(
filtered
.iter()
.all(|e| e.event_type == EventType::ContentAdded)
);
let _ = std::fs::remove_file(&store_path);
}
// `replay_since` drops events older than the given timestamp.
#[test]
fn test_event_replay_since() {
let temp_dir = std::env::temp_dir();
let store_path = temp_dir.join("test_event_replay_since.jsonl");
let _ = std::fs::remove_file(&store_path);
let store = EventStore::new(&store_path).unwrap();
let now = crate::utils::current_timestamp_ms();
let mut old_event = Event::content_added("QmOld", 1024);
old_event.timestamp_ms = now - 10000;
let mut new_event = Event::content_added("QmNew", 2048);
new_event.timestamp_ms = now + 1000;
store.persist(&old_event).unwrap();
store.persist(&new_event).unwrap();
store.close().unwrap();
let replay = EventReplay::new(&store_path);
let recent = replay.replay_since(now).unwrap();
assert_eq!(recent.len(), 1);
if let EventPayload::Content { cid, .. } = &recent[0].payload {
assert_eq!(cid, "QmNew");
} else {
panic!("Expected Content payload");
}
let _ = std::fs::remove_file(&store_path);
}
// `count_events` reports the number of persisted events.
#[test]
fn test_event_replay_count() {
let temp_dir = std::env::temp_dir();
let store_path = temp_dir.join("test_event_replay_count.jsonl");
let _ = std::fs::remove_file(&store_path);
let store = EventStore::new(&store_path).unwrap();
let events = vec![
Event::content_added("QmTest1", 1024),
Event::content_added("QmTest2", 2048),
Event::peer_connected("peer1"),
];
store.persist_batch(events).unwrap();
store.close().unwrap();
let replay = EventReplay::new(&store_path);
let count = replay.count_events().unwrap();
assert_eq!(count, 3);
let _ = std::fs::remove_file(&store_path);
}
}