rust_p2p_core/route/
route_table.rs

1use std::collections::HashMap;
2use std::hash::Hash;
3use std::io;
4use std::net::SocketAddr;
5use std::sync::atomic::AtomicUsize;
6use std::sync::Arc;
7use std::time::Instant;
8
9use crate::route::{Index, RouteKey, RouteSortKey, DEFAULT_RTT};
10use crate::tunnel::config::LoadBalance;
11use crossbeam_utils::atomic::AtomicCell;
12use dashmap::DashMap;
13
14#[derive(Copy, Clone, Debug)]
15pub struct Route {
16    index: Index,
17    addr: SocketAddr,
18    metric: u8,
19    rtt: u32,
20}
21impl Route {
22    pub fn from(route_key: RouteKey, metric: u8, rtt: u32) -> Self {
23        Self {
24            index: route_key.index,
25            addr: route_key.addr,
26            metric,
27            rtt,
28        }
29    }
30    pub fn from_default_rt(route_key: RouteKey, metric: u8) -> Self {
31        Self {
32            index: route_key.index,
33            addr: route_key.addr,
34            metric,
35            rtt: DEFAULT_RTT,
36        }
37    }
38    pub fn route_key(&self) -> RouteKey {
39        RouteKey {
40            index: self.index,
41            addr: self.addr,
42        }
43    }
44    pub fn sort_key(&self) -> RouteSortKey {
45        RouteSortKey {
46            metric: self.metric,
47            rtt: self.rtt,
48        }
49    }
50    pub fn is_direct(&self) -> bool {
51        self.metric == 0
52    }
53    pub fn is_relay(&self) -> bool {
54        self.metric > 0
55    }
56    pub fn rtt(&self) -> u32 {
57        self.rtt
58    }
59    pub fn metric(&self) -> u8 {
60        self.metric
61    }
62}
63
64impl From<(RouteKey, u8)> for Route {
65    fn from((key, metric): (RouteKey, u8)) -> Self {
66        Route::from_default_rt(key, metric)
67    }
68}
69
70pub(crate) type RouteTableInner<PeerID> =
71    Arc<DashMap<PeerID, (AtomicUsize, Vec<(Route, AtomicCell<Instant>)>)>>;
72pub struct RouteTable<PeerID> {
73    pub(crate) route_table: RouteTableInner<PeerID>,
74    route_key_table: Arc<DashMap<RouteKey, PeerID>>,
75    load_balance: LoadBalance,
76}
77impl<PeerID: Hash + Eq> Default for RouteTable<PeerID> {
78    fn default() -> Self {
79        Self {
80            route_table: Default::default(),
81            route_key_table: Default::default(),
82            load_balance: Default::default(),
83        }
84    }
85}
86impl<PeerID> Clone for RouteTable<PeerID> {
87    fn clone(&self) -> Self {
88        Self {
89            route_table: self.route_table.clone(),
90            route_key_table: self.route_key_table.clone(),
91            load_balance: self.load_balance,
92        }
93    }
94}
95impl<PeerID: Hash + Eq> RouteTable<PeerID> {
96    pub fn new(load_balance: LoadBalance) -> RouteTable<PeerID> {
97        Self {
98            route_table: Arc::new(DashMap::with_capacity(64)),
99            route_key_table: Arc::new(DashMap::with_capacity(64)),
100            load_balance,
101        }
102    }
103}
104impl<PeerID: Hash + Eq> RouteTable<PeerID> {
105    pub fn is_empty(&self) -> bool {
106        self.route_table.is_empty()
107    }
108    pub fn is_route_of_peer_id(&self, id: &PeerID, route_key: &RouteKey) -> bool {
109        if let Some(src) = self.route_key_table.get(route_key) {
110            return src.value() == id;
111        }
112        false
113    }
114    pub fn get_route_by_id(&self, id: &PeerID) -> io::Result<Route> {
115        if let Some(entry) = self.route_table.get(id) {
116            let (count, routes) = entry.value();
117            if LoadBalance::LowestLatency == self.load_balance {
118                if let Some((route, _)) = routes.first() {
119                    return Ok(*route);
120                }
121            } else {
122                let len = routes.len();
123                if len != 0 {
124                    let index = if LoadBalance::RoundRobin == self.load_balance {
125                        count.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % len
126                    } else {
127                        0
128                    };
129                    return Ok(routes[index].0);
130                    // let route = &routes[index].0;
131                    // // 尝试跳过默认rt的路由(一般是刚加入的),这有助于提升稳定性
132                    // if route.rtt != DEFAULT_RTT {
133                    //     return Ok(*route);
134                    // }
135                    // for (route, _) in routes {
136                    //     if route.rtt != DEFAULT_RTT {
137                    //         return Ok(*route);
138                    //     }
139                    // }
140                    // return Ok(routes[0].0);
141                }
142            }
143        }
144        Err(io::Error::new(io::ErrorKind::NotFound, "route not found"))
145    }
146}
147impl<PeerID: Hash + Eq + Clone> RouteTable<PeerID> {
148    pub fn add_route_if_absent(&self, id: PeerID, route: Route) -> bool {
149        self.add_route_(id, route, true)
150    }
151    pub fn add_route<R: Into<Route>>(&self, id: PeerID, route: R) -> bool {
152        self.add_route_(id, route.into(), false)
153    }
154    /// Update the usage time of the route,
155    /// routes that have not received data for a long time will be excluded
156    pub fn update_read_time(&self, id: &PeerID, route_key: &RouteKey) -> bool {
157        if let Some(entry) = self.route_table.get(id) {
158            let (_, routes) = entry.value();
159            for (route, time) in routes {
160                if &route.route_key() == route_key {
161                    time.store(Instant::now());
162                    return true;
163                }
164            }
165        }
166        false
167    }
168    /// Remove specified route
169    pub fn remove_route(&self, id: &PeerID, route_key: &RouteKey) {
170        self.route_table.remove_if_mut(id, |_, (_, routes)| {
171            routes.retain(|(x, _)| &x.route_key() != route_key);
172            self.route_key_table.remove_if(route_key, |_, v| v == id);
173            routes.is_empty()
174        });
175    }
176    pub fn remove_all(&self, id: &PeerID) {
177        self.route_table.remove_if(id, |_, (_, routes)| {
178            for (route, _) in routes {
179                if route.is_direct() {
180                    self.route_key_table
181                        .remove_if(&route.route_key(), |_, v| v == id);
182                }
183            }
184            true
185        });
186    }
187    pub fn get_id_by_route_key(&self, route_key: &RouteKey) -> Option<PeerID> {
188        self.route_key_table
189            .get(route_key)
190            .map(|v| v.value().clone())
191    }
192    pub fn route(&self, id: &PeerID) -> Option<Vec<Route>> {
193        if let Some(entry) = self.route_table.get(id) {
194            let (_, routes) = entry.value();
195            Some(routes.iter().map(|(i, _)| *i).collect())
196        } else {
197            None
198        }
199    }
200    pub fn route_one(&self, id: &PeerID) -> Option<Route> {
201        if let Some(entry) = self.route_table.get(id) {
202            let (_, routes) = entry.value();
203            routes.first().map(|(i, _)| *i)
204        } else {
205            None
206        }
207    }
208    pub fn route_one_p2p(&self, id: &PeerID) -> Option<Route> {
209        if let Some(entry) = self.route_table.get(id) {
210            let (_, routes) = entry.value();
211            for (i, _) in routes {
212                if i.is_direct() {
213                    return Some(*i);
214                }
215            }
216        }
217        None
218    }
219    pub fn route_to_id(&self, route_key: &RouteKey) -> Option<PeerID> {
220        let table = self.route_table.iter();
221        for entry in table {
222            let (id, (_, routes)) = (entry.key(), entry.value());
223            for (route, _) in routes {
224                if &route.route_key() == route_key && route.is_direct() {
225                    return Some(id.clone());
226                }
227            }
228        }
229        None
230    }
231    pub fn need_punch(&self, id: &PeerID) -> bool {
232        if let Some(entry) = self.route_table.get(id) {
233            let (_, routes) = entry.value();
234            //p2p的通道数符合要求
235            return !routes.iter().any(|(k, _)| k.is_direct());
236        }
237        true
238    }
239    pub fn no_need_punch(&self, id: &PeerID) -> bool {
240        !self.need_punch(id)
241    }
242    pub fn p2p_num(&self, id: &PeerID) -> usize {
243        if let Some(entry) = self.route_table.get(id) {
244            let (_, routes) = entry.value();
245            routes.iter().filter(|(k, _)| k.is_direct()).count()
246        } else {
247            0
248        }
249    }
250    pub fn relay_num(&self, id: &PeerID) -> usize {
251        if let Some(entry) = self.route_table.get(id) {
252            let (_, routes) = entry.value();
253            routes.iter().filter(|(k, _)| k.is_relay()).count()
254        } else {
255            0
256        }
257    }
258    /// Return all routes
259    pub fn route_table(&self) -> Vec<(PeerID, Vec<Route>)> {
260        let table = self.route_table.iter();
261
262        table
263            .map(|entry| {
264                (
265                    entry.key().clone(),
266                    entry.value().1.iter().map(|(i, _)| *i).collect(),
267                )
268            })
269            .collect()
270    }
271    /// Return all P2P routes
272    pub fn route_table_p2p(&self) -> Vec<(PeerID, Route)> {
273        let table = self.route_table.iter();
274        let mut list = Vec::with_capacity(8);
275        for entry in table {
276            let (id, (_, routes)) = (entry.key(), entry.value());
277            for (route, _) in routes.iter() {
278                if route.is_direct() {
279                    list.push((id.clone(), *route));
280                    break;
281                }
282            }
283        }
284        list
285    }
286    /// Return to the first route
287    pub fn route_table_one(&self) -> Vec<(PeerID, Route)> {
288        let mut list = Vec::with_capacity(8);
289        let table = self.route_table.iter();
290        for entry in table {
291            let (id, (_, routes)) = (entry.key(), entry.value());
292            if let Some((route, _)) = routes.first() {
293                list.push((id.clone(), *route));
294            }
295        }
296        list
297    }
298    /// Return to `route_key` -> `Vec<PeerID>`,
299    /// where `vec[0]` is the owner of the route
300    pub fn route_key_table(&self) -> HashMap<RouteKey, Vec<PeerID>> {
301        let mut map: HashMap<RouteKey, Vec<PeerID>> = HashMap::new();
302        let table = self.route_table.iter();
303        for entry in table {
304            let (id, (_, routes)) = (entry.key(), entry.value());
305            for (route, _) in routes {
306                let is_p2p = route.is_direct();
307                map.entry(route.route_key())
308                    .and_modify(|list| {
309                        list.push(id.clone());
310                        if is_p2p {
311                            let last_index = list.len() - 1;
312                            list.swap(0, last_index);
313                        }
314                    })
315                    .or_insert_with(|| vec![id.clone()]);
316            }
317        }
318        map
319    }
320    pub fn route_table_ids(&self) -> Vec<PeerID> {
321        self.route_table.iter().map(|v| v.key().clone()).collect()
322    }
323    pub fn route_table_min_metric(&self) -> Vec<(PeerID, Route)> {
324        let mut list = Vec::with_capacity(8);
325        let table = self.route_table.iter();
326        for entry in table {
327            let (id, (_, routes)) = (entry.key(), entry.value());
328            if let Some((route, _)) = routes.iter().min_by_key(|(v, _)| v.metric) {
329                list.push((id.clone(), *route));
330            }
331        }
332        list
333    }
334    pub fn route_table_min_rtt(&self) -> Vec<(PeerID, Route)> {
335        let mut list = Vec::with_capacity(8);
336        let table = self.route_table.iter();
337        for entry in table {
338            let (id, (_, routes)) = (entry.key(), entry.value());
339            if let Some((route, _)) = routes.iter().min_by_key(|(v, _)| v.rtt) {
340                list.push((id.clone(), *route));
341            }
342        }
343        list
344    }
345
346    pub fn oldest_route(&self) -> Option<(PeerID, Route, Instant)> {
347        if self.route_table.is_empty() {
348            return None;
349        }
350        let mut option: Option<(PeerID, Route, Instant)> = None;
351        for entry in self.route_table.iter() {
352            let (peer_id, (_, routes)) = (entry.key(), entry.value());
353            for (route, time) in routes {
354                let instant = time.load();
355                if let Some((t_peer_id, t_route, t_instant)) = &mut option {
356                    if *t_instant > instant {
357                        *t_peer_id = peer_id.clone();
358                        *t_route = *route;
359                        *t_instant = instant;
360                    }
361                } else {
362                    option.replace((peer_id.clone(), *route, instant));
363                }
364            }
365        }
366        option
367    }
368}
369impl<PeerID: Hash + Eq + Clone> RouteTable<PeerID> {
370    fn add_route_(&self, id: PeerID, route: Route, only_if_absent: bool) -> bool {
371        let key = route.route_key();
372        if only_if_absent {
373            if let Some(entry) = self.route_table.get(&id) {
374                let (_, routes) = entry.value();
375                for (x, time) in routes {
376                    if x.route_key() == key {
377                        time.store(Instant::now());
378                        return true;
379                    }
380                }
381            }
382        }
383        let mut route_table = self
384            .route_table
385            .entry(id)
386            .or_insert_with(|| (AtomicUsize::new(0), Vec::with_capacity(4)));
387        let (peer_id, (_, list)) = route_table.pair_mut();
388        let mut exist = false;
389        for (x, time) in list.iter_mut() {
390            if x.metric < route.metric && self.load_balance != LoadBalance::LowestLatency {
391                //非优先延迟的情况下 不能比当前的路径更长
392                return false;
393            }
394            if x.route_key() == key {
395                time.store(Instant::now());
396                if only_if_absent {
397                    return true;
398                }
399                x.metric = route.metric;
400                x.rtt = route.rtt;
401                exist = true;
402                break;
403            }
404        }
405        if exist {
406            if self.load_balance != LoadBalance::MostRecent {
407                list.sort_by_key(|(k, _)| k.rtt);
408            }
409        } else {
410            if self.load_balance != LoadBalance::LowestLatency && route.is_direct() {
411                //非优先延迟的情况下 添加了直连的则排除非直连的
412                list.retain(|(k, _)| k.is_direct());
413            };
414            if route.is_direct() {
415                self.route_key_table
416                    .insert(route.route_key(), peer_id.clone());
417            }
418            if self.load_balance == LoadBalance::MostRecent {
419                list.insert(0, (route, AtomicCell::new(Instant::now())));
420            } else {
421                list.sort_by_key(|(k, _)| k.rtt);
422                list.push((route, AtomicCell::new(Instant::now())));
423            }
424        }
425        true
426    }
427}