Skip to main content

slim_datapath/tables/
connection_table.rs

// Copyright AGNTCY Contributors (https://github.com/agntcy)
// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use arc_swap::ArcSwap;
7use parking_lot::Mutex;
8
9use super::pool::Pool;
10
11/// A connection table that provides lock-free reads via `ArcSwap`.
12///
13/// Concurrency design:
14///   - Hot read path (`get`, `for_each`, `len`, `is_empty`): lock-free via
15///     `ArcSwap::load()`.  A single atomic pointer load gives callers an
16///     immutable snapshot of the pool.
17///   - Write path (`insert`, `insert_at`, `remove`): serialised by
18///     `write_lock`.  Writers load the current snapshot, clone the pool,
19///     apply the change, then atomically store the new `Arc`.  No write
20///     lock is ever held during reads.
21///
22/// Connections are stored as `Arc<T>` so that pool clones (made on every
23/// write) only bump refcounts rather than deep-copying each connection.
24/// `get()` returns `Option<Arc<T>>`; callers can auto-deref into `T`.
25#[derive(Debug)]
26pub struct ConnectionTable<T>
27where
28    T: Clone,
29{
30    pool: ArcSwap<Pool<Arc<T>>>,
31    write_lock: Mutex<()>,
32}
33
34impl<T> ConnectionTable<T>
35where
36    T: Clone,
37{
38    /// Create a new connection table with a given capacity
39    pub fn with_capacity(capacity: usize) -> Self {
40        ConnectionTable {
41            pool: ArcSwap::from_pointee(Pool::with_capacity(capacity)),
42            write_lock: Mutex::new(()),
43        }
44    }
45
46    /// Add a connection to the table, returning its stable ID.
47    pub fn insert(&self, connection: T) -> u64 {
48        let _guard = self.write_lock.lock();
49        let mut pool = (**self.pool.load()).clone();
50        let id = pool.insert(Arc::new(connection));
51        self.pool.store(Arc::new(pool));
52        id
53    }
54
55    /// Add a connection at a specific ID.
56    pub fn insert_at(&self, connection: T, id: u64) {
57        let _guard = self.write_lock.lock();
58        let mut pool = (**self.pool.load()).clone();
59        pool.insert_at(Arc::new(connection), id);
60        self.pool.store(Arc::new(pool));
61    }
62
63    /// Remove the connection with the given ID.
64    pub fn remove(&self, id: u64) -> bool {
65        let _guard = self.write_lock.lock();
66        let mut pool = (**self.pool.load()).clone();
67        let existed = pool.remove(id);
68        self.pool.store(Arc::new(pool));
69        existed
70    }
71
72    /// Number of connections in the table.
73    #[allow(dead_code)]
74    pub fn len(&self) -> usize {
75        self.pool.load().len()
76    }
77
78    /// Current allocated capacity.
79    #[allow(dead_code)]
80    pub fn capacity(&self) -> usize {
81        self.pool.load().capacity()
82    }
83
84    /// Returns `true` if the table contains no connections.
85    #[allow(dead_code)]
86    pub fn is_empty(&self) -> bool {
87        self.pool.load().is_empty()
88    }
89
90    /// Look up a connection by its stable ID.
91    pub fn get(&self, id: u64) -> Option<Arc<T>> {
92        self.pool.load().get(id).cloned()
93    }
94
95    /// Call `f(id, connection)` for every live connection.
96    pub fn for_each<F>(&self, mut f: F)
97    where
98        F: FnMut(u64, Arc<T>),
99    {
100        let pool = self.pool.load();
101        for (id, conn) in pool.iter_with_ids() {
102            f(id, conn.clone());
103        }
104    }
105
106    /// Mutate a connection in-place (copy-on-write).
107    ///
108    /// Clones the pool, applies `f` to the entry, and replaces the pool atomically.
109    /// Returns `true` if the connection was found and updated.
110    pub fn update<F>(&self, id: u64, f: F) -> bool
111    where
112        F: FnOnce(&mut T),
113    {
114        let _guard = self.write_lock.lock();
115        let mut pool = (**self.pool.load()).clone();
116        if let Some(entry) = pool.get_mut(id) {
117            let conn = Arc::make_mut(entry);
118            f(conn);
119            self.pool.store(Arc::new(pool));
120            true
121        } else {
122            false
123        }
124    }
125}
126
#[cfg(test)]
mod tests {
    use super::*;

    /// Basic life cycle: insert, look up, remove, and look up again.
    #[test]
    fn test_connection_table() {
        let table = ConnectionTable::with_capacity(10);
        assert_eq!(table.len(), 0);
        assert!(table.capacity() >= 10);
        assert!(table.is_empty());

        let connection = 10;
        let id = table.insert(connection);
        assert_eq!(table.len(), 1);
        assert!(!table.is_empty());

        // get element from the table
        let connection_ret = table.get(id).unwrap();
        assert_eq!(*connection_ret, connection);

        // remove element from the table
        assert!(table.remove(id));

        // removing again returns false
        assert!(!table.remove(id));

        // element is no longer accessible
        assert!(table.get(id).is_none());
    }

    /// `update` publishes the modified value, while handles obtained before
    /// the update still see the old one.
    #[test]
    fn test_update_is_copy_on_write() {
        let table = ConnectionTable::<u32>::with_capacity(4);
        let id = table.insert(10);

        let before = table.get(id).unwrap();
        assert!(table.update(id, |conn| *conn += 1));

        assert_eq!(*table.get(id).unwrap(), 11);
        assert_eq!(*before, 10);
        assert_eq!(table.len(), 1);
    }

    /// `update` on an ID that is not in the table returns `false` and never
    /// invokes the closure.
    #[test]
    fn test_update_missing_id() {
        let table = ConnectionTable::<u32>::with_capacity(4);
        let id = table.insert(10);
        assert!(table.remove(id));

        let mut called = false;
        assert!(!table.update(id, |_| called = true));
        assert!(!called);
        assert!(table.is_empty());
    }

    /// `for_each` visits every live connection together with its ID.
    #[test]
    fn test_for_each_visits_all_connections() {
        let table = ConnectionTable::<u32>::with_capacity(4);
        let first = table.insert(1);
        let second = table.insert(2);

        let mut seen = Vec::new();
        table.for_each(|id, conn| seen.push((id, *conn)));
        // The iteration order is up to the pool, so compare sorted.
        seen.sort_unstable();

        let mut expected = vec![(first, 1), (second, 2)];
        expected.sort_unstable();
        assert_eq!(seen, expected);
    }
}