//! Memory pool implementation (oxirs_stream/performance_optimizer/memory.rs).

use super::config::MemoryPoolConfig;
7use anyhow::Result;
8use parking_lot::RwLock;
9use serde::{Deserialize, Serialize};
10use std::alloc::{alloc, dealloc, Layout};
11use std::collections::VecDeque;
12use std::ptr::NonNull;
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::sync::Arc;
15use std::time::Instant;
16use tracing::{debug, info};
17
/// A reusable pool of raw heap-allocated memory blocks.
///
/// Freed blocks are kept on a shared free list so later allocations of a
/// compatible size can reuse them instead of hitting the system allocator.
pub struct MemoryPool {
    /// Pool tuning parameters (e.g. `max_size`, `compaction_interval`).
    config: MemoryPoolConfig,
    /// Blocks that have been returned and are available for reuse.
    available_blocks: Arc<RwLock<VecDeque<MemoryBlock>>>,
    /// NOTE(review): never read or written by the visible methods — `stats()`
    /// builds a fresh snapshot instead. Confirm whether this field is needed.
    stats: Arc<MemoryPoolStats>,
    /// Count of live blocks, incremented on pool misses in `allocate`.
    total_allocated: AtomicUsize,
    /// High-water mark of `total_allocated`.
    peak_allocated: AtomicUsize,
    /// Time of the most recent `compact()` run.
    last_compaction: Arc<RwLock<Instant>>,
}
27
/// A single raw heap allocation tracked by the pool.
#[derive(Debug)]
pub struct MemoryBlock {
    /// Start of the allocation; never null.
    ptr: NonNull<u8>,
    /// Size of the allocation in bytes (the size originally requested).
    size: usize,
    /// Layout used for the original `alloc`; required for a matching `dealloc`.
    layout: Layout,
    /// When the block was first allocated; used by `compact()` to expire old blocks.
    allocated_at: Instant,
}
36
// SAFETY: `MemoryBlock` exclusively owns the allocation identified by
// `ptr`/`layout`, and blocks are only ever transferred by value, so moving one
// to another thread is sound. `Sync` holds because the struct exposes no
// interior mutability — a shared `&MemoryBlock` permits no writes.
// NOTE(review): soundness relies on no two `MemoryBlock`s ever aliasing the
// same pointer — verify construction sites uphold this.
unsafe impl Send for MemoryBlock {}
unsafe impl Sync for MemoryBlock {}
41
/// Point-in-time statistics snapshot for a memory pool.
///
/// NOTE(review): `stats()` currently reports several of these fields (byte
/// counters, hit/miss counts, `total_freed`, `average_allocation_size`) as
/// zero placeholders — they are not yet tracked.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryPoolStats {
    /// Total number of block allocations performed.
    pub total_allocated: usize,
    /// Total number of blocks freed back to the system.
    pub total_freed: usize,
    /// Number of blocks currently handed out.
    pub current_allocated: usize,
    /// High-water mark of concurrently allocated blocks.
    pub peak_allocated: usize,
    /// Cumulative bytes allocated.
    pub total_bytes_allocated: usize,
    /// Cumulative bytes freed.
    pub total_bytes_freed: usize,
    /// Bytes currently allocated.
    pub current_bytes_allocated: usize,
    /// High-water mark of allocated bytes.
    pub peak_bytes_allocated: usize,
    /// Allocations satisfied by reusing a pooled block.
    pub pool_hits: usize,
    /// Allocations that had to go to the system allocator.
    pub pool_misses: usize,
    /// Free-list fragmentation: 0.0 (one contiguous block) up to 1.0.
    pub fragmentation_ratio: f64,
    /// Mean allocation size in bytes.
    pub average_allocation_size: f64,
    /// Unix timestamp (seconds) of the last compaction, if any.
    pub last_compaction: Option<u64>,
}
72
73impl Default for MemoryPoolStats {
74 fn default() -> Self {
75 Self {
76 total_allocated: 0,
77 total_freed: 0,
78 current_allocated: 0,
79 peak_allocated: 0,
80 total_bytes_allocated: 0,
81 total_bytes_freed: 0,
82 current_bytes_allocated: 0,
83 peak_bytes_allocated: 0,
84 pool_hits: 0,
85 pool_misses: 0,
86 fragmentation_ratio: 0.0,
87 average_allocation_size: 0.0,
88 last_compaction: None,
89 }
90 }
91}
92
93impl MemoryPool {
94 pub fn new(config: MemoryPoolConfig) -> Self {
96 Self {
97 config,
98 available_blocks: Arc::new(RwLock::new(VecDeque::new())),
99 stats: Arc::new(MemoryPoolStats::default()),
100 total_allocated: AtomicUsize::new(0),
101 peak_allocated: AtomicUsize::new(0),
102 last_compaction: Arc::new(RwLock::new(Instant::now())),
103 }
104 }
105
106 pub fn allocate(&self, size: usize) -> Result<MemoryHandle> {
108 let layout = Layout::from_size_align(size, std::mem::align_of::<u8>())
109 .map_err(|e| anyhow::anyhow!("Invalid layout: {}", e))?;
110
111 if let Some(block) = self.try_reuse_block(size) {
113 self.update_stats_on_hit();
114 return Ok(MemoryHandle {
115 block,
116 pool: self.available_blocks.clone(),
117 });
118 }
119
120 let ptr = unsafe { alloc(layout) };
122 if ptr.is_null() {
123 return Err(anyhow::anyhow!("Failed to allocate memory"));
124 }
125
126 let block = MemoryBlock {
127 ptr: NonNull::new(ptr).unwrap(),
128 size,
129 layout,
130 allocated_at: Instant::now(),
131 };
132
133 self.update_stats_on_miss(size);
134
135 Ok(MemoryHandle {
136 block,
137 pool: self.available_blocks.clone(),
138 })
139 }
140
141 fn try_reuse_block(&self, size: usize) -> Option<MemoryBlock> {
143 let mut available = self.available_blocks.write();
144
145 for (i, block) in available.iter().enumerate() {
147 if block.size >= size {
148 return available.remove(i);
149 }
150 }
151
152 None
153 }
154
155 fn update_stats_on_hit(&self) {
157 debug!("Memory pool hit");
160 }
161
162 fn update_stats_on_miss(&self, size: usize) {
164 let current = self.total_allocated.fetch_add(1, Ordering::Relaxed) + 1;
165 let peak = self.peak_allocated.load(Ordering::Relaxed);
166
167 if current > peak {
168 self.peak_allocated.store(current, Ordering::Relaxed);
169 }
170
171 debug!("Memory pool miss, allocated block of size: {}", size);
172 }
173
174 pub fn deallocate(&self, block: MemoryBlock) {
176 let mut available = self.available_blocks.write();
177
178 if available.len() < self.config.max_size / block.size {
180 available.push_back(block);
181 debug!("Returned block to pool");
182 } else {
183 unsafe {
185 dealloc(block.ptr.as_ptr(), block.layout);
186 }
187 debug!("Deallocated block (pool full)");
188 }
189
190 self.total_allocated.fetch_sub(1, Ordering::Relaxed);
191 }
192
193 pub fn stats(&self) -> MemoryPoolStats {
195 let available = self.available_blocks.read();
196 let current_allocated = self.total_allocated.load(Ordering::Relaxed);
197 let peak_allocated = self.peak_allocated.load(Ordering::Relaxed);
198
199 MemoryPoolStats {
200 total_allocated: current_allocated + available.len(),
201 total_freed: 0, current_allocated,
203 peak_allocated,
204 total_bytes_allocated: 0, total_bytes_freed: 0,
206 current_bytes_allocated: 0,
207 peak_bytes_allocated: 0,
208 pool_hits: 0,
209 pool_misses: 0,
210 fragmentation_ratio: self.calculate_fragmentation(),
211 average_allocation_size: 0.0,
212 last_compaction: Some(
213 std::time::SystemTime::now()
214 .duration_since(std::time::UNIX_EPOCH)
215 .unwrap()
216 .as_secs(),
217 ),
218 }
219 }
220
221 fn calculate_fragmentation(&self) -> f64 {
223 let available = self.available_blocks.read();
224 if available.is_empty() {
225 return 0.0;
226 }
227
228 let total_size: usize = available.iter().map(|b| b.size).sum();
229 let largest_block = available.iter().map(|b| b.size).max().unwrap_or(0);
230
231 if total_size == 0 {
232 0.0
233 } else {
234 1.0 - (largest_block as f64 / total_size as f64)
235 }
236 }
237
238 pub fn compact(&self) -> Result<()> {
240 let mut available = self.available_blocks.write();
241 let mut last_compaction = self.last_compaction.write();
242
243 let mut blocks: Vec<_> = available.drain(..).collect();
245 blocks.sort_by_key(|b| b.size);
246
247 let now = Instant::now();
249 let threshold = std::time::Duration::from_secs(300); blocks.retain(|block| now.duration_since(block.allocated_at) < threshold);
252
253 available.extend(blocks);
255 *last_compaction = now;
256
257 info!(
258 "Memory pool compacted, {} blocks remaining",
259 available.len()
260 );
261 Ok(())
262 }
263
264 pub fn needs_compaction(&self) -> bool {
266 let last_compaction = self.last_compaction.read();
267 let elapsed = last_compaction.elapsed();
268
269 elapsed.as_secs() > self.config.compaction_interval
270 }
271
272 pub fn pool_size(&self) -> usize {
274 self.available_blocks.read().len()
275 }
276
277 pub fn total_allocated(&self) -> usize {
279 self.total_allocated.load(Ordering::Relaxed)
280 }
281}
282
/// RAII-style handle to a pooled allocation; the block is pushed back onto
/// the pool's free list when the handle is dropped.
pub struct MemoryHandle {
    /// The owned allocation.
    block: MemoryBlock,
    /// Shared free list the block is returned to on drop.
    pool: Arc<RwLock<VecDeque<MemoryBlock>>>,
}
288
impl MemoryHandle {
    /// Raw pointer to the start of the block.
    pub fn as_ptr(&self) -> *mut u8 {
        self.block.ptr.as_ptr()
    }

