1use std::collections::HashMap;
2use std::hash::Hash;
3use std::io;
4use std::net::SocketAddr;
5use std::sync::atomic::AtomicUsize;
6use std::sync::Arc;
7use std::time::Instant;
8
9use crate::route::{Index, RouteKey, RouteSortKey, DEFAULT_RTT};
10use crate::tunnel::config::LoadBalance;
11use crossbeam_utils::atomic::AtomicCell;
12use dashmap::DashMap;
13
/// One way of reaching a peer: the endpoint identity plus the figures used to rank it.
#[derive(Copy, Clone, Debug)]
pub struct Route {
    /// Index half of the route key (copied from `RouteKey::index`).
    index: Index,
    /// Address half of the route key (copied from `RouteKey::addr`).
    addr: SocketAddr,
    /// Route metric; `0` is treated as a direct route, anything larger as a relay.
    metric: u8,
    /// Round-trip time recorded for this route (the unit is not visible in this file).
    rtt: u32,
}
21impl Route {
22 pub fn from(route_key: RouteKey, metric: u8, rtt: u32) -> Self {
23 Self {
24 index: route_key.index,
25 addr: route_key.addr,
26 metric,
27 rtt,
28 }
29 }
30 pub fn from_default_rt(route_key: RouteKey, metric: u8) -> Self {
31 Self {
32 index: route_key.index,
33 addr: route_key.addr,
34 metric,
35 rtt: DEFAULT_RTT,
36 }
37 }
38 pub fn route_key(&self) -> RouteKey {
39 RouteKey {
40 index: self.index,
41 addr: self.addr,
42 }
43 }
44 pub fn sort_key(&self) -> RouteSortKey {
45 RouteSortKey {
46 metric: self.metric,
47 rtt: self.rtt,
48 }
49 }
50 pub fn is_direct(&self) -> bool {
51 self.metric == 0
52 }
53 pub fn is_relay(&self) -> bool {
54 self.metric > 0
55 }
56 pub fn rtt(&self) -> u32 {
57 self.rtt
58 }
59 pub fn metric(&self) -> u8 {
60 self.metric
61 }
62}
63
64impl From<(RouteKey, u8)> for Route {
65 fn from((key, metric): (RouteKey, u8)) -> Self {
66 Route::from_default_rt(key, metric)
67 }
68}
69
/// Shared per-peer storage: each peer id maps to a counter (used as the round-robin cursor by
/// `get_route_by_id`) and that peer's routes, each paired with a timestamp cell.
pub(crate) type RouteTableInner<PeerID> =
    Arc<DashMap<PeerID, (AtomicUsize, Vec<(Route, AtomicCell<Instant>)>)>>;
