lumosai_vector_core/
performance.rs

1//! 性能优化模块
2//! 
3//! 提供连接池、缓存机制和性能监控功能
4
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8use tokio::sync::{RwLock, Semaphore};
9
10#[cfg(feature = "serde")]
11use serde::{Deserialize, Serialize};
12
13use crate::{Result, VectorError};
14
/// Connection pool configuration.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ConnectionPoolConfig {
    /// Maximum number of connections.
    pub max_connections: usize,
    /// Minimum number of connections.
    pub min_connections: usize,
    /// Connection timeout.
    pub connection_timeout: Duration,
    /// Timeout after which an idle connection is considered expired.
    pub idle_timeout: Duration,
    /// Number of connection retries.
    pub max_retries: u32,
}
30
31impl Default for ConnectionPoolConfig {
32    fn default() -> Self {
33        Self {
34            max_connections: 10,
35            min_connections: 2,
36            connection_timeout: Duration::from_secs(30),
37            idle_timeout: Duration::from_secs(300),
38            max_retries: 3,
39        }
40    }
41}
42
/// Cache configuration.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CacheConfig {
    /// Maximum number of cache entries.
    pub max_entries: usize,
    /// Time after which a cache entry expires.
    pub ttl: Duration,
    /// Whether the LRU eviction policy is enabled.
    pub enable_lru: bool,
    /// Interval for cache hit-rate statistics.
    pub stats_interval: Duration,
}
56
57impl Default for CacheConfig {
58    fn default() -> Self {
59        Self {
60            max_entries: 1000,
61            ttl: Duration::from_secs(3600),
62            enable_lru: true,
63            stats_interval: Duration::from_secs(60),
64        }
65    }
66}
67
68/// 连接池
69pub struct ConnectionPool<T> {
70    config: ConnectionPoolConfig,
71    connections: Arc<RwLock<Vec<PooledConnection<T>>>>,
72    semaphore: Arc<Semaphore>,
73    stats: Arc<RwLock<ConnectionPoolStats>>,
74}
75
/// A connection managed by a `ConnectionPool`, plus bookkeeping metadata.
pub struct PooledConnection<T> {
    /// The wrapped connection value.
    connection: T,
    /// Creation timestamp.
    // NOTE(review): no constructor is visible in this part of the file;
    // presumably set when the connection is opened — confirm.
    created_at: Instant,
    /// Refreshed whenever the connection is checked out or returned.
    last_used: Instant,
    /// `true` while checked out, `false` once returned to the pool.
    is_active: bool,
}
83
/// Connection pool statistics.
///
/// A plain snapshot value: every field starts at zero via `Default`, and the
/// pool decides which of them it maintains.
#[derive(Clone, Debug, Default)]
pub struct ConnectionPoolStats {
    /// Total number of connections.
    pub total_connections: usize,
    /// Number of active (checked-out) connections.
    pub active_connections: usize,
    /// Number of idle connections.
    pub idle_connections: usize,
    /// Total number of connection requests.
    pub total_requests: u64,
    /// Number of requests that succeeded.
    pub successful_requests: u64,
    /// Number of requests that failed.
    pub failed_requests: u64,
    /// Wait time associated with connection requests.
    pub average_wait_time: Duration,
}
95
96impl<T> ConnectionPool<T> {
97    /// 创建新的连接池
98    pub fn new(config: ConnectionPoolConfig) -> Self {
99        let semaphore = Arc::new(Semaphore::new(config.max_connections));
100        
101        Self {
102            config,
103            connections: Arc::new(RwLock::new(Vec::new())),
104            semaphore,
105            stats: Arc::new(RwLock::new(ConnectionPoolStats::default())),
106        }
107    }
108    
109    /// 获取连接
110    pub async fn get_connection(&self) -> Result<PooledConnection<T>>
111    where
112        T: Clone,
113    {
114        let start_time = Instant::now();
115        
116        // 等待可用连接槽位
117        let _permit = self.semaphore.acquire().await
118            .map_err(|e| VectorError::ConnectionFailed(format!("Failed to acquire connection: {}", e)))?;
119        
120        let mut connections = self.connections.write().await;
121        
122        // 查找可用连接
123        if let Some(pos) = connections.iter().position(|conn| !conn.is_active) {
124            let mut conn = connections.remove(pos);
125            conn.is_active = true;
126            conn.last_used = Instant::now();
127            
128            // 更新统计信息
129            let mut stats = self.stats.write().await;
130            stats.total_requests += 1;
131            stats.successful_requests += 1;
132            stats.average_wait_time = start_time.elapsed();
133            
134            return Ok(conn);
135        }
136        
137        // 如果没有可用连接且未达到最大连接数,创建新连接
138        if connections.len() < self.config.max_connections {
139            // 这里需要实际的连接创建逻辑,暂时返回错误
140            return Err(VectorError::ConnectionFailed("Connection creation not implemented".to_string()));
141        }
142
143        Err(VectorError::ConnectionFailed("No available connections".to_string()))
144    }
145    
146    /// 归还连接
147    pub async fn return_connection(&self, mut connection: PooledConnection<T>) {
148        connection.is_active = false;
149        connection.last_used = Instant::now();
150        
151        let mut connections = self.connections.write().await;
152        connections.push(connection);
153    }
154    
155    /// 获取连接池统计信息
156    pub async fn get_stats(&self) -> ConnectionPoolStats {
157        self.stats.read().await.clone()
158    }
159    
160    /// 清理过期连接
161    pub async fn cleanup_expired_connections(&self) {
162        let mut connections = self.connections.write().await;
163        let now = Instant::now();
164        
165        connections.retain(|conn| {
166            !conn.is_active && now.duration_since(conn.last_used) < self.config.idle_timeout
167        });
168    }
169}
170
/// A cache entry: the stored value plus access bookkeeping.
#[derive(Clone, Debug)]
pub struct CacheEntry<T> {
    /// The cached value.
    pub value: T,
    /// When the entry was created; compared against the cache TTL on lookup.
    pub created_at: Instant,
    /// When the entry was last accessed; used to choose an eviction victim.
    pub last_accessed: Instant,
    /// Access counter for the entry.
    pub access_count: u64,
}
179
180/// LRU缓存
181pub struct LRUCache<K, V> {
182    config: CacheConfig,
183    entries: Arc<RwLock<HashMap<K, CacheEntry<V>>>>,
184    stats: Arc<RwLock<CacheStats>>,
185}
186
/// Cache statistics.
///
/// A plain snapshot value; every field starts at zero via `Default`.
#[derive(Clone, Debug, Default)]
pub struct CacheStats {
    /// Total number of lookups.
    pub total_requests: u64,
    /// Number of lookups that found a value.
    pub cache_hits: u64,
    /// Number of lookups that found nothing.
    pub cache_misses: u64,
    /// Number of entries evicted.
    pub evictions: u64,
    /// Number of entries currently stored.
    pub current_size: usize,
    /// `cache_hits / total_requests`.
    pub hit_rate: f64,
}
197
198impl<K, V> LRUCache<K, V>
199where
200    K: std::hash::Hash + Eq + Clone,
201    V: Clone,
202{
203    /// 创建新的LRU缓存
204    pub fn new(config: CacheConfig) -> Self {
205        Self {
206            config,
207            entries: Arc::new(RwLock::new(HashMap::new())),
208            stats: Arc::new(RwLock::new(CacheStats::default())),
209        }
210    }
211    
212    /// 获取缓存值
213    pub async fn get(&self, key: &K) -> Option<V> {
214        let mut stats = self.stats.write().await;
215        stats.total_requests += 1;
216        
217        let mut entries = self.entries.write().await;
218        
219        if let Some(entry) = entries.get_mut(key) {
220            let now = Instant::now();
221            
222            // 检查是否过期
223            if now.duration_since(entry.created_at) > self.config.ttl {
224                entries.remove(key);
225                stats.cache_misses += 1;
226                stats.current_size = entries.len();
227                return None;
228            }
229            
230            // 更新访问信息
231            entry.last_accessed = now;
232            entry.access_count += 1;
233            
234            stats.cache_hits += 1;
235            stats.hit_rate = stats.cache_hits as f64 / stats.total_requests as f64;
236            
237            Some(entry.value.clone())
238        } else {
239            stats.cache_misses += 1;
240            stats.hit_rate = stats.cache_hits as f64 / stats.total_requests as f64;
241            None
242        }
243    }
244    
245    /// 设置缓存值
246    pub async fn set(&self, key: K, value: V) {
247        let mut entries = self.entries.write().await;
248        let now = Instant::now();
249        
250        // 如果缓存已满,执行LRU淘汰
251        if entries.len() >= self.config.max_entries && !entries.contains_key(&key) {
252            self.evict_lru(&mut entries).await;
253        }
254        
255        let entry = CacheEntry {
256            value,
257            created_at: now,
258            last_accessed: now,
259            access_count: 1,
260        };
261        
262        entries.insert(key, entry);
263        
264        let mut stats = self.stats.write().await;
265        stats.current_size = entries.len();
266    }
267    
268    /// 删除缓存条目
269    pub async fn remove(&self, key: &K) -> Option<V> {
270        let mut entries = self.entries.write().await;
271        let result = entries.remove(key).map(|entry| entry.value);
272        
273        let mut stats = self.stats.write().await;
274        stats.current_size = entries.len();
275        
276        result
277    }
278    
279    /// 清空缓存
280    pub async fn clear(&self) {
281        let mut entries = self.entries.write().await;
282        entries.clear();
283        
284        let mut stats = self.stats.write().await;
285        stats.current_size = 0;
286    }
287    
288    /// 获取缓存统计信息
289    pub async fn get_stats(&self) -> CacheStats {
290        self.stats.read().await.clone()
291    }
292    
293    /// LRU淘汰策略
294    async fn evict_lru(&self, entries: &mut HashMap<K, CacheEntry<V>>) {
295        if entries.is_empty() {
296            return;
297        }
298        
299        // 找到最久未访问的条目
300        let mut oldest_key = None;
301        let mut oldest_time = Instant::now();
302        
303        for (key, entry) in entries.iter() {
304            if entry.last_accessed < oldest_time {
305                oldest_time = entry.last_accessed;
306                oldest_key = Some(key.clone());
307            }
308        }
309        
310        if let Some(key) = oldest_key {
311            entries.remove(&key);
312            
313            let mut stats = self.stats.write().await;
314            stats.evictions += 1;
315        }
316    }
317}
318
319/// 性能监控器
320pub struct PerformanceMonitor {
321    metrics: Arc<RwLock<PerformanceMetrics>>,
322}
323
/// Performance metrics.
///
/// A plain snapshot value; every field starts at zero via `Default`.
#[derive(Clone, Debug, Default)]
pub struct PerformanceMetrics {
    /// Total number of recorded operations.
    pub total_operations: u64,
    /// Number of operations recorded as successful.
    pub successful_operations: u64,
    /// Number of operations recorded as failed.
    pub failed_operations: u64,
    /// Mean response time of the recorded operations.
    pub average_response_time: Duration,
    /// Shortest recorded response time.
    pub min_response_time: Duration,
    /// Longest recorded response time.
    pub max_response_time: Duration,
    /// Operations per second.
    pub operations_per_second: f64,
    /// Memory usage in megabytes.
    pub memory_usage_mb: f64,
    /// CPU usage as a percentage.
    pub cpu_usage_percent: f64,
}
337
338impl PerformanceMonitor {
339    /// 创建新的性能监控器
340    pub fn new() -> Self {
341        Self {
342            metrics: Arc::new(RwLock::new(PerformanceMetrics::default())),
343        }
344    }
345    
346    /// 记录操作
347    pub async fn record_operation(&self, duration: Duration, success: bool) {
348        let mut metrics = self.metrics.write().await;
349        
350        metrics.total_operations += 1;
351        
352        if success {
353            metrics.successful_operations += 1;
354        } else {
355            metrics.failed_operations += 1;
356        }
357        
358        // 更新响应时间统计
359        if metrics.total_operations == 1 {
360            metrics.min_response_time = duration;
361            metrics.max_response_time = duration;
362            metrics.average_response_time = duration;
363        } else {
364            if duration < metrics.min_response_time {
365                metrics.min_response_time = duration;
366            }
367            if duration > metrics.max_response_time {
368                metrics.max_response_time = duration;
369            }
370            
371            // 计算移动平均
372            let total_time = metrics.average_response_time.as_nanos() as f64 * (metrics.total_operations - 1) as f64;
373            metrics.average_response_time = Duration::from_nanos(
374                ((total_time + duration.as_nanos() as f64) / metrics.total_operations as f64) as u64
375            );
376        }
377    }
378    
379    /// 获取性能指标
380    pub async fn get_metrics(&self) -> PerformanceMetrics {
381        self.metrics.read().await.clone()
382    }
383    
384    /// 重置指标
385    pub async fn reset_metrics(&self) {
386        let mut metrics = self.metrics.write().await;
387        *metrics = PerformanceMetrics::default();
388    }
389}