    /// Size of the block in bytes.
    pub fn size(&self) -> usize {
        self.block.size
    }

    /// Views the block as an immutable byte slice.
    ///
    /// # Safety
    /// The caller must ensure the first `size` bytes are initialized (freshly
    /// allocated or reused memory is not zeroed) and that no mutable access
    /// to the block occurs for the lifetime of the returned slice.
    pub unsafe fn as_slice(&self) -> &[u8] {
        std::slice::from_raw_parts(self.block.ptr.as_ptr(), self.block.size)
    }

    /// Views the block as a mutable byte slice.
    ///
    /// # Safety
    /// The caller must ensure the first `size` bytes are initialized and that
    /// this is the only live reference to the block's memory while the
    /// returned slice exists.
    pub unsafe fn as_mut_slice(&mut self) -> &mut [u8] {
        std::slice::from_raw_parts_mut(self.block.ptr.as_ptr(), self.block.size)
    }
}
318
/// Returns the block to the pool's free list when the handle goes out of
/// scope, so handle-managed memory is recycled rather than leaked.
impl Drop for MemoryHandle {
    fn drop(&mut self) {
        // Rebuild the block field-by-field: `Drop` only provides `&mut self`,
        // so `self.block` cannot be moved out directly. All four fields
        // (`NonNull`, `usize`, `Layout`, `Instant`) are `Copy`.
        let block = MemoryBlock {
            ptr: self.block.ptr,
            size: self.block.size,
            layout: self.block.layout,
            allocated_at: self.block.allocated_at,
        };

        // NOTE(review): this unconditionally pushes back, bypassing the size
        // cap that `MemoryPool::deallocate` enforces, and it never decrements
        // the pool's `total_allocated` counter — the free list can grow
        // without bound and the counter drifts upward. TODO: confirm whether
        // drops should route through `deallocate` instead.
        let mut available = self.pool.write();
        available.push_back(block);
    }
}
333
// SAFETY: the handle exclusively owns its `MemoryBlock`, and the shared free
// list is behind `Arc<RwLock<...>>`, so moving a handle across threads is
// sound. `Sync` relies on the `&self` methods performing no mutation through
// the raw pointer. NOTE(review): callers sharing `&MemoryHandle` across
// threads must still uphold the aliasing rules documented on
// `as_slice`/`as_mut_slice` — confirm usage sites.
unsafe impl Send for MemoryHandle {}
unsafe impl Sync for MemoryHandle {}
336
/// Strategy for choosing which free block satisfies an allocation request.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub enum AllocationStrategy {
    /// Use the first block that is large enough — matches the behavior of
    /// `MemoryPool::try_reuse_block`.
    #[default]
    FirstFit,
    /// Use the smallest block that is large enough.
    BestFit,
    /// Use the largest available block.
    WorstFit,
    /// Like first-fit, but resume scanning from the previous position.
    NextFit,
}
350
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created pool has an empty free list and no live blocks.
    #[test]
    fn test_memory_pool_creation() {
        let pool = MemoryPool::new(MemoryPoolConfig::default());

        assert_eq!(pool.pool_size(), 0);
        assert_eq!(pool.total_allocated(), 0);
    }

    /// Allocating one block reports its size and bumps the live counter.
    #[test]
    fn test_memory_allocation() {
        let pool = MemoryPool::new(MemoryPoolConfig::default());

        let handle = pool.allocate(1024).expect("allocation should succeed");
        assert_eq!(handle.size(), 1024);
        assert_eq!(pool.total_allocated(), 1);
    }

    /// A fresh pool reports zeroed allocation counters in its snapshot.
    #[test]
    fn test_memory_pool_stats() {
        let pool = MemoryPool::new(MemoryPoolConfig::default());

        let snapshot = pool.stats();
        assert_eq!(snapshot.current_allocated, 0);
        assert_eq!(snapshot.peak_allocated, 0);
    }

    /// Compacting an empty pool is a no-op that succeeds.
    #[test]
    fn test_memory_pool_compaction() {
        let pool = MemoryPool::new(MemoryPoolConfig::default());

        assert!(pool.compact().is_ok());
    }
}