1use std::collections::HashMap;
2use std::hash::Hash;
3use std::io;
4use std::net::SocketAddr;
5use std::sync::atomic::AtomicUsize;
6use std::sync::Arc;
7use std::time::Instant;
8
9use crate::route::{Index, RouteKey, RouteSortKey, DEFAULT_RTT};
10use crate::tunnel::config::LoadBalance;
11use crossbeam_utils::atomic::AtomicCell;
12use dashmap::DashMap;
13
14#[derive(Copy, Clone, Debug)]
15pub struct Route {
16 index: Index,
17 addr: SocketAddr,
18 metric: u8,
19 rtt: u32,
20}
21impl Route {
22 pub fn from(route_key: RouteKey, metric: u8, rtt: u32) -> Self {
23 Self {
24 index: route_key.index,
25 addr: route_key.addr,
26 metric,
27 rtt,
28 }
29 }
30 pub fn from_default_rt(route_key: RouteKey, metric: u8) -> Self {
31 Self {
32 index: route_key.index,
33 addr: route_key.addr,
34 metric,
35 rtt: DEFAULT_RTT,
36 }
37 }
38 pub fn route_key(&self) -> RouteKey {
39 RouteKey {
40 index: self.index,
41 addr: self.addr,
42 }
43 }
44 pub fn sort_key(&self) -> RouteSortKey {
45 RouteSortKey {
46 metric: self.metric,
47 rtt: self.rtt,
48 }
49 }
50 pub fn is_direct(&self) -> bool {
51 self.metric == 0
52 }
53 pub fn is_relay(&self) -> bool {
54 self.metric > 0
55 }
56 pub fn rtt(&self) -> u32 {
57 self.rtt
58 }
59 pub fn metric(&self) -> u8 {
60 self.metric
61 }
62}
63
64impl From<(RouteKey, u8)> for Route {
65 fn from((key, metric): (RouteKey, u8)) -> Self {
66 Route::from_default_rt(key, metric)
67 }
68}
69
70pub(crate) type RouteTableInner<PeerID> =
71 Arc<DashMap<PeerID, (AtomicUsize, Vec<(Route, AtomicCell<Instant>)>)>>;
72pub struct RouteTable<PeerID> {
73 pub(crate) route_table: RouteTableInner<PeerID>,
74 route_key_table: Arc<DashMap<RouteKey, PeerID>>,
75 load_balance: LoadBalance,
76}
77impl<PeerID: Hash + Eq> Default for RouteTable<PeerID> {
78 fn default() -> Self {
79 Self {
80 route_table: Default::default(),
81 route_key_table: Default::default(),
82 load_balance: Default::default(),
83 }
84 }
85}
86impl<PeerID> Clone for RouteTable<PeerID> {
87 fn clone(&self) -> Self {
88 Self {
89 route_table: self.route_table.clone(),
90 route_key_table: self.route_key_table.clone(),
91 load_balance: self.load_balance,
92 }
93 }
94}
95impl<PeerID: Hash + Eq> RouteTable<PeerID> {
96 pub fn new(load_balance: LoadBalance) -> RouteTable<PeerID> {
97 Self {
98 route_table: Arc::new(DashMap::with_capacity(64)),
99 route_key_table: Arc::new(DashMap::with_capacity(64)),
100 load_balance,
101 }
102 }
103}
104impl<PeerID: Hash + Eq> RouteTable<PeerID> {
105 pub fn is_empty(&self) -> bool {
106 self.route_table.is_empty()
107 }
108 pub fn is_route_of_peer_id(&self, id: &PeerID, route_key: &RouteKey) -> bool {
109 if let Some(src) = self.route_key_table.get(route_key) {
110 return src.value() == id;
111 }
112 false
113 }
114 pub fn get_route_by_id(&self, id: &PeerID) -> io::Result<Route> {
115 if let Some(entry) = self.route_table.get(id) {
116 let (count, routes) = entry.value();
117 if LoadBalance::RoundRobin != self.load_balance {
118 if let Some((route, _)) = routes.first() {
119 return Ok(*route);
120 }
121 } else {
122 let len = routes.len();
123 if len != 0 {
124 let index = count.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % len;
125 return Ok(routes[index].0);
126 }
127 }
128 }
129 Err(io::Error::new(io::ErrorKind::NotFound, "route not found"))
130 }
131 pub fn exists(&self, id: &PeerID) -> bool {
132 self.route_table.get(id).is_some()
133 }
134}
135impl<PeerID: Hash + Eq + Clone> RouteTable<PeerID> {
136 pub fn add_route_if_absent(&self, id: PeerID, route: Route) -> bool {
137 self.add_route_(id, route, true)
138 }
139 pub fn add_route<R: Into<Route>>(&self, id: PeerID, route: R) -> bool {
140 self.add_route_(id, route.into(), false)
141 }
142 pub fn update_read_time(&self, id: &PeerID, route_key: &RouteKey) -> bool {
145 if let Some(entry) = self.route_table.get(id) {
146 let (_, routes) = entry.value();
147 for (route, time) in routes {
148 if &route.route_key() == route_key {
149 time.store(Instant::now());
150 return true;
151 }
152 }
153 }
154 false
155 }
156 pub fn remove_route(&self, id: &PeerID, route_key: &RouteKey) {
158 self.route_table.remove_if_mut(id, |_, (_, routes)| {
159 routes.retain(|(x, _)| &x.route_key() != route_key);
160 self.route_key_table.remove_if(route_key, |_, v| v == id);
161 routes.is_empty()
162 });
163 }
164 pub fn remove_all(&self, id: &PeerID) {
165 self.route_table.remove_if(id, |_, (_, routes)| {
166 for (route, _) in routes {
167 if route.is_direct() {
168 self.route_key_table
169 .remove_if(&route.route_key(), |_, v| v == id);
170 }
171 }
172 true
173 });
174 }
175 pub fn get_id_by_route_key(&self, route_key: &RouteKey) -> Option<PeerID> {
176 self.route_key_table
177 .get(route_key)
178 .map(|v| v.value().clone())
179 }
180 pub fn route(&self, id: &PeerID) -> Option<Vec<Route>> {
181 if let Some(entry) = self.route_table.get(id) {
182 let (_, routes) = entry.value();
183 Some(routes.iter().map(|(i, _)| *i).collect())
184 } else {
185 None
186 }
187 }
188 pub fn route_one(&self, id: &PeerID) -> Option<Route> {
189 if let Some(entry) = self.route_table.get(id) {
190 let (_, routes) = entry.value();
191 routes.first().map(|(i, _)| *i)
192 } else {
193 None
194 }
195 }
196 pub fn route_one_p2p(&self, id: &PeerID) -> Option<Route> {
197 if let Some(entry) = self.route_table.get(id) {
198 let (_, routes) = entry.value();
199 for (i, _) in routes {
200 if i.is_direct() {
201 return Some(*i);
202 }
203 }
204 }
205 None
206 }
207 pub fn route_to_id(&self, route_key: &RouteKey) -> Option<PeerID> {
208 let table = self.route_table.iter();
209 for entry in table {
210 let (id, (_, routes)) = (entry.key(), entry.value());
211 for (route, _) in routes {
212 if &route.route_key() == route_key && route.is_direct() {
213 return Some(id.clone());
214 }
215 }
216 }
217 None
218 }
219 pub fn need_punch(&self, id: &PeerID) -> bool {
220 if let Some(entry) = self.route_table.get(id) {
221 let (_, routes) = entry.value();
222 return !routes.iter().any(|(k, _)| k.is_direct());
224 }
225 true
226 }
227 pub fn no_need_punch(&self, id: &PeerID) -> bool {
228 !self.need_punch(id)
229 }
230 pub fn p2p_num(&self, id: &PeerID) -> usize {
231 if let Some(entry) = self.route_table.get(id) {
232 let (_, routes) = entry.value();
233 routes.iter().filter(|(k, _)| k.is_direct()).count()
234 } else {
235 0
236 }
237 }
238 pub fn relay_num(&self, id: &PeerID) -> usize {
239 if let Some(entry) = self.route_table.get(id) {
240 let (_, routes) = entry.value();
241 routes.iter().filter(|(k, _)| k.is_relay()).count()
242 } else {
243 0
244 }
245 }
246 pub fn route_table(&self) -> Vec<(PeerID, Vec<Route>)> {
248 let table = self.route_table.iter();
249
250 table
251 .map(|entry| {
252 (
253 entry.key().clone(),
254 entry.value().1.iter().map(|(i, _)| *i).collect(),
255 )
256 })
257 .collect()
258 }
259 pub fn route_table_p2p(&self) -> Vec<(PeerID, Route)> {
261 let table = self.route_table.iter();
262 let mut list = Vec::with_capacity(8);
263 for entry in table {
264 let (id, (_, routes)) = (entry.key(), entry.value());
265 for (route, _) in routes.iter() {
266 if route.is_direct() {
267 list.push((id.clone(), *route));
268 break;
269 }
270 }
271 }
272 list
273 }
274 pub fn route_table_one(&self) -> Vec<(PeerID, Route)> {
276 let mut list = Vec::with_capacity(8);
277 let table = self.route_table.iter();
278 for entry in table {
279 let (id, (_, routes)) = (entry.key(), entry.value());
280 if let Some((route, _)) = routes.first() {
281 list.push((id.clone(), *route));
282 }
283 }
284 list
285 }
286 pub fn route_key_table(&self) -> HashMap<RouteKey, Vec<PeerID>> {
289 let mut map: HashMap<RouteKey, Vec<PeerID>> = HashMap::new();
290 let table = self.route_table.iter();
291 for entry in table {
292 let (id, (_, routes)) = (entry.key(), entry.value());
293 for (route, _) in routes {
294 let is_p2p = route.is_direct();
295 map.entry(route.route_key())
296 .and_modify(|list| {
297 list.push(id.clone());
298 if is_p2p {
299 let last_index = list.len() - 1;
300 list.swap(0, last_index);
301 }
302 })
303 .or_insert_with(|| vec![id.clone()]);
304 }
305 }
306 map
307 }
308 pub fn route_table_ids(&self) -> Vec<PeerID> {
309 self.route_table.iter().map(|v| v.key().clone()).collect()
310 }
311 pub fn route_table_min_metric(&self) -> Vec<(PeerID, Route)> {
312 let mut list = Vec::with_capacity(8);
313 let table = self.route_table.iter();
314 for entry in table {
315 let (id, (_, routes)) = (entry.key(), entry.value());
316 if let Some((route, _)) = routes.iter().min_by_key(|(v, _)| v.metric) {
317 list.push((id.clone(), *route));
318 }
319 }
320 list
321 }
322 pub fn route_table_min_rtt(&self) -> Vec<(PeerID, Route)> {
323 let mut list = Vec::with_capacity(8);
324 let table = self.route_table.iter();
325 for entry in table {
326 let (id, (_, routes)) = (entry.key(), entry.value());
327 if let Some((route, _)) = routes.iter().min_by_key(|(v, _)| v.rtt) {
328 list.push((id.clone(), *route));
329 }
330 }
331 list
332 }
333
334 pub fn oldest_route(&self) -> Option<(PeerID, Route, Instant)> {
335 if self.route_table.is_empty() {
336 return None;
337 }
338 let mut option: Option<(PeerID, Route, Instant)> = None;
339 for entry in self.route_table.iter() {
340 let (peer_id, (_, routes)) = (entry.key(), entry.value());
341 for (route, time) in routes {
342 let instant = time.load();
343 if let Some((t_peer_id, t_route, t_instant)) = &mut option {
344 if *t_instant > instant {
345 *t_peer_id = peer_id.clone();
346 *t_route = *route;
347 *t_instant = instant;
348 }
349 } else {
350 option.replace((peer_id.clone(), *route, instant));
351 }
352 }
353 }
354 option
355 }
356}
357impl<PeerID: Hash + Eq + Clone> RouteTable<PeerID> {
358 fn add_route_(&self, id: PeerID, route: Route, only_if_absent: bool) -> bool {
359 let key = route.route_key();
360 if only_if_absent {
361 if let Some(entry) = self.route_table.get(&id) {
362 let (_, routes) = entry.value();
363 for (x, time) in routes {
364 if x.route_key() == key {
365 time.store(Instant::now());
366 return true;
367 }
368 }
369 }
370 }
371 let mut route_table = self
372 .route_table
373 .entry(id)
374 .or_insert_with(|| (AtomicUsize::new(0), Vec::with_capacity(4)));
375 let (peer_id, (_, list)) = route_table.pair_mut();
376 let mut exist_index = None;
377 for (index, (x, time)) in list.iter_mut().enumerate() {
378 if x.metric < route.metric && self.load_balance != LoadBalance::LowestLatency {
379 return false;
381 }
382 if x.route_key() == key {
383 time.store(Instant::now());
384 if only_if_absent {
385 return true;
386 }
387 x.metric = route.metric;
388 x.rtt = route.rtt;
389 exist_index = Some(index);
390 break;
391 }
392 }
393 if let Some(index) = exist_index {
394 if self.load_balance != LoadBalance::MostRecent {
395 list.sort_by_key(|(k, _)| k.rtt);
396 } else {
397 list.swap(0, index);
398 }
399 } else {
400 if self.load_balance != LoadBalance::LowestLatency && route.is_direct() {
401 list.retain(|(k, _)| k.is_direct());
403 };
404 if route.is_direct() {
405 self.route_key_table
406 .insert(route.route_key(), peer_id.clone());
407 }
408 if self.load_balance == LoadBalance::MostRecent {
409 list.insert(0, (route, AtomicCell::new(Instant::now())));
410 } else {
411 list.sort_by_key(|(k, _)| k.rtt);
412 list.push((route, AtomicCell::new(Instant::now())));
413 }
414 }
415 true
416 }
417}