1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
//! Socket

use crate::consts::*;
use crate::datagram::Datagram;
use crate::sender;
use crate::Addr;
use fixed_queue::LinearMap;
use futures_intrusive::buffer::ArrayBuf;
use futures_intrusive::channel::shared::*;
use spin::Mutex;
type BTreeMap<K, V> = LinearMap<K, V, LISTENERS_LEN>;

static CH_LIST: Mutex<BTreeMap<u8, WaitDataSender>> = Mutex::new(BTreeMap::new());

pub struct Respose {
    pub body: VecBody,
    pub remote: Addr,
}
type RingBody = ArrayBuf<Respose, [Respose; 1]>;
type WaitDataSender = GenericSender<Mutex<()>, Respose, RingBody>;
type WaitDataRecver = GenericReceiver<Mutex<()>, Respose, RingBody>;

pub struct Config {
    pub retry: u8,
}
pub struct Socket {
    port: u8,
    remote: Option<Addr>,
    recver: WaitDataRecver,
    timeout: (u64, u8),
    cfg: Config,
}
impl Socket {
    /// 创建socket
    pub fn new(port: u8, config: Config) -> Result<Socket, &'static str> {
        let mut ch_list = CH_LIST.lock();
        if let Some(_) = ch_list.get(&port) {
            return Err("repeat.");
        }
        let (sender, recver) = generic_channel(1);
        if ch_list.len() >= LISTENERS_LEN {
            return Err("full.");
        }
        ch_list.insert(port, sender);

        Ok(Socket {
            port: port,
            remote: None,
            recver: recver,
            timeout: (10_000, 1),
            cfg: config,
        })
    }
    /// 设置绑定设备
    pub fn bind(&mut self, remote: Addr) {
        self.remote = Some(remote)
    }
    /// 发送数据给绑定设备
    pub async fn send(&mut self, data: &[u8]) -> Result<(), ()> {
        if let Some(remote) = &self.remote {
            if let Ok(consum) =
                sender::send(self.port, remote, data, self.cfg.retry, self.timeout.0).await
            {
                self.update_timeout(consum);
                return Ok(());
            }
        }
        return Err(());
    }
    /// 发送数据给目标设备
    pub async fn send_to(&mut self, data: &[u8], remote: &Addr) -> Result<(), ()> {
        if let Ok(consum) =
            sender::send(self.port, remote, data, self.cfg.retry, self.timeout.0).await
        {
            self.update_timeout(consum);
            return Ok(());
        }
        return Err(());
    }
    /// 接收数据
    pub async fn recv(&self) -> Result<Respose, ()> {
        if let Some(rep) = self.recver.receive().await {
            Ok(rep)
        } else {
            Err(())
        }
    }
    /// 更新超时时间
    fn update_timeout(&mut self, consum: u64) {
        let (timeout, num) = self.timeout;
        if num < 200 {
            self.timeout.0 = (timeout * num as u64 + 2 * (consum + 100)) / (num as u64 + 1);
            self.timeout.1 += 1;
        }
    }
}

pub(crate) fn when_recv(datagram: &Datagram, remote: Addr) {
    let ch_list = CH_LIST.lock();
    if let Some(sender) = ch_list.get(&datagram.head().dstport()) {
        let rep = Respose {
            body: VecBody::from(datagram.body()),
            remote: remote,
        };
        task_stream::spawn(sender.send(rep));
    }
}