gwyh 0.1.0

gwyh is a sweet gossip library
Documentation
use std::sync::Arc;

use tokio::net::UdpSocket;
use tokio::sync::Notify;
use tracing::info;

use crate::handshake::HandshakerMessage;
use crate::nodeinfo::NodeInfo;
use crate::packet_handler::PacketHandlerMessage;
use crate::peer_manager::PeerManagerRequest;
use crate::registry::Registry;
use crate::GwyhHandler;

pub struct Server {
    socket: Arc<UdpSocket>,
    registry: Registry,
    notify_ready: Option<Arc<Notify>>,
    broadcast_handler: Option<Arc<dyn GwyhHandler + Send + Sync>>,
    notify_shutdown: Arc<Notify>,
}

impl Server {
    pub async fn new(
        nodeinfo: &NodeInfo,
        registry: Registry,
        notify_ready: Arc<Notify>,
        broadcast_handler: Option<Arc<dyn GwyhHandler + Send + Sync>>,
        notify_shutdown: Arc<Notify>,
    ) -> std::io::Result<Self> {
        let nodeinfo = nodeinfo.clone();
        let socket = Arc::new(UdpSocket::bind(nodeinfo.bind_addr()).await?);
        println!("{} bound to {}", nodeinfo.id(), nodeinfo.bind_addr());

        Ok(Self {
            socket,
            registry,
            notify_ready: Some(notify_ready),
            broadcast_handler,
            notify_shutdown,
        })
    }

    pub async fn run(mut self) {
        let nodeinfo = self.registry.nodeinfo().clone();
        info!("starting server on nid={}", nodeinfo.id());

        self.registry
            .cast_peer_manager(PeerManagerRequest::SetNotify(
                self.notify_ready.take().unwrap(),
            ))
            .await
            .expect("unable to deliver message");
        self.registry
            .cast_packet_handler(PacketHandlerMessage::SetBroadcastHandler(
                self.broadcast_handler.take(),
                self.broadcast_handler.take(),
            ))
            .await
            .expect("unable to deliver message");
        self.registry
            .cast_packet_handler(PacketHandlerMessage::Start(self.socket.clone()))
            .await
            .expect("unable to deliver message");
        self.registry
            .cast_handshaker(HandshakerMessage::Bootstrap)
            .await
            .expect("unable to deliver message");
        self.registry
            .cast_peer_manager(PeerManagerRequest::StartHeartbeats)
            .await
            .expect("unable to deliver message");

        tokio::spawn(async move {
            self.notify_shutdown.notified().await;
            self.registry
                .call_peer_manager(PeerManagerRequest::Shutdown)
                .await
                .ok();
            self.notify_shutdown.notify_one();
        });
    }
}