1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
use log::*;
use std::{
time::Duration,
collections::HashMap,
sync::{Arc}
};
use cyfs_debug::Mutex;
use futures::executor::ThreadPool;
use cyfs_base::*;
use crate::{
types::*,
protocol::{DynamicPackage, PackageBox}
};
use super::{
net_listener::{UdpSender}
};
struct PackageResendInfo {
pkg: Arc<PackageBox>,
sender: Arc<UdpSender>,
interval: Duration,
times: u8,
last_time: Timestamp,
nick_name: String,
}
pub struct ResendQueue {
default_interval: Duration,
max_times: u8,
thread_pool: ThreadPool,
packages: Mutex<HashMap<u32, PackageResendInfo>>,
}
impl ResendQueue {
pub fn new(
thread_pool: ThreadPool,
default_interval: Duration,
max_times: u8) -> ResendQueue {
ResendQueue {
default_interval,
max_times,
thread_pool,
packages: Mutex::new(Default::default()),
}
}
pub fn send(
&self,
sender: Arc<UdpSender>,
pkg: DynamicPackage,
pkg_id: u32,
pkg_nick_name: String) {
let now = bucky_time_now();
let to_send = {
let mut packages = self.packages.lock().unwrap();
if let Some(info) = packages.get_mut(&pkg_id) {
if info.times > 1 {
info.times >>= 1;
info.interval = info.interval / 2;
}
let pkg_box = sender.box_pkg(pkg);
info.sender = sender;
info.pkg = Arc::new(pkg_box);
info.nick_name = pkg_nick_name.clone();
if now > info.last_time
&& Duration::from_micros(now - info.last_time) > info.interval {
info.times += 1;
info.last_time = now;
Some((info.sender.clone(), info.pkg.clone()))
} else {
None
}
} else {
let pkg_box = Arc::new(sender.box_pkg(pkg));
packages.insert(pkg_id, PackageResendInfo {
pkg: pkg_box.clone(),
sender: sender.clone(),
interval: self.default_interval,
times: 1,
last_time: bucky_time_now(),
nick_name: pkg_nick_name.clone()
});
Some((sender, pkg_box.clone()))
}
};
if let Some((sender, pkg)) = to_send {
self.thread_pool.spawn_ok(async move {
match sender.send(&*pkg).await {
Ok(_) => {
info!("{} send ok.", pkg_nick_name);
},
Err(e) => {
warn!("{} send failed, error: {}.", pkg_nick_name, e.to_string());
}
}
});
}
}
pub fn confirm_pkg(&self, pkg_id: u32) {
self.packages.lock().unwrap().remove(&pkg_id);
}
pub fn try_resend(&self, now: Timestamp) {
let mut to_send = vec![];
let mut will_remove = vec![];
{
let mut packages = self.packages.lock().unwrap();
for (pkg_id, pkg_info) in packages.iter_mut() {
if now > pkg_info.last_time
&& Duration::from_micros(now - pkg_info.last_time) > pkg_info.interval {
pkg_info.times += 1;
pkg_info.interval = pkg_info.interval * 2;
pkg_info.last_time = now;
if pkg_info.times >= self.max_times {
will_remove.push(*pkg_id);
}
let pkg = pkg_info.pkg.clone();
let sender = pkg_info.sender.clone();
let nick_name = pkg_info.nick_name.clone();
to_send.push((pkg, sender, nick_name));
}
}
for id in will_remove {
let pkg = packages.remove(&id);
if let Some(p) = pkg {
warn!("{} resend timeout, to: {}.", p.nick_name, p.sender.session_name());
}
}
}
for (pkg, sender, nick_name) in to_send {
self.thread_pool.spawn_ok(async move {
match sender.send(&*pkg).await {
Ok(_) => {
info!("{} send ok, to: {}.", nick_name, sender.session_name());
},
Err(e) => {
warn!("{} send failed, to: {}, error: {}.", nick_name, sender.session_name(), e.to_string());
}
}
});
}
}
}