use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use calimero_network_primitives::messages::{NetworkEvent, NetworkEventDispatcher};
use prometheus_client::encoding::EncodeLabelSet;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::registry::Registry;
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
/// Tuning knobs for the bounded channel that carries [`NetworkEvent`]s from
/// the network layer to the processing task.
#[derive(Debug, Clone, Copy)]
pub struct NetworkEventChannelConfig {
/// Capacity of the underlying mpsc channel (maximum buffered events).
pub channel_size: usize,
/// Fill ratio (0.0..=1.0) at which the sender starts emitting warnings.
pub warning_threshold: f64,
/// Minimum interval between periodic statistics log lines on the receiver.
pub stats_log_interval: Duration,
}
impl Default for NetworkEventChannelConfig {
fn default() -> Self {
Self {
channel_size: 1000,
warning_threshold: 0.8,
stats_log_interval: Duration::from_secs(30),
}
}
}
/// Prometheus label set keyed by event type.
///
/// NOTE(review): not referenced by any metric in this file — presumably kept
/// for future per-event-type counters (see `event_type_name` for candidate
/// label values); confirm before removing.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub struct EventTypeLabel {
/// Stable snake_case name of the network event variant.
pub event_type: String,
}
/// Metric handles shared by the sender and receiver halves of the channel.
///
/// Cloning is cheap: prometheus_client metric types are internally
/// reference-counted, and the watermark is an `Arc<AtomicU64>`, so all clones
/// observe and update the same underlying values.
#[derive(Debug, Clone)]
pub struct NetworkEventChannelMetrics {
/// Current number of events buffered in the channel.
pub channel_depth: Gauge,
/// Total events successfully handed to the channel by senders.
pub events_received: Counter,
/// Total events pulled out of the channel by the receiver.
pub events_processed: Counter,
/// Total events rejected because the channel was full.
pub events_dropped: Counter,
/// Seconds from enqueue to dequeue, per event.
pub processing_latency: Histogram,
/// Highest channel depth ever observed (monotonic; read by stats logging).
pub high_watermark: Arc<AtomicU64>,
}
impl NetworkEventChannelMetrics {
    /// Build the metric set and register each metric under the
    /// `network_event_channel` prefix of `registry`.
    pub fn new(registry: &mut Registry) -> Self {
        let channel_depth = Gauge::default();
        let events_received = Counter::default();
        let events_processed = Counter::default();
        let events_dropped = Counter::default();
        // Buckets start at 100us and double 18 times (~26s), covering both
        // healthy queueing and badly backed-up channels.
        let processing_latency = Histogram::new(exponential_buckets(0.0001, 2.0, 18));

        let sub_registry = registry.sub_registry_with_prefix("network_event_channel");
        sub_registry.register(
            "depth",
            "Current number of events waiting in the channel",
            channel_depth.clone(),
        );
        sub_registry.register(
            "received_total",
            "Total number of events sent to the channel",
            events_received.clone(),
        );
        sub_registry.register(
            "processed_total",
            "Total number of events received from the channel",
            events_processed.clone(),
        );
        sub_registry.register(
            "dropped_total",
            "Number of events dropped due to full channel",
            events_dropped.clone(),
        );
        sub_registry.register(
            "processing_latency_seconds",
            "Time from event send to processing start",
            processing_latency.clone(),
        );
        Self {
            channel_depth,
            events_received,
            events_processed,
            events_dropped,
            processing_latency,
            high_watermark: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Metrics not attached to any registry; for unit tests only.
    #[cfg(test)]
    pub fn new_unregistered() -> Self {
        Self {
            channel_depth: Gauge::default(),
            events_received: Counter::default(),
            events_processed: Counter::default(),
            events_dropped: Counter::default(),
            processing_latency: Histogram::new(exponential_buckets(0.0001, 2.0, 18)),
            high_watermark: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Raise the recorded high watermark to `current_depth` if it exceeds the
    /// previous maximum.
    fn update_high_watermark(&self, current_depth: u64) {
        // `fetch_max` performs the compare-exchange loop this method used to
        // open-code, with one consistent memory ordering. Relaxed suffices:
        // the watermark is a standalone monotonic statistic read only by
        // `log_stats`, with no ordering dependency on other data.
        let _ = self.high_watermark.fetch_max(current_depth, Ordering::Relaxed);
    }
}
/// A [`NetworkEvent`] paired with the instant it entered the channel, so the
/// receiver can measure queueing latency at dequeue time.
#[derive(Debug)]
pub struct TimestampedEvent {
/// The wrapped network event.
pub event: NetworkEvent,
/// When the sender placed the event into the channel.
pub enqueued_at: Instant,
}
/// Cloneable sending half of the metered network-event channel.
///
/// Sends are non-blocking: when the channel is full the event is dropped and
/// counted rather than applying backpressure to the network layer.
#[derive(Debug, Clone)]
pub struct NetworkEventSender {
// Underlying bounded channel handle.
tx: mpsc::Sender<TimestampedEvent>,
// Needed for capacity math and the warning threshold.
config: NetworkEventChannelConfig,
// Shared with the receiver half (clones observe the same counters).
metrics: NetworkEventChannelMetrics,
}
impl NetworkEventSender {
    /// Non-blocking send. Returns `true` when the event was enqueued and
    /// `false` when it was dropped (channel full) or the receiver is gone.
    pub fn send(&self, event: NetworkEvent) -> bool {
        let event_type = event_type_name(&event);
        let timestamped = TimestampedEvent {
            event,
            enqueued_at: Instant::now(),
        };
        match self.tx.try_send(timestamped) {
            Ok(()) => {
                self.after_successful_send(event_type);
                true
            }
            Err(mpsc::error::TrySendError::Full(dropped)) => {
                self.metrics.events_dropped.inc();
                warn!(
                    event_type,
                    channel_size = self.config.channel_size,
                    "Network event channel FULL - dropping event! \
                    This indicates the processor cannot keep up with incoming events."
                );
                debug!(
                    ?dropped.event,
                    "Dropped event details"
                );
                false
            }
            Err(mpsc::error::TrySendError::Closed(_)) => {
                warn!(
                    event_type,
                    "Network event channel closed - processor has shut down"
                );
                false
            }
        }
    }

    /// Bookkeeping after a successful `try_send`: bump the received counter,
    /// refresh depth gauge and watermark, and warn when nearing capacity.
    fn after_successful_send(&self, event_type: &'static str) {
        self.metrics.events_received.inc();
        let max_capacity = self.config.channel_size;
        // Depth is total capacity minus the free slots reported by the handle.
        let current_depth = max_capacity.saturating_sub(self.tx.capacity()) as u64;
        self.metrics.channel_depth.set(current_depth as i64);
        self.metrics.update_high_watermark(current_depth);
        let fill_ratio = current_depth as f64 / max_capacity as f64;
        if fill_ratio >= self.config.warning_threshold {
            warn!(
                current_depth,
                max_capacity,
                fill_percent = fill_ratio * 100.0,
                event_type,
                "Network event channel approaching capacity"
            );
        }
    }

    /// Number of events currently buffered in the channel.
    pub fn depth(&self) -> usize {
        self.config.channel_size.saturating_sub(self.tx.capacity())
    }

    /// Whether the receiving half has been dropped or explicitly closed.
    pub fn is_closed(&self) -> bool {
        self.tx.is_closed()
    }
}
// Bridges the network layer's dispatcher trait onto the metered sender;
// the boolean mirrors `send` (false when the event was dropped or the
// channel is closed).
impl NetworkEventDispatcher for NetworkEventSender {
fn dispatch(&self, event: NetworkEvent) -> bool {
self.send(event)
}
}
/// Receiving half of the metered network-event channel. Not cloneable:
/// exactly one task consumes events.
pub struct NetworkEventReceiver {
// Underlying bounded channel handle.
rx: mpsc::Receiver<TimestampedEvent>,
// Shared with the sender half (clones observe the same counters).
metrics: NetworkEventChannelMetrics,
// Last time `log_stats` ran; throttles periodic statistics output.
last_stats_log: Instant,
// Needed for the stats logging interval.
config: NetworkEventChannelConfig,
}
impl NetworkEventReceiver {
    /// Await the next event, recording queueing latency and channel depth.
    /// Returns `None` once the channel is closed and fully drained.
    pub async fn recv(&mut self) -> Option<NetworkEvent> {
        let timestamped = self.rx.recv().await?;
        self.record_dequeue(timestamped.enqueued_at);
        // Periodic statistics are emitted from the hot path so no separate
        // timer task is required.
        if self.last_stats_log.elapsed() >= self.config.stats_log_interval {
            self.log_stats();
            self.last_stats_log = Instant::now();
        }
        Some(timestamped.event)
    }

    /// Non-blocking variant of [`Self::recv`]; returns `None` when the
    /// channel is currently empty or closed. Does not trigger stats logging.
    pub fn try_recv(&mut self) -> Option<NetworkEvent> {
        match self.rx.try_recv() {
            Ok(timestamped) => {
                self.record_dequeue(timestamped.enqueued_at);
                Some(timestamped.event)
            }
            Err(_) => None,
        }
    }

    /// Shared per-dequeue bookkeeping: observe queueing latency, bump the
    /// processed counter, and refresh the depth gauge. Factored out so
    /// `recv` and `try_recv` cannot drift apart.
    fn record_dequeue(&mut self, enqueued_at: Instant) {
        let latency = enqueued_at.elapsed();
        self.metrics
            .processing_latency
            .observe(latency.as_secs_f64());
        self.metrics.events_processed.inc();
        self.metrics.channel_depth.set(self.rx.len() as i64);
    }

    /// Pull every immediately-available event (e.g. during shutdown) and
    /// log how many were drained.
    pub fn drain(&mut self) -> Vec<NetworkEvent> {
        let mut events = Vec::new();
        while let Some(event) = self.try_recv() {
            events.push(event);
        }
        if !events.is_empty() {
            info!(
                count = events.len(),
                "Drained remaining events during shutdown"
            );
        }
        events
    }

    /// Close the receiving side; subsequent sends fail with `Closed`, but
    /// already-buffered events can still be received or drained.
    pub fn close(&mut self) {
        self.rx.close();
    }

    /// Log cumulative counters plus current and peak channel depth.
    fn log_stats(&self) {
        let received = self.metrics.events_received.get();
        let processed = self.metrics.events_processed.get();
        let dropped = self.metrics.events_dropped.get();
        let high_watermark = self.metrics.high_watermark.load(Ordering::Relaxed);
        let current_depth = self.rx.len();
        info!(
            received,
            processed, dropped, current_depth, high_watermark, "Network event channel statistics"
        );
    }
}
/// Create a connected sender/receiver pair for network events, registering
/// the channel metrics in `registry`.
pub fn channel(
    config: NetworkEventChannelConfig,
    registry: &mut Registry,
) -> (NetworkEventSender, NetworkEventReceiver) {
    let metrics = NetworkEventChannelMetrics::new(registry);
    let (tx, rx) = mpsc::channel(config.channel_size);
    (
        NetworkEventSender {
            tx,
            config,
            metrics: metrics.clone(),
        },
        NetworkEventReceiver {
            rx,
            metrics,
            last_stats_log: Instant::now(),
            config,
        },
    )
}
/// Test-only variant of [`channel`] whose metrics are not registered with
/// any Prometheus registry.
#[cfg(test)]
pub fn channel_unregistered(
    config: NetworkEventChannelConfig,
) -> (NetworkEventSender, NetworkEventReceiver) {
    let metrics = NetworkEventChannelMetrics::new_unregistered();
    let (tx, rx) = mpsc::channel(config.channel_size);
    (
        NetworkEventSender {
            tx,
            config,
            metrics: metrics.clone(),
        },
        NetworkEventReceiver {
            rx,
            metrics,
            last_stats_log: Instant::now(),
            config,
        },
    )
}
/// Map a [`NetworkEvent`] variant to a stable snake_case label string, used
/// in log fields (and suitable as a metric label value).
///
/// Exhaustive on purpose: adding a variant to `NetworkEvent` forces a
/// compile error here instead of silently falling through a catch-all.
fn event_type_name(event: &NetworkEvent) -> &'static str {
match event {
NetworkEvent::ListeningOn { .. } => "listening_on",
NetworkEvent::Subscribed { .. } => "subscribed",
NetworkEvent::Unsubscribed { .. } => "unsubscribed",
NetworkEvent::Message { .. } => "message",
NetworkEvent::StreamOpened { .. } => "stream_opened",
NetworkEvent::BlobRequested { .. } => "blob_requested",
NetworkEvent::BlobProvidersFound { .. } => "blob_providers_found",
NetworkEvent::BlobDownloaded { .. } => "blob_downloaded",
NetworkEvent::BlobDownloadFailed { .. } => "blob_download_failed",
NetworkEvent::SpecializedNodeVerificationRequest { .. } => {
"specialized_node_verification_request"
}
NetworkEvent::SpecializedNodeInvitationResponse { .. } => {
"specialized_node_invitation_response"
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use libp2p::gossipsub::{MessageId, TopicHash};
    use libp2p::PeerId;

    /// Build a gossipsub `Message` event with a small fixed payload.
    fn sample_message_event() -> NetworkEvent {
        NetworkEvent::Message {
            id: MessageId::new(b"test"),
            message: libp2p::gossipsub::Message {
                source: Some(PeerId::random()),
                data: vec![0, 1, 2, 3],
                sequence_number: Some(1),
                topic: TopicHash::from_raw("test-topic"),
            },
        }
    }

    #[tokio::test]
    async fn test_basic_send_receive() {
        let cfg = NetworkEventChannelConfig {
            channel_size: 10,
            ..Default::default()
        };
        let (tx, mut rx) = channel_unregistered(cfg);
        // A single event should round-trip through the channel.
        assert!(tx.send(sample_message_event()));
        assert!(rx.recv().await.is_some());
    }

    #[tokio::test]
    async fn test_channel_full_drops_events() {
        let cfg = NetworkEventChannelConfig {
            channel_size: 2,
            warning_threshold: 0.5,
            ..Default::default()
        };
        let (tx, mut rx) = channel_unregistered(cfg);
        // The first two sends fill the channel; the third must be dropped.
        assert!(tx.send(sample_message_event()));
        assert!(tx.send(sample_message_event()));
        assert!(!tx.send(sample_message_event()));
        assert_eq!(tx.metrics.events_received.get(), 2);
        assert_eq!(tx.metrics.events_dropped.get(), 1);
        assert_eq!(rx.drain().len(), 2);
    }

    #[tokio::test]
    async fn test_graceful_shutdown_drain() {
        let cfg = NetworkEventChannelConfig {
            channel_size: 100,
            ..Default::default()
        };
        let (tx, mut rx) = channel_unregistered(cfg);
        for _ in 0..10 {
            tx.send(sample_message_event());
        }
        // Closing still allows buffered events to be drained.
        rx.close();
        assert_eq!(rx.drain().len(), 10);
    }

    #[tokio::test]
    async fn test_latency_tracking() {
        let cfg = NetworkEventChannelConfig {
            channel_size: 10,
            ..Default::default()
        };
        let (tx, mut rx) = channel_unregistered(cfg);
        tx.send(sample_message_event());
        tokio::time::sleep(Duration::from_millis(1)).await;
        let _ = rx.recv().await;
        // Receiving an event must bump the processed counter.
        assert_eq!(rx.metrics.events_processed.get(), 1);
    }
}