// rust_p2p_core/route/route_table.rs

1use std::collections::HashMap;
2use std::hash::Hash;
3use std::io;
4use std::net::SocketAddr;
5use std::sync::atomic::AtomicUsize;
6use std::sync::Arc;
7use std::time::Instant;
8
9use crate::route::{Index, RouteKey, RouteSortKey, DEFAULT_RTT};
10use crate::tunnel::config::LoadBalance;
11use crossbeam_utils::atomic::AtomicCell;
12use dashmap::DashMap;
13
/// One path towards a peer, identified by its `(index, addr)` route key and
/// ranked by `metric` and `rtt`.
#[derive(Copy, Clone, Debug)]
pub struct Route {
    /// `index` half of the route's `RouteKey`.
    index: Index,
    /// `addr` half of the route's `RouteKey`.
    addr: SocketAddr,
    /// Path metric: `0` is treated as a direct route, anything above as a relay
    /// (see `is_direct` / `is_relay`).
    metric: u8,
    /// Round-trip time used for ordering; the unit is not specified in this file.
    rtt: u32,
}
21impl Route {
22    pub fn from(route_key: RouteKey, metric: u8, rtt: u32) -> Self {
23        Self {
24            index: route_key.index,
25            addr: route_key.addr,
26            metric,
27            rtt,
28        }
29    }
30    pub fn from_default_rt(route_key: RouteKey, metric: u8) -> Self {
31        Self {
32            index: route_key.index,
33            addr: route_key.addr,
34            metric,
35            rtt: DEFAULT_RTT,
36        }
37    }
38    pub fn route_key(&self) -> RouteKey {
39        RouteKey {
40            index: self.index,
41            addr: self.addr,
42        }
43    }
44    pub fn sort_key(&self) -> RouteSortKey {
45        RouteSortKey {
46            metric: self.metric,
47            rtt: self.rtt,
48        }
49    }
50    pub fn is_direct(&self) -> bool {
51        self.metric == 0
52    }
53    pub fn is_relay(&self) -> bool {
54        self.metric > 0
55    }
56    pub fn rtt(&self) -> u32 {
57        self.rtt
58    }
59    pub fn metric(&self) -> u8 {
60        self.metric
61    }
62}
63
64impl From<(RouteKey, u8)> for Route {
65    fn from((key, metric): (RouteKey, u8)) -> Self {
66        Route::from_default_rt(key, metric)
67    }
68}
69
/// Shared map from peer id to `(counter, routes)`.
///
/// The counter is advanced by `get_route_by_id` when rotating through routes, and
/// each route is paired with a timestamp cell that `update_read_time` and
/// `add_route_` refresh.
pub(crate) type RouteTableInner<PeerID> =
    Arc<DashMap<PeerID, (AtomicUsize, Vec<(Route, AtomicCell<Instant>)>)>>;
