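//! Documentation: a `Link` drives one QUIC connection of a session — it runs the
//! read loop and the write loop, reacts to shutdown, and unregisters itself from
//! the session when the link ends.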
use std::sync::Arc;

use common_x::graceful_shutdown::CloseHandler;
use flume::Receiver;
use quinn::{Connection, RecvStream, SendStream};
use tokio::select;

use crate::{protocol::Sample, session::SessionRuntime, util::read_msg, EldegossId};

/// State owned by one link of a session: a QUIC connection, its send/receive
/// streams, and the handles needed to run and tear the link down.
///
/// A `Link` is consumed by [`Link::handle`], which drives it until it ends.
pub(crate) struct Link {
    /// Identifier of this link. It keys the entry removed from `session.links`
    /// on teardown and is passed (as a `u128`) with every dispatched message.
    pub(crate) id: EldegossId,
    /// Locator string removed from `session.connected_locators` on teardown.
    // NOTE(review): presumably the address this link was established with — confirm.
    pub(crate) locator: String,
    /// The underlying QUIC connection; closed by `handle` when the link ends.
    pub(crate) connection: Connection,
    /// Receiving end of the channel carrying samples to be written to `send`.
    pub(crate) msg_to_send: Receiver<Arc<Sample>>,
    /// Stream that outgoing length-prefixed samples are written to.
    pub(crate) send: SendStream,
    /// Stream that incoming messages are read from.
    pub(crate) recv: RecvStream,
    /// Session handle used to dispatch incoming messages and to unregister the
    /// link once it has ended.
    pub(crate) session: SessionRuntime,
    /// Shutdown handle; when `handle_async()` completes the link is closed with
    /// the reason "Active shutdown".
    pub(crate) close_handler: CloseHandler,
}

impl Link {
    pub(crate) const fn id(&self) -> EldegossId {
        self.id
    }

    pub(crate) async fn handle(self) {
        let Link {
            id,
            locator,
            connection,
            recv,
            send,
            session,
            msg_to_send,
            mut close_handler,
        } = self;

        select! {
            _ = reader(session.clone(), id, recv) => {
                connection.close(0_u32.into(), b"close by read msg error");
            }
            _ = writer(msg_to_send, send) => {
                connection.close(0_u32.into(), b"close by write msg error");
            }
            _ = close_handler.handle_async() => {
                connection.close(0_u32.into(), b"Active shutdown");
                info!("Link({id}): Active shutdown");
            }
            _ = connection.closed() => {
                info!("Link({}) connection closed, reason: {:?}", id, connection.close_reason());
            }
        }

        session.links.write().await.remove(&id);
        session.connected_locators.lock().await.remove(&locator);
        session.check_member_list.lock().await.push(id);
        info!("Link({}) closed", id);
    }
}

/// Receive loop of a link.
///
/// Repeatedly awaits `read_msg` on `recv` and passes each message to
/// `session.dispatch` together with the link id converted to a `u128`.
/// The first read error is logged at warn level and ends the loop, after
/// which the function returns.
async fn reader(session: SessionRuntime, id: EldegossId, mut recv: RecvStream) {
    loop {
        let msg = match read_msg(&mut recv).await {
            Ok(msg) => msg,
            Err(e) => {
                warn!("link handle recv msg failed: {e}");
                return;
            }
        };
        session.dispatch(msg, id.to_u128()).await;
    }
}

async fn writer(msg_to_send: Receiver<Arc<Sample>>, mut send: SendStream) {
    while let Ok(sample) = msg_to_send.recv_async().await {
        let msg_bytes = sample.encode();
        let len = msg_bytes.len() as u32;
        let len_bytes = len.to_le_bytes().to_vec();
        let bytes = [len_bytes, msg_bytes].concat();
        if let Err(e) = send.write_all(&bytes).await {
            warn!("link handle send msg failed: {e}");
            break;
        };
    }
}