1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#![no_std]
#![feature(const_btree_new)]
#[cfg(feature = "role_center")]
extern crate alloc;
mod consts;
mod datagram;
mod history;
mod sender;
use consts::*;
use core::time::Duration;
pub use datagram::set_index;
use datagram::{Datagram, DatagramHeader};
use history::History;
use spin::RwLock;
use unmp::id::Id;
use unmp::link::Link;
use unmp::net;
use unmp::protocol;
use unmp::Connection;
cfg_if::cfg_if! {
if #[cfg(feature = "role_center")] {
use alloc::collections::BTreeMap;
} else{
use fixed_queue::LinearMap;
type BTreeMap<K, V> = LinearMap<K, V, LISTENERS_LEN>;
}
}
static HISTORY: History = History::new();
#[derive(Clone)]
pub struct Addr {
id: Id,
link: Link,
port: u8,
}
impl Addr {
pub const fn new(id: Id, port: u8) -> Self {
Self {
id: id,
link: Link::INVALID,
port: port,
}
}
pub fn id(&self) -> &Id {
&self.id
}
pub fn link(&self) -> &Link {
&self.link
}
pub fn set_link(&mut self, link: Link) {
self.link = link;
}
pub fn port(&self) -> &u8 {
&self.port
}
}
type RecvCallback = fn(remote: Addr, data: &[u8]);
static LISTENERS: RwLock<BTreeMap<u8, RecvCallback>> = RwLock::new(BTreeMap::new());
fn when_recv(conn: Connection, data: &[u8]) {
let datagram = Datagram::parse(data);
if let Ok(datagram) = datagram {
let head = datagram.head();
let body = datagram.body();
if head.control() < 0x80 {
let mut is_repeat = false;
if head.control() & (1 << 1) == (1 << 1) {
if !HISTORY.add(&datagram) {
is_repeat = true;
}
}
if head.control() & (1 << 0) == (1 << 0) {
let mut res =
DatagramHeader::new(head.dstport(), head.srcport(), Some(head.index()));
res.set_control(0x81 | (is_repeat as u8) << 1);
let mut buf: [u8; MAX_LEN] = [0; MAX_LEN];
let buf_len = res.generate(&[], &mut buf);
net::send(PROTOCOL_ID, &buf[..buf_len], &conn.id(), Some(conn.link()));
}
if is_repeat {
return;
}
let addr = Addr {
id: conn.id().clone(),
link: conn.link(),
port: head.srcport(),
};
if let Some(f) = LISTENERS.read().get(&head.dstport()) {
f(addr, body);
}
} else {
sender::finish(head.index());
}
}
}
pub fn init() {
protocol::add_protocol(PROTOCOL_ID, when_recv);
task_stream::spawn(async {
loop {
sender::resend();
task_stream::sleep(Duration::from_millis(100)).await;
}
});
net::on_disconnect(sender::when_disconnect);
}
pub fn listen(port: u8, f: RecvCallback) {
LISTENERS.write().insert(port, f);
}
pub fn send(port: u8, dst: &Addr, body: &[u8], retry: u8) {
let mut head = DatagramHeader::new(port, dst.port, None);
head.set_control(if retry == 0 { 2 } else { 3 });
let mut buf: [u8; MAX_LEN] = [0; MAX_LEN];
let buf_len = head.generate(body, &mut buf);
sender::send(head.index(), &buf[..buf_len], dst.id.clone(), retry);
}