1use std::collections::HashMap;
2use std::hash::Hash;
3use std::io;
4use std::net::SocketAddr;
5use std::sync::atomic::AtomicUsize;
6use std::sync::Arc;
7use std::time::Instant;
8
9use crate::route::{Index, RouteKey, RouteSortKey, DEFAULT_RTT};
10use crate::tunnel::config::LoadBalance;
11use crossbeam_utils::atomic::AtomicCell;
12use dashmap::DashMap;
13
14#[derive(Copy, Clone, Debug)]
15pub struct Route {
16 index: Index,
17 addr: SocketAddr,
18 metric: u8,
19 rtt: u32,
20}
21impl Route {
22 pub fn from(route_key: RouteKey, metric: u8, rtt: u32) -> Self {
23 Self {
24 index: route_key.index,
25 addr: route_key.addr,
26 metric,
27 rtt,
28 }
29 }
30 pub fn from_default_rt(route_key: RouteKey, metric: u8) -> Self {
31 Self {
32 index: route_key.index,
33 addr: route_key.addr,
34 metric,
35 rtt: DEFAULT_RTT,
36 }
37 }
38 pub fn route_key(&self) -> RouteKey {
39 RouteKey {
40 index: self.index,
41 addr: self.addr,
42 }
43 }
44 pub fn sort_key(&self) -> RouteSortKey {
45 RouteSortKey {
46 metric: self.metric,
47 rtt: self.rtt,
48 }
49 }
50 pub fn is_direct(&self) -> bool {
51 self.metric == 0
52 }
53 pub fn is_relay(&self) -> bool {
54 self.metric > 0
55 }
56 pub fn rtt(&self) -> u32 {
57 self.rtt
58 }
59 pub fn metric(&self) -> u8 {
60 self.metric
61 }
62}
63
64impl From<(RouteKey, u8)> for Route {
65 fn from((key, metric): (RouteKey, u8)) -> Self {
66 Route::from_default_rt(key, metric)
67 }
68}
69
70pub(crate) type RouteTableInner<PeerID> =
71 Arc<DashMap<PeerID, (AtomicUsize, Vec<(Route, AtomicCell<Instant>)>)>>;
72pub struct RouteTable<PeerID> {
73 pub(crate) route_table: RouteTableInner<PeerID>,
74 route_key_table: Arc<DashMap<RouteKey, PeerID>>,
75 load_balance: LoadBalance,
76}
77impl<PeerID: Hash + Eq> Default for RouteTable<PeerID> {
78 fn default() -> Self {
79 Self {
80 route_table: Default::default(),
81 route_key_table: Default::default(),
82 load_balance: Default::default(),
83 }
84 }
85}
86impl<PeerID> Clone for RouteTable<PeerID> {
87 fn clone(&self) -> Self {
88 Self {
89 route_table: self.route_table.clone(),
90 route_key_table: self.route_key_table.clone(),
91 load_balance: self.load_balance,
92 }
93 }
94}
95impl<PeerID: Hash + Eq> RouteTable<PeerID> {
96 pub fn new(load_balance: LoadBalance) -> RouteTable<PeerID> {
97 Self {
98 route_table: Arc::new(DashMap::with_capacity(64)),
99 route_key_table: Arc::new(DashMap::with_capacity(64)),
100 load_balance,
101 }
102 }
103}
104impl<PeerID: Hash + Eq> RouteTable<PeerID> {
105 pub fn is_empty(&self) -> bool {
106 self.route_table.is_empty()
107 }
108 pub fn is_route_of_peer_id(&self, id: &PeerID, route_key: &RouteKey) -> bool {
109 if let Some(src) = self.route_key_table.get(route_key) {
110 return src.value() == id;
111 }
112 false
113 }
114 pub fn get_route_by_id(&self, id: &PeerID) -> io::Result<Route> {
115 if let Some(entry) = self.route_table.get(id) {
116 let (count, routes) = entry.value();
117 if LoadBalance::LowestLatency == self.load_balance {
118 if let Some((route, _)) = routes.first() {
119 return Ok(*route);
120 }
121 } else {
122 let len = routes.len();
123 if len != 0 {
124 let index = if LoadBalance::RoundRobin == self.load_balance {
125 count.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % len
126 } else {
127 0
128 };
129 return Ok(routes[index].0);
130 }
142 }
143 }
144 Err(io::Error::new(io::ErrorKind::NotFound, "route not found"))
145 }
146}
147impl<PeerID: Hash + Eq + Clone> RouteTable<PeerID> {
148 pub fn add_route_if_absent(&self, id: PeerID, route: Route) -> bool {
149 self.add_route_(id, route, true)
150 }
151 pub fn add_route<R: Into<Route>>(&self, id: PeerID, route: R) -> bool {
152 self.add_route_(id, route.into(), false)
153 }
154 pub fn update_read_time(&self, id: &PeerID, route_key: &RouteKey) -> bool {
157 if let Some(entry) = self.route_table.get(id) {
158 let (_, routes) = entry.value();
159 for (route, time) in routes {
160 if &route.route_key() == route_key {
161 time.store(Instant::now());
162 return true;
163 }
164 }
165 }
166 false
167 }
168 pub fn remove_route(&self, id: &PeerID, route_key: &RouteKey) {
170 self.route_table.remove_if_mut(id, |_, (_, routes)| {
171 routes.retain(|(x, _)| &x.route_key() != route_key);
172 self.route_key_table.remove_if(route_key, |_, v| v == id);
173 routes.is_empty()
174 });
175 }
176 pub fn remove_all(&self, id: &PeerID) {
177 self.route_table.remove_if(id, |_, (_, routes)| {
178 for (route, _) in routes {
179 if route.is_direct() {
180 self.route_key_table
181 .remove_if(&route.route_key(), |_, v| v == id);
182 }
183 }
184 true
185 });
186 }
187 pub fn get_id_by_route_key(&self, route_key: &RouteKey) -> Option<PeerID> {
188 self.route_key_table
189 .get(route_key)
190 .map(|v| v.value().clone())
191 }
192 pub fn route(&self, id: &PeerID) -> Option<Vec<Route>> {
193 if let Some(entry) = self.route_table.get(id) {
194 let (_, routes) = entry.value();
195 Some(routes.iter().map(|(i, _)| *i).collect())
196 } else {
197 None
198 }
199 }
200 pub fn route_one(&self, id: &PeerID) -> Option<Route> {
201 if let Some(entry) = self.route_table.get(id) {
202 let (_, routes) = entry.value();
203 routes.first().map(|(i, _)| *i)
204 } else {
205 None
206 }
207 }
208 pub fn route_one_p2p(&self, id: &PeerID) -> Option<Route> {
209 if let Some(entry) = self.route_table.get(id) {
210 let (_, routes) = entry.value();
211 for (i, _) in routes {
212 if i.is_direct() {
213 return Some(*i);
214 }
215 }
216 }
217 None
218 }
219 pub fn route_to_id(&self, route_key: &RouteKey) -> Option<PeerID> {
220 let table = self.route_table.iter();
221 for entry in table {
222 let (id, (_, routes)) = (entry.key(), entry.value());
223 for (route, _) in routes {
224 if &route.route_key() == route_key && route.is_direct() {
225 return Some(id.clone());
226 }
227 }
228 }
229 None
230 }
231 pub fn need_punch(&self, id: &PeerID) -> bool {
232 if let Some(entry) = self.route_table.get(id) {
233 let (_, routes) = entry.value();
234 return !routes.iter().any(|(k, _)| k.is_direct());
236 }
237 true
238 }
239 pub fn no_need_punch(&self, id: &PeerID) -> bool {
240 !self.need_punch(id)
241 }
242 pub fn p2p_num(&self, id: &PeerID) -> usize {
243 if let Some(entry) = self.route_table.get(id) {
244 let (_, routes) = entry.value();
245 routes.iter().filter(|(k, _)| k.is_direct()).count()
246 } else {
247 0
248 }
249 }
250 pub fn relay_num(&self, id: &PeerID) -> usize {
251 if let Some(entry) = self.route_table.get(id) {
252 let (_, routes) = entry.value();
253 routes.iter().filter(|(k, _)| k.is_relay()).count()
254 } else {
255 0
256 }
257 }
258 pub fn route_table(&self) -> Vec<(PeerID, Vec<Route>)> {
260 let table = self.route_table.iter();
261
262 table
263 .map(|entry| {
264 (
265 entry.key().clone(),
266 entry.value().1.iter().map(|(i, _)| *i).collect(),
267 )
268 })
269 .collect()
270 }
271 pub fn route_table_p2p(&self) -> Vec<(PeerID, Route)> {
273 let table = self.route_table.iter();
274 let mut list = Vec::with_capacity(8);
275 for entry in table {
276 let (id, (_, routes)) = (entry.key(), entry.value());
277 for (route, _) in routes.iter() {
278 if route.is_direct() {
279 list.push((id.clone(), *route));
280 break;
281 }
282 }
283 }
284 list
285 }
286 pub fn route_table_one(&self) -> Vec<(PeerID, Route)> {
288 let mut list = Vec::with_capacity(8);
289 let table = self.route_table.iter();
290 for entry in table {
291 let (id, (_, routes)) = (entry.key(), entry.value());
292 if let Some((route, _)) = routes.first() {
293 list.push((id.clone(), *route));
294 }
295 }
296 list
297 }
298 pub fn route_key_table(&self) -> HashMap<RouteKey, Vec<PeerID>> {
301 let mut map: HashMap<RouteKey, Vec<PeerID>> = HashMap::new();
302 let table = self.route_table.iter();
303 for entry in table {
304 let (id, (_, routes)) = (entry.key(), entry.value());
305 for (route, _) in routes {
306 let is_p2p = route.is_direct();
307 map.entry(route.route_key())
308 .and_modify(|list| {
309 list.push(id.clone());
310 if is_p2p {
311 let last_index = list.len() - 1;
312 list.swap(0, last_index);
313 }
314 })
315 .or_insert_with(|| vec![id.clone()]);
316 }
317 }
318 map
319 }
320 pub fn route_table_ids(&self) -> Vec<PeerID> {
321 self.route_table.iter().map(|v| v.key().clone()).collect()
322 }
323 pub fn route_table_min_metric(&self) -> Vec<(PeerID, Route)> {
324 let mut list = Vec::with_capacity(8);
325 let table = self.route_table.iter();
326 for entry in table {
327 let (id, (_, routes)) = (entry.key(), entry.value());
328 if let Some((route, _)) = routes.iter().min_by_key(|(v, _)| v.metric) {
329 list.push((id.clone(), *route));
330 }
331 }
332 list
333 }
334 pub fn route_table_min_rtt(&self) -> Vec<(PeerID, Route)> {
335 let mut list = Vec::with_capacity(8);
336 let table = self.route_table.iter();
337 for entry in table {
338 let (id, (_, routes)) = (entry.key(), entry.value());
339 if let Some((route, _)) = routes.iter().min_by_key(|(v, _)| v.rtt) {
340 list.push((id.clone(), *route));
341 }
342 }
343 list
344 }
345
346 pub fn oldest_route(&self) -> Option<(PeerID, Route, Instant)> {
347 if self.route_table.is_empty() {
348 return None;
349 }
350 let mut option: Option<(PeerID, Route, Instant)> = None;
351 for entry in self.route_table.iter() {
352 let (peer_id, (_, routes)) = (entry.key(), entry.value());
353 for (route, time) in routes {
354 let instant = time.load();
355 if let Some((t_peer_id, t_route, t_instant)) = &mut option {
356 if *t_instant > instant {
357 *t_peer_id = peer_id.clone();
358 *t_route = *route;
359 *t_instant = instant;
360 }
361 } else {
362 option.replace((peer_id.clone(), *route, instant));
363 }
364 }
365 }
366 option
367 }
368}
369impl<PeerID: Hash + Eq + Clone> RouteTable<PeerID> {
370 fn add_route_(&self, id: PeerID, route: Route, only_if_absent: bool) -> bool {
371 let key = route.route_key();
372 if only_if_absent {
373 if let Some(entry) = self.route_table.get(&id) {
374 let (_, routes) = entry.value();
375 for (x, time) in routes {
376 if x.route_key() == key {
377 time.store(Instant::now());
378 return true;
379 }
380 }
381 }
382 }
383 let mut route_table = self
384 .route_table
385 .entry(id)
386 .or_insert_with(|| (AtomicUsize::new(0), Vec::with_capacity(4)));
387 let (peer_id, (_, list)) = route_table.pair_mut();
388 let mut exist = false;
389 for (x, time) in list.iter_mut() {
390 if x.metric < route.metric && self.load_balance != LoadBalance::LowestLatency {
391 return false;
393 }
394 if x.route_key() == key {
395 time.store(Instant::now());
396 if only_if_absent {
397 return true;
398 }
399 x.metric = route.metric;
400 x.rtt = route.rtt;
401 exist = true;
402 break;
403 }
404 }
405 if exist {
406 if self.load_balance != LoadBalance::MostRecent {
407 list.sort_by_key(|(k, _)| k.rtt);
408 }
409 } else {
410 if self.load_balance != LoadBalance::LowestLatency && route.is_direct() {
411 list.retain(|(k, _)| k.is_direct());
413 };
414 if route.is_direct() {
415 self.route_key_table
416 .insert(route.route_key(), peer_id.clone());
417 }
418 if self.load_balance == LoadBalance::MostRecent {
419 list.insert(0, (route, AtomicCell::new(Instant::now())));
420 } else {
421 list.sort_by_key(|(k, _)| k.rtt);
422 list.push((route, AtomicCell::new(Instant::now())));
423 }
424 }
425 true
426 }
427}