agp_datapath/tables/
connection_table.rs

1// Copyright AGNTCY Contributors (https://github.com/agntcy)
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use parking_lot::RwLock;
7
8use super::pool::Pool;
9
10#[derive(Debug)]
11pub struct ConnectionTable<T>
12where
13    T: Default + Clone,
14{
15    /// Connection pool
16    pool: RwLock<Pool<Arc<T>>>,
17}
18
19impl<T> ConnectionTable<T>
20where
21    T: Default + Clone,
22{
23    /// Create a new connection table with a given capacity
24    pub fn with_capacity(capacity: usize) -> Self {
25        ConnectionTable {
26            pool: RwLock::new(Pool::with_capacity(capacity)),
27        }
28    }
29
30    /// Add a connection to the table. This cannot fail
31    pub fn insert(&self, connection: T) -> usize {
32        // Get a write lock on the pool
33        let mut pool = self.pool.write();
34        pool.insert(Arc::new(connection))
35    }
36
37    /// Add a connection to the table on a give index
38    pub fn insert_at(&self, connection: T, index: usize) -> bool {
39        // Get a write lock on the pool
40        let mut pool = self.pool.write();
41        pool.insert_at(Arc::new(connection), index)
42    }
43
44    /// remove a connection from the table
45    pub fn remove(&self, index: usize) -> bool {
46        // Get a write lock on the pool
47        let mut pool = self.pool.write();
48        pool.remove(index)
49    }
50
51    /// Get the number of connections in the table
52    #[allow(dead_code)]
53    pub fn len(&self) -> usize {
54        // Get a read lock on the pool
55        let pool = self.pool.read();
56        pool.len()
57    }
58
59    /// Get the capacity of the table
60    #[allow(dead_code)]
61    pub fn capacity(&self) -> usize {
62        // Get a read lock on the pool
63        let pool = self.pool.read();
64        pool.capacity()
65    }
66
67    /// Check if the table is empty
68    #[allow(dead_code)]
69    pub fn is_empty(&self) -> bool {
70        let pool = self.pool.read();
71        pool.is_empty()
72    }
73
74    /// Get a connection from the table
75    pub fn get(&self, index: usize) -> Option<Arc<T>> {
76        // get a read lock on the pool
77        let pool = self.pool.read();
78        pool.get(index).cloned()
79    }
80}
81
82// tests
83#[cfg(test)]
84mod tests {
85    use super::*;
86
87    #[test]
88    fn test_connection_table() {
89        let table = ConnectionTable::with_capacity(10);
90        assert_eq!(table.len(), 0);
91        assert_eq!(table.capacity(), 10);
92        assert!(table.is_empty());
93
94        let connection = 10;
95        let index = table.insert(connection);
96        assert_eq!(table.len(), 1);
97        assert!(!table.is_empty());
98
99        // get element from the table
100        let connection_ret = table.get(index).unwrap();
101        assert_eq!(*connection_ret, connection);
102
103        // remove element from the table
104        assert!(table.remove(index));
105
106        // remove an element that does not exist
107        assert!(!table.remove(index));
108    }
109}