Skip to main content

reifydb_core/value/column/pool/
allocator.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{
5	collections::HashMap,
6	sync::{Arc, Mutex},
7};
8
9use crate::value::column::pool::{capacity::ContainerCapacity, stats::PoolStats};
10
11/// Core trait for container pooling operations
12pub trait PoolAllocator<C> {
13	/// Get a container with at least the specified capacity
14	fn acquire(&self, capacity: usize) -> C;
15
16	/// Return a container to the pool for reuse
17	fn release(&self, container: C);
18
19	/// Clear all pooled containers
20	fn clear(&self);
21
22	/// Get statistics about pool usage
23	fn stats(&self) -> PoolStats;
24}
25
26/// Generic pool implementation for any container type.
27///
28/// Uses std::sync::Mutex intentionally because:
29/// 1. The PoolAllocator trait methods (acquire/release) are synchronous
30/// 2. Lock hold times are very brief (just push/pop operations)
31/// 3. Thread-local pools are the primary access pattern, reducing contention
32/// 4. Async mutex would require trait changes and add unnecessary overhead
33pub struct StdPoolAllocator<C> {
34	pools: Arc<Mutex<HashMap<usize, Vec<C>>>>,
35	stats: Arc<Mutex<PoolStats>>,
36	max_pool_size: usize,
37}
38
39impl<C> StdPoolAllocator<C> {
40	pub(crate) fn new(max_pool_size: usize) -> Self {
41		let result = Self {
42			pools: Arc::new(Mutex::new(HashMap::new())),
43			stats: Arc::new(Mutex::new(PoolStats::default())),
44			max_pool_size,
45		};
46		result
47	}
48
49	/// Create a new container with the specified capacity
50	pub(crate) fn create_new(&self, capacity: usize) -> C
51	where
52		C: ContainerCapacity,
53	{
54		C::with_capacity(capacity)
55	}
56
57	/// Get the capacity bucket for a given capacity (rounds up to nearest
58	/// power of 2)
59	pub(crate) fn capacity_bucket(capacity: usize) -> usize {
60		if capacity == 0 {
61			return 8; // minimum bucket size
62		}
63		capacity.next_power_of_two().max(8)
64	}
65}
66
67impl<C> PoolAllocator<C> for StdPoolAllocator<C>
68where
69	C: ContainerCapacity,
70{
71	fn acquire(&self, capacity: usize) -> C {
72		let bucket = Self::capacity_bucket(capacity);
73
74		// Try to get from pool first
75		if let Ok(mut pools) = self.pools.lock() {
76			if let Some(pool) = pools.get_mut(&bucket) {
77				if let Some(mut container) = pool.pop() {
78					container.clear();
79
80					// Update stats
81					if let Ok(mut stats) = self.stats.lock() {
82						stats.total_acquired += 1;
83					}
84
85					return container;
86				}
87			}
88		}
89
90		// Create new container if pool is empty
91		let container = self.create_new(bucket);
92
93		// Update stats
94		if let Ok(mut stats) = self.stats.lock() {
95			stats.total_acquired += 1;
96		}
97
98		container
99	}
100
101	fn release(&self, container: C) {
102		let capacity = container.capacity();
103		let bucket = Self::capacity_bucket(capacity);
104
105		if let Ok(mut pools) = self.pools.lock() {
106			let pool = pools.entry(bucket).or_insert_with(Vec::new);
107
108			// Only keep containers if we haven't exceeded max pool
109			// size
110			if pool.len() < self.max_pool_size {
111				pool.push(container);
112			}
113		}
114
115		// Update stats
116		if let Ok(mut stats) = self.stats.lock() {
117			stats.total_released += 1;
118		}
119	}
120
121	fn clear(&self) {
122		if let Ok(mut pools) = self.pools.lock() {
123			pools.clear();
124		}
125
126		if let Ok(mut stats) = self.stats.lock() {
127			*stats = PoolStats::default();
128		}
129	}
130
131	fn stats(&self) -> PoolStats {
132		if let Ok(pools) = self.pools.lock() {
133			let available = pools.values().map(|pool| pool.len()).sum();
134
135			if let Ok(stats) = self.stats.lock() {
136				return PoolStats {
137					available,
138					total_acquired: stats.total_acquired,
139					total_released: stats.total_released,
140				};
141			}
142		}
143
144		PoolStats::default()
145	}
146}
147
#[cfg(test)]
pub mod tests {
	use reifydb_type::value::container::{bool::BoolContainer, number::NumberContainer, utf8::Utf8Container};

	use crate::value::column::pool::allocator::{PoolAllocator, StdPoolAllocator};

	/// Acquire / release / acquire round trip on a `BoolContainer` pool.
	#[test]
	fn test_allocate_bool() {
		let allocator = StdPoolAllocator::<BoolContainer>::new(4);

		// First acquisition must satisfy the requested capacity.
		let first = allocator.acquire(10);
		assert!(first.capacity() >= 10);

		// Hand it back and check the counters.
		allocator.release(first);
		let after_release = allocator.stats();
		assert_eq!(after_release.total_acquired, 1);
		assert_eq!(after_release.total_released, 1);
		assert_eq!(after_release.available, 1);

		// A second acquisition of the same size drains the pool again.
		let second = allocator.acquire(10);
		let after_reacquire = allocator.stats();
		assert_eq!(after_reacquire.total_acquired, 2);
		assert_eq!(after_reacquire.available, 0);

		allocator.release(second);
	}

	/// Single acquire / release cycle on a `Utf8Container` pool.
	#[test]
	fn test_allocate_string() {
		let allocator = StdPoolAllocator::<Utf8Container>::new(4);

		let acquired = allocator.acquire(20);
		assert!(acquired.capacity() >= 20);
		allocator.release(acquired);

		let snapshot = allocator.stats();
		assert_eq!(snapshot.total_acquired, 1);
		assert_eq!(snapshot.total_released, 1);
		assert_eq!(snapshot.available, 1);
	}

	/// Single acquire / release cycle on a `NumberContainer<i32>` pool.
	#[test]
	fn test_allocate_number() {
		let allocator = StdPoolAllocator::<NumberContainer<i32>>::new(4);

		let acquired = allocator.acquire(50);
		assert!(acquired.capacity() >= 50);
		allocator.release(acquired);

		let snapshot = allocator.stats();
		assert_eq!(snapshot.total_acquired, 1);
		assert_eq!(snapshot.total_released, 1);
		assert_eq!(snapshot.available, 1);
	}

	/// Releasing more containers than `max_pool_size` keeps only the limit.
	#[test]
	fn test_pool_max_size() {
		let allocator = StdPoolAllocator::<BoolContainer>::new(2);

		// Three live containers, one more than the pool may retain.
		let first = allocator.acquire(10);
		let second = allocator.acquire(10);
		let third = allocator.acquire(10);

		allocator.release(first);
		allocator.release(second);
		allocator.release(third); // over the limit, expected to be discarded

		let snapshot = allocator.stats();
		assert_eq!(snapshot.available, 2); // capped by max_pool_size
	}

	/// Requested capacities map to power-of-two buckets with a floor of 8.
	#[test]
	fn test_capacity_bucket() {
		let cases = [(0, 8), (1, 8), (8, 8), (9, 16), (16, 16), (17, 32), (100, 128)];

		for (requested, expected) in cases {
			assert_eq!(StdPoolAllocator::<BoolContainer>::capacity_bucket(requested), expected);
		}
	}
}