sosistab/listener/
table.rs

1use std::{collections::BTreeMap, net::SocketAddr, sync::Arc, time::Instant};
2
3use crate::{buffer::Buff, SVec, SessionBack};
4
5use parking_lot::RwLock;
6use rand::Rng;
7use rustc_hash::FxHashMap;
8
9pub struct ShardedAddrs {
10    // maps shard ID to socketaddr and last update time
11    map: FxHashMap<u8, (SocketAddr, Instant)>,
12}
13
14impl ShardedAddrs {
15    /// Creates a new table of shard addresses.
16    pub fn new(initial_shard: u8, initial_addr: SocketAddr) -> Self {
17        let mut map = FxHashMap::default();
18        map.insert(initial_shard, (initial_addr, Instant::now()));
19        Self { map }
20    }
21
22    /// Gets the most appropriate address to send a packet down.
23    pub fn get_addr(&self) -> SocketAddr {
24        // svec to prevent allocating in such an extremely hot path
25        let recently_used_shards = self
26            .map
27            .iter()
28            .filter(|(_, (_, usage))| usage.elapsed().as_millis() < 10000)
29            .map(|f| f.1 .0)
30            .collect::<SVec<_>>();
31        // if no recently used, then push the most recently used one
32        if recently_used_shards.is_empty() {
33            let (most_recent, _) = self
34                .map
35                .values()
36                .max_by_key(|v| v.1)
37                .copied()
38                .expect("no shards at all");
39            tracing::trace!("sending down most recent {}", most_recent);
40            most_recent
41        } else {
42            let random =
43                recently_used_shards[rand::thread_rng().gen_range(0, recently_used_shards.len())];
44            tracing::trace!("sending down random {}", random);
45            random
46        }
47    }
48
49    /// Sets an index to a particular address
50    pub fn insert_addr(&mut self, index: u8, addr: SocketAddr) -> Option<SocketAddr> {
51        self.map.insert(index, (addr, Instant::now())).map(|v| v.0)
52    }
53}
54
55struct SessEntry {
56    session_back: Arc<SessionBack>,
57    addrs: Arc<RwLock<ShardedAddrs>>,
58}
59
60#[derive(Default, Clone)]
61pub(crate) struct SessionTable {
62    token_to_sess: Arc<RwLock<BTreeMap<Buff, SessEntry>>>,
63    addr_to_token: Arc<RwLock<BTreeMap<SocketAddr, Buff>>>,
64}
65
66impl SessionTable {
67    pub fn rebind(&self, addr: SocketAddr, shard_id: u8, token: Buff) -> bool {
68        let token_to_sess = self.token_to_sess.write();
69        let mut addr_to_token = self.addr_to_token.write();
70        if let Some(entry) = token_to_sess.get(&token) {
71            let old = entry.addrs.write().insert_addr(shard_id, addr);
72            tracing::trace!("binding {}=>{}", shard_id, addr);
73            if let Some(old) = old {
74                addr_to_token.remove(&old);
75            }
76            addr_to_token.insert(addr, token);
77            true
78        } else {
79            tracing::debug!(
80                "[{:p}] token {:?} not in {:?}",
81                self,
82                token,
83                token_to_sess.keys().collect::<Vec<_>>()
84            );
85            false
86        }
87    }
88    pub fn delete(&self, token: Buff) {
89        tracing::debug!("removing token {:?}", token);
90        let mut token_to_sess = self.token_to_sess.write();
91        let mut addr_to_token = self.addr_to_token.write();
92        if let Some(entry) = token_to_sess.remove(&token) {
93            for (addr, _) in entry.addrs.read().map.values() {
94                addr_to_token.remove(addr);
95            }
96        }
97    }
98
99    pub fn lookup(&self, addr: SocketAddr) -> Option<Arc<SessionBack>> {
100        let token_to_sess = self.token_to_sess.read();
101        let addr_to_token = self.addr_to_token.read();
102        let token = addr_to_token.get(&addr)?;
103        let entry = token_to_sess.get(token)?;
104        Some(entry.session_back.clone())
105    }
106
107    pub fn new_sess(
108        &self,
109        token: Buff,
110        session_back: Arc<SessionBack>,
111        locked_addrs: Arc<RwLock<ShardedAddrs>>,
112    ) {
113        let mut token_to_sess = self.token_to_sess.write();
114        let entry = SessEntry {
115            session_back,
116            addrs: locked_addrs,
117        };
118        token_to_sess.insert(token.clone(), entry);
119        tracing::debug!(
120            "[{:p}] token {:?} now in {:?}",
121            self,
122            token,
123            token_to_sess.keys().collect::<Vec<_>>()
124        );
125    }
126}