sosistab/listener/
table.rs1use std::{collections::BTreeMap, net::SocketAddr, sync::Arc, time::Instant};
2
3use crate::{buffer::Buff, SVec, SessionBack};
4
5use parking_lot::RwLock;
6use rand::Rng;
7use rustc_hash::FxHashMap;
8
9pub struct ShardedAddrs {
10 map: FxHashMap<u8, (SocketAddr, Instant)>,
12}
13
14impl ShardedAddrs {
15 pub fn new(initial_shard: u8, initial_addr: SocketAddr) -> Self {
17 let mut map = FxHashMap::default();
18 map.insert(initial_shard, (initial_addr, Instant::now()));
19 Self { map }
20 }
21
22 pub fn get_addr(&self) -> SocketAddr {
24 let recently_used_shards = self
26 .map
27 .iter()
28 .filter(|(_, (_, usage))| usage.elapsed().as_millis() < 10000)
29 .map(|f| f.1 .0)
30 .collect::<SVec<_>>();
31 if recently_used_shards.is_empty() {
33 let (most_recent, _) = self
34 .map
35 .values()
36 .max_by_key(|v| v.1)
37 .copied()
38 .expect("no shards at all");
39 tracing::trace!("sending down most recent {}", most_recent);
40 most_recent
41 } else {
42 let random =
43 recently_used_shards[rand::thread_rng().gen_range(0, recently_used_shards.len())];
44 tracing::trace!("sending down random {}", random);
45 random
46 }
47 }
48
49 pub fn insert_addr(&mut self, index: u8, addr: SocketAddr) -> Option<SocketAddr> {
51 self.map.insert(index, (addr, Instant::now())).map(|v| v.0)
52 }
53}
54
55struct SessEntry {
56 session_back: Arc<SessionBack>,
57 addrs: Arc<RwLock<ShardedAddrs>>,
58}
59
60#[derive(Default, Clone)]
61pub(crate) struct SessionTable {
62 token_to_sess: Arc<RwLock<BTreeMap<Buff, SessEntry>>>,
63 addr_to_token: Arc<RwLock<BTreeMap<SocketAddr, Buff>>>,
64}
65
66impl SessionTable {
67 pub fn rebind(&self, addr: SocketAddr, shard_id: u8, token: Buff) -> bool {
68 let token_to_sess = self.token_to_sess.write();
69 let mut addr_to_token = self.addr_to_token.write();
70 if let Some(entry) = token_to_sess.get(&token) {
71 let old = entry.addrs.write().insert_addr(shard_id, addr);
72 tracing::trace!("binding {}=>{}", shard_id, addr);
73 if let Some(old) = old {
74 addr_to_token.remove(&old);
75 }
76 addr_to_token.insert(addr, token);
77 true
78 } else {
79 tracing::debug!(
80 "[{:p}] token {:?} not in {:?}",
81 self,
82 token,
83 token_to_sess.keys().collect::<Vec<_>>()
84 );
85 false
86 }
87 }
88 pub fn delete(&self, token: Buff) {
89 tracing::debug!("removing token {:?}", token);
90 let mut token_to_sess = self.token_to_sess.write();
91 let mut addr_to_token = self.addr_to_token.write();
92 if let Some(entry) = token_to_sess.remove(&token) {
93 for (addr, _) in entry.addrs.read().map.values() {
94 addr_to_token.remove(addr);
95 }
96 }
97 }
98
99 pub fn lookup(&self, addr: SocketAddr) -> Option<Arc<SessionBack>> {
100 let token_to_sess = self.token_to_sess.read();
101 let addr_to_token = self.addr_to_token.read();
102 let token = addr_to_token.get(&addr)?;
103 let entry = token_to_sess.get(token)?;
104 Some(entry.session_back.clone())
105 }
106
107 pub fn new_sess(
108 &self,
109 token: Buff,
110 session_back: Arc<SessionBack>,
111 locked_addrs: Arc<RwLock<ShardedAddrs>>,
112 ) {
113 let mut token_to_sess = self.token_to_sess.write();
114 let entry = SessEntry {
115 session_back,
116 addrs: locked_addrs,
117 };
118 token_to_sess.insert(token.clone(), entry);
119 tracing::debug!(
120 "[{:p}] token {:?} now in {:?}",
121 self,
122 token,
123 token_to_sess.keys().collect::<Vec<_>>()
124 );
125 }
126}