/// Routing table keyed by peer id; clones share the same underlying maps.
pub struct RouteTable<PeerID> {
    /// Peer id -> routes known for that peer.
    pub(crate) route_table: RouteTableInner<PeerID>,
    /// Reverse index: route key -> peer id. `add_route_` fills it when a direct
    /// route is added.
    route_key_table: Arc<DashMap<RouteKey, PeerID>>,
    /// Strategy that decides how routes are ordered and which one is returned.
    load_balance: LoadBalance,
}
77impl<PeerID: Hash + Eq> Default for RouteTable<PeerID> {
78    fn default() -> Self {
79        Self {
80            route_table: Default::default(),
81            route_key_table: Default::default(),
82            load_balance: Default::default(),
83        }
84    }
85}
86impl<PeerID> Clone for RouteTable<PeerID> {
87    fn clone(&self) -> Self {
88        Self {
89            route_table: self.route_table.clone(),
90            route_key_table: self.route_key_table.clone(),
91            load_balance: self.load_balance,
92        }
93    }
94}
95impl<PeerID: Hash + Eq> RouteTable<PeerID> {
96    pub fn new(load_balance: LoadBalance) -> RouteTable<PeerID> {
97        Self {
98            route_table: Arc::new(DashMap::with_capacity(64)),
99            route_key_table: Arc::new(DashMap::with_capacity(64)),
100            load_balance,
101        }
102    }
103}
104impl<PeerID: Hash + Eq> RouteTable<PeerID> {
105    pub fn is_empty(&self) -> bool {
106        self.route_table.is_empty()
107    }
108    pub fn is_route_of_peer_id(&self, id: &PeerID, route_key: &RouteKey) -> bool {
109        if let Some(src) = self.route_key_table.get(route_key) {
110            return src.value() == id;
111        }
112        false
113    }
114    pub fn get_route_by_id(&self, id: &PeerID) -> io::Result<Route> {
115        if let Some(entry) = self.route_table.get(id) {
116            let (count, routes) = entry.value();
117            if LoadBalance::RoundRobin != self.load_balance {
118                if let Some((route, _)) = routes.first() {
119                    return Ok(*route);
120                }
121            } else {
122                let len = routes.len();
123                if len != 0 {
124                    let index = count.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % len;
125                    return Ok(routes[index].0);
126                }
127            }
128        }
129        Err(io::Error::new(io::ErrorKind::NotFound, "route not found"))
130    }
131    pub fn exists(&self, id: &PeerID) -> bool {
132        self.route_table.get(id).is_some()
133    }
134}
135impl<PeerID: Hash + Eq + Clone> RouteTable<PeerID> {
136    pub fn add_route_if_absent(&self, id: PeerID, route: Route) -> bool {
137        self.add_route_(id, route, true)
138    }
139    pub fn add_route<R: Into<Route>>(&self, id: PeerID, route: R) -> bool {
140        self.add_route_(id, route.into(), false)
141    }
142    /// Update the usage time of the route,
143    /// routes that have not received data for a long time will be excluded
144    pub fn update_read_time(&self, id: &PeerID, route_key: &RouteKey) -> bool {
145        if let Some(entry) = self.route_table.get(id) {
146            let (_, routes) = entry.value();
147            for (route, time) in routes {
148                if &route.route_key() == route_key {
149                    time.store(Instant::now());
150                    return true;
151                }
152            }
153        }
154        false
155    }
156    /// Remove specified route
157    pub fn remove_route(&self, id: &PeerID, route_key: &RouteKey) {
158        self.route_table.remove_if_mut(id, |_, (_, routes)| {
159            routes.retain(|(x, _)| &x.route_key() != route_key);
160            self.route_key_table.remove_if(route_key, |_, v| v == id);
161            routes.is_empty()
162        });
163    }
164    pub fn remove_all(&self, id: &PeerID) {
165        self.route_table.remove_if(id, |_, (_, routes)| {
166            for (route, _) in routes {
167                if route.is_direct() {
168                    self.route_key_table
169                        .remove_if(&route.route_key(), |_, v| v == id);
170                }
171            }
172            true
173        });
174    }
175    pub fn get_id_by_route_key(&self, route_key: &RouteKey) -> Option<PeerID> {
176        self.route_key_table
177            .get(route_key)
178            .map(|v| v.value().clone())
179    }
180    pub fn route(&self, id: &PeerID) -> Option<Vec<Route>> {
181        if let Some(entry) = self.route_table.get(id) {
182            let (_, routes) = entry.value();
183            Some(routes.iter().map(|(i, _)| *i).collect())
184        } else {
185            None
186        }
187    }
188    pub fn route_one(&self, id: &PeerID) -> Option<Route> {
189        if let Some(entry) = self.route_table.get(id) {
190            let (_, routes) = entry.value();
191            routes.first().map(|(i, _)| *i)
192        } else {
193            None
194        }
195    }
196    pub fn route_one_p2p(&self, id: &PeerID) -> Option<Route> {
197        if let Some(entry) = self.route_table.get(id) {
198            let (_, routes) = entry.value();
199            for (i, _) in routes {
200                if i.is_direct() {
201                    return Some(*i);
202                }
203            }
204        }
205        None
206    }
207    pub fn route_to_id(&self, route_key: &RouteKey) -> Option<PeerID> {
208        let table = self.route_table.iter();
209        for entry in table {
210            let (id, (_, routes)) = (entry.key(), entry.value());
211            for (route, _) in routes {
212                if &route.route_key() == route_key && route.is_direct() {
213                    return Some(id.clone());
214                }
215            }
216        }
217        None
218    }
219    pub fn need_punch(&self, id: &PeerID) -> bool {
220        if let Some(entry) = self.route_table.get(id) {
221            let (_, routes) = entry.value();
222            //p2p的通道数符合要求
223            return !routes.iter().any(|(k, _)| k.is_direct());
224        }
225        true
226    }
227    pub fn no_need_punch(&self, id: &PeerID) -> bool {
228        !self.need_punch(id)
229    }
230    pub fn p2p_num(&self, id: &PeerID) -> usize {
231        if let Some(entry) = self.route_table.get(id) {
232            let (_, routes) = entry.value();
233            routes.iter().filter(|(k, _)| k.is_direct()).count()
234        } else {
235            0
236        }
237    }
238    pub fn relay_num(&self, id: &PeerID) -> usize {
239        if let Some(entry) = self.route_table.get(id) {
240            let (_, routes) = entry.value();
241            routes.iter().filter(|(k, _)| k.is_relay()).count()
242        } else {
243            0
244        }
245    }
246    /// Return all routes
247    pub fn route_table(&self) -> Vec<(PeerID, Vec<Route>)> {
248        let table = self.route_table.iter();
249
250        table
251            .map(|entry| {
252                (
253                    entry.key().clone(),
254                    entry.value().1.iter().map(|(i, _)| *i).collect(),
255                )
256            })
257            .collect()
258    }
259    /// Return all P2P routes
260    pub fn route_table_p2p(&self) -> Vec<(PeerID, Route)> {
261        let table = self.route_table.iter();
262        let mut list = Vec::with_capacity(8);
263        for entry in table {
264            let (id, (_, routes)) = (entry.key(), entry.value());
265            for (route, _) in routes.iter() {
266                if route.is_direct() {
267                    list.push((id.clone(), *route));
268                    break;
269                }
270            }
271        }
272        list
273    }
274    /// Return to the first route
275    pub fn route_table_one(&self) -> Vec<(PeerID, Route)> {
276        let mut list = Vec::with_capacity(8);
277        let table = self.route_table.iter();
278        for entry in table {
279            let (id, (_, routes)) = (entry.key(), entry.value());
280            if let Some((route, _)) = routes.first() {
281                list.push((id.clone(), *route));
282            }
283        }
284        list
285    }
286    /// Return to `route_key` -> `Vec<PeerID>`,
287    /// where `vec[0]` is the owner of the route
288    pub fn route_key_table(&self) -> HashMap<RouteKey, Vec<PeerID>> {
289        let mut map: HashMap<RouteKey, Vec<PeerID>> = HashMap::new();
290        let table = self.route_table.iter();
291        for entry in table {
292            let (id, (_, routes)) = (entry.key(), entry.value());
293            for (route, _) in routes {
294                let is_p2p = route.is_direct();
295                map.entry(route.route_key())
296                    .and_modify(|list| {
297                        list.push(id.clone());
298                        if is_p2p {
299                            let last_index = list.len() - 1;
300                            list.swap(0, last_index);
301                        }
302                    })
303                    .or_insert_with(|| vec![id.clone()]);
304            }
305        }
306        map
307    }
308    pub fn route_table_ids(&self) -> Vec<PeerID> {
309        self.route_table.iter().map(|v| v.key().clone()).collect()
310    }
311    pub fn route_table_min_metric(&self) -> Vec<(PeerID, Route)> {
312        let mut list = Vec::with_capacity(8);
313        let table = self.route_table.iter();
314        for entry in table {
315            let (id, (_, routes)) = (entry.key(), entry.value());
316            if let Some((route, _)) = routes.iter().min_by_key(|(v, _)| v.metric) {
317                list.push((id.clone(), *route));
318            }
319        }
320        list
321    }
322    pub fn route_table_min_rtt(&self) -> Vec<(PeerID, Route)> {
323        let mut list = Vec::with_capacity(8);
324        let table = self.route_table.iter();
325        for entry in table {
326            let (id, (_, routes)) = (entry.key(), entry.value());
327            if let Some((route, _)) = routes.iter().min_by_key(|(v, _)| v.rtt) {
328                list.push((id.clone(), *route));
329            }
330        }
331        list
332    }
333
334    pub fn oldest_route(&self) -> Option<(PeerID, Route, Instant)> {
335        if self.route_table.is_empty() {
336            return None;
337        }
338        let mut option: Option<(PeerID, Route, Instant)> = None;
339        for entry in self.route_table.iter() {
340            let (peer_id, (_, routes)) = (entry.key(), entry.value());
341            for (route, time) in routes {
342                let instant = time.load();
343                if let Some((t_peer_id, t_route, t_instant)) = &mut option {
344                    if *t_instant > instant {
345                        *t_peer_id = peer_id.clone();
346                        *t_route = *route;
347                        *t_instant = instant;
348                    }
349                } else {
350                    option.replace((peer_id.clone(), *route, instant));
351                }
352            }
353        }
354        option
355    }
356}
357impl<PeerID: Hash + Eq + Clone> RouteTable<PeerID> {
358    fn add_route_(&self, id: PeerID, route: Route, only_if_absent: bool) -> bool {
359        let key = route.route_key();
360        if only_if_absent {
361            if let Some(entry) = self.route_table.get(&id) {
362                let (_, routes) = entry.value();
363                for (x, time) in routes {
364                    if x.route_key() == key {
365                        time.store(Instant::now());
366                        return true;
367                    }
368                }
369            }
370        }
371        let mut route_table = self
372            .route_table
373            .entry(id)
374            .or_insert_with(|| (AtomicUsize::new(0), Vec::with_capacity(4)));
375        let (peer_id, (_, list)) = route_table.pair_mut();
376        let mut exist_index = None;
377        for (index, (x, time)) in list.iter_mut().enumerate() {
378            if x.metric < route.metric && self.load_balance != LoadBalance::LowestLatency {
379                //非优先延迟的情况下 不能比当前的路径更长
380                return false;
381            }
382            if x.route_key() == key {
383                time.store(Instant::now());
384                if only_if_absent {
385                    return true;
386                }
387                x.metric = route.metric;
388                x.rtt = route.rtt;
389                exist_index = Some(index);
390                break;
391            }
392        }
393        if let Some(index) = exist_index {
394            if self.load_balance != LoadBalance::MostRecent {
395                list.sort_by_key(|(k, _)| k.rtt);
396            } else {
397                list.swap(0, index);
398            }
399        } else {
400            if self.load_balance != LoadBalance::LowestLatency && route.is_direct() {
401                //非优先延迟的情况下 添加了直连的则排除非直连的
402                list.retain(|(k, _)| k.is_direct());
403            };
404            if route.is_direct() {
405                self.route_key_table
406                    .insert(route.route_key(), peer_id.clone());
407            }
408            if self.load_balance == LoadBalance::MostRecent {
409                list.insert(0, (route, AtomicCell::new(Instant::now())));
410            } else {
411                list.sort_by_key(|(k, _)| k.rtt);
412                list.push((route, AtomicCell::new(Instant::now())));
413            }
414        }
415        true
416    }
417}