rust_p2p_core/route/
route_table.rs

1use std::collections::HashMap;
2use std::hash::Hash;
3use std::io;
4use std::net::SocketAddr;
5use std::sync::atomic::AtomicUsize;
6use std::sync::Arc;
7use std::time::Instant;
8
9use crate::route::{Index, RouteKey, RouteSortKey, DEFAULT_RTT};
10use crate::tunnel::config::LoadBalance;
11use crossbeam_utils::atomic::AtomicCell;
12use dashmap::DashMap;
13
14#[derive(Copy, Clone, Debug)]
15pub struct Route {
16    index: Index,
17    addr: SocketAddr,
18    metric: u8,
19    rtt: u32,
20}
21impl Route {
22    pub fn from(route_key: RouteKey, metric: u8, rtt: u32) -> Self {
23        Self {
24            index: route_key.index,
25            addr: route_key.addr,
26            metric,
27            rtt,
28        }
29    }
30    pub fn from_default_rt(route_key: RouteKey, metric: u8) -> Self {
31        Self {
32            index: route_key.index,
33            addr: route_key.addr,
34            metric,
35            rtt: DEFAULT_RTT,
36        }
37    }
38    pub fn route_key(&self) -> RouteKey {
39        RouteKey {
40            index: self.index,
41            addr: self.addr,
42        }
43    }
44    pub fn sort_key(&self) -> RouteSortKey {
45        RouteSortKey {
46            metric: self.metric,
47            rtt: self.rtt,
48        }
49    }
50    pub fn is_direct(&self) -> bool {
51        self.metric == 0
52    }
53    pub fn is_relay(&self) -> bool {
54        self.metric > 0
55    }
56    pub fn rtt(&self) -> u32 {
57        self.rtt
58    }
59    pub fn metric(&self) -> u8 {
60        self.metric
61    }
62}
63
64impl From<(RouteKey, u8)> for Route {
65    fn from((key, metric): (RouteKey, u8)) -> Self {
66        Route::from_default_rt(key, metric)
67    }
68}
69
70pub(crate) type RouteTableInner<PeerID> =
71    Arc<DashMap<PeerID, (AtomicUsize, Vec<(Route, AtomicCell<Instant>)>)>>;
72pub struct RouteTable<PeerID> {
73    pub(crate) route_table: RouteTableInner<PeerID>,
74    route_key_table: Arc<DashMap<RouteKey, PeerID>>,
75    load_balance: LoadBalance,
76}
77impl<PeerID: Hash + Eq> Default for RouteTable<PeerID> {
78    fn default() -> Self {
79        Self {
80            route_table: Default::default(),
81            route_key_table: Default::default(),
82            load_balance: Default::default(),
83        }
84    }
85}
86impl<PeerID> Clone for RouteTable<PeerID> {
87    fn clone(&self) -> Self {
88        Self {
89            route_table: self.route_table.clone(),
90            route_key_table: self.route_key_table.clone(),
91            load_balance: self.load_balance,
92        }
93    }
94}
95impl<PeerID: Hash + Eq> RouteTable<PeerID> {
96    pub fn new(load_balance: LoadBalance) -> RouteTable<PeerID> {
97        Self {
98            route_table: Arc::new(DashMap::with_capacity(64)),
99            route_key_table: Arc::new(DashMap::with_capacity(64)),
100            load_balance,
101        }
102    }
103}
104impl<PeerID: Hash + Eq> RouteTable<PeerID> {
105    pub fn is_empty(&self) -> bool {
106        self.route_table.is_empty()
107    }
108    pub fn is_route_of_peer_id(&self, id: &PeerID, route_key: &RouteKey) -> bool {
109        if let Some(src) = self.route_key_table.get(route_key) {
110            return src.value() == id;
111        }
112        false
113    }
114    pub fn get_route_by_id(&self, id: &PeerID) -> io::Result<Route> {
115        if let Some(entry) = self.route_table.get(id) {
116            let (count, routes) = entry.value();
117            if LoadBalance::RoundRobin != self.load_balance {
118                if let Some((route, _)) = routes.first() {
119                    return Ok(*route);
120                }
121            } else {
122                let len = routes.len();
123                if len != 0 {
124                    let index = count.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % len;
125                    return Ok(routes[index].0);
126                }
127            }
128        }
129        Err(io::Error::new(io::ErrorKind::NotFound, "route not found"))
130    }
131}
132impl<PeerID: Hash + Eq + Clone> RouteTable<PeerID> {
133    pub fn add_route_if_absent(&self, id: PeerID, route: Route) -> bool {
134        self.add_route_(id, route, true)
135    }
136    pub fn add_route<R: Into<Route>>(&self, id: PeerID, route: R) -> bool {
137        self.add_route_(id, route.into(), false)
138    }
139    /// Update the usage time of the route,
140    /// routes that have not received data for a long time will be excluded
141    pub fn update_read_time(&self, id: &PeerID, route_key: &RouteKey) -> bool {
142        if let Some(entry) = self.route_table.get(id) {
143            let (_, routes) = entry.value();
144            for (route, time) in routes {
145                if &route.route_key() == route_key {
146                    time.store(Instant::now());
147                    return true;
148                }
149            }
150        }
151        false
152    }
153    /// Remove specified route
154    pub fn remove_route(&self, id: &PeerID, route_key: &RouteKey) {
155        self.route_table.remove_if_mut(id, |_, (_, routes)| {
156            routes.retain(|(x, _)| &x.route_key() != route_key);
157            self.route_key_table.remove_if(route_key, |_, v| v == id);
158            routes.is_empty()
159        });
160    }
161    pub fn remove_all(&self, id: &PeerID) {
162        self.route_table.remove_if(id, |_, (_, routes)| {
163            for (route, _) in routes {
164                if route.is_direct() {
165                    self.route_key_table
166                        .remove_if(&route.route_key(), |_, v| v == id);
167                }
168            }
169            true
170        });
171    }
172    pub fn get_id_by_route_key(&self, route_key: &RouteKey) -> Option<PeerID> {
173        self.route_key_table
174            .get(route_key)
175            .map(|v| v.value().clone())
176    }
177    pub fn route(&self, id: &PeerID) -> Option<Vec<Route>> {
178        if let Some(entry) = self.route_table.get(id) {
179            let (_, routes) = entry.value();
180            Some(routes.iter().map(|(i, _)| *i).collect())
181        } else {
182            None
183        }
184    }
185    pub fn route_one(&self, id: &PeerID) -> Option<Route> {
186        if let Some(entry) = self.route_table.get(id) {
187            let (_, routes) = entry.value();
188            routes.first().map(|(i, _)| *i)
189        } else {
190            None
191        }
192    }
193    pub fn route_one_p2p(&self, id: &PeerID) -> Option<Route> {
194        if let Some(entry) = self.route_table.get(id) {
195            let (_, routes) = entry.value();
196            for (i, _) in routes {
197                if i.is_direct() {
198                    return Some(*i);
199                }
200            }
201        }
202        None
203    }
204    pub fn route_to_id(&self, route_key: &RouteKey) -> Option<PeerID> {
205        let table = self.route_table.iter();
206        for entry in table {
207            let (id, (_, routes)) = (entry.key(), entry.value());
208            for (route, _) in routes {
209                if &route.route_key() == route_key && route.is_direct() {
210                    return Some(id.clone());
211                }
212            }
213        }
214        None
215    }
216    pub fn need_punch(&self, id: &PeerID) -> bool {
217        if let Some(entry) = self.route_table.get(id) {
218            let (_, routes) = entry.value();
219            //p2p的通道数符合要求
220            return !routes.iter().any(|(k, _)| k.is_direct());
221        }
222        true
223    }
224    pub fn no_need_punch(&self, id: &PeerID) -> bool {
225        !self.need_punch(id)
226    }
227    pub fn p2p_num(&self, id: &PeerID) -> usize {
228        if let Some(entry) = self.route_table.get(id) {
229            let (_, routes) = entry.value();
230            routes.iter().filter(|(k, _)| k.is_direct()).count()
231        } else {
232            0
233        }
234    }
235    pub fn relay_num(&self, id: &PeerID) -> usize {
236        if let Some(entry) = self.route_table.get(id) {
237            let (_, routes) = entry.value();
238            routes.iter().filter(|(k, _)| k.is_relay()).count()
239        } else {
240            0
241        }
242    }
243    /// Return all routes
244    pub fn route_table(&self) -> Vec<(PeerID, Vec<Route>)> {
245        let table = self.route_table.iter();
246
247        table
248            .map(|entry| {
249                (
250                    entry.key().clone(),
251                    entry.value().1.iter().map(|(i, _)| *i).collect(),
252                )
253            })
254            .collect()
255    }
256    /// Return all P2P routes
257    pub fn route_table_p2p(&self) -> Vec<(PeerID, Route)> {
258        let table = self.route_table.iter();
259        let mut list = Vec::with_capacity(8);
260        for entry in table {
261            let (id, (_, routes)) = (entry.key(), entry.value());
262            for (route, _) in routes.iter() {
263                if route.is_direct() {
264                    list.push((id.clone(), *route));
265                    break;
266                }
267            }
268        }
269        list
270    }
271    /// Return to the first route
272    pub fn route_table_one(&self) -> Vec<(PeerID, Route)> {
273        let mut list = Vec::with_capacity(8);
274        let table = self.route_table.iter();
275        for entry in table {
276            let (id, (_, routes)) = (entry.key(), entry.value());
277            if let Some((route, _)) = routes.first() {
278                list.push((id.clone(), *route));
279            }
280        }
281        list
282    }
283    /// Return to `route_key` -> `Vec<PeerID>`,
284    /// where `vec[0]` is the owner of the route
285    pub fn route_key_table(&self) -> HashMap<RouteKey, Vec<PeerID>> {
286        let mut map: HashMap<RouteKey, Vec<PeerID>> = HashMap::new();
287        let table = self.route_table.iter();
288        for entry in table {
289            let (id, (_, routes)) = (entry.key(), entry.value());
290            for (route, _) in routes {
291                let is_p2p = route.is_direct();
292                map.entry(route.route_key())
293                    .and_modify(|list| {
294                        list.push(id.clone());
295                        if is_p2p {
296                            let last_index = list.len() - 1;
297                            list.swap(0, last_index);
298                        }
299                    })
300                    .or_insert_with(|| vec![id.clone()]);
301            }
302        }
303        map
304    }
305    pub fn route_table_ids(&self) -> Vec<PeerID> {
306        self.route_table.iter().map(|v| v.key().clone()).collect()
307    }
308    pub fn route_table_min_metric(&self) -> Vec<(PeerID, Route)> {
309        let mut list = Vec::with_capacity(8);
310        let table = self.route_table.iter();
311        for entry in table {
312            let (id, (_, routes)) = (entry.key(), entry.value());
313            if let Some((route, _)) = routes.iter().min_by_key(|(v, _)| v.metric) {
314                list.push((id.clone(), *route));
315            }
316        }
317        list
318    }
319    pub fn route_table_min_rtt(&self) -> Vec<(PeerID, Route)> {
320        let mut list = Vec::with_capacity(8);
321        let table = self.route_table.iter();
322        for entry in table {
323            let (id, (_, routes)) = (entry.key(), entry.value());
324            if let Some((route, _)) = routes.iter().min_by_key(|(v, _)| v.rtt) {
325                list.push((id.clone(), *route));
326            }
327        }
328        list
329    }
330
331    pub fn oldest_route(&self) -> Option<(PeerID, Route, Instant)> {
332        if self.route_table.is_empty() {
333            return None;
334        }
335        let mut option: Option<(PeerID, Route, Instant)> = None;
336        for entry in self.route_table.iter() {
337            let (peer_id, (_, routes)) = (entry.key(), entry.value());
338            for (route, time) in routes {
339                let instant = time.load();
340                if let Some((t_peer_id, t_route, t_instant)) = &mut option {
341                    if *t_instant > instant {
342                        *t_peer_id = peer_id.clone();
343                        *t_route = *route;
344                        *t_instant = instant;
345                    }
346                } else {
347                    option.replace((peer_id.clone(), *route, instant));
348                }
349            }
350        }
351        option
352    }
353}
354impl<PeerID: Hash + Eq + Clone> RouteTable<PeerID> {
355    fn add_route_(&self, id: PeerID, route: Route, only_if_absent: bool) -> bool {
356        let key = route.route_key();
357        if only_if_absent {
358            if let Some(entry) = self.route_table.get(&id) {
359                let (_, routes) = entry.value();
360                for (x, time) in routes {
361                    if x.route_key() == key {
362                        time.store(Instant::now());
363                        return true;
364                    }
365                }
366            }
367        }
368        let mut route_table = self
369            .route_table
370            .entry(id)
371            .or_insert_with(|| (AtomicUsize::new(0), Vec::with_capacity(4)));
372        let (peer_id, (_, list)) = route_table.pair_mut();
373        let mut exist = false;
374        for (x, time) in list.iter_mut() {
375            if x.metric < route.metric && self.load_balance != LoadBalance::LowestLatency {
376                //非优先延迟的情况下 不能比当前的路径更长
377                return false;
378            }
379            if x.route_key() == key {
380                time.store(Instant::now());
381                if only_if_absent {
382                    return true;
383                }
384                x.metric = route.metric;
385                x.rtt = route.rtt;
386                exist = true;
387                break;
388            }
389        }
390        if exist {
391            if self.load_balance != LoadBalance::MostRecent {
392                list.sort_by_key(|(k, _)| k.rtt);
393            }
394        } else {
395            if self.load_balance != LoadBalance::LowestLatency && route.is_direct() {
396                //非优先延迟的情况下 添加了直连的则排除非直连的
397                list.retain(|(k, _)| k.is_direct());
398            };
399            if route.is_direct() {
400                self.route_key_table
401                    .insert(route.route_key(), peer_id.clone());
402            }
403            if self.load_balance == LoadBalance::MostRecent {
404                list.insert(0, (route, AtomicCell::new(Instant::now())));
405            } else {
406                list.sort_by_key(|(k, _)| k.rtt);
407                list.push((route, AtomicCell::new(Instant::now())));
408            }
409        }
410        true
411    }
412}