dataforge/memory/mod.rs

1//! 内存管理模块
2//! 
3//! 提供高效的内存池管理和零拷贝技术
4
5use std::collections::HashMap;
6use std::sync::{Arc, Mutex, RwLock};
7use std::time::{Duration, Instant};
8
/// Configuration for a [`MemoryPool`].
#[derive(Debug, Clone)]
pub struct MemoryPoolConfig {
    /// How many buffers `MemoryPool::new` pre-allocates for every size class.
    pub initial_size: usize,
    /// Maximum number of idle buffers retained per size class; buffers
    /// returned to a full class are not kept.
    pub max_size: usize,
    /// Buffer lengths (in bytes) the pool rounds requests up to.
    pub size_classes: Vec<usize>,
    /// Minimum time between two idle-buffer cleanup passes.
    pub cleanup_interval: Duration,
    /// Pooled buffers idle for at least this long are dropped by cleanup.
    pub max_idle_time: Duration,
}
23
24impl Default for MemoryPoolConfig {
25    fn default() -> Self {
26        Self {
27            initial_size: 10,
28            max_size: 1000,
29            size_classes: vec![64, 256, 1024, 4096, 16384, 65536],
30            cleanup_interval: Duration::from_secs(60),
31            max_idle_time: Duration::from_secs(300),
32        }
33    }
34}
35
/// Bookkeeping stored next to each idle buffer held by the pool.
#[derive(Debug)]
struct BufferMetadata {
    /// When this metadata record was created.
    #[allow(dead_code)] // only surfaced through `Debug`; no pool logic reads it
    created_at: Instant,
    /// Last time the buffer was touched; cleanup compares this against
    /// `max_idle_time`.
    last_used: Instant,
    /// Usage counter, initialized to 0 when the record is created.
    #[allow(dead_code)] // diagnostic only; no pool decision depends on it
    use_count: usize,
}
47
48/// 内存池
49pub struct MemoryPool {
50    /// 按大小分类的缓冲区池
51    pools: RwLock<HashMap<usize, Vec<(Arc<[u8]>, BufferMetadata)>>>,
52    /// 配置
53    config: MemoryPoolConfig,
54    /// 统计信息
55    stats: Arc<Mutex<MemoryStats>>,
56    /// 最后清理时间
57    last_cleanup: Mutex<Instant>,
58}
59
/// Allocation statistics collected by a [`MemoryPool`].
#[derive(Debug, Default)]
pub struct MemoryStats {
    /// Number of `allocate` calls.
    pub total_allocations: usize,
    /// Number of `deallocate` calls.
    pub total_deallocations: usize,
    /// Allocations served from a pooled buffer.
    pub cache_hits: usize,
    /// Allocations that had to create a new buffer.
    pub cache_misses: usize,
    /// Buffers handed out by `allocate` and not yet passed to `deallocate`.
    pub active_buffers: usize,
    /// Buffers tracked as sitting idle in the pool.
    pub pooled_buffers: usize,
    /// Memory tracked by the pool, in bytes.
    pub total_memory_bytes: usize,
}
78
79impl MemoryPool {
80    /// 创建新的内存池
81    pub fn new(config: MemoryPoolConfig) -> Self {
82        let mut pools = HashMap::new();
83        
84        // 为每个大小类别预分配缓冲区
85        for &size in &config.size_classes {
86            let mut buffers = Vec::with_capacity(config.initial_size);
87            for _ in 0..config.initial_size {
88                let buffer = Arc::from(vec![0u8; size].into_boxed_slice());
89                let metadata = BufferMetadata {
90                    created_at: Instant::now(),
91                    last_used: Instant::now(),
92                    use_count: 0,
93                };
94                buffers.push((buffer, metadata));
95            }
96            pools.insert(size, buffers);
97        }
98
99        Self {
100            pools: RwLock::new(pools),
101            config,
102            stats: Arc::new(Mutex::new(MemoryStats::default())),
103            last_cleanup: Mutex::new(Instant::now()),
104        }
105    }
106
107    /// 分配指定大小的缓冲区
108    pub fn allocate(&self, size: usize) -> Arc<[u8]> {
109        let size_class = self.find_size_class(size);
110        let mut stats = self.stats.lock().unwrap();
111        stats.total_allocations += 1;
112
113        // 尝试从池中获取缓冲区
114        if let Some(buffer) = self.try_get_from_pool(size_class) {
115            stats.cache_hits += 1;
116            stats.active_buffers += 1;
117            return buffer;
118        }
119
120        // 池中没有可用缓冲区,创建新的
121        stats.cache_misses += 1;
122        stats.active_buffers += 1;
123        stats.total_memory_bytes += size_class;
124        
125        Arc::from(vec![0u8; size_class].into_boxed_slice())
126    }
127
128    /// 释放缓冲区
129    pub fn deallocate(&self, buffer: Arc<[u8]>) {
130        let size = buffer.len();
131        let mut stats = self.stats.lock().unwrap();
132        stats.total_deallocations += 1;
133        stats.active_buffers = stats.active_buffers.saturating_sub(1);
134
135        // 检查是否应该放回池中
136        if self.should_pool_buffer(size) {
137            self.return_to_pool(buffer, size);
138            stats.pooled_buffers += 1;
139        } else {
140            stats.total_memory_bytes = stats.total_memory_bytes.saturating_sub(size);
141        }
142
143        // 定期清理
144        self.maybe_cleanup();
145    }
146
147    /// 查找合适的大小类别
148    fn find_size_class(&self, size: usize) -> usize {
149        self.config.size_classes
150            .iter()
151            .find(|&&class_size| class_size >= size)
152            .copied()
153            .unwrap_or_else(|| {
154                // 如果没有合适的大小类别,使用下一个2的幂
155                let mut class_size = 1;
156                while class_size < size {
157                    class_size <<= 1;
158                }
159                class_size
160            })
161    }
162
163    /// 尝试从池中获取缓冲区
164    fn try_get_from_pool(&self, size_class: usize) -> Option<Arc<[u8]>> {
165        let mut pools = self.pools.write().unwrap();
166        
167        if let Some(buffers) = pools.get_mut(&size_class) {
168            if let Some((buffer, mut metadata)) = buffers.pop() {
169                metadata.last_used = Instant::now();
170                metadata.use_count += 1;
171                return Some(buffer);
172            }
173        }
174        
175        None
176    }
177
178    /// 将缓冲区返回到池中
179    fn return_to_pool(&self, buffer: Arc<[u8]>, size: usize) {
180        let size_class = self.find_size_class(size);
181        let mut pools = self.pools.write().unwrap();
182        
183        let buffers = pools.entry(size_class).or_insert_with(Vec::new);
184        
185        // 检查池是否已满
186        if buffers.len() < self.config.max_size {
187            let metadata = BufferMetadata {
188                created_at: Instant::now(),
189                last_used: Instant::now(),
190                use_count: 0,
191            };
192            buffers.push((buffer, metadata));
193        }
194    }
195
196    /// 判断是否应该将缓冲区放回池中
197    fn should_pool_buffer(&self, size: usize) -> bool {
198        // 只有在配置的大小类别范围内的缓冲区才会被池化
199        self.config.size_classes.contains(&self.find_size_class(size))
200    }
201
202    /// 可能执行清理操作
203    fn maybe_cleanup(&self) {
204        let mut last_cleanup = self.last_cleanup.lock().unwrap();
205        let now = Instant::now();
206        
207        if now.duration_since(*last_cleanup) >= self.config.cleanup_interval {
208            *last_cleanup = now;
209            drop(last_cleanup); // 释放锁
210            self.cleanup_expired_buffers();
211        }
212    }
213
214    /// 清理过期的缓冲区
215    fn cleanup_expired_buffers(&self) {
216        let mut pools = self.pools.write().unwrap();
217        let now = Instant::now();
218        let mut stats = self.stats.lock().unwrap();
219        
220        for (size_class, buffers) in pools.iter_mut() {
221            let initial_count = buffers.len();
222            
223            buffers.retain(|(_, metadata)| {
224                now.duration_since(metadata.last_used) < self.config.max_idle_time
225            });
226            
227            let removed_count = initial_count - buffers.len();
228            stats.pooled_buffers = stats.pooled_buffers.saturating_sub(removed_count);
229            stats.total_memory_bytes = stats.total_memory_bytes.saturating_sub(removed_count * size_class);
230        }
231    }
232
233    /// 获取统计信息
234    pub fn stats(&self) -> MemoryStats {
235        let stats = self.stats.lock().unwrap();
236        stats.clone()
237    }
238
239    /// 重置统计信息
240    pub fn reset_stats(&self) {
241        let mut stats = self.stats.lock().unwrap();
242        *stats = MemoryStats::default();
243    }
244
245    /// 强制清理所有缓冲区
246    pub fn clear(&self) {
247        let mut pools = self.pools.write().unwrap();
248        let mut stats = self.stats.lock().unwrap();
249        
250        for buffers in pools.values_mut() {
251            buffers.clear();
252        }
253        
254        stats.pooled_buffers = 0;
255        stats.total_memory_bytes = 0;
256    }
257
258    /// 获取池状态信息
259    pub fn pool_info(&self) -> HashMap<usize, usize> {
260        let pools = self.pools.read().unwrap();
261        pools.iter()
262            .map(|(&size, buffers)| (size, buffers.len()))
263            .collect()
264    }
265}
266
267impl Default for MemoryPool {
268    fn default() -> Self {
269        Self::new(MemoryPoolConfig::default())
270    }
271}
272
/// Interning pool that hands out shared `Arc<str>` handles.
///
/// Repeated lookups of the same text return clones of one `Arc<str>` instead
/// of allocating a new string each time.
pub struct StringPool {
    /// Interned strings keyed by their text.
    cache: RwLock<HashMap<String, Arc<str>>>,
    /// Capacity limit that triggers eviction when reached.
    max_size: usize,
    /// Per-string lookup counts used to pick eviction victims.
    access_count: Mutex<HashMap<String, usize>>,
}
282
283impl StringPool {
284    /// 创建新的字符串池
285    pub fn new(max_size: usize) -> Self {
286        Self {
287            cache: RwLock::new(HashMap::new()),
288            max_size,
289            access_count: Mutex::new(HashMap::new()),
290        }
291    }
292
293    /// 获取字符串,如果不存在则创建
294    pub fn get_or_create(&self, s: &str) -> Arc<str> {
295        // 首先尝试从缓存中获取
296        {
297            let cache = self.cache.read().unwrap();
298            if let Some(cached) = cache.get(s) {
299                // 更新访问计数
300                let mut access_count = self.access_count.lock().unwrap();
301                *access_count.entry(s.to_string()).or_insert(0) += 1;
302                return Arc::clone(cached);
303            }
304        }
305
306        // 缓存中不存在,需要创建
307        let arc_str: Arc<str> = Arc::from(s);
308        
309        {
310            let mut cache = self.cache.write().unwrap();
311            
312            // 检查缓存大小,如果超过限制则清理
313            if cache.len() >= self.max_size {
314                self.evict_least_used(&mut cache);
315            }
316            
317            cache.insert(s.to_string(), Arc::clone(&arc_str));
318        }
319
320        // 更新访问计数
321        {
322            let mut access_count = self.access_count.lock().unwrap();
323            access_count.insert(s.to_string(), 1);
324        }
325
326        arc_str
327    }
328
329    /// 清理最少使用的条目
330    fn evict_least_used(&self, cache: &mut HashMap<String, Arc<str>>) {
331        let access_count = self.access_count.lock().unwrap();
332        
333        // 找到访问次数最少的条目
334        if let Some((least_used_key, _)) = access_count
335            .iter()
336            .min_by_key(|(_, &count)| count)
337        {
338            cache.remove(least_used_key);
339        }
340    }
341
342    /// 获取缓存统计信息
343    pub fn cache_info(&self) -> (usize, usize) {
344        let cache = self.cache.read().unwrap();
345        (cache.len(), self.max_size)
346    }
347
348    /// 清空缓存
349    pub fn clear(&self) {
350        let mut cache = self.cache.write().unwrap();
351        let mut access_count = self.access_count.lock().unwrap();
352        
353        cache.clear();
354        access_count.clear();
355    }
356}
357
358impl Default for StringPool {
359    fn default() -> Self {
360        Self::new(1000)
361    }
362}
363
364impl Clone for MemoryStats {
365    fn clone(&self) -> Self {
366        Self {
367            total_allocations: self.total_allocations,
368            total_deallocations: self.total_deallocations,
369            cache_hits: self.cache_hits,
370            cache_misses: self.cache_misses,
371            active_buffers: self.active_buffers,
372            pooled_buffers: self.pooled_buffers,
373            total_memory_bytes: self.total_memory_bytes,
374        }
375    }
376}
377
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a pool with no pre-allocated buffers (otherwise the default
    /// configuration), so cache hits can only come from returned buffers.
    fn empty_pool() -> MemoryPool {
        MemoryPool::new(MemoryPoolConfig {
            initial_size: 0,
            ..MemoryPoolConfig::default()
        })
    }

    #[test]
    fn test_memory_pool_allocation() {
        let pool = MemoryPool::default();

        let buffer = pool.allocate(1024);
        assert_eq!(buffer.len(), 1024);

        let MemoryStats {
            total_allocations,
            active_buffers,
            ..
        } = pool.stats();
        assert_eq!(total_allocations, 1);
        assert_eq!(active_buffers, 1);
    }

    #[test]
    fn test_memory_pool_deallocation() {
        let pool = MemoryPool::default();

        pool.deallocate(pool.allocate(1024));

        let stats = pool.stats();
        assert_eq!(stats.total_deallocations, 1);
        assert_eq!(stats.active_buffers, 0);
    }

    #[test]
    fn test_memory_pool_reuse() {
        let pool = empty_pool();

        // An empty pool cannot serve the first request.
        let first = pool.allocate(1024);
        let after_first = pool.stats();
        assert_eq!((after_first.cache_hits, after_first.cache_misses), (0, 1));

        pool.deallocate(first);

        // The returned buffer is reused for the second request.
        let _second = pool.allocate(1024);
        let after_second = pool.stats();
        assert_eq!((after_second.cache_hits, after_second.cache_misses), (1, 1));
    }

    #[test]
    fn test_string_pool() {
        let pool = StringPool::new(10);

        let first = pool.get_or_create("hello");
        let second = pool.get_or_create("hello");
        assert!(Arc::ptr_eq(&first, &second));

        assert_eq!(pool.cache_info(), (1, 10));
    }

    #[test]
    fn test_string_pool_eviction() {
        let pool = StringPool::new(2);

        // The third distinct string forces an eviction.
        let _kept: Vec<Arc<str>> = ["hello", "world", "rust"]
            .iter()
            .map(|s| pool.get_or_create(s))
            .collect();

        let (cache_size, _) = pool.cache_info();
        assert_eq!(cache_size, 2);
    }

    #[test]
    fn test_size_class_selection() {
        let pool = MemoryPool::default();

        // The last case has no configured class, so it rounds to a power of two.
        let cases = [(50, 64), (100, 256), (1000, 1024), (100_000, 131_072)];
        for &(requested, expected) in &cases {
            assert_eq!(pool.find_size_class(requested), expected);
        }
    }
}