slim_datapath/tables/
connection_table.rs1use std::sync::Arc;
5
6use arc_swap::ArcSwap;
7use parking_lot::Mutex;
8
9use super::pool::Pool;
10
11#[derive(Debug)]
26pub struct ConnectionTable<T>
27where
28 T: Clone,
29{
30 pool: ArcSwap<Pool<Arc<T>>>,
31 write_lock: Mutex<()>,
32}
33
34impl<T> ConnectionTable<T>
35where
36 T: Clone,
37{
38 pub fn with_capacity(capacity: usize) -> Self {
40 ConnectionTable {
41 pool: ArcSwap::from_pointee(Pool::with_capacity(capacity)),
42 write_lock: Mutex::new(()),
43 }
44 }
45
46 pub fn insert(&self, connection: T) -> u64 {
48 let _guard = self.write_lock.lock();
49 let mut pool = (**self.pool.load()).clone();
50 let id = pool.insert(Arc::new(connection));
51 self.pool.store(Arc::new(pool));
52 id
53 }
54
55 pub fn insert_at(&self, connection: T, id: u64) {
57 let _guard = self.write_lock.lock();
58 let mut pool = (**self.pool.load()).clone();
59 pool.insert_at(Arc::new(connection), id);
60 self.pool.store(Arc::new(pool));
61 }
62
63 pub fn remove(&self, id: u64) -> bool {
65 let _guard = self.write_lock.lock();
66 let mut pool = (**self.pool.load()).clone();
67 let existed = pool.remove(id);
68 self.pool.store(Arc::new(pool));
69 existed
70 }
71
72 #[allow(dead_code)]
74 pub fn len(&self) -> usize {
75 self.pool.load().len()
76 }
77
78 #[allow(dead_code)]
80 pub fn capacity(&self) -> usize {
81 self.pool.load().capacity()
82 }
83
84 #[allow(dead_code)]
86 pub fn is_empty(&self) -> bool {
87 self.pool.load().is_empty()
88 }
89
90 pub fn get(&self, id: u64) -> Option<Arc<T>> {
92 self.pool.load().get(id).cloned()
93 }
94
95 pub fn for_each<F>(&self, mut f: F)
97 where
98 F: FnMut(u64, Arc<T>),
99 {
100 let pool = self.pool.load();
101 for (id, conn) in pool.iter_with_ids() {
102 f(id, conn.clone());
103 }
104 }
105
106 pub fn update<F>(&self, id: u64, f: F) -> bool
111 where
112 F: FnOnce(&mut T),
113 {
114 let _guard = self.write_lock.lock();
115 let mut pool = (**self.pool.load()).clone();
116 if let Some(entry) = pool.get_mut(id) {
117 let conn = Arc::make_mut(entry);
118 f(conn);
119 self.pool.store(Arc::new(pool));
120 true
121 } else {
122 false
123 }
124 }
125}
126
127#[cfg(test)]
128mod tests {
129 use super::*;
130
131 #[test]
132 fn test_connection_table() {
133 let table = ConnectionTable::with_capacity(10);
134 assert_eq!(table.len(), 0);
135 assert!(table.capacity() >= 10);
136 assert!(table.is_empty());
137
138 let connection = 10;
139 let id = table.insert(connection);
140 assert_eq!(table.len(), 1);
141 assert!(!table.is_empty());
142
143 let connection_ret = table.get(id).unwrap();
145 assert_eq!(*connection_ret, connection);
146
147 assert!(table.remove(id));
149
150 assert!(!table.remove(id));
152
153 assert!(table.get(id).is_none());
155 }
156}