1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
use crate::{
    tasks::Senders,
    traits::{packet_dup::PacketDup, packet_rel::PacketRel}
};
use bytes::Bytes;
use for_event_bus::{BusError, IdentityOfSimple};
use log::{debug, warn};
use std::{sync::Arc, time::Duration};
use tokio::sync::{broadcast, mpsc, oneshot};

pub async fn complete_to_tx_packet<Ack: PacketRel, T: PacketDup>(
    rx_ack: &mut IdentityOfSimple<Ack>,
    packet_id: u16,
    duration: u64,
    tx: &Senders,
    packet: &mut T
) -> anyhow::Result<Arc<Ack>, CommonErr> {
    let data = packet.data();
    let mut dup_data = Option::<Arc<Bytes>>::None;
    tx.tx_network_default(data).await?;
    loop {
        if let Ok(packet) = tokio::time::timeout(
            Duration::from_secs(duration),
            timeout_rx(rx_ack, packet_id)
        )
        .await
        {
            return Ok(packet?);
        } else {
            let data = if let Some(data) = &dup_data {
                data.clone()
            } else {
                let data = packet.dup_data();
                dup_data = Some(data.clone());
                data
            };
            tx.tx_network_default(data.clone()).await?;
        }
    }
}

async fn timeout_rx<T: PacketRel>(
    rx_ack: &mut IdentityOfSimple<T>,
    packet_id: u16
) -> anyhow::Result<Arc<T>, CommonErr> {
    loop {
        let msg = rx_ack.recv().await?;
        if msg.is_rel(packet_id) {
            debug!("rx success: {:?}", msg);
            return Ok(msg);
        }
    }
}

#[derive(Debug)]
pub enum CommonErr {
    ChannelAbnormal
}

impl<T> From<broadcast::error::SendError<T>> for CommonErr {
    fn from(_: broadcast::error::SendError<T>) -> Self {
        Self::ChannelAbnormal
    }
}
impl<T> From<mpsc::error::SendError<T>> for CommonErr {
    fn from(_: mpsc::error::SendError<T>) -> Self {
        Self::ChannelAbnormal
    }
}
impl From<oneshot::error::RecvError> for CommonErr {
    fn from(_: oneshot::error::RecvError) -> Self {
        Self::ChannelAbnormal
    }
}
impl From<broadcast::error::RecvError> for CommonErr {
    fn from(_: broadcast::error::RecvError) -> Self {
        Self::ChannelAbnormal
    }
}
impl From<BusError> for CommonErr {
    fn from(err: BusError) -> Self {
        match err {
            BusError::ChannelErr => Self::ChannelAbnormal,
            BusError::DowncastErr => {
                warn!("downcast err");
                Self::ChannelAbnormal
            }
        }
    }
}
// impl From<Elapsed> for CommonErr {
//     fn from(_: Elapsed) -> Self {
//         Self::Elapsed
//     }
// }