Documentation
use color_eyre::Result;
use flume::Receiver;
use quinn::{Connection, RecvStream, SendStream};

use crate::{session::Session, util::read_msg, EldegossId};

#[derive(Debug)]
pub(crate) struct Link {
    pub(crate) id: EldegossId,
    pub(crate) locator: String,
    pub(crate) connection: Connection,
    pub(crate) msg_to_send: Receiver<Vec<u8>>,
    pub(crate) send: SendStream,
    pub(crate) recv: RecvStream,
    pub(crate) session: Session,
}

impl Link {
    pub(crate) const fn id(&self) -> EldegossId {
        self.id
    }

    pub(crate) async fn handle(self) {
        let Link {
            id,
            locator,
            connection,
            recv,
            send,
            session,
            msg_to_send,
        } = self;

        tokio::spawn(writer(msg_to_send, send));
        tokio::spawn(reader(session, id, locator, connection, recv));
    }
}

async fn reader(
    session: Session,
    id: EldegossId,
    locator: String,
    connection: Connection,
    mut recv: RecvStream,
) {
    loop {
        match read_msg(&mut recv).await {
            Ok(msg) => {
                let id = id.to_u128();
                session.dispatch(msg, id).await;
            }
            Err(e) => {
                warn!("link handle recv msg failed: {e}");
                session.connected_locators.lock().await.remove(&locator);
                session.links.write().await.remove(&id);
                session.check_member_list.lock().await.push(id);
                warn!("link({id}) closed: {:?}", connection.close_reason());
                connection.close(0u32.into(), b"close");
                break;
            }
        }
    }
}

async fn writer(msg_to_send: Receiver<Vec<u8>>, mut send: SendStream) -> Result<()> {
    while let Ok(msg_bytes) = msg_to_send.recv_async().await {
        let len = msg_bytes.len() as u32;
        let len_bytes = len.to_le_bytes().to_vec();
        let bytes = [len_bytes, msg_bytes].concat();
        send.write_all(&bytes).await?;
    }
    Ok(())
}