use std::sync::Arc;
use actix::Addr;
use calimero_network_primitives::messages::NetworkEvent;
use tokio::sync::Notify;
use tracing::{debug, info};
use crate::network_event_channel::NetworkEventReceiver;
use crate::NodeManager;
pub struct NetworkEventBridge {
receiver: NetworkEventReceiver,
node_manager: Addr<NodeManager>,
shutdown: Arc<Notify>,
}
impl NetworkEventBridge {
pub fn new(receiver: NetworkEventReceiver, node_manager: Addr<NodeManager>) -> Self {
Self {
receiver,
node_manager,
shutdown: Arc::new(Notify::new()),
}
}
pub fn shutdown_handle(&self) -> Arc<Notify> {
self.shutdown.clone()
}
pub async fn run(mut self) {
info!("Network event bridge started");
loop {
tokio::select! {
event = self.receiver.recv() => {
match event {
Some(event) => {
self.forward_event(event);
}
None => {
info!("Network event channel closed, shutting down bridge");
break;
}
}
}
_ = self.shutdown.notified() => {
info!("Network event bridge received shutdown signal");
break;
}
}
}
self.graceful_shutdown();
info!("Network event bridge stopped");
}
fn forward_event(&self, event: NetworkEvent) {
let event_type = match &event {
NetworkEvent::Message { .. } => "Message",
NetworkEvent::StreamOpened { .. } => "StreamOpened",
NetworkEvent::Subscribed { .. } => "Subscribed",
NetworkEvent::Unsubscribed { .. } => "Unsubscribed",
NetworkEvent::ListeningOn { .. } => "ListeningOn",
NetworkEvent::BlobRequested { .. } => "BlobRequested",
NetworkEvent::BlobProvidersFound { .. } => "BlobProvidersFound",
NetworkEvent::BlobDownloaded { .. } => "BlobDownloaded",
NetworkEvent::BlobDownloadFailed { .. } => "BlobDownloadFailed",
NetworkEvent::SpecializedNodeVerificationRequest { .. } => {
"SpecializedNodeVerificationRequest"
}
NetworkEvent::SpecializedNodeInvitationResponse { .. } => {
"SpecializedNodeInvitationResponse"
}
};
debug!(event_type, "Forwarding network event to NodeManager");
self.node_manager.do_send(event);
}
fn graceful_shutdown(&mut self) {
info!("Draining remaining network events...");
let remaining_events = self.receiver.drain();
let count = remaining_events.len();
if count > 0 {
info!(count, "Forwarding remaining events before shutdown");
for event in remaining_events {
self.forward_event(event);
}
}
info!("Graceful shutdown complete");
}
}
pub type NetworkEventProcessor = NetworkEventBridge;
#[derive(Debug, Clone, Default)]
pub struct NetworkEventProcessorConfig {
pub sync_timeout: std::time::Duration,
}
impl From<&crate::sync::SyncConfig> for NetworkEventProcessorConfig {
fn from(sync_config: &crate::sync::SyncConfig) -> Self {
Self {
sync_timeout: sync_config.timeout,
}
}
}