1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use crate::consts::*;
use crate::datagram::Datagram;
use crate::sender;
use crate::Addr;
use fixed_queue::LinearMap;
use futures_intrusive::buffer::ArrayBuf;
use futures_intrusive::channel::shared::*;
use spin::Mutex;
/// Map type used for the listener registry.
///
/// NOTE: despite the alias name this is `fixed_queue::LinearMap`, not a B-tree.
/// It is sized by `LISTENERS_LEN` (presumably from `crate::consts`), and
/// `Socket::new` refuses to register more than that many ports.
type BTreeMap<K, V> = LinearMap<K, V, LISTENERS_LEN>;
/// Global registry mapping a local port to the sending half of that socket's
/// channel. Entries are added by `Socket::new` and looked up by `when_recv`.
static CH_LIST: Mutex<BTreeMap<u8, WaitDataSender>> = Mutex::new(BTreeMap::new());
/// A received payload handed to a [`Socket`], together with its origin.
///
/// Instances are built in `when_recv` from an incoming datagram and are
/// returned to the application by `Socket::recv`.
pub struct Respose {
    /// Payload converted from the datagram body.
    pub body: VecBody,
    /// Address of the peer the datagram was received from.
    pub remote: Addr,
}
/// Backing buffer of a socket's channel: an `ArrayBuf` over a one-element
/// array of `Respose`.
type RingBody = ArrayBuf<Respose, [Respose; 1]>;
/// Sending half of a socket's channel; stored in `CH_LIST` and used by
/// `when_recv`. The first type argument is a `spin::Mutex<()>`.
type WaitDataSender = GenericSender<Mutex<()>, Respose, RingBody>;
/// Receiving half of a socket's channel; owned by the `Socket` and awaited in
/// `Socket::recv`.
type WaitDataRecver = GenericReceiver<Mutex<()>, Respose, RingBody>;
/// Per-socket settings supplied to `Socket::new`.
pub struct Config {
    /// Retry value forwarded unchanged to `sender::send` on every transmission
    /// (presumably the maximum number of retransmissions; confirm in `sender`).
    pub retry: u8,
}
/// A datagram endpoint bound to one local port.
///
/// Creating a socket registers its port in the global listener registry so
/// that `when_recv` can route incoming datagrams to it.
pub struct Socket {
    /// Local port; passed to `sender::send` and used as the registry key.
    port: u8,
    /// Default peer used by `send`; `None` until `bind` is called.
    remote: Option<Addr>,
    /// Receiving half of the channel fed by `when_recv`.
    recver: WaitDataRecver,
    /// `(current timeout estimate, number of samples folded into it)`;
    /// maintained by `update_timeout`.
    timeout: (u64, u8),
    /// User-supplied settings.
    cfg: Config,
}
impl Socket {
    /// Timeout estimate a freshly created socket starts with. It is passed to
    /// `sender::send` and later averaged with the values that call reports, so
    /// it presumably shares their unit.
    const INITIAL_TIMEOUT: u64 = 10_000;
    /// Once this many samples have been folded in, the estimate is frozen.
    const TIMEOUT_SAMPLE_CAP: u8 = 200;
    /// Fixed margin added to every measurement before it is doubled.
    const TIMEOUT_MARGIN: u64 = 100;

    /// Creates a socket on `port` and registers it in the global listener
    /// registry.
    ///
    /// # Errors
    ///
    /// * `"repeat."` if `port` is already registered.
    /// * `"full."` if the registry already holds `LISTENERS_LEN` entries.
    ///
    /// NOTE(review): nothing in this block removes the registry entry again
    /// when a `Socket` is dropped.
    pub fn new(port: u8, config: Config) -> Result<Socket, &'static str> {
        let mut ch_list = CH_LIST.lock();
        if ch_list.get(&port).is_some() {
            return Err("repeat.");
        }
        if ch_list.len() >= LISTENERS_LEN {
            return Err("full.");
        }
        // Build the channel only after both checks passed, so a rejected
        // request does not allocate a channel just to throw it away.
        let (tx, rx) = generic_channel(1);
        ch_list.insert(port, tx);
        Ok(Socket {
            port,
            remote: None,
            recver: rx,
            timeout: (Self::INITIAL_TIMEOUT, 1),
            cfg: config,
        })
    }

    /// Sets the default peer used by [`Socket::send`].
    pub fn bind(&mut self, remote: Addr) {
        self.remote = Some(remote);
    }

    /// Sends `data` to the peer previously set with [`Socket::bind`].
    ///
    /// # Errors
    ///
    /// Returns `Err(())` if no peer has been bound or if `sender::send` fails.
    pub async fn send(&mut self, data: &[u8]) -> Result<(), ()> {
        let remote = self.remote.as_ref().ok_or(())?;
        let consum = sender::send(self.port, remote, data, self.cfg.retry, self.timeout.0)
            .await
            .map_err(|_| ())?;
        self.update_timeout(consum);
        Ok(())
    }

    /// Sends `data` to an explicit `remote`, ignoring any bound peer.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` if `sender::send` fails.
    pub async fn send_to(&mut self, data: &[u8], remote: &Addr) -> Result<(), ()> {
        let consum = sender::send(self.port, remote, data, self.cfg.retry, self.timeout.0)
            .await
            .map_err(|_| ())?;
        self.update_timeout(consum);
        Ok(())
    }

    /// Waits for the next payload routed to this socket.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` when the channel yields `None` (presumably because it
    /// has been closed).
    pub async fn recv(&self) -> Result<Respose, ()> {
        self.recver.receive().await.ok_or(())
    }

    /// Folds a new measurement into the running-average timeout.
    ///
    /// Each sample contributes `2 * (consum + TIMEOUT_MARGIN)`. Once
    /// `TIMEOUT_SAMPLE_CAP` samples have been recorded the estimate is left
    /// unchanged. Saturating arithmetic keeps an extreme `consum` from
    /// overflowing.
    fn update_timeout(&mut self, consum: u64) {
        let (average, samples) = self.timeout;
        if samples >= Self::TIMEOUT_SAMPLE_CAP {
            return;
        }
        let weight = u64::from(samples);
        let sample = consum
            .saturating_add(Self::TIMEOUT_MARGIN)
            .saturating_mul(2);
        let total = average.saturating_mul(weight).saturating_add(sample);
        // `samples` is below the cap here, so neither increment can overflow.
        self.timeout = (total / (weight + 1), samples + 1);
    }
}
/// Routes an incoming datagram to the socket listening on its destination
/// port.
///
/// Looks the destination port up in the global listener registry. If a socket
/// is registered there, the body and `remote` are wrapped in a `Respose` and
/// the channel send is handed to `task_stream::spawn`, so delivery completes
/// asynchronously. Datagrams for unregistered ports are dropped silently.
///
/// The registry lock is held until this function returns.
pub(crate) fn when_recv(datagram: &Datagram, remote: Addr) {
    let registry = CH_LIST.lock();
    let listener = match registry.get(&datagram.head().dstport()) {
        Some(found) => found,
        // Nobody is listening on this port.
        None => return,
    };
    let body = VecBody::from(datagram.body());
    task_stream::spawn(listener.send(Respose { body, remote }));
}