use std::{io, net::SocketAddr, sync::OnceLock};
use tokio::{
sync::{mpsc, oneshot},
task::JoinHandle,
};
use crate::{connections::Connection, node::Node};
mod handshake;
mod on_connect;
mod on_disconnect;
mod reading;
mod writing;
pub use handshake::Handshake;
pub use on_connect::OnConnect;
pub use on_disconnect::OnDisconnect;
pub use reading::Reading;
pub use writing::Writing;
/// The value sent back through the return channel of the `on_disconnect` handler (see
/// `Protocols::on_disconnect`): a task handle paired with a oneshot receiver.
// NOTE(review): presumably the handle belongs to the task running the disconnect logic and the
// receiver signals its progress - confirm against the `on_disconnect` module.
type OnDisconnectBundle = (JoinHandle<()>, oneshot::Receiver<()>);
/// The collection of protocol handlers, one slot per protocol.
///
/// Every slot is a `OnceLock`, so it starts out empty (via the derived `Default`) and can be
/// initialized at most once.
#[derive(Default)]
pub(crate) struct Protocols {
/// Handler for the handshake protocol; it is given a `Connection` and responds with either
/// a `Connection` or an I/O error.
pub(crate) handshake: OnceLock<ProtocolHandler<Connection, io::Result<Connection>>>,
/// Handler for the reading protocol; it is given a `Connection` and responds with either
/// a `Connection` or an I/O error.
pub(crate) reading: OnceLock<ProtocolHandler<Connection, io::Result<Connection>>>,
/// Handler for the writing protocol; unlike the others, it uses a dedicated type defined in
/// the `writing` module.
pub(crate) writing: OnceLock<writing::WritingHandler>,
/// Handler for the on-connect protocol; it is given an address and responds with a task
/// handle.
pub(crate) on_connect: OnceLock<ProtocolHandler<SocketAddr, JoinHandle<()>>>,
/// Handler for the on-disconnect protocol; it is given an address and responds with an
/// `OnDisconnectBundle`.
pub(crate) on_disconnect: OnceLock<ProtocolHandler<SocketAddr, OnDisconnectBundle>>,
}
/// An item of type `T` bundled with the `oneshot::Sender` through which a value of type `U`
/// can be sent back in response.
pub(crate) type ReturnableItem<T, U> = (T, oneshot::Sender<U>);
/// A `ReturnableItem` carrying a `Connection`, whose response is either a `Connection` or an
/// I/O error.
pub(crate) type ReturnableConnection = ReturnableItem<Connection, io::Result<Connection>>;
/// A handle to a protocol: a thin wrapper around the `mpsc::Sender` through which
/// `ReturnableItem`s are submitted.
// NOTE(review): the receiving half is presumably owned by the protocol's own task - confirm
// against the individual protocol modules.
pub(crate) struct ProtocolHandler<T, U>(mpsc::Sender<ReturnableItem<T, U>>);
/// The common interface for submitting an item to a protocol.
///
/// `T` is the type of the submitted payload and `U` is the type of the response.
pub(crate) trait Protocol<T, U> {
/// Hands `item` - a payload together with the sender for its response - over to the
/// protocol.
async fn trigger(&self, item: ReturnableItem<T, U>);
}
impl<T, U> Protocol<T, U> for ProtocolHandler<T, U> {
    /// Forwards `item` through the wrapped channel, waiting for capacity if the channel is
    /// currently full.
    async fn trigger(&self, item: ReturnableItem<T, U>) {
        let Self(sender) = self;

        // A failed send means the receiving half is closed or gone; this is deliberately
        // ignored, and the rejected item (including its response sender) is simply dropped.
        let _ = sender.send(item).await;
    }
}
/// A guard which, when dropped, spawns a task that calls `node.disconnect(addr)`.
///
/// Nothing happens on drop if `node` has been set to `None` beforehand.
pub(crate) struct DisconnectOnDrop {
/// The node on which the disconnect is performed; `None` disables the guard.
pub(crate) node: Option<Node>,
/// The address passed to `Node::disconnect` on drop.
pub(crate) addr: SocketAddr,
}
impl DisconnectOnDrop {
    /// Creates an armed guard: unless its `node` field is cleared first, dropping it will
    /// disconnect `addr` from `node`.
    pub(crate) fn new(node: Node, addr: SocketAddr) -> Self {
        let node = Some(node);

        Self { node, addr }
    }
}
impl Drop for DisconnectOnDrop {
    /// Schedules `node.disconnect(addr)` as a detached Tokio task, unless the guard has been
    /// disabled (i.e. `node` is `None`).
    ///
    /// `tokio::spawn` panics when called outside of a Tokio runtime context, and a panic
    /// raised from `drop` while the thread is already unwinding aborts the whole process.
    /// The runtime handle is therefore looked up fallibly, and the disconnect is skipped if
    /// no runtime is available to run it.
    fn drop(&mut self) {
        // An empty slot means the guard was disabled; there is nothing to clean up.
        let Some(node) = self.node.take() else {
            return;
        };
        let addr = self.addr;

        // `drop` can't be async, so the disconnect has to run in a separate task; without
        // a runtime there is nowhere to run it, and panicking here would only make things
        // worse.
        if let Ok(handle) = tokio::runtime::Handle::try_current() {
            handle.spawn(async move { node.disconnect(addr).await });
        }
    }
}
/// Reports a panicked protocol setup task.
///
/// An error is logged under `span` only when `res` holds a join error caused by a panic;
/// a missing result, a successful join and any non-panic join error are all ignored.
/// `protocol` is the protocol name used in the log message.
pub(crate) fn log_setup_join(
    span: &tracing::Span,
    protocol: &'static str,
    res: Option<Result<(), tokio::task::JoinError>>,
) {
    // Only a failed join is of interest.
    let Some(Err(e)) = res else {
        return;
    };

    // Join errors not caused by a panic are not reported.
    if !e.is_panic() {
        return;
    }

    tracing::error!(parent: span, "a {protocol} setup task panicked: {e}");
}