/// Route bookkeeping for a set of peers, keyed by a caller-chosen `PeerID` type.
///
/// Both maps sit behind `Arc`, so clones of a `RouteTable` observe the same data.
pub struct RouteTable<PeerID> {
    /// Peer id -> (round-robin counter, routes with their last-refresh timestamps).
    pub(crate) route_table: RouteTableInner<PeerID>,
    /// Reverse index from a route key to the peer that owns it; filled for direct routes only.
    route_key_table: Arc<DashMap<RouteKey, PeerID>>,
    /// Policy that decides how routes are ordered and which one is handed out.
    load_balance: LoadBalance,
}
77impl<PeerID: Hash + Eq> Default for RouteTable<PeerID> {
78 fn default() -> Self {
79 Self {
80 route_table: Default::default(),
81 route_key_table: Default::default(),
82 load_balance: Default::default(),
83 }
84 }
85}
86impl<PeerID> Clone for RouteTable<PeerID> {
87 fn clone(&self) -> Self {
88 Self {
89 route_table: self.route_table.clone(),
90 route_key_table: self.route_key_table.clone(),
91 load_balance: self.load_balance,
92 }
93 }
94}
95impl<PeerID: Hash + Eq> RouteTable<PeerID> {
96 pub fn new(load_balance: LoadBalance) -> RouteTable<PeerID> {
97 Self {
98 route_table: Arc::new(DashMap::with_capacity(64)),
99 route_key_table: Arc::new(DashMap::with_capacity(64)),
100 load_balance,
101 }
102 }
103}
104impl<PeerID: Hash + Eq> RouteTable<PeerID> {
105 pub fn is_empty(&self) -> bool {
106 self.route_table.is_empty()
107 }
108 pub fn is_route_of_peer_id(&self, id: &PeerID, route_key: &RouteKey) -> bool {
109 if let Some(src) = self.route_key_table.get(route_key) {
110 return src.value() == id;
111 }
112 false
113 }
114 pub fn get_route_by_id(&self, id: &PeerID) -> io::Result<Route> {
115 if let Some(entry) = self.route_table.get(id) {
116 let (count, routes) = entry.value();
117 if LoadBalance::RoundRobin != self.load_balance {
118 if let Some((route, _)) = routes.first() {
119 return Ok(*route);
120 }
121 } else {
122 let len = routes.len();
123 if len != 0 {
124 let index = count.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % len;
125 return Ok(routes[index].0);
126 }
127 }
128 }
129 Err(io::Error::new(io::ErrorKind::NotFound, "route not found"))
130 }
131}
132impl<PeerID: Hash + Eq + Clone> RouteTable<PeerID> {
133 pub fn add_route_if_absent(&self, id: PeerID, route: Route) -> bool {
134 self.add_route_(id, route, true)
135 }
136 pub fn add_route<R: Into<Route>>(&self, id: PeerID, route: R) -> bool {
137 self.add_route_(id, route.into(), false)
138 }
139 pub fn update_read_time(&self, id: &PeerID, route_key: &RouteKey) -> bool {
142 if let Some(entry) = self.route_table.get(id) {
143 let (_, routes) = entry.value();
144 for (route, time) in routes {
145 if &route.route_key() == route_key {
146 time.store(Instant::now());
147 return true;
148 }
149 }
150 }
151 false
152 }
153 pub fn remove_route(&self, id: &PeerID, route_key: &RouteKey) {
155 self.route_table.remove_if_mut(id, |_, (_, routes)| {
156 routes.retain(|(x, _)| &x.route_key() != route_key);
157 self.route_key_table.remove_if(route_key, |_, v| v == id);
158 routes.is_empty()
159 });
160 }
161 pub fn remove_all(&self, id: &PeerID) {
162 self.route_table.remove_if(id, |_, (_, routes)| {
163 for (route, _) in routes {
164 if route.is_direct() {
165 self.route_key_table
166 .remove_if(&route.route_key(), |_, v| v == id);
167 }
168 }
169 true
170 });
171 }
172 pub fn get_id_by_route_key(&self, route_key: &RouteKey) -> Option<PeerID> {
173 self.route_key_table
174 .get(route_key)
175 .map(|v| v.value().clone())
176 }
177 pub fn route(&self, id: &PeerID) -> Option<Vec<Route>> {
178 if let Some(entry) = self.route_table.get(id) {
179 let (_, routes) = entry.value();
180 Some(routes.iter().map(|(i, _)| *i).collect())
181 } else {
182 None
183 }
184 }
185 pub fn route_one(&self, id: &PeerID) -> Option<Route> {
186 if let Some(entry) = self.route_table.get(id) {
187 let (_, routes) = entry.value();
188 routes.first().map(|(i, _)| *i)
189 } else {
190 None
191 }
192 }
193 pub fn route_one_p2p(&self, id: &PeerID) -> Option<Route> {
194 if let Some(entry) = self.route_table.get(id) {
195 let (_, routes) = entry.value();
196 for (i, _) in routes {
197 if i.is_direct() {
198 return Some(*i);
199 }
200 }
201 }
202 None
203 }
204 pub fn route_to_id(&self, route_key: &RouteKey) -> Option<PeerID> {
205 let table = self.route_table.iter();
206 for entry in table {
207 let (id, (_, routes)) = (entry.key(), entry.value());
208 for (route, _) in routes {
209 if &route.route_key() == route_key && route.is_direct() {
210 return Some(id.clone());
211 }
212 }
213 }
214 None
215 }
216 pub fn need_punch(&self, id: &PeerID) -> bool {
217 if let Some(entry) = self.route_table.get(id) {
218 let (_, routes) = entry.value();
219 return !routes.iter().any(|(k, _)| k.is_direct());
221 }
222 true
223 }
224 pub fn no_need_punch(&self, id: &PeerID) -> bool {
225 !self.need_punch(id)
226 }
227 pub fn p2p_num(&self, id: &PeerID) -> usize {
228 if let Some(entry) = self.route_table.get(id) {
229 let (_, routes) = entry.value();
230 routes.iter().filter(|(k, _)| k.is_direct()).count()
231 } else {
232 0
233 }
234 }
235 pub fn relay_num(&self, id: &PeerID) -> usize {
236 if let Some(entry) = self.route_table.get(id) {
237 let (_, routes) = entry.value();
238 routes.iter().filter(|(k, _)| k.is_relay()).count()
239 } else {
240 0
241 }
242 }
243 pub fn route_table(&self) -> Vec<(PeerID, Vec<Route>)> {
245 let table = self.route_table.iter();
246
247 table
248 .map(|entry| {
249 (
250 entry.key().clone(),
251 entry.value().1.iter().map(|(i, _)| *i).collect(),
252 )
253 })
254 .collect()
255 }
256 pub fn route_table_p2p(&self) -> Vec<(PeerID, Route)> {
258 let table = self.route_table.iter();
259 let mut list = Vec::with_capacity(8);
260 for entry in table {
261 let (id, (_, routes)) = (entry.key(), entry.value());
262 for (route, _) in routes.iter() {
263 if route.is_direct() {
264 list.push((id.clone(), *route));
265 break;
266 }
267 }
268 }
269 list
270 }
271 pub fn route_table_one(&self) -> Vec<(PeerID, Route)> {
273 let mut list = Vec::with_capacity(8);
274 let table = self.route_table.iter();
275 for entry in table {
276 let (id, (_, routes)) = (entry.key(), entry.value());
277 if let Some((route, _)) = routes.first() {
278 list.push((id.clone(), *route));
279 }
280 }
281 list
282 }
283 pub fn route_key_table(&self) -> HashMap<RouteKey, Vec<PeerID>> {
286 let mut map: HashMap<RouteKey, Vec<PeerID>> = HashMap::new();
287 let table = self.route_table.iter();
288 for entry in table {
289 let (id, (_, routes)) = (entry.key(), entry.value());
290 for (route, _) in routes {
291 let is_p2p = route.is_direct();
292 map.entry(route.route_key())
293 .and_modify(|list| {
294 list.push(id.clone());
295 if is_p2p {
296 let last_index = list.len() - 1;
297 list.swap(0, last_index);
298 }
299 })
300 .or_insert_with(|| vec![id.clone()]);
301 }
302 }
303 map
304 }
305 pub fn route_table_ids(&self) -> Vec<PeerID> {
306 self.route_table.iter().map(|v| v.key().clone()).collect()
307 }
308 pub fn route_table_min_metric(&self) -> Vec<(PeerID, Route)> {
309 let mut list = Vec::with_capacity(8);
310 let table = self.route_table.iter();
311 for entry in table {
312 let (id, (_, routes)) = (entry.key(), entry.value());
313 if let Some((route, _)) = routes.iter().min_by_key(|(v, _)| v.metric) {
314 list.push((id.clone(), *route));
315 }
316 }
317 list
318 }
319 pub fn route_table_min_rtt(&self) -> Vec<(PeerID, Route)> {
320 let mut list = Vec::with_capacity(8);
321 let table = self.route_table.iter();
322 for entry in table {
323 let (id, (_, routes)) = (entry.key(), entry.value());
324 if let Some((route, _)) = routes.iter().min_by_key(|(v, _)| v.rtt) {
325 list.push((id.clone(), *route));
326 }
327 }
328 list
329 }
330
331 pub fn oldest_route(&self) -> Option<(PeerID, Route, Instant)> {
332 if self.route_table.is_empty() {
333 return None;
334 }
335 let mut option: Option<(PeerID, Route, Instant)> = None;
336 for entry in self.route_table.iter() {
337 let (peer_id, (_, routes)) = (entry.key(), entry.value());
338 for (route, time) in routes {
339 let instant = time.load();
340 if let Some((t_peer_id, t_route, t_instant)) = &mut option {
341 if *t_instant > instant {
342 *t_peer_id = peer_id.clone();
343 *t_route = *route;
344 *t_instant = instant;
345 }
346 } else {
347 option.replace((peer_id.clone(), *route, instant));
348 }
349 }
350 }
351 option
352 }
353}
354impl<PeerID: Hash + Eq + Clone> RouteTable<PeerID> {
355 fn add_route_(&self, id: PeerID, route: Route, only_if_absent: bool) -> bool {
356 let key = route.route_key();
357 if only_if_absent {
358 if let Some(entry) = self.route_table.get(&id) {
359 let (_, routes) = entry.value();
360 for (x, time) in routes {
361 if x.route_key() == key {
362 time.store(Instant::now());
363 return true;
364 }
365 }
366 }
367 }
368 let mut route_table = self
369 .route_table
370 .entry(id)
371 .or_insert_with(|| (AtomicUsize::new(0), Vec::with_capacity(4)));
372 let (peer_id, (_, list)) = route_table.pair_mut();
373 let mut exist = false;
374 for (x, time) in list.iter_mut() {
375 if x.metric < route.metric && self.load_balance != LoadBalance::LowestLatency {
376 return false;
378 }
379 if x.route_key() == key {
380 time.store(Instant::now());
381 if only_if_absent {
382 return true;
383 }
384 x.metric = route.metric;
385 x.rtt = route.rtt;
386 exist = true;
387 break;
388 }
389 }
390 if exist {
391 if self.load_balance != LoadBalance::MostRecent {
392 list.sort_by_key(|(k, _)| k.rtt);
393 }
394 } else {
395 if self.load_balance != LoadBalance::LowestLatency && route.is_direct() {
396 list.retain(|(k, _)| k.is_direct());
398 };
399 if route.is_direct() {
400 self.route_key_table
401 .insert(route.route_key(), peer_id.clone());
402 }
403 if self.load_balance == LoadBalance::MostRecent {
404 list.insert(0, (route, AtomicCell::new(Instant::now())));
405 } else {
406 list.sort_by_key(|(k, _)| k.rtt);
407 list.push((route, AtomicCell::new(Instant::now())));
408 }
409 }
410 true
411 }
412}